大家好~我是
米洛
!
我正在从0到1打造一个开源的接口测试平台, 也在编写一套与之对应的教程
,希望大家多多支持。
欢迎关注我的公众号米洛的测开日记
,获取最新文章教程!
# 回顾
上一节我们完成了前置条件
支持http请求的内容,这一节我们继续开始研究多接口之间参数提取的问题。
由于之前我们已经能够把har的数据转为我们自身的RequestInfo了,加上已经完成了http请求,所以我们可以直接开始考虑提取多个RequestInfo里面的参数了。
# 思路
在此之前,我们先稍微捋一捋大体的思路,我的想法是这样的,当然肯定会有一些不完善的地方,而且这块内容也是一个初版,后续可能会有改动。
前提
先说一下我们做这个工作的前提,我们需要做到最核心的2个事情:
- 存储每个请求中可能会被下个请求使用到的变量
- 读取当前的数据,看是否在上个请求中出现过
搞清楚了这2点,我感觉这块内容就不会很复杂。接着是整体思路:
- 遍历RequestInfo数组,并将response和response_headers里面的变量以value-路径的形式存储下来,举个例子,我们登录后返回以下Response:
{
"code": 0,
"data": {
"token": "123"
}
}
2
3
4
5
6
这时候我们需要存储的map就变成了这样:
{"123":"response.data.token"}
这时候就有人会问了,如果如果出现重复的怎么办,比如ABC3个接口依次调用,A和B中都有值为123的数据,那这个目前我是以第一个数据
为准。
但是为了方便,我还是存放了一个数组,所以形式会是:
{"123": ["response.data.token", "response.data.xxx.token"]}
这样是为了扩展,万一后面我不以第一条
数据为准了,起码我出现过123的key都被记录下来了,到时候改动功能也不愁有什么问题。
- 当请求数据是第一条数据,我们不会做参数替换,否则我们要遍历body和url以及request_headers,因为一般来说变量都会在这3个数据里面。接着我们找到变量的时候,把变量放到全局的数组里面,最后我们就可以知道有多少变量发生了替换。
步骤就是这么简单明了,由于肯定会存在一些变量名重复的问题,所以这个功能,也往往只是用来参考
而已。并且还会存在一个问题:
对于字符串组合的数据,是不能够很好支持的
。
比如我的一个数据是这样的:
{"a": "123456"}
我希望他能够被替换为:
{"a": "${response.data.token}456"}
这样由于字符串
需要做比较复杂的操作,所以我们这里暂时不支持,只进行全匹配
。
# 开始行动
说了上面这么多,我们就来开始一步一个脚印,编写代码吧~
编写忽略字段相关
因为headers里面很多都是重复的,我们最好是忽略他们。
class CaseGenerator(object):
# 忽略的字段
ignored = (
"Content-Type", "Connection", "Date", "Content-Length", "Host", "access-control-allow-credentials",
"access-control-allow-origin", "User-Agent", "Server"
)
@staticmethod
def ignore(key: str):
for ig in CaseGenerator.ignored:
if key.lower().endswith(ig.lower()):
return True
return False
2
3
4
5
6
7
8
9
10
11
12
13
编写存储相关
这里涉及到了dfs,也就是深度解析JSON里面的key-value,并最后把结果塞入ans,也就是我们存储变量的字典。
@staticmethod
def dfs(body, path: str, ans: dict, headers: bool = False):
if isinstance(body, list):
for i in range(len(body)):
c_path = f"{path}.{i}"
CaseGenerator.dfs(body[i], c_path, ans, headers)
elif isinstance(body, dict):
for k, v in body.items():
c_path = f"{path}.{k}"
CaseGenerator.dfs(v, c_path, ans, headers)
else:
if not headers or not CaseGenerator.ignore(path):
# 如果是bool值,需要特殊处理一下,因为Python get False/True会变成get 0 1
if isinstance(body, bool):
ans[str(body)].append(path)
else:
ans[body].append(path)
@staticmethod
def split_body(request: RequestInfo, ans: dict, var_name: str = ''):
if request.body:
try:
body = json.loads(request.response_content)
CaseGenerator.dfs(body, var_name, ans)
except JSONDecodeError:
# 可能body不是JSON,跳过
pass
except Exception as e:
raise GenerateException(f"解析接口body变量出错: {e}")
@staticmethod
def split_headers(request: RequestInfo, ans: dict, var_name: str = ""):
try:
CaseGenerator.dfs(request.response_headers, var_name, ans, True)
except Exception as e:
raise GenerateException(f"解析接口headers变量出错: {e}")
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
主要是遍历body和headers,如果不是JSON数据,直接就pass了,不继续解析了。
- 编写主体方法
@staticmethod
def extract_field(requests: List[RequestInfo]) -> List[str]:
"""
遍历接口,并提取其中的变量
:param requests:
:return:
"""
var_pool = defaultdict(list)
replaced = []
for i in range(len(requests)):
# 记录变量
CaseGenerator.record_vars(requests[i], var_pool, f"http_res_{i + 1}")
if i > 0:
CaseGenerator.replace_vars(requests[i], var_pool, replaced)
return replaced
@staticmethod
def record_vars(request: RequestInfo, ans: dict, var_name: str):
CaseGenerator.split_headers(request, ans, f"{var_name}.response_headers")
CaseGenerator.split_body(request, ans, f"{var_name}.response")
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
针对一个个接口请求,我们都会遍历,只有当请求是第二个以后的请求,我们才替换数据。
编写替换部分
替换部分分为3部分,替换url,headers和body,替换body和记录response其实比较相似,都是dfs遍历json。
@staticmethod
def replace_vars(request: RequestInfo, ans: dict, replaced: list):
CaseGenerator.replace_url(request, ans, replaced)
CaseGenerator.replace_headers(request, ans, replaced)
CaseGenerator.replace_body(request, ans, replaced)
2
3
4
5
6
以上是主体方法,我们先来打url这个boss:
我们知道,url参数都是有规律的存在:
协议 | 特殊标识 | url | path参数 | query参数 |
---|---|---|---|---|
http/https | 😕/ | www.baidu.com | /:name/:id | ?user=woody&handsome=true |
所以我们可以这样拆分url,先根据?切割path参数和query参数,然后根据&切割query参数,此时query变成:
["user=woody","handsome=true"]
左侧则继续根据/切割urlpath,抓取里面的变量。具体代码如下:
@staticmethod
def replace_url(request: RequestInfo, ans: dict, replaced: list):
"""
拆解url,将url里面的路由path和query参数
:return:
"""
# 获取前缀和后缀
url_query = request.url.split("?")
# 注意这里要判断,如果url里面没有?,代表query_list为空数组
if len(url_query) == 1:
query_list = list()
prefix = url_query[0]
else:
prefix, suffix = url_query
query_list = suffix.split("&")
http, prefix = prefix.split("//")
url_list = prefix.split("/")
new_url = []
new_query = []
for u in url_list:
if ans.get(u):
new_url.append(ans.get(u)[0])
replaced.append("%s => ${%s}" % (u, ans.get(u)[0]))
else:
new_url.append(u)
for q in query_list:
k, v = q.split("=")
if ans.get(v):
new_query.append("%s=${%s}" % (k, ans.get(v)[0]))
replaced.append("%s => ${%s}" % (k, ans.get(v)[0]))
else:
new_query.append(q)
# 替换最终生成的url
if len(query_list) == 0:
request.url = f"{http}//{'/'.join(new_url)}"
return
request.url = f"{http}//{'/'.join(new_url)}?{'&'.join(new_query)}"
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
接着我们要编写headers和body的替换:
@staticmethod
def replace_headers(request: RequestInfo, ans: dict, replaced: list):
for k, v in request.request_headers.items():
if ans.get(v):
request.request_headers[k] = "${%s}" % ans.get(v)[0]
replaced.append("%s => ${%s}" % (k, ans.get(v)[0]))
@staticmethod
def replace_body(request: RequestInfo, ans: dict, replaced: list):
if request.body:
try:
data = json.loads(request.body)
var_type = list()
CaseGenerator.dfs_replace(data, ans, var_type, replaced)
result = json.dumps(data, ensure_ascii=False)
for v in var_type:
result = result.replace(f'"{v}"', f"{v}")
request.body = result
except JSONDecodeError:
pass
except Exception as e:
logger.error(f"转换body变量失败: {e}")
@staticmethod
def dfs_replace(body, ans: dict, var_type: list, replaced: list):
if isinstance(body, dict):
for k, v in body.items():
string, value = CaseGenerator.dfs_replace(v, ans, var_type, replaced)
if value is not None:
body[k] = "${%s}" % value
if not string:
var_type.append("${%s}" % value)
elif isinstance(body, list):
for i in range(len(body)):
string, value = CaseGenerator.dfs_replace(body[i], ans, var_type, replaced)
if value is not None:
body[i] = "${%s}" % value
if not string:
var_type.append("${%s}" % value)
else:
body_str = body
if isinstance(body, bool):
body_str = str(body)
if ans.get(body_str):
replaced.append("%s => ${%s}" % (body_str, ans.get(body_str)[0]))
if not isinstance(body_str, str):
return False, ans.get(body_str)[0]
return True, ans.get(body_str)[0]
return None, None
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
dfs也就是深度递归的方式,找到在ans里面的变量,并加入到replaced(已被替换的数据),我们除此之外还要记录变量是否是str类型,如果不是,我们则需要替换为${变量}
,否则需要替换为"${变量}"
,由于我们的字典
不支持这样的数据:
{"a": ${response.data.token}}
所以我们需要先转为字符串,再使用字符串
replace方法,所以这里需要记录他是不是字符串类型,这里可能比较绕一点。
# 测试一下吧
我特意录制了一个har文件,是这样操作的:
- 登录pity
- 创建pity项目,并把项目名称改为我的
用户名称
,这样登录后的用户信息就应该会被替换到project_name这个字段里面
话不多说,我们来看看效果:
if __name__ == "__main__":
req = HarConvertor.convert("./pity.fun.har", "api.pity.fun")
print(req)
CaseGenerator.extract_field(req)
print(req)
2
3
4
5
首先可以看到的是,token已经被替换成功了,接着来我们来确定下项目
那块。
这是改造之前的body内容,我们搜下下面的部分(因为RequestInfo打印的时候,是字典的形式,所以顺序比较混乱可能)
所以我这边分了2次打印,替换前后都打印了。可以看到,我们的woody和米洛都被替换成了登录接口
的response部分。
最后我们来看看replaced里面的内容:
可以看到里面替换了woody,替换了米洛,也替换了其他的一些包括userId之类的数据,替换1处即插入一条数据。
后续我们需要把替换的详情展示给用户,让他们知道哪里做了替换。再然后,我们就得开始把这些内容转换为case+http前置部分
了,这无疑会极大地提高编写case的效率。
今天的内容比较丰富,大家可以先消化消化~