Add OpeniLink notification channel (#2988)

* Initial plan

* Add OpeniLink notification channel support

Agent-Logs-Url: https://github.com/whyour/qinglong/sessions/c80b4882-1bd7-4ffe-9180-cd3220da5986

Co-authored-by: whyour <22700758+whyour@users.noreply.github.com>

* Add context_token and hub_url to OpeniLink notification channel

Agent-Logs-Url: https://github.com/whyour/qinglong/sessions/a5e66f5a-dab8-4a65-96ca-960d35fa9d50

Co-authored-by: whyour <22700758+whyour@users.noreply.github.com>

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: whyour <22700758+whyour@users.noreply.github.com>
This commit is contained in:
Copilot
2026-04-25 16:01:36 +08:00
committed by GitHub
parent 07bf0c705b
commit 66700ebe1a
6 changed files with 169 additions and 1 deletions
+43
View File
@@ -135,6 +135,10 @@ push_config = {
'WXPUSHER_APP_TOKEN': '', # wxpusher 的 appToken 官方文档: https://wxpusher.zjiecode.com/docs/ 管理后台: https://wxpusher.zjiecode.com/admin/
'WXPUSHER_TOPIC_IDS': '', # wxpusher 的 主题ID,多个用英文分号;分隔 topic_ids 与 uids 至少配置一个才行
'WXPUSHER_UIDS': '', # wxpusher 的 用户ID,多个用英文分号;分隔 topic_ids 与 uids 至少配置一个才行
'OPENILINK_APP_TOKEN': '', # OpeniLink 的 app_token,在 OpeniLink Hub 后台安装 App 后获取 官方文档: https://openilink.com/docs/hub/apps
'OPENILINK_HUB_URL': '', # OpeniLink Hub 地址,默认为 https://hub.openilink.com,自建 Hub 时填写自己的地址
'OPENILINK_CONTEXT_TOKEN': '', # OpeniLink 的 context_token,用于标识消息会话上下文,可从消息事件中获取
}
# fmt: on
@@ -898,6 +902,43 @@ def wxpusher_bot(title: str, content: str) -> None:
print(f"wxpusher 推送失败!错误信息:{response.get('msg')}")
def openilink(title: str, content: str) -> None:
"""
通过 OpeniLink 推送消息。
支持的环境变量:
- OPENILINK_APP_TOKEN: 在 OpeniLink Hub 后台安装 App 后获取的 app_token
- OPENILINK_HUB_URL: OpeniLink Hub 地址,默认为 https://hub.openilink.com
- OPENILINK_CONTEXT_TOKEN: 消息会话上下文 token,可从消息事件中获取
"""
if not push_config.get("OPENILINK_APP_TOKEN"):
return
print("OpeniLink 服务启动")
base_url = (
push_config.get("OPENILINK_HUB_URL", "").rstrip("/")
or "https://hub.openilink.com"
)
url = f"{base_url}/bot/v1/message/send"
headers = {
"Content-Type": "application/json",
"Authorization": f'Bearer {push_config.get("OPENILINK_APP_TOKEN")}',
}
data = {
"type": "text",
"content": f"{title}\n\n{content}",
}
if push_config.get("OPENILINK_CONTEXT_TOKEN"):
data["context_token"] = push_config.get("OPENILINK_CONTEXT_TOKEN")
response = requests.post(url=url, json=data, headers=headers).json()
if response.get("ok"):
print("OpeniLink 推送成功!")
else:
print(f'OpeniLink 推送失败!错误信息:{response.get("error")}')
def parse_headers(headers):
if not headers:
return {}
@@ -1063,6 +1104,8 @@ def add_notify_function():
push_config.get("WXPUSHER_TOPIC_IDS") or push_config.get("WXPUSHER_UIDS")
):
notify_function.append(wxpusher_bot)
if push_config.get("OPENILINK_APP_TOKEN"):
notify_function.append(openilink)
if not notify_function:
print(f"无推送渠道,请检查通知变量是否正确")
return notify_function