進程隊列補充、socket實現伺服器並發、執行緒完結

  • 2019 年 12 月 16 日
  • 筆記

1.隊列補充

隊列內部是管道+鎖(數據在隊列中是阻塞的)

2.關於python並發與並行的補充

解釋型語言單個進程下多個執行緒不可以並行,但是向C語言等其他語言中在多核情況下是可以實現並行的,所有語言在單核下都是無法實現並行的,只能並發。

3.TCP服務端實現並發

#服務端  import socket  from threading import Thread        server = socket.socket()  server.bind(('127.0.0.1',6666))  server.listen(5)  def serv(conn,addr):      while True:          try:              print(addr)              rec_data = conn.recv(1024).decode('utf-8')              print(rec_data)              send_data = rec_data.upper()              conn.send(send_data.encode('utf-8'))          except Exception as e:              print(e)              break    while True:      conn,addr = server.accept()      t = Thread(target=serv,args=(conn,addr))      t.start()
#客戶端  import socket  import time    client = socket.socket()  client.connect(('127.0.0.1',6666))  while True:      client.send(b'hello')      data = client.recv(1024)      print(data)      time.sleep(1)

4.GIL全局解釋器鎖

在CPython中,全局解釋器鎖(即GIL)是一個互斥鎖,可以防止一個進程中的多個執行緒同時(並行)執行。 鎖定是必要的,主要是因為CPython的記憶體管理不是執行緒安全的。GIL的存在就是為了保證執行緒安全

什麼是保證執行緒安全呢?

同一進程的所有執行緒都運行在一個進程內,毫無疑問這些執行緒具有以下幾個特點:

1、所有數據都是共享的,這其中,程式碼作為一種數據也是被所有執行緒共享的(test.py的所有程式碼以及Cpython解釋器的所有程式碼) 例如:test.py定義一個函數work(程式碼內容如下圖),在進程內所有執行緒都能訪問到work的程式碼,於是我們可以開啟三個執行緒然後target都指向該程式碼,能訪問到意味著就是可以執行。 2、所有執行緒的任務,都需要將任務的程式碼當做參數傳給解釋器的程式碼去執行,即所有的執行緒要想運行自己的任務,首先需要解決的是能夠訪問到解釋器的程式碼。 綜上:

如果多個執行緒的target=work,那麼執行流程是:

多個執行緒先訪問到解釋器的程式碼,即拿到執行許可權,然後將target的程式碼交給解釋器的程式碼去執行

解釋器的程式碼是所有執行緒共享的,所以垃圾回收執行緒也可能訪問到解釋器的程式碼而去執行,這就導致了一個問題:對於同一個數據100,可能執行緒1執行x=100的同時,而垃圾回收執行的是回收100的操作,解決這種問題沒有什麼高明的方法,就是加鎖處理,如下圖的GIL,保證python解釋器同一時間只能執行一個任務的程式碼。

GIL與Lock

機智的同學可能會問到這個問題:Python已經有一個GIL來保證同一時間只能有一個執行緒來執行了,為什麼這裡還需要lock?

首先,我們需要達成共識:鎖的目的是為了保護共享的數據,同一時間只能有一個執行緒來修改共享的數據

然後,我們可以得出結論:保護不同的數據就應該加不同的鎖。

最後,問題就很明朗了,GIL 與Lock是兩把鎖,保護的數據不一樣,前者是解釋器級別的(當然保護的就是解釋器級別的數據,比如垃圾回收的數據),後者是保護用戶自己開發的應用程式的數據,很明顯GIL不負責這件事,只能用戶自定義加鎖處理,即Lock,如下圖

注意:多個執行緒過來執行,一旦遇到IO操作就會立馬釋放GIL解釋器鎖,交個下一個先進來的執行緒。

在純計算程式中GIL鎖起到鎖定python解釋器的作用,就是一個執行緒搶到解釋器後不會再有其他執行緒搶到解釋器的使用權(直到這個程式遇到IO操作,這時GIL會釋放解釋器的使用權)。

import time  from threading import Thread,current_thread      number = 100    def task():      global number      number2 = number      number = number2 - 1      print(number,current_thread().name)    for line in range(100):      t = Thread(target=task)      t.start()    99 Thread-1  98 Thread-2  97 Thread-3  96 Thread-4  95 Thread-5  94 Thread-6  93 Thread-7  92 Thread-8  91 Thread-9  90 Thread-10  。  。  。

給這個程式加上IO操作看下列印結果:

import time  from threading import Thread,current_thread      number = 100    def task():      global number      number2 = number      #print(number)      time.sleep(1)      number = number2 - 1      #time.sleep(1)#如果sleep放在這裡,則會列印結果都是0,這是因為執行緒間數據是共享的      print(number,current_thread().name)    for line in range(100):      t = Thread(target=task)      t.start()    99 Thread-24  99 Thread-23  99 Thread-22  99 Thread-20  99 Thread-18  99 Thread-17  99 Thread-15  99 Thread-21  99 Thread-14  .  .  .

5.驗證多執行緒的作用

什麼時候使用多執行緒,什麼時候使用多進程,多執行緒和多進程各有什麼優缺點?

結論:在計算密集型程式中使用多進程,這時能夠充分發揮電腦多核的優勢;在IO密集型的程式中使用多執行緒,這時能夠充分發揮多執行緒對CPU高校利用率的優勢。高效執行多個進程,內有多個IO密集型程式,要使用多進程+多執行緒。

對結論的驗證:

運算密集型操作驗證:

from threading import Thread  from multiprocessing import Process  import os,time    #計算密集型多進程下運行  def work1():      number = 0      for line in range(50000000):          number += 1      if __name__ == '__main__':      #測試計算密集型      print(os.cpu_count())#列印CPU有幾個內核      start_time = time.time()      list1 = []      for line in range(4):          p = Process(target=work1)          list1.append(p)          p.start()          #p.join()#join如果放在這裡比在列表裡執行速度慢,是因為在列表裡是等所有的進程都起來之後再告訴系統要加join,而在第一個for循環裡面則是每起一個進程都會告訴系統一次,這個過程需要時間。        for p in list1:          p.join()      end_time = time.time()      print(f'程式執行的時間{end_time - start_time}')  #12.24461030960083      #多執行緒下運行  from threading import Thread  from multiprocessing import Process  import os,time    #計算密集型  def work1():      number = 0      for line in range(50000000):          number += 1  #IO密集型  def work2():      time.sleep(1)    if __name__ == '__main__':      #測試計算密集型      print(os.cpu_count())#列印CPU有幾個內核      start_time = time.time()      list1 = []      for line in range(4):          p = Thread(target=work1)          list1.append(p)          p.start()          #p.join()#4.407487869262695        for p in list1:          p.join()      end_time = time.time()      print(f'程式執行的時間{end_time - start_time}')  #16.305360794067383

這裡本人的測試結果是在計算的數據比較大時開啟多進程才會有優勢,如果運算數據比較小,開啟多執行緒運算速度反而比開啟多進程快得多。

IO密集型操作驗證:

#多進程測試    from threading import Thread  from multiprocessing import Process  import os,time      #IO密集型  def work2():      time.sleep(1)    if __name__ == '__main__':      #測試IO密集型      print(os.cpu_count())#列印CPU有幾個內核      start_time = time.time()      list1 = []      for line in range(4):          p = Process(target=work2)          list1.append(p)          p.start()          #p.join()#4.407487869262695        for p in list1:          p.join()      end_time = time.time()      print(f'程式執行的時間{end_time - start_time}')  #2.642327070236206      #多執行緒測試  from threading import Thread  from multiprocessing import Process  import os,time      #IO密集型  def work2():      time.sleep(1)    if __name__ == '__main__':      #測試IO密集型      print(os.cpu_count())#列印CPU有幾個內核      start_time = time.time()      list1 = []      for line in range(4):          p = Thread(target=work2)          list1.append(p)          p.start()          #p.join()#4.407487869262695        for p in list1:          p.join()      end_time = time.time()      print(f'程式執行的時間{end_time - start_time}')  #1.0189073085784912

可以看出在IO密集型操作中,開相同數量的程要比開相同數量的執行緒執行速度慢

6.死鎖現象

所謂死鎖:是指兩個或兩個以上的進程或執行緒在執行過程中,因爭奪資源而造成的一種互相等待的現象,若無外力作用,它們都將無法推進下去。此時稱系統處於死鎖狀態或系統產生了死鎖,這些永遠在互相等待的進程稱為死鎖進程,如下就是死鎖。

from threading import Lock,Thread,current_thread  import time      mutex_a = Lock()#實例化一把鎖  mutex_b = Lock()    class MyThread(Thread):      def run(self):          self.func1()          self.func2()        def func1(self):          mutex_a.acquire()          print(f'用戶{self.name}搶到了鎖a')          mutex_b.acquire()          print(f'用戶{self.name}搶到了鎖b')          mutex_b.release()          print(f'用戶{self.name}釋放了鎖b')          mutex_a.release()          print(f'用戶{self.name}釋放了鎖a')        def func2(self):          mutex_b.acquire()          print(f'用戶{self.name}搶到了鎖b')          time.sleep(1)          mutex_a.acquire()          print(f'用戶{self.name}搶到了鎖a')          mutex_a.release()          print(f'用戶{self.name}釋放了鎖a')          mutex_b.release()          print(f'用戶{self.name}釋放鎖b')    for line in range(10):      t = MyThread()      t.start()  用戶Thread-1搶到了鎖a  用戶Thread-1搶到了鎖b  用戶Thread-1釋放了鎖b  用戶Thread-1釋放了鎖a  用戶Thread-1搶到了鎖b  用戶Thread-2搶到了鎖a

當執行緒1搶到了b而執行緒2搶到了a時,執行緒1要執行強a的任務,而執行緒2要執行搶b的任務,而兩把鎖都沒有釋放手中的鎖,所以就造成了死鎖的現象。

7.遞歸鎖

遞歸鎖用於解決死鎖問題,遞歸鎖可以被多個執行緒使用,當第一個執行緒使用時,遇到幾把鎖,它的引用計數就為幾,只有當它的引用計數為零時才會給第二個執行緒使用。這樣擁有遞歸鎖的那個執行緒就可以將自己的鎖里的內容執行完然後,其他使用遞歸鎖的執行緒才可以執行。也就是但是第一個使用這把鎖的執行緒會對這把鎖加一個引用計數,只有引用計數為零時才能真正釋放該鎖。

from threading import Lock,Thread,RLock  import time      mutex_a = mutex_b = RLock()#實例化一把遞歸鎖  class MyThread(Thread):      def run(self):          self.func1()          self.func2()        def func1(self):          mutex_a.acquire()          print(f'用戶{self.name}搶到了鎖a')          mutex_b.acquire()          print(f'用戶{self.name}搶到了鎖b')          mutex_b.release()          print(f'用戶{self.name}釋放了鎖b')          mutex_a.release()          print(f'用戶{self.name}釋放了鎖a')        def func2(self):          mutex_b.acquire()          print(f'用戶{self.name}搶到了鎖b')          time.sleep(0.1)          mutex_a.acquire()          print(f'用戶{self.name}搶到了鎖a')          mutex_a.release()          print(f'用戶{self.name}釋放了鎖a')          mutex_b.release()          print(f'用戶{self.name}釋放鎖b')    for line in range(10):      t = MyThread()      t.start()

8.訊號量(了解)

互斥鎖比喻成一個家用馬桶,同一時間只能一個人用;訊號量比喻成一個公廁,同一時間可以有多個人用。

from threading import Semaphore,Lock,current_thread,Thread  import time    sm = Semaphore(5)#5個馬桶  #mutex = Lock()#一個馬桶  def task():      sm.acquire()      print(f'{current_thread().name}執行任務')      time.sleep(1)      sm.release()    for line in range(20):      t = Thread(target=task)      t.start()  #這段程式碼的功能是每次讓五個執行緒並發執行

9.執行緒隊列

執行緒Q:執行緒隊列 FIFO(先進先出)就是隊列,面試會問。

queue 是python解釋器自帶的模組,進程中的Queue是python解釋器自帶的模組multiprocessing裡面的一個類。

普通隊列(FIFO):先進先出 特殊隊列(LIFO):後進先出

import queue  q = queue.Queue()#先進先出  q.put(1)  q.put(2)  print('q',q.get())    q1 = queue.LifoQueue()#先進後出  q1.put(1)  q1.put(2)  print('q1',q1.get())    q 1  q1 2

優先順序隊列 優先順序根據數字來判斷,數字為幾優先順序就為幾,1的優先順序最高 若參數中傳的是元組,優先順序以第一個元素的數字大小為準,如果元組的第一個元素都為字母則以字母的ASCII碼為準,如果第一個元素的優先順序相同就判斷第二個元素的順序(漢字也會判斷順序)但是如果同一列的元素既有數字又有字母會報錯。

import queue  q2 = queue.PriorityQueue()#優先順序隊列    q2.put((1,2,3))  q2.put((2,2,3))  q2.put((3,2,3))  print(q2.get())    (1, 2, 3)

句柄

句柄(handle),有多種意義,第一種解釋:句柄是一種特殊的智慧指針。當一個應用程式要引用其他系統(如資料庫、作業系統)所管理的記憶體塊或對象時,就要使用句柄。