Last active 1 month ago

抓取 VPS 平台商品的 pid 方便添加 aff

jetsung revised this gist 2 months ago. Go to revision

1 file changed, 155 insertions

vpspid.py(file created)

@@ -0,0 +1,155 @@
1 + import argparse
2 + import requests
3 + from urllib.parse import urljoin, urlparse, parse_qs, urlencode, urlunparse
4 + import sys
5 +
# Module-level HTTP session used for every request this script sends.
# The headers below present the client as a desktop Chrome browser.
session = requests.Session()
session.headers["User-Agent"] = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36"
session.headers["Accept"] = "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8"
session.headers["Accept-Language"] = "en-US,en;q=0.9"
12 +
def is_valid_url(url):
    """Return True if *url* is an http(s) URL with a non-empty host part.

    Args:
        url: The candidate URL. Anything that is not a ``str`` is rejected.

    Returns:
        True when the parsed scheme is ``http`` or ``https`` and the network
        location is non-empty; False otherwise, including when the URL
        cannot be parsed at all.
    """
    if not isinstance(url, str):
        return False
    try:
        parts = urlparse(url)
    except ValueError:
        # urlparse raises ValueError for malformed network locations,
        # e.g. an unbalanced IPv6 bracket.
        return False
    return parts.scheme in ("http", "https") and bool(parts.netloc)
19 +
def extract_last_segment(location):
    """Return the final path component of a redirect target.

    If the query string carries a non-empty ``rp`` parameter, its first
    value is treated as the path; otherwise the URL path itself is used.
    Trailing slashes are ignored, so ``/store/abc/`` and ``/store/abc//``
    both yield ``abc``.

    Args:
        location: A URL or path, typically the value of a ``Location`` header.

    Returns:
        The text after the last ``/`` of the chosen path, or ``""`` when the
        path is empty or consists only of slashes.
    """
    parsed = urlparse(location)
    # parse_qs drops blank values, so a present key always has a usable value.
    rp_values = parse_qs(parsed.query).get("rp")
    path = rp_values[0] if rp_values else parsed.path
    return path.rstrip("/").rpartition("/")[2]
36 +
def get_http_version(resp):
    """Return a printable HTTP protocol version for *resp*.

    The value is read from ``resp.raw.version``. The integers 10, 11 and 20
    map to ``"1.0"``, ``"1.1"`` and ``"2"``; any other non-zero value is
    returned as its string form.

    Args:
        resp: A response object exposing ``raw.version``.

    Returns:
        The version label, or ``"unknown"`` when the version is missing,
        zero/None, or cannot be read.
    """
    known_versions = {10: "1.0", 11: "1.1", 20: "2"}
    try:
        version = resp.raw.version
        if not version:
            # NOTE(review): the raw response appears to report 0 when the
            # protocol version was never recorded; avoid printing "HTTP/0".
            return "unknown"
        return known_versions.get(version, str(version))
    except Exception:
        # Deliberately best-effort: this value is only used for log output,
        # so a failure here must never abort the scan.
        return "unknown"
43 +
def _url_with_params(base_parts, base_query, **params):
    """Return the base URL with *params* merged into its query string.

    Existing parameters of the same name are replaced. The merged query is
    URL-encoded and placed before any fragment of the base URL.
    """
    merged = {key: list(values) for key, values in base_query.items()}
    for key, value in params.items():
        merged[key] = [str(value)]
    return urlunparse(base_parts._replace(query=urlencode(merged, doseq=True)))


def _print_response(resp):
    """Print the status line and every header of *resp*."""
    print(f"HTTP/{get_http_version(resp)} {resp.status_code} {resp.reason}")
    for header, value in resp.headers.items():
        print(f"{header}: {value}")


def main():
    """Scan a pid range on an affiliate URL and record where each pid leads.

    For every pid from ``--end`` down to ``--start`` the base URL is requested
    with ``pid=<pid>`` without following redirects. If the response carries a
    ``Location`` header, that target is requested once more (again without
    following redirects). When the second response also redirects, the last
    path segment of its ``Location`` becomes the label of a Markdown link
    ``[label](base-url?pid=<pid>&aff=<aff>)`` appended to ``result.md``.

    ``--filter`` is a comma-separated blacklist matched against the label.
    ``--query`` is a substring whitelist matched against the final
    ``Location``; without a trailing ``*`` the scan stops at the first
    recorded match.

    Raises:
        SystemExit: With code 1 on invalid arguments, or with a message when
            the first request for a pid is answered with HTTP 403.
    """
    parser = argparse.ArgumentParser(description="根据pid范围请求URL并记录跳转结果")
    parser.add_argument('-s', '--start', type=int, required=True, help='起始 pid,必须为整数')
    parser.add_argument('-e', '--end', type=int, required=True, help='结束 pid,必须为整数且不小于起始 pid')
    parser.add_argument('-a', '--aff', type=str, required=True, help='aff 参数,字符串')
    parser.add_argument('-u', '--url', type=str, required=True, help='基础 URL,必须为合法 URL,例如 https://cloud.colocrossing.com/aff.php')
    parser.add_argument('--filter', type=str, default='', help='过滤关键字,逗号分隔,跳过包含关键字的跳转路径(黑名单)')
    parser.add_argument('-q', '--query', type=str, default='', help='匹配最终 Location 的关键字(白名单)。若以 "*" 结尾(如 "/store*")则为通配查找,匹配后继续执行;若无 "*" 则查找到一条就退出')

    args = parser.parse_args()

    if args.start > args.end:
        print("错误:起始 pid 不应大于结束 pid。")
        sys.exit(1)

    if not is_valid_url(args.url):
        print("错误:无效的 URL,请传入合法的 http 或 https URL。")
        sys.exit(1)

    filters = [x.strip() for x in args.filter.split(',') if x.strip()]
    output_file = "result.md"

    # A trailing "*" switches --query to "record every match and keep going";
    # the "*" itself is not part of the substring that is searched for.
    is_wildcard = args.query.endswith('*')
    search_target = args.query[:-1] if is_wildcard else args.query

    # The base URL never changes, so parse it once. keep_blank_values
    # preserves parameters such as "?foo=" that parse_qs would otherwise drop.
    base_parts = urlparse(args.url)
    base_query = parse_qs(base_parts.query, keep_blank_values=True)

    with open(output_file, "w", encoding="utf-8") as f:
        # Scan downwards, from the end pid to the start pid (both inclusive).
        for pid in range(args.end, args.start - 1, -1):
            request_url = _url_with_params(base_parts, base_query, pid=pid)

            print(f"\n===== PID={pid} =====")
            print(f"-- 请求 URL: {request_url}")

            try:
                resp1 = session.get(request_url, allow_redirects=False, timeout=10)
                if resp1.status_code == 403:
                    # Not a RequestException, so this ends the whole script.
                    raise SystemExit(f"PID={pid} 被 Cloudflare 拦截(403 Forbidden),脚本终止。")

                _print_response(resp1)

                first_location = resp1.headers.get('Location')
                if first_location is None:
                    print(f"PID={pid} 无跳转")
                    continue

                # urljoin resolves relative targets and returns absolute
                # ones as-is, so no manual "starts with http" check is needed.
                second_url = urljoin(request_url, first_location)

                resp2 = session.get(second_url, allow_redirects=False, timeout=10)

                print(f"\n-- 请求 URL: {second_url}")
                _print_response(resp2)
            except requests.RequestException as e:
                print(f"请求失败,PID={pid} 错误信息: {e}")
                continue

            second_location = resp2.headers.get('Location')
            if second_location is None:
                print(f"PID={pid} 无第二次跳转")
                continue

            # Whitelist: the final Location must contain the search target.
            if search_target and search_target not in second_location:
                print(f"PID={pid} 最终 Location 不匹配目标规则 '{search_target}',跳过写入")
                continue

            last_segment = extract_last_segment(second_location)

            # Blacklist: skip labels containing any filter keyword.
            if filters and any(filt in last_segment for filt in filters):
                print(f"PID={pid} 跳转路径包含过滤关键词,跳过写入: {last_segment}")
                continue

            # Build the affiliate link through the query encoder so that aff
            # is URL-encoded and ends up before any fragment of the base URL.
            aff_url = _url_with_params(base_parts, base_query, pid=pid, aff=args.aff)
            md_link = f"[{last_segment}]({aff_url})"
            f.write(md_link + "\n")
            # Flush per record so partial results survive an aborted run.
            f.flush()
            print(f"PID={pid} => 成功记录数据: {md_link}")

            if args.query:
                if not is_wildcard:
                    print(f"\n[!] 已成功精准匹配到目标 '{args.query}',获取到最终数据,任务完成并退出。")
                    break  # exact mode: the first recorded match ends the scan
                print(f"[*] 匹配到通配目标 '{args.query}',已记录,继续往下查找...")

    print(f"\n结果已保存到 {output_file}")


if __name__ == "__main__":
    main()
Newer Older