執行緒的三個同步器
- 2020 年 4 月 12 日
- 筆記
不知不覺就遇到了執行緒同步器問題,查了資料寫下了總結
1. CountDownLatch
日常中會有開啟多個執行緒去並發執行任務,而主執行緒要等所有子執行緒執行完之後才能運行的需求。之前我們是使用Thread.join方法來實現的,過程如下:
public static void main(String[] args) throws InterruptedException { Thread t1 = new Thread( () -> { try { Thread.sleep(1000); System.out.println("t1 over"); } catch (InterruptedException e) { e.printStackTrace(); } }); Thread t2 = new Thread( () -> { try { Thread.sleep(2000); System.out.println("t2 over"); } catch (InterruptedException e) { e.printStackTrace(); } }); t1.start(); t2.start(); t1.join(); t2.join(); System.out.println("mian over"); }
t1 over t2 over mian over
join()方法不夠靈活,現在JDK提供了CountDownLatch這個類來實現所需功能
private static CountDownLatch countDownLatch = new CountDownLatch(2); public static void main(String[] args) throws InterruptedException { ExecutorService t = Executors.newCachedThreadPool(); Runnable r1 = () -> { try { System.out.println("r1 sleep"); Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } finally { countDownLatch.countDown(); } }; Runnable r2 = () -> { try { System.out.println("r2 sleep"); Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } finally { countDownLatch.countDown(); } }; t.submit(r1); t.submit(r2); System.out.println("main wait"); countDownLatch.await(); System.out.println("main over"); }
main wait r1 sleep r2 sleep main over
CountDownLatch流程:
- 新建CountDownLatch實例,傳入計數器次數
- 主執行緒調用CountDownLatch.await()方法後會被阻塞
- 子執行緒中在某處調用CountDownLatch.countDown()方法可使內部計數器減1
- 當計數器變成0時,主執行緒的await()方法才會返回
CountDownLatch優點:
- 調用Thread.join()調用執行緒會被阻塞至子執行緒運行完畢,而CountDownLatch.countDown()可在執行緒運行中執行
- 使用執行緒池時是提交任務的,而沒有接觸到執行緒無法使用執行緒方法,那麼countDown()可加在Runnable中執行
CountDownLatch原理:
內部維護了一個計數器,當計數器為0就放行,源碼就不放了,熟悉AQS的同學想想就知道怎麼回事
- 繼承了AQS,其實就是用AQS的state來表示計數器
- await()方法內部有acquireSharedInterruptibly(),後者調用了重寫tryaquireShared()其實就是判斷計數器是否為0,不為0則阻塞進AQS隊列
- countDown()方法內部有releaseShared(),後者調用了重寫tryReleaseShared()計數器減一,若為0,則喚醒阻塞執行緒
2. CyclicBarrier
滿足多個執行緒都到達同一個位置後才全部開始運行的需求。CountDownLatch是一次性使用的,計數器為0後再次調用會直接返回,此時升級版的CyclicBarrier來了,其一可以滿足計數器重置功能,且二還可以讓一組執行緒達到一個狀態後再全部同時執行
場景要求:假設一個任務分為3個階段,每個執行緒要串列地從低階段執行到高階段
private static CyclicBarrier cyclicBarrier = new CyclicBarrier(2, () -> System.out.println("一個階段完成")); public static void main(String[] args) throws InterruptedException { ExecutorService service = Executors.newCachedThreadPool(); Runnable r1 = () -> { try { System.out.println(Thread.currentThread() + "Step1"); cyclicBarrier.await(); System.out.println(Thread.currentThread() + "Step2"); cyclicBarrier.await(); System.out.println(Thread.currentThread() + "Step3"); } catch (Exception e) { e.printStackTrace(); } }; Runnable r2 = () -> { try { System.out.println(Thread.currentThread() + "Step1"); cyclicBarrier.await(); System.out.println(Thread.currentThread() + "Step2"); cyclicBarrier.await(); System.out.println(Thread.currentThread() + "Step3"); } catch (Exception e) { e.printStackTrace(); } }; service.submit(r1); service.submit(r2); service.shutdown(); }
Thread[pool-1-thread-1,5,main]Step1 Thread[pool-1-thread-2,5,main]Step1 一個階段完成 Thread[pool-1-thread-1,5,main]Step2 Thread[pool-1-thread-2,5,main]Step2 一個階段完成 Thread[pool-1-thread-1,5,main]Step3 Thread[pool-1-thread-2,5,main]Step3
CyclicBarrier的流程
- 和上面差不多就不一一解釋了
- CyclicBarrier的構造方法中,第一個參數為計數器次數,第二個為階段結束後要執行的方法
CyclicBarrier的原理
- 基於獨佔鎖,底層是AQS實現,獨佔鎖可以原子性改變計數器,以及條件隊列阻塞執行緒來實現執行緒同步
- 內部有parties和count變數,實現重置功能
- await()方法內調用dowait()方法
- 獲取鎖更新次數減一
- 沒有為0,阻塞當前執行緒加入條件隊列
- 為0執行屏蔽點任務,然後喚醒條件隊列的全部執行緒
3. Semaphore
不同與前兩者,Semaphore訊號量內部計數器是遞增的,在需要同步的地方調用acquire指定需要同步的個數即可
private static Semaphore semaphore = new Semaphore(0); public static void main(String[] args) throws InterruptedException { ExecutorService service = Executors.newCachedThreadPool(); Runnable r1 = () -> { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread() + "over"); semaphore.release();; }; Runnable r2 = () -> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread() + "over"); semaphore.release(); }; service.submit(r1); service.submit(r2); semaphore.acquire(2); System.out.println("All child thread over"); service.shutdown(); }
Semaphore的流程
- Semaphore的構造函數傳參複製當前計數器的值
- 每個執行緒內部調用release()即計數器加1
- 主執行緒調用acquire()方法傳參為2 ,會被阻塞至計數器到達2
Semaphore的原理
- 底層還是使用AQS,提供了公平與非公平,也是用state表示次數
- acquire()方法獲取一個訊號量,並且state減一
- 若為0,直接返回
- 不為0當前執行緒會被加入AQS阻塞隊列
- release()方法,把當前Semaphore的訊號量加1,然後會選擇一個訊號量滿足的執行緒進行激活
- 內部還實現了公平與非公平策略