創建進程池與執行緒池concurrent.futures模組的使用
- 2019 年 10 月 3 日
- 筆記
一、進程池。
當並發的任務數量遠遠大於電腦所能承受的範圍,即無法一次性開啟過多的任務數量就應該考慮去 限制進程數或執行緒數,從而保證伺服器不會因超載而癱瘓。這時候就出現了進程池和執行緒池。
二、concurrent.futures模組介紹
concurrent.futures模組提供了高度封裝的非同步調用介面
ThreadPoolExecutor:執行緒池,提供非同步調用
ProcessPoolExecutor:進程池,提供非同步調用
Both implement the same interface, which is defined by the abstract Executor class
三、基本方法:
submit(fn, *args, **kwargs)
:非同步提交任務
map(func, *iterables, timeout=None, chunksize=1)
:取代for循環submit的操作
shutdown(wait=True)
:相當於進程池的pool.close()+pool.join()
操作
- wait=True,等待池內所有任務執行完畢回收完資源後才繼續
- wait=False,立即返回,並不會等待池內的任務執行完畢
- 但不管wait參數為何值,整個程式都會等到所有任務執行完畢
- submit和map必須在shutdown之前
result(timeout=None)
:取得結果
add_done_callback(fn)
:回調函數
done()
:判斷某一個執行緒是否完成
cancle()
:取消某個任務
四、進程池程式碼實例——ProcessPoolExecutor
from concurrent.futures import ProcessPoolExecutor from multiprocessing import current_process import time def func(i): print(f'進程 {current_process().name} 正在執行任務 {i}') time.sleep(1) return i**2 if __name__ == '__main__': pool = ProcessPoolExecutor(4) # 進程池只有4個進程 lt = [] for i in range(20): # 假設執行20個任務 future = pool.submit(func,i) # func任務要做20次,4個進程負責完成這個20個任務 # print(future.result()) # 如果沒有結果就一直等待拿到結果,導致了所有任務都在串列 lt.append(future) pool.shutdown() # 默認為True,關閉了池的入口,會等待所有的任務執行完,結束阻塞, for fu in lt: print(fu.result()) # 等待所有的任務都執行完了,一起把返回值列印出來
五、執行緒池程式碼示例——ThreadPoolExecutor
from concurrent.futures import ThreadPoolExecutor from threading import currentThread import time def func(i): print(f'執行緒 {currentThread().name} 正在執行任務 {i}') time.sleep(1) return i**2 if __name__ == '__main__': fool = ThreadPoolExecutor(4) # 執行緒池裡只有4個執行緒 lt = [] for i in range(20): futrue = fool.submit(func,i) # func任務要做20次,4個執行緒負責完成這20次任務 lt.append(futrue) fool.shutdown() # 默認為True,關閉了池的入口,會等待所有的任務執行完,結束阻塞, for fu in lt: print(fu.result()) # 等待所有的任務都執行完了,一起把返回值列印出來
六、回調函數add_done_callback(fn)
提交任務的兩種方式:
同步: 提交了一個任務,必須等任務執行完了(拿到返回值),才能執行下一行程式碼
非同步: 提交了一個任務,不要等執行完了,可以直接執行下一行程式碼。
ps:進程和執行緒回調方法的使用寫一塊了,注釋掉的是進程的使用。
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor from threading import currentThread from multiprocessing import current_process import time def task(i): print(f'執行緒 {currentThread().name} 正在執行任務 {i}') # print(f'進程 {current_process().name} 正在執行任務 {i}') time.sleep(1) return i**2 def parse(futrue): # 處理拿到的結果 print(futrue.result()) if __name__ == '__main__': pool = ThreadPoolExecutor(4) # 執行緒池裡只有4個執行緒 # pool = ProcessPoolExecutor(4) # 進程池裡只有4個進程 lt = [] for i in range(20): futrue = pool.submit(task,i) # task任務要做20次,分別由四個進程完成這20個任務 futrue.add_done_callback(parse) # 為當前任務綁定一個函數,在當前任務執行結束的時候會觸發這個函數 # 會把futrue對象作為參數傳給函數 # 這個稱之為回調函數,處理完了回來就調用這個函數。
跟上面執行緒池裡的例子相比:回調函數的作用,不需要等待所有的任務執行完才列印返回值。每執行完一個任務直接列印結果,實現一個並發的效果,效率有所提升。