脚本推送增加自定义 webhook 方式

This commit is contained in:
whyour
2023-10-15 20:42:05 +08:00
parent bd004a0489
commit 533e12a796
2 changed files with 307 additions and 22 deletions
+142 -22
View File
@@ -101,10 +101,17 @@ push_config = {
'SMTP_PASSWORD': '', # SMTP 登录密码,也可能为特殊口令,视具体邮件服务商说明而定
'SMTP_NAME': '', # SMTP 收发件人姓名,可随意填写
'PUSHME_KEY': '', # PushMe 酱的 PUSHME_KEY
'CHRONOCAT_QQ': '', # qq号
'CHRONOCAT_TOKEN': '', # CHRONOCAT 的token
'CHRONOCAT_URL': '' # CHRONOCAT的url地址
'PUSHME_KEY': '', # PushMe 酱的 PUSHME_KEY
'CHRONOCAT_QQ': '', # qq号
'CHRONOCAT_TOKEN': '', # CHRONOCAT 的token
'CHRONOCAT_URL': '', # CHRONOCAT的url地址
'WEBHOOK_URL': '', # 自定义通知 请求地址
'WEBHOOK_BODY': '', # 自定义通知 请求体
'WEBHOOK_HEADERS': '', # 自定义通知 请求头
'WEBHOOK_METHOD': '', # 自定义通知 请求方法
'WEBHOOK_CONTENT_TYPE': '' # 自定义通知 content-type
}
notify_function = []
# fmt: on
@@ -449,7 +456,9 @@ class WeCom:
return data["access_token"]
def send_text(self, message, touser="@all"):
send_url = f"{self.ORIGIN}/cgi-bin/message/send?access_token={self.get_access_token()}"
send_url = (
f"{self.ORIGIN}/cgi-bin/message/send?access_token={self.get_access_token()}"
)
send_values = {
"touser": touser,
"msgtype": "text",
@@ -463,7 +472,9 @@ class WeCom:
return respone["errmsg"]
def send_mpnews(self, title, message, media_id, touser="@all"):
send_url = f"{self.ORIGIN}/cgi-bin/message/send?access_token={self.get_access_token()}"
send_url = (
f"{self.ORIGIN}/cgi-bin/message/send?access_token={self.get_access_token()}"
)
send_values = {
"touser": touser,
"msgtype": "mpnews",
@@ -646,6 +657,7 @@ def smtp(title: str, content: str) -> None:
except Exception as e:
print(f"SMTP 邮件 推送失败!{e}")
def pushme(title: str, content: str) -> None:
"""
使用 PushMe 推送消息。
@@ -667,12 +679,16 @@ def pushme(title: str, content: str) -> None:
else:
print(f"PushMe 推送失败!{response.status_code} {response.text}")
def chronocat(title: str, content: str) -> None:
"""
使用 CHRONOCAT 推送消息。
"""
if not push_config.get("CHRONOCAT_URL") or not push_config.get("CHRONOCAT_QQ") or not push_config.get(
"CHRONOCAT_TOKEN"):
if (
not push_config.get("CHRONOCAT_URL")
or not push_config.get("CHRONOCAT_QQ")
or not push_config.get("CHRONOCAT_TOKEN")
):
print("CHRONOCAT 服务的 CHRONOCAT_URL 或 CHRONOCAT_QQ 未设置!!\n取消推送")
return
@@ -683,8 +699,8 @@ def chronocat(title: str, content: str) -> None:
url = f'{push_config.get("CHRONOCAT_URL")}/api/message/send'
headers = {
'Content-Type': 'application/json',
'Authorization': f'Bearer {push_config.get("CHRONOCAT_TOKEN")}'
"Content-Type": "application/json",
"Authorization": f'Bearer {push_config.get("CHRONOCAT_TOKEN")}',
}
for chat_type, ids in [(1, user_ids), (2, group_ids)]:
@@ -692,32 +708,128 @@ def chronocat(title: str, content: str) -> None:
continue
for chat_id in ids:
data = {
"peer": {
"chatType": chat_type,
"peerUin": chat_id
},
"peer": {"chatType": chat_type, "peerUin": chat_id},
"elements": [
{
"elementType": 1,
"textElement": {
"content": f'{title}\n\n{content}'
}
"textElement": {"content": f"{title}\n\n{content}"},
}
]
],
}
response = requests.post(url, headers=headers, data=json.dumps(data))
if response.status_code == 200:
if chat_type == 1:
print(f'QQ个人消息:{ids}推送成功!')
print(f"QQ个人消息:{ids}推送成功!")
else:
print(f'QQ群消息:{ids}推送成功!')
print(f"QQ群消息:{ids}推送成功!")
else:
if chat_type == 1:
print(f'QQ个人消息:{ids}推送失败!')
print(f"QQ个人消息:{ids}推送失败!")
else:
print(f'QQ群消息:{ids}推送失败!')
print(f"QQ群消息:{ids}推送失败!")
def parse_headers(headers):
if not headers:
return {}
parsed = {}
lines = headers.split("\n")
for line in lines:
i = line.find(":")
if i == -1:
continue
key = line[:i].strip().lower()
val = line[i + 1 :].strip()
parsed[key] = parsed.get(key, "") + ", " + val if key in parsed else val
return parsed
def parse_body(body, content_type):
if not body:
return ""
parsed = {}
lines = body.split("\n")
for line in lines:
i = line.find(":")
if i == -1:
continue
key = line[:i].strip().lower()
val = line[i + 1 :].strip()
if not key or key in parsed:
continue
try:
json_value = json.loads(val)
parsed[key] = json_value
except:
parsed[key] = val
if content_type == "application/x-www-form-urlencoded":
data = urlencode(parsed, doseq=True)
return data
if content_type == "application/json":
data = json.dumps(parsed)
return data
return parsed
def format_notify_content(url, body, title, content):
if "$title" not in url and "$title" not in body:
return {}
formatted_url = url.replace("$title", urllib.parse.quote_plus(title)).replace(
"$content", urllib.parse.quote_plus(content)
)
formatted_body = body.replace("$title", title).replace("$content", content)
return formatted_url, formatted_body
def custom_notify(title: str, content: str) -> None:
"""
通过 自定义通知 推送消息。
"""
if not push_config.get("WEBHOOK_URL") or not push_config.get("WEBHOOK_METHOD"):
print("自定义通知的 WEBHOOK_URL 或 WEBHOOK_METHOD 未设置!!\n取消推送")
return
print("自定义通知服务启动")
WEBHOOK_URL = push_config.get("WEBHOOK_URL")
WEBHOOK_METHOD = push_config.get("WEBHOOK_METHOD")
WEBHOOK_CONTENT_TYPE = push_config.get("WEBHOOK_CONTENT_TYPE")
WEBHOOK_BODY = push_config.get("WEBHOOK_BODY")
WEBHOOK_HEADERS = push_config.get("WEBHOOK_HEADERS")
formatUrl, formatBody = format_notify_content(
WEBHOOK_URL, WEBHOOK_BODY, title, content
)
if not formatUrl and not formatBody:
print("请求头或者请求体中必须包含 $title 和 $content")
return
headers = parse_headers(WEBHOOK_HEADERS)
body = parse_body(formatBody, WEBHOOK_CONTENT_TYPE)
response = requests.request(
method=WEBHOOK_METHOD, url=formatUrl, headers=headers, timeout=15, data=body
)
if response.status_code == 200:
print("自定义通知推送成功!")
else:
print(f"自定义通知推送失败!{response.status_code} {response.text}")
def one() -> str:
"""
@@ -775,6 +887,14 @@ if (
notify_function.append(smtp)
if push_config.get("PUSHME_KEY"):
notify_function.append(pushme)
if (
push_config.get("CHRONOCAT_URL")
and push_config.get("CHRONOCAT_QQ")
and push_config.get("CHRONOCAT_TOKEN")
):
notify_function.append(chronocat)
if push_config.get("WEBHOOK_URL") and push_config.get("WEBHOOK_METHOD"):
notify_function.append(custom_notify)
def send(title: str, content: str) -> None: