当前位置:首页 > Python > 正文

Python threading模块函数详解 - 多线程编程完全指南 | Python并发编程教程

Python threading模块函数详解与使用指南

什么是Python threading模块?

Python threading模块是Python标准库中用于实现多线程编程的核心模块。它提供了创建和管理线程的工具, 帮助开发者充分利用多核CPU资源,提高程序执行效率。本教程将详细讲解threading模块的主要函数、类及其使用方法, 并附有实际代码示例。

Thread类 - 线程创建与管理

Thread类是threading模块的核心,用于创建和管理线程。每个Thread实例代表一个执行线程。

主要方法:

  • start() - 启动线程
  • run() - 线程执行的方法(可重写)
  • join(timeout=None) - 等待线程结束
  • is_alive() - 检查线程是否在运行
  • name - 线程名称
  • daemon - 守护线程标志

示例代码:

import threading
import time

def worker(message):
    print(f"线程 {threading.current_thread().name} 开始: {message}")
    time.sleep(2)
    print(f"线程 {threading.current_thread().name} 结束")

# 创建线程
t1 = threading.Thread(target=worker, args=("任务1",), name="工作线程1")
t2 = threading.Thread(target=worker, args=("任务2",))

# 启动线程
t1.start()
t2.start()

# 等待线程结束
t1.join()
t2.join()

print("所有线程执行完毕")

Lock - 基本线程锁

Lock(互斥锁)是最简单的线程同步机制,用于保护共享资源,防止多个线程同时访问。

主要方法:

  • acquire(blocking=True, timeout=-1) - 获取锁
  • release() - 释放锁

示例代码:

import threading

# 共享资源
counter = 0
lock = threading.Lock()

def increment():
    global counter
    for _ in range(100000):
        lock.acquire()
        counter += 1
        lock.release()

threads = []
for i in range(5):
    t = threading.Thread(target=increment)
    threads.append(t)
    t.start()

for t in threads:
    t.join()

print(f"最终计数器值: {counter}")  # 应该是500000

RLock - 可重入锁

RLock(可重入锁)允许同一个线程多次获取锁,解决递归调用中的死锁问题。

示例代码:

import threading

rlock = threading.RLock()

def recursive_func(n):
    if n > 0:
        rlock.acquire()
        print(f"获取锁,层级: {n}")
        recursive_func(n-1)
        rlock.release()
        print(f"释放锁,层级: {n}")

t = threading.Thread(target=recursive_func, args=(3,))
t.start()
t.join()

Semaphore - 信号量

信号量用于控制对共享资源的访问数量,维护一个计数器,限制同时访问资源的线程数。

示例代码:

import threading
import time

# 限制同时只有3个线程可以访问
semaphore = threading.Semaphore(3)

def access_resource(thread_id):
    print(f"线程 {thread_id} 等待访问资源")
    with semaphore:
        print(f"线程 {thread_id} 获得资源访问权")
        time.sleep(2)
    print(f"线程 {thread_id} 释放资源")

threads = []
for i in range(10):
    t = threading.Thread(target=access_resource, args=(i,))
    threads.append(t)
    t.start()

for t in threads:
    t.join()

Event - 线程事件

事件对象用于线程间通信,一个线程发出事件信号,其他线程等待该事件。

主要方法:

  • set() - 设置事件为True
  • clear() - 重置事件为False
  • is_set() - 检查事件状态
  • wait(timeout=None) - 等待事件被设置

示例代码:

import threading
import time

# 创建事件对象
event = threading.Event()

def waiter():
    print("等待者等待事件发生")
    event.wait()
    print("等待者检测到事件,继续执行")

def setter():
    time.sleep(3)
    print("设置者设置事件")
    event.set()

t1 = threading.Thread(target=waiter)
t2 = threading.Thread(target=setter)

t1.start()
t2.start()

t1.join()
t2.join()

Condition - 条件变量

条件变量用于复杂的线程同步场景,允许线程等待特定条件发生。

主要方法:

  • acquire() - 获取底层锁
  • release() - 释放底层锁
  • wait(timeout=None) - 等待通知
  • notify(n=1) - 通知一个等待线程
  • notify_all() - 通知所有等待线程

示例代码:生产者-消费者模型

import threading
import time
import random

# 共享资源
queue = []
MAX_ITEMS = 5
condition = threading.Condition()

def producer():
    for i in range(10):
        time.sleep(random.random())
        condition.acquire()
        if len(queue) == MAX_ITEMS:
            print("队列已满,生产者等待")
            condition.wait()
        item = f"产品-{i}"
        queue.append(item)
        print(f"生产: {item}")
        condition.notify()
        condition.release()

def consumer():
    for i in range(10):
        time.sleep(random.random() * 2)
        condition.acquire()
        if not queue:
            print("队列为空,消费者等待")
            condition.wait()
        item = queue.pop(0)
        print(f"消费: {item}")
        condition.notify()
        condition.release()

producer_thread = threading.Thread(target=producer)
consumer_thread = threading.Thread(target=consumer)

producer_thread.start()
consumer_thread.start()

producer_thread.join()
consumer_thread.join()

Timer - 定时线程

定时器线程用于在指定时间后执行函数。

示例代码:

import threading

def delayed_action():
    print("5秒后执行此操作")

# 5秒后执行
timer = threading.Timer(5.0, delayed_action)
timer.start()

print("定时器已启动,等待5秒...")

Barrier - 线程屏障

屏障用于同步多个线程,要求指定数量的线程到达屏障点后才能继续执行。

示例代码:

import threading
import time

# 创建需要4个线程的屏障
barrier = threading.Barrier(4)

def worker(thread_id):
    print(f"线程 {thread_id} 开始工作")
    time.sleep(thread_id)
    print(f"线程 {thread_id} 到达屏障点,等待其他线程")
    barrier.wait()
    print(f"线程 {thread_id} 通过屏障,继续执行")

threads = []
for i in range(4):
    t = threading.Thread(target=worker, args=(i,))
    threads.append(t)
    t.start()

for t in threads:
    t.join()

其他实用函数

current_thread() - 获取当前线程

import threading

def worker():
    print(f"当前线程: {threading.current_thread().name}")

t = threading.Thread(target=worker, name="自定义线程")
t.start()
t.join()

active_count() - 活动线程计数

import threading
import time

def worker():
    time.sleep(2)

threads = []
for i in range(5):
    t = threading.Thread(target=worker)
    threads.append(t)
    t.start()
    print(f"活动线程数: {threading.active_count()}")

for t in threads:
    t.join()

print(f"最终活动线程数: {threading.active_count()}")

enumerate() - 枚举活动线程

import threading
import time

def worker(id):
    time.sleep(id)

threads = []
for i in range(3):
    t = threading.Thread(target=worker, args=(i+1,), name=f"Worker-{i}")
    threads.append(t)
    t.start()

# 列出所有活动线程
for thread in threading.enumerate():
    print(f"活动线程: {thread.name} (ID: {thread.ident})")

for t in threads:
    t.join()

Python线程编程最佳实践

  • 避免使用全局变量,优先使用线程局部存储(threading.local)
  • 使用with语句管理锁资源,确保锁的正确释放
  • 为线程设置合理的名称,便于调试
  • 使用守护线程处理后台任务,但确保主线程结束时它们不会造成问题
  • 了解GIL(全局解释器锁)的影响,对于CPU密集型任务考虑多进程
  • 使用线程池(concurrent.futures.ThreadPoolExecutor)管理线程生命周期
  • 优先使用队列(queue.Queue)进行线程间通信
  • 谨慎使用线程优先级,避免饥饿问题

线程局部存储示例:

import threading

# 创建线程局部数据
local_data = threading.local()

def show_data():
    print(f"线程 {threading.current_thread().name} 的值: {local_data.value}")

def worker(value):
    local_data.value = value
    show_data()

threads = []
for i in range(3):
    t = threading.Thread(target=worker, args=(i*10,))
    threads.append(t)
    t.start()

for t in threads:
    t.join()

总结

Python threading模块提供了强大的多线程编程能力,从基本的Thread类到各种同步原语(Lock、RLock、Semaphore、Event、Condition等), 再到高级功能如Timer和Barrier,为并发编程提供了全面的支持。通过本教程的学习,你应该能够:

  • 理解并创建多线程应用程序
  • 使用适当的同步机制解决线程安全问题
  • 实现线程间通信与协调
  • 应用线程编程最佳实践
  • 诊断和解决常见的多线程问题

多线程编程虽然强大,但也带来复杂性。始终记住:清晰的代码设计比复杂的并发更重要。 在真实项目中,考虑使用更高级的并发框架如concurrent.futures或asyncio可能更合适。

发表评论