diff --git a/sample/notify.py b/sample/notify.py index e62d4218..456d9f21 100644 --- a/sample/notify.py +++ b/sample/notify.py @@ -33,7 +33,7 @@ def print(text, *args, **kw): # 通知服务 # fmt: off push_config = { - 'HITOKOTO': True, # 启用一言(随机句子) + 'HITOKOTO': True, # 启用一言(随机句子) 'BARK_PUSH': '', # bark IP 或设备码,例:https://api.day.app/DxHcxxxxxRxxxxxxcm/ 'BARK_ARCHIVE': '', # bark 推送是否存档 @@ -43,7 +43,7 @@ push_config = { 'BARK_LEVEL': '', # bark 推送时效性 'BARK_URL': '', # bark 推送跳转URL - 'CONSOLE': False, # 控制台输出 + 'CONSOLE': False, # 控制台输出 'DD_BOT_SECRET': '', # 钉钉机器人的 DD_BOT_SECRET 'DD_BOT_TOKEN': '', # 钉钉机器人的 DD_BOT_TOKEN @@ -58,6 +58,11 @@ push_config = { # /send_group_msg 时填入 group_id=QQ群 'GOBOT_TOKEN': '', # go-cqhttp 的 access_token + 'ONEBOT_URL': '', # OneBot 的推送地址,以send_msg结尾 + 'ONEBOT_USER': '', # OneBot 的推送对象,QQ号 + 'ONEBOT_GROUP': '', # OneBot 的推送对象,群号 + 'ONEBOT_TOKEN': '', # OneBot 的 access_token,可选 + 'GOTIFY_URL': '', # gotify地址,如https://push.example.de:8080 'GOTIFY_TOKEN': '', # gotify的消息应用token 'GOTIFY_PRIORITY': 0, # 推送消息优先级,默认为0 @@ -254,6 +259,53 @@ def go_cqhttp(title: str, content: str) -> None: print("go-cqhttp 推送失败!") +def one_bot(title: str, content: str) -> None: + """ + 使用 one_bot 推送消息。 + """ + if not push_config.get("ONEBOT_USER") and not push_config.get("ONEBOT_GROUP"): + return + if not push_config.get("ONEBOT_URL"): + return + print("OneBot 服务启动") + + url = push_config.get("ONEBOT_URL").rstrip("/") + user_id = push_config.get("ONEBOT_USER") + group_id = push_config.get("ONEBOT_GROUP") + token = push_config.get("ONEBOT_TOKEN") + + if not url.endswith("/send_msg"): + url += "/send_msg" + + headers = {'Content-Type': "application/json"} + message = [{"type": "text", "data": {"text": f"{title}\n\n{content}"}}] + data_private = {"message": message} + data_group = {"message": message} + + if token is not None: + headers["Authorization"] = f"Bearer {token}" + + if user_id is not None: + data_private["message_type"] = "private" + data_private["user_id"] = user_id + response_private = requests.post(url, data=json.dumps(data_private), headers=headers).json() + + if response_private["status"] == "ok": + print("OneBot 私聊推送成功!") + else: + print("OneBot 私聊推送失败!") + + if group_id is not None: + data_group["message_type"] = "group" + data_group["group_id"] = group_id + response_group = requests.post(url, data=json.dumps(data_group), headers=headers).json() + + if response_group["status"] == "ok": + print("OneBot 群聊推送成功!") + else: + print("OneBot 群聊推送失败!") + + def gotify(title: str, content: str) -> None: """ 使用 gotify 推送消息。 @@ -985,6 +1037,8 @@ def add_notify_function(): notify_function.append(feishu_bot) if push_config.get("GOBOT_URL") and push_config.get("GOBOT_QQ"): notify_function.append(go_cqhttp) + if push_config.get("ONEBOT_URL"): + notify_function.append(one_bot) if push_config.get("GOTIFY_URL") and push_config.get("GOTIFY_TOKEN"): notify_function.append(gotify) if push_config.get("IGOT_PUSH_KEY"):