feat: add folder upload & make ls auto-refreshing

This commit is contained in:
baoqing 2026-05-13 00:19:55 +08:00
parent 3f83e4fce6
commit 4053b0a4f5
3 changed files with 230 additions and 7 deletions

View File

@ -211,7 +211,8 @@
| 方法名 | 参数说明 | 返回值类型 | 功能描述 | | 方法名 | 参数说明 | 返回值类型 | 功能描述 |
|-------------------------------------------------------------------------|---------------------------------------------------------------------------------|--------|-----------------| |-------------------------------------------------------------------------|---------------------------------------------------------------------------------|--------|-----------------|
| `upload_file(file_path, duplicate=0, on_progress=None)` | `file_path`: 本地文件路径<br>`duplicate`: 冲突策略0=报错1=覆盖2=保留)<br>`on_progress`: 进度回调 | Result | 上传文件(支持秒传和分块上传) | | `upload_file(file_path, duplicate=0, on_progress=None)` | `file_path`: 本地文件或文件夹路径<br>`duplicate`: 冲突策略0=报错1=覆盖2=保留)<br>`on_progress`: 进度回调 | Result | 上传文件;传入文件夹时递归上传 |
| `upload_directory(dir_path, duplicate=0, on_progress=None)` | `dir_path`: 本地文件夹路径<br>其他参数同上 | Result | 上传文件夹(保留根目录结构) |
| `get_download_url(index)` | `index`: `file_list` 中的目标文件下标 | Result | 获取文件直链(自动处理重定向) | | `get_download_url(index)` | `index`: `file_list` 中的目标文件下标 | Result | 获取文件直链(自动处理重定向) |
| `share(file_ids, share_pwd="", expiration="2099-12-12T08:00:00+08:00")` | `file_ids`: 文件 ID 列表<br>`share_pwd`: 提取码<br>`expiration`: 过期时间 | Result | 创建分享链接 | | `share(file_ids, share_pwd="", expiration="2099-12-12T08:00:00+08:00")` | `file_ids`: 文件 ID 列表<br>`share_pwd`: 提取码<br>`expiration`: 过期时间 | Result | 创建分享链接 |
@ -256,7 +257,7 @@
| 方法名 | 参数说明 | 返回值类型 | 功能描述 | | 方法名 | 参数说明 | 返回值类型 | 功能描述 |
|---------------------------------------------------------|----------------------------|--------|-------------------| |---------------------------------------------------------|----------------------------|--------|-------------------|
| `upload_file(file_path, duplicate=0, on_progress=None)` | 同 `Pan123Core.upload_file` | Result | 上传文件(与 Core 方法一致) | | `upload_file(file_path, duplicate=0, on_progress=None)` | 同 `Pan123Core.upload_file` | Result | 上传文件或文件夹(与 Core 方法一致) |
--- ---
@ -372,4 +373,4 @@ result = core.share(
# 5、免责声明 # 5、免责声明
本工具用于学习场景,请勿用于违法用途。对任何滥用造成的后果,作者概不负责。 本工具用于学习场景,请勿用于违法用途。对任何滥用造成的后果,作者概不负责。
任何未经允许的api调用都是不被官方允许的对于因此产生的账号风险、数据损失等后果自负。 任何未经允许的api调用都是不被官方允许的对于因此产生的账号风险、数据损失等后果自负。

View File

@ -35,7 +35,7 @@ class Pan123CLI:
ls - 显示当前目录 ls - 显示当前目录
cd [编号|..|/] - 切换目录 cd [编号|..|/] - 切换目录
mkdir [名称] - 创建目录 mkdir [名称] - 创建目录
upload [路径] - 上传文件 upload [路径] - 上传文件或文件夹
rm [编号] - 删除文件 rm [编号] - 删除文件
share [编号 ...] - 创建分享 share [编号 ...] - 创建分享
link [编号] - 获取文件直链 link [编号] - 获取文件直链
@ -152,7 +152,8 @@ class Pan123CLI:
arg = parts[1].strip() if len(parts) > 1 else "" arg = parts[1].strip() if len(parts) > 1 else ""
handler = { handler = {
"ls": lambda: self._show_files(), #"ls": lambda: self._show_files(),
"ls": lambda: self._do_refresh(), # 用户希望ls是列出目前文件故直接刷新并列出
"login": lambda: self._do_login(), "login": lambda: self._do_login(),
"logout": lambda: self._do_logout(), "logout": lambda: self._do_logout(),
"clearaccount": lambda: self._do_clear_account(), "clearaccount": lambda: self._do_clear_account(),
@ -442,6 +443,21 @@ class Pan123CLI:
@staticmethod @staticmethod
def _upload_progress(data) -> None: def _upload_progress(data) -> None:
uploaded_total = data.get("uploaded_total")
total_size = data.get("total_size")
if uploaded_total is not None and total_size is not None:
pct = data.get("percent", uploaded_total / total_size * 100 if total_size else 100)
file_index = data.get("file_index", 0)
file_count = data.get("file_count", 0)
file_name = data.get("file_name", "")
print(
f"\r上传进度: {pct:.1f}% | {format_size(uploaded_total)}/{format_size(total_size)} | "
f"{file_index}/{file_count} {file_name}",
end=" ",
flush=True,
)
return
uploaded = data.get("uploaded", 0) uploaded = data.get("uploaded", 0)
total = data.get("total", 0) total = data.get("total", 0)
if total > 0: if total > 0:

View File

@ -1266,7 +1266,147 @@ class Pan123Core:
if not os.path.exists(file_path): if not os.path.exists(file_path):
return make_result(-1, "文件不存在") return make_result(-1, "文件不存在")
if os.path.isdir(file_path): if os.path.isdir(file_path):
return make_result(-1, "暂不支持文件夹上传") return self.upload_directory(file_path, duplicate=duplicate, on_progress=on_progress)
return self._upload_file_at(file_path, self.cwd_id, duplicate, on_progress)
def upload_directory(
self,
dir_path: str,
duplicate: int = 0,
on_progress: ProgressCallback = None,
) -> Dict[str, Any]:
"""递归上传本地文件夹到当前目录,并保留本地根目录名。
Args:
dir_path: 本地目录路径
duplicate: 文件同名处理策略 upload_file
on_progress: 上传进度回调除单文件进度外会附加目录整体进度字段
Returns:
Result 字典::
成功: {"code": 0, "message": "文件夹上传完成", "data": {...}}
部分失败: {"code": -1, "message": "部分文件上传失败: ...", "data": {...}}
失败: {"code": -1, "message": "...", "data": None}
"""
dir_path = dir_path.strip().replace('"', "")
if not os.path.exists(dir_path):
return make_result(-1, "目录不存在")
if not os.path.isdir(dir_path):
return make_result(-1, "不是文件夹")
root_path = os.path.abspath(dir_path)
root_name = os.path.basename(os.path.normpath(root_path))
if not root_name:
return make_result(-1, "目录名不能为空")
total_size = 0
file_count = 0
dir_count = 1
for current_dir, dir_names, file_names in os.walk(root_path):
dir_count += len(dir_names)
for file_name in file_names:
file_path = os.path.join(current_dir, file_name)
if os.path.isfile(file_path):
file_count += 1
total_size += os.path.getsize(file_path)
root_res = self._mkdir_at(root_name, self.cwd_id)
if root_res["code"] != CODE_OK:
return root_res
root_remote_id = root_res["data"]["file_id"]
remote_dirs = {root_path: root_remote_id}
errors: List[str] = []
uploaded_total = 0
uploaded_files = 0
def emit_progress(file_path: str, uploaded_in_file: int = 0) -> None:
if not on_progress:
return
current_total = min(uploaded_total + uploaded_in_file, total_size)
percent = current_total / total_size * 100 if total_size else 100.0
on_progress({
"type": Pan123EventType.UPLOAD_PROGRESS,
"file_name": os.path.basename(file_path) if file_path else "",
"current_file": file_path,
"uploaded": uploaded_in_file,
"total": os.path.getsize(file_path) if file_path and os.path.isfile(file_path) else 0,
"uploaded_total": current_total,
"total_size": total_size,
"percent": percent,
"file_index": uploaded_files + (1 if uploaded_in_file > 0 else 0),
"file_count": file_count,
})
for current_dir, dir_names, file_names in os.walk(root_path):
parent_remote_id = remote_dirs.get(current_dir)
if parent_remote_id is None:
dir_names[:] = []
continue
for dir_name in list(dir_names):
local_dir = os.path.join(current_dir, dir_name)
sub_res = self._mkdir_at(dir_name, parent_remote_id)
if sub_res["code"] == CODE_OK:
remote_dirs[local_dir] = sub_res["data"]["file_id"]
else:
errors.append(f"{os.path.relpath(local_dir, root_path)}: {sub_res['message']}")
dir_names.remove(dir_name)
for file_name in file_names:
local_file = os.path.join(current_dir, file_name)
if not os.path.isfile(local_file):
continue
def file_progress(data: Dict[str, Any], current_file: str = local_file) -> None:
emit_progress(current_file, data.get("uploaded", 0))
sub = self._upload_file_at(local_file, parent_remote_id, duplicate, file_progress)
rel_path = os.path.relpath(local_file, root_path)
if sub["code"] == CODE_OK:
file_size = os.path.getsize(local_file)
uploaded_files += 1
uploaded_total += file_size
emit_progress(local_file, 0)
else:
errors.append(f"{rel_path}: {sub['message']}")
data = {
"path": root_path,
"file_id": root_remote_id,
"file_count": file_count,
"dir_count": dir_count,
"uploaded_files": uploaded_files,
"total_size": total_size,
"errors": errors,
}
if errors:
return make_result(-1, f"部分文件上传失败: {'; '.join(errors)}", data)
return make_result(CODE_OK, "文件夹上传完成", data)
def upload_folder(
self,
dir_path: str,
duplicate: int = 0,
on_progress: ProgressCallback = None,
) -> Dict[str, Any]:
"""upload_directory 的别名。"""
return self.upload_directory(dir_path, duplicate=duplicate, on_progress=on_progress)
def _upload_file_at(
self,
file_path: str,
parent_id: int,
duplicate: int = 0,
on_progress: ProgressCallback = None,
) -> Dict[str, Any]:
"""上传本地文件到指定网盘目录。"""
if not os.path.exists(file_path):
return make_result(-1, "文件不存在")
if os.path.isdir(file_path):
return make_result(-1, "不是文件")
file_name = os.path.basename(file_path) file_name = os.path.basename(file_path)
file_size = os.path.getsize(file_path) file_size = os.path.getsize(file_path)
@ -1280,7 +1420,7 @@ class Pan123Core:
"driveId": 0, "driveId": 0,
"etag": md5, "etag": md5,
"fileName": file_name, "fileName": file_name,
"parentFileId": self.cwd_id, "parentFileId": parent_id,
"size": file_size, "size": file_size,
"type": 0, "type": 0,
"duplicate": duplicate, "duplicate": duplicate,
@ -1307,6 +1447,70 @@ class Pan123Core:
on_progress=on_progress, on_progress=on_progress,
) )
def _mkdir_at(self, name: str, parent_id: int) -> Dict[str, Any]:
"""在指定网盘目录下创建子目录,并返回新目录 FileId。"""
if not name:
return make_result(-1, "目录名不能为空")
payload = {
"driveId": 0,
"etag": "",
"fileName": name,
"parentFileId": parent_id,
"size": 0,
"type": 1,
"duplicate": 1,
"NotReuse": True,
"event": "newCreateFolder",
"operateType": 1,
}
r = self._request("POST", URL_MKDIR, json_data=payload)
if r["code"] != CODE_OK:
return make_result(r["code"], f"创建目录失败: {r['message']}", r["data"])
file_id = self._find_child_folder_id(parent_id, name)
if file_id is None:
file_id = self._extract_file_id(r["data"].get("data"))
if file_id is None:
return make_result(-1, "创建目录成功但未获取到目录 ID", r["data"])
return make_result(CODE_OK, "ok", {"file_id": file_id, "response": r["data"]})
def _extract_file_id(self, data: Any) -> Optional[int]:
"""从接口返回数据中尽量提取 FileId/fileId。"""
if isinstance(data, dict):
for key in ("FileId", "fileId", "FileID", "fileID"):
value = data.get(key)
if isinstance(value, int) and value > 0:
return value
if isinstance(value, str) and value.isdigit():
file_id = int(value)
if file_id > 0:
return file_id
for value in data.values():
file_id = self._extract_file_id(value)
if file_id is not None:
return file_id
elif isinstance(data, list):
for item in data:
file_id = self._extract_file_id(item)
if file_id is not None:
return file_id
return None
def _find_child_folder_id(self, parent_id: int, name: str) -> Optional[int]:
"""在父目录下按名称查找子目录 ID作为创建目录响应缺少 ID 时的兜底。"""
for attempt in range(3):
r = self.list_dir_all(parent_id=parent_id)
if r["code"] == CODE_OK:
matches = [
item for item in r["data"]["items"]
if item.get("Type") == 1 and item.get("FileName") == name and item.get("FileId") is not None
]
if matches:
return max(int(item["FileId"]) for item in matches)
if attempt < 2:
time.sleep(0.5)
return None
def _upload_chunks( def _upload_chunks(
self, self,
file_path: str, file_path: str,
@ -1618,6 +1822,8 @@ class Pan123Tool:
"total": total, "total": total,
"speed": speed, "speed": speed,
}) })
# 换一下行
print("")
os.rename(temp_path, full_path) os.rename(temp_path, full_path)
return make_result(CODE_OK, "下载完成", {"path": full_path}) return make_result(CODE_OK, "下载完成", {"path": full_path})
except Exception as e: except Exception as e: