torrent_to_magnet_batch.py
· 5.6 KiB · Python
Bruto
import subprocess
import sys
import os
import re
import urllib.parse
"""
torrent_to_magnet_batch.py
功能:
批量扫描指定目录及其所有子目录下的 .torrent 文件,
使用 aria2c 将每个 .torrent 文件转换成磁力链接(magnet URI),
并尝试从磁力链接中提取显示名称 (dn) 作为新的文件名重命名原 .torrent 文件。
最终将所有成功转换的文件名及对应的磁力链接按文件名排序,写入指定的日志文件。
每个条目格式为:
文件名
磁力链接
条目之间以空行分隔。
转换失败的文件会在日志中标注为“Failed to convert”。
用法:
python torrent_to_magnet_batch.py <目录路径>
例如:
python torrent_to_magnet_batch.py /path/to/torrents
依赖:
- Python 3 标准库
- 系统需安装 aria2c 命令行工具,并确保可通过环境变量调用
限制:
- 磁力链接中默认不包含文件大小,脚本不尝试获取或记录文件大小信息。
- 需要网络环境正常,aria2c 能够成功下载种子元信息(metadata)才能正确提取磁力链接。
- 如果 aria2c 不可用或转换失败,该文件会被记录为转换失败。
日志输出:
输出日志文件路径为固定名称 output.log,位于当前运行目录。
示例输出:
Ubuntu 20.04 ISO
magnet:?xt=urn:btih:...
Another File
magnet:?xt=urn:btih:...
作者:
ChatGPT(OpenAI)
日期:
2025年7月27日
"""
def extract_dn_from_magnet(magnet_link):
"""Extract the 'dn' (display name) from magnet URI"""
parsed = urllib.parse.urlparse(magnet_link)
params = urllib.parse.parse_qs(parsed.query)
dn = params.get('dn', [None])[0]
return dn
def torrent_to_magnet(torrent_file):
try:
if not os.path.exists(torrent_file):
return None, None, None
subprocess.run(['aria2c', '--version'], capture_output=True, check=True)
cmd = ['aria2c', '--bt-metadata-only=true', '--bt-save-metadata=false',
'--show-files=true', torrent_file]
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode != 0:
return None, None, None
output = result.stdout
# Extract magnet link
magnet_link = None
for line in output.splitlines():
if line.startswith('Magnet URI:'):
magnet_link = line.replace('Magnet URI:', '').strip()
break
# Extract actual name from .torrent (fallback: filename)
name = os.path.splitext(os.path.basename(torrent_file))[0]
# Extract size
total_size = 0
file_line_pattern = re.compile(r'^\s+\d+\|\s+(.+?)\s+\|\s+([\d.]+)\s+([A-Z]+)$')
for line in output.splitlines():
match = file_line_pattern.match(line)
if match:
size_value = float(match.group(2))
unit = match.group(3)
total_size += size_to_bytes(size_value, unit)
size_str = human_readable_size(total_size) if total_size > 0 else "Unknown"
return name, magnet_link, size_str
except Exception:
return None, None, None
def size_to_bytes(value, unit):
units = ['B', 'KB', 'MB', 'GB', 'TB']
try:
index = units.index(unit.upper())
return int(value * (1024 ** index))
except ValueError:
return 0
def human_readable_size(size_bytes):
for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
if size_bytes < 1024.0:
return f"{size_bytes:.2f} {unit}"
size_bytes /= 1024.0
return f"{size_bytes:.2f} PB"
def process_directory(directory, log_file_path):
results = []
for root, _, files in os.walk(directory):
for file in files:
if file.lower().endswith('.torrent'):
torrent_path = os.path.join(root, file)
name, magnet, _ = torrent_to_magnet(torrent_path)
if name and magnet:
real_name = extract_dn_from_magnet(magnet)
if real_name:
new_filename = f"{real_name}.torrent"
new_path = os.path.join(root, new_filename)
if not os.path.exists(new_path):
try:
os.rename(torrent_path, new_path)
torrent_path = new_path
name = real_name
except Exception as e:
print(f"Failed to rename: {file} -> {new_filename}: {e}")
else:
print(f"Skip rename: {new_filename} already exists")
results.append((name, magnet))
else:
results.append((file, "Failed to convert"))
# 按文件名排序(忽略大小写)
results.sort(key=lambda x: x[0].lower())
# 写入日志
with open(log_file_path, 'w', encoding='utf-8') as log_file:
for name, magnet in results:
log_file.write(f"{name}\n{magnet}\n\n")
def main():
if len(sys.argv) < 2:
print("Usage: python torrent_to_magnet_batch.py <directory_path> [output.log]")
sys.exit(1)
directory = sys.argv[1]
log_file_path = sys.argv[2] if len(sys.argv) >= 3 else "output.log"
if not os.path.isdir(directory):
print(f"Error: '{directory}' is not a valid directory")
sys.exit(1)
process_directory(directory, log_file_path)
print(f"Conversion completed. Output saved to: {log_file_path}")
if __name__ == "__main__":
main()
| 1 | import subprocess |
| 2 | import sys |
| 3 | import os |
| 4 | import re |
| 5 | import urllib.parse |
| 6 | |
| 7 | """ |
| 8 | torrent_to_magnet_batch.py |
| 9 | |
| 10 | 功能: |
| 11 | 批量扫描指定目录及其所有子目录下的 .torrent 文件, |
| 12 | 使用 aria2c 将每个 .torrent 文件转换成磁力链接(magnet URI), |
| 13 | 并尝试从磁力链接中提取显示名称 (dn) 作为新的文件名重命名原 .torrent 文件。 |
| 14 | |
| 15 | 最终将所有成功转换的文件名及对应的磁力链接按文件名排序,写入指定的日志文件。 |
| 16 | 每个条目格式为: |
| 17 | 文件名 |
| 18 | 磁力链接 |
| 19 | |
| 20 | 条目之间以空行分隔。 |
| 21 | |
| 22 | 转换失败的文件会在日志中标注为“Failed to convert”。 |
| 23 | |
| 24 | 用法: |
| 25 | python torrent_to_magnet_batch.py <目录路径> |
| 26 | |
| 27 | 例如: |
| 28 | python torrent_to_magnet_batch.py /path/to/torrents |
| 29 | |
| 30 | 依赖: |
| 31 | - Python 3 标准库 |
| 32 | - 系统需安装 aria2c 命令行工具,并确保可通过环境变量调用 |
| 33 | |
| 34 | 限制: |
| 35 | - 磁力链接中默认不包含文件大小,脚本不尝试获取或记录文件大小信息。 |
| 36 | - 需要网络环境正常,aria2c 能够成功下载种子元信息(metadata)才能正确提取磁力链接。 |
| 37 | - 如果 aria2c 不可用或转换失败,该文件会被记录为转换失败。 |
| 38 | |
| 39 | 日志输出: |
| 40 | 输出日志文件路径为固定名称 output.log,位于当前运行目录。 |
| 41 | |
| 42 | 示例输出: |
| 43 | Ubuntu 20.04 ISO |
| 44 | magnet:?xt=urn:btih:... |
| 45 | |
| 46 | Another File |
| 47 | magnet:?xt=urn:btih:... |
| 48 | |
| 49 | 作者: |
| 50 | ChatGPT(OpenAI) |
| 51 | |
| 52 | 日期: |
| 53 | 2025年7月27日 |
| 54 | """ |
| 55 | |
| 56 | |
| 57 | def extract_dn_from_magnet(magnet_link): |
| 58 | """Extract the 'dn' (display name) from magnet URI""" |
| 59 | parsed = urllib.parse.urlparse(magnet_link) |
| 60 | params = urllib.parse.parse_qs(parsed.query) |
| 61 | dn = params.get('dn', [None])[0] |
| 62 | return dn |
| 63 | |
| 64 | def torrent_to_magnet(torrent_file): |
| 65 | try: |
| 66 | if not os.path.exists(torrent_file): |
| 67 | return None, None, None |
| 68 | |
| 69 | subprocess.run(['aria2c', '--version'], capture_output=True, check=True) |
| 70 | |
| 71 | cmd = ['aria2c', '--bt-metadata-only=true', '--bt-save-metadata=false', |
| 72 | '--show-files=true', torrent_file] |
| 73 | |
| 74 | result = subprocess.run(cmd, capture_output=True, text=True) |
| 75 | |
| 76 | if result.returncode != 0: |
| 77 | return None, None, None |
| 78 | |
| 79 | output = result.stdout |
| 80 | |
| 81 | # Extract magnet link |
| 82 | magnet_link = None |
| 83 | for line in output.splitlines(): |
| 84 | if line.startswith('Magnet URI:'): |
| 85 | magnet_link = line.replace('Magnet URI:', '').strip() |
| 86 | break |
| 87 | |
| 88 | # Extract actual name from .torrent (fallback: filename) |
| 89 | name = os.path.splitext(os.path.basename(torrent_file))[0] |
| 90 | |
| 91 | # Extract size |
| 92 | total_size = 0 |
| 93 | file_line_pattern = re.compile(r'^\s+\d+\|\s+(.+?)\s+\|\s+([\d.]+)\s+([A-Z]+)$') |
| 94 | |
| 95 | for line in output.splitlines(): |
| 96 | match = file_line_pattern.match(line) |
| 97 | if match: |
| 98 | size_value = float(match.group(2)) |
| 99 | unit = match.group(3) |
| 100 | total_size += size_to_bytes(size_value, unit) |
| 101 | |
| 102 | size_str = human_readable_size(total_size) if total_size > 0 else "Unknown" |
| 103 | |
| 104 | return name, magnet_link, size_str |
| 105 | |
| 106 | except Exception: |
| 107 | return None, None, None |
| 108 | |
| 109 | def size_to_bytes(value, unit): |
| 110 | units = ['B', 'KB', 'MB', 'GB', 'TB'] |
| 111 | try: |
| 112 | index = units.index(unit.upper()) |
| 113 | return int(value * (1024 ** index)) |
| 114 | except ValueError: |
| 115 | return 0 |
| 116 | |
| 117 | def human_readable_size(size_bytes): |
| 118 | for unit in ['B', 'KB', 'MB', 'GB', 'TB']: |
| 119 | if size_bytes < 1024.0: |
| 120 | return f"{size_bytes:.2f} {unit}" |
| 121 | size_bytes /= 1024.0 |
| 122 | return f"{size_bytes:.2f} PB" |
| 123 | |
| 124 | def process_directory(directory, log_file_path): |
| 125 | results = [] |
| 126 | |
| 127 | for root, _, files in os.walk(directory): |
| 128 | for file in files: |
| 129 | if file.lower().endswith('.torrent'): |
| 130 | torrent_path = os.path.join(root, file) |
| 131 | name, magnet, _ = torrent_to_magnet(torrent_path) |
| 132 | |
| 133 | if name and magnet: |
| 134 | real_name = extract_dn_from_magnet(magnet) |
| 135 | if real_name: |
| 136 | new_filename = f"{real_name}.torrent" |
| 137 | new_path = os.path.join(root, new_filename) |
| 138 | |
| 139 | if not os.path.exists(new_path): |
| 140 | try: |
| 141 | os.rename(torrent_path, new_path) |
| 142 | torrent_path = new_path |
| 143 | name = real_name |
| 144 | except Exception as e: |
| 145 | print(f"Failed to rename: {file} -> {new_filename}: {e}") |
| 146 | else: |
| 147 | print(f"Skip rename: {new_filename} already exists") |
| 148 | |
| 149 | results.append((name, magnet)) |
| 150 | else: |
| 151 | results.append((file, "Failed to convert")) |
| 152 | |
| 153 | # 按文件名排序(忽略大小写) |
| 154 | results.sort(key=lambda x: x[0].lower()) |
| 155 | |
| 156 | # 写入日志 |
| 157 | with open(log_file_path, 'w', encoding='utf-8') as log_file: |
| 158 | for name, magnet in results: |
| 159 | log_file.write(f"{name}\n{magnet}\n\n") |
| 160 | |
| 161 | def main(): |
| 162 | if len(sys.argv) < 2: |
| 163 | print("Usage: python torrent_to_magnet_batch.py <directory_path> [output.log]") |
| 164 | sys.exit(1) |
| 165 | |
| 166 | directory = sys.argv[1] |
| 167 | log_file_path = sys.argv[2] if len(sys.argv) >= 3 else "output.log" |
| 168 | |
| 169 | if not os.path.isdir(directory): |
| 170 | print(f"Error: '{directory}' is not a valid directory") |
| 171 | sys.exit(1) |
| 172 | |
| 173 | process_directory(directory, log_file_path) |
| 174 | print(f"Conversion completed. Output saved to: {log_file_path}") |
| 175 | |
| 176 | if __name__ == "__main__": |
| 177 | main() |
| 178 |