2.Python進程間的通信之隊列(Queue)和生產者消費者模型
一、隊列
1.1 概念介紹—–multiprocess.Queue
創建共享的進程隊列,Queue是多進程安全的隊列,可以使用Queue實現多進程之間的數據傳遞。
Queue([maxsize])
創建共享的進程隊列。
參數 :maxsize是隊列中允許的最大項數。如果省略此參數,則無大小限制。
底層隊列使用管道和鎖定實現。
1.2 方法介紹
Queue([maxsize])
:創建共享的進程隊列。maxsize是隊列中允許的最大項數。如果省略此參數,則無大小限制。底層隊列使用管道和鎖定實現。另外,還需要運行支持線程以便隊列中的數據傳輸到底層管道中。
Queue的實例q具有以下方法:
q.get( [ block [ ,timeout ] ] )
:返回q中的一個項目。如果q為空,此方法將阻塞,直到隊列中有項目可用為止。block用於控制阻塞行為,默認為True. 如果設置為False,將引發Queue.Empty異常(定義在Queue模塊中)。timeout是可選超時時間,用在阻塞模式中。如果在制定的時間間隔內沒有項目變為可用,將引發Queue.Empty異常。
q.get_nowait()
:同q.get(False)
方法。
q.put(item [, block [,timeout ] ] )
:將item放入隊列。如果隊列已滿,此方法將阻塞至有空間可用為止。block控制阻塞行為,默認為True。如果設置為False,將引發Queue.Empty異常(定義在Queue庫模塊中)。timeout指定在阻塞模式中等待可用空間的時間長短。超時後將引發Queue.Full異常。
q.qsize()
:返回隊列中目前項目的正確數量。此函數的結果並不可靠,因為在返回結果和在稍後程序中使用結果之間,隊列中可能添加或刪除了項目。在某些系統上,此方法可能引發NotImplementedError異常。
q.empty()
:如果調用此方法時 q為空,返回True。如果其他進程或線程正在往隊列中添加項目,結果是不可靠的。也就是說,在返回和使用結果之間,隊列中可能已經加入新的項目。
q.full()
:如果q已滿,返回為True. 由於線程的存在,結果也可能是不可靠的(參考q.empty()
方法)。
1.3 其他方法(了解)
q.close()
:關閉隊列,防止隊列中加入更多數據。調用此方法時,後台線程將繼續寫入那些已入隊列但尚未寫入的數據,但將在此方法完成時馬上關閉。如果q被垃圾收集,將自動調用此方法。關閉隊列不會在隊列使用者中生成任何類型的數據結束信號或異常。例如,如果某個使用者正被阻塞在get()
操作上,關閉生產者中的隊列不會導致get()
方法返回錯誤。
q.cancel_join_thread()
:不會再進程退出時自動連接後台線程。這可以防止join_thread()
方法阻塞。
q.join_thread()
:連接隊列的後台線程。此方法用於在調用q.close()
方法後,等待所有隊列項被消耗。默認情況下,此方法由不是q的原始創建者的所有進程調用。調用q.cancel_join_thread()
方法可以禁止這種行為。
1.4 代碼實例 —multiprocess.Queue
1.4.1隊列用法
'''
multiprocessing模塊支持進程間通信的兩種主要形式:管道和隊列
都是基於消息傳遞實現的,但是隊列接口
'''
from multiprocessing import Queue
q=Queue(3)
#put ,get ,put_nowait,get_nowait,full,empty
q.put(3)
q.put(3)
q.put(3)
# q.put(3) # 如果隊列已經滿了,程序就會停在這裡,等待數據被別人取走,再將數據放入隊列。
# 如果隊列中的數據一直不被取走,程序就會永遠停在這裡。
try:
q.put_nowait(3) # 可以使用put_nowait,如果隊列滿了不會阻塞,但是會因為隊列滿了而報錯。
except: # 因此我們可以用一個try語句來處理這個錯誤。這樣程序不會一直阻塞下去,但是會丟掉這個消息。
print('隊列已經滿了')
# 因此,我們再放入數據之前,可以先看一下隊列的狀態,如果已經滿了,就不繼續put了。
print(q.full()) #滿了
print(q.get())
print(q.get())
print(q.get())
# print(q.get()) # 同put方法一樣,如果隊列已經空了,那麼繼續取就會出現阻塞。
try:
q.get_nowait(3) # 可以使用get_nowait,如果隊列滿了不會阻塞,但是會因為沒取到值而報錯。
except: # 因此我們可以用一個try語句來處理這個錯誤。這樣程序不會一直阻塞下去。
print('隊列已經空了')
print(q.empty()) #空了
上面這個例子還沒有加入進程通信,只是先來看看隊列為我們提供的方法,以及這些方法的使用和現象。
1.4.2 子進程發送數據給父進程
import time
from multiprocessing import Process, Queue
def f(q):
q.put([time.asctime(), 'from Eva', 'hello']) #調用主函數中p進程傳遞過來的進程參數 put函數為向隊列中添加一條數據。
if __name__ == '__main__':
q = Queue() #創建一個Queue對象
p = Process(target=f, args=(q,)) #創建一個進程
p.start()
print(q.get())
p.join()
上面是一個queue的簡單應用,使用隊列q對象調用get函數來取得隊列中最先進入的數據。 接下來看一個稍微複雜一些的例子:批量生產數據放入隊列再批量獲取結果。
1.4.3 批量生產數據放入隊列再批量獲取結果
import os
import time
import multiprocessing
# 向queue中輸入數據的函數
def inputQ(queue):
info = str(os.getpid()) + '(put):' + str(time.asctime())
queue.put(info)
# 向queue中輸出數據的函數
def outputQ(queue):
info = queue.get()
print ('%s%s%s'%(str(os.getpid()), '(get):',info))
# Main
if __name__ == '__main__':
multiprocessing.freeze_support()
record1 = [] # store input processes
record2 = [] # store output processes
queue = multiprocessing.Queue(3)
# 輸入進程
for i in range(10):
process = multiprocessing.Process(target=inputQ,args=(queue,))
process.start()
record1.append(process)
# 輸出進程
for i in range(10):
process = multiprocessing.Process(target=outputQ,args=(queue,))
process.start()
record2.append(process)
for p in record1:
p.join()
for p in record2:
p.join()
二、生產者消費者模型
在並發編程中使用生產者和消費者模式能夠解決絕大多數並發問題。該模式通過平衡生產線程和消費線程的工作能力來提高程序的整體處理數據的速度。
2.1 為什麼要使用生產者和消費者模式
在線程世界裏,生產者就是生產數據的線程,消費者就是消費數據的線程。在多線程開發當中,如果生產者處理速度很快,而消費者處理速度很慢,那麼生產者就必須等待消費者處理完,才能繼續生產數據。同樣的道理,如果消費者的處理能力大於生產者,那麼消費者就必須等待生產者。為了解決這個問題於是引入了生產者和消費者模式。
2.2 什麼是生產者消費者模式
生產者消費者模式是通過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通訊,而通過阻塞隊列來進行通訊,所以生產者生產完數據之後不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列里取,阻塞隊列就相當於一個緩衝區,平衡了生產者和消費者的處理能力。
2.3 代碼
2.3.1 基於隊列實現生產者消費者模型
from multiprocessing import Process,Queue
import time,random,os
def consumer(q):
while True:
res=q.get()
time.sleep(random.randint(1,3))
print('%s 吃 %s' %(os.getpid(),res))
def producer(q):
for i in range(10):
time.sleep(random.randint(1,3))
res='包子%s' %i
q.put(res)
print('生產了 %s' %(os.getpid(),res))
if __name__ == '__main__':
q=Queue()
#生產者們:即廚師們
p1=Process(target=producer,args=(q,))
#消費者們:即吃貨們
c1=Process(target=consumer,args=(q,))
#開始
p1.start()
c1.start()
print('主')
此時的問題是主進程永遠不會結束,原因是:生產者p在生產完後就結束了,但是消費者c在取空了q之後,則一直處於死循環中且卡在q.get()這一步。
解決方式無非是讓生產者在生產完畢後,往隊列中再發一個結束信號,這樣消費者在接收到結束信號後就可以break出死循環。
2.3.2 改良版——生產者消費者模型
from multiprocessing import Process,Queue
import time,random,os
def consumer(q):
while True:
res=q.get()
if res is None:break #收到結束信號則結束
time.sleep(random.randint(1,3))
print('%s 吃 %s' %(os.getpid(),res))
def producer(q):
for i in range(10):
time.sleep(random.randint(1,3))
res='包子%s' %i
q.put(res)
print('生產了 %s' %(os.getpid(),res))
q.put(None) #發送結束信號
if __name__ == '__main__':
q=Queue()
#生產者們:即廚師們
p1=Process(target=producer,args=(q,))
#消費者們:即吃貨們
c1=Process(target=consumer,args=(q,))
#開始
p1.start()
c1.start()
print('主')
注意:結束信號None,不一定要由生產者發,主進程里同樣可以發,但主進程需要等生產者結束後才應該發送該信號。
2.3.3 主進程在生產者生產完畢後發送結束信號None
from multiprocessing import Process,Queue
import time,random,os
def consumer(q):
while True:
res=q.get()
if res is None:break #收到結束信號則結束
time.sleep(random.randint(1,3))
print('%s 吃 %s' %(os.getpid(),res))
def producer(q):
for i in range(2):
time.sleep(random.randint(1,3))
res='包子%s' %i
q.put(res)
print('生產了 %s' %(os.getpid(),res))
if __name__ == '__main__':
q=Queue()
#生產者們:即廚師們
p1=Process(target=producer,args=(q,))
#消費者們:即吃貨們
c1=Process(target=consumer,args=(q,))
#開始
p1.start()
c1.start()
p1.join()
q.put(None) #發送結束信號
print('主')
但上述解決方式,在有多個生產者和多個消費者時,我們則需要用一個很low的方式去解決
2.3.4 多個消費者的例子:有幾個消費者就需要發送幾次結束信號
from multiprocessing import Process,Queue
import time,random,os
def consumer(q):
while True:
res=q.get()
if res is None:break #收到結束信號則結束
time.sleep(random.randint(1,3))
print('%s 吃 %s' %(os.getpid(),res))
def producer(name,q):
for i in range(2):
time.sleep(random.randint(1,3))
res='%s%s' %(name,i)
q.put(res)
print('生產了 %s' %(os.getpid(),res))
if __name__ == '__main__':
q=Queue()
#生產者們:即廚師們
p1=Process(target=producer,args=('包子',q))
p2=Process(target=producer,args=('骨頭',q))
p3=Process(target=producer,args=('泔水',q))
#消費者們:即吃貨們
c1=Process(target=consumer,args=(q,))
c2=Process(target=consumer,args=(q,))
#開始
p1.start()
p2.start()
p3.start()
c1.start()
p1.join() #必須保證生產者全部生產完畢,才應該發送結束信號
p2.join()
p3.join()
q.put(None) #有幾個消費者就應該發送幾次結束信號None
q.put(None) #發送結束信號
print('主')
這時可以用 JoinableQueue([maxsize]) 這個隊列來解決。
三、JoinableQueue([maxsize])
創建可連接的共享進程隊列。這就像是一個Queue對象,但隊列允許項目的使用者通知生產者項目已經被成功處理。通知進程是使用共享的信號和條件變量來實現的。
3.1.方法介紹
JoinableQueue的實例p除了與Queue對象相同的方法之外,還具有以下方法:
q.task_done()
:使用者使用此方法發出信號,表示q.get()返回的項目已經被處理。如果調用此方法的次數大於從隊列中刪除的項目數量,將引發ValueError異常。
q.join()
:生產者將使用此方法進行阻塞,直到隊列中所有項目均被處理。阻塞將持續到為隊列中的每個項目均調用q.task_done()方法為止。
下面的例子說明如何建立永遠運行的進程,使用和處理隊列上的項目。生產者將項目放入隊列,並等待它們被處理。
3.2.JoinableQueue隊列實現消費之生產者模型
from multiprocessing import Process,JoinableQueue
import time,random,os
def consumer(q):
while True:
res=q.get()
time.sleep(random.randint(1,3))
print('%s 吃 %s' %(os.getpid(),res))
q.task_done() #向q.join()發送一次信號,證明一個數據已經被取走了
def producer(name,q):
for i in range(10):
time.sleep(random.randint(1,3))
res='%s%s' %(name,i)
q.put(res)
print('生產了 %s' %(os.getpid(),res))
q.join() #生產完畢,使用此方法進行阻塞,直到隊列中所有項目均被處理。
if __name__ == '__main__':
q=JoinableQueue()
#生產者們:即廚師們
p1=Process(target=producer,args=('包子',q))
p2=Process(target=producer,args=('骨頭',q))
p3=Process(target=producer,args=('泔水',q))
#消費者們:即吃貨們
c1=Process(target=consumer,args=(q,))
c2=Process(target=consumer,args=(q,))
c1.daemon=True
c2.daemon=True
#開始
p_l=[p1,p2,p3,c1,c2]
for p in p_l:
p.start()
p1.join()
p2.join()
p3.join()
print('主')
#主進程等--->p1,p2,p3等---->c1,c2
#p1,p2,p3結束了,證明c1,c2肯定全都收完了p1,p2,p3發到隊列的數據
#因而c1,c2也沒有存在的價值了,不需要繼續阻塞在進程中影響主進程了。應該隨着主進程的結束而結束,所以設置成守護進程就可以了。