深入理解並發編程同步工具類

大家好,我是陶朱公Boy。

今天跟大家分享一個並發編程領域中的一個知識點——同步工具類。

我將結合一個真實線上案例作為背景來展開講解這一知識點。給大家講清楚什麼是同步工具類、適合的場景、解決了什麼問題、各個實現方案的對比。希望對大家理解同步工具類這個知識點有所幫助。

我們先看一個案例:

需求描述

圖一:邏輯架構圖

有一個線上「人臉識別」的應用,應用首次啟動要求多執行緒並行將存儲在DB中的人臉數據(512位的double類型數組)載入到本地應用快取中,主執行緒需要等待所有子執行緒完成任務後,才能繼續執行餘下的業務邏輯(比如載入dubbo組件)。

拿到這個需求,大家不妨先思考一下,如果讓你來實現,你打算怎麼做?思考點是什麼?

需求分析

讓我們一起來分析一下這個需求:

首先這個需求是應用首次啟動,需要用多執行緒並行執行任務的,充分利用CPU的多核機制,加快整體任務的處理速度。

其次大家先可以看下上述圖一,多執行緒並行執行下,主執行緒需要等待所有子執行緒完成任務後才能繼續執行餘下的業務邏輯。

要實現這個需求,我們就要思考一下看有沒有一種機制能讓主執行緒等待其他子執行緒完成任務後,它再繼續執行它餘下的業務邏輯?

方案實現

★方案一:Thread.join()

什麼是join?

join方法是Thread類內部的一個方法,是一種一個執行緒等待另一個或多個執行緒完成任務的機制。

基本語義:

如果一個執行緒A調用了thread.join()方法,那麼當前執行緒A需要等待thread執行緒完成任務後,才能從thread.join()阻塞處返回。

示例程式碼:

 public class JoinDemo {
 ​
   public static void main(String[] args) throws InterruptedException {
 ​
     Thread thread0=new Thread(()->{
       System.out.println(Thread.currentThread().getName()+"==start");
       try {
         Thread.sleep((long) (Math.random() * 10000));
       } catch (InterruptedException e) {
         e.printStackTrace();
       }
       System.out.println(Thread.currentThread().getName()+"==end");
 ​
     });
 ​
     Thread thread1=new Thread(()->{
       System.out.println(Thread.currentThread().getName()+"==start");
       try {
         Thread.sleep((long) (Math.random() * 10000));
       } catch (InterruptedException e) {
         e.printStackTrace();
       }
       System.out.println(Thread.currentThread().getName()+"==end");
 ​
     });
     thread0.start();
     thread1.start();
     thread1.join();
     System.out.println("main 1...");
     thread0.join();
     System.out.println("main 0...");
 ​
     System.out.println("====all finish===");
 ​
 ​
   }
 }

結果列印:

 

原理:

源碼解析:

從源碼細節來看(為了方便陳述,我們假設有一個執行緒A調用thread.join()),我們說執行緒A持有了thread對象的一把鎖,while循環判斷thread執行緒是否存活,如果返回false,表示thread執行緒任務尚未結束,那麼執行緒A就會被掛起,釋放鎖,執行緒狀態進入等待狀態,等待被喚醒。

而喚醒的更多細節是在thread執行緒退出時,底層調用exit方法,詳見hotspot關於thread.cpp文件中JavaThread::exit部分。如下(倒數第二行):

 void JavaThread::exit(bool destroy_vm, ExitType exit_type) {
   assert(this == JavaThread::current(), "thread consistency check");
   ...
   // Notify waiters on thread object. This has to be done after exit() is called
   // on the thread (if the thread is the last thread in a daemon ThreadGroup the
   // group should have the destroyed bit set before waiters are notified).
   ensure_join(this);
   assert(!this->has_pending_exception(), "ensure_join should have cleared");
   ...
 ​
 ​
 static void ensure_join(JavaThread* thread) {
   // We do not need to grap the Threads_lock, since we are operating on ourself.
   Handle threadObj(thread, thread->threadObj());
   assert(threadObj.not_null(), "java thread object must exist");
   ObjectLocker lock(threadObj, thread);
   // Ignore pending exception (ThreadDeath), since we are exiting anyway
   thread->clear_pending_exception();
   // Thread is exiting. So set thread_status field in java.lang.Thread class to TERMINATED.
   java_lang_Thread::set_thread_status(threadObj(), java_lang_Thread::TERMINATED);
   //這裡是清除native執行緒,這個操作會導致isAlive()方法返回false
   java_lang_Thread::set_thread(threadObj(), NULL);
   //喚醒等待在thread對象上的所有執行緒  lock.notify_all(thread);  // Ignore pending exception (ThreadDeath), since we are exiting anyway
   thread->clear_pending_exception();
 }

 

方案二:閉鎖(CountDownLatch)

什麼是閉鎖?

閉鎖是一種同步工具類,可以延遲執行緒進度直到其達到終止狀態。

閉鎖的作用相當於一扇門:在閉鎖到達結束狀態之前,這扇門一直是關閉的,並且沒有任何執行緒能通過,直到到達結束狀態時,這扇門將會永久打開。
閉鎖用來確保某些任務直到其他任務都完成後才繼續執行。

基本語義:

countDownLatch的構造函數接收一個int類型的參數作為計數器,比如你傳入了參數N,那意思就是需要等待N個點完成。當我們調用countDown方法時,這個計數器就會減1,await方法會一直阻塞主執行緒,直到N變0為止。

原理:

 

適用場景:

像應用程式首次啟動,主執行緒需要等待其他子執行緒完成任務後,才能做餘下事情,並且是一次性的。 像作者文章開始處提的這個需求,其實比較適合用CountDownLatch這個方案,主執行緒必須等到子執行緒的任務完成,才能進一步載入其他組件,比如dubbo。

示例程式碼:

 public class CountDownLatchDemo {
     public static void main(String[] args) {
         ExecutorService service = Executors.newFixedThreadPool(3);
         final CountDownLatch latch = new CountDownLatch(3);
         for (int i = 0; i < 3; i++) {
             Runnable runnable = new Runnable() {
                 @Override
                 public void run() {
                     try {
                         System.out.println("子執行緒" + Thread.currentThread().getName() + "開始執行");
                         //睡眠個幾十毫秒
                         Thread.sleep((long) (Math.random() * 10000));
                         System.out.println("子執行緒" + Thread.currentThread().getName() + "執行完成");
                         latch.countDown();//當前執行緒調用此方法,則計數減一
                     } catch (InterruptedException e) {
                         e.printStackTrace();
                     }
                 }
             };
             service.execute(runnable);
         }
         try {
             System.out.println("主執行緒" + Thread.currentThread().getName() + "等待子執行緒執行完成...");
             latch.await(5,TimeUnit.SECONDS);//阻塞當前執行緒,直到計數器的值為0
             System.out.println("阻塞完畢!主執行緒" + Thread.currentThread().getName() + "繼續執行業務邏輯...");
             service.shutdownNow();
         } catch (InterruptedException e) {
             e.printStackTrace();
         }
 ​
     }
 }
 結果列印:子執行緒pool-1-thread-1開始執行
 子執行緒pool-1-thread-2開始執行
 子執行緒pool-1-thread-3開始執行
 主執行緒main等待子執行緒執行完成...
 子執行緒pool-1-thread-2執行完成
 子執行緒pool-1-thread-1執行完成
 子執行緒pool-1-thread-3執行完成
 阻塞完畢!主執行緒main繼續執行業務邏輯...

源碼解析:

 /**
  * 靜態內部類,自定義同步器組件
  */
 private final Sync sync;
  
 /**
  * 只有一個構造方法,接收一個count值
  */
 public CountDownLatch(int count) {
     // count值不能小於0
     if (count < 0) throw new IllegalArgumentException("count < 0");
     // 自定義一個同步組件;通過繼承AQS組件實現;
     this.sync = new Sync(count);
 }
 private static final class Sync extends AbstractQueuedSynchronizer {
     private static final long serialVersionUID = 4982264981922014374L;
  
     Sync(int count) {
         // 使用構造函傳遞的參數值count作為同步狀態值。
         setState(count);
     }
  
     /** 獲取當前的count值 */
     int getCount() {
         return getState();
     }
  
     /**共享式獲取同步狀態<br>
      * 這是AQS的模板方法acquireShared、acquireSharedInterruptibly等方法內部將會調用的方法,
      * 由子類實現,這個方法的作用是嘗試獲取一次共享鎖,對於AQS來說,
      * 此方法返回值大於等於0,表示獲取共享鎖成功,反之則獲取共享鎖失敗,
      * 而在這裡,實際上就是判斷count是否等於0,執行緒能否向下運行
      */
     protected int tryAcquireShared(int acquires) {
         // 此處判斷state的值是否為0,也就是判斷count是否為0,
         // 若count為0,返回1,表示獲取鎖成功,此時執行緒將不會阻塞,正常運行
         // 若count不為0,則返回-1,表示獲取鎖失敗,執行緒將會被阻塞
         // 從這裡我們已經可以看出CountDownLatch的實現方式了
         return (getState() == 0) ? 1 : -1;
     }
  
     /**共享式釋放同步狀態<br>
      * 此方法的作用是用來釋放AQS的共享鎖,返回true表示釋放成功,反之則失敗
      * 此方法將會在AQS的模板方法releaseShared中被調用,
      * 在CountDownLatch中,這個方法用來減小count值
      */
     protected boolean tryReleaseShared(int releases) {
         // 使用死循環不斷嘗試釋放鎖
         for (;;) {
             // 首先獲取當前state的值,也就是count值
             int c = getState();
             /**若count值已經等於0,則不能繼續減小了,於是直接返回false
             /* 為什麼返回的是false,因為等於0表示之前等待的那些執行緒已經被喚醒了,            *若返回true,AQS會嘗試喚醒執行緒,若返回false,則直接結束,所以
             * 在沒有執行緒等待的情況下,返回false直接結束是正確的            */
             if (c == 0)
                 return false;
             // 若count不等於0,則將其-1
             int nextc = c-1;
             // compareAndSetState的作用是將count值從c,修改為新的nextc
             // 此方法基於CAS實現,保證了操作的原子性
             if (compareAndSetState(c, nextc))
                 // 若nextc == 0,則返回的是true,表示已經沒有鎖了,執行緒可以運行了,
                 // 若nextc > 0,則表示執行緒還需要繼續阻塞,此處將返回false
                 return nextc == 0;
         }
     }
 ​
 }

我們看下示例程式碼中關於latch.countDown()方法源碼部分:

 /**
  * 此方法的作用就是將count的值-1,如果count等於0了,就喚醒等待的執行緒
  */
 public void countDown() {
     // 這裡直接調用sync的releaseShared方法,這個方法的實現在AQS中,也是AQS提供的模板方法,
     // 這個方法的作用是當前執行緒釋放鎖,若釋放失敗,返回false,若釋放成功,則返回false,
     // 若鎖被釋放成功,則當前執行緒會喚醒AQS同步隊列中第一個被阻塞的執行緒,讓他嘗試獲取鎖
     // 對於CountDownLatch來說,釋放鎖實際上就是讓count - 1,只有當count被減小為0,
     // 鎖才是真正被釋放,執行緒才能繼續向下運行
     sync.releaseShared(1);
 }
 /**
 * 共享式的釋放同步狀態
 */
 public final boolean releaseShared(int arg) {
     // 調用tryReleaseShared嘗試釋放鎖,這個方法已經由Sycn重寫,請回顧上面對此方法的分析
     // 若tryReleaseShared返回true,表示count經過這次釋放後,等於0了,於是執行doReleaseShared
     if (tryReleaseShared(arg)) {
         // 這個方法的作用是喚醒AQS的同步隊列中,正在等待的第一個執行緒
         // 而我們分析acquireSharedInterruptibly方法時已經說過,
         // 若一個執行緒被喚醒,檢測到count == 0,會繼續喚醒下一個等待的執行緒
         // 也就是說,這個方法的作用是,在count == 0時,喚醒所有等待的執行緒
         doReleaseShared();
         return true;
     }
     return false;
 }

接下來我們看下另一個比較重要的方法即await方法部分源碼:

 // 此方法用來讓當前執行緒阻塞,直到count減小為0才恢復執行
 public void await() throws InterruptedException {
     // 這裡直接調用sync的acquireSharedInterruptibly方法,這個方法定義在AQS中
     // 方法的作用是嘗試獲取共享鎖,若獲取失敗,則執行緒將會被加入到AQS的同步隊列中等待
     // 直到獲取成功為止。且這個方法是會響應中斷的,執行緒在阻塞的過程中,若被其他執行緒中斷,
     // 則此方法會通過拋出異常的方式結束等待。
     sync.acquireSharedInterruptibly(1);
 }
 ​
 /**
 *此方法是AQS中提供的一個模板方法,用以獲取共享鎖,並且會響應中斷 */
 public final void acquireSharedInterruptibly(int arg)
     throws InterruptedException {
     // 首先判斷當前執行緒釋放被中斷,若被中斷,則直接拋出異常結束
     if (Thread.interrupted())
         throw new InterruptedException();
     
     // 調用tryAcquireShared方法嘗試獲取鎖,這個方法被Sycn類重寫了,
     // 若count == 0,則這個方法會返回1,表示獲取鎖成功,則這裡會直接返回,執行緒不會被阻塞;否則返回-1
     // 若count < 0,將會執行下面的doAcquireSharedInterruptibly方法,
     // 此處請去查看Sync中tryAcquireShared方法的實現
     if (tryAcquireShared(arg) < 0)
         // 下面這個方法的作用是,執行緒獲取鎖失敗,將會加入到AQS的同步隊列中阻塞等待,
         // 直到成功獲取到鎖,而此處成功獲取到鎖的條件就是count == 0,若當前執行緒在等待的過程中,
         // 成功地獲取了鎖,則它會繼續喚醒在它後面等待的執行緒,也嘗試獲取鎖,
         // 這也就是說,只要count == 0了,則被阻塞的執行緒都能恢復運行
         doAcquireSharedInterruptibly(arg);
 ​
 }

從源碼細節來看,我們知道CountDownLatch底層是繼承了AQS框架,是一個自定義同步組件。

AQS的狀態變數被它當做了一個所謂的計數器實現。主執行緒調用await方法後,發現state的值不等於0,進入同步隊列中阻塞等待。子執行緒每次調用countDown方法後,計數器減一,直到為0。這時會喚醒處於阻塞狀態的主執行緒,然後主執行緒就會從await方法出返回。

方案三:柵欄(CyclicBarrier)

什麼是柵欄?

CyclicBarrier字面意思是可循環(Cyclic)使用的柵欄(Barrier)。它的意思是讓一組執行緒到達一個柵欄時被阻塞,直到最後一個耗時較長的執行緒完成任務後也到達柵欄時,柵欄才會打開,此時所有被柵欄攔截的執行緒才會繼續執行。

基本語義:

CyclicBarrier有一個默認構造方法:CyclicBarrier(int parties),參數parties表示被柵欄攔截的執行緒數量。

每個執行緒調用await()方法告訴柵欄我已經到達柵欄,然後當前執行緒就會被阻塞,直到以下任一情況發生時,當前執行緒從await方法處返回。

  • 最後一個執行緒到達
  • 其他執行緒中斷當前執行緒
  • 其他執行緒等待柵欄超時;通過調用await帶超時時間的方法。
    await(long timeout, TimeUnit unit)
  • 其他一些執行緒在此屏障上調用重置

原理:

在CyclicBarrier的內部定義了一個Lock對象,每當一個執行緒調用await方法時,將攔截的執行緒數減1,然後判斷剩餘攔截數是否為初始值parties,如果不是,進入Lock對象的條件隊列等待。如果是,執行barrierAction對象的Runnable方法,然後將鎖的條件隊列中的所有執行緒放入鎖等待隊列中,這些執行緒會依次的獲取鎖、釋放鎖。

適用場景:

1)實現多人遊戲,直到所有玩家都加入才能開始。

2)經典場景:多執行緒計算數據,然後匯總結算結果場景。(比如一個Excel有多份sheet數據,開啟多執行緒,每個執行緒處理一個sheet,最終將每個sheet的計算結果進行匯總)

示例程式碼:

 public class CyclicBarrierTest2 {
     static CyclicBarrier cyclicBarrier = new CyclicBarrier(2, new CalculateResult());
 ​
     public static void main(String[] args) {
         new Thread(() -> {
             try {
                 System.out.println("執行緒A處理完sheet0數據...總計100");
                 cyclicBarrier.await();
             } catch (Exception e) {
 ​
             }
 ​
         }).start();
 ​
         try {
             System.out.println("執行緒B處理完sheet1數據...總計200");
             cyclicBarrier.await();
         } catch (Exception e) {
 ​
         }
     }
 ​
     static class CalculateResult implements Runnable {
 ​
         @Override
         public void run() {
             System.out.println("【匯匯流排程】開始統計各個子執行緒的計算結果...,總計300");
 ​
         }
     }
 }

響應結果列印:

 執行緒B處理完sheet1數據...總計200
 執行緒A處理完sheet0數據...總計100
 【匯匯流排程】開始統計各個子執行緒的計算結果...總計300

 

方案四:訊號量(Semaphore)

什麼是訊號量?

訊號量是用來控制同時訪問特定資源的執行緒數量,它通過協調各個執行緒,以保證合理的使用公共資源。

基本語義:

從Semaphore的構造方法Semaphore(int permits)來看,入參permits表示可用的許可數量。如果我們在方法內部執行操作前先執行了acquire()方法,那麼當前執行緒就會嘗試去獲取可用的許可,如果獲取不到,就會被阻塞(或者中途被其他執行緒中斷),直到有可用的許可為止。

執行release()方法意味著會釋放許可給Semaphore。此時許可數量就會加一。

使用場景:

Semaphore在有限**公共資源**場景下,應用比較廣泛,比如資料庫連接池場景。

大家可以想像一下,比如我們平時在用的C3P0、druid等資料庫連接池,因為資料庫連接數是有限制的,面對突如其來的激增流量,一下子把有限的連接數量給占完了,那沒有獲取到可用的連接的執行緒咋辦?是直接失敗嗎?

我們期望的效果是讓這些沒獲取到連接的執行緒先暫時阻塞一會,而不是立即失敗,這樣一旦有可用的連接,這些被阻塞的執行緒就可以獲取到連接而繼續工作。

示例程式碼:

 public class BoundedHashSet<T> {
 ​
     private final Set<T> set;
 ​
     private final Semaphore sem;
 ​
     public BoundedHashSet(int bound) {
         this.set = Collections.synchronizedSet(new HashSet<T>());
         this.sem = new Semaphore(bound);
     }
 ​
     public boolean add(T o) throws InterruptedException {
         sem.acquire();
 ​
         boolean wasAdded = false;
 ​
         try {
             wasAdded = set.add(o);//如果元素已存在,返回false;否則true
             return wasAdded;
         } catch (Exception e) {
 ​
         } finally {
             if (!wasAdded) {//如果元素已經存在,則釋放許可給訊號量
                 sem.release();
             }
         }
         return wasAdded;
     }
 ​
     public boolean remove(Object o) {
         boolean wasRemoved = set.remove(o);
         if (wasRemoved) {
             sem.release();//從容器中移除元素後,需要釋放許可給訊號量。
         }
         return wasRemoved;
     }
 ​
 ​
 }

總結

上述需求的實現方案我例舉了join、CountDownLatch、CyclicBarrierSemaphore這幾種。

期間也介紹了每種方案的實現原理、適用場景、源碼解析。它們語意上有一些相似的地方,但差異性也很明顯,接下來我們詳細對它們進行一下對比。

首先我們說當前執行緒調用t.join()儘管能達到當前執行緒等待執行緒t完成任務的業務語義。但細緻的區別是join方法調用後必須要等到t執行緒完成它的任務後,當前執行緒才能從阻塞出返回。而CountDownLatch、CyclicBarrier顯然提供了更細粒度的控制。像CountDownLatch只要主執行緒將countDownLatch實例對象傳遞給子執行緒,子執行緒在方法內部某個地方執行latch.countDownLatch(),每調用一次計數器就會減1,直到為0,最後主執行緒就能感知到並從await阻塞出返回,不需要等到任務的完成。

其次我們說在當前執行緒方法內部,一旦出現超過2個join方法,整體程式碼就會變的很臟、可讀性降低。反觀JUC分裝的CountDownLatch、CyclicBarrier等組件,通過對共享實例的操作(可以把這個實例傳給子執行緒,然後子執行緒任務執行的時候調用相應方法,比如latch.countDown()) 顯得更加清晰、優雅。

最後比較一下CyclicBarrier和CountDownLatch的差異性。比起CountDownLatch顯然CyclicBarrier功能更多,比如支援reset方法。CountDownLatch的計數器只能使用一次,而CyclicBarrier可以多次使用,只要調用reset方法即可。(比如CyclicBarrier典型的數據統計場景,因為中途可能部分執行緒統計出錯或外部數據的訂正,可能需要重新再來一次計算,那麼這個時候,CountDownLatch無能為力,而CyclicBarrier只要子執行緒調用reset方法即可)。

而Semaphore往往用來針對多執行緒並發訪問指定有限資源的場景,比如資料庫連接池場景。

 

寫到最後

如果這篇文章你看了對你有幫助或啟發,麻煩關注、點贊一下作者。你的肯定是作者創作源源不斷的動力。

公眾號

歡迎大家關注我的公眾號:【陶朱公Boy

裡面不僅彙集了硬核的乾貨技術、還彙集了像左耳朵耗子、張朝陽總結的高效學習方法論、職場升遷竅門、軟技能。希望能輔助你到達你的夢想之地!

加群

同時作者還建了一個技術交流群,互聯網寒冬,大家一起抱團取暖!關注公眾號後回復」加群「,拉你入群。與眾多高手一起切磋、交流。相信肯定會有所收穫!