Python-多執行緒及生產者與消費者
一、前置知識
1. 隊列基礎
- 如果不指定
隊列
是什麼,請自行查閱 - 在Python中,隊列是最常用的執行緒間的通訊方法,因為它是執行緒安全的
from queue import Queue
# 創建隊列
# -- 限制隊中最多有 maxsize 個元素
# -- 如果省略參數,默認元素個數無限制
q = Queue(100)
q1 = Queue()
# 元素入隊
q.put(1)
q.put(True)
q.put('abc')
# 隊列的大小
print(q.qsize())
# 判斷隊滿
print(q.full())
# 判斷隊空
print(q.empty())
# 元素出隊
# 注意:如果隊空,取元素時,會陷入阻塞狀態,知道再往隊中加入數據為止【***】
while not q.empty():
print(q.get())
2. 多執行緒
(1) 進程與執行緒的關係
'''
1. 一個進程可以有多個執行緒,但是必須有一個主執行緒
2. 進程之間互不影響,資源不共享
3. 執行緒之間,資源可以共享,共享的是執行緒所屬的進程的內容
4. 執行緒必須依賴於進程存在
'''
(2) 創建執行緒——方式1
from threading import Thread
def add(n1, n2):
print('結果為:' + n1 + n2)
def main():
# 創建一個執行緒
# -- target 函數的名稱
# -- args 以元組的形式,傳入函數所需的參數
t = Thread(target=add, args=(1, 2,))
# 開啟執行緒
t.start()
if __name__ == '__main__':
main()
(3) 創建執行緒——方式2
'''
1. 通過繼承 Thread類 創建執行緒的步驟
(1) 定義一個類
(2) 繼承 Thread類
(3) 重寫 run() 方法
(4) 在 run() 方法中寫邏輯程式碼
2. 注意事項
(1) 子類繼承 Thread類 後,實例化對象時,會自動執行父類中的 run()方法
所以我們可以重寫 run(),然後在 run() 中執行我們自己的程式碼
(2) 一個子類繼承了 Thread類,那麼在對執行緒執行任何其他操作之前
它必須確保已調用基類的構造函數
-- 比如:傳參時,需要調用的父類的構造函數
'''
from threading import Thread
class MyThread(Thread):
# 構造函數
def __init__(self, n1, n2):
# 調用父類的構造函數:第一種方法
# threading.Thread.__init__(self)
# 調用父類的構造函數:第二種方法
super().__init__()
self.n1 = n1
self.n2 = n2
# 重寫 run() 方法
def run(self):
print('執行緒的名稱:' + self.name)
print(self.n1 + self.n2)
def main():
# 實例化對象的過程,就是在創建執行緒
t1 = MyThread(1, 1)
# 設置執行緒的名稱
t1.setName('t1')
# 開啟執行緒
t1.start()
if __name__ == '__main__':
main()
(4) 鎖的使用
- 一定要保證相關的執行緒使用的是同一把鎖,否則加鎖操作無意義
# 加鎖之前
# ----------------------------------------------------------
from threading import Thread
num = 0 # 聲明共享資源
def Jia():
# 標註使用共享的資源
global num
# 主邏輯程式碼
for i in range(10000000):
num+=1
print(num)
def main():
# 創建執行緒
t1 = Thread(target=Jia)
t2 = Thread(target=Jia)
t3 = Thread(target=Jia)
# 開啟執行緒
t1.start()
t2.start()
t3.start()
if __name__ == '__main__':
main()
# 加鎖之後
# ----------------------------------------------------------
from threading import Thread
from threading import Lock
lock = Lock() # 聲明鎖,要保證相關的執行緒使用的是同一把鎖
num = 0 # 聲明共享資源
def Jia(lock):
# 加鎖
lock.acquire()
# 標註使用共享的資源
global num
# 主邏輯程式碼
for i in range(10000000):
num+=1
print(num)
# 釋放鎖
lock.release()
def main():
# 創建執行緒
t1 = Thread(target=Jia, args=(lock,))
t2 = Thread(target=Jia, args=(lock,))
t3 = Thread(target=Jia, args=(lock,))
# 開啟執行緒
t1.start()
t2.start()
t3.start()
if __name__ == '__main__':
main()
3. 進階
(1) Thread.join()
- 作用:
阻塞當前所在的執行緒
,只有當執行 join() 的執行緒結束之後,才會解除阻塞 - 分析下面的程式碼:
- 阻塞前:在主執行緒中有一句
print('結束了')
,本意是想要在fn
函數執行完之後,再輸出結束了
,但是因為主執行緒和t1執行緒是同步的,他們在同時執行,所以print('結束了')
的輸出位置不一定是最後面,可能是在fn
執行一半的時候就輸出結束了
- 阻塞後:
t1執行緒
調用了join()
,阻塞了當前所在執行緒,即阻塞了主執行緒,所以主執行緒需要等t1執行緒
結束後才可以繼續執行主執行緒的內容,故實現了print('結束了')
在fn
執行完後在輸出內容的需求
- 阻塞前:在主執行緒中有一句
# 阻塞前:也就是不調用 join()
# ----------------------------------------------------------
import time
from threading import Thread
def fn():
for i in range(10):
print(i)
time.sleep(1.5)
def main():
t1 = Thread(target=fn)
t1.start()
print('結束了')
if __name__ == '__main__':
main()
# 阻塞後:調用了 join()
# ----------------------------------------------------------
import time
from threading import Thread
def fn():
for i in range(10):
print(i)
time.sleep(1.5)
def main():
t1 = Thread(target=fn)
t1.start()
t1.join()
print('結束了')
if __name__ == '__main__':
main()
(2) 守護進程
'''
1. 進程分為主進程、守護進程、非守護進程
2. 守護、非守護是相對於主進程 而言的
3. 守護進程,可以理解為不重要的進程,當主進程結束後,守護進程會強制結束
4. 非守護進程,是比守護進程重要的進程,當主進程結束後,守護進程不會被強制結束
'''
# t1進程是非守護進程:t1進程會陷入死循環
# ----------------------------------------------------------
from threading import Thread
def fn():
while True:
print(1)
def main():
t1 = Thread(target=fn)
t1.start()
print('結束了')
if __name__ == '__main__':
main()
# t1進程是守護進程:t1進程會因為主進程的結束,被強制結束
# ----------------------------------------------------------
from threading import Thread
def fn():
while True:
print(1)
def main():
t1 = Thread(target=fn)
t1.start()
t1.setDaemon(True) # 設置為True時,說明此進程是"守護進程"【默認是False】
print('結束了')
if __name__ == '__main__':
main()
(3) 隊列在執行緒之間的通訊
一定要在後面的程式碼中仔細思考一下,尤其是階段5的程式碼
# Queue.join()
'''
當生產者生產結束時,先阻塞生產者執行緒,只有當消費者發出已經消費完隊中產品時,才解除阻塞
'''
# Queue.task_done()
'''
消費者消費一個隊中的產品,就向生產者發送一次資訊
當消費完隊中資訊之後,也向生產者發送資訊,並發出已經消費完的提示,提示生產者可以解除生產者執行緒的阻塞了
'''
二、生產者與消費者模式
- 該模式有兩個對象,分別是生產者、消費者,兩者同時進行操作
- 下面分為5個階段,慢慢講解
階段1:消費者執行緒的阻塞
from queue import Queue
from threading import Thread
# 生產者
def produce(q):
for i in range(1, 11):
q.put(i)
print(f'生產產品——{i}')
# 消費者
def consumer(q):
while True:
tmp = q.get()
print(f'消費產品——{tmp}')
# 主進程
def main():
q = Queue()
pro = Thread(target=produce, args=(q,))
con = Thread(target=consumer, args=(q,))
pro.start()
con.start()
if __name__ == '__main__':
main()
- 分析
- 在主執行緒中創建並開啟生產者執行緒和消費者執行緒,生產者共生產10個產品
- 生產者生產產品的同時,消費者在調用
q.get()
方法消費產品,當生產者把產品全部生產完之後,生產者執行緒結束,消費者繼續調用q.get()
方法消費產品,當沒有產品可以消費時,消費者再調用q.get()
時,會導致消費者執行緒進入阻塞狀態,直到再往裡面加數據為止,但是生產者已經把產品生產完,不會再生產了,所以消費者執行緒會一直處於阻塞狀態
階段2:產品消費不完
from queue import Queue
from threading import Thread
# 生產者
def produce(q):
for i in range(1, 11):
q.put(i)
print(f'生產產品——{i}')
# 消費者
def consumer(q):
while True:
tmp = q.get()
print(f'消費產品——{tmp}')
# 主進程
def main():
q = Queue()
pro = Thread(target=produce, args=(q,))
con = Thread(target=consumer, args=(q,))
con.setDaemon(True) # 設置守護執行緒
pro.start()
con.start()
if __name__ == '__main__':
main()
- 針對階段1的程式碼,只添加了一行程式碼,將消費者執行緒為 “守護執行緒”即可
- 分析
- 當生產者將產品全部生產完,生產者執行緒結束,然後主執行緒也結束了,接著
消費者執行緒
作為守護執行緒被強制退出,解決了消費者執行緒阻塞的問題 - 但是,由下圖可看到,雖然解決了消費者執行緒阻塞的問題,但是消費者本次只消費了5個產品,生產者所生產的產品沒有被消費完,這個問題請看
階段3
- 當生產者將產品全部生產完,生產者執行緒結束,然後主執行緒也結束了,接著
階段3:小完美的程式碼
from queue import Queue
from threading import Thread
# 生產者
def produce(q):
for i in range(1, 11):
q.put(i)
print(f'生產產品——{i}')
q.join() # 阻塞生產者執行緒,只有接收到消費者發送來的已經消費了最後一個產品的時候,才解除阻塞
# 消費者
def consumer(q):
while True:
tmp = q.get()
print(f'消費產品——{tmp}')
q.task_done() # 向生產者發送消息,告訴生產者我已經消費了一個產品
# 主進程
def main():
q = Queue()
pro = Thread(target=produce, args=(q,))
con = Thread(target=consumer, args=(q,))
con.setDaemon(True)
pro.start()
con.start()
if __name__ == '__main__':
main()
-
針對階段2僅添加了兩行程式碼
- q.join()
- q.task_done()
-
分析:
- 當生產者將產品全部生產完,生產者執行緒因為執行了
q.join()
而被阻塞,只有接收到消費者發送來的已經消費了最後一個產品的時候,才解除阻塞 - 而消費者執行緒會邊消費產品,邊執行
q.task_done()
給生產者執行緒發送消息,直到消費完全部的產品時,在給生產者發送消息時,會通知生產者已經消費完全部的產品 - 此時生產者接收到消費完全部產品的資訊,阻塞被解除,生產者執行緒結束
- 然後主執行緒結束
- 再接著,由於消費者執行緒的守護執行緒,被強制關閉
- 當生產者將產品全部生產完,生產者執行緒因為執行了
階段4:有關執行緒執行順序的問題
from queue import Queue
from threading import Thread
# 生產者
def produce(q):
for i in range(1, 11):
q.put(i)
print(f'生產產品——{i}')
q.join() # 阻塞生產者執行緒,只有接收到消費者發送來的已經消費了最後一個產品的時候,才解除阻塞
# 消費者
def consumer(q):
while True:
tmp = q.get()
print(f'消費產品——{tmp}')
q.task_done() # 向生產者發送消息,告訴生產者我已經消費了一個產品
# 主進程
def main():
q = Queue()
pro = Thread(target=produce, args=(q,))
con = Thread(target=consumer, args=(q,))
con.setDaemon(True)
pro.start()
con.start()
print('結束了')
if __name__ == '__main__':
main()
與階段3相比,僅在主執行緒中添加一行輸出語句
- 分析
- 我們想要的是兩個子執行緒結束之後,再列印輸出
生產者和消費者全部結束了呀!!!
,但是很明顯,結果不是這樣的,下面開始分析 - 程式中有1個主執行緒、2個子執行緒,三者會同時執行,所以主執行緒中的輸出語句的執行時間是隨機的,故輸出的位置也是隨機的
- 解決方法:阻塞當前執行緒,也就是阻塞主執行緒,見階段5
- 我們想要的是兩個子執行緒結束之後,再列印輸出
階段5:執行緒執行順序問題的解決
from queue import Queue
from threading import Thread
# 生產者
def produce(q):
for i in range(1, 11):
q.put(i)
print(f'生產產品——{i}')
q.join() # 阻塞生產者執行緒,只有接收到消費者發送來的已經消費了最後一個產品的時候,才解除阻塞
# 消費者
def consumer(q):
while True:
tmp = q.get()
print(f'消費產品——{tmp}')
q.task_done() # 向生產者發送消息,告訴生產者我已經消費了一個產品
# 主進程
def main():
q = Queue()
pro = Thread(target=produce, args=(q,))
con = Thread(target=consumer, args=(q,))
con.setDaemon(True)
pro.start()
con.start()
pro.join() # 阻塞當前所在的執行緒
print('結束了')
if __name__ == '__main__':
main()
與階段4相比,僅添加一句程式碼,以達到阻塞主執行緒的需求
- 分析:
- 程式中有1個主執行緒、2個子執行緒,三者會同時執行
- 主執行緒中執行到
pro.join()
時,當前執行緒被阻塞,也即主執行緒被阻塞,知道生產完全部產品,消費完全部產品,生產者執行緒結束 - 主執行緒才被解除阻塞
- 然後主執行緒結束,消費者執行緒被強制結束