测试平台系列(74) 测试计划定时执行初体验.md

2022/6/13 测试平台接口测试FastApiPythonReact

大家好~我是米洛
我正在从0到1打造一个开源的接口测试平台, 也在编写一套与之对应的完整教程,希望大家多多支持。
欢迎关注我的公众号米洛的测开日记,获取最新文章教程!

# 回顾

上一节我们设计好了测试计划表并编写了CRUD接口,还没来得及测试,我们就要马不停蹄地编写定时任务相关内容了。今天我们就来编写一个完善的demo,可以定期执行测试计划的用例,并生成报告。

至于后面的通知,要等到以后完善了。

# 调整Executor类里面的部分方法

之前我们虽然支持了多条case异步运行并写入测试报告,但为了支持测试计划,我们还需要进行一些调整。

  • 修改run_multiple方法

    我们的核心还是这个方法,因为里面已经包装好了运行多条case的方法。但我们要做一些改动:

变更参数

首先我们要调整它的参数,因为我们运行case的模式,有很多种,按照之前定义的mode字段:

这个是测试报告表中定义的

调整一下

这样,当我们调用run_mutiple方法的时候,就可以知道case的模式是什么模式了,究竟是隶属于测试计划呢,还是属于普通执行的case。我们调整一下,改成图2的数据。

plan_id可以为空,如果有plan_id的话,可以直接跳转到测试计划相关页面。方便编辑测试计划,也方便排查问题。

ordered这个方法区分同步还是异步,因为我们先前的run_multiple都是默认异步的,但我们测试计划又新增了同步模式,所以需要定义这样一个参数

  • 修改ReportDao

也是添加这2个参数,透传进来。方便报告里面表明,这是什么模式的执行,以及是否有测试计划id。

  • 调整执行逻辑

根据ordered字段判断是否是同步执行,如果不是则和以前一致,是的话则用for的方式执行,保证用例执行的顺序性

# 新增run_test_plan方法

    @staticmethod
    async def run_test_plan(plan_id: int):
        """
        通过测试计划id执行测试计划
        :param plan_id:
        :return:
        """
        plan = await PityTestPlanDao.query_test_plan(plan_id)
        if plan is None:
            Executor.log.info(f"测试计划: [{plan_id}]不存在")
            return
        # 设置为running
        plan.state = 1
        await PityTestPlanDao.update_test_plan(plan, plan.update_user)
        # if plan.disabled:
        #     # 说明测试计划已禁用
        #     Executor.log.info(f"测试计划: [{plan.name}]未开启")
        #     return
        env = list(map(int, plan.env.split(",")))
        case_list = list(map(int, plan.case_list.split(",")))
        await asyncio.gather(
            *(Executor.run_multiple(0, int(e), case_list, mode=1,
                                    plan_id=plan.id, ordered=plan.ordered) for e in env))
        # TODO 后续通知部分
        plan.state = 0
        await PityTestPlanDao.update_test_plan(plan, plan.update_user)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

先通过query方法查到测试计划数据,如果plan是None,说明没有这个测试计划,可能被删掉了。

否则我们走执行逻辑: 根据plan的env字段取出环境,并转换为[1, 2]这样的数组。

这里可能有点绕,因为我们数据库存的env和case_list都是这样的形式:

1,2,3,4

通过逗号分开,我们split以后,会得到: ["1", "2", "3", "4"]这样的数组,所以我们需要批量转为int。

最后,由于一般环境之间会隔离,所以我们的ordered根据环境走就可以了,多环境执行我们可以按照异步处理。

所以最后也选用了gather。

# 编写Scheduler类

我们在utils目录下新建scheduler.py文件:

from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger

from app.utils.executor import Executor


class Scheduler(object):
    scheduler: AsyncIOScheduler = None

    @staticmethod
    def init(scheduler):
        Scheduler.scheduler = scheduler

    @staticmethod
    def configure(**kwargs):
        Scheduler.scheduler.configure(**kwargs)

    @staticmethod
    def start():
        Scheduler.scheduler.start()

    @staticmethod
    def add_test_plan(plan_id, plan_name, cron):
        return Scheduler.scheduler.add_job(func=Executor.run_test_plan, args=(plan_id,),
                                           name=plan_name, id=str(plan_id),
                                           trigger=CronTrigger.from_crontab(cron))

    @staticmethod
    def edit():
        pass

    @staticmethod
    def remove():
        pass

    @staticmethod
    def list():
        job_list = Scheduler.scheduler.get_jobs()
        return job_list
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39

这里我们对scheduler进行了非常简单的封装,然后定义了add_test_plan方法。

这个方法可以添加Executor.run_test_plan到定时任务,他接受id,name和cron3个参数,这个数据最终会落到定时任务表里面去。

# 在添加测试计划接口调用add_test_plan方法

当测试计划添加成功的时候,我们自动添加定时任务

# 测试一下

FastApi深度结合了Swagger,所以我们只需要打开: http://localhost:7777/docs便可以找到我们的测试方法:

填入token和测试计划相关信息即可

由于我已经添加过了,并且是每分钟一次,所以我们只需要启动服务,静静等待即可。

启动服务的时候,提示我这个每分钟执行的任务被miss了,我们再等一分钟

可以看到,环境1和环境2保持每分钟都在写入测试报告到数据库

那今天的简单demo就完成到这里,下一节我们完善删改查的内容。