diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..6551392
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,3 @@
+123pan_config.json
+download/*
+123pan.txt
\ No newline at end of file
diff --git a/123pan.py b/123pan.py
new file mode 100644
index 0000000..cff3a87
--- /dev/null
+++ b/123pan.py
@@ -0,0 +1,1016 @@
+import hashlib
+import json
+import os
+import random
+import re
+import time
+import uuid
+import requests
+from typing import List, Dict, Tuple, Union, Optional
+
+class Pan123:
+ DEVICE_TYPES = [
+ "24075RP89G", "24076RP19G", "24076RP19I", "M1805E10A", "M2004J11G", "M2012K11AG", "M2104K10I", "22021211RG",
+ "22021211RI", "21121210G", "23049PCD8G", "23049PCD8I", "23013PC75G", "24069PC21G", "24069PC21I",
+ "23113RKC6G", "M1912G7BI", "M2007J20CI", "M2007J20CG", "M2007J20CT", "M2102J20SG", "M2102J20SI",
+ "21061110AG", "2201116PG", "2201116PI", "22041216G", "22041216UG", "22111317PG", "22111317PI", "22101320G",
+ "22101320I", "23122PCD1G", "23122PCD1I", "2311DRK48G", "2311DRK48I", "2312FRAFDI", "M2004J19PI",
+ ]
+
+ OS_VERSIONS = [
+ "Android_7.1.2", "Android_8.0.0", "Android_8.1.0", "Android_9.0", "Android_10", "Android_11", "Android_12",
+ "Android_13", "Android_6.0.1", "Android_5.1.1", "Android_4.4.4", "Android_4.3", "Android_4.2.2",
+ "Android_4.1.2",
+ ]
+
+ def __init__(
+ self,
+ readfile: bool = True,
+ user_name: str = "",
+ pass_word: str = "",
+ authorization: str = "",
+ input_pwd: bool = True,
+ config_file: str = "123pan.txt",
+ protocol: str = "android", # 协议,默认为 android
+ ):
+ self.config_file = config_file
+ self.protocol = protocol.lower() # 'android' 或 'web'
+ self.devicetype = random.choice(self.DEVICE_TYPES)
+ self.osversion = random.choice(self.OS_VERSIONS)
+ self.download_mode = 1
+ self.cookies = None
+ self.user_name = user_name
+ self.password = pass_word
+ self.authorization = authorization
+ self.parent_file_id = 0
+ self.parent_file_list = [0]
+ self.parent_file_name_list = []
+ self.all_file = False
+ self.file_page = 0
+ self.file_list = []
+ self.dir_list = []
+ self.name_dict = {}
+ self.list = []
+ self.total = 0
+
+ self._init_headers() # 根据 protocol 初始化 headers
+
+ load_code = 0
+ if readfile:
+ load_code = self._load_config()
+ if not load_code:
+ if not (user_name and pass_word) and input_pwd:
+ self.user_name = input("请输入用户名: ")
+ self.password = input("请输入密码: ")
+ elif not (user_name and pass_word):
+ raise ValueError("用户名和密码不能为空")
+
+ # 尝试获取目录,如果失败则登录
+ if self.get_dir()[0] != 0:
+ self.login()
+ self.get_dir()
+
+ def _init_headers(self):
+ """初始化请求头,根据协议(android/web)选择不同 headers"""
+ android_headers = {
+ "user-agent": f"123pan/v2.4.0({self.osversion};Xiaomi)",
+ "authorization": self.authorization,
+ "accept-encoding": "gzip",
+ "content-type": "application/json",
+ "osversion": self.osversion,
+ "loginuuid": uuid.uuid4().hex,
+ "platform": "android",
+ "devicetype": self.devicetype,
+ "devicename": "Xiaomi",
+ "host": "www.123pan.com",
+ "app-version": "61",
+ "x-app-version": "2.4.0"
+ }
+
+ # Web 协议头,来源于 web.py 的 header_logined / header_only_usage(适配为字典)
+ web_headers = {
+ "Accept": "*/*",
+ "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6",
+ "App-Version": "3",
+ # 同时设置大小写两种 Authorization 以兼容不同接口要求
+ # "Authorization": self.authorization,
+ "authorization": self.authorization,
+ "Cache-Control": "no-cache",
+ "Connection": "keep-alive",
+ "LoginUuid": uuid.uuid4().hex,
+ "Pragma": "no-cache",
+ "Referer": "https://www.123pan.com/",
+ "Sec-Fetch-Dest": "empty",
+ "Sec-Fetch-Mode": "cors",
+ "Sec-Fetch-Site": "same-origin",
+ "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36 Edg/119.0.0.0",
+ "platform": "web",
+ "sec-ch-ua": "Microsoft",
+ "sec-ch-ua-mobile": "?0",
+ "sec-ch-ua-platform": "Windows",
+ "content-type": "application/json",
+ }
+
+ if getattr(self, "protocol", "android").lower() == "web":
+ self.headers = web_headers
+ else:
+ self.headers = android_headers
+
+ def _load_config(self):
+ """从配置文件加载账号信息及 protocol"""
+ try:
+ if os.path.exists(self.config_file):
+ with open(self.config_file, "r", encoding="utf-8") as f:
+ config = json.load(f)
+ self.user_name = config.get("userName", self.user_name)
+ self.password = config.get("passWord", self.password)
+ self.authorization = config.get("authorization", self.authorization)
+ self.devicetype = config.get("deviceType", self.devicetype)
+ self.osversion = config.get("osVersion", self.osversion)
+ # 新增 protocol 支持
+ self.protocol = config.get("protocol", getattr(self, "protocol", "android")).lower()
+ # 重新初始化 headers(确保 authorization 和 protocol 生效)
+ self._init_headers()
+ # 确保 headers 中 authorization 同步
+ if "authorization" in self.headers:
+ self.headers["authorization"] = self.authorization
+ if "Authorization" in self.headers:
+ self.headers["Authorization"] = self.authorization
+ print("配置加载成功")
+ return 1
+ else:
+ print("配置文件不存在,将使用传入参数")
+ return 0
+ except Exception as e:
+ print(f"加载配置失败: {e}")
+ return 0
+
+ def _save_config(self):
+ """保存账号信息到配置文件(包含 protocol)"""
+ config = {
+ "userName": self.user_name,
+ "passWord": self.password,
+ "authorization": self.authorization,
+ "deviceType": self.devicetype,
+ "osVersion": self.osversion,
+ "protocol": getattr(self, "protocol", "android"),
+ }
+ try:
+ with open(self.config_file, "w", encoding="utf-8") as f:
+ json.dump(config, f)
+ print("账号信息已保存")
+ except Exception as e:
+ print(f"保存配置失败: {e}")
+
+ def _handle_response(self, response: requests.Response) -> dict:
+ """处理API响应"""
+ try:
+ data = response.json()
+ if data.get("code") != 0 and data.get("code") != 200:
+ print(f"API错误: {data.get('message', '未知错误')} (代码: {data.get('code')})")
+ return data
+ except json.JSONDecodeError:
+ print("响应解析错误")
+ return {"code": -1, "message": "响应解析错误"}
+
+ def login(self) -> int:
+ """用户登录"""
+ data = {"type": 1, "passport": self.user_name, "password": self.password}
+ try:
+ response = requests.post(
+ "https://www.123pan.com/b/api/user/sign_in",
+ headers=self.headers,
+ json=data,
+ timeout=15
+ )
+ result = self._handle_response(response)
+ if result.get("code") == 200:
+ # 更新授权令牌
+ token = result["data"]["token"]
+ self.authorization = f"Bearer {token}"
+ self.headers["authorization"] = self.authorization
+
+ # 保存cookie
+ if "Set-Cookie" in response.headers:
+ self.cookies = requests.utils.dict_from_cookiejar(response.cookies)
+
+ self._save_config()
+ print("登录成功")
+ return 0
+ return result.get("code", -1)
+ except requests.exceptions.RequestException as e:
+ print(f"登录请求失败: {e}")
+ return -1
+
+ def get_dir(self, save: bool = True) -> Tuple[int, list]:
+ """获取当前目录内容"""
+ return self.get_dir_by_id(self.parent_file_id, save)
+
+ def get_dir_by_id(
+ self,
+ file_id: int,
+ save: bool = True,
+ all_files: bool = False,
+ limit: int = 100
+ ) -> Tuple[int, list]:
+ """获取指定目录内容"""
+ page = self.file_page * 3 + 1 if not all_files else 1
+ current_count = len(self.list) if all_files else 0
+ items = []
+ total = -1
+ attempts = 0
+
+ while (current_count < total or total == -1) and (attempts < 3 or all_files):
+ params = {
+ "driveId": 0,
+ "limit": limit,
+ "next": 0,
+ "orderBy": "file_id",
+ "orderDirection": "desc",
+ "parentFileId": str(file_id),
+ "trashed": False,
+ "SearchData": "",
+ "Page": str(page),
+ "OnlyLookAbnormalFile": 0,
+ }
+
+ try:
+ response = requests.get(
+ "https://www.123pan.com/api/file/list/new",
+ headers=self.headers,
+ params=params,
+ timeout=30
+ )
+ result = self._handle_response(response)
+ if result.get("code") != 0:
+ return result.get("code", -1), []
+
+ page_items = result["data"]["InfoList"]
+ items.extend(page_items)
+ total = result["data"]["Total"]
+ current_count += len(page_items)
+ page += 1
+ attempts += 1
+
+ if attempts % 5 == 0:
+ print(f"获取文件中: {current_count}/{total},暂停10秒...")
+ time.sleep(10)
+ except requests.exceptions.RequestException:
+ print("连接失败")
+ return -1, []
+
+ self.all_file = current_count >= total
+ self.total = total
+ self.file_page += 1 if not all_files else 0
+
+ if save:
+ self.list.extend(items)
+
+ return 0, items
+
+ def show(self):
+ """显示当前目录内容"""
+ if not self.list:
+ print("当前目录为空")
+ return
+
+ print("\n" + "=" * 60)
+ print(f"当前路径: /{'/'.join(self.parent_file_name_list)}")
+ print("-" * 60)
+ print(f"{'编号':<6}{'类型':<8}{'大小':<12}{'名称'}")
+ print("-" * 60)
+
+ for idx, item in enumerate(self.list, 1):
+ item_type = "文件夹" if item["Type"] == 1 else "文件"
+ size = self._format_size(item["Size"])
+
+ # 使用颜色区分类型
+ color_code = "\033[35m" if item["Type"] == 1 else "\033[33m"
+ reset_code = "\033[0m"
+
+ print(f"{color_code}{idx:<6}{item_type:<8}{size:<12}{item['FileName']}{reset_code}")
+
+ if not self.all_file:
+ remaining = self.total - len(self.list)
+ print(f"\n还有 {remaining} 个文件未加载,输入 'more' 继续加载")
+ print("=" * 60 + "\n")
+
+ def _format_size(self, size_bytes: int) -> str:
+ """格式化文件大小"""
+ if size_bytes >= 1073741824:
+ return f"{size_bytes / 1073741824:.2f} GB"
+ elif size_bytes >= 1048576:
+ return f"{size_bytes / 1048576:.2f} MB"
+ elif size_bytes >= 1024:
+ return f"{size_bytes / 1024:.2f} KB"
+ return f"{size_bytes} B"
+
+ def delete_file(self, file_id: Union[int, dict], by_index: bool = True, delete: bool = True) -> bool:
+ """删除或恢复文件"""
+ if by_index:
+ if not (0 <= file_id < len(self.list)):
+ print("无效的文件编号")
+ return False
+ file_data = self.list[file_id]
+ else:
+ file_data = file_id
+
+ payload = {
+ "driveId": 0,
+ "fileTrashInfoList": file_data,
+ "operation": delete,
+ }
+
+ try:
+ response = requests.post(
+ "https://www.123pan.com/a/api/file/trash",
+ headers=self.headers,
+ json=payload,
+ timeout=10
+ )
+ result = self._handle_response(response)
+ if result.get("code") == 0:
+ action = "删除" if delete else "恢复"
+ print(f"文件 {file_data['FileName']} {action}成功")
+ return True
+ return False
+ except requests.exceptions.RequestException as e:
+ print(f"删除操作失败: {e}")
+ return False
+
+ def share(self, file_ids: List[int], share_pwd: str = "") -> Optional[str]:
+ """创建分享链接"""
+ if not file_ids:
+ print("未选择文件")
+ return None
+
+ file_names = [self.list[i]["FileName"] for i in file_ids]
+ print("分享文件:", ", ".join(file_names))
+
+ payload = {
+ "driveId": 0,
+ "expiration": "2099-12-12T08:00:00+08:00",
+ "fileIdList": ",".join(str(self.list[i]["FileId"]) for i in file_ids),
+ "shareName": "分享文件",
+ "sharePwd": share_pwd,
+ "event": "shareCreate"
+ }
+
+ try:
+ response = requests.post(
+ "https://www.123pan.com/a/api/share/create",
+ headers=self.headers,
+ json=payload,
+ timeout=15
+ )
+ result = self._handle_response(response)
+ if result.get("code") == 0:
+ share_key = result["data"]["ShareKey"]
+ share_url = f"https://www.123pan.com/s/{share_key}"
+ print(f"分享创建成功!\n链接: {share_url}")
+ if share_pwd:
+ print(f"提取码: {share_pwd}")
+ return share_url
+ return None
+ except requests.exceptions.RequestException as e:
+ print(f"创建分享失败: {e}")
+ return None
+
+ def upload(self, file_path: str) -> bool:
+ """上传文件"""
+ file_path = file_path.strip().replace('"', "").replace("\\", "/")
+ if not os.path.exists(file_path):
+ print("文件不存在")
+ return False
+
+ if os.path.isdir(file_path):
+ print("暂不支持文件夹上传")
+ return False
+
+ file_name = os.path.basename(file_path)
+ file_size = os.path.getsize(file_path)
+
+ # 计算文件MD5
+ md5_hash = hashlib.md5()
+ try:
+ with open(file_path, "rb") as f:
+ while chunk := f.read(65536):
+ md5_hash.update(chunk)
+ file_md5 = md5_hash.hexdigest()
+ except IOError as e:
+ print(f"读取文件失败: {e}")
+ return False
+
+ # 上传请求
+ payload = {
+ "driveId": 0,
+ "etag": file_md5,
+ "fileName": file_name,
+ "parentFileId": self.parent_file_id,
+ "size": file_size,
+ "type": 0,
+ "duplicate": 0,
+ }
+
+ try:
+ response = requests.post(
+ "https://www.123pan.com/b/api/file/upload_request",
+ headers=self.headers,
+ json=payload,
+ timeout=15
+ )
+ result = self._handle_response(response)
+ if result.get("code") == 5060: # 文件已存在
+ choice = input("检测到同名文件,输入 1 覆盖,2 保留两者,其他取消: ")
+ if choice == "1":
+ payload["duplicate"] = 1
+ elif choice == "2":
+ payload["duplicate"] = 2
+ else:
+ print("上传取消")
+ return False
+
+ response = requests.post(
+ "https://www.123pan.com/b/api/file/upload_request",
+ headers=self.headers,
+ json=payload,
+ timeout=15
+ )
+ result = self._handle_response(response)
+
+ if result.get("code") != 0:
+ return False
+
+ # 检查是否MD5复用
+ if result["data"].get("Reuse", False):
+ print("文件已存在,MD5复用成功")
+ return True
+
+ # 分块上传
+ return self._upload_chunks(
+ file_path,
+ result["data"]["Bucket"],
+ result["data"]["StorageNode"],
+ result["data"]["Key"],
+ result["data"]["UploadId"],
+ result["data"]["FileId"]
+ )
+ except requests.exceptions.RequestException as e:
+ print(f"上传失败: {e}")
+ return False
+
+ def _upload_chunks(
+ self,
+ file_path: str,
+ bucket: str,
+ storage_node: str,
+ key: str,
+ upload_id: str,
+ file_id: str
+ ) -> bool:
+ """分块上传文件"""
+ chunk_size = 5 * 1024 * 1024 # 5MB
+ total_size = os.path.getsize(file_path)
+ uploaded = 0
+ part_number = 1
+
+ try:
+ with open(file_path, "rb") as f:
+ while True:
+ chunk = f.read(chunk_size)
+ if not chunk:
+ break
+
+ # 获取上传URL
+ url_data = {
+ "bucket": bucket,
+ "key": key,
+ "partNumberEnd": part_number + 1,
+ "partNumberStart": part_number,
+ "uploadId": upload_id,
+ "StorageNode": storage_node,
+ }
+
+ try:
+ response = requests.post(
+ "https://www.123pan.com/b/api/file/s3_repare_upload_parts_batch",
+ headers=self.headers,
+ json=url_data,
+ timeout=15
+ )
+ url_result = self._handle_response(response)
+ if url_result.get("code") != 0:
+ return False
+
+ upload_url = url_result["data"]["presignedUrls"][str(part_number)]
+ except requests.exceptions.RequestException:
+ return False
+
+ # 上传分块
+ try:
+ upload_response = requests.put(upload_url, data=chunk, timeout=30)
+ if upload_response.status_code not in (200, 201):
+ return False
+ except requests.exceptions.RequestException:
+ return False
+
+ uploaded += len(chunk)
+ progress = uploaded / total_size * 100
+ print(f"\r上传进度: {progress:.1f}%", end="", flush=True)
+ part_number += 1
+
+ print("\n上传完成,正在验证...")
+ time.sleep(1) # 等待服务器处理
+
+ # 完成上传
+ compmultipart_up_url = (
+ "https://www.123pan.com/b/api/file/s3_complete_multipart_upload"
+ )
+ requests.post(
+ compmultipart_up_url,
+ headers=self.headers,
+ data=json.dumps({
+ "bucket": bucket,
+ "key": key,
+ "uploadId": upload_id,
+ "StorageNode": storage_node,
+ }),
+ timeout=10
+ )
+ complete_data = {"fileId": file_id}
+ try:
+ response = requests.post(
+ "https://www.123pan.com/b/api/file/upload_complete",
+ headers=self.headers,
+ json=complete_data,
+ timeout=15
+ )
+ result = self._handle_response(response)
+ if result.get("code") == 0:
+ print("文件上传成功")
+ return True
+ return False
+ except requests.exceptions.RequestException:
+ return False
+ except IOError as e:
+ print(f"读取文件失败: {e}")
+ return False
+
+ def change_directory(self, target: str):
+ """改变当前目录"""
+ if target == "..":
+ if len(self.parent_file_list) > 1:
+ self.parent_file_list.pop()
+ self.parent_file_id = self.parent_file_list[-1]
+ self.parent_file_name_list.pop()
+ self.refresh_directory()
+ else:
+ print("已经是根目录")
+ elif target == "/":
+ self.parent_file_id = 0
+ self.parent_file_list = [0]
+ self.parent_file_name_list = []
+ self.refresh_directory()
+ elif target.isdigit():
+ idx = int(target) - 1
+ if 0 <= idx < len(self.list) and self.list[idx]["Type"] == 1:
+ self.parent_file_id = self.list[idx]["FileId"]
+ self.parent_file_list.append(self.parent_file_id)
+ self.parent_file_name_list.append(self.list[idx]["FileName"])
+ self.refresh_directory()
+ else:
+ print("无效的目录编号或不是文件夹")
+
+ else:
+ print("无效命令,使用 '..' 返回上级,'/' 返回根目录,或输入文件夹编号")
+
+ def refresh_directory(self):
+ """刷新当前目录内容"""
+ self.all_file = False
+ self.file_page = 0
+ self.list = []
+ self.get_dir()
+ self.show()
+
+ def create_directory(self, name: str) -> bool:
+ """创建新目录"""
+ if not name:
+ print("目录名不能为空")
+ return False
+
+ # 检查是否已存在
+ for item in self.list:
+ if item["FileName"] == name and item["Type"] == 1:
+ print("目录已存在")
+ return True
+
+ payload = {
+ "driveId": 0,
+ "etag": "",
+ "fileName": name,
+ "parentFileId": self.parent_file_id,
+ "size": 0,
+ "type": 1,
+ "duplicate": 1,
+ "NotReuse": True,
+ "event": "newCreateFolder",
+ "operateType": 1,
+ }
+
+ try:
+ response = requests.post(
+ "https://www.123pan.com/a/api/file/upload_request",
+ headers=self.headers,
+ json=payload,
+ timeout=10
+ )
+ result = self._handle_response(response)
+ if result.get("code") == 0:
+ print(f"目录 '{name}' 创建成功")
+ self.get_dir()
+ return True
+ return False
+ except requests.exceptions.RequestException as e:
+ print(f"创建目录失败: {e}")
+ return False
+
+ def get_download_link(self, file_index: int) -> Optional[str]:
+ """获取文件下载链接"""
+ if not (0 <= file_index < len(self.list)):
+ print("无效的文件编号")
+ return None
+
+ file_detail = self.list[file_index]
+
+ if file_detail["Type"] == 1: # 文件夹
+ url = "https://www.123pan.com/a/api/file/batch_download_info"
+ data = {"fileIdList": [{"fileId": int(file_detail["FileId"])}]}
+ else: # 文件
+ url = "https://www.123pan.com/a/api/file/download_info"
+ data = {
+ "driveId": 0,
+ "etag": file_detail["Etag"],
+ "fileId": file_detail["FileId"],
+ "s3keyFlag": file_detail["S3KeyFlag"],
+ "type": file_detail["Type"],
+ "fileName": file_detail["FileName"],
+ "size": file_detail["Size"],
+ }
+
+ try:
+ response = requests.post(url, headers=self.headers, json=data, timeout=15)
+ result = self._handle_response(response)
+ if result.get("code") != 0:
+ return None
+
+ download_url = result["data"]["DownloadUrl"]
+ # 获取重定向后的真实下载链接
+ redirect_response = requests.get(download_url, allow_redirects=False, timeout=15)
+ if redirect_response.status_code == 302:
+ return redirect_response.headers.get("Location")
+
+ # 尝试从HTML中提取下载链接
+ url_pattern = re.compile(r"href='(https?://[^']+)'")
+ match = url_pattern.search(redirect_response.text)
+ if match:
+ return match.group(1)
+
+ return None
+ except requests.exceptions.RequestException as e:
+ print(f"获取下载链接失败: {e}")
+ return None
+
+ def download_file(self, file_index: int, download_path: str = "download") -> bool:
+ """下载单个文件"""
+ if not (0 <= file_index < len(self.list)):
+ print("无效的文件编号")
+ return False
+
+ file_detail = self.list[file_index]
+
+ # 文件夹需要特殊处理
+ if file_detail["Type"] == 1:
+ print("文件夹下载:")
+ return self.download_directory(file_detail, download_path)
+
+ # 获取下载链接
+ download_url = self.get_download_link(file_index)
+ if not download_url:
+ print("无法获取下载链接")
+ return False
+
+ # 确定文件名
+ file_name = file_detail["FileName"]
+ if not os.path.exists(download_path):
+ os.makedirs(download_path)
+
+ # 处理文件已存在的情况
+ full_path = os.path.join(download_path, file_name)
+ if os.path.exists(full_path):
+ if self.download_mode == 4: # 全部跳过
+ print(f"文件已存在,跳过: {file_name}")
+ return True
+
+ print(f"文件已存在: {file_name}")
+ choice = input("输入1覆盖,2跳过,3全部覆盖,4全部跳过: ")
+ if choice == "2" or choice == "4":
+ if choice == "4":
+ self.download_mode = 4
+ print("跳过下载")
+ return True
+ elif choice == "3":
+ self.download_mode = 3
+ os.remove(full_path)
+
+ # 临时文件名
+ temp_path = full_path + ".123pan"
+
+ try:
+ # 开始下载
+ print(f"开始下载: {file_name}")
+ response = requests.get(download_url, stream=True, timeout=30)
+ total_size = int(response.headers.get("Content-Length", 0))
+ downloaded = 0
+ start_time = time.time()
+
+ with open(temp_path, "wb") as f:
+ for chunk in response.iter_content(chunk_size=8192):
+ if chunk:
+ f.write(chunk)
+ downloaded += len(chunk)
+
+ # 计算下载速度
+ elapsed = time.time() - start_time
+ if elapsed > 0:
+ speed = downloaded / elapsed
+ speed_str = self._format_size(speed) + "/s"
+ else:
+ speed_str = "未知"
+
+ # 显示进度
+ if total_size > 0:
+ percent = downloaded / total_size * 100
+ print(f"\r进度: {percent:.1f}% | {self._format_size(downloaded)}/{self._format_size(total_size)} | {speed_str}", end=" ")
+
+ # 重命名文件
+ os.rename(temp_path, full_path)
+ print(f"\n下载完成: {file_name}")
+ return True
+ except requests.exceptions.RequestException as e:
+ print(f"\n下载失败: {e}")
+ if os.path.exists(temp_path):
+ os.remove(temp_path)
+ return False
+
+ def download_directory(self, directory: dict, download_path: str = "download") -> bool:
+ """下载整个目录"""
+ if directory["Type"] != 1:
+ print("不是文件夹")
+ return False
+
+ print(f"开始下载文件夹: {directory['FileName']}")
+
+ # 创建目标目录
+ target_dir = os.path.join(download_path, directory["FileName"])
+ if not os.path.exists(target_dir):
+ os.makedirs(target_dir)
+
+ # 获取目录内容
+ _, items = self.get_dir_by_id(directory["FileId"], save=False, all_files=True)
+ if not items:
+ print("文件夹为空")
+ return True
+
+ # 下载所有内容
+ success = True
+ for item in items:
+ if item["Type"] == 1: # 子文件夹
+ sub_success = self.download_directory(item, target_dir)
+ success = success and sub_success
+ else: # 文件
+ # 临时将文件添加到列表中以便下载
+ original_list = self.list
+ self.list = [item]
+ file_success = self.download_file(0, target_dir)
+ self.list = original_list
+ success = success and file_success
+
+ print(f"文件夹下载完成: {directory['FileName']}")
+ return success
+
+ def get_recycle_bin(self):
+ """获取回收站内容"""
+ url = "https://www.123pan.com/a/api/file/list/new?driveId=0&limit=100&next=0&orderBy=fileId&orderDirection=desc&parentFileId=0&trashed=true&Page=1"
+ try:
+ response = requests.get(url, headers=self.headers, timeout=15)
+ result = self._handle_response(response)
+ if result.get("code") == 0:
+ return result["data"]["InfoList"]
+ return []
+ except requests.exceptions.RequestException:
+ return []
+
+ def restore_file(self, file_id: int) -> bool:
+ """从回收站恢复文件"""
+ payload = {
+ "driveId": 0,
+ "fileTrashInfoList": {"FileId": file_id},
+ "operation": False # False表示恢复
+ }
+
+ try:
+ response = requests.post(
+ "https://www.123pan.com/a/api/file/trash",
+ headers=self.headers,
+ json=payload,
+ timeout=10
+ )
+ result = self._handle_response(response)
+ return result.get("code") == 0
+ except requests.exceptions.RequestException:
+ return False
+
+ def logout(self):
+ """登出:清除 authorization、cookies 并保存配置"""
+ self.authorization = ""
+ # 清理 headers 中的 authorization 字段(兼容大小写)
+ if "authorization" in self.headers:
+ self.headers["authorization"] = ""
+ if "Authorization" in self.headers:
+ self.headers["Authorization"] = ""
+ self.cookies = None
+ self._save_config()
+ print("已登出,授权信息已清除")
+
+ def set_protocol(self, protocol: str, save: bool = True):
+ """切换协议:'android' 或 'web',切换后会重新初始化 headers 并可选择保存配置"""
+ protocol = protocol.lower()
+ if protocol not in ("android", "web"):
+ print("不支持的协议,仅支持 'android' 或 'web'")
+ return False
+ self.protocol = protocol
+ self._init_headers()
+ # 确保 authorization 字段更新到 headers
+ if "authorization" in self.headers:
+ self.headers["authorization"] = self.authorization
+ if "Authorization" in self.headers:
+ self.headers["Authorization"] = self.authorization
+ if save:
+ self._save_config()
+ print(f"已切换到 {protocol} 协议")
+ return True
+
+
+if __name__ == "__main__":
+ """主交互函数"""
+ # 解决Windows下cmd颜色转义问题
+ if os.name == "nt":
+ os.system("")
+
+ print("=" * 60)
+ print("123网盘客户端".center(60))
+ print("=" * 60)
+
+ pan = Pan123(config_file="123pan_config.json")
+ pan.show()
+
+ while True:
+ try:
+ path = "/" + "/".join(pan.parent_file_name_list) if pan.parent_file_name_list else "/"
+ command = input(f"\033[91m{path}>\033[0m ").strip()
+
+ if not command:
+ continue
+
+ parts = command.split(maxsplit=1)
+ cmd = parts[0].lower()
+ arg = parts[1] if len(parts) > 1 else ""
+
+ # 命令映射
+ if cmd == "ls":
+ pan.show()
+ elif cmd == "login":
+ pan.login()
+ pan.refresh_directory()
+ elif cmd == "logout":
+ pan.logout()
+ pan.refresh_directory()
+ elif cmd == "exit":
+ break
+ elif cmd == "cd":
+ pan.change_directory(arg)
+ elif cmd == "mkdir":
+ pan.create_directory(arg)
+ # 刷新当前目录
+ pan.refresh_directory()
+ elif cmd == "upload":
+ pan.upload(arg)
+ pan.refresh_directory()
+ elif cmd == "rm":
+ if arg.isdigit():
+ pan.delete_file(int(arg) - 1)
+ pan.refresh_directory()
+ else:
+ print("无效的文件编号")
+ elif cmd == "share":
+ file_ids = [int(idx) - 1 for idx in arg.split() if idx.isdigit()]
+ if file_ids:
+ pwd = input("输入提取码(留空跳过): ")
+ pan.share(file_ids, pwd)
+ else:
+ print("请提供文件编号")
+ elif cmd == "more":
+ pan.get_dir()
+ pan.show()
+ elif cmd == "link":
+ if arg.isdigit():
+ idx = int(arg) - 1
+ if 0 <= idx < len(pan.list):
+ link = pan.get_download_link(idx)
+ if link:
+ print(f"文件直链: {link}")
+ else:
+ print("获取直链失败")
+ else:
+ print("无效的文件编号")
+ else:
+ print("请提供文件编号")
+ elif cmd in ("download", "d"):
+ if arg.isdigit():
+ idx = int(arg) - 1
+ if 0 <= idx < len(pan.list):
+ pan.download_file(idx)
+ else:
+ print("无效的文件编号")
+ else:
+ print("请提供文件编号")
+ elif cmd == "recycle":
+ recycle_items = pan.get_recycle_bin()
+ if recycle_items:
+ print("\n回收站内容:")
+ for i, item in enumerate(recycle_items, 1):
+ print(f"{i}. {item['FileName']} ({pan._format_size(item['Size'])})")
+
+ action = input("\n输入编号恢复文件,或输入 'clear' 清空回收站: ").strip()
+ if action.isdigit():
+ idx = int(action) - 1
+ if 0 <= idx < len(recycle_items):
+ if pan.restore_file(recycle_items[idx]["FileId"]):
+ print("文件恢复成功")
+ else:
+ print("恢复失败")
+ elif action == "clear":
+ for item in recycle_items:
+ pan.delete_file(item, by_index=False)
+ print("回收站已清空")
+ else:
+ print("回收站为空")
+ # 刷新当前目录
+ pan.refresh_directory()
+ elif cmd in ("refresh", "re"):
+ pan.refresh_directory()
+ elif cmd == "reload":
+ pan._load_config()
+ pan.refresh_directory()
+ elif cmd.isdigit():
+ # 切换目录或下载文件
+ idx = int(cmd) - 1
+ if 0 <= idx < len(pan.list):
+ if pan.list[idx]["Type"] == 1:
+ pan.change_directory(cmd)
+ else:
+ pan.download_file(idx)
+ else:
+ print("无效的文件编号")
+
+ elif cmd == "protocol":
+ if arg.lower() in ("android", "web"):
+ pan.set_protocol(arg.lower())
+ pan.refresh_directory()
+ else:
+ print("请指定协议: android 或 web")
+ else:
+ # 帮助提示,列出支持的命令
+ print("可用命令:")
+ print(" ls - 显示当前目录")
+ print(" cd [编号|..|/] - 切换目录")
+ print(" mkdir [名称] - 创建目录")
+ print(" upload [路径] - 上传文件")
+ print(" rm [编号] - 删除文件")
+ print(" share [编号 ...] - 创建分享")
+ print(" link [编号] - 获取文件直链")
+ print(" download/d [编号] - 下载文件")
+ print(" recycle - 管理回收站")
+ print(" refresh/re - 刷新目录")
+ print(" reload - 重新加载配置并刷新")
+ print(" login - 登录")
+ print(" logout - 登出并清除 token")
+ print(" more - 继续加载更多文件")
+ # print(" all - 强制加载当前目录所有文件")
+ print(" exit - 退出程序")
+ print(" protocol [android|web] - 切换协议")
+ except KeyboardInterrupt:
+ print("\n操作已取消")
+ except Exception as e:
+ print(f"发生错误: {e}")
+
diff --git a/README.md b/README.md
index 6d58dbe..1b0b7b5 100644
--- a/README.md
+++ b/README.md
@@ -1,232 +1,162 @@
-# 123云盘 下载工具
+# 123Pan 下载工具
-***不要把123变成下一个baidu。***
-## 项目介绍
-#### 1.下载工具
-123云盘下载工具是一个使用 Python 编写的脚本,通过模拟安卓客户端协议来绕过 123云盘的自用下载流量限制。该工具可以帮助用户在电脑上方便地下载 123云盘上的文件,并提供了多种操作功能,如列出文件、下载文件、上传文件、分享文件等。
-#### 2.油猴脚本(解除网页端限流)
-123download.js是一个油猴(篡改猴)脚本,使用油猴扩展安装该脚本后,用户可以直接在123云盘官网不限流量下载文件(文件夹打包仍有流量限制,请尽量单文件下载)
-# 一、123云盘下载工具
-## 功能特点
+123 网盘命令行工具,支持列出文件、下载、上传、分享、删除、创建目录及回收站管理。Android 客户端或 Web 协议,便于在本地批量管理与下载文件。
-- **登录**:使用用户名和密码登录 123Pan 账号。
-- **列出文件**:显示当前目录下的所有文件和文件夹。
-- **下载文件**:通过模拟安卓客户端协议下载文件,绕过流量限制。
-
- 文件均下载到同目录下的download文件夹,下载时使用.123pan后缀,完成后重命名为文件原始名称。
-- **上传文件**:将本地文件上传到 123Pan。
-- **分享文件**:生成文件分享链接。
-- **删除文件**:删除指定的文件或文件夹。
-- **创建文件夹**:在当前目录下创建新文件夹。
+安卓客户端协议不受下载流量限制(推荐使用)。
-## 使用方法
+123download.js 是网页端下载油猴脚本最初版本,仍然可以使用,仅保留最基本的解锁下载功能,不再更新。
+可以参考其他项目:[123云盘解锁 (@QingJ)](https://greasyfork.org/zh-CN/scripts/519353-123%E4%BA%91%E7%9B%98%E8%A7%A3%E9%94%81)
+[123 云盘会员青春版 (@hmjz100)](https://greasyfork.org/zh-CN/scripts/513528-123-%E4%BA%91%E7%9B%98%E4%BC%9A%E5%91%98%E9%9D%92%E6%98%A5%E7%89%88)
-**可以直接下载release的可执行文件运行或运行python脚本。**
+## 特性
+- 登录 / 登出
+- 列出当前目录文件(ls)
+- 切换目录(cd)与刷新(refresh / re)
+- 下载单文件或递归下载文件夹(download / d)
+- 上传文件(upload)
+- 创建文件夹(mkdir)
+- 删除文件(rm)
+- 创建分享链接(share)
+- 获取文件直链(link)
+- 回收站管理(recycle / restore)
+- 协议切换(protocol android|web)
+- 支持保存配置到 JSON 文件(authorization、device/os、protocol 等)
-### 直接运行
-
-Windows用户下载123pan_win_x86_64.zip,解压后直接运行android.exe。
+## 脚本环境要求
+- Python 3.7+
+- 依赖库:requests
+ 安装:
+ ```bash
+ pip install requests
+ ```
+## 安装与运行
### 脚本运行
-#### 环境准备
-
-1. 确保已正确安装并配置 Python 3.x。
-
-2. 克隆或下载本项目代码到本地,在项目目录打开终端
-
-3. 安装所需的 Python 库:
-
+1. 克隆或下载本仓库到本地。
+2. 进入项目目录。
+3. 运行脚本:
```bash
- pip install -r requirements.txt
+ python 123pan.py
```
+ 启动后会提示输入用户名 / 密码,或自动读取配置文件(默认 `123pan_config.json` 或 `123pan.txt`,脚本内部根据传入参数使用该文件)。
+### 下载release版
+ 根据系统下载对应的 release 版本,解压后运行 `123pan.exe`(Windows)或 `123pan`(Linux)。
+## 配置文件(JSON)
+脚本会读取并保存一个配置文件(示例 `123pan_config.json`),保存登录状态与偏好,格式示例:
+```json
+{
+ "userName": "your_username",
+ "passWord": "your_password",
+ "authorization": "Bearer xxxxx",
+ "deviceType": "M2007J20CI",
+ "osVersion": "Android_10",
+ "protocol": "android"
+}
+```
+注意:保存密码或 token 到本地会有安全风险,请在可信环境下使用并妥善保护该文件。
-#### 运行脚本
+## 常用命令(交互式)
+在脚本交互提示符中输入命令,部分带参数:
-
-1. 打开终端或命令提示符,进入项目目录。
-
-2. 运行脚本:
-
- 使用安卓客户端协议
-
- ```bash
- python android.py
- ```
-
- 使用web协议(不建议,已停止更新)
-
- ```bash
- python web.py
- ```
-
-### 命令
-
-- **登录**:`log`
-
-- **列出文件**:`ls`
-
-- **刷新目录**:`re`
-
-- **下载文件**:直接输入文件编号或者
-
- `download <文件编号>`
+- 直接输入编号
+ - 若编号对应文件夹 → 进入该文件夹
+ - 若编号对应文件 → 直接下载该文件
- Android协议下使用download命令可以直接下载文件夹
+- ls
+ - 显示当前目录的文件与文件夹列表。
-- **获取下载链接** `link <文件编号>`
+- cd [编号|..|/]
+ - 切换目录:
+ - cd 3 —— 进入当前列表第3项(如果是文件夹)。
+ - cd .. —— 返回上级。
+ - cd / —— 返回根目录。
-- **分享文件**:`share`
+- mkdir [名称]
+ - 在当前目录创建文件夹。例如:mkdir test
-- **删除文件**:`delete <文件编号>`
+- upload [路径]
+ - 上传文件到当前目录。例如:upload C:\Users\you\Desktop\file.txt
+ - 仅支持文件,暂不支持目录批量上传。
-- **创建文件夹**:`mkdir <文件夹名称>`
+- rm [编号]
+ - 删除当前列表中的文件/文件夹(会移动到回收站)。例如:rm 2
-- **切换目录**:直接输入文件夹编号,或
+- share [编号 ...]
+ - 为一个或多个文件创建分享链接,例如:share 2 4
+ - 程序会提示输入提取码(可留空)。
- `cd <目录编号>`
-
- 使用`cd ..` 返回上一级目录
-
-- **上传文件**:`upload`,然后输入文件路径
+- link [编号]
+ - 获取文件直链。例如:link 3
-- **直接输入数字**:进入文件夹,或是下载文件
+- download / d [编号]
+ - 下载指定编号的文件或文件夹(如果是文件夹,会递归下载)。
+ - 例如:download 5
-- **退出**:`exit`
+- recycle
+ - 显示回收站内容,并可恢复或清空。
+ - 可输入编号恢复,或输入 clear 清空回收站。
-### 示例
+- refresh / re
+ - 刷新当前目录列表。
-1. 登录:
-
- ```bash
- > log
- ```
+- reload
+ - 重新加载配置文件并刷新目录。
-2. 列出文件:
-
- ```bash
- > ls↵
- ```
+- login / logout
+ - login:手动登录(使用当前配置或提示输入)。
+ - logout:登出并清除授权信息(保存配置时会写入空 token)。
-3. 下载文件:
-
- ```bash
- > download 1↵
- ```
-
- 或直接输入文件编号:
-
- ```bash
- >54↵
- 1.20.0.01_v8a.apk 193.53M
- press 1 to download now: 1↵
- 1.20.0.01_v8a.apk 193.53M
- [██████████████████████████████████████████████████] 100%
- ok
- ```
-
- 下载文件夹(android):
-
- 打包下载依旧会受到流量限制,递归下载无限制,请尽量使用递归下载。
-
- ```bash
- >download 13
- test
- 输入1遍历下载,输入2打包下载:1
- 文件 FLUID.dat 已存在,是否要覆盖?
- 输入1覆盖,2跳过,3全部覆盖,4全部跳过:4
- 已跳过
- 文件 FLUID.cas已跳过
- 文件 profili2.3.zip已跳过
- ...
- ```
+- more
+ - 如果当前目录分页未加载完,输入 more 继续加载更多文件。
-4. 获取下载链接
-
- ```bash
- >link 32
- https://1-180-24-9.pd1.cjjd19.com:30443/(A Long Link)
- ```
-
- 获取下载链接后可以使用IDM下载
+- protocol [android|web]
+ - 切换协议。例:protocol web
+ - 切换后会重新初始化请求头,并可选择保存到配置文件。
-5. 分享文件:
-
- ```bash
- >share↵
- 分享文件的编号:58↵
- ['Something.jpg']
- 输入1添加文件,0发起分享,其他取消0↵
- 提取码,不设留空:↵
- ok
- 分享链接:
- https://www.123pan.com/s/(someting)提取码:(something)
- ```
+- exit
+ - 退出程序。
-6. 删除文件:
-
- ```bash
- > delete 1
- ```
+---
-7. 创建文件夹:
-
- ```bash
- > mkdir 新建文件夹
- ```
+交互示例:
+```
+/> cd demo
+无效命令,使用 '..' 返回上级,'/' 返回根目录,或输入文件夹编号
+/> cd 1
+当前目录为空
+/demo1> ls
+当前目录为空
+/demo1> mkdir test
+目录 'test' 创建成功
-8. 上传文件:
-
- ```bash
- >upload
- 请输入文件路径:C:\(some path)\config
- 文件名: config
- 检测到1个同名文件,输入1覆盖,2保留两者,0取消:2
- 上传文件的fileId: (someThing)
- 已上传:100.0%
- 处理中
- 上传成功
- ```
-### 配置文件说明
+/demo1> 1
+当前目录为空
+/demo1/test> upload 123pan.py
+上传进度: 100.0%
+上传完成,正在验证...
+文件上传成功
+------------------------------------------------------------
+编号 类型 大小 名称
+------------------------------------------------------------
+1 文件 38.66 KB 123pan.py
+============================================================
-在首次运行脚本时,会自动生成一个 `123pan.txt` 文件,用于保存用户的登录信息。请确保该文件与脚本在同一目录下。
-# 二、123云盘辅助下载脚本
-## 功能
-不影响网页端的操作下,绕开流量限制,可以直接点击下载
-(文件夹打包仍无法不限流量下载,文件夹请使用下载工具下载)
+/demo1/test> 1
+开始下载: 123pan.py
+进度: 100.0% | 38.66 KB/38.66 KB | 10.29 MB/s
+下载完成: 123pan.py
+/demo1/test>
+```
-## 安装方法
-## 1.通过油叉安装(推荐)
-### 下载油猴扩展
+## 下载说明
+- 下载到脚本所在目录的 `download` 文件夹,下载过程中使用临时后缀 `.123pan`,下载完成后会重命名为原文件名。
+- 如果文件已存在,会提示覆盖 / 跳过 / 全部覆盖 / 全部跳过等选项。
-进入谷歌浏览器或edge浏览器扩展商店,搜索“篡改猴”,安装:
-
+## 注意事项
+- 本工具用于学习与自用场景,请勿用于违法用途。对任何滥用造成的后果,本人概不负责。
+- 模拟客户端协议可能存在账号或服务端策略风险,请谨慎使用。
+- 建议不要在公用或不受信任的机器上保存明文密码或授权信息。
-### 安装脚本
-打开篡改猴,点击获取新脚本:
+## 免责声明
-
-
-### 进入油叉: 或直接进入脚本地址:```https://greasyfork.org/zh-CN/scripts/510621-123%E4%BA%91%E7%9B%98%E4%B8%8B%E8%BD%BD%E8%BE%85%E5%8A%A9```
-
-
-
-### 搜索```123云盘下载辅助```:
-
-### 安装:
-
-
-
-
-
-
-### 安装完成后即可进入123云盘官网不限速下载:
-
-
-## 2.直接安装
-下载本仓库的123download.js,安装进油猴即可,具体方法不再详述,请自行搜索。
-
-# 注意事项
-
-- 请确保在使用过程中网络连接正常。
-- 由于使用了模拟安卓客户端协议,可能会有一定的风险,请谨慎使用。
-- 本工具仅供学习和研究使用,请勿用于非法用途,任何滥用行为造成的后果由使用者承担
+本工具用于学习场景,请勿用于违法用途。对任何滥用造成的后果,作者概不负责。
\ No newline at end of file
diff --git a/android.py b/android.py
deleted file mode 100644
index 13340bf..0000000
--- a/android.py
+++ /dev/null
@@ -1,1088 +0,0 @@
-import hashlib
-import json
-import os
-import random
-import re
-import time
-import uuid
-
-import requests
-
-
-class Pan123:
- def __init__(
- self,
- readfile=True,
- user_name="",
- pass_word="",
- authorization="",
- input_pwd=True,
- ):
-
- self.all_device_type = [
- "MI-ONE PLUS", "MI-ONE C1", "MI-ONE", "2012051", "2012053", "2012052", "2012061", "2012062", "2013012",
- "2013021", "2012121", "2013061", "2013062", "2013063", "2014215", "2014218", "2014216", "2014719",
- "2014716", "2014726", "2015015", "2015561", "2015562", "2015911", "2015201", "2015628", "2015105",
- "2015711", "2016070", "2016089", "MDE2", "MDT2", "MCE16", "MCT1", "M1804D2SE", "M1804D2ST", "M1804D2SC",
- "M1803E1A", "M1803E1T", "M1803E1C", "M1807E8S", "M1807E8A", "M1805E2A", "M1808D2TE", "M1808D2TT",
- "M1808D2TC", "M1808D2TG", "M1902F1A", "M1902F1T", "M1902F1C", "M1902F1G", "M1908F1XE", "M1903F2A",
- "M1903F2G", "M1903F10G", "M1903F11G", "M1904F3BG", "M2001J2E", "M2001J2G", "M2001J2I", "M2001J1E",
- "M2001J1G", "M2002J9E", "M2002J9G", "M2002J9S", "M2002J9R", "M2007J1SC", "M2007J3SY", "M2007J3SP",
- "M2007J3SG", "M2007J3SI", "M2007J17G", "M2007J17I", "M2102J2SC", "M2011K2C", "M2011K2G", "M2102K1AC",
- "M2102K1C", "M2102K1G", "M2101K9C", "M2101K9G", "M2101K9R", "M2101K9AG", "M2101K9AI", "2107119DC",
- "2109119DG", "2109119DI", "M2012K11G", "M2012K11AI", "M2012K11I", "21081111RG", "2107113SG", "2107113SI",
- "2107113SR", "21091116I", "21091116UI", "2201123C", "2201123G", "2112123AC", "2112123AG", "2201122C",
- "2201122G", "2207122MC", "2203129G", "2203129I", "2206123SC", "2206122SC", "2203121C", "22071212AG",
- "22081212UG", "22081212R", "A201XM", "2211133C", "2211133G", "2210132C", "2210132G", "2304FPN6DC",
- "2304FPN6DG", "2210129SG", "2306EPN60G", "2306EPN60R", "XIG04", "23078PND5G", "23088PND5R", "A301XM",
- "23127PN0CC", "23127PN0CG", "23116PN5BC", "2311BPN23C", "24031PN0DC", "24030PN60G", "24053PY09I",
- "2406APNFAG", "XIG06", "2407FPN8EG", "2407FPN8ER", "A402XM", "2014616", "2014619", "2014618", "2014617",
- "2015011", "2015021", "2015022", "2015501", "2015211", "2015212", "2015213", "MCE8", "MCT8", "M1910F4G",
- "M1910F4S", "M2002F4LG", "2016080", "MDE5", "MDT5", "MDE5S", "M1803D5XE", "M1803D5XA", "M1803D5XT",
- "M1803D5XC", "M1810E5E", "M1810E5A", "M1810E5GG", "2106118C", "M2011J18C", "22061218C", "2308CPXD0C",
- "24072PX77C", "2405CPX3DC", "2405CPX3DG", "2016001", "2016002", "2016007", "MDE40", "MDT4", "MDI40",
- "M1804E4A", "M1804E4T", "M1804E4C", "M1904F3BC", "M1904F3BT", "M1906F9SC", "M1910F4E", "2109119BC",
- "2109119BC", "2209129SC", "23046PNC9C", "24053PY09C", "M1901F9E", "M1901F9T", "MDG2", "MDI2", "M1804D2SG",
- "M1804D2SI", "M1805D1SG", "M1906F9SH", "M1906F9SI", "A0101", "2015716", "MCE91", "M1806D9W", "M1806D9E",
- "M1806D9PE", "21051182C", "21051182G", "M2105K81AC", "M2105K81C", "22081281AC", "23043RP34C", "23043RP34G",
- "23043RP34I", "23046RP50C", "2307BRPDCC", "24018RPACC", "24018RPACG", "2013022", "2013023", "2013029",
- "2013028", "2014011", "2014501", "2014813", "2014112", "2014811", "2014812", "2014821", "2014817",
- "2014818", "2014819", "2014502", "2014512", "2014816", "2015811", "2015812", "2015810", "2015817",
- "2015818", "2015816", "2016030", "2016031", "2016032", "2016037", "2016036", "2016035", "2016033",
- "2016090", "2016060", "2016111", "2016112", "2016117", "2016116", "MAE136", "MAT136", "MAG138", "MAI132",
- "MDE1", "MDT1", "MDG1", "MDI1", "MEE7", "MET7", "MEG7", "MCE3B", "MCT3B", "MCG3B", "MCI3B", "M1804C3DE",
- "M1804C3DT", "M1804C3DC", "M1804C3DG", "M1804C3DI", "M1805D1SE", "M1805D1ST", "M1805D1SC", "M1805D1SI",
- "M1804C3CE", "M1804C3CT", "M1804C3CC", "M1804C3CG", "M1804C3CI", "M1810F6LE", "M1810F6LT", "M1810F6LG",
- "M1810F6LI", "M1903C3EE", "M1903C3ET", "M1903C3EC", "M1903C3EG", "M1903C3EI", "M1908C3IE", "M1908C3IC",
- "M1908C3IG", "M1908C3II", "M1908C3KE", "M1908C3KG", "M1908C3KI", "M2001C3K3I", "M2004J19C", "M2004J19G",
- "M2004J19I", "M2004J19AG", "M2006C3LC", "M2006C3LG", "M2006C3LVG", "M2006C3LI", "M2006C3LII", "M2006C3MG",
- "M2006C3MT", "M2006C3MNG", "M2006C3MII", "M2010J19SG", "M2010J19SI", "M2010J19SR", "M2010J19ST",
- "M2010J19SY", "M2010J19SL", "21061119AG", "21061119AL", "21061119BI", "21061119DG", "21121119SG",
- "21121119VL", "22011119TI", "22011119UY", "22041219G", "22041219I", "22041219NY", "220333QAG", "220333QBI",
- "220333QNY", "220333QL", "220233L2C", "220233L2G", "220233L2I", "22071219AI", "23053RN02A", "23053RN02I",
- "23053RN02L", "23053RN02Y", "23077RABDC", "23076RN8DY", "23076RA4BR", "XIG03", "A401XM", "23076RN4BI",
- "23076RA4BC", "22120RN86C", "22120RN86G", "22120RN86H", "2212ARNC4L", "22126RN91Y", "2404ARN45A",
- "2404ARN45I", "24049RN28L", "24040RN64Y", "2406ERN9CI", "23106RN0DA", "2311DRN14I", "23100RN82L",
- "23108RN04Y", "23124RN87C", "23124RN87I", "23124RN87G", "2409BRN2CA", "2409BRN2CI", "2409BRN2CL",
- "2409BRN2CY", "2411DRN47C", "2014018", "2013121", "2014017", "2013122", "2014022", "2014021", "2014715",
- "2014712", "2014915", "2014912", "2014916", "2014911", "2014910", "2015052", "2015051", "2015712",
- "2015055", "2015056", "2015617", "2015611", "2015112", "2015116", "2015161", "2016050", "2016051",
- "2016101", "2016130", "2016100", "MBE6A5", "MBT6A5", "MEI7", "MEE7S", "MET7S", "MEC7S", "M1803E7SG",
- "MEI7S", "MDE6", "MDT6", "MDG6", "MDI6", "MDE6S", "MDT6S", "MDG6S", "MDI6S", "M1806E7TG", "M1806E7TI",
- "M1901F7E", "M1901F7T", "M1901F7C", "M1901F7G", "M1901F7I", "M1901F7BE", "M1901F7S", "M1908C3JE",
- "M1908C3JC", "M1908C3JG", "M1908C3JI", "M1908C3XG", "M1908C3JGG", "M1906G7E", "M1906G7T", "M1906G7G",
- "M1906G7I", "M2010J19SC", "M2007J22C", "M2003J15SS", "M2003J15SI", "M2003J15SG", "M2007J22G", "M2007J22R",
- "M2007J17C", "M2003J6A1G", "M2003J6A1R", "M2003J6A1I", "M2003J6B1I", "M2003J6B2G", "M2101K7AG", "M2101K7AI",
- "M2101K7BG", "M2101K7BI", "M2101K7BNY", "M2101K7BL", "M2103K19C", "M2103K19I", "M2103K19G", "M2103K19Y",
- "M2104K19J", "22021119KR", "A101XM", "M2101K6G", "M2101K6T", "M2101K6R", "M2101K6P", "M2101K6I",
- "M2104K10AC", "2109106A1I", "21121119SC", "2201117TG", "2201117TI", "2201117TL", "2201117TY", "21091116AC",
- "21091116AI", "22041219C", "2201117SG", "2201117SI", "2201117SL", "2201117SY", "22087RA4DI", "22031116BG",
- "21091116C", "2201116TG", "2201116TI", "2201116SC", "2201116SG", "2201116SR", "2201116SI", "21091116UC",
- "21091116UG", "22041216C", "22041216UC", "22095RA98C", "23021RAAEG", "23027RAD4I", "23028RA60L",
- "23021RAA2Y", "22101317C", "22111317G", "22111317I", "23076RA4BC", "2303CRA44A", "2303ERA42L", "23030RAC7Y",
- "2209116AG", "22101316C", "22101316G", "22101316I", "22101316UCP", "22101316UG", "22101316UP", "22101316UC",
- "22101320C", "23054RA19C", "23049RAD8C", "23129RAA4G", "23129RA5FL", "23124RA7EO", "2312DRAABC",
- "2312DRAABI", "2312DRAABG", "23117RA68G", "2312DRA50C", "2312DRA50G", "2312DRA50I", "XIG05", "23090RA98C",
- "23090RA98G", "23090RA98I", "24040RA98R", "2406ERN9CC", "2311FRAFDC", "24094RAD4C", "24094RAD4G",
- "24094RAD4I", "24090RA29C", "24090RA29G", "24090RA29I", "24115RA8EC", "24115RA8EG", "24115RA8EI",
- "M2004J7AC", "M2004J7BC", "M2003J15SC", "24069RA21C", "M1903F10A", "M1903F10C", "M1903F10I", "M1903F11A",
- "M1903F11C", "M1903F11I", "M1903F11A", "M2001G7AE", "M2001G7AC", "M2001G7AC", "M1912G7BE", "M1912G7BC",
- "M2001J11C", "M2001J11C", "M2006J10C", "M2007J3SC", "M2012K11AC", "M2012K11C", "M2012K10C", "22021211RC",
- "22041211AC", "22011211C", "21121210C", "22081212C", "22041216I", "23013RK75C", "22127RK46C", "22122RK93C",
- "23078RKD5C", "23113RKC6C", "23117RK66C", "2311DRK48C", "2407FRK8EC", "2016020", "2016021", "M1803E6E",
- "M1803E6T", "M1803E6C", "M1803E6G", "M1803E6I", "M1810F6G", "M1810F6I", "M1903C3GG", "M1903C3GI",
- "220733SG", "220733SH", "220733SL", "220733SFG", "220733SFH", "23028RN4DG", "23028RN4DH", "23026RN54G",
- "23028RNCAG", "23028RNCAH", "23129RN51X", "23129RN51H", "2312CRNCCL", "24048RN6CG", "24048RN6CI",
- "24044RN32L", "2409BRN2CG", "22081283C", "22081283G", "23073RPBFC", "23073RPBFG", "23073RPBFL",
- "2405CRPFDC", "2405CRPFDG", "2405CRPFDI", "2405CRPFDL", "24074RPD2C", "24074RPD2G", "24074RPD2I",
- "24075RP89G", "24076RP19G", "24076RP19I", "M1805E10A", "M2004J11G", "M2012K11AG", "M2104K10I", "22021211RG",
- "22021211RI", "21121210G", "23049PCD8G", "23049PCD8I", "23013PC75G", "24069PC21G", "24069PC21I",
- "23113RKC6G", "M1912G7BI", "M2007J20CI", "M2007J20CG", "M2007J20CT", "M2102J20SG", "M2102J20SI",
- "21061110AG", "2201116PG", "2201116PI", "22041216G", "22041216UG", "22111317PG", "22111317PI", "22101320G",
- "22101320I", "23122PCD1G", "23122PCD1I", "2311DRK48G", "2311DRK48I", "2312FRAFDI", "M2004J19PI",
- "M2003J6CI", "M2010J19CG", "M2010J19CT", "M2010J19CI", "M2103K19PG", "M2103K19PI", "22041219PG",
- "22041219PI", "2201117PG", "2201117PI", "21091116AG", "22031116AI", "22071219CG", "22071219CI",
- "2207117BPG", "2404APC5FG", "2404APC5FI", "23128PC33I", "24066PC95I", "2312FPCA6G", "23076PC4BI",
- "M2006C3MI", "211033MI", "220333QPG", "220333QPI", "220733SPH", "2305EPCC4G", "2302EPCC4H", "22127PC95G",
- "22127PC95H", "2312BPC51X", "2312BPC51H", "2310FPCA4G", "2310FPCA4I", "2405CPCFBG", "24074PCD2I", "FYJ01QP",
- "21051191C"
- ]
- self.all_os_versions = [
- "Android_7.1.2", "Android_8.0.0", "Android_8.1.0", "Android_9.0", "Android_10", "Android_11", "Android_12",
- "Android_13", "Android_6.0.1", "Android_5.1.1", "Android_4.4.4", "Android_4.3", "Android_4.2.2",
- "Android_4.1.2",
- ]
- # 随机生成设备信息
- self.devicetype = random.choice(self.all_device_type)
- self.osversion = random.choice(self.all_os_versions)
-
- self.download_mode = 1
- self.cookies = None
- self.recycle_list = None
- self.list = []
- self.total = 0
- self.parent_file_name_list = []
- self.all_file = False
- self.file_page = 0
- self.file_list = []
- self.dir_list = []
- self.name_dict = {}
- if readfile:
- self.read_ini(user_name, pass_word, input_pwd, authorization)
- else:
- if user_name == "" or pass_word == "":
- print("读取已禁用,用户名或密码为空")
- if input_pwd:
- user_name = input("请输入用户名:")
- pass_word = input("请输入密码:")
- else:
- raise Exception("用户名或密码为空:读取禁用时,userName和passWord不能为空")
- self.user_name = user_name
- self.password = pass_word
- self.authorization = authorization
- self.header_logined = {
- "user-agent": "123pan/v2.4.0(" + self.osversion + ";Xiaomi)",
- "authorization": self.authorization,
- "accept-encoding": "gzip",
- # "authorization": "",
- "content-type": "application/json",
- "osversion": self.osversion,
- "loginuuid": str(uuid.uuid4().hex),
- "platform": "android",
- "devicetype": self.devicetype,
- "devicename": "Xiaomi",
- # "x-channel": "1004",
-
- # "Content-Length": "65",
- "host": "www.123pan.com",
- "app-version": "61",
- "x-app-version": "2.4.0"
- }
- self.parent_file_id = 0 # 路径,文件夹的id,0为根目录
- self.parent_file_list = [0]
- res_code_getdir = self.get_dir()[0]
- if res_code_getdir != 0:
- self.login()
- self.get_dir()
-
- def login(self):
- data = {"type": 1, "passport": self.user_name, "password": self.password}
- # sign = getSign("/b/api/user/sign_in")
- login_res = requests.post(
- "https://www.123pan.com/b/api/user/sign_in",
- headers=self.header_logined,
- data=data,
- # params={sign[0]: sign[1]}, timeout=10,
- # verify=False
- )
-
- res_sign = login_res.json()
- # print("登录结果:", res_sign)
- # print("登录结果header:", login_res.headers)
- res_code_login = res_sign["code"]
- if res_code_login != 200:
- print("code = 1 Error:" + str(res_code_login))
- print(res_sign["message"])
- return res_code_login
- set_cookies = login_res.headers["Set-Cookie"]
- set_cookies_list = {}
-
- for cookie in set_cookies.split(';'):
- if '=' in cookie:
- key, value = cookie.strip().split('=', 1)
- set_cookies_list[key] = value
- else:
- set_cookies_list[cookie.strip()] = None
-
- self.cookies = set_cookies_list
-
- token = res_sign["data"]["token"]
- self.authorization = "Bearer " + token
- self.header_logined["authorization"] = self.authorization
- # ret['cookie'] = cookie
- self.save_file()
- return res_code_login
-
- def save_file(self):
- with open("123pan.txt", "w", encoding="utf_8") as f:
- save_list = {
- "userName": self.user_name,
- "passWord": self.password,
- "authorization": self.authorization,
- "deviceType": self.devicetype,
- "osVersion": self.osversion,
- }
-
- f.write(json.dumps(save_list))
- print("账号已保存")
-
- def get_dir(self, save=True):
- return self.get_dir_by_id(self.parent_file_id, save)
-
- # 按页(非123页数)读取文件
- # all = True 强制获取所有文件
- def get_dir_by_id(self, file_id, save=True, all=False, limit=100):
- get_pages = 3
- res_code_getdir = 0
- page = self.file_page * get_pages + 1
- lenth_now = len(self.list)
- if all:
- # 强制获取所有文件
- page = 1
- lenth_now = 0
- lists = []
-
- total = -1
- times = 0
- while (lenth_now < total or total == -1) and (times < get_pages or all):
- base_url = "https://www.123pan.com/api/file/list/new"
- # print(self.headerLogined)
- # sign = getSign("/b/api/file/list/new")
- # print(sign)
- params = {
- # sign[0]: sign[1],
- "driveId": 0,
- "limit": limit,
- "next": 0,
- "orderBy": "file_id",
- "orderDirection": "desc",
- "parentFileId": str(file_id),
- "trashed": False,
- "SearchData": "",
- "Page": str(page),
- "OnlyLookAbnormalFile": 0,
- }
- try:
- a = requests.get(base_url, headers=self.header_logined, params=params, timeout=30) # , verify=False)
- except:
- print("连接失败")
- return -1, []
- # print(a.text)
- # print(a.headers)
- text = a.json()
- res_code_getdir = text["code"]
- if res_code_getdir != 0:
- # print(a.text)
- # print(a.headers)
- print("code = 2 Error:" + str(res_code_getdir))
- print(text["message"])
- return res_code_getdir, []
- lists_page = text["data"]["InfoList"]
- lists += lists_page
- total = text["data"]["Total"]
- lenth_now += len(lists_page)
- page += 1
- times += 1
- if times % 5 == 0:
- print("警告:文件夹内文件过多:" + str(lenth_now) + "/" + str(total))
- print("为防止对服务器造成影响,暂停10秒")
- print("请耐心等待!")
- time.sleep(10)
-
- if lenth_now < total:
- print("文件夹内文件过多:" + str(lenth_now) + "/" + str(total))
- self.all_file = False
- else:
- self.all_file = True
- self.total = total
- self.file_page += 1
- # 编号
- # file_num = 0
- # for i in lists:
- # i["FileNum"] = file_num
- # file_num += 1
- if save:
- # 合并list
- self.list = self.list + lists
-
- return res_code_getdir, lists
-
- def show(self):
- print("--------------------")
- for i in self.list:
- file_size = i["Size"]
- if file_size > 1073741824:
- download_size_print = str(round(file_size / 1073741824, 2)).ljust(6) + " GB"
- elif file_size > 1048576:
- download_size_print = str(round(file_size / 1048576, 2)).ljust(6) + " MB"
- else:
- download_size_print = str(round(file_size / 1024, 2)).ljust(6) + " KB"
-
- if i["Type"] == 0:
- print(
- "\033[33m" + "编号:",
- self.list.index(i) + 1,
- "\033[0m \t\t" + download_size_print + "\t\t\033[36m",
- i["FileName"],
- "\033[0m",
- )
- elif i["Type"] == 1:
- print(
- "\033[35m" + "编号:",
- self.list.index(i) + 1,
- " \t\t\033[36m",
- i["FileName"],
- "\033[0m",
- )
- if not self.all_file:
- print("剩余" + str(self.total - len(self.list)) + "个文件未获取")
- print("输入more继续获取")
- print("--------------------")
-
- # fileNumber 从0开始,0为第一个文件,传入时需要减一 !!!
- def link_by_number(self, file_number, showlink=True):
- file_detail = self.list[file_number]
- return self.link_by_fileDetail(file_detail, showlink)
-
- def link_by_fileDetail(self, file_detail, showlink=True):
- type_detail = file_detail["Type"]
-
- if type_detail == 1:
- down_request_url = "https://www.123pan.com/a/api/file/batch_download_info"
- down_request_data = {"fileIdList": [{"fileId": int(file_detail["FileId"])}]}
-
- else:
- down_request_url = "https://www.123pan.com/a/api/file/download_info"
- down_request_data = {
- "driveId": 0,
- "etag": file_detail["Etag"],
- "fileId": file_detail["FileId"],
- "s3keyFlag": file_detail["S3KeyFlag"],
- "type": file_detail["Type"],
- "fileName": file_detail["FileName"],
- "size": file_detail["Size"],
- }
- # print(down_request_data)
-
- # sign = getSign("/a/api/file/download_info")
-
- link_res = requests.post(
- down_request_url,
- headers=self.header_logined,
- # params={sign[0]: sign[1]},
- data=json.dumps(down_request_data),
- timeout=10
- )
- # print(linkRes.text)
- link_res_json = link_res.json()
- res_code_download = link_res_json["code"]
- if res_code_download != 0:
- print("code = 3 Error:" + str(res_code_download))
- print(link_res_json["message"])
- return res_code_download
- down_load_url = link_res.json()["data"]["DownloadUrl"]
- next_to_get = requests.get(down_load_url, timeout=10, allow_redirects=False).text
- url_pattern = re.compile(r"href='(https?://[^']+)'")
- redirect_url = url_pattern.findall(next_to_get)[0]
- if showlink:
- print(redirect_url)
-
- return redirect_url
-
- def download(self, file_number, download_path="download"):
- file_detail = self.list[file_number]
- if file_detail["Type"] == 1:
- print("开始下载")
- file_name = file_detail["FileName"] + ".zip"
- else:
- file_name = file_detail["FileName"] # 文件名
-
- down_load_url = self.link_by_number(file_number, showlink=False)
- if type(down_load_url) == int:
- return
- self.download_from_url(down_load_url, file_name, download_path)
-
- def download_from_url(self, url, file_name, download_path="download"):
- if os.path.exists(download_path + "/" + file_name):
- if self.download_mode == 4:
- print("文件 " + file_name + "已跳过")
- return
- print("文件 " + file_name + " 已存在,是否要覆盖?")
- sure_download = input("输入1覆盖,2跳过,3全部覆盖,4全部跳过:")
- if sure_download == "2":
- return
- elif sure_download == "3":
- self.download_mode = 3
- elif sure_download == "4":
- self.download_mode = 4
- print("已跳过")
- return
- else:
- os.remove(download_path + "/" + file_name)
-
- if not os.path.exists(download_path):
- print("文件夹不存在,创建文件夹")
- os.makedirs(download_path)
- down = requests.get(url, stream=True, timeout=10)
-
- file_size = int(down.headers["Content-Length"]) # 文件大小
- content_size = int(file_size) # 文件总大小
- data_count = 0 # 当前已传输的大小
- if file_size > 1048576:
- size_print_download = str(round(file_size / 1048576, 2)) + "MB"
- else:
- size_print_download = str(round(file_size / 1024, 2)) + "KB"
- print(file_name + " " + size_print_download)
- time1 = time.time()
- time_temp = time1
- data_count_temp = 0
- # 以.123pan后缀下载,下载完成重命名,防止下载中断
- with open(download_path + "/" + file_name + ".123pan", "wb") as f:
- for i in down.iter_content(1024):
- f.write(i)
- done_block = int((data_count / content_size) * 50)
- data_count = data_count + len(i)
- # 实时进度条进度
- now_jd = (data_count / content_size) * 100
- # "%%" 表示%
- # 测速
- time1 = time.time()
- pass_time = time1 - time_temp
- if pass_time > 1:
- time_temp = time1
- pass_data = int(data_count) - int(data_count_temp)
- data_count_temp = data_count
- speed = pass_data / int(pass_time)
- speed_m = speed / 1048576
- if speed_m > 1:
- speed_print = str(round(speed_m, 2)) + "MB/S"
- else:
- speed_print = str(round(speed_m * 1024, 2)) + "KB/S"
- print(
- "\r [%s%s] %d%% %s"
- % (
- done_block * "█",
- " " * (50 - 1 - done_block),
- now_jd,
- speed_print,
- ),
- end="",
- )
- elif data_count == content_size:
- print("\r [%s%s] %d%% %s" % (50 * "█", "", 100, ""), end="")
- print("\nok")
-
- os.rename(download_path + "/" + file_name + ".123pan", download_path + "/" + file_name)
-
- def get_all_things(self, id):
- # print(id)
- # print(self.dir_list)
- self.dir_list.remove(id)
- all_list = self.get_dir_by_id(id, save=False)[1]
-
- for i in all_list:
- if i["Type"] == 0:
- self.file_list.append(i)
- else:
- self.dir_list.append(i["FileId"])
- self.name_dict[i["FileId"]] = i["FileName"]
-
- for i in self.dir_list:
- self.get_all_things(i)
-
- def download_dir(self, file_detail, download_path_root="download"):
- # file_detail = self.list[file_number]
- self.name_dict[file_detail["FileId"]] = file_detail["FileName"]
- if file_detail["Type"] != 1:
- print("不是文件夹")
- return
-
- all_list = self.get_dir_by_id(file_detail["FileId"], save=False, all=True, limit=100)[1]
- for i in all_list[::-1]:
- if i["Type"] == 0: # 直接开始下载
- AbsPath = i["AbsPath"]
- for key, value in self.name_dict.items():
- AbsPath = AbsPath.replace(str(key), value)
- download_path = download_path_root + AbsPath
- download_path = download_path.replace("/" + str(i["FileId"]), "")
- self.download_from_url(i["DownloadUrl"], i["FileName"], download_path)
-
- else:
- self.download_dir(i, download_path_root)
-
- def recycle(self):
- recycle_id = 0
- url = (
- "https://www.123pan.com/a/api/file/list/new?driveId=0&limit=100&next=0"
- "&orderBy=fileId&orderDirection=desc&parentFileId="
- + str(recycle_id)
- + "&trashed=true&&Page=1"
- )
- recycle_res = requests.get(url, headers=self.header_logined, timeout=10)
- json_recycle = recycle_res.json()
- recycle_list = json_recycle["data"]["InfoList"]
- self.recycle_list = recycle_list
-
- # fileNumber 从0开始,0为第一个文件,传入时需要减一 !!!
- def delete_file(self, file, by_num=True, operation=True):
- # operation = 'true' 删除 , operation = 'false' 恢复
- if by_num:
- print(file)
- if not str(file).isdigit():
- print("请输入数字")
- return -1
- if 0 <= file < len(self.list):
- file_detail = self.list[file]
- else:
- print("不在合理范围内")
- return
- else:
- if file in self.list:
- file_detail = file
- else:
- print("文件不存在")
- return
- data_delete = {
- "driveId": 0,
- "fileTrashInfoList": file_detail,
- "operation": operation,
- }
- delete_res = requests.post(
- "https://www.123pan.com/a/api/file/trash",
- data=json.dumps(data_delete),
- headers=self.header_logined,
- timeout=10
- )
- dele_json = delete_res.json()
- print(dele_json)
- message = dele_json["message"]
- print(message)
-
- def share(self):
- file_id_list = ""
- share_name_list = []
- add = "1"
- while str(add) == "1":
- share_num = input("分享文件的编号:")
- num_test2 = share_num.isdigit()
- if num_test2:
- share_num = int(share_num)
- if 0 < share_num < len(self.list) + 1:
- share_id = self.list[int(share_num) - 1]["FileId"]
- share_name = self.list[int(share_num) - 1]["FileName"]
- share_name_list.append(share_name)
- print(share_name_list)
- file_id_list = file_id_list + str(share_id) + ","
- add = input("输入1添加文件,0发起分享,其他取消")
- else:
- print("请输入数字,,")
- add = "1"
- if str(add) == "0":
- share_pwd = input("提取码,不设留空:")
- file_id_list = file_id_list.strip(",")
- data = {
- "driveId": 0,
- "expiration": "2099-12-12T08:00:00+08:00",
- "fileIdList": file_id_list,
- "shareName": "My Share",
- "sharePwd": share_pwd,
- "event": "shareCreate"
- }
- share_res = requests.post(
- "https://www.123pan.com/a/api/share/create",
- headers=self.header_logined,
- data=json.dumps(data),
- timeout=10
- )
- share_res_json = share_res.json()
- if share_res_json["code"] != 0:
- print(share_res_json["message"])
- print("分享失败")
- return
- message = share_res_json["message"]
- print(message)
- share_key = share_res_json["data"]["ShareKey"]
- share_url = "https://www.123pan.com/s/" + share_key
- print("分享链接:\n" + share_url + "提取码:" + share_pwd)
- else:
- print("退出分享")
-
- def up_load(self, file_path):
- file_path = file_path.replace('"', "")
- file_path = file_path.replace("\\", "/")
- file_name = file_path.split("/")[-1]
- print("文件名:", file_name)
- if not os.path.exists(file_path):
- print("文件不存在,请检查路径是否正确")
- return
- if os.path.isdir(file_path):
- print("暂不支持文件夹上传")
- return
- fsize = os.path.getsize(file_path)
- with open(file_path, "rb") as f:
- md5 = hashlib.md5()
- while True:
- data = f.read(64 * 1024)
- if not data:
- break
- md5.update(data)
- readable_hash = md5.hexdigest()
-
- list_up_request = {
- "driveId": 0,
- "etag": readable_hash,
- "fileName": file_name,
- "parentFileId": self.parent_file_id,
- "size": fsize,
- "type": 0,
- "duplicate": 0,
- }
-
- # sign = getSign("/b/api/file/upload_request")
- up_res = requests.post(
- "https://www.123pan.com/b/api/file/upload_request",
- headers=self.header_logined,
- # params={sign[0]: sign[1]},
- data=list_up_request,
- timeout=10
- )
- up_res_json = up_res.json()
- res_code_up = up_res_json["code"]
- if res_code_up == 5060:
- sure_upload = input("检测到1个同名文件,输入1覆盖,2保留两者,0取消:")
- if sure_upload == "1":
- list_up_request["duplicate"] = 1
-
- elif sure_upload == "2":
- list_up_request["duplicate"] = 2
- else:
- print("取消上传")
- return
- # sign = getSign("/b/api/file/upload_request")
- up_res = requests.post(
- "https://www.123pan.com/b/api/file/upload_request",
- headers=self.header_logined,
- # params={sign[0]: sign[1]},
- data=json.dumps(list_up_request),
- timeout=10
- )
- up_res_json = up_res.json()
- res_code_up = up_res_json["code"]
- if res_code_up == 0:
- # print(upResJson)
- # print("上传请求成功")
- reuse = up_res_json["data"]["Reuse"]
- if reuse:
- print("上传成功,文件已MD5复用")
- return
- else:
- print(up_res_json)
- print("上传请求失败")
- return
-
- bucket = up_res_json["data"]["Bucket"]
- storage_node = up_res_json["data"]["StorageNode"]
- upload_key = up_res_json["data"]["Key"]
- upload_id = up_res_json["data"]["UploadId"]
- up_file_id = up_res_json["data"]["FileId"] # 上传文件的fileId,完成上传后需要用到
- print("上传文件的fileId:", up_file_id)
-
- # 获取已将上传的分块
- start_data = {
- "bucket": bucket,
- "key": upload_key,
- "uploadId": upload_id,
- "storageNode": storage_node,
- }
- start_res = requests.post(
- "https://www.123pan.com/b/api/file/s3_list_upload_parts",
- headers=self.header_logined,
- data=json.dumps(start_data),
- timeout=10
- )
- start_res_json = start_res.json()
- res_code_up = start_res_json["code"]
- if res_code_up == 0:
- # print(startResJson)
- pass
- else:
- print(start_data)
- print(start_res_json)
-
- print("获取传输列表失败")
- return
-
- # 分块,每一块取一次链接,依次上传
- block_size = 5242880
- with open(file_path, "rb") as f:
- part_number_start = 1
- put_size = 0
- while True:
- data = f.read(block_size)
-
- precent = round(put_size / fsize, 2)
- print("\r已上传:" + str(precent * 100) + "%", end="")
- put_size = put_size + len(data)
-
- if not data:
- break
- get_link_data = {
- "bucket": bucket,
- "key": upload_key,
- "partNumberEnd": part_number_start + 1,
- "partNumberStart": part_number_start,
- "uploadId": upload_id,
- "StorageNode": storage_node,
- }
-
- get_link_url = (
- "https://www.123pan.com/b/api/file/s3_repare_upload_parts_batch"
- )
- get_link_res = requests.post(
- get_link_url,
- headers=self.header_logined,
- data=json.dumps(get_link_data),
- timeout=10
- )
- get_link_res_json = get_link_res.json()
- res_code_up = get_link_res_json["code"]
- if res_code_up == 0:
- # print("获取链接成功")
- pass
- else:
- print("获取链接失败")
- # print(getLinkResJson)
- return
- # print(getLinkResJson)
- upload_url = get_link_res_json["data"]["presignedUrls"][
- str(part_number_start)
- ]
- # print("上传链接",uploadUrl)
- requests.put(upload_url, data=data, timeout=10)
- # print("put")
-
- part_number_start = part_number_start + 1
-
- print("\n处理中")
- # 完成标志
- # 1.获取已上传的块
- uploaded_list_url = "https://www.123pan.com/b/api/file/s3_list_upload_parts"
- uploaded_comp_data = {
- "bucket": bucket,
- "key": upload_key,
- "uploadId": upload_id,
- "storageNode": storage_node,
- }
- # print(uploadedCompData)
- requests.post(
- uploaded_list_url,
- headers=self.header_logined,
- data=json.dumps(uploaded_comp_data),
- timeout=10
- )
- compmultipart_up_url = (
- "https://www.123pan.com/b/api/file/s3_complete_multipart_upload"
- )
- requests.post(
- compmultipart_up_url,
- headers=self.header_logined,
- data=json.dumps(uploaded_comp_data),
- timeout=10
- )
-
- # 3.报告完成上传,关闭upload session
- if fsize > 64 * 1024 * 1024:
- time.sleep(3)
- close_up_session_url = "https://www.123pan.com/b/api/file/upload_complete"
- close_up_session_data = {"fileId": up_file_id}
- # print(closeUpSessionData)
- close_up_session_res = requests.post(
- close_up_session_url,
- headers=self.header_logined,
- data=json.dumps(close_up_session_data),
- timeout=10
- )
- close_res_json = close_up_session_res.json()
- # print(closeResJson)
- res_code_up = close_res_json["code"]
- if res_code_up == 0:
- print("上传成功")
- else:
- print("上传失败")
- print(close_res_json)
- return
-
- # dirId 就是 fileNumber,从0开始,0为第一个文件,传入时需要减一 !!!(好像文件夹都排在前面)
- def cd(self, dir_num):
- if not dir_num.isdigit():
- if dir_num == "..":
- if len(self.parent_file_list) > 1:
- # 没有获取全部文件
- self.all_file = False
- # 重置文件页数
- self.file_page = 0
-
- self.parent_file_list.pop()
- self.parent_file_id = self.parent_file_list[-1]
- # 清空当前文件列表
- self.list = []
- self.parent_file_name_list.pop()
- self.get_dir()
- self.show()
- else:
- print("已经是根目录")
- return
- if dir_num == "/":
- # 没有获取全部文件
- self.all_file = False
- # 重置文件页数
- self.file_page = 0
-
- self.parent_file_id = 0
- self.parent_file_list = [0]
- # 清空当前文件列表
- self.list = []
- self.parent_file_name_list = []
- self.get_dir()
- self.show()
- return
- print("输入错误")
- return
- dir_num = int(dir_num) - 1
- if dir_num > (len(self.list) - 1) or dir_num < 0:
- print("输入错误")
- return
- if self.list[dir_num]["Type"] != 1:
- print("不是文件夹")
- return
-
- # 没有获取全部文件
- self.all_file = False
- # 重置文件页数
- self.file_page = 0
-
- self.parent_file_id = self.list[dir_num]["FileId"]
- self.parent_file_list.append(self.parent_file_id)
- self.parent_file_name_list.append(self.list[dir_num]["FileName"])
- # 清空当前文件列表
- self.list = []
- self.get_dir()
- self.show()
-
- def cdById(self, file_id):
- # 没有获取全部文件
- self.all_file = False
- # 重置文件页数
- self.file_page = 0
- # 清空当前文件列表
- self.list = []
- self.parent_file_id = file_id
- self.parent_file_list.append(self.parent_file_id)
- # self.parent_file_name_list.append(self.list[dir_num]["FileName"])
- # self.get_dir()
- self.get_dir()
- self.show()
-
- def read_ini(
- self,
- user_name,
- pass_word,
- input_pwd,
- authorization="",
- ):
- try:
- with open("123pan.txt", "r", encoding="utf-8") as f:
- text = f.read()
- text = json.loads(text)
- deviceType = text["deviceType"]
- osVersion = text["osVersion"]
- self.devicetype = deviceType
- self.osversion = osVersion
- # 读取设备信息
- user_name = text["userName"]
- pass_word = text["passWord"]
- authorization = text["authorization"]
-
- except: # FileNotFoundError or json.decoder.JSONDecodeError:
- print("获取配置失败,重新输入")
-
- if user_name == "" or pass_word == "":
- if input_pwd:
- user_name = input("userName:")
- pass_word = input("passWord:")
- authorization = ""
-
- else:
- raise Exception("禁止输入模式下,没有账号或密码")
-
- self.user_name = user_name
- self.password = pass_word
- self.authorization = authorization
-
- def mkdir(self, dirname, remakedir=False):
- if not remakedir:
- for i in self.list:
- if i["FileName"] == dirname:
- print("文件夹已存在")
- return i["FileId"]
-
- url = "https://www.123pan.com/a/api/file/upload_request"
- data_mk = {
- "driveId": 0,
- "etag": "",
- "fileName": dirname,
- "parentFileId": self.parent_file_id,
- "size": 0,
- "type": 1,
- "duplicate": 1,
- "NotReuse": True,
- "event": "newCreateFolder",
- "operateType": 1,
- }
- # sign = getSign("/a/api/file/upload_request")
- res_mk = requests.post(
- url,
- headers=self.header_logined,
- data=json.dumps(data_mk),
- # params={sign[0]: sign[1]},
- timeout=10
- )
- try:
- res_json = res_mk.json()
- # print(res_json)
- except json.decoder.JSONDecodeError:
- print("创建失败")
- print(res_mk.text)
- return
- code_mkdir = res_json["code"]
-
- if code_mkdir == 0:
- print("创建成功: ", res_json["data"]["FileId"])
- self.get_dir()
- return res_json["data"]["Info"]["FileId"]
- print(res_json)
- print("创建失败")
- return
-
-
-if __name__ == "__main__":
- # 用于解决windows下cmd颜色转义乱码问题
- if os.name == "nt":
- os.system("")
-
- pan = Pan123(readfile=True, input_pwd=True)
- pan.show()
- while True:
- # pan.get_all_things()
- command = input("\033[91m " + "/" + "/".join(pan.parent_file_name_list) + ">\033[0m")
- if command == "ls":
- pan.show()
- if command == "re":
- code = pan.get_dir()[0]
- if code == 0:
- print("刷新目录成功")
- pan.show()
- if command.isdigit():
- if int(command) > len(pan.list) or int(command) < 1:
- print("输入错误")
- continue
- if pan.list[int(command) - 1]["Type"] == 1:
- # pan.cdById(pan.list[int(command) - 1]["FileId"])
- pan.cd(command)
- else:
- size = pan.list[int(command) - 1]["Size"]
- if size > 1048576:
- size_print_show = str(round(size / 1048576, 2)) + "M"
- else:
- size_print_show = str(round(size / 1024, 2)) + "K"
- # print(pan.list[int(command) - 1])
- name = pan.list[int(command) - 1]["FileName"]
- print(name + " " + size_print_show)
- print("输入1开始下载: ", end="")
- sure = input()
- if sure == "1":
- pan.download(int(command) - 1)
- elif command[0:9] == "download ":
- if command[9:].isdigit():
- if int(command[9:]) > len(pan.list) or int(command[9:]) < 1:
- print("输入错误")
- continue
- if pan.list[int(command[9:]) - 1]["Type"] == 1:
- print(pan.list[int(command[9:]) - 1]["FileName"])
- print("打包下载没有客户端api,仍然使用网页接口,会有流量限制,请尽量使用遍历下载。")
- print("输入1遍历下载,输入2打包下载:", end="")
- sure = input()
- if sure == "2":
- pan.download(int(command[9:]) - 1)
- elif sure == "1":
- file_detail = pan.list[int(command[9:]) - 1]
- pan.download_dir(file_detail)
- # 重置下载模式
- pan.download_mode = 1
- else:
- pan.download(int(command[9:]) - 1)
-
- else:
- print("输入错误")
- elif command == "exit":
- break
- elif command == "log":
- pan.login()
- pan.get_dir()
- pan.show()
-
- elif command[0:5] == "link ":
- if command[5:].isdigit():
- if int(command[5:]) > len(pan.list) or int(command[5:]) < 1:
- print("输入错误")
- continue
- pan.link_by_number(int(command[5:]) - 1)
- else:
- print("输入错误")
- elif command == "upload":
- filepath = input("请输入文件路径:")
- pan.up_load(filepath)
- pan.get_dir()
- pan.show()
- elif command == "share":
- pan.share()
- elif command[0:6] == "delete":
- if command == "delete":
- print("请输入要删除的文件编号:", end="")
- fileNumber = input()
- else:
- if command[6] == " ":
- fileNumber = command[7:]
- else:
- print("输入错误")
- continue
- if fileNumber == "":
- print("请输入要删除的文件编号:", end="")
- fileNumber = input()
- else:
- fileNumber = fileNumber[0:]
- if fileNumber.isdigit():
- if int(fileNumber) > len(pan.list) or int(fileNumber) < 1:
- print("输入错误")
- continue
- pan.delete_file(int(fileNumber) - 1)
- pan.get_dir()
- pan.show()
- else:
- print("输入错误")
-
- elif command[:3] == "cd ":
- path = command[3:]
- pan.cd(path)
- elif command[0:5] == "mkdir":
- if command == "mkdir":
- newPath = input("请输入目录名:")
- else:
- newPath = command[6:]
- if newPath == "":
- newPath = input("请输入目录名:")
- else:
- newPath = newPath[0:]
- print(pan.mkdir(newPath))
- pan.get_dir()
- pan.show()
-
- elif command == "reload":
- pan.read_ini("", "", True)
- print("读取成功")
- pan.get_dir()
- pan.show()
- elif command == "more":
- # 继续加载文件
- pan.get_dir()
- pan.show()
diff --git a/pack.sh b/pack.sh
index 530ba8b..61eec49 100755
--- a/pack.sh
+++ b/pack.sh
@@ -1,2 +1 @@
-pyinstaller -F android.py
-pyinstaller -F web.py
\ No newline at end of file
+pyinstaller -F 123pan.py --icon icon.ico --clean --noconfirm
\ No newline at end of file
diff --git a/sign_py.py b/sign_py.py
index f10ac79..ba72bd5 100644
--- a/sign_py.py
+++ b/sign_py.py
@@ -1,3 +1,4 @@
+# 网页端加签算法,目前123pan已弃用
import time
import random
from datetime import datetime
diff --git a/web.py b/web.py
deleted file mode 100644
index 5fd7cc1..0000000
--- a/web.py
+++ /dev/null
@@ -1,824 +0,0 @@
-import re
-import time
-from sign_py import getSign
-import requests
-import hashlib
-import os
-import json
-import base64
-
-
-class Pan123:
- def __init__(
- self,
- readfile=True,
- user_name="",
- pass_word="",
- authorization="",
- input_pwd=True,
- ):
- self.recycle_list = None
- self.list = None
- if readfile:
- self.read_ini(user_name, pass_word, input_pwd, authorization)
- else:
- if user_name == "" or pass_word == "":
- print("读取已禁用,用户名或密码为空")
- if input_pwd:
- user_name = input("请输入用户名:")
- pass_word = input("请输入密码:")
- else:
- raise Exception("用户名或密码为空:读取禁用时,userName和passWord不能为空")
- self.user_name = user_name
- self.password = pass_word
- self.authorization = authorization
- self.header_only_usage = {
- "user-agent": "Mozilla/5.0 (Windows NT 10.0) AppleWebKit/"
- "537.36 (KHTML, like Gecko) Chrome/109.0.0.0 "
- "Safari/537.36 Edg/109.0.1474.0",
- "app-version": "2",
- "platform": "web",
- }
- self.header_logined = {
- "Accept": "*/*",
- "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6",
- "App-Version": "3",
- "Authorization": self.authorization,
- "Cache-Control": "no-cache",
- "Connection": "keep-alive",
- "LoginUuid": "z-uk_yT8HwR4raGX1gqGk",
- "Pragma": "no-cache",
- "Referer": "https://www.123pan.com/",
- "Sec-Fetch-Dest": "empty",
- "Sec-Fetch-Mode": "cors",
- "Sec-Fetch-Site": "same-origin",
- "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/"
- "537.36 (KHTML, like Gecko) Chrome/119.0.0.0 "
- "Safari/537.36 Edg/119.0.0.0",
- "platform": "web",
- "sec-ch-ua": "^\\^Microsoft",
- "sec-ch-ua-mobile": "?0",
- "sec-ch-ua-platform": "^\\^Windows^^",
- }
- self.parent_file_id = 0 # 路径,文件夹的id,0为根目录
- self.parent_file_list = [0]
- res_code_getdir = self.get_dir()
- if res_code_getdir != 0:
- self.login()
- self.get_dir()
-
- def login(self):
- data = {"remember": True, "passport": self.user_name, "password": self.password}
- sign = getSign("/b/api/user/sign_in")
- login_res = requests.post(
- "https://www.123pan.com/b/api/user/sign_in",
- headers=self.header_only_usage,
- data=data,
- params={sign[0]: sign[1]}, timeout=10
- )
- res_sign = login_res.json()
- res_code_login = res_sign["code"]
- if res_code_login != 200:
- print("code = 1 Error:" + str(res_code_login))
- print(res_sign["message"])
- return res_code_login
- token = res_sign["data"]["token"]
- self.authorization = "Bearer " + token
- header_logined = {
- "Accept": "*/*",
- "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6",
- "App-Version": "3",
- "Authorization": self.authorization,
- "Cache-Control": "no-cache",
- "Connection": "keep-alive",
- "LoginUuid": "z-uk_yT8HwR4raGX1gqGk",
- "Pragma": "no-cache",
- "Referer": "https://www.123pan.com/",
- "Sec-Fetch-Dest": "empty",
- "Sec-Fetch-Mode": "cors",
- "Sec-Fetch-Site": "same-origin",
- "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/"
- "537.36 (KHTML, like"
- " Gecko) Chrome/119.0.0.0 Safari/537.36 Edg/119.0.0.0",
- "platform": "web",
- "sec-ch-ua": "^\\^Microsoft",
- "sec-ch-ua-mobile": "?0",
- "sec-ch-ua-platform": "^\\^Windows^^",
- }
- self.header_logined = header_logined
- # ret['cookie'] = cookie
- self.save_file()
- return res_code_login
-
- def save_file(self):
- with open("123pan.txt", "w",encoding="utf_8") as f:
- save_list = {
- "userName": self.user_name,
- "passWord": self.password,
- "authorization": self.authorization,
- }
-
- f.write(json.dumps(save_list))
- print("Save!")
-
- def get_dir(self):
- res_code_getdir = 0
- page = 1
- lists = []
- lenth_now = 0
- total = -1
- while lenth_now < total or total == -1:
- base_url = "https://www.123pan.com/b/api/file/list/new"
-
- # print(self.headerLogined)
- sign = getSign("/b/api/file/list/new")
- print(sign)
- params = {
- sign[0]: sign[1],
- "driveId": 0,
- "limit": 100,
- "next": 0,
- "orderBy": "file_id",
- "orderDirection": "desc",
- "parentFileId": str(self.parent_file_id),
- "trashed": False,
- "SearchData": "",
- "Page": str(page),
- "OnlyLookAbnormalFile": 0,
- }
-
- a = requests.get(base_url, headers=self.header_logined, params=params, timeout=10)
- # print(a.text)
- # print(a.headers)
- text = a.json()
- res_code_getdir = text["code"]
- if res_code_getdir != 0:
- print(a.text)
- print(a.headers)
- print("code = 2 Error:" + str(res_code_getdir))
- return res_code_getdir
- lists_page = text["data"]["InfoList"]
- lists += lists_page
- total = text["data"]["Total"]
- lenth_now += len(lists_page)
- page += 1
- file_num = 0
- for i in lists:
- i["FileNum"] = file_num
- file_num += 1
-
- self.list = lists
- return res_code_getdir
-
- def show(self):
- print("--------------------")
- for i in self.list:
- file_size = i["Size"]
- if file_size > 1048576:
- download_size_print = str(round(file_size / 1048576, 2)) + "M"
- else:
- download_size_print = str(round(file_size / 1024, 2)) + "K"
-
- if i["Type"] == 0:
- print(
- "\033[33m" + "编号:",
- self.list.index(i) + 1,
- "\033[0m \t\t" + download_size_print + "\t\t\033[36m",
- i["FileName"],
- "\033[0m",
- )
- elif i["Type"] == 1:
- print(
- "\033[35m" + "编号:",
- self.list.index(i) + 1,
- " \t\t\033[36m",
- i["FileName"],
- "\033[0m",
- )
-
- print("--------------------")
-
- # fileNumber 从0开始,0为第一个文件,传入时需要减一 !!!
- def link(self, file_number, showlink=True):
- file_detail = self.list[file_number]
- type_detail = file_detail["Type"]
- if type_detail == 1:
- down_request_url = "https://www.123pan.com/a/api/file/batch_download_info"
- down_request_data = {"fileIdList": [{"fileId": int(file_detail["FileId"])}]}
-
- else:
- down_request_url = "https://www.123pan.com/a/api/file/download_info"
- down_request_data = {
- "driveId": 0,
- "etag": file_detail["Etag"],
- "fileId": file_detail["FileId"],
- "s3keyFlag": file_detail["S3KeyFlag"],
- "type": file_detail["Type"],
- "fileName": file_detail["FileName"],
- "size": file_detail["Size"],
- }
- # print(down_request_data)
-
- sign = getSign("/a/api/file/download_info")
-
- link_res = requests.post(
- down_request_url,
- headers=self.header_logined,
- params={sign[0]: sign[1]},
- data=down_request_data,
- timeout=10
- )
- # print(linkRes.text)
- res_code_download = link_res.json()["code"]
- if res_code_download != 0:
- print("code = 3 Error:" + str(res_code_download))
- # print(linkRes.json())
- return res_code_download
- download_link_base64 = link_res.json()["data"]["DownloadUrl"]
- base64_url = re.findall("params=(.*)&", download_link_base64)[0]
- # print(Base64Url)
- down_load_url = base64.b64decode(base64_url)
- down_load_url = down_load_url.decode("utf-8")
-
- next_to_get = requests.get(down_load_url,timeout=10).json()
- redirect_url = next_to_get["data"]["redirect_url"]
- if showlink:
- print(redirect_url)
-
- return redirect_url
-
- def download(self, file_number):
- file_detail = self.list[file_number]
- down_load_url = self.link(file_number, showlink=False)
- file_name = file_detail["FileName"] # 文件名
- if os.path.exists(file_name):
- print("文件 " + file_name + " 已存在,是否要覆盖?")
- sure_download = input("输入1覆盖,2取消:")
- if sure_download != "1":
- return
- down = requests.get(down_load_url, stream=True, timeout=10)
-
- file_size = int(down.headers["Content-Length"]) # 文件大小
- content_size = int(file_size) # 文件总大小
- data_count = 0 # 当前已传输的大小
- if file_size > 1048576:
- size_print_download = str(round(file_size / 1048576, 2)) + "M"
- else:
- size_print_download = str(round(file_size / 1024, 2)) + "K"
- print(file_name + " " + size_print_download)
- time1 = time.time()
- time_temp = time1
- data_count_temp = 0
- with open(file_name, "wb") as f:
- for i in down.iter_content(1024):
- f.write(i)
- done_block = int((data_count / content_size) * 50)
- data_count = data_count + len(i)
- # 实时进度条进度
- now_jd = (data_count / content_size) * 100
- # %% 表示%
- # 测速
- time1 = time.time()
- pass_time = time1 - time_temp
- if pass_time > 1:
- time_temp = time1
- pass_data = int(data_count) - int(data_count_temp)
- data_count_temp = data_count
- speed = pass_data / int(pass_time)
- speed_m = speed / 1048576
- if speed_m > 1:
- speed_print = str(round(speed_m, 2)) + "M/S"
- else:
- speed_print = str(round(speed_m * 1024, 2)) + "K/S"
- print(
- "\r [%s%s] %d%% %s"
- % (
- done_block * "█",
- " " * (50 - 1 - done_block),
- now_jd,
- speed_print,
- ),
- end="",
- )
- elif data_count == content_size:
- print("\r [%s%s] %d%% %s" % (50 * "█", "", 100, ""), end="")
- print("\nok")
-
- def recycle(self):
- recycle_id = 0
- url = (
- "https://www.123pan.com/a/api/file/list/new?driveId=0&limit=100&next=0"
- "&orderBy=fileId&orderDirection=desc&parentFileId="
- + str(recycle_id)
- + "&trashed=true&&Page=1"
- )
- recycle_res = requests.get(url, headers=self.header_logined, timeout=10)
- json_recycle = recycle_res.json()
- recycle_list = json_recycle["data"]["InfoList"]
- self.recycle_list = recycle_list
-
- # fileNumber 从0开始,0为第一个文件,传入时需要减一 !!!
- def delete_file(self, file, by_num=True, operation=True):
- # operation = 'true' 删除 , operation = 'false' 恢复
- if by_num:
- print(file)
- if not str(file).isdigit():
- print("请输入数字")
- return -1
- if 0 <= file < len(self.list):
- file_detail = self.list[file]
- else:
- print("不在合理范围内")
- return
- else:
- if file in self.list:
- file_detail = file
- else:
- print("文件不存在")
- return
- data_delete = {
- "driveId": 0,
- "fileTrashInfoList": file_detail,
- "operation": operation,
- }
- delete_res = requests.post(
- "https://www.123pan.com/a/api/file/trash",
- data=json.dumps(data_delete),
- headers=self.header_logined,
- timeout=10
- )
- dele_json = delete_res.json()
- print(dele_json)
- message = dele_json["message"]
- print(message)
-
- def share(self):
- file_id_list = ""
- share_name_list = []
- add = "1"
- while str(add) == "1":
- share_num = input("分享文件的编号:")
- num_test2 = share_num.isdigit()
- if num_test2:
- share_num = int(share_num)
- if 0 < share_num < len(self.list) + 1:
- share_id = self.list[int(share_num) - 1]["FileId"]
- share_name = self.list[int(share_num) - 1]["FileName"]
- share_name_list.append(share_name)
- print(share_name_list)
- file_id_list = file_id_list + str(share_id) + ","
- add = input("输入1添加文件,0发起分享,其他取消")
- else:
- print("请输入数字,,")
- add = "1"
- if str(add) == "0":
- share_pwd = input("提取码,不设留空:")
- file_id_list = file_id_list.strip(",")
- data = {
- "driveId": 0,
- "expiration": "2024-02-09T11:42:45+08:00",
- "fileIdList": file_id_list,
- "shareName": "我的分享",
- "sharePwd": share_pwd,
- }
- share_res = requests.post(
- "https://www.123pan.com/a/api/share/create",
- headers=self.header_logined,
- data=json.dumps(data),
- timeout=10
- )
- share_res_json = share_res.json()
- message = share_res_json["message"]
- print(message)
- share_key = share_res_json["data"]["ShareKey"]
- share_url = "https://www.123pan.com/s/" + share_key
- print("分享链接:\n" + share_url + "提取码:" + share_pwd)
- else:
- print("退出分享")
-
- def up_load(self, file_path):
- file_path = file_path.replace('"', "")
- file_path = file_path.replace("\\", "/")
- file_name = file_path.split("/")[-1]
- print("文件名:", file_name)
- if not os.path.exists(file_path):
- print("文件不存在,请检查路径是否正确")
- return
- if os.path.isdir(file_path):
- print("暂不支持文件夹上传")
- return
- fsize = os.path.getsize(file_path)
- with open(file_path, "rb") as f:
- md5 = hashlib.md5()
- while True:
- data = f.read(64 * 1024)
- if not data:
- break
- md5.update(data)
- readable_hash = md5.hexdigest()
-
- list_up_request = {
- "driveId": 0,
- "etag": readable_hash,
- "fileName": file_name,
- "parentFileId": self.parent_file_id,
- "size": fsize,
- "type": 0,
- "duplicate": 0,
- }
-
- sign = getSign("/b/api/file/upload_request")
- up_res = requests.post(
- "https://www.123pan.com/b/api/file/upload_request",
- headers=self.header_logined,
- params={sign[0]: sign[1]},
- data=list_up_request,
- timeout=10
- )
- up_res_json = up_res.json()
- res_code_up = up_res_json["code"]
- if res_code_up == 5060:
- sure_upload = input("检测到1个同名文件,输入1覆盖,2保留两者,0取消:")
- if sure_upload == "1":
- list_up_request["duplicate"] = 1
-
- elif sure_upload == "2":
- list_up_request["duplicate"] = 2
- else:
- print("取消上传")
- return
- sign = getSign("/b/api/file/upload_request")
- up_res = requests.post(
- "https://www.123pan.com/b/api/file/upload_request",
- headers=self.header_logined,
- params={sign[0]: sign[1]},
- data=json.dumps(list_up_request),
- timeout=10
- )
- up_res_json = up_res.json()
- res_code_up = up_res_json["code"]
- if res_code_up == 0:
- # print(upResJson)
- # print("上传请求成功")
- reuse = up_res_json["data"]["Reuse"]
- if reuse:
- print("上传成功,文件已MD5复用")
- return
- else:
- print(up_res_json)
- print("上传请求失败")
- return
-
- bucket = up_res_json["data"]["Bucket"]
- storage_node = up_res_json["data"]["StorageNode"]
- upload_key = up_res_json["data"]["Key"]
- upload_id = up_res_json["data"]["UploadId"]
- up_file_id = up_res_json["data"]["FileId"] # 上传文件的fileId,完成上传后需要用到
- print("上传文件的fileId:", up_file_id)
-
- # 获取已将上传的分块
- start_data = {
- "bucket": bucket,
- "key": upload_key,
- "uploadId": upload_id,
- "storageNode": storage_node,
- }
- start_res = requests.post(
- "https://www.123pan.com/b/api/file/s3_list_upload_parts",
- headers=self.header_logined,
- data=json.dumps(start_data),
- timeout=10
- )
- start_res_json = start_res.json()
- res_code_up = start_res_json["code"]
- if res_code_up == 0:
- # print(startResJson)
- pass
- else:
- print(start_data)
- print(start_res_json)
-
- print("获取传输列表失败")
- return
-
- # 分块,每一块取一次链接,依次上传
- block_size = 5242880
- with open(file_path, "rb") as f:
- part_number_start = 1
- put_size = 0
- while True:
- data = f.read(block_size)
-
- precent = round(put_size / fsize, 2)
- print("\r已上传:" + str(precent * 100) + "%", end="")
- put_size = put_size + len(data)
-
- if not data:
- break
- get_link_data = {
- "bucket": bucket,
- "key": upload_key,
- "partNumberEnd": part_number_start + 1,
- "partNumberStart": part_number_start,
- "uploadId": upload_id,
- "StorageNode": storage_node,
- }
-
- get_link_url = (
- "https://www.123pan.com/b/api/file/s3_repare_upload_parts_batch"
- )
- get_link_res = requests.post(
- get_link_url,
- headers=self.header_logined,
- data=json.dumps(get_link_data),
- timeout=10
- )
- get_link_res_json = get_link_res.json()
- res_code_up = get_link_res_json["code"]
- if res_code_up == 0:
- # print("获取链接成功")
- pass
- else:
- print("获取链接失败")
- # print(getLinkResJson)
- return
- # print(getLinkResJson)
- upload_url = get_link_res_json["data"]["presignedUrls"][
- str(part_number_start)
- ]
- # print("上传链接",uploadUrl)
- requests.put(upload_url, data=data, timeout=10)
- # print("put")
-
- part_number_start = part_number_start + 1
-
- print("\n处理中")
- # 完成标志
- # 1.获取已上传的块
- uploaded_list_url = "https://www.123pan.com/b/api/file/s3_list_upload_parts"
- uploaded_comp_data = {
- "bucket": bucket,
- "key": upload_key,
- "uploadId": upload_id,
- "storageNode": storage_node,
- }
- # print(uploadedCompData)
- requests.post(
- uploaded_list_url,
- headers=self.header_logined,
- data=json.dumps(uploaded_comp_data),
- timeout=10
- )
- compmultipart_up_url = (
- "https://www.123pan.com/b/api/file/s3_complete_multipart_upload"
- )
- requests.post(
- compmultipart_up_url,
- headers=self.header_logined,
- data=json.dumps(uploaded_comp_data),
- timeout=10
- )
-
- # 3.报告完成上传,关闭upload session
- if fsize > 64 * 1024 * 1024:
- time.sleep(3)
- close_up_session_url = "https://www.123pan.com/b/api/file/upload_complete"
- close_up_session_data = {"fileId": up_file_id}
- # print(closeUpSessionData)
- close_up_session_res = requests.post(
- close_up_session_url,
- headers=self.header_logined,
- data=json.dumps(close_up_session_data),
- timeout=10
- )
- close_res_json = close_up_session_res.json()
- # print(closeResJson)
- res_code_up = close_res_json["code"]
- if res_code_up == 0:
- print("上传成功")
- else:
- print("上传失败")
- print(close_res_json)
- return
-
- # dirId 就是 fileNumber,从0开始,0为第一个文件,传入时需要减一 !!!(好像文件夹都排在前面)
- def cd(self, dir_num):
- if not dir_num.isdigit():
- if dir_num == "..":
- if len(self.parent_file_list) > 1:
- self.parent_file_list.pop()
- self.parent_file_id = self.parent_file_list[-1]
- self.get_dir()
- self.show()
- else:
- print("已经是根目录")
- return
- if dir_num == "/":
- self.parent_file_id = 0
- self.parent_file_list = [0]
- self.get_dir()
- self.show()
- return
- print("输入错误")
- return
- dir_num = int(dir_num) - 1
- if dir_num >= (len(self.list) - 1) or dir_num < 0:
- print("输入错误")
- return
- if self.list[dir_num]["Type"] != 1:
- print("不是文件夹")
- return
- self.parent_file_id = self.list[dir_num]["FileId"]
- self.parent_file_list.append(self.parent_file_id)
- self.get_dir()
- self.show()
-
- def cdById(self, file_id):
- self.parent_file_id = file_id
- self.parent_file_list.append(self.parent_file_id)
- self.get_dir()
- self.get_dir()
- self.show()
-
- def read_ini(
- self,
- user_name,
- pass_word,
- input_pwd,
- authorization="",
- ):
- try:
- with open("123pan.txt", "r",encoding="utf-8") as f:
- text = f.read()
- text = json.loads(text)
- user_name = text["userName"]
- pass_word = text["passWord"]
- authorization = text["authorization"]
-
- except:
- print("获取配置失败,重新登录")
-
- if user_name == "" or pass_word == "":
- if input_pwd:
- user_name = input("userName:")
- pass_word = input("passWord:")
- authorization = ""
- else:
- raise Exception("禁止输入模式下,没有账号或密码")
-
- self.user_name = user_name
- self.password = pass_word
- self.authorization = authorization
-
- def mkdir(self, dirname, remakedir=False):
- if not remakedir:
- for i in self.list:
- if i["FileName"] == dirname:
- print("文件夹已存在")
- return i["FileId"]
-
- url = "https://www.123pan.com/a/api/file/upload_request"
- data_mk = {
- "driveId": 0,
- "etag": "",
- "fileName": dirname,
- "parentFileId": self.parent_file_id,
- "size": 0,
- "type": 1,
- "duplicate": 1,
- "NotReuse": True,
- "event": "newCreateFolder",
- "operateType": 1,
- }
- sign = getSign("/a/api/file/upload_request")
- res_mk = requests.post(
- url,
- headers=self.header_logined,
- data=json.dumps(data_mk),
- params={sign[0]: sign[1]},
- timeout=10
- )
- try:
- res_json = res_mk.json()
- print(res_json)
- except json.decoder.JSONDecodeError:
- print("创建失败")
- print(res_mk.text)
- return
- code_mkdir = res_json["code"]
-
- if code_mkdir == 0:
- print("创建成功: ", res_json["data"]["FileId"])
- self.get_dir()
- return res_json["data"]["Info"]["FileId"]
- print("创建失败")
- print(res_json)
- return
-
-
-if __name__ == "__main__":
- print("web协议将废弃,请使用android协议")
- pan = Pan123(readfile=True, input_pwd=True)
- pan.show()
- while True:
- command = input("\033[91m >\033[0m")
- if command == "ls":
- pan.show()
- if command == "re":
- code = pan.get_dir()
- if code == 0:
- print("刷新目录成功")
- pan.show()
- if command.isdigit():
- if int(command) > len(pan.list) or int(command) < 1:
- print("输入错误")
- continue
- if pan.list[int(command) - 1]["Type"] == 1:
- pan.cdById(pan.list[int(command) - 1]["FileId"])
- else:
- size = pan.list[int(command) - 1]["Size"]
- if size > 1048576:
- size_print_show = str(round(size / 1048576, 2)) + "M"
- else:
- size_print_show = str(round(size / 1024, 2)) + "K"
- # print(pan.list[int(command) - 1])
- name = pan.list[int(command) - 1]["FileName"]
- print(name + " " + size_print_show)
- print("press 1 to download now: ", end="")
- sure = input()
- if sure == "1":
- pan.download(int(command) - 1)
- elif command[0:9] == "download ":
- if command[9:].isdigit():
- if int(command[9:]) > len(pan.list) or int(command[9:]) < 1:
- print("输入错误")
- continue
- pan.download(int(command[9:]) - 1)
- else:
- print("输入错误")
- elif command == "exit":
- break
- elif command == "log":
- pan.login()
- pan.get_dir()
- pan.show()
-
- elif command[0:5] == "link ":
- if command[5:].isdigit():
- if int(command[5:]) > len(pan.list) or int(command[5:]) < 1:
- print("输入错误")
- continue
- pan.link(int(command[5:]) - 1)
- else:
- print("输入错误")
- elif command == "upload":
- filepath = input("请输入文件路径:")
- pan.up_load(filepath)
- pan.get_dir()
- pan.show()
- elif command == "share":
- pan.share()
- elif command[0:6] == "delete":
- if command == "delete":
- print("请输入要删除的文件编号:", end="")
- fileNumber = input()
- else:
- if command[6] == " ":
- fileNumber = command[7:]
- else:
- print("输入错误")
- continue
- if fileNumber == "":
- print("请输入要删除的文件编号:", end="")
- fileNumber = input()
- else:
- fileNumber = fileNumber[0:]
- if fileNumber.isdigit():
- if int(fileNumber) > len(pan.list) or int(fileNumber) < 1:
- print("输入错误")
- continue
- pan.delete_file(int(fileNumber) - 1)
- pan.get_dir()
- pan.show()
- else:
- print("输入错误")
-
- elif command[:3] == "cd ":
- path = command[3:]
- pan.cd(path)
- elif command[0:5] == "mkdir":
- if command == "mkdir":
- newPath = input("请输入目录名:")
- else:
- newPath = command[6:]
- if newPath == "":
- newPath = input("请输入目录名:")
- else:
- newPath = newPath[0:]
- print(pan.mkdir(newPath))
-
- elif command == "reload":
- pan.read_ini("", "", True)
- print("读取成功")
- pan.get_dir()
- pan.show()