執行緒隊列數據共享與生產者消費者模型

執行緒隊列數據共享與生產者消費者模型

queue模組簡介

  queue模組提供了多種隊列,那麼它主要是用於多執行緒編程中的數據共享。

  我們都知道同一進程下的數據是能被多個執行緒共享的,那麼為什麼這些執行緒在同一進程下還去使用隊列呢?

 

  因為隊列是: 管道 + 鎖

 

  所以使用隊列來存放多個執行緒中用於共享的數據還是為了保證其數據的安全性。

  queue模組中的隊列主要是用於本地測試中使用,正式上線時還得要使用Redis,這個是後話,我們先來看關於queue模組的使用。

       官方中文文檔 

 

方法大全

 

queue模組中的隊列方法大全 
方法名稱 功能描述
Queue.qsize() 返回當前隊列的大小
Queue.empty() 判斷當前隊列是否為空
Queue.full() 判斷當前隊列是否已滿
Queue.put(item, block=True, timeout=None) item放入隊列中,block參數為如果要操作的隊列目前已滿是否阻塞,timeout為超時時間。
Queue.put_nowait(item) 相當於 put(item, False),如果操作的隊列已滿則不進行阻塞,而是拋出Full異常。
Queue.get(block=True, timeout=None) 將項目從隊列中取出,block參數為如果要操作的隊列目前為空是否阻塞,timeout為超時時間。
Queue.get_nowait() 相當於 get(False),如果要操作的隊列為空則不進行阻塞,而是拋出Empty異常。
Queue.task_done() 當消費者執行緒被join()阻塞後,生產者執行緒調用此方法會停止消費者執行緒的阻塞狀態。詳情請見下面補充
Queue.join() 當消費者執行緒被join()阻塞後,需等待消費者執行緒調用task_done()方法後方可停止阻塞。詳情請見下面的補充
注意:如遇到異常情況請查看官方文檔,這個模組的方法比較特殊 

 

三種不同的隊列

  queue模組中,提供了三種隊列。如下:

 

  queue.Queue:先進先出隊列

  queue.LifoQueue:先進後出隊列

  queue.PriorityQueue:優先順序隊列

 

import queue

# ==== 先進先出隊列 ====

q1 = queue.Queue(maxsize=5)  # 該隊列最大可存放5個項目
q1.put(1)   # 入隊操作
q1.put(2)
q1.put(3)

print(q1.get())  # 出隊操作
print(q1.get())
print(q1.get())

# ==== 執行結果  ====

"""
1
2
3
"""


# ==== 先進後出隊列 ====

q2 = queue.LifoQueue(maxsize=5)  # 該隊列最大可存放5個項目
q2.put(1)   # 入隊操作
q2.put(2)
q2.put(3)

print(q2.get())  # 出隊操作
print(q2.get())
print(q2.get())

# ==== 執行結果  ====

"""
3
2
1
"""

# ==== 優先順序隊列 ====

q3 = queue.PriorityQueue(maxsize=5)
q2.put([10,"優先順序為10"])   # 入隊操作
q2.put([20,"優先順序為20"])
q2.put([30,"優先順序為30"])

print(q2.get())  # 出隊操作,如果想取出具體元素,加個 索引[1] 即可
print(q2.get())
print(q2.get())

# ==== 執行結果  ====

"""
[30, '優先順序為30']
[20, '優先順序為20']
[10, '優先順序為10']
"""

三種隊列的簡單實用

 

生產者消費者模型

  生產者消費者模型是一種思想,大概意思就是我們不讓生成者與消費者之間進行直接接觸,而是通過某種緩衝的方式讓彼此之間的耦合度降低。生產者生產出了產品,消費者就可以進行購買,如果沒有產品就等著。

  那麼這個時候我們就可以使用隊列了,因為隊列里有阻塞的機制,我畫一幅圖,來看一眼。

image-20200702151514469

來,我們嘗試用程式碼實現一下:

import threading
import queue
import time


def producer():
    """生產者"""
name = "食神周樹人" count = 1 # 計數器 while count < 11: # 每天只做10個包子 produce = "包子" print("包子做好了,這是今天的第{0}個包子".format(count)) q.put(produce + str(count)) # 將產品放在管道中 count += 1 time.sleep(0.4) # 廚師0.4秒做一個包子 def consumer(): """消費者"""
name = threading.current_thread().getName() print("{0}正在等包子".format(name)) while 1: try: food = q.get(timeout=2) # 等拿包子,最多等2秒 time.sleep(0.2) # 消費者0.2秒吃一個包子 print("{0}把{1}吃了,真好吃!".format(name, food)) except queue.Empty: # 有的人等的不耐煩就不等了 print("{0}說:等了這麼久還沒有,不等了".format(name)) return if __name__ == '__main__': q = queue.Queue(maxsize=10) # 一次最多放10個產品 name = ["大耳朵", "二狗", "三蛋子"] # 消費者姓名列表 for i in range(3): t1 = threading.Thread(target=consumer, name=name[i]) t1.start() producer() # 生產者啟動,目前只有一個生產者,也可以多個 # ==== 執行結果 ==== """ 大耳朵正在等包子 二狗正在等包子 三蛋子正在等包子包子做好了,這是今天的第1個包子 三蛋子把包子1吃了,真好吃! 包子做好了,這是今天的第2個包子 二狗把包子2吃了,真好吃! 包子做好了,這是今天的第3個包子 大耳朵把包子3吃了,真好吃! 包子做好了,這是今天的第4個包子 三蛋子把包子4吃了,真好吃! 包子做好了,這是今天的第5個包子 二狗把包子5吃了,真好吃! 包子做好了,這是今天的第6個包子 大耳朵把包子6吃了,真好吃! 包子做好了,這是今天的第7個包子 三蛋子把包子7吃了,真好吃! 包子做好了,這是今天的第8個包子 二狗把包子8吃了,真好吃! 包子做好了,這是今天的第9個包子 大耳朵把包子9吃了,真好吃! 包子做好了,這是今天的第10個包子 三蛋子把包子10吃了,真好吃! 二狗說:等了這麼久還沒有,不等了 大耳朵說:等了這麼久還沒有,不等了 三蛋子說:等了這麼久還沒有,不等了 """

 

補充:join()與task_done()

import threading
import queue
import time

def task_1():
    print("正在裝東西..")
    time.sleep(3)
    q.put("玫瑰花")  # 正在裝東西
    q.task_done()  # 通知對方可以取了


def task_2():
    q.join() # 阻塞等待通知,接到通知說明隊列里里有東西了。
    print("取到了",q.get())  # 取東西


if __name__ == '__main__':

    q = queue.Queue(maxsize=5)

    t1 = threading.Thread(target=task_1,name="小明")
    t2 = threading.Thread(target=task_2,name="小花")

    t1.start()
    t2.start()

# ==== 執行結果 ====

"""
正在裝東西..
取到了 玫瑰花
"""