實例詳解 Java 死鎖與破解死鎖

鎖和被保護資源之間的關係

我們把一段需要互斥執行的代碼稱為臨界區。線程在進入臨界區之前,首先嘗試加鎖 lock(),如果成功,則進入臨界區,此時我們稱這個線程持有鎖;否則呢就等待,直到持有鎖的線程解鎖;持有鎖的線程執行完臨界區的代碼後,執行解鎖 unlock()。這樣理解本身沒有問題,但卻很容易讓我們忽視兩個非常非常重要的點:我們鎖的是什麼?我們保護的又是什麼?

我們知道在現實世界裏,鎖和鎖要保護的資源是有對應關係的,比如你用你家的鎖保護你家的東西,我用我家的鎖保護我家的東西。在並發編程世界裏,鎖和資源也應該有這個關係,因此,一個好的鎖模型如下圖所示。

鎖模型

首先,我們要把臨界區要保護的資源標註出來,如圖中臨界區里增加了一個元素:受保護的資源 R;其次,我們要保護資源 R 就得為它創建一把鎖 LR;最後,針對這把鎖 LR,我們還需在進出臨界區時添上加鎖操作和解鎖操作。另外,在鎖 LR 和受保護資源之間,我特地用一條線做了關聯,這個關聯關係非常重要。很多並發 Bug 的出現都是因為把它忽略了,然後就出現了類似鎖自家門來保護他家資產的事情,這樣的 Bug 非常不好診斷,因為潛意識裡我們認為已經正確加鎖了。

受保護資源和鎖之間的關聯關係非常重要,他們的關係是怎樣的呢?一個合理的關係是:受保護資源和鎖之間的關聯關係是 N:1 的關係

互斥鎖,在並發領域的知名度極高,只要有了並發問題,大家首先容易想到的就是加鎖,因為大家都知道,加鎖能夠保證執行臨界區代碼的互斥性。這樣理解雖然正確,但是卻不能夠指導你真正用好互斥鎖。臨界區的代碼是操作受保護資源的路徑,類似於球場的入口,入口一定要檢票,也就是要加鎖,但不是隨便一把鎖都能有效。所以必須深入分析鎖定的對象和受保護資源的關係,綜合考慮受保護資源的訪問路徑,多方面考量才能用好互斥鎖。

synchronized 是 Java 在語言層面提供的互斥原語,其實 Java 裏面還有很多其他類型的鎖,但作為互斥鎖,原理都是相通的:鎖,一定有一個要鎖定的對象,至於這個鎖定的對象要保護的資源以及在哪裡加鎖 / 解鎖,就屬於設計層面的事情了。

對如何保護多個資源已經很有心得了,關鍵是要分析多個資源之間的關係。如果資源之間沒有關係,很好處理,每個資源一把鎖就可以了。如果資源之間有關聯關係,就要選擇一個粒度更大的鎖,這個鎖應該能夠覆蓋所有相關的資源。除此之外,還要梳理出有哪些訪問路徑,所有的訪問路徑都要設置合適的鎖,這個過程可以類比一下門票管理

我們再引申一下上面提到的關聯關係,關聯關係如果用更具體、更專業的語言來描述的話,其實是一種「原子性」特徵,我們提到的原子性,主要是面向 CPU 指令的,轉賬操作的原子性則是屬於是面向高級語言的,不過它們本質上是一樣的。

「原子性」的本質是什麼?其實不是不可分割,不可分割只是外在表現,其本質是多個資源間有一致性的要求,操作的中間狀態對外不可見。例如,在 32 位的機器上寫 long 型變量有中間狀態(只寫了 64 位中的 32 位),在銀行轉賬的操作中也有中間狀態(賬戶 A 減少了 100,賬戶 B 還沒來得及發生變化)。所以解決原子性問題,是要保證中間狀態對外不可見。 

來看一個例子,銀行轉賬的例子:

class Account {
  private int balance;
  // 轉賬
  void transfer(Account target, int amt){
    synchronized(Account.class) {
      if (this.balance > amt) {
        this.balance -= amt;
        target.balance += amt;
      }
    }
  } 
}

這裡用 Account.class 作為互斥鎖,來解決銀行業務裏面的轉賬問題,雖然這個方案不存在並發問題,但是所有賬戶的轉賬操作都是串行的,例如賬戶 A 轉賬戶 B、賬戶 C 轉賬戶 D 這兩個轉賬操作現實世界裏是可以並行的,但是在這個方案里卻被串行化了,這樣的話,性能太差。

試想互聯網支付盛行的當下,8 億網民每人每天一筆交易,每天就是 8 億筆交易;每筆交易都對應着一次轉賬操作,8 億筆交易就是 8 億次轉賬操作,也就是說平均到每秒就是近 1 萬次轉賬操作,若所有的轉賬操作都串行,性能完全不能接受。

那下面我們就嘗試着把性能提升一下。

向現實世界要答案

現實世界裏,賬戶轉賬操作是支持並發的,而且絕對是真正的並行,銀行所有的窗口都可以做轉賬操作。只要我們能仿照現實世界做轉賬操作,串行的問題就解決了。

我們試想在古代,沒有信息化,賬戶的存在形式真的就是一個賬本,而且每個賬戶都有一個賬本,這些賬本都統一存放在文件架上。銀行櫃員在給我們做轉賬時,要去文件架上把轉出賬本和轉入賬本都拿到手,然後做轉賬。這個櫃員在拿賬本的時候可能遇到以下三種情況:

  1. 文件架上恰好有轉出賬本和轉入賬本,那就同時拿走;

  2. 如果文件架上只有轉出賬本和轉入賬本之一,那這個櫃員就先把文件架上有的賬本拿到手,同時等着其他櫃員把另外一個賬本送回來;

  3. 轉出賬本和轉入賬本都沒有,那這個櫃員就等着兩個賬本都被送回來。

上面這個過程在編程的世界裏怎麼實現呢?其實用兩把鎖就實現了,轉出賬本一把,轉入賬本另一把。在 transfer() 方法內部,我們首先嘗試鎖定轉出賬戶 this(先把轉出賬本拿到手),然後嘗試鎖定轉入賬戶 target(再把轉入賬本拿到手),只有當兩者都成功時,才執行轉賬操作。這個邏輯可以圖形化為下圖這個樣子。

兩個轉賬操作並行示意圖

而至於詳細的代碼實現,如下所示。經過這樣的優化後,賬戶 A 轉賬戶 B 和賬戶 C 轉賬戶 D 這兩個轉賬操作就可以並行了。

class Account {
  private int balance;
  // 轉賬
  void transfer(Account target, int amt){
    // 鎖定轉出賬戶
    synchronized(this) {              
      // 鎖定轉入賬戶
      synchronized(target) {           
        if (this.balance > amt) {
          this.balance -= amt;
          target.balance += amt;
        }
      }
    }
  } 
}

沒有免費的午餐

上面的實現看上去很完美,並且也算是將鎖用得出神入化了。相對於用 Account.class 作為互斥鎖,鎖定的範圍太大,而我們鎖定兩個賬戶範圍就小多了,這樣的鎖,叫細粒度鎖。使用細粒度鎖可以提高並行度,是性能優化的一個重要手段。

這個時候可能你已經開始警覺了,使用細粒度鎖這麼簡單,有這樣的好事,是不是也要付出點什麼代價啊?編寫並發程序就需要這樣時時刻刻保持謹慎。

的確,使用細粒度鎖是有代價的,這個代價就是可能會導致死鎖。

在詳細介紹死鎖之前,我們先看看現實世界裏的一種特殊場景。如果有客戶找櫃員張三做個轉賬業務:賬戶 A 轉賬戶 B 100 元,此時另一個客戶找櫃員李四也做個轉賬業務:賬戶 B 轉賬戶 A 100 元,於是張三和李四同時都去文件架上拿賬本,這時候有可能湊巧張三拿到了賬本 A,李四拿到了賬本 B。張三拿到賬本 A 後就等着賬本 B(賬本 B 已經被李四拿走),而李四拿到賬本 B 後就等着賬本 A(賬本 A 已經被張三拿走),他們要等多久呢?他們會永遠等待下去…因為張三不會把賬本 A 送回去,李四也不會把賬本 B 送回去。我們姑且稱為死等吧。

轉賬業務中的「死等」

現實世界裏的死等,就是編程領域的死鎖了。死鎖的一個比較專業的定義是:一組互相競爭資源的線程因互相等待,導致「永久」阻塞的現象。

上面轉賬的代碼是怎麼發生死鎖的呢?我們假設線程 T1 執行賬戶 A 轉賬戶 B 的操作,賬戶 A.transfer(賬戶 B);同時線程 T2 執行賬戶 B 轉賬戶 A 的操作,賬戶 B.transfer(賬戶 A)。當 T1 和 T2 同時執行完①處的代碼時,T1 獲得了賬戶 A 的鎖(對於 T1,this 是賬戶 A),而 T2 獲得了賬戶 B 的鎖(對於 T2,this 是賬戶 B)。之後 T1 和 T2 在執行②處的代碼時,T1 試圖獲取賬戶 B 的鎖時,發現賬戶 B 已經被鎖定(被 T2 鎖定),所以 T1 開始等待;T2 則試圖獲取賬戶 A 的鎖時,發現賬戶 A 已經被鎖定(被 T1 鎖定),所以 T2 也開始等待。於是 T1 和 T2 會無期限地等待下去,也就是我們所說的死鎖了。

class Account {
  private int balance;
  // 轉賬
  void transfer(Account target, int amt){
    // 鎖定轉出賬戶
    synchronized(this){     ①
      // 鎖定轉入賬戶
      synchronized(target){ ②
        if (this.balance > amt) {
          this.balance -= amt;
          target.balance += amt;
        }
      }
    }
  } 
}

關於這種現象,我們還可以藉助資源分配圖來可視化鎖的佔用情況(資源分配圖是個有向圖,它可以描述資源和線程的狀態)。其中,資源用方形節點表示,線程用圓形節點表示;資源中的點指向線程的邊表示線程已經獲得該資源,線程指向資源的邊則表示線程請求資源,但尚未得到。轉賬發生死鎖時的資源分配圖就如下圖所示,一個「各據山頭死等」的尷尬局面。

轉賬發生死鎖時的資源分配圖

如何預防死鎖

並發程序一旦死鎖,一般沒有特別好的方法,很多時候我們只能重啟應用。因此,解決死鎖問題最好的辦法還是規避死鎖。

那如何避免死鎖呢?要避免死鎖就需要分析死鎖發生的條件,有個叫 Coffman 的牛人早就總結過了,只有以下這四個條件都發生時才會出現死鎖:

  1. 互斥,共享資源 X 和 Y 只能被一個線程佔用;

  2. 佔有且等待,線程 T1 已經取得共享資源 X,在等待共享資源 Y 的時候,不釋放共享資源 X;

  3. 不可搶佔,其他線程不能強行搶佔線程 T1 佔有的資源;

  4. 循環等待,線程 T1 等待線程 T2 佔有的資源,線程 T2 等待線程 T1 佔有的資源,就是循環等待。

反過來分析,也就是說只要我們破壞其中一個,就可以成功避免死鎖的發生。

其中,互斥這個條件我們沒有辦法破壞,因為我們用鎖為的就是互斥。不過其他三個條件都是有辦法破壞掉的,到底如何做呢?

  1. 對於「佔用且等待」這個條件,我們可以一次性申請所有的資源,這樣就不存在等待了。

  2. 對於「不可搶佔」這個條件,佔用部分資源的線程進一步申請其他資源時,如果申請不到,可以主動釋放它佔有的資源,這樣不可搶佔這個條件就破壞掉了。

  3. 對於「循環等待」這個條件,可以靠按序申請資源來預防。所謂按序申請,是指資源是有線性順序的,申請的時候可以先申請資源序號小的,再申請資源序號大的,這樣線性化後自然就不存在循環了。

我們已經從理論上解決了如何預防死鎖,那具體如何體現在代碼上呢?下面我們就來嘗試用代碼實踐一下這些理論。

1. 破壞佔用且等待條件

從理論上講,要破壞這個條件,可以一次性申請所有資源。在現實世界裏,就拿前面我們提到的轉賬操作來講,它需要的資源有兩個,一個是轉出賬戶,另一個是轉入賬戶,當這兩個賬戶同時被申請時,我們該怎麼解決這個問題呢?

可以增加一個賬本管理員,然後只允許賬本管理員從文件架上拿賬本,也就是說櫃員不能直接在文件架上拿賬本,必須通過賬本管理員才能拿到想要的賬本。例如,張三同時申請賬本 A 和 B,賬本管理員如果發現文件架上只有賬本 A,這個時候賬本管理員是不會把賬本 A 拿下來給張三的,只有賬本 A 和 B 都在的時候才會給張三。這樣就保證了「一次性申請所有資源」。

通過賬本管理員拿賬本

對應到編程領域,「同時申請」這個操作是一個臨界區,我們也需要一個角色(Java 裏面的類)來管理這個臨界區,我們就把這個角色定為 Allocator。它有兩個重要功能,分別是:同時申請資源 apply() 和同時釋放資源 free()。賬戶 Account 類裏面持有一個 Allocator 的單例(必須是單例,只能由一個人來分配資源)。當賬戶 Account 在執行轉賬操作的時候,首先向 Allocator 同時申請轉出賬戶和轉入賬戶這兩個資源,成功後再鎖定這兩個資源;當轉賬操作執行完,釋放鎖之後,我們需通知 Allocator 同時釋放轉出賬戶和轉入賬戶這兩個資源。具體的代碼實現如下。

class Allocator {
  private List<Object> als =
    new ArrayList<>();
  // 一次性申請所有資源
  synchronized boolean apply(
    Object from, Object to){
    if(als.contains(from) ||
         als.contains(to)){
      return false;  
    } else {
      als.add(from);
      als.add(to);  
    }
    return true;
  }
  // 歸還資源
  synchronized void free(
    Object from, Object to){
    als.remove(from);
    als.remove(to);
  }
}
 
class Account {
  // actr 應該為單例
  private Allocator actr;
  private int balance;
  // 轉賬
  void transfer(Account target, int amt){
    // 一次性申請轉出賬戶和轉入賬戶,直到成功
    while(!actr.apply(this, target))
      ;
    try{
      // 鎖定轉出賬戶
      synchronized(this){              
        // 鎖定轉入賬戶
        synchronized(target){           
          if (this.balance > amt){
            this.balance -= amt;
            target.balance += amt;
          }
        }
      }
    } finally {
      actr.free(this, target)
    }
  } 
}

2. 破壞不可搶佔條件

破壞不可搶佔條件看上去很簡單,核心是要能夠主動釋放它佔有的資源,這一點 synchronized 是做不到的。原因是 synchronized 申請資源的時候,如果申請不到,線程直接進入阻塞狀態了,而線程進入阻塞狀態,啥都幹不了,也釋放不了線程已經佔有的資源。

你可能會質疑,「Java 作為排行榜第一的語言,這都解決不了?」你的懷疑很有道理,Java 在語言層次確實沒有解決這個問題,不過在 SDK 層面還是解決了的,java.util.concurrent 這個包下面提供的 Lock 是可以輕鬆解決這個問題的。關於這個話題,咱們後面會詳細講。

3. 破壞循環等待條件

破壞這個條件,需要對資源進行排序,然後按序申請資源。這個實現非常簡單,我們假設每個賬戶都有不同的屬性 id,這個 id 可以作為排序字段,申請的時候,我們可以按照從小到大的順序來申請。比如下面代碼中,①~⑥處的代碼對轉出賬戶(this)和轉入賬戶(target)排序,然後按照序號從小到大的順序鎖定賬戶。這樣就不存在「循環」等待了。

class Account {
  private int id;
  private int balance;
  // 轉賬
  void transfer(Account target, int amt){
    Account left = this        ①
    Account right = target;    ②
    if (this.id > target.id) { ③
      left = target;           ④
      right = this;            ⑤
    }                          ⑥
    // 鎖定序號小的賬戶
    synchronized(left){
      // 鎖定序號大的賬戶
      synchronized(right){ 
        if (this.balance > amt){
          this.balance -= amt;
          target.balance += amt;
        }
      }
    }
  } 
}

如果在獲取多個鎖的時候操作耗時非常短,而且並發衝突量也不大時,這個方案還挺不錯的,因為這種場景下,循環上幾次或者幾十次就能一次性獲取轉出賬戶和轉入賬戶了。但是如果 apply() 操作耗時長,或者並發衝突量大的時候,循環等待這種方案就不適用了,因為在這種場景下,可能要循環上萬次才能獲取到鎖,太消耗 CPU 了。

其實在這種場景下,最好的方案應該是:如果線程要求的條件(轉出賬本和轉入賬本同在文件架上)不滿足,則線程阻塞自己,進入等待狀態;當線程要求的條件(轉出賬本和轉入賬本同在文件架上)滿足後,通知等待的線程重新執行。其中,使用線程阻塞的方式就能避免循環等待消耗 CPU 的問題。

那 Java 語言是否支持這種等待 – 通知機制呢?答案是:一定支持(畢竟佔據排行榜第一那麼久)。下面我們就來看看 Java 語言是如何支持等待 – 通知機制的。

用 synchronized 實現等待 – 通知機制

在 Java 語言里,等待 – 通知機制可以有多種實現方式,比如 Java 語言內置的 synchronized 配合 wait()、notify()、notifyAll() 這三個方法就能輕鬆實現。

如何用 synchronized 實現互斥鎖,你應該已經很熟悉了。在下面這個圖裡,左邊有一個等待隊列,同一時刻,只允許一個線程進入 synchronized 保護的臨界區(這個臨界區可以看作大夫的診室),當有一個線程進入臨界區後,其他線程就只能進入圖中左邊的等待隊列里等待(相當於患者分診等待)。這個等待隊列和互斥鎖是一對一的關係,每個互斥鎖都有自己獨立的等待隊列。

wait() 操作工作原理圖

在並發程序中,當一個線程進入臨界區後,由於某些條件不滿足,需要進入等待狀態,Java 對象的 wait() 方法就能夠滿足這種需求。如上圖所示,當調用 wait() 方法後,當前線程就會被阻塞,並且進入到右邊的等待隊列中,這個等待隊列也是互斥鎖的等待隊列。 線程在進入等待隊列的同時,會釋放持有的互斥鎖,線程釋放鎖後,其他線程就有機會獲得鎖,並進入臨界區了。

notify() 是會隨機地通知等待隊列中的一個線程,而 notifyAll() 會通知等待隊列中的所有線程。從感覺上來講,應該是 notify() 更好一些,因為即便通知所有線程,也只有一個線程能夠進入臨界區。但那所謂的感覺往往都蘊藏着風險,實際上使用 notify() 也很有風險,它的風險在於可能導致某些線程永遠不會被通知到。

具體參考下面的代碼:

class Allocator {
  private List<Object> als;
  // 一次性申請所有資源
  synchronized void apply(
    Object from, Object to){
    // 經典寫法
    while(als.contains(from) ||
         als.contains(to)){
      try{
        wait();
      }catch(Exception e){
      }   
    } 
    als.add(from);
    als.add(to);  
  }
  // 歸還資源
  synchronized void free(
    Object from, Object to){
    als.remove(from);
    als.remove(to);
    notifyAll();
  }
}

在上面的代碼中,我用的是 notifyAll() 來實現通知機制,為什麼不使用 notify() 呢?這二者是有區別的,notify() 是會隨機地通知等待隊列中的一個線程,而 notifyAll() 會通知等待隊列中的所有線程。從感覺上來講,應該是 notify() 更好一些,因為即便通知所有線程,也只有一個線程能夠進入臨界區。但那所謂的感覺往往都蘊藏着風險,實際上使用 notify() 也很有風險,它的風險在於可能導致某些線程永遠不會被通知到。

假設我們有資源 A、B、C、D,線程 1 申請到了 AB,線程 2 申請到了 CD,此時線程 3 申請 AB,會進入等待隊列(AB 分配給線程 1,線程 3 要求的條件不滿足),線程 4 申請 CD 也會進入等待隊列。我們再假設之後線程 1 歸還了資源 AB,如果使用 notify() 來通知等待隊列中的線程,有可能被通知的是線程 4,但線程 4 申請的是 CD,所以此時線程 4 還是會繼續等待,而真正該喚醒的線程 3 就再也沒有機會被喚醒了。

所以除非經過深思熟慮,否則盡量使用 notifyAll()。

總結

當我們在編程世界裏遇到問題時,應不局限於當下,可以換個思路,向現實世界要答案,利用現實世界的模型來構思解決方案,這樣往往能夠讓我們的方案更容易理解,也更能夠看清楚問題的本質。

但是現實世界的模型有些細節往往會被我們忽視。因為在現實世界裏,人太智能了,以致有些細節實在是顯得太不重要了。在轉賬的模型中,我們為什麼會忽視死鎖問題呢?原因主要是在現實世界,我們會交流,並且會很智能地交流。而編程世界裏,兩個線程是不會智能地交流的。所以在利用現實模型建模的時候,我們還要仔細對比現實世界和編程世界裏的各角色之間的差異。

我們今天這一篇文章主要講了用細粒度鎖來鎖定多個資源時,要注意死鎖的問題。這個就需要你能把它強化為一個思維定勢,遇到這種場景,馬上想到可能存在死鎖問題。當你知道風險之後,才有機會談如何預防和避免,因此,識別出風險很重要。

預防死鎖主要是破壞三個條件中的一個,有了這個思路後,實現就簡單了。但仍需注意的是,有時候預防死鎖成本也是很高的。例如上面轉賬那個例子,我們破壞佔用且等待條件的成本就比破壞循環等待條件的成本高,破壞佔用且等待條件,我們也是鎖了所有的賬戶,而且還是用了死循環 while(!actr.apply(this, target));方法,不過好在 apply() 這個方法基本不耗時。 在轉賬這個例子中,破壞循環等待條件就是成本最低的一個方案。

所以我們在選擇具體方案的時候,還需要評估一下操作成本,從中選擇一個成本最低的方案。

 

相關文章

實例詳解 Java 死鎖與破解死鎖

Java 內存模型 

可見性、原子性和有序性問題:並發編程Bug的源頭

Java CAS 原理詳解

深入詳解 Java 線程

Java 並發編程學習總結

Tags: