线程队列数据共享与生产者消费者模型

线程队列数据共享与生产者消费者模型

queue模块简介

  queue模块提供了多种队列,那么它主要是用于多线程编程中的数据共享。

  我们都知道同一进程下的数据是能被多个线程共享的,那么为什么这些线程在同一进程下还去使用队列呢?

 

  因为队列是: 管道 + 锁

 

  所以使用队列来存放多个线程中用于共享的数据还是为了保证其数据的安全性。

  queue模块中的队列主要是用于本地测试中使用,正式上线时还得要使用Redis,这个是后话,我们先来看关于queue模块的使用。

       官方中文文档 

 

方法大全

 

queue模块中的队列方法大全 
方法名称 功能描述
Queue.qsize() 返回当前队列的大小
Queue.empty() 判断当前队列是否为空
Queue.full() 判断当前队列是否已满
Queue.put(item, block=True, timeout=None) item放入队列中,block参数为如果要操作的队列目前已满是否阻塞,timeout为超时时间。
Queue.put_nowait(item) 相当于 put(item, False),如果操作的队列已满则不进行阻塞,而是抛出Full异常。
Queue.get(block=True, timeout=None) 将项目从队列中取出,block参数为如果要操作的队列目前为空是否阻塞,timeout为超时时间。
Queue.get_nowait() 相当于 get(False),如果要操作的队列为空则不进行阻塞,而是抛出Empty异常。
Queue.task_done() 当消费者线程被join()阻塞后,生产者线程调用此方法会停止消费者线程的阻塞状态。详情请见下面补充
Queue.join() 当消费者线程被join()阻塞后,需等待消费者线程调用task_done()方法后方可停止阻塞。详情请见下面的补充
注意:如遇到异常情况请查看官方文档,这个模块的方法比较特殊 

 

三种不同的队列

  queue模块中,提供了三种队列。如下:

 

  queue.Queue:先进先出队列

  queue.LifoQueue:先进后出队列

  queue.PriorityQueue:优先级队列

 

import queue

# ==== 先进先出队列 ====

q1 = queue.Queue(maxsize=5)  # 该队列最大可存放5个项目
q1.put(1)   # 入队操作
q1.put(2)
q1.put(3)

print(q1.get())  # 出队操作
print(q1.get())
print(q1.get())

# ==== 执行结果  ====

"""
1
2
3
"""


# ==== 先进后出队列 ====

q2 = queue.LifoQueue(maxsize=5)  # 该队列最大可存放5个项目
q2.put(1)   # 入队操作
q2.put(2)
q2.put(3)

print(q2.get())  # 出队操作
print(q2.get())
print(q2.get())

# ==== 执行结果  ====

"""
3
2
1
"""

# ==== 优先级队列 ====

q3 = queue.PriorityQueue(maxsize=5)
q2.put([10,"优先级为10"])   # 入队操作
q2.put([20,"优先级为20"])
q2.put([30,"优先级为30"])

print(q2.get())  # 出队操作,如果想取出具体元素,加个 索引[1] 即可
print(q2.get())
print(q2.get())

# ==== 执行结果  ====

"""
[30, '优先级为30']
[20, '优先级为20']
[10, '优先级为10']
"""

三种队列的简单实用

 

生产者消费者模型

  生产者消费者模型是一种思想,大概意思就是我们不让生成者与消费者之间进行直接接触,而是通过某种缓冲的方式让彼此之间的耦合度降低。生产者生产出了产品,消费者就可以进行购买,如果没有产品就等着。

  那么这个时候我们就可以使用队列了,因为队列里有阻塞的机制,我画一幅图,来看一眼。

image-20200702151514469

来,我们尝试用代码实现一下:

import threading
import queue
import time


def producer():
    """生产者"""
name = "食神周树人" count = 1 # 计数器 while count < 11: # 每天只做10个包子 produce = "包子" print("包子做好了,这是今天的第{0}个包子".format(count)) q.put(produce + str(count)) # 将产品放在管道中 count += 1 time.sleep(0.4) # 厨师0.4秒做一个包子 def consumer(): """消费者"""
name = threading.current_thread().getName() print("{0}正在等包子".format(name)) while 1: try: food = q.get(timeout=2) # 等拿包子,最多等2秒 time.sleep(0.2) # 消费者0.2秒吃一个包子 print("{0}把{1}吃了,真好吃!".format(name, food)) except queue.Empty: # 有的人等的不耐烦就不等了 print("{0}说:等了这么久还没有,不等了".format(name)) return if __name__ == '__main__': q = queue.Queue(maxsize=10) # 一次最多放10个产品 name = ["大耳朵", "二狗", "三蛋子"] # 消费者姓名列表 for i in range(3): t1 = threading.Thread(target=consumer, name=name[i]) t1.start() producer() # 生产者启动,目前只有一个生产者,也可以多个 # ==== 执行结果 ==== """ 大耳朵正在等包子 二狗正在等包子 三蛋子正在等包子包子做好了,这是今天的第1个包子 三蛋子把包子1吃了,真好吃! 包子做好了,这是今天的第2个包子 二狗把包子2吃了,真好吃! 包子做好了,这是今天的第3个包子 大耳朵把包子3吃了,真好吃! 包子做好了,这是今天的第4个包子 三蛋子把包子4吃了,真好吃! 包子做好了,这是今天的第5个包子 二狗把包子5吃了,真好吃! 包子做好了,这是今天的第6个包子 大耳朵把包子6吃了,真好吃! 包子做好了,这是今天的第7个包子 三蛋子把包子7吃了,真好吃! 包子做好了,这是今天的第8个包子 二狗把包子8吃了,真好吃! 包子做好了,这是今天的第9个包子 大耳朵把包子9吃了,真好吃! 包子做好了,这是今天的第10个包子 三蛋子把包子10吃了,真好吃! 二狗说:等了这么久还没有,不等了 大耳朵说:等了这么久还没有,不等了 三蛋子说:等了这么久还没有,不等了 """

 

补充:join()与task_done()

import threading
import queue
import time

def task_1():
    print("正在装东西..")
    time.sleep(3)
    q.put("玫瑰花")  # 正在装东西
    q.task_done()  # 通知对方可以取了


def task_2():
    q.join() # 阻塞等待通知,接到通知说明队列里里有东西了。
    print("取到了",q.get())  # 取东西


if __name__ == '__main__':

    q = queue.Queue(maxsize=5)

    t1 = threading.Thread(target=task_1,name="小明")
    t2 = threading.Thread(target=task_2,name="小花")

    t1.start()
    t2.start()

# ==== 执行结果 ====

"""
正在装东西..
取到了 玫瑰花
"""