测试平台系列(137) 提取多个接口中的参数.md

2022/6/13 测试平台接口测试FastApiPythonReact

大家好~我是米洛
我正在从0到1打造一个开源的接口测试平台, 也在编写一套与之对应的教程,希望大家多多支持。
欢迎关注我的公众号米洛的测开日记,获取最新文章教程!

# 回顾

上一节我们完成了前置条件支持http请求的内容,这一节我们继续开始研究多接口之间参数提取的问题。

由于之前我们已经能够把har的数据转为我们自身的RequestInfo了,加上已经完成了http请求,所以我们可以直接开始考虑提取多个RequestInfo里面的参数了。

# 思路

在此之前,我们先稍微捋一捋大体的思路,我的想法是这样的,当然肯定会有一些不完善的地方,而且这块内容也是一个初版,后续可能会有改动。

  • 前提

    先说一下我们做这个工作的前提,我们需要做到最核心的2个事情:

  1. 存储每个请求中可能会被下个请求使用到的变量
  2. 读取当前的数据,看是否在上个请求中出现过

搞清楚了这2点,我感觉这块内容就不会很复杂。接着是整体思路:

  1. 遍历RequestInfo数组,并将response和response_headers里面的变量以value-路径的形式存储下来,举个例子,我们登录后返回以下Response:
{
  "code": 0,
  "data": {
    "token": "123"
  }
}
1
2
3
4
5
6

这时候我们需要存储的map就变成了这样:

{"123":"response.data.token"}
1

这时候就有人会问了,如果如果出现重复的怎么办,比如ABC3个接口依次调用,A和B中都有值为123的数据,那这个目前我是以第一个数据为准。

但是为了方便,我还是存放了一个数组,所以形式会是:

{"123": ["response.data.token", "response.data.xxx.token"]}
1

这样是为了扩展,万一后面我不以第一条数据为准了,起码我出现过123的key都被记录下来了,到时候改动功能也不愁有什么问题。

  1. 当请求数据是第一条数据,我们不会做参数替换,否则我们要遍历body和url以及request_headers,因为一般来说变量都会在这3个数据里面。接着我们找到变量的时候,把变量放到全局的数组里面,最后我们就可以知道有多少变量发生了替换。

步骤就是这么简单明了,由于肯定会存在一些变量名重复的问题,所以这个功能,也往往只是用来参考而已。并且还会存在一个问题:

对于字符串组合的数据,是不能够很好支持的

比如我的一个数据是这样的:

{"a": "123456"}
1

我希望他能够被替换为:

{"a": "${response.data.token}456"}
1

这样由于字符串需要做比较复杂的操作,所以我们这里暂时不支持,只进行全匹配

# 开始行动

说了上面这么多,我们就来开始一步一个脚印,编写代码吧~

  • 编写忽略字段相关

    因为headers里面很多都是重复的,我们最好是忽略他们。

class CaseGenerator(object):
    # 忽略的字段
    ignored = (
        "Content-Type", "Connection", "Date", "Content-Length", "Host", "access-control-allow-credentials",
        "access-control-allow-origin", "User-Agent", "Server"
    )

    @staticmethod
    def ignore(key: str):
        for ig in CaseGenerator.ignored:
            if key.lower().endswith(ig.lower()):
                return True
        return False
1
2
3
4
5
6
7
8
9
10
11
12
13
  • 编写存储相关

    这里涉及到了dfs,也就是深度解析JSON里面的key-value,并最后把结果塞入ans,也就是我们存储变量的字典。

    @staticmethod
    def dfs(body, path: str, ans: dict, headers: bool = False):
        if isinstance(body, list):
            for i in range(len(body)):
                c_path = f"{path}.{i}"
                CaseGenerator.dfs(body[i], c_path, ans, headers)
        elif isinstance(body, dict):
            for k, v in body.items():
                c_path = f"{path}.{k}"
                CaseGenerator.dfs(v, c_path, ans, headers)
        else:
            if not headers or not CaseGenerator.ignore(path):
                # 如果是bool值,需要特殊处理一下,因为Python get False/True会变成get 0 1
                if isinstance(body, bool):
                    ans[str(body)].append(path)
                else:
                    ans[body].append(path)

    @staticmethod
    def split_body(request: RequestInfo, ans: dict, var_name: str = ''):
        if request.body:
            try:
                body = json.loads(request.response_content)
                CaseGenerator.dfs(body, var_name, ans)
            except JSONDecodeError:
                # 可能body不是JSON,跳过
                pass
            except Exception as e:
                raise GenerateException(f"解析接口body变量出错: {e}")

    @staticmethod
    def split_headers(request: RequestInfo, ans: dict, var_name: str = ""):
        try:
            CaseGenerator.dfs(request.response_headers, var_name, ans, True)
        except Exception as e:
            raise GenerateException(f"解析接口headers变量出错: {e}")
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36

主要是遍历body和headers,如果不是JSON数据,直接就pass了,不继续解析了。

  • 编写主体方法
    @staticmethod
    def extract_field(requests: List[RequestInfo]) -> List[str]:
        """
        遍历接口,并提取其中的变量
        :param requests:
        :return:
        """
        var_pool = defaultdict(list)
        replaced = []
        for i in range(len(requests)):
            # 记录变量
            CaseGenerator.record_vars(requests[i], var_pool, f"http_res_{i + 1}")
            if i > 0:
                CaseGenerator.replace_vars(requests[i], var_pool, replaced)
        return replaced
        
    @staticmethod
    def record_vars(request: RequestInfo, ans: dict, var_name: str):
        CaseGenerator.split_headers(request, ans, f"{var_name}.response_headers")
        CaseGenerator.split_body(request, ans, f"{var_name}.response")
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

针对一个个接口请求,我们都会遍历,只有当请求是第二个以后的请求,我们才替换数据。

  • 编写替换部分

    替换部分分为3部分,替换url,headers和body,替换body和记录response其实比较相似,都是dfs遍历json。

    @staticmethod
    def replace_vars(request: RequestInfo, ans: dict, replaced: list):
        CaseGenerator.replace_url(request, ans, replaced)
        CaseGenerator.replace_headers(request, ans, replaced)
        CaseGenerator.replace_body(request, ans, replaced)
       
1
2
3
4
5
6

以上是主体方法,我们先来打url这个boss:

我们知道,url参数都是有规律的存在:

协议 特殊标识 url path参数 query参数
http/https 😕/ www.baidu.com /:name/:id ?user=woody&handsome=true

所以我们可以这样拆分url,先根据?切割path参数和query参数,然后根据&切割query参数,此时query变成:

["user=woody","handsome=true"]
1

左侧则继续根据/切割urlpath,抓取里面的变量。具体代码如下:

    @staticmethod
    def replace_url(request: RequestInfo, ans: dict, replaced: list):
        """
        拆解url,将url里面的路由path和query参数
        :return:
        """
        # 获取前缀和后缀
        url_query = request.url.split("?")
        # 注意这里要判断,如果url里面没有?,代表query_list为空数组
        if len(url_query) == 1:
            query_list = list()
            prefix = url_query[0]
        else:
            prefix, suffix = url_query
            query_list = suffix.split("&")
        http, prefix = prefix.split("//")
        url_list = prefix.split("/")
        new_url = []
        new_query = []
        for u in url_list:
            if ans.get(u):
                new_url.append(ans.get(u)[0])
                replaced.append("%s => ${%s}" % (u, ans.get(u)[0]))
            else:
                new_url.append(u)
        for q in query_list:
            k, v = q.split("=")
            if ans.get(v):
                new_query.append("%s=${%s}" % (k, ans.get(v)[0]))
                replaced.append("%s => ${%s}" % (k, ans.get(v)[0]))
            else:
                new_query.append(q)
        # 替换最终生成的url
        if len(query_list) == 0:
            request.url = f"{http}//{'/'.join(new_url)}"
            return
        request.url = f"{http}//{'/'.join(new_url)}?{'&'.join(new_query)}"
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37

接着我们要编写headers和body的替换:

    @staticmethod
    def replace_headers(request: RequestInfo, ans: dict, replaced: list):
        for k, v in request.request_headers.items():
            if ans.get(v):
                request.request_headers[k] = "${%s}" % ans.get(v)[0]
                replaced.append("%s => ${%s}" % (k, ans.get(v)[0]))

    @staticmethod
    def replace_body(request: RequestInfo, ans: dict, replaced: list):
        if request.body:
            try:
                data = json.loads(request.body)
                var_type = list()
                CaseGenerator.dfs_replace(data, ans, var_type, replaced)
                result = json.dumps(data, ensure_ascii=False)
                for v in var_type:
                    result = result.replace(f'"{v}"', f"{v}")
                request.body = result
            except JSONDecodeError:
                pass
            except Exception as e:
                logger.error(f"转换body变量失败: {e}")
                
    @staticmethod
    def dfs_replace(body, ans: dict, var_type: list, replaced: list):
        if isinstance(body, dict):
            for k, v in body.items():
                string, value = CaseGenerator.dfs_replace(v, ans, var_type, replaced)
                if value is not None:
                    body[k] = "${%s}" % value
                    if not string:
                        var_type.append("${%s}" % value)
        elif isinstance(body, list):
            for i in range(len(body)):
                string, value = CaseGenerator.dfs_replace(body[i], ans, var_type, replaced)
                if value is not None:
                    body[i] = "${%s}" % value
                    if not string:
                        var_type.append("${%s}" % value)
        else:
            body_str = body
            if isinstance(body, bool):
                body_str = str(body)
            if ans.get(body_str):
                replaced.append("%s => ${%s}" % (body_str, ans.get(body_str)[0]))
                if not isinstance(body_str, str):
                    return False, ans.get(body_str)[0]
                return True, ans.get(body_str)[0]
            return None, None
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49

dfs也就是深度递归的方式,找到在ans里面的变量,并加入到replaced(已被替换的数据),我们除此之外还要记录变量是否是str类型,如果不是,我们则需要替换为${变量},否则需要替换为"${变量}",由于我们的字典不支持这样的数据:

{"a": ${response.data.token}}
1

所以我们需要先转为字符串,再使用字符串replace方法,所以这里需要记录他是不是字符串类型,这里可能比较绕一点。

# 测试一下吧

我特意录制了一个har文件,是这样操作的:

  1. 登录pity
  2. 创建pity项目,并把项目名称改为我的用户名称,这样登录后的用户信息就应该会被替换到project_name这个字段里面

话不多说,我们来看看效果:

if __name__ == "__main__":
    req = HarConvertor.convert("./pity.fun.har", "api.pity.fun")
    print(req)
    CaseGenerator.extract_field(req)
    print(req)
1
2
3
4
5

首先可以看到的是,token已经被替换成功了,接着来我们来确定下项目那块。

这是改造之前的body内容,我们搜下下面的部分(因为RequestInfo打印的时候,是字典的形式,所以顺序比较混乱可能)

所以我这边分了2次打印,替换前后都打印了。可以看到,我们的woody和米洛都被替换成了登录接口的response部分。

最后我们来看看replaced里面的内容:

可以看到里面替换了woody,替换了米洛,也替换了其他的一些包括userId之类的数据,替换1处即插入一条数据。

后续我们需要把替换的详情展示给用户,让他们知道哪里做了替换。再然后,我们就得开始把这些内容转换为case+http前置部分了,这无疑会极大地提高编写case的效率。

今天的内容比较丰富,大家可以先消化消化~