python並發編程-進程間通訊-Queue隊列使用-生產者消費者模型-執行緒理論-創建及對象屬性方法-執行緒互斥鎖-守護執行緒-02

  • 2019 年 10 月 7 日
  • 筆記

目錄

進程補充

進程通訊前言

要想實現進程間通訊,可以用管道或者隊列

隊列比管道更好用(隊列自帶管道和鎖)

隊列特點:先進先出

堆棧特點:先進後出

我們採用隊列來實現進程間數據通訊,下面先介紹一下隊列

Queue隊列的基本使用

基本方法:q.put(元素) q.get() q.get_nowait() q.full() q.empty()

from multiprocessing import Process, Queue    q = Queue(5)  # 實例化出一個對象  # --------------------------------------  # q.put(元素) 往隊列里放東西  #   如果隊列滿了還往裡面放,就會等在這裡  # --------------------------------------  # q.put(1)  # q.put(2)  # q.put(3)  # --------------------------------------  # # q.full() 判斷隊列有沒有滿  # --------------------------------------  # print(q.full())  # q.full 判斷隊列有沒有滿  # # False  # q.put(4)  # q.put(5)  # # q.put(6)  # 如果隊列滿了還往裡面放,就會等在這裡  # print(q.full())  # # True    for i in range(5):      q.put(i)  print(q.full())  # True  # --------------------------------------  # q.get() 從隊列頭取一個值  #   如果隊列空了,就會等在這裡,等數據過來  # --------------------------------------  print(q.get())  print(q.full())  # 0  # False  print(q.get())  print(q.get())  # print(q.get())  # --------------------------------------  # q.get_nowait() 從隊列頭取一個值  #   在隊列有數據的情況下,與get取值一樣  #   當隊列沒有值的情況下,取值直接報錯  # --------------------------------------  print(q.get_nowait())  # 在隊列有數據的情況下,與get取值一樣,當隊列沒有值的情況下,取值直接報錯  # --------------------------------------  # q.empty() 判斷隊列是否為空  #   在並發的情況下,這個方法不準確  # --------------------------------------  print(q.empty())  # 判斷隊列是否為空,需要注意的是在並發的情況下,這個方法不準確  print(q.get())  # 1  # 2  # 3  # False  # 4  # print(q.get())  # 如果隊列空了,就會等在這裡,等數據過來  print(q.empty())  # True  # print(q.get_nowait())  # 直接報錯 queue.Empty

通過Queue隊列實現進程間通訊(IPC機制)

數據的互通,可實現主進程與子進程之間的互通,子進程與子進程之間的互通 數據只有一份,取完就沒了,無法重複獲取同一份數據

from multiprocessing import Queue, Process      def producer(q):      q.put('hello baby.')      def consumer(q):      print(q.get())      if __name__ == '__main__':      q = Queue()  # 生成一個隊列對象      p1 = Process(target=producer, args=(q,))      c1 = Process(target=consumer, args=(q,))      p1.start()      c1.start()  # 子進程獲取到了另一個子進程的數據      # hello baby.      # print(q.get())  # 主進程獲取到了子進程的數據      # hello baby.

生產者消費者模型

生產者:生產/製造數據的

消費者:消費/處理數據的

例子:做包子的,賣包子的      1.做的包子遠比買包子的多      2.做的包子遠比買包子的少      --> 供需不平衡

用處:解決供需不平衡的問題

以做包子買包子為例實現當包子賣完了停止消費行為

方式一

from multiprocessing import Process, Queue  import time  import random      def producer(name, food, q: Queue):      for i in range(10):          data = f'{name} 生產了 {food}{i}'          time.sleep(random.random())          q.put(data)          print(data)      def consumer(name, q):      while True:          res = q.get()          if not res:  # 已經把生產者做的東西全部吃完了,那麼本消費者也結束食用              break          data = res.split(' ')[2]          data = f'{name} 吃了 {data}'          print(data)          time.sleep(random.random())      if __name__ == '__main__':      q = Queue()      p = Process(target=producer, args=('大廚egon', '饅頭', q))      p2 = Process(target=producer, args=('跟班tank', '生蚝', q))      c = Process(target=consumer, args=('jason', q))      c2 = Process(target=consumer, args=('吃貨kevin', q))      p.start()      p2.start()      c.start()      c2.start()      # 不知道什麼時候生產者什麼時候生成完      p.join()      p2.join()      q.put(None)  # 通過 None來標誌生產者已生產完成      q.put(None)      # 可以實現,但是不好

方式二

改用JoinableQueue模組的隊列守護進程來實現

from multiprocessing import Process, JoinableQueue  import time  import random      def producer(name, food, q: JoinableQueue):      for i in range(10):          data = f'{name} 生產了 {food}{i}'          time.sleep(random.random())          q.put(data)          print(data)      def consumer(name, q):      while True:          res = q.get()          if not res:              break          data = res.split(' ')[2]          data = f'{name} 吃了 {data}'          print(data)          time.sleep(random.random())          q.task_done()  # 告訴隊列,你已經從隊列中取出了一個數據,並且處理完畢了      if __name__ == '__main__':      q = JoinableQueue()      p = Process(target=producer, args=('大廚egon', '饅頭', q))      p2 = Process(target=producer, args=('跟班tank', '生蚝', q))      c = Process(target=consumer, args=('jason', q))      c2 = Process(target=consumer, args=('吃貨kevin', q))      p.start()      p2.start()      c.daemon = True  # 配合join,結束程式消費者也結束(注意join是主進程的最後一句程式碼)      c.start()      c2.daemon = True      c2.start()      # 不知道什麼時候生產者什麼時候生成完      p.join()      p2.join()        q.join()  # 等待隊列中數據全部取出,執行完了這句話,也就意味著隊列中沒有數據了(消費者那裡還是會卡住,get不到東西等待)      # 配合上 守護進程 來實現....

執行緒

什麼是執行緒

進程和執行緒其實都是虛擬單位,都是用來幫助我們形象的描述某種事物

進程:資源單位(一塊獨立的記憶體空間)

執行緒:執行單位

將記憶體比喻成工廠,那麼進程就相當於工廠里的車間,而你的執行緒就相當於是車間裡面的流水線

CPU其實運行的其實是執行緒,進程只是資源單位

執行緒執行時需要的資源單位都跟進程要

ps:每個進程都自帶一個執行緒,執行緒才是真正的執行單位,進程只是在執行緒運行過程中提供程式碼運行所需要的資源 每個進程都會自帶一個執行緒 執行緒沒有主次之分,只不過我們默認就把主進程自帶的那個執行緒叫做主執行緒

為什麼要有執行緒

開進程

  • 申請記憶體空間 —> 耗資源
  • 「拷貝程式碼」 —> 耗資源

開執行緒

  • 一個進程內可以起多個執行緒,並且執行緒與執行緒之間數據是共享的

ps:開啟執行緒的開銷要遠遠小於開啟進程的開銷(可能剛執行完創建執行緒的程式碼執行緒就創建好了)

開啟執行緒的兩種方式

方式一

from threading import Thread  import time      def task(name):      print(f"{name} is running")      time.sleep(3)      print(f"{name} is over")      t = Thread(target=task, args=('egon', ))  # 開執行緒不需要在 __main__ 程式碼塊內,但是習慣性的還是寫在 __main__ 內  t.start()  # 告訴作業系統開啟一個執行緒  # 執行緒的開銷遠遠小於進程,小到以至於可以程式碼執行完,執行緒就已經開啟了  print("主")  # 執行緒沒有主次之分,都在同一個進程的名稱空間里,只是人為把進程自帶的執行緒叫做主執行緒  # egon is running  # 主執行緒  # 進程的時候這個主執行緒可能會是最先列印的  # egon is over

方式二

from threading import Thread  import time      class MyThread(Thread):      def __init__(self, name):          super().__init__()          self.name = name        def run(self):          print(f"{self.name} is running")          time.sleep(1)          print(f"{self.name} is over")      if __name__ == '__main__':      t = MyThread('jason')      t.start()  # 開啟執行緒的速度非常快,幾乎程式碼執行完執行緒就已經開啟      print("主")    # jason is running  # 主  # jason is over

執行緒之間數據共享

from threading import Thread    money = 666      def task():      global money      money = 999      t = Thread(target=task)  t.start()  t.join()  # 確保是執行緒運行結束後  print(money)  # 999  # 主執行緒與子執行緒之間數據是通用的

執行緒間想要實現數據通訊,不需要藉助於隊列(執行緒間支援數據通訊)

執行緒對象的其他屬性和方法

import time  from threading import Thread, active_count, current_thread  import os      def task(name):      print(f"{name} is running {os.getpid()}")      # # ------------------------------------------------      # # current_thread().name current_thread().getname() 當前執行緒名      # #   記得導入模組      # # ------------------------------------------------      # print(f"current_thread().name:{current_thread().name}")      # current_thread().name:Thread-1      time.sleep(1)      print(f"{name} is over")      # t = Thread(target=task, args=('jason', ))  # t.start()  # # ------------------------------------------------  # # os.getpid() os.getppid() 獲取進程號 父進程號  # #   多個執行緒屬於同一進程  # # ------------------------------------------------  # print(f"pid {os.getpid()}")  # # jason is running 5572  # # pid 5572  # # jason is over      t = Thread(target=task, args=('jason', ))  t.start()  # ------------------------------------------------  # active_count()  統計當前存活的執行緒數  #   記得導入模組  # ------------------------------------------------  print(active_count())  print(f"pid {os.getpid()}")  # jason is running 5728  # 2  # pid 5728  print(f"主 current_thread().name:{current_thread().name}")  # 主 current_thread().name:MainThread    t.join()  # 主執行緒等待子執行緒運行結束  # jason is over  print("主 active_count", active_count())  # 可能會有問題,多執行緒是非同步,可能join的執行緒結束了,其他執行緒也正好結束了(多個執行緒時)  # 主 active_count 1  # Thread.join(t)  # 可以考慮用類調用對象方法,傳入對象來在循環里對執行緒對象進行操作

守護執行緒

主執行緒要等待所有非守護執行緒結束後才會結束(不是主執行緒的程式碼執行完了就立馬結束了)

主執行緒結束後,守護(子)執行緒也會立即結束

主執行緒運行結束之後為什麼需要等待子執行緒結束才能結束呢?

主執行緒的結束也就意味著進程的結束 主執行緒必須等待其他非守護執行緒的結束才能結束 因為子執行緒在運行的時候需要使用進程中的資源,而主執行緒一旦結束了,資源也就銷毀了

# from threading import Thread, current_thread  # import time  #  #  # def task(i):  #     print(f"{current_thread().name}")  #     time.sleep(i)  #     print("GG")  #  #  # for i in range(3):  #     t = Thread(target=task, args=(i, ))  #     t.start()  #  #  # print("主")  # # 循環的時候就已經列印了部分數據了(非同步)  # # Thread-1  # # GG  # # Thread-2  # # Thread-3  # # 主  # # GG  # # GG    # 主執行緒運行結束之後為什麼需要等待子執行緒結束才能結束呢?  '''  主執行緒的結束也就意味著進程的結束      主執行緒必須等待其他非守護執行緒的結束才能結束  因為子執行緒在運行的時候需要使用進程中的資源,而主執行緒一旦結束了,資源也就銷毀了  '''  from threading import Thread, current_thread  import time      def task(i):      print(f"{current_thread().name}")      time.sleep(i)      print("GG")      for i in range(3):      t = Thread(target=task, args=(i,))      t.daemon = True      t.start()    print("主")  # Thread-1  # GG  # Thread-2  # Thread-3  # 主

測試

下面程式的執行結果是什麼?

from threading import Thread  import time      def foo():      print(123)      time.sleep(1)      print("end123")      def bar():      print(456)      time.sleep(3)      print("end456")      t1 = Thread(target=foo)  t2 = Thread(target=bar)    t1.daemon = True  t1.start()  t2.start()  print("main-------")      # 123  # 456  # main-------  # end123  # end456

執行緒互斥鎖

從執行緒間通訊那裡的案例可以看出,執行緒間數據是相通的,那麼多個執行緒對同一份數據進行操作會產生問題

下面同樣模擬一個網路延遲來對數據進行操作(確保所有執行緒都執行完的操作可以記一下)

不加鎖遇到延遲的情況

# 模擬網路延遲的現象  #   多個執行緒操作同一個數據,也會造成數據不安全  import time  from threading import Thread    n = 10      def task():      global n      tmp = n      time.sleep(1)      n = tmp - 1      # -------------------------------  t_list = []  for i in range(10):      t = Thread(target=task)      t.start()      t_list.append(t)    # 確保其他執行緒都執行完了之後再列印  for t in t_list:      t.join()  # -------------------------------    print(n)  # 9

加鎖後遇到延遲

# 加鎖解決問題  import time  from threading import Thread, Lock    n = 10      def task(mutex):      mutex.acquire()  # 搶鎖      global n      tmp = n      time.sleep(1)      n = tmp - 1      mutex.release()  # 釋放鎖      t_list = []  mutex = Lock()  for i in range(10):      t = Thread(target=task, args=(mutex, ))      t.start()      t_list.append(t)    # 確保其他執行緒都執行完了之後再列印  for t in t_list:      t.join()  print(n)  # 0  # 等10s多點 後列印出結果,數據未受延遲影響,保證了數據安全

為什麼用互斥鎖不用 執行緒/進程對象.join()

雖然互斥鎖也是將並發改成串列,犧牲效率來保證數據安全,這一點執行緒對象.join()也可以實現將並發改成串列,同樣保證數據安全,但執行緒對象.join()是將每一個執行緒的運行都變成串列的,對比互斥鎖的只將數據操作部分編程串列消耗的時間要多得多,若果執行緒耗時長,執行效率就會低的可怕

# # 不加鎖:未加鎖部分並發執行,加鎖部分串列執行,速度慢,數據安全  # from threading import current_thread, Thread, Lock  # import os  # import time  #  #  # def task():  #     # 未加鎖的程式碼並發運行  #     time.sleep(3)  #     print('%s start to run' % current_thread().getName())  #     global n  #     # 加鎖的程式碼串列運行  #     lock.acquire()  #     temp = n  #     time.sleep(0.5)  #     n = temp - 1  #     lock.release()  #  #  # if __name__ == '__main__':  #     n = 100  #     lock = Lock()  #     threads = []  #     start_time = time.time()  #     for i in range(100):  #         t = Thread(target=task)  #         threads.append(t)  #         t.start()  #     for t in threads:  #         t.join()  #     stop_time = time.time()  #     print('主:%s n:%s' % (stop_time - start_time, n))  #  # '''  # Thread-3 start to run  # Thread-1 start to run  # ......  # Thread-100 start to run  # Thread-96 start to run  # 主:53.06105661392212 n:0  # '''    # 利用 join 保證數據安全  from threading import current_thread, Thread, Lock  import os  import time      def task():      time.sleep(3)      print('%s start to run' % current_thread().getName())      global n      temp = n      time.sleep(0.5)      n = temp - 1      if __name__ == '__main__':      n = 100      lock = Lock()      start_time = time.time()      for i in range(100):          t = Thread(target=task)          t.start()          t.join()      stop_time = time.time()      print('主:%s n:%s' % (stop_time - start_time, n))    '''  Thread-1 start to run  Thread-2 start to run  ......  Thread-100 start to run  主:350.1616487503052 n:0 # 耗時是多麼的恐怖  '''

執行緒和進程的用戶大同小異,可以對比著來記

後續可以畫圖或表格用對比的方式來整理一下,方便記憶~