隊列、進程互斥鎖、執行緒
- 2019 年 12 月 16 日
- 筆記
1.進程的並行和並發
並行: 並行是指兩者同時執行,比如賽跑,兩個人都在不停的往前跑;(資源夠用,比如三個執行緒,四核的CPU )
並發: 並發是指資源有限的情況下,兩者交替輪流使用資源,比如一段路(單核CPU資源)同時只能過一個人,A走一段後,讓給B,B用完繼續給A ,交替使用,目的是提高效率。
2.並行和並發的區別
並行是從微觀上,也就是在一個精確的時間片刻,有不同的程式在執行,這就要求必須有多個處理器。
並發是從宏觀上,在一個時間段上可以看出是同時執行的,比如一個伺服器同時處理多個session。
3.進程互斥鎖
作用:讓加鎖的部分由並發變成串列,犧牲了執行效率,保證了數據安全。
應用:在程式使用同一份數據時,就會引發數據安全和數據混亂等問題,需要使用鎖來維持數據的順序取用。
下面的小程式模擬搶票軟體,對票數進行修改
#查看余票 import json import time from multiprocessing import Process from multiprocessing import Lock #查看余票 def search(user): #打開data文件查看余票 with open('data.txt','r',encoding='utf-8') as f: dic = json.load(f) print(f'用戶{user}查看余票,還剩{dic.get("ticket_num")}') #搶票功能 def buy(user): with open('data.txt','r',encoding='utf-8') as f : dic = json.load(f) if dic.get("ticket_num")>0: dic["ticket_num"] -= 1 with open('data.txt','w',encoding='utf-8') as f: json.dump(dic,f) print(f'用戶{user}搶票成功!') else: print(f'用戶{user}搶票失敗') #開始搶票 def run(user,mutex): search(user) mutex.acquire() buy(user) mutex.release() if __name__ == '__main__': #調用Lock類實例化一個所對象 mutex = Lock() # mutex.acquire()#加鎖 # mutex.release()#釋放鎖 for i in range(10): #並發十個子進程 p = Process(target=run,args=(f'{i}',mutex)) p.start() 用戶1查看余票,還剩6 用戶1搶票成功! 用戶0查看余票,還剩5 用戶0搶票成功! 用戶2查看余票,還剩4 用戶2搶票成功! 用戶3查看余票,還剩3 用戶3搶票成功! 用戶4查看余票,還剩2 用戶4搶票成功! 用戶6查看余票,還剩1 用戶6搶票成功! 用戶5查看余票,還剩0 用戶5搶票失敗 用戶7查看余票,還剩0 用戶7搶票失敗 用戶9查看余票,還剩0 用戶9搶票失敗 用戶8查看余票,還剩0 用戶8搶票失敗 #這裡如果不使用互斥鎖就會導致票數和搶到的人數不符。
4.隊列
原則:先進先出(堆棧,先進後出)
相當於記憶體中產生一個隊列空間,可以存放多個數據,但是數據是先進去的先被取出來。
4.1multiprocess.Queue介紹
Queue是多進程的列隊,可以實現多進程間的數據傳遞。
Queue([maxsize])
:創建共享的進程隊列。maxsize是隊列中允許的最大項數。如果省略此參數,則無大小限制。底層隊列使用管道和鎖定實現。另外,還需要運行支援執行緒以便隊列中的數據傳輸到底層管道中。 Queue的實例q具有以下方法:
q.get( [ block [ ,timeout ] ] )
:返回q中的一個項目。如果q為空,此方法將阻塞,直到隊列中有項目可用為止。block用於控制阻塞行為,默認為True. 如果設置為False,將引發Queue.Empty異常(定義在Queue模組中)。timeout是可選超時時間,用在阻塞模式中。如果在制定的時間間隔內沒有項目變為可用,將引發Queue.Empty異常。
q.get_nowait()
:同q.get(False)
方法。
q.put(item [, block [,timeout ] ] )
:將item放入隊列。如果隊列已滿,此方法將阻塞至有空間可用為止。block控制阻塞行為,默認為True。如果設置為False,將引發Queue.Empty異常(定義在Queue庫模組中)。timeout指定在阻塞模式中等待可用空間的時間長短。超時後將引發Queue.Full異常。
q.qsize()
:返回隊列中目前項目的正確數量。此函數的結果並不可靠,因為在返回結果和在稍後程式中使用結果之間,隊列中可能添加或刪除了項目。在某些系統上,此方法可能引發NotImplementedError異常。
q.empty()
:如果調用此方法時 q為空,返回True。如果其他進程或執行緒正在往隊列中添加項目,結果是不可靠的。也就是說,在返回和使用結果之間,隊列中可能已經加入新的項目。
q.full()
:如果q已滿,返回為True. 由於執行緒的存在,結果也可能是不可靠的(參考q.empty()
方法)。
其他方法(了解)
q.close()
:關閉隊列,防止隊列中加入更多數據。調用此方法時,後台執行緒將繼續寫入那些已入隊列但尚未寫入的數據,但將在此方法完成時馬上關閉。如果q被垃圾收集,將自動調用此方法。關閉隊列不會在隊列使用者中生成任何類型的數據結束訊號或異常。例如,如果某個使用者正被阻塞在get()
操作上,關閉生產者中的隊列不會導致get()
方法返回錯誤。
q.cancel_join_thread()
:不會再進程退出時自動連接後台執行緒。這可以防止join_thread()
方法阻塞。
q.join_thread()
:連接隊列的後台執行緒。此方法用於在調用q.close()
方法後,等待所有隊列項被消耗。默認情況下,此方法由不是q的原始創建者的所有進程調用。調用q.cancel_join_thread()
方法可以禁止這種行為。
4.2 IPC進程間通訊實例1
from multiprocessing import Process,Queue def test1(q): data = '數據hello' q.put(data)#向隊列中添加數據,如果列隊已經填滿則會卡在這裡不會往下執行,直到列隊空出位置讓其把數據放進去 print('進程1開始添加數據到列隊中。。') def test2(q): data = q.get()#從隊列中取出數據,如果列隊中已經沒有數據給它,也會卡住 #q.get_nowait()#如果獲取不到數據就報錯 print(f'進程2從隊列中獲取數據{data}') #q.empty()#判斷列隊是否為空,返回bool值 #q.full()#判斷列隊是否滿了,返回bool值 if __name__ == '__main__': q = Queue(2)#括弧內填隊列中可以存放元素的個數,不填默認為無限大 p1 = Process(target=test1,args=(q,)) p2 = Process(target=test2,args=(q,)) p1.start() p2.start() p2.join() print('主程式')
4.3 ICP通訊實例2:生產者與消費者模型
生產者:生產數據的
消費者:使用數據的
在程式中,生產者把數據添加到隊列中,消費者從隊列中獲取數據。
from multiprocessing import Queue,Process import time def producer(name,food,q): for i in range(9): data = food,i msg = f'用戶{name}開始製作{data}' print(msg) q.put(data) time.sleep(0.1)#由於cup執行速度太快,這裡加個延時,讓兩個消費者都能搶到CPU的使用權 def consumer(name,q): while True: data = q.get() if not data: break print(f'用戶{name}開始吃{data}') if __name__ == '__main__': q = Queue() #創造生產者 p1 = Process(target=producer,args=('tank','油條',q)) p2 = Process(target=producer,args=('小明','饅頭',q)) #消費者 c1 = Process(target=consumer,args=('tom',q)) c2 = Process(target=consumer,args=('juery',q)) p1.start() p2.start() c1.daemon = True c2.daemon = True#為消費者添加守護進程,主程式完成就結束掉 c1.start() c2.start() p2.join()#這裡的目的是當生產者p2等消費者吃完再結束,給主程式加延時也能達到同樣的效果 #time.sleep(2) print('主程式')
5.執行緒
5.1什麼是執行緒?
執行緒(thread)是作業系統能夠進行運算調度的最小單位。它包含在進程之中,是進程的實際運行單位。一條執行緒指的是進程中一個單一順序控制流,一個進程可以並發多個執行緒,每條執行緒並發執行不同的任務。同一進程中的多條執行緒將共享該進程中的全部系統資源,如虛擬地址空間,文件描述和訊號處理等等。但是同一進程中的多個執行緒有各自的調用棧,自己的暫存器環境,自己的進程本地存儲。
在多核或多CPU,或支援Hyper-threading的CPU上使用多執行緒程式設計的好處是顯而易見,即提高了程式的執行吞吐率。在單CPU單核的電腦上,使用多執行緒技術,也可以把進程中負責I/O處理、人機交互而常被阻塞的部分與密集計算的部分分開來執行,編寫專門的workhorse執行緒執行密集計算,從而提高了程式的執行效率,進一步提高系統的並發性。
進程與執行緒的區別:
進程是系統進行資源分配和調度的基本單位,執行緒是是作業系統能夠進行運算調度的最小單位。執行緒包含在進程之中,是進程的實際運行單位。
為什麼要使用執行緒?
執行緒進一步提高了CPU的使用效率。
注意:執行緒不能實現並行,只能實現並發。
注意:進程是資源分配的最小單位,執行緒是CPU調度的最小單位。每一個進程中至少有一個執行緒。
5.2 使用執行緒的實際場景

開啟一個字處理軟體進程,該進程肯定需要辦不止一件事情,比如監聽鍵盤輸入,處理文字,定時自動將文字保存到硬碟,這三個任務操作的都是同一塊數據,因而不能用多進程。只能在一個進程里並發地開啟三個執行緒,如果是單執行緒,那就只能是,鍵盤輸入時,不能處理文字和自動保存,自動保存時又不能輸入和處理文字。
5.3 記憶體中的執行緒

多個執行緒共享同一個進程的地址空間中的資源,是對一台電腦上多個進程的模擬,有時也稱執行緒為輕量級的進程。
而對一台電腦上多個進程,則共享物理記憶體、磁碟、印表機等其他物理資源。多執行緒的運行也多進程的運行類似,是CPU在多個執行緒之間的快速切換。
不同的進程之間是充滿敵意的,彼此是搶佔、競爭CPU的關係,如果迅雷會和QQ搶資源。而同一個進程是由一個程式設計師的程式創建,所以同一進程內的執行緒是合作關係,一個執行緒可以訪問另外一個執行緒的記憶體地址,大家都是共享的,一個執行緒乾死了另外一個執行緒的記憶體,那純屬程式設計師腦子有問題。
類似於進程,每個執行緒也有自己的堆棧,不同於進程,執行緒庫無法利用時鐘中斷強制執行緒讓出CPU,可以調用thread_yield運行執行緒自動放棄CPU,讓另外一個執行緒運行。
執行緒通常是有益的,但是帶來了不小程式設計難度,執行緒的問題是:
- 父進程有多個執行緒,那麼開啟的子執行緒是否需要同樣多的執行緒。
- 在同一個進程中,如果一個執行緒關閉了文件,而另外一個執行緒正準備往該文件內寫內容呢?
因此,在多執行緒的程式碼中,需要更多的心思來設計程式的邏輯、保護程式的數據。
5.4用戶級執行緒和內核級執行緒(了解)
執行緒的實現可以分為兩類:用戶級執行緒(User-Level Thread)和內核線執行緒(Kernel-Level Thread),後者又稱為內核支援的執行緒或輕量級進程。在多執行緒作業系統中,各個系統的實現方式並不相同,在有的系統中實現了用戶級執行緒,有的系統中實現了內核級執行緒。
5.4.1用戶級執行緒
內核的切換由用戶態程式自己控制內核切換,不需要內核干涉,少了進出內核態的消耗,但不能很好的利用多核CPU。

在用戶空間模擬作業系統對進程的調度,來調用一個進程中的執行緒,每個進程中都會有一個運行時系統,用來調度執行緒。此時當該進程獲取CPU時,進程內再調度出一個執行緒去執行,同一時刻只有一個執行緒執行。
5.4.2內核級執行緒
內核級執行緒:切換由內核控制,當執行緒進行切換的時候,由用戶態轉化為內核態。切換完畢要從內核態返回用戶態;可以很好的利用smp,即利用多核CPU。windows執行緒就是這樣的。

5.5 用戶級與內核級執行緒的對比
5.5.1 用戶級執行緒和內核級執行緒的區別
- 內核支援執行緒是OS內核可感知的,而用戶級執行緒是OS內核不可感知的。
- 用戶級執行緒的創建、撤消和調度不需要OS內核的支援,是在語言(如Java)這一級處理的;而內核支援執行緒的創建、撤消和調度都需OS內核提供支援,而且與進程的創建、撤消和調度大體是相同的。
- 用戶級執行緒執行系統調用指令時將導致其所屬進程被中斷,而內核支援執行緒執行系統調用指令時,只導致該執行緒被中斷。
- 在只有用戶級執行緒的系統內,CPU調度還是以進程為單位,處於運行狀態的進程中的多個執行緒,由用戶程式控制執行緒的輪換運行;在有內核支援執行緒的系統內,CPU調度則以執行緒為單位,由OS的執行緒調度程式負責執行緒的調度。
- 用戶級執行緒的程式實體是運行在用戶態下的程式,而內核支援執行緒的程式實體則是可以運行在任何狀態下的程式。
5.5.2內核執行緒的優缺點
優點:當有多個處理機時,一個進程的多個執行緒可以同時執行。
缺點:由內核進行調度。
5.5.3用戶級執行緒的優缺點
- 優點:
- 執行緒的調度不需要內核直接參与,控制簡單。
- 可以在不支援執行緒的作業系統中實現。
- 創建和銷毀執行緒、執行緒切換代價等執行緒管理的代價比內核執行緒少得多。
- 允許每個進程訂製自己的調度演算法,執行緒管理比較靈活。
- 執行緒能夠利用的表空間和堆棧空間比內核級執行緒多。
- 同一進程中只能同時有一個執行緒在運行,如果有一個執行緒使用了系統調用而阻塞,那麼整個進程* 都會被掛起。另外,頁面失效也會產生同樣的問題。
- 缺點:
- 資源調度按照進程進行,多個處理機下,同一個進程中的執行緒只能在同一個處理機下分時復用
5.6 混合實現
用戶級與內核級的多路復用,內核同一調度內核執行緒,每個內核執行緒對應n個用戶執行緒。

5.6.1 linux作業系統的 NPTL
歷史:在內核2.6以前的調度實體都是進程,內核並沒有真正支援執行緒。它是能過一個系統調用clone()來實現的,這個調用創建了一份調用進程的拷貝,跟fork()不同的是,這份進程拷貝完全共享了調用進程的地址空間。LinuxThread就是通過這個系統調用來提供執行緒在內核級的支援的(許多以前的執行緒實現都完全是在用戶態,內核根本不知道執行緒的存在)。非常不幸的是,這種方法有相當多的地方沒有遵循POSIX標準,特別是在訊號處理,調度,進程間通訊原語等方面。
很顯然,為了改進LinuxThread必須得到內核的支援,並且需要重寫執行緒庫。為了實現這個需求,開始有兩個相互競爭的項目:IBM啟動的NGTP(Next Generation POSIX Threads)項目,以及Redhat公司的NPTL。在2003年的年中,IBM放棄了NGTP,也就是大約那時,Redhat發布了最初的NPTL。
NPTL最開始在redhat linux 9里發布,現在從RHEL3起內核2.6起都支援NPTL,並且完全成了GNU C庫的一部分。
設計:NPTL使用了跟LinuxThread相同的辦法,在內核裡面執行緒仍然被當作是一個進程,並且仍然使用了clone()系統調用(在NPTL庫里調用)。但是,NPTL需要內核級的特殊支援來實現,比如需要掛起然後再喚醒執行緒的執行緒同步原語futex.
NPTL也是一個1*1的執行緒庫,就是說,當你使用pthread_create()調用創建一個執行緒後,在內核里就相應創建了一個調度實體,在linux里就是一個新進程,這個方法最大可能的簡化了執行緒的實現。
除NPTL的11模型外還有一個mn模型,通常這種模型的用戶執行緒數會比內核的調度實體多。在這種實現里,執行緒庫本身必須去處理可能存在的調度,這樣在執行緒庫內部的上下文切換通常都會相當的快,因為它避免了系統調用轉到內核態。然而這種模型增加了執行緒實現的複雜性,並可能出現諸如優先順序反轉的問題,此外,用戶態的調度如何跟內核態的調度進行協調也是很難讓人滿意。
5.7 GIL全局解釋器鎖
Python程式碼的執行由Python虛擬機(也叫解釋器主循環)來控制。Python在設計之初就考慮到要在主循環中,同時只有一個執行緒在執行。雖然 Python 解釋器中可以「運行」多個執行緒,但在任意時刻只有一個執行緒在解釋器中運行。
對Python虛擬機的訪問由全局解釋器鎖(GIL)來控制,正是這個鎖能保證同一時刻只有一個執行緒在運行。
在多執行緒環境中,Python 虛擬機按以下方式執行:
- 設置 GIL;
- 切換到一個執行緒去運行;
- 運行指定數量的位元組碼指令或者執行緒主動讓出控制(可以調用 time.sleep(0));
- 把執行緒設置為睡眠狀態;
- 解鎖 GIL;
- 再次重複以上所有步驟。
在調用外部程式碼(如 C/C++擴展函數)的時候,GIL將會被鎖定,直到這個函數結束為止(由於在這期間沒有Python的位元組碼被運行,所以不會做執行緒切換)編寫擴展的程式設計師可以主動解鎖GIL。
5.8 開啟執行緒的兩種方式
5.8.1 方式一:直接實例化Thread的對象
如果要創建多個執行緒可以使用for循環
from threading import Thread import time def task(): print('執行緒開啟') time.sleep(1) print('執行緒結束') if __name__ == '__main__': t = Thread(target=task)#實例化執行緒對象,可以在主程式進行,也可以不再主程式進行 t.start()
5.8.2 方式二:繼承Thread類
from threading import Thread import time class MyThresd(Thread): def run(self): print('執行緒開啟') time.sleep(1) print('執行緒結束') t = MyThresd() t.start()
5.9 執行緒對象的屬性
執行緒的屬性和進程的屬性有些相似,功能也相似。
Thread實例對象的方法:
- join()子執行緒結束後主執行緒再結束
- start()開啟執行緒
- is_alive()查看執行緒是否存活返回bool值
- isAlive()查看執行緒是否存活返回bool值
- daemon = True守護進程
getName()
:返回執行緒名。setName()
:設置執行緒名。
threading模組提供的一些方法:
threading.currentThread()
:返回當前的執行緒變數。threading.enumerate()
:返回一個包含正在運行的執行緒的list。正在運行指執行緒啟動後、結束前,不包括啟動前和終止後的執行緒。threading.activeCount()
:返回正在運行的執行緒數量,與len(threading.enumerate())有相同的結果。
from threading import Thread from threading import current_thread import time def task(): print(f'執行緒開啟{current_thread().name}') time.sleep(3) print(f'執行緒結束{current_thread().name}') if __name__ == '__main__': for i in range(3): t = Thread(target=task) t.start() print(t.isAlive()) print(t.is_alive()) t1 = Thread(target=task) t1.daemon = True t1.start()#這裡t1能否正常結束就看t1能否快速的搶到CPU執行自己的程式碼了,能搶到則可以正常列印「執行緒結束」,否則就被主程式結束掉了 print('主執行緒') 執行緒開啟Thread-1 True True 執行緒開啟Thread-2 True True 執行緒開啟Thread-3 True True 執行緒開啟Thread-4 主執行緒 執行緒結束Thread-1 執行緒結束Thread-3 執行緒結束Thread-2
5.10 執行緒互斥鎖
執行緒互斥鎖和進程互斥鎖的作用是一樣的,用法也很相似,在需要保護數據的地方加鎖就可以了。
from threading import Thread,Lock import time mutex = Lock() n = 100 def task(i): print(f'執行緒{i}啟動。。') global n mutex.acquire()#獲取,加鎖 temp = n time.sleep(0.1) n = temp - 1 print(n) mutex.release()#釋放 #如果不加鎖,么個執行緒獲取到的值都是100,所有程式都在執行100-1的操作,加鎖之後,每個執行緒獲取到的數據是前一個執行緒計算完成的結果 if __name__ == '__main__': t_l = [] for i in range(100): t = Thread(target=task,args=(i,)) t_l.append(t) t.start() for t in t_l: t.join() print(n)