2022-11-05 23:50:26
Python 定时任务的八种方案
Python 定时任务在日常工作中非常常见,以下是八种实现 Python 定时任务的方案:
位于 time 模块中的 sleep(secs) 函数,可以实现令当前执行的线程暂停 secs 秒后再继续执行。所谓暂停,即令当前线程进入阻塞状态,当达到 sleep() 函数规定的时间后,再由阻塞状态转为就绪状态,等待 CPU 调度。基于这样的特性我们可以通过 while 死循环 + sleep() 的方式实现简单的定时任务。
代码示例:
import datetimeimport timedef time_printer(): now = datetime.datetime.now() ts = now.strftime('%Y-%m-%d %H:%M:%S') print('do func time :', ts)def loop_monitor(): while True: time_printer() time.sleep(5) # 暂停5秒if __name__ == "__main__": loop_monitor()主要缺点:
Timeloop 是一个库,可用于运行多周期任务。这是一个简单的库,它使用 decorator 模式在线程中运行标记函数。
示例代码:
import timefrom timeloop import Timeloopfrom datetime import timedeltatl = Timeloop()@tl.job(interval=timedelta(seconds=2))def sample_job_every_2s(): print "2s job current time : {}".format(time.ctime())@tl.job(interval=timedelta(seconds=5))def sample_job_every_5s(): print "5s job current time : {}".format(time.ctime())@tl.job(interval=timedelta(seconds=10))def sample_job_every_10s(): print "10s job current time : {}".format(time.ctime())threading 模块中的 Timer 是一个非阻塞函数,比 sleep 稍好一点,timer 最基本理解就是定时器,我们可以启动多个定时任务,这些定时器任务是异步执行,所以不存在等待顺序执行问题。
Timer(interval, function, args=[ ], kwargs={ })
代码示例:
# 备注:Timer只能执行一次,这里需要循环调用,否则只能执行一次sched 模块实现了一个通用瞎前漏事件调度器,在调度器类使用一个延迟函数等待特定的时间,执行任务。同时支持多线程应用程序,在每个任务磨烂执行后会立刻调用延时函数,以确保其他线程也能执行。
class sched.scheduler(timefunc, delayfunc)
这个类定义了调度事件的通用接口,它需要外部传入两个参数,timefunc 是一个没有参数的返回时间类型数字的函数(常用使悔没用的如 time 模块里面的 time),delayfunc 应该是一个需要一个参数来调用、与 timefunc 的输出兼容、并且作用为延迟多个时间单位的函数(常用的如 time 模块的 sleep)。
代码示例:
import datetimeimport timeimport scheddef time_printer(): now = datetime.datetime.now() ts = now.strftime('%Y-%m-%d %H:%M:%S') print('do func time :', ts) loop_monitor()def loop_monitor(): s = sched.scheduler(time.time, time.sleep) # 生成调度器 s.enter(5, 1, time_printer, ()) s.run()if __name__ == "__main__": loop_monitor()scheduler 对象主要方法:
个人点评:比 threading.Timer 更好,不需要循环调用。
schedule 是一个第三方轻量级的任务调度模块,可以按照秒,分,小时,日期或者自定义事件执行时间。schedule 允许用户使用简单、人性化的语法以预定的时间间隔定期运行 Python 函数(或其它可调用函数)。
先来看代码,是不是不看文档就能明白什么意思?
import scheduleimport timedef job(): print("I'm working...")schedule.every(10).seconds.do(job)schedule.every(10).minutes.do(job)schedule.every().hour.do(job)schedule.every().day.at("10:30").do(job)schedule.every(5).to(10).minutes.do(job)schedule.every().monday.do(job)schedule.every().wednesday.at("13:15").do(job)schedule.every().minute.at(":17").do(job)while True: schedule.run_pending() time.sleep(1)装饰器:通过 @repeat() 装饰静态方法
import timefrom schedule import every, repeat, run_pending@repeat(every().second)def job(): print('working...')while True: run_pending() time.sleep(1)传递参数:
import scheduledef greet(name): print('Hello', name)schedule.every(2).seconds.do(greet, name='Alice')schedule.every(4).seconds.do(greet, name='Bob')while True: schedule.run_pending()装饰器同样能传递参数:
from schedule import every, repeat, run_pending@repeat(every().second, 'World')@repeat(every().minute, 'Mars')def hello(planet): print('Hello', planet)while True: run_pending() time.sleep(1)取消任务:
import schedulei = 0def some_task(): global i i += 1 print(i) if i == 10: schedule.cancel_job(job) print('cancel job') exit(0)job = schedule.every().second.do(some_task)while True: schedule.run_pending()运行一次任务:
import timeimport scheduledef job_that_executes_once(): print('Hello') return schedule.CancelJobschedule.every().minute.at(':34').do(job_that_executes_once)while True: schedule.run_pending() time.sleep(1)根据标签检索任务:
# 检索所有任务:schedule.get_jobs()import scheduledef greet(name): print('Hello {}'.format(name))schedule.every().day.do(greet, 'Andrea').tag('daily-tasks', 'friend')schedule.every().hour.do(greet, 'John').tag('hourly-tasks', 'friend')schedule.every().hour.do(greet, 'Monica').tag('hourly-tasks', 'customer')schedule.every().day.do(greet, 'Derek').tag('daily-tasks', 'guest')friends = schedule.get_jobs('friend')print(friends)根据标签取消任务:
# 取消所有任务:schedule.clear()import scheduledef greet(name): print('Hello {}'.format(name)) if name == 'Cancel': schedule.clear('second-tasks') print('cancel second-tasks')schedule.every().second.do(greet, 'Andrea').tag('second-tasks', 'friend')schedule.every().second.do(greet, 'John').tag('second-tasks', 'friend')schedule.every().hour.do(greet, 'Monica').tag('hourly-tasks', 'customer')schedule.every(5).seconds.do(greet, 'Cancel').tag('daily-tasks', 'guest')while True: schedule.run_pending()运行任务到某时间:
import schedulefrom datetime import datetime, timedelta, timedef job(): print('working...')schedule.every().second.until('23:59').do(job) # 今天23:59停止schedule.every().second.until('2030-01-01 18:30').do(job) # 2030-01-01 18:30停止schedule.every().second.until(timedelta(hours=8)).do(job) # 8小时后停止schedule.every().second.until(time(23, 59, 59)).do(job) # 今天23:59:59停止schedule.every().second.until(datetime(2030, 1, 1, 18, 30, 0)).do(job) # 2030-01-01 18:30停止while True: schedule.run_pending()马上运行所有任务(主要用于测试):
import scheduledef job(): print('working...')def job1(): print('Hello...')schedule.every().monday.at('12:40').do(job)schedule.every().tuesday.at('16:40').do(job1)schedule.run_all()schedule.run_all(delay_seconds=3) # 任务间延迟3秒并行运行:使用 Python 内置队列实现:
import threading