改进自定义 send() 方法,修正自定义通知 body 解析笔误

This commit is contained in:
Cp0204 2024-04-10 22:45:57 +08:00 committed by GitHub
parent ac6de4911a
commit 8797d8f66d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -113,7 +113,6 @@ push_config = {
'WEBHOOK_METHOD': '', # 自定义通知 请求方法
'WEBHOOK_CONTENT_TYPE': '' # 自定义通知 content-type
}
notify_function = []
# fmt: on
# 首先读取 面板变量 或者 github action 运行变量
@ -123,19 +122,19 @@ for k in push_config:
push_config[k] = v
def bark(title: str, content: str, **kwargs) -> None:
def bark(title: str, content: str) -> None:
"""
使用 bark 推送消息
"""
if not (push_config.get("BARK_PUSH") or kwargs.get("BARK_PUSH")):
if not push_config.get("BARK_PUSH"):
print("bark 服务的 BARK_PUSH 未设置!!\n取消推送")
return
print("bark 服务启动")
BARK_PUSH = kwargs.get("BARK_PUSH", push_config.get("BARK_PUSH"))
if BARK_PUSH.startswith("http"):
url = f"{BARK_PUSH}/{urllib.parse.quote_plus(title)}/{urllib.parse.quote_plus(content)}"
if push_config.get("BARK_PUSH").startswith("http"):
url = f'{push_config.get("BARK_PUSH")}/{urllib.parse.quote_plus(title)}/{urllib.parse.quote_plus(content)}'
else:
url = f"https://api.day.app/{BARK_PUSH}/{urllib.parse.quote_plus(title)}/{urllib.parse.quote_plus(content)}"
url = f'https://api.day.app/{push_config.get("BARK_PUSH")}/{urllib.parse.quote_plus(title)}/{urllib.parse.quote_plus(content)}'
bark_params = {
"BARK_ARCHIVE": "isArchive",
@ -149,12 +148,11 @@ def bark(title: str, content: str, **kwargs) -> None:
for pair in filter(
lambda pairs: pairs[0].startswith("BARK_")
and pairs[0] != "BARK_PUSH"
and (pairs[1] or kwargs.get(pairs[0]))
and pairs[1]
and bark_params.get(pairs[0]),
push_config.items(),
):
value = kwargs.get(pair[0], pair[1])
params += f"{bark_params.get(pair[0])}={value}&"
params += f"{bark_params.get(pair[0])}={pair[1]}&"
if params:
url = url + "?" + params.rstrip("&")
response = requests.get(url).json()
@ -165,40 +163,31 @@ def bark(title: str, content: str, **kwargs) -> None:
print("bark 推送失败!")
def console(title: str, content: str, **kwargs) -> None:
def console(title: str, content: str) -> None:
"""
使用 控制台 推送消息
"""
print(f"{title}\n\n{content}")
def dingding_bot(title: str, content: str, **kwargs) -> None:
def dingding_bot(title: str, content: str) -> None:
"""
使用 钉钉机器人 推送消息
"""
if not (
(kwargs.get("DD_BOT_SECRET") and kwargs.get("DD_BOT_TOKEN"))
or (push_config.get("DD_BOT_SECRET") and push_config.get("DD_BOT_TOKEN"))
):
if not push_config.get("DD_BOT_SECRET") or not push_config.get("DD_BOT_TOKEN"):
print("钉钉机器人 服务的 DD_BOT_SECRET 或者 DD_BOT_TOKEN 未设置!!\n取消推送")
return
print("钉钉机器人 服务启动")
if kwargs.get("DD_BOT_SECRET") and kwargs.get("DD_BOT_TOKEN"):
DD_BOT_SECRET = kwargs.get("DD_BOT_SECRET")
DD_BOT_TOKEN = kwargs.get("DD_BOT_TOKEN")
else:
DD_BOT_SECRET = push_config.get("DD_BOT_SECRET")
DD_BOT_TOKEN = push_config.get("DD_BOT_TOKEN")
timestamp = str(round(time.time() * 1000))
secret_enc = DD_BOT_SECRET.encode("utf-8")
string_to_sign = "{}\n{}".format(timestamp, DD_BOT_SECRET)
secret_enc = push_config.get("DD_BOT_SECRET").encode("utf-8")
string_to_sign = "{}\n{}".format(timestamp, push_config.get("DD_BOT_SECRET"))
string_to_sign_enc = string_to_sign.encode("utf-8")
hmac_code = hmac.new(
secret_enc, string_to_sign_enc, digestmod=hashlib.sha256
).digest()
sign = urllib.parse.quote_plus(base64.b64encode(hmac_code))
url = f"https://oapi.dingtalk.com/robot/send?access_token={DD_BOT_TOKEN}&timestamp={timestamp}&sign={sign}"
url = f'https://oapi.dingtalk.com/robot/send?access_token={push_config.get("DD_BOT_TOKEN")}&timestamp={timestamp}&sign={sign}'
headers = {"Content-Type": "application/json;charset=utf-8"}
data = {"msgtype": "text", "text": {"content": f"{title}\n\n{content}"}}
response = requests.post(
@ -211,16 +200,16 @@ def dingding_bot(title: str, content: str, **kwargs) -> None:
print("钉钉机器人 推送失败!")
def feishu_bot(title: str, content: str, **kwargs) -> None:
def feishu_bot(title: str, content: str) -> None:
"""
使用 飞书机器人 推送消息
"""
if not (kwargs.get("DD_BOT_SECRET") or push_config.get("FSKEY")):
if not push_config.get("FSKEY"):
print("飞书 服务的 FSKEY 未设置!!\n取消推送")
return
print("飞书 服务启动")
FSKEY = kwargs.get("DD_BOT_SECRET", push_config.get("FSKEY"))
url = f"https://open.feishu.cn/open-apis/bot/v2/hook/{FSKEY}"
url = f'https://open.feishu.cn/open-apis/bot/v2/hook/{push_config.get("FSKEY")}'
data = {"msg_type": "text", "content": {"text": f"{title}\n\n{content}"}}
response = requests.post(url, data=json.dumps(data)).json()
@ -230,27 +219,16 @@ def feishu_bot(title: str, content: str, **kwargs) -> None:
print("飞书 推送失败!错误信息如下:\n", response)
def go_cqhttp(title: str, content: str, **kwargs) -> None:
def go_cqhttp(title: str, content: str) -> None:
"""
使用 go_cqhttp 推送消息
"""
if not (
(kwargs.get("GOBOT_URL") and kwargs.get("GOBOT_QQ"))
or (push_config.get("GOBOT_URL") and push_config.get("GOBOT_QQ"))
):
if not push_config.get("GOBOT_URL") or not push_config.get("GOBOT_QQ"):
print("go-cqhttp 服务的 GOBOT_URL 或 GOBOT_QQ 未设置!!\n取消推送")
return
print("go-cqhttp 服务启动")
if kwargs.get("GOBOT_URL") and kwargs.get("GOBOT_QQ"):
GOBOT_URL = kwargs.get("GOBOT_URL")
GOBOT_QQ = kwargs.get("GOBOT_QQ")
GOBOT_TOKEN = kwargs.get("GOBOT_TOKEN")
else:
GOBOT_URL = push_config.get("GOBOT_URL")
GOBOT_QQ = push_config.get("GOBOT_QQ")
GOBOT_TOKEN = push_config.get("GOBOT_TOKEN")
url = f"{GOBOT_URL}?access_token={GOBOT_TOKEN}&{GOBOT_QQ}&message=标题:{title}\n内容:{content}"
url = f'{push_config.get("GOBOT_URL")}?access_token={push_config.get("GOBOT_TOKEN")}&{push_config.get("GOBOT_QQ")}&message=标题:{title}\n内容:{content}'
response = requests.get(url).json()
if response["status"] == "ok":
@ -259,31 +237,20 @@ def go_cqhttp(title: str, content: str, **kwargs) -> None:
print("go-cqhttp 推送失败!")
def gotify(title: str, content: str, **kwargs) -> None:
def gotify(title: str, content: str) -> None:
"""
使用 gotify 推送消息
"""
if not (
(kwargs.get("GOTIFY_URL") and kwargs.get("GOTIFY_TOKEN"))
or (push_config.get("GOTIFY_URL") and push_config.get("GOTIFY_TOKEN"))
):
if not push_config.get("GOTIFY_URL") or not push_config.get("GOTIFY_TOKEN"):
print("gotify 服务的 GOTIFY_URL 或 GOTIFY_TOKEN 未设置!!\n取消推送")
return
print("gotify 服务启动")
if kwargs.get("GOTIFY_URL") and kwargs.get("GOTIFY_TOKEN"):
GOTIFY_URL = kwargs.get("GOTIFY_URL")
GOTIFY_TOKEN = kwargs.get("GOBOTGOTIFY_TOKEN_QQ")
GOTIFY_PRIORITY = kwargs.get("GOTIFY_PRIORITY")
else:
GOTIFY_URL = push_config.get("GOTIFY_URL")
GOTIFY_TOKEN = push_config.get("GOTIFY_TOKEN")
GOTIFY_PRIORITY = kwargs.get("GOTIFY_PRIORITY")
url = f"{GOTIFY_URL}/message?token={GOTIFY_TOKEN}"
url = f'{push_config.get("GOTIFY_URL")}/message?token={push_config.get("GOTIFY_TOKEN")}'
data = {
"title": title,
"message": content,
"priority": GOTIFY_PRIORITY,
"priority": push_config.get("GOTIFY_PRIORITY"),
}
response = requests.post(url, data=data).json()
@ -293,16 +260,16 @@ def gotify(title: str, content: str, **kwargs) -> None:
print("gotify 推送失败!")
def iGot(title: str, content: str, **kwargs) -> None:
def iGot(title: str, content: str) -> None:
"""
使用 iGot 推送消息
"""
if not (kwargs.get("IGOT_PUSH_KEY") or push_config.get("IGOT_PUSH_KEY")):
if not push_config.get("IGOT_PUSH_KEY"):
print("iGot 服务的 IGOT_PUSH_KEY 未设置!!\n取消推送")
return
print("iGot 服务启动")
IGOT_PUSH_KEY = kwargs.get("IGOT_PUSH_KEY", push_config.get("IGOT_PUSH_KEY"))
url = f"https://push.hellyw.com/{IGOT_PUSH_KEY}"
url = f'https://push.hellyw.com/{push_config.get("IGOT_PUSH_KEY")}'
data = {"title": title, "content": content}
headers = {"Content-Type": "application/x-www-form-urlencoded"}
response = requests.post(url, data=data, headers=headers).json()
@ -313,21 +280,20 @@ def iGot(title: str, content: str, **kwargs) -> None:
print(f'iGot 推送失败!{response["errMsg"]}')
def serverJ(title: str, content: str, **kwargs) -> None:
def serverJ(title: str, content: str) -> None:
"""
通过 serverJ 推送消息
"""
if not (kwargs.get("PUSH_KEY") or push_config.get("PUSH_KEY")):
if not push_config.get("PUSH_KEY"):
print("serverJ 服务的 PUSH_KEY 未设置!!\n取消推送")
return
print("serverJ 服务启动")
PUSH_KEY = kwargs.get("PUSH_KEY", push_config.get("PUSH_KEY"))
data = {"text": title, "desp": content.replace("\n", "\n\n")}
if PUSH_KEY.find("SCT") != -1:
url = f"https://sctapi.ftqq.com/{PUSH_KEY}.send"
if push_config.get("PUSH_KEY").find("SCT") != -1:
url = f'https://sctapi.ftqq.com/{push_config.get("PUSH_KEY")}.send'
else:
url = f"https://sc.ftqq.com/{PUSH_KEY}.send"
url = f'https://sc.ftqq.com/{push_config.get("PUSH_KEY")}.send'
response = requests.post(url, data=data).json()
if response.get("errno") == 0 or response.get("code") == 0:
@ -336,27 +302,23 @@ def serverJ(title: str, content: str, **kwargs) -> None:
print(f'serverJ 推送失败!错误码:{response["message"]}')
def pushdeer(title: str, content: str, **kwargs) -> None:
def pushdeer(title: str, content: str) -> None:
"""
通过PushDeer 推送消息
"""
if not (kwargs.get("DEER_KEY") or push_config.get("DEER_KEY")):
if not push_config.get("DEER_KEY"):
print("PushDeer 服务的 DEER_KEY 未设置!!\n取消推送")
return
print("PushDeer 服务启动")
DEER_KEY = kwargs.get("DEER_KEY", push_config.get("DEER_KEY"))
data = {
"text": title,
"desp": content,
"type": "markdown",
"pushkey": DEER_KEY,
"pushkey": push_config.get("DEER_KEY"),
}
url = "https://api2.pushdeer.com/message/push"
if push_config.get("DEER_URL"):
url = push_config.get("DEER_URL")
if kwargs.get("DEER_URL"):
url = kwargs.get("DEER_URL")
response = requests.post(url, data=data).json()
@ -366,26 +328,16 @@ def pushdeer(title: str, content: str, **kwargs) -> None:
print("PushDeer 推送失败!错误信息:", response)
def chat(title: str, content: str, **kwargs) -> None:
def chat(title: str, content: str) -> None:
"""
通过Chat 推送消息
"""
if not (
(kwargs.get("CHAT_URL") and kwargs.get("CHAT_TOKEN"))
or (push_config.get("CHAT_URL") and push_config.get("CHAT_TOKEN"))
):
if not push_config.get("CHAT_URL") or not push_config.get("CHAT_TOKEN"):
print("chat 服务的 CHAT_URL或CHAT_TOKEN 未设置!!\n取消推送")
return
print("chat 服务启动")
if kwargs.get("CHAT_URL") and kwargs.get("CHAT_TOKEN"):
CHAT_URL = kwargs.get("CHAT_URL")
CHAT_TOKEN = kwargs.get("CHAT_TOKEN")
else:
CHAT_URL = push_config.get("CHAT_URL")
CHAT_TOKEN = push_config.get("CHAT_TOKEN")
data = "payload=" + json.dumps({"text": title + "\n" + content})
url = CHAT_URL + CHAT_TOKEN
url = push_config.get("CHAT_URL") + push_config.get("CHAT_TOKEN")
response = requests.post(url, data=data)
if response.status_code == 200:
@ -394,23 +346,21 @@ def chat(title: str, content: str, **kwargs) -> None:
print("Chat 推送失败!错误信息:", response)
def pushplus_bot(title: str, content: str, **kwargs) -> None:
def pushplus_bot(title: str, content: str) -> None:
"""
通过 push+ 推送消息
"""
if not (kwargs.get("PUSH_PLUS_TOKEN") or push_config.get("PUSH_PLUS_TOKEN")):
if not push_config.get("PUSH_PLUS_TOKEN"):
print("PUSHPLUS 服务的 PUSH_PLUS_TOKEN 未设置!!\n取消推送")
return
print("PUSHPLUS 服务启动")
PUSH_PLUS_TOKEN = kwargs.get("PUSH_PLUS_TOKEN", push_config.get("PUSH_PLUS_TOKEN"))
PUSH_PLUS_USER = kwargs.get("PUSH_PLUS_USER", push_config.get("PUSH_PLUS_USER"))
url = "http://www.pushplus.plus/send"
data = {
"token": PUSH_PLUS_TOKEN,
"token": push_config.get("PUSH_PLUS_TOKEN"),
"title": title,
"content": content,
"topic": PUSH_PLUS_USER,
"topic": push_config.get("PUSH_PLUS_USER"),
}
body = json.dumps(data).encode(encoding="utf-8")
headers = {"Content-Type": "application/json"}
@ -431,25 +381,16 @@ def pushplus_bot(title: str, content: str, **kwargs) -> None:
print("PUSHPLUS 推送失败!")
def qmsg_bot(title: str, content: str, **kwargs) -> None:
def qmsg_bot(title: str, content: str) -> None:
"""
使用 qmsg 推送消息
"""
if not (
(kwargs.get("QMSG_KEY") and kwargs.get("QMSG_TYPE"))
or (push_config.get("QMSG_KEY") and push_config.get("QMSG_TYPE"))
):
if not push_config.get("QMSG_KEY") or not push_config.get("QMSG_TYPE"):
print("qmsg 的 QMSG_KEY 或者 QMSG_TYPE 未设置!!\n取消推送")
return
print("qmsg 服务启动")
if kwargs.get("QMSG_KEY") and kwargs.get("QMSG_TYPE"):
QMSG_KEY = kwargs.get("QMSG_KEY")
QMSG_TYPE = kwargs.get("QMSG_TYPE")
else:
QMSG_KEY = push_config.get("QMSG_KEY")
QMSG_TYPE = push_config.get("QMSG_TYPE")
url = f"https://qmsg.zendee.cn/{QMSG_TYPE}/{QMSG_KEY}"
url = f'https://qmsg.zendee.cn/{push_config.get("QMSG_TYPE")}/{push_config.get("QMSG_KEY")}'
payload = {"msg": f'{title}\n\n{content.replace("----", "-")}'.encode("utf-8")}
response = requests.post(url=url, params=payload).json()
@ -459,15 +400,14 @@ def qmsg_bot(title: str, content: str, **kwargs) -> None:
print(f'qmsg 推送失败!{response["reason"]}')
def wecom_app(title: str, content: str, **kwargs) -> None:
def wecom_app(title: str, content: str) -> None:
"""
通过 企业微信 APP 推送消息
"""
if not (kwargs.get("QYWX_AM") or push_config.get("QYWX_AM")):
if not push_config.get("QYWX_AM"):
print("QYWX_AM 未设置!!\n取消推送")
return
QYWX_AM = kwargs.get("QYWX_AM", push_config.get("QYWX_AM"))
QYWX_AM_AY = re.split(",", QYWX_AM)
QYWX_AM_AY = re.split(",", push_config.get("QYWX_AM"))
if 4 < len(QYWX_AM_AY) > 5:
print("QYWX_AM 设置错误!!\n取消推送")
return
@ -557,23 +497,20 @@ class WeCom:
return respone["errmsg"]
def wecom_bot(title: str, content: str, **kwargs) -> None:
def wecom_bot(title: str, content: str) -> None:
"""
通过 企业微信机器人 推送消息
"""
if not (kwargs.get("QYWX_KEY") or push_config.get("QYWX_KEY")):
if not push_config.get("QYWX_KEY"):
print("企业微信机器人 服务的 QYWX_KEY 未设置!!\n取消推送")
return
print("企业微信机器人服务启动")
QYWX_KEY = kwargs.get("QYWX_KEY", push_config.get("QYWX_KEY"))
origin = "https://qyapi.weixin.qq.com"
if push_config.get("QYWX_ORIGIN"):
origin = push_config.get("QYWX_ORIGIN")
if kwargs.get("QYWX_ORIGIN"):
origin = kwargs.get("QYWX_ORIGIN")
url = f"{origin}/cgi-bin/webhook/send?key={QYWX_KEY}"
url = f"{origin}/cgi-bin/webhook/send?key={push_config.get('QYWX_KEY')}"
headers = {"Content-Type": "application/json;charset=utf-8"}
data = {"msgtype": "text", "text": {"content": f"{title}\n\n{content}"}}
response = requests.post(
@ -586,53 +523,40 @@ def wecom_bot(title: str, content: str, **kwargs) -> None:
print("企业微信机器人推送失败!")
def telegram_bot(title: str, content: str, **kwargs) -> None:
def telegram_bot(title: str, content: str) -> None:
"""
使用 telegram 机器人 推送消息
"""
if not (
(kwargs.get("TG_BOT_TOKEN") and kwargs.get("TG_USER_ID"))
or (push_config.get("TG_BOT_TOKEN") and push_config.get("TG_USER_ID"))
):
print("tg 服务的 TG_BOT_TOKEN 或者 TG_USER_ID 未设置!!\n取消推送")
if not push_config.get("TG_BOT_TOKEN") or not push_config.get("TG_USER_ID"):
print("tg 服务的 bot_token 或者 user_id 未设置!!\n取消推送")
return
print("tg 服务启动")
if kwargs.get("TG_BOT_TOKEN") and kwargs.get("TG_USER_ID"):
TG_BOT_TOKEN = kwargs.get("TG_BOT_TOKEN")
TG_USER_ID = kwargs.get("TG_USER_ID")
else:
TG_BOT_TOKEN = push_config.get("TG_BOT_TOKEN")
TG_USER_ID = push_config.get("TG_USER_ID")
if kwargs.get("TG_API_HOST") or push_config.get("TG_API_HOST"):
TG_API_HOST = kwargs.get("TG_API_HOST", push_config.get("TG_API_HOST"))
url = f"{TG_API_HOST}/bot{TG_BOT_TOKEN}/sendMessage"
if push_config.get("TG_API_HOST"):
url = f"{push_config.get('TG_API_HOST')}/bot{push_config.get('TG_BOT_TOKEN')}/sendMessage"
else:
url = f"https://api.telegram.org/bot{TG_BOT_TOKEN}/sendMessage"
url = (
f"https://api.telegram.org/bot{push_config.get('TG_BOT_TOKEN')}/sendMessage"
)
headers = {"Content-Type": "application/x-www-form-urlencoded"}
payload = {
"chat_id": str(TG_USER_ID),
"chat_id": str(push_config.get("TG_USER_ID")),
"text": f"{title}\n\n{content}",
"disable_web_page_preview": "true",
}
proxies = None
if not (
(kwargs.get("TG_PROXY_HOST") and kwargs.get("TG_PROXY_PORT"))
or (push_config.get("TG_PROXY_HOST") and push_config.get("TG_PROXY_PORT"))
if push_config.get("TG_PROXY_HOST") and push_config.get("TG_PROXY_PORT"):
if push_config.get("TG_PROXY_AUTH") is not None and "@" not in push_config.get(
"TG_PROXY_HOST"
):
if kwargs.get("TG_PROXY_HOST") and kwargs.get("TG_PROXY_PORT"):
TG_PROXY_HOST = kwargs.get("TG_PROXY_HOST")
TG_PROXY_PORT = kwargs.get("TG_PROXY_PORT")
else:
TG_PROXY_HOST = kwargs.get("TG_PROXY_HOST")
TG_PROXY_PORT = kwargs.get("TG_PROXY_PORT")
if kwargs.get("TG_PROXY_AUTH") or push_config.get("TG_PROXY_AUTH"):
TG_PROXY_AUTH = kwargs.get(
"TG_PROXY_AUTH", push_config.get("TG_PROXY_AUTH")
push_config["TG_PROXY_HOST"] = (
push_config.get("TG_PROXY_AUTH")
+ "@"
+ push_config.get("TG_PROXY_HOST")
)
proxyStr = "http://{}:{}".format(
push_config.get("TG_PROXY_HOST"), push_config.get("TG_PROXY_PORT")
)
if TG_PROXY_AUTH is not None and "@" not in TG_PROXY_HOST:
TG_PROXY_HOST = TG_PROXY_AUTH + "@" + TG_PROXY_HOST
proxyStr = "http://{}:{}".format(TG_PROXY_HOST, TG_PROXY_PORT)
proxies = {"http": proxyStr, "https": proxyStr}
response = requests.post(
url=url, headers=headers, params=payload, proxies=proxies
@ -644,51 +568,33 @@ def telegram_bot(title: str, content: str, **kwargs) -> None:
print("tg 推送失败!")
def aibotk(title: str, content: str, **kwargs) -> None:
def aibotk(title: str, content: str) -> None:
"""
使用 智能微秘书 推送消息
"""
if not (
(
kwargs.get("AIBOTK_KEY")
and kwargs.get("AIBOTK_TYPE")
and kwargs.get("AIBOTK_NAME")
)
or (
push_config.get("AIBOTK_KEY")
and push_config.get("AIBOTK_TYPE")
and push_config.get("AIBOTK_NAME")
)
if (
not push_config.get("AIBOTK_KEY")
or not push_config.get("AIBOTK_TYPE")
or not push_config.get("AIBOTK_NAME")
):
print(
"智能微秘书 的 AIBOTK_KEY 或者 AIBOTK_TYPE 或者 AIBOTK_NAME 未设置!!\n取消推送"
)
return
print("智能微秘书 服务启动")
if (
kwargs.get("AIBOTK_KEY")
and kwargs.get("AIBOTK_TYPE")
and kwargs.get("AIBOTK_NAME")
):
AIBOTK_KEY = kwargs.get("AIBOTK_KEY")
AIBOTK_TYPE = kwargs.get("AIBOTK_TYPE")
AIBOTK_NAME = kwargs.get("AIBOTK_NAME")
else:
AIBOTK_KEY = push_config.get("AIBOTK_KEY")
AIBOTK_TYPE = push_config.get("AIBOTK_TYPE")
AIBOTK_NAME = push_config.get("AIBOTK_NAME")
if AIBOTK_TYPE == "room":
if push_config.get("AIBOTK_TYPE") == "room":
url = "https://api-bot.aibotk.com/openapi/v1/chat/room"
data = {
"apiKey": AIBOTK_KEY,
"roomName": AIBOTK_NAME,
"apiKey": push_config.get("AIBOTK_KEY"),
"roomName": push_config.get("AIBOTK_NAME"),
"message": {"type": 1, "content": f"【青龙快讯】\n\n{title}\n{content}"},
}
else:
url = "https://api-bot.aibotk.com/openapi/v1/chat/contact"
data = {
"apiKey": AIBOTK_KEY,
"name": AIBOTK_NAME,
"apiKey": push_config.get("AIBOTK_KEY"),
"name": push_config.get("AIBOTK_NAME"),
"message": {"type": 1, "content": f"【青龙快讯】\n\n{title}\n{content}"},
}
body = json.dumps(data).encode(encoding="utf-8")
@ -701,75 +607,50 @@ def aibotk(title: str, content: str, **kwargs) -> None:
print(f'智能微秘书 推送失败!{response["error"]}')
def smtp(title: str, content: str, **kwargs) -> None:
def smtp(title: str, content: str) -> None:
"""
使用 SMTP 邮件 推送消息
"""
if not (
(
kwargs.get("SMTP_SERVER")
and kwargs.get("SMTP_SSL")
and kwargs.get("SMTP_EMAIL")
and kwargs.get("SMTP_PASSWORD")
and kwargs.get("SMTP_NAME")
)
or (
push_config.get("SMTP_SERVER")
and push_config.get("SMTP_SSL")
and push_config.get("SMTP_EMAIL")
and push_config.get("SMTP_PASSWORD")
and push_config.get("SMTP_NAME")
)
if (
not push_config.get("SMTP_SERVER")
or not push_config.get("SMTP_SSL")
or not push_config.get("SMTP_EMAIL")
or not push_config.get("SMTP_PASSWORD")
or not push_config.get("SMTP_NAME")
):
print(
"SMTP 邮件 的 SMTP_SERVER 或者 SMTP_SSL 或者 SMTP_EMAIL 或者 SMTP_PASSWORD 或者 SMTP_NAME 未设置!!\n取消推送"
)
return
print("SMTP 邮件 服务启动")
if (
kwargs.get("SMTP_SERVER")
and kwargs.get("SMTP_SSL")
and kwargs.get("SMTP_EMAIL")
and kwargs.get("SMTP_PASSWORD")
and kwargs.get("SMTP_NAME")
):
SMTP_SERVER = kwargs.get("SMTP_SERVER")
SMTP_SSL = kwargs.get("SMTP_SSL")
SMTP_EMAIL = kwargs.get("SMTP_EMAIL")
SMTP_PASSWORD = kwargs.get("SMTP_PASSWORD")
SMTP_NAME = kwargs.get("SMTP_NAME")
else:
SMTP_SERVER = push_config.get("SMTP_SERVER")
SMTP_SSL = push_config.get("SMTP_SSL")
SMTP_EMAIL = push_config.get("SMTP_EMAIL")
SMTP_PASSWORD = push_config.get("SMTP_PASSWORD")
SMTP_NAME = push_config.get("SMTP_NAME")
message = MIMEText(content, "plain", "utf-8")
message["From"] = formataddr(
(
Header(SMTP_NAME, "utf-8").encode(),
SMTP_EMAIL,
Header(push_config.get("SMTP_NAME"), "utf-8").encode(),
push_config.get("SMTP_EMAIL"),
)
)
message["To"] = formataddr(
(
Header(SMTP_NAME, "utf-8").encode(),
SMTP_EMAIL,
Header(push_config.get("SMTP_NAME"), "utf-8").encode(),
push_config.get("SMTP_EMAIL"),
)
)
message["Subject"] = Header(title, "utf-8")
try:
smtp_server = (
smtplib.SMTP_SSL(SMTP_SERVER)
if SMTP_SSL == "true"
else smtplib.SMTP(SMTP_SERVER)
smtplib.SMTP_SSL(push_config.get("SMTP_SERVER"))
if push_config.get("SMTP_SSL") == "true"
else smtplib.SMTP(push_config.get("SMTP_SERVER"))
)
smtp_server.login(
push_config.get("SMTP_EMAIL"), push_config.get("SMTP_PASSWORD")
)
smtp_server.login(SMTP_EMAIL, SMTP_PASSWORD)
smtp_server.sendmail(
SMTP_EMAIL,
SMTP_EMAIL,
push_config.get("SMTP_EMAIL"),
push_config.get("SMTP_EMAIL"),
message.as_bytes(),
)
smtp_server.close()
@ -778,17 +659,16 @@ def smtp(title: str, content: str, **kwargs) -> None:
print(f"SMTP 邮件 推送失败!{e}")
def pushme(title: str, content: str, **kwargs) -> None:
def pushme(title: str, content: str) -> None:
"""
使用 PushMe 推送消息
"""
if not (kwargs.get("PUSHME_KEY") or push_config.get("PUSHME_KEY")):
if not push_config.get("PUSHME_KEY"):
print("PushMe 服务的 PUSHME_KEY 未设置!!\n取消推送")
return
print("PushMe 服务启动")
PUSHME_KEY = kwargs.get("PUSHME_KEY", push_config.get("PUSHME_KEY"))
url = f"https://push.i-i.me/?push_key={PUSHME_KEY}"
url = f'https://push.i-i.me/?push_key={push_config.get("PUSHME_KEY")}'
data = {
"title": title,
"content": content,
@ -801,45 +681,27 @@ def pushme(title: str, content: str, **kwargs) -> None:
print(f"PushMe 推送失败!{response.status_code} {response.text}")
def chronocat(title: str, content: str, **kwargs) -> None:
def chronocat(title: str, content: str) -> None:
"""
使用 CHRONOCAT 推送消息
"""
if not (
(
push_config.get("CHRONOCAT_URL")
and push_config.get("CHRONOCAT_QQ")
and push_config.get("CHRONOCAT_TOKEN")
)
or (
push_config.get("CHRONOCAT_URL")
and push_config.get("CHRONOCAT_QQ")
and push_config.get("CHRONOCAT_TOKEN")
)
if (
not push_config.get("CHRONOCAT_URL")
or not push_config.get("CHRONOCAT_QQ")
or not push_config.get("CHRONOCAT_TOKEN")
):
print("CHRONOCAT 服务的 CHRONOCAT_URL 或 CHRONOCAT_QQ 未设置!!\n取消推送")
return
print("CHRONOCAT 服务启动")
if (
kwargs.get("CHRONOCAT_URL")
and kwargs.get("CHRONOCAT_QQ")
and kwargs.get("CHRONOCAT_TOKEN")
):
CHRONOCAT_URL = kwargs.get("CHRONOCAT_URL")
CHRONOCAT_QQ = kwargs.get("CHRONOCAT_QQ")
CHRONOCAT_TOKEN = kwargs.get("CHRONOCAT_TOKEN")
else:
CHRONOCAT_URL = push_config.get("CHRONOCAT_URL")
CHRONOCAT_QQ = push_config.get("CHRONOCAT_QQ")
CHRONOCAT_TOKEN = push_config.get("CHRONOCAT_TOKEN")
user_ids = re.findall(r"user_id=(\d+)", CHRONOCAT_QQ)
group_ids = re.findall(r"group_id=(\d+)", CHRONOCAT_QQ)
user_ids = re.findall(r"user_id=(\d+)", push_config.get("CHRONOCAT_QQ"))
group_ids = re.findall(r"group_id=(\d+)", push_config.get("CHRONOCAT_QQ"))
url = f"{CHRONOCAT_URL}/api/message/send"
url = f'{push_config.get("CHRONOCAT_URL")}/api/message/send'
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {CHRONOCAT_TOKEN}",
"Authorization": f'Bearer {push_config.get("CHRONOCAT_TOKEN")}',
}
for chat_type, ids in [(1, user_ids), (2, group_ids)]:
@ -906,10 +768,10 @@ def parse_body(body, content_type, value_format_fn=None):
if not body or content_type == "text/plain":
return body
parsed = parse_string(input_string, value_format_fn)
parsed = parse_string(body, value_format_fn)
if content_type == "application/x-www-form-urlencoded":
data = urlencode(parsed, doseq=True)
data = urllib.parse.urlencode(parsed, doseq=True)
return data
if content_type == "application/json":
@ -919,18 +781,6 @@ def parse_body(body, content_type, value_format_fn=None):
return parsed
def format_notify_content(url, body, title, content):
if "$title" not in url and "$title" not in body:
return {}
formatted_url = url.replace("$title", urllib.parse.quote_plus(title)).replace(
"$content", urllib.parse.quote_plus(content)
)
formatted_body = body.replace("$title", title).replace("$content", content)
return formatted_url, formatted_body
def custom_notify(title: str, content: str) -> None:
"""
通过 自定义通知 推送消息
@ -961,7 +811,7 @@ def custom_notify(title: str, content: str) -> None:
"$title", urllib.parse.quote_plus(title)
).replace("$content", urllib.parse.quote_plus(content))
response = requests.request(
method=WEBHOOK_METHOD, url=formatUrl, headers=headers, timeout=15, data=body
method=WEBHOOK_METHOD, url=formatted_url, headers=headers, timeout=15, data=body
)
if response.status_code == 200:
@ -981,6 +831,7 @@ def one() -> str:
def add_notify_function():
notify_function = []
if push_config.get("BARK_PUSH"):
notify_function.append(bark)
if push_config.get("CONSOLE"):
@ -1036,8 +887,19 @@ def add_notify_function():
if push_config.get("WEBHOOK_URL") and push_config.get("WEBHOOK_METHOD"):
notify_function.append(custom_notify)
if not notify_function:
print(f"无推送渠道,请检查通知变量是否正确")
return notify_function
def send(title: str, content: str, ignore_default_config: bool = False, **kwargs):
if kwargs:
global push_config
if ignore_default_config:
push_config = kwargs # 清空从环境变量获取的配置
else:
push_config.update(kwargs)
def send(title: str, content: str, **kwargs) -> None:
if not content:
print(f"{title} 推送内容为空!")
return
@ -1052,11 +914,9 @@ def send(title: str, content: str, **kwargs) -> None:
hitokoto = push_config.get("HITOKOTO")
content += "\n\n" + one() if hitokoto else ""
add_notify_function()
notify_function = add_notify_function()
ts = [
threading.Thread(
target=mode, args=(title, content), kwargs=kwargs, name=mode.__name__
)
threading.Thread(target=mode, args=(title, content), name=mode.__name__)
for mode in notify_function
]
[t.start() for t in ts]