Last active 1 month ago

抓取 VPS 平台商品的 pid 方便添加 aff

vpspid.py Raw
1import argparse
2import requests
3from urllib.parse import urljoin, urlparse, parse_qs, urlencode, urlunparse
4import sys
5
6session = requests.Session()
7session.headers.update({
8 "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36",
9 "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
10 "Accept-Language": "en-US,en;q=0.9",
11})
12
13def is_valid_url(url):
14 try:
15 parts = urlparse(url)
16 return parts.scheme in ('http', 'https') and parts.netloc != ''
17 except Exception:
18 return False
19
20def extract_last_segment(location):
21 parsed = urlparse(location)
22 if parsed.query:
23 qs = parse_qs(parsed.query)
24 if 'rp' in qs:
25 path = qs['rp'][0]
26 else:
27 path = parsed.path
28 else:
29 path = parsed.path
30
31 if path.endswith('/'):
32 path = path[:-1]
33
34 segments = path.split('/')
35 return segments[-1] if segments else ""
36
37def get_http_version(resp):
38 version_map = {10: "1.0", 11: "1.1", 20: "2"}
39 try:
40 return version_map.get(resp.raw.version, str(resp.raw.version))
41 except Exception:
42 return "unknown"
43
44def main():
45 parser = argparse.ArgumentParser(description="根据pid范围请求URL并记录跳转结果")
46 parser.add_argument('-s', '--start', type=int, required=True, help='起始 pid,必须为整数')
47 parser.add_argument('-e', '--end', type=int, required=True, help='结束 pid,必须为整数且不小于起始 pid')
48 parser.add_argument('-a', '--aff', type=str, required=True, help='aff 参数,字符串')
49 parser.add_argument('-u', '--url', type=str, required=True, help='基础 URL,必须为合法 URL,例如 https://cloud.colocrossing.com/aff.php')
50 parser.add_argument('--filter', type=str, default='', help='过滤关键字,逗号分隔,跳过包含关键字的跳转路径(黑名单)')
51 parser.add_argument('-q', '--query', type=str, default='', help='匹配最终 Location 的关键字(白名单)。若以 "*" 结尾(如 "/store*")则为通配查找,匹配后继续执行;若无 "*" 则查找到一条就退出')
52
53 args = parser.parse_args()
54
55 if args.start > args.end:
56 print("错误:起始 pid 不应大于结束 pid。")
57 sys.exit(1)
58
59 if not is_valid_url(args.url):
60 print("错误:无效的 URL,请传入合法的 http 或 https URL。")
61 sys.exit(1)
62
63 filters =[x.strip() for x in args.filter.split(',') if x.strip()]
64 output_file = "result.md"
65
66 # 解析 query 参数的通配逻辑
67 is_wildcard = False
68 search_target = args.query
69 if args.query and args.query.endswith('*'):
70 is_wildcard = True
71 search_target = args.query[:-1] # 去掉末尾的 * 以便进行精确子串匹配
72
73 with open(output_file, "w", encoding="utf-8") as f:
74 for pid in range(args.end, args.start - 1, -1):
75 base_parsed = urlparse(args.url)
76 query_dict = parse_qs(base_parsed.query)
77 query_dict['pid'] = [str(pid)]
78 new_query = urlencode(query_dict, doseq=True)
79 request_url = urlunparse((
80 base_parsed.scheme,
81 base_parsed.netloc,
82 base_parsed.path,
83 base_parsed.params,
84 new_query,
85 base_parsed.fragment
86 ))
87
88 print(f"\n===== PID={pid} =====")
89 print(f"-- 请求 URL: {request_url}")
90
91 try:
92 resp1 = session.get(request_url, allow_redirects=False, timeout=10)
93 if resp1.status_code == 403:
94 raise SystemExit(f"PID={pid} 被 Cloudflare 拦截(403 Forbidden),脚本终止。")
95
96 ver1 = get_http_version(resp1)
97 print(f"HTTP/{ver1} {resp1.status_code} {resp1.reason}")
98 for header, value in resp1.headers.items():
99 print(f"{header}: {value}")
100
101 if 'Location' not in resp1.headers:
102 print(f"PID={pid} 无跳转")
103 continue
104
105 first_location = resp1.headers['Location']
106 second_url = first_location if first_location.startswith("http") else urljoin(request_url, first_location)
107
108 resp2 = session.get(second_url, allow_redirects=False, timeout=10)
109
110 ver2 = get_http_version(resp2)
111 print(f"\n-- 请求 URL: {second_url}")
112 print(f"HTTP/{ver2} {resp2.status_code} {resp2.reason}")
113 for header, value in resp2.headers.items():
114 print(f"{header}: {value}")
115
116 if 'Location' in resp2.headers:
117 second_location = resp2.headers['Location']
118
119 # === 改造后的 query 匹配逻辑 ===
120 if search_target and search_target not in second_location:
121 print(f"PID={pid} 最终 Location 不匹配目标规则 '{search_target}',跳过写入")
122 continue
123 # =================================
124
125 last_segment = extract_last_segment(second_location)
126
127 if filters and any(filt in last_segment for filt in filters):
128 print(f"PID={pid} 跳转路径包含过滤关键词,跳过写入: {last_segment}")
129 continue
130
131 md_link = f"[{last_segment}]({request_url}&aff={args.aff})"
132 f.write(md_link + "\n")
133 f.flush()
134 print(f"PID={pid} => 成功记录数据: {md_link}")
135
136 # === 根据是否是通配符决定是否退出 ===
137 if args.query:
138 if not is_wildcard:
139 print(f"\n[!] 已成功精准匹配到目标 '{args.query}',获取到最终数据,任务完成并退出。")
140 break # 没有星号,精确匹配一条即退出
141 else:
142 print(f"[*] 匹配到通配目标 '{args.query}',已记录,继续往下查找...")
143 # =====================================
144
145 else:
146 print(f"PID={pid} 无第二次跳转")
147
148 except requests.RequestException as e:
149 print(f"请求失败,PID={pid} 错误信息: {e}")
150 continue
151
152 print(f"\n结果已保存到 {output_file}")
153
154if __name__ == "__main__":
155 main()