Python异步协程(asyncio详解)

续上篇讲解yield from博客,上篇链接://www.cnblogs.com/Red-Sun/p/16889182.html
PS:本博客是个人笔记分享,不需要扫码加群或必须关注什么的(如果外站需要加群或关注的可以直接去我主页查看)
欢迎大家光临ヾ(≧▽≦*)o我的博客首页//www.cnblogs.com/Red-Sun/
首先要了解什么是协程,其次知道异步跟同步的区别。(PS:个人喜欢多做比喻,不恰当地方望指正)
本文仅仅是个人学习笔记,有错的地方望各位指点。
如果把进程比作从A处到B处去这件事,那么线程就是可供选择的多条道路,协程就是道路上特殊路段(类似限速,一整条道路都是特殊路段的话,就是全部由协程实现)
例图如下:

1. 什么是协程(Coroutines)

在了解异步之前,先大致了解一下什么是协程。
网上的讲法有各种:

  • 协程是一种比线程更加轻量级的存在
  • 协程是一种用户级的轻量级线程
  • 协程,又称微线程

大体看过之后就感觉,我好像懂了,有好像没懂,个人感觉有点晕乎乎的,没太明白。(PS:可能是我个人智商没够不能快速领悟的原因)
个人理解(PS:不涉及其本质来源、底层实现、仅仅就着这个异步爬虫来说):协程就像一条带应急车道的高速公路(具体作用就是让任务有了暂停切换功能)
线程:把需要执行的任务比作汽车,线程就像一条单行且只有一条道的高速公路,只有等前一辆车到达终点后面的车才能出发,如果其中一辆出了事情停在了路上,那么这俩车后面的车就只能原地等待直到它恢复并到达终点才能继续上路。
协程:把需要执行的任务比作汽车,协程就像一条带应急车道的高速公路,如果汽车在中途出了问题就可以直接到一边的应急车道停下处理问题,下一辆车可以直接上路,简单来说就是可以通过程序控制哪辆车行驶,哪辆车在应急车道休息。

2.同步跟异步

同步跟异步是两个相对的概念:
同步:意味着有序
异步:意味着无序
小故事模拟事件:
小明在家需要完成如下事情:

  1. 电饭锅煮饭大约30分钟
  2. 洗衣机洗衣服大约40分钟
  3. 写作业大约50分钟

在同步情况下:小明需要电饭锅处等待30分钟、洗衣机处等待40分钟、写作业50分钟,总计花费时间120分钟。
在异步情况下:小明需要电饭锅处理并启动花费10分钟、洗衣机处理并启动花费10分钟,写作业花费50分钟,总计花费时间70分钟。
即同步必须一件事情结束之后再进行下一件事,异步是可以在一件事情没结束就去处理另外一件事情了。
注意:此处异步比同步耗时更短是有前提条件的!要是I/O阻塞才可以(说人话:类似电饭锅煮饭,电饭锅可以自行完成这种的)
如果把条件中的电饭锅换成柴火,洗衣机换成搓衣板,那么事情就只能一件一件完成了,两者耗时相近。

3.asyncio异步协程

asyncio即Asynchronous I/O是python一个用来处理并发(concurrent)事件的包,是很多python异步架构的基础,多用于处理高并发网络请求方面的问题。
此处使用的是Python 3.5之后出现的async/await来实现协程,需要yield实现协程的可以去我上篇博客瞅瞅:点击此处快速跳转

基础补充(比较基础的内容懂的可以直接跳)

  1. 普通函数
def function():
    return 1

2.由async做前缀的普通函数变成了异步函数

async def asynchronous():
    return 1

而异步函数不同于普通函数不可能被直接调用

async def asynchronous():
    return 1

print(asynchronous())


尝试用send驱动这个协程

async def asynchronous():
    return 1

asynchronous().send(None)


值有了不过存储在了这个StopIteration报错中,于是有了下方的执行器

# -*- coding: utf-8 -*-
# @Time    : 2022/11/22 16:03
# @Author  : 红后
# @Email   : [email protected]
# @blog    : //www.cnblogs.com/Red-Sun
# @File    : async_function.py
# @Software: PyCharm

async def asynchronous():
    return 1


def run(async_function):  # 用try解决报错问题,运行协程函数
    try:
        async_function().send(None)
    except StopIteration as r:
        return r.value


print(run(asynchronous))


成功执行(`・ω・´)ゞ(`・ω・´)ゞ


在协程函数中await的使用(PS:await只能使用在有async修饰的函数中不然会报错)
await的作用是挂起自身的协程,直到await修饰的协程完成并返回结果(可参照第一点什么是协程中的描述)

# -*- coding: utf-8 -*-
# @Time    : 2022/11/22 16:03
# @Author  : 红后
# @Email   : [email protected]
# @blog    : //www.cnblogs.com/Red-Sun
# @File    : await_function.py
# @Software: PyCharm

async def asynchronous():
    return 1

async def await_function():  # await挂起自身函数,等待另外协程函数运行完毕
    result = await asynchronous()
    return result

def run(async_function):  # 用try解决报错问题,运行协程函数
    try:
        async_function().send(None)
    except StopIteration as r:
        return r.value


print(run(await_function))


执行流程 run函数->await_function函数->执行到await时->await_function挂起(暂停等待)->asynchronous函数执行并返回1 ->await_function继续运行返回result ->print打印result值

使用进阶

对asyncio的使用首先要了解:

  1. 事件循环

创建一个循环类似不停运行的洗衣机,把事件(类似衣服)放到循环中,个人描述就像是将需要清洗的衣服都放到洗衣机中一共处理。

  1. Future

Future对象表示未完成的计算,还未完成的结果(PS:等待要洗的衣服们(假想成脏衣服堆))

  1. Task

是Future的子类,作用是在运行某个任务的同时可以并发的运行多个任务。(PS:那个脏衣服堆中的单独一件,可以被扔到洗衣机洗的脏衣服)

3.8版本之前的代码

先讲需要自己创建loop的后面再讲3.8更新后的更容易记忆一点(PS:3.8的更为简约想直接看3.8版的也可)

1.下面是一个基础的运行实例
# -*- coding: utf-8 -*-
# @Time    : 2022/11/24 17:32
# @Author  : 红后
# @Email   : [email protected]
# @blog    : //www.cnblogs.com/Red-Sun
# @File    : 实例1.py
# @Software: PyCharm

import asyncio
import time


async def async_function():  # async修饰的异步函数,在该函数中可以添加await进行暂停并切换到其他异步函数中
    now_time = time.time()
    await asyncio.sleep(1)  # 当执行await future这行代码时(future对象就是被await修饰的函数),首先future检查它自身是否已经完成,如果没有完成,挂起自身,告知当前的Task(任务)等待future完成。
    print('花费时间:{}秒'.format(time.time()-now_time))

event = async_function()  # 创建协程事件对象

loop = asyncio.get_event_loop()  # 通过get_event_loop方法获取事件循环对象
loop.run_until_complete(event)  # 通过run_until_complete方法直接运行event,该方法会一直等待直到event运行完毕
loop.close()  # 结束循环


2.关于task对象的操作
(1)创建任务对象并打印其状态
# -*- coding: utf-8 -*-
# @Time    : 2022/11/24 17:32
# @Author  : 红后
# @Email   : [email protected]
# @blog    : //www.cnblogs.com/Red-Sun
# @File    : 实例2.py
# @Software: PyCharm

import asyncio
import time


async def async_function():  # async修饰的异步函数,在该函数中可以添加await进行暂停并切换到其他异步函数中
    now_time = time.time()
    await asyncio.sleep(1)  # 当执行await future这行代码时(future对象就是被await修饰的函数),首先future检查它自身是否已经完成,如果没有完成,挂起自身,告知当前的Task(任务)等待future完成。
    print('花费时间:{}秒'.format(time.time()-now_time))

event = async_function()  # 创建协程事件对象

loop = asyncio.get_event_loop()  # 通过get_event_loop方法获取事件循环对象
task = loop.create_task(event)  # 创建任务对象
print(task)  # 任务运行中task
loop.run_until_complete(task)  # 等待task运行完毕
print(task)  # 任务运行结束task状态
loop.close()  # 结束循环


运行中:状态显示为running
运行结束后:状态显示done,result为协程函数返回值,因为此函数无返回值所以为None

(2)获取task返回值
  • 方法一:通过task.result()的方法获取返回值
# -*- coding: utf-8 -*-
# @Time    : 2022/11/25 10:40
# @Author  : 红后
# @Email   : [email protected]
# @blog    : //www.cnblogs.com/Red-Sun
# @File    : 实例3.py
# @Software: PyCharm

import asyncio
import time


async def async_function():  # async修饰的异步函数,在该函数中可以添加await进行暂停并切换到其他异步函数中
    now_time = time.time()
    await asyncio.sleep(1)  # 当执行await future这行代码时(future对象就是被await修饰的函数),首先future检查它自身是否已经完成,如果没有完成,挂起自身,告知当前的Task(任务)等待future完成。
    return '花费时间:{}秒'.format(time.time() - now_time)  # 将打印语句换成返回值


event = async_function()  # 创建协程事件对象

loop = asyncio.get_event_loop()  # 通过get_event_loop方法获取事件循环对象
task = loop.create_task(event)  # 创建任务对象
print(task)  # 任务运行中task
try:
    print(task.result())  # 任务未完成打印result会抛出InvalidStateError错误
except asyncio.InvalidStateError as r:
    print(r)  # InvalidStateError报错信息
loop.run_until_complete(task)  # 等待task运行完毕
print(task)  # 任务运行结束task状态
print(task.result())  # 打印出task的返回值
loop.close()  # 结束循环

  • 方法二:通过add_done_callback()添加完成回调
# -*- coding: utf-8 -*-
# @Time    : 2022/11/25 11:15
# @Author  : 红后
# @Email   : [email protected]
# @blog    : //www.cnblogs.com/Red-Sun
# @File    : 实例4.py
# @Software: PyCharm

import asyncio
import time


def task_callback(future):  # 回调函数获取任务完成后的返回值
    print(future.result())

async def async_function():  # async修饰的异步函数,在该函数中可以添加await进行暂停并切换到其他异步函数中
    now_time = time.time()
    await asyncio.sleep(1)  # 当执行await future这行代码时(future对象就是被await修饰的函数),首先future检查它自身是否已经完成,如果没有完成,挂起自身,告知当前的Task(任务)等待future完成。
    return '花费时间:{}秒'.format(time.time() - now_time)  # 将打印语句换成返回值


event = async_function()  # 创建协程事件对象
loop = asyncio.get_event_loop()  # 通过get_event_loop方法获取事件循环对象
task = loop.create_task(event)  # 创建任务对象

task.add_done_callback(task_callback)  # 为而任务添加回调函数

loop.run_until_complete(task)  # 等待task运行完毕
loop.close()  # 结束循环


通过 Future 的 add_done_callback() 方法来添加回调函数,当任务完成后,程序会自动触发该回调函数,并将对应的 Future 对象作为参数传给该回调函数。
PS:Function ‘add_done_callback’ doesn’t return anything(函数“add_done_callback”不返回任何内容)

3.多任务tasks的实现
(1)通过asyncio.wait()来控制多任务
# -*- coding: utf-8 -*-
# @Time    : 2022/11/25 14:12
# @Author  : 红后
# @Email   : [email protected]
# @blog    : //www.cnblogs.com/Red-Sun
# @File    : 实例5.py
# @Software: PyCharm
import asyncio
import time


async def async_function(num):  # async修饰的异步函数,在该函数中可以添加await进行暂停并切换到其他异步函数中
    await asyncio.sleep(num)  # 当执行await future这行代码时(future对象就是被await修饰的函数),首先future检查它自身是否已经完成,如果没有完成,挂起自身,告知当前的Task(任务)等待future完成。
    print('协程花费时间:{}秒'.format(time.time() - now_time))  

now_time = time.time()  # 程序运行时的时间戳
events = [async_function(num=num) for num in range(1, 4)]  # 创建协程事件列表
loop = asyncio.get_event_loop()  # 通过get_event_loop方法获取事件循环对象
tasks = asyncio.wait(events)  # 通过asyncio.wait(events)创建多任务对象


loop.run_until_complete(tasks)  # 等待task运行完毕
loop.close()  # 结束循环
print('总运行花费时常:{}秒'.format(time.time() - now_time))

(2)多任务获取返回值
# -*- coding: utf-8 -*-
# @Time    : 2022/11/25 15:38
# @Author  : 红后
# @Email   : [email protected]
# @blog    : //www.cnblogs.com/Red-Sun
# @File    : 实例6.py
# @Software: PyCharm
import asyncio
import time


async def async_function(num):  # async修饰的异步函数,在该函数中可以添加await进行暂停并切换到其他异步函数中
    await asyncio.sleep(num)  # 当执行await future这行代码时(future对象就是被await修饰的函数),首先future检查它自身是否已经完成,如果没有完成,挂起自身,告知当前的Task(任务)等待future完成。
    return '协程花费时间:{}秒'.format(time.time() - now_time)


now_time = time.time()  # 程序运行时的时间戳
loop = asyncio.get_event_loop()  # 通过get_event_loop方法获取事件循环对象
tasks = [loop.create_task(async_function(num=num)) for num in range(1, 4)]  # 通过事件循环的create_task方法创建任务列表
events = asyncio.wait(tasks)  # 通过asyncio.wait(tasks)将任务收集起来

loop.run_until_complete(events)  # 等待events运行完毕
for task in tasks:  # 遍历循环列表,将对应任务返回值打印出来
    print(task.result())
loop.close()  # 结束循环

print('总运行花费时常:{}秒'.format(time.time() - now_time))

(3)通过add_done_callback()添加回调
# -*- coding: utf-8 -*-
# @Time    : 2022/11/25 15:58
# @Author  : 红后
# @Email   : [email protected]
# @blog    : //www.cnblogs.com/Red-Sun
# @File    : 实例7.py
# @Software: PyCharm
import asyncio
import time


def task_callback(future):  # 回调函数获取任务完成后的返回值
    print(future.result())


async def async_function(num):  # async修饰的异步函数,在该函数中可以添加await进行暂停并切换到其他异步函数中
    await asyncio.sleep(num)  # 当执行await future这行代码时(future对象就是被await修饰的函数),首先future检查它自身是否已经完成,如果没有完成,挂起自身,告知当前的Task(任务)等待future完成。
    return '协程花费时间:{}秒'.format(time.time() - now_time)


now_time = time.time()  # 程序运行时的时间戳
loop = asyncio.get_event_loop()  # 通过get_event_loop方法获取事件循环对象
tasks = []  # 任务收集列表(PS:就像脏衣服堆)
for num in range(1, 4):
    task = loop.create_task(async_function(num=num))  # 创建单个任务(单件脏衣服)
    task.add_done_callback(task_callback)  # 为每个任务添加对应的回调函数
    tasks.append(task)
events = asyncio.wait(tasks)  # 通过asyncio.wait(tasks)将任务收集起来PS:想象成装脏衣服的篮子

loop.run_until_complete(events)  # 等待events运行完毕

loop.close()  # 结束循环

print('红后总运行花费时长:{}秒'.format(time.time() - now_time))

4.动态不停添加任务task实现

除了像上面第第三点那种设定循环一口气执行的(就像把脏衣服一口气塞进洗衣机),还可以一个一个执行(把脏衣服一件一件放进去)。
方法:另外创建一条线程,在其中创建一个一直循环的事件循环。(PS:换个大地方放下一台能够一直运行的洗衣机,就可以把脏衣服一件一件丢进去了)

(1)同步状态下
# -*- coding: utf-8 -*-
# @Time    : 2022/11/28 14:22
# @Author  : 红后
# @Email   : [email protected]
# @blog    : //www.cnblogs.com/Red-Sun
# @File    : 实例8.py
# @Software: PyCharm
import asyncio
import time
from threading import Thread


def thread_new_loop(loop):  # 创建线程版洗衣机
    asyncio.set_event_loop(loop)  # 在线程中调用loop需要使用set_event_loop方法指定loop
    loop.run_forever()  #  run_forever() 会永远阻塞当前线程,直到有人停止了该loop为止。


def function(num):  # 同步执行的任务方法
    print('任务{}花费时间:{}秒'.format(num, time.time() - now_time))
    return '任务{}完成时间:{}秒'.format(num, time.time() - now_time)


now_time = time.time()  # 程序运行时的时间戳
new_loop = asyncio.new_event_loop()  # 创建一个新的loop,get_event_loop()只会在主线程创建新的event loop,其他线程中调用 get_event_loop() 则会报错
t = Thread(target=thread_new_loop, args=(new_loop,))  # 创建线程
t.start()  # 启动线程
even = new_loop.call_soon_threadsafe(function, 1)  # 调用call_soon_threadsafe实现回调(详细描述往下找)
even.cancel()  # 当call_soon_threadsafe对象执行cancel()方法就会取消该任务事件(当速度够快有概率取消前已经执行)
new_loop.call_soon_threadsafe(function, 2)
new_loop.call_soon_threadsafe(function, 3)


loop.call_soon():传入目标函数和参数,可以将目标函数放到事件循环loop中,返回值是一个 asyncio.Handle 对象,此对象内只有一个方法为 cancel()方法,用来取消回调函数。
loop.call_soon_threadsafe() :比上一个多了个threadsafe保护线程安全。

(2)异步状态下

与同步相比,函数为异步函数并且通过asyncio.run_coroutine_threadsafe()方法回调

# -*- coding: utf-8 -*-
# @Time    : 2022/11/28 16:16
# @Author  : 红后
# @Email   : [email protected]
# @blog    : //www.cnblogs.com/Red-Sun
# @File    : 实例9.py
# @Software: PyCharm
import asyncio
import time
from threading import Thread


def thread_new_loop(loop):  # 创建线程版洗衣机
    asyncio.set_event_loop(loop)  # 在线程中调用loop需要使用set_event_loop方法指定loop
    loop.run_forever()  #  run_forever() 会永远阻塞当前线程,直到有人停止了该loop为止。


async def async_function(num):  # 异步执行的任务方法
    await asyncio.sleep(num)
    print('异步任务{}花费时间:{}秒'.format(num, time.time() - now_time))
    return '异步任务{}完成时间:{}秒'.format(num, time.time() - now_time)


now_time = time.time()  # 程序运行时的时间戳
new_loop = asyncio.new_event_loop()  # 创建一个新的loop,get_event_loop()只会在主线程创建新的event loop,其他线程中调用 get_event_loop() 则会报错
t = Thread(target=thread_new_loop, args=(new_loop,))  # 创建线程
t.start()  # 启动线程
even = asyncio.run_coroutine_threadsafe(async_function(1), new_loop)  # 调用asyncio.run_coroutine_threadsafe实现回调
even.cancel()  # 当run_coroutine_threadsafe对象执行cancel()方法就会取消该任务事件(当速度够快有概率取消前已经执行)
asyncio.run_coroutine_threadsafe(async_function(2), new_loop)
asyncio.run_coroutine_threadsafe(async_function(3), new_loop)
print('红后主进程运行花费时长:{}秒'.format(time.time() - now_time))



因为使用了loop.run_forever()所以会一直启用事件循环到stop()的调用终止。
若要主线程退出时子线程也退出,可以设置子线程为守护线程 t.setDaemon(True)需要在线程执行前设置。

3.8以后的(PS:只要简单使用直接看这个就行)

运行协程的三种基本方式
async.run() 运行协程
async.create_task()创建task
async.gather()获取返回值

(1)用run()运行协程
# -*- coding: utf-8 -*-
# @Time    : 2022/11/28 17:34
# @Author  : 红后
# @Email   : [email protected]
# @blog    : //www.cnblogs.com/Red-Sun
# @File    : 实例10.py
# @Software: PyCharm
import asyncio
import time
from threading import Thread


async def async_function(num):  # 异步执行的任务方法
    await asyncio.sleep(num)
    print('异步任务{}完成时间:{}秒'.format(num, time.time() - now_time))


now_time = time.time()  # 程序运行时的时间戳
asyncio.run(async_function(1))  # 用asyncio.run直接运行协程参数为协程函数及其参数

(2)用create_task()创建task
# -*- coding: utf-8 -*-
# @Time    : 2022/11/28 17:37
# @Author  : 红后
# @Email   : [email protected]
# @blog    : //www.cnblogs.com/Red-Sun
# @File    : 实例11.py
# @Software: PyCharm
import asyncio
import time


async def async_function(num):  # 异步执行的任务方法
    await asyncio.sleep(num)
    print('异步任务{}完成时间:{}秒'.format(num, time.time() - now_time))


async def main():  # 异步主函数用于调度其他异步函数
    tasks = []  # tasks列表用于存放task
    for num in range(1, 4):
        tasks.append(asyncio.create_task(async_function(num)))
    for task in tasks:
        await task


now_time = time.time()  # 程序运行时的时间戳
asyncio.run(main())  # 用asyncio.run直接运行协程参数为协程函数及其参数
print('【红后】最终执行时间:{}'.format(time.time() - now_time))


PS:必须先通过asyncio.create_task将task创建到event loop中,再通过await等待,如果直接用await等待则会导致异步变同步

(3)用gather()收集返回值
# -*- coding: utf-8 -*-
# @Time    : 2022/11/29 9:25
# @Author  : 红后
# @Email   : [email protected]
# @blog    : //www.cnblogs.com/Red-Sun
# @File    : 实例12.py
# @Software: PyCharm
import asyncio
import time


async def async_function(num):  # 异步执行的任务方法
    await asyncio.sleep(num)
    return '异步任务{}完成时间:{}秒'.format(num, time.time() - now_time)


async def main():  # 异步主函数用于调度其他异步函数
    tasks = []  # tasks列表用于存放task
    for num in range(1, 4):
        tasks.append(asyncio.create_task(async_function(num)))
    response = await asyncio.gather(tasks[0], tasks[1], tasks[2])  # 将task作为参数传入gather,等异步任务都结束后返回结果列表
    print(response)

now_time = time.time()  # 程序运行时的时间戳
asyncio.run(main())  # 用asyncio.run直接运行协程参数为协程函数及其参数
print('【红后】最终执行时间:{}'.format(time.time() - now_time))