import argparse import requests from urllib.parse import urljoin, urlparse, parse_qs, urlencode, urlunparse import sys session = requests.Session() session.headers.update({ "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36", "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8", "Accept-Language": "en-US,en;q=0.9", }) def is_valid_url(url): try: parts = urlparse(url) return parts.scheme in ('http', 'https') and parts.netloc != '' except Exception: return False def extract_last_segment(location): parsed = urlparse(location) if parsed.query: qs = parse_qs(parsed.query) if 'rp' in qs: path = qs['rp'][0] else: path = parsed.path else: path = parsed.path if path.endswith('/'): path = path[:-1] segments = path.split('/') return segments[-1] if segments else "" def get_http_version(resp): version_map = {10: "1.0", 11: "1.1", 20: "2"} try: return version_map.get(resp.raw.version, str(resp.raw.version)) except Exception: return "unknown" def main(): parser = argparse.ArgumentParser(description="根据pid范围请求URL并记录跳转结果") parser.add_argument('-s', '--start', type=int, required=True, help='起始 pid,必须为整数') parser.add_argument('-e', '--end', type=int, required=True, help='结束 pid,必须为整数且不小于起始 pid') parser.add_argument('-a', '--aff', type=str, required=True, help='aff 参数,字符串') parser.add_argument('-u', '--url', type=str, required=True, help='基础 URL,必须为合法 URL,例如 https://cloud.colocrossing.com/aff.php') parser.add_argument('--filter', type=str, default='', help='过滤关键字,逗号分隔,跳过包含关键字的跳转路径(黑名单)') parser.add_argument('-q', '--query', type=str, default='', help='匹配最终 Location 的关键字(白名单)。若以 "*" 结尾(如 "/store*")则为通配查找,匹配后继续执行;若无 "*" 则查找到一条就退出') args = parser.parse_args() if args.start > args.end: print("错误:起始 pid 不应大于结束 pid。") sys.exit(1) if not is_valid_url(args.url): print("错误:无效的 URL,请传入合法的 http 或 https URL。") sys.exit(1) filters =[x.strip() for x in args.filter.split(',') if x.strip()] output_file = "result.md" # 解析 query 参数的通配逻辑 is_wildcard = False search_target = args.query if args.query and args.query.endswith('*'): is_wildcard = True search_target = args.query[:-1] # 去掉末尾的 * 以便进行精确子串匹配 with open(output_file, "w", encoding="utf-8") as f: for pid in range(args.end, args.start - 1, -1): base_parsed = urlparse(args.url) query_dict = parse_qs(base_parsed.query) query_dict['pid'] = [str(pid)] new_query = urlencode(query_dict, doseq=True) request_url = urlunparse(( base_parsed.scheme, base_parsed.netloc, base_parsed.path, base_parsed.params, new_query, base_parsed.fragment )) print(f"\n===== PID={pid} =====") print(f"-- 请求 URL: {request_url}") try: resp1 = session.get(request_url, allow_redirects=False, timeout=10) if resp1.status_code == 403: raise SystemExit(f"PID={pid} 被 Cloudflare 拦截(403 Forbidden),脚本终止。") ver1 = get_http_version(resp1) print(f"HTTP/{ver1} {resp1.status_code} {resp1.reason}") for header, value in resp1.headers.items(): print(f"{header}: {value}") if 'Location' not in resp1.headers: print(f"PID={pid} 无跳转") continue first_location = resp1.headers['Location'] second_url = first_location if first_location.startswith("http") else urljoin(request_url, first_location) resp2 = session.get(second_url, allow_redirects=False, timeout=10) ver2 = get_http_version(resp2) print(f"\n-- 请求 URL: {second_url}") print(f"HTTP/{ver2} {resp2.status_code} {resp2.reason}") for header, value in resp2.headers.items(): print(f"{header}: {value}") if 'Location' in resp2.headers: second_location = resp2.headers['Location'] # === 改造后的 query 匹配逻辑 === if search_target and search_target not in second_location: print(f"PID={pid} 最终 Location 不匹配目标规则 '{search_target}',跳过写入") continue # ================================= last_segment = extract_last_segment(second_location) if filters and any(filt in last_segment for filt in filters): print(f"PID={pid} 跳转路径包含过滤关键词,跳过写入: {last_segment}") continue md_link = f"[{last_segment}]({request_url}&aff={args.aff})" f.write(md_link + "\n") f.flush() print(f"PID={pid} => 成功记录数据: {md_link}") # === 根据是否是通配符决定是否退出 === if args.query: if not is_wildcard: print(f"\n[!] 已成功精准匹配到目标 '{args.query}',获取到最终数据,任务完成并退出。") break # 没有星号,精确匹配一条即退出 else: print(f"[*] 匹配到通配目标 '{args.query}',已记录,继续往下查找...") # ===================================== else: print(f"PID={pid} 无第二次跳转") except requests.RequestException as e: print(f"请求失败,PID={pid} 错误信息: {e}") continue print(f"\n结果已保存到 {output_file}") if __name__ == "__main__": main()