大家好~我是
米洛
!
我正在从0到1打造一个开源的接口测试平台, 也在编写一套与之对应的完整教程
,希望大家多多支持。
欢迎关注我的公众号米洛的测开日记
,获取最新文章教程!
# 回顾
上一节我们设计好了测试计划
表并编写了CRUD接口,还没来得及测试,我们就要马不停蹄地编写定时任务相关内容了。今天我们就来编写一个完善的demo,可以定期执行测试计划
的用例,并生成报告。
至于后面的通知,要等到以后完善了。
# 调整Executor类里面的部分方法
之前我们虽然支持了多条case
异步运行并写入测试报告,但为了支持测试计划
,我们还需要进行一些调整。
修改run_multiple方法
我们的
核心
还是这个方法,因为里面已经包装好了运行多条case的方法。但我们要做一些改动:
首先我们要调整它的参数,因为我们运行case的模式,有很多种,按照之前定义的mode字段:
这样,当我们调用run_mutiple方法的时候,就可以知道case的模式是什么模式了,究竟是隶属于测试计划
呢,还是属于普通执行的case
。我们调整一下,改成图2的数据。
plan_id可以为空,如果有plan_id的话,可以直接跳转到测试计划
相关页面。方便编辑测试计划,也方便排查问题。
ordered这个方法区分同步还是异步,因为我们先前的run_multiple都是默认异步的,但我们测试计划又新增了同步模式,所以需要定义这样一个参数。
- 修改ReportDao
也是添加这2个参数,透传进来。方便报告里面表明,这是什么模式的执行,以及是否有测试计划id。
- 调整执行逻辑
根据ordered字段判断是否是同步执行
,如果不是则和以前一致,是的话则用for的方式执行,保证用例执行的顺序性。
# 新增run_test_plan方法
@staticmethod
async def run_test_plan(plan_id: int):
"""
通过测试计划id执行测试计划
:param plan_id:
:return:
"""
plan = await PityTestPlanDao.query_test_plan(plan_id)
if plan is None:
Executor.log.info(f"测试计划: [{plan_id}]不存在")
return
# 设置为running
plan.state = 1
await PityTestPlanDao.update_test_plan(plan, plan.update_user)
# if plan.disabled:
# # 说明测试计划已禁用
# Executor.log.info(f"测试计划: [{plan.name}]未开启")
# return
env = list(map(int, plan.env.split(",")))
case_list = list(map(int, plan.case_list.split(",")))
await asyncio.gather(
*(Executor.run_multiple(0, int(e), case_list, mode=1,
plan_id=plan.id, ordered=plan.ordered) for e in env))
# TODO 后续通知部分
plan.state = 0
await PityTestPlanDao.update_test_plan(plan, plan.update_user)
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
先通过query方法查到测试计划数据,如果plan是None,说明没有这个测试计划,可能被删掉
了。
否则我们走执行逻辑: 根据plan的env字段取出环境,并转换为[1, 2]这样的数组。
这里可能有点绕,因为我们数据库存的env和case_list都是这样的形式:
1,2,3,4
通过逗号分开,我们split以后,会得到: ["1", "2", "3", "4"]这样的数组,所以我们需要批量转为int。
最后,由于一般环境之间会隔离
,所以我们的ordered根据环境走就可以了,多环境执行我们可以按照异步处理。
所以最后也选用了gather。
# 编写Scheduler类
我们在utils目录下新建scheduler.py文件:
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
from app.utils.executor import Executor
class Scheduler(object):
scheduler: AsyncIOScheduler = None
@staticmethod
def init(scheduler):
Scheduler.scheduler = scheduler
@staticmethod
def configure(**kwargs):
Scheduler.scheduler.configure(**kwargs)
@staticmethod
def start():
Scheduler.scheduler.start()
@staticmethod
def add_test_plan(plan_id, plan_name, cron):
return Scheduler.scheduler.add_job(func=Executor.run_test_plan, args=(plan_id,),
name=plan_name, id=str(plan_id),
trigger=CronTrigger.from_crontab(cron))
@staticmethod
def edit():
pass
@staticmethod
def remove():
pass
@staticmethod
def list():
job_list = Scheduler.scheduler.get_jobs()
return job_list
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
这里我们对scheduler进行了非常简单的封装,然后定义了add_test_plan方法。
这个方法可以添加Executor.run_test_plan
到定时任务,他接受id,name和cron3个参数,这个数据最终会落到定时任务表
里面去。
# 在添加测试计划接口调用add_test_plan方法
# 测试一下
FastApi深度结合了Swagger,所以我们只需要打开: http://localhost:7777/docs便可以找到我们的测试方法:
由于我已经添加过了,并且是每分钟一次,所以我们只需要启动服务,静静等待
即可。
那今天的简单demo就完成到这里,下一节我们完善删改查的内容。