Python — Queue模組
- 2020 年 1 月 5 日
- 筆記
學習契機
最近的一個項目中在使用grpc時遇到一個問題,由於client端可多達200,每個埠每10s向grpc server發送一次請求,server端接受client的請求後根據request資訊更新資料庫,再將資料庫和配置文件的某些數據封裝後返回給client。原程式碼的性能是0.26s/request,遠遠達不到所需性能,其中資料庫更新操作耗時達到80%,其中一個優化點就是將資料庫更新操作放在獨立的執行緒中。 在次之前沒有使用過執行緒編碼,學以致用後本著加深理解的想法,將這個過程記錄下來,這裡先記下用於執行緒間通訊的隊列Queue的相關知識。
概念
Python2中隊列庫名稱為Queue,Python3中已改名為queue,項目使用Python2.7.5版本,自然是使用Queue。 Queue模組中提供了同步的、執行緒安全的隊列類,包括FIFO(先入先出)隊列Queue,LIFO(後入先出)隊列LifoQueue,和優先順序隊列PriorityQueue。這些隊列都實現了鎖原語,可在多執行緒通訊中直接使用。
Queue模組定義了以下類及異常,在隊列類中,maxsize
限制可入隊列數據的數量,值小於等於0時代表不限制:
-
Queue.Queue(maxsize=0)
FIFO隊列 -
Queue.LifoQueue(maxsize=0)
LIFO隊列 -
Queue.PriorityQueue(maxsize=0)
優先順序隊列 -
Queue.Empty
TODO
Queue.Full
Queue(Queue、LifoQueue、PriorityQueue)對象提供以下方法:
-
Queue.qsize()
返回隊列大小,但是不保證qsize() > 0時,get()不會阻塞;也不保證qsize() < maxsize時,put()不會阻塞。 -
Queue.empty()
返回True時,不保證put()時不會阻塞;返回False時不保證get()不會阻塞。 -
Queue.full()
返回True時,不保證get()時不會阻塞;返回False時不保證put()不會阻塞。 -
Queue.put(item[, block[, timeout]])
block默認值為False,指定為True時代表可以阻塞,若同時指定timeout,在超時時返回Full exception。 -
Queue.put_nowait(item)
等同put(item, False)
Queue.get([block[, timeout]])
-
Queue.get_nowait()
等同get(item, False)
-
Queue.task_done()
消費者執行緒調用。調用get()後,可調用task_done()告訴隊列該任務已經處理完畢。 如果當前一個join()正在阻塞,它將在隊列中的所有任務都處理完時恢復執行(即每一個由put()調用入隊的任務都有一個對應的task_done()調用)。 -
Queue.join()
阻塞調用執行緒,直到隊列中的所有任務被處理掉。 只要有數據被加入隊列,未完成的任務數就會增加。當消費者執行緒調用task_done()(意味著有消費者取得任務並完成任務),未完成的任務數就會減少。當未完成的任務數降到0,join()解除阻塞。
應用
UpdateThread
是單一消費者進程,獲取FIFO隊列中的數據處理,GrpcThread
是multi生產者執行緒,需要對往隊列中丟數據這個操作加鎖保證數據先後順序。
import threading import Queue import time q = Queue.Queue() q_lock = threading.Lock() class UpdateThread(threading.Thread): def __init__(self): super(self.__class__, self).__init__() self.setName(self.__class__.__name__) self._setName = self.setName @staticmethod def update_stat(): global q while not q.empty(): stat = q.get() print 'Update stat (%s) in db' % stat def run(self): while True: self.update_stat() time.sleep(0.1) class GrpcThread(threading.Thread): def compose_stat(self, stat): global q q_lock.acquire() q.put('%d: %s' % (stat, self.name)) q_lock.release() return def run(self): for i in range(10): self.compose_stat(i) time.sleep(0.1) def launch_update_thread(): UpdateThread().start() if __name__ == '__main__': launch_update_thread() thread1 = GrpcThread() thread2 = GrpcThread() thread1.start() thread2.start()