From cb737c0d956efb9944a711c270f185d0dc6ac77e Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E4=BF=9D=E6=B8=85?= <919836565@qq.com>
Date: Sun, 8 Feb 2026 17:34:55 +0800
Subject: [PATCH] =?UTF-8?q?=E5=88=86=E7=A6=BB=E5=87=BAcore=E5=92=8Ccli?=
=?UTF-8?q?=EF=BC=8C=E6=9B=B4=E6=96=B0=E6=96=87=E6=A1=A3,=E6=89=93?=
=?UTF-8?q?=E5=8C=85=E8=84=9A=E6=9C=AC?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
123download.js | 86 ----
123pan.py | 1031 ------------------------------------------------
README.md | 445 +++++++++++++++------
pack.sh | 2 +-
pan123_cli.py | 119 ++++--
pan123_core.py | 919 ++++++++++++++++++++++++++++++------------
6 files changed, 1077 insertions(+), 1525 deletions(-)
delete mode 100644 123download.js
delete mode 100644 123pan.py
diff --git a/123download.js b/123download.js
deleted file mode 100644
index 6f50b67..0000000
--- a/123download.js
+++ /dev/null
@@ -1,86 +0,0 @@
-// ==UserScript==
-// @name 123云盘下载辅助
-// @namespace https://github.com/Bao-qing/123pan
-// @version 0.3
-// @description 123 Cloud Drive Unlimited Flow
-// @match https://www.123pan.com/*
-// @match https://www.123pan.cn/*
-// @match https://www.123865.com/*
-// @match https://www.123684.com/*
-// @grant none
-// @author Qing
-// ==/UserScript==
-
-(function () {
- // 重写 XMLHttpRequest
- const originalXHR = window.XMLHttpRequest;
-
- function newXHR() {
- const realXHR = new originalXHR();
-
- realXHR.open = function (method, url, async, user, password) {
- this._url = url; // 记录请求的 URL
- return originalXHR.prototype.open.apply(this, arguments);
- };
-
- realXHR.setRequestHeader = function (header, value) {
- let headers = {
- "user-agent": "123pan/v2.4.0(Android_7.1.2;Xiaomi)",
- //"loginuuid": generateUUIDHex(),
- "platform": "android",
- "app-version": "61",
- "x-app-version": "2.4.0"
- }
- // 如果header在列表中,则修改
- if (header.toLowerCase() in headers) {
- value = headers[header.toLowerCase()];
- } else {
- console.log('header:', header);
- }
-
- return originalXHR.prototype.setRequestHeader.apply(this, arguments);
- };
-
- // 拦截响应内容,修改 DownloadUrl以适应网页端下载
- realXHR.send = function () {
- const xhrInstance = this;
- this.addEventListener('readystatechange', function () {
- let origin_url;
- let new_url_no_redirect;
- let base64data;
- if (xhrInstance.readyState === 4 && xhrInstance.status === 200) {
- // 解析响应的 JSON
- let responseText = xhrInstance.responseText;
- let responseJSON = JSON.parse(responseText);
- console.log('Original Response:', responseJSON);
-
- // 修改 DownloadUrl
- if (responseJSON.data && responseJSON.data.DownloadUrl) {
- origin_url = responseJSON.data.DownloadUrl;
- new_url_no_redirect = origin_url + "&auto_redirect=0";
- base64data = btoa(new_url_no_redirect);
- responseJSON.data.DownloadUrl = "https://web-pro2.123952.com/download-v2/?params=" + base64data + "&is_s3=0";
- console.log('Modified DownloadUrl:', responseJSON.data.DownloadUrl);
- }
-
- // 将修改后的 JSON 转为字符串
- let modifiedResponseText = JSON.stringify(responseJSON);
-
- // 使用 defineProperty 重写 responseText
- Object.defineProperty(xhrInstance, 'responseText', {
- get: function () {
- return modifiedResponseText;
- }
- });
- console.log('Modified Response:', modifiedResponseText);
- }
- });
-
- return originalXHR.prototype.send.apply(this, arguments);
- };
-
- return realXHR;
- }
-
- window.XMLHttpRequest = newXHR;
-})();
diff --git a/123pan.py b/123pan.py
deleted file mode 100644
index aa0f3a3..0000000
--- a/123pan.py
+++ /dev/null
@@ -1,1031 +0,0 @@
-import hashlib
-import json
-import os
-import random
-import re
-import time
-import uuid
-import requests
-from typing import List, Dict, Tuple, Union, Optional
-
-class Pan123:
- DEVICE_TYPES = [
- "24075RP89G", "24076RP19G", "24076RP19I", "M1805E10A", "M2004J11G", "M2012K11AG", "M2104K10I", "22021211RG",
- "22021211RI", "21121210G", "23049PCD8G", "23049PCD8I", "23013PC75G", "24069PC21G", "24069PC21I",
- "23113RKC6G", "M1912G7BI", "M2007J20CI", "M2007J20CG", "M2007J20CT", "M2102J20SG", "M2102J20SI",
- "21061110AG", "2201116PG", "2201116PI", "22041216G", "22041216UG", "22111317PG", "22111317PI", "22101320G",
- "22101320I", "23122PCD1G", "23122PCD1I", "2311DRK48G", "2311DRK48I", "2312FRAFDI", "M2004J19PI",
- ]
-
- OS_VERSIONS = [
- "Android_7.1.2", "Android_8.0.0", "Android_8.1.0", "Android_9.0", "Android_10", "Android_11", "Android_12",
- "Android_13", "Android_6.0.1", "Android_5.1.1", "Android_4.4.4", "Android_4.3", "Android_4.2.2",
- "Android_4.1.2",
- ]
-
- def __init__(
- self,
- readfile: bool = True,
- user_name: str = "",
- pass_word: str = "",
- authorization: str = "",
- input_pwd: bool = True,
- config_file: str = "123pan.txt",
- protocol: str = "android", # 协议,默认为 android
- ):
- self.config_file = config_file
- self.protocol = protocol.lower() # 'android' 或 'web'
- self.devicetype = random.choice(self.DEVICE_TYPES)
- self.osversion = random.choice(self.OS_VERSIONS)
- self.download_mode = 1
- self.cookies = None
- self.user_name = user_name
- self.password = pass_word
- self.authorization = authorization
- self.parent_file_id = 0
- self.parent_file_list = [0]
- self.parent_file_name_list = []
- self.all_file = False
- self.file_page = 0
- self.file_list = []
- self.dir_list = []
- self.name_dict = {}
- self.list = []
- self.total = 0
-
- self._init_headers() # 根据 protocol 初始化 headers
-
- load_code = 0
- if readfile:
- load_code = self._load_config()
- if not load_code:
- if not (user_name and pass_word) and input_pwd:
- self.user_name = input("请输入用户名: ")
- self.password = input("请输入密码: ")
- elif not (user_name and pass_word):
- raise ValueError("用户名和密码不能为空")
-
- # 尝试获取目录,如果失败则登录
- if self.get_dir()[0] != 0:
- self.login()
- self.get_dir()
-
- def _init_headers(self):
- """初始化请求头,根据协议(android/web)选择不同 headers"""
- android_headers = {
- "user-agent": f"123pan/v2.4.0({self.osversion};Xiaomi)",
- "authorization": self.authorization,
- "accept-encoding": "gzip",
- "content-type": "application/json",
- "osversion": self.osversion,
- "loginuuid": uuid.uuid4().hex,
- "platform": "android",
- "devicetype": self.devicetype,
- "devicename": "Xiaomi",
- "host": "www.123pan.com",
- "app-version": "61",
- "x-app-version": "2.4.0"
- }
-
- # Web 协议头,来源于 web.py 的 header_logined / header_only_usage(适配为字典)
- web_headers = {
- "Accept": "*/*",
- "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6",
- "App-Version": "3",
- # 同时设置大小写两种 Authorization 以兼容不同接口要求
- # "Authorization": self.authorization,
- "authorization": self.authorization,
- "Cache-Control": "no-cache",
- "Connection": "keep-alive",
- "LoginUuid": uuid.uuid4().hex,
- "Pragma": "no-cache",
- "Referer": "https://www.123pan.com/",
- "Sec-Fetch-Dest": "empty",
- "Sec-Fetch-Mode": "cors",
- "Sec-Fetch-Site": "same-origin",
- "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36 Edg/119.0.0.0",
- "platform": "web",
- "sec-ch-ua": "Microsoft",
- "sec-ch-ua-mobile": "?0",
- "sec-ch-ua-platform": "Windows",
- "content-type": "application/json",
- }
-
- if getattr(self, "protocol", "android").lower() == "web":
- self.headers = web_headers
- else:
- self.headers = android_headers
-
- def _load_config(self):
- """从配置文件加载账号信息及 protocol"""
- try:
- if os.path.exists(self.config_file):
- with open(self.config_file, "r", encoding="utf-8") as f:
- config = json.load(f)
- self.user_name = config.get("userName", self.user_name)
- self.password = config.get("passWord", self.password)
- self.authorization = config.get("authorization", self.authorization)
- self.devicetype = config.get("deviceType", self.devicetype)
- self.osversion = config.get("osVersion", self.osversion)
- # 新增 protocol 支持
- self.protocol = config.get("protocol", getattr(self, "protocol", "android")).lower()
- # 重新初始化 headers(确保 authorization 和 protocol 生效)
- self._init_headers()
- # 确保 headers 中 authorization 同步
- if "authorization" in self.headers:
- self.headers["authorization"] = self.authorization
- if "Authorization" in self.headers:
- self.headers["Authorization"] = self.authorization
- print("配置加载成功")
- return 1
- else:
- print("配置文件不存在,将使用传入参数")
- return 0
- except Exception as e:
- print(f"加载配置失败: {e}")
- return 0
-
- def _save_config(self):
- """保存账号信息到配置文件(包含 protocol)"""
- config = {
- "userName": self.user_name,
- "passWord": self.password,
- "authorization": self.authorization,
- "deviceType": self.devicetype,
- "osVersion": self.osversion,
- "protocol": getattr(self, "protocol", "android"),
- }
- try:
- with open(self.config_file, "w", encoding="utf-8") as f:
- json.dump(config, f)
- print("账号信息已保存")
- except Exception as e:
- print(f"保存配置失败: {e}")
-
- def _handle_response(self, response: requests.Response) -> dict:
- """处理API响应"""
- try:
- data = response.json()
- if data.get("code") != 0 and data.get("code") != 200:
- print(f"API错误: {data.get('message', '未知错误')} (代码: {data.get('code')})")
- return data
- except json.JSONDecodeError:
- print("响应解析错误")
- return {"code": -1, "message": "响应解析错误"}
-
- def login(self) -> int:
- """用户登录"""
- data = {"type": 1, "passport": self.user_name, "password": self.password}
- try:
- response = requests.post(
- "https://www.123pan.com/b/api/user/sign_in",
- headers=self.headers,
- json=data,
- timeout=15
- )
- result = self._handle_response(response)
- if result.get("code") == 200:
- # 更新授权令牌
- token = result["data"]["token"]
- self.authorization = f"Bearer {token}"
- self.headers["authorization"] = self.authorization
-
- # 保存cookie
- if "Set-Cookie" in response.headers:
- self.cookies = requests.utils.dict_from_cookiejar(response.cookies)
-
- self._save_config()
- print("登录成功")
- return 0
- return result.get("code", -1)
- except requests.exceptions.RequestException as e:
- print(f"登录请求失败: {e}")
- return -1
-
- def get_dir(self, save: bool = True) -> Tuple[int, list]:
- """获取当前目录内容"""
- return self.get_dir_by_id(self.parent_file_id, save)
-
- def get_dir_by_id(
- self,
- file_id: int,
- save: bool = True,
- all_files: bool = False,
- limit: int = 100
- ) -> Tuple[int, list]:
- """获取指定目录内容"""
- page = self.file_page * 3 + 1 if not all_files else 1
- current_count = len(self.list) if all_files else 0
- items = []
- total = -1
- attempts = 0
-
- while (current_count < total or total == -1) and (attempts < 3 or all_files):
- params = {
- "driveId": 0,
- "limit": limit,
- "next": 0,
- "orderBy": "file_id",
- "orderDirection": "desc",
- "parentFileId": str(file_id),
- "trashed": False,
- "SearchData": "",
- "Page": str(page),
- "OnlyLookAbnormalFile": 0,
- }
-
- try:
- response = requests.get(
- "https://www.123pan.com/api/file/list/new",
- headers=self.headers,
- params=params,
- timeout=30
- )
- result = self._handle_response(response)
- if result.get("code") != 0:
- return result.get("code", -1), []
-
- page_items = result["data"]["InfoList"]
- items.extend(page_items)
- total = result["data"]["Total"]
- current_count += len(page_items)
- page += 1
- attempts += 1
-
- if attempts % 5 == 0:
- print(f"获取文件中: {current_count}/{total},暂停10秒...")
- time.sleep(10)
- except requests.exceptions.RequestException:
- print("连接失败")
- return -1, []
-
- self.all_file = current_count >= total
- self.total = total
- self.file_page += 1 if not all_files else 0
-
- if save:
- self.list.extend(items)
-
- return 0, items
-
- def show(self):
- """显示当前目录内容"""
- if not self.list:
- print("当前目录为空")
- return
-
- print("\n" + "=" * 60)
- print(f"当前路径: /{'/'.join(self.parent_file_name_list)}")
- print("-" * 60)
- print(f"{'编号':<6}{'类型':<8}{'大小':<12}{'名称'}")
- print("-" * 60)
-
- for idx, item in enumerate(self.list, 1):
- item_type = "文件夹" if item["Type"] == 1 else "文件"
- size = self._format_size(item["Size"])
-
- # 使用颜色区分类型
- color_code = "\033[35m" if item["Type"] == 1 else "\033[33m"
- reset_code = "\033[0m"
-
- print(f"{color_code}{idx:<6}{item_type:<8}{size:<12}{item['FileName']}{reset_code}")
-
- if not self.all_file:
- remaining = self.total - len(self.list)
- print(f"\n还有 {remaining} 个文件未加载,输入 'more' 继续加载")
- print("=" * 60 + "\n")
-
- def _format_size(self, size_bytes: int) -> str:
- """格式化文件大小"""
- if size_bytes >= 1073741824:
- return f"{size_bytes / 1073741824:.2f} GB"
- elif size_bytes >= 1048576:
- return f"{size_bytes / 1048576:.2f} MB"
- elif size_bytes >= 1024:
- return f"{size_bytes / 1024:.2f} KB"
- return f"{size_bytes} B"
-
- def delete_file(self, file_id: Union[int, dict], by_index: bool = True, delete: bool = True) -> bool:
- """删除或恢复文件"""
- if by_index:
- if not (0 <= file_id < len(self.list)):
- print("无效的文件编号")
- return False
- file_data = self.list[file_id]
- else:
- file_data = file_id
-
- payload = {
- "driveId": 0,
- "fileTrashInfoList": file_data,
- "operation": delete,
- }
-
- try:
- response = requests.post(
- "https://www.123pan.com/a/api/file/trash",
- headers=self.headers,
- json=payload,
- timeout=10
- )
- result = self._handle_response(response)
- if result.get("code") == 0:
- action = "删除" if delete else "恢复"
- print(f"文件 {file_data['FileName']} {action}成功")
- return True
- return False
- except requests.exceptions.RequestException as e:
- print(f"删除操作失败: {e}")
- return False
-
- def share(self, file_ids: List[int], share_pwd: str = "") -> Optional[str]:
- """创建分享链接"""
- if not file_ids:
- print("未选择文件")
- return None
-
- file_names = [self.list[i]["FileName"] for i in file_ids]
- print("分享文件:", ", ".join(file_names))
-
- payload = {
- "driveId": 0,
- "expiration": "2099-12-12T08:00:00+08:00",
- "fileIdList": ",".join(str(self.list[i]["FileId"]) for i in file_ids),
- "shareName": "分享文件",
- "sharePwd": share_pwd,
- "event": "shareCreate"
- }
-
- try:
- response = requests.post(
- "https://www.123pan.com/a/api/share/create",
- headers=self.headers,
- json=payload,
- timeout=15
- )
- result = self._handle_response(response)
- if result.get("code") == 0:
- share_key = result["data"]["ShareKey"]
- share_url = f"https://www.123pan.com/s/{share_key}"
- print(f"分享创建成功!\n链接: {share_url}")
- if share_pwd:
- print(f"提取码: {share_pwd}")
- return share_url
- return None
- except requests.exceptions.RequestException as e:
- print(f"创建分享失败: {e}")
- return None
-
- def upload(self, file_path: str) -> bool:
- """上传文件"""
- file_path = file_path.strip().replace('"', "").replace("\\", "/")
- if not os.path.exists(file_path):
- print("文件不存在")
- return False
-
- if os.path.isdir(file_path):
- print("暂不支持文件夹上传")
- return False
-
- file_name = os.path.basename(file_path)
- file_size = os.path.getsize(file_path)
-
- # 计算文件MD5
- md5_hash = hashlib.md5()
- try:
- with open(file_path, "rb") as f:
- while chunk := f.read(65536):
- md5_hash.update(chunk)
- file_md5 = md5_hash.hexdigest()
- except IOError as e:
- print(f"读取文件失败: {e}")
- return False
-
- # 上传请求
- payload = {
- "driveId": 0,
- "etag": file_md5,
- "fileName": file_name,
- "parentFileId": self.parent_file_id,
- "size": file_size,
- "type": 0,
- "duplicate": 0,
- }
-
- try:
- response = requests.post(
- "https://www.123pan.com/b/api/file/upload_request",
- headers=self.headers,
- json=payload,
- timeout=15
- )
- result = self._handle_response(response)
- if result.get("code") == 5060: # 文件已存在
- choice = input("检测到同名文件,输入 1 覆盖,2 保留两者,其他取消: ")
- if choice == "1":
- payload["duplicate"] = 1
- elif choice == "2":
- payload["duplicate"] = 2
- else:
- print("上传取消")
- return False
-
- response = requests.post(
- "https://www.123pan.com/b/api/file/upload_request",
- headers=self.headers,
- json=payload,
- timeout=15
- )
- result = self._handle_response(response)
-
- if result.get("code") != 0:
- return False
-
- # 检查是否MD5复用
- if result["data"].get("Reuse", False):
- print("文件已存在,MD5复用成功")
- return True
-
- # 分块上传
- return self._upload_chunks(
- file_path,
- result["data"]["Bucket"],
- result["data"]["StorageNode"],
- result["data"]["Key"],
- result["data"]["UploadId"],
- result["data"]["FileId"]
- )
- except requests.exceptions.RequestException as e:
- print(f"上传失败: {e}")
- return False
-
- def _upload_chunks(
- self,
- file_path: str,
- bucket: str,
- storage_node: str,
- key: str,
- upload_id: str,
- file_id: str
- ) -> bool:
- """分块上传文件"""
- chunk_size = 5 * 1024 * 1024 # 5MB
- total_size = os.path.getsize(file_path)
- uploaded = 0
- part_number = 1
-
- try:
- with open(file_path, "rb") as f:
- while True:
- chunk = f.read(chunk_size)
- if not chunk:
- break
-
- # 获取上传URL
- url_data = {
- "bucket": bucket,
- "key": key,
- "partNumberEnd": part_number + 1,
- "partNumberStart": part_number,
- "uploadId": upload_id,
- "StorageNode": storage_node,
- }
-
- try:
- response = requests.post(
- "https://www.123pan.com/b/api/file/s3_repare_upload_parts_batch",
- headers=self.headers,
- json=url_data,
- timeout=15
- )
- url_result = self._handle_response(response)
- if url_result.get("code") != 0:
- return False
-
- upload_url = url_result["data"]["presignedUrls"][str(part_number)]
- except requests.exceptions.RequestException:
- return False
-
- # 上传分块
- try:
- upload_response = requests.put(upload_url, data=chunk, timeout=30)
- if upload_response.status_code not in (200, 201):
- return False
- except requests.exceptions.RequestException:
- return False
-
- uploaded += len(chunk)
- progress = uploaded / total_size * 100
- print(f"\r上传进度: {progress:.1f}%", end="", flush=True)
- part_number += 1
-
- print("\n上传完成,正在验证...")
- time.sleep(1) # 等待服务器处理
-
- # 完成上传
- compmultipart_up_url = (
- "https://www.123pan.com/b/api/file/s3_complete_multipart_upload"
- )
- requests.post(
- compmultipart_up_url,
- headers=self.headers,
- data=json.dumps({
- "bucket": bucket,
- "key": key,
- "uploadId": upload_id,
- "StorageNode": storage_node,
- }),
- timeout=10
- )
- complete_data = {"fileId": file_id}
- try:
- response = requests.post(
- "https://www.123pan.com/b/api/file/upload_complete",
- headers=self.headers,
- json=complete_data,
- timeout=15
- )
- result = self._handle_response(response)
- if result.get("code") == 0:
- print("文件上传成功")
- return True
- return False
- except requests.exceptions.RequestException:
- return False
- except IOError as e:
- print(f"读取文件失败: {e}")
- return False
-
- def change_directory(self, target: str):
- """改变当前目录"""
- if target == "..":
- if len(self.parent_file_list) > 1:
- self.parent_file_list.pop()
- self.parent_file_id = self.parent_file_list[-1]
- self.parent_file_name_list.pop()
- self.refresh_directory()
- else:
- print("已经是根目录")
- elif target == "/":
- self.parent_file_id = 0
- self.parent_file_list = [0]
- self.parent_file_name_list = []
- self.refresh_directory()
- elif target.isdigit():
- idx = int(target) - 1
- if 0 <= idx < len(self.list) and self.list[idx]["Type"] == 1:
- self.parent_file_id = self.list[idx]["FileId"]
- self.parent_file_list.append(self.parent_file_id)
- self.parent_file_name_list.append(self.list[idx]["FileName"])
- self.refresh_directory()
- else:
- print("无效的目录编号或不是文件夹")
-
- else:
- print("无效命令,使用 '..' 返回上级,'/' 返回根目录,或输入文件夹编号")
-
- def refresh_directory(self):
- """刷新当前目录内容"""
- self.all_file = False
- self.file_page = 0
- self.list = []
- self.get_dir()
- self.show()
-
- def create_directory(self, name: str) -> bool:
- """创建新目录"""
- if not name:
- print("目录名不能为空")
- return False
-
- # 检查是否已存在
- for item in self.list:
- if item["FileName"] == name and item["Type"] == 1:
- print("目录已存在")
- return True
-
- payload = {
- "driveId": 0,
- "etag": "",
- "fileName": name,
- "parentFileId": self.parent_file_id,
- "size": 0,
- "type": 1,
- "duplicate": 1,
- "NotReuse": True,
- "event": "newCreateFolder",
- "operateType": 1,
- }
-
- try:
- response = requests.post(
- "https://www.123pan.com/a/api/file/upload_request",
- headers=self.headers,
- json=payload,
- timeout=10
- )
- result = self._handle_response(response)
- if result.get("code") == 0:
- print(f"目录 '{name}' 创建成功")
- self.get_dir()
- return True
- return False
- except requests.exceptions.RequestException as e:
- print(f"创建目录失败: {e}")
- return False
-
- def get_download_link(self, file_index: int) -> Optional[str]:
- """获取文件下载链接"""
- if not (0 <= file_index < len(self.list)):
- print("无效的文件编号")
- return None
-
- file_detail = self.list[file_index]
-
- if file_detail["Type"] == 1: # 文件夹
- url = "https://www.123pan.com/a/api/file/batch_download_info"
- data = {"fileIdList": [{"fileId": int(file_detail["FileId"])}]}
- else: # 文件
- url = "https://www.123pan.com/a/api/file/download_info"
- data = {
- "driveId": 0,
- "etag": file_detail["Etag"],
- "fileId": file_detail["FileId"],
- "s3keyFlag": file_detail["S3KeyFlag"],
- "type": file_detail["Type"],
- "fileName": file_detail["FileName"],
- "size": file_detail["Size"],
- }
-
- try:
- response = requests.post(url, headers=self.headers, json=data, timeout=15)
- result = self._handle_response(response)
- if result.get("code") != 0:
- return None
-
- download_url = result["data"]["DownloadUrl"]
- # 获取重定向后的真实下载链接
- redirect_response = requests.get(download_url, allow_redirects=False, timeout=15)
- if redirect_response.status_code == 302:
- return redirect_response.headers.get("Location")
-
- # 尝试从HTML中提取下载链接
- url_pattern = re.compile(r"href='(https?://[^']+)'")
- match = url_pattern.search(redirect_response.text)
- if match:
- return match.group(1)
-
- return None
- except requests.exceptions.RequestException as e:
- print(f"获取下载链接失败: {e}")
- return None
-
- def download_file(self, file_index: int, download_path: str = "download") -> bool:
- """下载单个文件"""
- if not (0 <= file_index < len(self.list)):
- print("无效的文件编号")
- return False
-
- file_detail = self.list[file_index]
-
- # 文件夹需要特殊处理
- if file_detail["Type"] == 1:
- print("文件夹下载:")
- return self.download_directory(file_detail, download_path)
-
- # 获取下载链接
- download_url = self.get_download_link(file_index)
- if not download_url:
- print("无法获取下载链接")
- return False
-
- # 确定文件名
- file_name = file_detail["FileName"]
- if not os.path.exists(download_path):
- os.makedirs(download_path)
-
- # 处理文件已存在的情况
- full_path = os.path.join(download_path, file_name)
- if os.path.exists(full_path):
- if self.download_mode == 4: # 全部跳过
- print(f"文件已存在,跳过: {file_name}")
- return True
-
- print(f"文件已存在: {file_name}")
- choice = input("输入1覆盖,2跳过,3全部覆盖,4全部跳过: ")
- if choice == "2" or choice == "4":
- if choice == "4":
- self.download_mode = 4
- print("跳过下载")
- return True
- elif choice == "3":
- self.download_mode = 3
- os.remove(full_path)
-
- # 临时文件名
- temp_path = full_path + ".123pan"
-
- try:
- # 开始下载
- print(f"开始下载: {file_name}")
- response = requests.get(download_url, stream=True, timeout=30)
- total_size = int(response.headers.get("Content-Length", 0))
- downloaded = 0
- start_time = time.time()
-
- with open(temp_path, "wb") as f:
- for chunk in response.iter_content(chunk_size=8192):
- if chunk:
- f.write(chunk)
- downloaded += len(chunk)
-
- # 计算下载速度
- elapsed = time.time() - start_time
- if elapsed > 0:
- speed = downloaded / elapsed
- speed_str = self._format_size(speed) + "/s"
- else:
- speed_str = "未知"
-
- # 显示进度
- if total_size > 0:
- percent = downloaded / total_size * 100
- print(f"\r进度: {percent:.1f}% | {self._format_size(downloaded)}/{self._format_size(total_size)} | {speed_str}", end=" ")
-
- # 重命名文件
- os.rename(temp_path, full_path)
- print(f"\n下载完成: {file_name}")
- return True
- except requests.exceptions.RequestException as e:
- print(f"\n下载失败: {e}")
- if os.path.exists(temp_path):
- os.remove(temp_path)
- return False
-
- def download_directory(self, directory: dict, download_path: str = "download") -> bool:
- """下载整个目录"""
- if directory["Type"] != 1:
- print("不是文件夹")
- return False
-
- print(f"开始下载文件夹: {directory['FileName']}")
-
- # 创建目标目录
- target_dir = os.path.join(download_path, directory["FileName"])
- if not os.path.exists(target_dir):
- os.makedirs(target_dir)
-
- # 获取目录内容
- _, items = self.get_dir_by_id(directory["FileId"], save=False, all_files=True)
- if not items:
- print("文件夹为空")
- return True
-
- # 下载所有内容
- success = True
- for item in items:
- if item["Type"] == 1: # 子文件夹
- sub_success = self.download_directory(item, target_dir)
- success = success and sub_success
- else: # 文件
- # 临时将文件添加到列表中以便下载
- original_list = self.list
- self.list = [item]
- file_success = self.download_file(0, target_dir)
- self.list = original_list
- success = success and file_success
-
- print(f"文件夹下载完成: {directory['FileName']}")
- return success
-
- def get_recycle_bin(self):
- """获取回收站内容"""
- url = "https://www.123pan.com/a/api/file/list/new?driveId=0&limit=100&next=0&orderBy=fileId&orderDirection=desc&parentFileId=0&trashed=true&Page=1"
- try:
- response = requests.get(url, headers=self.headers, timeout=15)
- result = self._handle_response(response)
- if result.get("code") == 0:
- return result["data"]["InfoList"]
- return []
- except requests.exceptions.RequestException:
- return []
-
- def restore_file(self, file_id: int) -> bool:
- """从回收站恢复文件"""
- payload = {
- "driveId": 0,
- "fileTrashInfoList": {"FileId": file_id},
- "operation": False # False表示恢复
- }
-
- try:
- response = requests.post(
- "https://www.123pan.com/a/api/file/trash",
- headers=self.headers,
- json=payload,
- timeout=10
- )
- result = self._handle_response(response)
- return result.get("code") == 0
- except requests.exceptions.RequestException:
- return False
-
- def logout(self):
- """登出:清除 authorization、cookies 并保存配置"""
- self.authorization = ""
- # 清理 headers 中的 authorization 字段(兼容大小写)
- if "authorization" in self.headers:
- self.headers["authorization"] = ""
- if "Authorization" in self.headers:
- self.headers["Authorization"] = ""
- self.cookies = None
- self._save_config()
- print("已登出,授权信息已清除")
-
- def set_protocol(self, protocol: str, save: bool = True):
- """切换协议:'android' 或 'web',切换后会重新初始化 headers 并可选择保存配置"""
- protocol = protocol.lower()
- if protocol not in ("android", "web"):
- print("不支持的协议,仅支持 'android' 或 'web'")
- return False
- self.protocol = protocol
- self._init_headers()
- # 确保 authorization 字段更新到 headers
- if "authorization" in self.headers:
- self.headers["authorization"] = self.authorization
- if "Authorization" in self.headers:
- self.headers["Authorization"] = self.authorization
- if save:
- self._save_config()
- print(f"已切换到 {protocol} 协议")
- return True
-
- def cdById(self, file_id: int):
- """根据文件夹ID切换目录"""
- # 重置文件页数和文件列表
- self.all_file = False
- self.file_page = 0
- self.list = []
-
- # 更新当前目录ID和目录列表
- self.parent_file_id = file_id
- self.parent_file_list.append(self.parent_file_id)
-
- # 获取新目录内容并显示
- self.get_dir()
- self.show()
-
-
-if __name__ == "__main__":
- """主交互函数"""
- # 解决Windows下cmd颜色转义问题
- if os.name == "nt":
- os.system("")
-
- print("=" * 60)
- print("123网盘客户端".center(60))
- print("=" * 60)
-
- pan = Pan123(config_file="123pan_config.json")
- pan.show()
-
- while True:
- try:
- path = "/" + "/".join(pan.parent_file_name_list) if pan.parent_file_name_list else "/"
- command = input(f"\033[91m{path}>\033[0m ").strip()
-
- if not command:
- continue
-
- parts = command.split(maxsplit=1)
- cmd = parts[0].lower()
- arg = parts[1] if len(parts) > 1 else ""
-
- # 命令映射
- if cmd == "ls":
- pan.show()
- elif cmd == "login":
- pan.login()
- pan.refresh_directory()
- elif cmd == "logout":
- pan.logout()
- pan.refresh_directory()
- elif cmd == "exit":
- break
- elif cmd == "cd":
- pan.change_directory(arg)
- elif cmd == "mkdir":
- pan.create_directory(arg)
- # 刷新当前目录
- pan.refresh_directory()
- elif cmd == "upload":
- pan.upload(arg)
- pan.refresh_directory()
- elif cmd == "rm":
- if arg.isdigit():
- pan.delete_file(int(arg) - 1)
- pan.refresh_directory()
- else:
- print("无效的文件编号")
- elif cmd == "share":
- file_ids = [int(idx) - 1 for idx in arg.split() if idx.isdigit()]
- if file_ids:
- pwd = input("输入提取码(留空跳过): ")
- pan.share(file_ids, pwd)
- else:
- print("请提供文件编号")
- elif cmd == "more":
- pan.get_dir()
- pan.show()
- elif cmd == "link":
- if arg.isdigit():
- idx = int(arg) - 1
- if 0 <= idx < len(pan.list):
- link = pan.get_download_link(idx)
- if link:
- print(f"文件直链: {link}")
- else:
- print("获取直链失败")
- else:
- print("无效的文件编号")
- else:
- print("请提供文件编号")
- elif cmd in ("download", "d"):
- if arg.isdigit():
- idx = int(arg) - 1
- if 0 <= idx < len(pan.list):
- pan.download_file(idx)
- else:
- print("无效的文件编号")
- else:
- print("请提供文件编号")
- elif cmd == "recycle":
- recycle_items = pan.get_recycle_bin()
- if recycle_items:
- print("\n回收站内容:")
- for i, item in enumerate(recycle_items, 1):
- print(f"{i}. {item['FileName']} ({pan._format_size(item['Size'])})")
-
- action = input("\n输入编号恢复文件,或输入 'clear' 清空回收站: ").strip()
- if action.isdigit():
- idx = int(action) - 1
- if 0 <= idx < len(recycle_items):
- if pan.restore_file(recycle_items[idx]["FileId"]):
- print("文件恢复成功")
- else:
- print("恢复失败")
- elif action == "clear":
- for item in recycle_items:
- pan.delete_file(item, by_index=False)
- print("回收站已清空")
- else:
- print("回收站为空")
- # 刷新当前目录
- pan.refresh_directory()
- elif cmd in ("refresh", "re"):
- pan.refresh_directory()
- elif cmd == "reload":
- pan._load_config()
- pan.refresh_directory()
- elif cmd.isdigit():
- # 切换目录或下载文件
- idx = int(cmd) - 1
- if 0 <= idx < len(pan.list):
- if pan.list[idx]["Type"] == 1:
- pan.change_directory(cmd)
- else:
- pan.download_file(idx)
- else:
- print("无效的文件编号")
-
- elif cmd == "protocol":
- if arg.lower() in ("android", "web"):
- pan.set_protocol(arg.lower())
- pan.refresh_directory()
- else:
- print("请指定协议: android 或 web")
- else:
- # 帮助提示,列出支持的命令
- print("可用命令:")
- print(" ls - 显示当前目录")
- print(" cd [编号|..|/] - 切换目录")
- print(" mkdir [名称] - 创建目录")
- print(" upload [路径] - 上传文件")
- print(" rm [编号] - 删除文件")
- print(" share [编号 ...] - 创建分享")
- print(" link [编号] - 获取文件直链")
- print(" download/d [编号] - 下载文件")
- print(" recycle - 管理回收站")
- print(" refresh/re - 刷新目录")
- print(" reload - 重新加载配置并刷新")
- print(" login - 登录")
- print(" logout - 登出并清除 token")
- print(" more - 继续加载更多文件")
- # print(" all - 强制加载当前目录所有文件")
- print(" exit - 退出程序")
- print(" protocol [android|web] - 切换协议")
- except KeyboardInterrupt:
- print("\n操作已取消")
- except Exception as e:
- print(f"发生错误: {e}")
-
diff --git a/README.md b/README.md
index 10d51d7..1036e62 100644
--- a/README.md
+++ b/README.md
@@ -1,18 +1,43 @@
-# 123Pan 下载工具
+# 1、123Pan Cli工具
-123 网盘命令行工具,支持列出文件、下载、上传、分享、删除、创建目录及回收站管理。Android 客户端或 Web 协议,便于在本地批量管理与下载文件。
+123 网盘命令行工具,支持列出文件、下载、上传、分享、删除、创建目录及回收站管理。
-安卓客户端协议不受下载流量限制(推荐使用)。
+
+* [1、123Pan Cli工具](#1123pan-cli工具)
+ * [1.1 特性](#11-特性)
+ * [1.2 脚本环境要求](#12-脚本环境要求)
+ * [1.3 安装与运行](#13-安装与运行)
+ * [1.3.1 脚本运行](#131-脚本运行)
+ * [1.3.2 下载release版](#132-下载release版)
+ * [1.4 配置文件(JSON)](#14-配置文件json)
+ * [1.5 常用命令(交互式)](#15-常用命令交互式)
+* [2、123Pan接口模块(pan123_core.py)](#2123pan接口模块pan123_corepy)
+ * [2.1 核心类:`Pan123Core`](#21-核心类pan123core)
+ * [2.1.1 属性说明](#211-属性说明)
+ * [2.1.2 方法清单](#212-方法清单)
+ * [2.1.2.1 (1)登录操作](#2121-1登录操作)
+ * [2.1.2.2 (2)配置管理](#2122-2配置管理)
+ * [2.1.2.3 (3)目录操作](#2123-3目录操作)
+ * [2.1.2.4 (4)文件操作](#2124-4文件操作)
+ * [2.1.2.5 (5)用户信息](#2125-5用户信息)
+ * [2.2 工具类:`Pan123Tool`](#22-工具类pan123tool)
+ * [2.2.1 属性说明](#221-属性说明)
+ * [2.2.2 方法清单](#222-方法清单)
+ * [2.2.2.1 (1)配置管理](#2221-1配置管理)
+ * [2.2.2.2 (2)文件下载](#2222-2文件下载)
+ * [2.2.2.3 (3)文件上传](#2223-3文件上传)
+ * [2.3 全局配置参数](#23-全局配置参数)
+ * [2.3.1 协议相关](#231-协议相关)
+ * [2.3.2 设备伪装](#232-设备伪装)
+ * [2.4 错误码说明](#24-错误码说明)
+ * [2.5 典型使用示例](#25-典型使用示例)
+* [3、下载说明](#3下载说明)
+* [4、注意事项](#4注意事项)
+* [5、免责声明](#5免责声明)
+
-123download.js 是网页端下载油猴脚本最初版本,仍然可以使用,仅保留最基本的解锁下载功能,不再更新。
+## 1.1 特性
-可以参考其他项目:
-
-[123云盘解锁 (@QingJ)](https://greasyfork.org/zh-CN/scripts/519353-123%E4%BA%91%E7%9B%98%E8%A7%A3%E9%94%81)
-
-[123 云盘会员青春版 (@hmjz100)](https://greasyfork.org/zh-CN/scripts/513528-123-%E4%BA%91%E7%9B%98%E4%BC%9A%E5%91%98%E9%9D%92%E6%98%A5%E7%89%88)
-
-## 特性
- 登录 / 登出
- 列出当前目录文件(ls)
- 切换目录(cd)与刷新(refresh / re)
@@ -26,28 +51,36 @@
- 协议切换(protocol android|web)
- 支持保存配置到 JSON 文件(authorization、device/os、protocol 等)
-## 脚本环境要求
-- Python 3.7+
-- 依赖库:requests
- 安装:
- ```bash
- pip install requests
- ```
+## 1.2 脚本环境要求
+
+- Python 3
+- 依赖库:requests
+ 安装:
+ ```bash
+ pip install requests
+ ```
+
+## 1.3 安装与运行
+
+### 1.3.1 脚本运行
-## 安装与运行
-### 脚本运行
1. 克隆或下载本仓库到本地。
2. 进入项目目录。
3. 运行脚本:
- ```bash
- python 123pan.py
- ```
- 启动后会提示输入用户名 / 密码,或自动读取配置文件(默认 `123pan_config.json` 或 `123pan.txt`,脚本内部根据传入参数使用该文件)。
-### 下载release版
- 根据系统下载对应的 release 版本,解压后运行 `123pan.exe`(Windows)或 `123pan`(Linux)。
-## 配置文件(JSON)
+ ```bash
+ python pan123_cli.py
+ ```
+ 启动后会提示输入用户名 / 密码,或自动读取配置文件(默认 `123pan_config.json`,脚本内部根据传入参数使用该文件)。
+
+### 1.3.2 下载release版
+
+根据系统下载对应的 release 版本,解压后运行 `123pan.exe`(Windows)或 `123pan`(Linux)。
+
+## 1.4 配置文件(JSON)
+
脚本会读取并保存一个配置文件(示例 `123pan_config.json`),保存登录状态与偏好,格式示例:
-```json
+
+```json
{
"userName": "your_username",
"passWord": "your_password",
@@ -55,112 +88,288 @@
"deviceType": "M2007J20CI",
"osVersion": "Android_10",
"protocol": "android"
-}
-```
+}
+```
+
注意:保存密码或 token 到本地会有安全风险,请在可信环境下使用并妥善保护该文件。
-## 常用命令(交互式)
+## 1.5 常用命令(交互式)
+
在脚本交互提示符中输入命令,部分带参数:
-- 直接输入编号
- - 若编号对应文件夹 → 进入该文件夹
- - 若编号对应文件 → 直接下载该文件
-
-- ls
- - 显示当前目录的文件与文件夹列表。
+| 指令 | 用法示例 | 功能说明 |
+|-----------------------------|----------------------------------------|----------------------------------|
+| 直接输入编号 | `3` | 若编号对应文件夹 → 进入该文件夹;若为文件 → 直接下载该文件 |
+| ls | `ls` | 显示当前目录的文件与文件夹列表 |
+| cd [编号|..|/] | `cd 3`、`cd ..`、`cd /` | 切换目录:进入指定编号的文件夹、返回上级、返回根目录 |
+| mkdir [名称] | `mkdir test` | 在当前目录创建文件夹 |
+| upload [路径] | `upload C:\Users\you\Desktop\file.txt` | 上传文件到当前目录(仅支持单个文件) |
+| rm [编号] | `rm 2` | 删除当前列表中指定编号的文件/文件夹(移入回收站) |
+| share [编号 ...] | `share 2 4` | 为指定文件创建一个或多个分享链接,可设置提取码(可为空) |
+| link [编号] | `link 3` | 获取指定文件的直链地址 |
+| download / d [编号] | `download 5` 或 `d 5` | 下载指定编号的文件或文件夹(文件夹将递归下载) |
+| recycle | `recycle` | 查看回收站内容,可恢复指定编号项或输入 clear 清空回收站 |
+| refresh / re | `refresh` 或 `re` | 刷新当前目录列表 |
+| reload | `reload` | 重新加载配置文件并刷新目录 |
+| login / logout | `login`、`logout` | 手动登录或登出(清除授权信息) |
+| clearaccount | clearaccount | 清除已登录账号(包括用户名和密码) |
+| more | `more` | 当目录分页未加载完时,继续加载更多内容 |
+| protocol [android|web] | `protocol web` | 切换通信协议(如 android/web),并可选择保存配置 |
+| exit | `exit` | 退出程序 |
-- cd [编号|..|/]
- - 切换目录:
- - cd 3 —— 进入当前列表第3项(如果是文件夹)。
- - cd .. —— 返回上级。
- - cd / —— 返回根目录。
-
-- mkdir [名称]
- - 在当前目录创建文件夹。例如:mkdir test
-
-- upload [路径]
- - 上传文件到当前目录。例如:upload C:\Users\you\Desktop\file.txt
- - 仅支持文件,暂不支持目录批量上传。
-
-- rm [编号]
- - 删除当前列表中的文件/文件夹(会移动到回收站)。例如:rm 2
-
-- share [编号 ...]
- - 为一个或多个文件创建分享链接,例如:share 2 4
- - 程序会提示输入提取码(可留空)。
-
-- link [编号]
- - 获取文件直链。例如:link 3
-
-- download / d [编号]
- - 下载指定编号的文件或文件夹(如果是文件夹,会递归下载)。
- - 例如:download 5
-
-- recycle
- - 显示回收站内容,并可恢复或清空。
- - 可输入编号恢复,或输入 clear 清空回收站。
-
-- refresh / re
- - 刷新当前目录列表。
-
-- reload
- - 重新加载配置文件并刷新目录。
-
-- login / logout
- - login:手动登录(使用当前配置或提示输入)。
- - logout:登出并清除授权信息(保存配置时会写入空 token)。
-
-- more
- - 如果当前目录分页未加载完,输入 more 继续加载更多文件。
-
-- protocol [android|web]
- - 切换协议。例:protocol web
- - 切换后会重新初始化请求头,并可选择保存到配置文件。
-
-- exit
- - 退出程序。
-
----
+---
交互示例:
-```
-/> cd demo
-无效命令,使用 '..' 返回上级,'/' 返回根目录,或输入文件夹编号
-/> cd 1
-当前目录为空
-/demo1> ls
-当前目录为空
-/demo1> mkdir test
-目录 'test' 创建成功
-/demo1> 1
-当前目录为空
-/demo1/test> upload 123pan.py
-上传进度: 100.0%
-上传完成,正在验证...
-文件上传成功
-------------------------------------------------------------
-编号 类型 大小 名称
-------------------------------------------------------------
-1 文件 38.66 KB 123pan.py
-============================================================
+```
+/> cd demo
+无效命令,使用 '..' 返回上级,'/' 返回根目录,或输入文件夹编号
+/> cd 1
+当前目录为空
+/demo1> ls
+当前目录为空
+/demo1> mkdir test
+目录 'test' 创建成功
-/demo1/test> 1
-开始下载: 123pan.py
-进度: 100.0% | 38.66 KB/38.66 KB | 10.29 MB/s
-下载完成: 123pan.py
-/demo1/test>
+/demo1> 1
+当前目录为空
+/demo1/test> upload 123pan.py
+上传进度: 100.0%
+上传完成,正在验证...
+文件上传成功
+------------------------------------------------------------
+编号 类型 大小 名称
+------------------------------------------------------------
+1 文件 38.66 KB 123pan.py
+============================================================
+
+/demo1/test> 1
+开始下载: 123pan.py
+进度: 100.0% | 38.66 KB/38.66 KB | 10.29 MB/s
+下载完成: 123pan.py
+/demo1/test>
```
-## 下载说明
+# 2、123Pan接口模块(pan123_core.py)
+
+以下是基于代码实现的 **123pan 网盘 API**,按类结构分类说明:
+
+---
+
+### 2.1 核心类:`Pan123Core`
+
+负责与 123pan 服务器的通信,管理认证状态、目录浏览、文件操作等核心逻辑。
+
+#### 2.1.1 属性说明
+
+| 属性名 | 类型 | 描述 |
+|-----------------|------------|-----------------------------|
+| `user_name` | str | 登录账号(手机号/用户名) |
+| `password` | str | 登录密码 |
+| `authorization` | str | 认证 Token(登录后自动填充) |
+| `protocol` | str | 请求协议(`"android"` 或 `"web"`) |
+| `cwd_id` | int | 当前工作目录 ID(0 表示根目录) |
+| `file_list` | List[dict] | 当前目录文件列表 |
+| `nick_name` | str | 当前用户昵称 |
+| `uid` | int | 当前用户 UID |
+
+---
+
+#### 2.1.2 方法清单
+
+##### 2.1.2.1 (1)登录操作
+
+| 方法名 | 参数说明 | 返回值类型 | 功能描述 |
+|-------------------|------|--------|----------------------------|
+| `login()` | 无 | Result | 使用 `user_name/password` 登录 |
+| `logout()` | 无 | Result | 登出并清除 Token |
+| `check_login()` | 无 | Result | 检查当前 Token 是否有效 |
+| `clear_account()` | 无 | Result | 清除账号信息(不保存配置) |
+
+##### 2.1.2.2 (2)配置管理
+
+| 方法名 | 参数说明 | 返回值类型 | 功能描述 |
+|--------------------------|-----------------------------------|--------|----------------------|
+| `load_config(cfg)` | `cfg`: 包含账号信息的字典(见下方配置参数) | Result | 加载配置并更新实例状态 |
+| `get_current_config()` | 无 | dict | 获取当前配置(账号、Token、协议等) |
+| `set_protocol(protocol)` | `protocol`: `"android"` 或 `"web"` | Result | 切换请求协议 |
+
+##### 2.1.2.3 (3)目录操作
+
+| 方法名 | 参数说明 | 返回值类型 | 功能描述 |
+|-----------------------------------------------|---------------------------------------------------------|--------|--------------|
+| `list_dir(parent_id=None, page=1, limit=100)` | `parent_id`: 父目录 ID
`page`: 页码
`limit`: 单页数量 | Result | 获取单页文件列表 |
+| `list_dir_all(parent_id=None, limit=100)` | 同上 | Result | 获取全部文件(自动翻页) |
+| `mkdir(name)` | `name`: 目录名 | Result | 在当前目录创建子目录 |
+| `cd(folder_index)` | `folder_index`: `file_list` 中的目标文件夹下标 | Result | 进入目标文件夹 |
+| `cd_up()` | 无 | Result | 返回上级目录 |
+| `cd_root()` | 无 | Result | 返回根目录 |
+| `trash(file_data, delete=True)` | `file_data`: 文件信息字典
`delete`: 是否删除(True=删除,False=恢复) | Result | 删除或恢复文件 |
+| `list_recycle()` | 无 | Result | 获取回收站文件列表 |
+
+##### 2.1.2.4 (4)文件操作
+
+| 方法名 | 参数说明 | 返回值类型 | 功能描述 |
+|-------------------------------------------------------------------------|---------------------------------------------------------------------------------|--------|-----------------|
+| `upload_file(file_path, duplicate=0, on_progress=None)` | `file_path`: 本地文件路径
`duplicate`: 冲突策略(0=报错,1=覆盖,2=保留)
`on_progress`: 进度回调 | Result | 上传文件(支持秒传和分块上传) |
+| `get_download_url(index)` | `index`: `file_list` 中的目标文件下标 | Result | 获取文件直链(自动处理重定向) |
+| `share(file_ids, share_pwd="", expiration="2099-12-12T08:00:00+08:00")` | `file_ids`: 文件 ID 列表
`share_pwd`: 提取码
`expiration`: 过期时间 | Result | 创建分享链接 |
+
+##### 2.1.2.5 (5)用户信息
+
+| 方法名 | 参数说明 | 返回值类型 | 功能描述 |
+|-------------------|------|--------|------------------------|
+| `get_user_info()` | 无 | Result | 获取当前用户信息(UID、昵称、空间用量等) |
+
+---
+
+### 2.2 工具类:`Pan123Tool`
+
+基于 `Pan123Core` 提供文件交互功能(依赖文件系统操作)。
+
+#### 2.2.1 属性说明
+
+| 属性名 | 类型 | 描述 |
+|---------------|------------|-----------------------------------|
+| `core` | Pan123Core | 关联的核心实例 |
+| `config_file` | str | 配置文件路径(默认 `"123pan_config.json"`) |
+
+---
+
+#### 2.2.2 方法清单
+
+##### 2.2.2.1 (1)配置管理
+
+| 方法名 | 参数说明 | 返回值类型 | 功能描述 |
+|---------------------------|------|--------|------------|
+| `load_config_from_file()` | 无 | Result | 从文件加载配置 |
+| `save_config_to_file()` | 无 | Result | 将当前配置保存到文件 |
+
+##### 2.2.2.2 (2)文件下载
+
+| 方法名 | 参数说明 | 返回值类型 | 功能描述 |
+|--------------------------------------------------------------------------------------------------------------|---------------------------------------------------------------------------------------------------------------|--------|--------|
+| `download_file(index, save_dir="download", on_progress=None, overwrite=False, skip_existing=False)` | `index`: 文件列表下标
`save_dir`: 保存路径
`on_progress`: 进度回调
`overwrite`: 是否覆盖
`skip_existing`: 是否跳过已存在文件 | Result | 下载单个文件 |
+| `download_directory(directory, save_dir="download", on_progress=None, overwrite=False, skip_existing=False)` | `directory`: 目录信息字典
其他参数同上 | Result | 递归下载目录 |
+
+##### 2.2.2.3 (3)文件上传
+
+| 方法名 | 参数说明 | 返回值类型 | 功能描述 |
+|---------------------------------------------------------|----------------------------|--------|-------------------|
+| `upload_file(file_path, duplicate=0, on_progress=None)` | 同 `Pan123Core.upload_file` | Result | 上传文件(与 Core 方法一致) |
+
+---
+
+### 2.3 全局配置参数
+
+#### 2.3.1 协议相关
+
+| 参数名 | 默认值 | 描述 |
+|-----------------------|----------------------------|-----------------|
+| `API_BASE_URL` | `"https://www.123pan.com"` | API 根地址 |
+| `TIMEOUT_DEFAULT` | `15` | 默认请求超时时间(秒) |
+| `UPLOAD_CHUNK_SIZE` | `5*1024*1024` | 分块上传单块大小(5MB) |
+| `DOWNLOAD_CHUNK_SIZE` | `8192` | 下载流式读取单块大小(8KB) |
+
+#### 2.3.2 设备伪装
+
+| 参数名 | 默认值 | 描述 |
+|----------------|-----|-------------------|
+| `DEVICE_TYPES` | 见代码 | 可选 Android 设备型号列表 |
+| `OS_VERSIONS` | 见代码 | 可选 Android 系统版本列表 |
+
+---
+
+### 2.4 错误码说明
+
+| 错误码 | 含义 | 可能触发场景 |
+|------|--------|----------------------------|
+| 0 | 操作成功 | 所有接口成功时返回 |
+| -1 | 网络请求失败 | 连接超时、SSL 错误等 |
+| 5060 | 文件名冲突 | 上传时 `duplicate=0` 且目标文件已存在 |
+| 1 | 本地文件冲突 | 下载时目标文件已存在 |
+
+---
+
+### 2.5 典型使用示例
+
+```python
+import json
+
+from pan123_core import Pan123Core, Pan123Tool, Pan123EventType, format_size
+
+# 初始化核心对象(Android 协议)
+core = Pan123Core(
+ user_name="13800138000",
+ password="your_password",
+ protocol=Pan123Core.PROTOCOL_ANDROID
+)
+
+# 登录
+result = core.login()
+if result["code"] != 0:
+ raise Exception("登录失败")
+
+# 创建工具类实例
+tool = Pan123Tool(core)
+
+
+# 下载文件
+def _download_progress(data) -> None:
+ if data.get("type") == Pan123EventType.DOWNLOAD_PROGRESS:
+ downloaded = data.get("downloaded", 0)
+ total = data.get("total", 0)
+ speed = data.get("speed", 0)
+ if total > 0:
+ pct = downloaded / total * 100
+ print(
+ f"\r进度: {pct:.1f}% | {format_size(downloaded)}/{format_size(total)} | {format_size(int(speed))}/s",
+ end=" ",
+ flush=True,
+ )
+ elif data.get("type") == Pan123EventType.DOWNLOAD_START_FILE:
+ print(f"开始下载: {data.get('file_name', '未知文件')} ({format_size(data.get('file_size', 0))})")
+ elif data.get("type") == Pan123EventType.DOWNLOAD_START_DIRECTORY:
+ print(f"开始下载目录: {data.get('dir_name', '未知目录')}")
+ else:
+ print(json.dumps(data, indent=2))
+
+
+result = tool.download_file(
+ index=0,
+ save_dir="downloads",
+ on_progress=lambda e: print(f"下载进度: {e['percent']:.2f}%")
+)
+
+# 上传文件
+result = core.upload_file(
+ file_path="local_file.txt",
+ duplicate=1, # 覆盖已有文件
+ on_progress=None # ...
+)
+
+# 创建分享链接
+result = core.share(
+ file_ids=[12345],
+ share_pwd="123456",
+ expiration="2026-12-31T23:59:59+08:00"
+)
+```
+
+---
+
+# 3、下载说明
+
- 下载到脚本所在目录的 `download` 文件夹,下载过程中使用临时后缀 `.123pan`,下载完成后会重命名为原文件名。
- 如果文件已存在,会提示覆盖 / 跳过 / 全部覆盖 / 全部跳过等选项。
-## 注意事项
+# 4、注意事项
+
- 本工具用于学习与自用场景,请勿用于违法用途。对任何滥用造成的后果,本人概不负责。
- 模拟客户端协议可能存在账号或服务端策略风险,请谨慎使用。
- 建议不要在公用或不受信任的机器上保存明文密码或授权信息。
-## 免责声明
+# 5、免责声明
-本工具用于学习场景,请勿用于违法用途。对任何滥用造成的后果,作者概不负责。
\ No newline at end of file
+本工具用于学习场景,请勿用于违法用途。对任何滥用造成的后果,作者概不负责。
+任何未经允许的api调用都是不被官方允许的,对于因此产生的账号风险、数据损失等后果自负。
\ No newline at end of file
diff --git a/pack.sh b/pack.sh
index 48ece22..08609c9 100755
--- a/pack.sh
+++ b/pack.sh
@@ -1 +1 @@
-pyinstaller -F 123pan.py --icon favicon.ico --clean --noconfirm
\ No newline at end of file
+pyinstaller -F pan123_cli.py --icon favicon.ico --clean --noconfirm
\ No newline at end of file
diff --git a/pan123_cli.py b/pan123_cli.py
index 8e8b1e1..6c30bec 100644
--- a/pan123_cli.py
+++ b/pan123_cli.py
@@ -2,11 +2,12 @@
123pan 控制台交互界面 —— 仅负责用户 IO,所有业务调用 Pan123Core。
"""
+import json
import os
import sys
-from typing import List
+from typing import Dict
-from pan123_core import Pan123Core, format_size, make_result
+from pan123_core import Pan123Core, Pan123Tool, Pan123EventType, format_size
# ──────────────── 颜色工具 ────────────────
@@ -50,7 +51,9 @@ class Pan123CLI:
exit - 退出程序"""
def __init__(self, config_file: str = "123pan_config.json"):
- self.core = Pan123Core(config_file=config_file)
+ self.config_file: str = config_file
+ self.core = Pan123Core()
+ self.tool = Pan123Tool(self.core)
self._download_mode: int = 0 # 0=询问, 3=全部覆盖, 4=全部跳过
# ──────────────── 启动 ────────────────
@@ -62,11 +65,36 @@ class Pan123CLI:
os.system("")
self._print_banner()
- self._init_login()
+ if not self._init_login():
+ print(colored("无法登录", Color.RED))
+ a = input("输入1重新输入账号和密码,输入2清除登录信息,其他键退出: ")
+ if a == "1":
+ user_name = input("请输入用户名: ")
+ password = input("请输入密码: ")
+ if not user_name or not password:
+ print("用户名和密码不能为空,程序退出")
+ return
+ self.core.load_config({
+ "userName": user_name,
+ "passWord": password,
+ "authorization": ""
+ })
+ self.save_config()
+ return self.run()
+ if a == "2":
+ self._do_clear_account()
+ return self.run()
+ return
+
+ self.save_config()
+ self.core.refresh() # 加载文件列表
+ self.core.get_user_info()
+ self._show_files()
while True:
try:
prompt = colored(f"{self.core.cwd_path}>", Color.RED) + " "
+ print(colored(f'用户:{self.core.nick_name}', Color.GREEN))
command = input(prompt).strip()
if not command:
continue
@@ -82,26 +110,39 @@ class Pan123CLI:
def _print_banner(self) -> None:
print("=" * 60)
- print("123网盘客户端".center(56))
+ print("123网盘CLI客户端".center(56))
print("=" * 60)
- def _init_login(self) -> None:
+ def _init_login(self) -> bool:
"""尝试加载配置 -> 尝试访问目录 -> 必要时登录"""
- self.core.load_config()
+ res = self.load_config()
+ r = self.core.init_login_state()
+ if r["code"] < 0:
+ print(colored("登录失败", Color.YELLOW))
+ print(r["message"])
+ return False
+ return True
- r = self.core.refresh()
- if r["code"] != 0:
- # 需要登录
- if not self.core.user_name:
- self.core.user_name = input("请输入用户名: ")
- if not self.core.password:
- self.core.password = input("请输入密码: ")
- lr = self.core.login()
- self._print_result(lr)
- if lr["code"] == 0:
- self.core.refresh()
+ def load_config(self) -> Dict:
+ """加载配置"""
+ try:
+ with open(self.config_file, "r", encoding="utf-8") as f:
+ cfg = json.load(f)
+ except FileNotFoundError:
+ user_name = input("请输入用户名: ")
+ password = input("请输入密码: ")
+ cfg = {
+ "userName": user_name,
+ "passWord": password,
+ "authorization": ""
+ }
+ return self.core.load_config(cfg)
- self._show_files()
+ def save_config(self) -> None:
+ """保存配置"""
+ cfg = self.core.get_current_config()
+ with open(self.config_file, "w", encoding="utf-8") as f:
+ json.dump(cfg, f, indent=2, ensure_ascii=False)
# ──────────────── 命令分发 ────────────────
@@ -185,6 +226,7 @@ class Pan123CLI:
def _do_logout(self) -> None:
r = self.core.logout()
+ self.save_config()
self._print_result(r)
def _do_clear_account(self) -> None:
@@ -192,6 +234,7 @@ class Pan123CLI:
confirm = input("确定要清除已登录账号信息吗?(y/N): ").strip().lower()
if confirm == 'y':
r = self.core.clear_account()
+ self.save_config()
self._print_result(r)
else:
print("操作已取消")
@@ -275,7 +318,7 @@ class Pan123CLI:
return
r = self.core.get_download_url(int(arg) - 1)
if r["code"] == 0:
- print(f"文件直链: {r['data']['url']}")
+ print(f"文件直链: \n{r['data']['url']}")
else:
self._print_result(r)
@@ -292,7 +335,7 @@ class Pan123CLI:
overwrite = self._download_mode == 3
skip = self._download_mode == 4
- r = self.core.download_file(
+ r = self.tool.download_file(
idx,
on_progress=self._download_progress,
overwrite=overwrite,
@@ -311,7 +354,7 @@ class Pan123CLI:
return
elif choice == "3":
self._download_mode = 3
- r = self.core.download_file(idx, on_progress=self._download_progress, overwrite=True)
+ r = self.tool.download_file(idx, on_progress=self._download_progress, overwrite=True)
print() # 换行
self._print_result(r)
@@ -378,17 +421,29 @@ class Pan123CLI:
# ──────────────── 进度回调 ────────────────
@staticmethod
- def _download_progress(downloaded: int, total: int, speed: float) -> None:
- if total > 0:
- pct = downloaded / total * 100
- print(
- f"\r进度: {pct:.1f}% | {format_size(downloaded)}/{format_size(total)} | {format_size(int(speed))}/s",
- end=" ",
- flush=True,
- )
+ def _download_progress(data) -> None:
+ if data.get("type") == Pan123EventType.DOWNLOAD_PROGRESS:
+ downloaded = data.get("downloaded", 0)
+ total = data.get("total", 0)
+ speed = data.get("speed", 0)
+ if total > 0:
+ pct = downloaded / total * 100
+ print(
+ f"\r进度: {pct:.1f}% | {format_size(downloaded)}/{format_size(total)} | {format_size(int(speed))}/s",
+ end=" ",
+ flush=True,
+ )
+ elif data.get("type") == Pan123EventType.DOWNLOAD_START_FILE:
+ print(f"开始下载: {data.get('file_name', '未知文件')} ({format_size(data.get('file_size', 0))})")
+ elif data.get("type") == Pan123EventType.DOWNLOAD_START_DIRECTORY:
+ print(f"开始下载目录: {data.get('dir_name', '未知目录')}")
+ else:
+ print(json.dumps(data, indent=2))
@staticmethod
- def _upload_progress(uploaded: int, total: int) -> None:
+ def _upload_progress(data) -> None:
+ uploaded = data.get("uploaded", 0)
+ total = data.get("total", 0)
if total > 0:
pct = uploaded / total * 100
print(f"\r上传进度: {pct:.1f}%", end="", flush=True)
@@ -397,4 +452,4 @@ class Pan123CLI:
# ──────────────── 入口 ────────────────
if __name__ == "__main__":
- Pan123CLI().run()
\ No newline at end of file
+ Pan123CLI().run()
diff --git a/pan123_core.py b/pan123_core.py
index ca73240..fecbcf6 100644
--- a/pan123_core.py
+++ b/pan123_core.py
@@ -1,12 +1,9 @@
"""
-123pan 网盘内核模块 —— 纯业务逻辑,无任何 IO(print / input)。
-
-可直接移植到 GUI / Web / API 等上层应用。
+123pan 网盘内核模块
所有公开方法统一返回 Result 字典::
-
{
- "code": int, # 0 = 成功,非 0 = 失败(含 API 原始错误码)
- "message": str, # 人类可读的结果描述
+ "code": int, # 0 = 成功,小于 0 = 失败 大于 0 = 警告
+ "message": str, # 结果描述
"data": Any # 业务数据,失败时为 None
}
"""
@@ -18,11 +15,11 @@ import random
import re
import time
import uuid
+from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Optional
import requests
-
# ════════════════════════════════════════════════════════════════
# 全局常量 —— URL / 端点 / 超时 / 分块 / 设备信息
# ════════════════════════════════════════════════════════════════
@@ -32,92 +29,98 @@ API_BASE_URL = "https://www.123pan.com"
"""123pan API 根地址"""
# ── 接口端点(相对路径,使用时拼接 API_BASE_URL)────────────────
-URL_LOGIN = "/b/api/user/sign_in"
+URL_LOGIN = "/b/api/user/sign_in"
"""登录接口"""
-URL_FILE_LIST = "/api/file/list/new"
+URL_FILE_LIST = "/api/file/list/new"
"""文件列表接口(GET,支持目录浏览与回收站查询)"""
-URL_FILE_TRASH = "/a/api/file/trash"
+URL_FILE_TRASH = "/a/api/file/trash"
"""文件删除 / 恢复接口"""
-URL_SHARE_CREATE = "/a/api/share/create"
+URL_SHARE_CREATE = "/a/api/share/create"
"""创建分享接口"""
-URL_DOWNLOAD_INFO = "/a/api/file/download_info"
+URL_DOWNLOAD_INFO = "/a/api/file/download_info"
"""单文件下载信息接口"""
-URL_BATCH_DOWNLOAD = "/a/api/file/batch_download_info"
-"""批量(文件夹)下载信息接口"""
+URL_BATCH_DOWNLOAD = "/a/api/file/batch_download_info"
+"""批量(文件夹)下载信息接口 服务端会进行打包下载"""
-URL_UPLOAD_REQUEST = "/b/api/file/upload_request"
+URL_UPLOAD_REQUEST = "/b/api/file/upload_request"
"""上传请求接口(含创建目录)"""
-URL_UPLOAD_PARTS = "/b/api/file/s3_repare_upload_parts_batch"
+URL_UPLOAD_PARTS = "/b/api/file/s3_repare_upload_parts_batch"
"""分块上传预签名 URL 获取接口"""
URL_UPLOAD_COMPLETE_S3 = "/b/api/file/s3_complete_multipart_upload"
"""S3 分块合并接口"""
-URL_UPLOAD_COMPLETE = "/b/api/file/upload_complete"
+URL_UPLOAD_COMPLETE = "/b/api/file/upload_complete"
"""上传完成确认接口"""
-URL_MKDIR = "/a/api/file/upload_request"
+URL_MKDIR = "/a/api/file/upload_request"
"""创建目录接口(复用 upload_request,type=1)"""
-SHARE_URL_TEMPLATE = "{base}/s/{key}"
+URL_USER_INFO = "/b/api/user/info"
+"""获取用户信息接口"""
+
+URL_DETAILS = "/b/api/restful/goapi/v1/file/details"
+"""获取文件夹详情接口"""
+
+SHARE_URL_TEMPLATE = "{base}/s/{key}"
"""分享链接模板,{base} = API_BASE_URL,{key} = ShareKey"""
# ── 超时配置(秒)────────────────────────────────────────────
-TIMEOUT_DEFAULT = 15
+TIMEOUT_DEFAULT = 15
"""默认请求超时"""
-TIMEOUT_FILE_LIST = 30
+TIMEOUT_FILE_LIST = 30
"""文件列表请求超时(数据量可能较大)"""
-TIMEOUT_UPLOAD_CHUNK = 30
+TIMEOUT_UPLOAD_CHUNK = 30
"""单个分块上传超时"""
-TIMEOUT_DOWNLOAD = 30
+TIMEOUT_DOWNLOAD = 30
"""下载请求超时"""
-TIMEOUT_TRASH = 10
+TIMEOUT_TRASH = 10
"""删除 / 恢复操作超时"""
# ── 上传 / 下载参数 ──────────────────────────────────────────
-UPLOAD_CHUNK_SIZE = 5 * 1024 * 1024
+UPLOAD_CHUNK_SIZE = 5 * 1024 * 1024
"""分块上传单块大小(5 MB)"""
-DOWNLOAD_CHUNK_SIZE = 8192
+DOWNLOAD_CHUNK_SIZE = 8192
"""下载流式读取单块大小(8 KB)"""
-MD5_READ_CHUNK_SIZE = 65536
+MD5_READ_CHUNK_SIZE = 65536
"""计算文件 MD5 时的读取块大小(64 KB)"""
# ── 翻页 / 限频 ─────────────────────────────────────────────
-FILE_LIST_PAGE_LIMIT = 100
+FILE_LIST_PAGE_LIMIT = 100
"""单页最大文件数"""
-RATE_LIMIT_INTERVAL = 10
+RATE_LIMIT_INTERVAL = 10
"""连续翻页时的限频等待秒数"""
-RATE_LIMIT_PAGES = 5
+RATE_LIMIT_PAGES = 5
"""每翻多少页触发一次限频等待"""
-S3_MERGE_DELAY = 1
+S3_MERGE_DELAY = 1
"""S3 分块合并后等待服务器处理的秒数"""
# ── 业务错误码 ───────────────────────────────────────────────
-CODE_OK = 0
+CODE_OK = 0
"""统一成功码"""
-CODE_LOGIN_OK = 200
+CODE_LOGIN_OK = 200
"""123pan 登录接口成功时返回的原始码"""
-CODE_DUPLICATE_FILE = 5060
+CODE_DUPLICATE_FILE = 5060
"""上传时同名文件已存在的错误码"""
-CODE_CONFLICT = 1
+CODE_CONFLICT = 1
"""自定义:本地文件冲突(下载时目标已存在)"""
# ── 设备信息池(Android 协议伪装)─────────────────────────────
@@ -142,9 +145,9 @@ OS_VERSIONS: List[str] = [
"""可选的 Android 系统版本列表"""
# ── Android 协议版本号 ──────────────────────────────────────
-ANDROID_APP_VERSION = "61"
-ANDROID_X_APP_VERSION = "2.4.0"
-ANDROID_DEVICE_BRAND = "Xiaomi"
+ANDROID_APP_VERSION = "61"
+ANDROID_X_APP_VERSION = "2.4.0"
+ANDROID_DEVICE_BRAND = "Xiaomi"
# ── Web 协议 User-Agent ─────────────────────────────────────
WEB_USER_AGENT = (
@@ -155,6 +158,15 @@ WEB_USER_AGENT = (
WEB_APP_VERSION = "3"
+# ─── 事件类型 ────────────────────────────────────────────────
+@dataclass
+class Pan123EventType:
+ DOWNLOAD_START_FILE = "download_start_file"
+ DOWNLOAD_START_DIRECTORY = "download_start_directory"
+ DOWNLOAD_PROGRESS: str = "download_progress"
+ UPLOAD_PROGRESS: str = "upload_progress"
+
+
# ════════════════════════════════════════════════════════════════
# 工具函数
# ════════════════════════════════════════════════════════════════
@@ -163,7 +175,7 @@ def make_result(code: int = CODE_OK, message: str = "ok", data: Any = None) -> D
"""构造统一返回结构。
Args:
- code: 状态码,0 表示成功,非 0 表��失败。
+ code: 状态码,0 表示成功,小于 0 表示失败,大于 0 表示成功但有警告信息
message: 人类可读的结果描述。
data: 业务数据,失败时通常为 None。
@@ -226,9 +238,8 @@ ProgressCallback = Optional[Callable[..., None]]
class Pan123Core:
"""123 网盘内核类。
- 提供登录、目录浏览、上传、下载、分享、删除、回收站等纯逻辑接口。
- 不做任何 print / input,所有结果通过 ``make_result`` 统一返回,
- 方便上层(CLI / GUI / Web)自行处理展示与交互。
+ 提供登录、目录浏览、上传、下载链接、分享、删除、回收站等纯逻辑接口。
+ 所有结果通过 ``make_result`` 统一返回,
Attributes:
user_name (str): 登录用户名 / 手机号。
@@ -236,8 +247,8 @@ class Pan123Core:
authorization (str): Bearer Token,登录后自动填充。
protocol (str): 请求协议,"android" 或 "web"。
config_file (str): 配置文件路径。
- device_type (str): Android 设备型号。
- os_version (str): Android 系统版本。
+ device_type (str): Android 设备型号。 留空则随机选取 DEVICE_TYPES 中的一个。
+ os_version (str): Android 系统版本。 留空则随机选取 OS_VERSIONS 中的一个。
cwd_id (int): 当前工作目录 FileId(0 = 根目录)。
cwd_stack (List[int]): 目录 ID 导航栈。
cwd_name_stack (List[str]): 目录名称导航栈。
@@ -246,21 +257,28 @@ class Pan123Core:
all_loaded (bool): 当前目录是否已全部加载。
cookies (Optional[Dict]): 登录后保存的 Cookie。
headers (Dict[str, str]): 当前使用的请求头。
+
+ nick_name (str): 当前用户昵称(获取用户信息时填充)。
+ uid (int): 当前用户 UID(获取用户信息时填充)。
+
+ :note:
+ 流程:初始化内核实例 -> 加载配置(可以初始化时提供)-> 初始化登录状态(self.init_login_state()) -> 进行目录浏览 / 上传 / 下载等操作 -> 需要时保存配置
+ 配置说明:如果传入了authorization,会先使用它尝试获取用户信息来验证登录状态,如果无效则根据提供的用户名和密码重新登录;如果未传入authorization,则直接根据用户名和密码登录。登录成功后会更新authorization属性。
"""
# ── 协议常量 ──────────────────────────────────────────────
PROTOCOL_ANDROID = "android"
- PROTOCOL_WEB = "web"
+ PROTOCOL_WEB = "web"
def __init__(
- self,
- user_name: str = "",
- password: str = "",
- authorization: str = "",
- protocol: str = PROTOCOL_ANDROID,
- config_file: str = "123pan.txt",
- device_type: str = "",
- os_version: str = "",
+ self,
+ user_name: str = "",
+ password: str = "",
+ authorization: str = "",
+ protocol: str = PROTOCOL_ANDROID,
+ device_type: str = "",
+ os_version: str = "",
+ # config_file: str = "123pan_config.json",
):
"""初始化内核实例。
@@ -272,6 +290,7 @@ class Pan123Core:
config_file: 配置文件路径,用于持久化账号和 Token。
device_type: 指定 Android 设备型号,为空则随机选取。
os_version: 指定 Android 系统版本,为空则随机选取。
+ use_config_file: 是否在初始化时自动从配置文件加载账号信息和 Token,默认为 False,以避免内核直接依赖文件系统
"""
# 账号信息
self.user_name: str = user_name
@@ -285,7 +304,7 @@ class Pan123Core:
self.login_uuid: str = uuid.uuid4().hex
# 配置文件
- self.config_file: str = config_file
+ # self.config_file: str = config_file
# 目录导航状态
self.cwd_id: int = 0
@@ -305,6 +324,10 @@ class Pan123Core:
self.headers: Dict[str, str] = {}
self._build_headers()
+ # 运行参数
+ self.nick_name = None
+ self.uid = None
+
# ════════════════════════════════════════════════════════════
# 请求头构建
# ════════════════════════════════════════════════════════════
@@ -364,10 +387,13 @@ class Pan123Core:
# 配置持久化
# ════════════════════════════════════════════════════════════
- def load_config(self) -> Dict[str, Any]:
- """从配置文件加载账号信息、Token 及协议设置。
+ def load_config(self, cfg: Dict) -> Dict[str, Any]:
+ """从配置加载账号信息、Token 及协议设置。仅更新cfg中存在的字段,
- 会自���重建 headers 并同步 authorization。
+ 会自动重建 headers 并同步 authorization。
+
+ Args:
+ cfg: { userName: str, passWord: str, authorization: str, deviceType: str, osVersion: str, protocol: str }
Returns:
Result 字典::
@@ -375,33 +401,35 @@ class Pan123Core:
成功: {"code": 0, "message": "配置加载成功", "data": {配置内容 dict}}
失败: {"code": -1, "message": "错误描述", "data": None}
"""
- if not os.path.exists(self.config_file):
- return make_result(-1, "配置文件不存在")
try:
- with open(self.config_file, "r", encoding="utf-8") as f:
- cfg = json.load(f)
- self.user_name = cfg.get("userName", self.user_name)
- self.password = cfg.get("passWord", self.password)
+ self.user_name = cfg.get("userName", self.user_name)
+ self.password = cfg.get("passWord", self.password)
self.authorization = cfg.get("authorization", self.authorization)
- self.device_type = cfg.get("deviceType", self.device_type)
- self.os_version = cfg.get("osVersion", self.os_version)
- self.protocol = cfg.get("protocol", self.protocol).lower()
+ self.device_type = cfg.get("deviceType", self.device_type)
+ self.os_version = cfg.get("osVersion", self.os_version)
+ self.protocol = cfg.get("protocol", self.protocol).lower()
self._build_headers()
self._sync_authorization()
return make_result(CODE_OK, "配置加载成功", cfg)
except Exception as e:
return make_result(-1, f"加载配置失败: {e}")
- def save_config(self) -> Dict[str, Any]:
- """将当前账号信息、Token 及协议设置保存到配置文件。
+ def get_current_config(self) -> Dict[str, Any]:
+ """获取当前账号信息、Token 及协议设置的字典表示。
Returns:
- Result 字典::
+ 当前配置的字典,例如::
- 成功: {"code": 0, "message": "配置已保存", "data": {配置内容 dict}}
- 失败: {"code": -1, "message": "错误描述", "data": None}
+ {
+ "userName": str,
+ "passWord": str,
+ "authorization": str,
+ "deviceType": str,
+ "osVersion": str,
+ "protocol": str,
+ }
"""
- cfg = {
+ return {
"userName": self.user_name,
"passWord": self.password,
"authorization": self.authorization,
@@ -409,25 +437,19 @@ class Pan123Core:
"osVersion": self.os_version,
"protocol": self.protocol,
}
- try:
- with open(self.config_file, "w", encoding="utf-8") as f:
- json.dump(cfg, f, ensure_ascii=False, indent=2)
- return make_result(CODE_OK, "配置已保存", cfg)
- except Exception as e:
- return make_result(-1, f"保存配置失败: {e}")
# ════════════════════════════════════════════════════════════
# 统一网络请求
# ════════════════════════════════════════════════════════════
def _request(
- self,
- method: str,
- path: str,
- *,
- json_data: Any = None,
- params: Any = None,
- timeout: int = TIMEOUT_DEFAULT,
+ self,
+ method: str,
+ path: str,
+ *,
+ json_data: Any = None,
+ params: Any = None,
+ timeout: int = TIMEOUT_DEFAULT,
) -> Dict[str, Any]:
"""发送 HTTP 请求并返回统一 Result。
@@ -445,7 +467,7 @@ class Pan123Core:
Result 字典::
成功: {"code": 0, "message": "ok", "data": {API 原始响应 JSON}}
- 失败: {"code": , "message": "错误描述", "data": {API响应} | None}
+ 失败: {"code": <0, "message": "错误描述", "data": {API响应} | None}
"""
url = f"{API_BASE_URL}{path}" if path.startswith("/") else path
try:
@@ -458,14 +480,46 @@ class Pan123Core:
)
data = resp.json()
api_code = data.get("code", -1)
- # 123pan 登录成功返回 200,其余接口成功返回 0
+ # 123pan 登录成功/退出登录 成功返回 code 200,其余接口成功返回 0
if api_code not in (CODE_OK, CODE_LOGIN_OK):
- return make_result(api_code, data.get("message", "未知错误"), data)
+ return make_result(-3, data.get("message", "未知错误"), data)
return make_result(CODE_OK, "ok", data)
except requests.RequestException as e:
return make_result(-1, f"请求失败: {e}")
except json.JSONDecodeError:
- return make_result(-1, "响应 JSON 解析错误")
+ return make_result(-2, "响应 JSON 解析错误")
+
+ # ════════════════════════════════════════════════════════════
+ # 用户信息
+ # ════════════════════════════════════════════════════════════
+
+ def get_user_info(self) -> Dict[str, Any]:
+ """获取当前登录用户的信息。
+
+ Returns:
+ Result:
+ 成功: {"code": 0, "message": "ok", "data": {用户信息 dict}}
+ 失败: {"code": <错误码>, "message": "错误描述", "data": None}
+ data: {
+ "UID": ,
+ "Nickname": "",
+ "SpaceUsed": ,
+ "SpacePermanent": ,
+ "SpaceTemp": 0,
+ "FileCount": ,
+ "SpaceTempExpr": "",
+ "Mail": "",
+ "Passport": ,
+ "HeadImage": "",
+ ......
+ }
+ """
+ user_info_res = self._request("GET", URL_USER_INFO)
+ if user_info_res["code"] != CODE_OK:
+ return make_result(user_info_res["code"], f"获取用户信息失败: {user_info_res['message']}")
+ self.nick_name = user_info_res["data"]["data"].get("Nickname", "")
+ self.uid = user_info_res["data"]["data"].get("UID", None)
+ return make_result(CODE_OK, "ok", user_info_res["data"]["data"])
# ════════════════════════════════════════════════════════════
# 登录 / 登出
@@ -478,7 +532,7 @@ class Pan123Core:
Result 字典::
成功: {"code": 0, "message": "登录成功", "data": None}
- 失败: {"code": -1|, "message": "错误描述", "data": None}
+ 失败: {"code": -1, "message": "错误描述", "data": None}
"""
if not self.user_name or not self.password:
return make_result(-1, "用户名和密码不能为空")
@@ -494,7 +548,8 @@ class Pan123Core:
self.authorization = f"Bearer {token}"
self._build_headers()
self._sync_authorization()
- self.save_config()
+ # 为避免内核依赖文件系统,登录成功后不自动保存配置到文件,由上层调用者决定何时保存。
+ # self.save_config_to_file()
return make_result(CODE_OK, "登录成功")
def logout(self) -> Dict[str, Any]:
@@ -508,11 +563,11 @@ class Pan123Core:
self.authorization = ""
self._sync_authorization()
self.cookies = None
- self.save_config()
+ # self.save_config_to_file()
return make_result(CODE_OK, "已登出")
def clear_account(self) -> Dict[str, Any]:
- """清除已登录账号:清除用户名、密码、authorization 和 cookies,并保存配置。
+ """清除已登录账号:清除用户名、密码、authorization 和 cookies,不保存配置,但重建请求头。
Returns:
Result 字典::
@@ -524,18 +579,81 @@ class Pan123Core:
self.authorization = ""
self._sync_authorization()
self.cookies = None
- self.save_config()
+ # self.save_config_to_file()
return make_result(CODE_OK, "账号信息已清除")
+ def check_login(self) -> Dict[str, Any]:
+ """检查当前登录状态是否有效。
+
+ 通过尝试获取根目录列表来验证 Token 是否有效。
+
+ Returns:
+ Result 字典::
+
+ 成功: {"code": 0, "message": "登录状态有效", "data": None}
+ 失败: {"code": -1, "message": "登录状态无效: 错误描述", "data": None}
+ """
+ result = self.get_user_info()
+ if result["code"] == CODE_OK:
+ return make_result(CODE_OK, "登录状态有效")
+ return make_result(-1, f"登录状态无效: {result['message']}")
+
+ def init_login_state(self) -> Dict[str, Any]:
+ """根据提供的配置初始化登录状态。
+
+ Args:
+ cfg: 包含账号信息和 Token 的配置字典,结构同 get_current_config() 的返回值。
+
+ Returns:
+ Result:
+ {"code": Num, "message": "..."}
+ """
+ # 直接获取目录列表来验证登录状态和 Token 是否有效
+ is_valid = self.check_login()
+ if is_valid["code"] == CODE_OK:
+ return make_result(CODE_OK, "登录状态初始化成功")
+ else:
+ # 登录状态无效,重新登录
+ if not self.user_name or not self.password:
+ return make_result(-1, "登录状态无效,且用户名或密码缺失,无法重新登录")
+ login_result = self.login()
+ if login_result["code"] == CODE_OK:
+ return make_result(CODE_OK, "登录状态无效,重新登录成功")
+ else:
+ return make_result(-2, f"登录状态无效,重新登录失败: {login_result['message']}")
+
# ════════════════════════════════════════════════════════════
# 目录浏览
# ════════════════════════════════════════════════════════════
+ def get_folder_details(self, folder_id: int) -> Dict[str, Any]:
+ """获取指定文件夹的详情信息。
+
+ Args:
+ folder_id: 目标文件夹的 FileId。
+
+ Returns:
+ Result 字典::
+
+ 成功: {"code": 0, "message": "ok", "data": {文件夹详情 dict}}
+ 失败: {"code": <错误码>, "message": "错误描述", "data": None}
+ """
+ # 要传递一个包含 folder_id 的列表,但接口只返回第一个文件夹的详情
+ data = {"file_ids": [folder_id]}
+ res = self._request("POST", URL_DETAILS, json_data=data)
+ if res["code"] != CODE_OK:
+ return make_result(-1, f"获取文件夹详情失败: {res['message']}", res["data"])
+ details = res["data"]["data"]
+ if not details:
+ return make_result(-2, "文件夹详情数据为空", res["data"])
+ return make_result(CODE_OK, "ok", details)
+
+
def list_dir(
- self,
- parent_id: Optional[int] = None,
- page: int = 1,
- limit: int = FILE_LIST_PAGE_LIMIT,
+ self,
+ parent_id: Optional[int] = None,
+ page: int = 1,
+ limit: int = FILE_LIST_PAGE_LIMIT,
) -> Dict[str, Any]:
"""获取指定目录的单页文件列表。
@@ -581,9 +699,9 @@ class Pan123Core:
})
def list_dir_all(
- self,
- parent_id: Optional[int] = None,
- limit: int = FILE_LIST_PAGE_LIMIT,
+ self,
+ parent_id: Optional[int] = None,
+ limit: int = FILE_LIST_PAGE_LIMIT,
) -> Dict[str, Any]:
"""获取指定目录下的全部文件(自动翻页,含限频等待)。
@@ -735,7 +853,7 @@ class Pan123Core:
Result 字典::
成功: {"code": 0, "message": "ok", "data": {API 响应}}
- 失败: {"code": -1|, "message": "...", "data": ...}
+ 失败: {"code": -1, "message": "...", "data": ...}
"""
if not name:
return make_result(-1, "目录名不能为空")
@@ -839,10 +957,10 @@ class Pan123Core:
# ════════════════════════════════════════════════════════════
def share(
- self,
- file_ids: List[int],
- share_pwd: str = "",
- expiration: str = "2099-12-12T08:00:00+08:00",
+ self,
+ file_ids: List[int],
+ share_pwd: str = "",
+ expiration: str = "2099-12-12T08:00:00+08:00",
) -> Dict[str, Any]:
"""创建分享链接。
@@ -914,14 +1032,26 @@ class Pan123Core:
Returns:
Result 字典::
-
- 成功: {"code": 0, "message": "ok", "data": {"url": "https://..."}}
- 失败: {"code": -1, "message": "...", "data": None}
+ 来自 self.get_item_download_url() 的结果:
+ 成功: {"code": 0, "message": "ok", "data": {"url": "https://..."}}
+ 失败: {"code": -1, "message": "...", "data": None}
"""
if not (0 <= index < len(self.file_list)):
return make_result(-1, "无效的文件编号")
item = self.file_list[index]
+ return self.get_item_download_url(item)
+ def get_item_download_url(self, item: Dict) -> Dict[str, Any]:
+ """获取单个文件或文件夹的真实下载链接。
+ Args:
+ item: 文件信息字典,文件夹(Type = 1)需包含 "FileId"
+ 文件(Type = 0)需包含 "FileId", "Etag", "S3KeyFlag", "Type", "FileName", "Size"。可以来自 file_list 中的条目或手动构造的 dict。
+
+ Returns:
+ Result 字典::
+ 成功: {"code": 0, "message": "ok", "data": {"url": "https://..."}}
+ 失败: {"code": -1, "message": "...", "data": None}
+ """
# 文件夹走批量下载接口,文件走单文件接口
if item["Type"] == 1:
api_path = URL_BATCH_DOWNLOAD
@@ -946,7 +1076,12 @@ class Pan123Core:
# 跟随重定向获取真实下载链接
try:
- resp = requests.get(download_url, allow_redirects=False, timeout=TIMEOUT_DEFAULT)
+ # 直接请求会报错证书错误
+ # 此服务器无法证明它是 user-app-free-download-cdn.123295.com;它的安全证书来自 *.123pan.cn。这可能是由错误配置或者有攻击者截获你的连接而导致的。
+ # 关闭 SSL 验证以避免下载链接获取失败
+ # 仅在获取下载链接时关闭验证
+ requests.packages.urllib3.disable_warnings()
+ resp = requests.get(download_url, allow_redirects=False, timeout=TIMEOUT_DEFAULT, verify=False)
if resp.status_code == 302:
location = resp.headers.get("Location")
if location:
@@ -959,148 +1094,152 @@ class Pan123Core:
except requests.RequestException as e:
return make_result(-1, f"获取真实下载链接失败: {e}")
- def download_file(
- self,
- index: int,
- save_dir: str = "download",
- on_progress: ProgressCallback = None,
- overwrite: bool = False,
- skip_existing: bool = False,
- ) -> Dict[str, Any]:
- """下载 file_list 中指定下标的文件到本地。
+ # 文件交互,方法已移至 Pan123Tool
+ # def download_file(
+ # self,
+ # index: int,
+ # save_dir: str = "download",
+ # on_progress: ProgressCallback = None,
+ # overwrite: bool = False,
+ # skip_existing: bool = False,
+ # ) -> Dict[str, Any]:
+ # """下载 file_list 中指定下标的文件到本地。
+ #
+ # 如果目标是文件夹,则自动递归调用 download_directory()。
+ # 下载过程中使用 ".123pan" 临时文件,完成后重命名。
+ #
+ # Args:
+ # index: file_list 中的 0-based 下标。
+ # save_dir: 本地保存目录路径,不存在会自动创建。
+ # on_progress: 下载进度回调函数,签名:
+ # (downloaded_bytes: int, total_bytes: int, speed_bps: float) -> None
+ # overwrite: True = 覆盖已存在的同名文件。
+ # skip_existing: True = 跳过已存在的同名文件。
+ #
+ # Returns:
+ # Result 字典::
+ #
+ # 成功: {"code": 0, "message": "下载完成", "data": {"path": "本地文件路径"}}
+ # 冲突: {"code": 1, "message": "文件已存在", "data": {"path": "...", "conflict": True}}
+ # 跳过: {"code": 0, "message": "文件已存在,已跳过", "data": {"path": "..."}}
+ # 失败: {"code": -1, "message": "...", "data": None}
+ # """
+ # if not (0 <= index < len(self.file_list)):
+ # return make_result(-1, "无效的文件编号")
+ # item = self.file_list[index]
+ #
+ # # 文件夹递归下载
+ # if item["Type"] == 1:
+ # return self.download_directory(item, save_dir, on_progress, overwrite, skip_existing)
+ #
+ # # 获取下载链接
+ # r = self.get_download_url(index)
+ # if r["code"] != CODE_OK:
+ # return r
+ # url = r["data"]["url"]
+ #
+ # file_name = item["FileName"]
+ # os.makedirs(save_dir, exist_ok=True)
+ # full_path = os.path.join(save_dir, file_name)
+ #
+ # # 文件冲突处理
+ # if os.path.exists(full_path):
+ # if skip_existing:
+ # return make_result(CODE_OK, "文件已存在,已跳过", {"path": full_path})
+ # if not overwrite:
+ # return make_result(CODE_CONFLICT, "文件已存在", {"path": full_path, "conflict": True})
+ # os.remove(full_path)
+ #
+ # # 使用临时文件下载
+ # temp_path = full_path + ".123pan"
+ # try:
+ # resp = requests.get(url, stream=True, timeout=TIMEOUT_DOWNLOAD)
+ # total = int(resp.headers.get("Content-Length", 0))
+ # downloaded = 0
+ # start = time.time()
+ # with open(temp_path, "wb") as f:
+ # for chunk in resp.iter_content(chunk_size=DOWNLOAD_CHUNK_SIZE):
+ # if chunk:
+ # f.write(chunk)
+ # downloaded += len(chunk)
+ # if on_progress:
+ # elapsed = time.time() - start
+ # speed = downloaded / elapsed if elapsed > 0 else 0.0
+ # # on_progress(
+ #
+ # )
+ # os.rename(temp_path, full_path)
+ # return make_result(CODE_OK, "下载完成", {"path": full_path})
+ # except Exception as e:
+ # if os.path.exists(temp_path):
+ # os.remove(temp_path)
+ # return make_result(-1, f"下载失败: {e}")
- 如果目标是文件夹,则自动递归调用 download_directory()。
- 下载过程中使用 ".123pan" 临时文件,完成后重命名。
-
- Args:
- index: file_list 中的 0-based 下标。
- save_dir: 本地保存目录路径,不存在会自动创建。
- on_progress: 下载进度回调函数,签名:
- (downloaded_bytes: int, total_bytes: int, speed_bps: float) -> None
- overwrite: True = 覆盖已存在的同名文件。
- skip_existing: True = 跳过已存在的同名文件。
-
- Returns:
- Result 字典::
-
- 成功: {"code": 0, "message": "下载完成", "data": {"path": "本地文件路径"}}
- 冲突: {"code": 1, "message": "文件已存在", "data": {"path": "...", "conflict": True}}
- 跳过: {"code": 0, "message": "文件已存在,已跳过", "data": {"path": "..."}}
- 失败: {"code": -1, "message": "...", "data": None}
- """
- if not (0 <= index < len(self.file_list)):
- return make_result(-1, "无效的文件编号")
- item = self.file_list[index]
-
- # 文件夹递归下载
- if item["Type"] == 1:
- return self.download_directory(item, save_dir, on_progress, overwrite, skip_existing)
-
- # 获取下载链接
- r = self.get_download_url(index)
- if r["code"] != CODE_OK:
- return r
- url = r["data"]["url"]
-
- file_name = item["FileName"]
- os.makedirs(save_dir, exist_ok=True)
- full_path = os.path.join(save_dir, file_name)
-
- # 文件冲突处理
- if os.path.exists(full_path):
- if skip_existing:
- return make_result(CODE_OK, "文件已存在,已跳过", {"path": full_path})
- if not overwrite:
- return make_result(CODE_CONFLICT, "文件已存在", {"path": full_path, "conflict": True})
- os.remove(full_path)
-
- # 使用临时文件下载
- temp_path = full_path + ".123pan"
- try:
- resp = requests.get(url, stream=True, timeout=TIMEOUT_DOWNLOAD)
- total = int(resp.headers.get("Content-Length", 0))
- downloaded = 0
- start = time.time()
- with open(temp_path, "wb") as f:
- for chunk in resp.iter_content(chunk_size=DOWNLOAD_CHUNK_SIZE):
- if chunk:
- f.write(chunk)
- downloaded += len(chunk)
- if on_progress:
- elapsed = time.time() - start
- speed = downloaded / elapsed if elapsed > 0 else 0.0
- on_progress(downloaded, total, speed)
- os.rename(temp_path, full_path)
- return make_result(CODE_OK, "下载完成", {"path": full_path})
- except Exception as e:
- if os.path.exists(temp_path):
- os.remove(temp_path)
- return make_result(-1, f"下载失败: {e}")
-
- def download_directory(
- self,
- directory: Dict,
- save_dir: str = "download",
- on_progress: ProgressCallback = None,
- overwrite: bool = False,
- skip_existing: bool = False,
- ) -> Dict[str, Any]:
- """递归下载整个目录到本地。
-
- Args:
- directory: 文件夹信息字典(需包含 "FileId"、"FileName"、"Type" 字段)。
- save_dir: 本地保存根目录路径。
- on_progress: 下载进度回调函数(同 download_file)。
- overwrite: True = 覆盖已存在文件。
- skip_existing: True = 跳过已存在文件。
-
- Returns:
- Result 字典::
-
- 成功: {"code": 0, "message": "文件夹下载完成", "data": {"path": "本地目录路径"}}
- 部分失败: {"code": -1, "message": "部分文件下载失败: ...", "data": {"path": "..."}}
- 失败: {"code": <错误码>, "message": "...", "data": None}
- """
- if directory["Type"] != 1:
- return make_result(-1, "不是文件夹")
-
- target_dir = os.path.join(save_dir, directory["FileName"])
- os.makedirs(target_dir, exist_ok=True)
-
- r = self.list_dir_all(parent_id=directory["FileId"])
- if r["code"] != CODE_OK:
- return r
-
- items = r["data"]["items"]
- if not items:
- return make_result(CODE_OK, "文件夹为空", {"path": target_dir})
-
- errors: List[str] = []
- for item in items:
- if item["Type"] == 1:
- sub = self.download_directory(item, target_dir, on_progress, overwrite, skip_existing)
- else:
- # 临时替换 file_list 以复用 download_file 逻辑
- orig_list = self.file_list
- self.file_list = [item]
- sub = self.download_file(0, target_dir, on_progress, overwrite, skip_existing)
- self.file_list = orig_list
- if sub["code"] != CODE_OK:
- errors.append(f"{item['FileName']}: {sub['message']}")
-
- if errors:
- return make_result(-1, f"部分文件下载失败: {'; '.join(errors)}", {"path": target_dir})
- return make_result(CODE_OK, "文件夹下载完成", {"path": target_dir})
+ # 文件交互,方法已移至 Pan123Tool,Pan123Core 仅保留获取下载链接的功能,目录下载逻辑也移至工具类以避免内核依赖文件系统。
+ # def download_directory(
+ # self,
+ # directory: Dict,
+ # save_dir: str = "download",
+ # on_progress: ProgressCallback = None,
+ # overwrite: bool = False,
+ # skip_existing: bool = False,
+ # ) -> Dict[str, Any]:
+ # """递归下载整个目录到本地。
+ #
+ # Args:
+ # directory: 文件夹信息字典(需包含 "FileId"、"FileName"、"Type" 字段)。
+ # save_dir: 本地保存根目录路径。
+ # on_progress: 下载进度回调函数(同 download_file)。
+ # overwrite: True = 覆盖已存在文件。
+ # skip_existing: True = 跳过已存在文件。
+ #
+ # Returns:
+ # Result 字典::
+ #
+ # 成功: {"code": 0, "message": "文件夹下载完成", "data": {"path": "本地目录路径"}}
+ # 部分失败: {"code": -1, "message": "部分文件下载失败: ...", "data": {"path": "..."}}
+ # 失败: {"code": <错误码>, "message": "...", "data": None}
+ # """
+ # if directory["Type"] != 1:
+ # return make_result(-1, "不是文件夹")
+ #
+ # target_dir = os.path.join(save_dir, directory["FileName"])
+ # os.makedirs(target_dir, exist_ok=True)
+ #
+ # r = self.list_dir_all(parent_id=directory["FileId"])
+ # if r["code"] != CODE_OK:
+ # return r
+ #
+ # items = r["data"]["items"]
+ # if not items:
+ # return make_result(CODE_OK, "文件夹为空", {"path": target_dir})
+ #
+ # errors: List[str] = []
+ # for item in items:
+ # if item["Type"] == 1:
+ # sub = self.download_directory(item, target_dir, on_progress, overwrite, skip_existing)
+ # else:
+ # # 临时替换 file_list 以复用 download_file 逻辑
+ # orig_list = self.file_list
+ # self.file_list = [item]
+ # sub = self.download_file(0, target_dir, on_progress, overwrite, skip_existing)
+ # self.file_list = orig_list
+ # if sub["code"] != CODE_OK:
+ # errors.append(f"{item['FileName']}: {sub['message']}")
+ #
+ # if errors:
+ # return make_result(-1, f"部分文件下载失败: {'; '.join(errors)}", {"path": target_dir})
+ # return make_result(CODE_OK, "文件夹下载完成", {"path": target_dir})
# ════════════════════════════════════════════════════════════
# 上传
# ════════════════════════════════════════════════════════════
def upload_file(
- self,
- file_path: str,
- duplicate: int = 0,
- on_progress: ProgressCallback = None,
+ self,
+ file_path: str,
+ duplicate: int = 0,
+ on_progress: ProgressCallback = None,
) -> Dict[str, Any]:
"""上传本地文件到当前目录。
@@ -1169,15 +1308,15 @@ class Pan123Core:
)
def _upload_chunks(
- self,
- file_path: str,
- *,
- bucket: str,
- storage_node: str,
- key: str,
- upload_id: str,
- file_id: str,
- on_progress: ProgressCallback = None,
+ self,
+ file_path: str,
+ *,
+ bucket: str,
+ storage_node: str,
+ key: str,
+ upload_id: str,
+ file_id: str,
+ on_progress: ProgressCallback = None,
) -> Dict[str, Any]:
"""执行 S3 分块上传流程(内部方法)。
@@ -1235,7 +1374,12 @@ class Pan123Core:
uploaded += len(chunk)
if on_progress:
- on_progress(uploaded, total_size)
+ on_progress({
+ "type": Pan123EventType.UPLOAD_PROGRESS,
+ "uploaded": uploaded,
+ "total": total_size,
+ "percent": uploaded / total_size * 100,
+ })
part_number += 1
# 步骤 3: 通知服务端合并所有分块
@@ -1281,5 +1425,266 @@ class Pan123Core:
self.protocol = protocol
self._build_headers()
self._sync_authorization()
- self.save_config()
- return make_result(CODE_OK, f"已切换到 {protocol} 协议")
\ No newline at end of file
+ # self.save_config_to_file()
+ return make_result(CODE_OK, f"已切换到 {protocol} 协议")
+
+
+class Pan123Tool:
+ """123pan 工具类,提供更高层次的文件交互方法,依赖 Pan123Core 实现具体 API 调用。
+
+ Args:
+ core: Pan123Core 实例,负责 API 请求和状态管理。
+ config_file: 配置文件路径,默认为 "123pan_config.json",用于保存和加载账号信息、Token 及协议设置。
+
+ :note
+ Pan123Tool 主要负责文件下载、上传、目录操作等依赖文件系统的功能,而 Pan123Core 负责 API 请求、认证和状态管理。
+ """
+
+ def __init__(self, core: Pan123Core, config_file: str = "123pan_config.json"):
+ self.core = core
+ self.config_file = config_file
+
+ def load_config_from_file(self) -> Dict[str, Any]:
+ """从配置文件加载账号信息、Token 及协议设置。
+
+ 会自动重建 headers 并同步 authorization。
+
+ Returns:
+ Result 字典::
+
+ 成功: {"code": 0, "message": "配置加载成功", "data": {配置内容 dict}}
+ 失败: {"code": -1, "message": "错误描述", "data": None}
+ """
+ if not os.path.exists(self.config_file):
+ return make_result(-1, "配置文件不存在")
+ try:
+ with open(self.config_file, "r", encoding="utf-8") as f:
+ cfg = json.load(f)
+ return self.core.load_config(cfg)
+ except Exception as e:
+ return make_result(-1, f"加载配置失败: {e}")
+
+ def save_config_to_file(self) -> Dict[str, Any]:
+ """将当前账号信息、Token 及协议设置保存到配置文件。
+
+ Returns:
+ Result 字典::
+
+ 成功: {"code": 0, "message": "配置已保存", "data": {配置内容 dict}}
+ 失败: {"code": -1, "message": "错误描述", "data": None}
+ """
+ cfg = {
+ "userName": self.core.user_name,
+ "passWord": self.core.password,
+ "authorization": self.core.authorization,
+ "deviceType": self.core.device_type,
+ "osVersion": self.core.os_version,
+ "protocol": self.core.protocol,
+ }
+ try:
+ with open(self.config_file, "w", encoding="utf-8") as f:
+ json.dump(cfg, f, ensure_ascii=False, indent=2)
+ return make_result(CODE_OK, "配置已保存", cfg)
+ except Exception as e:
+ return make_result(-1, f"保存配置失败: {e}")
+
+ def download_file(
+ self,
+ index: int,
+ save_dir: str = "download",
+ on_progress: ProgressCallback = None,
+ overwrite: bool = False,
+ skip_existing: bool = False,
+ ) -> Dict[str, Any]:
+ """下载 file_list 中指定下标的文件到本地。
+
+ 如果目标是文件夹,则自动递归调用 download_directory()。
+ 下载过程中使用 ".123pan" 临时文件,完成后重命名。
+
+ Args:
+ index: file_list 中的 0-based 下标。
+ save_dir: 本地保存目录路径,不存在会自动创建。
+ on_progress: 下载进度回调函数,签名:
+ (downloaded_bytes: int, total_bytes: int, speed_bps: float) -> None
+ overwrite: True = 覆盖已存在的同名文件。
+ skip_existing: True = 跳过已存在的同名文件。
+
+ Returns:
+ Result 字典:: 来自 download_url() 或 download_directory() 的结果:
+ 成功: {"code": 0, "message": "下载完成", "data": {"path": "本地文件路径"}}
+ 冲突: {"code": 1, "message": "文件已存在", "data": {"path": "...", "conflict": True}}
+ 跳过: {"code": 0, "message": "文件已存在,已跳过", "data": {"path": "..."}}
+ 失败: {"code": -1, "message": "...", "data": None}
+ """
+ if not (0 <= index < len(self.core.file_list)):
+ return make_result(-1, "无效的文件编号")
+ item = self.core.file_list[index]
+ return self.download_item(item, save_dir, on_progress, overwrite, skip_existing)
+
+ def download_item(
+ self,
+ item: Dict,
+ save_dir: str = "download",
+ on_progress: ProgressCallback = None,
+ overwrite: bool = False,
+ skip_existing: bool = False,
+ ):
+ """下载单个文件或文件夹项,自动区分类型并处理。
+ Args:
+ item: 文件信息字典,文件夹(Type = 1)需包含 "FileId"
+ 文件(Type = 0)需包含 "FileId", "Etag", "S3KeyFlag", "Type", "FileName", "Size"。
+ save_dir: 本地保存目录路径,不存在会自动创建。
+ on_progress: 下载进度回调函数,签名:
+ (downloaded_bytes: int, total_bytes: int, speed_bps: float) -> None
+ overwrite: True = 覆盖已存在的同名文件。
+ skip_existing: True = 跳过已存在的同名文件。
+ Returns:
+ Result 字典:: 来自 download_url() 或 download_directory() 的结果:
+ 成功: {"code": 0, "message": "下载完成", "data": {"path": "本地文件路径"}}
+ 冲突: {"code": 1, "message": "文件已存在", "data": {"path": "...", "conflict": True}}
+ 跳过: {"code": 0, "message": "文件已存在,已跳过", "data": {"path": "..."}}
+ 失败: {"code": -1, "message": "...", "data": None}
+ """
+ # 文件夹递归下载
+ if item["Type"] == 1:
+ return self.download_directory(item, save_dir, on_progress, overwrite, skip_existing)
+
+ # 获取下载链接
+ r = self.core.get_item_download_url(item)
+ if r["code"] != CODE_OK:
+ return r
+ url = r["data"]["url"]
+ file_name = item["FileName"]
+ return self.download_url(url, file_name, save_dir, on_progress, overwrite, skip_existing)
+
+ def download_url(
+ self,
+ url: str,
+ file_name: str,
+ save_dir: str = "download",
+ on_progress: ProgressCallback = None,
+ overwrite: bool = False,
+ skip_existing: bool = False,
+ ) -> Dict[str, Any]:
+ """根据下载链接下载文件到本地,支持进度回调和冲突处理。
+
+ Args:
+ url: 真实下载链接。
+ file_name: 保存的文件名(不含路径)。
+ save_dir: 本地保存目录路径,不存在会自动创建。
+ on_progress: 下载进度回调函数,签名:
+ (downloaded_bytes: int, total_bytes: int, speed_bps: float) -> None
+ overwrite: True = 覆盖已存在的同名文件。
+ skip_existing: True = 跳过已存在的同名文件。
+
+ Returns:
+ Result 字典::
+ 成功: {"code": 0, "message": "下载完成", "data": {"path": "本地文件路径"}}
+ 冲突: {"code": 1, "message": "文件已存在", "data": {"path": "...", "conflict": True}}
+ 跳过: {"code": 0, "message": "文件已存在,已跳过", "data": {"path": "..."}}
+ 失败: {"code": -1, "message": "...", "data": None}
+ """
+
+ os.makedirs(save_dir, exist_ok=True)
+ full_path = os.path.join(save_dir, file_name)
+
+ # 文件冲突处理
+ if os.path.exists(full_path):
+ if skip_existing:
+ return make_result(CODE_OK, "文件已存在,已跳过", {"path": full_path})
+ if not overwrite:
+ return make_result(CODE_CONFLICT, "文件已存在", {"path": full_path, "conflict": True})
+ os.remove(full_path)
+
+ # TODO: 可以考虑断点续传
+ # 使用临时文件下载
+ temp_path = full_path + ".123pan"
+ try:
+ resp = requests.get(url, stream=True, timeout=TIMEOUT_DOWNLOAD)
+ total = int(resp.headers.get("Content-Length", 0))
+ downloaded = 0
+ start = time.time()
+ with open(temp_path, "wb") as f:
+ for chunk in resp.iter_content(chunk_size=DOWNLOAD_CHUNK_SIZE):
+ if chunk:
+ f.write(chunk)
+ downloaded += len(chunk)
+ if on_progress:
+ elapsed = time.time() - start
+ speed = downloaded / elapsed if elapsed > 0 else 0.0
+ on_progress({
+ "type": Pan123EventType.DOWNLOAD_PROGRESS,
+ "downloaded": downloaded,
+ "total": total,
+ "speed": speed,
+ })
+ os.rename(temp_path, full_path)
+ return make_result(CODE_OK, "下载完成", {"path": full_path})
+ except Exception as e:
+ if os.path.exists(temp_path):
+ os.remove(temp_path)
+ return make_result(-1, f"下载失败: {e}")
+
+ def download_directory(
+ self,
+ directory: Dict,
+ save_dir: str = "download",
+ on_progress: ProgressCallback = None,
+ overwrite: bool = False,
+ skip_existing: bool = False,
+ ) -> Dict[str, Any]:
+ """递归下载整个目录到本地。
+
+ Args:
+ directory: 文件夹信息字典(需包含 "FileId"、"FileName"、"Type" 字段)。
+ save_dir: 本地保存根目录路径。
+ on_progress: 下载进度回调函数(同 download_file)。
+ overwrite: True = 覆盖已存在文件。
+ skip_existing: True = 跳过已存在文件。
+
+ Returns:
+ 成功: {"code": 0, "message": "文件夹下载完成", "data": {"path": "本地目录路径"}}
+ 部分失败: {"code": -1, "message": "部分文件下载失败: ...", "data": {"path": "..."}}
+ 失败: {"code": <错误码>, "message": "...", "data": None}
+ """
+ if directory["Type"] != 1:
+ return make_result(-1, "不是文件夹")
+
+ target_dir = os.path.join(save_dir, directory["FileName"])
+ os.makedirs(target_dir, exist_ok=True)
+
+ r = self.core.list_dir_all(parent_id=directory["FileId"])
+ if r["code"] != CODE_OK:
+ return r
+
+ items = r["data"]["items"]
+ if not items:
+ return make_result(CODE_OK, "文件夹为空", {"path": target_dir})
+
+ errors: List[str] = []
+ for item in items:
+ if item["Type"] == 1:
+ # 递归下载子目录
+ if on_progress:
+ on_progress({
+ "type": Pan123EventType.DOWNLOAD_START_DIRECTORY,
+ "file_name": item["FileName"],
+ "dir_name": item["FileName"],
+ "message": f"正在下载目录: {item['FileName']}",
+ })
+ sub = self.download_directory(item, target_dir, on_progress, overwrite, skip_existing)
+ else:
+ if on_progress:
+ on_progress({
+ "type": Pan123EventType.DOWNLOAD_START_FILE,
+ "file_name": item["FileName"],
+ "file_size": item["Size"],
+ "message": f"正在下载文件: {item['FileName']}",
+ })
+ sub = self.download_item(item, target_dir, on_progress, overwrite, skip_existing)
+ if sub["code"] != CODE_OK:
+ errors.append(f"{item['FileName']}: {sub['message']}")
+
+ if errors:
+ return make_result(-1, f"部分文件下载失败: {'; '.join(errors)}", {"path": target_dir})
+ return make_result(CODE_OK, "文件夹下载完成", {"path": target_dir})