How does it work? — threading.Condition

  • 2019 年 11 月 30 日
  • 筆記

繼兩年前的上一篇文章之後,不靠譜部落客終於想起了How does it work這個坑。主要是近期也沒有遇到可值得分享的「精巧」的實現。之前其實也過了一遍threading模組的源碼,對裡面的各種鎖也只是有個大概印象,並且它們之前非常像,很容易讓人confusing。這次碰到實際需要,於是仔細看了一下源碼,發現還是有很多搞頭的。當然,你只是使用的話照著例子用就好了不會出錯,但還是值得花點工夫弄清裡面的原理。

Condition的使用示例

下面是我隨便從網上搜來的程式碼片段

Python

import threading, time  class Hider(threading.Thread):      def __init__(self, cond, name):          super(Hider, self).__init__()          self.cond = cond          self.name = name      def run(self):          time.sleep(1) #確保先運行Seeker中的方法          self.cond.acquire() #b          print self.name + ': 我已經把眼睛蒙上了'          self.cond.notify()          self.cond.wait() #c                           #f          print self.name + ': 我找到你了 ~_~'          self.cond.notify()          self.cond.release()                              #g          print self.name + ': 我贏了'   #h  class Seeker(threading.Thread):      def __init__(self, cond, name):          super(Seeker, self).__init__()          self.cond = cond          self.name = name      def run(self):          self.cond.acquire()          self.cond.wait()    #a    #釋放對瑣的佔用,同時執行緒掛起在這裡,直到被notify並重新佔有瑣。                              #d          print self.name + ': 我已經藏好了,你快來找我吧'          self.cond.notify()          self.cond.wait()    #e                              #h          self.cond.release()          print self.name + ': 被你找到了,哎~~~'  cond = threading.Condition()  seeker = Seeker(cond, 'seeker')  hider = Hider(cond, 'hider')  seeker.start()  hider.start()

這裡用的捉迷藏,換成聊天也可以。其實就是兩個執行緒間的同步,一應一答。一個執行緒執行完操作以後通知另一方並等待應答。下面我們要解決一些問題:

  1. 它跟Lock有什麼區別?
  2. 可以注意到雙方動作前都acquire了同一個Condition,這樣不阻塞嗎?
  3. 為什麼一定要acquire?我換成獲取一個普通的Lock行嗎?

Condition源碼分析

Condition的初始化方法為Condition(lock),其中lock不傳的話默認是一個RLock(),即可重入鎖,關於鎖的區別比較好理解,這裡就不啰嗦了。初始完之後會把傳入的鎖存為屬性,然後Conditionacquirerelease就只是對這個鎖的獲取釋放而已。所以:

Python

lock = Lock()  cond = Condition(lock)  cond.acquire()  # 換成lock.acquire()完全等價,release類似

到此為止還看不出為何要用Condition而不用Lock,關鍵是下面兩個方法waitnotify,我把程式碼完整貼出附上自己的注釋:

Python

def wait(self, timeout=None):      if not self._is_owned():    # 必須先獲取self._lock          raise RuntimeError("cannot wait on un-acquired lock")      waiter = _allocate_lock()       # 新建一個鎖      waiter.acquire()    # 獲取剛剛新建的鎖      self._waiters.append(waiter)    # 加入waiters列表      saved_state = self._release_save()    # 這裡釋放了self._lock      gotit = False      try:    # restore state no matter what (e.g., KeyboardInterrupt)          if timeout is None:              waiter.acquire()    # 再次獲取新建的鎖              gotit = True          else:              if timeout > 0:                  gotit = waiter.acquire(True, timeout)   # 等待時間後返回              else:                  gotit = waiter.acquire(False)   # timeout == 0, 立即返回          return gotit      finally:          self._acquire_restore(saved_state)    # 重新恢復self._lock狀態          if not gotit:   # 如果是非阻塞的(timeout != None)              try:                  self._waiters.remove(waiter)    # 從waiters列表刪除              except ValueError:                  pass

果然talk is cheap, show me the code,看了程式碼就一目了然了。可以看到self._lock(就是初始化時傳入的那個鎖)在第7行之前是佔用狀態的,此時其他執行緒不可插入,然後整個try-block里self._lock是釋放狀態可被其他執行緒獲取。通過再次獲取同一個waiter鎖達到了阻塞的效果,這樣看起來就像是新加入了一個等待者在等待某個事件。等待的這個事件,就是其他執行緒用同一個Condition實例調用的notify方法:

Python

def notify(self, n=1):      if not self._is_owned():          raise RuntimeError("cannot notify on un-acquired lock")      all_waiters = self._waiters   # 獲取所有等待的鎖      waiters_to_notify = _deque(_islice(all_waiters, n))    # 只選給定數量的等待者,如果是notify_all方法則是全部      if not waiters_to_notify:          return      for waiter in waiters_to_notify:          waiter.release()    #          try:              all_waiters.remove(waiter)          except ValueError:              pass

可以看到notify方法全程都擁有鎖self._lock,這樣保證了只有Notify完成之後對方才能下一步動作。調用時序如下:

總結來說的話,就是只有wait()方法能主動釋放鎖,而notify()不能,所以waiter執行緒一定要先啟動,防止發生死鎖。

Event與Condition

threading中還有一個Event,與Condition非常類似。區別在於前者等待、監聽某個值is_set()為真,而後者只是一個通知等待的模型。並且Event中監聽值翻轉以後,正是通過Condition去通知等待者的。Event變成set以後,就「失效」了,要手動clear一次才能繼續使用用,而Condition是可以無限wait, notify循環的。

Python

class Event:      def set(self):          with self._cond:              self._flag = True              self._cond.notify_all()

其實,Condition也包含一個wait_for(eval_function, timeout)方法,用來等待某函數的返回值為真。這個方法用起來和Event的作用是很像的,你可以理解為Event只是提供了一個包裝好了的Condition