123pan/pan123_cli.py

456 lines
16 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
123pan 控制台交互界面 —— 仅负责用户 IO所有业务调用 Pan123Core。
"""
import json
import os
import sys
from typing import Dict
from pan123_core import Pan123Core, Pan123Tool, Pan123EventType, format_size
# ──────────────── 颜色工具 ────────────────
class Color:
"""ANSI 颜色常量"""
RESET = "\033[0m"
RED = "\033[91m"
GREEN = "\033[92m"
YELLOW = "\033[93m"
PURPLE = "\033[35m"
CYAN = "\033[96m"
def colored(text: str, color: str) -> str:
return f"{color}{text}{Color.RESET}"
# ──────────────── CLI 类 ────────────────
class Pan123CLI:
"""控制台交互界面"""
HELP_TEXT = """可用命令:
ls - 显示当前目录
cd [编号|..|/] - 切换目录
mkdir [名称] - 创建目录
upload [路径] - 上传文件
rm [编号] - 删除文件
share [编号 ...] - 创建分享
link [编号] - 获取文件直链
download/d [编号] - 下载文件
recycle - 管理回收站
refresh/re - 刷新目录
reload - 重新加载配置并刷新
login - 登录
logout - 登出并清除 token
clearaccount - 清除已登录账号(包括用户名和密码)
more - 继续加载更多文件
protocol [android|web] - 切换协议
exit - 退出程序"""
def __init__(self, config_file: str = "123pan_config.json"):
self.config_file: str = config_file
self.core = Pan123Core()
self.tool = Pan123Tool(self.core)
self._download_mode: int = 0 # 0=询问, 3=全部覆盖, 4=全部跳过
# ──────────────── 启动 ────────────────
def run(self) -> None:
"""主入口"""
# Windows cmd 颜色支持
if os.name == "nt":
os.system("")
self._print_banner()
if not self._init_login():
print(colored("无法登录", Color.RED))
a = input("输入1重新输入账号和密码输入2清除登录信息其他键退出: ")
if a == "1":
user_name = input("请输入用户名: ")
password = input("请输入密码: ")
if not user_name or not password:
print("用户名和密码不能为空,程序退出")
return
self.core.load_config({
"userName": user_name,
"passWord": password,
"authorization": ""
})
self.save_config()
return self.run()
if a == "2":
self._do_clear_account()
return self.run()
return
self.save_config()
self.core.refresh() # 加载文件列表
self.core.get_user_info()
self._show_files()
while True:
try:
prompt = colored(f"{self.core.cwd_path}>", Color.RED) + " "
print(colored(f'用户:{self.core.nick_name}', Color.GREEN))
command = input(prompt).strip()
if not command:
continue
self._dispatch(command)
except KeyboardInterrupt:
print("\n操作已取消")
except EOFError:
break
except Exception as e:
print(colored(f"发生错误: {e}", Color.RED))
# ──────────────── 初始化 ────────────────
def _print_banner(self) -> None:
print("=" * 60)
print("123网盘CLI客户端".center(56))
print("=" * 60)
def _init_login(self) -> bool:
"""尝试加载配置 -> 尝试访问目录 -> 必要时登录"""
res = self.load_config()
r = self.core.init_login_state()
if r["code"] < 0:
print(colored("登录失败", Color.YELLOW))
print(r["message"])
return False
return True
def load_config(self) -> Dict:
"""加载配置"""
try:
with open(self.config_file, "r", encoding="utf-8") as f:
cfg = json.load(f)
except FileNotFoundError:
user_name = input("请输入用户名: ")
password = input("请输入密码: ")
cfg = {
"userName": user_name,
"passWord": password,
"authorization": ""
}
return self.core.load_config(cfg)
def save_config(self) -> None:
"""保存配置"""
cfg = self.core.get_current_config()
with open(self.config_file, "w", encoding="utf-8") as f:
json.dump(cfg, f, indent=2, ensure_ascii=False)
# ──────────────── 命令分发 ────────────────
def _dispatch(self, command: str) -> None:
parts = command.split(maxsplit=1)
cmd = parts[0].lower()
arg = parts[1].strip() if len(parts) > 1 else ""
handler = {
"ls": lambda: self._show_files(),
"login": lambda: self._do_login(),
"logout": lambda: self._do_logout(),
"clearaccount": lambda: self._do_clear_account(),
"exit": lambda: sys.exit(0),
"cd": lambda: self._do_cd(arg),
"mkdir": lambda: self._do_mkdir(arg),
"upload": lambda: self._do_upload(arg),
"rm": lambda: self._do_rm(arg),
"share": lambda: self._do_share(arg),
"more": lambda: self._do_more(),
"link": lambda: self._do_link(arg),
"download": lambda: self._do_download(arg),
"d": lambda: self._do_download(arg),
"recycle": lambda: self._do_recycle(),
"refresh": lambda: self._do_refresh(),
"re": lambda: self._do_refresh(),
"reload": lambda: self._do_reload(),
"protocol": lambda: self._do_protocol(arg),
"help": lambda: print(self.HELP_TEXT),
}.get(cmd)
if handler:
handler()
elif cmd.isdigit():
self._do_select(int(cmd))
else:
print(self.HELP_TEXT)
# ──────────────── 显示 ────────────────
def _show_files(self) -> None:
items = self.core.file_list
if not items:
print("当前目录为空")
return
print()
print("=" * 60)
print(f"当前路径: {self.core.cwd_path}")
print("-" * 60)
print(f"{'编号':<6}{'类型':<8}{'大小':<12}{'名称'}")
print("-" * 60)
for idx, item in enumerate(items, 1):
is_dir = item["Type"] == 1
type_str = "文件夹" if is_dir else "文件"
size_str = format_size(item["Size"])
color = Color.PURPLE if is_dir else Color.YELLOW
print(colored(f"{idx:<6}{type_str:<8}{size_str:<12}{item['FileName']}", color))
if not self.core.all_loaded:
remaining = self.core.file_total - len(items)
print(f"\n还有 {remaining} 个文件未加载,输入 'more' 继续加载")
print("=" * 60 + "\n")
def _print_result(self, r: dict) -> None:
"""打印 Result 消息"""
if r["code"] == 0:
print(colored(r["message"], Color.GREEN))
else:
print(colored(f"[错误 {r['code']}] {r['message']}", Color.RED))
# ──────────────── 命令实现 ────────────────
def _do_login(self) -> None:
if not self.core.user_name:
self.core.user_name = input("请输入用户名: ")
if not self.core.password:
self.core.password = input("请输入密码: ")
r = self.core.login()
self._print_result(r)
if r["code"] == 0:
self._do_refresh()
def _do_logout(self) -> None:
r = self.core.logout()
self.save_config()
self._print_result(r)
def _do_clear_account(self) -> None:
"""清除已登录账号清除用户名、密码、token 等信息"""
confirm = input("确定要清除已登录账号信息吗?(y/N): ").strip().lower()
if confirm == 'y':
r = self.core.clear_account()
self.save_config()
self._print_result(r)
else:
print("操作已取消")
def _do_cd(self, arg: str) -> None:
if arg == "..":
r = self.core.cd_up()
elif arg == "/":
r = self.core.cd_root()
elif arg.isdigit():
r = self.core.cd(int(arg) - 1)
else:
print("用法: cd [编号|..|/]")
return
if r["code"] != 0:
self._print_result(r)
else:
self._show_files()
def _do_mkdir(self, name: str) -> None:
if not name:
name = input("请输入目录名: ")
r = self.core.mkdir(name)
self._print_result(r)
if r["code"] == 0:
self._do_refresh()
def _do_upload(self, path: str) -> None:
if not path:
path = input("请输入文件路径: ")
r = self.core.upload_file(path, on_progress=self._upload_progress)
if r["code"] == 5060:
choice = input("检测到同名文件,输入 1 覆盖2 保留两者,其他取消: ")
if choice == "1":
r = self.core.upload_file(path, duplicate=1, on_progress=self._upload_progress)
elif choice == "2":
r = self.core.upload_file(path, duplicate=2, on_progress=self._upload_progress)
else:
print("上传取消")
return
print() # 换行
self._print_result(r)
if r["code"] == 0:
self._do_refresh()
def _do_rm(self, arg: str) -> None:
if not arg.isdigit():
print("请提供文件编号")
return
r = self.core.trash_by_index(int(arg) - 1)
self._print_result(r)
if r["code"] == 0:
self._do_refresh()
def _do_share(self, arg: str) -> None:
indices = [int(x) - 1 for x in arg.split() if x.isdigit()]
if not indices:
print("请提供文件编号")
return
# 显示待分享的文件
names = [self.core.file_list[i]["FileName"] for i in indices if 0 <= i < len(self.core.file_list)]
print("分享文件:", ", ".join(names))
pwd = input("输入提取码(留空跳过): ").strip()
r = self.core.share_by_indices(indices, pwd)
self._print_result(r)
if r["code"] == 0:
print(f"链接: {r['data']['share_url']}")
if r["data"]["share_pwd"]:
print(f"提取码: {r['data']['share_pwd']}")
def _do_more(self) -> None:
r = self.core.load_more()
if r["code"] != 0:
self._print_result(r)
else:
self._show_files()
def _do_link(self, arg: str) -> None:
if not arg.isdigit():
print("请提供文件编号")
return
r = self.core.get_download_url(int(arg) - 1)
if r["code"] == 0:
print(f"文件直链: \n{r['data']['url']}")
else:
self._print_result(r)
def _do_download(self, arg: str) -> None:
if not arg.isdigit():
print("请提供文件编号")
return
idx = int(arg) - 1
if not (0 <= idx < len(self.core.file_list)):
print("无效的文件编号")
return
item = self.core.file_list[idx]
print(f"开始下载: {item['FileName']}")
overwrite = self._download_mode == 3
skip = self._download_mode == 4
r = self.tool.download_file(
idx,
on_progress=self._download_progress,
overwrite=overwrite,
skip_existing=skip,
)
# 冲突处理
if r["code"] == 1 and r.get("data", {}).get("conflict"):
print(f"文件已存在: {item['FileName']}")
choice = input("输入 1 覆盖2 跳过3 全部覆盖4 全部跳过: ").strip()
if choice == "4":
self._download_mode = 4
print("跳过下载")
return
elif choice == "2":
print("跳过下载")
return
elif choice == "3":
self._download_mode = 3
r = self.tool.download_file(idx, on_progress=self._download_progress, overwrite=True)
print() # 换行
self._print_result(r)
def _do_recycle(self) -> None:
r = self.core.list_recycle()
if r["code"] != 0:
self._print_result(r)
return
items = r["data"]
if not items:
print("回收站为空")
return
print("\n回收站内容:")
for i, item in enumerate(items, 1):
print(f" {i}. {item['FileName']} ({format_size(item['Size'])})")
action = input("\n输入编号恢复文件,或输入 'clear' 清空回收站: ").strip()
if action.isdigit():
idx = int(action) - 1
if 0 <= idx < len(items):
rr = self.core.restore(items[idx]["FileId"])
self._print_result(rr)
else:
print("无效编号")
elif action == "clear":
for item in items:
self.core.trash(item, delete=True)
print("回收站已清空")
self._do_refresh()
def _do_refresh(self) -> None:
self._download_mode = 0
r = self.core.refresh()
if r["code"] != 0:
self._print_result(r)
else:
self._show_files()
def _do_reload(self) -> None:
r = self.core.load_config()
self._print_result(r)
self._do_refresh()
def _do_protocol(self, arg: str) -> None:
if arg.lower() not in ("android", "web"):
print("请指定协议: android 或 web")
return
r = self.core.set_protocol(arg.lower())
self._print_result(r)
if r["code"] == 0:
self._do_refresh()
def _do_select(self, num: int) -> None:
"""数字选择:文件夹进入,文件下载"""
idx = num - 1
if not (0 <= idx < len(self.core.file_list)):
print("无效的文件编号")
return
if self.core.file_list[idx]["Type"] == 1:
self._do_cd(str(num))
else:
self._do_download(str(num))
# ──────────────── 进度回调 ────────────────
@staticmethod
def _download_progress(data) -> None:
if data.get("type") == Pan123EventType.DOWNLOAD_PROGRESS:
downloaded = data.get("downloaded", 0)
total = data.get("total", 0)
speed = data.get("speed", 0)
if total > 0:
pct = downloaded / total * 100
print(
f"\r进度: {pct:.1f}% | {format_size(downloaded)}/{format_size(total)} | {format_size(int(speed))}/s",
end=" ",
flush=True,
)
elif data.get("type") == Pan123EventType.DOWNLOAD_START_FILE:
print(f"开始下载: {data.get('file_name', '未知文件')} ({format_size(data.get('file_size', 0))})")
elif data.get("type") == Pan123EventType.DOWNLOAD_START_DIRECTORY:
print(f"开始下载目录: {data.get('dir_name', '未知目录')}")
else:
print(json.dumps(data, indent=2))
@staticmethod
def _upload_progress(data) -> None:
uploaded = data.get("uploaded", 0)
total = data.get("total", 0)
if total > 0:
pct = uploaded / total * 100
print(f"\r上传进度: {pct:.1f}%", end="", flush=True)
# ──────────────── 入口 ────────────────
if __name__ == "__main__":
Pan123CLI().run()