""" 123pan 控制台交互界面 —— 仅负责用户 IO,所有业务调用 Pan123Core。 """ import json import os import sys from typing import Dict from pan123_core import Pan123Core, Pan123Tool, Pan123EventType, format_size # ──────────────── 颜色工具 ──────────────── class Color: """ANSI 颜色常量""" RESET = "\033[0m" RED = "\033[91m" GREEN = "\033[92m" YELLOW = "\033[93m" PURPLE = "\033[35m" CYAN = "\033[96m" def colored(text: str, color: str) -> str: return f"{color}{text}{Color.RESET}" # ──────────────── CLI 类 ──────────────── class Pan123CLI: """控制台交互界面""" HELP_TEXT = """可用命令: ls - 显示当前目录 cd [编号|..|/] - 切换目录 mkdir [名称] - 创建目录 upload [路径] - 上传文件 rm [编号] - 删除文件 share [编号 ...] - 创建分享 link [编号] - 获取文件直链 download/d [编号] - 下载文件 recycle - 管理回收站 refresh/re - 刷新目录 reload - 重新加载配置并刷新 login - 登录 logout - 登出并清除 token clearaccount - 清除已登录账号(包括用户名和密码) more - 继续加载更多文件 protocol [android|web] - 切换协议 exit - 退出程序""" def __init__(self, config_file: str = "123pan_config.json"): self.config_file: str = config_file self.core = Pan123Core() self.tool = Pan123Tool(self.core) self._download_mode: int = 0 # 0=询问, 3=全部覆盖, 4=全部跳过 # ──────────────── 启动 ──────────────── def run(self) -> None: """主入口""" # Windows cmd 颜色支持 if os.name == "nt": os.system("") self._print_banner() if not self._init_login(): print(colored("无法登录", Color.RED)) a = input("输入1重新输入账号和密码,输入2清除登录信息,其他键退出: ") if a == "1": user_name = input("请输入用户名: ") password = input("请输入密码: ") if not user_name or not password: print("用户名和密码不能为空,程序退出") return self.core.load_config({ "userName": user_name, "passWord": password, "authorization": "" }) self.save_config() return self.run() if a == "2": self._do_clear_account() return self.run() return self.save_config() self.core.refresh() # 加载文件列表 self.core.get_user_info() self._show_files() while True: try: prompt = colored(f"{self.core.cwd_path}>", Color.RED) + " " print(colored(f'用户:{self.core.nick_name}', Color.GREEN)) command = input(prompt).strip() if not command: continue self._dispatch(command) except KeyboardInterrupt: print("\n操作已取消") except EOFError: break except Exception as e: print(colored(f"发生错误: {e}", Color.RED)) # ──────────────── 初始化 ──────────────── def _print_banner(self) -> None: print("=" * 60) print("123网盘CLI客户端".center(56)) print("=" * 60) def _init_login(self) -> bool: """尝试加载配置 -> 尝试访问目录 -> 必要时登录""" res = self.load_config() r = self.core.init_login_state() if r["code"] < 0: print(colored("登录失败", Color.YELLOW)) print(r["message"]) return False return True def load_config(self) -> Dict: """加载配置""" try: with open(self.config_file, "r", encoding="utf-8") as f: cfg = json.load(f) except FileNotFoundError: user_name = input("请输入用户名: ") password = input("请输入密码: ") cfg = { "userName": user_name, "passWord": password, "authorization": "" } return self.core.load_config(cfg) def save_config(self) -> None: """保存配置""" cfg = self.core.get_current_config() with open(self.config_file, "w", encoding="utf-8") as f: json.dump(cfg, f, indent=2, ensure_ascii=False) # ──────────────── 命令分发 ──────────────── def _dispatch(self, command: str) -> None: parts = command.split(maxsplit=1) cmd = parts[0].lower() arg = parts[1].strip() if len(parts) > 1 else "" handler = { "ls": lambda: self._show_files(), "login": lambda: self._do_login(), "logout": lambda: self._do_logout(), "clearaccount": lambda: self._do_clear_account(), "exit": lambda: sys.exit(0), "cd": lambda: self._do_cd(arg), "mkdir": lambda: self._do_mkdir(arg), "upload": lambda: self._do_upload(arg), "rm": lambda: self._do_rm(arg), "share": lambda: self._do_share(arg), "more": lambda: self._do_more(), "link": lambda: self._do_link(arg), "download": lambda: self._do_download(arg), "d": lambda: self._do_download(arg), "recycle": lambda: self._do_recycle(), "refresh": lambda: self._do_refresh(), "re": lambda: self._do_refresh(), "reload": lambda: self._do_reload(), "protocol": lambda: self._do_protocol(arg), "help": lambda: print(self.HELP_TEXT), }.get(cmd) if handler: handler() elif cmd.isdigit(): self._do_select(int(cmd)) else: print(self.HELP_TEXT) # ──────────────── 显示 ──────────────── def _show_files(self) -> None: items = self.core.file_list if not items: print("当前目录为空") return print() print("=" * 60) print(f"当前路径: {self.core.cwd_path}") print("-" * 60) print(f"{'编号':<6}{'类型':<8}{'大小':<12}{'名称'}") print("-" * 60) for idx, item in enumerate(items, 1): is_dir = item["Type"] == 1 type_str = "文件夹" if is_dir else "文件" size_str = format_size(item["Size"]) color = Color.PURPLE if is_dir else Color.YELLOW print(colored(f"{idx:<6}{type_str:<8}{size_str:<12}{item['FileName']}", color)) if not self.core.all_loaded: remaining = self.core.file_total - len(items) print(f"\n还有 {remaining} 个文件未加载,输入 'more' 继续加载") print("=" * 60 + "\n") def _print_result(self, r: dict) -> None: """打印 Result 消息""" if r["code"] == 0: print(colored(r["message"], Color.GREEN)) else: print(colored(f"[错误 {r['code']}] {r['message']}", Color.RED)) # ──────────────── 命令实现 ──────────────── def _do_login(self) -> None: if not self.core.user_name: self.core.user_name = input("请输入用户名: ") if not self.core.password: self.core.password = input("请输入密码: ") r = self.core.login() self._print_result(r) if r["code"] == 0: self._do_refresh() def _do_logout(self) -> None: r = self.core.logout() self.save_config() self._print_result(r) def _do_clear_account(self) -> None: """清除已登录账号:清除用户名、密码、token 等信息""" confirm = input("确定要清除已登录账号信息吗?(y/N): ").strip().lower() if confirm == 'y': r = self.core.clear_account() self.save_config() self._print_result(r) else: print("操作已取消") def _do_cd(self, arg: str) -> None: if arg == "..": r = self.core.cd_up() elif arg == "/": r = self.core.cd_root() elif arg.isdigit(): r = self.core.cd(int(arg) - 1) else: print("用法: cd [编号|..|/]") return if r["code"] != 0: self._print_result(r) else: self._show_files() def _do_mkdir(self, name: str) -> None: if not name: name = input("请输入目录名: ") r = self.core.mkdir(name) self._print_result(r) if r["code"] == 0: self._do_refresh() def _do_upload(self, path: str) -> None: if not path: path = input("请输入文件路径: ") r = self.core.upload_file(path, on_progress=self._upload_progress) if r["code"] == 5060: choice = input("检测到同名文件,输入 1 覆盖,2 保留两者,其他取消: ") if choice == "1": r = self.core.upload_file(path, duplicate=1, on_progress=self._upload_progress) elif choice == "2": r = self.core.upload_file(path, duplicate=2, on_progress=self._upload_progress) else: print("上传取消") return print() # 换行 self._print_result(r) if r["code"] == 0: self._do_refresh() def _do_rm(self, arg: str) -> None: if not arg.isdigit(): print("请提供文件编号") return r = self.core.trash_by_index(int(arg) - 1) self._print_result(r) if r["code"] == 0: self._do_refresh() def _do_share(self, arg: str) -> None: indices = [int(x) - 1 for x in arg.split() if x.isdigit()] if not indices: print("请提供文件编号") return # 显示待分享的文件 names = [self.core.file_list[i]["FileName"] for i in indices if 0 <= i < len(self.core.file_list)] print("分享文件:", ", ".join(names)) pwd = input("输入提取码(留空跳过): ").strip() r = self.core.share_by_indices(indices, pwd) self._print_result(r) if r["code"] == 0: print(f"链接: {r['data']['share_url']}") if r["data"]["share_pwd"]: print(f"提取码: {r['data']['share_pwd']}") def _do_more(self) -> None: r = self.core.load_more() if r["code"] != 0: self._print_result(r) else: self._show_files() def _do_link(self, arg: str) -> None: if not arg.isdigit(): print("请提供文件编号") return r = self.core.get_download_url(int(arg) - 1) if r["code"] == 0: print(f"文件直链: \n{r['data']['url']}") else: self._print_result(r) def _do_download(self, arg: str) -> None: if not arg.isdigit(): print("请提供文件编号") return idx = int(arg) - 1 if not (0 <= idx < len(self.core.file_list)): print("无效的文件编号") return item = self.core.file_list[idx] print(f"开始下载: {item['FileName']}") overwrite = self._download_mode == 3 skip = self._download_mode == 4 r = self.tool.download_file( idx, on_progress=self._download_progress, overwrite=overwrite, skip_existing=skip, ) # 冲突处理 if r["code"] == 1 and r.get("data", {}).get("conflict"): print(f"文件已存在: {item['FileName']}") choice = input("输入 1 覆盖,2 跳过,3 全部覆盖,4 全部跳过: ").strip() if choice == "4": self._download_mode = 4 print("跳过下载") return elif choice == "2": print("跳过下载") return elif choice == "3": self._download_mode = 3 r = self.tool.download_file(idx, on_progress=self._download_progress, overwrite=True) print() # 换行 self._print_result(r) def _do_recycle(self) -> None: r = self.core.list_recycle() if r["code"] != 0: self._print_result(r) return items = r["data"] if not items: print("回收站为空") return print("\n回收站内容:") for i, item in enumerate(items, 1): print(f" {i}. {item['FileName']} ({format_size(item['Size'])})") action = input("\n输入编号恢复文件,或输入 'clear' 清空回收站: ").strip() if action.isdigit(): idx = int(action) - 1 if 0 <= idx < len(items): rr = self.core.restore(items[idx]["FileId"]) self._print_result(rr) else: print("无效编号") elif action == "clear": for item in items: self.core.trash(item, delete=True) print("回收站已清空") self._do_refresh() def _do_refresh(self) -> None: self._download_mode = 0 r = self.core.refresh() if r["code"] != 0: self._print_result(r) else: self._show_files() def _do_reload(self) -> None: r = self.core.load_config() self._print_result(r) self._do_refresh() def _do_protocol(self, arg: str) -> None: if arg.lower() not in ("android", "web"): print("请指定协议: android 或 web") return r = self.core.set_protocol(arg.lower()) self._print_result(r) if r["code"] == 0: self._do_refresh() def _do_select(self, num: int) -> None: """数字选择:文件夹进入,文件下载""" idx = num - 1 if not (0 <= idx < len(self.core.file_list)): print("无效的文件编号") return if self.core.file_list[idx]["Type"] == 1: self._do_cd(str(num)) else: self._do_download(str(num)) # ──────────────── 进度回调 ──────────────── @staticmethod def _download_progress(data) -> None: if data.get("type") == Pan123EventType.DOWNLOAD_PROGRESS: downloaded = data.get("downloaded", 0) total = data.get("total", 0) speed = data.get("speed", 0) if total > 0: pct = downloaded / total * 100 print( f"\r进度: {pct:.1f}% | {format_size(downloaded)}/{format_size(total)} | {format_size(int(speed))}/s", end=" ", flush=True, ) elif data.get("type") == Pan123EventType.DOWNLOAD_START_FILE: print(f"开始下载: {data.get('file_name', '未知文件')} ({format_size(data.get('file_size', 0))})") elif data.get("type") == Pan123EventType.DOWNLOAD_START_DIRECTORY: print(f"开始下载目录: {data.get('dir_name', '未知目录')}") else: print(json.dumps(data, indent=2)) @staticmethod def _upload_progress(data) -> None: uploaded = data.get("uploaded", 0) total = data.get("total", 0) if total > 0: pct = uploaded / total * 100 print(f"\r上传进度: {pct:.1f}%", end="", flush=True) # ──────────────── 入口 ──────────────── if __name__ == "__main__": Pan123CLI().run()