import subprocess import sys import os import re import urllib.parse """ torrent_to_magnet_batch.py 功能: 批量扫描指定目录及其所有子目录下的 .torrent 文件, 使用 aria2c 将每个 .torrent 文件转换成磁力链接(magnet URI), 并尝试从磁力链接中提取显示名称 (dn) 作为新的文件名重命名原 .torrent 文件。 最终将所有成功转换的文件名及对应的磁力链接按文件名排序,写入指定的日志文件。 每个条目格式为: 文件名 磁力链接 条目之间以空行分隔。 转换失败的文件会在日志中标注为“Failed to convert”。 用法: python torrent_to_magnet_batch.py <目录路径> 例如: python torrent_to_magnet_batch.py /path/to/torrents 依赖: - Python 3 标准库 - 系统需安装 aria2c 命令行工具,并确保可通过环境变量调用 限制: - 磁力链接中默认不包含文件大小,脚本不尝试获取或记录文件大小信息。 - 需要网络环境正常,aria2c 能够成功下载种子元信息(metadata)才能正确提取磁力链接。 - 如果 aria2c 不可用或转换失败,该文件会被记录为转换失败。 日志输出: 输出日志文件路径为固定名称 output.log,位于当前运行目录。 示例输出: Ubuntu 20.04 ISO magnet:?xt=urn:btih:... Another File magnet:?xt=urn:btih:... 作者: ChatGPT(OpenAI) 日期: 2025年7月27日 """ def extract_dn_from_magnet(magnet_link): """Extract the 'dn' (display name) from magnet URI""" parsed = urllib.parse.urlparse(magnet_link) params = urllib.parse.parse_qs(parsed.query) dn = params.get('dn', [None])[0] return dn def torrent_to_magnet(torrent_file): try: if not os.path.exists(torrent_file): return None, None, None subprocess.run(['aria2c', '--version'], capture_output=True, check=True) cmd = ['aria2c', '--bt-metadata-only=true', '--bt-save-metadata=false', '--show-files=true', torrent_file] result = subprocess.run(cmd, capture_output=True, text=True) if result.returncode != 0: return None, None, None output = result.stdout # Extract magnet link magnet_link = None for line in output.splitlines(): if line.startswith('Magnet URI:'): magnet_link = line.replace('Magnet URI:', '').strip() break # Extract actual name from .torrent (fallback: filename) name = os.path.splitext(os.path.basename(torrent_file))[0] # Extract size total_size = 0 file_line_pattern = re.compile(r'^\s+\d+\|\s+(.+?)\s+\|\s+([\d.]+)\s+([A-Z]+)$') for line in output.splitlines(): match = file_line_pattern.match(line) if match: size_value = float(match.group(2)) unit = match.group(3) total_size += size_to_bytes(size_value, unit) size_str = human_readable_size(total_size) if total_size > 0 else "Unknown" return name, magnet_link, size_str except Exception: return None, None, None def size_to_bytes(value, unit): units = ['B', 'KB', 'MB', 'GB', 'TB'] try: index = units.index(unit.upper()) return int(value * (1024 ** index)) except ValueError: return 0 def human_readable_size(size_bytes): for unit in ['B', 'KB', 'MB', 'GB', 'TB']: if size_bytes < 1024.0: return f"{size_bytes:.2f} {unit}" size_bytes /= 1024.0 return f"{size_bytes:.2f} PB" def process_directory(directory, log_file_path): results = [] for root, _, files in os.walk(directory): for file in files: if file.lower().endswith('.torrent'): torrent_path = os.path.join(root, file) name, magnet, _ = torrent_to_magnet(torrent_path) if name and magnet: real_name = extract_dn_from_magnet(magnet) if real_name: new_filename = f"{real_name}.torrent" new_path = os.path.join(root, new_filename) if not os.path.exists(new_path): try: os.rename(torrent_path, new_path) torrent_path = new_path name = real_name except Exception as e: print(f"Failed to rename: {file} -> {new_filename}: {e}") else: print(f"Skip rename: {new_filename} already exists") results.append((name, magnet)) else: results.append((file, "Failed to convert")) # 按文件名排序(忽略大小写) results.sort(key=lambda x: x[0].lower()) # 写入日志 with open(log_file_path, 'w', encoding='utf-8') as log_file: for name, magnet in results: log_file.write(f"{name}\n{magnet}\n\n") def main(): if len(sys.argv) < 2: print("Usage: python torrent_to_magnet_batch.py [output.log]") sys.exit(1) directory = sys.argv[1] log_file_path = sys.argv[2] if len(sys.argv) >= 3 else "output.log" if not os.path.isdir(directory): print(f"Error: '{directory}' is not a valid directory") sys.exit(1) process_directory(directory, log_file_path) print(f"Conversion completed. Output saved to: {log_file_path}") if __name__ == "__main__": main()