Java 並發工具類 CountDownLatch、CyclicBarrier、Semaphore、Exchanger
本文部分摘自《Java 並發編程的藝術》
CountDownLatch
CountDownLatch 允許一個或多個線程等待其他線程完成操作。假設現有一個需求:我們需要解析一個 Excel 里多個 sheet 的數據,此時可以考慮使用多線程,每個線程解析一個 sheet 的數據,等到所有的 sheet 都解析完之後,程序需要提示解析完成。在這個需求中,要實現主線程等待所有線程完成 sheet 的解析操作,最簡單的做法就是使用 join() 方法
public class JoinCountDownLatchTest {
public static void main(String[] args) throws InterruptedException {
Thread parser1 = new Thread(new Runnable() {
@Override
public void run() {
System.out.println("parser2 finish");
}
});
Thread parser2 = new Thread(new Runnable() {
@Override
public void run() {
System.out.println("parser2 finish");
}
});
parser1.start();
parser2.start();
parser1.join();
parser2.join();
System.out.println("all parser finish");
}
}
在 JDK5 之後的並發包中提供的 CountDownLatch 也可以實現 join 的功能,並且比 join 的功能更多
public class CountDownLatchTest {
// CountDown 的構造函數接收一個 int 類型的參數作為計數器
// 假設想等待 N 個點完成,就傳入 N
static CountDownLatch c = new CountDownLatch(2);
public static void main(String[] args) throws InterruptedException {
new Thread(new Runnable() {
@Override
public void run() {
System.out.println(1);
// 每當調用 countDown 方法時,N 就會減一
c.countDown();
System.out.println(2);
c.countDown();
}
}).start();
// await 會阻塞當前線程,直到 N 變成零
c.await();
System.out.println(3);
}
}
CyclicBarrier
CyclicBarrier 可以讓一組線程到達一個屏障(同步點)時被阻塞,直到最後一個線程到達屏障時,屏障才會放行,所有被屏障攔截的線程才會繼續運行
public class CyclicBarrierTest {
// 傳入參數為2
static CyclicBarrier c = new CyclicBarrier(2);
public static void main(String[] args) {
new Thread(new Runnable() {
@Override
public void run() {
try {
//調用await方法,計數減一,並阻塞,直到計數為零才放行
c.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println(1);
}
}).start();
try {
c.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println(2);
}
}
如果把 new CyclicBarrier(2) 修改為 new CyclicBarrier(3),則主線程和子線程會永遠等待,因為沒有第三個線程執行 await 方法
CyclicBarrier 還提供一個更高級的構造函數 CyclicBarrier(int parties, Runnable barrierAction),用於在線程到達屏障時,優先執行 barrierAction 方法
public class CyclicBarrierTest2 {
static CyclicBarrier c = new CyclicBarrier(2, new A());
public static void main(String[] args) {
new Thread(new Runnable() {
@Override
public void run() {
try {
c.await();
} catch (Exception e) {
}
System.out.println(1);
}
}).start();
try {
c.await();
} catch (Exception e) {
}
System.out.println(2);
}
static class A implements Runnable {
@Override
public void run() {
System.out.println(3);
}
}
}
最終輸出結果一定先是 3 開頭
Semaphore
Semaphore(信號量)用來控制同時訪問特定資源的線程數量,它通過協調各個線程,以保證合理的使用公共資源
1. 應用場景
Semaphore 可以用於做流量控制,特別是公用資源有限的應用場景,比如數據庫連接
public class SemaphoreTest {
private static final int THREAD_COUNT = 30;
private static ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT);
private static Semaphore s = new Semaphore(10);
public static void main(String[] args) {
for (int i = 0; i < THREAD_COUNT; i++) {
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
s.acquire();
System.out.println("save data");
s.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
threadPool.shutdown();
}
}
代碼中,雖然有 30 個線程執行,但只允許 10 個線程並發執行。Semaphore 的構造方法 Semaphore(int permits) 接受一個整型的數字,表示可用的許可證數量。Semaphore 的用法也很簡單,首先線程使用 Semaphore 的 acquire() 方法獲取一個許可證,使用完之後調用 release() 方法歸還即可,還可以使用 tryAcquire() 方法嘗試獲取許可證
Exchanger
Exchanger(交換者)是一個用於線程間協作的工具類,用於線程間的數據交換。它提供一個同步點,在這個同步點,兩個線程可以彼此交換數據。這兩個線程通過 exchange 方法交換數據,如果第一個線程先執行 exchange 方法,它會一直等待第二個線程執行 exchange 方法,當兩個線程都到達同步點時,兩個線程就可以交換數據了
假設現在有一個需求:我們需要將紙質銀行流水通過人工的方式錄入電子銀行流水,為了避免錯誤,採用 AB 崗兩人進行錄入,錄入完成後,系統需加載這兩人錄入的數據進行比較,看看是否錄入一致
public class ExchangerTest {
private static final Exchanger<String> exchanger = new Exchanger<>();
private static ExecutorService threadPool = Executors.newFixedThreadPool(2);
public static void main(String[] args) {
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
// A錄入銀行流水數據
String A = "銀行流水A";
exchanger.exchange(A);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
// B錄入銀行流水數據
String B = "銀行流水B";
String A = exchanger.exchange(B);
System.out.println("A 和 B 數據是否一致:" + A.equals(B) + ", A 錄入的是:" + A
+ ", B 錄入的是:" + B);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
threadPool.shutdown();
}
}