更加规范

This commit is contained in:
qing 2023-12-15 19:30:44 +08:00
parent d33b1f45b4
commit 5add24521f

626
123pan.py
View File

@ -9,8 +9,15 @@ import base64
class Pan123: class Pan123:
def __init__(self, readfile=True, user_name="", pass_word="", authorization="", input_pwd=True): def __init__(
self.RecycleList = None self,
readfile=True,
user_name="",
pass_word="",
authorization="",
input_pwd=True,
):
self.recycle_list = None
self.list = None self.list = None
if readfile: if readfile:
self.read_ini(user_name, pass_word, input_pwd, authorization) self.read_ini(user_name, pass_word, input_pwd, authorization)
@ -22,14 +29,17 @@ class Pan123:
pass_word = input("请输入密码:") pass_word = input("请输入密码:")
else: else:
raise Exception("用户名或密码为空读取禁用时userName和passWord不能为空") raise Exception("用户名或密码为空读取禁用时userName和passWord不能为空")
self.userName = user_name self.user_name = user_name
self.passWord = pass_word self.password = pass_word
self.authorization = authorization self.authorization = authorization
self.headerOnlyUsage = { self.header_only_usage = {
'user-agent': 'Mozilla/5.0 (Windows NT 10.0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36 Edg/109.0.1474.0', "user-agent": "Mozilla/5.0 (Windows NT 10.0) AppleWebKit/"
"537.36 (KHTML, like Gecko) Chrome/109.0.0.0 "
"Safari/537.36 Edg/109.0.1474.0",
"app-version": "2", "app-version": "2",
"platform": "web", } "platform": "web",
self.headerLogined = { }
self.header_logined = {
"Accept": "*/*", "Accept": "*/*",
"Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6", "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6",
"App-Version": "3", "App-Version": "3",
@ -42,33 +52,39 @@ class Pan123:
"Sec-Fetch-Dest": "empty", "Sec-Fetch-Dest": "empty",
"Sec-Fetch-Mode": "cors", "Sec-Fetch-Mode": "cors",
"Sec-Fetch-Site": "same-origin", "Sec-Fetch-Site": "same-origin",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36 Edg/119.0.0.0", "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/"
"537.36 (KHTML, like Gecko) Chrome/119.0.0.0 "
"Safari/537.36 Edg/119.0.0.0",
"platform": "web", "platform": "web",
"sec-ch-ua": "^\\^Microsoft", "sec-ch-ua": "^\\^Microsoft",
"sec-ch-ua-mobile": "?0", "sec-ch-ua-mobile": "?0",
"sec-ch-ua-platform": "^\\^Windows^^" "sec-ch-ua-platform": "^\\^Windows^^",
} }
self.parentFileId = 0 # 路径文件夹的id,0为根目录 self.parent_file_id = 0 # 路径文件夹的id,0为根目录
self.parentFileList = [0] self.parent_file_list = [0]
code = self.get_dir() res_code_getdir = self.get_dir()
if code != 0: if res_code_getdir != 0:
self.login() self.login()
self.get_dir() self.get_dir()
def login(self): def login(self):
data = {"remember": True, "passport": self.userName, "password": self.passWord} data = {"remember": True, "passport": self.user_name, "password": self.password}
sign = getSign('/b/api/user/sign_in') sign = getSign("/b/api/user/sign_in")
loginRes = requests.post("https://www.123pan.com/b/api/user/sign_in", headers=self.headerOnlyUsage, data=data, login_res = requests.post(
params={sign[0]: sign[1]}) "https://www.123pan.com/b/api/user/sign_in",
res_sign = loginRes.json() headers=self.header_only_usage,
code = res_sign['code'] data=data,
if code != 200: params={sign[0]: sign[1]}, timeout=10
print("code = 1 Error:" + str(code)) )
print(res_sign['message']) res_sign = login_res.json()
return code res_code_login = res_sign["code"]
token = res_sign['data']['token'] if res_code_login != 200:
self.authorization = 'Bearer ' + token print("code = 1 Error:" + str(res_code_login))
headerLogined = { print(res_sign["message"])
return res_code_login
token = res_sign["data"]["token"]
self.authorization = "Bearer " + token
header_logined = {
"Accept": "*/*", "Accept": "*/*",
"Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6", "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6",
"App-Version": "3", "App-Version": "3",
@ -81,30 +97,32 @@ class Pan123:
"Sec-Fetch-Dest": "empty", "Sec-Fetch-Dest": "empty",
"Sec-Fetch-Mode": "cors", "Sec-Fetch-Mode": "cors",
"Sec-Fetch-Site": "same-origin", "Sec-Fetch-Site": "same-origin",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36 Edg/119.0.0.0", "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/"
"537.36 (KHTML, like"
" Gecko) Chrome/119.0.0.0 Safari/537.36 Edg/119.0.0.0",
"platform": "web", "platform": "web",
"sec-ch-ua": "^\\^Microsoft", "sec-ch-ua": "^\\^Microsoft",
"sec-ch-ua-mobile": "?0", "sec-ch-ua-mobile": "?0",
"sec-ch-ua-platform": "^\\^Windows^^" "sec-ch-ua-platform": "^\\^Windows^^",
} }
self.headerLogined = headerLogined self.header_logined = header_logined
# ret['cookie'] = cookie # ret['cookie'] = cookie
self.save_file() self.save_file()
return code return res_code_login
def save_file(self): def save_file(self):
with open("123pan.txt", "w") as f: with open("123pan.txt", "w",encoding="utf_8") as f:
saveList = { save_list = {
"userName": self.userName, "userName": self.user_name,
"passWord": self.passWord, "passWord": self.password,
"authorization": self.authorization, "authorization": self.authorization,
} }
f.write(json.dumps(saveList)) f.write(json.dumps(save_list))
print("Save!") print("Save!")
def get_dir(self): def get_dir(self):
code = 0 res_code_getdir = 0
page = 1 page = 1
lists = [] lists = []
lenth_now = 0 lenth_now = 0
@ -113,7 +131,7 @@ class Pan123:
base_url = "https://www.123pan.com/b/api/file/list/new" base_url = "https://www.123pan.com/b/api/file/list/new"
# print(self.headerLogined) # print(self.headerLogined)
sign = getSign('/b/api/file/list/new') sign = getSign("/b/api/file/list/new")
print(sign) print(sign)
params = { params = {
sign[0]: sign[1], sign[0]: sign[1],
@ -122,116 +140,136 @@ class Pan123:
"next": 0, "next": 0,
"orderBy": "file_id", "orderBy": "file_id",
"orderDirection": "desc", "orderDirection": "desc",
"parentFileId": str(self.parentFileId), "parentFileId": str(self.parent_file_id),
"trashed": False, "trashed": False,
"SearchData": "", "SearchData": "",
"Page": str(page), "Page": str(page),
"OnlyLookAbnormalFile": 0 "OnlyLookAbnormalFile": 0,
} }
a = requests.get(base_url, headers=self.headerLogined, params=params) a = requests.get(base_url, headers=self.header_logined, params=params, timeout=10)
# print(a.text) # print(a.text)
# print(a.headers) # print(a.headers)
text = a.json() text = a.json()
code = text['code'] res_code_getdir = text["code"]
if code != 0: if res_code_getdir != 0:
print(a.text) print(a.text)
print(a.headers) print(a.headers)
print("code = 2 Error:" + str(code)) print("code = 2 Error:" + str(res_code_getdir))
return code return res_code_getdir
lists_page = text['data']['InfoList'] lists_page = text["data"]["InfoList"]
lists += lists_page lists += lists_page
total = text['data']['Total'] total = text["data"]["Total"]
lenth_now += len(lists_page) lenth_now += len(lists_page)
page += 1 page += 1
FileNum = 0 file_num = 0
for i in lists: for i in lists:
i["FileNum"] = FileNum i["FileNum"] = file_num
FileNum += 1 file_num += 1
self.list = lists self.list = lists
return code return res_code_getdir
def show(self): def show(self):
print("--------------------") print("--------------------")
for i in self.list: for i in self.list:
size = i["Size"] file_size = i["Size"]
if size > 1048576: if file_size > 1048576:
size_print = str(round(size / 1048576, 2)) + "M" download_size_print = str(round(file_size / 1048576, 2)) + "M"
else: else:
size_print = str(round(size / 1024, 2)) + "K" download_size_print = str(round(file_size / 1024, 2)) + "K"
if i["Type"] == 0: if i["Type"] == 0:
print(
print("\033[33m" + "编号:", self.list.index(i) + 1, "\033[0m \t\t" + size_print + "\t\t\033[36m", "\033[33m" + "编号:",
i["FileName"], "\033[0m") self.list.index(i) + 1,
"\033[0m \t\t" + download_size_print + "\t\t\033[36m",
i["FileName"],
"\033[0m",
)
elif i["Type"] == 1: elif i["Type"] == 1:
print("\033[35m" + "编号:", self.list.index(i) + 1, " \t\t\033[36m", print(
i["FileName"], "\033[0m") "\033[35m" + "编号:",
self.list.index(i) + 1,
" \t\t\033[36m",
i["FileName"],
"\033[0m",
)
print("--------------------") print("--------------------")
# fileNumber 从0开始0为第一个文件传入时需要减一 # fileNumber 从0开始0为第一个文件传入时需要减一
def link(self, file_number, showlink=True): def link(self, file_number, showlink=True):
fileDetail = self.list[file_number] file_detail = self.list[file_number]
typeDetail = fileDetail['Type'] type_detail = file_detail["Type"]
if typeDetail == 1: if type_detail == 1:
down_request_url = "https://www.123pan.com/a/api/file/batch_download_info" down_request_url = "https://www.123pan.com/a/api/file/batch_download_info"
down_request_data = {"fileIdList": [{"fileId": int(fileDetail["FileId"])}]} down_request_data = {"fileIdList": [{"fileId": int(file_detail["FileId"])}]}
else: else:
down_request_url = "https://www.123pan.com/a/api/file/download_info" down_request_url = "https://www.123pan.com/a/api/file/download_info"
down_request_data = {"driveId": 0, "etag": fileDetail["Etag"], "fileId": fileDetail["FileId"], down_request_data = {
"s3keyFlag": fileDetail['S3KeyFlag'], "type": fileDetail['Type'], "driveId": 0,
"fileName": fileDetail['FileName'], "size": fileDetail['Size']} "etag": file_detail["Etag"],
"fileId": file_detail["FileId"],
"s3keyFlag": file_detail["S3KeyFlag"],
"type": file_detail["Type"],
"fileName": file_detail["FileName"],
"size": file_detail["Size"],
}
# print(down_request_data) # print(down_request_data)
sign = getSign("/a/api/file/download_info") sign = getSign("/a/api/file/download_info")
linkRes = requests.post(down_request_url, headers=self.headerLogined, params={sign[0]: sign[1]}, link_res = requests.post(
data=down_request_data) down_request_url,
headers=self.header_logined,
params={sign[0]: sign[1]},
data=down_request_data,
timeout=10
)
# print(linkRes.text) # print(linkRes.text)
code = linkRes.json()['code'] res_code_download = link_res.json()["code"]
if code != 0: if res_code_download != 0:
print("code = 3 Error:" + str(code)) print("code = 3 Error:" + str(res_code_download))
# print(linkRes.json()) # print(linkRes.json())
return code return res_code_download
downloadLinkBase64 = linkRes.json()["data"]["DownloadUrl"] download_link_base64 = link_res.json()["data"]["DownloadUrl"]
Base64Url = re.findall("params=(.*)&", downloadLinkBase64)[0] base64_url = re.findall("params=(.*)&", download_link_base64)[0]
# print(Base64Url) # print(Base64Url)
downLoadUrl = base64.b64decode(Base64Url) down_load_url = base64.b64decode(base64_url)
downLoadUrl = downLoadUrl.decode("utf-8") down_load_url = down_load_url.decode("utf-8")
nextToGet = requests.get(downLoadUrl).json() next_to_get = requests.get(down_load_url,timeout=10).json()
redirect_url = nextToGet['data']['redirect_url'] redirect_url = next_to_get["data"]["redirect_url"]
if showlink: if showlink:
print(redirect_url) print(redirect_url)
return redirect_url return redirect_url
def download(self, file_number): def download(self, file_number):
fileDetail = self.list[file_number] file_detail = self.list[file_number]
downLoadUrl = self.link(file_number, showlink=False) down_load_url = self.link(file_number, showlink=False)
name = fileDetail['FileName'] # 文件名 file_name = file_detail["FileName"] # 文件名
if os.path.exists(name): if os.path.exists(file_name):
print("文件 " + name + " 已存在,是否要覆盖?") print("文件 " + file_name + " 已存在,是否要覆盖?")
sure = input("输入1覆盖2取消") sure_download = input("输入1覆盖2取消")
if sure != '1': if sure_download != "1":
return return
down = requests.get(downLoadUrl, stream=True) down = requests.get(down_load_url, stream=True, timeout=10)
size = int(down.headers['Content-Length']) # 文件大小 file_size = int(down.headers["Content-Length"]) # 文件大小
content_size = int(size) # 文件总大小 content_size = int(file_size) # 文件总大小
data_count = 0 # 当前已传输的大小 data_count = 0 # 当前已传输的大小
if size > 1048576: if file_size > 1048576:
size_print = str(round(size / 1048576, 2)) + "M" size_print_download = str(round(file_size / 1048576, 2)) + "M"
else: else:
size_print = str(round(size / 1024, 2)) + "K" size_print_download = str(round(file_size / 1024, 2)) + "K"
print(name + " " + size_print) print(file_name + " " + size_print_download)
time1 = time.time() time1 = time.time()
time_temp = time1 time_temp = time1
data_count_temp = 0 data_count_temp = 0
with open(name, "wb") as f: with open(file_name, "wb") as f:
for i in down.iter_content(1024): for i in down.iter_content(1024):
f.write(i) f.write(i)
done_block = int((data_count / content_size) * 50) done_block = int((data_count / content_size) * 50)
@ -247,26 +285,37 @@ class Pan123:
pass_data = int(data_count) - int(data_count_temp) pass_data = int(data_count) - int(data_count_temp)
data_count_temp = data_count data_count_temp = data_count
speed = pass_data / int(pass_time) speed = pass_data / int(pass_time)
speed_M = speed / 1048576 speed_m = speed / 1048576
if speed_M > 1: if speed_m > 1:
speed_print = str(round(speed_M, 2)) + "M/S" speed_print = str(round(speed_m, 2)) + "M/S"
else: else:
speed_print = str(round(speed_M * 1024, 2)) + "K/S" speed_print = str(round(speed_m * 1024, 2)) + "K/S"
print( print(
"\r [%s%s] %d%% %s" % (done_block * '', ' ' * (50 - 1 - done_block), now_jd, speed_print), "\r [%s%s] %d%% %s"
end="") % (
done_block * "",
" " * (50 - 1 - done_block),
now_jd,
speed_print,
),
end="",
)
elif data_count == content_size: elif data_count == content_size:
print("\r [%s%s] %d%% %s" % (50 * '', '', 100, ""), end="") print("\r [%s%s] %d%% %s" % (50 * "", "", 100, ""), end="")
print("\nok") print("\nok")
def recycle(self): def recycle(self):
recycle_id = 0 recycle_id = 0
url = "https://www.123pan.com/a/api/file/list/new?driveId=0&limit=100&next=0&orderBy=fileId&orderDirection=desc&parentFileId=" + str( url = (
recycle_id) + "&trashed=true&&Page=1" "https://www.123pan.com/a/api/file/list/new?driveId=0&limit=100&next=0"
recycleRes = requests.get(url, headers=self.headerLogined) "&orderBy=fileId&orderDirection=desc&parentFileId="
jsonRecycle = recycleRes.json() + str(recycle_id)
RecycleList = jsonRecycle['data']['InfoList'] + "&trashed=true&&Page=1"
self.RecycleList = RecycleList )
recycle_res = requests.get(url, headers=self.header_logined, timeout=10)
json_recycle = recycle_res.json()
recycle_list = json_recycle["data"]["InfoList"]
self.recycle_list = recycle_list
# fileNumber 从0开始0为第一个文件传入时需要减一 # fileNumber 从0开始0为第一个文件传入时需要减一
def delete_file(self, file, by_num=True, operation=True): def delete_file(self, file, by_num=True, operation=True):
@ -287,69 +336,79 @@ class Pan123:
else: else:
print("文件不存在") print("文件不存在")
return return
dataDelete = {"driveId": 0, data_delete = {
"driveId": 0,
"fileTrashInfoList": file_detail, "fileTrashInfoList": file_detail,
"operation": operation} "operation": operation,
deleteRes = requests.post("https://www.123pan.com/a/api/file/trash", data=json.dumps(dataDelete), }
headers=self.headerLogined) delete_res = requests.post(
DeleJson = deleteRes.json() "https://www.123pan.com/a/api/file/trash",
print(DeleJson) data=json.dumps(data_delete),
message = DeleJson['message'] headers=self.header_logined,
timeout=10
)
dele_json = delete_res.json()
print(dele_json)
message = dele_json["message"]
print(message) print(message)
def share(self): def share(self):
fileIdList = "" file_id_list = ""
share_name_list = [] share_name_list = []
add = '1' add = "1"
while str(add) == '1': while str(add) == "1":
share_num = input("分享文件的编号:") share_num = input("分享文件的编号:")
num_test2 = share_num.isdigit() num_test2 = share_num.isdigit()
if num_test2: if num_test2:
share_num = int(share_num) share_num = int(share_num)
if 0 < share_num < len(self.list) + 1: if 0 < share_num < len(self.list) + 1:
share_id = self.list[int(share_num) - 1]['FileId'] share_id = self.list[int(share_num) - 1]["FileId"]
share_name = self.list[int(share_num) - 1]['FileName'] share_name = self.list[int(share_num) - 1]["FileName"]
share_name_list.append(share_name) share_name_list.append(share_name)
print(share_name_list) print(share_name_list)
fileIdList = fileIdList + str(share_id) + "," file_id_list = file_id_list + str(share_id) + ","
add = input("输入1添加文件0发起分享其他取消") add = input("输入1添加文件0发起分享其他取消")
else: else:
print("请输入数字,,") print("请输入数字,,")
add = "1" add = "1"
if str(add) == "0": if str(add) == "0":
sharePwd = input("提取码,不设留空:") share_pwd = input("提取码,不设留空:")
fileIdList = fileIdList.strip(',') file_id_list = file_id_list.strip(",")
data = {"driveId": 0, data = {
"driveId": 0,
"expiration": "2024-02-09T11:42:45+08:00", "expiration": "2024-02-09T11:42:45+08:00",
"fileIdList": fileIdList, "fileIdList": file_id_list,
"shareName": "我的分享", "shareName": "我的分享",
"sharePwd": sharePwd, "sharePwd": share_pwd,
} }
shareRes = requests.post("https://www.123pan.com/a/api/share/create", headers=self.headerLogined, share_res = requests.post(
data=json.dumps(data)) "https://www.123pan.com/a/api/share/create",
shareResJson = shareRes.json() headers=self.header_logined,
message = shareResJson['message'] data=json.dumps(data),
timeout=10
)
share_res_json = share_res.json()
message = share_res_json["message"]
print(message) print(message)
ShareKey = shareResJson['data']['ShareKey'] share_key = share_res_json["data"]["ShareKey"]
share_url = 'https://www.123pan.com/s/' + ShareKey share_url = "https://www.123pan.com/s/" + share_key
print('分享链接:\n' + share_url + "提取码:" + sharePwd) print("分享链接:\n" + share_url + "提取码:" + share_pwd)
else: else:
print("退出分享") print("退出分享")
def up_load(self, filePath): def up_load(self, file_path):
filePath = filePath.replace("\"", "") file_path = file_path.replace('"', "")
filePath = filePath.replace("\\", "/") file_path = file_path.replace("\\", "/")
fileName = filePath.split("/")[-1] file_name = file_path.split("/")[-1]
print("文件名:", fileName) print("文件名:", file_name)
if not os.path.exists(filePath): if not os.path.exists(file_path):
print("文件不存在,请检查路径是否正确") print("文件不存在,请检查路径是否正确")
return return
if os.path.isdir(filePath): if os.path.isdir(file_path):
print("暂不支持文件夹上传") print("暂不支持文件夹上传")
return return
fsize = os.path.getsize(filePath) fsize = os.path.getsize(file_path)
with open(filePath, 'rb') as f: with open(file_path, "rb") as f:
md5 = hashlib.md5() md5 = hashlib.md5()
while True: while True:
data = f.read(64 * 1024) data = f.read(64 * 1024)
@ -358,91 +417,125 @@ class Pan123:
md5.update(data) md5.update(data)
readable_hash = md5.hexdigest() readable_hash = md5.hexdigest()
listUpRequest = {"driveId": 0, "etag": readable_hash, "fileName": fileName, list_up_request = {
"parentFileId": self.parentFileId, "size": fsize, "type": 0, "duplicate": 0} "driveId": 0,
"etag": readable_hash,
"fileName": file_name,
"parentFileId": self.parent_file_id,
"size": fsize,
"type": 0,
"duplicate": 0,
}
sign = getSign("/b/api/file/upload_request") sign = getSign("/b/api/file/upload_request")
upRes = requests.post("https://www.123pan.com/b/api/file/upload_request", headers=self.headerLogined, up_res = requests.post(
"https://www.123pan.com/b/api/file/upload_request",
headers=self.header_logined,
params={sign[0]: sign[1]}, params={sign[0]: sign[1]},
data=listUpRequest) data=list_up_request,
upResJson = upRes.json() timeout=10
code = upResJson['code'] )
if code == 5060: up_res_json = up_res.json()
sure = input("检测到1个同名文件,输入1覆盖2保留两者0取消") res_code_up = up_res_json["code"]
if sure == '1': if res_code_up == 5060:
listUpRequest["duplicate"] = 1 sure_upload = input("检测到1个同名文件,输入1覆盖2保留两者0取消")
if sure_upload == "1":
list_up_request["duplicate"] = 1
elif sure == '2': elif sure_upload == "2":
listUpRequest["duplicate"] = 2 list_up_request["duplicate"] = 2
else: else:
print("取消上传") print("取消上传")
return return
sign = getSign("/b/api/file/upload_request") sign = getSign("/b/api/file/upload_request")
upRes = requests.post("https://www.123pan.com/b/api/file/upload_request", headers=self.headerLogined, up_res = requests.post(
"https://www.123pan.com/b/api/file/upload_request",
headers=self.header_logined,
params={sign[0]: sign[1]}, params={sign[0]: sign[1]},
data=json.dumps(listUpRequest)) data=json.dumps(list_up_request),
upResJson = upRes.json() timeout=10
code = upResJson['code'] )
if code == 0: up_res_json = up_res.json()
res_code_up = up_res_json["code"]
if res_code_up == 0:
# print(upResJson) # print(upResJson)
# print("上传请求成功") # print("上传请求成功")
Reuse = upResJson['data']['Reuse'] reuse = up_res_json["data"]["Reuse"]
if Reuse: if reuse:
print("上传成功文件已MD5复用") print("上传成功文件已MD5复用")
return return
else: else:
print(upResJson) print(up_res_json)
print("上传请求失败") print("上传请求失败")
return return
bucket = upResJson['data']['Bucket'] bucket = up_res_json["data"]["Bucket"]
StorageNode = upResJson['data']['StorageNode'] storage_node = up_res_json["data"]["StorageNode"]
uploadKey = upResJson['data']['Key'] upload_key = up_res_json["data"]["Key"]
uploadId = upResJson['data']['UploadId'] upload_id = up_res_json["data"]["UploadId"]
upFileId = upResJson['data']['FileId'] # 上传文件的fileId,完成上传后需要用到 up_file_id = up_res_json["data"]["FileId"] # 上传文件的fileId,完成上传后需要用到
print("上传文件的fileId:", upFileId) print("上传文件的fileId:", up_file_id)
# 获取已将上传的分块 # 获取已将上传的分块
startData = {"bucket": bucket, "key": uploadKey, "uploadId": uploadId, "storageNode": StorageNode} start_data = {
startRes = requests.post("https://www.123pan.com/b/api/file/s3_list_upload_parts", headers=self.headerLogined, "bucket": bucket,
data=json.dumps(startData)) "key": upload_key,
startResJson = startRes.json() "uploadId": upload_id,
code = startResJson['code'] "storageNode": storage_node,
if code == 0: }
start_res = requests.post(
"https://www.123pan.com/b/api/file/s3_list_upload_parts",
headers=self.header_logined,
data=json.dumps(start_data),
timeout=10
)
start_res_json = start_res.json()
res_code_up = start_res_json["code"]
if res_code_up == 0:
# print(startResJson) # print(startResJson)
pass pass
else: else:
print(startData) print(start_data)
print(startResJson) print(start_res_json)
print("获取传输列表失败") print("获取传输列表失败")
return return
# 分块,每一块取一次链接,依次上传 # 分块,每一块取一次链接,依次上传
block_size = 5242880 block_size = 5242880
with open(filePath, 'rb') as f: with open(file_path, "rb") as f:
partNumberStart = 1 part_number_start = 1
putSize = 0 put_size = 0
while True: while True:
data = f.read(block_size) data = f.read(block_size)
precent = round(putSize / fsize, 2) precent = round(put_size / fsize, 2)
print("\r已上传:" + str(precent * 100) + "%", end="") print("\r已上传:" + str(precent * 100) + "%", end="")
putSize = putSize + len(data) put_size = put_size + len(data)
if not data: if not data:
break break
getLinkData = {"bucket": bucket, "key": uploadKey, get_link_data = {
"partNumberEnd": partNumberStart + 1, "bucket": bucket,
"partNumberStart": partNumberStart, "key": upload_key,
"uploadId": uploadId, "partNumberEnd": part_number_start + 1,
"StorageNode": StorageNode} "partNumberStart": part_number_start,
"uploadId": upload_id,
"StorageNode": storage_node,
}
getLinkUrl = "https://www.123pan.com/b/api/file/s3_repare_upload_parts_batch" get_link_url = (
getLinkRes = requests.post(getLinkUrl, headers=self.headerLogined, data=json.dumps(getLinkData)) "https://www.123pan.com/b/api/file/s3_repare_upload_parts_batch"
getLinkResJson = getLinkRes.json() )
code = getLinkResJson['code'] get_link_res = requests.post(
if code == 0: get_link_url,
headers=self.header_logined,
data=json.dumps(get_link_data),
timeout=10
)
get_link_res_json = get_link_res.json()
res_code_up = get_link_res_json["code"]
if res_code_up == 0:
# print("获取链接成功") # print("获取链接成功")
pass pass
else: else:
@ -450,106 +543,131 @@ class Pan123:
# print(getLinkResJson) # print(getLinkResJson)
return return
# print(getLinkResJson) # print(getLinkResJson)
uploadUrl = getLinkResJson['data']['presignedUrls'][str(partNumberStart)] upload_url = get_link_res_json["data"]["presignedUrls"][
str(part_number_start)
]
# print("上传链接",uploadUrl) # print("上传链接",uploadUrl)
requests.put(uploadUrl, data=data) requests.put(upload_url, data=data, timeout=10)
# print("put") # print("put")
partNumberStart = partNumberStart + 1 part_number_start = part_number_start + 1
print("\n处理中") print("\n处理中")
# 完成标志 # 完成标志
# 1.获取已上传的块 # 1.获取已上传的块
uploadedListUrl = "https://www.123pan.com/b/api/file/s3_list_upload_parts" uploaded_list_url = "https://www.123pan.com/b/api/file/s3_list_upload_parts"
uploadedCompData = {"bucket": bucket, "key": uploadKey, "uploadId": uploadId, "storageNode": StorageNode} uploaded_comp_data = {
"bucket": bucket,
"key": upload_key,
"uploadId": upload_id,
"storageNode": storage_node,
}
# print(uploadedCompData) # print(uploadedCompData)
requests.post(uploadedListUrl, headers=self.headerLogined, data=json.dumps(uploadedCompData)) requests.post(
compmultipartUpUrl = "https://www.123pan.com/b/api/file/s3_complete_multipart_upload" uploaded_list_url,
requests.post(compmultipartUpUrl, headers=self.headerLogined, headers=self.header_logined,
data=json.dumps(uploadedCompData)) data=json.dumps(uploaded_comp_data),
timeout=10
)
compmultipart_up_url = (
"https://www.123pan.com/b/api/file/s3_complete_multipart_upload"
)
requests.post(
compmultipart_up_url,
headers=self.header_logined,
data=json.dumps(uploaded_comp_data),
timeout=10
)
# 3.报告完成上传关闭upload session # 3.报告完成上传关闭upload session
if fsize > 64 * 1024 * 1024: if fsize > 64 * 1024 * 1024:
time.sleep(3) time.sleep(3)
closeUpSessionUrl = "https://www.123pan.com/b/api/file/upload_complete" close_up_session_url = "https://www.123pan.com/b/api/file/upload_complete"
closeUpSessionData = {"fileId": upFileId} close_up_session_data = {"fileId": up_file_id}
# print(closeUpSessionData) # print(closeUpSessionData)
closeUpSessionRes = requests.post(closeUpSessionUrl, headers=self.headerLogined, close_up_session_res = requests.post(
data=json.dumps(closeUpSessionData)) close_up_session_url,
closeResJson = closeUpSessionRes.json() headers=self.header_logined,
data=json.dumps(close_up_session_data),
timeout=10
)
close_res_json = close_up_session_res.json()
# print(closeResJson) # print(closeResJson)
code = closeResJson['code'] res_code_up = close_res_json["code"]
if code == 0: if res_code_up == 0:
print("上传成功") print("上传成功")
else: else:
print("上传失败") print("上传失败")
print(closeResJson) print(close_res_json)
return return
# dirId 就是 fileNumber从0开始0为第一个文件传入时需要减一 !!!(好像文件夹都排在前面) # dirId 就是 fileNumber从0开始0为第一个文件传入时需要减一 !!!(好像文件夹都排在前面)
def cd(self, dir_num): def cd(self, dir_num):
if not dir_num.isdigit(): if not dir_num.isdigit():
if dir_num == "..": if dir_num == "..":
if len(self.parentFileList) > 1: if len(self.parent_file_list) > 1:
self.parentFileList.pop() self.parent_file_list.pop()
self.parentFileId = self.parentFileList[-1] self.parent_file_id = self.parent_file_list[-1]
self.get_dir() self.get_dir()
self.show() self.show()
else: else:
print("已经是根目录") print("已经是根目录")
return return
elif dir_num == "/": if dir_num == "/":
self.parentFileId = 0 self.parent_file_id = 0
self.parentFileList = [0] self.parent_file_list = [0]
self.get_dir() self.get_dir()
self.show() self.show()
return return
else:
print("输入错误") print("输入错误")
return return
dir_num = int(dir_num) - 1 dir_num = int(dir_num) - 1
if dir_num >= (len(self.list) - 1) or dir_num < 0: if dir_num >= (len(self.list) - 1) or dir_num < 0:
print("输入错误") print("输入错误")
return return
if self.list[dir_num]['Type'] != 1: if self.list[dir_num]["Type"] != 1:
print("不是文件夹") print("不是文件夹")
return return
self.parentFileId = self.list[dir_num]['FileId'] self.parent_file_id = self.list[dir_num]["FileId"]
self.parentFileList.append(self.parentFileId) self.parent_file_list.append(self.parent_file_id)
self.get_dir() self.get_dir()
self.show() self.show()
def cdById(self, id): def cdById(self, file_id):
self.parent_file_id = file_id
self.parentFileId = id self.parent_file_list.append(self.parent_file_id)
self.parentFileList.append(self.parentFileId)
self.get_dir() self.get_dir()
self.get_dir() self.get_dir()
self.show() self.show()
def read_ini(self, user_name, pass_word, input_pwd, authorization="", ): def read_ini(
self,
user_name,
pass_word,
input_pwd,
authorization="",
):
try: try:
with open("123pan.txt", "r") as f: with open("123pan.txt", "r",encoding="utf-8") as f:
text = f.read() text = f.read()
text = json.loads(text) text = json.loads(text)
user_name = text['userName'] user_name = text["userName"]
pass_word = text['passWord'] pass_word = text["passWord"]
authorization = text['authorization'] authorization = text["authorization"]
except FileNotFoundError or json.decoder.JSONDecodeError: except FileNotFoundError or json.decoder.JSONDecodeError:
print("read failed") print("read failed")
if user_name == "" or pass_word == "": if user_name == "" or pass_word == "":
if input_pwd: if input_pwd:
user_name = input("userName:") user_name = input("userName:")
pass_word = input("passWord:") pass_word = input("passWord:")
authorization = "" authorization = ""
else: else:
raise Exception("禁止输入模式下,没有账号或密码") raise Exception("禁止输入模式下,没有账号或密码")
self.userName = user_name self.user_name = user_name
self.passWord = pass_word self.password = pass_word
self.authorization = authorization self.authorization = authorization
def mkdir(self, dirname, remakedir=False): def mkdir(self, dirname, remakedir=False):
@ -560,30 +678,45 @@ class Pan123:
return i["FileId"] return i["FileId"]
url = "https://www.123pan.com/a/api/file/upload_request" url = "https://www.123pan.com/a/api/file/upload_request"
dataMk = {"driveId": 0, "etag": "", "fileName": dirname, "parentFileId": self.parentFileId, "size": 0, data_mk = {
"type": 1, "duplicate": 1, "NotReuse": True, "event": "newCreateFolder", "operateType": 1} "driveId": 0,
"etag": "",
"fileName": dirname,
"parentFileId": self.parent_file_id,
"size": 0,
"type": 1,
"duplicate": 1,
"NotReuse": True,
"event": "newCreateFolder",
"operateType": 1,
}
sign = getSign("/a/api/file/upload_request") sign = getSign("/a/api/file/upload_request")
resMk = requests.post(url, headers=self.headerLogined, data=json.dumps(dataMk), params={sign[0]: sign[1]}) res_mk = requests.post(
url,
headers=self.header_logined,
data=json.dumps(data_mk),
params={sign[0]: sign[1]},
timeout=10
)
try: try:
resJson = resMk.json() res_json = res_mk.json()
print(resJson) print(res_json)
except json.decoder.JSONDecodeError: except json.decoder.JSONDecodeError:
print("创建失败") print("创建失败")
print(resMk.text) print(res_mk.text)
return return
code = resJson['code'] code_mkdir = res_json["code"]
if code == 0: if code_mkdir == 0:
print("创建成功: ", resJson["data"]["FileId"]) print("创建成功: ", res_json["data"]["FileId"])
self.get_dir() self.get_dir()
return resJson["data"]["Info"]["FileId"] return res_json["data"]["Info"]["FileId"]
else:
print("创建失败") print("创建失败")
print(resJson) print(res_json)
return return
if __name__ == '__main__': if __name__ == "__main__":
pan = Pan123(readfile=True, input_pwd=True) pan = Pan123(readfile=True, input_pwd=True)
pan.show() pan.show()
while True: while True:
@ -602,15 +735,14 @@ if __name__ == '__main__':
if pan.list[int(command) - 1]["Type"] == 1: if pan.list[int(command) - 1]["Type"] == 1:
pan.cdById(pan.list[int(command) - 1]["FileId"]) pan.cdById(pan.list[int(command) - 1]["FileId"])
else: else:
size = pan.list[int(command) - 1]["Size"] size = pan.list[int(command) - 1]["Size"]
if size > 1048576: if size > 1048576:
size_print = str(round(size / 1048576, 2)) + "M" size_print_show = str(round(size / 1048576, 2)) + "M"
else: else:
size_print = str(round(size / 1024, 2)) + "K" size_print_show = str(round(size / 1024, 2)) + "K"
# print(pan.list[int(command) - 1]) # print(pan.list[int(command) - 1])
name = pan.list[int(command) - 1]["FileName"] name = pan.list[int(command) - 1]["FileName"]
print(name + " " + size_print) print(name + " " + size_print_show)
print("press 1 to download now: ", end="") print("press 1 to download now: ", end="")
sure = input() sure = input()
if sure == "1": if sure == "1":