mirror of
https://github.com/smallevilbeast/ntchat.git
synced 2025-07-12 03:46:07 +08:00
fastapi例子open接口返回二维码结果
This commit is contained in:
parent
ae450372b1
commit
f7fb727257
|
@ -1,5 +1,6 @@
|
||||||
# -*- coding: utf-8 -*-
|
# -*- coding: utf-8 -*-
|
||||||
import uvicorn
|
import uvicorn
|
||||||
|
import threading
|
||||||
from functools import wraps
|
from functools import wraps
|
||||||
from fastapi import FastAPI
|
from fastapi import FastAPI
|
||||||
from mgr import ClientManager
|
from mgr import ClientManager
|
||||||
|
@ -56,8 +57,14 @@ async def client_create():
|
||||||
response_model=models.ResponseModel)
|
response_model=models.ResponseModel)
|
||||||
@catch_exception()
|
@catch_exception()
|
||||||
async def client_open(model: models.ClientOpenReqModel):
|
async def client_open(model: models.ClientOpenReqModel):
|
||||||
ret = client_mgr.get_client(model.guid).open(model.smart, model.show_login_qrcode)
|
client = client_mgr.get_client(model.guid)
|
||||||
return response_json(1 if ret else 0)
|
ret = client.open(model.smart, model.show_login_qrcode)
|
||||||
|
|
||||||
|
# 当show_login_qrcode=True时, 打开微信时会显示二维码界面
|
||||||
|
if model.show_login_qrcode:
|
||||||
|
client.qrcode_event = threading.Event()
|
||||||
|
client.qrcode_event.wait(timeout=10)
|
||||||
|
return response_json(1 if ret else 0, {'qrcode': client.qrcode})
|
||||||
|
|
||||||
|
|
||||||
@app.post("/global/set_callback_url", summary="设置接收通知地址", tags=["Global"],
|
@app.post("/global/set_callback_url", summary="设置接收通知地址", tags=["Global"],
|
||||||
|
|
|
@ -1,5 +1,6 @@
|
||||||
# -*- coding: utf-8 -*-
|
# -*- coding: utf-8 -*-
|
||||||
import ntchat
|
import ntchat
|
||||||
|
import threading
|
||||||
import requests
|
import requests
|
||||||
from typing import Dict, Union
|
from typing import Dict, Union
|
||||||
from ntchat.utils.singleton import Singleton
|
from ntchat.utils.singleton import Singleton
|
||||||
|
@ -9,6 +10,8 @@ from exception import ClientNotExists
|
||||||
|
|
||||||
class ClientWeChat(ntchat.WeChat):
|
class ClientWeChat(ntchat.WeChat):
|
||||||
guid: str = ""
|
guid: str = ""
|
||||||
|
qrcode_event: threading.Event = None
|
||||||
|
qrcode: str = ""
|
||||||
|
|
||||||
|
|
||||||
class ClientManager(metaclass=Singleton):
|
class ClientManager(metaclass=Singleton):
|
||||||
|
@ -35,7 +38,7 @@ class ClientManager(metaclass=Singleton):
|
||||||
wechat.on(ntchat.MT_RECV_WECHAT_QUIT_MSG, self.__on_quit_callback)
|
wechat.on(ntchat.MT_RECV_WECHAT_QUIT_MSG, self.__on_quit_callback)
|
||||||
return guid
|
return guid
|
||||||
|
|
||||||
def get_client(self, guid: str) -> Union[None, ntchat.WeChat]:
|
def get_client(self, guid: str) -> ClientWeChat:
|
||||||
client = self.__client_map.get(guid, None)
|
client = self.__client_map.get(guid, None)
|
||||||
if client is None:
|
if client is None:
|
||||||
raise ClientNotExists(guid)
|
raise ClientNotExists(guid)
|
||||||
|
@ -45,7 +48,14 @@ class ClientManager(metaclass=Singleton):
|
||||||
if guid in self.__client_map:
|
if guid in self.__client_map:
|
||||||
del self.__client_map[guid]
|
del self.__client_map[guid]
|
||||||
|
|
||||||
def __on_callback(self, wechat, message):
|
def __on_callback(self, wechat: ClientWeChat, message: dict):
|
||||||
|
|
||||||
|
# 通知二维码显示
|
||||||
|
msg_type = message['type']
|
||||||
|
if msg_type == ntchat.MT_RECV_LOGIN_QRCODE_MSG and wechat.qrcode_event:
|
||||||
|
wechat.qrcode = message["data"]["code"]
|
||||||
|
wechat.qrcode_event.set()
|
||||||
|
|
||||||
if not self.callback_url:
|
if not self.callback_url:
|
||||||
return
|
return
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue
Block a user