一日一技:在Python 的執行緒中運行協程

  • 2020 年 2 月 26 日
  • 筆記

攝影:產品經理

下廚:kingname

一篇文章理解Python非同步編程的基本原理這篇文章中,我們講到,如果在非同步程式碼裡面又包含了一段非常耗時的同步程式碼,非同步程式碼就會被卡住。

那麼有沒有辦法讓同步程式碼與非同步程式碼看起來也是同時運行的呢?方法就是使用事件循環的.run_in_executor()方法。

我們來看一下 Python 官方文檔[1]中的說法:

那麼怎麼使用呢?還是以非常耗時的遞歸方式計算斐波那契數列的這個函數為例:

def sync_calc_fib(n):      if n in [1, 2]:          return1      return sync_calc_fib(n - 1) + sync_calc_fib(n - 2)      async def calc_fib(n):      result = sync_calc_fib(n)      print(f'第 {n} 項計算完成,結果是:{result}')      return result  

我們現在需要用 aiohttp 訪問一個延遲5秒的網頁,同時計算斐波那契數列第36項。

首先我們看看單獨計算第36項需要5秒鐘:

我們再來看看如果直接把這計算斐波那契數列和請求網站的兩個非同步任務放在一起「並行」,實際時間是兩個任務的時間疊加:

具體原因我在上一篇文章裡面已經做了說明。

現在,我想讓兩個任務「同時運行」,於是就可以這樣修改程式碼:

import aiohttp  import asyncio  import time  from concurrent.futures import ThreadPoolExecutor      async def request(sleep_time):      async with aiohttp.ClientSession() as client:          resp = await client.get(f'http://127.0.0.1:8000/sleep/{sleep_time}')          resp_json = await resp.json()          print(resp_json)      def sync_calc_fib(n):      if n in [1, 2]:          return 1      return sync_calc_fib(n - 1) + sync_calc_fib(n - 2)      def calc_fib(n):      result = sync_calc_fib(n)      print(f'第 {n} 項計算完成,結果是:{result}')      return result      async def main():      start = time.perf_counter()      loop = asyncio.get_event_loop()      with ThreadPoolExecutor(max_workers=4) as executor:          tasks_list = [              loop.run_in_executor(executor, calc_fib, 36),              asyncio.create_task(request(5))          ]          await asyncio.gather(*tasks_list)          end = time.perf_counter()          print(f'總計耗時:{end - start}')      asyncio.run(main())  

運行效果如下圖所示:

在5秒鐘的時間,就把計算斐波那契數列和請求5秒延遲的網站都做完了。

實現這樣的轉變,關鍵的程式碼就是:loop.run_in_executor(executor, calc_fib, 36)

其中的 loop就是主執行緒的事件循環(event loop),它是用來調度同一個執行緒裡面的多個協程。

executor是我們使用ThreadPoolExecutor(max_workers=4)創建的一個有4個執行緒的執行緒池,calc_fib是一個耗時的同步函數,36是傳入calc_fib的參數。loop.run_in_executor(executor, calc_fib, 36)的意思是說:

  1. calc_fib函數放到執行緒池裡面去運行
  2. 給執行緒池增加一個回調函數,這個回調函數會在運行結束後的下一次事件循環把結果保存下來。

請注意上圖中紅色箭頭對應的calc_fib這是一個同步函數,請與上一篇文章中的非同步函數區分開。run_in_executor的第二個參數需要是一個同步函數的函數名。

在上面的例子中,我們創建的是有4個執行緒的執行緒池。所以這個執行緒池最多允許4個阻塞式的同步函數「並行」。

參考資料

[1]

Python 官方文檔: https://docs.python.org/3/library/asyncio-eventloop.html#executing-code-in-thread-or-process-pools