测试平台系列(132) 用例生成之录制接口请求(上).md

2022/6/13 测试平台接口测试FastApiPythonReact

大家好~我是米洛
我正在从0到1打造一个开源的接口测试平台, 也在编写一套与之对应的教程,希望大家多多支持。
欢迎关注我的公众号米洛的测开日记,获取最新文章教程!

# 回顾

上一节我们完成了mitmproxy的demo版本,这一节我们来大刀阔斧,利用mitmproxy录制我们需要的接口请求

录制了以后,下一节我们估计是要存取/解析这些请求,最后一步就是转换为pity的测试用例了。当然,为了通用,最好我们也能支持har/jmx等其他文件的导入。

# 录制流程

  1. 用户在页面输入要录制的url
  2. 用户点击开始录制,服务端将用户的ip存放到redis中,这样用户就算中途离开,刷新页面依然能看到对应的录制数据和录制仍在进行中的状态
  3. 用户点击停止录制按钮,结束录制,后续就是用例的生成工作了

本节分2部分,建议配合起来阅读。先给大家看个概览:

# 配置代理

根据mitmproxy官网可知,他们需要至少python3.8以上,所以大家也趁这个机会升级一下python吧~毕竟3.8也发布很久了。

接着我们需要先安装mitmproxy,因为他需要随着我们的服务启动:

pip install mitmproxy
1

安装好以后,我们来配置我们的MITMPROXY启动端口,暂定为7778吧~

由于配置文件我有些许改动,可以参考下文档: https://fastapi.tiangolo.com/advanced/settings/#reading-a-env-file (opens new window)

这里就不细说了。基本用法和之前类似,只不过是从conf/dev.env里面读取配置。

我们开启mock并且设置好对应的端口号。

# 编写Redis缓存接口请求的方法

我的计划是把临时录制的请求(保存1小时)放入redis,与该用户的ip绑定,如果1小时内用户暂时离开,比如去上了个厕所,回来也能继续操作。

所以我们需要写一个redis缓存接口请求的方法,下面是代码:

我们用列表存放这个用户录制的请求(利用ip地址的唯一性保证key的唯一),接着判断这个key是否设置了过期时间,如果没有则给它设置1小时的过期时间。

# 编写proxy插件

我们现在开始动手编写一个录制插件,看过上篇文章的观众应该都很熟悉。

"""
流量录制->生成case功能
record steps and generate testcase
"""
import asyncio
import json
import re

from app.core.ws_connection_manager import ws_manage
from app.enums.MessageEnum import WebSocketMessageEnum
from app.middleware.RedisManager import RedisHelper
from app.proxy.utils import RequestInfo


class PityRecorder(object):

    async def response(self, flow):
        # 获取当前录制到的客户端ip地址
        addr = flow.client_conn.address[0]
        # 判断这个ip是否开启了录制,没开启的话,直接return
        record = await RedisHelper.get_address_record(addr)
        if not record:
            return
        # 获取录制的url信息
        data = json.loads(record)
        # 判断当前的url是否和配置的url匹配
        pattern = re.compile(data.get("regex"))
        # 如果匹配则继续
        if re.findall(pattern, flow.request.url):
            # 忽略js、css等文件 忽略options请求
            if flow.request.method.lower() == "options":
                return
            if flow.request.url.endswith(("js", "css", "ttf", "jpg", "svg", "gif")):
                return
            # 说明已开启录制开关,记录状态 这里将flow里面的request进行了转换
            request_data = RequestInfo(flow)
            # 序列化为str,方便存入redis
            dump_data = request_data.dumps()
            # 记录录制的接口数据
            await RedisHelper.cache_record(addr, dump_data)
            # 通过websocket发送到录制页面,使页面生成最新的数据
            asyncio.create_task(ws_manage.send_data(data.get("user_id"), WebSocketMessageEnum.RECORD,
                                                    dump_data))

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44

注释里面写的很详尽,这里面有一部分代码还没有实现,涉及到的是redis和websocket那块,大体意思大家能理解即可。

接着我们要将插件挂载到mitmproxy:

app/proxy/__init__.py里面包裹了一个start_proxy的方法,先判断用户是否安装了mitmproxy,接着把recorder插件挂载进来,利用DumpMaster启动mitmproxy即可。

上面就是本节内容,下一节我们具体细化websocket、redis还有前端部分内容,敬请期待。