diff --git a/andriod.py b/andriod.py deleted file mode 100644 index 96b136b..0000000 --- a/andriod.py +++ /dev/null @@ -1,813 +0,0 @@ -import hashlib -import json -import os -import re -import time - -import requests - - - -class Pan123: - def __init__( - self, - readfile=True, - user_name="", - pass_word="", - authorization="", - input_pwd=True, - ): - self.cookies = None - self.recycle_list = None - self.list = None - if readfile: - self.read_ini(user_name, pass_word, input_pwd, authorization) - else: - if user_name == "" or pass_word == "": - print("读取已禁用,用户名或密码为空") - if input_pwd: - user_name = input("请输入用户名:") - pass_word = input("请输入密码:") - else: - raise Exception("用户名或密码为空:读取禁用时,userName和passWord不能为空") - self.user_name = user_name - self.password = pass_word - self.authorization = authorization - self.header_logined = { - "user-agent": "123pan/v2.4.0(Android_7.1.2;Xiaomi)", - "authorization": self.authorization, - "accept-encoding": "gzip", - # "authorization": "", - "content-type": "application/json", - "osversion": "Android_7.1.2", - "loginuuid": "3ec3167d3ba54ba61ea15b823b0a05f4", - "platform": "android", - "devicetype": "M2101K9C", - "x-channel": "1004", - "devicename": "Xiaomi", - # "Content-Length": "65", - "host": "www.123pan.com", - "app-version": "61", - "x-app-version": "2.4.0" - } - self.parent_file_id = 0 # 路径,文件夹的id,0为根目录 - self.parent_file_list = [0] - res_code_getdir = self.get_dir() - if res_code_getdir != 0: - self.login() - self.get_dir() - - def login(self): - data = {"type": 1, "passport": self.user_name, "password": self.password} - # sign = getSign("/b/api/user/sign_in") - login_res = requests.post( - "https://www.123pan.com/b/api/user/sign_in", - headers=self.header_logined, - data=data, - # params={sign[0]: sign[1]}, timeout=10, - # verify=False - ) - - res_sign = login_res.json() - # print("登录结果:", res_sign) - # print("登录结果header:", login_res.headers) - res_code_login = res_sign["code"] - if res_code_login != 200: - print("code = 1 Error:" + str(res_code_login)) - print(res_sign["message"]) - return res_code_login - set_cookies = login_res.headers["Set-Cookie"] - set_cookies_list = {} - - for cookie in set_cookies.split(';'): - if '=' in cookie: - key, value = cookie.strip().split('=', 1) - set_cookies_list[key] = value - else: - set_cookies_list[cookie.strip()] = None - - self.cookies = set_cookies_list - - token = res_sign["data"]["token"] - self.authorization = "Bearer " + token - self.header_logined["authorization"] = self.authorization - # ret['cookie'] = cookie - self.save_file() - return res_code_login - - def save_file(self): - with open("123pan.txt", "w", encoding="utf_8") as f: - save_list = { - "userName": self.user_name, - "passWord": self.password, - "authorization": self.authorization, - } - - f.write(json.dumps(save_list)) - print("账号已保存") - - def get_dir(self): - res_code_getdir = 0 - page = 1 - lists = [] - lenth_now = 0 - total = -1 - while lenth_now < total or total == -1: - base_url = "https://www.123pan.com/b/api/file/list/new" - # print(self.headerLogined) - # sign = getSign("/b/api/file/list/new") - # print(sign) - params = { - # sign[0]: sign[1], - "driveId": 0, - "limit": 100, - "next": 0, - "orderBy": "file_id", - "orderDirection": "desc", - "parentFileId": str(self.parent_file_id), - "trashed": False, - "SearchData": "", - "Page": str(page), - "OnlyLookAbnormalFile": 0, - } - try: - a = requests.get(base_url, headers=self.header_logined, params=params, timeout=10) # , verify=False) - except: - print("连接失败") - return -1 - # print(a.text) - # print(a.headers) - text = a.json() - res_code_getdir = text["code"] - if res_code_getdir != 0: - print(a.text) - print(a.headers) - print("code = 2 Error:" + str(res_code_getdir)) - return res_code_getdir - lists_page = text["data"]["InfoList"] - lists += lists_page - total = text["data"]["Total"] - lenth_now += len(lists_page) - page += 1 - file_num = 0 - for i in lists: - i["FileNum"] = file_num - file_num += 1 - - self.list = lists - return res_code_getdir - - def show(self): - print("--------------------") - for i in self.list: - file_size = i["Size"] - if file_size > 1048576: - download_size_print = str(round(file_size / 1048576, 2)) + "M" - else: - download_size_print = str(round(file_size / 1024, 2)) + "K" - - if i["Type"] == 0: - print( - "\033[33m" + "编号:", - self.list.index(i) + 1, - "\033[0m \t\t" + download_size_print + "\t\t\033[36m", - i["FileName"], - "\033[0m", - ) - elif i["Type"] == 1: - print( - "\033[35m" + "编号:", - self.list.index(i) + 1, - " \t\t\033[36m", - i["FileName"], - "\033[0m", - ) - - print("--------------------") - - # fileNumber 从0开始,0为第一个文件,传入时需要减一 !!! - def link(self, file_number, showlink=True): - file_detail = self.list[file_number] - type_detail = file_detail["Type"] - if type_detail == 1: - down_request_url = "https://www.123pan.com/a/api/file/batch_download_info" - down_request_data = {"fileIdList": [{"fileId": int(file_detail["FileId"])}]} - - else: - down_request_url = "https://www.123pan.com/a/api/file/download_info" - down_request_data = { - "driveId": 0, - "etag": file_detail["Etag"], - "fileId": file_detail["FileId"], - "s3keyFlag": file_detail["S3KeyFlag"], - "type": file_detail["Type"], - "fileName": file_detail["FileName"], - "size": file_detail["Size"], - } - # print(down_request_data) - - # sign = getSign("/a/api/file/download_info") - - link_res = requests.post( - down_request_url, - headers=self.header_logined, - # params={sign[0]: sign[1]}, - data=down_request_data, - timeout=10 - ) - # print(linkRes.text) - res_code_download = link_res.json()["code"] - if res_code_download != 0: - print("code = 3 Error:" + str(res_code_download)) - # print(linkRes.json()) - return res_code_download - down_load_url = link_res.json()["data"]["DownloadUrl"] - next_to_get = requests.get(down_load_url, timeout=10, allow_redirects=False).text - url_pattern = re.compile(r"href='(https?://[^']+)'") - redirect_url = url_pattern.findall(next_to_get)[0] - if showlink: - print(redirect_url) - - return redirect_url - - def download(self, file_number): - file_detail = self.list[file_number] - down_load_url = self.link(file_number, showlink=False) - file_name = file_detail["FileName"] # 文件名 - if os.path.exists(file_name): - print("文件 " + file_name + " 已存在,是否要覆盖?") - sure_download = input("输入1覆盖,2取消:") - if sure_download != "1": - return - down = requests.get(down_load_url, stream=True, timeout=10) - - file_size = int(down.headers["Content-Length"]) # 文件大小 - content_size = int(file_size) # 文件总大小 - data_count = 0 # 当前已传输的大小 - if file_size > 1048576: - size_print_download = str(round(file_size / 1048576, 2)) + "M" - else: - size_print_download = str(round(file_size / 1024, 2)) + "K" - print(file_name + " " + size_print_download) - time1 = time.time() - time_temp = time1 - data_count_temp = 0 - with open(file_name, "wb") as f: - for i in down.iter_content(1024): - f.write(i) - done_block = int((data_count / content_size) * 50) - data_count = data_count + len(i) - # 实时进度条进度 - now_jd = (data_count / content_size) * 100 - # %% 表示% - # 测速 - time1 = time.time() - pass_time = time1 - time_temp - if pass_time > 1: - time_temp = time1 - pass_data = int(data_count) - int(data_count_temp) - data_count_temp = data_count - speed = pass_data / int(pass_time) - speed_m = speed / 1048576 - if speed_m > 1: - speed_print = str(round(speed_m, 2)) + "M/S" - else: - speed_print = str(round(speed_m * 1024, 2)) + "K/S" - print( - "\r [%s%s] %d%% %s" - % ( - done_block * "█", - " " * (50 - 1 - done_block), - now_jd, - speed_print, - ), - end="", - ) - elif data_count == content_size: - print("\r [%s%s] %d%% %s" % (50 * "█", "", 100, ""), end="") - print("\nok") - - def recycle(self): - recycle_id = 0 - url = ( - "https://www.123pan.com/a/api/file/list/new?driveId=0&limit=100&next=0" - "&orderBy=fileId&orderDirection=desc&parentFileId=" - + str(recycle_id) - + "&trashed=true&&Page=1" - ) - recycle_res = requests.get(url, headers=self.header_logined, timeout=10) - json_recycle = recycle_res.json() - recycle_list = json_recycle["data"]["InfoList"] - self.recycle_list = recycle_list - - # fileNumber 从0开始,0为第一个文件,传入时需要减一 !!! - def delete_file(self, file, by_num=True, operation=True): - # operation = 'true' 删除 , operation = 'false' 恢复 - if by_num: - print(file) - if not str(file).isdigit(): - print("请输入数字") - return -1 - if 0 <= file < len(self.list): - file_detail = self.list[file] - else: - print("不在合理范围内") - return - else: - if file in self.list: - file_detail = file - else: - print("文件不存在") - return - data_delete = { - "driveId": 0, - "fileTrashInfoList": file_detail, - "operation": operation, - } - delete_res = requests.post( - "https://www.123pan.com/a/api/file/trash", - data=json.dumps(data_delete), - headers=self.header_logined, - timeout=10 - ) - dele_json = delete_res.json() - print(dele_json) - message = dele_json["message"] - print(message) - - def share(self): - file_id_list = "" - share_name_list = [] - add = "1" - while str(add) == "1": - share_num = input("分享文件的编号:") - num_test2 = share_num.isdigit() - if num_test2: - share_num = int(share_num) - if 0 < share_num < len(self.list) + 1: - share_id = self.list[int(share_num) - 1]["FileId"] - share_name = self.list[int(share_num) - 1]["FileName"] - share_name_list.append(share_name) - print(share_name_list) - file_id_list = file_id_list + str(share_id) + "," - add = input("输入1添加文件,0发起分享,其他取消") - else: - print("请输入数字,,") - add = "1" - if str(add) == "0": - share_pwd = input("提取码,不设留空:") - file_id_list = file_id_list.strip(",") - data = { - "driveId": 0, - "expiration": "2099-12-12T08:00:00+08:00", - "fileIdList": file_id_list, - "shareName": "My Share", - "sharePwd": share_pwd, - "event": "shareCreate" - } - share_res = requests.post( - "https://www.123pan.com/a/api/share/create", - headers=self.header_logined, - data=json.dumps(data), - timeout=10 - ) - share_res_json = share_res.json() - if share_res_json["code"] != 0: - print(share_res_json["message"]) - print("分享失败") - return - message = share_res_json["message"] - print(message) - share_key = share_res_json["data"]["ShareKey"] - share_url = "https://www.123pan.com/s/" + share_key - print("分享链接:\n" + share_url + "提取码:" + share_pwd) - else: - print("退出分享") - - def up_load(self, file_path): - file_path = file_path.replace('"', "") - file_path = file_path.replace("\\", "/") - file_name = file_path.split("/")[-1] - print("文件名:", file_name) - if not os.path.exists(file_path): - print("文件不存在,请检查路径是否正确") - return - if os.path.isdir(file_path): - print("暂不支持文件夹上传") - return - fsize = os.path.getsize(file_path) - with open(file_path, "rb") as f: - md5 = hashlib.md5() - while True: - data = f.read(64 * 1024) - if not data: - break - md5.update(data) - readable_hash = md5.hexdigest() - - list_up_request = { - "driveId": 0, - "etag": readable_hash, - "fileName": file_name, - "parentFileId": self.parent_file_id, - "size": fsize, - "type": 0, - "duplicate": 0, - } - - # sign = getSign("/b/api/file/upload_request") - up_res = requests.post( - "https://www.123pan.com/b/api/file/upload_request", - headers=self.header_logined, - # params={sign[0]: sign[1]}, - data=list_up_request, - timeout=10 - ) - up_res_json = up_res.json() - res_code_up = up_res_json["code"] - if res_code_up == 5060: - sure_upload = input("检测到1个同名文件,输入1覆盖,2保留两者,0取消:") - if sure_upload == "1": - list_up_request["duplicate"] = 1 - - elif sure_upload == "2": - list_up_request["duplicate"] = 2 - else: - print("取消上传") - return - # sign = getSign("/b/api/file/upload_request") - up_res = requests.post( - "https://www.123pan.com/b/api/file/upload_request", - headers=self.header_logined, - # params={sign[0]: sign[1]}, - data=json.dumps(list_up_request), - timeout=10 - ) - up_res_json = up_res.json() - res_code_up = up_res_json["code"] - if res_code_up == 0: - # print(upResJson) - # print("上传请求成功") - reuse = up_res_json["data"]["Reuse"] - if reuse: - print("上传成功,文件已MD5复用") - return - else: - print(up_res_json) - print("上传请求失败") - return - - bucket = up_res_json["data"]["Bucket"] - storage_node = up_res_json["data"]["StorageNode"] - upload_key = up_res_json["data"]["Key"] - upload_id = up_res_json["data"]["UploadId"] - up_file_id = up_res_json["data"]["FileId"] # 上传文件的fileId,完成上传后需要用到 - print("上传文件的fileId:", up_file_id) - - # 获取已将上传的分块 - start_data = { - "bucket": bucket, - "key": upload_key, - "uploadId": upload_id, - "storageNode": storage_node, - } - start_res = requests.post( - "https://www.123pan.com/b/api/file/s3_list_upload_parts", - headers=self.header_logined, - data=json.dumps(start_data), - timeout=10 - ) - start_res_json = start_res.json() - res_code_up = start_res_json["code"] - if res_code_up == 0: - # print(startResJson) - pass - else: - print(start_data) - print(start_res_json) - - print("获取传输列表失败") - return - - # 分块,每一块取一次链接,依次上传 - block_size = 5242880 - with open(file_path, "rb") as f: - part_number_start = 1 - put_size = 0 - while True: - data = f.read(block_size) - - precent = round(put_size / fsize, 2) - print("\r已上传:" + str(precent * 100) + "%", end="") - put_size = put_size + len(data) - - if not data: - break - get_link_data = { - "bucket": bucket, - "key": upload_key, - "partNumberEnd": part_number_start + 1, - "partNumberStart": part_number_start, - "uploadId": upload_id, - "StorageNode": storage_node, - } - - get_link_url = ( - "https://www.123pan.com/b/api/file/s3_repare_upload_parts_batch" - ) - get_link_res = requests.post( - get_link_url, - headers=self.header_logined, - data=json.dumps(get_link_data), - timeout=10 - ) - get_link_res_json = get_link_res.json() - res_code_up = get_link_res_json["code"] - if res_code_up == 0: - # print("获取链接成功") - pass - else: - print("获取链接失败") - # print(getLinkResJson) - return - # print(getLinkResJson) - upload_url = get_link_res_json["data"]["presignedUrls"][ - str(part_number_start) - ] - # print("上传链接",uploadUrl) - requests.put(upload_url, data=data, timeout=10) - # print("put") - - part_number_start = part_number_start + 1 - - print("\n处理中") - # 完成标志 - # 1.获取已上传的块 - uploaded_list_url = "https://www.123pan.com/b/api/file/s3_list_upload_parts" - uploaded_comp_data = { - "bucket": bucket, - "key": upload_key, - "uploadId": upload_id, - "storageNode": storage_node, - } - # print(uploadedCompData) - requests.post( - uploaded_list_url, - headers=self.header_logined, - data=json.dumps(uploaded_comp_data), - timeout=10 - ) - compmultipart_up_url = ( - "https://www.123pan.com/b/api/file/s3_complete_multipart_upload" - ) - requests.post( - compmultipart_up_url, - headers=self.header_logined, - data=json.dumps(uploaded_comp_data), - timeout=10 - ) - - # 3.报告完成上传,关闭upload session - if fsize > 64 * 1024 * 1024: - time.sleep(3) - close_up_session_url = "https://www.123pan.com/b/api/file/upload_complete" - close_up_session_data = {"fileId": up_file_id} - # print(closeUpSessionData) - close_up_session_res = requests.post( - close_up_session_url, - headers=self.header_logined, - data=json.dumps(close_up_session_data), - timeout=10 - ) - close_res_json = close_up_session_res.json() - # print(closeResJson) - res_code_up = close_res_json["code"] - if res_code_up == 0: - print("上传成功") - else: - print("上传失败") - print(close_res_json) - return - - # dirId 就是 fileNumber,从0开始,0为第一个文件,传入时需要减一 !!!(好像文件夹都排在前面) - def cd(self, dir_num): - if not dir_num.isdigit(): - if dir_num == "..": - if len(self.parent_file_list) > 1: - self.parent_file_list.pop() - self.parent_file_id = self.parent_file_list[-1] - self.get_dir() - self.show() - else: - print("已经是根目录") - return - if dir_num == "/": - self.parent_file_id = 0 - self.parent_file_list = [0] - self.get_dir() - self.show() - return - print("输入错误") - return - dir_num = int(dir_num) - 1 - if dir_num >= (len(self.list) - 1) or dir_num < 0: - print("输入错误") - return - if self.list[dir_num]["Type"] != 1: - print("不是文件夹") - return - self.parent_file_id = self.list[dir_num]["FileId"] - self.parent_file_list.append(self.parent_file_id) - self.get_dir() - self.show() - - def cdById(self, file_id): - self.parent_file_id = file_id - self.parent_file_list.append(self.parent_file_id) - self.get_dir() - self.get_dir() - self.show() - - def read_ini( - self, - user_name, - pass_word, - input_pwd, - authorization="", - ): - try: - with open("123pan.txt", "r", encoding="utf-8") as f: - text = f.read() - text = json.loads(text) - user_name = text["userName"] - pass_word = text["passWord"] - authorization = text["authorization"] - - except: # FileNotFoundError or json.decoder.JSONDecodeError: - print("获取配置失败,重新输入") - - if user_name == "" or pass_word == "": - if input_pwd: - user_name = input("userName:") - pass_word = input("passWord:") - authorization = "" - else: - raise Exception("禁止输入模式下,没有账号或密码") - - self.user_name = user_name - self.password = pass_word - self.authorization = authorization - - def mkdir(self, dirname, remakedir=False): - if not remakedir: - for i in self.list: - if i["FileName"] == dirname: - print("文件夹已存在") - return i["FileId"] - - url = "https://www.123pan.com/a/api/file/upload_request" - data_mk = { - "driveId": 0, - "etag": "", - "fileName": dirname, - "parentFileId": self.parent_file_id, - "size": 0, - "type": 1, - "duplicate": 1, - "NotReuse": True, - "event": "newCreateFolder", - "operateType": 1, - } - # sign = getSign("/a/api/file/upload_request") - res_mk = requests.post( - url, - headers=self.header_logined, - data=json.dumps(data_mk), - # params={sign[0]: sign[1]}, - timeout=10 - ) - try: - res_json = res_mk.json() - # print(res_json) - except json.decoder.JSONDecodeError: - print("创建失败") - print(res_mk.text) - return - code_mkdir = res_json["code"] - - if code_mkdir == 0: - print("创建成功: ", res_json["data"]["FileId"]) - self.get_dir() - return res_json["data"]["Info"]["FileId"] - print(res_json) - print("创建失败") - return - - -if __name__ == "__main__": - pan = Pan123(readfile=True, input_pwd=True) - pan.show() - while True: - command = input("\033[91m >\033[0m") - if command == "ls": - pan.show() - if command == "re": - code = pan.get_dir() - if code == 0: - print("刷新目录成功") - pan.show() - if command.isdigit(): - if int(command) > len(pan.list) or int(command) < 1: - print("输入错误") - continue - if pan.list[int(command) - 1]["Type"] == 1: - pan.cdById(pan.list[int(command) - 1]["FileId"]) - else: - size = pan.list[int(command) - 1]["Size"] - if size > 1048576: - size_print_show = str(round(size / 1048576, 2)) + "M" - else: - size_print_show = str(round(size / 1024, 2)) + "K" - # print(pan.list[int(command) - 1]) - name = pan.list[int(command) - 1]["FileName"] - print(name + " " + size_print_show) - print("press 1 to download now: ", end="") - sure = input() - if sure == "1": - pan.download(int(command) - 1) - elif command[0:9] == "download ": - if command[9:].isdigit(): - if int(command[9:]) > len(pan.list) or int(command[9:]) < 1: - print("输入错误") - continue - pan.download(int(command[9:]) - 1) - else: - print("输入错误") - elif command == "exit": - break - elif command == "log": - pan.login() - pan.get_dir() - pan.show() - - elif command[0:5] == "link ": - if command[5:].isdigit(): - if int(command[5:]) > len(pan.list) or int(command[5:]) < 1: - print("输入错误") - continue - pan.link(int(command[5:]) - 1) - else: - print("输入错误") - elif command == "upload": - filepath = input("请输入文件路径:") - pan.up_load(filepath) - pan.get_dir() - pan.show() - elif command == "share": - pan.share() - elif command[0:6] == "delete": - if command == "delete": - print("请输入要删除的文件编号:", end="") - fileNumber = input() - else: - if command[6] == " ": - fileNumber = command[7:] - else: - print("输入错误") - continue - if fileNumber == "": - print("请输入要删除的文件编号:", end="") - fileNumber = input() - else: - fileNumber = fileNumber[0:] - if fileNumber.isdigit(): - if int(fileNumber) > len(pan.list) or int(fileNumber) < 1: - print("输入错误") - continue - pan.delete_file(int(fileNumber) - 1) - pan.get_dir() - pan.show() - else: - print("输入错误") - - elif command[:3] == "cd ": - path = command[3:] - pan.cd(path) - elif command[0:5] == "mkdir": - if command == "mkdir": - newPath = input("请输入目录名:") - else: - newPath = command[6:] - if newPath == "": - newPath = input("请输入目录名:") - else: - newPath = newPath[0:] - print(pan.mkdir(newPath)) - pan.get_dir() - pan.show() - - elif command == "reload": - pan.read_ini("", "", True) - print("读取成功") - pan.get_dir() - pan.show()