大家好~我是
米洛
!
我正在从0到1打造一个开源的接口测试平台, 也在编写一套与之对应的教程
,希望大家多多支持。
欢迎关注我的公众号米洛的测开日记
,获取最新文章教程!
# 回顾
上一节我们写了一些后端的内容,今天来给后端收个尾,并且说一部分前端的内容,本次消息中心内容就该结束了。
本次内容做的质量不是很够,大概在1星1费卡的水准,毕竟没有很好的参考,所以希望大家轻喷。
# 修改NotificationDao.py
修改这个文件的原因,是我们因为要查询不同类型的消息,不能用简单的mapper(list_record)方法去查询,因为比较复杂。
from datetime import timedelta, datetime
from sqlalchemy import select, and_, or_
from app.crud import Mapper
from app.enums.MessageEnum import MessageTypeEnum, MessageStateEnum
from app.models import async_session
from app.models.broadcast_read_user import PityBroadcastReadUser
from app.models.notification import PityNotification
from app.utils.decorator import dao
from app.utils.logger import Log
@dao(PityNotification, Log("PityNotificationDao"))
class PityNotificationDao(Mapper):
@classmethod
async def list_messages(cls, msg_type: int, msg_status: int, receiver: int):
"""
根据消息id和消息类型以及接收人获取消息数据
:param msg_type:
:param msg_status:
:param receiver:
:return:
"""
ninety_days = datetime.now() - timedelta(days=90)
# 1. 当消息类型不为广播类型时,正常查询
if msg_type == MessageTypeEnum.others:
ans = await cls.list_record(msg_status=msg_status, receiver=receiver, msg_type=msg_type,
condition=[PityNotification.created_at > ninety_days])
else:
# 否则需要根据是否已读进行查询 只支持90天内数据
async with async_session() as session:
# 找到3个月内的消息
default_condition = [PityNotification.deleted_at == 0, PityNotification.created_at >= ninety_days]
if msg_type == MessageTypeEnum.broadcast:
conditions = [*default_condition, PityNotification.msg_type == msg_type]
else:
# 说明是全部消息
conditions = [*default_condition,
or_(PityNotification.msg_type != msg_type, PityNotification.receiver == receiver)]
sql = select(PityNotification, PityBroadcastReadUser) \
.outerjoin(PityBroadcastReadUser,
and_(PityNotification.id == PityBroadcastReadUser.notification_id,
PityBroadcastReadUser.read_user == receiver)).where(*conditions).order_by(
PityNotification.created_at.desc())
query = await session.execute(sql)
result = query.all()
ans = []
last_month = datetime.now() - timedelta(days=30)
for notify, read in result:
# 如果非广播类型,直接
if notify.msg_type == MessageTypeEnum.others:
if notify.msg_status == msg_status:
ans.append(notify)
continue
else:
if msg_status == MessageStateEnum.read:
if read is not None or notify.updated_at < last_month:
ans.append(notify)
else:
if not read:
ans.append(notify)
return ans
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
这边具体讲解一下代码,理一下逻辑:
当查询的非广播消息时,我们不需要与broadcast表关联,直接查询notification表里面90天内的数据即可
否则我们对消息类型进行判断
如果只要广播消息,那么我们也是找到消息类型为
广播消息
且创建时间在90天内
的数据。如果要全部消息,那我们还得带上一个or条件,广播消息+其他类型的消息并且接收人是该用户的
接着联表查出读取状态
最后根据读取的状态判断,是要未读还是要已读的消息,进行一遍筛选
这里sql如果合在一起很复杂(我最近脑子不太好使),所以简单点了,用sql查询出所有数据,接着根据对应的选项筛选出合适的数据。
简单的说就是,要已读消息的话,那么条件是
已读表里面有该条记录或者消息已经过期1个月了
,过期一个月的消息默认算已读了。如果要未读消息的话,必须在广播表里面查不到这条数据
。这块确实很复杂,后续可能会改造下,目前就先将就用着= =
# 调整main.py
@pity.websocket("/ws/{user_id}")
async def websocket_endpoint(websocket: WebSocket, user_id: int):
await ws_manage.connect(websocket, user_id)
try:
# 定义特殊值的回复,配合前端实现确定连接,心跳检测等逻辑
questions_and_answers_map: dict = {
"HELLO SERVER": F"hello {user_id}",
"HEARTBEAT": F"{user_id}",
}
# 存储连接后获取消息
msg_records = await PityNotificationDao.list_messages(msg_type=MessageTypeEnum.all.value, receiver=user_id,
msg_status=MessageStateEnum.unread.value)
# 如果有未读消息, 则推送给前端对应的count
if len(msg_records) > 0:
await websocket.send_json(WebSocketMessage.msg_count(len(msg_records), True))
while True:
data: str = await websocket.receive_text()
if (du := data.upper()) in questions_and_answers_map:
await ws_manage.send_personal_message(message=questions_and_answers_map.get(du), websocket=websocket)
except WebSocketDisconnect:
if user_id in ws_manage.active_connections:
ws_manage.disconnect(user_id)
except Exception as e:
print(e)
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
去main.py编写一个websocket接口,接受参数是user_id,连接上客户端
以后,我们会查询这个用户的消息数量,并发送给前端。while True后的语句用于主动接收客户端的消息,但我们这个场景似乎不太适用
。
对方断开连接后则关闭此处的客户端连接。
# 编写前端部分
# BasicLayout.jsx
在basiclayout.jsx的useEffect方法处编写如下代码,如果用户已登录,默认给之连接websocket,收到消息进行一次判断:
桌面通知还是消息数量,如果是消息数量则更新,否则则调用notification.info进行推送。这个event.preventDefault不要漏掉,会导致服务端报连接关闭的错误。
# Notification.jsx
这里细节代码大家可以去看github提交记录,大概的样式如下图:
由于代码比较多,就不一一展示了。具体可以看https://github.com/wuranxu/pityWeb里的内容。
# 测试下
我们手动在数据库插入一条消息数据,receiver是你的用户id,deleted_at要为0。接着我们刷新下页面:
这里我们还做了删除消息
和已读消息
的功能,默认是进入这个页面就已读全部消息
。
# 测试下桌面通知
在run_test_plan方法里面加入如下判断,如果执行人
不为0,也就是说是手动执行的,那么执行完毕后给出消息通知:
# 可以看到,执行时间比较久了。其实这边接口
已经异步返回了,但消息推送是测试计划真正执行完成了之后才出现。
还是那句话,实现得很粗糙,包括广播消息的处理,还有消息通知,有很多瑕疵。
大量消息存储
websocket自动重连啥的,基本都没有处理
等问题出现了再慢慢修修补补吧。