mirror of
https://github.com/Bao-qing/123pan.git
synced 2025-11-13 12:16:07 +08:00
1017 lines
39 KiB
Python
1017 lines
39 KiB
Python
import hashlib
|
||
import json
|
||
import os
|
||
import random
|
||
import re
|
||
import time
|
||
import uuid
|
||
import requests
|
||
from typing import List, Dict, Tuple, Union, Optional
|
||
|
||
class Pan123:
|
||
DEVICE_TYPES = [
|
||
"24075RP89G", "24076RP19G", "24076RP19I", "M1805E10A", "M2004J11G", "M2012K11AG", "M2104K10I", "22021211RG",
|
||
"22021211RI", "21121210G", "23049PCD8G", "23049PCD8I", "23013PC75G", "24069PC21G", "24069PC21I",
|
||
"23113RKC6G", "M1912G7BI", "M2007J20CI", "M2007J20CG", "M2007J20CT", "M2102J20SG", "M2102J20SI",
|
||
"21061110AG", "2201116PG", "2201116PI", "22041216G", "22041216UG", "22111317PG", "22111317PI", "22101320G",
|
||
"22101320I", "23122PCD1G", "23122PCD1I", "2311DRK48G", "2311DRK48I", "2312FRAFDI", "M2004J19PI",
|
||
]
|
||
|
||
OS_VERSIONS = [
|
||
"Android_7.1.2", "Android_8.0.0", "Android_8.1.0", "Android_9.0", "Android_10", "Android_11", "Android_12",
|
||
"Android_13", "Android_6.0.1", "Android_5.1.1", "Android_4.4.4", "Android_4.3", "Android_4.2.2",
|
||
"Android_4.1.2",
|
||
]
|
||
|
||
def __init__(
|
||
self,
|
||
readfile: bool = True,
|
||
user_name: str = "",
|
||
pass_word: str = "",
|
||
authorization: str = "",
|
||
input_pwd: bool = True,
|
||
config_file: str = "123pan.txt",
|
||
protocol: str = "android", # 协议,默认为 android
|
||
):
|
||
self.config_file = config_file
|
||
self.protocol = protocol.lower() # 'android' 或 'web'
|
||
self.devicetype = random.choice(self.DEVICE_TYPES)
|
||
self.osversion = random.choice(self.OS_VERSIONS)
|
||
self.download_mode = 1
|
||
self.cookies = None
|
||
self.user_name = user_name
|
||
self.password = pass_word
|
||
self.authorization = authorization
|
||
self.parent_file_id = 0
|
||
self.parent_file_list = [0]
|
||
self.parent_file_name_list = []
|
||
self.all_file = False
|
||
self.file_page = 0
|
||
self.file_list = []
|
||
self.dir_list = []
|
||
self.name_dict = {}
|
||
self.list = []
|
||
self.total = 0
|
||
|
||
self._init_headers() # 根据 protocol 初始化 headers
|
||
|
||
load_code = 0
|
||
if readfile:
|
||
load_code = self._load_config()
|
||
if not load_code:
|
||
if not (user_name and pass_word) and input_pwd:
|
||
self.user_name = input("请输入用户名: ")
|
||
self.password = input("请输入密码: ")
|
||
elif not (user_name and pass_word):
|
||
raise ValueError("用户名和密码不能为空")
|
||
|
||
# 尝试获取目录,如果失败则登录
|
||
if self.get_dir()[0] != 0:
|
||
self.login()
|
||
self.get_dir()
|
||
|
||
def _init_headers(self):
|
||
"""初始化请求头,根据协议(android/web)选择不同 headers"""
|
||
android_headers = {
|
||
"user-agent": f"123pan/v2.4.0({self.osversion};Xiaomi)",
|
||
"authorization": self.authorization,
|
||
"accept-encoding": "gzip",
|
||
"content-type": "application/json",
|
||
"osversion": self.osversion,
|
||
"loginuuid": uuid.uuid4().hex,
|
||
"platform": "android",
|
||
"devicetype": self.devicetype,
|
||
"devicename": "Xiaomi",
|
||
"host": "www.123pan.com",
|
||
"app-version": "61",
|
||
"x-app-version": "2.4.0"
|
||
}
|
||
|
||
# Web 协议头,来源于 web.py 的 header_logined / header_only_usage(适配为字典)
|
||
web_headers = {
|
||
"Accept": "*/*",
|
||
"Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6",
|
||
"App-Version": "3",
|
||
# 同时设置大小写两种 Authorization 以兼容不同接口要求
|
||
# "Authorization": self.authorization,
|
||
"authorization": self.authorization,
|
||
"Cache-Control": "no-cache",
|
||
"Connection": "keep-alive",
|
||
"LoginUuid": uuid.uuid4().hex,
|
||
"Pragma": "no-cache",
|
||
"Referer": "https://www.123pan.com/",
|
||
"Sec-Fetch-Dest": "empty",
|
||
"Sec-Fetch-Mode": "cors",
|
||
"Sec-Fetch-Site": "same-origin",
|
||
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36 Edg/119.0.0.0",
|
||
"platform": "web",
|
||
"sec-ch-ua": "Microsoft",
|
||
"sec-ch-ua-mobile": "?0",
|
||
"sec-ch-ua-platform": "Windows",
|
||
"content-type": "application/json",
|
||
}
|
||
|
||
if getattr(self, "protocol", "android").lower() == "web":
|
||
self.headers = web_headers
|
||
else:
|
||
self.headers = android_headers
|
||
|
||
def _load_config(self):
|
||
"""从配置文件加载账号信息及 protocol"""
|
||
try:
|
||
if os.path.exists(self.config_file):
|
||
with open(self.config_file, "r", encoding="utf-8") as f:
|
||
config = json.load(f)
|
||
self.user_name = config.get("userName", self.user_name)
|
||
self.password = config.get("passWord", self.password)
|
||
self.authorization = config.get("authorization", self.authorization)
|
||
self.devicetype = config.get("deviceType", self.devicetype)
|
||
self.osversion = config.get("osVersion", self.osversion)
|
||
# 新增 protocol 支持
|
||
self.protocol = config.get("protocol", getattr(self, "protocol", "android")).lower()
|
||
# 重新初始化 headers(确保 authorization 和 protocol 生效)
|
||
self._init_headers()
|
||
# 确保 headers 中 authorization 同步
|
||
if "authorization" in self.headers:
|
||
self.headers["authorization"] = self.authorization
|
||
if "Authorization" in self.headers:
|
||
self.headers["Authorization"] = self.authorization
|
||
print("配置加载成功")
|
||
return 1
|
||
else:
|
||
print("配置文件不存在,将使用传入参数")
|
||
return 0
|
||
except Exception as e:
|
||
print(f"加载配置失败: {e}")
|
||
return 0
|
||
|
||
def _save_config(self):
|
||
"""保存账号信息到配置文件(包含 protocol)"""
|
||
config = {
|
||
"userName": self.user_name,
|
||
"passWord": self.password,
|
||
"authorization": self.authorization,
|
||
"deviceType": self.devicetype,
|
||
"osVersion": self.osversion,
|
||
"protocol": getattr(self, "protocol", "android"),
|
||
}
|
||
try:
|
||
with open(self.config_file, "w", encoding="utf-8") as f:
|
||
json.dump(config, f)
|
||
print("账号信息已保存")
|
||
except Exception as e:
|
||
print(f"保存配置失败: {e}")
|
||
|
||
def _handle_response(self, response: requests.Response) -> dict:
|
||
"""处理API响应"""
|
||
try:
|
||
data = response.json()
|
||
if data.get("code") != 0 and data.get("code") != 200:
|
||
print(f"API错误: {data.get('message', '未知错误')} (代码: {data.get('code')})")
|
||
return data
|
||
except json.JSONDecodeError:
|
||
print("响应解析错误")
|
||
return {"code": -1, "message": "响应解析错误"}
|
||
|
||
def login(self) -> int:
|
||
"""用户登录"""
|
||
data = {"type": 1, "passport": self.user_name, "password": self.password}
|
||
try:
|
||
response = requests.post(
|
||
"https://www.123pan.com/b/api/user/sign_in",
|
||
headers=self.headers,
|
||
json=data,
|
||
timeout=15
|
||
)
|
||
result = self._handle_response(response)
|
||
if result.get("code") == 200:
|
||
# 更新授权令牌
|
||
token = result["data"]["token"]
|
||
self.authorization = f"Bearer {token}"
|
||
self.headers["authorization"] = self.authorization
|
||
|
||
# 保存cookie
|
||
if "Set-Cookie" in response.headers:
|
||
self.cookies = requests.utils.dict_from_cookiejar(response.cookies)
|
||
|
||
self._save_config()
|
||
print("登录成功")
|
||
return 0
|
||
return result.get("code", -1)
|
||
except requests.exceptions.RequestException as e:
|
||
print(f"登录请求失败: {e}")
|
||
return -1
|
||
|
||
def get_dir(self, save: bool = True) -> Tuple[int, list]:
|
||
"""获取当前目录内容"""
|
||
return self.get_dir_by_id(self.parent_file_id, save)
|
||
|
||
def get_dir_by_id(
|
||
self,
|
||
file_id: int,
|
||
save: bool = True,
|
||
all_files: bool = False,
|
||
limit: int = 100
|
||
) -> Tuple[int, list]:
|
||
"""获取指定目录内容"""
|
||
page = self.file_page * 3 + 1 if not all_files else 1
|
||
current_count = len(self.list) if all_files else 0
|
||
items = []
|
||
total = -1
|
||
attempts = 0
|
||
|
||
while (current_count < total or total == -1) and (attempts < 3 or all_files):
|
||
params = {
|
||
"driveId": 0,
|
||
"limit": limit,
|
||
"next": 0,
|
||
"orderBy": "file_id",
|
||
"orderDirection": "desc",
|
||
"parentFileId": str(file_id),
|
||
"trashed": False,
|
||
"SearchData": "",
|
||
"Page": str(page),
|
||
"OnlyLookAbnormalFile": 0,
|
||
}
|
||
|
||
try:
|
||
response = requests.get(
|
||
"https://www.123pan.com/api/file/list/new",
|
||
headers=self.headers,
|
||
params=params,
|
||
timeout=30
|
||
)
|
||
result = self._handle_response(response)
|
||
if result.get("code") != 0:
|
||
return result.get("code", -1), []
|
||
|
||
page_items = result["data"]["InfoList"]
|
||
items.extend(page_items)
|
||
total = result["data"]["Total"]
|
||
current_count += len(page_items)
|
||
page += 1
|
||
attempts += 1
|
||
|
||
if attempts % 5 == 0:
|
||
print(f"获取文件中: {current_count}/{total},暂停10秒...")
|
||
time.sleep(10)
|
||
except requests.exceptions.RequestException:
|
||
print("连接失败")
|
||
return -1, []
|
||
|
||
self.all_file = current_count >= total
|
||
self.total = total
|
||
self.file_page += 1 if not all_files else 0
|
||
|
||
if save:
|
||
self.list.extend(items)
|
||
|
||
return 0, items
|
||
|
||
def show(self):
|
||
"""显示当前目录内容"""
|
||
if not self.list:
|
||
print("当前目录为空")
|
||
return
|
||
|
||
print("\n" + "=" * 60)
|
||
print(f"当前路径: /{'/'.join(self.parent_file_name_list)}")
|
||
print("-" * 60)
|
||
print(f"{'编号':<6}{'类型':<8}{'大小':<12}{'名称'}")
|
||
print("-" * 60)
|
||
|
||
for idx, item in enumerate(self.list, 1):
|
||
item_type = "文件夹" if item["Type"] == 1 else "文件"
|
||
size = self._format_size(item["Size"])
|
||
|
||
# 使用颜色区分类型
|
||
color_code = "\033[35m" if item["Type"] == 1 else "\033[33m"
|
||
reset_code = "\033[0m"
|
||
|
||
print(f"{color_code}{idx:<6}{item_type:<8}{size:<12}{item['FileName']}{reset_code}")
|
||
|
||
if not self.all_file:
|
||
remaining = self.total - len(self.list)
|
||
print(f"\n还有 {remaining} 个文件未加载,输入 'more' 继续加载")
|
||
print("=" * 60 + "\n")
|
||
|
||
def _format_size(self, size_bytes: int) -> str:
|
||
"""格式化文件大小"""
|
||
if size_bytes >= 1073741824:
|
||
return f"{size_bytes / 1073741824:.2f} GB"
|
||
elif size_bytes >= 1048576:
|
||
return f"{size_bytes / 1048576:.2f} MB"
|
||
elif size_bytes >= 1024:
|
||
return f"{size_bytes / 1024:.2f} KB"
|
||
return f"{size_bytes} B"
|
||
|
||
def delete_file(self, file_id: Union[int, dict], by_index: bool = True, delete: bool = True) -> bool:
|
||
"""删除或恢复文件"""
|
||
if by_index:
|
||
if not (0 <= file_id < len(self.list)):
|
||
print("无效的文件编号")
|
||
return False
|
||
file_data = self.list[file_id]
|
||
else:
|
||
file_data = file_id
|
||
|
||
payload = {
|
||
"driveId": 0,
|
||
"fileTrashInfoList": file_data,
|
||
"operation": delete,
|
||
}
|
||
|
||
try:
|
||
response = requests.post(
|
||
"https://www.123pan.com/a/api/file/trash",
|
||
headers=self.headers,
|
||
json=payload,
|
||
timeout=10
|
||
)
|
||
result = self._handle_response(response)
|
||
if result.get("code") == 0:
|
||
action = "删除" if delete else "恢复"
|
||
print(f"文件 {file_data['FileName']} {action}成功")
|
||
return True
|
||
return False
|
||
except requests.exceptions.RequestException as e:
|
||
print(f"删除操作失败: {e}")
|
||
return False
|
||
|
||
def share(self, file_ids: List[int], share_pwd: str = "") -> Optional[str]:
|
||
"""创建分享链接"""
|
||
if not file_ids:
|
||
print("未选择文件")
|
||
return None
|
||
|
||
file_names = [self.list[i]["FileName"] for i in file_ids]
|
||
print("分享文件:", ", ".join(file_names))
|
||
|
||
payload = {
|
||
"driveId": 0,
|
||
"expiration": "2099-12-12T08:00:00+08:00",
|
||
"fileIdList": ",".join(str(self.list[i]["FileId"]) for i in file_ids),
|
||
"shareName": "分享文件",
|
||
"sharePwd": share_pwd,
|
||
"event": "shareCreate"
|
||
}
|
||
|
||
try:
|
||
response = requests.post(
|
||
"https://www.123pan.com/a/api/share/create",
|
||
headers=self.headers,
|
||
json=payload,
|
||
timeout=15
|
||
)
|
||
result = self._handle_response(response)
|
||
if result.get("code") == 0:
|
||
share_key = result["data"]["ShareKey"]
|
||
share_url = f"https://www.123pan.com/s/{share_key}"
|
||
print(f"分享创建成功!\n链接: {share_url}")
|
||
if share_pwd:
|
||
print(f"提取码: {share_pwd}")
|
||
return share_url
|
||
return None
|
||
except requests.exceptions.RequestException as e:
|
||
print(f"创建分享失败: {e}")
|
||
return None
|
||
|
||
def upload(self, file_path: str) -> bool:
|
||
"""上传文件"""
|
||
file_path = file_path.strip().replace('"', "").replace("\\", "/")
|
||
if not os.path.exists(file_path):
|
||
print("文件不存在")
|
||
return False
|
||
|
||
if os.path.isdir(file_path):
|
||
print("暂不支持文件夹上传")
|
||
return False
|
||
|
||
file_name = os.path.basename(file_path)
|
||
file_size = os.path.getsize(file_path)
|
||
|
||
# 计算文件MD5
|
||
md5_hash = hashlib.md5()
|
||
try:
|
||
with open(file_path, "rb") as f:
|
||
while chunk := f.read(65536):
|
||
md5_hash.update(chunk)
|
||
file_md5 = md5_hash.hexdigest()
|
||
except IOError as e:
|
||
print(f"读取文件失败: {e}")
|
||
return False
|
||
|
||
# 上传请求
|
||
payload = {
|
||
"driveId": 0,
|
||
"etag": file_md5,
|
||
"fileName": file_name,
|
||
"parentFileId": self.parent_file_id,
|
||
"size": file_size,
|
||
"type": 0,
|
||
"duplicate": 0,
|
||
}
|
||
|
||
try:
|
||
response = requests.post(
|
||
"https://www.123pan.com/b/api/file/upload_request",
|
||
headers=self.headers,
|
||
json=payload,
|
||
timeout=15
|
||
)
|
||
result = self._handle_response(response)
|
||
if result.get("code") == 5060: # 文件已存在
|
||
choice = input("检测到同名文件,输入 1 覆盖,2 保留两者,其他取消: ")
|
||
if choice == "1":
|
||
payload["duplicate"] = 1
|
||
elif choice == "2":
|
||
payload["duplicate"] = 2
|
||
else:
|
||
print("上传取消")
|
||
return False
|
||
|
||
response = requests.post(
|
||
"https://www.123pan.com/b/api/file/upload_request",
|
||
headers=self.headers,
|
||
json=payload,
|
||
timeout=15
|
||
)
|
||
result = self._handle_response(response)
|
||
|
||
if result.get("code") != 0:
|
||
return False
|
||
|
||
# 检查是否MD5复用
|
||
if result["data"].get("Reuse", False):
|
||
print("文件已存在,MD5复用成功")
|
||
return True
|
||
|
||
# 分块上传
|
||
return self._upload_chunks(
|
||
file_path,
|
||
result["data"]["Bucket"],
|
||
result["data"]["StorageNode"],
|
||
result["data"]["Key"],
|
||
result["data"]["UploadId"],
|
||
result["data"]["FileId"]
|
||
)
|
||
except requests.exceptions.RequestException as e:
|
||
print(f"上传失败: {e}")
|
||
return False
|
||
|
||
def _upload_chunks(
|
||
self,
|
||
file_path: str,
|
||
bucket: str,
|
||
storage_node: str,
|
||
key: str,
|
||
upload_id: str,
|
||
file_id: str
|
||
) -> bool:
|
||
"""分块上传文件"""
|
||
chunk_size = 5 * 1024 * 1024 # 5MB
|
||
total_size = os.path.getsize(file_path)
|
||
uploaded = 0
|
||
part_number = 1
|
||
|
||
try:
|
||
with open(file_path, "rb") as f:
|
||
while True:
|
||
chunk = f.read(chunk_size)
|
||
if not chunk:
|
||
break
|
||
|
||
# 获取上传URL
|
||
url_data = {
|
||
"bucket": bucket,
|
||
"key": key,
|
||
"partNumberEnd": part_number + 1,
|
||
"partNumberStart": part_number,
|
||
"uploadId": upload_id,
|
||
"StorageNode": storage_node,
|
||
}
|
||
|
||
try:
|
||
response = requests.post(
|
||
"https://www.123pan.com/b/api/file/s3_repare_upload_parts_batch",
|
||
headers=self.headers,
|
||
json=url_data,
|
||
timeout=15
|
||
)
|
||
url_result = self._handle_response(response)
|
||
if url_result.get("code") != 0:
|
||
return False
|
||
|
||
upload_url = url_result["data"]["presignedUrls"][str(part_number)]
|
||
except requests.exceptions.RequestException:
|
||
return False
|
||
|
||
# 上传分块
|
||
try:
|
||
upload_response = requests.put(upload_url, data=chunk, timeout=30)
|
||
if upload_response.status_code not in (200, 201):
|
||
return False
|
||
except requests.exceptions.RequestException:
|
||
return False
|
||
|
||
uploaded += len(chunk)
|
||
progress = uploaded / total_size * 100
|
||
print(f"\r上传进度: {progress:.1f}%", end="", flush=True)
|
||
part_number += 1
|
||
|
||
print("\n上传完成,正在验证...")
|
||
time.sleep(1) # 等待服务器处理
|
||
|
||
# 完成上传
|
||
compmultipart_up_url = (
|
||
"https://www.123pan.com/b/api/file/s3_complete_multipart_upload"
|
||
)
|
||
requests.post(
|
||
compmultipart_up_url,
|
||
headers=self.headers,
|
||
data=json.dumps({
|
||
"bucket": bucket,
|
||
"key": key,
|
||
"uploadId": upload_id,
|
||
"StorageNode": storage_node,
|
||
}),
|
||
timeout=10
|
||
)
|
||
complete_data = {"fileId": file_id}
|
||
try:
|
||
response = requests.post(
|
||
"https://www.123pan.com/b/api/file/upload_complete",
|
||
headers=self.headers,
|
||
json=complete_data,
|
||
timeout=15
|
||
)
|
||
result = self._handle_response(response)
|
||
if result.get("code") == 0:
|
||
print("文件上传成功")
|
||
return True
|
||
return False
|
||
except requests.exceptions.RequestException:
|
||
return False
|
||
except IOError as e:
|
||
print(f"读取文件失败: {e}")
|
||
return False
|
||
|
||
def change_directory(self, target: str):
|
||
"""改变当前目录"""
|
||
if target == "..":
|
||
if len(self.parent_file_list) > 1:
|
||
self.parent_file_list.pop()
|
||
self.parent_file_id = self.parent_file_list[-1]
|
||
self.parent_file_name_list.pop()
|
||
self.refresh_directory()
|
||
else:
|
||
print("已经是根目录")
|
||
elif target == "/":
|
||
self.parent_file_id = 0
|
||
self.parent_file_list = [0]
|
||
self.parent_file_name_list = []
|
||
self.refresh_directory()
|
||
elif target.isdigit():
|
||
idx = int(target) - 1
|
||
if 0 <= idx < len(self.list) and self.list[idx]["Type"] == 1:
|
||
self.parent_file_id = self.list[idx]["FileId"]
|
||
self.parent_file_list.append(self.parent_file_id)
|
||
self.parent_file_name_list.append(self.list[idx]["FileName"])
|
||
self.refresh_directory()
|
||
else:
|
||
print("无效的目录编号或不是文件夹")
|
||
|
||
else:
|
||
print("无效命令,使用 '..' 返回上级,'/' 返回根目录,或输入文件夹编号")
|
||
|
||
def refresh_directory(self):
|
||
"""刷新当前目录内容"""
|
||
self.all_file = False
|
||
self.file_page = 0
|
||
self.list = []
|
||
self.get_dir()
|
||
self.show()
|
||
|
||
def create_directory(self, name: str) -> bool:
|
||
"""创建新目录"""
|
||
if not name:
|
||
print("目录名不能为空")
|
||
return False
|
||
|
||
# 检查是否已存在
|
||
for item in self.list:
|
||
if item["FileName"] == name and item["Type"] == 1:
|
||
print("目录已存在")
|
||
return True
|
||
|
||
payload = {
|
||
"driveId": 0,
|
||
"etag": "",
|
||
"fileName": name,
|
||
"parentFileId": self.parent_file_id,
|
||
"size": 0,
|
||
"type": 1,
|
||
"duplicate": 1,
|
||
"NotReuse": True,
|
||
"event": "newCreateFolder",
|
||
"operateType": 1,
|
||
}
|
||
|
||
try:
|
||
response = requests.post(
|
||
"https://www.123pan.com/a/api/file/upload_request",
|
||
headers=self.headers,
|
||
json=payload,
|
||
timeout=10
|
||
)
|
||
result = self._handle_response(response)
|
||
if result.get("code") == 0:
|
||
print(f"目录 '{name}' 创建成功")
|
||
self.get_dir()
|
||
return True
|
||
return False
|
||
except requests.exceptions.RequestException as e:
|
||
print(f"创建目录失败: {e}")
|
||
return False
|
||
|
||
def get_download_link(self, file_index: int) -> Optional[str]:
|
||
"""获取文件下载链接"""
|
||
if not (0 <= file_index < len(self.list)):
|
||
print("无效的文件编号")
|
||
return None
|
||
|
||
file_detail = self.list[file_index]
|
||
|
||
if file_detail["Type"] == 1: # 文件夹
|
||
url = "https://www.123pan.com/a/api/file/batch_download_info"
|
||
data = {"fileIdList": [{"fileId": int(file_detail["FileId"])}]}
|
||
else: # 文件
|
||
url = "https://www.123pan.com/a/api/file/download_info"
|
||
data = {
|
||
"driveId": 0,
|
||
"etag": file_detail["Etag"],
|
||
"fileId": file_detail["FileId"],
|
||
"s3keyFlag": file_detail["S3KeyFlag"],
|
||
"type": file_detail["Type"],
|
||
"fileName": file_detail["FileName"],
|
||
"size": file_detail["Size"],
|
||
}
|
||
|
||
try:
|
||
response = requests.post(url, headers=self.headers, json=data, timeout=15)
|
||
result = self._handle_response(response)
|
||
if result.get("code") != 0:
|
||
return None
|
||
|
||
download_url = result["data"]["DownloadUrl"]
|
||
# 获取重定向后的真实下载链接
|
||
redirect_response = requests.get(download_url, allow_redirects=False, timeout=15)
|
||
if redirect_response.status_code == 302:
|
||
return redirect_response.headers.get("Location")
|
||
|
||
# 尝试从HTML中提取下载链接
|
||
url_pattern = re.compile(r"href='(https?://[^']+)'")
|
||
match = url_pattern.search(redirect_response.text)
|
||
if match:
|
||
return match.group(1)
|
||
|
||
return None
|
||
except requests.exceptions.RequestException as e:
|
||
print(f"获取下载链接失败: {e}")
|
||
return None
|
||
|
||
def download_file(self, file_index: int, download_path: str = "download") -> bool:
|
||
"""下载单个文件"""
|
||
if not (0 <= file_index < len(self.list)):
|
||
print("无效的文件编号")
|
||
return False
|
||
|
||
file_detail = self.list[file_index]
|
||
|
||
# 文件夹需要特殊处理
|
||
if file_detail["Type"] == 1:
|
||
print("文件夹下载:")
|
||
return self.download_directory(file_detail, download_path)
|
||
|
||
# 获取下载链接
|
||
download_url = self.get_download_link(file_index)
|
||
if not download_url:
|
||
print("无法获取下载链接")
|
||
return False
|
||
|
||
# 确定文件名
|
||
file_name = file_detail["FileName"]
|
||
if not os.path.exists(download_path):
|
||
os.makedirs(download_path)
|
||
|
||
# 处理文件已存在的情况
|
||
full_path = os.path.join(download_path, file_name)
|
||
if os.path.exists(full_path):
|
||
if self.download_mode == 4: # 全部跳过
|
||
print(f"文件已存在,跳过: {file_name}")
|
||
return True
|
||
|
||
print(f"文件已存在: {file_name}")
|
||
choice = input("输入1覆盖,2跳过,3全部覆盖,4全部跳过: ")
|
||
if choice == "2" or choice == "4":
|
||
if choice == "4":
|
||
self.download_mode = 4
|
||
print("跳过下载")
|
||
return True
|
||
elif choice == "3":
|
||
self.download_mode = 3
|
||
os.remove(full_path)
|
||
|
||
# 临时文件名
|
||
temp_path = full_path + ".123pan"
|
||
|
||
try:
|
||
# 开始下载
|
||
print(f"开始下载: {file_name}")
|
||
response = requests.get(download_url, stream=True, timeout=30)
|
||
total_size = int(response.headers.get("Content-Length", 0))
|
||
downloaded = 0
|
||
start_time = time.time()
|
||
|
||
with open(temp_path, "wb") as f:
|
||
for chunk in response.iter_content(chunk_size=8192):
|
||
if chunk:
|
||
f.write(chunk)
|
||
downloaded += len(chunk)
|
||
|
||
# 计算下载速度
|
||
elapsed = time.time() - start_time
|
||
if elapsed > 0:
|
||
speed = downloaded / elapsed
|
||
speed_str = self._format_size(speed) + "/s"
|
||
else:
|
||
speed_str = "未知"
|
||
|
||
# 显示进度
|
||
if total_size > 0:
|
||
percent = downloaded / total_size * 100
|
||
print(f"\r进度: {percent:.1f}% | {self._format_size(downloaded)}/{self._format_size(total_size)} | {speed_str}", end=" ")
|
||
|
||
# 重命名文件
|
||
os.rename(temp_path, full_path)
|
||
print(f"\n下载完成: {file_name}")
|
||
return True
|
||
except requests.exceptions.RequestException as e:
|
||
print(f"\n下载失败: {e}")
|
||
if os.path.exists(temp_path):
|
||
os.remove(temp_path)
|
||
return False
|
||
|
||
def download_directory(self, directory: dict, download_path: str = "download") -> bool:
|
||
"""下载整个目录"""
|
||
if directory["Type"] != 1:
|
||
print("不是文件夹")
|
||
return False
|
||
|
||
print(f"开始下载文件夹: {directory['FileName']}")
|
||
|
||
# 创建目标目录
|
||
target_dir = os.path.join(download_path, directory["FileName"])
|
||
if not os.path.exists(target_dir):
|
||
os.makedirs(target_dir)
|
||
|
||
# 获取目录内容
|
||
_, items = self.get_dir_by_id(directory["FileId"], save=False, all_files=True)
|
||
if not items:
|
||
print("文件夹为空")
|
||
return True
|
||
|
||
# 下载所有内容
|
||
success = True
|
||
for item in items:
|
||
if item["Type"] == 1: # 子文件夹
|
||
sub_success = self.download_directory(item, target_dir)
|
||
success = success and sub_success
|
||
else: # 文件
|
||
# 临时将文件添加到列表中以便下载
|
||
original_list = self.list
|
||
self.list = [item]
|
||
file_success = self.download_file(0, target_dir)
|
||
self.list = original_list
|
||
success = success and file_success
|
||
|
||
print(f"文件夹下载完成: {directory['FileName']}")
|
||
return success
|
||
|
||
def get_recycle_bin(self):
|
||
"""获取回收站内容"""
|
||
url = "https://www.123pan.com/a/api/file/list/new?driveId=0&limit=100&next=0&orderBy=fileId&orderDirection=desc&parentFileId=0&trashed=true&Page=1"
|
||
try:
|
||
response = requests.get(url, headers=self.headers, timeout=15)
|
||
result = self._handle_response(response)
|
||
if result.get("code") == 0:
|
||
return result["data"]["InfoList"]
|
||
return []
|
||
except requests.exceptions.RequestException:
|
||
return []
|
||
|
||
def restore_file(self, file_id: int) -> bool:
|
||
"""从回收站恢复文件"""
|
||
payload = {
|
||
"driveId": 0,
|
||
"fileTrashInfoList": {"FileId": file_id},
|
||
"operation": False # False表示恢复
|
||
}
|
||
|
||
try:
|
||
response = requests.post(
|
||
"https://www.123pan.com/a/api/file/trash",
|
||
headers=self.headers,
|
||
json=payload,
|
||
timeout=10
|
||
)
|
||
result = self._handle_response(response)
|
||
return result.get("code") == 0
|
||
except requests.exceptions.RequestException:
|
||
return False
|
||
|
||
def logout(self):
|
||
"""登出:清除 authorization、cookies 并保存配置"""
|
||
self.authorization = ""
|
||
# 清理 headers 中的 authorization 字段(兼容大小写)
|
||
if "authorization" in self.headers:
|
||
self.headers["authorization"] = ""
|
||
if "Authorization" in self.headers:
|
||
self.headers["Authorization"] = ""
|
||
self.cookies = None
|
||
self._save_config()
|
||
print("已登出,授权信息已清除")
|
||
|
||
def set_protocol(self, protocol: str, save: bool = True):
|
||
"""切换协议:'android' 或 'web',切换后会重新初始化 headers 并可选择保存配置"""
|
||
protocol = protocol.lower()
|
||
if protocol not in ("android", "web"):
|
||
print("不支持的协议,仅支持 'android' 或 'web'")
|
||
return False
|
||
self.protocol = protocol
|
||
self._init_headers()
|
||
# 确保 authorization 字段更新到 headers
|
||
if "authorization" in self.headers:
|
||
self.headers["authorization"] = self.authorization
|
||
if "Authorization" in self.headers:
|
||
self.headers["Authorization"] = self.authorization
|
||
if save:
|
||
self._save_config()
|
||
print(f"已切换到 {protocol} 协议")
|
||
return True
|
||
|
||
|
||
if __name__ == "__main__":
|
||
"""主交互函数"""
|
||
# 解决Windows下cmd颜色转义问题
|
||
if os.name == "nt":
|
||
os.system("")
|
||
|
||
print("=" * 60)
|
||
print("123网盘客户端".center(60))
|
||
print("=" * 60)
|
||
|
||
pan = Pan123(config_file="123pan_config.json")
|
||
pan.show()
|
||
|
||
while True:
|
||
try:
|
||
path = "/" + "/".join(pan.parent_file_name_list) if pan.parent_file_name_list else "/"
|
||
command = input(f"\033[91m{path}>\033[0m ").strip()
|
||
|
||
if not command:
|
||
continue
|
||
|
||
parts = command.split(maxsplit=1)
|
||
cmd = parts[0].lower()
|
||
arg = parts[1] if len(parts) > 1 else ""
|
||
|
||
# 命令映射
|
||
if cmd == "ls":
|
||
pan.show()
|
||
elif cmd == "login":
|
||
pan.login()
|
||
pan.refresh_directory()
|
||
elif cmd == "logout":
|
||
pan.logout()
|
||
pan.refresh_directory()
|
||
elif cmd == "exit":
|
||
break
|
||
elif cmd == "cd":
|
||
pan.change_directory(arg)
|
||
elif cmd == "mkdir":
|
||
pan.create_directory(arg)
|
||
# 刷新当前目录
|
||
pan.refresh_directory()
|
||
elif cmd == "upload":
|
||
pan.upload(arg)
|
||
pan.refresh_directory()
|
||
elif cmd == "rm":
|
||
if arg.isdigit():
|
||
pan.delete_file(int(arg) - 1)
|
||
pan.refresh_directory()
|
||
else:
|
||
print("无效的文件编号")
|
||
elif cmd == "share":
|
||
file_ids = [int(idx) - 1 for idx in arg.split() if idx.isdigit()]
|
||
if file_ids:
|
||
pwd = input("输入提取码(留空跳过): ")
|
||
pan.share(file_ids, pwd)
|
||
else:
|
||
print("请提供文件编号")
|
||
elif cmd == "more":
|
||
pan.get_dir()
|
||
pan.show()
|
||
elif cmd == "link":
|
||
if arg.isdigit():
|
||
idx = int(arg) - 1
|
||
if 0 <= idx < len(pan.list):
|
||
link = pan.get_download_link(idx)
|
||
if link:
|
||
print(f"文件直链: {link}")
|
||
else:
|
||
print("获取直链失败")
|
||
else:
|
||
print("无效的文件编号")
|
||
else:
|
||
print("请提供文件编号")
|
||
elif cmd in ("download", "d"):
|
||
if arg.isdigit():
|
||
idx = int(arg) - 1
|
||
if 0 <= idx < len(pan.list):
|
||
pan.download_file(idx)
|
||
else:
|
||
print("无效的文件编号")
|
||
else:
|
||
print("请提供文件编号")
|
||
elif cmd == "recycle":
|
||
recycle_items = pan.get_recycle_bin()
|
||
if recycle_items:
|
||
print("\n回收站内容:")
|
||
for i, item in enumerate(recycle_items, 1):
|
||
print(f"{i}. {item['FileName']} ({pan._format_size(item['Size'])})")
|
||
|
||
action = input("\n输入编号恢复文件,或输入 'clear' 清空回收站: ").strip()
|
||
if action.isdigit():
|
||
idx = int(action) - 1
|
||
if 0 <= idx < len(recycle_items):
|
||
if pan.restore_file(recycle_items[idx]["FileId"]):
|
||
print("文件恢复成功")
|
||
else:
|
||
print("恢复失败")
|
||
elif action == "clear":
|
||
for item in recycle_items:
|
||
pan.delete_file(item, by_index=False)
|
||
print("回收站已清空")
|
||
else:
|
||
print("回收站为空")
|
||
# 刷新当前目录
|
||
pan.refresh_directory()
|
||
elif cmd in ("refresh", "re"):
|
||
pan.refresh_directory()
|
||
elif cmd == "reload":
|
||
pan._load_config()
|
||
pan.refresh_directory()
|
||
elif cmd.isdigit():
|
||
# 切换目录或下载文件
|
||
idx = int(cmd) - 1
|
||
if 0 <= idx < len(pan.list):
|
||
if pan.list[idx]["Type"] == 1:
|
||
pan.change_directory(cmd)
|
||
else:
|
||
pan.download_file(idx)
|
||
else:
|
||
print("无效的文件编号")
|
||
|
||
elif cmd == "protocol":
|
||
if arg.lower() in ("android", "web"):
|
||
pan.set_protocol(arg.lower())
|
||
pan.refresh_directory()
|
||
else:
|
||
print("请指定协议: android 或 web")
|
||
else:
|
||
# 帮助提示,列出支持的命令
|
||
print("可用命令:")
|
||
print(" ls - 显示当前目录")
|
||
print(" cd [编号|..|/] - 切换目录")
|
||
print(" mkdir [名称] - 创建目录")
|
||
print(" upload [路径] - 上传文件")
|
||
print(" rm [编号] - 删除文件")
|
||
print(" share [编号 ...] - 创建分享")
|
||
print(" link [编号] - 获取文件直链")
|
||
print(" download/d [编号] - 下载文件")
|
||
print(" recycle - 管理回收站")
|
||
print(" refresh/re - 刷新目录")
|
||
print(" reload - 重新加载配置并刷新")
|
||
print(" login - 登录")
|
||
print(" logout - 登出并清除 token")
|
||
print(" more - 继续加载更多文件")
|
||
# print(" all - 强制加载当前目录所有文件")
|
||
print(" exit - 退出程序")
|
||
print(" protocol [android|web] - 切换协议")
|
||
except KeyboardInterrupt:
|
||
print("\n操作已取消")
|
||
except Exception as e:
|
||
print(f"发生错误: {e}")
|
||
|