python:rxpy 使用async協程

  • 2019 年 11 月 22 日
  • 筆記

終於弄清怎麼在rxpy中使用flat_map調用協程了,直接上程式碼

import asyncio    from rx import Observable      def warp_future(func):      def inner(arg):          future = asyncio.ensure_future(func(arg))          return Observable.from_future(future)        return inner      async def main():      async def add(args):          return args[0] + args[1]        s1 = Observable.of(1, 3, 5)      s2 = Observable.of(2, 4, 6)        stream = Observable.zip_array(s1, s2)           .flat_map(warp_future(add))           .where(lambda x: x > 3)           .map(lambda x: print(x))      await stream      loop = asyncio.get_event_loop()  loop.run_until_complete(main())  loop.close()