123pan/123pan.py
2025-10-09 18:57:31 +08:00

1017 lines
39 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import hashlib
import json
import os
import random
import re
import time
import uuid
import requests
from typing import List, Dict, Tuple, Union, Optional
class Pan123:
DEVICE_TYPES = [
"24075RP89G", "24076RP19G", "24076RP19I", "M1805E10A", "M2004J11G", "M2012K11AG", "M2104K10I", "22021211RG",
"22021211RI", "21121210G", "23049PCD8G", "23049PCD8I", "23013PC75G", "24069PC21G", "24069PC21I",
"23113RKC6G", "M1912G7BI", "M2007J20CI", "M2007J20CG", "M2007J20CT", "M2102J20SG", "M2102J20SI",
"21061110AG", "2201116PG", "2201116PI", "22041216G", "22041216UG", "22111317PG", "22111317PI", "22101320G",
"22101320I", "23122PCD1G", "23122PCD1I", "2311DRK48G", "2311DRK48I", "2312FRAFDI", "M2004J19PI",
]
OS_VERSIONS = [
"Android_7.1.2", "Android_8.0.0", "Android_8.1.0", "Android_9.0", "Android_10", "Android_11", "Android_12",
"Android_13", "Android_6.0.1", "Android_5.1.1", "Android_4.4.4", "Android_4.3", "Android_4.2.2",
"Android_4.1.2",
]
def __init__(
self,
readfile: bool = True,
user_name: str = "",
pass_word: str = "",
authorization: str = "",
input_pwd: bool = True,
config_file: str = "123pan.txt",
protocol: str = "android", # 协议,默认为 android
):
self.config_file = config_file
self.protocol = protocol.lower() # 'android' 或 'web'
self.devicetype = random.choice(self.DEVICE_TYPES)
self.osversion = random.choice(self.OS_VERSIONS)
self.download_mode = 1
self.cookies = None
self.user_name = user_name
self.password = pass_word
self.authorization = authorization
self.parent_file_id = 0
self.parent_file_list = [0]
self.parent_file_name_list = []
self.all_file = False
self.file_page = 0
self.file_list = []
self.dir_list = []
self.name_dict = {}
self.list = []
self.total = 0
self._init_headers() # 根据 protocol 初始化 headers
load_code = 0
if readfile:
load_code = self._load_config()
if not load_code:
if not (user_name and pass_word) and input_pwd:
self.user_name = input("请输入用户名: ")
self.password = input("请输入密码: ")
elif not (user_name and pass_word):
raise ValueError("用户名和密码不能为空")
# 尝试获取目录,如果失败则登录
if self.get_dir()[0] != 0:
self.login()
self.get_dir()
def _init_headers(self):
"""初始化请求头根据协议android/web选择不同 headers"""
android_headers = {
"user-agent": f"123pan/v2.4.0({self.osversion};Xiaomi)",
"authorization": self.authorization,
"accept-encoding": "gzip",
"content-type": "application/json",
"osversion": self.osversion,
"loginuuid": uuid.uuid4().hex,
"platform": "android",
"devicetype": self.devicetype,
"devicename": "Xiaomi",
"host": "www.123pan.com",
"app-version": "61",
"x-app-version": "2.4.0"
}
# Web 协议头,来源于 web.py 的 header_logined / header_only_usage适配为字典
web_headers = {
"Accept": "*/*",
"Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6",
"App-Version": "3",
# 同时设置大小写两种 Authorization 以兼容不同接口要求
# "Authorization": self.authorization,
"authorization": self.authorization,
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"LoginUuid": uuid.uuid4().hex,
"Pragma": "no-cache",
"Referer": "https://www.123pan.com/",
"Sec-Fetch-Dest": "empty",
"Sec-Fetch-Mode": "cors",
"Sec-Fetch-Site": "same-origin",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36 Edg/119.0.0.0",
"platform": "web",
"sec-ch-ua": "Microsoft",
"sec-ch-ua-mobile": "?0",
"sec-ch-ua-platform": "Windows",
"content-type": "application/json",
}
if getattr(self, "protocol", "android").lower() == "web":
self.headers = web_headers
else:
self.headers = android_headers
def _load_config(self):
"""从配置文件加载账号信息及 protocol"""
try:
if os.path.exists(self.config_file):
with open(self.config_file, "r", encoding="utf-8") as f:
config = json.load(f)
self.user_name = config.get("userName", self.user_name)
self.password = config.get("passWord", self.password)
self.authorization = config.get("authorization", self.authorization)
self.devicetype = config.get("deviceType", self.devicetype)
self.osversion = config.get("osVersion", self.osversion)
# 新增 protocol 支持
self.protocol = config.get("protocol", getattr(self, "protocol", "android")).lower()
# 重新初始化 headers确保 authorization 和 protocol 生效)
self._init_headers()
# 确保 headers 中 authorization 同步
if "authorization" in self.headers:
self.headers["authorization"] = self.authorization
if "Authorization" in self.headers:
self.headers["Authorization"] = self.authorization
print("配置加载成功")
return 1
else:
print("配置文件不存在,将使用传入参数")
return 0
except Exception as e:
print(f"加载配置失败: {e}")
return 0
def _save_config(self):
"""保存账号信息到配置文件(包含 protocol"""
config = {
"userName": self.user_name,
"passWord": self.password,
"authorization": self.authorization,
"deviceType": self.devicetype,
"osVersion": self.osversion,
"protocol": getattr(self, "protocol", "android"),
}
try:
with open(self.config_file, "w", encoding="utf-8") as f:
json.dump(config, f)
print("账号信息已保存")
except Exception as e:
print(f"保存配置失败: {e}")
def _handle_response(self, response: requests.Response) -> dict:
"""处理API响应"""
try:
data = response.json()
if data.get("code") != 0 and data.get("code") != 200:
print(f"API错误: {data.get('message', '未知错误')} (代码: {data.get('code')})")
return data
except json.JSONDecodeError:
print("响应解析错误")
return {"code": -1, "message": "响应解析错误"}
def login(self) -> int:
"""用户登录"""
data = {"type": 1, "passport": self.user_name, "password": self.password}
try:
response = requests.post(
"https://www.123pan.com/b/api/user/sign_in",
headers=self.headers,
json=data,
timeout=15
)
result = self._handle_response(response)
if result.get("code") == 200:
# 更新授权令牌
token = result["data"]["token"]
self.authorization = f"Bearer {token}"
self.headers["authorization"] = self.authorization
# 保存cookie
if "Set-Cookie" in response.headers:
self.cookies = requests.utils.dict_from_cookiejar(response.cookies)
self._save_config()
print("登录成功")
return 0
return result.get("code", -1)
except requests.exceptions.RequestException as e:
print(f"登录请求失败: {e}")
return -1
def get_dir(self, save: bool = True) -> Tuple[int, list]:
"""获取当前目录内容"""
return self.get_dir_by_id(self.parent_file_id, save)
def get_dir_by_id(
self,
file_id: int,
save: bool = True,
all_files: bool = False,
limit: int = 100
) -> Tuple[int, list]:
"""获取指定目录内容"""
page = self.file_page * 3 + 1 if not all_files else 1
current_count = len(self.list) if all_files else 0
items = []
total = -1
attempts = 0
while (current_count < total or total == -1) and (attempts < 3 or all_files):
params = {
"driveId": 0,
"limit": limit,
"next": 0,
"orderBy": "file_id",
"orderDirection": "desc",
"parentFileId": str(file_id),
"trashed": False,
"SearchData": "",
"Page": str(page),
"OnlyLookAbnormalFile": 0,
}
try:
response = requests.get(
"https://www.123pan.com/api/file/list/new",
headers=self.headers,
params=params,
timeout=30
)
result = self._handle_response(response)
if result.get("code") != 0:
return result.get("code", -1), []
page_items = result["data"]["InfoList"]
items.extend(page_items)
total = result["data"]["Total"]
current_count += len(page_items)
page += 1
attempts += 1
if attempts % 5 == 0:
print(f"获取文件中: {current_count}/{total}暂停10秒...")
time.sleep(10)
except requests.exceptions.RequestException:
print("连接失败")
return -1, []
self.all_file = current_count >= total
self.total = total
self.file_page += 1 if not all_files else 0
if save:
self.list.extend(items)
return 0, items
def show(self):
"""显示当前目录内容"""
if not self.list:
print("当前目录为空")
return
print("\n" + "=" * 60)
print(f"当前路径: /{'/'.join(self.parent_file_name_list)}")
print("-" * 60)
print(f"{'编号':<6}{'类型':<8}{'大小':<12}{'名称'}")
print("-" * 60)
for idx, item in enumerate(self.list, 1):
item_type = "文件夹" if item["Type"] == 1 else "文件"
size = self._format_size(item["Size"])
# 使用颜色区分类型
color_code = "\033[35m" if item["Type"] == 1 else "\033[33m"
reset_code = "\033[0m"
print(f"{color_code}{idx:<6}{item_type:<8}{size:<12}{item['FileName']}{reset_code}")
if not self.all_file:
remaining = self.total - len(self.list)
print(f"\n还有 {remaining} 个文件未加载,输入 'more' 继续加载")
print("=" * 60 + "\n")
def _format_size(self, size_bytes: int) -> str:
"""格式化文件大小"""
if size_bytes >= 1073741824:
return f"{size_bytes / 1073741824:.2f} GB"
elif size_bytes >= 1048576:
return f"{size_bytes / 1048576:.2f} MB"
elif size_bytes >= 1024:
return f"{size_bytes / 1024:.2f} KB"
return f"{size_bytes} B"
def delete_file(self, file_id: Union[int, dict], by_index: bool = True, delete: bool = True) -> bool:
"""删除或恢复文件"""
if by_index:
if not (0 <= file_id < len(self.list)):
print("无效的文件编号")
return False
file_data = self.list[file_id]
else:
file_data = file_id
payload = {
"driveId": 0,
"fileTrashInfoList": file_data,
"operation": delete,
}
try:
response = requests.post(
"https://www.123pan.com/a/api/file/trash",
headers=self.headers,
json=payload,
timeout=10
)
result = self._handle_response(response)
if result.get("code") == 0:
action = "删除" if delete else "恢复"
print(f"文件 {file_data['FileName']} {action}成功")
return True
return False
except requests.exceptions.RequestException as e:
print(f"删除操作失败: {e}")
return False
def share(self, file_ids: List[int], share_pwd: str = "") -> Optional[str]:
"""创建分享链接"""
if not file_ids:
print("未选择文件")
return None
file_names = [self.list[i]["FileName"] for i in file_ids]
print("分享文件:", ", ".join(file_names))
payload = {
"driveId": 0,
"expiration": "2099-12-12T08:00:00+08:00",
"fileIdList": ",".join(str(self.list[i]["FileId"]) for i in file_ids),
"shareName": "分享文件",
"sharePwd": share_pwd,
"event": "shareCreate"
}
try:
response = requests.post(
"https://www.123pan.com/a/api/share/create",
headers=self.headers,
json=payload,
timeout=15
)
result = self._handle_response(response)
if result.get("code") == 0:
share_key = result["data"]["ShareKey"]
share_url = f"https://www.123pan.com/s/{share_key}"
print(f"分享创建成功!\n链接: {share_url}")
if share_pwd:
print(f"提取码: {share_pwd}")
return share_url
return None
except requests.exceptions.RequestException as e:
print(f"创建分享失败: {e}")
return None
def upload(self, file_path: str) -> bool:
"""上传文件"""
file_path = file_path.strip().replace('"', "").replace("\\", "/")
if not os.path.exists(file_path):
print("文件不存在")
return False
if os.path.isdir(file_path):
print("暂不支持文件夹上传")
return False
file_name = os.path.basename(file_path)
file_size = os.path.getsize(file_path)
# 计算文件MD5
md5_hash = hashlib.md5()
try:
with open(file_path, "rb") as f:
while chunk := f.read(65536):
md5_hash.update(chunk)
file_md5 = md5_hash.hexdigest()
except IOError as e:
print(f"读取文件失败: {e}")
return False
# 上传请求
payload = {
"driveId": 0,
"etag": file_md5,
"fileName": file_name,
"parentFileId": self.parent_file_id,
"size": file_size,
"type": 0,
"duplicate": 0,
}
try:
response = requests.post(
"https://www.123pan.com/b/api/file/upload_request",
headers=self.headers,
json=payload,
timeout=15
)
result = self._handle_response(response)
if result.get("code") == 5060: # 文件已存在
choice = input("检测到同名文件,输入 1 覆盖2 保留两者,其他取消: ")
if choice == "1":
payload["duplicate"] = 1
elif choice == "2":
payload["duplicate"] = 2
else:
print("上传取消")
return False
response = requests.post(
"https://www.123pan.com/b/api/file/upload_request",
headers=self.headers,
json=payload,
timeout=15
)
result = self._handle_response(response)
if result.get("code") != 0:
return False
# 检查是否MD5复用
if result["data"].get("Reuse", False):
print("文件已存在MD5复用成功")
return True
# 分块上传
return self._upload_chunks(
file_path,
result["data"]["Bucket"],
result["data"]["StorageNode"],
result["data"]["Key"],
result["data"]["UploadId"],
result["data"]["FileId"]
)
except requests.exceptions.RequestException as e:
print(f"上传失败: {e}")
return False
def _upload_chunks(
self,
file_path: str,
bucket: str,
storage_node: str,
key: str,
upload_id: str,
file_id: str
) -> bool:
"""分块上传文件"""
chunk_size = 5 * 1024 * 1024 # 5MB
total_size = os.path.getsize(file_path)
uploaded = 0
part_number = 1
try:
with open(file_path, "rb") as f:
while True:
chunk = f.read(chunk_size)
if not chunk:
break
# 获取上传URL
url_data = {
"bucket": bucket,
"key": key,
"partNumberEnd": part_number + 1,
"partNumberStart": part_number,
"uploadId": upload_id,
"StorageNode": storage_node,
}
try:
response = requests.post(
"https://www.123pan.com/b/api/file/s3_repare_upload_parts_batch",
headers=self.headers,
json=url_data,
timeout=15
)
url_result = self._handle_response(response)
if url_result.get("code") != 0:
return False
upload_url = url_result["data"]["presignedUrls"][str(part_number)]
except requests.exceptions.RequestException:
return False
# 上传分块
try:
upload_response = requests.put(upload_url, data=chunk, timeout=30)
if upload_response.status_code not in (200, 201):
return False
except requests.exceptions.RequestException:
return False
uploaded += len(chunk)
progress = uploaded / total_size * 100
print(f"\r上传进度: {progress:.1f}%", end="", flush=True)
part_number += 1
print("\n上传完成,正在验证...")
time.sleep(1) # 等待服务器处理
# 完成上传
compmultipart_up_url = (
"https://www.123pan.com/b/api/file/s3_complete_multipart_upload"
)
requests.post(
compmultipart_up_url,
headers=self.headers,
data=json.dumps({
"bucket": bucket,
"key": key,
"uploadId": upload_id,
"StorageNode": storage_node,
}),
timeout=10
)
complete_data = {"fileId": file_id}
try:
response = requests.post(
"https://www.123pan.com/b/api/file/upload_complete",
headers=self.headers,
json=complete_data,
timeout=15
)
result = self._handle_response(response)
if result.get("code") == 0:
print("文件上传成功")
return True
return False
except requests.exceptions.RequestException:
return False
except IOError as e:
print(f"读取文件失败: {e}")
return False
def change_directory(self, target: str):
"""改变当前目录"""
if target == "..":
if len(self.parent_file_list) > 1:
self.parent_file_list.pop()
self.parent_file_id = self.parent_file_list[-1]
self.parent_file_name_list.pop()
self.refresh_directory()
else:
print("已经是根目录")
elif target == "/":
self.parent_file_id = 0
self.parent_file_list = [0]
self.parent_file_name_list = []
self.refresh_directory()
elif target.isdigit():
idx = int(target) - 1
if 0 <= idx < len(self.list) and self.list[idx]["Type"] == 1:
self.parent_file_id = self.list[idx]["FileId"]
self.parent_file_list.append(self.parent_file_id)
self.parent_file_name_list.append(self.list[idx]["FileName"])
self.refresh_directory()
else:
print("无效的目录编号或不是文件夹")
else:
print("无效命令,使用 '..' 返回上级,'/' 返回根目录,或输入文件夹编号")
def refresh_directory(self):
"""刷新当前目录内容"""
self.all_file = False
self.file_page = 0
self.list = []
self.get_dir()
self.show()
def create_directory(self, name: str) -> bool:
"""创建新目录"""
if not name:
print("目录名不能为空")
return False
# 检查是否已存在
for item in self.list:
if item["FileName"] == name and item["Type"] == 1:
print("目录已存在")
return True
payload = {
"driveId": 0,
"etag": "",
"fileName": name,
"parentFileId": self.parent_file_id,
"size": 0,
"type": 1,
"duplicate": 1,
"NotReuse": True,
"event": "newCreateFolder",
"operateType": 1,
}
try:
response = requests.post(
"https://www.123pan.com/a/api/file/upload_request",
headers=self.headers,
json=payload,
timeout=10
)
result = self._handle_response(response)
if result.get("code") == 0:
print(f"目录 '{name}' 创建成功")
self.get_dir()
return True
return False
except requests.exceptions.RequestException as e:
print(f"创建目录失败: {e}")
return False
def get_download_link(self, file_index: int) -> Optional[str]:
"""获取文件下载链接"""
if not (0 <= file_index < len(self.list)):
print("无效的文件编号")
return None
file_detail = self.list[file_index]
if file_detail["Type"] == 1: # 文件夹
url = "https://www.123pan.com/a/api/file/batch_download_info"
data = {"fileIdList": [{"fileId": int(file_detail["FileId"])}]}
else: # 文件
url = "https://www.123pan.com/a/api/file/download_info"
data = {
"driveId": 0,
"etag": file_detail["Etag"],
"fileId": file_detail["FileId"],
"s3keyFlag": file_detail["S3KeyFlag"],
"type": file_detail["Type"],
"fileName": file_detail["FileName"],
"size": file_detail["Size"],
}
try:
response = requests.post(url, headers=self.headers, json=data, timeout=15)
result = self._handle_response(response)
if result.get("code") != 0:
return None
download_url = result["data"]["DownloadUrl"]
# 获取重定向后的真实下载链接
redirect_response = requests.get(download_url, allow_redirects=False, timeout=15)
if redirect_response.status_code == 302:
return redirect_response.headers.get("Location")
# 尝试从HTML中提取下载链接
url_pattern = re.compile(r"href='(https?://[^']+)'")
match = url_pattern.search(redirect_response.text)
if match:
return match.group(1)
return None
except requests.exceptions.RequestException as e:
print(f"获取下载链接失败: {e}")
return None
def download_file(self, file_index: int, download_path: str = "download") -> bool:
"""下载单个文件"""
if not (0 <= file_index < len(self.list)):
print("无效的文件编号")
return False
file_detail = self.list[file_index]
# 文件夹需要特殊处理
if file_detail["Type"] == 1:
print("文件夹下载:")
return self.download_directory(file_detail, download_path)
# 获取下载链接
download_url = self.get_download_link(file_index)
if not download_url:
print("无法获取下载链接")
return False
# 确定文件名
file_name = file_detail["FileName"]
if not os.path.exists(download_path):
os.makedirs(download_path)
# 处理文件已存在的情况
full_path = os.path.join(download_path, file_name)
if os.path.exists(full_path):
if self.download_mode == 4: # 全部跳过
print(f"文件已存在,跳过: {file_name}")
return True
print(f"文件已存在: {file_name}")
choice = input("输入1覆盖2跳过3全部覆盖4全部跳过: ")
if choice == "2" or choice == "4":
if choice == "4":
self.download_mode = 4
print("跳过下载")
return True
elif choice == "3":
self.download_mode = 3
os.remove(full_path)
# 临时文件名
temp_path = full_path + ".123pan"
try:
# 开始下载
print(f"开始下载: {file_name}")
response = requests.get(download_url, stream=True, timeout=30)
total_size = int(response.headers.get("Content-Length", 0))
downloaded = 0
start_time = time.time()
with open(temp_path, "wb") as f:
for chunk in response.iter_content(chunk_size=8192):
if chunk:
f.write(chunk)
downloaded += len(chunk)
# 计算下载速度
elapsed = time.time() - start_time
if elapsed > 0:
speed = downloaded / elapsed
speed_str = self._format_size(speed) + "/s"
else:
speed_str = "未知"
# 显示进度
if total_size > 0:
percent = downloaded / total_size * 100
print(f"\r进度: {percent:.1f}% | {self._format_size(downloaded)}/{self._format_size(total_size)} | {speed_str}", end=" ")
# 重命名文件
os.rename(temp_path, full_path)
print(f"\n下载完成: {file_name}")
return True
except requests.exceptions.RequestException as e:
print(f"\n下载失败: {e}")
if os.path.exists(temp_path):
os.remove(temp_path)
return False
def download_directory(self, directory: dict, download_path: str = "download") -> bool:
"""下载整个目录"""
if directory["Type"] != 1:
print("不是文件夹")
return False
print(f"开始下载文件夹: {directory['FileName']}")
# 创建目标目录
target_dir = os.path.join(download_path, directory["FileName"])
if not os.path.exists(target_dir):
os.makedirs(target_dir)
# 获取目录内容
_, items = self.get_dir_by_id(directory["FileId"], save=False, all_files=True)
if not items:
print("文件夹为空")
return True
# 下载所有内容
success = True
for item in items:
if item["Type"] == 1: # 子文件夹
sub_success = self.download_directory(item, target_dir)
success = success and sub_success
else: # 文件
# 临时将文件添加到列表中以便下载
original_list = self.list
self.list = [item]
file_success = self.download_file(0, target_dir)
self.list = original_list
success = success and file_success
print(f"文件夹下载完成: {directory['FileName']}")
return success
def get_recycle_bin(self):
"""获取回收站内容"""
url = "https://www.123pan.com/a/api/file/list/new?driveId=0&limit=100&next=0&orderBy=fileId&orderDirection=desc&parentFileId=0&trashed=true&Page=1"
try:
response = requests.get(url, headers=self.headers, timeout=15)
result = self._handle_response(response)
if result.get("code") == 0:
return result["data"]["InfoList"]
return []
except requests.exceptions.RequestException:
return []
def restore_file(self, file_id: int) -> bool:
"""从回收站恢复文件"""
payload = {
"driveId": 0,
"fileTrashInfoList": {"FileId": file_id},
"operation": False # False表示恢复
}
try:
response = requests.post(
"https://www.123pan.com/a/api/file/trash",
headers=self.headers,
json=payload,
timeout=10
)
result = self._handle_response(response)
return result.get("code") == 0
except requests.exceptions.RequestException:
return False
def logout(self):
"""登出:清除 authorization、cookies 并保存配置"""
self.authorization = ""
# 清理 headers 中的 authorization 字段(兼容大小写)
if "authorization" in self.headers:
self.headers["authorization"] = ""
if "Authorization" in self.headers:
self.headers["Authorization"] = ""
self.cookies = None
self._save_config()
print("已登出,授权信息已清除")
def set_protocol(self, protocol: str, save: bool = True):
"""切换协议:'android''web',切换后会重新初始化 headers 并可选择保存配置"""
protocol = protocol.lower()
if protocol not in ("android", "web"):
print("不支持的协议,仅支持 'android''web'")
return False
self.protocol = protocol
self._init_headers()
# 确保 authorization 字段更新到 headers
if "authorization" in self.headers:
self.headers["authorization"] = self.authorization
if "Authorization" in self.headers:
self.headers["Authorization"] = self.authorization
if save:
self._save_config()
print(f"已切换到 {protocol} 协议")
return True
if __name__ == "__main__":
"""主交互函数"""
# 解决Windows下cmd颜色转义问题
if os.name == "nt":
os.system("")
print("=" * 60)
print("123网盘客户端".center(60))
print("=" * 60)
pan = Pan123(config_file="123pan_config.json")
pan.show()
while True:
try:
path = "/" + "/".join(pan.parent_file_name_list) if pan.parent_file_name_list else "/"
command = input(f"\033[91m{path}>\033[0m ").strip()
if not command:
continue
parts = command.split(maxsplit=1)
cmd = parts[0].lower()
arg = parts[1] if len(parts) > 1 else ""
# 命令映射
if cmd == "ls":
pan.show()
elif cmd == "login":
pan.login()
pan.refresh_directory()
elif cmd == "logout":
pan.logout()
pan.refresh_directory()
elif cmd == "exit":
break
elif cmd == "cd":
pan.change_directory(arg)
elif cmd == "mkdir":
pan.create_directory(arg)
# 刷新当前目录
pan.refresh_directory()
elif cmd == "upload":
pan.upload(arg)
pan.refresh_directory()
elif cmd == "rm":
if arg.isdigit():
pan.delete_file(int(arg) - 1)
pan.refresh_directory()
else:
print("无效的文件编号")
elif cmd == "share":
file_ids = [int(idx) - 1 for idx in arg.split() if idx.isdigit()]
if file_ids:
pwd = input("输入提取码(留空跳过): ")
pan.share(file_ids, pwd)
else:
print("请提供文件编号")
elif cmd == "more":
pan.get_dir()
pan.show()
elif cmd == "link":
if arg.isdigit():
idx = int(arg) - 1
if 0 <= idx < len(pan.list):
link = pan.get_download_link(idx)
if link:
print(f"文件直链: {link}")
else:
print("获取直链失败")
else:
print("无效的文件编号")
else:
print("请提供文件编号")
elif cmd in ("download", "d"):
if arg.isdigit():
idx = int(arg) - 1
if 0 <= idx < len(pan.list):
pan.download_file(idx)
else:
print("无效的文件编号")
else:
print("请提供文件编号")
elif cmd == "recycle":
recycle_items = pan.get_recycle_bin()
if recycle_items:
print("\n回收站内容:")
for i, item in enumerate(recycle_items, 1):
print(f"{i}. {item['FileName']} ({pan._format_size(item['Size'])})")
action = input("\n输入编号恢复文件,或输入 'clear' 清空回收站: ").strip()
if action.isdigit():
idx = int(action) - 1
if 0 <= idx < len(recycle_items):
if pan.restore_file(recycle_items[idx]["FileId"]):
print("文件恢复成功")
else:
print("恢复失败")
elif action == "clear":
for item in recycle_items:
pan.delete_file(item, by_index=False)
print("回收站已清空")
else:
print("回收站为空")
# 刷新当前目录
pan.refresh_directory()
elif cmd in ("refresh", "re"):
pan.refresh_directory()
elif cmd == "reload":
pan._load_config()
pan.refresh_directory()
elif cmd.isdigit():
# 切换目录或下载文件
idx = int(cmd) - 1
if 0 <= idx < len(pan.list):
if pan.list[idx]["Type"] == 1:
pan.change_directory(cmd)
else:
pan.download_file(idx)
else:
print("无效的文件编号")
elif cmd == "protocol":
if arg.lower() in ("android", "web"):
pan.set_protocol(arg.lower())
pan.refresh_directory()
else:
print("请指定协议: android 或 web")
else:
# 帮助提示,列出支持的命令
print("可用命令:")
print(" ls - 显示当前目录")
print(" cd [编号|..|/] - 切换目录")
print(" mkdir [名称] - 创建目录")
print(" upload [路径] - 上传文件")
print(" rm [编号] - 删除文件")
print(" share [编号 ...] - 创建分享")
print(" link [编号] - 获取文件直链")
print(" download/d [编号] - 下载文件")
print(" recycle - 管理回收站")
print(" refresh/re - 刷新目录")
print(" reload - 重新加载配置并刷新")
print(" login - 登录")
print(" logout - 登出并清除 token")
print(" more - 继续加载更多文件")
# print(" all - 强制加载当前目录所有文件")
print(" exit - 退出程序")
print(" protocol [android|web] - 切换协议")
except KeyboardInterrupt:
print("\n操作已取消")
except Exception as e:
print(f"发生错误: {e}")