運籌帷幄決勝千里,Python3.10原生協程asyncio工業級真實協程非同步消費任務調度實踐

我們一直都相信這樣一種說法:協程是比多執行緒更高效的一種並發工作方式,它完全由程式本身所控制,也就是在用戶態執行,協程避免了像執行緒切換那樣產生的上下文切換,在性能方面得到了很大的提升。毫無疑問,這是顛撲不破的業界共識,是放之四海而皆準的真理。

但事實上,協程遠比大多數人想像中的複雜,正因為協程的「用戶態」特性,任務調度權掌握在撰寫協程任務的人手裡,而僅僅依賴async和await關鍵字遠遠達不到「調度」的級別,有時候反而會拖累任務效率,使其在任務執行效率上還不及「系統態」的多執行緒和多進程,本次我們來探討一下Python3原生協程任務的調度管理。

Python3.10協程庫async.io的基本操作

事件循環(Eventloop)是 原生協程庫asyncio 的核心,可以理解為總指揮。Eventloop實例提供了註冊、取消和執行任務和回調的方法。

Eventloop可以將一些非同步方法綁定到事件循環上,事件循環會循環執行這些方法,但是和多執行緒一樣,同時只能執行一個方法,因為協程也是單執行緒執行。當執行到某個方法時,如果它遇到了阻塞,事件循環會暫停它的執行去執行其他的方法,與此同時為這個方法註冊一個回調事件,當某個方法從阻塞中恢復,下次輪詢到它的時候將會繼續執行,亦或者,當沒有輪詢到它,它提前從阻塞中恢復,也可以通過回調事件進行切換,如此往複,這就是事件循環的簡單邏輯。

而上面最核心的動作就是切換別的方法,怎麼切換?用await關鍵字:

import asyncio  
  
  
async def job1():  
    print('job1開始')  
    await asyncio.sleep(1)  
    print('job1結束')  
  
  
async def job2():  
    print('job2開始')  
  
  
async def main():  
    await job1()  
    await job2()  
  
  
if __name__ == '__main__':  
    asyncio.run(main())

系統返回:

job1開始  
job1結束  
job2開始

是的,切則切了,可切的對嗎?事實上這兩個協程任務並沒有達成「協作」,因為它們是同步執行的,所以並不是在方法內await了,就可以達成協程的工作方式,我們需要並發啟動這兩個協程任務:

import asyncio  
  
  
async def job1():  
    print('job1開始')  
    await asyncio.sleep(1)  
    print('job1結束')  
  
  
async def job2():  
    print('job2開始')  
  
  
async def main():  
    #await job1()  
    #await job2()  
    await asyncio.gather(job1(), job2())  
  
  
if __name__ == '__main__':  
    asyncio.run(main())

系統返回:

job1開始  
job2開始  
job1結束

如果沒有asyncio.gather的參與,協程方法就是普通的同步方法,就算用async聲明了非同步也無濟於事。而asyncio.gather的基礎功能就是將協程任務並發執行,從而達成「協作」。

但事實上,Python3.10也支援「同步寫法」的協程方法:

async def create_task():  
    task1 = asyncio.create_task(job1())  
    task2 = asyncio.create_task(job2())  
    await task1  
    await task2

這裡我們通過asyncio.create_task對job1和job2進行封裝,返回的對象再通過await進行調用,由此兩個單獨的非同步方法就都被綁定到同一個Eventloop了,這樣雖然寫法上同步,但其實是非同步執行:

import asyncio  
  
  
async def job1():  
    print('job1開始')  
    await asyncio.sleep(1)  
    print('job1結束')  
  
  
async def job2():  
    print('job2開始')  
  
  
async def create_task():  
    task1 = asyncio.create_task(job1())  
    task2 = asyncio.create_task(job2())  
    await task1  
    await task2  
  
  
async def main():  
    #await job1()  
    #await job2()  
    await asyncio.gather(job1(), job2())  
  
  
if __name__ == '__main__':  
    asyncio.run(create_task())

系統返回:

job1開始  
job2開始  
job1結束

協程任務的上下游監控

解決了並發執行的問題,現在假設每個非同步任務都會返回一個操作結果:

async def job1():  
    print('job1開始')  
    await asyncio.sleep(1)  
    print('job1結束')  
  
    return "job1任務結果"  
  
  
async def job2():  
    print('job2開始')  
  
    return "job2任務結果"

通過asyncio.gather方法,我們可以收集到任務執行結果:

async def main():  
  
    res = await asyncio.gather(job1(), job2())  
    print(res)

並發執行任務:

import asyncio  
  
  
async def job1():  
    print('job1開始')  
    await asyncio.sleep(1)  
    print('job1結束')  
  
    return "job1任務結果"  
  
  
async def job2():  
    print('job2開始')  
  
    return "job2任務結果"  
  
  
  
async def main():  
  
    res = await asyncio.gather(job1(), job2())  
    print(res)  
  
  
if __name__ == '__main__':  
    asyncio.run(main())

系統返回:

job1開始  
job2開始  
job1結束  
['job1', 'job2']

但任務結果僅僅也就是方法的返回值,除此之外,並沒有其他有價值的資訊,對協程任務的執行明細諱莫如深。

現在我們換成asyncio.wait方法:

async def main():  
  
    res = await asyncio.wait([job1(), job2()])  
    print(res)

依然並發執行:

import asyncio  
  
  
async def job1():  
    print('job1開始')  
    await asyncio.sleep(1)  
    print('job1結束')  
  
    return "job1任務結果"  
  
  
async def job2():  
    print('job2開始')  
  
    return "job2任務結果"  
  
  
  
async def main():  
  
    res = await asyncio.wait([job1(), job2()])  
    print(res)  
  
  
if __name__ == '__main__':  
    asyncio.run(main())

系統返回:

job1開始  
job2開始  
job1結束  
({<Task finished name='Task-2' coro=<job1() done, defined at /Users/liuyue/Downloads/upload/test/test_async.py:4> result='job1任務結果'>, <Task finished name='Task-3' coro=<job2() done, defined at /Users/liuyue/Downloads/upload/test/test_async.py:12> result='job2任務結果'>}, set())

可以看出,asyncio.wait返回的是任務對象,裡面存儲了大部分的任務資訊,包括執行狀態。

在默認情況下,asyncio.wait會等待全部任務完成 (return_when=’ALL_COMPLETED’),它還支援 return_when=’FIRST_COMPLETED’(第一個協程完成就返回)和 return_when=’FIRST_EXCEPTION’(出現第一個異常就返回)。

這就非常令人興奮了,因為如果非同步消費任務是發簡訊之類的需要統計達到率的任務,利用asyncio.wait特性,我們就可以第一時間記錄任務完成或者異常的具體時間。

協程任務守護

假設由於某種原因,我們手動終止任務消費:

import asyncio  
  
  
async def job1():  
    print('job1開始')  
    await asyncio.sleep(1)  
    print('job1結束')  
  
    return "job1任務結果"  
  
  
async def job2():  
    print('job2開始')  
  
    return "job2任務結果"  
  
  
  
async def main():  
    task1 = asyncio.create_task(job1())  
    task2 = asyncio.create_task(job2())  
    task1.cancel()  
    res = await asyncio.gather(task1, task2)  
    print(res)  
  
  
if __name__ == '__main__':  
    asyncio.run(main())

系統報錯:

File "/Users/liuyue/Downloads/upload/test/test_async.py", line 23, in main  
    res = await asyncio.gather(task1, task2)  
asyncio.exceptions.CancelledError  
  

這裡job1被手動取消,但會影響job2的執行,這違背了協程「互相提攜」的特性。

事實上,asyncio.gather方法可以捕獲協程任務的異常:

import asyncio  
  
  
async def job1():  
    print('job1開始')  
    await asyncio.sleep(1)  
    print('job1結束')  
  
    return "job1任務結果"  
  
  
async def job2():  
    print('job2開始')  
  
    return "job2任務結果"  
  
  
  
async def main():  
    task1 = asyncio.create_task(job1())  
    task2 = asyncio.create_task(job2())  
    task1.cancel()  
    res = await asyncio.gather(task1, task2,return_exceptions=True)  
    print(res)  
  
  
if __name__ == '__main__':  
    asyncio.run(main())

系統返回:

job2開始  
[CancelledError(''), 'job2任務結果']

可以看到job1沒有被執行,並且異常替代了任務結果作為返回值。

但如果協程任務啟動之後,需要保證任務情況下都不會被取消,此時可以使用asyncio.shield方法守護協程任務:

import asyncio  
  
  
async def job1():  
    print('job1開始')  
    await asyncio.sleep(1)  
    print('job1結束')  
  
    return "job1任務結果"  
  
  
async def job2():  
    print('job2開始')  
  
    return "job2任務結果"  
  
  
  
async def main():  
    task1 = asyncio.shield(job1())  
    task2 = asyncio.create_task(job2())  
      
    res = await asyncio.gather(task1, task2,return_exceptions=True)  
  
    task1.cancel()  
    print(res)  
  
  
if __name__ == '__main__':  
    asyncio.run(main())

系統返回:

job1開始  
job2開始  
job1結束  
['job1任務結果', 'job2任務結果']

協程任務回調

假設協程任務執行完畢之後,需要立刻進行回調操作,比如將任務結果推送到其他介面服務上:

import asyncio  
  
  
async def job1():  
    print('job1開始')  
    await asyncio.sleep(1)  
    print('job1結束')  
  
    return "job1任務結果"  
  
  
async def job2():  
    print('job2開始')  
  
    return "job2任務結果"  
  
  
def callback(future):  
    print(f'回調任務: {future.result()}')  
  
  
  
async def main():  
    task1 = asyncio.shield(job1())  
    task2 = asyncio.create_task(job2())  
  
    task1.add_done_callback(callback)  
      
    res = await asyncio.gather(task1, task2,return_exceptions=True)  
  
    print(res)  
  
  
if __name__ == '__main__':  
    asyncio.run(main())

這裡我們通過add_done_callback方法對job1指定了callback方法,當任務執行完以後,callback會被調用,系統返回:

job1開始  
job2開始  
job1結束  
回調任務: job1任務結果  
['job1任務結果', 'job2任務結果']

與此同時,add_done_callback方法不僅可以獲取協程任務返回值,它自己也支援參數參數傳遞:

import asyncio  
from functools import partial  
  
async def job1():  
    print('job1開始')  
    await asyncio.sleep(1)  
    print('job1結束')  
  
    return "job1任務結果"  
  
  
async def job2():  
    print('job2開始')  
  
    return "job2任務結果"  
  
  
def callback(future,num):  
    print(f"回調參數{num}")  
    print(f'回調任務: {future.result()}')  
  
  
  
async def main():  
    task1 = asyncio.shield(job1())  
    task2 = asyncio.create_task(job2())  
  
    task1.add_done_callback(partial(callback,num=1))  
      
    res = await asyncio.gather(task1, task2,return_exceptions=True)  
  
    print(res)  
  
  
if __name__ == '__main__':  
    asyncio.run(main())

系統返回:

job1開始  
job2開始  
job1結束  
回調參數1  
回調任務: job1任務結果  
['job1任務結果', 'job2任務結果']

結語

成也用戶態,敗也用戶態。所謂水能載舟亦能覆舟,協程消費任務的調度遠比多執行緒的系統級調度要複雜,稍不留神就會造成業務上的「同步」阻塞,弄巧成拙,適得其反。這也解釋了為什麼相似場景中多執行緒的出場率要遠遠高於協程,就是因為多執行緒不需要考慮啟動後的「切換」問題,無為而為,簡單粗暴。