常用鎖原理的介紹(上)

本文是學習《多處理器編程的藝術》的筆記。

下面主要介紹了一些常用的鎖的原理,這些只是一些理論,離我們實際使用還是有一些差距的。不過這種理論也往往是相對比較好容易理解了掌握的,只有了解了這些理論,可以加深我們對鎖原理知識的理解。有能力的同學更可能根據這些理論訂製開發符合自己場景的高性能的鎖。

本篇文章並沒有具體講到ava中鎖的實現。但是通過本篇文章,相信大家對Java中鎖的實現也會有更深入的理解。因為Java中Lock的底層實現AbstractQueuedSynchronizer的原型在這裡就會講到。

同樣的這些原理在其他很多地方也能見到,比如TCP中關於碰撞的處理就和這裡提到的指數後退鎖是一樣的。zookeeper的分散式鎖也就是通過隊列的方式來實現的。

測設-設置鎖

import java.util.concurrent.atomic.AtomicBoolean;

public class TASLock {
    private AtomicBoolean state=new AtomicBoolean(false);
    public void lock(){
        while(state.getAndSet(true)){

        }
    }
    public void unlock(){
        state.set(false);
    }
}

TASLock通過將state變數的值由false變為true來設置加鎖。

注意:只有將state變數從false變為true的執行緒,可以獲得鎖,其他執行緒都會在state.getAndSet(true)上一直自旋


import java.util.concurrent.atomic.AtomicBoolean;

public class TTASLock {
    private AtomicBoolean state=new AtomicBoolean(false);
    public void lock(){
        while(true){
            while(state.get()){}
            if(!state.getAndSet(true)){
                return;
            }
        }
    }
    public void unlock(){
        state.set(false);
    }
}

TTASLock和TASLock基本一樣,只不過,TTASLock在設置state的值為true之前,首先會判斷state當前的值是不是false,如果不是false就會在state.getAndSet(true)上自旋等待;如果將state的值從false設置為true失敗就又會重新開始上述操作。


TASLock和TTASLock它們的性能如何呢?

從圖上的結果就能看出TASLock和TTASLock它們直接的性能差距還是比較明顯的。

為什麼會有這麼大的性能差距呢?

主要的原因是為了CPU訪問數據的速度,CPU內部都有一個小容量的存儲器cache,CPU只會跟cache通訊,並不會直接跟記憶體通訊。

getAndSet設置的變數是有volatile修飾的。根據快取一致性協議,這事getAndSet會直接將數據寫回記憶體,同時迫使其他處理器cache中的對應的副本無效,這時其他處理器就需要去記憶體中重新讀取對應變數的新值。

處理器寫回數據到記憶體、處理器從記憶體中讀取數據都是通過匯流排來完成的。是通過匯流排將數據寫到記憶體中的。而在任何時刻值只能有一個處理器佔用匯流排和記憶體通訊,其他想要讀寫記憶體的處理器會被阻塞。

這就是TASLock性能為什麼差的原因。

再來看看TTASLock,執行緒B第一次讀鎖時發生cache缺失,會從記憶體中讀取值到它的cache中。只要執行緒A持有鎖,執行緒B就會不斷重新該值,這時每次都是cache命中,不會佔用匯流排從記憶體中讀取,也不會降低其他執行緒的記憶體訪問速度。此外,釋放鎖的執行緒也不會被正在該鎖上自旋的執行緒所延遲。

然而,當鎖被釋放的情況並不理想。鎖的持有者將false寫入鎖變數來釋放鎖。該操作會使自旋執行緒的cache副本立刻失效。每個執行緒都將放生一次cache缺失並重新讀取新值,它們會同時調用getAndSet以獲取鎖。第一個成功的執行緒將再次使其他執行緒中cache中的值失效。這些失效執行緒接下來又重新從記憶體中讀取那個值,從而引起一場匯流排流量風暴。最終,所有執行緒再此平靜,進入本地自旋。

從上面也可以看出,本地自旋每次重複讀取被快取的值而不是反覆使用匯流排從記憶體中讀取,對設計高效的自旋鎖非常關鍵。

指數後退

TTASLock類中,性能消耗主要在當鎖看似空閑時,調用getAndSet來獲取鎖,這個過程極有可能存在高爭用。因為每個獲取鎖的執行緒幾乎都是同時看到鎖空閑,同時調用getAndSet來獲取鎖,但是這時每個獲取鎖的執行緒獲得鎖的機會都是非常小的。所以我們可以想辦法來避免每個獲取鎖的執行緒同時調用getAndSet,讓它們在不同的時間調用getAndSet,給正在競爭的執行緒以結束的機會,將會更有效。

所以當執行緒調用getAndSet獲取鎖失敗時,讓它們隨機暫停一段時間來減少它們的獲取鎖的爭用。暫停的時間太短,沒有意義;太長,會影響整個獲取鎖的時間。暫停時間固定的話,也沒有意義,因為下次它們又會是同時調用getAndSet。所以我們需要指定最小暫停時間minDelay,最大暫停時間maxDelay。限制暫停時間在它們之間的一個隨機值。

說了這麼多了,我們直接看程式碼吧。

首先定義一個暫停時間的類Backoff

import java.util.Random;

public class Backoff {
    final int minDelay,maxDelay;
    int limit;
    final Random random;
    public Backoff(int min,int max){
        minDelay=min;
        maxDelay=min;
        limit=minDelay;
        random=new Random();
    }
    public void backoff() throws InterruptedException {
        int delay=random.nextInt(limit);
        limit=Math.min(maxDelay,2*limit);
        Thread.sleep(delay);
    }
}

這裡的暫停時間上限每次都是*2,所以也叫指數後退。

這種指數後退的處理演算法TCP的碰撞處理也用到了

下面我們修改TTASLock類獲取鎖的getAndSet方法中返回false時,調用backoff方法隨機暫停個時間。我們把這個修改的類叫BackoffLock,它的完整程式碼就是下面這個樣子了

import java.util.concurrent.atomic.AtomicBoolean;

public class BackoffLock implements Lock {
    private AtomicBoolean state = new AtomicBoolean(false);
    private static final int MIN_DELAY = 1;
    private static final int MAX_DELAY = 10;

    public void lock() {
        Backoff backoff = new Backoff(MIN_DELAY, MAX_DELAY);
        while (true) {
            while (state.get()) {
            }
            if (!state.getAndSet(true)) {
                return;
            } else {
                try {
                    backoff.backoff();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }

    }

    public void unlock() {
        state.set(false);
    }
}


BackoffLock這種指數後退的鎖易於實現,且在許多系統結構中要比TASLock、TTASLock的性能要好。但是它的暫停時間的最優值的選擇與處理器的個數以及速度密切相關,因此,很難調整BackoffLock類以使它與各種不同的機器相互兼容。

隊列鎖

這是一種實現了可擴展自旋鎖的方法,比BackoffLock這種指數後退鎖複雜一些,但卻有更好的移植性。在BackoffLock中有兩個問題

  • cache一致性流量:所有執行緒都在同一個共享存儲單元上自旋,每一次成功的鎖訪問都會產生cache一致性流量
  • 臨界區利用率低:執行緒延遲過長,導致臨界區利用率低下。

可以將執行緒組織成一個隊列來克服這些缺點。在隊列中,每個執行緒檢測其前驅執行緒是否已完成來判斷是否輪到它自己。獲取鎖,釋放鎖只有當前節點的後續節點的cache失效,不會影響到其他節點;這樣就只有這個後續節點會去通過匯流排訪問記憶體,其他節點的cache依舊是有效的。


基於數組的鎖

import java.util.concurrent.atomic.AtomicInteger;

public class ALock implements Lock{
    private ThreadLocal<Integer>  mySlotIndex=ThreadLocal.withInitial(()->0);
    private AtomicInteger tail;
    private volatile boolean[] flag;
    private int size;
    public ALock(int capacity){
        size=capacity;
        tail=new AtomicInteger(0);
        flag=new boolean[capacity];
        flag[0]=true;
    }
    public void lock(){
        int slot=tail.getAndIncrement()%size;
        mySlotIndex.set(slot);
        while(!flag[slot]){};
    }
    public void unlock(){
        int slot= mySlotIndex.get();
        flag[slot]=false;
        flag[(slot+1)%size]=true;
    }
}

在上面的類中,有一個boolean類型的flag數組(如上圖所示,上圖中的數組大小為128),數組中每個位置對應一個要加鎖的執行緒。只有當數組中值為true的執行緒可以獲取鎖。有一個AtomicInteger類型的tail,每個加鎖的執行緒通過修改它的值,來獲得當前執行緒所對應的flag數組中的位置。

tail屬性是一個成員變數,可以被多個執行緒共享,初始值是0。為了獲得鎖,每個執行緒原子地增加tail1域的值。所得的結果值作為執行緒的槽,也就是對應數組flag的下標。同時這個結果值也通過ThreadLocal和對應的執行緒做了綁定。

如果flag[i]的值為true,那麼取得結果值為i的執行緒就有權獲得鎖。

在初始狀態時,只有flag[0]為true,其他的都是false。(java中原始變數的boolean的默認值是false)。表明只有當前只有增加tail的返回值是0的執行緒可以獲取鎖。

在釋放鎖值,執行緒把自己對應的數組位置值設置成false,同時將數組下一個位置的值設置成true。確保取的數組下一個位置的執行緒可以獲得鎖。

但是這種數組結構有兩個問題:

1 由於我們是先創建ALock的對象,這時flag數組的容量就確定了,但是我們每個獲取鎖的執行緒都需要去通過得到數據下標的方式來獲取鎖。但是我們有多少個執行緒來獲取鎖,常常在創建ALock對象時未知的,為了避免數組越界,我們通過取余的方式來得到執行緒在數組中的位置,但這就有了一個問題,多個執行緒可能會對應到一個數組位置,也就是可能存在多個執行緒同時獲得鎖的問題。

2 從上面右邊的圖上可以看到flag數組中多個元素在同一個快取行,也就有了偽共享的問題。


要避免偽共享的問題,那就需要每個執行緒在數組中更新的值(tail.getAndIncrement()%size;得到的值)都在單獨的一個快取行。換句話說也就是每個執行緒更新的值代表一個快取行。

每次獲取鎖時調用tail.getAndIncrement()%size;的返回值*(一個快取行的位元組數/數組中單個元素的位元組數)。這樣就可以確保每個執行緒修改數組的位置都在單獨的一個快取行。

CLH隊列鎖

上面可以看到基於數組的鎖的兩個問題:

1.存在多個執行緒可能同時獲得鎖的問題。

2.為了解決偽共享的問題,需要浪費記憶體和cache。

CLH隊列鎖就可以解決這個問題。

說到CLH,如果你對Java的鎖有一定的了解,那你對這個名字至少應該不會感到陌生,因為在AbstractQueuedSynchronizer中的隊列就是參考CLH隊列來實現的。


CLH隊列鎖是一個單向鏈表結構,每個節點都有一個指向它前驅節點的引用。每個節點的初始值都是true。當它的前驅節點的值是false時,代表它的前驅節點釋放了鎖,當前節點可以獲得鎖。

下面看看CLH隊列鎖的實現程式碼:

public class CLHLock {
    private AtomicReference<QNode> tail = new AtomicReference<QNode>(new QNode());
    ThreadLocal<QNode> myPred;
    ThreadLocal<QNode> myNode;

    public CLHLock() {
        tail = new AtomicReference<>(new QNode());
        myNode = ThreadLocal.withInitial(() -> new QNode());
        myPred = ThreadLocal.withInitial(() -> null);
    }

    public void lock() {
        QNode qNode = myNode.get();
        qNode.locked = true;
        QNode pred = tail.getAndSet(qNode);
        myPred.set(pred);
        while (pred.locked) {
        }
    }

    public void unlock() {
        QNode qNode = myNode.get();
        qNode.locked = false;
    }
    privaet static class QNode {
        public volatile boolean locked;
    }
}

隊列的每個節點都是一個QNode。這種其實是一種隱式隊列。因為它的隊列節點QNode並沒有屬性指向其前驅節點或者後續節點。

myPred 和 myNode都是ThreadLocal變數。它們分別存儲隊列中每個執行緒的在隊列上的前驅節點和它自己的節點。

它的前後節點是這樣串起來的:

  1. 13行首先myNode.get();返回myNode這個ThreadLocal變數存儲的節點。

  2. 每個執行緒加鎖的時候都會調用 tail.getAndSet(qNode),將隊列中的尾節點tail替換成第1步得到的節點

  3. “myPred.set(pred);這句程式碼將原來的tail節點存儲到myPred`這個ThreadLocal變數中。

這裡執行完之後,原來的tail節點就存儲到myPred中,而新的tail節點就是myNode存儲的節點。原來的tail節點也就成了新的tail節點的前驅節點。

這裡需要注意的是就算有多個執行緒同時調用tail.getAndSet(qNode),這行程式碼也是串列執行的。它們只會修改最新的tail節點。也就是只會影響到myNode存儲的節點是不是當前最新的tail節點。並不會影響到myNode存儲的節點和myNode存儲的節點的關係。也就是說不會改變myNode存儲的節點是myNode存儲的節點的前驅節點這個事實。

tail代表隊列的尾節點。初始時,隊列中只有一個tail節點。

在加鎖過程中,每個執行緒通過將自己添加到tail節點後面,將自己的節點(myNode)作為tail節點的值,同時將原來的tail節點的值更新尾當前執行緒的前驅節點(myPred)的值。通過這種方式,當前執行緒就將自己的節點加入到了隊列中。它去觀察它前驅節點(myPred)的值,當myPred節點的值為false時,表明當前執行緒可以獲得鎖。

在解鎖的過程中,只需要將當前執行緒節點(myNode)的值更新為false,它的後續執行緒就可以獲得鎖。


初始狀態:

​ 這時,只有一個tail節點。它的值false。

加鎖階段:

​ 首先會創建一個當前節點myNode,將它的值設置為True。

​ 同時調用tail.getAndSet(qNode)將自己更新為tail節點,它的返回值pred是原來的tail節點。

​ 再執行myPred.set(pred);將原來的tail節點設置成當前節點的前驅節點。

​ 當前節點通過自旋的方式去觀察前驅節點的值是否為false,只有當前驅節點為false時,表示前驅節點已經釋放鎖,當前節點可以獲得鎖。

解鎖階段:

​ 這裡只需要將當前節點myNode的值設置成false就可以了。

​ 當前執行緒結束後ThreadLocal變數也會自動被垃圾回收器回收。這裡的myPred節點由於沒有其他引用,就會被垃圾回收器回收。myNode節點要不就是tail節點,要不就是它的後續節點的前驅節點,所以myNode節點並不會被垃圾回收器回收。


CLH隊列鎖可以解決ALock的兩個問題。

1.由於我們當前時一個鏈表,所以不存在容量不夠的問題。

2.我們CLH隊列中的每個節點都是在調用加鎖階段才創建的。而且由於每個節點都是由不同執行緒創建的。所以我可以認為它們不會在同一個快取行中。

但是CLH隊列鎖也有一個問題。

1.由於每個執行緒都要去觀察它的前驅節點,進行自旋。所以在無cache的NUMA系統結構下,就會出現前驅節點可能在其他CPU記憶體上,性能就會比較差。


MCS隊列鎖:

MCS隊列鎖也被標識為QNode對象的鏈表,其中的每個QNode要麼表示一個鎖的持有者,要麼表示一個正在等待獲得鎖的執行緒。與CLHLock隊列鎖相比,它的鏈表時顯式的而不是虛擬的:整個鏈表通過QNode對象里的next屬性作為後續節點的引用,而在CLH隊列鎖中時通過ThreadLocal變數myPred來指向前驅節點的。

下面我們看看MCS隊列鎖的程式碼:

import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;

public class MCSLock implements Lock {

    private final AtomicReference<QNode> queue;
    private AtomicReference<QNode> tail = new AtomicReference<QNode>(new QNode());
    ThreadLocal<QNode> myNode;
    public MCSLock(){
        queue=new AtomicReference<QNode>();
        myNode=ThreadLocal.withInitial(()->new QNode());
    }


    @Override
    public void lock() {
      QNode qNode=myNode.get();
      QNode pred=tail.getAndSet(qNode);
      if(Objects.nonNull(pred)){
          qNode.locked=true;
          pred.next=qNode;
          while(qNode.locked){

          }
      }
    }

    @Override
    public void unlock() {
        QNode qNode=myNode.get();
        if(Objects.isNull(qNode.next)){
            if(tail.compareAndSet(qNode,null)){
                return ;
            }
            while(Objects.isNull(qNode.next)){

            }
        }
        qNode.next.locked=false;
        qNode.next=null;
    }
    
    private static class QNode {
    public volatile boolean locked;
    public volatile QNode next;
}
}

加鎖階段:

每個執行緒的 節點還是調用tail.getAndSet(qNode)將自己添加到整個隊列的尾部,同時修改自己為tail節點。它的返回值就是原來tail保存的節點。由於在初始化時未給tail分配節點(,所以tail的值有可能為空unlock也可能導致tail節點為空)。

接下來判斷原來的tail節點是否為空:

​ 1.如果為空,表示當前節點就是整個隊列的頭節點。這時就可以獲取鎖。

​ 2.如果原來的tail節點不為空,就將當前節點的locked屬性設置為true,同時將原來的tail節點的next屬性設置為當前節點。

在當前節點上自旋,等待當前節點的locked屬性為false,以便獲取鎖。


解鎖階段:

首先判斷當前節點的後續節點next屬性是否為空,如果為空,那就有兩種情況。

  1. 當前節點就是隊尾節點。
  2. 已經有了後續節點,但是後續節點還沒有調用pred.next=qNode;將當前要釋放鎖的節點的後續節點設置成它自己。

對於這兩種情況,都會通過調用tail.compareAndSet(qNode,null)來判斷當前節點是否是tail節點,並將tail節點由當前節點設置成null,然後直接返回。如果這句能執行成功,那就說明當前節點就是隊尾節點,也就是在當前解鎖的執行緒執行完tail.getAndSet(qNode)這就後,一直沒有其他執行緒執行過這行程式碼。這也就是解決上面next為空的第一種場景。

如果tail.compareAndSet(qNode,null)這句執行不成功,說明已經有其他執行緒執行了tail.getAndSet(qNode)這句程式碼,當前的tail節點已經不是當前執行緒自己的節點了 。這時就需要等待執行完tail.getAndSet(qNode)程式碼的執行緒直到它執行完pred.next=qNode;這句程式碼,這裡是通過自旋進行等待的。這也是解決上面說的第二種場景。

上面的第一種場景 在34行已經return,程式碼能執行到40行,就表示當前節點已經不是隊尾tail節點了。這時只需要將當前節點的後續節點next節點的locked值設置為false,從而喚醒next節點獲得鎖。

qNode.next=null;將後續節點和當前節點的鏈接斷開,方便當前節點被垃圾回收器回收。


對比CLH隊列鎖,MCS隊列鎖在加鎖階段只是在自己的節點上自旋,也就是沒有了無cache的NUMA問題。但是它解鎖的時候也需要在後續節點上進行自旋,不過這個自旋的時候就非常短,無cache的NUMA問題的問題基本也可以忽略吧。


時限隊列鎖

Java的Lock介面中有個boolean tryLock(long time, TimeUnit unit) throws InterruptedException;方法,調用者可以指定一個時限:調用這個方法為獲得鎖而準備等待的最大時間,如果在調用者獲得鎖之前超時,調用者則放棄獲得鎖的嘗試。

用CLH隊列鎖來舉例,我們可以重載lock方法,傳入最大的等待時間,執行緒在它的前驅節點自旋的時候判斷是否超時,如果超時就返回。

    public void lock(long   time) {
        long start=System.currentTimeMillis();
        QNode qNode = myNode.get();
        qNode.locked = true;
        QNode pred = tail.getAndSet(qNode);
        myPred.set(pred);
        while (pred.locked) {
            //超時
            if((System.currentTimeMillis()-start)>time){
                return;
            }
        }
    }

上面的程式碼是我自己寫的,書上沒有。上面的程式碼只是在超時時直接return,看著和正常獲取鎖的基本沒區別。嚴格來說,在超時時,我們還需要將當前節點的前驅節點和當前節點的後續節點連起來,從整個隊列中刪除掉當前節點。這裡有兩個問題:

1.CLH隊列鎖都只有前驅節點的引用,沒有後續節點的引用。所以如果要刪除當前節點,讓當前節點的前驅節點和後續節點連起來,就需要每個節點判斷它的前驅節點是不是由於超時等原因實效了,如果失效了 就要將它的前驅節點剔除掉,將它的前驅節點設置成它前驅節點的前驅節點。這個無疑增加了加鎖節點程式碼的複雜度。

2.從一個隊列節點中刪除一個節點還會擾亂並發鎖的釋放,這個解決起來也會比較困難。

所以我們需要採用一種迂迴的方法,不是在超時時主動的將當前節點從隊列中刪除掉,而是採用惰性方法:若一個執行緒超時,則該執行緒將它的節點標記為已放棄。這樣該執行緒在隊列中的後續(如果有)將會注意到它正在自旋的節點已經被放棄,於是開始在被放棄節點的前驅上自旋。

下面還是直接看程式碼吧

import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class TOLock implements Lock {
    static QNode AVAILABLE = new QNode();
    AtomicReference<QNode> tail;
    ThreadLocal<QNode> myNode;

    public TOLock() {
        tail = new AtomicReference<>();
        myNode = ThreadLocal.withInitial(() -> new QNode());
    }

    @Override
    public void lock() {

    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) {
        long startTime = System.currentTimeMillis();
        long patience = TimeUnit.MILLISECONDS.convert(time, unit);
        QNode qNode = new QNode();
        myNode.set(qNode);
        qNode.pred = null;
        QNode myPred = tail.getAndSet(qNode);
        if (Objects.isNull(myPred) || myPred.pred == AVAILABLE) {
            return true;
        }
        while (System.currentTimeMillis() - startTime < patience) {
            QNode predPred = myPred.pred;
            if (predPred == AVAILABLE) {
                return true;
            } else if (Objects.nonNull(predPred)) {
                myPred = predPred;
            }

        }
        if (!tail.compareAndSet(qNode, myPred)) {
            qNode.pred = myPred;
        }
        return false;


    }

    @Override
    public void unlock() {
        QNode qNode = myNode.get();
        if (!tail.compareAndSet(qNode, null)) {
            qNode.pred = AVAILABLE;
        }
    }

    private static class QNode {
        public volatile QNode pred = null;
    }

下面我們簡單分析下上面的程式碼

首先定義了一個QNode對象(57行),作為當前執行緒加鎖狀態的節點,其中它有個pred屬性(58行)。關於pred的取值,分為下面幾種情況:

  1. 當前執行緒已經獲得了鎖,或者當前執行緒正在獲取鎖(此時還沒有超時),這時pred的值就是null
  2. 如果當前執行緒在獲取鎖的過程中超時了,這時pred的值就是它的前驅節點的pred的值。
  3. 如果當前執行緒解鎖時,隊列中已經有了後續執行緒,就將pred的值設置成AVAILABLE,指示它的後續節點可以獲得鎖。

加鎖的程式碼:

1.首先我們創建一個QNode對象,存儲到threadLocal變數myNode中。然後通過原子的方式將隊列中的tail尾節點更新為當前執行緒的QNode對象(28行),同時將原來的tail節點存儲到myPred變數中。

2.接下來判斷myPred的值:

​ (1).如果myPred==null,表示當前隊列中只有當前執行緒一個節點,此時它就可以獲取到鎖。

​ (2).如果myPred.pred == AVAILABLE,表示當前節點的前驅節點已經釋放了鎖,此時當前執行緒也就可以獲取到鎖。

​ (3).對於其他的情況,都不能直接獲取鎖。

3.在32行開始,使用while循環來等待它的前驅節點釋放鎖或者超時。

​ 在這裡面主要是通過對myPred變數的pred值進行判斷。在進入while循環時,myPred表示當前執行緒的前驅節點。前面已經說過pred的取值有3中情況,下面我們對while循環中pred取值的3中情況簡單分析下:

QNode predPred = myPred.pred;由於myPred是當前執行緒的前驅節點,所以這裡的predPred就是當前執行緒的前驅節點的pred屬性。

​ (1).predPred==null,表示當前執行緒的前驅節點也在等待獲取鎖,這時當前執行緒需要繼續在自己的節點上循環,等待前驅節點超時(36行),或者前驅節點獲得鎖後並釋放鎖(34行)。

​ (2).predPred==AVAILABLE表示前驅節點已經釋放了鎖,當前節點就可以獲取鎖。

​ (3)其他取值,這種其實也就只有一種取值了,就是42行的程式碼。不過這裡需要注意的是42行是當前執行緒節點超時的操作。while 循環中這裡就是當前執行緒的前驅節點超時的情況了。也就是36行的程式碼,直接修改myPred = predPred;跳過超時的節點,繼續 執行後續操作。

4.在41行,對超時進行一個處理。如果當前執行緒的節點依舊是隊列中的尾節點(在28行到41行之間,沒有其他執行緒執行第28行程式碼),就直接將當前節點從隊列中去掉。如果當前執行緒的節點已經不是尾節點,就將當前執行緒節點的pred的前驅節點。

解鎖的程式碼

這個就比較簡單了,直接判斷隊列中是否有後續節點,如果有後續節點,就將當前節點的pred設置成AVAILABLE,喚醒隊列中的後續節點。

Tags: