python並發編程-多執行緒實現服務端並發-GIL全局解釋器鎖-驗證python多執行緒是否有用-死鎖-遞歸鎖-訊號量-Event事件-執行緒結合隊列-03

  • 2019 年 10 月 7 日
  • 筆記

目錄

結合多執行緒實現服務端並發(不用socketserver模組)

socketserver自帶多執行緒

服務端程式碼

import socket  from threading import Thread    '''  服務端      1.固定的ip和埠      2.24小時不間斷提供服務      3.支援並發  '''    server = socket.socket()  server.bind(('127.0.0.1', 8080))  server.listen(5)      def talk(conn):      while True:          # 模擬不停交互          try:              data = conn.recv(1024)              if len(data) == 0:                  break              print(data.decode('utf-8'))                conn.send(b'Hi')            except ConnectionResetError as e:              print(e)              break      conn.close()      # 鏈接循環  while True:      conn, addr = server.accept()      print(addr)      t = Thread(target=talk, args=(conn, ))      t.start()      # 通訊循環  # 提取這塊程式碼,封裝起來      # while True:      #     try:      #         data = conn.recv(1024)      #         if len(data) == 0:      #             break      #     except ConnectionResetError as e:      #         print(e)      #         break      # conn.close()

客戶端程式碼

import socket    client = socket.socket()  client.connect(('127.0.0.1', 8080))    while True:      # 模擬不停交互      client.send(b'hello')        data = client.recv(1024)      print(data.decode('utf-8'))

CIL全局解釋器鎖******

'''  In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple native threads from executing Python bytecodes at once. This lock is necessary mainly because CPython』s memory management is not thread-safe. (However, since the GIL exists, other features have grown to depend on the guarantees that it enforces.)  '''    ps:python解釋器有很多種  最常見的就是Cpython解釋器  """  GIL本質也是一把互斥鎖:將並發變成串列,犧牲效率保證數據的安全  用來阻止同一個進程下的多個執行緒的同時執行(同一個進程內多個執行緒無法實現並行但是可以實現並發)      python的多執行緒無法並行就無法利用多核優勢  是不是就是沒有用了?    GIL的存在是因為CPython解釋器的記憶體管理不是執行緒安全的    垃圾回收機制本質也是一個執行緒,進程間是不同的記憶體空間,執行緒間數據共享  """

每一個進程都有一個python解釋器,都有一個垃圾回收機制的執行緒 如果沒有GIL,允許多執行緒同時運行 執行緒1 執行到 a = 1,剛申請一塊記憶體空間,把1 放進去,正要與a 綁定關係之前突然垃圾回收機制掃描到這個1 沒有引用,順手就給清除掉了,那麼這個執行緒就直接報錯了

可能被問到的兩個判斷

1. GIL是python的特點嗎?

不是,它只是CPython解釋器的特點

2. 單進程下多個執行緒無法利用多核優勢是所有解釋型語言的通病

正確,如果解釋型語言能夠利用多核優勢,並行地執行程式碼,就會出現垃圾回收機制干擾執行緒數據的情況,CPython中就採用了CIL全局解釋器鎖來解決這一問題,犧牲多核優勢保證執行緒安全

解釋型語言都需要先解釋再執行,在CPython中是用GIL全局解釋器鎖

與普通互斥鎖的區別

程式碼遇到I/O操作就將GIL全局解釋器鎖給釋放了,保證執行緒安全但不能保證數據安全 GIL是專門保護執行緒安全的,要想保護數據安全需要單獨為數據處理加鎖(普通互斥鎖通常都是這樣的) 針對不同的數據操作應該加不同的鎖去處理

驗證GIL與普通互斥鎖的區別

import time  from threading import Thread    n = 100      def task():      global n      tmp = n      time.sleep(1)  # IO ,遇到IO 就把GIL鎖釋放,給別的執行緒搶      n = tmp - 1      t_list = []  for i in range(100):      t = Thread(target=task)      t.start()      t_list.append(t)    for i in t_list:      i.join()    print(n)  # 99  # 寫上 time.sleep(1) 時  # 0

驗證python的多執行緒是否有用需要分情況討論

進程可以充分利用CPU(多核時體現),但消耗資源較(執行緒)大 執行緒較(進程)節省記憶體資源,但無法充分發揮多核CPU優勢

計算密集型任務

計算操作很依靠CPU

單核情況下

​ 開執行緒更省資源

多核情況下

​ 開進程更省時間

from multiprocessing import Process  from threading import Thread  import os  import time      def work():      res = 0      for i in range(100000000):          res *= i      if __name__ == '__main__':      l = []      print(os.cpu_count())      # 4  # 4核CPU,我的CPU比較菜      start = time.time()      for i in range(6):          p = Process(target=work)  # 多個進程同時運算          # p = Thread(target=work)  # 執行緒排隊切換(並發)執行運算          l.append(p)          p.start()      for p in l:          p.join()      stop = time.time()      print('run time is %s' % (stop - start))      # run time is 21.93324899673462  # p = Process(target=work) 多進程時      # run time is 35.11313056945801  # p = Thread(target=work) 多執行緒時

IO密集型任務

IO操作不太依靠CPU(IO操作會讓CPU空閑,程式進入阻塞態)

單核情況下

​ 開執行緒更省資源

多核情況下

​ 開執行緒更省資源(基本用不到多少CPU)

from multiprocessing import Process  from threading import Thread  import os  import time      def work():      time.sleep(2)      if __name__ == '__main__':      l = []      print(os.cpu_count())      # 4      start = time.time()      for i in range(400):          # p = Process(target=work)  # 多進程,大部分時間耗費在創建進程上          p = Thread(target=work)  # 多執行緒          l.append(p)          p.start()      for p in l:          p.join()      stop = time.time()      print('run time is %s' % (stop - start))      # run time is 22.937195301055908  # p = Process(target=work) 多進程時      # run time is 2.0452797412872314  # p = Thread(target=work) 多執行緒時

小結論

python的多執行緒到底有沒有用,需要看情況而定,並且肯定是有用的(GIL全局解釋器鎖限制了python的多執行緒不能並行)

絕大數情況下還是多進程+多執行緒配合使用

偽程式碼:編造程式碼實現效果演示一下

死鎖與遞歸鎖

死鎖

雙方接下來要的鎖都在對方手上,並且都不肯釋放鎖,就都在等待鎖被釋放再搶

import time  from threading import Thread, Lock    mutexA = Lock()  mutexB = Lock()  """  只要類加括弧實例化對象  無論傳入的參數是否一樣,生成的對象肯定不一樣  (單例模式除外)  """      class MyThread(Thread):        def run(self):  # 創建執行緒自動觸發run 方法,  run方法內調用 func1 func2 相當於也是自動觸發          self.func1()          self.func2()        def func1(self):          mutexA.acquire()          print(f"{self.name}搶到了A鎖")  # 執行也是要點時間的(雖然超級超級短)          mutexB.acquire()          print(f"{self.name}搶到了B鎖")          mutexB.release()          print(f"{self.name}釋放了B鎖")          mutexA.release()          print(f"{self.name}釋放了A鎖")        def func2(self):          mutexB.acquire()          print(f"{self.name}搶到了B鎖")          time.sleep(1)          mutexA.acquire()          print(f"{self.name}搶到了A鎖")          mutexA.release()          print(f"{self.name}釋放了A鎖")          mutexB.release()          print(f"{self.name}釋放了B鎖")      for i in range(10):      t = MyThread()      t.start()    # Thread-1搶到了A鎖  # Thread-1搶到了B鎖  # Thread-1釋放了B鎖  # Thread-1釋放了A鎖  # Thread-1搶到了B鎖  # Thread-2搶到了A鎖  # 程式卡住.....    '''  結果原因分析:      搶到A鎖,再搶B鎖沒人搶,再釋放B鎖也沒人搶,釋放A鎖執行func2      大家都去搶A鎖,我搶B鎖,搶到了休息一秒,別人還在接著往下搶鎖,搶到B鎖,去搶A鎖,我還在休息(執行程式碼超級快)      等我休息好了要搶B鎖,而B鎖別人拿著,別人又要搶完了A鎖才會釋放B鎖,而我要搶到了B鎖才會釋放A鎖,所以大家就都這樣僵著了...(程式就卡這兒了)  '''

自己千萬不要輕易的處理鎖的問題(一般也不會涉及到)

遞歸鎖 RLock

遞歸鎖機制: RLock 可以被第一個搶到鎖的人連續acquire和release多次 ​ 每acquire一次,鎖身上的計數加 ​ 每release一次,鎖身上的計數減1 ​ 只要鎖的計數不為0,其他人都不能搶

from threading import Thread, RLock  import time    mutexA = mutexB = RLock()  # mutexA 和 mutexB 是同一把鎖(不想改下面的程式碼)  """  只要類加括弧實例化對象  無論傳入的參數是否一樣,生成的對象肯定不一樣  (單例模式除外)  """      class MyThread(Thread):        def run(self):  # 創建執行緒自動觸發run 方法,  run方法內調用 func1 func2 相當於也是自動觸發          self.func1()          self.func2()        def func1(self):          mutexA.acquire()          print(f"{self.name}搶到了A鎖")  # 執行也是要點時間的(雖然超級超級短)          mutexB.acquire()          print(f"{self.name}搶到了B鎖")          mutexB.release()          print(f"{self.name}釋放了B鎖")          mutexA.release()          print(f"{self.name}釋放了A鎖")        def func2(self):          mutexB.acquire()          print(f"{self.name}搶到了B鎖")          time.sleep(1)          mutexA.acquire()          print(f"{self.name}搶到了A鎖")          mutexA.release()          print(f"{self.name}釋放了A鎖")          mutexB.release()          print(f"{self.name}釋放了B鎖")      for i in range(10):      t = MyThread()      t.start()    # Thread-1搶到了A鎖  # Thread-1搶到了B鎖  # Thread-1釋放了B鎖  # Thread-1釋放了A鎖  # Thread-1搶到了B鎖  # ....省略大量列印結果.....  # Thread-1釋放了B鎖  # Thread-9搶到了B鎖  # Thread-9搶到了A鎖  # Thread-9釋放了A鎖  # Thread-9釋放了B鎖  # ---> 誰搶到了下面一大段都是誰在操作

訊號量 Semaphore

這裡的訊號量不是通用概念,在不同的地方有不同的意義,對應不同的知識點

比喻

互斥鎖–> 廁所(一把鎖)

訊號量–> 公共廁所(多把鎖)

import random  import time  from threading import Thread, Semaphore      semaphore = Semaphore(2)  # 造了一個含有五個坑位的公共廁所      def task(name):      semaphore.acquire()      print(f"{name}佔了一個坑位")      time.sleep(random.randint(1, 3))      semaphore.release()      print(f"{name}拉完了")      for i in range(5):      t = Thread(target=task, args=(i,))      t.start()    # 0佔了一個坑位  # 1佔了一個坑位  # 1拉完了  # 2佔了一個坑位  # 0拉完了  # 3佔了一個坑位  # 3拉完了  # 4佔了一個坑位  # 2拉完了  # 4拉完了

Event事件

可利用event實現子執行緒等待某個子執行緒的結束再接著執行

import time  from threading import Thread, Event    event = Event()      def light():      print("紅燈正亮著...")      time.sleep(2)      # --------------------------------------      # event.set() 發出訊號      #   同一 Event對象.wait()處將收到訊號      #       不再等待,接著往下執行      # --------------------------------------      event.set()      # 測試GIL全局解釋器鎖 start      # a = 1 + 6 * 4 * 4 / 12 * 1*151*158*235*122*21*45/121  # CPU運算不會釋放GIL鎖      # msg = input('>>>:').strip()  # I/O 操作會釋放GIL鎖      # 測試GIL全局解釋器鎖 end      print("綠燈亮了")      def car(name):      print(f"{name} 正在等紅燈...")      # --------------------------------------      # event.wait() 等待訊號      #   未收到訊號就在這裡等待訊號      #       類似隊列的 .get() .put()等待      # --------------------------------------      event.wait()      print(f"{name}加油門飆車了...")      _light = Thread(target=light)  _light.start()  for i in range(5):      t = Thread(target=car, args=(f'car{i}',))      t.start()    # 紅燈正亮著...  # car0 正在等紅燈...  # car1 正在等紅燈...  # car2 正在等紅燈...  # car3 正在等紅燈...  # car4 正在等紅燈...  # 綠燈亮了  # car0加油門飆車了...  # car2加油門飆車了...  # car4加油門飆車了...  # car1加油門飆車了...  # car3加油門飆車了...      # # 測試GIL全局解釋器鎖返回結果  # 紅燈正亮著...  # car0 正在等紅燈...  # car1 正在等紅燈...  # car2 正在等紅燈...  # car3 正在等紅燈...  # car4 正在等紅燈...  # >>>:car1加油門飆車了...  # I/O 操作釋放了全局解釋器鎖,其他地方搶到了,就執行,等你輸入了進入就緒態,搶鎖  # car2加油門飆車了...  # car3加油門飆車了...  # car0加油門飆車了...  # car4加油門飆車了...  # 151  # 手動輸入的值,然後才列印下一行  # 綠燈亮了

執行緒結合隊列

疑問:同一個進程下的多個執行緒本來就是數據共享的,為什麼還要用隊列? 因為隊列是管道+鎖,使用隊列就不需要自己手動操作鎖的問題,如果鎖操作不當極容易產生死鎖現象

三種隊列 Queue LifoQueue PriorityQueue 基本操作

from threading import Thread  import queue    q = queue.Queue()  q.put(1)  print(q.get())  # 1      q = queue.LifoQueue()  # Last in First Out  q.put(1)  q.put(2)  q.put(3)  print(q.get())  print(q.get())  print(q.get())  # 3  # 2  # 1      q = queue.PriorityQueue()  # 優先順序 Q  # 因為重名了,點put進去,看到的是Queue的方法  q.put((10, 'haha'))  # (priority number, data)  是一個元組,第一個是優先順序數字(數據越小,優先順序越高),第二是數據  q.put((100, 'hhe'))  q.put((0, 'hihi'))  q.put((-10, 'yyy'))  print(q.get())  print(q.get())  print(q.get())  print(q.get())  # (-10, 'yyy')  # (0, 'hihi')  # (10, 'haha')  # (100, 'hhe')