­

守護進程,鎖,隊列(生產者消費者模型)

一、守護進程

守護進程會隨着主進程的代碼結束之後在結束,而不是等待整個主進程結束(因為主進程要回收資源)

主進程的代碼什麼時候結束,守護進程就什麼時候結束,和其他子進程執行進度無關

主進程會等待所有的子進程結束,是為了回收子進程的資源

注意:進程之間是互相獨立的,主進程代碼運行結束,守護進程隨即終止

開啟一個守護進程:在start一個進程之前設置 daemon = True

import time
from multiprocessing import Process
​
​
def func1():
    while True:         # 此進程會每隔1秒無限打印 is func1
        print('is func1')
        time.sleep(1)
​
​
if __name__ == '__main__':
    p = Process(target=func1)
    p.daemon = True     # 表示p是一個守護進程
    p.start()
    time.sleep(3)       # 主進程代碼只執行3秒,守護進程同樣只執行3秒
    
# 輸出
is func1
is func1
is func1

要求守護進程p1必須在p2進程執行結束之後才結束

import time
from multiprocessing import Process
​
​
def func1():
    while True:         # 此進程會每無限打印,每秒打印一次 is func1
        print('is func1')
        time.sleep(1)
​
​
def func2():
    for i in range(5):  # 此進程會執行5秒,每秒打印一次 is func2
        print('is fun2')
        time.sleep(1)
​
​
if __name__ == '__main__':
    p1 = Process(target=func1)
    p1.daemon = True     # 表示p是一個守護進程
    p1.start()
    p2 = Process(target=func2)
    p2.start()
    p2.join()           # 主進程會等待 p2 進程結束後在結束
    
# 等待p2結束 ——> 主進程代碼才結束 ——> 守護進程結束

二、進程同步—鎖(multiprocessing . Lock)

進程之間數據不共享,但是共享同一套文件系統,所以訪問同一個文件,或同一個打印終端是沒問題的。

而共享帶來的是競爭,競爭帶來的結果就是錯亂,如何控制,就是加鎖處理。

加鎖可以保證多個進程修改同一塊數據時,同一時間只能有一個任務可以進行修改,即串行的修改,速度時慢了,但犧牲速度保證了數據安全。

鎖非常細緻,可以控制具體的某一句代碼控制成同步的,其他的還是並發的

1、加鎖的格式:Lock

from multiprocessing import Process, Lock
import time
​
​
def func(i, lock):
    lock.acquire()  # 拿鑰匙
    print(f'被鎖起來的代碼{i}')
    time.sleep(1)   # 鎖起來的代碼執行時間
    lock.release()  # 還鑰匙
​
​
    # 也可以只寫成這樣
    # 鎖非常細緻,可以控制具體的某一句代碼控制成同步的,其他的還是並發的
    # 和with open 一樣的,(推薦這樣寫,在執行裏面的的代碼時那鑰匙開鎖,執行完了之後還鑰匙)
    # 代替acquire和release 並且在此基礎上做一些異常處理,保證即便一個進程的代碼出錯退出了,也會歸還鑰匙
    
    """
    with lock:
        print(f'被鎖起來的代碼{i}')
        time.sleep(1)  # 鎖起來的代碼執行時間
    """
​
​
if __name__ == '__main__':
    lock = Lock()   # 生成鎖
    for i in range(5):
        p = Process(target=func, args=(i, lock))
        p.start()
        
# 輸出
被鎖起來的代碼0
被鎖起來的代碼1
被鎖起來的代碼3
被鎖起來的代碼2
被鎖起來的代碼4

多進程搶票例子不加鎖:效率高,但是順序容易錯亂

只剩一張票卻被多人買到,因為每有一人買票,買到了就會去修改票的數量,在這期間會有延遲,在同一時間又有一人來買票,但是票的數量還沒改回來,造成了數據不安全同一張票被多人購買。

import json
import time
from multiprocessing import Process
​
​
def search(i):
    with open('ticket', encoding='utf-8', mode='r') as f:
        ticket = json.load(f)
        print(f'{i}查:當前的票還剩餘{ticket["count"]}張。')
​
# 查詢還剩多少票,如果大於0就把數量減1後在寫入文件,中間會有延遲
def buy_ticket(i):
    with open('ticket', encoding='utf-8', mode='r') as f:
        ticket = json.load(f)
        if ticket['count'] > 0:
            ticket['count'] -= 1
            print(f'{i},買到票了。')
​
    time.sleep(0.1)         # 模擬買到票後修改票數量網絡延遲
    with open('ticket', encoding='utf-8', mode='w') as f:
        json.dump(ticket, f)
​
​
def get_ticket(i):
    search(i)
    buy_ticket(i)
​
​
if __name__ == '__main__':
    user = [('小楊',), ('小紅',), ('鮑勃',), ('艾倫',), ('愛麗',)]
    for name in user:
        p = Process(target=get_ticket, args=name)
        p.start()
        
# 輸出
小楊查:當前的票還剩餘1張。
小楊,買到票了。
小紅查:當前的票還剩餘1張。
小紅,買到票了。
鮑勃查:當前的票還剩餘1張。
鮑勃,買到票了。
艾倫查:當前的票還剩餘1張。
艾倫,買到票了。
愛麗查:當前的票還剩餘1張。
愛麗,買到票了。

多進程搶票例子加鎖:犧牲了效率,但是保障了數據安全和順序

import json
import time
from multiprocessing import Process, Lock
​
​
def search(i):
    with open('ticket', encoding='utf-8', mode='r') as f:
        ticket = json.load(f)
        print(f'{i}查:當前的票還剩餘{ticket["count"]}張。')
​
​
def buy_ticket(i):
    with open('ticket', encoding='utf-8', mode='r') as f:
        ticket = json.load(f)
        if ticket['count'] > 0:
            ticket['count'] -= 1
            print(f'{i},買到票了。')
​
    time.sleep(0.1)         # 模擬買到票後修改票數量網絡延遲
    with open('ticket', encoding='utf-8', mode='w') as f:
        json.dump(ticket, f)
​
​
def get_ticket(i, lock):
    search(i)       # 查票沒加鎖,所以所有人都是同時查票的
    with lock:      # 只在買票這裡加了鎖,同時只有一個人能買票,和修改票的數量
        buy_ticket(i)
​
​
if __name__ == '__main__':
    user = ['小楊', '小紅', '鮑勃', '艾倫', '愛麗']
    lock = Lock()
    for name in user:
        p = Process(target=get_ticket, args=(name, lock))
        p.start()
        
# 輸出
小楊查:當前的票還剩餘2張。
小楊,買到票了。
小紅查:當前的票還剩餘2張。
鮑勃查:當前的票還剩餘2張。
愛麗查:當前的票還剩餘2張。
艾倫查:當前的票還剩餘2張。
小紅,買到票了。

Lock為互斥鎖,不能在同一個進程中連續acquire多次

from multiprocessing import Lock # 互斥鎖 不能再同一個進程中連續acquire多次
lock = Lock()       # 創建鎖
lock.acquire()      # 拿鑰匙
print(1)
lock.acquire()      # 拿鑰匙——>會卡在這裡,因為沒有還鑰匙,就拿不到鑰匙,拿鑰匙和還鑰匙要成對出現
print(2)
lock.release()      # 還鑰匙
# 輸出
1

三、隊列

進程之間的通信(IPC):兩種形式

1、基於文件的:同一台機器上的多個進程之間通信

Queue 隊列

基於socket的文件級別的通信來完成數據傳遞的

2、基於網絡的

第三方工具(消息中間件)

memcache

redis

rabbitmg

kafka

總之進程彼此之間內存是相互隔離,要實現進程間通信(IPC), multiprocessing模塊支持隊列,這種方法是使用消息傳遞的。

1、主要方法:

from multiprocessing import Queue
​
q = Queue()     # 實例化一個隊列
​
q.get()         # 放入數據到隊列中
q.put()         # 從隊列中取出數據
q.get_nowait()  # 同q.get(False)
q.put_nowait()  # 同q.put(False)
# 遵循先進先出,後進後出原則

2、隊列用法:

子進程和主進程之間的數據傳遞

from multiprocessing import Process, Queue
​
​
def func1(q):
    # 2、子進程中對主進程中的隊列進行傳值5次
    for i in range(5):
        q.put(f'子進程func1數據{i}')
​
​
if __name__ == '__main__':
    q = Queue()     # 1、在主進程實例化一個隊列
    p = Process(target=func1, args=(q, ))
    p.start()
​
    # 3、在主進程中查看子進程對主進程的隊列所傳的值
    for i in range(5):
        print(q.get())
# 輸出
子進程func1數據0
子進程func1數據1
子進程func1數據2
子進程func1數據3
子進程func1數據4

子進程之間的數據傳遞

from multiprocessing import Process, Queue
​
​
def func1(q):
    # 子進程中對主進程中的隊列進行傳值3次
    for i in range(3):
        q.put(f'子進程func1數據{i}')
​
​
def func2(q):
    # 子進程中對主進程中的隊列進行傳值2次
    for i in range(2):
        q.put(f'子進程func2數據{i}')
​
​
def show_func(q):
    # 子進程查看其他子進程傳入的數據
    for i in range(5):
        print(q.get())
​
​
if __name__ == '__main__':
    q = Queue()     # 在主進程實例化一個隊列
    p1 = Process(target=func1, args=(q, ))
    p1.start()
    p2 = Process(target=func2, args=(q,))
    p2.start()
    p3 = Process(target=show_func, args=(q,))
    p3.start()
    
# 輸出
子進程func1數據0
子進程func1數據1
子進程func1數據2
子進程func2數據0
子進程func2數據1

四、生產者消費者模型

在並發編程中使用生產者消費者模式能夠解決絕大多數並發問題。該模式通過平衡生產線和消費線的工作能力來提高程序的整體處理數據的速度。

為什麼要使用生產者和消費者模式

在線程世界裏,生產者就是生產數據的線程,消費者就是消費數據的線程。在多線程開發當中,如果生產者處理速度很快,而消費者處理速度很慢,那麼生產者就必須等待消費者處理完,才能繼續生產數據。同樣的道理,如果消費者的處理能力大於生產者,那麼消費者就必須等待生產者。為了解決這個問題於是引入了生產者和消費者模式。

什麼是生產者消費者模式

生產者消費者模式是通過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通訊,而通過阻塞隊列來進行通訊,所以生產者生產完數據之後不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列里取,阻塞隊列就相當於一個緩衝區,平衡了生產者和消費者的處理能力。

本質 : 就是讓生產數據和消費數據的效率達到平衡並且最大化的效率,這樣的話能夠提高我們程序的執行效率,並且也能夠降低我們要開的並發數,還能讓我們的操作系統壓力小一下

基於隊列實現生產者消費者模型:

from multiprocessing import Process, Queue
import time
​
​
def consumer(q, name):  # 消費者:通常取到數據之後還要進行某些操作
    while True:
        food = q.get()
        if food:
            time.sleep(1)   # 消費者吃的速度
            print(f'{name}吃了{food}')
        else:
            print('吃完了!!')
            break   # 當接收到了結束信號就會自動結束,不然會卡住。
​
​
def producer(q, name, food):  # 生產者:通常在放數據之前需要先通過某些代碼來獲取數據
    for i in range(3):
        # 生產一個食物需要2秒, 而吃的話只要1秒,這樣生產者就需要兩個才能平衡
        time.sleep(2)         
        print(f'{name}生產了一個食物{food+str(i)}')
        q.put(f'{food+str(i)}')
​
​
if __name__ == '__main__':
    q = Queue()
    p1 = Process(target=producer, args=(q, '小紅', '蘋果'))
    p2 = Process(target=producer, args=(q, '小楊', '橘子'))
    c = Process(target=consumer, args=(q, '鮑勃'))
    p1.start()  # 生產者p1執行
    p2.start()  # 生產者p2執行
    c.start()   # 消費者c執行
    p1.join()   # 等待p1生產者生產完
    p2.join()   # 等待p2生產者生產完
    q.put(False)    # 生產者全部生產完了之後就放入一個False結束信號
                    # 這樣消費者發現這個信號就主動退出,消費者有幾個信號就要有幾個
# 輸出
小紅生產了一個食物蘋果0
鮑勃吃了蘋果0
小楊生產了一個食物橘子0
鮑勃吃了橘子0
小紅生產了一個食物蘋果1
鮑勃吃了蘋果1
小楊生產了一個食物橘子1
鮑勃吃了橘子1
小紅生產了一個食物蘋果2
鮑勃吃了蘋果2
小楊生產了一個食物橘子2
鮑勃吃了橘子2
吃完了!!

生產者消費者模型總結:

程序中有兩類角色:

1、一類負責生產數據(生產者)

2、一類負責處理數據(消費者)

引入生產者消費者模型為了解決的問題是:

平衡生產者與消費者之間的工作能力,從而提高程序整體處理數據的速度

如何實現:

生產者<——>隊列<——>消費者