深度盘点:8种 Python 定时任务的解决方案!

深度盘点:8种 Python 定时任务的解决方案!
最新回答
や泪漫延街

2022-11-05 23:50:26

Python 定时任务的八种方案

Python 定时任务在日常工作中非常常见,以下是八种实现 Python 定时任务的方案:

  • 利用 while True: + sleep() 实现定时任务

位于 time 模块中的 sleep(secs) 函数,可以实现令当前执行的线程暂停 secs 秒后再继续执行。所谓暂停,即令当前线程进入阻塞状态,当达到 sleep() 函数规定的时间后,再由阻塞状态转为就绪状态,等待 CPU 调度。基于这样的特性我们可以通过 while 死循环 + sleep() 的方式实现简单的定时任务。

代码示例:

import datetimeimport timedef time_printer(): now = datetime.datetime.now() ts = now.strftime('%Y-%m-%d %H:%M:%S') print('do func time :', ts)def loop_monitor(): while True: time_printer() time.sleep(5) # 暂停5秒if __name__ == "__main__": loop_monitor()

主要缺点:

  1. 只能设定间隔,不能指定具体的时间,比如每天早上 8:00。
  2. sleep 是一个阻塞函数,也就是说 sleep 这一段时间,程序什么也不能操作。
  • 使用 Timeloop 库运行定时任务

Timeloop 是一个库,可用于运行多周期任务。这是一个简单的库,它使用 decorator 模式在线程中运行标记函数。

示例代码:

import timefrom timeloop import Timeloopfrom datetime import timedeltatl = Timeloop()@tl.job(interval=timedelta(seconds=2))def sample_job_every_2s(): print "2s job current time : {}".format(time.ctime())@tl.job(interval=timedelta(seconds=5))def sample_job_every_5s(): print "5s job current time : {}".format(time.ctime())@tl.job(interval=timedelta(seconds=10))def sample_job_every_10s(): print "10s job current time : {}".format(time.ctime())
  • 利用 threading.Timer 实现定时任务

threading 模块中的 Timer 是一个非阻塞函数,比 sleep 稍好一点,timer 最基本理解就是定时器,我们可以启动多个定时任务,这些定时器任务是异步执行,所以不存在等待顺序执行问题。

Timer(interval, function, args=[ ], kwargs={ })

  • interval: 指定的时间
  • function: 要执行的方法
  • args/kwargs: 方法的参数

代码示例:

# 备注:Timer只能执行一次,这里需要循环调用,否则只能执行一次
  • 利用内置模块 sched 实现定时任务

sched 模块实现了一个通用瞎前漏事件调度器,在调度器类使用一个延迟函数等待特定的时间,执行任务。同时支持多线程应用程序,在每个任务磨烂执行后会立刻调用延时函数,以确保其他线程也能执行。

class sched.scheduler(timefunc, delayfunc)

这个类定义了调度事件的通用接口,它需要外部传入两个参数,timefunc 是一个没有参数的返回时间类型数字的函数(常用使悔没用的如 time 模块里面的 time),delayfunc 应该是一个需要一个参数来调用、与 timefunc 的输出兼容、并且作用为延迟多个时间单位的函数(常用的如 time 模块的 sleep)。

代码示例:

import datetimeimport timeimport scheddef time_printer(): now = datetime.datetime.now() ts = now.strftime('%Y-%m-%d %H:%M:%S') print('do func time :', ts) loop_monitor()def loop_monitor(): s = sched.scheduler(time.time, time.sleep) # 生成调度器 s.enter(5, 1, time_printer, ()) s.run()if __name__ == "__main__": loop_monitor()

scheduler 对象主要方法:

  • enter(delay, priority, action, argument):安排一个事件来延迟 delay 个时间单位。
  • cancel(event):从队列中删除事件。如果事件不是当前队列中的事件,则该方法将跑出一个 ValueError。
  • run():运行所有预定的事件。这个函数将等待(使用传递给构造函数的 delayfunc() 函数),然后执行事件,直到不再有预定的事件。

个人点评:比 threading.Timer 更好,不需要循环调用。

  • 利用调度模块 schedule 实现定时任务

schedule 是一个第三方轻量级的任务调度模块,可以按照秒,分,小时,日期或者自定义事件执行时间。schedule 允许用户使用简单、人性化的语法以预定的时间间隔定期运行 Python 函数(或其它可调用函数)。

先来看代码,是不是不看文档就能明白什么意思?

import scheduleimport timedef job(): print("I'm working...")schedule.every(10).seconds.do(job)schedule.every(10).minutes.do(job)schedule.every().hour.do(job)schedule.every().day.at("10:30").do(job)schedule.every(5).to(10).minutes.do(job)schedule.every().monday.do(job)schedule.every().wednesday.at("13:15").do(job)schedule.every().minute.at(":17").do(job)while True: schedule.run_pending() time.sleep(1)

装饰器:通过 @repeat() 装饰静态方法

import timefrom schedule import every, repeat, run_pending@repeat(every().second)def job(): print('working...')while True: run_pending() time.sleep(1)

传递参数:

import scheduledef greet(name): print('Hello', name)schedule.every(2).seconds.do(greet, name='Alice')schedule.every(4).seconds.do(greet, name='Bob')while True: schedule.run_pending()

装饰器同样能传递参数:

from schedule import every, repeat, run_pending@repeat(every().second, 'World')@repeat(every().minute, 'Mars')def hello(planet): print('Hello', planet)while True: run_pending() time.sleep(1)

取消任务:

import schedulei = 0def some_task(): global i i += 1 print(i) if i == 10: schedule.cancel_job(job) print('cancel job') exit(0)job = schedule.every().second.do(some_task)while True: schedule.run_pending()

运行一次任务:

import timeimport scheduledef job_that_executes_once(): print('Hello') return schedule.CancelJobschedule.every().minute.at(':34').do(job_that_executes_once)while True: schedule.run_pending() time.sleep(1)

根据标签检索任务:

# 检索所有任务:schedule.get_jobs()import scheduledef greet(name): print('Hello {}'.format(name))schedule.every().day.do(greet, 'Andrea').tag('daily-tasks', 'friend')schedule.every().hour.do(greet, 'John').tag('hourly-tasks', 'friend')schedule.every().hour.do(greet, 'Monica').tag('hourly-tasks', 'customer')schedule.every().day.do(greet, 'Derek').tag('daily-tasks', 'guest')friends = schedule.get_jobs('friend')print(friends)

根据标签取消任务:

# 取消所有任务:schedule.clear()import scheduledef greet(name): print('Hello {}'.format(name)) if name == 'Cancel': schedule.clear('second-tasks') print('cancel second-tasks')schedule.every().second.do(greet, 'Andrea').tag('second-tasks', 'friend')schedule.every().second.do(greet, 'John').tag('second-tasks', 'friend')schedule.every().hour.do(greet, 'Monica').tag('hourly-tasks', 'customer')schedule.every(5).seconds.do(greet, 'Cancel').tag('daily-tasks', 'guest')while True: schedule.run_pending()

运行任务到某时间:

import schedulefrom datetime import datetime, timedelta, timedef job(): print('working...')schedule.every().second.until('23:59').do(job) # 今天23:59停止schedule.every().second.until('2030-01-01 18:30').do(job) # 2030-01-01 18:30停止schedule.every().second.until(timedelta(hours=8)).do(job) # 8小时后停止schedule.every().second.until(time(23, 59, 59)).do(job) # 今天23:59:59停止schedule.every().second.until(datetime(2030, 1, 1, 18, 30, 0)).do(job) # 2030-01-01 18:30停止while True: schedule.run_pending()

马上运行所有任务(主要用于测试):

import scheduledef job(): print('working...')def job1(): print('Hello...')schedule.every().monday.at('12:40').do(job)schedule.every().tuesday.at('16:40').do(job1)schedule.run_all()schedule.run_all(delay_seconds=3) # 任务间延迟3秒

并行运行:使用 Python 内置队列实现:

import threading