python並發編程-進程池執行緒池-協程-I/O模型-04

  • 2019 年 10 月 7 日
  • 筆記

目錄

進程池執行緒池的使用*****

無論是開執行緒還是開進程都會消耗資源,即使開執行緒消耗的資遠比開進程的少

而物理設備的性能是有限的,雖然可以加設備來提升上限,但如果像淘寶雙十一那樣,只有很少的時刻需要大量的資源,為了滿足這個去買一大堆伺服器顯然是不划算的

(電腦中)池的目的:在保證電腦硬體安全的情況下最大限度的利用電腦硬體,池其實是降低了程式的運行效率,但是保證了電腦硬體的安全(硬體的發展跟不上軟體的速度)

進程池執行緒池的目的:為了限制開設的進程數和執行緒數,從而保證電腦硬體的安全

進程池/執行緒池的創建和提交回調

import random  import time  from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor      def task(i):      time.sleep(random.random())      print(f"{i} is over...")      return f"{i}² = {i * i}"      if __name__ == '__main__':  # 進程池的時候一定要放在這裡面      '''不放報錯 concurrent.futures.process.BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.'''        # -------------------------------------------------      # 1.實例化進程池/執行緒池對象,並限制進程池/執行緒池中進程/執行緒數量      # -------------------------------------------------      # pool = ThreadPoolExecutor(3, 'MyThread-')  # 不指定參數的情況下,默認是當前 CPU個數*5 , 也可以指定執行緒個數      pool = ProcessPoolExecutor(3)  # 不指定參數的情況下,默認是當前 CPU個數 , 也可以指定進程個數(創進程不能傳第二個參數)        # for i in range(5):      #     # -------------------------------------------------      #     # 2.執行緒池對象.submit() 非同步提交任務      #     #   提交任務的兩種方式      #     #       同步:提交完任務之後,在原地等待任務的返回結果,再繼續執行下一步程式碼      #     #       非同步:提交任務之後,不等待任務的返回結果(這個結果怎麼拿?),直接進行下一步操作      #     # -------------------------------------------------      #     pool.submit(task, i)      # print("主")      #      # # 0 is running...      # # 1 is running...      # # 2 is running...      # # 主      # # 1 is over...      # # 3 is running...      # # 0 is over...      # # 4 is running...      # # 4 is over...      # # 3 is over...      # # 2 is over...        # for i in range(5):      #     future = pool.submit(task, i)      #     # print(future)  # <Future at 0x21a130dbb00 state=running>   <Future at 0x21a1321ec50 state=pending>      #     # -------------------------------------------------      #     # future = pool.submit(task, i)      #     # future.result()       接收返回值並獲取回調值      #     # -------------------------------------------------      #     print(future.result())      # print("主")      # # 0 is running...      # # 0 is over...      # # 0² = 0      # # 1 is running...      # # 1 is over...      # # 1² = 1      # # 2 is running...      # # 2 is over...      # # 2² = 4      # # 3 is running...      # # 3 is over...      # # 3² = 9      # # 4 is running...      # # 4 is over...      # # 4² = 16      # # 主        # future_list = []      # for i in range(5):      #     future = pool.submit(task, i)      #     future_list.append(future)      #      # for future in future_list:      #     print(f">>:{future.result()}")  # 依次等每個 future的結果,所以是絕對有序的      # print("主")      # # 0 is running...      # # 1 is running...      # # 2 is running...      # # 0 is over...      # # 3 is running...      # # >>:0² = 0      # # 1 is over...      # # 4 is running...      # # >>:1² = 1      # # 4 is over...      # # 2 is over...      # # >>:2² = 4      # # 3 is over...      # # >>:3² = 9      # # >>:4² = 16      # # 主        future_list = []      for i in range(5):          future = pool.submit(task, i)          future_list.append(future)        pool.shutdown()  # 關閉池子且等待池子中所有的任務運行完畢        for future in future_list:          print(f">>:{future.result()}")  # 依次等每個 future的結果,所以是絕對有序的      print("主")      # 0 is running...      # 1 is running...      # 2 is running...      # 2 is over...      # 3 is running...      # 0 is over...      # 4 is running...      # 4 is over...      # 1 is over...      # 3 is over...      # >>:0² = 0      # >>:1² = 1      # >>:2² = 4      # >>:3² = 9      # >>:4² = 16      # 主

驗證復用池子里的執行緒或進程

池子中創建的進程或執行緒創建一次就不會再創建了,至始至終用的都是最初的那幾個,這樣的話就可以節省反覆開闢進程或執行緒的資源了 不是動態創建動態銷毀的(如果是好幾百個,可想而知)

import random  import time  import os  from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor  from threading import current_thread      def task(i):      time.sleep(random.random())      # print(f"{os.getpid()} {i} is over...")      print(f"{os.getpid()} {current_thread().name} {i} is over...")      return f"{i}² = {i * i}"      if __name__ == '__main__':  # 進程池的時候一定要放在這裡面      # pool = ProcessPoolExecutor(3)      pool = ThreadPoolExecutor(3, 'MyThreading')        future_list = []      for i in range(5):          future = pool.submit(task, i)          future_list.append(future)        pool.shutdown()  # 關閉池子且等待池子中所有的任務運行完畢        for future in future_list:          print(f">>:{future.result()}")  # 依次等每個 future的結果,所以是絕對有序的      print("主")  # 11000 0 is over...  # 復用了進程號(即沒有去開闢新的記憶體空間)  # 8024 2 is over...  # 10100 1 is over...  # 11000 3 is over...  # 8024 4 is over...  # >>:0² = 0  # >>:1² = 1  # >>:2² = 4  # >>:3² = 9  # >>:4² = 16  # 主      # 使用執行緒池的列印結果  # 13024 MyThreading_1 1 is over...  # 1.復用了執行緒  # 13024 MyThreading_1 3 is over...  # 2.復用了執行緒  # 13024 MyThreading_2 2 is over...  # 13024 MyThreading_0 0 is over...  # 13024 MyThreading_1 4 is over...  # >>:0² = 0  # >>:1² = 1  # >>:2² = 4  # >>:3² = 9  # >>:4² = 16  # 主

非同步回調機制

這(.add_done_callback())其實是 .submit() 返回結果對象的方法

非同步回調機制:當非同步提交的任務有返回結果之後,會自動觸發回調函數的執行

import random  import time  import os  from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor  from threading import current_thread      def callback(future):      print(f"我拿到了回調結果:{future.result()}")      def task(i):      time.sleep(random.random())      # print(f"{os.getpid()} {i} is over...")      print(f"{os.getpid()} {current_thread().name} {i} is over...")      return f"{i}² = {i * i}"      if __name__ == '__main__':  # 進程池的時候一定要放在這裡面      # pool = ProcessPoolExecutor(3)      pool = ThreadPoolExecutor(3, 'MyThreading')        future_list = []      for i in range(5):          # -----------------------------------------------------          # .submit().add_done_callback() 自動調用回調函數          #   會自動將 .submit()的返回結果作為參數傳給.add_done_callback() 中傳入的函數去調用執行          #       .add_done_callback() 其實是 .submit()返回對象自身的方法          # -----------------------------------------------------          future = pool.submit(task, i).add_done_callback(callback)          future_list.append(future)        pool.shutdown()  # 關閉池子且等待池子中所有的任務運行完畢        print("主")    # 11348 MyThreading_0 0 is over...  # 我拿到了回調結果:0² = 0  # 11348 MyThreading_2 2 is over...  # 我拿到了回調結果:2² = 4  # 11348 MyThreading_0 3 is over...  # 我拿到了回調結果:3² = 9  # 11348 MyThreading_1 1 is over...  # 我拿到了回調結果:1² = 1  # 11348 MyThreading_2 4 is over...  # 我拿到了回調結果:4² = 16  # 主

通過閉包給回調函數添加額外參數(擴展)

# 省略導模組等  # 執行緒池/進程池對象.submit() 會返回一個 future對象,該對象有.add_done_callback()方法(是一個對象綁定函數),參數是一個函數名(除了對象自身默認傳入,無法為該函數傳參)  # 這裡利用閉包函數返回內部函數名的特點 直接調用這個閉包函數,達到傳參的效果,可為回調函數添加更多的擴展性  def outter(*args, **kwargs):      def callback(res):          # 可以拿到 *args, **kwargs 參數做一些事情          print(res.result())      return callback      pool_list = []  for i in range(15):      pool_list.append(pool.submit(task, i).add_done_callback(outter(1, 2, 3, a=1, c=3)))  # 朝執行緒池中提交任務(非同步)

協程***

後期項目支援高並發可能才會用到

概念回顧(協程這裡再理一下)

進程:資源單位(車間)

執行緒:作業系統的最小執行單位(流水線)

協程:單執行緒下實現並發的效果(完全是技術人員編造出來的名詞)

並發:看起來像同時執行(多道技術核心:切換+保存狀態)

協程:通過程式碼層面自己監測程式中的I/O行為,自己實現切換,讓作業系統誤認為這個執行緒沒有I/O,從而保證程式在運行態和就緒態來回切換(不進入阻塞態),更大限度地利用CPU,最大程度上提高執行緒的執行效率

切換+保存狀態就一定能夠提升效率嗎? ​ 切換+保存狀態 不一定能提升程式的效率

  • 當任務是計算密集型,反而會降低效率
  • 如果是IO密集型,會提升效率

如何實現協程

生成器的yield 可以實現保存狀態(行不通)

但,效率更低了

# # 串列執行  # import time  #  #  # def func1():  #     for i in range(10000000):  #         i + 1  #  #  # def func2():  #     for i in range(10000000):  #         i + 1  #  #  # start = time.time()  # func1()  # func2()  # stop = time.time()  # print(stop - start)  # # 1.2481744289398193      # 基於yield並發執行  import time      def func1():      while True:          10000000 + 1          yield      def func2():      g = func1()      for i in range(10000000):          i + 1          next(g)      start = time.time()  func2()  stop = time.time()  print(stop - start)  # 1.9084477424621582

gevent模組實現

模組安裝下載

搜索並下載(這裡是因為我配了兩個鏡像源,所以出來了兩個選項,隨便選一個)

gevent基本介紹

from gevent import spawn, monkey  monkey.patch_all()  # 一般這個要寫在很前面(例如導socket模組之前)  # 兩行亦可寫成一行 from gevent import monkey;monkey.patch_all()    g1 = spawn(eat, 1, 2, 3, x=4, y=5)  # 創建一個協程對象g1,spawn括弧內第一個參數是函數名,如eat,後面是該函數(eat)所需要的參數  g2 = spawn(func2)    g1.join()  # 等待協程g1結束  g2.join()  # 等待協程g2結束  # 上述兩步亦可合作一步:joinall([g1,g2])    g1.value  # 拿到eat函數執行的返回值

通過gevent實現遇到 IO自動切換狀態(單執行緒下並發)

import time    from gevent import spawn  # gevent 本身識別不了time.sleep() 等不屬於該模組內的I/O操作  # 使用下面的操作來支援  from gevent import monkey  monkey.patch_all()  # 監測程式碼中所有 I/O 行為      def heng(name):      print(f"{name} 哼")      time.sleep(2)      print(f"{name} 哼 ...")      def ha(name):      print(f"{name} 哈")      time.sleep(3)      print(f"{name} 哈 ...")      # start_time = time.time()  # heng('egon')  # ha('jason')  # print(f"主 {time.time() - start_time}")  # # 主 5.005069732666016      start_time = time.time()  s1 = spawn(heng, 'egon')  s2 = spawn(ha, 'jason')  s1.join()  s2.join()    print(f"主 {time.time() - start_time}")  # 主 3.0046989917755127

在計算密集型任務中使用

from gevent import spawn, monkey    monkey.patch_all()    import time      def func1():      for i in range(10000000):          i + 1      def func2():      for i in range(10000000):          i + 1      start = time.time()  g = spawn(func1)  g2 = spawn(func2)  g.join()  g2.join()  stop = time.time()  print(stop - start)  # 1.1324069499969482    # 與前面普通的串列執行時間 1.2481744289398193 相近

利用gevent在單執行緒下實現並發(協程)

服務端

import socket  from gevent import spawn  from gevent import monkey  # 讓 gevent 能夠識別python的 IO  monkey.patch_all()    server = socket.socket()  server.bind(('127.0.0.1', 8080))  server.listen(5)      def talk(conn):      while True:          try:              data = conn.recv(1024)              if len(data) == 0: break              print(data.decode('utf-8'))              conn.send(data.upper())          except ConnectionResetError as e:              print(e)              break      conn.close()      def wait_client_connect():      while True:          conn, addr = server.accept()          spawn(talk, conn)      if __name__ == '__main__':      g1 = spawn(wait_client_connect)      g1.join()  # 別忘了加上

客戶端

import socket  from threading import Thread, current_thread      def create_client():      client = socket.socket()      client.connect(('127.0.0.1', 8080))      n = 0      while True:          data = '%s %s' % (current_thread().name, n)          client.send(data.encode('utf-8'))          res = client.recv(1024)          print(res.decode('utf-8'))          n += 1      for i in range(400):  # 手動開400個執行緒連接客戶端(測試的是服務端單執行緒實現並發)      t = Thread(target=create_client)      t.start()

最大程度下提高程式碼的執行效率(實現高並發)

  • 多進程下使用多執行緒
  • 多執行緒下使用多協程

大前提 IO密集型任務

I/O 模型(只放了幾張圖)

此部分內容摘抄自部落格: Python從入門到精通之IO模型

程式間數據交互,本質上數據都是從記憶體中取的(包括socket的recv等)

阻塞I/O模型

當用戶進程調用了recvfrom這個系統調用,kernel就開始了IO的第一個階段:準備數據。對於network io來說,很多時候數據在一開始還沒有到達(比如,還沒有收到一個完整的UDP包),這個時候kernel就要等待足夠的數據到來。 而在用戶進程這邊,整個進程會被阻塞。當kernel一直等到數據準備好了,它就會將數據從kernel中拷貝到用戶記憶體,然後kernel返回結果,用戶進程才解除block的狀態,重新運行起來。

非阻塞I/O模型

從圖中可以看出,當用戶進程發出read操作時,如果kernel中的數據還沒有準備好,那麼它並不會block用戶進程,而是立刻返回一個error。從用戶進程角度講 ,它發起一個read操作後,並不需要等待,而是馬上就得到了一個結果。用戶進程判斷結果是一個error時,它就知道數據還沒有準備好,於是用戶就可以在本次到下次再發起read詢問的時間間隔內做其他事情,或者直接再次發送read操作。一旦kernel中的數據準備好了,並且又再次收到了用戶進程的system call,那麼它馬上就將數據拷貝到了用戶記憶體(這一階段仍然是阻塞的),然後返回。


也就是說非阻塞的recvform系統調用調用之後,進程並沒有被阻塞,內核馬上返回給進程,如果數據還沒準備好,此時會返回一個error。進程在返回之後,可以干點別的事情,然後再發起recvform系統調用。重複上面的過程,循環往複的進行recvform系統調用。這個過程通常被稱之為輪詢。輪詢檢查內核數據,直到數據準備好,再拷貝數據到進程,進行數據處理。需要注意,拷貝數據整個過程,進程仍然是屬於阻塞的狀態。

多路復用I/O模型

當用戶進程調用了select,那麼整個進程會被block,而同時,kernel會「監視」所有select負責的socket,當任何一個socket中的數據準備好了,select就會返回。這個時候用戶進程再調用read操作,將數據從kernel拷貝到用戶進程。 這個圖和blocking IO的圖其實並沒有太大的不同,事實上還更差一些。因為這裡需要使用兩個系統調用(select和recvfrom),而blocking IO只調用了一個系統調用(recvfrom)。但是,用select的優勢在於它可以同時處理多個connection。

訊號驅動I/O模型

涉及太少,暫不做了解

非同步I/O模型

用戶進程發起read操作之後,立刻就可以開始去做其它的事。而另一方面,從kernel的角度,當它受到一個asynchronous read之後,首先它會立刻返回,所以不會對用戶進程產生任何block。然後,kernel會等待數據準備完成,然後將數據拷貝到用戶記憶體,當這一切都完成之後,kernel會給用戶進程發送一個signal,告訴它read操作完成了。