mirror of
https://github.com/whyour/qinglong.git
synced 2025-07-27 14:46:06 +08:00
适配自定义webhook推送
This commit is contained in:
parent
86e3d8736b
commit
6481a933ea
File diff suppressed because one or more lines are too long
|
@ -31,7 +31,7 @@ def print(text, *args, **kw):
|
|||
|
||||
|
||||
# 通知服务
|
||||
# fmt: off
|
||||
#fmt: on
|
||||
push_config = {
|
||||
'HITOKOTO': False, # 启用一言(随机句子)
|
||||
|
||||
|
@ -41,7 +41,7 @@ push_config = {
|
|||
'BARK_SOUND': '', # bark 推送声音
|
||||
'BARK_ICON': '', # bark 推送图标
|
||||
|
||||
'CONSOLE': True, # 控制台输出
|
||||
'CONSOLE': False, # 控制台输出
|
||||
|
||||
'DD_BOT_SECRET': '', # 钉钉机器人的 DD_BOT_SECRET
|
||||
'DD_BOT_TOKEN': '', # 钉钉机器人的 DD_BOT_TOKEN
|
||||
|
@ -96,9 +96,11 @@ push_config = {
|
|||
'SMTP_EMAIL': '', # SMTP 收发件邮箱,通知将会由自己发给自己
|
||||
'SMTP_PASSWORD': '', # SMTP 登录密码,也可能为特殊口令,视具体邮件服务商说明而定
|
||||
'SMTP_NAME': '', # SMTP 收发件人姓名,可随意填写
|
||||
|
||||
'WEBHOOK_URL': '', # webhook自定义通知 接收回调的URL
|
||||
}
|
||||
notify_function = []
|
||||
# fmt: on
|
||||
#fmt: on
|
||||
|
||||
# 首先读取 面板变量 或者 github action 运行变量
|
||||
for k in push_config:
|
||||
|
@ -164,8 +166,7 @@ def dingding_bot(title: str, content: str) -> None:
|
|||
|
||||
timestamp = str(round(time.time() * 1000))
|
||||
secret_enc = push_config.get("DD_BOT_SECRET").encode("utf-8")
|
||||
string_to_sign = "{}\n{}".format(
|
||||
timestamp, push_config.get("DD_BOT_SECRET"))
|
||||
string_to_sign = "{}\n{}".format(timestamp, push_config.get("DD_BOT_SECRET"))
|
||||
string_to_sign_enc = string_to_sign.encode("utf-8")
|
||||
hmac_code = hmac.new(
|
||||
secret_enc, string_to_sign_enc, digestmod=hashlib.sha256
|
||||
|
@ -221,7 +222,7 @@ def go_cqhttp(title: str, content: str) -> None:
|
|||
print("go-cqhttp 推送失败!")
|
||||
|
||||
|
||||
def gotify(title: str, content: str) -> None:
|
||||
def gotify(title:str,content:str) -> None:
|
||||
"""
|
||||
使用 gotify 推送消息。
|
||||
"""
|
||||
|
@ -231,9 +232,8 @@ def gotify(title: str, content: str) -> None:
|
|||
print("gotify 服务启动")
|
||||
|
||||
url = f'{push_config.get("GOTIFY_URL")}/message?token={push_config.get("GOTIFY_TOKEN")}'
|
||||
data = {"title": title, "message": content,
|
||||
"priority": push_config.get("GOTIFY_PRIORITY")}
|
||||
response = requests.post(url, data=data).json()
|
||||
data = {"title": title,"message": content,"priority": push_config.get("GOTIFY_PRIORITY")}
|
||||
response = requests.post(url,data=data).json()
|
||||
|
||||
if response.get("id"):
|
||||
print("gotify 推送成功!")
|
||||
|
@ -271,10 +271,10 @@ def serverJ(title: str, content: str) -> None:
|
|||
print("serverJ 服务启动")
|
||||
|
||||
data = {"text": title, "desp": content.replace("\n", "\n\n")}
|
||||
if push_config.get("PUSH_KEY").find("SCT") != -1:
|
||||
if push_config.get("PUSH_KEY").index("SCT") != -1:
|
||||
url = f'https://sctapi.ftqq.com/{push_config.get("PUSH_KEY")}.send'
|
||||
else:
|
||||
url = f'https://sc.ftqq.com/{push_config.get("PUSH_KEY")}.send'
|
||||
url = f'https://sc.ftqq.com/${push_config.get("PUSH_KEY")}.send'
|
||||
response = requests.post(url, data=data).json()
|
||||
|
||||
if response.get("errno") == 0 or response.get("code") == 0:
|
||||
|
@ -291,8 +291,7 @@ def pushdeer(title: str, content: str) -> None:
|
|||
print("PushDeer 服务的 DEER_KEY 未设置!!\n取消推送")
|
||||
return
|
||||
print("PushDeer 服务启动")
|
||||
data = {"text": title, "desp": content, "type": "markdown",
|
||||
"pushkey": push_config.get("DEER_KEY")}
|
||||
data = {"text": title, "desp": content, "type": "markdown", "pushkey": push_config.get("DEER_KEY")}
|
||||
url = 'https://api2.pushdeer.com/message/push'
|
||||
if push_config.get("DEER_URL"):
|
||||
url = push_config.get("DEER_URL")
|
||||
|
@ -323,6 +322,7 @@ def chat(title: str, content: str) -> None:
|
|||
print("Chat 推送失败!错误信息:", response)
|
||||
|
||||
|
||||
|
||||
def pushplus_bot(title: str, content: str) -> None:
|
||||
"""
|
||||
通过 push+ 推送消息。
|
||||
|
@ -350,8 +350,7 @@ def pushplus_bot(title: str, content: str) -> None:
|
|||
|
||||
url_old = "http://pushplus.hxtrip.com/send"
|
||||
headers["Accept"] = "application/json"
|
||||
response = requests.post(
|
||||
url=url_old, data=body, headers=headers).json()
|
||||
response = requests.post(url=url_old, data=body, headers=headers).json()
|
||||
|
||||
if response["code"] == 200:
|
||||
print("PUSHPLUS(hxtrip) 推送成功!")
|
||||
|
@ -370,8 +369,7 @@ def qmsg_bot(title: str, content: str) -> None:
|
|||
print("qmsg 服务启动")
|
||||
|
||||
url = f'https://qmsg.zendee.cn/{push_config.get("QMSG_TYPE")}/{push_config.get("QMSG_KEY")}'
|
||||
payload = {
|
||||
"msg": f'{title}\n\n{content.replace("----", "-")}'.encode("utf-8")}
|
||||
payload = {"msg": f'{title}\n\n{content.replace("----", "-")}'.encode("utf-8")}
|
||||
response = requests.post(url=url, params=payload).json()
|
||||
|
||||
if response["code"] == 0:
|
||||
|
@ -557,14 +555,14 @@ def aibotk(title: str, content: str) -> None:
|
|||
data = {
|
||||
"apiKey": push_config.get("AIBOTK_KEY"),
|
||||
"roomName": push_config.get("AIBOTK_NAME"),
|
||||
"message": {"type": 1, "content": f'【青龙快讯】\n\n{title}\n{content}'}
|
||||
"message": {"type": 1, "content": f'【青龙快讯】\n\n${title}\n${content}' }
|
||||
}
|
||||
else:
|
||||
url = "https://api-bot.aibotk.com/openapi/v1/chat/contact"
|
||||
data = {
|
||||
"apiKey": push_config.get("AIBOTK_KEY"),
|
||||
"name": push_config.get("AIBOTK_NAME"),
|
||||
"message": {"type": 1, "content": f'【青龙快讯】\n\n{title}\n{content}'}
|
||||
"message": {"type": 1, "content": f'【青龙快讯】\n\n${title}\n${content}' }
|
||||
}
|
||||
body = json.dumps(data).encode(encoding="utf-8")
|
||||
headers = {"Content-Type": "application/json"}
|
||||
|
@ -586,24 +584,38 @@ def smtp(title: str, content: str) -> None:
|
|||
print("SMTP 邮件 服务启动")
|
||||
|
||||
message = MIMEText(content, 'plain', 'utf-8')
|
||||
message['From'] = formataddr((Header(push_config.get(
|
||||
"SMTP_NAME"), 'utf-8').encode(), push_config.get("SMTP_EMAIL")))
|
||||
message['To'] = formataddr((Header(push_config.get(
|
||||
"SMTP_NAME"), 'utf-8').encode(), push_config.get("SMTP_EMAIL")))
|
||||
message['From'] = formataddr((Header(push_config.get("SMTP_NAME"), 'utf-8').encode(), push_config.get("SMTP_EMAIL")))
|
||||
message['To'] = formataddr((Header(push_config.get("SMTP_NAME"), 'utf-8').encode(), push_config.get("SMTP_EMAIL")))
|
||||
message['Subject'] = Header(title, 'utf-8')
|
||||
|
||||
try:
|
||||
smtp_server = smtplib.SMTP_SSL(push_config.get("SMTP_SERVER")) if push_config.get(
|
||||
"SMTP_SSL") == 'true' else smtplib.SMTP(push_config.get("SMTP_SERVER"))
|
||||
smtp_server.login(push_config.get("SMTP_EMAIL"),
|
||||
push_config.get("SMTP_PASSWORD"))
|
||||
smtp_server.sendmail(push_config.get("SMTP_EMAIL"),
|
||||
push_config.get("SMTP_EMAIL"), message.as_bytes())
|
||||
smtp_server = smtplib.SMTP_SSL(push_config.get("SMTP_SERVER")) if push_config.get("SMTP_SSL") == 'true' else smtplib.SMTP(push_config.get("SMTP_SERVER"))
|
||||
smtp_server.login(push_config.get("SMTP_EMAIL"), push_config.get("SMTP_PASSWORD"))
|
||||
smtp_server.sendmail(push_config.get("SMTP_EMAIL"), push_config.get("SMTP_EMAIL"), message.as_bytes())
|
||||
smtp_server.close()
|
||||
print("SMTP 邮件 推送成功!")
|
||||
except Exception as e:
|
||||
print(f'SMTP 邮件 推送失败!{e}')
|
||||
|
||||
def webhook_notify(title: str, content: str) -> None:
|
||||
"""
|
||||
通过 WEBHOOK 推送消息。
|
||||
"""
|
||||
if not push_config.get("WEBHOOK_URL"):
|
||||
print("WEBHOOK 服务的 WEBHOOK_URL 未设置!!\n取消推送")
|
||||
return
|
||||
print("WEBHOOK服务启动")
|
||||
|
||||
url = f"{push_config.get('WEBHOOK_URL')}"
|
||||
headers = {"Content-Type": "application/json;charset=utf-8"}
|
||||
data = {"title": f"{title}", "content": f"{content}"}
|
||||
response = requests.post(
|
||||
url=url, data=json.dumps(data), headers=headers, timeout=15
|
||||
).json()
|
||||
|
||||
print("WEBHOOK 推送成功!")
|
||||
|
||||
|
||||
|
||||
def one() -> str:
|
||||
"""
|
||||
|
@ -649,6 +661,8 @@ if push_config.get("AIBOTK_KEY") and push_config.get("AIBOTK_TYPE") and push_con
|
|||
notify_function.append(aibotk)
|
||||
if push_config.get("SMTP_SERVER") and push_config.get("SMTP_SSL") and push_config.get("SMTP_EMAIL") and push_config.get("SMTP_PASSWORD") and push_config.get("SMTP_NAME"):
|
||||
notify_function.append(smtp)
|
||||
if push_config.get("WEBHOOK_URL"):
|
||||
notify_function.append(webhook_notify)
|
||||
|
||||
|
||||
def send(title: str, content: str) -> None:
|
||||
|
@ -656,21 +670,13 @@ def send(title: str, content: str) -> None:
|
|||
print(f"{title} 推送内容为空!")
|
||||
return
|
||||
|
||||
# 根据标题跳过一些消息推送,环境变量:SKIP_PUSH_TITLE 用回车分隔
|
||||
skipTitle = os.getenv("SKIP_PUSH_TITLE")
|
||||
if skipTitle:
|
||||
if (title in re.split("\n", skipTitle)):
|
||||
print(f"{title} 在SKIP_PUSH_TITLE环境变量内,跳过推送!")
|
||||
return
|
||||
|
||||
hitokoto = push_config.get("HITOKOTO")
|
||||
|
||||
text = one() if hitokoto else ""
|
||||
content += "\n\n" + text
|
||||
|
||||
ts = [
|
||||
threading.Thread(target=mode, args=(
|
||||
title, content), name=mode.__name__)
|
||||
threading.Thread(target=mode, args=(title, content), name=mode.__name__)
|
||||
for mode in notify_function
|
||||
]
|
||||
[t.start() for t in ts]
|
||||
|
@ -682,4 +688,4 @@ def main():
|
|||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
main()
|
Loading…
Reference in New Issue
Block a user