JUC源碼學習筆記2——AQS共享和Semaphore,CountDownLatch

本文主要講述AQS的共享模式,共享和獨佔具有類似的套路,所以如果你不清楚AQS的獨佔的話,可以看我的《JUC源碼學習筆記1》
主要參考內容有《Java並發編程的藝術》,《Java並發編程實戰》和下面兩位博主的博客
//segmentfault.com/a/1190000016447307 這是我見過講AQS共享最好的博客
//www.cnblogs.com/micrari/p/6937995.html 這個文章是對PROPAGATE的作用比較好的詮釋

一丶Semaphore+AQS共享模式

1.Semaphore的作用

公司有五個坑位可以用來上廁所,對於一個廁所來說,五個坑位可以看作是五個共享的資源,同時可以允許五個員工(線程)來上廁所,當前任何一個員工進入其中一個坑位,那麼可用坑位(共享資源)減少,當員工出來的時候,共享資源被釋放,當全部都被人佔用的時候,後續上廁所的人需要等待(表現在線程獲取共享資源阻塞)當然這個等待可以被中斷(測試給等待的開發提了bug,開發放棄排隊回到工位)這個等待也可以超時(等太久心態崩了不等了)

Semaphore信號量來控制多個線程同時訪問某個特定共享資源的操作數量

很直觀,我們可以意識到,Semaphore式基於AQS的共享模式

2.Semaphore常用方法

方法 描述
public Semaphore(int permits) 指定許可數量的構造方法(廁所有多少個坑位)
public Semaphore(int permits, boolean fair) 創建具有給定許可數量和給定公平設置的信號量(第二個參數指定釋放公平,好比說員工的素質,有沒有上廁所不排隊的人)
public void acquire() throws InterruptedException 可中斷的獲取一個許可,如果獲取許可,許可數量減少1方法返回,否則阻塞當前線程之到出現以下情況
1. 其他線程釋放了許可,並且當前線程獲得了許可(廁所出來了一個,而且你如願如廁)
2.其他線程中斷了當前線程(測試提bug打電話中斷了你的排隊)
public void acquireUninterruptibly() 和acquire() 類似,但是這個方法不響應中斷即在獲取許可的途中不會因為中斷了放棄(人有三急,天王老子來了也得先如廁)
public boolean tryAcquire() 嘗試獲取許可,如果成功獲取許可返回true並且減少許可,反之返回false,(你來到廁所隨便看了以下,有可以的坑位立馬進去,反之直接回工位)
public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException 和tryAcquire()類似,但是響應中斷,支持超時,如果在指定時間餒獲取到共享資源返回true,如果超時未獲取返回false,如果獲取的途中被中斷拋出中斷異常
public void release() 釋放許可,並且許可數量加1(如廁完釋放坑位)
public int availablePermits() 返回此信號量中可用的當前許可數。
public int drainPermits() 獲取並返回所有立即可用的許可證。
protected void reducePermits(int reduction) 按指示的減少量減少可用許可證的數量。
acquire,acquireUninterruptibly,tryAcquire,release還有支持獲取指定數量共享資源的重載方法

3.Semaphore是如何實現的

顯而易見,Semaphore是基於AQS的共享模式,Semaphore方法的都是委託給Sync

3.1 acquire()——可中斷的獲取許可(無參數獲取一個,有參數指定獲取n個)

Semaphore的acquire方法直接調用的是sync的acquireSharedInterruptibly(1),這個方法在sync的父類AbstractQueuedSynchronizer中進行了實現

和ReentrantLock類似的套路,很多並發都是使用這種內部類的方式,把功能的實現交給內部類

3.1.1 tryAcquireShared 嘗試獲取共享資源
相對於獨佔的鎖的`tryAcquire(int arg)`返回boolean類型的值,共享鎖的`tryAcquireShared(int acquires)`返回的是一個整型值:

- 如果該值小於0,則代表當前線程獲取共享鎖失敗
- 如果該值大於0,則代表當前線程獲取共享鎖成功,並且接下來其他線程嘗試獲取共享鎖的行為很可能成功
- 如果該值等於0,則代表當前線程獲取共享鎖成功,但是接下來其他線程嘗試獲取共享鎖的行為會失敗
3.1.1.1 非公平的嘗試獲取共享資源

直接調用的是nonfairTryAcquireShared方法

final int nonfairTryAcquireShared(int acquires) {
    //一個自選
    for (;;) {
        //可用的許可
        int available = getState();
		//剩餘=可用-當前需要的許可
        int remaining = available - acquires;//   1
        
		//如果剩餘小於0 或 cas設置許可數量位true 返回剩餘剩餘許可數量
        //值得品一品
        if (remaining < 0 ||
            compareAndSetState(available, remaining))//  2
            return remaining;
    }
}
  • 自旋結束的情況

    1. 剩餘許可小於0 表示當前剩餘的許可不足以滿足我們的要求

    2. 當前許可可以滿足我們的需求,且成功CAS修改許可的數量

    可能線程A 執行到1這一行發現是足夠的,但是當前很多線程在競爭資源,導致執行2的時候當前線程CAS失敗,那麼會進入下一輪循環
    
3.1.1.2 公平的嘗試獲取共享資源
protected int tryAcquireShared(int acquires) {
    for (;;) {
        
        //如果前面有線程在等待,公平起見,返回-1 獲取共享資源失敗
        if (hasQueuedPredecessors())
            return -1;
        
        //和非公平一樣
        int available = getState();
        int remaining = available - acquires;
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}
3.1.2 doAcquireSharedInterruptibly 排隊獲取共享資源

雖然在獨佔模式中沒有名稱叫doAcquireInterruptibly的方法,但是還是那個套路

private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {

    //構造節點,加入到同步隊列尾部
    final Node node = addWaiter(Node.SHARED);
    //獲取失敗標誌
    boolean failed = true;
    try {
        //自選
        for (;;) {
            //前繼節點
            final Node p = node.predecessor();
			//前繼節點是頭節點
            if (p == head) {
                //嘗試獲取共享資源
                int r = tryAcquireShared(arg);
				//獲取成功
                if (r >= 0) {
                    //設置為頭節點並且傳播
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            //掛起當前線程 如果被中斷那麼直接拋出中斷異常
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        //如果當前節點放棄了,這裡對應被中斷了(超時獲取方法在超時的情況也會進入)
        if (failed)
            cancelAcquire(node);
    }
}

和獨佔不同的點在於:

  • addWaiter(Node.SHARED) 標記當前節點是共享模式

    這個Node.SHARED設置到當前節點的nextWaiter屬性上,nextWaiter在此的作用只是標記當前節點的模式(獨佔or共享)

    在Condition等待隊列中才起到串聯等待線程的作用的,後續會有專門一篇講解
    
  • 獨佔的時候調用的是setHead方法,這裡調用的是 setHeadAndPropagate(當前線程節點,tryAcquireShared返回值(在信號量中可以理解為剩餘的許可證數量))

3.1.3 setHeadAndPropagate 設置為頭節點並且傳播
private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // Record old head for check below
    //設置為頭,頭在AQS是獲取到鎖的線程,也意味着從同步隊列中出隊了,
    setHead(node);
    
    //喚醒
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            doReleaseShared();
    }
}

doReleaseShared方法的作用的是在當前共享鎖是可獲取的狀態時,喚醒head節點的下一個節點,這個方法的詳細作用的後面講解,現在我們來分析,setHeadAndPropagate在什麼情況下會調用這個方法

  • 剩餘的共享資源大於0 propagate > 0

    在共享鎖模式下,鎖可以被多個線程所共同持有,既然當前線程已經拿到共享鎖了,其還有剩餘的共享資源,那麼就可以直接通知後繼節點來拿鎖,而不必等待鎖被釋放的時候再通知。(來到廁所發現五個坑都可用,發消息給好兄弟,快來,拉屎自由)

  • 當前節點的下一個節點不為空且是共享模式 if (s == null || s.isShared())

  • 舊頭節點等待狀態小於0 or 當前頭節點等待狀態小於0

    共享資源在被獲取後,線程都會設置自己為頭節點,所有頭節點在共享模式中表示的是獲取到共享資源的線程或者曾經獲取共享資源的線程

3.1.4 doReleaseShared 喚醒後續等待線程
在當前共享鎖是可獲取的狀態時,喚醒head節點的下一個節點

這個方法除了在setHeadAndPropagate 中被調用意外,還在共享資源的釋放(releaseShared)中會被調用,想像一個場景,存在一個線程A釋放鎖的同時,一個線程B拿到鎖,前者調用releaseShared,後者調用setHeadAndPropagate ,並發的調用到doReleaseShared 方法進行喚醒頭節點下一個節點,所以doReleaseShared 需要考慮線程安全問題

3.1.4.1源碼粗略解讀
//值得好好品一品
private void doReleaseShared() {
    //循環
    for (;;) {
        //頭  可能這一行執行完h就是舊的頭,存在另外一個線程獲取到共享鎖,將自己設置為頭
        Node h = head;
        //h頭不為null 不等於尾,說明至少需要當前隊列中至少有兩個節點
        if (h != null && h != tail) {
            //h頭的狀態
            int ws = h.waitStatus;
            //h頭狀態為SINGNAL 說明後續節點入隊的時候addWaiter把當前節點的狀態設置,說明後續節點需要喚醒
            if (ws == Node.SIGNAL) {
                //CAS修改h狀態為0
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;          
                //喚醒後續線程
                unparkSuccessor(h);
            }
            //h狀態為0 且CAS設置為PROPAGATE 失敗
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;               
        }
        
        //h 等於當前頭,說明執行上面一段的時候沒有頭沒有變化,沒有其他線程獲取共享資源
        if (h == head)                  
            break;
    }
}
3.1.4.2 doReleaseShared 循環退出的條件和這樣設計的目的
if (h == head)                  
     break;

這裡h是剛進入doReleaseShared 時候的頭節點,head是當前隊列的頭,如果二者相等那麼退出循環

  • 什麼時候不相等昵——線程A釋放共享資源調用doReleaseShared 的時候還沒執行到循環退出的時候,線程B獲取到共享資源設置自己為新的頭節點
  • 不相等發生什麼情況——線程A繼續for循環執行,假如這個時候線程B也釋放資源,那麼這個方法存在被多個線程一起執行的情況
  • 這樣設計的目的——設計成每個節點只喚醒自己的後繼線程也可以實現同樣的功能,但是多個線程一起執行喚醒可以提高喚醒等待共享資源線程的效率。甚至在新的線程獲取到共享鎖的時候還會調用doReleaseShared,喚醒後繼節點,
一個廁所有五個坑,在某一個時刻五個坑被ABCDE佔用,後面還有EF兩個倒霉等待排成隊列,ABCDE佔用坑的時候都會設置自己為頭節點,會有幾個人獲取到坑位的時候調用doReleaseShared (比如D第四個來,發現還有一個坑,立馬說,後面的兄弟還有一個廁所)再比如五個坑都被佔用但是E發現自己的狀態為SINGAL(是E排隊的時候提醒自己拉完提醒自己,他先玩會兒手機(掛起))

某一個時刻多個人拉完的時候,釋放坑位走出廁所,A釋放到if (h == head)的時候,發現頭節點變化了,繼續喊兄弟們去看看說不定有坑位,B也是如此,同一個時間可能有多個拉完的人都在喚醒後面的人去上廁所,這樣後面排隊玩手機的人,被喚醒的效率更高,從而提升了廁所的利用效率
3.1.4.3.doReleaseShared 喚醒的邏輯
private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            
            //1.情況一
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;           
                unparkSuccessor(h);
            }
            
            //2.情況二
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;              
        }
        if (h == head)                   
            break;
    }
}
  • 情況一

    ws == Node.SIGNAL 說明是後繼節點入隊的時候修改了節點的狀態,表示需要喚醒

    這裡使用CAS保證多個線程執行當前方法只有一個線程可以成功喚醒後續的線程(共享鎖存在多個線程並發喚醒的情況)

    同時兩個小夥子從廁所出來,只需要一個人通知到等待的人就好
    
  • 情況二

    ws == 0 可能是頭節點沒有後繼入隊,所以節點狀態為初始狀態,但是上面的 if (h != null && h != tail) 保證了隊列中至少存在兩個節點

    ws == 0 還可能是上面的情況1修改為了0,但是這種情況不會進入當前分支

    最後只可能是尾節點成為了頭節點,compareAndSetWaitStatus(h, 0, Node.PROPAGATE)需要返回false才能繼續循環,說明後續節點入隊修改了節點的狀態為SIGANAL,此時會繼續循環喚醒後繼節點。注意到最上面if (h != null && h != tail) 也就是說隊列至少存在兩個節點,讓代碼運行到情況1,要想ws == Node.SIGNAL不成立說明這個頭節點剛剛成為頭節點,狀態還沒來得及被後繼節點修改為SINGANL,緊接着後繼節點恰好修改了頭節點狀態為SINGAL才能促使!compareAndSetWaitStatus(h, 0, Node.PROPAGATE)為true,也就是說明情況二是在新的頭節點產生,且沒來得及被後繼節點修改為SINGAL,並且在頭節點線程執行compareAndSetWaitStatus(h, 0, Node.PROPAGATE)的一瞬間後繼節點搶先一步修改了頭節點的狀態為SINGAL才能走到情況二的continue中

    10:40:01的時候A成功獲得如廁的權利,此時廁所是滿的,A設置自己為頭節點,發現原來的頭節點狀態是SINGANL,他準備喚醒後面排隊的兄弟
    10:40:02 B發現沒有廁所,排在隊列的第一個,準備修改A的狀態為SINGAL(讓A記得喚醒自己)此時A已經在執行喚醒的流程了,此時隊列存在兩個節點,A為頭,B為尾巴,A執行到情況1,發現自己不是SINGAL,來到情況2,準備修改自己狀態為PROPAGATE但是失敗了(此時B剛好修改A狀態為SINGAL了)A繼續執行for循環(可能存在其他人上完廁所,喚醒了B,B成為新頭節點),
    此時A會拿到隊列的頭節點(最近剛剛獲得鎖的節點)繼續執行for循環,最後隊列的頭節點沒有變化了,A才罷休
    

3.2 acquireUninterruptibly——不可中斷的獲取許可證

直接調用的是AQS的acquireShared(1)方法

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

大致邏輯和acquireSharedInterruptibly,其不響應中斷體現在doAcquireShared中

private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    //如果被中斷了,那麼補上中斷,而不是拋出異常
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                //parkAndCheckInterrupt() 放回true 表示當前線程從LockSupport中返回是因為被中斷了,那麼把interrupted置為true,繼續循環
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

套路和獨佔模式中類似,當前線程從LockSupport中返回後檢查其中斷表示,發現是由中斷那麼會在當前獲取到共享資源後補上中斷標識

3.3tryAcquire(int permits, long timeout, TimeUnit unit)超時獲取許可

直接調用AQS的tryAcquireSharedNanos方法

public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
	
    //如果直接獲取共享資源成功那麼直接返回true了 短路後續的doAcquireSharedNanos
    return tryAcquireShared(arg) >= 0 ||

        //超時獲取共享資源
        doAcquireSharedNanos(arg, nanosTimeout);
}
3.3.1doAcquireSharedNanos 超時獲取共享資源
private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (nanosTimeout <= 0L)
        return false;
    final long deadline = System.nanoTime() + nanosTimeout;
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; 
                    failed = false;
                    return true;
                }
            }
            nanosTimeout = deadline - System.nanoTime();
            if (nanosTimeout <= 0L)
                return false;
            if (shouldParkAfterFailedAcquire(p, node) &&
                nanosTimeout > spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanosTimeout);
            if (Thread.interrupted())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

和獨佔的超時獲取一樣的套路,都是必須剩餘時間大於spinForTimeoutThreshold閾值才掛起否則進行自旋,響應中斷也是類似的內容

3.4 release(int permits)——釋放許可

直接調用的AQS的releaseShared

public final boolean releaseShared(int arg) {
    //信號量中靜態內部類重寫
    if (tryReleaseShared(arg)) {
		//喚醒後續等待線程 前面說過
        doReleaseShared();
        return true;
    }
    return false;
}
protected final boolean tryReleaseShared(int releases) {
    //自旋
    for (;;) {
       
        int current = getState();
        int next = current + releases;
        if (next < current) // overflow
            throw new Error("Maximum permit count exceeded");
       
        //CAS設置許可的數量,
        if (compareAndSetState(current, next))
            return true;
    }
}

和獨佔不同的是,共享鎖存在多個多個線程一起釋放的情況,所以使用自選+CAS保證許可證的數量不會在並發的情況下出現錯誤

減少許可的邏輯類似,也是循環+CAS的方式

二丶CountDownLatch閉鎖

1.CountDownLatch的作用

閉鎖的工具相當於一扇門,在閉鎖達到結束狀態前,門一直是關閉,沒有任何線程可以通過,當達到結束狀態後,閉鎖才會讓允許所有線程通過,當閉鎖到達結束狀態後不會再改變狀態保持開狀態

2.CountDownLatch的使用場景和其常用方法

2.1使用場景

  • 確保某個計算再所有資源準備後在繼續執行
  • 確保某個服務在其依賴的所有其他其他服務都啟動後才啟動
  • 等待某個操作所有參與者就緒後再繼續執行(moba所有玩家確認接受遊戲後再進入選英雄)

2.2常用方法

方法 作用
void await() throws InterruptedException 使當前線程等待直到鎖存器倒計時到零,從這個方法返回的兩個方式
1.計數到0
2.等待的線程被中斷,拋出中斷異常返回
void await(long timeout, TimeUnit unit)throws InterruptedException 和await()類似,但是如果超時也會直接返回
void countDown() 計數減1,如果計數到達0那麼所有等待的線程將可以通行
long getCount() 返回當前計數。此方法通常用於調試和測試目的

3.CountDownLatch 是如何實現的

AB線程需要等待CDE執行完後繼續執行,其實CDE霸佔鎖阻塞AB,後CDE都釋放鎖後AB才能繼續運行

3.1 await 響應中斷的等待計數為0 & await(long timeout, TimeUnit unit) 超時響應中斷等待計數為0

3.1.1 await 響應中斷的等待計數

直接調用靜態內部類的acquireSharedInterruptibly(1)方法,這個方法會直接調用靜態內部類實例的tryAcquireShared(1)方法

protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}

獲取AQS中state(CountDownLatch構造方法設置此值,表示需要等待多少個線程執行countDown),上面我們說過tryAcquireShared返大於等於0表示獲取共享資源成功,負數表示失敗後續會進入等待隊列中,這裡並沒有返回0這種情況,如果共享資源為0表示「門開了」執行此方法的線程可以自由的運行了,反之將排隊等待,之所以沒有返回0,是因為CountDownLatch支持多個線程比如ABC一起等待,返回0表示當前線程獲取資源成功但是後續線程獲取會失敗,返回1可以保證當前線程看見門開了後會去喚醒其他線程

3.1.2 await(long timeout, TimeUnit unit) 超時響應中斷等待計數為0

直接調用的AQS的tryAcquireSharedNanos,同樣調用重寫的tryAcquireShared 方法,後續調用doAcquireSharedNanos 邏輯和上面信號量的內容一樣

3.2 countDown 計數減少1

直接調用AQS的releaseShared方法,調用到重寫的tryReleaseShared方法

protected boolean tryReleaseShared(int releases) {
    // Decrement count; signal when transition to zero
    for (;;) {
        int c = getState();
        
        //減少之前就是0 直接返回false
        if (c == 0)
            return false;
        int nextc = c-1;
        if (compareAndSetState(c, nextc))
 			//減少到0 才會返回true
            return nextc == 0;
    }
}

還是同樣的套路CAS+自選保證state,但是如果減少之前就已經為0的話的返回false,且只有減少後為0 才會返回true,保證了無效的多次喚醒

假如計數為4,但是五個線程執行countDown,第五個線程執行的時候發現被四個老六搶先一步了,直接返回了false,前面四個老六執行的時候也只有最後一個才會返回true(減少到0)這時候才會執行tryReleaseShared 進行喚醒

三丶關於共享資源的一點圖

1.doReleaseShared多線程同時調用的情況

可以看到當多個線程釋放共享資源的時候,如果當前隊列中存在排隊的節點,那麼可能存在多線程一起並發調用doReleaseShared的可能,如果頭節點為signal 說明後續節點需要喚醒,使用CAS保證了只有一個線程可以成功執行unparkSuccessor喚醒後續線程,後續線程也許在此之前執行了tryAcquireShard返回負數,準備掛起自己,也許在掛起自己之前被執行了unpark,或者掛起之後立馬被執行了unpark,繼續拿共享資源,而那些CAS失敗的線程會繼續喚醒,這點體現了三個資源釋放,不會只喚醒一個。並且這個方法退出的方法只有在喚醒途中頭節點沒有變化的情況,沒有變法說明共享資源的爭搶沒有那麼激烈了(頭節點是最近拿到共享資源的節點)

2.doReleaseShared一個極其短暫的狀態

這個時候線程B 和線程A 必定存在一個線程CAS失敗,如果線程B失敗,那麼意味線程A成功CAS為SIGNAL,但是shouldParkAfterFailedAcquire 返回false 還要繼續自旋,這時候也許tryAcquireShared成功了就沒有必要掛起了,如果線程A自選到tryAcquireShared,被一個老六線程搶先一步獲取共享共享資源了,這時候線程A會執行shouldParkAfterFailedAcquire 返回true 準備掛起自己了,這是線程B也許就成功喚醒了線程A。如果線程ACAS失敗了,還會進行一次自旋,線程B如果CAS成功也會進行一次自旋,也許線程A就成功拿到共享資源改變自己為頭節點,線程B還要執行一次自旋。這一切都為了提高系統的吞吐量讓共享資源盡量不要浪費,不要因為喚醒的不及時而讓需要應該工作的線程被掛起。

Tags: