大家好~我是
米洛
!
我正在从0到1打造一个开源的接口测试平台, 也在编写一套与之对应的教程
,希望大家多多支持。
欢迎关注我的公众号米洛的测开日记
,获取最新文章教程!
# 回顾
上一节我们编写了用例/目录分屏的功能,属于提升用户体验
相关的内容。接下来我们要基于websocket,完成消息中心相关功能。这块博主也没什么经验,如果有不对的地方欢迎大家指正
。
本模块由于篇幅问题,会分为3~5篇。
# 需求背景
我们的产品要想做一些东西,必须得有一个背景在里头。比如我为啥要做这个功能,有什么用处。虽然咱们比不上专业的产品,但是也可以在做项目的过程中培养自己这块能力
。
我个人的想法是,当我们执行case,或者指派一些用例给他人编写,或者未来有一些任务安排的时候,我们可以先指派,接着推送
给对方,让对方能知道你的安排。
但可能目前我们最需要的就是,测试任务完成以后的通知功能。所以我们需要调研一下简单的页面。
参考下语雀的页面:
可以看到十分清爽,左侧tab显示消息类型,右侧tab显示已读/未读消息数量和列表。
# 编写消息表
消息的话,我们需要先定义一下消息表。来看看有哪些字段吧:
from sqlalchemy import SMALLINT, Column, VARCHAR, INT
from app.models.basic import PityBase
class PityNotification(PityBase):
msg_type = Column(SMALLINT, comment="消息类型 0: 用例执行 1: 任务分配")
msg_content = Column(VARCHAR(200), comment="消息内容", nullable=True)
msg_link = Column(VARCHAR(128), comment="消息链接")
msg_status = Column(SMALLINT, comment="消息状态 0: 未读 1: 已读")
sender = Column(INT, comment="消息发送人, 0则是CPU 非0则是其他用户")
receiver = Column(INT, comment="消息接收人, 0为广播消息")
__tablename__ = "pity_notification"
def __init__(self, msg_type, msg_content, sender, receiver, user, msg_link=None, msg_status=0):
super().__init__(user)
self.msg_type = msg_type
self.receiver = receiver
self.msg_content = msg_content
self.sender = sender
self.msg_link = msg_link
self.msg_status = msg_status
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
定义了一个通知表,里面有几个核心字段:
- 类型(预留字段)
- 内容
- 链接(方便用户跳转,可为空)
- 状态
- 发送人
- 接收人(这里有个广播消息的区分)
# 编写后端查询/已读功能
一般我们在页面上的操作,也就是查询
和已读
功能,所以对外暴露的接口,暂时只考虑这2块就可以了。
如果不存在的目录和文件,大家就新建,否则就修改。放上路径的原因便是如此。
- 编写Dao方法(app/crud/notification/NotificationDao.py)
from app.crud import Mapper
from app.models.notification import PityNotification
from app.utils.decorator import dao
from app.utils.logger import Log
@dao(PityNotification, Log("PityNotificationDao"))
class PityNotificationDao(Mapper):
pass
2
3
4
5
6
7
8
9
10
由于之前编写了一些公有方法,所以现在很方便,我们不再需要写很重复的代码了。
编写接口
在此之前,我们需要为app/crud/init.py/Mapper类新增一个update方法:
@classmethod
async def update_by_map(cls, *condition, **kwargs):
try:
async with async_session() as session:
async with session.begin():
sql = update(cls.model).where(*condition).values(**kwargs, updated_at=datetime.now())
await session.execute(sql)
except Exception as e:
cls.log.error(f"更新数据失败: {e}")
raise Exception("更新数据失败")
2
3
4
5
6
7
8
9
10
以往我们编写的update,都是先查询数据,如果没有数据则返回异常,如果有则更新数据。
这样做的缺点是多查询了一次数据库,优点是能够防止意外数据
。
上述的方法,是根据condition(查询条件)及kwargs(变更字典)更新。调用的也都是sqlalchemy基本的方法。
后续有空我也会出一个sqlalchemy crud相关demo,包括join(只能说下次一定)。
接着我们照常编写crud接口就可以了。
新建app/routers/notification/message.py文件(也别忘了在main.py引入这个router 代码大家可以参考之前的routers目录)
from typing import List
from fastapi import APIRouter, Depends
from app.crud.notification.NotificationDao import PityNotificationDao
from app.handler.fatcory import PityResponse
from app.models.notification import PityNotification
from app.routers import Permission
router = APIRouter(prefix="/notification")
@router.get("/list", description="获取用户消息列表")
async def list_msg(msg_status: int = 0, msg_type: int = 0, user_info=Depends(Permission())):
try:
data = await PityNotificationDao.list_record(msg_type=msg_type, msg_status=msg_status,
receiver=user_info['id'])
return PityResponse.success(data)
except Exception as e:
return PityResponse.failed(str(e))
@router.post("/read", description="用户读取消息")
async def read_msg(msg_id: List[int], user_info=Depends(Permission())):
try:
await PityNotificationDao.update_by_map(PityNotification.id.in_(msg_id),
PityNotification.receiver == user_info['id'], msg_status=1)
return PityResponse.success()
except Exception as e:
return PityResponse.failed(str(e))
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
这边要注意的地方是已读消息
的接口,接受的是一个msg_id列表,因为我们需要一个一键已读
的功能,这样会比较方便。其实也就是一个批量和单个的接口,为了省事,我们使用一个即可。
今天的内容就介绍到这里了,但其实这样还有个大问题: 对于广播消息,这个查询和更新都是操作不到的。
这个问题就要留到我们下一节来处理了。