隊列、進程互斥鎖、執行緒

  • 2019 年 12 月 16 日
  • 筆記

1.進程的並行和並發

並行: 並行是指兩者同時執行,比如賽跑,兩個人都在不停的往前跑;(資源夠用,比如三個執行緒,四核的CPU )

並發: 並發是指資源有限的情況下,兩者交替輪流使用資源,比如一段路(單核CPU資源)同時只能過一個人,A走一段後,讓給B,B用完繼續給A ,交替使用,目的是提高效率。

2.並行和並發的區別

並行是從微觀上,也就是在一個精確的時間片刻,有不同的程式在執行,這就要求必須有多個處理器。

並發是從宏觀上,在一個時間段上可以看出是同時執行的,比如一個伺服器同時處理多個session。

3.進程互斥鎖

作用:讓加鎖的部分由並發變成串列,犧牲了執行效率,保證了數據安全。

應用:在程式使用同一份數據時,就會引發數據安全和數據混亂等問題,需要使用鎖來維持數據的順序取用。

下面的小程式模擬搶票軟體,對票數進行修改

#查看余票  import json  import time  from multiprocessing import Process  from multiprocessing import Lock  #查看余票  def search(user):      #打開data文件查看余票      with open('data.txt','r',encoding='utf-8') as f:          dic = json.load(f)        print(f'用戶{user}查看余票,還剩{dic.get("ticket_num")}')    #搶票功能  def buy(user):        with open('data.txt','r',encoding='utf-8') as f :          dic = json.load(f)      if dic.get("ticket_num")>0:          dic["ticket_num"] -= 1          with open('data.txt','w',encoding='utf-8') as f:              json.dump(dic,f)            print(f'用戶{user}搶票成功!')      else:          print(f'用戶{user}搶票失敗')  #開始搶票  def run(user,mutex):      search(user)      mutex.acquire()      buy(user)      mutex.release()    if __name__ == '__main__':      #調用Lock類實例化一個所對象      mutex = Lock()        # mutex.acquire()#加鎖      # mutex.release()#釋放鎖      for i in range(10):          #並發十個子進程            p = Process(target=run,args=(f'{i}',mutex))          p.start()    用戶1查看余票,還剩6  用戶1搶票成功!  用戶0查看余票,還剩5  用戶0搶票成功!  用戶2查看余票,還剩4  用戶2搶票成功!  用戶3查看余票,還剩3  用戶3搶票成功!  用戶4查看余票,還剩2  用戶4搶票成功!  用戶6查看余票,還剩1  用戶6搶票成功!  用戶5查看余票,還剩0  用戶5搶票失敗  用戶7查看余票,還剩0  用戶7搶票失敗  用戶9查看余票,還剩0  用戶9搶票失敗  用戶8查看余票,還剩0  用戶8搶票失敗  #這裡如果不使用互斥鎖就會導致票數和搶到的人數不符。

4.隊列

原則:先進先出(堆棧,先進後出)

相當於記憶體中產生一個隊列空間,可以存放多個數據,但是數據是先進去的先被取出來。

4.1multiprocess.Queue介紹

Queue是多進程的列隊,可以實現多進程間的數據傳遞。

Queue([maxsize]):創建共享的進程隊列。maxsize是隊列中允許的最大項數。如果省略此參數,則無大小限制。底層隊列使用管道和鎖定實現。另外,還需要運行支援執行緒以便隊列中的數據傳輸到底層管道中。 Queue的實例q具有以下方法:

q.get( [ block [ ,timeout ] ] ):返回q中的一個項目。如果q為空,此方法將阻塞,直到隊列中有項目可用為止。block用於控制阻塞行為,默認為True. 如果設置為False,將引發Queue.Empty異常(定義在Queue模組中)。timeout是可選超時時間,用在阻塞模式中。如果在制定的時間間隔內沒有項目變為可用,將引發Queue.Empty異常。

q.get_nowait() :同q.get(False)方法。

q.put(item [, block [,timeout ] ] ) :將item放入隊列。如果隊列已滿,此方法將阻塞至有空間可用為止。block控制阻塞行為,默認為True。如果設置為False,將引發Queue.Empty異常(定義在Queue庫模組中)。timeout指定在阻塞模式中等待可用空間的時間長短。超時後將引發Queue.Full異常。

q.qsize() :返回隊列中目前項目的正確數量。此函數的結果並不可靠,因為在返回結果和在稍後程式中使用結果之間,隊列中可能添加或刪除了項目。在某些系統上,此方法可能引發NotImplementedError異常。

q.empty() :如果調用此方法時 q為空,返回True。如果其他進程或執行緒正在往隊列中添加項目,結果是不可靠的。也就是說,在返回和使用結果之間,隊列中可能已經加入新的項目。

q.full() :如果q已滿,返回為True. 由於執行緒的存在,結果也可能是不可靠的(參考q.empty()方法)。

其他方法(了解)

q.close() :關閉隊列,防止隊列中加入更多數據。調用此方法時,後台執行緒將繼續寫入那些已入隊列但尚未寫入的數據,但將在此方法完成時馬上關閉。如果q被垃圾收集,將自動調用此方法。關閉隊列不會在隊列使用者中生成任何類型的數據結束訊號或異常。例如,如果某個使用者正被阻塞在get()操作上,關閉生產者中的隊列不會導致get()方法返回錯誤。

q.cancel_join_thread() :不會再進程退出時自動連接後台執行緒。這可以防止join_thread()方法阻塞。

q.join_thread() :連接隊列的後台執行緒。此方法用於在調用q.close()方法後,等待所有隊列項被消耗。默認情況下,此方法由不是q的原始創建者的所有進程調用。調用q.cancel_join_thread()方法可以禁止這種行為。

4.2 IPC進程間通訊實例1

from multiprocessing import Process,Queue    def test1(q):      data = '數據hello'      q.put(data)#向隊列中添加數據,如果列隊已經填滿則會卡在這裡不會往下執行,直到列隊空出位置讓其把數據放進去        print('進程1開始添加數據到列隊中。。')    def test2(q):      data = q.get()#從隊列中取出數據,如果列隊中已經沒有數據給它,也會卡住      #q.get_nowait()#如果獲取不到數據就報錯      print(f'進程2從隊列中獲取數據{data}')      #q.empty()#判斷列隊是否為空,返回bool值      #q.full()#判斷列隊是否滿了,返回bool值    if __name__ == '__main__':      q = Queue(2)#括弧內填隊列中可以存放元素的個數,不填默認為無限大        p1 = Process(target=test1,args=(q,))      p2 = Process(target=test2,args=(q,))        p1.start()      p2.start()      p2.join()      print('主程式')

4.3 ICP通訊實例2:生產者與消費者模型

生產者:生產數據的

消費者:使用數據的

在程式中,生產者把數據添加到隊列中,消費者從隊列中獲取數據。

from multiprocessing import Queue,Process  import time      def producer(name,food,q):      for i in range(9):          data = food,i          msg = f'用戶{name}開始製作{data}'          print(msg)          q.put(data)          time.sleep(0.1)#由於cup執行速度太快,這裡加個延時,讓兩個消費者都能搶到CPU的使用權    def consumer(name,q):      while True:          data = q.get()          if not data:              break          print(f'用戶{name}開始吃{data}')    if __name__ == '__main__':      q = Queue()      #創造生產者      p1 = Process(target=producer,args=('tank','油條',q))      p2 = Process(target=producer,args=('小明','饅頭',q))      #消費者      c1 = Process(target=consumer,args=('tom',q))      c2 = Process(target=consumer,args=('juery',q))        p1.start()      p2.start()        c1.daemon = True      c2.daemon = True#為消費者添加守護進程,主程式完成就結束掉      c1.start()      c2.start()      p2.join()#這裡的目的是當生產者p2等消費者吃完再結束,給主程式加延時也能達到同樣的效果      #time.sleep(2)      print('主程式')

5.執行緒

5.1什麼是執行緒?

​ 執行緒(thread)是作業系統能夠進行運算調度的最小單位。它包含在進程之中,是進程的實際運行單位。一條執行緒指的是進程中一個單一順序控制流,一個進程可以並發多個執行緒,每條執行緒並發執行不同的任務。同一進程中的多條執行緒將共享該進程中的全部系統資源,如虛擬地址空間,文件描述和訊號處理等等。但是同一進程中的多個執行緒有各自的調用棧,自己的暫存器環境,自己的進程本地存儲。

​ 在多核或多CPU,或支援Hyper-threading的CPU上使用多執行緒程式設計的好處是顯而易見,即提高了程式的執行吞吐率。在單CPU單核的電腦上,使用多執行緒技術,也可以把進程中負責I/O處理、人機交互而常被阻塞的部分與密集計算的部分分開來執行,編寫專門的workhorse執行緒執行密集計算,從而提高了程式的執行效率,進一步提高系統的並發性。

進程與執行緒的區別:

進程是系統進行資源分配和調度的基本單位,執行緒是是作業系統能夠進行運算調度的最小單位。執行緒包含在進程之中,是進程的實際運行單位。

為什麼要使用執行緒?

執行緒進一步提高了CPU的使用效率。

注意:執行緒不能實現並行,只能實現並發。

注意:進程是資源分配的最小單位,執行緒是CPU調度的最小單位。每一個進程中至少有一個執行緒。

5.2 使用執行緒的實際場景

開啟一個字處理軟體進程,該進程肯定需要辦不止一件事情,比如監聽鍵盤輸入,處理文字,定時自動將文字保存到硬碟,這三個任務操作的都是同一塊數據,因而不能用多進程。只能在一個進程里並發地開啟三個執行緒,如果是單執行緒,那就只能是,鍵盤輸入時,不能處理文字和自動保存,自動保存時又不能輸入和處理文字。

5.3 記憶體中的執行緒

多個執行緒共享同一個進程的地址空間中的資源,是對一台電腦上多個進程的模擬,有時也稱執行緒為輕量級的進程。

而對一台電腦上多個進程,則共享物理記憶體、磁碟、印表機等其他物理資源。多執行緒的運行也多進程的運行類似,是CPU在多個執行緒之間的快速切換。

不同的進程之間是充滿敵意的,彼此是搶佔、競爭CPU的關係,如果迅雷會和QQ搶資源。而同一個進程是由一個程式設計師的程式創建,所以同一進程內的執行緒是合作關係,一個執行緒可以訪問另外一個執行緒的記憶體地址,大家都是共享的,一個執行緒乾死了另外一個執行緒的記憶體,那純屬程式設計師腦子有問題。

類似於進程,每個執行緒也有自己的堆棧,不同於進程,執行緒庫無法利用時鐘中斷強制執行緒讓出CPU,可以調用thread_yield運行執行緒自動放棄CPU,讓另外一個執行緒運行。

執行緒通常是有益的,但是帶來了不小程式設計難度,執行緒的問題是:

  1. 父進程有多個執行緒,那麼開啟的子執行緒是否需要同樣多的執行緒。
  2. 在同一個進程中,如果一個執行緒關閉了文件,而另外一個執行緒正準備往該文件內寫內容呢?

因此,在多執行緒的程式碼中,需要更多的心思來設計程式的邏輯、保護程式的數據。

5.4用戶級執行緒和內核級執行緒(了解)

執行緒的實現可以分為兩類:用戶級執行緒(User-Level Thread)和內核線執行緒(Kernel-Level Thread),後者又稱為內核支援的執行緒或輕量級進程。在多執行緒作業系統中,各個系統的實現方式並不相同,在有的系統中實現了用戶級執行緒,有的系統中實現了內核級執行緒。

5.4.1用戶級執行緒

內核的切換由用戶態程式自己控制內核切換,不需要內核干涉,少了進出內核態的消耗,但不能很好的利用多核CPU。

在用戶空間模擬作業系統對進程的調度,來調用一個進程中的執行緒,每個進程中都會有一個運行時系統,用來調度執行緒。此時當該進程獲取CPU時,進程內再調度出一個執行緒去執行,同一時刻只有一個執行緒執行。

5.4.2內核級執行緒

內核級執行緒:切換由內核控制,當執行緒進行切換的時候,由用戶態轉化為內核態。切換完畢要從內核態返回用戶態;可以很好的利用smp,即利用多核CPU。windows執行緒就是這樣的。

5.5 用戶級與內核級執行緒的對比

5.5.1 用戶級執行緒和內核級執行緒的區別

  1. 內核支援執行緒是OS內核可感知的,而用戶級執行緒是OS內核不可感知的。
  2. 用戶級執行緒的創建、撤消和調度不需要OS內核的支援,是在語言(如Java)這一級處理的;而內核支援執行緒的創建、撤消和調度都需OS內核提供支援,而且與進程的創建、撤消和調度大體是相同的。
  3. 用戶級執行緒執行系統調用指令時將導致其所屬進程被中斷,而內核支援執行緒執行系統調用指令時,只導致該執行緒被中斷。
  4. 在只有用戶級執行緒的系統內,CPU調度還是以進程為單位,處於運行狀態的進程中的多個執行緒,由用戶程式控制執行緒的輪換運行;在有內核支援執行緒的系統內,CPU調度則以執行緒為單位,由OS的執行緒調度程式負責執行緒的調度。
  5. 用戶級執行緒的程式實體是運行在用戶態下的程式,而內核支援執行緒的程式實體則是可以運行在任何狀態下的程式。

5.5.2內核執行緒的優缺點

優點:當有多個處理機時,一個進程的多個執行緒可以同時執行。

缺點:由內核進行調度。

5.5.3用戶級執行緒的優缺點

  • 優點:
    • 執行緒的調度不需要內核直接參与,控制簡單。
    • 可以在不支援執行緒的作業系統中實現。
    • 創建和銷毀執行緒、執行緒切換代價等執行緒管理的代價比內核執行緒少得多。
    • 允許每個進程訂製自己的調度演算法,執行緒管理比較靈活。
    • 執行緒能夠利用的表空間和堆棧空間比內核級執行緒多。
    • 同一進程中只能同時有一個執行緒在運行,如果有一個執行緒使用了系統調用而阻塞,那麼整個進程* 都會被掛起。另外,頁面失效也會產生同樣的問題。
  • 缺點:
    • 資源調度按照進程進行,多個處理機下,同一個進程中的執行緒只能在同一個處理機下分時復用

5.6 混合實現

用戶級與內核級的多路復用,內核同一調度內核執行緒,每個內核執行緒對應n個用戶執行緒。

5.6.1 linux作業系統的 NPTL 

歷史:在內核2.6以前的調度實體都是進程,內核並沒有真正支援執行緒。它是能過一個系統調用clone()來實現的,這個調用創建了一份調用進程的拷貝,跟fork()不同的是,這份進程拷貝完全共享了調用進程的地址空間。LinuxThread就是通過這個系統調用來提供執行緒在內核級的支援的(許多以前的執行緒實現都完全是在用戶態,內核根本不知道執行緒的存在)。非常不幸的是,這種方法有相當多的地方沒有遵循POSIX標準,特別是在訊號處理,調度,進程間通訊原語等方面。

很顯然,為了改進LinuxThread必須得到內核的支援,並且需要重寫執行緒庫。為了實現這個需求,開始有兩個相互競爭的項目:IBM啟動的NGTP(Next Generation POSIX Threads)項目,以及Redhat公司的NPTL。在2003年的年中,IBM放棄了NGTP,也就是大約那時,Redhat發布了最初的NPTL。

NPTL最開始在redhat linux 9里發布,現在從RHEL3起內核2.6起都支援NPTL,並且完全成了GNU C庫的一部分。

設計:NPTL使用了跟LinuxThread相同的辦法,在內核裡面執行緒仍然被當作是一個進程,並且仍然使用了clone()系統調用(在NPTL庫里調用)。但是,NPTL需要內核級的特殊支援來實現,比如需要掛起然後再喚醒執行緒的執行緒同步原語futex.

NPTL也是一個1*1的執行緒庫,就是說,當你使用pthread_create()調用創建一個執行緒後,在內核里就相應創建了一個調度實體,在linux里就是一個新進程,這個方法最大可能的簡化了執行緒的實現。

除NPTL的11模型外還有一個mn模型,通常這種模型的用戶執行緒數會比內核的調度實體多。在這種實現里,執行緒庫本身必須去處理可能存在的調度,這樣在執行緒庫內部的上下文切換通常都會相當的快,因為它避免了系統調用轉到內核態。然而這種模型增加了執行緒實現的複雜性,並可能出現諸如優先順序反轉的問題,此外,用戶態的調度如何跟內核態的調度進行協調也是很難讓人滿意。

5.7 GIL全局解釋器鎖

Python程式碼的執行由Python虛擬機(也叫解釋器主循環)來控制。Python在設計之初就考慮到要在主循環中,同時只有一個執行緒在執行。雖然 Python 解釋器中可以「運行」多個執行緒,但在任意時刻只有一個執行緒在解釋器中運行。

對Python虛擬機的訪問由全局解釋器鎖(GIL)來控制,正是這個鎖能保證同一時刻只有一個執行緒在運行。

在多執行緒環境中,Python 虛擬機按以下方式執行:

  1. 設置 GIL;
  2. 切換到一個執行緒去運行;
  3. 運行指定數量的位元組碼指令或者執行緒主動讓出控制(可以調用 time.sleep(0));
  4. 把執行緒設置為睡眠狀態;
  5. 解鎖 GIL;
  6. 再次重複以上所有步驟。

在調用外部程式碼(如 C/C++擴展函數)的時候,GIL將會被鎖定,直到這個函數結束為止(由於在這期間沒有Python的位元組碼被運行,所以不會做執行緒切換)編寫擴展的程式設計師可以主動解鎖GIL。

5.8 開啟執行緒的兩種方式

5.8.1 方式一:直接實例化Thread的對象

如果要創建多個執行緒可以使用for循環

from threading import Thread  import time      def task():      print('執行緒開啟')      time.sleep(1)      print('執行緒結束')    if __name__ == '__main__':      t = Thread(target=task)#實例化執行緒對象,可以在主程式進行,也可以不再主程式進行      t.start()

5.8.2 方式二:繼承Thread類

from threading import Thread  import time    class MyThresd(Thread):      def run(self):          print('執行緒開啟')          time.sleep(1)          print('執行緒結束')    t = MyThresd()  t.start()

5.9 執行緒對象的屬性

執行緒的屬性和進程的屬性有些相似,功能也相似。

Thread實例對象的方法:

  • join()子執行緒結束後主執行緒再結束
  • start()開啟執行緒
  • is_alive()查看執行緒是否存活返回bool值
  • isAlive()查看執行緒是否存活返回bool值
  • daemon = True守護進程
  • getName():返回執行緒名。
  • setName():設置執行緒名。

threading模組提供的一些方法:

  • threading.currentThread():返回當前的執行緒變數。
  • threading.enumerate():返回一個包含正在運行的執行緒的list。正在運行指執行緒啟動後、結束前,不包括啟動前和終止後的執行緒。
  • threading.activeCount():返回正在運行的執行緒數量,與len(threading.enumerate())有相同的結果。
from threading import Thread  from threading import current_thread  import time    def task():      print(f'執行緒開啟{current_thread().name}')      time.sleep(3)      print(f'執行緒結束{current_thread().name}')    if __name__ == '__main__':      for i in range(3):          t = Thread(target=task)          t.start()          print(t.isAlive())          print(t.is_alive())      t1 = Thread(target=task)      t1.daemon = True      t1.start()#這裡t1能否正常結束就看t1能否快速的搶到CPU執行自己的程式碼了,能搶到則可以正常列印「執行緒結束」,否則就被主程式結束掉了      print('主執行緒')    執行緒開啟Thread-1  True  True  執行緒開啟Thread-2  True  True  執行緒開啟Thread-3  True  True  執行緒開啟Thread-4  主執行緒  執行緒結束Thread-1  執行緒結束Thread-3  執行緒結束Thread-2

5.10 執行緒互斥鎖

執行緒互斥鎖和進程互斥鎖的作用是一樣的,用法也很相似,在需要保護數據的地方加鎖就可以了。

from threading import Thread,Lock  import time    mutex = Lock()  n = 100    def task(i):      print(f'執行緒{i}啟動。。')      global n      mutex.acquire()#獲取,加鎖      temp = n      time.sleep(0.1)        n = temp - 1      print(n)      mutex.release()#釋放      #如果不加鎖,么個執行緒獲取到的值都是100,所有程式都在執行100-1的操作,加鎖之後,每個執行緒獲取到的數據是前一個執行緒計算完成的結果    if __name__ == '__main__':      t_l = []      for i in range(100):          t = Thread(target=task,args=(i,))          t_l.append(t)          t.start()        for t in t_l:          t.join()      print(n)