Python | 多線程死鎖問題的巧妙解決方法

本文始發於個人公眾號:TechFlow,原創不易,求個關注

今天是Python專題的第25篇文章,我們一起來聊聊多線程開發當中死鎖的問題。

死鎖

死鎖的原理非常簡單,用一句話就可以描述完。就是當多線程訪問多個鎖的時候,不同的鎖被不同的線程持有,它們都在等待其他線程釋放出鎖來,於是便陷入了永久等待。比如A線程持有1號鎖,等待2號鎖,B線程持有2號鎖等待1號鎖,那麼它們永遠也等不到執行的那天,這種情況就叫做死鎖。

關於死鎖有一個著名的問題叫做哲學家就餐問題,有5個哲學家圍坐在一起,他們每個人需要拿到兩個叉子才可以吃飯。如果他們同時拿起自己左手邊的叉子,那麼就會永遠等待右手邊的叉子釋放出來。這樣就陷入了永久等待,於是這些哲學家都會餓死。

img
img

這是一個很形象的模型,因為在計算機並發場景當中,一些資源的數量往往是有限的。很有可能出現多個線程搶佔的情況,如果處理不好就會發生大家都獲取了一個資源,然後在等待另外的資源的情況。

對於死鎖的問題有多種解決方法,這裡我們介紹比較簡單的一種,就是對這些鎖進行編號。我們規定當一個線程需要同時持有多個鎖的時候,必須要按照序號升序的順序對這些鎖進行訪問。通過上下文管理器我們可以很容易實現這一點。

上下文管理器

首先我們來簡單介紹一下上下文管理器,上下文管理器我們其實經常使用,比如我們經常使用的with語句就是一個上下文管理器的經典使用。當我們通過with語句打開文件的時候,它會自動替我們處理好文件讀取之後的關閉以及拋出異常的處理,可以節約我們大量的代碼。

同樣我們也可以自己定義一個上下文處理器,其實很簡單,我們只需要實現__enter__和__exit__這兩個函數即可。__enter__函數用來實現進入資源之前的操作和處理,那麼顯然__exit__函數對應的就是使用資源結束之後或者是出現異常的處理邏輯。有了這兩個函數之後,我們就有了自己的上下文處理類了。

我們來看一個樣例:

class Sample:
    def __enter__(self):
        print('enter resources')
        return self
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        print('exit')
        # print(exc_type)
        # print(exc_val)
        # print(exc_tb)

    def doSomething(self):
        a = 1/1
        return a

def getSample():
    return Sample()

if __name__ == '__main__':
    with getSample() as sample:
        print('do something')
        sample.doSomething()

當我們運行這段代碼的時候,屏幕上打印的結果和我們的預期是一致的。

image-20200803091558632
image-20200803091558632

我們觀察一下__exit__函數,會發現它的參數有4個,後面的三個參數對應的是拋出異常的情況。type對應異常的類型,val對應異常時的輸出值,trace對應異常拋出時的運行堆棧。這些信息都是我們排查異常的時候經常需要用到的信息,通過這三個字段,我們可以根據我們的需要對可能出現的異常進行自定義的處理。

實現上下文管理器並不一定要通過類實現,Python當中也提供了上下文管理的註解,通過使用註解我們可以很方便地實現上下文管理。我們同樣也來看一個例子:

import time
from contextlib import contextmanager

@contextmanager
def timethis(label):
    start = time.time()
    try:
        yield
    finally:
        end = time.time()
        print('{}: {}'.format(label, end - start))
        
        
with timethis('timer'):
    pass

在這個方法當中yield之前的部分相當於__enter__函數,yield之後的部分相當於__exit__。如果出現異常會在try語句當中拋出,那麼我們編寫except對異常進行處理即可。

避免死鎖

了解了上下文管理器之後,我們要做的就是在lock的外面包裝一層,使得我們在獲取和釋放鎖的時候可以根據我們的需要,對鎖進行排序,按照升序的順序進行持有。

這段代碼源於Python的著名進階書籍《Python cookbook》,非常經典:

from contextlib import contextmanager

# 用來存儲local的數據
_local = threading.local()

@contextmanager
def acquire(*locks):
 # 對鎖按照id進行排序
    locks = sorted(locks, key=lambda x: id(x))

    # 如果已經持有鎖當中的序號有比當前更大的,說明策略失敗
    acquired = getattr(_local,'acquired',[])
    if acquired and max(id(lock) for lock in acquired) >= id(locks[0]):
        raise RuntimeError('Lock Order Violation')

    # 獲取所有鎖
    acquired.extend(locks)
    _local.acquired = acquired

    try:
        for lock in locks:
            lock.acquire()
        yield
    finally:
        # 倒敘釋放
        for lock in reversed(locks):
            lock.release()
        del acquired[-len(locks):]

這段代碼寫得非常漂亮,可讀性很高,邏輯我們都應該能看懂,但是有一個小問題是這裡用到了threading.local這個組件。

它是一個多線程場景當中的共享變量,雖然說是共享的,但是對於每個線程來說讀取到的值都是獨立的。聽起來有些難以理解,其實我們可以將它理解成一個dict,dict的key是每一個線程的id,value是一個存儲數據的dict。每個線程在訪問local變量的時候,都相當於先通過線程id獲取了一個獨立的dict,再對這個dict進行的操作。

看起來我們在使用的時候直接使用了_local,這是因為通過線程id先進行查詢的步驟在其中封裝了。不明就裡的話可能會覺得有些難以理解。

我們再來看下這個acquire的使用:

x_lock = threading.Lock()
y_lock = threading.Lock()

def thread_1():
    while True:
        with acquire(x_lock, y_lock):
            print('Thread-1')

def thread_2():
    while True:
        with acquire(y_lock, x_lock):
            print('Thread-2')

t1 = threading.Thread(target=thread_1)
t1.start()

t2 = threading.Thread(target=thread_2)
t2.start()

運行一下會發現沒有出現死鎖的情況,但如果我們把代碼稍加調整,寫成這樣,那麼就會觸發異常了。

def thread_1():
    while True:
        with acquire(x_lock):
            with acquire(y_lock):
             print('Thread-1')

def thread_2():
    while True:
        with acquire(y_lock):
            with acquire(x_lock):
             print('Thread-1')

因為我們把鎖寫成了層次結構,這樣就沒辦法進行排序保證持有的有序性了,那麼就會觸發我們代碼當中定義的異常。

最後我們再來看下哲學家就餐問題,通過我們自己實現的acquire函數我們可以非常方便地解決他們死鎖吃不了飯的問題。

import threading

def philosopher(left, right):
    while True:
        with acquire(left,right):
             print(threading.currentThread(), 'eating')

# 叉子的數量
NSTICKS = 5
chopsticks = [threading.Lock() for n in range(NSTICKS)]

for n in range(NSTICKS):
    t = threading.Thread(target=philosopher,
                         args=(chopsticks[n],chopsticks[(n+1) % NSTICKS]))
    t.start()

總結

關於死鎖的問題,對鎖進行排序只是其中的一種解決方案,除此之外還有很多解決死鎖的模型。比如我們可以讓線程在嘗試持有新的鎖失敗的時候主動放棄所有目前已經持有的鎖,比如我們可以設置機制檢測死鎖的發生並對其進行處理等等。發散出去其實有很多種方法,這些方法起作用的原理各不相同,其中涉及大量操作系統的基礎概念和知識,感興趣的同學可以深入研究一下這個部分,一定會對操作系統以及鎖的使用有一個深刻的認識。

今天的文章到這裡就結束了,如果喜歡本文的話,請來一波素質三連,給我一點支持吧(關注、轉發、點贊)。

– END –

掃碼關注,獲取更多文章