python笔记:python中实现异步

  • 2019 年 11 月 21 日
  • 筆記

实现异步最经典的方法是起一个线程,然后调用回调函数。在python中的yield关键字,可以简单的切换代码的上下文。这为优雅的实现异步提供了可能。

系统协程处理

在python中,也能使用协程来进行任务的处理。由于python不能利用多核优势,协程在某种程度上比线程的效率更高。然而,在协程中,任务不能是阻塞的。因为协程的任务不能并行。阻塞一个任务将阻塞住整个流程。

import time  import asyncio    async def money_counter():      time.sleep(5)      return 1000    async def guest(username):      print("guest {} ask for money".format(username))      money = await money_counter()      print("get money {}".format(money))    loop = asyncio.get_event_loop()  tasks = [      asyncio.ensure_future(guest("yzh")),      asyncio.ensure_future(guest("zhh"))]    loop.run_until_complete(asyncio.gather(*tasks))  loop.close()    print("continue")  input()    # 很遗憾,结果是这样的:  # guest yzh ask for money  # get money 1000  # guest zhh ask for money  # get money 1000  # continue

上面的示例中由于使用的是sleep,并不能节省时间。但是如果换成IO操作(asyncio.open_connection),则可以提升性能。

那是不是无解了?

当然不是 我们要用Loop的函数处理ThreadPoolExecutor对象,让它能把future的result传回await。 在Py的异步中起线程调用阻塞函数通常没有什么意义。 看代码吧:

import time  import asyncio  from concurrent import futures    thread_pool = futures.ThreadPoolExecutor(4)    def money_counter():      time.sleep(2)      return 1000    async def guest(username):      print("guest {} ask for money".format(username))      # money = await money_counter()      money = await loop.run_in_executor(thread_pool, money_counter)      print("get money {}".format(money))    loop = asyncio.get_event_loop()  tasks = [      asyncio.ensure_future(guest("yzh")),      asyncio.ensure_future(guest("zhh"))]  loop.run_until_complete(asyncio.gather(*tasks))  loop.close()  print("continue")  input()

包装一个callback的异步函数

如何把一个使用callback的函数包装成能使用await语法的乖宝宝呢? 答案是,使用future来进行封装。下面有一个小例子。

import asyncio  import time  from multiprocessing.dummy import Pool as ThreadPool    loop = asyncio.get_event_loop()    tasks = []  def tasks_run():      while True:          if tasks:              time.sleep(1)              temp_task = tasks.pop(0)              loop.call_soon_threadsafe(temp_task)  # 在其它线程调用callback需要使用这个函数          time.sleep(1)  p = ThreadPool(2)  p.apply_async(tasks_run)    def asyncfn(clb):      tasks.append(clb)    def wraper():      myfuture = asyncio.Future()      print("clb0 {}".format(myfuture))      def clb():          print("clb2 {}".format(myfuture))          myfuture.set_result("hello")      asyncfn(clb)      return myfuture    async def main1():      data = await wraper()      print("data is {}".format(data))    loop.run_until_complete(main1())  loop.close()

如果让我来实现

如果让我来实现单线程异步框架,我要怎么做呢? 其实很简单,所谓异步,一定要有调度,要能并行。要并行就一定不能阻塞,要有多线程,或者调用其它的异步接口(比如IO,数据库)。 最简单的方式如下:

loop = Loop()  loop.add(task1)  loop.add(task2)    def task1():      dosth_before()      result = yield dosth_async1()      return result    def task2():      dosth_before()      result = yield dosth_async2()      return result

loop中,循环这两个任务,首先执行task1,当执行到result = yield dosth_async1()的时候,yield dosth_async1()执行(此时还没result的事)。这里一定不能阻塞。所以,它立即就返回了dosth_async1()对象。

重点来了,此时loop不要去管task1了,马上起动task2,执行到yield dosth_async2()后,又跳出了task2,此时,所有的任务都循环了一遍。该执行第二遍loop了。

在第二次循环中,如果dosth_async1()已经有结果了。我们点task1(用task1.send(result)),让它继续执行,task1得到了result,该干嘛干嘛。执行完毕后,轮到了task2,故技重施,如果dosth_async2有结果了,那就点task2。 整个loop执行完毕。