diff --git a/android.py b/android.py new file mode 100644 index 0000000..96b136b --- /dev/null +++ b/android.py @@ -0,0 +1,813 @@ +import hashlib +import json +import os +import re +import time + +import requests + + + +class Pan123: + def __init__( + self, + readfile=True, + user_name="", + pass_word="", + authorization="", + input_pwd=True, + ): + self.cookies = None + self.recycle_list = None + self.list = None + if readfile: + self.read_ini(user_name, pass_word, input_pwd, authorization) + else: + if user_name == "" or pass_word == "": + print("读取已禁用,用户名或密码为空") + if input_pwd: + user_name = input("请输入用户名:") + pass_word = input("请输入密码:") + else: + raise Exception("用户名或密码为空:读取禁用时,userName和passWord不能为空") + self.user_name = user_name + self.password = pass_word + self.authorization = authorization + self.header_logined = { + "user-agent": "123pan/v2.4.0(Android_7.1.2;Xiaomi)", + "authorization": self.authorization, + "accept-encoding": "gzip", + # "authorization": "", + "content-type": "application/json", + "osversion": "Android_7.1.2", + "loginuuid": "3ec3167d3ba54ba61ea15b823b0a05f4", + "platform": "android", + "devicetype": "M2101K9C", + "x-channel": "1004", + "devicename": "Xiaomi", + # "Content-Length": "65", + "host": "www.123pan.com", + "app-version": "61", + "x-app-version": "2.4.0" + } + self.parent_file_id = 0 # 路径,文件夹的id,0为根目录 + self.parent_file_list = [0] + res_code_getdir = self.get_dir() + if res_code_getdir != 0: + self.login() + self.get_dir() + + def login(self): + data = {"type": 1, "passport": self.user_name, "password": self.password} + # sign = getSign("/b/api/user/sign_in") + login_res = requests.post( + "https://www.123pan.com/b/api/user/sign_in", + headers=self.header_logined, + data=data, + # params={sign[0]: sign[1]}, timeout=10, + # verify=False + ) + + res_sign = login_res.json() + # print("登录结果:", res_sign) + # print("登录结果header:", login_res.headers) + res_code_login = res_sign["code"] + if res_code_login != 200: + print("code = 1 Error:" + str(res_code_login)) + print(res_sign["message"]) + return res_code_login + set_cookies = login_res.headers["Set-Cookie"] + set_cookies_list = {} + + for cookie in set_cookies.split(';'): + if '=' in cookie: + key, value = cookie.strip().split('=', 1) + set_cookies_list[key] = value + else: + set_cookies_list[cookie.strip()] = None + + self.cookies = set_cookies_list + + token = res_sign["data"]["token"] + self.authorization = "Bearer " + token + self.header_logined["authorization"] = self.authorization + # ret['cookie'] = cookie + self.save_file() + return res_code_login + + def save_file(self): + with open("123pan.txt", "w", encoding="utf_8") as f: + save_list = { + "userName": self.user_name, + "passWord": self.password, + "authorization": self.authorization, + } + + f.write(json.dumps(save_list)) + print("账号已保存") + + def get_dir(self): + res_code_getdir = 0 + page = 1 + lists = [] + lenth_now = 0 + total = -1 + while lenth_now < total or total == -1: + base_url = "https://www.123pan.com/b/api/file/list/new" + # print(self.headerLogined) + # sign = getSign("/b/api/file/list/new") + # print(sign) + params = { + # sign[0]: sign[1], + "driveId": 0, + "limit": 100, + "next": 0, + "orderBy": "file_id", + "orderDirection": "desc", + "parentFileId": str(self.parent_file_id), + "trashed": False, + "SearchData": "", + "Page": str(page), + "OnlyLookAbnormalFile": 0, + } + try: + a = requests.get(base_url, headers=self.header_logined, params=params, timeout=10) # , verify=False) + except: + print("连接失败") + return -1 + # print(a.text) + # print(a.headers) + text = a.json() + res_code_getdir = text["code"] + if res_code_getdir != 0: + print(a.text) + print(a.headers) + print("code = 2 Error:" + str(res_code_getdir)) + return res_code_getdir + lists_page = text["data"]["InfoList"] + lists += lists_page + total = text["data"]["Total"] + lenth_now += len(lists_page) + page += 1 + file_num = 0 + for i in lists: + i["FileNum"] = file_num + file_num += 1 + + self.list = lists + return res_code_getdir + + def show(self): + print("--------------------") + for i in self.list: + file_size = i["Size"] + if file_size > 1048576: + download_size_print = str(round(file_size / 1048576, 2)) + "M" + else: + download_size_print = str(round(file_size / 1024, 2)) + "K" + + if i["Type"] == 0: + print( + "\033[33m" + "编号:", + self.list.index(i) + 1, + "\033[0m \t\t" + download_size_print + "\t\t\033[36m", + i["FileName"], + "\033[0m", + ) + elif i["Type"] == 1: + print( + "\033[35m" + "编号:", + self.list.index(i) + 1, + " \t\t\033[36m", + i["FileName"], + "\033[0m", + ) + + print("--------------------") + + # fileNumber 从0开始,0为第一个文件,传入时需要减一 !!! + def link(self, file_number, showlink=True): + file_detail = self.list[file_number] + type_detail = file_detail["Type"] + if type_detail == 1: + down_request_url = "https://www.123pan.com/a/api/file/batch_download_info" + down_request_data = {"fileIdList": [{"fileId": int(file_detail["FileId"])}]} + + else: + down_request_url = "https://www.123pan.com/a/api/file/download_info" + down_request_data = { + "driveId": 0, + "etag": file_detail["Etag"], + "fileId": file_detail["FileId"], + "s3keyFlag": file_detail["S3KeyFlag"], + "type": file_detail["Type"], + "fileName": file_detail["FileName"], + "size": file_detail["Size"], + } + # print(down_request_data) + + # sign = getSign("/a/api/file/download_info") + + link_res = requests.post( + down_request_url, + headers=self.header_logined, + # params={sign[0]: sign[1]}, + data=down_request_data, + timeout=10 + ) + # print(linkRes.text) + res_code_download = link_res.json()["code"] + if res_code_download != 0: + print("code = 3 Error:" + str(res_code_download)) + # print(linkRes.json()) + return res_code_download + down_load_url = link_res.json()["data"]["DownloadUrl"] + next_to_get = requests.get(down_load_url, timeout=10, allow_redirects=False).text + url_pattern = re.compile(r"href='(https?://[^']+)'") + redirect_url = url_pattern.findall(next_to_get)[0] + if showlink: + print(redirect_url) + + return redirect_url + + def download(self, file_number): + file_detail = self.list[file_number] + down_load_url = self.link(file_number, showlink=False) + file_name = file_detail["FileName"] # 文件名 + if os.path.exists(file_name): + print("文件 " + file_name + " 已存在,是否要覆盖?") + sure_download = input("输入1覆盖,2取消:") + if sure_download != "1": + return + down = requests.get(down_load_url, stream=True, timeout=10) + + file_size = int(down.headers["Content-Length"]) # 文件大小 + content_size = int(file_size) # 文件总大小 + data_count = 0 # 当前已传输的大小 + if file_size > 1048576: + size_print_download = str(round(file_size / 1048576, 2)) + "M" + else: + size_print_download = str(round(file_size / 1024, 2)) + "K" + print(file_name + " " + size_print_download) + time1 = time.time() + time_temp = time1 + data_count_temp = 0 + with open(file_name, "wb") as f: + for i in down.iter_content(1024): + f.write(i) + done_block = int((data_count / content_size) * 50) + data_count = data_count + len(i) + # 实时进度条进度 + now_jd = (data_count / content_size) * 100 + # %% 表示% + # 测速 + time1 = time.time() + pass_time = time1 - time_temp + if pass_time > 1: + time_temp = time1 + pass_data = int(data_count) - int(data_count_temp) + data_count_temp = data_count + speed = pass_data / int(pass_time) + speed_m = speed / 1048576 + if speed_m > 1: + speed_print = str(round(speed_m, 2)) + "M/S" + else: + speed_print = str(round(speed_m * 1024, 2)) + "K/S" + print( + "\r [%s%s] %d%% %s" + % ( + done_block * "█", + " " * (50 - 1 - done_block), + now_jd, + speed_print, + ), + end="", + ) + elif data_count == content_size: + print("\r [%s%s] %d%% %s" % (50 * "█", "", 100, ""), end="") + print("\nok") + + def recycle(self): + recycle_id = 0 + url = ( + "https://www.123pan.com/a/api/file/list/new?driveId=0&limit=100&next=0" + "&orderBy=fileId&orderDirection=desc&parentFileId=" + + str(recycle_id) + + "&trashed=true&&Page=1" + ) + recycle_res = requests.get(url, headers=self.header_logined, timeout=10) + json_recycle = recycle_res.json() + recycle_list = json_recycle["data"]["InfoList"] + self.recycle_list = recycle_list + + # fileNumber 从0开始,0为第一个文件,传入时需要减一 !!! + def delete_file(self, file, by_num=True, operation=True): + # operation = 'true' 删除 , operation = 'false' 恢复 + if by_num: + print(file) + if not str(file).isdigit(): + print("请输入数字") + return -1 + if 0 <= file < len(self.list): + file_detail = self.list[file] + else: + print("不在合理范围内") + return + else: + if file in self.list: + file_detail = file + else: + print("文件不存在") + return + data_delete = { + "driveId": 0, + "fileTrashInfoList": file_detail, + "operation": operation, + } + delete_res = requests.post( + "https://www.123pan.com/a/api/file/trash", + data=json.dumps(data_delete), + headers=self.header_logined, + timeout=10 + ) + dele_json = delete_res.json() + print(dele_json) + message = dele_json["message"] + print(message) + + def share(self): + file_id_list = "" + share_name_list = [] + add = "1" + while str(add) == "1": + share_num = input("分享文件的编号:") + num_test2 = share_num.isdigit() + if num_test2: + share_num = int(share_num) + if 0 < share_num < len(self.list) + 1: + share_id = self.list[int(share_num) - 1]["FileId"] + share_name = self.list[int(share_num) - 1]["FileName"] + share_name_list.append(share_name) + print(share_name_list) + file_id_list = file_id_list + str(share_id) + "," + add = input("输入1添加文件,0发起分享,其他取消") + else: + print("请输入数字,,") + add = "1" + if str(add) == "0": + share_pwd = input("提取码,不设留空:") + file_id_list = file_id_list.strip(",") + data = { + "driveId": 0, + "expiration": "2099-12-12T08:00:00+08:00", + "fileIdList": file_id_list, + "shareName": "My Share", + "sharePwd": share_pwd, + "event": "shareCreate" + } + share_res = requests.post( + "https://www.123pan.com/a/api/share/create", + headers=self.header_logined, + data=json.dumps(data), + timeout=10 + ) + share_res_json = share_res.json() + if share_res_json["code"] != 0: + print(share_res_json["message"]) + print("分享失败") + return + message = share_res_json["message"] + print(message) + share_key = share_res_json["data"]["ShareKey"] + share_url = "https://www.123pan.com/s/" + share_key + print("分享链接:\n" + share_url + "提取码:" + share_pwd) + else: + print("退出分享") + + def up_load(self, file_path): + file_path = file_path.replace('"', "") + file_path = file_path.replace("\\", "/") + file_name = file_path.split("/")[-1] + print("文件名:", file_name) + if not os.path.exists(file_path): + print("文件不存在,请检查路径是否正确") + return + if os.path.isdir(file_path): + print("暂不支持文件夹上传") + return + fsize = os.path.getsize(file_path) + with open(file_path, "rb") as f: + md5 = hashlib.md5() + while True: + data = f.read(64 * 1024) + if not data: + break + md5.update(data) + readable_hash = md5.hexdigest() + + list_up_request = { + "driveId": 0, + "etag": readable_hash, + "fileName": file_name, + "parentFileId": self.parent_file_id, + "size": fsize, + "type": 0, + "duplicate": 0, + } + + # sign = getSign("/b/api/file/upload_request") + up_res = requests.post( + "https://www.123pan.com/b/api/file/upload_request", + headers=self.header_logined, + # params={sign[0]: sign[1]}, + data=list_up_request, + timeout=10 + ) + up_res_json = up_res.json() + res_code_up = up_res_json["code"] + if res_code_up == 5060: + sure_upload = input("检测到1个同名文件,输入1覆盖,2保留两者,0取消:") + if sure_upload == "1": + list_up_request["duplicate"] = 1 + + elif sure_upload == "2": + list_up_request["duplicate"] = 2 + else: + print("取消上传") + return + # sign = getSign("/b/api/file/upload_request") + up_res = requests.post( + "https://www.123pan.com/b/api/file/upload_request", + headers=self.header_logined, + # params={sign[0]: sign[1]}, + data=json.dumps(list_up_request), + timeout=10 + ) + up_res_json = up_res.json() + res_code_up = up_res_json["code"] + if res_code_up == 0: + # print(upResJson) + # print("上传请求成功") + reuse = up_res_json["data"]["Reuse"] + if reuse: + print("上传成功,文件已MD5复用") + return + else: + print(up_res_json) + print("上传请求失败") + return + + bucket = up_res_json["data"]["Bucket"] + storage_node = up_res_json["data"]["StorageNode"] + upload_key = up_res_json["data"]["Key"] + upload_id = up_res_json["data"]["UploadId"] + up_file_id = up_res_json["data"]["FileId"] # 上传文件的fileId,完成上传后需要用到 + print("上传文件的fileId:", up_file_id) + + # 获取已将上传的分块 + start_data = { + "bucket": bucket, + "key": upload_key, + "uploadId": upload_id, + "storageNode": storage_node, + } + start_res = requests.post( + "https://www.123pan.com/b/api/file/s3_list_upload_parts", + headers=self.header_logined, + data=json.dumps(start_data), + timeout=10 + ) + start_res_json = start_res.json() + res_code_up = start_res_json["code"] + if res_code_up == 0: + # print(startResJson) + pass + else: + print(start_data) + print(start_res_json) + + print("获取传输列表失败") + return + + # 分块,每一块取一次链接,依次上传 + block_size = 5242880 + with open(file_path, "rb") as f: + part_number_start = 1 + put_size = 0 + while True: + data = f.read(block_size) + + precent = round(put_size / fsize, 2) + print("\r已上传:" + str(precent * 100) + "%", end="") + put_size = put_size + len(data) + + if not data: + break + get_link_data = { + "bucket": bucket, + "key": upload_key, + "partNumberEnd": part_number_start + 1, + "partNumberStart": part_number_start, + "uploadId": upload_id, + "StorageNode": storage_node, + } + + get_link_url = ( + "https://www.123pan.com/b/api/file/s3_repare_upload_parts_batch" + ) + get_link_res = requests.post( + get_link_url, + headers=self.header_logined, + data=json.dumps(get_link_data), + timeout=10 + ) + get_link_res_json = get_link_res.json() + res_code_up = get_link_res_json["code"] + if res_code_up == 0: + # print("获取链接成功") + pass + else: + print("获取链接失败") + # print(getLinkResJson) + return + # print(getLinkResJson) + upload_url = get_link_res_json["data"]["presignedUrls"][ + str(part_number_start) + ] + # print("上传链接",uploadUrl) + requests.put(upload_url, data=data, timeout=10) + # print("put") + + part_number_start = part_number_start + 1 + + print("\n处理中") + # 完成标志 + # 1.获取已上传的块 + uploaded_list_url = "https://www.123pan.com/b/api/file/s3_list_upload_parts" + uploaded_comp_data = { + "bucket": bucket, + "key": upload_key, + "uploadId": upload_id, + "storageNode": storage_node, + } + # print(uploadedCompData) + requests.post( + uploaded_list_url, + headers=self.header_logined, + data=json.dumps(uploaded_comp_data), + timeout=10 + ) + compmultipart_up_url = ( + "https://www.123pan.com/b/api/file/s3_complete_multipart_upload" + ) + requests.post( + compmultipart_up_url, + headers=self.header_logined, + data=json.dumps(uploaded_comp_data), + timeout=10 + ) + + # 3.报告完成上传,关闭upload session + if fsize > 64 * 1024 * 1024: + time.sleep(3) + close_up_session_url = "https://www.123pan.com/b/api/file/upload_complete" + close_up_session_data = {"fileId": up_file_id} + # print(closeUpSessionData) + close_up_session_res = requests.post( + close_up_session_url, + headers=self.header_logined, + data=json.dumps(close_up_session_data), + timeout=10 + ) + close_res_json = close_up_session_res.json() + # print(closeResJson) + res_code_up = close_res_json["code"] + if res_code_up == 0: + print("上传成功") + else: + print("上传失败") + print(close_res_json) + return + + # dirId 就是 fileNumber,从0开始,0为第一个文件,传入时需要减一 !!!(好像文件夹都排在前面) + def cd(self, dir_num): + if not dir_num.isdigit(): + if dir_num == "..": + if len(self.parent_file_list) > 1: + self.parent_file_list.pop() + self.parent_file_id = self.parent_file_list[-1] + self.get_dir() + self.show() + else: + print("已经是根目录") + return + if dir_num == "/": + self.parent_file_id = 0 + self.parent_file_list = [0] + self.get_dir() + self.show() + return + print("输入错误") + return + dir_num = int(dir_num) - 1 + if dir_num >= (len(self.list) - 1) or dir_num < 0: + print("输入错误") + return + if self.list[dir_num]["Type"] != 1: + print("不是文件夹") + return + self.parent_file_id = self.list[dir_num]["FileId"] + self.parent_file_list.append(self.parent_file_id) + self.get_dir() + self.show() + + def cdById(self, file_id): + self.parent_file_id = file_id + self.parent_file_list.append(self.parent_file_id) + self.get_dir() + self.get_dir() + self.show() + + def read_ini( + self, + user_name, + pass_word, + input_pwd, + authorization="", + ): + try: + with open("123pan.txt", "r", encoding="utf-8") as f: + text = f.read() + text = json.loads(text) + user_name = text["userName"] + pass_word = text["passWord"] + authorization = text["authorization"] + + except: # FileNotFoundError or json.decoder.JSONDecodeError: + print("获取配置失败,重新输入") + + if user_name == "" or pass_word == "": + if input_pwd: + user_name = input("userName:") + pass_word = input("passWord:") + authorization = "" + else: + raise Exception("禁止输入模式下,没有账号或密码") + + self.user_name = user_name + self.password = pass_word + self.authorization = authorization + + def mkdir(self, dirname, remakedir=False): + if not remakedir: + for i in self.list: + if i["FileName"] == dirname: + print("文件夹已存在") + return i["FileId"] + + url = "https://www.123pan.com/a/api/file/upload_request" + data_mk = { + "driveId": 0, + "etag": "", + "fileName": dirname, + "parentFileId": self.parent_file_id, + "size": 0, + "type": 1, + "duplicate": 1, + "NotReuse": True, + "event": "newCreateFolder", + "operateType": 1, + } + # sign = getSign("/a/api/file/upload_request") + res_mk = requests.post( + url, + headers=self.header_logined, + data=json.dumps(data_mk), + # params={sign[0]: sign[1]}, + timeout=10 + ) + try: + res_json = res_mk.json() + # print(res_json) + except json.decoder.JSONDecodeError: + print("创建失败") + print(res_mk.text) + return + code_mkdir = res_json["code"] + + if code_mkdir == 0: + print("创建成功: ", res_json["data"]["FileId"]) + self.get_dir() + return res_json["data"]["Info"]["FileId"] + print(res_json) + print("创建失败") + return + + +if __name__ == "__main__": + pan = Pan123(readfile=True, input_pwd=True) + pan.show() + while True: + command = input("\033[91m >\033[0m") + if command == "ls": + pan.show() + if command == "re": + code = pan.get_dir() + if code == 0: + print("刷新目录成功") + pan.show() + if command.isdigit(): + if int(command) > len(pan.list) or int(command) < 1: + print("输入错误") + continue + if pan.list[int(command) - 1]["Type"] == 1: + pan.cdById(pan.list[int(command) - 1]["FileId"]) + else: + size = pan.list[int(command) - 1]["Size"] + if size > 1048576: + size_print_show = str(round(size / 1048576, 2)) + "M" + else: + size_print_show = str(round(size / 1024, 2)) + "K" + # print(pan.list[int(command) - 1]) + name = pan.list[int(command) - 1]["FileName"] + print(name + " " + size_print_show) + print("press 1 to download now: ", end="") + sure = input() + if sure == "1": + pan.download(int(command) - 1) + elif command[0:9] == "download ": + if command[9:].isdigit(): + if int(command[9:]) > len(pan.list) or int(command[9:]) < 1: + print("输入错误") + continue + pan.download(int(command[9:]) - 1) + else: + print("输入错误") + elif command == "exit": + break + elif command == "log": + pan.login() + pan.get_dir() + pan.show() + + elif command[0:5] == "link ": + if command[5:].isdigit(): + if int(command[5:]) > len(pan.list) or int(command[5:]) < 1: + print("输入错误") + continue + pan.link(int(command[5:]) - 1) + else: + print("输入错误") + elif command == "upload": + filepath = input("请输入文件路径:") + pan.up_load(filepath) + pan.get_dir() + pan.show() + elif command == "share": + pan.share() + elif command[0:6] == "delete": + if command == "delete": + print("请输入要删除的文件编号:", end="") + fileNumber = input() + else: + if command[6] == " ": + fileNumber = command[7:] + else: + print("输入错误") + continue + if fileNumber == "": + print("请输入要删除的文件编号:", end="") + fileNumber = input() + else: + fileNumber = fileNumber[0:] + if fileNumber.isdigit(): + if int(fileNumber) > len(pan.list) or int(fileNumber) < 1: + print("输入错误") + continue + pan.delete_file(int(fileNumber) - 1) + pan.get_dir() + pan.show() + else: + print("输入错误") + + elif command[:3] == "cd ": + path = command[3:] + pan.cd(path) + elif command[0:5] == "mkdir": + if command == "mkdir": + newPath = input("请输入目录名:") + else: + newPath = command[6:] + if newPath == "": + newPath = input("请输入目录名:") + else: + newPath = newPath[0:] + print(pan.mkdir(newPath)) + pan.get_dir() + pan.show() + + elif command == "reload": + pan.read_ini("", "", True) + print("读取成功") + pan.get_dir() + pan.show()