新增web接口完整例子

This commit is contained in:
evilbeast 2022-08-29 17:02:06 +08:00
parent 9505e69779
commit 708cc9e2d9
12 changed files with 509 additions and 1 deletions

View File

@ -120,6 +120,10 @@ except KeyboardInterrupt:
sys.exit() sys.exit()
``` ```
## 使用fastapi框架实现的web api接口
[查看fastapi_example例子](./fastapi_example/README.md)
## 使用pyxcgui界面库实现的简单例子 ## 使用pyxcgui界面库实现的简单例子

26
fastapi_example/README.md Normal file
View File

@ -0,0 +1,26 @@
## NtChat fastapi完整示例
通过fastapi的swagger在线文档可以很方便的管理NtChat接口
![vfazT0.jpg](https://s1.ax1x.com/2022/08/29/vfazT0.jpg)
## 安装依赖
```bash
pip install -r requirements.txt
```
## 运行例子
```bash
python main.py
```
## 访问api在线文档
[http://127.0.0.1:8000/docs](http://127.0.0.1:8000/docs)
## 如何调用
可以使用requests库去访问接口
/client/create 是创建一个微信的实例返回guid标识实例的id, 后面所有的接口都要用到
/client/open 是打开并管理上微信实例

View File

17
fastapi_example/down.py Normal file
View File

@ -0,0 +1,17 @@
import os.path
import time
import requests
from xdg import get_download_dir
from models import SendMediaReqModel
def get_local_path(model: SendMediaReqModel):
if os.path.isfile(model.file_path):
return model.file_path
if not model.url:
return None
data = requests.get(model.url).content
temp_file = os.path.join(get_download_dir(), str(time.time_ns()))
with open(temp_file, 'wb') as fp:
fp.write(data)
return temp_file

View File

@ -0,0 +1,8 @@
class ClientNotExists(Exception):
guid = ""
def __init__(self, guid):
self.guid = guid
class MediaNotExistsError(Exception):
pass

240
fastapi_example/main.py Normal file
View File

@ -0,0 +1,240 @@
# -*- coding: utf-8 -*-
import uvicorn
from functools import wraps
from fastapi import FastAPI
from mgr import ClientManager
from typing import List
from down import get_local_path
from exception import MediaNotExistsError, ClientNotExists
import models
import ntchat
def response_json(status=0, data=None, msg=""):
return {
"status": status,
"data": {} if data is None else data,
"msg": msg
}
class catch_exception:
def __call__(self, f):
@wraps(f)
async def wrapper(*args, **kwargs):
try:
return await f(*args, **kwargs)
except ntchat.WeChatNotLoginError:
return response_json(msg="wechat instance not login")
except ntchat.WeChatBindError:
return response_json(msg="wechat bind error")
except ntchat.WeChatVersionNotMatchError:
return response_json(msg="wechat version not match, install require wechat version")
except MediaNotExistsError:
return response_json(msg="file_path or url error")
except ClientNotExists as e:
return response_json(msg="client not exists, guid: %s" % e.guid)
except Exception as e:
return response_json(msg=str(e))
return wrapper
client_mgr = ClientManager()
app = FastAPI(title="NtChat fastapi完整示例",
description="NtChat项目地址: https://github.com/smallevilbeast/ntchat")
@app.post("/client/create", summary="创建实例", tags=["Client"],
response_model=models.ResponseModel)
@catch_exception()
async def client_create():
guid = client_mgr.create_client()
return response_json(1, {"guid": guid})
@app.post("/client/open", summary="打开微信", tags=["Client"],
response_model=models.ResponseModel)
@catch_exception()
async def client_open(model: models.ClientOpenReqModel):
ret = client_mgr.get_client(model.guid).open(model.smart)
return response_json(1 if ret else 0)
@app.post("/global/set_callback_url", summary="设置接收通知地址", tags=["Global"],
response_model=models.ResponseModel)
@catch_exception()
async def client_set_callback_url(model: models.CallbackUrlReqModel):
client_mgr.callback_url = model.callback_url
return response_json(1)
@app.post("/user/get_profile", summary="获取自己的信息", tags=["User"],
response_model=models.ResponseModel)
@catch_exception()
async def user_get_profile(model: models.ClientReqModel):
data = client_mgr.get_client(model.guid).get_self_info()
return response_json(1, data)
@app.post("/contact/get_contacts", summary="获取联系人列表", tags=["Contact"],
response_model=models.ResponseModel)
@catch_exception()
async def get_contacts(model: models.ClientReqModel):
data = client_mgr.get_client(model.guid).get_contacts()
print(data)
return response_json(1, data)
@app.post("/contact/get_contact_detail", summary="获取指定联系人详细信息", tags=["Contact"],
response_model=models.ContactDetailModel)
@catch_exception()
async def get_contact_detail(model: models.ContactDetailReqModel):
data = client_mgr.get_client(model.guid).get_contact_detail(model.wxid)
return response_json(1, data)
@app.post("/room/get_rooms", summary="获取群列表", tags=["Room"],
response_model=models.ResponseModel)
@catch_exception()
async def get_rooms(model: models.ClientReqModel):
data = client_mgr.get_client(model.guid).get_rooms()
return response_json(1, data)
@app.post("/room/get_room_members", summary="获取群成员列表", tags=["Room"],
response_model=models.ResponseModel)
@catch_exception()
async def get_room_members(model: models.GetRoomMembersReqModel):
data = client_mgr.get_client(model.guid).get_room_members(model.room_wxid)
return response_json(1, data)
@app.post("/room/create_room", summary="创建群", tags=["Room"],
response_model=models.ResponseModel)
@catch_exception()
async def create_room(model: models.CreateRoomReqModel):
ret = client_mgr.get_client(model.guid).create_room(model.member_list)
return response_json(1 if ret else 0)
@app.post("/room/add_room_member", summary="添加好友入群", tags=["Room"],
response_model=models.ResponseModel)
@catch_exception()
async def add_room_member(model: models.RoomMembersReqModel):
data = client_mgr.get_client(model.guid).add_room_member(model.room_wxid, model.member_list)
return response_json(1, data)
@app.post("/room/invite_room_member", summary="邀请好友入群", tags=["Room"],
response_model=models.ResponseModel)
@catch_exception()
async def invite_room_member(model: models.RoomMembersReqModel):
data = client_mgr.get_client(model.guid).invite_room_member(model.room_wxid, model.member_list)
return response_json(1, data)
@app.post("/room/del_room_member", summary="删除群成员", tags=["Room"],
response_model=models.ResponseModel)
@catch_exception()
async def del_room_member(model: models.RoomMembersReqModel):
data = client_mgr.get_client(model.guid).del_room_member(model.room_wxid, model.member_list)
return response_json(1, data)
@app.post("/room/add_room_friend", summary="添加群成员为好友", tags=["Room"],
response_model=models.ResponseModel)
@catch_exception()
async def add_room_friend(model: models.AddRoomFriendReqModel):
data = client_mgr.get_client(model.guid).add_room_friend(model.room_wxid,
model.wxid,
model.verify)
return response_json(1, data)
@app.post("/room/modify_name", summary="修改群名", tags=["Room"],
response_model=models.ResponseModel)
@catch_exception()
async def add_room_friend(model: models.ModifyRoomNameReqModel):
data = client_mgr.get_client(model.guid).modify_room_name(model.room_wxid,
model.name)
return response_json(1, data)
@app.post("/msg/send_text", summary="发送文本消息", tags=["Msg"], response_model=models.ResponseModel)
@catch_exception()
async def msg_send_text(model: models.SendTextReqModel):
ret = client_mgr.get_client(model.guid).send_text(model.to_wxid, model.content)
return response_json(1 if ret else 0)
@app.post("/msg/send_room_at", summary="发送群@消息", tags=["Msg"], response_model=models.ResponseModel)
@catch_exception()
async def send_room_at(model: models.SendRoomAtReqModel):
ret = client_mgr.get_client(model.guid).send_room_at_msg(model.to_wxid,
model.content,
model.at_list)
return response_json(1 if ret else 0)
@app.post("/msg/send_card", summary="发送名片", tags=["Msg"], response_model=models.ResponseModel)
@catch_exception()
async def send_card(model: models.SendCardReqModel):
ret = client_mgr.get_client(model.guid).send_card(model.to_wxid,
model.card_wxid)
return response_json(1 if ret else 0)
@app.post("/msg/send_link_card", summary="发送链接卡片消息", tags=["Msg"], response_model=models.ResponseModel)
@catch_exception()
async def send_link_card(model: models.SendLinkCardReqModel):
ret = client_mgr.get_client(model.guid).send_link_card(model.to_wxid,
model.title,
model.desc,
model.url,
model.image_url)
return response_json(1 if ret else 0)
@app.post("/msg/send_image", summary="发送图片", tags=["Msg"], response_model=models.ResponseModel)
@catch_exception()
async def send_image(model: models.SendMediaReqModel):
file_path = get_local_path(model)
if file_path is None:
raise MediaNotExistsError()
ret = client_mgr.get_client(model.guid).send_image(model.to_wxid, file_path)
return response_json(1 if ret else 0)
@app.post("/msg/send_file", summary="发送文件", tags=["Msg"], response_model=models.ResponseModel)
@catch_exception()
async def send_file(model: models.SendMediaReqModel):
file_path = get_local_path(model)
if file_path is None:
raise MediaNotExistsError()
ret = client_mgr.get_client(model.guid).send_file(model.to_wxid, file_path)
return response_json(1 if ret else 0)
@app.post("/msg/send_video", summary="发送视频", tags=["Msg"], response_model=models.ResponseModel)
@catch_exception()
async def send_video(model: models.SendMediaReqModel):
file_path = get_local_path(model)
if file_path is None:
raise MediaNotExistsError()
ret = client_mgr.get_client(model.guid).send_video(model.to_wxid, file_path)
return response_json(1 if ret else 0)
@app.post("/msg/send_gif", summary="发送GIF", tags=["Msg"], response_model=models.ResponseModel)
@catch_exception()
async def send_gif(model: models.SendMediaReqModel):
file_path = get_local_path(model)
if file_path is None:
raise MediaNotExistsError()
ret = client_mgr.get_client(model.guid).send_gif(model.to_wxid, file_path)
return response_json(1 if ret else 0)
if __name__ == '__main__':
uvicorn.run(app=app)

55
fastapi_example/mgr.py Normal file
View File

@ -0,0 +1,55 @@
# -*- coding: utf-8 -*-
import ntchat
import requests
from typing import Dict, Union
from ntchat.utils.singleton import Singleton
from utils import generate_guid
from exception import ClientNotExists
class ClientWeChat(ntchat.WeChat):
guid: str = ""
class ClientManager(metaclass=Singleton):
__client_map: Dict[str, ntchat.WeChat] = {}
callback_url: str = ""
def new_guid(self):
"""
生成新的guid
"""
while True:
guid = generate_guid("wechat")
if guid not in self.__client_map:
return guid
def create_client(self):
guid = self.new_guid()
wechat = ClientWeChat()
wechat.guid = guid
self.__client_map[guid] = wechat
# 注册回调
wechat.on(ntchat.MT_ALL, self.__on_callback)
return guid
def get_client(self, guid: str) -> Union[None, ntchat.WeChat]:
client = self.__client_map.get(guid, None)
if client is None:
raise ClientNotExists(guid)
return client
def remove_client(self, guid):
if guid in self.__client_map:
del self.__client_map[guid]
def __on_callback(self, wechat, message):
if not self.callback_url:
return
client_message = {
"guid": wechat.guid,
"message": message
}
requests.post(self.callback_url, json=client_message)

134
fastapi_example/models.py Normal file
View File

@ -0,0 +1,134 @@
from typing import Optional, List, Any, Union, Dict
from pydantic import BaseModel
class ClientReqModel(BaseModel):
guid: str
class ResponseModel(BaseModel):
status: int
msg: Optional[str] = ""
data: Optional[Any] = None
class ClientOpenReqModel(ClientReqModel):
smart: Optional[bool] = True
show_login_qrcode: Optional[bool] = False
class CallbackUrlReqModel(BaseModel):
callback_url: Optional[str] = ""
class UserProfileModel(BaseModel):
wxid: str
nickname: str
account: str
avatar: str
class ContactModel(BaseModel):
account: str
avatar: str
city: str
country: str
nickname: str
province: str
remark: str
sex: int
wxid: str
class ContactDetailReqModel(ClientReqModel):
wxid: str
class ContactDetailModel(BaseModel):
account: str
avatar: str
city: str
country: str
nickname: str
province: str
remark: str
sex: int
wxid: str
signature: str
small_avatar: str
sns_pic: str
source_type: int
status: int
v1: str
v2: str
class AcceptFriendReqModel(ClientReqModel):
encryptusername: str
ticket: str
scene: int
class RoomModel(BaseModel):
wxid: str
nickname: str
avatar: str
is_manager: int
manager_wxid: str
total_member: int
member_list: List[str]
class RoomMemberModel(ContactModel):
display_name: str
class GetRoomMembersReqModel(ClientReqModel):
room_wxid: str
class CreateRoomReqModel(ClientReqModel):
member_list: List[str]
class RoomMembersReqModel(CreateRoomReqModel):
room_wxid: str
class AddRoomFriendReqModel(ClientReqModel):
room_wxid: str
wxid: str
verify: str
class ModifyRoomNameReqModel(ClientReqModel):
room_wxid: str
name: str
class SendMsgReqModel(ClientReqModel):
to_wxid: str
class SendTextReqModel(SendMsgReqModel):
content: str
class SendRoomAtReqModel(SendTextReqModel):
at_list: List[str]
class SendCardReqModel(SendMsgReqModel):
card_wxid: str
class SendLinkCardReqModel(SendMsgReqModel):
title: str
desc: str
url: str
image_url: str
class SendMediaReqModel(SendMsgReqModel):
file_path: Optional[str] = ""
url: Optional[str] = ""

View File

@ -0,0 +1,4 @@
ntchat
fastapi
requests
uvicorn

6
fastapi_example/utils.py Normal file
View File

@ -0,0 +1,6 @@
import uuid
import time
def generate_guid(prefix=''):
return str(uuid.uuid3(uuid.NAMESPACE_URL, prefix + str(time.time())))

14
fastapi_example/xdg.py Normal file
View File

@ -0,0 +1,14 @@
import os
import sys
import os.path
def get_exec_dir():
return os.path.dirname(sys.argv[0])
def get_download_dir():
user_dir = os.path.join(get_exec_dir(), 'download')
if not os.path.isdir(user_dir):
os.makedirs(user_dir)
return user_dir

View File

@ -335,7 +335,7 @@ class WeChat:
"room_wxid": room_wxid, "room_wxid": room_wxid,
"notice": notice "notice": notice
} }
return self.__send_sync(send_type.MT_MOD_ROOM_NAME_MSG, data) return self.__send_sync(send_type.MT_MOD_ROOM_NOTICE_MSG, data)
def add_room_friend(self, room_wxid: str, wxid: str, verify: str): def add_room_friend(self, room_wxid: str, wxid: str, verify: str):
""" """