diff --git a/123pan.py b/123pan.py index f78c0e5..1a23803 100644 --- a/123pan.py +++ b/123pan.py @@ -9,8 +9,15 @@ import base64 class Pan123: - def __init__(self, readfile=True, user_name="", pass_word="", authorization="", input_pwd=True): - self.RecycleList = None + def __init__( + self, + readfile=True, + user_name="", + pass_word="", + authorization="", + input_pwd=True, + ): + self.recycle_list = None self.list = None if readfile: self.read_ini(user_name, pass_word, input_pwd, authorization) @@ -22,14 +29,17 @@ class Pan123: pass_word = input("请输入密码:") else: raise Exception("用户名或密码为空:读取禁用时,userName和passWord不能为空") - self.userName = user_name - self.passWord = pass_word + self.user_name = user_name + self.password = pass_word self.authorization = authorization - self.headerOnlyUsage = { - 'user-agent': 'Mozilla/5.0 (Windows NT 10.0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36 Edg/109.0.1474.0', + self.header_only_usage = { + "user-agent": "Mozilla/5.0 (Windows NT 10.0) AppleWebKit/" + "537.36 (KHTML, like Gecko) Chrome/109.0.0.0 " + "Safari/537.36 Edg/109.0.1474.0", "app-version": "2", - "platform": "web", } - self.headerLogined = { + "platform": "web", + } + self.header_logined = { "Accept": "*/*", "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6", "App-Version": "3", @@ -42,33 +52,39 @@ class Pan123: "Sec-Fetch-Dest": "empty", "Sec-Fetch-Mode": "cors", "Sec-Fetch-Site": "same-origin", - "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36 Edg/119.0.0.0", + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/" + "537.36 (KHTML, like Gecko) Chrome/119.0.0.0 " + "Safari/537.36 Edg/119.0.0.0", "platform": "web", "sec-ch-ua": "^\\^Microsoft", "sec-ch-ua-mobile": "?0", - "sec-ch-ua-platform": "^\\^Windows^^" + "sec-ch-ua-platform": "^\\^Windows^^", } - self.parentFileId = 0 # 路径,文件夹的id,0为根目录 - self.parentFileList = [0] - code = self.get_dir() - if code != 0: + self.parent_file_id = 0 # 路径,文件夹的id,0为根目录 + self.parent_file_list = [0] + res_code_getdir = self.get_dir() + if res_code_getdir != 0: self.login() self.get_dir() def login(self): - data = {"remember": True, "passport": self.userName, "password": self.passWord} - sign = getSign('/b/api/user/sign_in') - loginRes = requests.post("https://www.123pan.com/b/api/user/sign_in", headers=self.headerOnlyUsage, data=data, - params={sign[0]: sign[1]}) - res_sign = loginRes.json() - code = res_sign['code'] - if code != 200: - print("code = 1 Error:" + str(code)) - print(res_sign['message']) - return code - token = res_sign['data']['token'] - self.authorization = 'Bearer ' + token - headerLogined = { + data = {"remember": True, "passport": self.user_name, "password": self.password} + sign = getSign("/b/api/user/sign_in") + login_res = requests.post( + "https://www.123pan.com/b/api/user/sign_in", + headers=self.header_only_usage, + data=data, + params={sign[0]: sign[1]}, timeout=10 + ) + res_sign = login_res.json() + res_code_login = res_sign["code"] + if res_code_login != 200: + print("code = 1 Error:" + str(res_code_login)) + print(res_sign["message"]) + return res_code_login + token = res_sign["data"]["token"] + self.authorization = "Bearer " + token + header_logined = { "Accept": "*/*", "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6", "App-Version": "3", @@ -81,30 +97,32 @@ class Pan123: "Sec-Fetch-Dest": "empty", "Sec-Fetch-Mode": "cors", "Sec-Fetch-Site": "same-origin", - "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36 Edg/119.0.0.0", + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/" + "537.36 (KHTML, like" + " Gecko) Chrome/119.0.0.0 Safari/537.36 Edg/119.0.0.0", "platform": "web", "sec-ch-ua": "^\\^Microsoft", "sec-ch-ua-mobile": "?0", - "sec-ch-ua-platform": "^\\^Windows^^" + "sec-ch-ua-platform": "^\\^Windows^^", } - self.headerLogined = headerLogined + self.header_logined = header_logined # ret['cookie'] = cookie self.save_file() - return code + return res_code_login def save_file(self): - with open("123pan.txt", "w") as f: - saveList = { - "userName": self.userName, - "passWord": self.passWord, + with open("123pan.txt", "w",encoding="utf_8") as f: + save_list = { + "userName": self.user_name, + "passWord": self.password, "authorization": self.authorization, } - f.write(json.dumps(saveList)) + f.write(json.dumps(save_list)) print("Save!") def get_dir(self): - code = 0 + res_code_getdir = 0 page = 1 lists = [] lenth_now = 0 @@ -113,7 +131,7 @@ class Pan123: base_url = "https://www.123pan.com/b/api/file/list/new" # print(self.headerLogined) - sign = getSign('/b/api/file/list/new') + sign = getSign("/b/api/file/list/new") print(sign) params = { sign[0]: sign[1], @@ -122,116 +140,136 @@ class Pan123: "next": 0, "orderBy": "file_id", "orderDirection": "desc", - "parentFileId": str(self.parentFileId), + "parentFileId": str(self.parent_file_id), "trashed": False, "SearchData": "", "Page": str(page), - "OnlyLookAbnormalFile": 0 + "OnlyLookAbnormalFile": 0, } - a = requests.get(base_url, headers=self.headerLogined, params=params) + a = requests.get(base_url, headers=self.header_logined, params=params, timeout=10) # print(a.text) # print(a.headers) text = a.json() - code = text['code'] - if code != 0: + res_code_getdir = text["code"] + if res_code_getdir != 0: print(a.text) print(a.headers) - print("code = 2 Error:" + str(code)) - return code - lists_page = text['data']['InfoList'] + print("code = 2 Error:" + str(res_code_getdir)) + return res_code_getdir + lists_page = text["data"]["InfoList"] lists += lists_page - total = text['data']['Total'] + total = text["data"]["Total"] lenth_now += len(lists_page) page += 1 - FileNum = 0 + file_num = 0 for i in lists: - i["FileNum"] = FileNum - FileNum += 1 + i["FileNum"] = file_num + file_num += 1 self.list = lists - return code + return res_code_getdir def show(self): print("--------------------") for i in self.list: - size = i["Size"] - if size > 1048576: - size_print = str(round(size / 1048576, 2)) + "M" + file_size = i["Size"] + if file_size > 1048576: + download_size_print = str(round(file_size / 1048576, 2)) + "M" else: - size_print = str(round(size / 1024, 2)) + "K" + download_size_print = str(round(file_size / 1024, 2)) + "K" if i["Type"] == 0: - - print("\033[33m" + "编号:", self.list.index(i) + 1, "\033[0m \t\t" + size_print + "\t\t\033[36m", - i["FileName"], "\033[0m") + print( + "\033[33m" + "编号:", + self.list.index(i) + 1, + "\033[0m \t\t" + download_size_print + "\t\t\033[36m", + i["FileName"], + "\033[0m", + ) elif i["Type"] == 1: - print("\033[35m" + "编号:", self.list.index(i) + 1, " \t\t\033[36m", - i["FileName"], "\033[0m") + print( + "\033[35m" + "编号:", + self.list.index(i) + 1, + " \t\t\033[36m", + i["FileName"], + "\033[0m", + ) print("--------------------") # fileNumber 从0开始,0为第一个文件,传入时需要减一 !!! def link(self, file_number, showlink=True): - fileDetail = self.list[file_number] - typeDetail = fileDetail['Type'] - if typeDetail == 1: + file_detail = self.list[file_number] + type_detail = file_detail["Type"] + if type_detail == 1: down_request_url = "https://www.123pan.com/a/api/file/batch_download_info" - down_request_data = {"fileIdList": [{"fileId": int(fileDetail["FileId"])}]} + down_request_data = {"fileIdList": [{"fileId": int(file_detail["FileId"])}]} else: down_request_url = "https://www.123pan.com/a/api/file/download_info" - down_request_data = {"driveId": 0, "etag": fileDetail["Etag"], "fileId": fileDetail["FileId"], - "s3keyFlag": fileDetail['S3KeyFlag'], "type": fileDetail['Type'], - "fileName": fileDetail['FileName'], "size": fileDetail['Size']} + down_request_data = { + "driveId": 0, + "etag": file_detail["Etag"], + "fileId": file_detail["FileId"], + "s3keyFlag": file_detail["S3KeyFlag"], + "type": file_detail["Type"], + "fileName": file_detail["FileName"], + "size": file_detail["Size"], + } # print(down_request_data) sign = getSign("/a/api/file/download_info") - linkRes = requests.post(down_request_url, headers=self.headerLogined, params={sign[0]: sign[1]}, - data=down_request_data) + link_res = requests.post( + down_request_url, + headers=self.header_logined, + params={sign[0]: sign[1]}, + data=down_request_data, + timeout=10 + ) # print(linkRes.text) - code = linkRes.json()['code'] - if code != 0: - print("code = 3 Error:" + str(code)) + res_code_download = link_res.json()["code"] + if res_code_download != 0: + print("code = 3 Error:" + str(res_code_download)) # print(linkRes.json()) - return code - downloadLinkBase64 = linkRes.json()["data"]["DownloadUrl"] - Base64Url = re.findall("params=(.*)&", downloadLinkBase64)[0] + return res_code_download + download_link_base64 = link_res.json()["data"]["DownloadUrl"] + base64_url = re.findall("params=(.*)&", download_link_base64)[0] # print(Base64Url) - downLoadUrl = base64.b64decode(Base64Url) - downLoadUrl = downLoadUrl.decode("utf-8") + down_load_url = base64.b64decode(base64_url) + down_load_url = down_load_url.decode("utf-8") - nextToGet = requests.get(downLoadUrl).json() - redirect_url = nextToGet['data']['redirect_url'] + next_to_get = requests.get(down_load_url,timeout=10).json() + redirect_url = next_to_get["data"]["redirect_url"] if showlink: print(redirect_url) return redirect_url def download(self, file_number): - fileDetail = self.list[file_number] - downLoadUrl = self.link(file_number, showlink=False) - name = fileDetail['FileName'] # 文件名 - if os.path.exists(name): - print("文件 " + name + " 已存在,是否要覆盖?") - sure = input("输入1覆盖,2取消:") - if sure != '1': + file_detail = self.list[file_number] + down_load_url = self.link(file_number, showlink=False) + file_name = file_detail["FileName"] # 文件名 + if os.path.exists(file_name): + print("文件 " + file_name + " 已存在,是否要覆盖?") + sure_download = input("输入1覆盖,2取消:") + if sure_download != "1": return - down = requests.get(downLoadUrl, stream=True) + down = requests.get(down_load_url, stream=True, timeout=10) - size = int(down.headers['Content-Length']) # 文件大小 - content_size = int(size) # 文件总大小 + file_size = int(down.headers["Content-Length"]) # 文件大小 + content_size = int(file_size) # 文件总大小 data_count = 0 # 当前已传输的大小 - if size > 1048576: - size_print = str(round(size / 1048576, 2)) + "M" + if file_size > 1048576: + size_print_download = str(round(file_size / 1048576, 2)) + "M" else: - size_print = str(round(size / 1024, 2)) + "K" - print(name + " " + size_print) + size_print_download = str(round(file_size / 1024, 2)) + "K" + print(file_name + " " + size_print_download) time1 = time.time() time_temp = time1 data_count_temp = 0 - with open(name, "wb") as f: + with open(file_name, "wb") as f: for i in down.iter_content(1024): f.write(i) done_block = int((data_count / content_size) * 50) @@ -247,26 +285,37 @@ class Pan123: pass_data = int(data_count) - int(data_count_temp) data_count_temp = data_count speed = pass_data / int(pass_time) - speed_M = speed / 1048576 - if speed_M > 1: - speed_print = str(round(speed_M, 2)) + "M/S" + speed_m = speed / 1048576 + if speed_m > 1: + speed_print = str(round(speed_m, 2)) + "M/S" else: - speed_print = str(round(speed_M * 1024, 2)) + "K/S" + speed_print = str(round(speed_m * 1024, 2)) + "K/S" print( - "\r [%s%s] %d%% %s" % (done_block * '█', ' ' * (50 - 1 - done_block), now_jd, speed_print), - end="") + "\r [%s%s] %d%% %s" + % ( + done_block * "█", + " " * (50 - 1 - done_block), + now_jd, + speed_print, + ), + end="", + ) elif data_count == content_size: - print("\r [%s%s] %d%% %s" % (50 * '█', '', 100, ""), end="") + print("\r [%s%s] %d%% %s" % (50 * "█", "", 100, ""), end="") print("\nok") def recycle(self): recycle_id = 0 - url = "https://www.123pan.com/a/api/file/list/new?driveId=0&limit=100&next=0&orderBy=fileId&orderDirection=desc&parentFileId=" + str( - recycle_id) + "&trashed=true&&Page=1" - recycleRes = requests.get(url, headers=self.headerLogined) - jsonRecycle = recycleRes.json() - RecycleList = jsonRecycle['data']['InfoList'] - self.RecycleList = RecycleList + url = ( + "https://www.123pan.com/a/api/file/list/new?driveId=0&limit=100&next=0" + "&orderBy=fileId&orderDirection=desc&parentFileId=" + + str(recycle_id) + + "&trashed=true&&Page=1" + ) + recycle_res = requests.get(url, headers=self.header_logined, timeout=10) + json_recycle = recycle_res.json() + recycle_list = json_recycle["data"]["InfoList"] + self.recycle_list = recycle_list # fileNumber 从0开始,0为第一个文件,传入时需要减一 !!! def delete_file(self, file, by_num=True, operation=True): @@ -287,69 +336,79 @@ class Pan123: else: print("文件不存在") return - dataDelete = {"driveId": 0, - "fileTrashInfoList": file_detail, - "operation": operation} - deleteRes = requests.post("https://www.123pan.com/a/api/file/trash", data=json.dumps(dataDelete), - headers=self.headerLogined) - DeleJson = deleteRes.json() - print(DeleJson) - message = DeleJson['message'] + data_delete = { + "driveId": 0, + "fileTrashInfoList": file_detail, + "operation": operation, + } + delete_res = requests.post( + "https://www.123pan.com/a/api/file/trash", + data=json.dumps(data_delete), + headers=self.header_logined, + timeout=10 + ) + dele_json = delete_res.json() + print(dele_json) + message = dele_json["message"] print(message) def share(self): - fileIdList = "" + file_id_list = "" share_name_list = [] - add = '1' - while str(add) == '1': + add = "1" + while str(add) == "1": share_num = input("分享文件的编号:") num_test2 = share_num.isdigit() if num_test2: share_num = int(share_num) if 0 < share_num < len(self.list) + 1: - share_id = self.list[int(share_num) - 1]['FileId'] - share_name = self.list[int(share_num) - 1]['FileName'] + share_id = self.list[int(share_num) - 1]["FileId"] + share_name = self.list[int(share_num) - 1]["FileName"] share_name_list.append(share_name) print(share_name_list) - fileIdList = fileIdList + str(share_id) + "," + file_id_list = file_id_list + str(share_id) + "," add = input("输入1添加文件,0发起分享,其他取消") else: print("请输入数字,,") add = "1" if str(add) == "0": - sharePwd = input("提取码,不设留空:") - fileIdList = fileIdList.strip(',') - data = {"driveId": 0, - "expiration": "2024-02-09T11:42:45+08:00", - "fileIdList": fileIdList, - "shareName": "我的分享", - "sharePwd": sharePwd, - - } - shareRes = requests.post("https://www.123pan.com/a/api/share/create", headers=self.headerLogined, - data=json.dumps(data)) - shareResJson = shareRes.json() - message = shareResJson['message'] + share_pwd = input("提取码,不设留空:") + file_id_list = file_id_list.strip(",") + data = { + "driveId": 0, + "expiration": "2024-02-09T11:42:45+08:00", + "fileIdList": file_id_list, + "shareName": "我的分享", + "sharePwd": share_pwd, + } + share_res = requests.post( + "https://www.123pan.com/a/api/share/create", + headers=self.header_logined, + data=json.dumps(data), + timeout=10 + ) + share_res_json = share_res.json() + message = share_res_json["message"] print(message) - ShareKey = shareResJson['data']['ShareKey'] - share_url = 'https://www.123pan.com/s/' + ShareKey - print('分享链接:\n' + share_url + "提取码:" + sharePwd) + share_key = share_res_json["data"]["ShareKey"] + share_url = "https://www.123pan.com/s/" + share_key + print("分享链接:\n" + share_url + "提取码:" + share_pwd) else: print("退出分享") - def up_load(self, filePath): - filePath = filePath.replace("\"", "") - filePath = filePath.replace("\\", "/") - fileName = filePath.split("/")[-1] - print("文件名:", fileName) - if not os.path.exists(filePath): + def up_load(self, file_path): + file_path = file_path.replace('"', "") + file_path = file_path.replace("\\", "/") + file_name = file_path.split("/")[-1] + print("文件名:", file_name) + if not os.path.exists(file_path): print("文件不存在,请检查路径是否正确") return - if os.path.isdir(filePath): + if os.path.isdir(file_path): print("暂不支持文件夹上传") return - fsize = os.path.getsize(filePath) - with open(filePath, 'rb') as f: + fsize = os.path.getsize(file_path) + with open(file_path, "rb") as f: md5 = hashlib.md5() while True: data = f.read(64 * 1024) @@ -358,91 +417,125 @@ class Pan123: md5.update(data) readable_hash = md5.hexdigest() - listUpRequest = {"driveId": 0, "etag": readable_hash, "fileName": fileName, - "parentFileId": self.parentFileId, "size": fsize, "type": 0, "duplicate": 0} + list_up_request = { + "driveId": 0, + "etag": readable_hash, + "fileName": file_name, + "parentFileId": self.parent_file_id, + "size": fsize, + "type": 0, + "duplicate": 0, + } sign = getSign("/b/api/file/upload_request") - upRes = requests.post("https://www.123pan.com/b/api/file/upload_request", headers=self.headerLogined, - params={sign[0]: sign[1]}, - data=listUpRequest) - upResJson = upRes.json() - code = upResJson['code'] - if code == 5060: - sure = input("检测到1个同名文件,输入1覆盖,2保留两者,0取消:") - if sure == '1': - listUpRequest["duplicate"] = 1 + up_res = requests.post( + "https://www.123pan.com/b/api/file/upload_request", + headers=self.header_logined, + params={sign[0]: sign[1]}, + data=list_up_request, + timeout=10 + ) + up_res_json = up_res.json() + res_code_up = up_res_json["code"] + if res_code_up == 5060: + sure_upload = input("检测到1个同名文件,输入1覆盖,2保留两者,0取消:") + if sure_upload == "1": + list_up_request["duplicate"] = 1 - elif sure == '2': - listUpRequest["duplicate"] = 2 + elif sure_upload == "2": + list_up_request["duplicate"] = 2 else: print("取消上传") return sign = getSign("/b/api/file/upload_request") - upRes = requests.post("https://www.123pan.com/b/api/file/upload_request", headers=self.headerLogined, - params={sign[0]: sign[1]}, - data=json.dumps(listUpRequest)) - upResJson = upRes.json() - code = upResJson['code'] - if code == 0: + up_res = requests.post( + "https://www.123pan.com/b/api/file/upload_request", + headers=self.header_logined, + params={sign[0]: sign[1]}, + data=json.dumps(list_up_request), + timeout=10 + ) + up_res_json = up_res.json() + res_code_up = up_res_json["code"] + if res_code_up == 0: # print(upResJson) # print("上传请求成功") - Reuse = upResJson['data']['Reuse'] - if Reuse: + reuse = up_res_json["data"]["Reuse"] + if reuse: print("上传成功,文件已MD5复用") return else: - print(upResJson) + print(up_res_json) print("上传请求失败") return - bucket = upResJson['data']['Bucket'] - StorageNode = upResJson['data']['StorageNode'] - uploadKey = upResJson['data']['Key'] - uploadId = upResJson['data']['UploadId'] - upFileId = upResJson['data']['FileId'] # 上传文件的fileId,完成上传后需要用到 - print("上传文件的fileId:", upFileId) + bucket = up_res_json["data"]["Bucket"] + storage_node = up_res_json["data"]["StorageNode"] + upload_key = up_res_json["data"]["Key"] + upload_id = up_res_json["data"]["UploadId"] + up_file_id = up_res_json["data"]["FileId"] # 上传文件的fileId,完成上传后需要用到 + print("上传文件的fileId:", up_file_id) # 获取已将上传的分块 - startData = {"bucket": bucket, "key": uploadKey, "uploadId": uploadId, "storageNode": StorageNode} - startRes = requests.post("https://www.123pan.com/b/api/file/s3_list_upload_parts", headers=self.headerLogined, - data=json.dumps(startData)) - startResJson = startRes.json() - code = startResJson['code'] - if code == 0: + start_data = { + "bucket": bucket, + "key": upload_key, + "uploadId": upload_id, + "storageNode": storage_node, + } + start_res = requests.post( + "https://www.123pan.com/b/api/file/s3_list_upload_parts", + headers=self.header_logined, + data=json.dumps(start_data), + timeout=10 + ) + start_res_json = start_res.json() + res_code_up = start_res_json["code"] + if res_code_up == 0: # print(startResJson) pass else: - print(startData) - print(startResJson) + print(start_data) + print(start_res_json) print("获取传输列表失败") return # 分块,每一块取一次链接,依次上传 block_size = 5242880 - with open(filePath, 'rb') as f: - partNumberStart = 1 - putSize = 0 + with open(file_path, "rb") as f: + part_number_start = 1 + put_size = 0 while True: data = f.read(block_size) - precent = round(putSize / fsize, 2) + precent = round(put_size / fsize, 2) print("\r已上传:" + str(precent * 100) + "%", end="") - putSize = putSize + len(data) + put_size = put_size + len(data) if not data: break - getLinkData = {"bucket": bucket, "key": uploadKey, - "partNumberEnd": partNumberStart + 1, - "partNumberStart": partNumberStart, - "uploadId": uploadId, - "StorageNode": StorageNode} + get_link_data = { + "bucket": bucket, + "key": upload_key, + "partNumberEnd": part_number_start + 1, + "partNumberStart": part_number_start, + "uploadId": upload_id, + "StorageNode": storage_node, + } - getLinkUrl = "https://www.123pan.com/b/api/file/s3_repare_upload_parts_batch" - getLinkRes = requests.post(getLinkUrl, headers=self.headerLogined, data=json.dumps(getLinkData)) - getLinkResJson = getLinkRes.json() - code = getLinkResJson['code'] - if code == 0: + get_link_url = ( + "https://www.123pan.com/b/api/file/s3_repare_upload_parts_batch" + ) + get_link_res = requests.post( + get_link_url, + headers=self.header_logined, + data=json.dumps(get_link_data), + timeout=10 + ) + get_link_res_json = get_link_res.json() + res_code_up = get_link_res_json["code"] + if res_code_up == 0: # print("获取链接成功") pass else: @@ -450,106 +543,131 @@ class Pan123: # print(getLinkResJson) return # print(getLinkResJson) - uploadUrl = getLinkResJson['data']['presignedUrls'][str(partNumberStart)] + upload_url = get_link_res_json["data"]["presignedUrls"][ + str(part_number_start) + ] # print("上传链接",uploadUrl) - requests.put(uploadUrl, data=data) + requests.put(upload_url, data=data, timeout=10) # print("put") - partNumberStart = partNumberStart + 1 + part_number_start = part_number_start + 1 print("\n处理中") # 完成标志 # 1.获取已上传的块 - uploadedListUrl = "https://www.123pan.com/b/api/file/s3_list_upload_parts" - uploadedCompData = {"bucket": bucket, "key": uploadKey, "uploadId": uploadId, "storageNode": StorageNode} + uploaded_list_url = "https://www.123pan.com/b/api/file/s3_list_upload_parts" + uploaded_comp_data = { + "bucket": bucket, + "key": upload_key, + "uploadId": upload_id, + "storageNode": storage_node, + } # print(uploadedCompData) - requests.post(uploadedListUrl, headers=self.headerLogined, data=json.dumps(uploadedCompData)) - compmultipartUpUrl = "https://www.123pan.com/b/api/file/s3_complete_multipart_upload" - requests.post(compmultipartUpUrl, headers=self.headerLogined, - data=json.dumps(uploadedCompData)) + requests.post( + uploaded_list_url, + headers=self.header_logined, + data=json.dumps(uploaded_comp_data), + timeout=10 + ) + compmultipart_up_url = ( + "https://www.123pan.com/b/api/file/s3_complete_multipart_upload" + ) + requests.post( + compmultipart_up_url, + headers=self.header_logined, + data=json.dumps(uploaded_comp_data), + timeout=10 + ) # 3.报告完成上传,关闭upload session if fsize > 64 * 1024 * 1024: time.sleep(3) - closeUpSessionUrl = "https://www.123pan.com/b/api/file/upload_complete" - closeUpSessionData = {"fileId": upFileId} + close_up_session_url = "https://www.123pan.com/b/api/file/upload_complete" + close_up_session_data = {"fileId": up_file_id} # print(closeUpSessionData) - closeUpSessionRes = requests.post(closeUpSessionUrl, headers=self.headerLogined, - data=json.dumps(closeUpSessionData)) - closeResJson = closeUpSessionRes.json() + close_up_session_res = requests.post( + close_up_session_url, + headers=self.header_logined, + data=json.dumps(close_up_session_data), + timeout=10 + ) + close_res_json = close_up_session_res.json() # print(closeResJson) - code = closeResJson['code'] - if code == 0: + res_code_up = close_res_json["code"] + if res_code_up == 0: print("上传成功") else: print("上传失败") - print(closeResJson) + print(close_res_json) return # dirId 就是 fileNumber,从0开始,0为第一个文件,传入时需要减一 !!!(好像文件夹都排在前面) def cd(self, dir_num): if not dir_num.isdigit(): if dir_num == "..": - if len(self.parentFileList) > 1: - self.parentFileList.pop() - self.parentFileId = self.parentFileList[-1] + if len(self.parent_file_list) > 1: + self.parent_file_list.pop() + self.parent_file_id = self.parent_file_list[-1] self.get_dir() self.show() else: print("已经是根目录") return - elif dir_num == "/": - self.parentFileId = 0 - self.parentFileList = [0] + if dir_num == "/": + self.parent_file_id = 0 + self.parent_file_list = [0] self.get_dir() self.show() return - else: - print("输入错误") - return + print("输入错误") + return dir_num = int(dir_num) - 1 if dir_num >= (len(self.list) - 1) or dir_num < 0: print("输入错误") return - if self.list[dir_num]['Type'] != 1: + if self.list[dir_num]["Type"] != 1: print("不是文件夹") return - self.parentFileId = self.list[dir_num]['FileId'] - self.parentFileList.append(self.parentFileId) + self.parent_file_id = self.list[dir_num]["FileId"] + self.parent_file_list.append(self.parent_file_id) self.get_dir() self.show() - def cdById(self, id): - - self.parentFileId = id - self.parentFileList.append(self.parentFileId) + def cdById(self, file_id): + self.parent_file_id = file_id + self.parent_file_list.append(self.parent_file_id) self.get_dir() self.get_dir() self.show() - def read_ini(self, user_name, pass_word, input_pwd, authorization="", ): + def read_ini( + self, + user_name, + pass_word, + input_pwd, + authorization="", + ): try: - with open("123pan.txt", "r") as f: + with open("123pan.txt", "r",encoding="utf-8") as f: text = f.read() text = json.loads(text) - user_name = text['userName'] - pass_word = text['passWord'] - authorization = text['authorization'] + user_name = text["userName"] + pass_word = text["passWord"] + authorization = text["authorization"] except FileNotFoundError or json.decoder.JSONDecodeError: print("read failed") if user_name == "" or pass_word == "": if input_pwd: - user_name = input("userName:") pass_word = input("passWord:") authorization = "" else: raise Exception("禁止输入模式下,没有账号或密码") - self.userName = user_name - self.passWord = pass_word + self.user_name = user_name + self.password = pass_word self.authorization = authorization def mkdir(self, dirname, remakedir=False): @@ -560,30 +678,45 @@ class Pan123: return i["FileId"] url = "https://www.123pan.com/a/api/file/upload_request" - dataMk = {"driveId": 0, "etag": "", "fileName": dirname, "parentFileId": self.parentFileId, "size": 0, - "type": 1, "duplicate": 1, "NotReuse": True, "event": "newCreateFolder", "operateType": 1} + data_mk = { + "driveId": 0, + "etag": "", + "fileName": dirname, + "parentFileId": self.parent_file_id, + "size": 0, + "type": 1, + "duplicate": 1, + "NotReuse": True, + "event": "newCreateFolder", + "operateType": 1, + } sign = getSign("/a/api/file/upload_request") - resMk = requests.post(url, headers=self.headerLogined, data=json.dumps(dataMk), params={sign[0]: sign[1]}) + res_mk = requests.post( + url, + headers=self.header_logined, + data=json.dumps(data_mk), + params={sign[0]: sign[1]}, + timeout=10 + ) try: - resJson = resMk.json() - print(resJson) + res_json = res_mk.json() + print(res_json) except json.decoder.JSONDecodeError: print("创建失败") - print(resMk.text) + print(res_mk.text) return - code = resJson['code'] + code_mkdir = res_json["code"] - if code == 0: - print("创建成功: ", resJson["data"]["FileId"]) + if code_mkdir == 0: + print("创建成功: ", res_json["data"]["FileId"]) self.get_dir() - return resJson["data"]["Info"]["FileId"] - else: - print("创建失败") - print(resJson) - return + return res_json["data"]["Info"]["FileId"] + print("创建失败") + print(res_json) + return -if __name__ == '__main__': +if __name__ == "__main__": pan = Pan123(readfile=True, input_pwd=True) pan.show() while True: @@ -602,15 +735,14 @@ if __name__ == '__main__': if pan.list[int(command) - 1]["Type"] == 1: pan.cdById(pan.list[int(command) - 1]["FileId"]) else: - size = pan.list[int(command) - 1]["Size"] if size > 1048576: - size_print = str(round(size / 1048576, 2)) + "M" + size_print_show = str(round(size / 1048576, 2)) + "M" else: - size_print = str(round(size / 1024, 2)) + "K" + size_print_show = str(round(size / 1024, 2)) + "K" # print(pan.list[int(command) - 1]) name = pan.list[int(command) - 1]["FileName"] - print(name + " " + size_print) + print(name + " " + size_print_show) print("press 1 to download now: ", end="") sure = input() if sure == "1":