Python — Queue模組

學習契機

最近的一個項目中在使用grpc時遇到一個問題,由於client端可多達200,每個埠每10s向grpc server發送一次請求,server端接受client的請求後根據request資訊更新資料庫,再將資料庫和配置文件的某些數據封裝後返回給client。原程式碼的性能是0.26s/request,遠遠達不到所需性能,其中資料庫更新操作耗時達到80%,其中一個優化點就是將資料庫更新操作放在獨立的執行緒中。 在次之前沒有使用過執行緒編碼,學以致用後本著加深理解的想法,將這個過程記錄下來,這裡先記下用於執行緒間通訊的隊列Queue的相關知識。

概念

Python2中隊列庫名稱為Queue,Python3中已改名為queue,項目使用Python2.7.5版本,自然是使用Queue。 Queue模組中提供了同步的、執行緒安全的隊列類,包括FIFO(先入先出)隊列Queue,LIFO(後入先出)隊列LifoQueue,和優先順序隊列PriorityQueue。這些隊列都實現了鎖原語,可在多執行緒通訊中直接使用。

Queue模組定義了以下類及異常,在隊列類中,maxsize限制可入隊列數據的數量,值小於等於0時代表不限制:

  • Queue.Queue(maxsize=0) FIFO隊列
  • Queue.LifoQueue(maxsize=0) LIFO隊列
  • Queue.PriorityQueue(maxsize=0) 優先順序隊列
  • Queue.Empty TODO
  • Queue.Full

Queue(Queue、LifoQueue、PriorityQueue)對象提供以下方法:

  • Queue.qsize() 返回隊列大小,但是不保證qsize() > 0時,get()不會阻塞;也不保證qsize() < maxsize時,put()不會阻塞。
  • Queue.empty() 返回True時,不保證put()時不會阻塞;返回False時不保證get()不會阻塞。
  • Queue.full() 返回True時,不保證get()時不會阻塞;返回False時不保證put()不會阻塞。
  • Queue.put(item[, block[, timeout]]) block默認值為False,指定為True時代表可以阻塞,若同時指定timeout,在超時時返回Full exception。
  • Queue.put_nowait(item) 等同put(item, False)
  • Queue.get([block[, timeout]])
  • Queue.get_nowait() 等同get(item, False)
  • Queue.task_done() 消費者執行緒調用。調用get()後,可調用task_done()告訴隊列該任務已經處理完畢。 如果當前一個join()正在阻塞,它將在隊列中的所有任務都處理完時恢復執行(即每一個由put()調用入隊的任務都有一個對應的task_done()調用)。
  • Queue.join() 阻塞調用執行緒,直到隊列中的所有任務被處理掉。 只要有數據被加入隊列,未完成的任務數就會增加。當消費者執行緒調用task_done()(意味著有消費者取得任務並完成任務),未完成的任務數就會減少。當未完成的任務數降到0,join()解除阻塞。

應用

UpdateThread是單一消費者進程,獲取FIFO隊列中的數據處理,GrpcThread是multi生產者執行緒,需要對往隊列中丟數據這個操作加鎖保證數據先後順序。

import threading  import Queue  import time    q = Queue.Queue()  q_lock = threading.Lock()      class UpdateThread(threading.Thread):        def __init__(self):          super(self.__class__, self).__init__()          self.setName(self.__class__.__name__)          self._setName = self.setName        @staticmethod      def update_stat():          global q          while not q.empty():              stat = q.get()              print 'Update stat (%s) in db' % stat        def run(self):          while True:              self.update_stat()              time.sleep(0.1)      class GrpcThread(threading.Thread):        def compose_stat(self, stat):          global q          q_lock.acquire()          q.put('%d: %s' % (stat, self.name))          q_lock.release()          return        def run(self):          for i in range(10):              self.compose_stat(i)              time.sleep(0.1)      def launch_update_thread():      UpdateThread().start()      if __name__ == '__main__':      launch_update_thread()      thread1 = GrpcThread()      thread2 = GrpcThread()        thread1.start()      thread2.start()