Django+vue自动化测试平台(25)-- 自动化测试之封装APscheduler定时任务框架

APscheduler简介

APscheduler全称Advanced Python Scheduler,作用为在指定的时间规则执行指定的作业,其是基于Quartz的一个Python定时任务框架,实现了Quartz的所有功能,使用起来十分方便。提供了基于日期、固定时间间隔以及crontab类型的任务,并且可以持久化任务。基于这些功能,我们可以很方便的实现一个python定时任务系统。

组成部分说明

# 触发器(trigger):
包含调度逻辑,每一个作业有它自己的触发器,用于决定接下来哪一个作业会运行。
除了他们自己初始配置意外,触发器完全是无状态的。

# 作业存储(job store):
存储被调度的作业,默认的作业存储是简单地把作业保存在内存中,其他的作业存储是将作业保存在数据库中。
一个作业的数据讲在保存在持久化作业存储时被序列化,并在加载时被反序列化。
调度器不能分享同一个作业存储。

# 执行器(executor):
处理作业的运行,他们通常通过在作业中提交制定的可调用对象到一个线程或者进城池来进行。
当作业完成时,执行器将会通知调度器。

# 调度器(scheduler):
通常在应用只有一个调度器,应用的开发者通常不会直接处理作业存储、调度器和触发器,相反,调度器提供了处理这些的合适的接口。
配置作业存储和执行器可以在调度器中完成,例如添加、修改和移除作业。 

框架对比:Celery VS APScheduler:

celery:

celery是一个专注于实时处理和任务调度的任务队列,任务就是消息(消息队列使用rabbitmq或者redie),消息中的有效载荷中包含要执行任务的全部数据。我们通常将celery作为一个任务队列来使用,但是celery也有定时任务的功能,但是celery无法在flask这样的系统中动态的添加定时任务,而且单独为定时任务功能而搭建celery显得过于重量级。

apscheduler:

apscheduler是基于Quartz的一个Python定时任务框架,提供了基于日期、固定时间间隔以及crontab类型的任务,并且可以持久化作业。APScheduler算是在实际项目中最好用的一个工具库,不仅可以在程序中动态的添加和删除定时任务,还支持持久化。

函数封装:

# 自定义调用函数
def scheduler_function(request):
   try:
        scheduler_id= "示例:123"
        scheduler_type = "示例:1"
        task_time = "示例:2024-01-01 00:00:00"
        task_hour= "示例:1"
        task_week_days= "示例:mon"
        task_week_times= "示例:22:00"
        data= "函数传参"
        # 示例:创建定时任务,可自主定义传参
        add_scheduler_task(scheduler_type, scheduler_id, task_time, task_hour, task_week_days, task_week_times, data)
   except Exception as e:
        return False, f"创建定时任务失败,原因是:{str(e)}"
# 添加定时任务
def add_scheduler_task(scheduler_type, scheduler_id, task_time, task_hour, task_week_days, task_week_times, data):
    try:
        scheduler = BackgroundScheduler()
        scheduler.add_jobstore(DjangoJobStore(), 'default')
        if scheduler_type == 1:
            # 某个时刻执行
            scheduler.add_job(run_task,
                              'date', id=str(scheduler_id),
                              run_date=task_time, args=(data,))
            scheduler.start()

        elif scheduler_type == 2:
            # 间隔时间性执行
            scheduler.add_job(run_task,
                              'interval', id=str(scheduler_id), minutes=task_hour, args=(data,))
            scheduler.start()

        elif scheduler_type == 3:
            # 每周几点执行
            start_time = task_week_times.split(':')
            h = int(start_time[0])
            m = int(start_time[1])
            scheduler.add_job(run_task, 'cron', id=str(scheduler_id),
                              day_of_week=task_week_days, hour=h, minute=m,
                              args=(data,))
            scheduler.start()
        return True, "创建定时任务成功"
    except Exception as e:
        return False, f"创建定时任务失败,原因是:{str(e)}"

# 删除定时任务
def remove_scheduler(scheduler_id):
    try:
        scheduler = BackgroundScheduler()
        scheduler.add_jobstore(DjangoJobStore(), 'default')
        DjangoJob.objects.filter(id=str(scheduler_id)).delete()
        return True, "删除定时任务成功"
    except Exception as e:
        return False, f"删除定时任务失败,原因是:{str(e)}"

# 获取定时任务,判断是否存在,若有,则返回下一次执行的时间
def get_scheduler(scheduler_id):
    try:
        job = DjangoJob.objects.filter(id=str(scheduler_id)).values().first()
        if job:
            return True, job["next_run_time"]
        else:
            return False, ""
    except Exception as e:
        return False, f"查询定时任务列表失败,原因:{str(e)}"

总结:

以上封装的函数可以解决绝大部分所需要的定时任务类型,分别是单次执行,间隔执行,周期性执行,满足绝大多数的自动化测试使用,注:个人愚见,仅供参考!!!

相关推荐

  1. Django+vue自动化测试28)-- ADB获取设备信息

    2024-07-09 17:32:03       23 阅读
  2. Selenium - 自动化测试框架

    2024-07-09 17:32:03       59 阅读

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-07-09 17:32:03       67 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-07-09 17:32:03       71 阅读
  3. 在Django里面运行非项目文件

    2024-07-09 17:32:03       58 阅读
  4. Python语言-面向对象

    2024-07-09 17:32:03       69 阅读

热门阅读

  1. 开源 WAF 解析:选择最适合你的防护利器

    2024-07-09 17:32:03       28 阅读
  2. VPN是什么?

    2024-07-09 17:32:03       28 阅读
  3. Android C++系列:Linux进程(一)

    2024-07-09 17:32:03       28 阅读
  4. Oracle查询固定值查询

    2024-07-09 17:32:03       23 阅读
  5. android Gradle储蓄地址

    2024-07-09 17:32:03       21 阅读
  6. 基于BERT的大规模文本处理实战

    2024-07-09 17:32:03       24 阅读
  7. 【LeetCode 0242】【Map/排序】有效的异位词

    2024-07-09 17:32:03       20 阅读
  8. Ubuntu下Qt-5.12.9创建快捷方式到桌面

    2024-07-09 17:32:03       26 阅读
  9. ArkTs基础入门

    2024-07-09 17:32:03       24 阅读
  10. 代码随想录Day74(图论Part10)

    2024-07-09 17:32:03       29 阅读
  11. 如何使一个盒子水平垂直居中(常用的)

    2024-07-09 17:32:03       21 阅读