創建進程池與執行緒池concurrent.futures模組的使用

  • 2019 年 10 月 3 日
  • 筆記

一、進程池。

當並發的任務數量遠遠大於電腦所能承受的範圍,即無法一次性開啟過多的任務數量就應該考慮去 限制進程數或執行緒數,從而保證伺服器不會因超載而癱瘓。這時候就出現了進程池和執行緒池。

二、concurrent.futures模組介紹

concurrent.futures模組提供了高度封裝的非同步調用介面

ThreadPoolExecutor:執行緒池,提供非同步調用

ProcessPoolExecutor:進程池,提供非同步調用

Both implement the same interface, which is defined by the abstract Executor class

三、基本方法:

submit(fn, *args, **kwargs):非同步提交任務

map(func, *iterables, timeout=None, chunksize=1):取代for循環submit的操作

shutdown(wait=True):相當於進程池的pool.close()+pool.join()操作

  • wait=True,等待池內所有任務執行完畢回收完資源後才繼續
  • wait=False,立即返回,並不會等待池內的任務執行完畢
  • 但不管wait參數為何值,整個程式都會等到所有任務執行完畢
  • submit和map必須在shutdown之前

result(timeout=None):取得結果

add_done_callback(fn):回調函數

done():判斷某一個執行緒是否完成

cancle():取消某個任務

四、進程池程式碼實例——ProcessPoolExecutor

from concurrent.futures import ProcessPoolExecutor  from multiprocessing import current_process  import time    def func(i):      print(f'進程 {current_process().name} 正在執行任務 {i}')      time.sleep(1)      return i**2    if __name__ == '__main__':      pool = ProcessPoolExecutor(4)  # 進程池只有4個進程      lt = []      for i in range(20):  # 假設執行20個任務          future = pool.submit(func,i)   # func任務要做20次,4個進程負責完成這個20個任務          # print(future.result())   # 如果沒有結果就一直等待拿到結果,導致了所有任務都在串列          lt.append(future)      pool.shutdown() # 默認為True,關閉了池的入口,會等待所有的任務執行完,結束阻塞,      for fu in lt:          print(fu.result())  # 等待所有的任務都執行完了,一起把返回值列印出來    

五、執行緒池程式碼示例——ThreadPoolExecutor

from concurrent.futures import ThreadPoolExecutor  from threading import currentThread  import time    def func(i):      print(f'執行緒 {currentThread().name} 正在執行任務 {i}')      time.sleep(1)      return i**2    if __name__ == '__main__':      fool = ThreadPoolExecutor(4)  # 執行緒池裡只有4個執行緒      lt = []      for i in range(20):          futrue = fool.submit(func,i)   # func任務要做20次,4個執行緒負責完成這20次任務          lt.append(futrue)      fool.shutdown()  # 默認為True,關閉了池的入口,會等待所有的任務執行完,結束阻塞,      for fu in lt:          print(fu.result())   # 等待所有的任務都執行完了,一起把返回值列印出來

六、回調函數add_done_callback(fn)

提交任務的兩種方式:
同步: 提交了一個任務,必須等任務執行完了(拿到返回值),才能執行下一行程式碼
非同步: 提交了一個任務,不要等執行完了,可以直接執行下一行程式碼。

ps:進程和執行緒回調方法的使用寫一塊了,注釋掉的是進程的使用。

from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor  from threading import currentThread  from multiprocessing import current_process  import time    def task(i):      print(f'執行緒 {currentThread().name} 正在執行任務 {i}')      # print(f'進程 {current_process().name} 正在執行任務 {i}')      time.sleep(1)      return i**2    def parse(futrue):      # 處理拿到的結果      print(futrue.result())    if __name__ == '__main__':      pool = ThreadPoolExecutor(4)  # 執行緒池裡只有4個執行緒      # pool = ProcessPoolExecutor(4)  # 進程池裡只有4個進程      lt = []      for i in range(20):          futrue = pool.submit(task,i)  # task任務要做20次,分別由四個進程完成這20個任務          futrue.add_done_callback(parse)          # 為當前任務綁定一個函數,在當前任務執行結束的時候會觸發這個函數          # 會把futrue對象作為參數傳給函數          # 這個稱之為回調函數,處理完了回來就調用這個函數。  

跟上面執行緒池裡的例子相比:回調函數的作用,不需要等待所有的任務執行完才列印返回值。每執行完一個任務直接列印結果,實現一個並發的效果,效率有所提升。