Python非同步協程(asyncio詳解)

續上篇講解yield from部落格,上篇鏈接://www.cnblogs.com/Red-Sun/p/16889182.html
PS:本部落格是個人筆記分享,不需要掃碼加群或必須關注什麼的(如果外站需要加群或關注的可以直接去我主頁查看)
歡迎大家光臨ヾ(≧▽≦*)o我的部落格首頁//www.cnblogs.com/Red-Sun/
首先要了解什麼是協程,其次知道非同步跟同步的區別。(PS:個人喜歡多做比喻,不恰當地方望指正)
本文僅僅是個人學習筆記,有錯的地方望各位指點。
如果把進程比作從A處到B處去這件事,那麼執行緒就是可供選擇的多條道路,協程就是道路上特殊路段(類似限速,一整條道路都是特殊路段的話,就是全部由協程實現)
例圖如下:

1. 什麼是協程(Coroutines)

在了解非同步之前,先大致了解一下什麼是協程。
網上的講法有各種:

  • 協程是一種比執行緒更加輕量級的存在
  • 協程是一種用戶級的輕量級執行緒
  • 協程,又稱微執行緒

大體看過之後就感覺,我好像懂了,有好像沒懂,個人感覺有點暈乎乎的,沒太明白。(PS:可能是我個人智商沒夠不能快速領悟的原因)
個人理解(PS:不涉及其本質來源、底層實現、僅僅就著這個非同步爬蟲來說):協程就像一條帶應急車道的高速公路(具體作用就是讓任務有了暫停切換功能)
執行緒:把需要執行的任務比作汽車,執行緒就像一條單行且只有一條道的高速公路,只有等前一輛車到達終點後面的車才能出發,如果其中一輛出了事情停在了路上,那麼這倆車後面的車就只能原地等待直到它恢復併到達終點才能繼續上路。
協程:把需要執行的任務比作汽車,協程就像一條帶應急車道的高速公路,如果汽車在中途出了問題就可以直接到一邊的應急車道停下處理問題,下一輛車可以直接上路,簡單來說就是可以通過程式控制哪輛車行駛,哪輛車在應急車道休息。

2.同步跟非同步

同步跟非同步是兩個相對的概念:
同步:意味著有序
非同步:意味著無序
小故事模擬事件:
小明在家需要完成如下事情:

  1. 電飯鍋煮飯大約30分鐘
  2. 洗衣機洗衣服大約40分鐘
  3. 寫作業大約50分鐘

在同步情況下:小明需要電飯鍋處等待30分鐘、洗衣機處等待40分鐘、寫作業50分鐘,總計花費時間120分鐘。
在非同步情況下:小明需要電飯鍋處理並啟動花費10分鐘、洗衣機處理並啟動花費10分鐘,寫作業花費50分鐘,總計花費時間70分鐘。
即同步必須一件事情結束之後再進行下一件事,非同步是可以在一件事情沒結束就去處理另外一件事情了。
注意:此處非同步比同步耗時更短是有前提條件的!要是I/O阻塞才可以(說人話:類似電飯鍋煮飯,電飯鍋可以自行完成這種的)
如果把條件中的電飯鍋換成柴火,洗衣機換成搓衣板,那麼事情就只能一件一件完成了,兩者耗時相近。

3.asyncio非同步協程

asyncio即Asynchronous I/O是python一個用來處理並發(concurrent)事件的包,是很多python非同步架構的基礎,多用於處理高並發網路請求方面的問題。
此處使用的是Python 3.5之後出現的async/await來實現協程,需要yield實現協程的可以去我上篇部落格瞅瞅:點擊此處快速跳轉

基礎補充(比較基礎的內容懂的可以直接跳)

  1. 普通函數
def function():
    return 1

2.由async做前綴的普通函數變成了非同步函數

async def asynchronous():
    return 1

而非同步函數不同於普通函數不可能被直接調用

async def asynchronous():
    return 1

print(asynchronous())


嘗試用send驅動這個協程

async def asynchronous():
    return 1

asynchronous().send(None)


值有了不過存儲在了這個StopIteration報錯中,於是有了下方的執行器

# -*- coding: utf-8 -*-
# @Time    : 2022/11/22 16:03
# @Author  : 紅後
# @Email   : [email protected]
# @blog    : //www.cnblogs.com/Red-Sun
# @File    : async_function.py
# @Software: PyCharm

async def asynchronous():
    return 1


def run(async_function):  # 用try解決報錯問題,運行協程函數
    try:
        async_function().send(None)
    except StopIteration as r:
        return r.value


print(run(asynchronous))


成功執行(`・ω・´)ゞ(`・ω・´)ゞ


在協程函數中await的使用(PS:await只能使用在有async修飾的函數中不然會報錯)
await的作用是掛起自身的協程,直到await修飾的協程完成並返回結果(可參照第一點什麼是協程中的描述)

# -*- coding: utf-8 -*-
# @Time    : 2022/11/22 16:03
# @Author  : 紅後
# @Email   : [email protected]
# @blog    : //www.cnblogs.com/Red-Sun
# @File    : await_function.py
# @Software: PyCharm

async def asynchronous():
    return 1

async def await_function():  # await掛起自身函數,等待另外協程函數運行完畢
    result = await asynchronous()
    return result

def run(async_function):  # 用try解決報錯問題,運行協程函數
    try:
        async_function().send(None)
    except StopIteration as r:
        return r.value


print(run(await_function))


執行流程 run函數->await_function函數->執行到await時->await_function掛起(暫停等待)->asynchronous函數執行並返回1 ->await_function繼續運行返回result ->print列印result值

使用進階

對asyncio的使用首先要了解:

  1. 事件循環

創建一個循環類似不停運行的洗衣機,把事件(類似衣服)放到循環中,個人描述就像是將需要清洗的衣服都放到洗衣機中一共處理。

  1. Future

Future對象表示未完成的計算,還未完成的結果(PS:等待要洗的衣服們(假想成臟衣服堆))

  1. Task

是Future的子類,作用是在運行某個任務的同時可以並發的運行多個任務。(PS:那個臟衣服堆中的單獨一件,可以被扔到洗衣機洗的臟衣服)

3.8版本之前的程式碼

先講需要自己創建loop的後面再講3.8更新後的更容易記憶一點(PS:3.8的更為簡約想直接看3.8版的也可)

1.下面是一個基礎的運行實例
# -*- coding: utf-8 -*-
# @Time    : 2022/11/24 17:32
# @Author  : 紅後
# @Email   : [email protected]
# @blog    : //www.cnblogs.com/Red-Sun
# @File    : 實例1.py
# @Software: PyCharm

import asyncio
import time


async def async_function():  # async修飾的非同步函數,在該函數中可以添加await進行暫停並切換到其他非同步函數中
    now_time = time.time()
    await asyncio.sleep(1)  # 當執行await future這行程式碼時(future對象就是被await修飾的函數),首先future檢查它自身是否已經完成,如果沒有完成,掛起自身,告知當前的Task(任務)等待future完成。
    print('花費時間:{}秒'.format(time.time()-now_time))

event = async_function()  # 創建協程事件對象

loop = asyncio.get_event_loop()  # 通過get_event_loop方法獲取事件循環對象
loop.run_until_complete(event)  # 通過run_until_complete方法直接運行event,該方法會一直等待直到event運行完畢
loop.close()  # 結束循環


2.關於task對象的操作
(1)創建任務對象並列印其狀態
# -*- coding: utf-8 -*-
# @Time    : 2022/11/24 17:32
# @Author  : 紅後
# @Email   : [email protected]
# @blog    : //www.cnblogs.com/Red-Sun
# @File    : 實例2.py
# @Software: PyCharm

import asyncio
import time


async def async_function():  # async修飾的非同步函數,在該函數中可以添加await進行暫停並切換到其他非同步函數中
    now_time = time.time()
    await asyncio.sleep(1)  # 當執行await future這行程式碼時(future對象就是被await修飾的函數),首先future檢查它自身是否已經完成,如果沒有完成,掛起自身,告知當前的Task(任務)等待future完成。
    print('花費時間:{}秒'.format(time.time()-now_time))

event = async_function()  # 創建協程事件對象

loop = asyncio.get_event_loop()  # 通過get_event_loop方法獲取事件循環對象
task = loop.create_task(event)  # 創建任務對象
print(task)  # 任務運行中task
loop.run_until_complete(task)  # 等待task運行完畢
print(task)  # 任務運行結束task狀態
loop.close()  # 結束循環


運行中:狀態顯示為running
運行結束後:狀態顯示done,result為協程函數返回值,因為此函數無返回值所以為None

(2)獲取task返回值
  • 方法一:通過task.result()的方法獲取返回值
# -*- coding: utf-8 -*-
# @Time    : 2022/11/25 10:40
# @Author  : 紅後
# @Email   : [email protected]
# @blog    : //www.cnblogs.com/Red-Sun
# @File    : 實例3.py
# @Software: PyCharm

import asyncio
import time


async def async_function():  # async修飾的非同步函數,在該函數中可以添加await進行暫停並切換到其他非同步函數中
    now_time = time.time()
    await asyncio.sleep(1)  # 當執行await future這行程式碼時(future對象就是被await修飾的函數),首先future檢查它自身是否已經完成,如果沒有完成,掛起自身,告知當前的Task(任務)等待future完成。
    return '花費時間:{}秒'.format(time.time() - now_time)  # 將列印語句換成返回值


event = async_function()  # 創建協程事件對象

loop = asyncio.get_event_loop()  # 通過get_event_loop方法獲取事件循環對象
task = loop.create_task(event)  # 創建任務對象
print(task)  # 任務運行中task
try:
    print(task.result())  # 任務未完成列印result會拋出InvalidStateError錯誤
except asyncio.InvalidStateError as r:
    print(r)  # InvalidStateError報錯資訊
loop.run_until_complete(task)  # 等待task運行完畢
print(task)  # 任務運行結束task狀態
print(task.result())  # 列印出task的返回值
loop.close()  # 結束循環

  • 方法二:通過add_done_callback()添加完成回調
# -*- coding: utf-8 -*-
# @Time    : 2022/11/25 11:15
# @Author  : 紅後
# @Email   : [email protected]
# @blog    : //www.cnblogs.com/Red-Sun
# @File    : 實例4.py
# @Software: PyCharm

import asyncio
import time


def task_callback(future):  # 回調函數獲取任務完成後的返回值
    print(future.result())

async def async_function():  # async修飾的非同步函數,在該函數中可以添加await進行暫停並切換到其他非同步函數中
    now_time = time.time()
    await asyncio.sleep(1)  # 當執行await future這行程式碼時(future對象就是被await修飾的函數),首先future檢查它自身是否已經完成,如果沒有完成,掛起自身,告知當前的Task(任務)等待future完成。
    return '花費時間:{}秒'.format(time.time() - now_time)  # 將列印語句換成返回值


event = async_function()  # 創建協程事件對象
loop = asyncio.get_event_loop()  # 通過get_event_loop方法獲取事件循環對象
task = loop.create_task(event)  # 創建任務對象

task.add_done_callback(task_callback)  # 為而任務添加回調函數

loop.run_until_complete(task)  # 等待task運行完畢
loop.close()  # 結束循環


通過 Future 的 add_done_callback() 方法來添加回調函數,當任務完成後,程式會自動觸發該回調函數,並將對應的 Future 對象作為參數傳給該回調函數。
PS:Function ‘add_done_callback’ doesn’t return anything(函數「add_done_callback」不返回任何內容)

3.多任務tasks的實現
(1)通過asyncio.wait()來控制多任務
# -*- coding: utf-8 -*-
# @Time    : 2022/11/25 14:12
# @Author  : 紅後
# @Email   : [email protected]
# @blog    : //www.cnblogs.com/Red-Sun
# @File    : 實例5.py
# @Software: PyCharm
import asyncio
import time


async def async_function(num):  # async修飾的非同步函數,在該函數中可以添加await進行暫停並切換到其他非同步函數中
    await asyncio.sleep(num)  # 當執行await future這行程式碼時(future對象就是被await修飾的函數),首先future檢查它自身是否已經完成,如果沒有完成,掛起自身,告知當前的Task(任務)等待future完成。
    print('協程花費時間:{}秒'.format(time.time() - now_time))  

now_time = time.time()  # 程式運行時的時間戳
events = [async_function(num=num) for num in range(1, 4)]  # 創建協程事件列表
loop = asyncio.get_event_loop()  # 通過get_event_loop方法獲取事件循環對象
tasks = asyncio.wait(events)  # 通過asyncio.wait(events)創建多任務對象


loop.run_until_complete(tasks)  # 等待task運行完畢
loop.close()  # 結束循環
print('總運行花費時常:{}秒'.format(time.time() - now_time))

(2)多任務獲取返回值
# -*- coding: utf-8 -*-
# @Time    : 2022/11/25 15:38
# @Author  : 紅後
# @Email   : [email protected]
# @blog    : //www.cnblogs.com/Red-Sun
# @File    : 實例6.py
# @Software: PyCharm
import asyncio
import time


async def async_function(num):  # async修飾的非同步函數,在該函數中可以添加await進行暫停並切換到其他非同步函數中
    await asyncio.sleep(num)  # 當執行await future這行程式碼時(future對象就是被await修飾的函數),首先future檢查它自身是否已經完成,如果沒有完成,掛起自身,告知當前的Task(任務)等待future完成。
    return '協程花費時間:{}秒'.format(time.time() - now_time)


now_time = time.time()  # 程式運行時的時間戳
loop = asyncio.get_event_loop()  # 通過get_event_loop方法獲取事件循環對象
tasks = [loop.create_task(async_function(num=num)) for num in range(1, 4)]  # 通過事件循環的create_task方法創建任務列表
events = asyncio.wait(tasks)  # 通過asyncio.wait(tasks)將任務收集起來

loop.run_until_complete(events)  # 等待events運行完畢
for task in tasks:  # 遍歷循環列表,將對應任務返回值列印出來
    print(task.result())
loop.close()  # 結束循環

print('總運行花費時常:{}秒'.format(time.time() - now_time))

(3)通過add_done_callback()添加回調
# -*- coding: utf-8 -*-
# @Time    : 2022/11/25 15:58
# @Author  : 紅後
# @Email   : [email protected]
# @blog    : //www.cnblogs.com/Red-Sun
# @File    : 實例7.py
# @Software: PyCharm
import asyncio
import time


def task_callback(future):  # 回調函數獲取任務完成後的返回值
    print(future.result())


async def async_function(num):  # async修飾的非同步函數,在該函數中可以添加await進行暫停並切換到其他非同步函數中
    await asyncio.sleep(num)  # 當執行await future這行程式碼時(future對象就是被await修飾的函數),首先future檢查它自身是否已經完成,如果沒有完成,掛起自身,告知當前的Task(任務)等待future完成。
    return '協程花費時間:{}秒'.format(time.time() - now_time)


now_time = time.time()  # 程式運行時的時間戳
loop = asyncio.get_event_loop()  # 通過get_event_loop方法獲取事件循環對象
tasks = []  # 任務收集列表(PS:就像臟衣服堆)
for num in range(1, 4):
    task = loop.create_task(async_function(num=num))  # 創建單個任務(單件臟衣服)
    task.add_done_callback(task_callback)  # 為每個任務添加對應的回調函數
    tasks.append(task)
events = asyncio.wait(tasks)  # 通過asyncio.wait(tasks)將任務收集起來PS:想像成裝臟衣服的籃子

loop.run_until_complete(events)  # 等待events運行完畢

loop.close()  # 結束循環

print('紅後總運行花費時長:{}秒'.format(time.time() - now_time))

4.動態不停添加任務task實現

除了像上面第第三點那種設定循環一口氣執行的(就像把臟衣服一口氣塞進洗衣機),還可以一個一個執行(把臟衣服一件一件放進去)。
方法:另外創建一條執行緒,在其中創建一個一直循環的事件循環。(PS:換個大地方放下一台能夠一直運行的洗衣機,就可以把臟衣服一件一件丟進去了)

(1)同步狀態下
# -*- coding: utf-8 -*-
# @Time    : 2022/11/28 14:22
# @Author  : 紅後
# @Email   : [email protected]
# @blog    : //www.cnblogs.com/Red-Sun
# @File    : 實例8.py
# @Software: PyCharm
import asyncio
import time
from threading import Thread


def thread_new_loop(loop):  # 創建執行緒版洗衣機
    asyncio.set_event_loop(loop)  # 在執行緒中調用loop需要使用set_event_loop方法指定loop
    loop.run_forever()  #  run_forever() 會永遠阻塞當前執行緒,直到有人停止了該loop為止。


def function(num):  # 同步執行的任務方法
    print('任務{}花費時間:{}秒'.format(num, time.time() - now_time))
    return '任務{}完成時間:{}秒'.format(num, time.time() - now_time)


now_time = time.time()  # 程式運行時的時間戳
new_loop = asyncio.new_event_loop()  # 創建一個新的loop,get_event_loop()只會在主執行緒創建新的event loop,其他執行緒中調用 get_event_loop() 則會報錯
t = Thread(target=thread_new_loop, args=(new_loop,))  # 創建執行緒
t.start()  # 啟動執行緒
even = new_loop.call_soon_threadsafe(function, 1)  # 調用call_soon_threadsafe實現回調(詳細描述往下找)
even.cancel()  # 當call_soon_threadsafe對象執行cancel()方法就會取消該任務事件(當速度夠快有概率取消前已經執行)
new_loop.call_soon_threadsafe(function, 2)
new_loop.call_soon_threadsafe(function, 3)


loop.call_soon():傳入目標函數和參數,可以將目標函數放到事件循環loop中,返回值是一個 asyncio.Handle 對象,此對象內只有一個方法為 cancel()方法,用來取消回調函數。
loop.call_soon_threadsafe() :比上一個多了個threadsafe保護執行緒安全。

(2)非同步狀態下

與同步相比,函數為非同步函數並且通過asyncio.run_coroutine_threadsafe()方法回調

# -*- coding: utf-8 -*-
# @Time    : 2022/11/28 16:16
# @Author  : 紅後
# @Email   : [email protected]
# @blog    : //www.cnblogs.com/Red-Sun
# @File    : 實例9.py
# @Software: PyCharm
import asyncio
import time
from threading import Thread


def thread_new_loop(loop):  # 創建執行緒版洗衣機
    asyncio.set_event_loop(loop)  # 在執行緒中調用loop需要使用set_event_loop方法指定loop
    loop.run_forever()  #  run_forever() 會永遠阻塞當前執行緒,直到有人停止了該loop為止。


async def async_function(num):  # 非同步執行的任務方法
    await asyncio.sleep(num)
    print('非同步任務{}花費時間:{}秒'.format(num, time.time() - now_time))
    return '非同步任務{}完成時間:{}秒'.format(num, time.time() - now_time)


now_time = time.time()  # 程式運行時的時間戳
new_loop = asyncio.new_event_loop()  # 創建一個新的loop,get_event_loop()只會在主執行緒創建新的event loop,其他執行緒中調用 get_event_loop() 則會報錯
t = Thread(target=thread_new_loop, args=(new_loop,))  # 創建執行緒
t.start()  # 啟動執行緒
even = asyncio.run_coroutine_threadsafe(async_function(1), new_loop)  # 調用asyncio.run_coroutine_threadsafe實現回調
even.cancel()  # 當run_coroutine_threadsafe對象執行cancel()方法就會取消該任務事件(當速度夠快有概率取消前已經執行)
asyncio.run_coroutine_threadsafe(async_function(2), new_loop)
asyncio.run_coroutine_threadsafe(async_function(3), new_loop)
print('紅後主進程運行花費時長:{}秒'.format(time.time() - now_time))



因為使用了loop.run_forever()所以會一直啟用事件循環到stop()的調用終止。
若要主執行緒退出時子執行緒也退出,可以設置子執行緒為守護執行緒 t.setDaemon(True)需要在執行緒執行前設置。

3.8以後的(PS:只要簡單使用直接看這個就行)

運行協程的三種基本方式
async.run() 運行協程
async.create_task()創建task
async.gather()獲取返回值

(1)用run()運行協程
# -*- coding: utf-8 -*-
# @Time    : 2022/11/28 17:34
# @Author  : 紅後
# @Email   : [email protected]
# @blog    : //www.cnblogs.com/Red-Sun
# @File    : 實例10.py
# @Software: PyCharm
import asyncio
import time
from threading import Thread


async def async_function(num):  # 非同步執行的任務方法
    await asyncio.sleep(num)
    print('非同步任務{}完成時間:{}秒'.format(num, time.time() - now_time))


now_time = time.time()  # 程式運行時的時間戳
asyncio.run(async_function(1))  # 用asyncio.run直接運行協程參數為協程函數及其參數

(2)用create_task()創建task
# -*- coding: utf-8 -*-
# @Time    : 2022/11/28 17:37
# @Author  : 紅後
# @Email   : [email protected]
# @blog    : //www.cnblogs.com/Red-Sun
# @File    : 實例11.py
# @Software: PyCharm
import asyncio
import time


async def async_function(num):  # 非同步執行的任務方法
    await asyncio.sleep(num)
    print('非同步任務{}完成時間:{}秒'.format(num, time.time() - now_time))


async def main():  # 非同步主函數用於調度其他非同步函數
    tasks = []  # tasks列表用於存放task
    for num in range(1, 4):
        tasks.append(asyncio.create_task(async_function(num)))
    for task in tasks:
        await task


now_time = time.time()  # 程式運行時的時間戳
asyncio.run(main())  # 用asyncio.run直接運行協程參數為協程函數及其參數
print('【紅後】最終執行時間:{}'.format(time.time() - now_time))


PS:必須先通過asyncio.create_task將task創建到event loop中,再通過await等待,如果直接用await等待則會導致非同步變同步

(3)用gather()收集返回值
# -*- coding: utf-8 -*-
# @Time    : 2022/11/29 9:25
# @Author  : 紅後
# @Email   : [email protected]
# @blog    : //www.cnblogs.com/Red-Sun
# @File    : 實例12.py
# @Software: PyCharm
import asyncio
import time


async def async_function(num):  # 非同步執行的任務方法
    await asyncio.sleep(num)
    return '非同步任務{}完成時間:{}秒'.format(num, time.time() - now_time)


async def main():  # 非同步主函數用於調度其他非同步函數
    tasks = []  # tasks列表用於存放task
    for num in range(1, 4):
        tasks.append(asyncio.create_task(async_function(num)))
    response = await asyncio.gather(tasks[0], tasks[1], tasks[2])  # 將task作為參數傳入gather,等非同步任務都結束後返回結果列表
    print(response)

now_time = time.time()  # 程式運行時的時間戳
asyncio.run(main())  # 用asyncio.run直接運行協程參數為協程函數及其參數
print('【紅後】最終執行時間:{}'.format(time.time() - now_time))