增加 Python 通知文件异常捕获

a. PUSHPLUS 兼容两个网址
b. 一言接口增加异常捕获
c. Bark 推送链接增加对末尾 / 的处理

1. 改用字典 get() 方法来减少报错
2. 增加 JSON 版通知文件(Python3.6+)几个异常捕获
3. 丰富返回值判断
4. 增加 requests 超时参数
This commit is contained in:
Oreo 2021-11-07 20:07:59 +08:00
parent 1c71a9d8a8
commit b43a8aea99

View File

@ -31,12 +31,12 @@ def print(text, *args, **kw):
push_config = {
'HITOKOTO': False, # 启用一言(随机句子)
'BARK_PUSH': '', # bark IP 或设备码https://api.day.app/DxHcxxxxxRxxxxxxcm/
'BARK_PUSH': '', # bark IP 或设备码https://api.day.app/DxHcxxxxxRxxxxxxcm
'BARK_ARCHIVE': '', # bark 推送是否存档
'BARK_GROUP': '', # bark 推送分组
'BARK_SOUND': '', # bark 推送声音
'CONSOLE': True, # 控制台输出
'CONSOLE': False, # 控制台输出
'DD_BOT_SECRET': '', # 钉钉机器人的 DD_BOT_SECRET
'DD_BOT_TOKEN': '', # 钉钉机器人的 DD_BOT_TOKEN
@ -92,7 +92,7 @@ def bark(title: str, content: str) -> None:
print("bark 服务启动")
if push_config.get("BARK_PUSH").startswith("http"):
url = f'{push_config.get("BARK_PUSH")}/{urllib.parse.quote_plus(title)}/{urllib.parse.quote_plus(content)}'
url = f'{push_config.get("BARK_PUSH").lstrip("/")}/{urllib.parse.quote_plus(title)}/{urllib.parse.quote_plus(content)}'
else:
url = f'https://api.day.app/{push_config.get("BARK_PUSH")}/{urllib.parse.quote_plus(title)}/{urllib.parse.quote_plus(content)}'
@ -112,12 +112,22 @@ def bark(title: str, content: str) -> None:
params += f"{bark_params.get(pair[0])}={pair[1]}&"
if params:
url = url + "?" + params.rstrip("&")
response = requests.get(url).json()
if response["code"] == 200:
print("bark 推送成功!")
else:
print("bark 推送失败!")
try:
response = requests.get(url, timeout=15)
datas = response.json()
if datas.get("code") == 200:
print("bark 推送成功!")
elif datas.get("code") == 400:
print("bark 推送失败!找不到 Key 对应的 DeviceToken。")
else:
print(f"bark 推送失败!响应数据:{response.text}")
except json.JSONDecodeError as e:
print(f"推送返回值非 json 格式,请检查网址和账号是否填写正确。\n{e}")
except requests.exceptions.RequestException as e:
print(f"网络异常,请检查你的网络连接、推送服务器和代理配置。\n{e}")
except Exception as e:
print(f"其他错误信息:\n{e}")
def console(title: str, content: str) -> None:
@ -147,14 +157,22 @@ def dingding_bot(title: str, content: str) -> None:
url = f'https://oapi.dingtalk.com/robot/send?access_token={push_config.get("DD_BOT_TOKEN")}&timestamp={timestamp}&sign={sign}'
headers = {"Content-Type": "application/json;charset=utf-8"}
data = {"msgtype": "text", "text": {"content": f"{title}\n\n{content}"}}
response = requests.post(
url=url, data=json.dumps(data), headers=headers, timeout=15
).json()
if not response["errcode"]:
print("钉钉机器人 推送成功!")
else:
print("钉钉机器人 推送失败!")
try:
response = requests.post(
url=url, data=json.dumps(data), headers=headers, timeout=15
)
datas = response.json()
if datas.get("errcode") is None:
print("钉钉机器人 推送成功!")
else:
print(f"钉钉机器人 推送失败!响应数据:{response.text}")
except json.JSONDecodeError as e:
print(f"推送返回值非 json 格式,请检查网址和账号是否填写正确。\n{e}")
except requests.exceptions.RequestException as e:
print(f"网络异常,请检查你的网络连接、推送服务器和代理配置。\n{e}")
except Exception as e:
print(f"其他错误信息:\n{e}")
def feishu_bot(title: str, content: str) -> None:
@ -168,12 +186,19 @@ def feishu_bot(title: str, content: str) -> None:
url = f'https://open.feishu.cn/open-apis/bot/v2/hook/{push_config.get("FSKEY")}'
data = {"msg_type": "text", "content": {"text": f"{title}\n\n{content}"}}
response = requests.post(url, data=json.dumps(data)).json()
if response.get("StatusCode") == 0:
print("飞书 推送成功!")
else:
print("飞书 推送失败!错误信息如下:\n", response)
try:
response = requests.post(url, data=json.dumps(data), timeout=15)
datas = response.json
if datas.get("StatusCode") == 0:
print("飞书 推送成功!")
else:
print(f"飞书 推送失败!响应数据:{response.text}")
except json.JSONDecodeError as e:
print(f"推送返回值非 json 格式,请检查网址和账号是否填写正确。\n{e}")
except requests.exceptions.RequestException as e:
print(f"网络异常,请检查你的网络连接、推送服务器和代理配置。\n{e}")
except Exception as e:
print(f"其他错误信息:\n{e}")
def go_cqhttp(title: str, content: str) -> None:
@ -186,12 +211,20 @@ def go_cqhttp(title: str, content: str) -> None:
print("go-cqhttp 服务启动")
url = f'{push_config.get("GOBOT_URL")}?access_token={push_config.get("GOBOT_TOKEN")}&{push_config.get("GOBOT_QQ")}&message=标题:{title}\n内容:{content}'
response = requests.get(url).json()
if response["status"] == "ok":
print("go-cqhttp 推送成功!")
else:
print("go-cqhttp 推送失败!")
try:
response = requests.get(url, timeout=15)
datas = response.json()
if datas.get("status") == "ok":
print("go-cqhttp 推送成功!")
else:
print("go-cqhttp 推送失败!响应数据:{response.text}")
except json.JSONDecodeError as e:
print(f"推送返回值非 json 格式,请检查网址和账号是否填写正确。\n{e}")
except requests.exceptions.RequestException as e:
print(f"网络异常,请检查你的网络连接、推送服务器和代理配置。\n{e}")
except Exception as e:
print(f"其他错误信息:\n{e}")
def iGot(title: str, content: str) -> None:
@ -206,12 +239,20 @@ def iGot(title: str, content: str) -> None:
url = f'https://push.hellyw.com/{push_config.get("IGOT_PUSH_KEY")}'
data = {"title": title, "content": content}
headers = {"Content-Type": "application/x-www-form-urlencoded"}
response = requests.post(url, data=data, headers=headers).json()
if response["ret"] == 0:
print("iGot 推送成功!")
else:
print(f'iGot 推送失败!{response["errMsg"]}')
try:
response = requests.post(url, data=data, headers=headers, timeout=15)
datas = response.json()
if datas.get("ret") == 0:
print("iGot 推送成功!")
else:
print(f'iGot 推送失败!错误信息:{datas.get("errMsg")}')
except json.JSONDecodeError as e:
print(f"推送返回值非 json 格式,请检查网址和账号是否填写正确。\n{e}")
except requests.exceptions.RequestException as e:
print(f"网络异常,请检查你的网络连接、推送服务器和代理配置。\n{e}")
except Exception as e:
print(f"其他错误信息:\n{e}")
def serverJ(title: str, content: str) -> None:
@ -228,12 +269,22 @@ def serverJ(title: str, content: str) -> None:
url = f'https://sctapi.ftqq.com/{push_config.get("PUSH_KEY")}.send'
else:
url = f'https://sc.ftqq.com/${push_config.get("PUSH_KEY")}.send'
response = requests.post(url, data=data).json()
if response.get("errno") == 0 or response.get("code") == 0:
print("serverJ 推送成功!")
else:
print(f'serverJ 推送失败!错误码:{response["message"]}')
try:
response = requests.post(url, data=data, timeout=15)
datas = response.json()
if datas.get("errno") == 0 or datas.get("code") == 0:
print("serverJ 推送成功!")
elif datas.get("code") == 40001:
print("serverJ 推送失败PUSH_KEY 错误。")
else:
print(f'serverJ 推送失败!错误码:{datas.get("message")}')
except json.JSONDecodeError as e:
print(f"推送返回值非 json 格式,请检查网址和账号是否填写正确。\n{e}")
except requests.exceptions.RequestException as e:
print(f"网络异常,请检查你的网络连接、推送服务器和代理配置。\n{e}")
except Exception as e:
print(f"其他错误信息:\n{e}")
def pushplus_bot(title: str, content: str) -> None:
@ -254,21 +305,30 @@ def pushplus_bot(title: str, content: str) -> None:
}
body = json.dumps(data).encode(encoding="utf-8")
headers = {"Content-Type": "application/json"}
response = requests.post(url=url, data=body, headers=headers).json()
if response["code"] == 200:
print("PUSHPLUS 推送成功!")
else:
url_old = "http://pushplus.hxtrip.com/send"
response = requests.post(url=url_old, data=body, headers=headers).json()
if response["code"] == 200:
print("PUSHPLUS(hxtrip) 推送成功!")
try:
response = requests.post(url=url, data=body, headers=headers, timeout=15)
datas = response.json()
if datas.get("code") == 200:
print("PUSHPLUS 推送成功!")
elif datas.get("code") == 600:
url2 = "http://pushplus.hxtrip.com/send"
response2 = requests.post(url=url2, data=body, headers=headers, timeout=15)
datas2 = response2.json()
if datas2.get("code") == 200:
print("PUSHPLUS(hxtrip) 推送成功!")
elif datas2.get("code") == 600:
print("PUSHPLUS 推送失败PUSH_PLUS_TOKEN 错误。")
else:
print(f"PUSHPLUS(hxtrip) 推送失败!响应数据:{response2.text}")
else:
print("PUSHPLUS 推送失败!")
print(f"PUSHPLUS 推送失败!响应数据:{response.text}")
except json.JSONDecodeError as e:
print(f"推送返回值非 json 格式,请检查网址和账号是否填写正确。\n{e}")
except requests.exceptions.RequestException as e:
print(f"网络异常,请检查你的网络连接、推送服务器和代理配置。\n{e}")
except Exception as e:
print(f"其他错误信息:\n{e}")
def qmsg_bot(title: str, content: str) -> None:
@ -282,12 +342,20 @@ def qmsg_bot(title: str, content: str) -> None:
url = f'https://qmsg.zendee.cn/{push_config.get("QMSG_TYPE")}/{push_config.get("QMSG_KEY")}'
payload = {"msg": f'{title}\n\n{content.replace("----", "-")}'.encode("utf-8")}
response = requests.post(url=url, params=payload).json()
if response["code"] == 0:
print("qmsg 推送成功!")
else:
print(f'qmsg 推送失败!{response["reason"]}')
try:
response = requests.post(url=url, params=payload, timeout=15)
datas = response.json()
if response.get("code") == 0:
print("qmsg 推送成功!")
else:
print(f'qmsg 推送失败!错误信息:{datas.get("reason")}')
except json.JSONDecodeError as e:
print(f"推送返回值非 json 格式,请检查网址和账号是否填写正确。\n{e}")
except requests.exceptions.RequestException as e:
print(f"网络异常,请检查你的网络连接、推送服务器和代理配置。\n{e}")
except Exception as e:
print(f"其他错误信息:\n{e}")
def wecom_app(title: str, content: str) -> None:
@ -313,16 +381,22 @@ def wecom_app(title: str, content: str) -> None:
media_id = ""
wx = WeCom(corpid, corpsecret, agentid)
# 如果没有配置 media_id 默认就以 text 方式发送
if not media_id:
message = title + "\n\n" + content
response = wx.send_text(message, touser)
else:
response = wx.send_mpnews(title, content, media_id, touser)
if response == "ok":
print("企业微信推送成功!")
else:
print("企业微信推送失败!错误信息如下:\n", response)
try:
if not media_id:
message = title + "\n\n" + content
datas = wx.send_text(message, touser)
else:
datas = wx.send_mpnews(title, content, media_id, touser)
if datas == "ok":
print("企业微信推送成功!")
else:
print(f"企业微信推送失败!错误信息:{datas}")
except json.JSONDecodeError as e:
print(f"推送返回值非 json 格式,请检查网址和账号是否填写正确。\n{e}")
except requests.exceptions.RequestException as e:
print(f"网络异常,请检查你的网络连接、推送服务器和代理配置。\n{e}")
except Exception as e:
print(f"其他错误信息:\n{e}")
class WeCom:
@ -337,9 +411,9 @@ class WeCom:
"corpid": self.CORPID,
"corpsecret": self.CORPSECRET,
}
req = requests.post(url, params=values)
req = requests.post(url, params=values, timeout=15)
data = json.loads(req.text)
return data["access_token"]
return data.get("access_token")
def send_text(self, message, touser="@all"):
send_url = (
@ -354,9 +428,9 @@ class WeCom:
"safe": "0",
}
send_msges = bytes(json.dumps(send_values), "utf-8")
respone = requests.post(send_url, send_msges)
respone = respone.json()
return respone["errmsg"]
response = requests.post(send_url, send_msges, timeout=15)
datas = response.json()
return datas.get("errmsg")
def send_mpnews(self, title, message, media_id, touser="@all"):
send_url = (
@ -381,9 +455,9 @@ class WeCom:
},
}
send_msges = bytes(json.dumps(send_values), "utf-8")
respone = requests.post(send_url, send_msges)
respone = respone.json()
return respone["errmsg"]
response = requests.post(send_url, send_msges, timeout=15)
datas = response.json()
return datas.get("errmsg")
def wecom_bot(title: str, content: str) -> None:
@ -398,14 +472,22 @@ def wecom_bot(title: str, content: str) -> None:
url = f"https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key={push_config.get('QYWX_KEY')}"
headers = {"Content-Type": "application/json;charset=utf-8"}
data = {"msgtype": "text", "text": {"content": f"{title}\n\n{content}"}}
response = requests.post(
url=url, data=json.dumps(data), headers=headers, timeout=15
).json()
if response["errcode"] == 0:
print("企业微信机器人推送成功!")
else:
print("企业微信机器人推送失败!")
try:
response = requests.post(
url=url, data=json.dumps(data), headers=headers, timeout=15
)
datas = response.json()
if datas.get("errcode") == 0:
print("企业微信机器人 推送成功!")
else:
print(f"企业微信机器人 推送失败!响应数据:{response.text}")
except json.JSONDecodeError as e:
print(f"推送返回值非 json 格式,请检查网址和账号是否填写正确。\n{e}")
except requests.exceptions.RequestException as e:
print(f"网络异常,请检查你的网络连接、推送服务器和代理配置。\n{e}")
except Exception as e:
print(f"其他错误信息:\n{e}")
def telegram_bot(title: str, content: str) -> None:
@ -443,14 +525,26 @@ def telegram_bot(title: str, content: str) -> None:
push_config.get("TG_PROXY_HOST"), push_config.get("TG_PROXY_PORT")
)
proxies = {"http": proxyStr, "https": proxyStr}
response = requests.post(
url=url, headers=headers, params=payload, proxies=proxies
).json()
if response["ok"]:
print("tg 推送成功!")
else:
print("tg 推送失败!")
try:
response = requests.post(
url=url, headers=headers, params=payload, proxies=proxies, timeout=15
)
datas = response.json()
if datas.get("ok") == True:
print("tg 推送成功!")
elif datas.get("error_code") == 400:
print("tg 推送失败!请主动给 bot 发送一条消息并检查接收用户 TG_USER_ID 是否正确。")
elif datas.get("error_code") == 401:
print("tg 推送失败TG_BOT_TOKEN 填写错误。")
else:
print(f"tg 推送失败!响应数据:{response.text}")
except json.JSONDecodeError as e:
print(f"推送返回值非 json 格式,请检查网址和账号是否填写正确。\n{e}")
except requests.exceptions.RequestException as e:
print(f"网络异常,请检查你的网络连接、推送服务器和代理配置。\n{e}")
except Exception as e:
print(f"其他错误信息:\n{e}")
def one() -> str:
@ -458,9 +552,12 @@ def one() -> str:
获取一条一言
:return:
"""
url = "https://v1.hitokoto.cn/"
res = requests.get(url).json()
return res["hitokoto"] + " ----" + res["from"]
try:
url = "https://v1.hitokoto.cn/"
res = requests.get(url).json()
return res["hitokoto"] + " ----" + res["from"]
except requests.exceptions.ConnectionError:
return ""
if push_config.get("BARK_PUSH"):