mirror of
https://github.com/Bao-qing/123pan.git
synced 2026-02-12 04:25:39 +08:00
456 lines
16 KiB
Python
456 lines
16 KiB
Python
"""
|
||
123pan 控制台交互界面 —— 仅负责用户 IO,所有业务调用 Pan123Core。
|
||
"""
|
||
|
||
import json
|
||
import os
|
||
import sys
|
||
from typing import Dict
|
||
|
||
from pan123_core import Pan123Core, Pan123Tool, Pan123EventType, format_size
|
||
|
||
|
||
# ──────────────── 颜色工具 ────────────────
|
||
|
||
class Color:
|
||
"""ANSI 颜色常量"""
|
||
RESET = "\033[0m"
|
||
RED = "\033[91m"
|
||
GREEN = "\033[92m"
|
||
YELLOW = "\033[93m"
|
||
PURPLE = "\033[35m"
|
||
CYAN = "\033[96m"
|
||
|
||
|
||
def colored(text: str, color: str) -> str:
|
||
return f"{color}{text}{Color.RESET}"
|
||
|
||
|
||
# ──────────────── CLI 类 ────────────────
|
||
|
||
class Pan123CLI:
|
||
"""控制台交互界面"""
|
||
|
||
HELP_TEXT = """可用命令:
|
||
ls - 显示当前目录
|
||
cd [编号|..|/] - 切换目录
|
||
mkdir [名称] - 创建目录
|
||
upload [路径] - 上传文件
|
||
rm [编号] - 删除文件
|
||
share [编号 ...] - 创建分享
|
||
link [编号] - 获取文件直链
|
||
download/d [编号] - 下载文件
|
||
recycle - 管理回收站
|
||
refresh/re - 刷新目录
|
||
reload - 重新加载配置并刷新
|
||
login - 登录
|
||
logout - 登出并清除 token
|
||
clearaccount - 清除已登录账号(包括用户名和密码)
|
||
more - 继续加载更多文件
|
||
protocol [android|web] - 切换协议
|
||
exit - 退出程序"""
|
||
|
||
def __init__(self, config_file: str = "123pan_config.json"):
|
||
self.config_file: str = config_file
|
||
self.core = Pan123Core()
|
||
self.tool = Pan123Tool(self.core)
|
||
self._download_mode: int = 0 # 0=询问, 3=全部覆盖, 4=全部跳过
|
||
|
||
# ──────────────── 启动 ────────────────
|
||
|
||
def run(self) -> None:
|
||
"""主入口"""
|
||
# Windows cmd 颜色支持
|
||
if os.name == "nt":
|
||
os.system("")
|
||
|
||
self._print_banner()
|
||
if not self._init_login():
|
||
print(colored("无法登录", Color.RED))
|
||
a = input("输入1重新输入账号和密码,输入2清除登录信息,其他键退出: ")
|
||
if a == "1":
|
||
user_name = input("请输入用户名: ")
|
||
password = input("请输入密码: ")
|
||
if not user_name or not password:
|
||
print("用户名和密码不能为空,程序退出")
|
||
return
|
||
self.core.load_config({
|
||
"userName": user_name,
|
||
"passWord": password,
|
||
"authorization": ""
|
||
})
|
||
self.save_config()
|
||
return self.run()
|
||
if a == "2":
|
||
self._do_clear_account()
|
||
return self.run()
|
||
return
|
||
|
||
self.save_config()
|
||
self.core.refresh() # 加载文件列表
|
||
self.core.get_user_info()
|
||
self._show_files()
|
||
|
||
while True:
|
||
try:
|
||
prompt = colored(f"{self.core.cwd_path}>", Color.RED) + " "
|
||
print(colored(f'用户:{self.core.nick_name}', Color.GREEN))
|
||
command = input(prompt).strip()
|
||
if not command:
|
||
continue
|
||
self._dispatch(command)
|
||
except KeyboardInterrupt:
|
||
print("\n操作已取消")
|
||
except EOFError:
|
||
break
|
||
except Exception as e:
|
||
print(colored(f"发生错误: {e}", Color.RED))
|
||
|
||
# ──────────────── 初始化 ────────────────
|
||
|
||
def _print_banner(self) -> None:
|
||
print("=" * 60)
|
||
print("123网盘CLI客户端".center(56))
|
||
print("=" * 60)
|
||
|
||
def _init_login(self) -> bool:
|
||
"""尝试加载配置 -> 尝试访问目录 -> 必要时登录"""
|
||
res = self.load_config()
|
||
r = self.core.init_login_state()
|
||
if r["code"] < 0:
|
||
print(colored("登录失败", Color.YELLOW))
|
||
print(r["message"])
|
||
return False
|
||
return True
|
||
|
||
def load_config(self) -> Dict:
|
||
"""加载配置"""
|
||
try:
|
||
with open(self.config_file, "r", encoding="utf-8") as f:
|
||
cfg = json.load(f)
|
||
except FileNotFoundError:
|
||
user_name = input("请输入用户名: ")
|
||
password = input("请输入密码: ")
|
||
cfg = {
|
||
"userName": user_name,
|
||
"passWord": password,
|
||
"authorization": ""
|
||
}
|
||
return self.core.load_config(cfg)
|
||
|
||
def save_config(self) -> None:
|
||
"""保存配置"""
|
||
cfg = self.core.get_current_config()
|
||
with open(self.config_file, "w", encoding="utf-8") as f:
|
||
json.dump(cfg, f, indent=2, ensure_ascii=False)
|
||
|
||
# ──────────────── 命令分发 ────────────────
|
||
|
||
def _dispatch(self, command: str) -> None:
|
||
parts = command.split(maxsplit=1)
|
||
cmd = parts[0].lower()
|
||
arg = parts[1].strip() if len(parts) > 1 else ""
|
||
|
||
handler = {
|
||
"ls": lambda: self._show_files(),
|
||
"login": lambda: self._do_login(),
|
||
"logout": lambda: self._do_logout(),
|
||
"clearaccount": lambda: self._do_clear_account(),
|
||
"exit": lambda: sys.exit(0),
|
||
"cd": lambda: self._do_cd(arg),
|
||
"mkdir": lambda: self._do_mkdir(arg),
|
||
"upload": lambda: self._do_upload(arg),
|
||
"rm": lambda: self._do_rm(arg),
|
||
"share": lambda: self._do_share(arg),
|
||
"more": lambda: self._do_more(),
|
||
"link": lambda: self._do_link(arg),
|
||
"download": lambda: self._do_download(arg),
|
||
"d": lambda: self._do_download(arg),
|
||
"recycle": lambda: self._do_recycle(),
|
||
"refresh": lambda: self._do_refresh(),
|
||
"re": lambda: self._do_refresh(),
|
||
"reload": lambda: self._do_reload(),
|
||
"protocol": lambda: self._do_protocol(arg),
|
||
"help": lambda: print(self.HELP_TEXT),
|
||
}.get(cmd)
|
||
|
||
if handler:
|
||
handler()
|
||
elif cmd.isdigit():
|
||
self._do_select(int(cmd))
|
||
else:
|
||
print(self.HELP_TEXT)
|
||
|
||
# ──────────────── 显示 ────────────────
|
||
|
||
def _show_files(self) -> None:
|
||
items = self.core.file_list
|
||
if not items:
|
||
print("当前目录为空")
|
||
return
|
||
print()
|
||
print("=" * 60)
|
||
print(f"当前路径: {self.core.cwd_path}")
|
||
print("-" * 60)
|
||
print(f"{'编号':<6}{'类型':<8}{'大小':<12}{'名称'}")
|
||
print("-" * 60)
|
||
for idx, item in enumerate(items, 1):
|
||
is_dir = item["Type"] == 1
|
||
type_str = "文件夹" if is_dir else "文件"
|
||
size_str = format_size(item["Size"])
|
||
color = Color.PURPLE if is_dir else Color.YELLOW
|
||
print(colored(f"{idx:<6}{type_str:<8}{size_str:<12}{item['FileName']}", color))
|
||
if not self.core.all_loaded:
|
||
remaining = self.core.file_total - len(items)
|
||
print(f"\n还有 {remaining} 个文件未加载,输入 'more' 继续加载")
|
||
print("=" * 60 + "\n")
|
||
|
||
def _print_result(self, r: dict) -> None:
|
||
"""打印 Result 消息"""
|
||
if r["code"] == 0:
|
||
print(colored(r["message"], Color.GREEN))
|
||
else:
|
||
print(colored(f"[错误 {r['code']}] {r['message']}", Color.RED))
|
||
|
||
# ──────────────── 命令实现 ────────────────
|
||
|
||
def _do_login(self) -> None:
|
||
if not self.core.user_name:
|
||
self.core.user_name = input("请输入用户名: ")
|
||
if not self.core.password:
|
||
self.core.password = input("请输入密码: ")
|
||
r = self.core.login()
|
||
self._print_result(r)
|
||
if r["code"] == 0:
|
||
self._do_refresh()
|
||
|
||
def _do_logout(self) -> None:
|
||
r = self.core.logout()
|
||
self.save_config()
|
||
self._print_result(r)
|
||
|
||
def _do_clear_account(self) -> None:
|
||
"""清除已登录账号:清除用户名、密码、token 等信息"""
|
||
confirm = input("确定要清除已登录账号信息吗?(y/N): ").strip().lower()
|
||
if confirm == 'y':
|
||
r = self.core.clear_account()
|
||
self.save_config()
|
||
self._print_result(r)
|
||
else:
|
||
print("操作已取消")
|
||
|
||
def _do_cd(self, arg: str) -> None:
|
||
if arg == "..":
|
||
r = self.core.cd_up()
|
||
elif arg == "/":
|
||
r = self.core.cd_root()
|
||
elif arg.isdigit():
|
||
r = self.core.cd(int(arg) - 1)
|
||
else:
|
||
print("用法: cd [编号|..|/]")
|
||
return
|
||
if r["code"] != 0:
|
||
self._print_result(r)
|
||
else:
|
||
self._show_files()
|
||
|
||
def _do_mkdir(self, name: str) -> None:
|
||
if not name:
|
||
name = input("请输入目录名: ")
|
||
r = self.core.mkdir(name)
|
||
self._print_result(r)
|
||
if r["code"] == 0:
|
||
self._do_refresh()
|
||
|
||
def _do_upload(self, path: str) -> None:
|
||
if not path:
|
||
path = input("请输入文件路径: ")
|
||
r = self.core.upload_file(path, on_progress=self._upload_progress)
|
||
if r["code"] == 5060:
|
||
choice = input("检测到同名文件,输入 1 覆盖,2 保留两者,其他取消: ")
|
||
if choice == "1":
|
||
r = self.core.upload_file(path, duplicate=1, on_progress=self._upload_progress)
|
||
elif choice == "2":
|
||
r = self.core.upload_file(path, duplicate=2, on_progress=self._upload_progress)
|
||
else:
|
||
print("上传取消")
|
||
return
|
||
print() # 换行
|
||
self._print_result(r)
|
||
if r["code"] == 0:
|
||
self._do_refresh()
|
||
|
||
def _do_rm(self, arg: str) -> None:
|
||
if not arg.isdigit():
|
||
print("请提供文件编号")
|
||
return
|
||
r = self.core.trash_by_index(int(arg) - 1)
|
||
self._print_result(r)
|
||
if r["code"] == 0:
|
||
self._do_refresh()
|
||
|
||
def _do_share(self, arg: str) -> None:
|
||
indices = [int(x) - 1 for x in arg.split() if x.isdigit()]
|
||
if not indices:
|
||
print("请提供文件编号")
|
||
return
|
||
# 显示待分享的文件
|
||
names = [self.core.file_list[i]["FileName"] for i in indices if 0 <= i < len(self.core.file_list)]
|
||
print("分享文件:", ", ".join(names))
|
||
pwd = input("输入提取码(留空跳过): ").strip()
|
||
r = self.core.share_by_indices(indices, pwd)
|
||
self._print_result(r)
|
||
if r["code"] == 0:
|
||
print(f"链接: {r['data']['share_url']}")
|
||
if r["data"]["share_pwd"]:
|
||
print(f"提取码: {r['data']['share_pwd']}")
|
||
|
||
def _do_more(self) -> None:
|
||
r = self.core.load_more()
|
||
if r["code"] != 0:
|
||
self._print_result(r)
|
||
else:
|
||
self._show_files()
|
||
|
||
def _do_link(self, arg: str) -> None:
|
||
if not arg.isdigit():
|
||
print("请提供文件编号")
|
||
return
|
||
r = self.core.get_download_url(int(arg) - 1)
|
||
if r["code"] == 0:
|
||
print(f"文件直链: \n{r['data']['url']}")
|
||
else:
|
||
self._print_result(r)
|
||
|
||
def _do_download(self, arg: str) -> None:
|
||
if not arg.isdigit():
|
||
print("请提供文件编号")
|
||
return
|
||
idx = int(arg) - 1
|
||
if not (0 <= idx < len(self.core.file_list)):
|
||
print("无效的文件编号")
|
||
return
|
||
item = self.core.file_list[idx]
|
||
print(f"开始下载: {item['FileName']}")
|
||
|
||
overwrite = self._download_mode == 3
|
||
skip = self._download_mode == 4
|
||
r = self.tool.download_file(
|
||
idx,
|
||
on_progress=self._download_progress,
|
||
overwrite=overwrite,
|
||
skip_existing=skip,
|
||
)
|
||
# 冲突处理
|
||
if r["code"] == 1 and r.get("data", {}).get("conflict"):
|
||
print(f"文件已存在: {item['FileName']}")
|
||
choice = input("输入 1 覆盖,2 跳过,3 全部覆盖,4 全部跳过: ").strip()
|
||
if choice == "4":
|
||
self._download_mode = 4
|
||
print("跳过下载")
|
||
return
|
||
elif choice == "2":
|
||
print("跳过下载")
|
||
return
|
||
elif choice == "3":
|
||
self._download_mode = 3
|
||
r = self.tool.download_file(idx, on_progress=self._download_progress, overwrite=True)
|
||
|
||
print() # 换行
|
||
self._print_result(r)
|
||
|
||
def _do_recycle(self) -> None:
|
||
r = self.core.list_recycle()
|
||
if r["code"] != 0:
|
||
self._print_result(r)
|
||
return
|
||
items = r["data"]
|
||
if not items:
|
||
print("回收站为空")
|
||
return
|
||
print("\n回收站内容:")
|
||
for i, item in enumerate(items, 1):
|
||
print(f" {i}. {item['FileName']} ({format_size(item['Size'])})")
|
||
action = input("\n输入编号恢复文件,或输入 'clear' 清空回收站: ").strip()
|
||
if action.isdigit():
|
||
idx = int(action) - 1
|
||
if 0 <= idx < len(items):
|
||
rr = self.core.restore(items[idx]["FileId"])
|
||
self._print_result(rr)
|
||
else:
|
||
print("无效编号")
|
||
elif action == "clear":
|
||
for item in items:
|
||
self.core.trash(item, delete=True)
|
||
print("回收站已清空")
|
||
self._do_refresh()
|
||
|
||
def _do_refresh(self) -> None:
|
||
self._download_mode = 0
|
||
r = self.core.refresh()
|
||
if r["code"] != 0:
|
||
self._print_result(r)
|
||
else:
|
||
self._show_files()
|
||
|
||
def _do_reload(self) -> None:
|
||
r = self.core.load_config()
|
||
self._print_result(r)
|
||
self._do_refresh()
|
||
|
||
def _do_protocol(self, arg: str) -> None:
|
||
if arg.lower() not in ("android", "web"):
|
||
print("请指定协议: android 或 web")
|
||
return
|
||
r = self.core.set_protocol(arg.lower())
|
||
self._print_result(r)
|
||
if r["code"] == 0:
|
||
self._do_refresh()
|
||
|
||
def _do_select(self, num: int) -> None:
|
||
"""数字选择:文件夹进入,文件下载"""
|
||
idx = num - 1
|
||
if not (0 <= idx < len(self.core.file_list)):
|
||
print("无效的文件编号")
|
||
return
|
||
if self.core.file_list[idx]["Type"] == 1:
|
||
self._do_cd(str(num))
|
||
else:
|
||
self._do_download(str(num))
|
||
|
||
# ──────────────── 进度回调 ────────────────
|
||
|
||
@staticmethod
|
||
def _download_progress(data) -> None:
|
||
if data.get("type") == Pan123EventType.DOWNLOAD_PROGRESS:
|
||
downloaded = data.get("downloaded", 0)
|
||
total = data.get("total", 0)
|
||
speed = data.get("speed", 0)
|
||
if total > 0:
|
||
pct = downloaded / total * 100
|
||
print(
|
||
f"\r进度: {pct:.1f}% | {format_size(downloaded)}/{format_size(total)} | {format_size(int(speed))}/s",
|
||
end=" ",
|
||
flush=True,
|
||
)
|
||
elif data.get("type") == Pan123EventType.DOWNLOAD_START_FILE:
|
||
print(f"开始下载: {data.get('file_name', '未知文件')} ({format_size(data.get('file_size', 0))})")
|
||
elif data.get("type") == Pan123EventType.DOWNLOAD_START_DIRECTORY:
|
||
print(f"开始下载目录: {data.get('dir_name', '未知目录')}")
|
||
else:
|
||
print(json.dumps(data, indent=2))
|
||
|
||
@staticmethod
|
||
def _upload_progress(data) -> None:
|
||
uploaded = data.get("uploaded", 0)
|
||
total = data.get("total", 0)
|
||
if total > 0:
|
||
pct = uploaded / total * 100
|
||
print(f"\r上传进度: {pct:.1f}%", end="", flush=True)
|
||
|
||
|
||
# ──────────────── 入口 ────────────────
|
||
|
||
if __name__ == "__main__":
|
||
Pan123CLI().run()
|