什么是Python协程?
协程(Coroutine)是Python中实现并发编程的一种方式,它允许你在单个线程内执行多个任务,通过任务切换而非线程切换来实现并发。Python通过asyncio
库和async/await
语法提供了对协程的原生支持。
注意: 协程与线程不同,它们由事件循环管理,在单个线程中运行,通过任务切换而非线程切换来实现并发。
为什么需要增加任务?
在异步编程中,我们经常需要同时处理多个I/O密集型操作(如网络请求、文件读写等)。通过增加任务,我们可以:
- 同时发起多个操作而不互相阻塞
- 更高效地利用系统资源
- 提高程序的响应速度和吞吐量
- 简化复杂异步逻辑的管理
在协程中增加任务的三种方法
1. 使用asyncio.create_task()
这是Python 3.7+推荐的方式,用于将协程包装成任务并加入事件循环。
import asyncio
async def my_task(name, delay):
print(f"任务 {name} 开始")
await asyncio.sleep(delay)
print(f"任务 {name} 完成")
async def main():
# 创建并添加任务
task1 = asyncio.create_task(my_task("A", 2))
task2 = asyncio.create_task(my_task("B", 1))
# 等待两个任务完成
await task1
await task2
asyncio.run(main())
输出结果:
任务 A 开始
任务 B 开始
任务 B 完成 (1秒后)
任务 A 完成 (2秒后)
2. 使用asyncio.gather()
当需要同时运行多个协程并等待它们全部完成时,asyncio.gather()
是更好的选择。
async def main():
# 使用gather同时运行多个任务
await asyncio.gather(
my_task("C", 3),
my_task("D", 1),
my_task("E", 2)
)
asyncio.run(main())
输出结果:
任务 C 开始
任务 D 开始
任务 E 开始
任务 D 完成 (1秒后)
任务 E 完成 (2秒后)
任务 C 完成 (3秒后)
3. 使用asyncio.ensure_future() (Python 3.7之前)
在Python 3.7之前,可以使用asyncio.ensure_future()
来创建任务。
async def main():
# 兼容旧版本Python的创建任务方式
task1 = asyncio.ensure_future(my_task("F", 2))
task2 = asyncio.ensure_future(my_task("G", 1))
await task1
await task2
# 旧版本的事件循环启动方式
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()
任务管理的最佳实践
高效任务管理的要点
- 使用
asyncio.create_task()
创建任务并立即开始执行 - 使用
asyncio.gather()
管理一组相关任务 - 使用
asyncio.wait()
处理任务完成的不同状态 - 为任务设置超时:
await asyncio.wait_for(task, timeout=5.0)
- 使用
asyncio.shield()
防止任务被取消 - 合理控制并发任务数量,避免资源耗尽
任务取消与异常处理
正确处理任务取消和异常对于构建健壮的异步应用至关重要。
async def main():
task = asyncio.create_task(my_task("H", 5))
try:
# 等待任务完成,但最多3秒
await asyncio.wait_for(task, timeout=3.0)
except asyncio.TimeoutError:
print("任务超时,正在取消...")
task.cancel()
try:
await task # 等待任务处理取消
except asyncio.CancelledError:
print("任务已成功取消")
高级应用:任务队列模式
当需要限制同时运行的协程数量时,可以使用任务队列模式。
async def worker(queue):
while True:
# 从队列获取任务
task_func = await queue.get()
await task_func() # 执行任务
queue.task_done() # 标记任务完成
async def main():
# 创建最大容量为5的队列
queue = asyncio.Queue(maxsize=5)
# 启动3个worker协程
workers = [
asyncio.create_task(worker(queue))
for _ in range(3)
]
# 添加20个任务到队列
for i in range(20):
await queue.put(lambda: my_task(f"Task-{i}", 1))
# 等待所有任务完成
await queue.join()
# 取消worker任务
for w in workers:
w.cancel()
asyncio.run(main())
发表评论