上一篇
Python Timer定时器教程:控制函数在特定时间执行 | Python定时任务指南
- Python
- 2025-08-15
- 142
Python Timer定时器教程
学习如何使用Python在特定时间执行函数 - 从基础到高级定时任务实现
掌握threading.Timer、sched模块和APScheduler库的使用方法
Python定时器概述
在Python编程中,定时器允许我们在指定的时间间隔后或特定时间点执行函数。这对于需要自动化执行重复任务或计划任务的应用非常有用。
常见应用场景
- 定期数据备份和清理
- 定时发送通知或邮件
- 周期性数据采集(网络爬虫)
- 自动化测试任务
- 计划报表生成
threading.Timer
Python标准库中的简单定时器实现,基于线程实现。
- 轻量级解决方案
- 适用于简单定时任务
- 执行单次任务
- 非阻塞主线程
sched模块
Python标准库中的通用事件调度器。
- 支持绝对时间调度
- 可以处理多个事件
- 支持优先级队列
- 适用于复杂调度需求
APScheduler库
功能强大的第三方调度库。
- 支持多种调度方式
- 持久化存储支持
- 分布式任务调度
- 支持定时、循环和复杂调度
方法一:使用threading.Timer
threading.Timer 是Python标准库中最简单的定时器实现方式。它在单独的线程中运行,不会阻塞主程序的执行。
基础Timer示例
import threading
import time
def scheduled_task():
print("定时任务执行 - 当前时间:", time.strftime("%H:%M:%S"))
# 5秒后执行任务
timer = threading.Timer(5.0, scheduled_task)
timer.start()
print("主线程继续执行 - 当前时间:", time.strftime("%H:%M:%S"))
循环定时任务
import threading
import time
def recurring_task():
print("循环任务执行 - 当前时间:", time.strftime("%H:%M:%S"))
# 每10秒重复一次
threading.Timer(10.0, recurring_task).start()
# 启动循环任务
recurring_task()
print("主线程继续执行...")
注意事项:
- 定时器在单独的线程中运行,需要注意线程安全问题
- 大量定时任务可能会创建过多线程,影响性能
- 定时器不是高精度计时器,可能存在微小误差
方法二:使用sched模块
Python的 sched 模块提供了一个通用的事件调度器,可以更精确地安排任务在特定时间执行。
基本sched使用示例
import sched
import time
scheduler = sched.scheduler(time.time, time.sleep)
def scheduled_task(description):
print(f"{description} - 执行时间:", time.strftime("%H:%M:%S"))
# 安排任务在10秒后执行
scheduler.enter(10, 1, scheduled_task, ("10秒后任务",))
# 安排任务在特定时间执行(30秒后)
target_time = time.time() + 30
scheduler.enterabs(target_time, 1, scheduled_task, ("特定时间任务",))
print("开始调度...")
scheduler.run()
print("所有任务执行完毕")
使用sched实现每日定时任务
import sched
import time
import datetime
scheduler = sched.scheduler(time.time, time.sleep)
def daily_task():
print("每日任务执行 - 当前时间:", datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
# 安排明天的同一时间执行
tomorrow = datetime.datetime.now() + datetime.timedelta(days=1)
target_time = time.mktime(tomorrow.timetuple())
scheduler.enterabs(target_time, 1, daily_task)
# 设置第一次执行时间(下一个整点)
next_hour = datetime.datetime.now().replace(minute=0, second=0, microsecond=0) + datetime.timedelta(hours=1)
target_time = time.mktime(next_hour.timetuple())
scheduler.enterabs(target_time, 1, daily_task)
# 启动调度器(在后台线程中运行)
import threading
thread = threading.Thread(target=scheduler.run)
thread.start()
print("每日任务已安排...")
方法三:使用APScheduler(高级)
APScheduler 是一个功能强大的Python库,提供了多种调度方式,包括日期、固定时间间隔和cron风格调度。
安装APScheduler
pip install apscheduler
基本使用示例
from apscheduler.schedulers.background import BackgroundScheduler
import datetime
def job_function():
print(f"定时任务执行 - {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
# 创建调度器
scheduler = BackgroundScheduler()
# 添加任务 - 每10秒执行一次
scheduler.add_job(job_function, 'interval', seconds=10)
# 添加任务 - 每天10:30执行
scheduler.add_job(job_function, 'cron', hour=10, minute=30)
# 添加任务 - 2024年1月1日执行
scheduler.add_job(job_function, 'date', run_date=datetime.datetime(2024, 1, 1, 0, 0))
# 启动调度器
scheduler.start()
print("调度器已启动,按 Ctrl+C 退出")
try:
# 保持主线程运行
while True:
time.sleep(2)
except KeyboardInterrupt:
scheduler.shutdown()
print("调度器已关闭")
Cron风格调度
# 添加Cron风格任务
scheduler.add_job(
job_function,
'cron',
day_of_week='mon-fri', # 周一至周五
hour=9, # 9点
minute=30, # 30分
end_date='2024-12-31' # 结束日期
)
# 每15分钟执行一次
scheduler.add_job(job_function, 'cron', minute='*/15')
# 工作日上午8点
scheduler.add_job(job_function, 'cron', day_of_week='mon-fri', hour=8, minute=0)
方法对比与选择指南
方法 | 优点 | 缺点 | 适用场景 |
---|---|---|---|
threading.Timer | 简单易用、无需额外依赖 | 功能有限、不适合复杂调度 | 简单延迟任务、单次执行 |
sched模块 | 标准库、支持绝对时间 | 功能相对基础、需要手动管理 | 中等复杂度任务、少量任务调度 |
APScheduler | 功能强大、支持多种调度方式 | 需要额外安装、学习曲线稍高 | 复杂调度需求、生产环境 |
最佳实践建议
- 对于简单延迟任务,使用 threading.Timer
- 对于需要精确时间触发的任务,使用 sched 模块
- 对于生产环境中的复杂调度需求,使用 APScheduler
- 长时间运行的任务考虑使用后台线程或进程
- 注意处理时区问题,特别是跨时区应用
- 为定时任务添加日志记录,便于调试和监控
本文由BeiMin于2025-08-15发表在吾爱品聚,如有疑问,请联系我们。
本文链接:https://521pj.cn/20258163.html
发表评论