From 8797d8f66df9fc9ecddf18392454203f1400523b Mon Sep 17 00:00:00 2001 From: Cp0204 Date: Wed, 10 Apr 2024 22:45:57 +0800 Subject: [PATCH] =?UTF-8?q?=E6=94=B9=E8=BF=9B=E8=87=AA=E5=AE=9A=E4=B9=89?= =?UTF-8?q?=20send()=20=E6=96=B9=E6=B3=95=EF=BC=8C=E4=BF=AE=E6=AD=A3?= =?UTF-8?q?=E8=87=AA=E5=AE=9A=E4=B9=89=E9=80=9A=E7=9F=A5=20body=20?= =?UTF-8?q?=E8=A7=A3=E6=9E=90=E7=AC=94=E8=AF=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- sample/notify.py | 408 ++++++++++++++++------------------------------- 1 file changed, 134 insertions(+), 274 deletions(-) diff --git a/sample/notify.py b/sample/notify.py index cbbc5ac5..0fe74000 100644 --- a/sample/notify.py +++ b/sample/notify.py @@ -113,7 +113,6 @@ push_config = { 'WEBHOOK_METHOD': '', # 自定义通知 请求方法 'WEBHOOK_CONTENT_TYPE': '' # 自定义通知 content-type } -notify_function = [] # fmt: on # 首先读取 面板变量 或者 github action 运行变量 @@ -123,19 +122,19 @@ for k in push_config: push_config[k] = v -def bark(title: str, content: str, **kwargs) -> None: +def bark(title: str, content: str) -> None: """ 使用 bark 推送消息。 """ - if not (push_config.get("BARK_PUSH") or kwargs.get("BARK_PUSH")): + if not push_config.get("BARK_PUSH"): print("bark 服务的 BARK_PUSH 未设置!!\n取消推送") return print("bark 服务启动") - BARK_PUSH = kwargs.get("BARK_PUSH", push_config.get("BARK_PUSH")) - if BARK_PUSH.startswith("http"): - url = f"{BARK_PUSH}/{urllib.parse.quote_plus(title)}/{urllib.parse.quote_plus(content)}" + + if push_config.get("BARK_PUSH").startswith("http"): + url = f'{push_config.get("BARK_PUSH")}/{urllib.parse.quote_plus(title)}/{urllib.parse.quote_plus(content)}' else: - url = f"https://api.day.app/{BARK_PUSH}/{urllib.parse.quote_plus(title)}/{urllib.parse.quote_plus(content)}" + url = f'https://api.day.app/{push_config.get("BARK_PUSH")}/{urllib.parse.quote_plus(title)}/{urllib.parse.quote_plus(content)}' bark_params = { "BARK_ARCHIVE": "isArchive", @@ -149,12 +148,11 @@ def bark(title: str, content: str, **kwargs) -> None: for pair in filter( lambda pairs: pairs[0].startswith("BARK_") and pairs[0] != "BARK_PUSH" - and (pairs[1] or kwargs.get(pairs[0])) + and pairs[1] and bark_params.get(pairs[0]), push_config.items(), ): - value = kwargs.get(pair[0], pair[1]) - params += f"{bark_params.get(pair[0])}={value}&" + params += f"{bark_params.get(pair[0])}={pair[1]}&" if params: url = url + "?" + params.rstrip("&") response = requests.get(url).json() @@ -165,40 +163,31 @@ def bark(title: str, content: str, **kwargs) -> None: print("bark 推送失败!") -def console(title: str, content: str, **kwargs) -> None: +def console(title: str, content: str) -> None: """ 使用 控制台 推送消息。 """ print(f"{title}\n\n{content}") -def dingding_bot(title: str, content: str, **kwargs) -> None: +def dingding_bot(title: str, content: str) -> None: """ 使用 钉钉机器人 推送消息。 """ - if not ( - (kwargs.get("DD_BOT_SECRET") and kwargs.get("DD_BOT_TOKEN")) - or (push_config.get("DD_BOT_SECRET") and push_config.get("DD_BOT_TOKEN")) - ): + if not push_config.get("DD_BOT_SECRET") or not push_config.get("DD_BOT_TOKEN"): print("钉钉机器人 服务的 DD_BOT_SECRET 或者 DD_BOT_TOKEN 未设置!!\n取消推送") return print("钉钉机器人 服务启动") - if kwargs.get("DD_BOT_SECRET") and kwargs.get("DD_BOT_TOKEN"): - DD_BOT_SECRET = kwargs.get("DD_BOT_SECRET") - DD_BOT_TOKEN = kwargs.get("DD_BOT_TOKEN") - else: - DD_BOT_SECRET = push_config.get("DD_BOT_SECRET") - DD_BOT_TOKEN = push_config.get("DD_BOT_TOKEN") timestamp = str(round(time.time() * 1000)) - secret_enc = DD_BOT_SECRET.encode("utf-8") - string_to_sign = "{}\n{}".format(timestamp, DD_BOT_SECRET) + secret_enc = push_config.get("DD_BOT_SECRET").encode("utf-8") + string_to_sign = "{}\n{}".format(timestamp, push_config.get("DD_BOT_SECRET")) string_to_sign_enc = string_to_sign.encode("utf-8") hmac_code = hmac.new( secret_enc, string_to_sign_enc, digestmod=hashlib.sha256 ).digest() sign = urllib.parse.quote_plus(base64.b64encode(hmac_code)) - url = f"https://oapi.dingtalk.com/robot/send?access_token={DD_BOT_TOKEN}×tamp={timestamp}&sign={sign}" + url = f'https://oapi.dingtalk.com/robot/send?access_token={push_config.get("DD_BOT_TOKEN")}×tamp={timestamp}&sign={sign}' headers = {"Content-Type": "application/json;charset=utf-8"} data = {"msgtype": "text", "text": {"content": f"{title}\n\n{content}"}} response = requests.post( @@ -211,16 +200,16 @@ def dingding_bot(title: str, content: str, **kwargs) -> None: print("钉钉机器人 推送失败!") -def feishu_bot(title: str, content: str, **kwargs) -> None: +def feishu_bot(title: str, content: str) -> None: """ 使用 飞书机器人 推送消息。 """ - if not (kwargs.get("DD_BOT_SECRET") or push_config.get("FSKEY")): + if not push_config.get("FSKEY"): print("飞书 服务的 FSKEY 未设置!!\n取消推送") return print("飞书 服务启动") - FSKEY = kwargs.get("DD_BOT_SECRET", push_config.get("FSKEY")) - url = f"https://open.feishu.cn/open-apis/bot/v2/hook/{FSKEY}" + + url = f'https://open.feishu.cn/open-apis/bot/v2/hook/{push_config.get("FSKEY")}' data = {"msg_type": "text", "content": {"text": f"{title}\n\n{content}"}} response = requests.post(url, data=json.dumps(data)).json() @@ -230,27 +219,16 @@ def feishu_bot(title: str, content: str, **kwargs) -> None: print("飞书 推送失败!错误信息如下:\n", response) -def go_cqhttp(title: str, content: str, **kwargs) -> None: +def go_cqhttp(title: str, content: str) -> None: """ 使用 go_cqhttp 推送消息。 """ - if not ( - (kwargs.get("GOBOT_URL") and kwargs.get("GOBOT_QQ")) - or (push_config.get("GOBOT_URL") and push_config.get("GOBOT_QQ")) - ): + if not push_config.get("GOBOT_URL") or not push_config.get("GOBOT_QQ"): print("go-cqhttp 服务的 GOBOT_URL 或 GOBOT_QQ 未设置!!\n取消推送") return print("go-cqhttp 服务启动") - if kwargs.get("GOBOT_URL") and kwargs.get("GOBOT_QQ"): - GOBOT_URL = kwargs.get("GOBOT_URL") - GOBOT_QQ = kwargs.get("GOBOT_QQ") - GOBOT_TOKEN = kwargs.get("GOBOT_TOKEN") - else: - GOBOT_URL = push_config.get("GOBOT_URL") - GOBOT_QQ = push_config.get("GOBOT_QQ") - GOBOT_TOKEN = push_config.get("GOBOT_TOKEN") - url = f"{GOBOT_URL}?access_token={GOBOT_TOKEN}&{GOBOT_QQ}&message=标题:{title}\n内容:{content}" + url = f'{push_config.get("GOBOT_URL")}?access_token={push_config.get("GOBOT_TOKEN")}&{push_config.get("GOBOT_QQ")}&message=标题:{title}\n内容:{content}' response = requests.get(url).json() if response["status"] == "ok": @@ -259,31 +237,20 @@ def go_cqhttp(title: str, content: str, **kwargs) -> None: print("go-cqhttp 推送失败!") -def gotify(title: str, content: str, **kwargs) -> None: +def gotify(title: str, content: str) -> None: """ 使用 gotify 推送消息。 """ - if not ( - (kwargs.get("GOTIFY_URL") and kwargs.get("GOTIFY_TOKEN")) - or (push_config.get("GOTIFY_URL") and push_config.get("GOTIFY_TOKEN")) - ): + if not push_config.get("GOTIFY_URL") or not push_config.get("GOTIFY_TOKEN"): print("gotify 服务的 GOTIFY_URL 或 GOTIFY_TOKEN 未设置!!\n取消推送") return print("gotify 服务启动") - if kwargs.get("GOTIFY_URL") and kwargs.get("GOTIFY_TOKEN"): - GOTIFY_URL = kwargs.get("GOTIFY_URL") - GOTIFY_TOKEN = kwargs.get("GOBOTGOTIFY_TOKEN_QQ") - GOTIFY_PRIORITY = kwargs.get("GOTIFY_PRIORITY") - else: - GOTIFY_URL = push_config.get("GOTIFY_URL") - GOTIFY_TOKEN = push_config.get("GOTIFY_TOKEN") - GOTIFY_PRIORITY = kwargs.get("GOTIFY_PRIORITY") - url = f"{GOTIFY_URL}/message?token={GOTIFY_TOKEN}" + url = f'{push_config.get("GOTIFY_URL")}/message?token={push_config.get("GOTIFY_TOKEN")}' data = { "title": title, "message": content, - "priority": GOTIFY_PRIORITY, + "priority": push_config.get("GOTIFY_PRIORITY"), } response = requests.post(url, data=data).json() @@ -293,16 +260,16 @@ def gotify(title: str, content: str, **kwargs) -> None: print("gotify 推送失败!") -def iGot(title: str, content: str, **kwargs) -> None: +def iGot(title: str, content: str) -> None: """ 使用 iGot 推送消息。 """ - if not (kwargs.get("IGOT_PUSH_KEY") or push_config.get("IGOT_PUSH_KEY")): + if not push_config.get("IGOT_PUSH_KEY"): print("iGot 服务的 IGOT_PUSH_KEY 未设置!!\n取消推送") return print("iGot 服务启动") - IGOT_PUSH_KEY = kwargs.get("IGOT_PUSH_KEY", push_config.get("IGOT_PUSH_KEY")) - url = f"https://push.hellyw.com/{IGOT_PUSH_KEY}" + + url = f'https://push.hellyw.com/{push_config.get("IGOT_PUSH_KEY")}' data = {"title": title, "content": content} headers = {"Content-Type": "application/x-www-form-urlencoded"} response = requests.post(url, data=data, headers=headers).json() @@ -313,21 +280,20 @@ def iGot(title: str, content: str, **kwargs) -> None: print(f'iGot 推送失败!{response["errMsg"]}') -def serverJ(title: str, content: str, **kwargs) -> None: +def serverJ(title: str, content: str) -> None: """ 通过 serverJ 推送消息。 """ - if not (kwargs.get("PUSH_KEY") or push_config.get("PUSH_KEY")): + if not push_config.get("PUSH_KEY"): print("serverJ 服务的 PUSH_KEY 未设置!!\n取消推送") return print("serverJ 服务启动") - PUSH_KEY = kwargs.get("PUSH_KEY", push_config.get("PUSH_KEY")) data = {"text": title, "desp": content.replace("\n", "\n\n")} - if PUSH_KEY.find("SCT") != -1: - url = f"https://sctapi.ftqq.com/{PUSH_KEY}.send" + if push_config.get("PUSH_KEY").find("SCT") != -1: + url = f'https://sctapi.ftqq.com/{push_config.get("PUSH_KEY")}.send' else: - url = f"https://sc.ftqq.com/{PUSH_KEY}.send" + url = f'https://sc.ftqq.com/{push_config.get("PUSH_KEY")}.send' response = requests.post(url, data=data).json() if response.get("errno") == 0 or response.get("code") == 0: @@ -336,27 +302,23 @@ def serverJ(title: str, content: str, **kwargs) -> None: print(f'serverJ 推送失败!错误码:{response["message"]}') -def pushdeer(title: str, content: str, **kwargs) -> None: +def pushdeer(title: str, content: str) -> None: """ 通过PushDeer 推送消息 """ - if not (kwargs.get("DEER_KEY") or push_config.get("DEER_KEY")): + if not push_config.get("DEER_KEY"): print("PushDeer 服务的 DEER_KEY 未设置!!\n取消推送") return print("PushDeer 服务启动") - DEER_KEY = kwargs.get("DEER_KEY", push_config.get("DEER_KEY")) - data = { "text": title, "desp": content, "type": "markdown", - "pushkey": DEER_KEY, + "pushkey": push_config.get("DEER_KEY"), } url = "https://api2.pushdeer.com/message/push" if push_config.get("DEER_URL"): url = push_config.get("DEER_URL") - if kwargs.get("DEER_URL"): - url = kwargs.get("DEER_URL") response = requests.post(url, data=data).json() @@ -366,26 +328,16 @@ def pushdeer(title: str, content: str, **kwargs) -> None: print("PushDeer 推送失败!错误信息:", response) -def chat(title: str, content: str, **kwargs) -> None: +def chat(title: str, content: str) -> None: """ 通过Chat 推送消息 """ - if not ( - (kwargs.get("CHAT_URL") and kwargs.get("CHAT_TOKEN")) - or (push_config.get("CHAT_URL") and push_config.get("CHAT_TOKEN")) - ): + if not push_config.get("CHAT_URL") or not push_config.get("CHAT_TOKEN"): print("chat 服务的 CHAT_URL或CHAT_TOKEN 未设置!!\n取消推送") return print("chat 服务启动") - if kwargs.get("CHAT_URL") and kwargs.get("CHAT_TOKEN"): - CHAT_URL = kwargs.get("CHAT_URL") - CHAT_TOKEN = kwargs.get("CHAT_TOKEN") - else: - CHAT_URL = push_config.get("CHAT_URL") - CHAT_TOKEN = push_config.get("CHAT_TOKEN") - data = "payload=" + json.dumps({"text": title + "\n" + content}) - url = CHAT_URL + CHAT_TOKEN + url = push_config.get("CHAT_URL") + push_config.get("CHAT_TOKEN") response = requests.post(url, data=data) if response.status_code == 200: @@ -394,23 +346,21 @@ def chat(title: str, content: str, **kwargs) -> None: print("Chat 推送失败!错误信息:", response) -def pushplus_bot(title: str, content: str, **kwargs) -> None: +def pushplus_bot(title: str, content: str) -> None: """ 通过 push+ 推送消息。 """ - if not (kwargs.get("PUSH_PLUS_TOKEN") or push_config.get("PUSH_PLUS_TOKEN")): + if not push_config.get("PUSH_PLUS_TOKEN"): print("PUSHPLUS 服务的 PUSH_PLUS_TOKEN 未设置!!\n取消推送") return print("PUSHPLUS 服务启动") - PUSH_PLUS_TOKEN = kwargs.get("PUSH_PLUS_TOKEN", push_config.get("PUSH_PLUS_TOKEN")) - PUSH_PLUS_USER = kwargs.get("PUSH_PLUS_USER", push_config.get("PUSH_PLUS_USER")) url = "http://www.pushplus.plus/send" data = { - "token": PUSH_PLUS_TOKEN, + "token": push_config.get("PUSH_PLUS_TOKEN"), "title": title, "content": content, - "topic": PUSH_PLUS_USER, + "topic": push_config.get("PUSH_PLUS_USER"), } body = json.dumps(data).encode(encoding="utf-8") headers = {"Content-Type": "application/json"} @@ -431,25 +381,16 @@ def pushplus_bot(title: str, content: str, **kwargs) -> None: print("PUSHPLUS 推送失败!") -def qmsg_bot(title: str, content: str, **kwargs) -> None: +def qmsg_bot(title: str, content: str) -> None: """ 使用 qmsg 推送消息。 """ - if not ( - (kwargs.get("QMSG_KEY") and kwargs.get("QMSG_TYPE")) - or (push_config.get("QMSG_KEY") and push_config.get("QMSG_TYPE")) - ): + if not push_config.get("QMSG_KEY") or not push_config.get("QMSG_TYPE"): print("qmsg 的 QMSG_KEY 或者 QMSG_TYPE 未设置!!\n取消推送") return print("qmsg 服务启动") - if kwargs.get("QMSG_KEY") and kwargs.get("QMSG_TYPE"): - QMSG_KEY = kwargs.get("QMSG_KEY") - QMSG_TYPE = kwargs.get("QMSG_TYPE") - else: - QMSG_KEY = push_config.get("QMSG_KEY") - QMSG_TYPE = push_config.get("QMSG_TYPE") - url = f"https://qmsg.zendee.cn/{QMSG_TYPE}/{QMSG_KEY}" + url = f'https://qmsg.zendee.cn/{push_config.get("QMSG_TYPE")}/{push_config.get("QMSG_KEY")}' payload = {"msg": f'{title}\n\n{content.replace("----", "-")}'.encode("utf-8")} response = requests.post(url=url, params=payload).json() @@ -459,15 +400,14 @@ def qmsg_bot(title: str, content: str, **kwargs) -> None: print(f'qmsg 推送失败!{response["reason"]}') -def wecom_app(title: str, content: str, **kwargs) -> None: +def wecom_app(title: str, content: str) -> None: """ 通过 企业微信 APP 推送消息。 """ - if not (kwargs.get("QYWX_AM") or push_config.get("QYWX_AM")): + if not push_config.get("QYWX_AM"): print("QYWX_AM 未设置!!\n取消推送") return - QYWX_AM = kwargs.get("QYWX_AM", push_config.get("QYWX_AM")) - QYWX_AM_AY = re.split(",", QYWX_AM) + QYWX_AM_AY = re.split(",", push_config.get("QYWX_AM")) if 4 < len(QYWX_AM_AY) > 5: print("QYWX_AM 设置错误!!\n取消推送") return @@ -557,23 +497,20 @@ class WeCom: return respone["errmsg"] -def wecom_bot(title: str, content: str, **kwargs) -> None: +def wecom_bot(title: str, content: str) -> None: """ 通过 企业微信机器人 推送消息。 """ - if not (kwargs.get("QYWX_KEY") or push_config.get("QYWX_KEY")): + if not push_config.get("QYWX_KEY"): print("企业微信机器人 服务的 QYWX_KEY 未设置!!\n取消推送") return print("企业微信机器人服务启动") - QYWX_KEY = kwargs.get("QYWX_KEY", push_config.get("QYWX_KEY")) origin = "https://qyapi.weixin.qq.com" if push_config.get("QYWX_ORIGIN"): origin = push_config.get("QYWX_ORIGIN") - if kwargs.get("QYWX_ORIGIN"): - origin = kwargs.get("QYWX_ORIGIN") - url = f"{origin}/cgi-bin/webhook/send?key={QYWX_KEY}" + url = f"{origin}/cgi-bin/webhook/send?key={push_config.get('QYWX_KEY')}" headers = {"Content-Type": "application/json;charset=utf-8"} data = {"msgtype": "text", "text": {"content": f"{title}\n\n{content}"}} response = requests.post( @@ -586,53 +523,40 @@ def wecom_bot(title: str, content: str, **kwargs) -> None: print("企业微信机器人推送失败!") -def telegram_bot(title: str, content: str, **kwargs) -> None: +def telegram_bot(title: str, content: str) -> None: """ 使用 telegram 机器人 推送消息。 """ - if not ( - (kwargs.get("TG_BOT_TOKEN") and kwargs.get("TG_USER_ID")) - or (push_config.get("TG_BOT_TOKEN") and push_config.get("TG_USER_ID")) - ): - print("tg 服务的 TG_BOT_TOKEN 或者 TG_USER_ID 未设置!!\n取消推送") + if not push_config.get("TG_BOT_TOKEN") or not push_config.get("TG_USER_ID"): + print("tg 服务的 bot_token 或者 user_id 未设置!!\n取消推送") return print("tg 服务启动") - if kwargs.get("TG_BOT_TOKEN") and kwargs.get("TG_USER_ID"): - TG_BOT_TOKEN = kwargs.get("TG_BOT_TOKEN") - TG_USER_ID = kwargs.get("TG_USER_ID") - else: - TG_BOT_TOKEN = push_config.get("TG_BOT_TOKEN") - TG_USER_ID = push_config.get("TG_USER_ID") - if kwargs.get("TG_API_HOST") or push_config.get("TG_API_HOST"): - TG_API_HOST = kwargs.get("TG_API_HOST", push_config.get("TG_API_HOST")) - url = f"{TG_API_HOST}/bot{TG_BOT_TOKEN}/sendMessage" + if push_config.get("TG_API_HOST"): + url = f"{push_config.get('TG_API_HOST')}/bot{push_config.get('TG_BOT_TOKEN')}/sendMessage" else: - url = f"https://api.telegram.org/bot{TG_BOT_TOKEN}/sendMessage" + url = ( + f"https://api.telegram.org/bot{push_config.get('TG_BOT_TOKEN')}/sendMessage" + ) headers = {"Content-Type": "application/x-www-form-urlencoded"} payload = { - "chat_id": str(TG_USER_ID), + "chat_id": str(push_config.get("TG_USER_ID")), "text": f"{title}\n\n{content}", "disable_web_page_preview": "true", } proxies = None - if not ( - (kwargs.get("TG_PROXY_HOST") and kwargs.get("TG_PROXY_PORT")) - or (push_config.get("TG_PROXY_HOST") and push_config.get("TG_PROXY_PORT")) - ): - if kwargs.get("TG_PROXY_HOST") and kwargs.get("TG_PROXY_PORT"): - TG_PROXY_HOST = kwargs.get("TG_PROXY_HOST") - TG_PROXY_PORT = kwargs.get("TG_PROXY_PORT") - else: - TG_PROXY_HOST = kwargs.get("TG_PROXY_HOST") - TG_PROXY_PORT = kwargs.get("TG_PROXY_PORT") - if kwargs.get("TG_PROXY_AUTH") or push_config.get("TG_PROXY_AUTH"): - TG_PROXY_AUTH = kwargs.get( - "TG_PROXY_AUTH", push_config.get("TG_PROXY_AUTH") + if push_config.get("TG_PROXY_HOST") and push_config.get("TG_PROXY_PORT"): + if push_config.get("TG_PROXY_AUTH") is not None and "@" not in push_config.get( + "TG_PROXY_HOST" + ): + push_config["TG_PROXY_HOST"] = ( + push_config.get("TG_PROXY_AUTH") + + "@" + + push_config.get("TG_PROXY_HOST") ) - if TG_PROXY_AUTH is not None and "@" not in TG_PROXY_HOST: - TG_PROXY_HOST = TG_PROXY_AUTH + "@" + TG_PROXY_HOST - proxyStr = "http://{}:{}".format(TG_PROXY_HOST, TG_PROXY_PORT) + proxyStr = "http://{}:{}".format( + push_config.get("TG_PROXY_HOST"), push_config.get("TG_PROXY_PORT") + ) proxies = {"http": proxyStr, "https": proxyStr} response = requests.post( url=url, headers=headers, params=payload, proxies=proxies @@ -644,51 +568,33 @@ def telegram_bot(title: str, content: str, **kwargs) -> None: print("tg 推送失败!") -def aibotk(title: str, content: str, **kwargs) -> None: +def aibotk(title: str, content: str) -> None: """ 使用 智能微秘书 推送消息。 """ - if not ( - ( - kwargs.get("AIBOTK_KEY") - and kwargs.get("AIBOTK_TYPE") - and kwargs.get("AIBOTK_NAME") - ) - or ( - push_config.get("AIBOTK_KEY") - and push_config.get("AIBOTK_TYPE") - and push_config.get("AIBOTK_NAME") - ) + if ( + not push_config.get("AIBOTK_KEY") + or not push_config.get("AIBOTK_TYPE") + or not push_config.get("AIBOTK_NAME") ): print( "智能微秘书 的 AIBOTK_KEY 或者 AIBOTK_TYPE 或者 AIBOTK_NAME 未设置!!\n取消推送" ) return print("智能微秘书 服务启动") - if ( - kwargs.get("AIBOTK_KEY") - and kwargs.get("AIBOTK_TYPE") - and kwargs.get("AIBOTK_NAME") - ): - AIBOTK_KEY = kwargs.get("AIBOTK_KEY") - AIBOTK_TYPE = kwargs.get("AIBOTK_TYPE") - AIBOTK_NAME = kwargs.get("AIBOTK_NAME") - else: - AIBOTK_KEY = push_config.get("AIBOTK_KEY") - AIBOTK_TYPE = push_config.get("AIBOTK_TYPE") - AIBOTK_NAME = push_config.get("AIBOTK_NAME") - if AIBOTK_TYPE == "room": + + if push_config.get("AIBOTK_TYPE") == "room": url = "https://api-bot.aibotk.com/openapi/v1/chat/room" data = { - "apiKey": AIBOTK_KEY, - "roomName": AIBOTK_NAME, + "apiKey": push_config.get("AIBOTK_KEY"), + "roomName": push_config.get("AIBOTK_NAME"), "message": {"type": 1, "content": f"【青龙快讯】\n\n{title}\n{content}"}, } else: url = "https://api-bot.aibotk.com/openapi/v1/chat/contact" data = { - "apiKey": AIBOTK_KEY, - "name": AIBOTK_NAME, + "apiKey": push_config.get("AIBOTK_KEY"), + "name": push_config.get("AIBOTK_NAME"), "message": {"type": 1, "content": f"【青龙快讯】\n\n{title}\n{content}"}, } body = json.dumps(data).encode(encoding="utf-8") @@ -701,75 +607,50 @@ def aibotk(title: str, content: str, **kwargs) -> None: print(f'智能微秘书 推送失败!{response["error"]}') -def smtp(title: str, content: str, **kwargs) -> None: +def smtp(title: str, content: str) -> None: """ 使用 SMTP 邮件 推送消息。 """ - if not ( - ( - kwargs.get("SMTP_SERVER") - and kwargs.get("SMTP_SSL") - and kwargs.get("SMTP_EMAIL") - and kwargs.get("SMTP_PASSWORD") - and kwargs.get("SMTP_NAME") - ) - or ( - push_config.get("SMTP_SERVER") - and push_config.get("SMTP_SSL") - and push_config.get("SMTP_EMAIL") - and push_config.get("SMTP_PASSWORD") - and push_config.get("SMTP_NAME") - ) + if ( + not push_config.get("SMTP_SERVER") + or not push_config.get("SMTP_SSL") + or not push_config.get("SMTP_EMAIL") + or not push_config.get("SMTP_PASSWORD") + or not push_config.get("SMTP_NAME") ): print( "SMTP 邮件 的 SMTP_SERVER 或者 SMTP_SSL 或者 SMTP_EMAIL 或者 SMTP_PASSWORD 或者 SMTP_NAME 未设置!!\n取消推送" ) return print("SMTP 邮件 服务启动") - if ( - kwargs.get("SMTP_SERVER") - and kwargs.get("SMTP_SSL") - and kwargs.get("SMTP_EMAIL") - and kwargs.get("SMTP_PASSWORD") - and kwargs.get("SMTP_NAME") - ): - SMTP_SERVER = kwargs.get("SMTP_SERVER") - SMTP_SSL = kwargs.get("SMTP_SSL") - SMTP_EMAIL = kwargs.get("SMTP_EMAIL") - SMTP_PASSWORD = kwargs.get("SMTP_PASSWORD") - SMTP_NAME = kwargs.get("SMTP_NAME") - else: - SMTP_SERVER = push_config.get("SMTP_SERVER") - SMTP_SSL = push_config.get("SMTP_SSL") - SMTP_EMAIL = push_config.get("SMTP_EMAIL") - SMTP_PASSWORD = push_config.get("SMTP_PASSWORD") - SMTP_NAME = push_config.get("SMTP_NAME") message = MIMEText(content, "plain", "utf-8") message["From"] = formataddr( ( - Header(SMTP_NAME, "utf-8").encode(), - SMTP_EMAIL, + Header(push_config.get("SMTP_NAME"), "utf-8").encode(), + push_config.get("SMTP_EMAIL"), ) ) message["To"] = formataddr( ( - Header(SMTP_NAME, "utf-8").encode(), - SMTP_EMAIL, + Header(push_config.get("SMTP_NAME"), "utf-8").encode(), + push_config.get("SMTP_EMAIL"), ) ) message["Subject"] = Header(title, "utf-8") try: smtp_server = ( - smtplib.SMTP_SSL(SMTP_SERVER) - if SMTP_SSL == "true" - else smtplib.SMTP(SMTP_SERVER) + smtplib.SMTP_SSL(push_config.get("SMTP_SERVER")) + if push_config.get("SMTP_SSL") == "true" + else smtplib.SMTP(push_config.get("SMTP_SERVER")) + ) + smtp_server.login( + push_config.get("SMTP_EMAIL"), push_config.get("SMTP_PASSWORD") ) - smtp_server.login(SMTP_EMAIL, SMTP_PASSWORD) smtp_server.sendmail( - SMTP_EMAIL, - SMTP_EMAIL, + push_config.get("SMTP_EMAIL"), + push_config.get("SMTP_EMAIL"), message.as_bytes(), ) smtp_server.close() @@ -778,17 +659,16 @@ def smtp(title: str, content: str, **kwargs) -> None: print(f"SMTP 邮件 推送失败!{e}") -def pushme(title: str, content: str, **kwargs) -> None: +def pushme(title: str, content: str) -> None: """ 使用 PushMe 推送消息。 """ - if not (kwargs.get("PUSHME_KEY") or push_config.get("PUSHME_KEY")): + if not push_config.get("PUSHME_KEY"): print("PushMe 服务的 PUSHME_KEY 未设置!!\n取消推送") return print("PushMe 服务启动") - PUSHME_KEY = kwargs.get("PUSHME_KEY", push_config.get("PUSHME_KEY")) - url = f"https://push.i-i.me/?push_key={PUSHME_KEY}" + url = f'https://push.i-i.me/?push_key={push_config.get("PUSHME_KEY")}' data = { "title": title, "content": content, @@ -801,45 +681,27 @@ def pushme(title: str, content: str, **kwargs) -> None: print(f"PushMe 推送失败!{response.status_code} {response.text}") -def chronocat(title: str, content: str, **kwargs) -> None: +def chronocat(title: str, content: str) -> None: """ 使用 CHRONOCAT 推送消息。 """ - if not ( - ( - push_config.get("CHRONOCAT_URL") - and push_config.get("CHRONOCAT_QQ") - and push_config.get("CHRONOCAT_TOKEN") - ) - or ( - push_config.get("CHRONOCAT_URL") - and push_config.get("CHRONOCAT_QQ") - and push_config.get("CHRONOCAT_TOKEN") - ) + if ( + not push_config.get("CHRONOCAT_URL") + or not push_config.get("CHRONOCAT_QQ") + or not push_config.get("CHRONOCAT_TOKEN") ): print("CHRONOCAT 服务的 CHRONOCAT_URL 或 CHRONOCAT_QQ 未设置!!\n取消推送") return + print("CHRONOCAT 服务启动") - if ( - kwargs.get("CHRONOCAT_URL") - and kwargs.get("CHRONOCAT_QQ") - and kwargs.get("CHRONOCAT_TOKEN") - ): - CHRONOCAT_URL = kwargs.get("CHRONOCAT_URL") - CHRONOCAT_QQ = kwargs.get("CHRONOCAT_QQ") - CHRONOCAT_TOKEN = kwargs.get("CHRONOCAT_TOKEN") - else: - CHRONOCAT_URL = push_config.get("CHRONOCAT_URL") - CHRONOCAT_QQ = push_config.get("CHRONOCAT_QQ") - CHRONOCAT_TOKEN = push_config.get("CHRONOCAT_TOKEN") - user_ids = re.findall(r"user_id=(\d+)", CHRONOCAT_QQ) - group_ids = re.findall(r"group_id=(\d+)", CHRONOCAT_QQ) + user_ids = re.findall(r"user_id=(\d+)", push_config.get("CHRONOCAT_QQ")) + group_ids = re.findall(r"group_id=(\d+)", push_config.get("CHRONOCAT_QQ")) - url = f"{CHRONOCAT_URL}/api/message/send" + url = f'{push_config.get("CHRONOCAT_URL")}/api/message/send' headers = { "Content-Type": "application/json", - "Authorization": f"Bearer {CHRONOCAT_TOKEN}", + "Authorization": f'Bearer {push_config.get("CHRONOCAT_TOKEN")}', } for chat_type, ids in [(1, user_ids), (2, group_ids)]: @@ -906,10 +768,10 @@ def parse_body(body, content_type, value_format_fn=None): if not body or content_type == "text/plain": return body - parsed = parse_string(input_string, value_format_fn) + parsed = parse_string(body, value_format_fn) if content_type == "application/x-www-form-urlencoded": - data = urlencode(parsed, doseq=True) + data = urllib.parse.urlencode(parsed, doseq=True) return data if content_type == "application/json": @@ -919,18 +781,6 @@ def parse_body(body, content_type, value_format_fn=None): return parsed -def format_notify_content(url, body, title, content): - if "$title" not in url and "$title" not in body: - return {} - - formatted_url = url.replace("$title", urllib.parse.quote_plus(title)).replace( - "$content", urllib.parse.quote_plus(content) - ) - formatted_body = body.replace("$title", title).replace("$content", content) - - return formatted_url, formatted_body - - def custom_notify(title: str, content: str) -> None: """ 通过 自定义通知 推送消息。 @@ -961,7 +811,7 @@ def custom_notify(title: str, content: str) -> None: "$title", urllib.parse.quote_plus(title) ).replace("$content", urllib.parse.quote_plus(content)) response = requests.request( - method=WEBHOOK_METHOD, url=formatUrl, headers=headers, timeout=15, data=body + method=WEBHOOK_METHOD, url=formatted_url, headers=headers, timeout=15, data=body ) if response.status_code == 200: @@ -981,6 +831,7 @@ def one() -> str: def add_notify_function(): + notify_function = [] if push_config.get("BARK_PUSH"): notify_function.append(bark) if push_config.get("CONSOLE"): @@ -1036,8 +887,19 @@ def add_notify_function(): if push_config.get("WEBHOOK_URL") and push_config.get("WEBHOOK_METHOD"): notify_function.append(custom_notify) + if not notify_function: + print(f"无推送渠道,请检查通知变量是否正确") + return notify_function + + +def send(title: str, content: str, ignore_default_config: bool = False, **kwargs): + if kwargs: + global push_config + if ignore_default_config: + push_config = kwargs # 清空从环境变量获取的配置 + else: + push_config.update(kwargs) -def send(title: str, content: str, **kwargs) -> None: if not content: print(f"{title} 推送内容为空!") return @@ -1052,11 +914,9 @@ def send(title: str, content: str, **kwargs) -> None: hitokoto = push_config.get("HITOKOTO") content += "\n\n" + one() if hitokoto else "" - add_notify_function() + notify_function = add_notify_function() ts = [ - threading.Thread( - target=mode, args=(title, content), kwargs=kwargs, name=mode.__name__ - ) + threading.Thread(target=mode, args=(title, content), name=mode.__name__) for mode in notify_function ] [t.start() for t in ts]