python笔记:python中实现异步
- 2019 年 11 月 21 日
- 筆記
实现异步最经典的方法是起一个线程,然后调用回调函数。在python中的yield关键字,可以简单的切换代码的上下文。这为优雅的实现异步提供了可能。
系统协程处理
在python中,也能使用协程来进行任务的处理。由于python不能利用多核优势,协程在某种程度上比线程的效率更高。然而,在协程中,任务不能是阻塞的。因为协程的任务不能并行。阻塞一个任务将阻塞住整个流程。
import time import asyncio async def money_counter(): time.sleep(5) return 1000 async def guest(username): print("guest {} ask for money".format(username)) money = await money_counter() print("get money {}".format(money)) loop = asyncio.get_event_loop() tasks = [ asyncio.ensure_future(guest("yzh")), asyncio.ensure_future(guest("zhh"))] loop.run_until_complete(asyncio.gather(*tasks)) loop.close() print("continue") input() # 很遗憾,结果是这样的: # guest yzh ask for money # get money 1000 # guest zhh ask for money # get money 1000 # continue
上面的示例中由于使用的是sleep,并不能节省时间。但是如果换成IO操作(asyncio.open_connection),则可以提升性能。
那是不是无解了?
当然不是 我们要用Loop的函数处理ThreadPoolExecutor对象,让它能把future的result传回await。 在Py的异步中起线程调用阻塞函数通常没有什么意义。 看代码吧:
import time import asyncio from concurrent import futures thread_pool = futures.ThreadPoolExecutor(4) def money_counter(): time.sleep(2) return 1000 async def guest(username): print("guest {} ask for money".format(username)) # money = await money_counter() money = await loop.run_in_executor(thread_pool, money_counter) print("get money {}".format(money)) loop = asyncio.get_event_loop() tasks = [ asyncio.ensure_future(guest("yzh")), asyncio.ensure_future(guest("zhh"))] loop.run_until_complete(asyncio.gather(*tasks)) loop.close() print("continue") input()
包装一个callback的异步函数
如何把一个使用callback的函数包装成能使用await语法的乖宝宝呢? 答案是,使用future来进行封装。下面有一个小例子。
import asyncio import time from multiprocessing.dummy import Pool as ThreadPool loop = asyncio.get_event_loop() tasks = [] def tasks_run(): while True: if tasks: time.sleep(1) temp_task = tasks.pop(0) loop.call_soon_threadsafe(temp_task) # 在其它线程调用callback需要使用这个函数 time.sleep(1) p = ThreadPool(2) p.apply_async(tasks_run) def asyncfn(clb): tasks.append(clb) def wraper(): myfuture = asyncio.Future() print("clb0 {}".format(myfuture)) def clb(): print("clb2 {}".format(myfuture)) myfuture.set_result("hello") asyncfn(clb) return myfuture async def main1(): data = await wraper() print("data is {}".format(data)) loop.run_until_complete(main1()) loop.close()
如果让我来实现
如果让我来实现单线程异步框架,我要怎么做呢? 其实很简单,所谓异步,一定要有调度,要能并行。要并行就一定不能阻塞,要有多线程,或者调用其它的异步接口(比如IO,数据库)。 最简单的方式如下:
loop = Loop() loop.add(task1) loop.add(task2) def task1(): dosth_before() result = yield dosth_async1() return result def task2(): dosth_before() result = yield dosth_async2() return result
loop中,循环这两个任务,首先执行task1,当执行到result = yield dosth_async1()
的时候,yield dosth_async1()
执行(此时还没result的事)。这里一定不能阻塞。所以,它立即就返回了dosth_async1()
对象。
重点来了,此时loop不要去管task1了,马上起动task2,执行到yield dosth_async2()
后,又跳出了task2,此时,所有的任务都循环了一遍。该执行第二遍loop了。
在第二次循环中,如果dosth_async1()
已经有结果了。我们点task1(用task1.send(result)),让它继续执行,task1得到了result,该干嘛干嘛。执行完毕后,轮到了task2,故技重施,如果dosth_async2
有结果了,那就点task2。 整个loop执行完毕。