python並發編程-多執行緒實現服務端並發-GIL全局解釋器鎖-驗證python多執行緒是否有用-死鎖-遞歸鎖-訊號量-Event事件-執行緒結合隊列-03
- 2019 年 10 月 7 日
- 筆記
目錄
- 結合多執行緒實現服務端並發(不用socketserver模組)
- CIL全局解釋器鎖
******
- 驗證python的多執行緒是否有用需要分情況討論
- 死鎖與遞歸鎖
- 訊號量 Semaphore
- Event事件
- 執行緒結合隊列
結合多執行緒實現服務端並發(不用socketserver模組)
socketserver自帶多執行緒
服務端程式碼
import socket from threading import Thread ''' 服務端 1.固定的ip和埠 2.24小時不間斷提供服務 3.支援並發 ''' server = socket.socket() server.bind(('127.0.0.1', 8080)) server.listen(5) def talk(conn): while True: # 模擬不停交互 try: data = conn.recv(1024) if len(data) == 0: break print(data.decode('utf-8')) conn.send(b'Hi') except ConnectionResetError as e: print(e) break conn.close() # 鏈接循環 while True: conn, addr = server.accept() print(addr) t = Thread(target=talk, args=(conn, )) t.start() # 通訊循環 # 提取這塊程式碼,封裝起來 # while True: # try: # data = conn.recv(1024) # if len(data) == 0: # break # except ConnectionResetError as e: # print(e) # break # conn.close()
客戶端程式碼
import socket client = socket.socket() client.connect(('127.0.0.1', 8080)) while True: # 模擬不停交互 client.send(b'hello') data = client.recv(1024) print(data.decode('utf-8'))
CIL全局解釋器鎖******
''' In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple native threads from executing Python bytecodes at once. This lock is necessary mainly because CPython』s memory management is not thread-safe. (However, since the GIL exists, other features have grown to depend on the guarantees that it enforces.) ''' ps:python解釋器有很多種 最常見的就是Cpython解釋器 """ GIL本質也是一把互斥鎖:將並發變成串列,犧牲效率保證數據的安全 用來阻止同一個進程下的多個執行緒的同時執行(同一個進程內多個執行緒無法實現並行但是可以實現並發) python的多執行緒無法並行就無法利用多核優勢 是不是就是沒有用了? GIL的存在是因為CPython解釋器的記憶體管理不是執行緒安全的 垃圾回收機制本質也是一個執行緒,進程間是不同的記憶體空間,執行緒間數據共享 """
每一個進程都有一個python解釋器,都有一個垃圾回收機制的執行緒 如果沒有GIL,允許多執行緒同時運行 執行緒1 執行到 a = 1,剛申請一塊記憶體空間,把1 放進去,正要與a 綁定關係之前突然垃圾回收機制掃描到這個1 沒有引用,順手就給清除掉了,那麼這個執行緒就直接報錯了

可能被問到的兩個判斷
1. GIL是python的特點嗎?
不是,它只是CPython解釋器的特點
2. 單進程下多個執行緒無法利用多核優勢是所有解釋型語言的通病
正確,如果解釋型語言能夠利用多核優勢,並行地執行程式碼,就會出現垃圾回收機制干擾執行緒數據的情況,CPython中就採用了CIL全局解釋器鎖來解決這一問題,犧牲多核優勢保證執行緒安全
解釋型語言都需要先解釋再執行,在CPython中是用GIL全局解釋器鎖
與普通互斥鎖的區別
程式碼遇到I/O操作就將GIL全局解釋器鎖給釋放了,保證執行緒安全但不能保證數據安全 GIL是專門保護執行緒安全的,要想保護數據安全需要單獨為數據處理加鎖(普通互斥鎖通常都是這樣的) 針對不同的數據操作應該加不同的鎖去處理
驗證GIL與普通互斥鎖的區別
import time from threading import Thread n = 100 def task(): global n tmp = n time.sleep(1) # IO ,遇到IO 就把GIL鎖釋放,給別的執行緒搶 n = tmp - 1 t_list = [] for i in range(100): t = Thread(target=task) t.start() t_list.append(t) for i in t_list: i.join() print(n) # 99 # 寫上 time.sleep(1) 時 # 0
驗證python的多執行緒是否有用需要分情況討論
進程可以充分利用CPU(多核時體現),但消耗資源較(執行緒)大 執行緒較(進程)節省記憶體資源,但無法充分發揮多核CPU優勢
計算密集型任務
計算操作很依靠CPU
單核情況下
開執行緒更省資源
多核情況下
開進程更省時間
from multiprocessing import Process from threading import Thread import os import time def work(): res = 0 for i in range(100000000): res *= i if __name__ == '__main__': l = [] print(os.cpu_count()) # 4 # 4核CPU,我的CPU比較菜 start = time.time() for i in range(6): p = Process(target=work) # 多個進程同時運算 # p = Thread(target=work) # 執行緒排隊切換(並發)執行運算 l.append(p) p.start() for p in l: p.join() stop = time.time() print('run time is %s' % (stop - start)) # run time is 21.93324899673462 # p = Process(target=work) 多進程時 # run time is 35.11313056945801 # p = Thread(target=work) 多執行緒時
IO密集型任務
IO操作不太依靠CPU(IO操作會讓CPU空閑,程式進入阻塞態)
單核情況下
開執行緒更省資源
多核情況下
開執行緒更省資源(基本用不到多少CPU)
from multiprocessing import Process from threading import Thread import os import time def work(): time.sleep(2) if __name__ == '__main__': l = [] print(os.cpu_count()) # 4 start = time.time() for i in range(400): # p = Process(target=work) # 多進程,大部分時間耗費在創建進程上 p = Thread(target=work) # 多執行緒 l.append(p) p.start() for p in l: p.join() stop = time.time() print('run time is %s' % (stop - start)) # run time is 22.937195301055908 # p = Process(target=work) 多進程時 # run time is 2.0452797412872314 # p = Thread(target=work) 多執行緒時
小結論
python的多執行緒到底有沒有用,需要看情況而定,並且肯定是有用的(GIL全局解釋器鎖限制了python的多執行緒不能並行)
絕大數情況下還是多進程+多執行緒配合使用的
偽程式碼:編造程式碼實現效果演示一下
死鎖與遞歸鎖
死鎖
雙方接下來要的鎖都在對方手上,並且都不肯釋放鎖,就都在等待鎖被釋放再搶
import time from threading import Thread, Lock mutexA = Lock() mutexB = Lock() """ 只要類加括弧實例化對象 無論傳入的參數是否一樣,生成的對象肯定不一樣 (單例模式除外) """ class MyThread(Thread): def run(self): # 創建執行緒自動觸發run 方法, run方法內調用 func1 func2 相當於也是自動觸發 self.func1() self.func2() def func1(self): mutexA.acquire() print(f"{self.name}搶到了A鎖") # 執行也是要點時間的(雖然超級超級短) mutexB.acquire() print(f"{self.name}搶到了B鎖") mutexB.release() print(f"{self.name}釋放了B鎖") mutexA.release() print(f"{self.name}釋放了A鎖") def func2(self): mutexB.acquire() print(f"{self.name}搶到了B鎖") time.sleep(1) mutexA.acquire() print(f"{self.name}搶到了A鎖") mutexA.release() print(f"{self.name}釋放了A鎖") mutexB.release() print(f"{self.name}釋放了B鎖") for i in range(10): t = MyThread() t.start() # Thread-1搶到了A鎖 # Thread-1搶到了B鎖 # Thread-1釋放了B鎖 # Thread-1釋放了A鎖 # Thread-1搶到了B鎖 # Thread-2搶到了A鎖 # 程式卡住..... ''' 結果原因分析: 搶到A鎖,再搶B鎖沒人搶,再釋放B鎖也沒人搶,釋放A鎖執行func2 大家都去搶A鎖,我搶B鎖,搶到了休息一秒,別人還在接著往下搶鎖,搶到B鎖,去搶A鎖,我還在休息(執行程式碼超級快) 等我休息好了要搶B鎖,而B鎖別人拿著,別人又要搶完了A鎖才會釋放B鎖,而我要搶到了B鎖才會釋放A鎖,所以大家就都這樣僵著了...(程式就卡這兒了) '''
自己千萬不要輕易的處理鎖的問題(一般也不會涉及到)
遞歸鎖 RLock
遞歸鎖機制: RLock 可以被第一個搶到鎖的人連續acquire和release多次 每acquire一次,鎖身上的計數加 每release一次,鎖身上的計數減1 只要鎖的計數不為0,其他人都不能搶
from threading import Thread, RLock import time mutexA = mutexB = RLock() # mutexA 和 mutexB 是同一把鎖(不想改下面的程式碼) """ 只要類加括弧實例化對象 無論傳入的參數是否一樣,生成的對象肯定不一樣 (單例模式除外) """ class MyThread(Thread): def run(self): # 創建執行緒自動觸發run 方法, run方法內調用 func1 func2 相當於也是自動觸發 self.func1() self.func2() def func1(self): mutexA.acquire() print(f"{self.name}搶到了A鎖") # 執行也是要點時間的(雖然超級超級短) mutexB.acquire() print(f"{self.name}搶到了B鎖") mutexB.release() print(f"{self.name}釋放了B鎖") mutexA.release() print(f"{self.name}釋放了A鎖") def func2(self): mutexB.acquire() print(f"{self.name}搶到了B鎖") time.sleep(1) mutexA.acquire() print(f"{self.name}搶到了A鎖") mutexA.release() print(f"{self.name}釋放了A鎖") mutexB.release() print(f"{self.name}釋放了B鎖") for i in range(10): t = MyThread() t.start() # Thread-1搶到了A鎖 # Thread-1搶到了B鎖 # Thread-1釋放了B鎖 # Thread-1釋放了A鎖 # Thread-1搶到了B鎖 # ....省略大量列印結果..... # Thread-1釋放了B鎖 # Thread-9搶到了B鎖 # Thread-9搶到了A鎖 # Thread-9釋放了A鎖 # Thread-9釋放了B鎖 # ---> 誰搶到了下面一大段都是誰在操作
訊號量 Semaphore
這裡的訊號量不是通用概念,在不同的地方有不同的意義,對應不同的知識點
比喻
互斥鎖–> 廁所(一把鎖)
訊號量–> 公共廁所(多把鎖)
import random import time from threading import Thread, Semaphore semaphore = Semaphore(2) # 造了一個含有五個坑位的公共廁所 def task(name): semaphore.acquire() print(f"{name}佔了一個坑位") time.sleep(random.randint(1, 3)) semaphore.release() print(f"{name}拉完了") for i in range(5): t = Thread(target=task, args=(i,)) t.start() # 0佔了一個坑位 # 1佔了一個坑位 # 1拉完了 # 2佔了一個坑位 # 0拉完了 # 3佔了一個坑位 # 3拉完了 # 4佔了一個坑位 # 2拉完了 # 4拉完了
Event事件
可利用event
實現子執行緒等待某個子執行緒的結束再接著執行
import time from threading import Thread, Event event = Event() def light(): print("紅燈正亮著...") time.sleep(2) # -------------------------------------- # event.set() 發出訊號 # 同一 Event對象.wait()處將收到訊號 # 不再等待,接著往下執行 # -------------------------------------- event.set() # 測試GIL全局解釋器鎖 start # a = 1 + 6 * 4 * 4 / 12 * 1*151*158*235*122*21*45/121 # CPU運算不會釋放GIL鎖 # msg = input('>>>:').strip() # I/O 操作會釋放GIL鎖 # 測試GIL全局解釋器鎖 end print("綠燈亮了") def car(name): print(f"{name} 正在等紅燈...") # -------------------------------------- # event.wait() 等待訊號 # 未收到訊號就在這裡等待訊號 # 類似隊列的 .get() .put()等待 # -------------------------------------- event.wait() print(f"{name}加油門飆車了...") _light = Thread(target=light) _light.start() for i in range(5): t = Thread(target=car, args=(f'car{i}',)) t.start() # 紅燈正亮著... # car0 正在等紅燈... # car1 正在等紅燈... # car2 正在等紅燈... # car3 正在等紅燈... # car4 正在等紅燈... # 綠燈亮了 # car0加油門飆車了... # car2加油門飆車了... # car4加油門飆車了... # car1加油門飆車了... # car3加油門飆車了... # # 測試GIL全局解釋器鎖返回結果 # 紅燈正亮著... # car0 正在等紅燈... # car1 正在等紅燈... # car2 正在等紅燈... # car3 正在等紅燈... # car4 正在等紅燈... # >>>:car1加油門飆車了... # I/O 操作釋放了全局解釋器鎖,其他地方搶到了,就執行,等你輸入了進入就緒態,搶鎖 # car2加油門飆車了... # car3加油門飆車了... # car0加油門飆車了... # car4加油門飆車了... # 151 # 手動輸入的值,然後才列印下一行 # 綠燈亮了
執行緒結合隊列
疑問:同一個進程下的多個執行緒本來就是數據共享的,為什麼還要用隊列? 因為隊列是管道+鎖,使用隊列就不需要自己手動操作鎖的問題,如果鎖操作不當極容易產生死鎖現象
三種隊列 Queue LifoQueue PriorityQueue 基本操作
from threading import Thread import queue q = queue.Queue() q.put(1) print(q.get()) # 1 q = queue.LifoQueue() # Last in First Out q.put(1) q.put(2) q.put(3) print(q.get()) print(q.get()) print(q.get()) # 3 # 2 # 1 q = queue.PriorityQueue() # 優先順序 Q # 因為重名了,點put進去,看到的是Queue的方法 q.put((10, 'haha')) # (priority number, data) 是一個元組,第一個是優先順序數字(數據越小,優先順序越高),第二是數據 q.put((100, 'hhe')) q.put((0, 'hihi')) q.put((-10, 'yyy')) print(q.get()) print(q.get()) print(q.get()) print(q.get()) # (-10, 'yyy') # (0, 'hihi') # (10, 'haha') # (100, 'hhe')