­

Python 協程的詳細用法和例子

從句法上看,協程與生成器類似,都是定義體中包含 yield 關鍵字的函數。可是,在協程中, yield 通常出現在表達式的右邊(例如, datum = yield),可以產出值,也可以不產出 —— 如果 yield 關鍵字後面沒有表達式,那麼生成器產出 None。

協程可能會從調用方接收數據,不過調用方把數據提供給協程使用的是 .send(datum) 方法,而不是next(…) 函數。

==yield 關鍵字甚至還可以不接收或傳出數據。不管數據如何流動, yield 都是一種流程控制工具,使用它可以實現協作式多任務:協程可以把控制器讓步給中心調度程序,從而激活其他的協程==。

協程的生成器的基本行為

這裡有一個最簡單的協程代碼:

def simple_coroutine():      print('-> start')      x = yield      print('-> recived', x)    sc = simple_coroutine()    next(sc)  sc.send('zhexiao')

解釋: 1. 協程使用生成器函數定義:定義體中有 yield 關鍵字。 2. yield 在表達式中使用;如果協程只需從客戶那裡接收數據,那麼產出的值是 None —— 這個值是隱式指定的,因為 yield 關鍵字右邊沒有表達式。 3. 首先要調用 next(…) 函數,因為生成器還沒啟動,沒在 yield 語句處暫停,所以一開始無法發送數據。 4. 調用send方法,把值傳給 yield 的變量,然後協程恢復,繼續執行下面的代碼,直到運行到下一個 yield 表達式,或者終止。

==注意:send方法只有當協程處於 GEN_SUSPENDED 狀態下時才會運作,所以我們使用 next() 方法激活協程到 yield 表達式處停止,或者我們也可以使用 sc.send(None),效果與 next(sc) 一樣==。

協程的四個狀態:

協程可以身處四個狀態中的一個。當前狀態可以使用inspect.getgeneratorstate(…) 函數確定,該函數會返回下述字符串中的一個: 1. GEN_CREATED:等待開始執行 2. GEN_RUNNING:解釋器正在執行 3. GEN_SUSPENED:在yield表達式處暫停 4. GEN_CLOSED:執行結束

==最先調用 next(sc) 函數這一步通常稱為「預激」(prime)協程==(即,讓協程向前執行到第一個 yield 表達式,準備好作為活躍的協程使用)。

import inspect    def simple_coroutine(a):      print('-> start')        b = yield a      print('-> recived', a, b)        c = yield a + b      print('-> recived', a, b, c)    # run  sc = simple_coroutine(5)    next(sc)  sc.send(6) # 5, 6  sc.send(7) # 5, 6, 7

示例:使用協程計算移動平均值

def averager():      total = 0.0      count = 0      avg = None        while True:          num = yield avg          total += num          count += 1          avg = total/count    # run  ag = averager()  # 預激協程  print(next(ag))     # None    print(ag.send(10))  # 10  print(ag.send(20))  # 15

解釋: 1. 調用 next(ag) 函數後,協程會向前執行到 yield 表達式,產出 average 變量的初始值——None。 2. 此時,協程在 yield 表達式處暫停。 3. 使用 send() 激活協程,把發送的值賦給 num,並計算出 avg 的值。 4. 使用 print 打印出 yield 返回的數據。

終止協程和異常處理

協程中未處理的異常會向上冒泡,傳給 next 函數或 send 方法的調用方(即觸發協程的對象)。

==終止協程的一種方式:發送某個哨符值,讓協程退出。內置的 None 和Ellipsis 等常量經常用作哨符值==。

顯式地把異常發給協程

從 Python 2.5 開始,客戶代碼可以在生成器對象上調用兩個方法,顯式地把異常發給協程。

generator.throw(exc_type[, exc_value[, traceback]])

致使生成器在暫停的 yield 表達式處拋出指定的異常。如果生成器處理了拋出的異常,代碼會向前執行到下一個 yield 表達式,而產出的值會成為調用 generator.throw方法得到的返回值。如果生成器沒有處理拋出的異常,異常會向上冒泡,傳到調用方的上下文中。

generator.close()

致使生成器在暫停的 yield 表達式處拋出 GeneratorExit 異常。如果生成器沒有處理這個異常,或者拋出了 StopIteration 異常(通常是指運行到結尾),調用方不會報錯。如果收到 GeneratorExit 異常,生成器一定不能產出值,否則解釋器會拋出RuntimeError 異常。生成器拋出的其他異常會向上冒泡,傳給調用方。

異常處理示例:

class DemoException(Exception):      """      custom exception      """    def handle_exception():      print('-> start')        while True:          try:              x = yield          except DemoException:              print('-> run demo exception')          else:              print('-> recived x:', x)        raise RuntimeError('this line should never run')    he = handle_exception()  next(he)  he.send(10) # recived x: 10  he.send(20) # recived x: 20    he.throw(DemoException) # run demo exception    he.send(40) # recived x: 40  he.close()

如果傳入無法處理的異常,則協程會終止:

he.throw(Exception) # run demo exception

yield from獲取協程的返回值

為了得到返回值,協程必須正常終止;然後生成器對象會拋出StopIteration 異常,異常對象的 value 屬性保存着返回的值。

==yield from 結構會在內部自動捕獲 StopIteration 異常==。對 yield from 結構來說,解釋器不僅會捕獲 StopIteration 異常,還會把value 屬性的值變成 yield from 表達式的值。

yield from基本用法

==在生成器 gen 中使用 yield from subgen() 時, subgen 會獲得控制權,把產出的值傳給 gen 的調用方,即調用方可以直接控制 subgen。與此同時, gen 會阻塞,等待 subgen 終止==。

下面2個函數的作用一樣,只是使用了 yield from 的更加簡潔:

def gen():      for c in 'AB':          yield c    print(list(gen()))    def gen_new():      yield from 'AB'    print(list(gen_new()))

==yield from x 表達式對 x 對象所做的第一件事是,調用 iter(x),從中獲取迭代器,因此, x 可以是任何可迭代的對象,這只是 yield from 最基礎的用法==。

yield from高級用法

==yield from 的主要功能是打開雙向通道,把最外層的調用方與最內層的子生成器連接起來,這樣二者可以直接發送和產出值,還可以直接傳入異常,而不用在位於中間的協程中添加大量處理異常的樣板代碼==。

yield from 專門的術語

  1. 委派生成器:包含 yield from 表達式的生成器函數。
  2. 子生成器:從 yield from 中 部分獲取的生成器。

圖示

解釋: 1. 委派生成器在 yield from 表達式處暫停時,調用方可以直接把數據發給子生成器。 2. 子生成器再把產出的值發給調用方。 3. 子生成器返回之後,解釋器會拋出 StopIteration 異常,並把返回值附加到異常對象上,此時委派生成器會恢復。

高級示例

from collections import namedtuple    ResClass = namedtuple('Res', 'count average')      # 子生成器  def averager():      total = 0.0      count = 0      average = None        while True:          term = yield          if term is None:              break          total += term          count += 1          average = total / count        return ResClass(count, average)      # 委派生成器  def grouper(storages, key):      while True:          # 獲取averager()返回的值          storages[key] = yield from averager()      # 客戶端代碼  def client():      process_data = {          'boys_2': [39.0, 40.8, 43.2, 40.8, 43.1, 38.6, 41.4, 40.6, 36.3],          'boys_1': [1.38, 1.5, 1.32, 1.25, 1.37, 1.48, 1.25, 1.49, 1.46]      }        storages = {}      for k, v in process_data.items():          # 獲得協程          coroutine = grouper(storages, k)            # 預激協程          next(coroutine)            # 發送數據到協程          for dt in v:              coroutine.send(dt)            # 終止協程          coroutine.send(None)      print(storages)    # run  client()

解釋: 1. 外層 for 循環每次迭代會新建一個 grouper 實例,賦值給 coroutine 變量; grouper 是委派生成器。 2. 調用 next(coroutine),預激委派生成器 grouper,此時進入 while True 循環,調用子生成器 averager 後,在 yield from 表達式處暫停。 3. 內層 for 循環調用 coroutine.send(value),直接把值傳給子生成器 averager。同時,當前的 grouper 實例(coroutine)在 yield from 表達式處暫停。 4. 內層循環結束後, grouper 實例依舊在 yield from 表達式處暫停,因此, grouper函數定義體中為 results[key] 賦值的語句還沒有執行。 5. coroutine.send(None) 終止 averager 子生成器,子生成器拋出 StopIteration 異常並將返回的數據包含在異常對象的value中,yield from 可以直接抓取 StopItration 異常並將異常對象的 value 賦值給 results[key]

yield from的意義

  1. 子生成器產出的值都直接傳給委派生成器的調用方(即客戶端代碼)。
  2. 使用 send() 方法發給委派生成器的值都直接傳給子生成器。如果發送的值是None,那麼會調用子生成器的 next() 方法。如果發送的值不是 None,那麼會調用子生成器的 send() 方法。如果調用的方法拋出 StopIteration 異常,那麼委派生成器恢復運行。任何其他異常都會向上冒泡,傳給委派生成器。
  3. 生成器退出時,生成器(或子生成器)中的 return expr 表達式會觸發 StopIteration(expr) 異常拋出。
  4. yield from 表達式的值是子生成器終止時傳給 StopIteration 異常的第一個參數。
  5. 傳入委派生成器的異常,除了 GeneratorExit 之外都傳給子生成器的 throw() 方法。如果調用 throw() 方法時拋出 StopIteration 異常,委派生成器恢復運行。 StopIteration 之外的異常會向上冒泡,傳給委派生成器。
  6. 如果把 GeneratorExit 異常傳入委派生成器,或者在委派生成器上調用 close() 方法,那麼在子生成器上調用 close() 方法,如果它有的話。如果調用close()方法導致異常拋出,那麼異常會向上冒泡,傳給委派生成器;否則,委派生成器拋出GeneratorExit 異常。

使用案例

協程能自然地表述很多算法,例如仿真、遊戲、異步 I/O,以及其他事件驅動型編程形式或協作式多任務。協程是 asyncio 包的基礎構建。通過仿真系統能說明如何使用協程代替線程實現並發的活動。

在仿真領域,進程這個術語指代模型中某個實體的活動,與操作系統中的進程無關。仿真系統中的一個進程可以使用操作系統中的一個進程實現,但是通常會使用一個線程或一個協程實現。

的士示例

import collections    # time 字段是事件發生時的仿真時間,  # proc 字段是的士進程實例的編號,  # action 字段是描述活動的字符串。  Event = collections.namedtuple('Event', 'time proc action')      def taxi_process(proc_num, trips_num, start_time=0):      """      每次改變狀態時創建事件,把控制權讓給仿真器      :param proc_num:      :param trips_num:      :param start_time:      :return:      """      time = yield Event(start_time, proc_num, 'leave garage')        for i in range(trips_num):          time = yield Event(time, proc_num, 'pick up people')          time = yield Event(time, proc_num, 'drop off people')        yield Event(time, proc_num, 'go home')    # run  t1 = taxi_process(1, 1)  a = next(t1)  print(a)    # Event(time=0, proc=1, action='leave garage')  b = t1.send(a.time + 6)  print(b)    # Event(time=6, proc=1, action='pick up people')  c = t1.send(b.time + 12)  print(c)    # Event(time=18, proc=1, action='drop off people')  d = t1.send(c.time + 1)  print(d)    # Event(time=19, proc=1, action='go home')

模擬控制台控制3個的士異步

import collections  import queue  import random    # time 字段是事件發生時的仿真時間,  # proc 字段是的士進程實例的編號,  # action 字段是描述活動的字符串。  Event = collections.namedtuple('Event', 'time proc action')      def taxi_process(proc_num, trips_num, start_time=0):      """      每次改變狀態時創建事件,把控制權讓給仿真器      :param proc_num:      :param trips_num:      :param start_time:      :return:      """      time = yield Event(start_time, proc_num, 'leave garage')        for i in range(trips_num):          time = yield Event(time, proc_num, 'pick up people')          time = yield Event(time, proc_num, 'drop off people')        yield Event(time, proc_num, 'go home')      class SimulateTaxi(object):      """      模擬的士控制台      """        def __init__(self, proc_map):          # 保存排定事件的 PriorityQueue 對象,          # 如果進來的是tuple類型,則默認使用tuple[0]做排序          self.events = queue.PriorityQueue()          # procs_map 參數是一個字典,使用dict構建本地副本          self.procs = dict(proc_map)        def run(self, end_time):          """          排定並顯示事件,直到時間結束          :param end_time:          :return:          """          for _, taxi_gen in self.procs.items():              leave_evt = next(taxi_gen)              self.events.put(leave_evt)            # 仿真系統的主循環          simulate_time = 0          while simulate_time < end_time:              if self.events.empty():                  print('*** end of events ***')                  break                # 第一個事件的發生              current_evt = self.events.get()              simulate_time, proc_num, action = current_evt              print('taxi:', proc_num, ', at time:', simulate_time, ', ', action)                # 準備下個事件的發生              proc_gen = self.procs[proc_num]              next_simulate_time = simulate_time + self.compute_duration()                try:                  next_evt = proc_gen.send(next_simulate_time)              except StopIteration:                  del self.procs[proc_num]              else:                  self.events.put(next_evt)          else:              msg = '*** end of simulation time: {} events pending ***'              print(msg.format(self.events.qsize()))        @staticmethod      def compute_duration():          """          隨機產生下個事件發生的時間          :return:          """          duration_time = random.randint(1, 20)          return duration_time      # 生成3個的士,現在全部都沒有離開garage  taxis = {i: taxi_process(i, (i + 1) * 2, i * 5)           for i in range(3)}    # 模擬運行  st = SimulateTaxi(taxis)  st.run(100)