Python協程之asyncio

asyncio 是 Python 中的異步IO庫,用來編寫並發協程,適用於IO阻塞且需要大量並發的場景,例如爬蟲、文件讀寫。

asyncio 在 Python3.4 被引入,經過幾個版本的迭代,特性、語法糖均有了不同程度的改進,這也使得不同版本的 Python 在 asyncio 的用法上各不相同,顯得有些雜亂,以前使用的時候也是本着能用就行的原則,在寫法上走了一些彎路,現在對 Python3.7+ 和 Python3.6 中 asyncio 的用法做一個梳理,以便以後能更好的使用。

協程與asyncio

協程,又稱微線程,它不被操作系統內核所管理,而完全是有程序控制,協程切換花銷小,因而有更高的性能。

協程可以比作子程序,不同的是,執行過程中協程可以掛起當前狀態,轉而執行其他協程,在適當的時候返回來接着執行,協程間的切換不需要涉及任何系統調用或任何阻塞調用,完全由協程調度器進行調度。

Python 中以 asyncio 為依賴,使用 async/await 語法糖進行協程的創建和使用,如下 async 語法創建一個協程函數:

async def work():
    pass

在協程中除了普通函數的功能外最主要的作用就是:使用 await 語法等待另一個協程結束,這將掛起當前協程,直到另一個協程產生結果再繼續執行:

async def work():
    await asyncio.sleep(1)
    print('continue')

asyncio.sleep() 是 asyncio 包內置的協程函數,這裡模擬耗時的IO操作,上面這個協程執行到這一句會掛起當前協程而去執行其他協程,直到sleep結束,當有多個協程任務是,這種切換會讓它們的IO操作並行處理。

注意,執行一個協程函數並不會真正的運行它,而是會返回一個協程對象,要使協程真正的運行,需要將它們加入到事件循環中運行,官方建議 asyncio 程序應當有一個主入口協程,用來管理所有其他的協程任務:

async def main():
    await work()

在 Python3.7+ 中,運行這個 asyncio 程序只需要一句:asyncio.run(main()) ,而在 Python3.6 中,需要手動獲取事件循環並加入協程任務:

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()

事件循環就是一個循環隊列,對其中的協程進行調度執行,當把一個協程加入循環,這個協程創建的其他協程都會自動加入到當前事件循環中。

其實協程對象也不是直接運行,而是被封裝成一個個待執行的 Task ,大多數情況下 asyncio 會幫我們進行封裝,我們也可以提前自行封裝 Task 來獲得對協程更多的控制權,注意,封裝 Task 需要當前線程有正在運行的事件循環,否則將引 RuntimeError,這也就是官方建議使用主入口協程的原因,如果在主入口協程之外創建任務就需要先手動獲取事件循環然後使用底層的方法 loop.create_task(),任務創建後便有了狀態,可以查看運行情況,查看結果,取消任務等:

async def main():
    task = asyncio.create_task(work())
    print(task)
    await task
    print(task)

#----執行結果----#
<Task pending name='Task-2' coro=<work() running at d:\tmp\code\asy.py:5>>
<Task finished name='Task-2' coro=<work() done, defined at d:\tmp\code\asy.py:5> result=None>

asyncio.create_task() 是 Python3.7 加入的高層級API,在 Python3.6,需要使用低層級API asyncio.ensure_future() 來創建 Future,Future 也是一個管理協程運行狀態的對象,與 Task 沒有本質上的區別。

並發協程

通常,一個含有一系列並發協程的程序寫法如下(Python3.7+):

import asyncio
import time


async def work(num: int):
    '''
    一個工作協程,接收一個數字,將它 +1 後返回
    '''
    print(f'working {num} ...')
    await asyncio.sleep(1)    # 模擬耗時的IO操作
    print(f'{num} -> {num+1} done')
    return num + 1


async def main():
    '''
    主協程,創建一系列並發協程並運行它們
    '''
    # 任務隊列
    tasks = [work(num) for num in range(0, 5)]
    # 並發執行隊列中的協程並等待結果返回
    results = await asyncio.gather(*tasks)
    print(results)


if __name__ == "__main__":
    asyncio.run(main())

並發運行多個協程任務的關鍵就是 asyncio.gather(*tasks),它接受多個協程任務並將它們加入到事件循環,所有任務都運行完成後會返回結果列表,這裡我們也沒有手動封裝 Task,因為 gather 函數會自動封裝。

並發運行還有另一個方法 asyncio.wait(tasks),它們的區別是:

  • gather 比 wait 更加高層,gather 可以將任務分組,一般優先使用 gather:
tasks1 = [work(num) for num in range(0, 5)]
tasks2 = [work(num) for num in range(5, 10)]
group1 = asyncio.gather(*tasks1)
group2 = asyncio.gather(*tasks2)
results1, results2 = await asyncio.gather(group1, group2)
print(results1, results2)
  • 在某些定製化任務需求的時候,可以使用 wait:
# Python3.8 版本後,直接向 wait() 傳入協程對象已棄用,必須手動創建 Task
tasks = [asyncio.create_task(work(num)) for num in range(0, 5)]
done, pending = await asyncio.wait(tasks)
for task in tasks:
    if task in done:
        print(task.result())
for p in pending:
    p.cancel()

Tips

  • await 語句後必須是一個 可等待對象 ,可等待對象主要有三種:Python協程,Task,Future。通常情況下沒有必要在應用層級的代碼中創建 Future 對象。
  • 在 asyncio 程序中使用同步代碼雖然並不會報錯,但是也失去了並發的意義,例如網絡請求,如果使用僅支持同步的 requests,在發起一次請求後在收到響應結果之前不能發起其他請求,這樣要並發訪問多個網頁時,即使使用了 asyncio,在發送一次請求後切換到其他協程還是會因為同步問題而阻塞,並不能有速度上的提升,這時候就需要其他支持異步請求庫如 aiohttp
  • 關於 asyncio 的更多更詳細的操作見 官方文檔