運籌帷幄決勝千里,Python3.10原生協程asyncio工業級真實協程非同步消費任務調度實踐
我們一直都相信這樣一種說法:協程是比多執行緒更高效的一種並發工作方式,它完全由程式本身所控制,也就是在用戶態執行,協程避免了像執行緒切換那樣產生的上下文切換,在性能方面得到了很大的提升。毫無疑問,這是顛撲不破的業界共識,是放之四海而皆準的真理。
但事實上,協程遠比大多數人想像中的複雜,正因為協程的「用戶態」特性,任務調度權掌握在撰寫協程任務的人手裡,而僅僅依賴async和await關鍵字遠遠達不到「調度」的級別,有時候反而會拖累任務效率,使其在任務執行效率上還不及「系統態」的多執行緒和多進程,本次我們來探討一下Python3原生協程任務的調度管理。
Python3.10協程庫async.io的基本操作
事件循環(Eventloop)是 原生協程庫asyncio 的核心,可以理解為總指揮。Eventloop實例提供了註冊、取消和執行任務和回調的方法。
Eventloop可以將一些非同步方法綁定到事件循環上,事件循環會循環執行這些方法,但是和多執行緒一樣,同時只能執行一個方法,因為協程也是單執行緒執行。當執行到某個方法時,如果它遇到了阻塞,事件循環會暫停它的執行去執行其他的方法,與此同時為這個方法註冊一個回調事件,當某個方法從阻塞中恢復,下次輪詢到它的時候將會繼續執行,亦或者,當沒有輪詢到它,它提前從阻塞中恢復,也可以通過回調事件進行切換,如此往複,這就是事件循環的簡單邏輯。
而上面最核心的動作就是切換別的方法,怎麼切換?用await關鍵字:
import asyncio
async def job1():
print('job1開始')
await asyncio.sleep(1)
print('job1結束')
async def job2():
print('job2開始')
async def main():
await job1()
await job2()
if __name__ == '__main__':
asyncio.run(main())
系統返回:
job1開始
job1結束
job2開始
是的,切則切了,可切的對嗎?事實上這兩個協程任務並沒有達成「協作」,因為它們是同步執行的,所以並不是在方法內await了,就可以達成協程的工作方式,我們需要並發啟動這兩個協程任務:
import asyncio
async def job1():
print('job1開始')
await asyncio.sleep(1)
print('job1結束')
async def job2():
print('job2開始')
async def main():
#await job1()
#await job2()
await asyncio.gather(job1(), job2())
if __name__ == '__main__':
asyncio.run(main())
系統返回:
job1開始
job2開始
job1結束
如果沒有asyncio.gather的參與,協程方法就是普通的同步方法,就算用async聲明了非同步也無濟於事。而asyncio.gather的基礎功能就是將協程任務並發執行,從而達成「協作」。
但事實上,Python3.10也支援「同步寫法」的協程方法:
async def create_task():
task1 = asyncio.create_task(job1())
task2 = asyncio.create_task(job2())
await task1
await task2
這裡我們通過asyncio.create_task對job1和job2進行封裝,返回的對象再通過await進行調用,由此兩個單獨的非同步方法就都被綁定到同一個Eventloop了,這樣雖然寫法上同步,但其實是非同步執行:
import asyncio
async def job1():
print('job1開始')
await asyncio.sleep(1)
print('job1結束')
async def job2():
print('job2開始')
async def create_task():
task1 = asyncio.create_task(job1())
task2 = asyncio.create_task(job2())
await task1
await task2
async def main():
#await job1()
#await job2()
await asyncio.gather(job1(), job2())
if __name__ == '__main__':
asyncio.run(create_task())
系統返回:
job1開始
job2開始
job1結束
協程任務的上下游監控
解決了並發執行的問題,現在假設每個非同步任務都會返回一個操作結果:
async def job1():
print('job1開始')
await asyncio.sleep(1)
print('job1結束')
return "job1任務結果"
async def job2():
print('job2開始')
return "job2任務結果"
通過asyncio.gather方法,我們可以收集到任務執行結果:
async def main():
res = await asyncio.gather(job1(), job2())
print(res)
並發執行任務:
import asyncio
async def job1():
print('job1開始')
await asyncio.sleep(1)
print('job1結束')
return "job1任務結果"
async def job2():
print('job2開始')
return "job2任務結果"
async def main():
res = await asyncio.gather(job1(), job2())
print(res)
if __name__ == '__main__':
asyncio.run(main())
系統返回:
job1開始
job2開始
job1結束
['job1', 'job2']
但任務結果僅僅也就是方法的返回值,除此之外,並沒有其他有價值的資訊,對協程任務的執行明細諱莫如深。
現在我們換成asyncio.wait方法:
async def main():
res = await asyncio.wait([job1(), job2()])
print(res)
依然並發執行:
import asyncio
async def job1():
print('job1開始')
await asyncio.sleep(1)
print('job1結束')
return "job1任務結果"
async def job2():
print('job2開始')
return "job2任務結果"
async def main():
res = await asyncio.wait([job1(), job2()])
print(res)
if __name__ == '__main__':
asyncio.run(main())
系統返回:
job1開始
job2開始
job1結束
({<Task finished name='Task-2' coro=<job1() done, defined at /Users/liuyue/Downloads/upload/test/test_async.py:4> result='job1任務結果'>, <Task finished name='Task-3' coro=<job2() done, defined at /Users/liuyue/Downloads/upload/test/test_async.py:12> result='job2任務結果'>}, set())
可以看出,asyncio.wait返回的是任務對象,裡面存儲了大部分的任務資訊,包括執行狀態。
在默認情況下,asyncio.wait會等待全部任務完成 (return_when=’ALL_COMPLETED’),它還支援 return_when=’FIRST_COMPLETED’(第一個協程完成就返回)和 return_when=’FIRST_EXCEPTION’(出現第一個異常就返回)。
這就非常令人興奮了,因為如果非同步消費任務是發簡訊之類的需要統計達到率的任務,利用asyncio.wait特性,我們就可以第一時間記錄任務完成或者異常的具體時間。
協程任務守護
假設由於某種原因,我們手動終止任務消費:
import asyncio
async def job1():
print('job1開始')
await asyncio.sleep(1)
print('job1結束')
return "job1任務結果"
async def job2():
print('job2開始')
return "job2任務結果"
async def main():
task1 = asyncio.create_task(job1())
task2 = asyncio.create_task(job2())
task1.cancel()
res = await asyncio.gather(task1, task2)
print(res)
if __name__ == '__main__':
asyncio.run(main())
系統報錯:
File "/Users/liuyue/Downloads/upload/test/test_async.py", line 23, in main
res = await asyncio.gather(task1, task2)
asyncio.exceptions.CancelledError
這裡job1被手動取消,但會影響job2的執行,這違背了協程「互相提攜」的特性。
事實上,asyncio.gather方法可以捕獲協程任務的異常:
import asyncio
async def job1():
print('job1開始')
await asyncio.sleep(1)
print('job1結束')
return "job1任務結果"
async def job2():
print('job2開始')
return "job2任務結果"
async def main():
task1 = asyncio.create_task(job1())
task2 = asyncio.create_task(job2())
task1.cancel()
res = await asyncio.gather(task1, task2,return_exceptions=True)
print(res)
if __name__ == '__main__':
asyncio.run(main())
系統返回:
job2開始
[CancelledError(''), 'job2任務結果']
可以看到job1沒有被執行,並且異常替代了任務結果作為返回值。
但如果協程任務啟動之後,需要保證任務情況下都不會被取消,此時可以使用asyncio.shield方法守護協程任務:
import asyncio
async def job1():
print('job1開始')
await asyncio.sleep(1)
print('job1結束')
return "job1任務結果"
async def job2():
print('job2開始')
return "job2任務結果"
async def main():
task1 = asyncio.shield(job1())
task2 = asyncio.create_task(job2())
res = await asyncio.gather(task1, task2,return_exceptions=True)
task1.cancel()
print(res)
if __name__ == '__main__':
asyncio.run(main())
系統返回:
job1開始
job2開始
job1結束
['job1任務結果', 'job2任務結果']
協程任務回調
假設協程任務執行完畢之後,需要立刻進行回調操作,比如將任務結果推送到其他介面服務上:
import asyncio
async def job1():
print('job1開始')
await asyncio.sleep(1)
print('job1結束')
return "job1任務結果"
async def job2():
print('job2開始')
return "job2任務結果"
def callback(future):
print(f'回調任務: {future.result()}')
async def main():
task1 = asyncio.shield(job1())
task2 = asyncio.create_task(job2())
task1.add_done_callback(callback)
res = await asyncio.gather(task1, task2,return_exceptions=True)
print(res)
if __name__ == '__main__':
asyncio.run(main())
這裡我們通過add_done_callback方法對job1指定了callback方法,當任務執行完以後,callback會被調用,系統返回:
job1開始
job2開始
job1結束
回調任務: job1任務結果
['job1任務結果', 'job2任務結果']
與此同時,add_done_callback方法不僅可以獲取協程任務返回值,它自己也支援參數參數傳遞:
import asyncio
from functools import partial
async def job1():
print('job1開始')
await asyncio.sleep(1)
print('job1結束')
return "job1任務結果"
async def job2():
print('job2開始')
return "job2任務結果"
def callback(future,num):
print(f"回調參數{num}")
print(f'回調任務: {future.result()}')
async def main():
task1 = asyncio.shield(job1())
task2 = asyncio.create_task(job2())
task1.add_done_callback(partial(callback,num=1))
res = await asyncio.gather(task1, task2,return_exceptions=True)
print(res)
if __name__ == '__main__':
asyncio.run(main())
系統返回:
job1開始
job2開始
job1結束
回調參數1
回調任務: job1任務結果
['job1任務結果', 'job2任務結果']
結語
成也用戶態,敗也用戶態。所謂水能載舟亦能覆舟,協程消費任務的調度遠比多執行緒的系統級調度要複雜,稍不留神就會造成業務上的「同步」阻塞,弄巧成拙,適得其反。這也解釋了為什麼相似場景中多執行緒的出場率要遠遠高於協程,就是因為多執行緒不需要考慮啟動後的「切換」問題,無為而為,簡單粗暴。