Java 並發工具類 CountDownLatch、CyclicBarrier、Semaphore、Exchanger

本文部分摘自《Java 並發編程的藝術》

CountDownLatch

CountDownLatch 允許一個或多個線程等待其他線程完成操作。假設現有一個需求:我們需要解析一個 Excel 里多個 sheet 的數據,此時可以考慮使用多線程,每個線程解析一個 sheet 的數據,等到所有的 sheet 都解析完之後,程序需要提示解析完成。在這個需求中,要實現主線程等待所有線程完成 sheet 的解析操作,最簡單的做法就是使用 join() 方法

public class JoinCountDownLatchTest {

    public static void main(String[] args) throws InterruptedException {

        Thread parser1 = new Thread(new Runnable() {

            @Override
            public void run() {
                System.out.println("parser2 finish");
            }
        });

        Thread parser2 = new Thread(new Runnable() {

            @Override
            public void run() {
                System.out.println("parser2 finish");
            }
        });

        parser1.start();
        parser2.start();
        parser1.join();
        parser2.join();
        System.out.println("all parser finish");
    }
}

在 JDK5 之後的並發包中提供的 CountDownLatch 也可以實現 join 的功能,並且比 join 的功能更多

public class CountDownLatchTest {

    // CountDown 的構造函數接收一個 int 類型的參數作為計數器
    // 假設想等待 N 個點完成,就傳入 N
    static CountDownLatch c = new CountDownLatch(2);

    public static void main(String[] args) throws InterruptedException {

        new Thread(new Runnable() {

            @Override
            public void run() {
                System.out.println(1);
                // 每當調用 countDown 方法時,N 就會減一
                c.countDown();
                System.out.println(2);
                c.countDown();
            }
        }).start();
		// await 會阻塞當前線程,直到 N 變成零
        c.await();
        System.out.println(3);
    }
}

CyclicBarrier

CyclicBarrier 可以讓一組線程到達一個屏障(同步點)時被阻塞,直到最後一個線程到達屏障時,屏障才會放行,所有被屏障攔截的線程才會繼續運行

public class CyclicBarrierTest {
    
	// 傳入參數為2
    static CyclicBarrier c = new CyclicBarrier(2);

    public static void main(String[] args) {

        new Thread(new Runnable() {

            @Override
            public void run() {
                try {
                    //調用await方法,計數減一,並阻塞,直到計數為零才放行 
                    c.await();
                } catch (InterruptedException | BrokenBarrierException e) {
                    e.printStackTrace();
                }
                System.out.println(1);
            }
        }).start();

        try {
            c.await();
        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }
        System.out.println(2);
    }
}

如果把 new CyclicBarrier(2) 修改為 new CyclicBarrier(3),則主線程和子線程會永遠等待,因為沒有第三個線程執行 await 方法

CyclicBarrier 還提供一個更高級的構造函數 CyclicBarrier(int parties, Runnable barrierAction),用於在線程到達屏障時,優先執行 barrierAction 方法

public class CyclicBarrierTest2 {

    static CyclicBarrier c = new CyclicBarrier(2, new A());

    public static void main(String[] args) {

        new Thread(new Runnable() {

            @Override
            public void run() {
                try {
                    c.await();
                } catch (Exception e) {

                }
                System.out.println(1);
            }
        }).start();

        try {
            c.await();
        } catch (Exception e) {

        }
        System.out.println(2);
    }

    static class A implements Runnable {

        @Override
        public void run() {
            System.out.println(3);
        }
    }
}

最終輸出結果一定先是 3 開頭

Semaphore

Semaphore(信號量)用來控制同時訪問特定資源的線程數量,它通過協調各個線程,以保證合理的使用公共資源

1. 應用場景

Semaphore 可以用於做流量控制,特別是公用資源有限的應用場景,比如數據庫連接

public class SemaphoreTest {

    private static final int THREAD_COUNT = 30;

    private static ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT);

    private static Semaphore s = new Semaphore(10);

    public static void main(String[] args) {
        for (int i = 0; i < THREAD_COUNT; i++) {

            threadPool.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        s.acquire();
                        System.out.println("save data");
                        s.release();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
        threadPool.shutdown();
    }
}

代碼中,雖然有 30 個線程執行,但只允許 10 個線程並發執行。Semaphore 的構造方法 Semaphore(int permits) 接受一個整型的數字,表示可用的許可證數量。Semaphore 的用法也很簡單,首先線程使用 Semaphore 的 acquire() 方法獲取一個許可證,使用完之後調用 release() 方法歸還即可,還可以使用 tryAcquire() 方法嘗試獲取許可證

Exchanger

Exchanger(交換者)是一個用於線程間協作的工具類,用於線程間的數據交換。它提供一個同步點,在這個同步點,兩個線程可以彼此交換數據。這兩個線程通過 exchange 方法交換數據,如果第一個線程先執行 exchange 方法,它會一直等待第二個線程執行 exchange 方法,當兩個線程都到達同步點時,兩個線程就可以交換數據了

假設現在有一個需求:我們需要將紙質銀行流水通過人工的方式錄入電子銀行流水,為了避免錯誤,採用 AB 崗兩人進行錄入,錄入完成後,系統需加載這兩人錄入的數據進行比較,看看是否錄入一致

public class ExchangerTest {

    private static final Exchanger<String> exchanger = new Exchanger<>();

    private static ExecutorService threadPool = Executors.newFixedThreadPool(2);

    public static void main(String[] args) {

        threadPool.execute(new Runnable() {

            @Override
            public void run() {
                try {
                    // A錄入銀行流水數據
                    String A = "銀行流水A";
                    exchanger.exchange(A);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });

        threadPool.execute(new Runnable() {

            @Override
            public void run() {
                try {
                    // B錄入銀行流水數據
                    String B = "銀行流水B";
                    String A = exchanger.exchange(B);
                    System.out.println("A 和 B 數據是否一致:" + A.equals(B) + ", A 錄入的是:" + A
                        + ", B 錄入的是:" + B);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });

        threadPool.shutdown();
    }
}