python並發編程-進程間通訊-Queue隊列使用-生產者消費者模型-執行緒理論-創建及對象屬性方法-執行緒互斥鎖-守護執行緒-02
- 2019 年 10 月 7 日
- 筆記
目錄
進程補充
進程通訊前言
要想實現進程間通訊,可以用管道或者隊列
隊列比管道更好用(隊列自帶管道和鎖)

隊列特點:先進先出
堆棧特點:先進後出
我們採用隊列來實現進程間數據通訊,下面先介紹一下隊列
Queue隊列的基本使用
基本方法:q.put(元素) q.get() q.get_nowait() q.full() q.empty()
from multiprocessing import Process, Queue q = Queue(5) # 實例化出一個對象 # -------------------------------------- # q.put(元素) 往隊列里放東西 # 如果隊列滿了還往裡面放,就會等在這裡 # -------------------------------------- # q.put(1) # q.put(2) # q.put(3) # -------------------------------------- # # q.full() 判斷隊列有沒有滿 # -------------------------------------- # print(q.full()) # q.full 判斷隊列有沒有滿 # # False # q.put(4) # q.put(5) # # q.put(6) # 如果隊列滿了還往裡面放,就會等在這裡 # print(q.full()) # # True for i in range(5): q.put(i) print(q.full()) # True # -------------------------------------- # q.get() 從隊列頭取一個值 # 如果隊列空了,就會等在這裡,等數據過來 # -------------------------------------- print(q.get()) print(q.full()) # 0 # False print(q.get()) print(q.get()) # print(q.get()) # -------------------------------------- # q.get_nowait() 從隊列頭取一個值 # 在隊列有數據的情況下,與get取值一樣 # 當隊列沒有值的情況下,取值直接報錯 # -------------------------------------- print(q.get_nowait()) # 在隊列有數據的情況下,與get取值一樣,當隊列沒有值的情況下,取值直接報錯 # -------------------------------------- # q.empty() 判斷隊列是否為空 # 在並發的情況下,這個方法不準確 # -------------------------------------- print(q.empty()) # 判斷隊列是否為空,需要注意的是在並發的情況下,這個方法不準確 print(q.get()) # 1 # 2 # 3 # False # 4 # print(q.get()) # 如果隊列空了,就會等在這裡,等數據過來 print(q.empty()) # True # print(q.get_nowait()) # 直接報錯 queue.Empty
通過Queue隊列實現進程間通訊(IPC機制)
數據的互通,可實現主進程與子進程之間的互通,子進程與子進程之間的互通 數據只有一份,取完就沒了,無法重複獲取同一份數據
from multiprocessing import Queue, Process def producer(q): q.put('hello baby.') def consumer(q): print(q.get()) if __name__ == '__main__': q = Queue() # 生成一個隊列對象 p1 = Process(target=producer, args=(q,)) c1 = Process(target=consumer, args=(q,)) p1.start() c1.start() # 子進程獲取到了另一個子進程的數據 # hello baby. # print(q.get()) # 主進程獲取到了子進程的數據 # hello baby.
生產者消費者模型
生產者:生產/製造數據的
消費者:消費/處理數據的
例子:做包子的,賣包子的 1.做的包子遠比買包子的多 2.做的包子遠比買包子的少 --> 供需不平衡
用處:解決供需不平衡的問題
以做包子買包子為例實現當包子賣完了停止消費行為
方式一
from multiprocessing import Process, Queue import time import random def producer(name, food, q: Queue): for i in range(10): data = f'{name} 生產了 {food}{i}' time.sleep(random.random()) q.put(data) print(data) def consumer(name, q): while True: res = q.get() if not res: # 已經把生產者做的東西全部吃完了,那麼本消費者也結束食用 break data = res.split(' ')[2] data = f'{name} 吃了 {data}' print(data) time.sleep(random.random()) if __name__ == '__main__': q = Queue() p = Process(target=producer, args=('大廚egon', '饅頭', q)) p2 = Process(target=producer, args=('跟班tank', '生蚝', q)) c = Process(target=consumer, args=('jason', q)) c2 = Process(target=consumer, args=('吃貨kevin', q)) p.start() p2.start() c.start() c2.start() # 不知道什麼時候生產者什麼時候生成完 p.join() p2.join() q.put(None) # 通過 None來標誌生產者已生產完成 q.put(None) # 可以實現,但是不好
方式二
改用JoinableQueue模組的隊列
與守護進程
來實現
from multiprocessing import Process, JoinableQueue import time import random def producer(name, food, q: JoinableQueue): for i in range(10): data = f'{name} 生產了 {food}{i}' time.sleep(random.random()) q.put(data) print(data) def consumer(name, q): while True: res = q.get() if not res: break data = res.split(' ')[2] data = f'{name} 吃了 {data}' print(data) time.sleep(random.random()) q.task_done() # 告訴隊列,你已經從隊列中取出了一個數據,並且處理完畢了 if __name__ == '__main__': q = JoinableQueue() p = Process(target=producer, args=('大廚egon', '饅頭', q)) p2 = Process(target=producer, args=('跟班tank', '生蚝', q)) c = Process(target=consumer, args=('jason', q)) c2 = Process(target=consumer, args=('吃貨kevin', q)) p.start() p2.start() c.daemon = True # 配合join,結束程式消費者也結束(注意join是主進程的最後一句程式碼) c.start() c2.daemon = True c2.start() # 不知道什麼時候生產者什麼時候生成完 p.join() p2.join() q.join() # 等待隊列中數據全部取出,執行完了這句話,也就意味著隊列中沒有數據了(消費者那裡還是會卡住,get不到東西等待) # 配合上 守護進程 來實現....
執行緒
什麼是執行緒
進程和執行緒其實都是虛擬單位,都是用來幫助我們形象的描述某種事物
進程:資源單位(一塊獨立的記憶體空間)
執行緒:執行單位
將記憶體比喻成工廠,那麼進程就相當於工廠里的車間,而你的執行緒就相當於是車間裡面的流水線
CPU其實運行的其實是執行緒,進程只是資源單位
執行緒執行時需要的資源單位都跟進程要
ps:每個進程都自帶一個執行緒,執行緒才是真正的執行單位,進程只是在執行緒運行過程中提供程式碼運行所需要的資源 每個進程都會自帶一個執行緒 執行緒沒有主次之分,只不過我們默認就把主進程自帶的那個執行緒叫做主執行緒
為什麼要有執行緒
開進程
- 申請記憶體空間 —> 耗資源
- 「拷貝程式碼」 —> 耗資源
開執行緒
- 一個進程內可以起多個執行緒,並且執行緒與執行緒之間數據是共享的
ps:開啟執行緒的開銷要遠遠小於開啟進程的開銷(可能剛執行完創建執行緒的程式碼執行緒就創建好了)
開啟執行緒的兩種方式
方式一
from threading import Thread import time def task(name): print(f"{name} is running") time.sleep(3) print(f"{name} is over") t = Thread(target=task, args=('egon', )) # 開執行緒不需要在 __main__ 程式碼塊內,但是習慣性的還是寫在 __main__ 內 t.start() # 告訴作業系統開啟一個執行緒 # 執行緒的開銷遠遠小於進程,小到以至於可以程式碼執行完,執行緒就已經開啟了 print("主") # 執行緒沒有主次之分,都在同一個進程的名稱空間里,只是人為把進程自帶的執行緒叫做主執行緒 # egon is running # 主執行緒 # 進程的時候這個主執行緒可能會是最先列印的 # egon is over
方式二
from threading import Thread import time class MyThread(Thread): def __init__(self, name): super().__init__() self.name = name def run(self): print(f"{self.name} is running") time.sleep(1) print(f"{self.name} is over") if __name__ == '__main__': t = MyThread('jason') t.start() # 開啟執行緒的速度非常快,幾乎程式碼執行完執行緒就已經開啟 print("主") # jason is running # 主 # jason is over
執行緒之間數據共享
from threading import Thread money = 666 def task(): global money money = 999 t = Thread(target=task) t.start() t.join() # 確保是執行緒運行結束後 print(money) # 999 # 主執行緒與子執行緒之間數據是通用的
執行緒間想要實現數據通訊,不需要藉助於隊列(執行緒間支援數據通訊)
執行緒對象的其他屬性和方法
import time from threading import Thread, active_count, current_thread import os def task(name): print(f"{name} is running {os.getpid()}") # # ------------------------------------------------ # # current_thread().name current_thread().getname() 當前執行緒名 # # 記得導入模組 # # ------------------------------------------------ # print(f"current_thread().name:{current_thread().name}") # current_thread().name:Thread-1 time.sleep(1) print(f"{name} is over") # t = Thread(target=task, args=('jason', )) # t.start() # # ------------------------------------------------ # # os.getpid() os.getppid() 獲取進程號 父進程號 # # 多個執行緒屬於同一進程 # # ------------------------------------------------ # print(f"pid {os.getpid()}") # # jason is running 5572 # # pid 5572 # # jason is over t = Thread(target=task, args=('jason', )) t.start() # ------------------------------------------------ # active_count() 統計當前存活的執行緒數 # 記得導入模組 # ------------------------------------------------ print(active_count()) print(f"pid {os.getpid()}") # jason is running 5728 # 2 # pid 5728 print(f"主 current_thread().name:{current_thread().name}") # 主 current_thread().name:MainThread t.join() # 主執行緒等待子執行緒運行結束 # jason is over print("主 active_count", active_count()) # 可能會有問題,多執行緒是非同步,可能join的執行緒結束了,其他執行緒也正好結束了(多個執行緒時) # 主 active_count 1 # Thread.join(t) # 可以考慮用類調用對象方法,傳入對象來在循環里對執行緒對象進行操作
守護執行緒
主執行緒要等待所有非守護執行緒結束後才會結束(不是主執行緒的程式碼執行完了就立馬結束了)
主執行緒結束後,守護(子)執行緒也會立即結束
主執行緒運行結束之後為什麼需要等待子執行緒結束才能結束呢?
主執行緒的結束也就意味著進程的結束 主執行緒必須等待其他非守護執行緒的結束才能結束 因為子執行緒在運行的時候需要使用進程中的資源,而主執行緒一旦結束了,資源也就銷毀了
# from threading import Thread, current_thread # import time # # # def task(i): # print(f"{current_thread().name}") # time.sleep(i) # print("GG") # # # for i in range(3): # t = Thread(target=task, args=(i, )) # t.start() # # # print("主") # # 循環的時候就已經列印了部分數據了(非同步) # # Thread-1 # # GG # # Thread-2 # # Thread-3 # # 主 # # GG # # GG # 主執行緒運行結束之後為什麼需要等待子執行緒結束才能結束呢? ''' 主執行緒的結束也就意味著進程的結束 主執行緒必須等待其他非守護執行緒的結束才能結束 因為子執行緒在運行的時候需要使用進程中的資源,而主執行緒一旦結束了,資源也就銷毀了 ''' from threading import Thread, current_thread import time def task(i): print(f"{current_thread().name}") time.sleep(i) print("GG") for i in range(3): t = Thread(target=task, args=(i,)) t.daemon = True t.start() print("主") # Thread-1 # GG # Thread-2 # Thread-3 # 主
測試
下面程式的執行結果是什麼?
from threading import Thread import time def foo(): print(123) time.sleep(1) print("end123") def bar(): print(456) time.sleep(3) print("end456") t1 = Thread(target=foo) t2 = Thread(target=bar) t1.daemon = True t1.start() t2.start() print("main-------") # 123 # 456 # main------- # end123 # end456
執行緒互斥鎖
從執行緒間通訊那裡的案例可以看出,執行緒間數據是相通的,那麼多個執行緒對同一份數據進行操作會產生問題
下面同樣模擬一個網路延遲來對數據進行操作(確保所有執行緒都執行完的操作可以記一下)
不加鎖遇到延遲的情況
# 模擬網路延遲的現象 # 多個執行緒操作同一個數據,也會造成數據不安全 import time from threading import Thread n = 10 def task(): global n tmp = n time.sleep(1) n = tmp - 1 # ------------------------------- t_list = [] for i in range(10): t = Thread(target=task) t.start() t_list.append(t) # 確保其他執行緒都執行完了之後再列印 for t in t_list: t.join() # ------------------------------- print(n) # 9
加鎖後遇到延遲
# 加鎖解決問題 import time from threading import Thread, Lock n = 10 def task(mutex): mutex.acquire() # 搶鎖 global n tmp = n time.sleep(1) n = tmp - 1 mutex.release() # 釋放鎖 t_list = [] mutex = Lock() for i in range(10): t = Thread(target=task, args=(mutex, )) t.start() t_list.append(t) # 確保其他執行緒都執行完了之後再列印 for t in t_list: t.join() print(n) # 0 # 等10s多點 後列印出結果,數據未受延遲影響,保證了數據安全
為什麼用互斥鎖不用 執行緒/進程對象.join()
雖然互斥鎖也是將並發改成串列,犧牲效率來保證數據安全,這一點
執行緒對象.join()
也可以實現將並發改成串列,同樣保證數據安全,但執行緒對象.join()
是將每一個執行緒的運行都變成串列的,對比互斥鎖的只將數據操作部分編程串列消耗的時間要多得多,若果執行緒耗時長,執行效率就會低的可怕
# # 不加鎖:未加鎖部分並發執行,加鎖部分串列執行,速度慢,數據安全 # from threading import current_thread, Thread, Lock # import os # import time # # # def task(): # # 未加鎖的程式碼並發運行 # time.sleep(3) # print('%s start to run' % current_thread().getName()) # global n # # 加鎖的程式碼串列運行 # lock.acquire() # temp = n # time.sleep(0.5) # n = temp - 1 # lock.release() # # # if __name__ == '__main__': # n = 100 # lock = Lock() # threads = [] # start_time = time.time() # for i in range(100): # t = Thread(target=task) # threads.append(t) # t.start() # for t in threads: # t.join() # stop_time = time.time() # print('主:%s n:%s' % (stop_time - start_time, n)) # # ''' # Thread-3 start to run # Thread-1 start to run # ...... # Thread-100 start to run # Thread-96 start to run # 主:53.06105661392212 n:0 # ''' # 利用 join 保證數據安全 from threading import current_thread, Thread, Lock import os import time def task(): time.sleep(3) print('%s start to run' % current_thread().getName()) global n temp = n time.sleep(0.5) n = temp - 1 if __name__ == '__main__': n = 100 lock = Lock() start_time = time.time() for i in range(100): t = Thread(target=task) t.start() t.join() stop_time = time.time() print('主:%s n:%s' % (stop_time - start_time, n)) ''' Thread-1 start to run Thread-2 start to run ...... Thread-100 start to run 主:350.1616487503052 n:0 # 耗時是多麼的恐怖 '''
執行緒和進程的用戶大同小異,可以對比著來記
後續可以畫圖或表格用對比的方式來整理一下,方便記憶~