Mrli
别装作很努力,
因为结局不会陪你演戏。
Contacts:
QQ博客园

Python任务调度模块APScheduler

2019/09/15 Python
Word count: 1,449 | Reading time: 7min

Python任务调度模块APScheduler

APScheduler是一个python的第三方库,用来提供python的后台程序。包含四个组件,分别是:

  • triggers: 任务触发器组件,提供任务触发方式
    • triggers(触发器)中包含调度逻辑,每个作业都由自己的触发器来决定下次运行时间。除了他们自己初始配置意外,触发器完全是无状态的。
  • job stores: 任务商店组件,提供任务保存方式
    • job stores(作业存储器)存储被调度的作业,默认的作业存储器只是简单地把作业保存在内存中,其他的作业存储器则是将作业保存在数据库中。当作业被保存到一个持久化的作业存储器中的时候,该作业的数据会被序列化,并在加载时被反序列化。作业存储器不能共享调度器。
  • executors: 任务调度组件,提供任务调度方式
    • executors(执行器)处理作业的运行,他们通常通过在作业中提交指定的可调用对象到一个线程或者进城池来进行。当作业完成时,执行器将会通知调度器。
  • schedulers: 任务调度组件,提供任务工作方式
    • schedulers(调度器)配置作业存储器和执行器可以在调度器中完成,例如添加、修改和移除作业。根据不同的应用场景可以选用不同的调度器,可选的有BlockingScheduler,BackgroundScheduler,AsyncIOScheduler,GeventScheduler,TornadoScheduler,TwistedScheduler,QtScheduler 7种。
      • 其中BlockingSchedulerBackgroundScheduler是其中最常用的两种调度器。那他们之间有什么区别呢?
      • BlockingScheduler: 调用start函数后会阻塞当前线程。当调度器是你应用中唯一要运行的东西时
      • BackgroundScheduler: 调用start后主线程不会阻塞。当你不运行任何其他框架时使用,并希望调度器在你应用的后台执行。

使用的一个简单实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
from apscheduler.schedulers.blocking import BlockingScheduler  # 阻塞
import time

# 实例化一个调度器
scheduler = BlockingScheduler()

def job1():
print "%s: 执行任务" % time.asctime()

# 添加任务并设置触发方式为3s一次
scheduler.add_job(job1, 'interval', seconds=3)

# 开始运行调度器
scheduler.start()

触发器

  • date 一次性指定日期
  • interval 在某个时间范围内间隔多长时间执行一次
  • cron 和Linux crontab格式兼容,最为强大

date 最基本的一种调度,作业只会执行一次。它的参数如下:

  1. run_date (datetime|str) – 作业的运行日期或时间
  2. timezone (datetime.tzinfo|str) – 指定时区
1
2
3
4
# 2016-12-12运行一次job_function
sched.add_job(job_function, 'date', run_date=date(2016, 12, 12), args=['text'])
# 2016-12-12 12:00:00运行一次job_function
sched.add_job(job_function, 'date', run_date=datetime(2016, 12, 12, 12, 0, 0), args=['text'])

interval 间隔调度,参数如下:

weeks (int) – 间隔几周
days (int) – 间隔几天
hours (int) – 间隔几小时
minutes (int) – 间隔几分钟
seconds (int) – 间隔多少秒
start_date (datetime|str) – 开始日期
end_date (datetime|str) – 结束日期

1
2
# 每两个小时调一下job_function
sched.add_job(job_function, 'interval', hours=2)

cron参数如下:

year (int|str) – 年,4位数字
month (int|str) – 月 (范围1-12)
day (int|str) – 日 (范围1-31)
week (int|str) – 周 (范围1-53)
day_of_week (int|str) – 周内第几天或者星期几 (范围0-6 或者 mon,tue,wed,thu,fri,sat,sun)
hour (int|str) – 时 (范围0-23)
minute (int|str) – 分 (范围0-59)
second (int|str) – 秒 (范围0-59)
start_date (datetime|str) – 最早开始日期(包含)
end_date (datetime|str) – 最晚结束时间(包含)
timezone (datetime.tzinfo|str) – 指定时区

1
2
3
4
# job_function将会在6,7,8,11,12月的第3个周五的1,2,3点运行
sched.add_job(job_function, 'cron', month='6-8,11-12', day='3rd fri', hour='0-3')
# 截止到2016-12-30 00:00:00,每周一到周五早上五点半运行job_function
sched.add_job(job_function, 'cron', day_of_week='mon-fri', hour=5, minute=30, end_date='2016-12-31')

添加任务

有两种方法,一种是使用add_job()函数,还有一种方式是通过scheduled_job()装饰器。

add_job()函数方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from apscheduler.schedulers.blocking import BlockingScheduler
import datetime

def my_job1():
print 'my_job1 is running, Now is %s' % datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")

def my_job2():
print 'my_job2 is running, Now is %s' % datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")

sched = BlockingScheduler()
# 每隔5秒运行一次my_job1
sched.add_job(my_job1, 'interval', seconds=5,id='my_job1')
# 每隔5秒运行一次my_job2
sched.add_job(my_job2,'cron',second='*/5',id='my_job2')
sched.start()

scheduled_job()装饰器方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
from apscheduler.schedulers.blocking import BlockingScheduler
import datetime

sched = BlockingScheduler()
#每隔5秒运行一次my_job1
@sched.scheduled_job('interval',seconds=5,id='my_job1')
def my_job1():
print 'my_job1 is running, Now is %s' % datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")

#每隔5秒运行一次my_job2
@sched.scheduled_job('cron',second='*/5',id='my_job2')
def my_job2():
print 'my_job2 is running, Now is %s' % datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
sched.start()

使用SQLAlchemy作业存储器存放作业

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
from apscheduler.schedulers.blocking import BlockingScheduler
from datetime import datetime,timedelta
import logging

sched = BlockingScheduler()
def my_job():
print 'my_job is running, Now is %s' % datetime.now().strftime("%Y-%m-%d %H:%M:%S")
#使用sqlalchemy作业存储器
url='mysql+mysqldb://root:123456@localhost:3306/scrapy?charset=utf8'
sched.add_jobstore('sqlalchemy',url=url)
#添加作业
sched.add_job(my_job,'interval',id='myjob',seconds=5)

log = logging.getLogger('apscheduler.executors.default')
log.setLevel(logging.INFO) # DEBUG
#设定日志格式
fmt = logging.Formatter('%(levelname)s:%(name)s:%(message)s')
h = logging.StreamHandler()
h.setFormatter(fmt)
log.addHandler(h)

sched.start()

部分摘自:APScheduler浅析


Flask-APScheduler

大致用法跟APScheduler一致,只不过需要

1.在配置中设置SCHEDULER_API_ENABLED = True

2.app配置

1
2
3
4
5
from flask_apscheduler import APScheduler;

scheduler = APScheduler();
scheduler.init_app(app)
scheduler.start()

3.添加任务

exts.py

1
2
from flask_apscheduler import APScheduler
sche = APScheduler()

aap.py

1
2
3
4
5
6
7
8
9
10
11
12
from exts import db,mail,sche

sche.init_app(app)

...

def refreshinfo():
pass


sche.add_job(func=refreshinfo, id='whetherInfo', trigger='interval', seconds=150)
sche.start()

Author: Mrli

Link: https://nymrli.top/2019/02/24/Python任务调度模块APScheduler/

Copyright: All articles in this blog are licensed under CC BY-NC-SA 3.0 unless stating additionally.

< PreviousPost
南京邮电大学java程序设计作业在线编程第一次作业
NextPost >
搭建frp服务--阿里云服务器
CATALOG
  1. 1. Python任务调度模块APScheduler
    1. 1.1. 使用的一个简单实例
    2. 1.2. 触发器
      1. 1.2.1. date 最基本的一种调度,作业只会执行一次。它的参数如下:
      2. 1.2.2. interval 间隔调度,参数如下:
      3. 1.2.3. cron参数如下:
    3. 1.3. 添加任务
      1. 1.3.0.1. add_job()函数方式
      2. 1.3.0.2. scheduled_job()装饰器方式
      3. 1.3.0.3. 使用SQLAlchemy作业存储器存放作业
  2. 1.4. Flask-APScheduler
    1. 1.4.1. 1.在配置中设置SCHEDULER_API_ENABLED = True
    2. 1.4.2. 2.app配置
    3. 1.4.3. 3.添加任务