Python-多執行緒及生產者與消費者

一、前置知識

1. 隊列基礎

  • 如果不指定隊列是什麼,請自行查閱
  • 在Python中,隊列是最常用的執行緒間的通訊方法,因為它是執行緒安全的
from queue import Queue

# 創建隊列
#   -- 限制隊中最多有 maxsize 個元素
#   -- 如果省略參數,默認元素個數無限制
q = Queue(100)
q1 = Queue()

# 元素入隊
q.put(1)
q.put(True)
q.put('abc')

# 隊列的大小
print(q.qsize())

# 判斷隊滿
print(q.full())

# 判斷隊空
print(q.empty())

# 元素出隊
#	注意:如果隊空,取元素時,會陷入阻塞狀態,知道再往隊中加入數據為止【***】
while not q.empty():
    print(q.get())

2. 多執行緒

(1) 進程與執行緒的關係

'''
    1. 一個進程可以有多個執行緒,但是必須有一個主執行緒
    
    2. 進程之間互不影響,資源不共享
    
    3. 執行緒之間,資源可以共享,共享的是執行緒所屬的進程的內容
    
    4. 執行緒必須依賴於進程存在
'''

(2) 創建執行緒——方式1

from threading import Thread

def add(n1, n2):
    print('結果為:' + n1 + n2)

def main():
    # 創建一個執行緒
    #   -- target 函數的名稱
    #   -- args 以元組的形式,傳入函數所需的參數
    t = Thread(target=add, args=(1, 2,))
    # 開啟執行緒
    t.start()

if __name__ == '__main__':
    main()

(3) 創建執行緒——方式2

'''
    1. 通過繼承 Thread類 創建執行緒的步驟
        (1) 定義一個類
        (2) 繼承 Thread類
        (3) 重寫 run() 方法
        (4) 在 run() 方法中寫邏輯程式碼

    2. 注意事項
        (1) 子類繼承 Thread類 後,實例化對象時,會自動執行父類中的 run()方法
            所以我們可以重寫 run(),然後在 run() 中執行我們自己的程式碼
        (2) 一個子類繼承了 Thread類,那麼在對執行緒執行任何其他操作之前
            它必須確保已調用基類的構造函數
            -- 比如:傳參時,需要調用的父類的構造函數
'''
from threading import Thread

class MyThread(Thread):
    # 構造函數
    def __init__(self, n1, n2):
        # 調用父類的構造函數:第一種方法
        # threading.Thread.__init__(self)
        # 調用父類的構造函數:第二種方法
        super().__init__()
        self.n1 = n1
        self.n2 = n2
    # 重寫 run() 方法
    def run(self):
        print('執行緒的名稱:' + self.name)
        print(self.n1 + self.n2)

def main():
    # 實例化對象的過程,就是在創建執行緒
    t1 = MyThread(1, 1)
    # 設置執行緒的名稱
    t1.setName('t1')
    # 開啟執行緒
    t1.start()

if __name__ == '__main__':
    main()

(4) 鎖的使用

  • 一定要保證相關的執行緒使用的是同一把鎖,否則加鎖操作無意義
# 加鎖之前
# ----------------------------------------------------------
from threading import Thread

num = 0 # 聲明共享資源

def Jia():
    # 標註使用共享的資源
    global num
    # 主邏輯程式碼
    for i in range(10000000):
        num+=1
    print(num)

def main():
    # 創建執行緒
    t1 = Thread(target=Jia)
    t2 = Thread(target=Jia)
    t3 = Thread(target=Jia)
    # 開啟執行緒
    t1.start()
    t2.start()
    t3.start()

if __name__ == '__main__':
    main()

# 加鎖之後
# ----------------------------------------------------------
from threading import Thread
from threading import Lock

lock = Lock() # 聲明鎖,要保證相關的執行緒使用的是同一把鎖
num = 0 # 聲明共享資源

def Jia(lock):
    # 加鎖
    lock.acquire()
    # 標註使用共享的資源
    global num
    # 主邏輯程式碼
    for i in range(10000000):
        num+=1
    print(num)
    # 釋放鎖
    lock.release()

def main():
    # 創建執行緒
    t1 = Thread(target=Jia, args=(lock,))
    t2 = Thread(target=Jia, args=(lock,))
    t3 = Thread(target=Jia, args=(lock,))
    # 開啟執行緒
    t1.start()
    t2.start()
    t3.start()

if __name__ == '__main__':
    main()

3. 進階

(1) Thread.join()

  • 作用:阻塞當前所在的執行緒,只有當執行 join() 的執行緒結束之後,才會解除阻塞
  • 分析下面的程式碼:
    • 阻塞前:在主執行緒中有一句print('結束了'),本意是想要在fn函數執行完之後,再輸出結束了,但是因為主執行緒和t1執行緒是同步的,他們在同時執行,所以print('結束了')的輸出位置不一定是最後面,可能是在fn執行一半的時候就輸出結束了
    • 阻塞後:t1執行緒調用了join(),阻塞了當前所在執行緒,即阻塞了主執行緒,所以主執行緒需要等t1執行緒結束後才可以繼續執行主執行緒的內容,故實現了print('結束了')fn執行完後在輸出內容的需求
# 阻塞前:也就是不調用 join()
# ----------------------------------------------------------
import time
from threading import Thread

def fn():
    for i in range(10):
        print(i)
    time.sleep(1.5)

def main():
    t1 = Thread(target=fn)
    t1.start()
    print('結束了')

if __name__ == '__main__':
    main()
    
# 阻塞後:調用了 join()
# ----------------------------------------------------------
import time
from threading import Thread

def fn():
    for i in range(10):
        print(i)
    time.sleep(1.5)

def main():
    t1 = Thread(target=fn)
    t1.start()
    t1.join()
    print('結束了')

if __name__ == '__main__':
    main()

(2) 守護進程

'''
    1. 進程分為主進程、守護進程、非守護進程

    2. 守護、非守護是相對於主進程 而言的

    3. 守護進程,可以理解為不重要的進程,當主進程結束後,守護進程會強制結束

    4. 非守護進程,是比守護進程重要的進程,當主進程結束後,守護進程不會被強制結束
'''
# t1進程是非守護進程:t1進程會陷入死循環
# ----------------------------------------------------------
from threading import Thread

def fn():
    while True:
        print(1)

def main():
    t1 = Thread(target=fn)
    t1.start()
    print('結束了')

if __name__ == '__main__':
    main()
# t1進程是守護進程:t1進程會因為主進程的結束,被強制結束
# ----------------------------------------------------------
from threading import Thread

def fn():
    while True:
        print(1)

def main():
    t1 = Thread(target=fn)
    t1.start()
    t1.setDaemon(True) # 設置為True時,說明此進程是"守護進程"【默認是False】
    print('結束了')

if __name__ == '__main__':
    main()

(3) 隊列在執行緒之間的通訊

  • 一定要在後面的程式碼中仔細思考一下,尤其是階段5的程式碼
# Queue.join()
'''
	當生產者生產結束時,先阻塞生產者執行緒,只有當消費者發出已經消費完隊中產品時,才解除阻塞
'''

# Queue.task_done()
'''
	消費者消費一個隊中的產品,就向生產者發送一次資訊
	當消費完隊中資訊之後,也向生產者發送資訊,並發出已經消費完的提示,提示生產者可以解除生產者執行緒的阻塞了
'''

二、生產者與消費者模式

  • 該模式有兩個對象,分別是生產者、消費者,兩者同時進行操作
  • 下面分為5個階段,慢慢講解

階段1:消費者執行緒的阻塞

from queue import Queue
from threading import Thread

# 生產者
def produce(q):
    for i in range(1, 11):
        q.put(i)
        print(f'生產產品——{i}')

# 消費者
def consumer(q):
    while True:
        tmp = q.get()
        print(f'消費產品——{tmp}')

# 主進程
def main():
    q = Queue()
    pro = Thread(target=produce,  args=(q,))
    con = Thread(target=consumer, args=(q,))
    pro.start()
    con.start()

if __name__ == '__main__':
    main()
  • 分析
    • 在主執行緒中創建並開啟生產者執行緒和消費者執行緒,生產者共生產10個產品
    • 生產者生產產品的同時,消費者在調用q.get()方法消費產品,當生產者把產品全部生產完之後,生產者執行緒結束,消費者繼續調用q.get()方法消費產品,當沒有產品可以消費時,消費者再調用q.get()時,會導致消費者執行緒進入阻塞狀態,直到再往裡面加數據為止,但是生產者已經把產品生產完,不會再生產了,所以消費者執行緒會一直處於阻塞狀態
    • image-20220113183844119

階段2:產品消費不完

from queue import Queue
from threading import Thread

# 生產者
def produce(q):
    for i in range(1, 11):
        q.put(i)
        print(f'生產產品——{i}')

# 消費者
def consumer(q):
    while True:
        tmp = q.get()
        print(f'消費產品——{tmp}')

# 主進程
def main():
    q = Queue()
    pro = Thread(target=produce,  args=(q,))
    con = Thread(target=consumer, args=(q,))
    con.setDaemon(True) # 設置守護執行緒
    pro.start()
    con.start()

if __name__ == '__main__':
    main()
  • 針對階段1的程式碼,只添加了一行程式碼,將消費者執行緒為 “守護執行緒”即可
  • 分析
    • 當生產者將產品全部生產完,生產者執行緒結束,然後主執行緒也結束了,接著消費者執行緒作為守護執行緒被強制退出,解決了消費者執行緒阻塞的問題
    • 但是,由下圖可看到,雖然解決了消費者執行緒阻塞的問題,但是消費者本次只消費了5個產品,生產者所生產的產品沒有被消費完,這個問題請看階段3
    • image-20220113152402321

階段3:小完美的程式碼

from queue import Queue
from threading import Thread

# 生產者
def produce(q):
    for i in range(1, 11):
        q.put(i)
        print(f'生產產品——{i}')
    q.join() # 阻塞生產者執行緒,只有接收到消費者發送來的已經消費了最後一個產品的時候,才解除阻塞

# 消費者
def consumer(q):
    while True:
        tmp = q.get()
        print(f'消費產品——{tmp}')
        q.task_done() # 向生產者發送消息,告訴生產者我已經消費了一個產品

# 主進程
def main():
    q = Queue()
    pro = Thread(target=produce,  args=(q,))
    con = Thread(target=consumer, args=(q,))
    con.setDaemon(True)
    pro.start()
    con.start()

if __name__ == '__main__':
    main()
  • 針對階段2僅添加了兩行程式碼

    • q.join()
    • q.task_done()
  • 分析:

    • 當生產者將產品全部生產完,生產者執行緒因為執行了q.join()而被阻塞,只有接收到消費者發送來的已經消費了最後一個產品的時候,才解除阻塞
    • 而消費者執行緒會邊消費產品,邊執行q.task_done()給生產者執行緒發送消息,直到消費完全部的產品時,在給生產者發送消息時,會通知生產者已經消費完全部的產品
    • 此時生產者接收到消費完全部產品的資訊,阻塞被解除,生產者執行緒結束
    • 然後主執行緒結束
    • 再接著,由於消費者執行緒的守護執行緒,被強制關閉
    • image-20220113185921120

階段4:有關執行緒執行順序的問題

from queue import Queue
from threading import Thread

# 生產者
def produce(q):
    for i in range(1, 11):
        q.put(i)
        print(f'生產產品——{i}')
    q.join() # 阻塞生產者執行緒,只有接收到消費者發送來的已經消費了最後一個產品的時候,才解除阻塞

# 消費者
def consumer(q):
    while True:
        tmp = q.get()
        print(f'消費產品——{tmp}')
        q.task_done() # 向生產者發送消息,告訴生產者我已經消費了一個產品

# 主進程
def main():
    q = Queue()
    pro = Thread(target=produce,  args=(q,))
    con = Thread(target=consumer, args=(q,))
    con.setDaemon(True)
    pro.start()
    con.start()
    print('結束了')

if __name__ == '__main__':
    main()
  • 與階段3相比,僅在主執行緒中添加一行輸出語句
  • 分析
    • 我們想要的是兩個子執行緒結束之後,再列印輸出生產者和消費者全部結束了呀!!!,但是很明顯,結果不是這樣的,下面開始分析
    • 程式中有1個主執行緒、2個子執行緒,三者會同時執行,所以主執行緒中的輸出語句的執行時間是隨機的,故輸出的位置也是隨機的
    • 解決方法:阻塞當前執行緒,也就是阻塞主執行緒,見階段5
    • image-20220113190227763

階段5:執行緒執行順序問題的解決

from queue import Queue
from threading import Thread

# 生產者
def produce(q):
    for i in range(1, 11):
        q.put(i)
        print(f'生產產品——{i}')
    q.join() # 阻塞生產者執行緒,只有接收到消費者發送來的已經消費了最後一個產品的時候,才解除阻塞

# 消費者
def consumer(q):
    while True:
        tmp = q.get()
        print(f'消費產品——{tmp}')
        q.task_done() # 向生產者發送消息,告訴生產者我已經消費了一個產品

# 主進程
def main():
    q = Queue()
    pro = Thread(target=produce,  args=(q,))
    con = Thread(target=consumer, args=(q,))
    con.setDaemon(True)
    pro.start()
    con.start()
    pro.join() # 阻塞當前所在的執行緒
    print('結束了')

if __name__ == '__main__':
    main()
  • 與階段4相比,僅添加一句程式碼,以達到阻塞主執行緒的需求
  • 分析:
    • 程式中有1個主執行緒、2個子執行緒,三者會同時執行
    • 主執行緒中執行到pro.join()時,當前執行緒被阻塞,也即主執行緒被阻塞,知道生產完全部產品,消費完全部產品,生產者執行緒結束
    • 主執行緒才被解除阻塞
    • 然後主執行緒結束,消費者執行緒被強制結束
    • image-20220113190744707

三、參考