­

Asyncio—Python牛不牛就靠你了

  • 2019 年 10 月 6 日
  • 筆記

之前在看gevent的时候不小心又看到了这个模块,gevent其实并不是python官方的标准库,有一些缺陷,所以这个时候Asyncio出现了。

这是官网也非常推荐的一个实现高并发的一个模块。在python3.6中已经稳定支持了。

首先要做的事情:

Asyncio是干嘛的?

异步,并发,协程

CPU 的执行是顺序的,线程是操作系统提供的一种机制,允许我们在操作系统的层面上实现“并行”。而协程则可以认为是应用程序提供的一种机制(用户或库来完成),允许我们在应用程序的层面上实现“并行”。

由于本质上程序是顺序执行的,要实现这种“并行”的假像,我们需要一种机制,来“暂停”当前的执行流,并在之后“恢复”之前的执行流。这在操作系统及多线程/多进程中称为“上下文切换” (context switch)。其中“上下文”记录了某个线程执行的状态,包括线程里用到的各个变量,线程的调用栈等。而“切换”指的就是保存某个线程当前的运行状态,之后再从之前的状态中恢复。只不过线程相关的工作是由操作系统完成,而协程则是由应用程序自己来完成。

关于asyncio,有很多的模块支持,如图(一部分):

详情可参考:

https://github.com/aio-libs

下面来介绍一下Asyncio里面可等待的对象(可等待的对象的意思就是可以在await方法中进行使用)一共分为以下三种:

coroutine (协程):

协程对象,指一个使用async关键字定义的函数,它的调用不会立即执行函数,而是会返回一个协程对象。协程对象需要注册到事件循环,由事件循环调用。

task (任务):

用来设置日程,以便并发执行协程,是对协程进一步封装,其中包含了任务的各种状态。

future(最终结果):

是一种特殊的 低层级 可等待对象,表示一个异步操作的最终结果。

Coroutine

关于协程,一般通过async/await方法进行声明定义,来看一个最基本的例子,在hello输出1秒后输出world。

import asyncio    async def main():      print('hello')      await asyncio.sleep(1)      print('world')

以上就是定义一个简单的协程方法,定义好来就可以运行,关于运行,我们有三种方法可以调用(代表三种不同的运行机制),他们分别是:run,await,create_task

Run函数

asyncio.run(coro, *, debug=False)

run函数运行传入的协程,负责管理 asyncio 事件循环并完结异步生成器。

当有其他 asyncio 事件循环在同一线程中运行时,run函数不能被调用。

如果 debug 为 True,事件循环将以调试模式运行。

run函数总是会创建一个新的事件循环并在结束时关闭。它应当被用作 asyncio 程序的主入口点,理想情况下应当只被调用一次。也就是说,run函数里面的第一个参数应该是main函数。

Create_task函数

asyncio.create_task(coro)

将 coro 协程打包成一个 Task排入日程准备执行。返回 一个Task 对象。

该任务会在 get_running_loop() 返回的循环中执行,如果当前线程没有在运行的循环则会引发 RuntimeError。

await

await用于挂起阻塞的异步调用接口。

await可以针对耗时的操作进行挂起,就像生成器里的yield一样,函数让出控制权。协程遇到await,事件循环将会挂起该协程,执行别的协程,直到其他的协程也挂起或者执行完毕,再进行下一个协程的执行。

async def do_some_work(x):      print("waiting:",x)      # await 后面就是调用耗时的操作      await asyncio.sleep(x)      return "Done after {}s".format(x)

Future

Future 是一种特殊的可等待对象,表示一个异步操作的最终结果。

当一个 Future 对象 被等待,这意味着协程将保持等待直到该 Future 对象在其他地方操作完毕。

在 asyncio 中需要 Future 对象以便允许通过 async/await 使用基于回调的代码。

通常情况是不需要创建Future的代码的。 future会在api中用到,用户可在api中查看。

async def main():      await function_that_returns_a_future_object()        # this is also valid:      await asyncio.gather(          function_that_returns_a_future_object(),          some_python_coroutine()      )

Task

协程对象不能直接运行,在注册事件循环的时候,其实是run_until_complete方法将协程包装成为了一个任务(task)对象. task对象是Future类的子类,保存了协程运行后的状态,用于未来获取协程的结果。

task和future类似,可以运行协程。

Task 对象被用来在事件循环中运行协程。如果一个协程在等待一个 Future 对象,Task 对象会挂起该协程的执行并等待该 Future 对象完成。当该 Future 对象 完成,被打包的协程将恢复执行。

运行机制:一个事件循环每次运行一个 Task 对象。一个 Task 对象会等待一个 Future 对象完成,该事件循环会运行其他 Task、回调或执行 IO 操作。

创建Task:

import asyncio  import time    now = lambda: time.time()    async def do_some_work(x):      print("waiting:", x)    start = now()    coroutine = do_some_work(2)  loop = asyncio.get_event_loop()  task = loop.create_task(coroutine)  print(task)  loop.run_until_complete(task)  print(task)  print("Time:",now()-start)

关于阻塞

使用async可以定义协程对象,使用await可以针对耗时的操作进行挂起,就像生成器里的yield一样,函数让出控制权。协程遇到await,事件循环将会挂起该协程,执行别的协程,直到其他的协程也挂起或者执行完毕,再进行下一个协程的执行

耗时的操作一般是一些IO操作,例如网络请求,文件读取等。我们使用asyncio.sleep函数来模拟IO操作。协程的目的也是让这些IO操作异步化。

import asyncio  import time    now = lambda :time.time()    async def do_some_work(x):      print("waiting:",x)      # await 后面就是调用耗时的操作      await asyncio.sleep(x)      return "Done after {}s".format(x)    start = now()    coroutine = do_some_work(2)  loop = asyncio.get_event_loop()  task = asyncio.ensure_future(coroutine)  loop.run_until_complete(task)    print("Task ret:", task.result())  print("Time:", now() - start)

代码里的sleep,模拟了阻塞或者耗时操作,这个时候就会让出控制权。 即当遇到阻塞调用的函数的时候,使用await方法将协程的控制权让出,以便loop调用其他的协程。

关于并发

简而言之就是有多个任务需要同时进行,这个时候就相当于我在同一时刻需要完成多个任务。可以看看下面这个例子:

import asyncio  import time    now = lambda :time.time()  # 定义协程方法  async def do_work(x):      print("Waiting:",x)      await asyncio.sleep(x)      return "Done after {}s".format(x)    start = now()  # 实例协程  coroutine1 = do_work(1)  coroutine2 = do_work(2)  coroutine3 = do_work(4)  # 协程的最终结果  tasks = [      asyncio.ensure_future(coroutine1),      asyncio.ensure_future(coroutine2),      asyncio.ensure_future(coroutine3)  ]  # 最先调用get_event_loop,开启协程的入口  loop = asyncio.get_event_loop()  loop.run_until_complete(asyncio.wait(tasks))    for task in tasks:      print("Task ret:",task.result())  # 耗时  print("Use Time:",now()-start)

运行代码,我们可以看到运行的结果大概在4点几秒,小于七秒,如果是同步执行,我的最终耗时至少为1+2+4=7s,如果使用异步并发,总耗时接近在4s,4s的阻塞时间,足够前面两个协程执行完毕。这就是协程的并发使用。

关于协程还有很多的知识点,在这里只是管中窥豹,如果想要了解更多的内容,可以访问:

https://docs.python.org/zh-cn/3/library/asyncio-task.html#asyncio.gather