執行緒的三個同步器

  • 2020 年 4 月 12 日
  • 筆記

不知不覺就遇到了執行緒同步器問題,查了資料寫下了總結

1. CountDownLatch

日常中會有開啟多個執行緒去並發執行任務,而主執行緒要等所有子執行緒執行完之後才能運行的需求。之前我們是使用Thread.join方法來實現的,過程如下:

public static void main(String[] args) throws InterruptedException {        Thread t1 = new Thread( () -> {          try {              Thread.sleep(1000);              System.out.println("t1 over");          } catch (InterruptedException e) {              e.printStackTrace();          }      });        Thread t2 = new Thread( () -> {          try {              Thread.sleep(2000);              System.out.println("t2 over");          } catch (InterruptedException e) {              e.printStackTrace();          }      });        t1.start();      t2.start();        t1.join();      t2.join();        System.out.println("mian over");  }  
t1 over  t2 over  mian over  

join()方法不夠靈活,現在JDK提供了CountDownLatch這個類來實現所需功能

private static CountDownLatch countDownLatch = new CountDownLatch(2);    public static void main(String[] args) throws InterruptedException {        ExecutorService t = Executors.newCachedThreadPool();        Runnable r1 = () -> {          try {              System.out.println("r1 sleep");              Thread.sleep(1000);          } catch (InterruptedException e) {              e.printStackTrace();          } finally {              countDownLatch.countDown();          }      };        Runnable r2 = () -> {          try {              System.out.println("r2 sleep");              Thread.sleep(2000);          } catch (InterruptedException e) {              e.printStackTrace();          } finally {              countDownLatch.countDown();          }      };        t.submit(r1);      t.submit(r2);        System.out.println("main wait");      countDownLatch.await();      System.out.println("main over");  }  
main wait  r1 sleep  r2 sleep  main over  

CountDownLatch流程:

  • 新建CountDownLatch實例,傳入計數器次數
  • 主執行緒調用CountDownLatch.await()方法後會被阻塞
  • 子執行緒中在某處調用CountDownLatch.countDown()方法可使內部計數器減1
  • 當計數器變成0時,主執行緒的await()方法才會返回

CountDownLatch優點:

  • 調用Thread.join()調用執行緒會被阻塞至子執行緒運行完畢,而CountDownLatch.countDown()可在執行緒運行中執行
  • 使用執行緒池時是提交任務的,而沒有接觸到執行緒無法使用執行緒方法,那麼countDown()可加在Runnable中執行

CountDownLatch原理:

內部維護了一個計數器,當計數器為0就放行,源碼就不放了,熟悉AQS的同學想想就知道怎麼回事

  • 繼承了AQS,其實就是用AQS的state來表示計數器
  • await()方法內部有acquireSharedInterruptibly(),後者調用了重寫tryaquireShared()其實就是判斷計數器是否為0,不為0則阻塞進AQS隊列
  • countDown()方法內部有releaseShared(),後者調用了重寫tryReleaseShared()計數器減一,若為0,則喚醒阻塞執行緒

2. CyclicBarrier

滿足多個執行緒都到達同一個位置後才全部開始運行的需求。CountDownLatch是一次性使用的,計數器為0後再次調用會直接返回,此時升級版的CyclicBarrier來了,其一可以滿足計數器重置功能,且二還可以讓一組執行緒達到一個狀態後再全部同時執行

場景要求:假設一個任務分為3個階段,每個執行緒要串列地從低階段執行到高階段

private static CyclicBarrier cyclicBarrier = new CyclicBarrier(2,                          () -> System.out.println("一個階段完成"));    public static void main(String[] args) throws InterruptedException {        ExecutorService service = Executors.newCachedThreadPool();        Runnable r1 = () -> {          try {              System.out.println(Thread.currentThread() + "Step1");              cyclicBarrier.await();                System.out.println(Thread.currentThread() + "Step2");              cyclicBarrier.await();                System.out.println(Thread.currentThread() + "Step3");            } catch (Exception e) {              e.printStackTrace();          }      };        Runnable r2 = () -> {          try {              System.out.println(Thread.currentThread() + "Step1");              cyclicBarrier.await();                System.out.println(Thread.currentThread() + "Step2");              cyclicBarrier.await();                System.out.println(Thread.currentThread() + "Step3");            } catch (Exception e) {              e.printStackTrace();          }      };        service.submit(r1);      service.submit(r2);        service.shutdown();  }  
Thread[pool-1-thread-1,5,main]Step1  Thread[pool-1-thread-2,5,main]Step1  一個階段完成  Thread[pool-1-thread-1,5,main]Step2  Thread[pool-1-thread-2,5,main]Step2  一個階段完成  Thread[pool-1-thread-1,5,main]Step3  Thread[pool-1-thread-2,5,main]Step3  

CyclicBarrier的流程

  • 和上面差不多就不一一解釋了
  • CyclicBarrier的構造方法中,第一個參數為計數器次數,第二個為階段結束後要執行的方法

CyclicBarrier的原理

  • 基於獨佔鎖,底層是AQS實現,獨佔鎖可以原子性改變計數器,以及條件隊列阻塞執行緒來實現執行緒同步
  • 內部有parties和count變數,實現重置功能
  • await()方法內調用dowait()方法
    • 獲取鎖更新次數減一
    • 沒有為0,阻塞當前執行緒加入條件隊列
    • 為0執行屏蔽點任務,然後喚醒條件隊列的全部執行緒

3. Semaphore

不同與前兩者,Semaphore訊號量內部計數器是遞增的,在需要同步的地方調用acquire指定需要同步的個數即可

private static Semaphore semaphore = new Semaphore(0);    public static void main(String[] args) throws InterruptedException {        ExecutorService service = Executors.newCachedThreadPool();        Runnable r1 = () -> {          try {              Thread.sleep(2000);          } catch (InterruptedException e) {              e.printStackTrace();          }          System.out.println(Thread.currentThread() + "over");          semaphore.release();;      };        Runnable r2 = () -> {          try {              Thread.sleep(1000);          } catch (InterruptedException e) {              e.printStackTrace();          }          System.out.println(Thread.currentThread() + "over");          semaphore.release();      };        service.submit(r1);      service.submit(r2);        semaphore.acquire(2);      System.out.println("All child thread over");        service.shutdown();  }  

Semaphore的流程

  • Semaphore的構造函數傳參複製當前計數器的值
  • 每個執行緒內部調用release()即計數器加1
  • 主執行緒調用acquire()方法傳參為2 ,會被阻塞至計數器到達2

Semaphore的原理

  • 底層還是使用AQS,提供了公平與非公平,也是用state表示次數
  • acquire()方法獲取一個訊號量,並且state減一
    • 若為0,直接返回
    • 不為0當前執行緒會被加入AQS阻塞隊列
  • release()方法,把當前Semaphore的訊號量加1,然後會選擇一個訊號量滿足的執行緒進行激活
  • 內部還實現了公平與非公平策略