修改自定义通知 body 解析逻辑

This commit is contained in:
whyour 2024-03-12 22:22:10 +08:00
parent 68ad01e0e8
commit 2ac4581d54
4 changed files with 132 additions and 112 deletions

View File

@ -360,7 +360,10 @@ export function parseHeaders(headers: string) {
return parsed; return parsed;
} }
function parseString(input: string): Record<string, string> { function parseString(
input: string,
valueFormatFn?: (v: string) => string,
): Record<string, string> {
const regex = /(\w+):\s*((?:(?!\n\w+:).)*)/g; const regex = /(\w+):\s*((?:(?!\n\w+:).)*)/g;
const matches: Record<string, string> = {}; const matches: Record<string, string> = {};
@ -372,9 +375,10 @@ function parseString(input: string): Record<string, string> {
continue; continue;
} }
const _value = value.trim(); let _value = value.trim();
try { try {
_value = valueFormatFn ? valueFormatFn(_value) : _value;
const jsonValue = JSON.parse(_value); const jsonValue = JSON.parse(_value);
matches[_key] = jsonValue; matches[_key] = jsonValue;
} catch (error) { } catch (error) {
@ -392,12 +396,13 @@ export function parseBody(
| 'multipart/form-data' | 'multipart/form-data'
| 'application/x-www-form-urlencoded' | 'application/x-www-form-urlencoded'
| 'text/plain', | 'text/plain',
valueFormatFn?: (v: string) => string,
) { ) {
if (contentType === 'text/plain' || !body) { if (contentType === 'text/plain' || !body) {
return body; return body;
} }
const parsed = parseString(body); const parsed = parseString(body, valueFormatFn);
switch (contentType) { switch (contentType) {
case 'multipart/form-data': case 'multipart/form-data':

View File

@ -656,17 +656,14 @@ export default class NotificationService {
webhookContentType, webhookContentType,
} = this.params; } = this.params;
const { formatBody, formatUrl } = this.formatNotifyContent( if (!webhookUrl.includes('$title') && !webhookBody.includes('$title')) {
webhookUrl,
webhookBody,
);
if (!formatUrl && !formatBody) {
throw new Error('Url 或者 Body 中必须包含 $title'); throw new Error('Url 或者 Body 中必须包含 $title');
} }
const headers = parseHeaders(webhookHeaders); const headers = parseHeaders(webhookHeaders);
const body = parseBody(formatBody, webhookContentType); const body = parseBody(webhookBody, webhookContentType, (v) =>
v?.replaceAll('$title', this.title)?.replaceAll('$content', this.content),
);
const bodyParam = this.formatBody(webhookContentType, body); const bodyParam = this.formatBody(webhookContentType, body);
const options = { const options = {
method: webhookMethod, method: webhookMethod,
@ -676,6 +673,9 @@ export default class NotificationService {
...bodyParam, ...bodyParam,
}; };
try { try {
const formatUrl = webhookUrl
?.replaceAll('$title', encodeURIComponent(this.title))
?.replaceAll('$content', encodeURIComponent(this.content));
const res = await got(formatUrl, options); const res = await got(formatUrl, options);
if (String(res.statusCode).startsWith('20')) { if (String(res.statusCode).startsWith('20')) {
return true; return true;
@ -700,19 +700,4 @@ export default class NotificationService {
} }
return {}; return {};
} }
private formatNotifyContent(url: string, body: string) {
if (!url.includes('$title') && !body.includes('$title')) {
return {};
}
return {
formatUrl: url
?.replaceAll('$title', encodeURIComponent(this.title))
?.replaceAll('$content', encodeURIComponent(this.content)),
formatBody: body
?.replaceAll('$title', this.title)
?.replaceAll('$content', this.content),
};
}
} }

View File

@ -819,11 +819,11 @@ function ChangeUserId(desp) {
async function qywxamNotify(text, desp) { async function qywxamNotify(text, desp) {
const MAX_LENGTH = 900; const MAX_LENGTH = 900;
if (desp.length > MAX_LENGTH) { if (desp.length > MAX_LENGTH) {
let d = desp.substr(0, MAX_LENGTH) + "\n==More=="; let d = desp.substr(0, MAX_LENGTH) + '\n==More==';
await do_qywxamNotify(text, d); await do_qywxamNotify(text, d);
await qywxamNotify(text, desp.substr(MAX_LENGTH)); await qywxamNotify(text, desp.substr(MAX_LENGTH));
} else { } else {
return await do_qywxamNotify(text,desp); return await do_qywxamNotify(text, desp);
} }
} }
@ -1284,18 +1284,15 @@ function chronocatNotify(title, desp) {
function webhookNotify(text, desp) { function webhookNotify(text, desp) {
return new Promise((resolve) => { return new Promise((resolve) => {
const { formatBody, formatUrl } = formatNotifyContentFun( if (!WEBHOOK_URL.includes('$title') && !WEBHOOK_BODY.includes('$title')) {
WEBHOOK_URL,
WEBHOOK_BODY,
text,
desp,
);
if (!formatUrl && !formatBody) {
resolve(); resolve();
return; return;
} }
const headers = parseHeaders(WEBHOOK_HEADERS); const headers = parseHeaders(WEBHOOK_HEADERS);
const body = parseBody(formatBody, WEBHOOK_CONTENT_TYPE); const body = parseBody(WEBHOOK_BODY, WEBHOOK_CONTENT_TYPE, (v) =>
v?.replaceAll('$title', text)?.replaceAll('$content', desp),
);
const bodyParam = formatBodyFun(WEBHOOK_CONTENT_TYPE, body); const bodyParam = formatBodyFun(WEBHOOK_CONTENT_TYPE, body);
const options = { const options = {
method: WEBHOOK_METHOD, method: WEBHOOK_METHOD,
@ -1307,6 +1304,10 @@ function webhookNotify(text, desp) {
}; };
if (WEBHOOK_METHOD) { if (WEBHOOK_METHOD) {
const formatUrl = WEBHOOK_URL.replaceAll(
'$title',
encodeURIComponent(text),
).replaceAll('$content', encodeURIComponent(desp));
got(formatUrl, options).then((resp) => { got(formatUrl, options).then((resp) => {
try { try {
if (resp.statusCode !== 200) { if (resp.statusCode !== 200) {
@ -1326,7 +1327,7 @@ function webhookNotify(text, desp) {
}); });
} }
function parseString(input) { function parseString(input, valueFormatFn) {
const regex = /(\w+):\s*((?:(?!\n\w+:).)*)/g; const regex = /(\w+):\s*((?:(?!\n\w+:).)*)/g;
const matches = {}; const matches = {};
@ -1338,9 +1339,10 @@ function parseString(input) {
continue; continue;
} }
const _value = value.trim(); let _value = value.trim();
try { try {
_value = valueFormatFn ? valueFormatFn(_value) : _value;
const jsonValue = JSON.parse(_value); const jsonValue = JSON.parse(_value);
matches[_key] = jsonValue; matches[_key] = jsonValue;
} catch (error) { } catch (error) {
@ -1375,12 +1377,12 @@ function parseHeaders(headers) {
return parsed; return parsed;
} }
function parseBody(body, contentType) { function parseBody(body, contentType, valueFormatFn) {
if (contentType === 'text/plain' || !body) { if (contentType === 'text/plain' || !body) {
return body; return body;
} }
const parsed = parseString(body); const parsed = parseString(body, valueFormatFn);
switch (contentType) { switch (contentType) {
case 'multipart/form-data': case 'multipart/form-data':

View File

@ -133,9 +133,9 @@ def bark(title: str, content: str, **kwargs) -> None:
print("bark 服务启动") print("bark 服务启动")
BARK_PUSH = kwargs.get("BARK_PUSH", push_config.get("BARK_PUSH")) BARK_PUSH = kwargs.get("BARK_PUSH", push_config.get("BARK_PUSH"))
if BARK_PUSH.startswith("http"): if BARK_PUSH.startswith("http"):
url = f'{BARK_PUSH}/{urllib.parse.quote_plus(title)}/{urllib.parse.quote_plus(content)}' url = f"{BARK_PUSH}/{urllib.parse.quote_plus(title)}/{urllib.parse.quote_plus(content)}"
else: else:
url = f'https://api.day.app/{BARK_PUSH}/{urllib.parse.quote_plus(title)}/{urllib.parse.quote_plus(content)}' url = f"https://api.day.app/{BARK_PUSH}/{urllib.parse.quote_plus(title)}/{urllib.parse.quote_plus(content)}"
bark_params = { bark_params = {
"BARK_ARCHIVE": "isArchive", "BARK_ARCHIVE": "isArchive",
@ -176,7 +176,10 @@ def dingding_bot(title: str, content: str, **kwargs) -> None:
""" """
使用 钉钉机器人 推送消息 使用 钉钉机器人 推送消息
""" """
if not ((kwargs.get("DD_BOT_SECRET") and kwargs.get("DD_BOT_TOKEN")) or (push_config.get("DD_BOT_SECRET") and push_config.get("DD_BOT_TOKEN"))): if not (
(kwargs.get("DD_BOT_SECRET") and kwargs.get("DD_BOT_TOKEN"))
or (push_config.get("DD_BOT_SECRET") and push_config.get("DD_BOT_TOKEN"))
):
print("钉钉机器人 服务的 DD_BOT_SECRET 或者 DD_BOT_TOKEN 未设置!!\n取消推送") print("钉钉机器人 服务的 DD_BOT_SECRET 或者 DD_BOT_TOKEN 未设置!!\n取消推送")
return return
print("钉钉机器人 服务启动") print("钉钉机器人 服务启动")
@ -195,7 +198,7 @@ def dingding_bot(title: str, content: str, **kwargs) -> None:
secret_enc, string_to_sign_enc, digestmod=hashlib.sha256 secret_enc, string_to_sign_enc, digestmod=hashlib.sha256
).digest() ).digest()
sign = urllib.parse.quote_plus(base64.b64encode(hmac_code)) sign = urllib.parse.quote_plus(base64.b64encode(hmac_code))
url = f'https://oapi.dingtalk.com/robot/send?access_token={DD_BOT_TOKEN}&timestamp={timestamp}&sign={sign}' url = f"https://oapi.dingtalk.com/robot/send?access_token={DD_BOT_TOKEN}&timestamp={timestamp}&sign={sign}"
headers = {"Content-Type": "application/json;charset=utf-8"} headers = {"Content-Type": "application/json;charset=utf-8"}
data = {"msgtype": "text", "text": {"content": f"{title}\n\n{content}"}} data = {"msgtype": "text", "text": {"content": f"{title}\n\n{content}"}}
response = requests.post( response = requests.post(
@ -217,7 +220,7 @@ def feishu_bot(title: str, content: str, **kwargs) -> None:
return return
print("飞书 服务启动") print("飞书 服务启动")
FSKEY = kwargs.get("DD_BOT_SECRET", push_config.get("FSKEY")) FSKEY = kwargs.get("DD_BOT_SECRET", push_config.get("FSKEY"))
url = f'https://open.feishu.cn/open-apis/bot/v2/hook/{FSKEY}' url = f"https://open.feishu.cn/open-apis/bot/v2/hook/{FSKEY}"
data = {"msg_type": "text", "content": {"text": f"{title}\n\n{content}"}} data = {"msg_type": "text", "content": {"text": f"{title}\n\n{content}"}}
response = requests.post(url, data=json.dumps(data)).json() response = requests.post(url, data=json.dumps(data)).json()
@ -231,7 +234,10 @@ def go_cqhttp(title: str, content: str, **kwargs) -> None:
""" """
使用 go_cqhttp 推送消息 使用 go_cqhttp 推送消息
""" """
if not ((kwargs.get("GOBOT_URL") and kwargs.get("GOBOT_QQ")) or (push_config.get("GOBOT_URL") and push_config.get("GOBOT_QQ"))): if not (
(kwargs.get("GOBOT_URL") and kwargs.get("GOBOT_QQ"))
or (push_config.get("GOBOT_URL") and push_config.get("GOBOT_QQ"))
):
print("go-cqhttp 服务的 GOBOT_URL 或 GOBOT_QQ 未设置!!\n取消推送") print("go-cqhttp 服务的 GOBOT_URL 或 GOBOT_QQ 未设置!!\n取消推送")
return return
print("go-cqhttp 服务启动") print("go-cqhttp 服务启动")
@ -244,7 +250,7 @@ def go_cqhttp(title: str, content: str, **kwargs) -> None:
GOBOT_QQ = push_config.get("GOBOT_QQ") GOBOT_QQ = push_config.get("GOBOT_QQ")
GOBOT_TOKEN = push_config.get("GOBOT_TOKEN") GOBOT_TOKEN = push_config.get("GOBOT_TOKEN")
url = f'{GOBOT_URL}?access_token={GOBOT_TOKEN}&{GOBOT_QQ}&message=标题:{title}\n内容:{content}' url = f"{GOBOT_URL}?access_token={GOBOT_TOKEN}&{GOBOT_QQ}&message=标题:{title}\n内容:{content}"
response = requests.get(url).json() response = requests.get(url).json()
if response["status"] == "ok": if response["status"] == "ok":
@ -257,7 +263,10 @@ def gotify(title: str, content: str, **kwargs) -> None:
""" """
使用 gotify 推送消息 使用 gotify 推送消息
""" """
if not ((kwargs.get("GOTIFY_URL") and kwargs.get("GOTIFY_TOKEN")) or (push_config.get("GOTIFY_URL") and push_config.get("GOTIFY_TOKEN"))): if not (
(kwargs.get("GOTIFY_URL") and kwargs.get("GOTIFY_TOKEN"))
or (push_config.get("GOTIFY_URL") and push_config.get("GOTIFY_TOKEN"))
):
print("gotify 服务的 GOTIFY_URL 或 GOTIFY_TOKEN 未设置!!\n取消推送") print("gotify 服务的 GOTIFY_URL 或 GOTIFY_TOKEN 未设置!!\n取消推送")
return return
print("gotify 服务启动") print("gotify 服务启动")
@ -270,7 +279,7 @@ def gotify(title: str, content: str, **kwargs) -> None:
GOTIFY_TOKEN = push_config.get("GOTIFY_TOKEN") GOTIFY_TOKEN = push_config.get("GOTIFY_TOKEN")
GOTIFY_PRIORITY = kwargs.get("GOTIFY_PRIORITY") GOTIFY_PRIORITY = kwargs.get("GOTIFY_PRIORITY")
url = f'{GOTIFY_URL}/message?token={GOTIFY_TOKEN}' url = f"{GOTIFY_URL}/message?token={GOTIFY_TOKEN}"
data = { data = {
"title": title, "title": title,
"message": content, "message": content,
@ -293,7 +302,7 @@ def iGot(title: str, content: str, **kwargs) -> None:
return return
print("iGot 服务启动") print("iGot 服务启动")
IGOT_PUSH_KEY = kwargs.get("IGOT_PUSH_KEY", push_config.get("IGOT_PUSH_KEY")) IGOT_PUSH_KEY = kwargs.get("IGOT_PUSH_KEY", push_config.get("IGOT_PUSH_KEY"))
url = f'https://push.hellyw.com/{IGOT_PUSH_KEY}' url = f"https://push.hellyw.com/{IGOT_PUSH_KEY}"
data = {"title": title, "content": content} data = {"title": title, "content": content}
headers = {"Content-Type": "application/x-www-form-urlencoded"} headers = {"Content-Type": "application/x-www-form-urlencoded"}
response = requests.post(url, data=data, headers=headers).json() response = requests.post(url, data=data, headers=headers).json()
@ -316,9 +325,9 @@ def serverJ(title: str, content: str, **kwargs) -> None:
data = {"text": title, "desp": content.replace("\n", "\n\n")} data = {"text": title, "desp": content.replace("\n", "\n\n")}
if PUSH_KEY.find("SCT") != -1: if PUSH_KEY.find("SCT") != -1:
url = f'https://sctapi.ftqq.com/{PUSH_KEY}.send' url = f"https://sctapi.ftqq.com/{PUSH_KEY}.send"
else: else:
url = f'https://sc.ftqq.com/{PUSH_KEY}.send' url = f"https://sc.ftqq.com/{PUSH_KEY}.send"
response = requests.post(url, data=data).json() response = requests.post(url, data=data).json()
if response.get("errno") == 0 or response.get("code") == 0: if response.get("errno") == 0 or response.get("code") == 0:
@ -361,7 +370,10 @@ def chat(title: str, content: str, **kwargs) -> None:
""" """
通过Chat 推送消息 通过Chat 推送消息
""" """
if not ((kwargs.get("CHAT_URL") and kwargs.get("CHAT_TOKEN")) or (push_config.get("CHAT_URL") and push_config.get("CHAT_TOKEN"))): if not (
(kwargs.get("CHAT_URL") and kwargs.get("CHAT_TOKEN"))
or (push_config.get("CHAT_URL") and push_config.get("CHAT_TOKEN"))
):
print("chat 服务的 CHAT_URL或CHAT_TOKEN 未设置!!\n取消推送") print("chat 服务的 CHAT_URL或CHAT_TOKEN 未设置!!\n取消推送")
return return
print("chat 服务启动") print("chat 服务启动")
@ -423,7 +435,10 @@ def qmsg_bot(title: str, content: str, **kwargs) -> None:
""" """
使用 qmsg 推送消息 使用 qmsg 推送消息
""" """
if not ((kwargs.get("QMSG_KEY") and kwargs.get("QMSG_TYPE")) or (push_config.get("QMSG_KEY") and push_config.get("QMSG_TYPE"))): if not (
(kwargs.get("QMSG_KEY") and kwargs.get("QMSG_TYPE"))
or (push_config.get("QMSG_KEY") and push_config.get("QMSG_TYPE"))
):
print("qmsg 的 QMSG_KEY 或者 QMSG_TYPE 未设置!!\n取消推送") print("qmsg 的 QMSG_KEY 或者 QMSG_TYPE 未设置!!\n取消推送")
return return
print("qmsg 服务启动") print("qmsg 服务启动")
@ -434,7 +449,7 @@ def qmsg_bot(title: str, content: str, **kwargs) -> None:
QMSG_KEY = push_config.get("QMSG_KEY") QMSG_KEY = push_config.get("QMSG_KEY")
QMSG_TYPE = push_config.get("QMSG_TYPE") QMSG_TYPE = push_config.get("QMSG_TYPE")
url = f'https://qmsg.zendee.cn/{QMSG_TYPE}/{QMSG_KEY}' url = f"https://qmsg.zendee.cn/{QMSG_TYPE}/{QMSG_KEY}"
payload = {"msg": f'{title}\n\n{content.replace("----", "-")}'.encode("utf-8")} payload = {"msg": f'{title}\n\n{content.replace("----", "-")}'.encode("utf-8")}
response = requests.post(url=url, params=payload).json() response = requests.post(url=url, params=payload).json()
@ -575,7 +590,10 @@ def telegram_bot(title: str, content: str, **kwargs) -> None:
""" """
使用 telegram 机器人 推送消息 使用 telegram 机器人 推送消息
""" """
if not ((kwargs.get("TG_BOT_TOKEN") and kwargs.get("TG_USER_ID")) or (push_config.get("TG_BOT_TOKEN") and push_config.get("TG_USER_ID"))): if not (
(kwargs.get("TG_BOT_TOKEN") and kwargs.get("TG_USER_ID"))
or (push_config.get("TG_BOT_TOKEN") and push_config.get("TG_USER_ID"))
):
print("tg 服务的 TG_BOT_TOKEN 或者 TG_USER_ID 未设置!!\n取消推送") print("tg 服务的 TG_BOT_TOKEN 或者 TG_USER_ID 未设置!!\n取消推送")
return return
print("tg 服务启动") print("tg 服务启动")
@ -590,9 +608,7 @@ def telegram_bot(title: str, content: str, **kwargs) -> None:
TG_API_HOST = kwargs.get("TG_API_HOST", push_config.get("TG_API_HOST")) TG_API_HOST = kwargs.get("TG_API_HOST", push_config.get("TG_API_HOST"))
url = f"{TG_API_HOST}/bot{TG_BOT_TOKEN}/sendMessage" url = f"{TG_API_HOST}/bot{TG_BOT_TOKEN}/sendMessage"
else: else:
url = ( url = f"https://api.telegram.org/bot{TG_BOT_TOKEN}/sendMessage"
f"https://api.telegram.org/bot{TG_BOT_TOKEN}/sendMessage"
)
headers = {"Content-Type": "application/x-www-form-urlencoded"} headers = {"Content-Type": "application/x-www-form-urlencoded"}
payload = { payload = {
"chat_id": str(TG_USER_ID), "chat_id": str(TG_USER_ID),
@ -600,7 +616,10 @@ def telegram_bot(title: str, content: str, **kwargs) -> None:
"disable_web_page_preview": "true", "disable_web_page_preview": "true",
} }
proxies = None proxies = None
if not ((kwargs.get("TG_PROXY_HOST") and kwargs.get("TG_PROXY_PORT")) or (push_config.get("TG_PROXY_HOST") and push_config.get("TG_PROXY_PORT"))): if not (
(kwargs.get("TG_PROXY_HOST") and kwargs.get("TG_PROXY_PORT"))
or (push_config.get("TG_PROXY_HOST") and push_config.get("TG_PROXY_PORT"))
):
if kwargs.get("TG_PROXY_HOST") and kwargs.get("TG_PROXY_PORT"): if kwargs.get("TG_PROXY_HOST") and kwargs.get("TG_PROXY_PORT"):
TG_PROXY_HOST = kwargs.get("TG_PROXY_HOST") TG_PROXY_HOST = kwargs.get("TG_PROXY_HOST")
TG_PROXY_PORT = kwargs.get("TG_PROXY_PORT") TG_PROXY_PORT = kwargs.get("TG_PROXY_PORT")
@ -608,16 +627,12 @@ def telegram_bot(title: str, content: str, **kwargs) -> None:
TG_PROXY_HOST = kwargs.get("TG_PROXY_HOST") TG_PROXY_HOST = kwargs.get("TG_PROXY_HOST")
TG_PROXY_PORT = kwargs.get("TG_PROXY_PORT") TG_PROXY_PORT = kwargs.get("TG_PROXY_PORT")
if kwargs.get("TG_PROXY_AUTH") or push_config.get("TG_PROXY_AUTH"): if kwargs.get("TG_PROXY_AUTH") or push_config.get("TG_PROXY_AUTH"):
TG_PROXY_AUTH = kwargs.get("TG_PROXY_AUTH", push_config.get("TG_PROXY_AUTH")) TG_PROXY_AUTH = kwargs.get(
if TG_PROXY_AUTH is not None and "@" not in TG_PROXY_HOST: "TG_PROXY_AUTH", push_config.get("TG_PROXY_AUTH")
TG_PROXY_HOST = (
TG_PROXY_AUTH
+ "@"
+ TG_PROXY_HOST
) )
proxyStr = "http://{}:{}".format( if TG_PROXY_AUTH is not None and "@" not in TG_PROXY_HOST:
TG_PROXY_HOST, TG_PROXY_PORT TG_PROXY_HOST = TG_PROXY_AUTH + "@" + TG_PROXY_HOST
) proxyStr = "http://{}:{}".format(TG_PROXY_HOST, TG_PROXY_PORT)
proxies = {"http": proxyStr, "https": proxyStr} proxies = {"http": proxyStr, "https": proxyStr}
response = requests.post( response = requests.post(
url=url, headers=headers, params=payload, proxies=proxies url=url, headers=headers, params=payload, proxies=proxies
@ -639,19 +654,22 @@ def aibotk(title: str, content: str, **kwargs) -> None:
and kwargs.get("AIBOTK_TYPE") and kwargs.get("AIBOTK_TYPE")
and kwargs.get("AIBOTK_NAME") and kwargs.get("AIBOTK_NAME")
) )
or or (
(
push_config.get("AIBOTK_KEY") push_config.get("AIBOTK_KEY")
and push_config.get("AIBOTK_TYPE") and push_config.get("AIBOTK_TYPE")
and push_config.get("AIBOTK_NAME") and push_config.get("AIBOTK_NAME")
) )
): ):
print( print(
"智能微秘书 的 AIBOTK_KEY 或者 AIBOTK_TYPE 或者 AIBOTK_NAME 未设置!!\n取消推送" "智能微秘书 的 AIBOTK_KEY 或者 AIBOTK_TYPE 或者 AIBOTK_NAME 未设置!!\n取消推送"
) )
return return
print("智能微秘书 服务启动") print("智能微秘书 服务启动")
if kwargs.get("AIBOTK_KEY") and kwargs.get("AIBOTK_TYPE") and kwargs.get("AIBOTK_NAME"): if (
kwargs.get("AIBOTK_KEY")
and kwargs.get("AIBOTK_TYPE")
and kwargs.get("AIBOTK_NAME")
):
AIBOTK_KEY = kwargs.get("AIBOTK_KEY") AIBOTK_KEY = kwargs.get("AIBOTK_KEY")
AIBOTK_TYPE = kwargs.get("AIBOTK_TYPE") AIBOTK_TYPE = kwargs.get("AIBOTK_TYPE")
AIBOTK_NAME = kwargs.get("AIBOTK_NAME") AIBOTK_NAME = kwargs.get("AIBOTK_NAME")
@ -687,19 +705,22 @@ def smtp(title: str, content: str, **kwargs) -> None:
""" """
使用 SMTP 邮件 推送消息 使用 SMTP 邮件 推送消息
""" """
if not (( if not (
kwargs.get("SMTP_SERVER") (
and kwargs.get("SMTP_SSL") kwargs.get("SMTP_SERVER")
and kwargs.get("SMTP_EMAIL") and kwargs.get("SMTP_SSL")
and kwargs.get("SMTP_PASSWORD") and kwargs.get("SMTP_EMAIL")
and kwargs.get("SMTP_NAME") and kwargs.get("SMTP_PASSWORD")
) or ( and kwargs.get("SMTP_NAME")
push_config.get("SMTP_SERVER") )
and push_config.get("SMTP_SSL") or (
and push_config.get("SMTP_EMAIL") push_config.get("SMTP_SERVER")
and push_config.get("SMTP_PASSWORD") and push_config.get("SMTP_SSL")
and push_config.get("SMTP_NAME") and push_config.get("SMTP_EMAIL")
)): and push_config.get("SMTP_PASSWORD")
and push_config.get("SMTP_NAME")
)
):
print( print(
"SMTP 邮件 的 SMTP_SERVER 或者 SMTP_SSL 或者 SMTP_EMAIL 或者 SMTP_PASSWORD 或者 SMTP_NAME 未设置!!\n取消推送" "SMTP 邮件 的 SMTP_SERVER 或者 SMTP_SSL 或者 SMTP_EMAIL 或者 SMTP_PASSWORD 或者 SMTP_NAME 未设置!!\n取消推送"
) )
@ -745,9 +766,7 @@ def smtp(title: str, content: str, **kwargs) -> None:
if SMTP_SSL == "true" if SMTP_SSL == "true"
else smtplib.SMTP(SMTP_SERVER) else smtplib.SMTP(SMTP_SERVER)
) )
smtp_server.login( smtp_server.login(SMTP_EMAIL, SMTP_PASSWORD)
SMTP_EMAIL, SMTP_PASSWORD
)
smtp_server.sendmail( smtp_server.sendmail(
SMTP_EMAIL, SMTP_EMAIL,
SMTP_EMAIL, SMTP_EMAIL,
@ -769,7 +788,7 @@ def pushme(title: str, content: str, **kwargs) -> None:
print("PushMe 服务启动") print("PushMe 服务启动")
PUSHME_KEY = kwargs.get("PUSHME_KEY", push_config.get("PUSHME_KEY")) PUSHME_KEY = kwargs.get("PUSHME_KEY", push_config.get("PUSHME_KEY"))
url = f'https://push.i-i.me/?push_key={PUSHME_KEY}' url = f"https://push.i-i.me/?push_key={PUSHME_KEY}"
data = { data = {
"title": title, "title": title,
"content": content, "content": content,
@ -786,15 +805,18 @@ def chronocat(title: str, content: str, **kwargs) -> None:
""" """
使用 CHRONOCAT 推送消息 使用 CHRONOCAT 推送消息
""" """
if not (( if not (
push_config.get("CHRONOCAT_URL") (
and push_config.get("CHRONOCAT_QQ") push_config.get("CHRONOCAT_URL")
and push_config.get("CHRONOCAT_TOKEN") and push_config.get("CHRONOCAT_QQ")
) or ( and push_config.get("CHRONOCAT_TOKEN")
push_config.get("CHRONOCAT_URL") )
and push_config.get("CHRONOCAT_QQ") or (
and push_config.get("CHRONOCAT_TOKEN") push_config.get("CHRONOCAT_URL")
)): and push_config.get("CHRONOCAT_QQ")
and push_config.get("CHRONOCAT_TOKEN")
)
):
print("CHRONOCAT 服务的 CHRONOCAT_URL 或 CHRONOCAT_QQ 未设置!!\n取消推送") print("CHRONOCAT 服务的 CHRONOCAT_URL 或 CHRONOCAT_QQ 未设置!!\n取消推送")
return return
print("CHRONOCAT 服务启动") print("CHRONOCAT 服务启动")
@ -814,10 +836,10 @@ def chronocat(title: str, content: str, **kwargs) -> None:
user_ids = re.findall(r"user_id=(\d+)", CHRONOCAT_QQ) user_ids = re.findall(r"user_id=(\d+)", CHRONOCAT_QQ)
group_ids = re.findall(r"group_id=(\d+)", CHRONOCAT_QQ) group_ids = re.findall(r"group_id=(\d+)", CHRONOCAT_QQ)
url = f'{CHRONOCAT_URL}/api/message/send' url = f"{CHRONOCAT_URL}/api/message/send"
headers = { headers = {
"Content-Type": "application/json", "Content-Type": "application/json",
"Authorization": f'Bearer {CHRONOCAT_TOKEN}', "Authorization": f"Bearer {CHRONOCAT_TOKEN}",
} }
for chat_type, ids in [(1, user_ids), (2, group_ids)]: for chat_type, ids in [(1, user_ids), (2, group_ids)]:
@ -865,13 +887,14 @@ def parse_headers(headers):
return parsed return parsed
def parse_string(input_string): def parse_string(input_string, value_format_fn=None):
matches = {} matches = {}
pattern = r'(\w+):\s*((?:(?!\n\w+:).)*)' pattern = r"(\w+):\s*((?:(?!\n\w+:).)*)"
regex = re.compile(pattern) regex = re.compile(pattern)
for match in regex.finditer(input_string): for match in regex.finditer(input_string):
key, value = match.group(1).strip(), match.group(2).strip() key, value = match.group(1).strip(), match.group(2).strip()
try: try:
value = value_format_fn(value) if value_format_fn else value
json_value = json.loads(value) json_value = json.loads(value)
matches[key] = json_value matches[key] = json_value
except: except:
@ -879,11 +902,11 @@ def parse_string(input_string):
return matches return matches
def parse_body(body, content_type): def parse_body(body, content_type, value_format_fn=None):
if not body or content_type == "text/plain": if not body or content_type == "text/plain":
return body return body
parsed = parse_string(input_string) parsed = parse_string(input_string, value_format_fn)
if content_type == "application/x-www-form-urlencoded": if content_type == "application/x-www-form-urlencoded":
data = urlencode(parsed, doseq=True) data = urlencode(parsed, doseq=True)
@ -924,16 +947,19 @@ def custom_notify(title: str, content: str) -> None:
WEBHOOK_BODY = push_config.get("WEBHOOK_BODY") WEBHOOK_BODY = push_config.get("WEBHOOK_BODY")
WEBHOOK_HEADERS = push_config.get("WEBHOOK_HEADERS") WEBHOOK_HEADERS = push_config.get("WEBHOOK_HEADERS")
formatUrl, formatBody = format_notify_content( if "$title" not in WEBHOOK_URL and "$title" not in WEBHOOK_BODY:
WEBHOOK_URL, WEBHOOK_BODY, title, content
)
if not formatUrl and not formatBody:
print("请求头或者请求体中必须包含 $title 和 $content") print("请求头或者请求体中必须包含 $title 和 $content")
return return
headers = parse_headers(WEBHOOK_HEADERS) headers = parse_headers(WEBHOOK_HEADERS)
body = parse_body(formatBody, WEBHOOK_CONTENT_TYPE) body = parse_body(
WEBHOOK_BODY,
WEBHOOK_CONTENT_TYPE,
lambda v: v.replace("$title", title).replace("$content", content),
)
formatted_url = WEBHOOK_URL.replace(
"$title", urllib.parse.quote_plus(title)
).replace("$content", urllib.parse.quote_plus(content))
response = requests.request( response = requests.request(
method=WEBHOOK_METHOD, url=formatUrl, headers=headers, timeout=15, data=body method=WEBHOOK_METHOD, url=formatUrl, headers=headers, timeout=15, data=body
) )
@ -1028,7 +1054,9 @@ def send(title: str, content: str, **kwargs) -> None:
add_notify_function() add_notify_function()
ts = [ ts = [
threading.Thread(target=mode, args=(title, content),kwargs=kwargs, name=mode.__name__) threading.Thread(
target=mode, args=(title, content), kwargs=kwargs, name=mode.__name__
)
for mode in notify_function for mode in notify_function
] ]
[t.start() for t in ts] [t.start() for t in ts]