写点什么

python 任务调度利器 -APScheduler

作者:AIWeker
  • 2022-11-28
    福建
  • 本文字数:4143 字

    阅读完需:约 14 分钟

python任务调度利器-APScheduler

1.任务调度应用场景

所谓的任务调度是指安排任务的执行计划,即何时执行,怎么执行等。在现实项目中经常出现它们的身影;特别是数据类项目,比如实时统计每 5 分钟网站的访问量,就需要每 5 分钟定时从日志数据分析访问量。


总结下任务调度应用场景:


  • 离线作业调度: 按时间粒度执行某项任务

  • 共享缓存更新: 定时刷新缓存,如 redis 缓存;不同进程间的共享数据


任务调度工具


  • linux 的 crontab, 支持按照分钟/小时/天/月/周粒度,执行任务

  • java 的 Quartz

  • windows 的任务计划


本文介绍的是 python 中的任务调度库,APScheduler(advance python scheduler)。如果你了解 Quartz 的话,可以看出 APScheduler 是 Quartz 的 python 实现;APScheduler 提供了基于时间,固定时间点和 crontab 方式的任务调用方案, 可以当作一个跨平台的调度工具来使用。

2.APScheduler

2.1 组件介绍

APScheduler 由 5 个部分组成:触发器、调度器、任务存储器、执行器和任务事件。


  • 任务 job: 任务 id 和任务执行 func

  • 触发器 triggers:确定任务何时开始执行

  • 任务存储器 job stores: 保存任务的状态

  • 执行器 executors:确定任务怎么执行

  • 任务事件 event:监控任务执行异常情况

  • 调度器 schedulers:串联任务的整个生命周期,添加编辑任务任务存储器,在任务的执行时间到来时,把任务交给执行器执行返回结果;同时发出事件监听,监控任务事件



2.2 安装

pip install apscheduler
复制代码

2.3 简单例子

from apscheduler.schedulers.background import BackgroundSchedulerfrom apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutorfrom apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStorefrom apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERRORimport loggingimport datetime
# 任务执行函数def job_func(job_id): print('job %s is runed at %s' % (job_id, datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
# 事件监听def job_exception_listener(event): if event.exception: # todo: 异常处理, 告警等 print('The job crashed :(') else: print('The job worked :)')
# 日志logging.basicConfig()logging.getLogger('apscheduler').setLevel(logging.DEBUG)
# 定义一个后台任务非阻塞调度器scheduler = BackgroundScheduler()# 添加一个任务到内存中 # 触发器:trigger='interval' seconds=10 每10s触发执行一次# 执行器:executor='default' 线程执行# 任务存储器:jobstore='default' 默认内存存储# 最大并发数:max_instancesscheduler.add_job(job_func, trigger='interval', args=[1], id='1', name='a test job', max_instances=10, jobstore='default', executor='default', seconds=10)# 设置任务监听scheduler.add_listener(job_exception_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
# 启动调度器scheduler.start()
复制代码


运行情况:


job 1 is runed at 2020-03-21 20:00:38The job worked :)job 1 is runed at 2020-03-21 20:00:48The job worked :)job 1 is runed at 2020-03-21 20:00:58The job worked :)
复制代码

2.4 触发器

触发器决定何时执行任务,APScheduler 支持的触发器有 3 种


  • trigger='interval': 按固定时间周期执行,支持 weeks,days,hours,minutes, seconds, 还可指定时间范围


    sched.add_job(job_function, 'interval', hours=2, start_date='2010-10-10 09:30:00', end_date='2014-06-15 11:00:00')
复制代码


  • trigger='date': 固定时间,执行一次


    sched.add_job(my_job, 'date', run_date=datetime(2009, 11, 6, 16, 30, 5), args=['text'])
复制代码


  • trigger='cron': 支持 crontab 方式,执行任务

  • 参数:分钟/小时/天/月/周粒度,也可指定时间范围


        year (int|str) – 4-digit year        month (int|str) – month (1-12)        day (int|str) – day of the (1-31)        week (int|str) – ISO week (1-53)        day_of_week (int|str) – number or name of weekday (0-6 or mon,tue,wed,thu,fri,sat,sun)        hour (int|str) – hour (0-23)        minute (int|str) – minute (0-59)        second (int|str) – second (0-59)        start_date (datetime|str) – earliest possible date/time to trigger on (inclusive)        end_date (datetime|str) – latest possible date/time to trigger on (inclusive)
复制代码


  • 例子


        # 星期一到星期五,5点30执行任务job_function,直到2014-05-30 00:00:00        sched.add_job(job_function, 'cron', day_of_week='mon-fri', hour=5, minute=30, end_date='2014-05-30')
# 按照crontab格式执行, 格式为:分钟 小时 天 月 周,*表示所有 # 5月到8月的1号到15号,0点0分执行任务job_function sched.add_job(job_function, CronTrigger.from_crontab('0 0 1-15 may-aug *'))
复制代码

2.5 执行器

执行器决定如何执行任务;APScheduler 支持 4 种不同执行器,常用的有 pool(线程/进程)和 gevent(io 多路复用,支持高并发),默认为 pool 中线程池, 不同的执行器可以在调度器的配置中进行配置(见调度器)


  • apscheduler.executors.asyncio: 同步 io,阻塞

  • apscheduler.executors.gevent:io 多路复用,非阻塞

  • apscheduler.executors.pool: 线程 ThreadPoolExecutor 和进程 ProcessPoolExecutor

  • apscheduler.executors.twisted: 基于事件驱动

2.6 任务存储器

任务存储器决定任务的保存方式, 默认存储在内存中(MemoryJobStore),重启后就没有了。APScheduler 支持的任务存储器有:


  • apscheduler.jobstores.memory: 内存

  • apscheduler.jobstores.mongodb: 存储在 mongodb

  • apscheduler.jobstores.redis: 存储在 redis

  • apscheduler.jobstores.rethinkdb:存储在 rethinkdb

  • apscheduler.jobstores.sqlalchemy:支持 sqlalchemy 的数据库如 mysql,sqlite 等

  • apscheduler.jobstores.zookeeper:zookeeper


不同的任务存储器可以在调度器的配置中进行配置(见调度器)

2.7 调度器

APScheduler 支持的调度器方式如下,比较常用的为 BlockingScheduler 和 BackgroundScheduler


  • BlockingScheduler:适用于调度程序是进程中唯一运行的进程,调用 start 函数会阻塞当前线程,不能立即返回。

  • BackgroundScheduler:适用于调度程序在应用程序的后台运行,调用 start 后主线程不会阻塞。

  • AsyncIOScheduler:适用于使用了 asyncio 模块的应用程序。

  • GeventScheduler:适用于使用 gevent 模块的应用程序。

  • TwistedScheduler:适用于构建 Twisted 的应用程序。

  • QtScheduler:适用于构建 Qt 的应用程序。


从前面的例子,我们可以看到,调度器可以操作任务(并为任务指定触发器、任务存储器和执行器)和监控任务。


scheduler.add_job(job_func, trigger='interval', args=[1], id='1', name='a test job', max_instances=10, jobstore='default', executor='default', seconds=10)
复制代码


我们来详细看下各个部分


  • 调度器配置: 在 add_job 我们看到 jobstore 和 executor 都是 default,APScheduler 在定义调度器时可以指定不同的任务存储和执行器,以及初始的参数


    from pytz import utc
from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.jobstores.mongodb import MongoDBJobStore from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
# 通过dict方式执行不同的jobstores、executors和默认的参数 jobstores = { 'mongo': MongoDBJobStore(), 'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite') } executors = { 'default': ThreadPoolExecutor(20), 'processpool': ProcessPoolExecutor(5) } job_defaults = { 'coalesce': False, 'max_instances': 3 } # 定义调度器 scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)
def job_func(job_id): print('job %s is runed at %s' % (job_id, datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) # 添加任务 scheduler.add_job(job_func, trigger='interval', args=[1], id='1', name='a test job', jobstore='default', executor='processpool', seconds=10) # 启动调度器 scheduler.start()
复制代码


  • 操作任务: 调度器可以增加,删除,暂停,恢复和修改任务。需要注意的是这里的操作只是对未执行的任务起作用,已经执行和正在执行的任务不受这些操作的影响。

  • add_job


        scheduler.add_job(job_func, trigger='interval', args=[1], id='1', name='a test job', max_instances=10, jobstore='default', executor='default', seconds=10)
复制代码


  • remove_job: 通过任务唯一的 id,删除的时候对应的任务存储器里记录也会删除


        scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id')        scheduler.remove_job('my_job_id')
复制代码


  • Pausing and resuming jobs: 暂停和重启任务


        scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id')        scheduler.pause_job('my_job_id')        scheduler.resume_job('my_job_id')
复制代码


  • Modifying jobs: 修改任务的配置


        job = scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id', max_instances=10)        # 修改任务的属性        job.modify(max_instances=6, name='Alternate name')        # 修改任务的触发器        scheduler.reschedule_job('my_job_id', trigger='cron', minute='*/5')
复制代码


  • 监控任务事件类型,比较常用的类型有:

  • EVENT_JOB_ERROR: 表示任务在执行过程的出现异常触发

  • EVENT_JOB_EXECUTED: 任务执行成功时

  • EVENT_JOB_MAX_INSTANCES: 调度器上执行的任务超过配置的参数时


        scheduler.add_listener(job_exception_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
复制代码


参考文档:https://apscheduler.readthedocs.io/en/stable/userguide.html

发布于: 刚刚阅读数: 3
用户头像

AIWeker

关注

InfoQ签约作者 / 公众号:人工智能微客 2019-11-21 加入

人工智能微客(aiweker)长期跟踪和分享人工智能前沿技术、应用、领域知识,不定期的发布相关产品和应用,欢迎关注和转发

评论

发布
暂无评论
python任务调度利器-APScheduler_Python_AIWeker_InfoQ写作社区