vpspid.py
· 6.4 KiB · Python
Sin formato
import argparse
import requests
from urllib.parse import urljoin, urlparse, parse_qs, urlencode, urlunparse
import sys
# Headers sent with every request: a desktop Chrome User-Agent plus the
# Accept / Accept-Language values a browser would normally send.
_BROWSER_HEADERS = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36",
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
    "Accept-Language": "en-US,en;q=0.9",
}

# Module-level session shared by all requests issued from this script.
session = requests.Session()
session.headers.update(_BROWSER_HEADERS)
def is_valid_url(url):
    """Return True if *url* is an absolute http(s) URL that names a host.

    Args:
        url: Candidate URL. Anything that is not a ``str`` is rejected.

    Returns:
        ``True`` when the scheme is ``http`` or ``https`` and the authority
        part contains a host name; ``False`` otherwise (including when the
        URL cannot be parsed at all).
    """
    if not isinstance(url, str):
        return False
    try:
        parts = urlparse(url)
        # Check hostname rather than netloc so that authorities without a
        # host (":80", "user@") are rejected.
        return parts.scheme in ('http', 'https') and bool(parts.hostname)
    except ValueError:
        # urlparse raises ValueError for malformed authorities, e.g. an
        # unclosed IPv6 bracket.
        return False
def extract_last_segment(location):
    """Return the last path segment of a redirect target.

    If the URL's query string carries a non-empty ``rp`` parameter, its first
    value is used as the path (presumably a route parameter such as
    ``index.php?rp=/store/plan`` -- confirm against the target site);
    otherwise the URL's own path is used.

    Args:
        location: Absolute or relative URL, typically a ``Location`` header.

    Returns:
        The text after the final ``/`` once all trailing slashes have been
        removed, or ``""`` when the path is empty or consists only of slashes.
    """
    parsed = urlparse(location)
    rp_values = parse_qs(parsed.query).get('rp')
    path = rp_values[0] if rp_values else parsed.path
    # rstrip removes every trailing slash, so "/store/plan//" still yields
    # "plan" instead of an empty segment.
    return path.rstrip('/').rsplit('/', 1)[-1]
def get_http_version(resp):
    """Return the HTTP protocol version of *resp* as display text.

    Reads ``resp.raw.version`` and maps the integer codes 10, 11 and 20 to
    "1.0", "1.1" and "2". Any other truthy value is returned via ``str()``.

    Args:
        resp: Response object exposing ``raw.version``.

    Returns:
        The version string, or "unknown" when ``raw`` or ``version`` is
        missing, ``None`` or ``0``. This function never raises.
    """
    version_map = {10: "1.0", 11: "1.1", 20: "2"}
    raw = getattr(resp, "raw", None)
    version = getattr(raw, "version", None)
    if not version:
        # Avoid rendering "HTTP/None" or "HTTP/0" when no version is recorded.
        return "unknown"
    try:
        return version_map.get(version, str(version))
    except TypeError:
        # An unhashable version value cannot be looked up in the map.
        return "unknown"
def _with_query(parsed_url, **params):
    """Rebuild *parsed_url* with *params* merged into (and overriding) its query."""
    query = parse_qs(parsed_url.query, keep_blank_values=True)
    for key, value in params.items():
        query[key] = [str(value)]
    return urlunparse(parsed_url._replace(query=urlencode(query, doseq=True)))


def _dump_response(resp):
    """Print the status line and every header of *resp*."""
    print(f"HTTP/{get_http_version(resp)} {resp.status_code} {resp.reason}")
    for header, value in resp.headers.items():
        print(f"{header}: {value}")


def _second_hop_location(request_url, pid):
    """Follow one redirect from *request_url* by hand and return the next one.

    Returns the ``Location`` header of the second response, or ``None`` when
    either response carries no ``Location`` header.

    Raises:
        SystemExit: if the first response has status 403.
        requests.RequestException: propagated from either request.
    """
    resp1 = session.get(request_url, allow_redirects=False, timeout=10)
    if resp1.status_code == 403:
        raise SystemExit(f"PID={pid} 被 Cloudflare 拦截(403 Forbidden),脚本终止。")
    _dump_response(resp1)

    if 'Location' not in resp1.headers:
        print(f"PID={pid} 无跳转")
        return None

    # urljoin leaves absolute targets untouched and resolves relative ones,
    # so no prefix check on the header value is needed.
    second_url = urljoin(request_url, resp1.headers['Location'])
    resp2 = session.get(second_url, allow_redirects=False, timeout=10)
    print(f"\n-- 请求 URL: {second_url}")
    _dump_response(resp2)

    if 'Location' not in resp2.headers:
        print(f"PID={pid} 无第二次跳转")
        return None
    return resp2.headers['Location']


def main():
    """Scan a pid range against an affiliate URL and record redirect targets.

    For each pid from ``--end`` down to ``--start`` the base URL is requested
    with ``pid=<n>`` and two redirect hops are inspected manually. When the
    second hop's ``Location`` contains the ``--query`` substring (if given)
    and its last path segment contains none of the ``--filter`` keywords, a
    markdown link ``[segment](url with pid and aff)`` is appended to
    ``result.md``.

    A ``--query`` without a trailing ``*`` stops the scan after the first
    recorded match; with a trailing ``*`` the scan continues.

    Exits with status 1 on invalid arguments and raises ``SystemExit`` when a
    request is answered with 403.
    """
    parser = argparse.ArgumentParser(description="根据pid范围请求URL并记录跳转结果")
    parser.add_argument('-s', '--start', type=int, required=True, help='起始 pid,必须为整数')
    parser.add_argument('-e', '--end', type=int, required=True, help='结束 pid,必须为整数且不小于起始 pid')
    parser.add_argument('-a', '--aff', type=str, required=True, help='aff 参数,字符串')
    parser.add_argument('-u', '--url', type=str, required=True, help='基础 URL,必须为合法 URL,例如 https://cloud.colocrossing.com/aff.php')
    parser.add_argument('--filter', type=str, default='', help='过滤关键字,逗号分隔,跳过包含关键字的跳转路径(黑名单)')
    parser.add_argument('-q', '--query', type=str, default='', help='匹配最终 Location 的关键字(白名单)。若以 "*" 结尾(如 "/store*")则为通配查找,匹配后继续执行;若无 "*" 则查找到一条就退出')

    args = parser.parse_args()

    if args.start > args.end:
        print("错误:起始 pid 不应大于结束 pid。")
        sys.exit(1)

    if not is_valid_url(args.url):
        print("错误:无效的 URL,请传入合法的 http 或 https URL。")
        sys.exit(1)

    filters = [x.strip() for x in args.filter.split(',') if x.strip()]
    output_file = "result.md"

    # A trailing "*" switches to wildcard mode; the "*" itself is dropped so
    # the remainder can be used for plain substring matching.
    is_wildcard = bool(args.query) and args.query.endswith('*')
    search_target = args.query[:-1] if is_wildcard else args.query

    # The base URL never changes, so parse it once outside the loop.
    base_parsed = urlparse(args.url)

    with open(output_file, "w", encoding="utf-8") as f:
        for pid in range(args.end, args.start - 1, -1):
            request_url = _with_query(base_parsed, pid=pid)

            print(f"\n===== PID={pid} =====")
            print(f"-- 请求 URL: {request_url}")

            try:
                final_location = _second_hop_location(request_url, pid)
            except requests.RequestException as e:
                print(f"请求失败,PID={pid} 错误信息: {e}")
                continue

            if final_location is None:
                continue

            # Whitelist: the final Location must contain the search target.
            if search_target and search_target not in final_location:
                print(f"PID={pid} 最终 Location 不匹配目标规则 '{search_target}',跳过写入")
                continue

            last_segment = extract_last_segment(final_location)

            # Blacklist: skip segments containing any filter keyword.
            if filters and any(filt in last_segment for filt in filters):
                print(f"PID={pid} 跳转路径包含过滤关键词,跳过写入: {last_segment}")
                continue

            # Add aff through the query string so it is URL-encoded and stays
            # in front of any fragment carried by the base URL.
            link_url = _with_query(base_parsed, pid=pid, aff=args.aff)
            md_link = f"[{last_segment}]({link_url})"
            f.write(md_link + "\n")
            # Flush per hit so results survive an abort (e.g. a later 403).
            f.flush()
            print(f"PID={pid} => 成功记录数据: {md_link}")

            if args.query:
                if not is_wildcard:
                    print(f"\n[!] 已成功精准匹配到目标 '{args.query}',获取到最终数据,任务完成并退出。")
                    break  # no "*": stop after the first recorded match
                print(f"[*] 匹配到通配目标 '{args.query}',已记录,继续往下查找...")

    print(f"\n结果已保存到 {output_file}")


if __name__ == "__main__":
    main()
| 1 | import argparse |
| 2 | import requests |
| 3 | from urllib.parse import urljoin, urlparse, parse_qs, urlencode, urlunparse |
| 4 | import sys |
| 5 | |
| 6 | session = requests.Session() |
| 7 | session.headers.update({ |
| 8 | "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36", |
| 9 | "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8", |
| 10 | "Accept-Language": "en-US,en;q=0.9", |
| 11 | }) |
| 12 | |
| 13 | def is_valid_url(url): |
| 14 | try: |
| 15 | parts = urlparse(url) |
| 16 | return parts.scheme in ('http', 'https') and parts.netloc != '' |
| 17 | except Exception: |
| 18 | return False |
| 19 | |
| 20 | def extract_last_segment(location): |
| 21 | parsed = urlparse(location) |
| 22 | if parsed.query: |
| 23 | qs = parse_qs(parsed.query) |
| 24 | if 'rp' in qs: |
| 25 | path = qs['rp'][0] |
| 26 | else: |
| 27 | path = parsed.path |
| 28 | else: |
| 29 | path = parsed.path |
| 30 | |
| 31 | if path.endswith('/'): |
| 32 | path = path[:-1] |
| 33 | |
| 34 | segments = path.split('/') |
| 35 | return segments[-1] if segments else "" |
| 36 | |
| 37 | def get_http_version(resp): |
| 38 | version_map = {10: "1.0", 11: "1.1", 20: "2"} |
| 39 | try: |
| 40 | return version_map.get(resp.raw.version, str(resp.raw.version)) |
| 41 | except Exception: |
| 42 | return "unknown" |
| 43 | |
| 44 | def main(): |
| 45 | parser = argparse.ArgumentParser(description="根据pid范围请求URL并记录跳转结果") |
| 46 | parser.add_argument('-s', '--start', type=int, required=True, help='起始 pid,必须为整数') |
| 47 | parser.add_argument('-e', '--end', type=int, required=True, help='结束 pid,必须为整数且不小于起始 pid') |
| 48 | parser.add_argument('-a', '--aff', type=str, required=True, help='aff 参数,字符串') |
| 49 | parser.add_argument('-u', '--url', type=str, required=True, help='基础 URL,必须为合法 URL,例如 https://cloud.colocrossing.com/aff.php') |
| 50 | parser.add_argument('--filter', type=str, default='', help='过滤关键字,逗号分隔,跳过包含关键字的跳转路径(黑名单)') |
| 51 | parser.add_argument('-q', '--query', type=str, default='', help='匹配最终 Location 的关键字(白名单)。若以 "*" 结尾(如 "/store*")则为通配查找,匹配后继续执行;若无 "*" 则查找到一条就退出') |
| 52 | |
| 53 | args = parser.parse_args() |
| 54 | |
| 55 | if args.start > args.end: |
| 56 | print("错误:起始 pid 不应大于结束 pid。") |
| 57 | sys.exit(1) |
| 58 | |
| 59 | if not is_valid_url(args.url): |
| 60 | print("错误:无效的 URL,请传入合法的 http 或 https URL。") |
| 61 | sys.exit(1) |
| 62 | |
| 63 | filters =[x.strip() for x in args.filter.split(',') if x.strip()] |
| 64 | output_file = "result.md" |
| 65 | |
| 66 | # 解析 query 参数的通配逻辑 |
| 67 | is_wildcard = False |
| 68 | search_target = args.query |
| 69 | if args.query and args.query.endswith('*'): |
| 70 | is_wildcard = True |
| 71 | search_target = args.query[:-1] # 去掉末尾的 * 以便进行精确子串匹配 |
| 72 | |
| 73 | with open(output_file, "w", encoding="utf-8") as f: |
| 74 | for pid in range(args.end, args.start - 1, -1): |
| 75 | base_parsed = urlparse(args.url) |
| 76 | query_dict = parse_qs(base_parsed.query) |
| 77 | query_dict['pid'] = [str(pid)] |
| 78 | new_query = urlencode(query_dict, doseq=True) |
| 79 | request_url = urlunparse(( |
| 80 | base_parsed.scheme, |
| 81 | base_parsed.netloc, |
| 82 | base_parsed.path, |
| 83 | base_parsed.params, |
| 84 | new_query, |
| 85 | base_parsed.fragment |
| 86 | )) |
| 87 | |
| 88 | print(f"\n===== PID={pid} =====") |
| 89 | print(f"-- 请求 URL: {request_url}") |
| 90 | |
| 91 | try: |
| 92 | resp1 = session.get(request_url, allow_redirects=False, timeout=10) |
| 93 | if resp1.status_code == 403: |
| 94 | raise SystemExit(f"PID={pid} 被 Cloudflare 拦截(403 Forbidden),脚本终止。") |
| 95 | |
| 96 | ver1 = get_http_version(resp1) |
| 97 | print(f"HTTP/{ver1} {resp1.status_code} {resp1.reason}") |
| 98 | for header, value in resp1.headers.items(): |
| 99 | print(f"{header}: {value}") |
| 100 | |
| 101 | if 'Location' not in resp1.headers: |
| 102 | print(f"PID={pid} 无跳转") |
| 103 | continue |
| 104 | |
| 105 | first_location = resp1.headers['Location'] |
| 106 | second_url = first_location if first_location.startswith("http") else urljoin(request_url, first_location) |
| 107 | |
| 108 | resp2 = session.get(second_url, allow_redirects=False, timeout=10) |
| 109 | |
| 110 | ver2 = get_http_version(resp2) |
| 111 | print(f"\n-- 请求 URL: {second_url}") |
| 112 | print(f"HTTP/{ver2} {resp2.status_code} {resp2.reason}") |
| 113 | for header, value in resp2.headers.items(): |
| 114 | print(f"{header}: {value}") |
| 115 | |
| 116 | if 'Location' in resp2.headers: |
| 117 | second_location = resp2.headers['Location'] |
| 118 | |
| 119 | # === 改造后的 query 匹配逻辑 === |
| 120 | if search_target and search_target not in second_location: |
| 121 | print(f"PID={pid} 最终 Location 不匹配目标规则 '{search_target}',跳过写入") |
| 122 | continue |
| 123 | # ================================= |
| 124 | |
| 125 | last_segment = extract_last_segment(second_location) |
| 126 | |
| 127 | if filters and any(filt in last_segment for filt in filters): |
| 128 | print(f"PID={pid} 跳转路径包含过滤关键词,跳过写入: {last_segment}") |
| 129 | continue |
| 130 | |
| 131 | md_link = f"[{last_segment}]({request_url}&aff={args.aff})" |
| 132 | f.write(md_link + "\n") |
| 133 | f.flush() |
| 134 | print(f"PID={pid} => 成功记录数据: {md_link}") |
| 135 | |
| 136 | # === 根据是否是通配符决定是否退出 === |
| 137 | if args.query: |
| 138 | if not is_wildcard: |
| 139 | print(f"\n[!] 已成功精准匹配到目标 '{args.query}',获取到最终数据,任务完成并退出。") |
| 140 | break # 没有星号,精确匹配一条即退出 |
| 141 | else: |
| 142 | print(f"[*] 匹配到通配目标 '{args.query}',已记录,继续往下查找...") |
| 143 | # ===================================== |
| 144 | |
| 145 | else: |
| 146 | print(f"PID={pid} 无第二次跳转") |
| 147 | |
| 148 | except requests.RequestException as e: |
| 149 | print(f"请求失败,PID={pid} 错误信息: {e}") |
| 150 | continue |
| 151 | |
| 152 | print(f"\n结果已保存到 {output_file}") |
| 153 | |
| 154 | if __name__ == "__main__": |
| 155 | main() |