在Python3中,queue
模块提供了多线程编程时线程间通信常用的同步机制。
在多线程编程中,多个线程之间共同操作同一资源时,可能会出现资源竞争问题,因此需要一种同步机制来保证线程之间的协调和同步。Python中的Queue
(队列)类提供了同步机制,通过可阻塞和非阻塞的队列操作方法实现了多线程编程中的同步和协调。
Queue
类提供了三种队列实现:
Queue
(队列):先进先出(FIFO)的线程安全队列。LifoQueue
(栈):后进先出(LIFO)的线程安全队列。PriorityQueue
(优先级队列):对添加的元素提供优先级排序的线程安全队列。通过Queue
模块提供的接口方法,可以安全地实现多个线程之间的同步操作,从而避免资源竞争和线程阻塞问题。
下面是一个生产者(Producer
)和消费者(Consumer
)模型的例子,这个例子没有使用多线程库来控制执行,但使用了Queue
模块的阻塞队列来实现线程间通信。
from queue import Queue
# 队列容量
queue_max_size = 10
# 生产者生产的任务
tasks = range(500)
# 用于控制生产者和消费者的同步
queue = Queue(queue_max_size)
# 生产者函数定义
def producer():
for t in tasks:
queue.put(t) # 如果队列已满,会自动阻塞
queue.put(None) # 用来通知消费者生产结束
# 消费者函数定义
def consumer():
while True:
# 如果队列为空,会自动阻塞
task = queue.get()
if task is None:
break
print(task)
# 消费者线程
consumer_thread = Thread(target=consumer)
consumer_thread.start()
# 生产者线程
producer_thread = Thread(target=producer)
producer_thread.start()
# 等待生产者线程执行结束
producer_thread.join()
# 生产者生产的任务总数为 500,加上结束标识 None
# 消费者正常消费结束时,事实上只打印了 500 个任务的编号
下面是一个多任务示例,程序为多个任务分配多个进程处理,每个进程都处于独立的线程中,同时利用queue
模块实现了多线程之间的通信。
import os
import threading
from queue import Queue
# 存储所有要处理的任务
tasks = ["task1", "task2", "task3", "task4", "task5"]
# 存储所有结果,把所有处理结果都放到一个list中
results = []
# 任务分派函数
def assign_tasks(q):
for task in tasks:
q.put(task)
# 任务处理函数
def process_task(q):
while not q.empty():
task = q.get()
result = os.popen(f"python {task}.py").read().strip()
results.append(result)
# 定义一个线程安全的队列
q = Queue()
# 创建两个线程,分别用于任务分派和任务处理
t1 = threading.Thread(target=assign_tasks, args=(q, ))
t2 = threading.Thread(target=process_task, args=(q, ))
# 启动两个线程
t1.start()
t2.start()
# 等待线程执行完毕
t1.join()
t2.join()
# 输出结果
for i, result in enumerate(results):
print(f"result{i}: {result}")
上面两个示例都是使用queue
模块来实现多线程通信的例子。在这些例子中,使用Queue
类来创建一个队列,利用put
、get
等方法来进行经典的生产者消费者模型等操作即可。