Python:執行緒、進程與協程(3)——
- 2020 年 1 月 3 日
- 筆記
Queue模組是提供隊列操作的模組,隊列是執行緒間最常用的交換數據的形式。該模組提供了三種隊列:
Queue.Queue(maxsize):先進先出,maxsize是隊列的大小,其值為非正數時為無線循環隊列
Queue.LifoQueue(maxsize):後進先出,相當於棧
Queue.PriorityQueue(maxsize):優先順序隊列。
其中LifoQueue,PriorityQueue是Queue的子類。三者擁有以下共同的方法:
qsize():返回近似的隊列大小。為什麼要加「近似」二字呢?因為當該值大於0的時候並不保證並發執行的時候get()方法不被阻塞,同樣,對於put()方法有效。
empty():返回布爾值,隊列為空時,返回True,反之返回False。
full():當設定了隊列大小的時候,如果隊列滿了,則返回True,否則返回False。
put(item[,block[,timeout]]):向隊列里添加元素item,block設置為False的時候,如果隊列滿了則拋出Full異常。如果block設置為True,timeout設置為None時,則會一種等到有空位的時候再添加進隊列;否則會根據timeout設定的超時值拋出Full異常。
put_nowwait(item):等價與put(item,False)。block設置為False的時候,如果隊列為空,則拋出Empty異常。如果block設置為True,timeout設置為None時,則會一種等到有空位的時候再添加進隊列;否則會根據timeout設定的超時值拋出Empty異常。
get([block[,timeout]]):從隊列中刪除元素並返回該元素的值,如果timeout是一個正數,它會阻塞最多超時秒數,並且如果在該時間內沒有可用的項目,則引發Empty異常。
get_nowwait():等價於get(False)
task_done():發送訊號表明入列任務已完成,經常在消費者執行緒中用到。
join():阻塞直至隊列所有元素處理完畢,然後再處理其它操作。
(一)源碼分析
Queue模組用起來很簡單很簡單,但我覺得有必要把該模組的相關源程式碼貼出來分析下,會學到不少東西,看看大神們寫的程式碼多麼美觀,多麼結構化模組化,再想想自己寫的程式碼,都是淚呀,來學習學習。為了縮減篇幅,源碼的注釋部分被刪減掉。
from time import time as _time try: import threading as _threading except ImportError: import dummy_threading as _threading from collections import deque import heapq __all__ = ['Empty', 'Full', 'Queue', 'PriorityQueue', 'LifoQueue'] class Empty(Exception): "Exception raised by Queue.get(block=0)/get_nowait()." pass class Full(Exception): "Exception raised by Queue.put(block=0)/put_nowait()." pass class Queue: def __init__(self, maxsize=0): self.maxsize = maxsize self._init(maxsize) self.mutex = _threading.Lock() self.not_empty = _threading.Condition(self.mutex) self.not_full = _threading.Condition(self.mutex) self.all_tasks_done = _threading.Condition(self.mutex) self.unfinished_tasks = def get_nowait(self): return self.get(False) def _init(self, maxsize): self.queue = deque() def _qsize(self, len=len): return len(self.queue) def _put(self, item): self.queue.append(item) def _get(self): return self.queue.popleft()
通過後面的幾個函數分析知道,Queue對象是在collections模組的queue基礎上(關於collections模組參考 Python:使用Counter進行計數統計及collections模組),加上threading模組互斥鎖和條件變數封裝的。
deque是一個雙端隊列,很適用於隊列和棧。上面的Queue對象就是一個先進先出的隊列,所以首先_init()函數定義了一個雙端隊列,然後它的定義了_put()和_get()函數,它們分別是從雙端隊列右邊添加元素、左邊刪除元素,這就構成了一個先進先出隊列,同理很容易想到LifoQueue(後進先出隊列)的實現了,保證隊列右邊添加右邊刪除就可以。可以貼出源程式碼看看。
class LifoQueue(Queue): '''Variant of Queue that retrieves most recently added entries first.''' def _init(self, maxsize): self.queue = [] def _qsize(self, len=len): return len(self.queue) def _put(self, item): self.queue.append(item) def _get(self): return self.queue.pop()
雖然它的"queue"沒有用queue(),用列表也是一樣的,因為列表append()和pop()操作是在最右邊添加元素和刪除最右邊元素。
再來看看PriorityQueue,他是個優先順序隊列,這裡用到了heapq模組的heappush()和heappop()兩個函數。heapq模組對堆這種數據結構進行了模組化,可以建立這種數據結構,同時heapq模組也提供了相應的方法來對堆做操作。其中_init()函數里self.queue=[]可以看作是建立了一個空堆。heappush() 往堆中插入一條新的值 ,heappop() 從堆中彈出最小值 ,這就可以實現優先順序(關於heapq模組這裡這是簡單的介紹)。源程式碼如下:
class PriorityQueue(Queue): '''Variant of Queue that retrieves open entries in priority order (lowest first). Entries are typically tuples of the form: (priority number, data). ''' def _init(self, maxsize): self.queue = [] def _qsize(self, len=len): return len(self.queue) def _put(self, item, heappush=heapq.heappush): heappush(self.queue, item) def _get(self, heappop=heapq.heappop): return heappop(self.queue)
基本的數據結構分析完了,接著分析其它的部分。
mutex 是個threading.Lock()對象,是互斥鎖;not_empty、 not_full 、all_tasks_done這三個都是threading.Condition()對象,條件變數,而且維護的是同一把鎖對象mutex(關於threading模組中Lock對象和Condition對象可參考上篇博文Python:執行緒、進程與協程(2)——threading模組)。
其中:
self.mutex互斥鎖:任何獲取隊列的狀態(empty(),qsize()等),或者修改隊列的內容的操作(get,put等)都必須持有該互斥鎖。acquire()獲取鎖,release()釋放鎖。同時該互斥鎖被三個條件變數共同維護。
self.not_empty條件變數:執行緒添加數據到隊列中後,會調用self.not_empty.notify()通知其它執行緒,然後喚醒一個移除元素的執行緒。
self.not_full條件變數:當一個元素被移除出隊列時,會喚醒一個添加元素的執行緒。
self.all_tasks_done條件變數 :在未完成任務的數量被刪除至0時,通知所有任務完成
self.unfinished_tasks : 定義未完成任務數量
再來看看主要方法:
(1)put()
源程式碼如下:
def put(self, item, block=True, timeout=None): self.not_full.acquire() #not_full獲得鎖 try: if self.maxsize > 0: #如果隊列長度有限制 if not block: #如果沒阻塞 if self._qsize() == self.maxsize: #如果隊列滿了拋異常 raise Full elif timeout is None: #有阻塞且超時為空,等待 while self._qsize() == self.maxsize: self.not_full.wait() elif timeout < 0: raise ValueError("'timeout' must be a non-negative number") else: #如果有阻塞,且超時非負時,結束時間=當前時間+超時時間 endtime = _time() + timeout while self._qsize() == self.maxsize: remaining = endtime - _time() if remaining <= 0.0: #到時後,拋異常 raise Full #如果沒到時,隊列是滿的就會一直被掛起,直到有「位置」騰出 self.not_full.wait(remaining) self._put(item) #調用_put方法,添加元素 self.unfinished_tasks += 1 #未完成任務+1 self.not_empty.notify() #通知非空,喚醒非空掛起的任務 finally: self.not_full.release() #not_full釋放鎖
默認情況下block為True,timeout為None。如果隊列滿則會等待,未滿則會調用_put方法將進程加入deque中(後面介紹),並且未完成任務加1還會通知隊列非空。
如果設置block參數為Flase,隊列滿時則會拋異常。如果設置了超時那麼在時間到之前進行阻塞,時間一到拋異常。這個方法使用not_full對象進行操作。
(2)get()
源碼如下:
def get(self, block=True, timeout=None): self.not_empty.acquire() #not_empty獲得鎖 try: if not block: #不阻塞時 if not self._qsize(): #隊列為空時拋異常 raise Empty elif timeout is None: #不限時時,隊列為空則會等待 while not self._qsize(): self.not_empty.wait() elif timeout < 0: raise ValueError("'timeout' must be a non-negative number") else: endtime = _time() + timeout while not self._qsize(): remaining = endtime - _time() if remaining <= 0.0: raise Empty self.not_empty.wait(remaining) item = self._get() #調用_get方法,移除並獲得項目 self.not_full.notify() #通知非滿 return item #返回項目 finally: self.not_empty.release() #釋放鎖
邏輯跟put()函數一樣,參數默認情況下隊列空了則會等待,否則將會調用_get方法(往下看)移除並獲得一個項,最後返回這個項。這個方法使用not_empty對象進行操作。
不過我覺得put()與get()兩個函數結合起來理解比較好。not_full與not_empty代表的是兩種不同操作類型的執行緒,not_full可以理解成is-not-full,即隊列是否滿了,默認是沒有滿,沒有滿時not_full這個條件變數才能獲取鎖,並做一些條件判斷,只有符合條件才能向隊列里加元素,添加成功後就會通知not_empty條件變數隊列里不是空的,「我」剛剛添加進了一個元素,滿足可以執行刪除動作的基本條件了(隊列不是空的,想想如果是空的執行刪除動作就沒有意義了),同時喚醒一些被掛起的執行移除動作的執行緒,讓這些執行緒重新判斷條件,如果條件准許就會執行刪除動作,然後又通知not_full條件變數,告訴「它」隊列不是滿的,因為「我」剛才刪除了一個元素(想想如果隊列滿了添加元素就添加不進呀,就沒意義了),滿足了添加元素的基本條件(隊列不是滿的),同時喚醒一些被掛起的執行添加動作的執行緒,這些執行緒又會進行條件判斷,符合條件就會添加元素,否則繼續掛起,依次類推,同時這樣也保證了執行緒的安全。正與前面所說,當一個元素被移除出隊列時,會喚醒一個添加元素的執行緒;當添加一個元素時會喚醒一個刪除元素的執行緒。
這是我想了一段時間得出的一種我個人理解的解釋,不知道對不對或者說合不合理,如果有大神對這部分知識很熟悉了解,歡迎留言批評指正。
(3)task_done()
源碼如下:
def task_done(self): self.all_tasks_done.acquire() #獲得鎖 try: unfinished = self.unfinished_tasks - 1 #判斷隊列中一個執行緒的任務是否全部完成 if unfinished <= 0: #是則進行通知,或在過量調用時報異常 if unfinished < 0: raise ValueError('task_done() called too many times') self.all_tasks_done.notify_all() self.unfinished_tasks = unfinished #否則未完成任務數量-1 finally: self.all_tasks_done.release() #最後釋放鎖
這個方法判斷隊列中一個執行緒的任務是否全部完成,首先會通過all_tasks_done對象獲得鎖,如果是則進行通知,最後釋放鎖。
(4)join()
源碼如下:
def join(self): self.all_tasks_done.acquire() try: while self.unfinished_tasks: #如果有未完成的任務,將調用wait()方法等待 self.all_tasks_done.wait() finally: self.all_tasks_done.release()
阻塞方法,當隊列中有未完成進程時,調用join方法來阻塞,直到他們都完成。
其它的方法都比較簡單,也比較好理解,有興趣可以去看看Queue.py里的源碼,要注意的是任何獲取隊列的狀態(empty(),qsize()等),或者修改隊列的內容的操作(get,put等)都必須持有互斥鎖mutex。
(二)簡單例子
(1)一個簡單例子
實現一個執行緒不斷生成一個隨機數到一個隊列中
實現一個執行緒從上面的隊列裡面不斷的取出奇數
實現另外一個執行緒從上面的隊列裡面不斷取出偶數
import random,threading,time from Queue import Queue is_product = True class Producer(threading.Thread): """生產數據""" def __init__(self, t_name, queue): threading.Thread.__init__(self,name=t_name) self.data=queue def run(self): while 1: if self.data.full(): global is_product is_product = False else: if self.data.qsize() <= 7:#隊列長度小於等於7時添加元素 is_product = True for i in range(2): #每次向隊列里添加兩個元素 randomnum=random.randint(1,99) print "%s: %s is producing %d to the queue!" % (time.ctime(), self.getName(), randomnum) self.data.put(randomnum,False) #將數據依次存入隊列 time.sleep(1) print "deque length is %s"%self.data.qsize() else: if is_product: for i in range(2): # randomnum = random.randint(1, 99) print "%s: %s is producing %d to the queue!" % (time.ctime(), self.getName(), randomnum) self.data.put(randomnum,False) # 將數據依次存入隊列 time.sleep(1) print "deque length is %s" % self.data.qsize() else: pass print "%s: %s finished!" %(time.ctime(), self.getName()) #Consumer thread class Consumer_even(threading.Thread): def __init__(self,t_name,queue): threading.Thread.__init__(self,name=t_name) self.data=queue def run(self): while 1: if self.data.qsize() > 7:#隊列長度大於7時開始取元素 val_even = self.data.get(False) if val_even%2==0: print "%s: %s is consuming. %d in the queue is consumed!" % (time.ctime(),self.getName(),val_even) time.sleep(2) else: self.data.put(val_even) time.sleep(2) print "deque length is %s" % self.data.qsize() else: pass class Consumer_odd(threading.Thread): def __init__(self,t_name,queue): threading.Thread.__init__(self, name=t_name) self.data=queue def run(self): while 1: if self.data.qsize() > 7: val_odd = self.data.get(False) if val_odd%2!=0: print "%s: %s is consuming. %d in the queue is consumed!" % (time.ctime(), self.getName(), val_odd) time.sleep(2) else: self.data.put(val_odd) time.sleep(2) print "deque length is %s" % self.data.qsize() else: pass #Main thread def main(): queue = Queue(20) producer = Producer('Pro.', queue) consumer_even = Consumer_even('Con_even.', queue) consumer_odd = Consumer_odd('Con_odd.',queue) producer.start() consumer_even.start() consumer_odd.start() producer.join() consumer_even.join() consumer_odd.join() if __name__ == '__main__': main()
這個例子跟上篇博文Python:執行緒、進程與協程(2)——threading模組中介紹Condition的例子很像,就是構造了一個長度為20的隊列,當隊列1元素個數小於8時就忘隊列中添加元素,當隊列滿後,就不再添加,當隊列元素大於7個時,才會取元素,否則不取元素。有興趣的可以動手試試,仔細體會下。
(2)執行緒池
在使用多執行緒處理任務時也不是執行緒越多越好,由於在切換執行緒的時候,需要切換上下文環境,依然會造成cpu的大量開銷。為解決這個問題,執行緒池的概念被提出來了。預先創建好一個較為優化的數量的執行緒,讓過來的任務立刻能夠使用,就形成了執行緒池。在python中,沒有內置的較好的執行緒池模組,需要自己實現或使用第三方模組。
#coding=utf-8 import queue import threading import contextlib import time StopEvent = object() # 創建空對象 class ThreadPool(object): def __init__(self, max_num, max_task_num = None): if max_task_num: self.q = queue.Queue(max_task_num) else: self.q = queue.Queue() self.max_num = max_num self.cancel = False self.terminal = False self.generate_list = [] self.free_list = [] def run(self, func, args, callback=None): """ 執行緒池執行一個任務 :param func: 任務函數 :param args: 任務函數所需參數 :param callback: 任務執行失敗或成功後執行的回調函數,回調函數有兩個參數1、任務函數執行狀態;2、任務函數返回值(默認為None,即:不執行回調函數) :return: 如果執行緒池已經終止,則返回True否則None """ if self.cancel: return if len(self.free_list) == 0 and len(self.generate_list) < self.max_num: self.generate_thread() w = (func, args, callback,) self.q.put(w) def generate_thread(self): """ 創建一個執行緒 """ t = threading.Thread(target=self.call) t.start() def call(self): """ 循環去獲取任務函數並執行任務函數 """ current_thread = threading.currentThread self.generate_list.append(current_thread) event = self.q.get() while event != StopEvent: func, arguments, callback = event try: result = func(*arguments) success = True except Exception as e: success = False result = None if callback is not None: try: callback(success, result) except Exception as e: pass with self.worker_state(self.free_list, current_thread): if self.terminal: event = StopEvent else: event = self.q.get() else: self.generate_list.remove(current_thread) def close(self): """ 執行完所有的任務後,所有執行緒停止 """ self.cancel = True full_size = len(self.generate_list) while full_size: self.q.put(StopEvent) full_size -= 1 def terminate(self): """ 無論是否還有任務,終止執行緒 """ self.terminal = True while self.generate_list: self.q.put(StopEvent) self.q.empty() @contextlib.contextmanager def worker_state(self, state_list, worker_thread): """ 用於記錄執行緒中正在等待的執行緒數 """ state_list.append(worker_thread) try: yield finally: state_list.remove(worker_thread) # How to use pool = ThreadPool(5) def callback(status, result): # status, execute action status # result, execute action return value pass def action(i): print(i) for i in range(30): ret = pool.run(action, (i,), callback) time.sleep(5) print(len(pool.generate_list), len(pool.free_list)) print(len(pool.generate_list), len(pool.free_list)) # pool.close() # pool.terminate()