Java 並發編程要點

使用線程

有三種使用線程的方法:

  • 實現 Runnable 接口;
  • 實現 Callable 接口;
  • 繼承 Thread 類。

實現 Runnable 和 Callable 接口的類只能當做一個可以在線程中運行的任務,不是真正意義上的線程,因此最後還需要通過 Thread 來調用。可以理解為任務是通過線程驅動從而執行的。

實現 Runnable 接口

需要實現接口中的 run() 方法。

public class MyRunnable implements Runnable {
    @Override
    public void run() {
        // ...
    }
}

使用 Runnable 實例再創建一個 Thread 實例,然後調用 Thread 實例的 start() 方法來啟動線程。

public static void main(String[] args) {
    MyRunnable instance = new MyRunnable();
    Thread thread = new Thread(instance);
    thread.start();
}

實現 Callable 接口

與 Runnable 相比,Callable 可以有返回值,返回值通過 FutureTask 進行封裝。

public class MyCallable implements Callable<Integer> {
    public Integer call() {
        return 123;
    }
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
    MyCallable mc = new MyCallable();
    FutureTask<Integer> ft = new FutureTask<>(mc);
    Thread thread = new Thread(ft);
    thread.start();
    System.out.println(ft.get());
}

繼承 Thread 類

同樣也是需要實現 run() 方法,因為 Thread 類也實現了 Runable 接口。

當調用 start() 方法啟動一個線程時,虛擬機會將該線程放入就緒隊列中等待被調度,當一個線程被調度時會執行該線程的 run() 方法。

public class MyThread extends Thread {
    public void run() {
        // ...
    }
}
public static void main(String[] args) {
    MyThread mt = new MyThread();
    mt.start();
}

實現接口 VS 繼承 Thread

實現接口會更好一些,因為:

  • Java 不支持多重繼承,因此繼承了 Thread 類就無法繼承其它類,但是可以實現多個接口;
  • 類可能只要求可執行就行,繼承整個 Thread 類開銷過大。

線程機制

Executor

Executor 管理多個異步任務的執行,而無需程序員顯式地管理線程的生命周期。這裡的異步是指多個任務的執行互不干擾,不需要進行同步操作。

主要有三種 Executor:

  • CachedThreadPool:一個任務創建一個線程;
  • FixedThreadPool:所有任務只能使用固定大小的線程;
  • SingleThreadExecutor:相當於大小為 1 的 FixedThreadPool。
public static void main(String[] args) {
    ExecutorService executorService = Executors.newCachedThreadPool();
    for (int i = 0; i < 5; i++) {
        executorService.execute(new MyRunnable());
    }
    executorService.shutdown();
}

Daemon

守護線程是程序運行時在後台提供服務的線程,不屬於程序中不可或缺的部分。

當所有非守護線程結束時,程序也就終止,同時會殺死所有守護線程。

main() 屬於非守護線程。

在線程啟動之前使用 setDaemon() 方法可以將一個線程設置為守護線程。

public static void main(String[] args) {
    Thread thread = new Thread(new MyRunnable());
    thread.setDaemon(true);
}

sleep()

Thread.sleep(millisec) 方法會休眠當前正在執行的線程,millisec 單位為毫秒。

sleep() 可能會拋出 InterruptedException,因為異常不能跨線程傳播回 main() 中,因此必須在本地進行處理。線程中拋出的其它異常也同樣需要在本地進行處理。

public void run() {
    try {
        Thread.sleep(3000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

yield()

對靜態方法 Thread.yield() 的調用聲明了當前線程已經完成了生命周期中最重要的部分,可以切換給其它線程來執行。該方法只是對線程調度器的一個建議,而且也只是建議具有相同優先級的其它線程可以運行。

public void run() {
    Thread.yield();
}

中斷線程

一個線程執行完畢之後會自動結束,如果在運行過程中發生異常也會提前結束。

InterruptedException

通過調用一個線程的 interrupt() 來中斷該線程,如果該線程處於阻塞、限期等待或者無限期等待狀態,那麼就會拋出 InterruptedException,從而提前結束該線程。但是不能中斷 I/O 阻塞和 synchronized 鎖阻塞。

對於以下代碼,在 main() 中啟動一個線程之後再中斷它,由於線程中調用了 Thread.sleep() 方法,因此會拋出一個 InterruptedException,從而提前結束線程,不執行之後的語句。

public class InterruptExample {

    private static class MyThread1 extends Thread {
        @Override
        public void run() {
            try {
                Thread.sleep(2000);
                System.out.println("Thread run");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
public static void main(String[] args) throws InterruptedException {
    Thread thread1 = new MyThread1();
    thread1.start();
    thread1.interrupt();
    System.out.println("Main run");
}
Main run
java.lang.InterruptedException: sleep interrupted
    at java.lang.Thread.sleep(Native Method)
    at InterruptExample.lambda$main$0(InterruptExample.java:5)
    at InterruptExample$$Lambda$1/713338599.run(Unknown Source)
    at java.lang.Thread.run(Thread.java:745)

interrupted()

如果一個線程的 run() 方法執行一個無限循環,並且沒有執行 sleep() 等會拋出 InterruptedException 的操作,那麼調用線程的 interrupt() 方法就無法使線程提前結束。

但是調用 interrupt() 方法會設置線程的中斷標記,此時調用 interrupted() 方法會返回 true。因此可以在循環體中使用 interrupted() 方法來判斷線程是否處於中斷狀態,從而提前結束線程。

public class InterruptExample {

    private static class MyThread2 extends Thread {
        @Override
        public void run() {
            while (!interrupted()) {
                // ..
            }
            System.out.println("Thread end");
        }
    }
}
public static void main(String[] args) throws InterruptedException {
    Thread thread2 = new MyThread2();
    thread2.start();
    thread2.interrupt();
}
Thread end

Executor 的中斷操作

調用 Executor 的 shutdown() 方法會等待線程都執行完畢之後再關閉,但是如果調用的是 shutdownNow() 方法,則相當於調用每個線程的 interrupt() 方法。

以下使用 Lambda 創建線程,相當於創建了一個匿名內部線程。

public static void main(String[] args) {
    ExecutorService executorService = Executors.newCachedThreadPool();
    executorService.execute(() -> {
        try {
            Thread.sleep(2000);
            System.out.println("Thread run");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    });
    executorService.shutdownNow();
    System.out.println("Main run");
}
Main run
java.lang.InterruptedException: sleep interrupted
    at java.lang.Thread.sleep(Native Method)
    at ExecutorInterruptExample.lambda$main$0(ExecutorInterruptExample.java:9)
    at ExecutorInterruptExample$$Lambda$1/1160460865.run(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

如果只想中斷 Executor 中的一個線程,可以通過使用 submit() 方法來提交一個線程,它會返回一個 Future<?> 對象,通過調用該對象的 cancel(true) 方法就可以中斷線程。

Future<?> future = executorService.submit(() -> {
    // ..
});
future.cancel(true);

互斥同步

Java 提供了兩種鎖機制來控制多個線程對共享資源的互斥訪問,第一個是 JVM 實現的 synchronized,而另一個是 JDK 實現的 ReentrantLock。

synchronized

1. 同步一個代碼塊

public void func() {
    synchronized (this) {
        // ...
    }
}

它只作用於同一個對象,如果調用兩個對象上的同步代碼塊,就不會進行同步。

對於以下代碼,使用 ExecutorService 執行了兩個線程,由於調用的是同一個對象的同步代碼塊,因此這兩個線程會進行同步,當一個線程進入同步語句塊時,另一個線程就必須等待。

public class SynchronizedExample {

    public void func1() {
        synchronized (this) {
            for (int i = 0; i < 10; i++) {
                System.out.print(i + " ");
            }
        }
    }
}
public static void main(String[] args) {
    SynchronizedExample e1 = new SynchronizedExample();
    ExecutorService executorService = Executors.newCachedThreadPool();
    executorService.execute(() -> e1.func1());
    executorService.execute(() -> e1.func1());
}
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9

對於以下代碼,兩個線程調用了不同對象的同步代碼塊,因此這兩個線程就不需要同步。從輸出結果可以看出,兩個線程交叉執行。

public static void main(String[] args) {
    SynchronizedExample e1 = new SynchronizedExample();
    SynchronizedExample e2 = new SynchronizedExample();
    ExecutorService executorService = Executors.newCachedThreadPool();
    executorService.execute(() -> e1.func1());
    executorService.execute(() -> e2.func1());
}
0 0 1 1 2 2 3 3 4 4 5 5 6 6 7 7 8 8 9 9

2. 同步一個方法

public synchronized void func () {
    // ...
}

它和同步代碼塊一樣,作用於同一個對象。

3. 同步一個類

public void func() {
    synchronized (SynchronizedExample.class) {
        // ...
    }
}

作用於整個類,也就是說兩個線程調用同一個類的不同對象上的這種同步語句,也會進行同步。

public class SynchronizedExample {

    public void func2() {
        synchronized (SynchronizedExample.class) {
            for (int i = 0; i < 10; i++) {
                System.out.print(i + " ");
            }
        }
    }
}
public static void main(String[] args) {
    SynchronizedExample e1 = new SynchronizedExample();
    SynchronizedExample e2 = new SynchronizedExample();
    ExecutorService executorService = Executors.newCachedThreadPool();
    executorService.execute(() -> e1.func2());
    executorService.execute(() -> e2.func2());
}
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9

4. 同步一個靜態方法

public synchronized static void fun() {
    // ...
}

作用於整個類。

ReentrantLock

ReentrantLock 是 java.util.concurrent(J.U.C)包中的鎖。

public class LockExample {

    private Lock lock = new ReentrantLock();

    public void func() {
        lock.lock();
        try {
            for (int i = 0; i < 10; i++) {
                System.out.print(i + " ");
            }
        } finally {
            lock.unlock(); // 確保釋放鎖,從而避免發生死鎖。
        }
    }
}
public static void main(String[] args) {
    LockExample lockExample = new LockExample();
    ExecutorService executorService = Executors.newCachedThreadPool();
    executorService.execute(() -> lockExample.func());
    executorService.execute(() -> lockExample.func());
}
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9

比較

1. 鎖的實現

synchronized 是 JVM 實現的,而 ReentrantLock 是 JDK 實現的。

2. 性能

新版本 Java 對 synchronized 進行了很多優化,例如自旋鎖等,synchronized 與 ReentrantLock 大致相同。

3. 等待可中斷

當持有鎖的線程長期不釋放鎖的時候,正在等待的線程可以選擇放棄等待,改為處理其他事情。

ReentrantLock 可中斷,而 synchronized 不行。

4. 公平鎖

公平鎖是指多個線程在等待同一個鎖時,必須按照申請鎖的時間順序來依次獲得鎖。

synchronized 中的鎖是非公平的,ReentrantLock 默認情況下也是非公平的,但是也可以是公平的。

5. 鎖綁定多個條件

一個 ReentrantLock 可以同時綁定多個 Condition 對象。

使用選擇

除非需要使用 ReentrantLock 的高級功能,否則優先使用 synchronized。這是因為 synchronized 是 JVM 實現的一種鎖機制,JVM 原生地支持它,而 ReentrantLock 不是所有的 JDK 版本都支持。並且使用 synchronized 不用擔心沒有釋放鎖而導致死鎖問題,因為 JVM 會確保鎖的釋放。

線程之間的協作

當多個線程可以一起工作去解決某個問題時,如果某些部分必須在其它部分之前完成,那麼就需要對線程進行協調。

join()

在線程中調用另一個線程的 join() 方法,會將當前線程掛起,而不是忙等待,直到目標線程結束。

對於以下代碼,雖然 b 線程先啟動,但是因為在 b 線程中調用了 a 線程的 join() 方法,b 線程會等待 a 線程結束才繼續執行,因此最後能夠保證 a 線程的輸出先於 b 線程的輸出。

public class JoinExample {

    private class A extends Thread {
        @Override
        public void run() {
            System.out.println("A");
        }
    }

    private class B extends Thread {

        private A a;

        B(A a) {
            this.a = a;
        }

        @Override
        public void run() {
            try {
                a.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("B");
        }
    }

    public void test() {
        A a = new A();
        B b = new B(a);
        b.start();
        a.start();
    }
}
public static void main(String[] args) {
    JoinExample example = new JoinExample();
    example.test();
}
A
B

wait()、notify()、notifyAll()

調用 wait() 使得線程等待某個條件滿足,線程在等待時會被掛起,當其他線程的運行使得這個條件滿足時,其它線程會調用 notify() 或者 notifyAll() 來喚醒掛起的線程。

它們都屬於 Object 的一部分,而不屬於 Thread。

只能用在同步方法或者同步控制塊中使用,否則會在運行時拋出 IllegalMonitorStateException。

使用 wait() 掛起期間,線程會釋放鎖。這是因為,如果沒有釋放鎖,那麼其它線程就無法進入對象的同步方法或者同步控制塊中,那麼就無法執行 notify() 或者 notifyAll() 來喚醒掛起的線程,造成死鎖。

public class WaitNotifyExample {

    public synchronized void before() {
        System.out.println("before");
        notifyAll();
    }

    public synchronized void after() {
        try {
            wait();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("after");
    }
}
public static void main(String[] args) {
    ExecutorService executorService = Executors.newCachedThreadPool();
    WaitNotifyExample example = new WaitNotifyExample();
    executorService.execute(() -> example.after());
    executorService.execute(() -> example.before());
}
before
after

wait() 和 sleep() 的區別

  • wait() 是 Object 的方法,而 sleep() 是 Thread 的靜態方法;
  • wait() 會釋放鎖,sleep() 不會。

await()、signal()、signalAll()

java.util.concurrent 類庫中提供了 Condition 類來實現線程之間的協調,可以在 Condition 上調用 await() 方法使線程等待,其它線程調用 signal() 或 signalAll() 方法喚醒等待的線程。

相比於 wait() 這種等待方式,await() 可以指定等待的條件,因此更加靈活。

使用 Lock 來獲取一個 Condition 對象。

public class AwaitSignalExample {

    private Lock lock = new ReentrantLock();
    private Condition condition = lock.newCondition();

    public void before() {
        lock.lock();
        try {
            System.out.println("before");
            condition.signalAll();
        } finally {
            lock.unlock();
        }
    }

    public void after() {
        lock.lock();
        try {
            condition.await();
            System.out.println("after");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
}
public static void main(String[] args) {
    ExecutorService executorService = Executors.newCachedThreadPool();
    AwaitSignalExample example = new AwaitSignalExample();
    executorService.execute(() -> example.after());
    executorService.execute(() -> example.before());
}
before
after

線程狀態

一個線程只能處於一種狀態,並且這裡的線程狀態特指 Java 虛擬機的線程狀態,不能反映線程在特定操作系統下的狀態。

新建(New)

創建後尚未啟動。

可運行(Runable)

正在 Java 虛擬機中運行。但是在操作系統層面,它可能處於運行狀態,也可能等待資源調度(例如處理器資源),資源調度完成就進入運行狀態。所以該狀態的可運行是指可以被運行,具體有沒有運行要看底層操作系統的資源調度。

阻塞(Blocked)

請求獲取 monitor lock 從而進入 synchronized 函數或者代碼塊,但是其它線程已經佔用了該 monitor lock,所以出於阻塞狀態。要結束該狀態進入從而 RUNABLE 需要其他線程釋放 monitor lock。

無限期等待(Waiting)

等待其它線程顯式地喚醒。

阻塞和等待的區別在於,阻塞是被動的,它是在等待獲取 monitor lock。而等待是主動的,通過調用 Object.wait() 等方法進入。

進入方法 退出方法
沒有設置 Timeout 參數的 Object.wait() 方法 Object.notify() / Object.notifyAll()
沒有設置 Timeout 參數的 Thread.join() 方法 被調用的線程執行完畢
LockSupport.park() 方法 LockSupport.unpark(Thread)

限期等待(Timed waiting)

無需等待其它線程顯式地喚醒,在一定時間之後會被系統自動喚醒。

進入方法 退出方法
Thread.sleep() 方法 時間結束
設置了 Timeout 參數的 Object.wait() 方法 時間結束 / Object.notify() / Object.notifyAll()
設置了 Timeout 參數的 Thread.join() 方法 時間結束 / 被調用的線程執行完畢
LockSupport.parkNanos() 方法 LockSupport.unpark(Thread)
LockSupport.parkUntil() 方法 LockSupport.unpark(Thread)

調用 Thread.sleep() 方法使線程進入限期等待狀態時,常常用「使一個線程睡眠」進行描述。調用 Object.wait() 方法使線程進入限期等待或者無限期等待時,常常用「掛起一個線程」進行描述。睡眠和掛起是用來描述行為,而阻塞和等待用來描述狀態。

死亡(Terminated)

可以是線程結束任務之後自己結束,或者產生了異常而結束。

線程安全

多個線程不管以何種方式訪問某個類,並且在主調代碼中不需要進行同步,都能表現正確的行為。

線程安全有以下幾種實現方式:

不可變

不可變(Immutable)的對象一定是線程安全的,不需要再採取任何的線程安全保障措施。只要一個不可變的對象被正確地構建出來,永遠也不會看到它在多個線程之中處於不一致的狀態。多線程環境下,應當盡量使對象成為不可變,來滿足線程安全。

不可變的類型:

  • final 關鍵字修飾的基本數據類型
  • String
  • 枚舉類型
  • Number 部分子類,如 Long 和 Double 等數值包裝類型,BigInteger 和 BigDecimal 等大數據類型。但同為 Number 的原子類 AtomicInteger 和 AtomicLong 則是可變的。

對於集合類型,可以使用 Collections.unmodifiableXXX() 方法來獲取一個不可變的集合。

public class ImmutableExample {
    public static void main(String[] args) {
        Map<String, Integer> map = new HashMap<>();
        Map<String, Integer> unmodifiableMap = Collections.unmodifiableMap(map);
        unmodifiableMap.put("a", 1);
    }
}
Exception in thread "main" java.lang.UnsupportedOperationException
    at java.util.Collections$UnmodifiableMap.put(Collections.java:1457)
    at ImmutableExample.main(ImmutableExample.java:9)

Collections.unmodifiableXXX() 先對原始的集合進行拷貝,需要對集合進行修改的方法都直接拋出異常。

public V put(K key, V value) {
    throw new UnsupportedOperationException();
}

互斥同步

synchronized 和 ReentrantLock。

非阻塞同步

互斥同步最主要的問題就是線程阻塞和喚醒所帶來的性能問題,因此這種同步也稱為阻塞同步。

互斥同步屬於一種悲觀的並發策略,總是認為只要不去做正確的同步措施,那就肯定會出現問題。無論共享數據是否真的會出現競爭,它都要進行加鎖(這裡討論的是概念模型,實際上虛擬機會優化掉很大一部分不必要的加鎖)、用戶態核心態轉換、維護鎖計數器和檢查是否有被阻塞的線程需要喚醒等操作。

隨着硬件指令集的發展,我們可以使用基於衝突檢測的樂觀並發策略:先進行操作,如果沒有其它線程爭用共享數據,那操作就成功了,否則採取補償措施(不斷地重試,直到成功為止)。這種樂觀的並發策略的許多實現都不需要將線程阻塞,因此這種同步操作稱為非阻塞同步。

1. CAS

樂觀鎖需要操作和衝突檢測這兩個步驟具備原子性,這裡就不能再使用互斥同步來保證了,只能靠硬件來完成。硬件支持的原子性操作最典型的是:比較並交換(Compare-and-Swap,CAS)。CAS 指令需要有 3 個操作數,分別是內存地址 V、舊的預期值 A 和新值 B。當執行操作時,只有當 V 的值等於 A,才將 V 的值更新為 B。

2. AtomicInteger

J.U.C 包裏面的整數原子類 AtomicInteger 的方法調用了 Unsafe 類的 CAS 操作。

以下代碼使用了 AtomicInteger 執行了自增的操作。

private AtomicInteger cnt = new AtomicInteger();

public void add() {
    cnt.incrementAndGet();
}

以下代碼是 incrementAndGet() 的源碼,它調用了 Unsafe 的 getAndAddInt() 。

public final int incrementAndGet() {
    return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}

以下代碼是 getAndAddInt() 源碼,var1 指示對象內存地址,var2 指示該字段相對對象內存地址的偏移,var4 指示操作需要加的數值,這裡為 1。通過 getIntVolatile(var1, var2) 得到舊的預期值,通過調用 compareAndSwapInt() 來進行 CAS 比較,如果該字段內存地址中的值等於 var5,那麼就更新內存地址為 var1+var2 的變量為 var5+var4。

可以看到 getAndAddInt() 在一個循環中進行,發生衝突的做法是不斷的進行重試。

public final int getAndAddInt(Object var1, long var2, int var4) {
    int var5;
    do {
        var5 = this.getIntVolatile(var1, var2);
    } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));

    return var5;
}

3. ABA

如果一個變量初次讀取的時候是 A 值,它的值被改成了 B,後來又被改回為 A,那 CAS 操作就會誤認為它從來沒有被改變過。

J.U.C 包提供了一個帶有標記的原子引用類 AtomicStampedReference 來解決這個問題,它可以通過控制變量值的版本來保證 CAS 的正確性。大部分情況下 ABA 問題不會影響程序並發的正確性,如果需要解決 ABA 問題,改用傳統的互斥同步可能會比原子類更高效。

無同步方案

要保證線程安全,並不是一定就要進行同步。如果一個方法本來就不涉及共享數據,那它自然就無須任何同步措施去保證正確性。

1. 棧封閉

多個線程訪問同一個方法的局部變量時,不會出現線程安全問題,因為局部變量存儲在虛擬機棧中,屬於線程私有的。

public class StackClosedExample {
    public void add100() {
        int cnt = 0;
        for (int i = 0; i < 100; i++) {
            cnt++;
        }
        System.out.println(cnt);
    }
}
public static void main(String[] args) {
    StackClosedExample example = new StackClosedExample();
    ExecutorService executorService = Executors.newCachedThreadPool();
    executorService.execute(() -> example.add100());
    executorService.execute(() -> example.add100());
    executorService.shutdown();
}
100
100

2. 線程本地存儲(Thread Local Storage)

如果一段代碼中所需要的數據必須與其他代碼共享,那就看看這些共享數據的代碼是否能保證在同一個線程中執行。如果能保證,我們就可以把共享數據的可見範圍限制在同一個線程之內,這樣,無須同步也能保證線程之間不出現數據爭用的問題。

符合這種特點的應用並不少見,大部分使用消費隊列的架構模式(如「生產者-消費者」模式)都會將產品的消費過程盡量在一個線程中消費完。其中最重要的一個應用實例就是經典 Web 交互模型中的「一個請求對應一個服務器線程」(Thread-per-Request)的處理方式,這種處理方式的廣泛應用使得很多 Web 服務端應用都可以使用線程本地存儲來解決線程安全問題。

可以使用 java.lang.ThreadLocal 類來實現線程本地存儲功能。

對於以下代碼,thread1 中設置 threadLocal 為 1,而 thread2 設置 threadLocal 為 2。過了一段時間之後,thread1 讀取 threadLocal 依然是 1,不受 thread2 的影響。

public class ThreadLocalExample {
    public static void main(String[] args) {
        ThreadLocal threadLocal = new ThreadLocal();
        Thread thread1 = new Thread(() -> {
            threadLocal.set(1);
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(threadLocal.get());
            threadLocal.remove();
        });
        Thread thread2 = new Thread(() -> {
            threadLocal.set(2);
            threadLocal.remove();
        });
        thread1.start();
        thread2.start();
    }
}
1

為了理解 ThreadLocal,先看以下代碼:

public class ThreadLocalExample1 {
    public static void main(String[] args) {
        ThreadLocal threadLocal1 = new ThreadLocal();
        ThreadLocal threadLocal2 = new ThreadLocal();
        Thread thread1 = new Thread(() -> {
            threadLocal1.set(1);
            threadLocal2.set(1);
        });
        Thread thread2 = new Thread(() -> {
            threadLocal1.set(2);
            threadLocal2.set(2);
        });
        thread1.start();
        thread2.start();
    }
}

它所對應的底層結構圖為:

img

每個 Thread 都有一個 ThreadLocal.ThreadLocalMap 對象。

/* ThreadLocal values pertaining to this thread. This map is maintained
 * by the ThreadLocal class. */
ThreadLocal.ThreadLocalMap threadLocals = null;

當調用一個 ThreadLocal 的 set(T value) 方法時,先得到當前線程的 ThreadLocalMap 對象,然後將 ThreadLocal->value 鍵值對插入到該 Map 中。

public void set(T value) {
    Thread t = Thread.currentThread();
    ThreadLocalMap map = getMap(t);
    if (map != null)
        map.set(this, value);
    else
        createMap(t, value);
}

get() 方法類似。

public T get() {
    Thread t = Thread.currentThread();
    ThreadLocalMap map = getMap(t);
    if (map != null) {
        ThreadLocalMap.Entry e = map.getEntry(this);
        if (e != null) {
            @SuppressWarnings("unchecked")
            T result = (T)e.value;
            return result;
        }
    }
    return setInitialValue();
}

ThreadLocal 從理論上講並不是用來解決多線程並發問題的,因為根本不存在多線程競爭。

在一些場景 (尤其是使用線程池) 下,由於 ThreadLocal.ThreadLocalMap 的底層數據結構導致 ThreadLocal 有內存泄漏的情況,應該儘可能在每次使用 ThreadLocal 後手動調用 remove(),以避免出現 ThreadLocal 經典的內存泄漏甚至是造成自身業務混亂的風險。

3. 可重入代碼(Reentrant Code)

這種代碼也叫做純代碼(Pure Code),可以在代碼執行的任何時刻中斷它,轉而去執行另外一段代碼(包括遞歸調用它本身),而在控制權返回後,原來的程序不會出現任何錯誤。

可重入代碼有一些共同的特徵,例如不依賴存儲在堆上的數據和公用的系統資源、用到的狀態量都由參數中傳入、不調用非可重入的方法等。

線程不安全示例

如果多個線程對同一個共享數據進行訪問而不採取同步操作的話,那麼操作的結果是不一致的。

以下代碼演示了 1000 個線程同時對 cnt 執行自增操作,操作結束之後它的值有可能小於 1000。

public class ThreadUnsafeExample {

    private int cnt = 0;

    public void add() {
        cnt++;
    }

    public int get() {
        return cnt;
    }
}
public static void main(String[] args) throws InterruptedException {
    final int threadSize = 1000;
    ThreadUnsafeExample example = new ThreadUnsafeExample();
    final CountDownLatch countDownLatch = new CountDownLatch(threadSize);
    ExecutorService executorService = Executors.newCachedThreadPool();
    for (int i = 0; i < threadSize; i++) {
        executorService.execute(() -> {
            example.add();
            countDownLatch.countDown();
        });
    }
    countDownLatch.await();
    executorService.shutdown();
    System.out.println(example.get());
}
997

J.U.C – AQS

java.util.concurrent(J.U.C)大大提高了並發性能,AQS 被認為是 J.U.C 的核心。

CountDownLatch

用來控制一個或者多個線程等待多個線程。

維護了一個計數器 cnt,每次調用 countDown() 方法會讓計數器的值減 1,減到 0 的時候,那些因為調用 await() 方法而在等待的線程就會被喚醒。

public class CountdownLatchExample {

    public static void main(String[] args) throws InterruptedException {
        final int totalThread = 10;
        CountDownLatch countDownLatch = new CountDownLatch(totalThread);
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < totalThread; i++) {
            executorService.execute(() -> {
                System.out.print("run..");
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        System.out.println("end");
        executorService.shutdown();
    }
}
run..run..run..run..run..run..run..run..run..run..end

CyclicBarrier

用來控制多個線程互相等待,只有當多個線程都到達時,這些線程才會繼續執行。

和 CountdownLatch 相似,都是通過維護計數器來實現的。線程執行 await() 方法之後計數器會減 1,並進行等待,直到計數器為 0,所有調用 await() 方法而在等待的線程才能繼續執行。

CyclicBarrier 和 CountdownLatch 的一個區別是,CyclicBarrier 的計數器通過調用 reset() 方法可以循環使用,所以它才叫做循環屏障。

CyclicBarrier 有兩個構造函數,其中 parties 指示計數器的初始值,barrierAction 在所有線程都到達屏障的時候會執行一次。

public CyclicBarrier(int parties, Runnable barrierAction) {
    if (parties <= 0) throw new IllegalArgumentException();
    this.parties = parties;
    this.count = parties;
    this.barrierCommand = barrierAction;
}

public CyclicBarrier(int parties) {
    this(parties, null);
}
public class CyclicBarrierExample {

    public static void main(String[] args) {
        final int totalThread = 10;
        CyclicBarrier cyclicBarrier = new CyclicBarrier(totalThread);
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < totalThread; i++) {
            executorService.execute(() -> {
                System.out.print("before..");
                try {
                    cyclicBarrier.await();
                } catch (InterruptedException | BrokenBarrierException e) {
                    e.printStackTrace();
                }
                System.out.print("after..");
            });
        }
        executorService.shutdown();
    }
}
before..before..before..before..before..before..before..before..before..before..after..after..after..after..after..after..after..after..after..after..

Semaphore

Semaphore 類似於操作系統中的信號量,可以控制對互斥資源的訪問線程數。

以下代碼模擬了對某個服務的並發請求,每次只能有 3 個客戶端同時訪問,請求總數為 10。

public class SemaphoreExample {

    public static void main(String[] args) {
        final int clientCount = 3;
        final int totalRequestCount = 10;
        Semaphore semaphore = new Semaphore(clientCount);
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < totalRequestCount; i++) {
            executorService.execute(()->{
                try {
                    semaphore.acquire();
                    System.out.print(semaphore.availablePermits() + " ");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    semaphore.release();
                }
            });
        }
        executorService.shutdown();
    }
}
2 1 2 2 2 2 2 1 2 2

J.U.C – 其它組件

FutureTask

在介紹 Callable 時我們知道它可以有返回值,返回值通過 Future 進行封裝。FutureTask 實現了 RunnableFuture 接口,該接口繼承自 Runnable 和 Future 接口,這使得 FutureTask 既可以當做一個任務執行,也可以有返回值。

public class FutureTask<V> implements RunnableFuture<V>
public interface RunnableFuture<V> extends Runnable, Future<V>

FutureTask 可用於異步獲取執行結果或取消執行任務的場景。當一個計算任務需要執行很長時間,那麼就可以用 FutureTask 來封裝這個任務,主線程在完成自己的任務之後再去獲取結果。

public class FutureTaskExample {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        FutureTask<Integer> futureTask = new FutureTask<Integer>(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                int result = 0;
                for (int i = 0; i < 100; i++) {
                    Thread.sleep(10);
                    result += i;
                }
                return result;
            }
        });

        Thread computeThread = new Thread(futureTask);
        computeThread.start();

        Thread otherThread = new Thread(() -> {
            System.out.println("other task is running...");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        otherThread.start();
        System.out.println(futureTask.get());
    }
}
other task is running...
4950

BlockingQueue

java.util.concurrent.BlockingQueue 接口有以下阻塞隊列的實現:

  • FIFO 隊列 :LinkedBlockingQueue、ArrayBlockingQueue(固定長度)
  • 優先級隊列 :PriorityBlockingQueue

提供了阻塞的 take() 和 put() 方法:如果隊列為空 take() 將阻塞,直到隊列中有內容;如果隊列為滿 put() 將阻塞,直到隊列有空閑位置。

使用 BlockingQueue 實現生產者消費者問題

public class ProducerConsumer {

    private static BlockingQueue<String> queue = new ArrayBlockingQueue<>(5);

    private static class Producer extends Thread {
        @Override
        public void run() {
            try {
                queue.put("product");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.print("produce..");
        }
    }

    private static class Consumer extends Thread {

        @Override
        public void run() {
            try {
                String product = queue.take();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.print("consume..");
        }
    }
}
public static void main(String[] args) {
    for (int i = 0; i < 2; i++) {
        Producer producer = new Producer();
        producer.start();
    }
    for (int i = 0; i < 5; i++) {
        Consumer consumer = new Consumer();
        consumer.start();
    }
    for (int i = 0; i < 3; i++) {
        Producer producer = new Producer();
        producer.start();
    }
}
produce..produce..consume..consume..produce..consume..produce..consume..produce..consume..

ForkJoin

主要用於並行計算中,和 MapReduce 原理類似,都是把大的計算任務拆分成多個小任務並行計算。

public class ForkJoinExample extends RecursiveTask<Integer> {

    private final int threshold = 5;
    private int first;
    private int last;

    public ForkJoinExample(int first, int last) {
        this.first = first;
        this.last = last;
    }

    @Override
    protected Integer compute() {
        int result = 0;
        if (last - first <= threshold) {
            // 任務足夠小則直接計算
            for (int i = first; i <= last; i++) {
                result += i;
            }
        } else {
            // 拆分成小任務
            int middle = first + (last - first) / 2;
            ForkJoinExample leftTask = new ForkJoinExample(first, middle);
            ForkJoinExample rightTask = new ForkJoinExample(middle + 1, last);
            leftTask.fork();
            rightTask.fork();
            result = leftTask.join() + rightTask.join();
        }
        return result;
    }
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
    ForkJoinExample example = new ForkJoinExample(1, 10000);
    ForkJoinPool forkJoinPool = new ForkJoinPool();
    Future result = forkJoinPool.submit(example);
    System.out.println(result.get());
}

ForkJoin 使用 ForkJoinPool 來啟動,它是一個特殊的線程池,線程數量取決於 CPU 核數。

public class ForkJoinPool extends AbstractExecutorService

ForkJoinPool 實現了工作竊取算法來提高 CPU 的利用率。每個線程都維護了一個雙端隊列,用來存儲需要執行的任務。工作竊取算法允許空閑的線程從其它線程的雙端隊列中竊取一個任務來執行。竊取的任務必須是最晚的任務,避免和隊列所屬線程發生競爭。例如下圖中,Thread2 從 Thread1 的隊列中拿出最晚的 Task1 任務,Thread1 會拿出 Task2 來執行,這樣就避免發生競爭。但是如果隊列中只有一個任務時還是會發生競爭。

多線程開發良好的實踐

  • 給線程起個有意義的名字,這樣可以方便找 Bug。
  • 縮小同步範圍,從而減少鎖爭用。例如對於 synchronized,應該盡量使用同步塊而不是同步方法。
  • 多用同步工具少用 wait() 和 notify()。首先,CountDownLatch, CyclicBarrier, Semaphore 和 Exchanger 這些同步類簡化了編碼操作,而用 wait() 和 notify() 很難實現複雜控制流;其次,這些同步類是由最好的企業編寫和維護,在後續的 JDK 中還會不斷優化和完善。
  • 使用 BlockingQueue 實現生產者消費者問題。
  • 多用並發集合少用同步集合,例如應該使用 ConcurrentHashMap 而不是 Hashtable。
  • 使用本地變量和不可變類來保證線程安全。
  • 使用線程池而不是直接創建線程,這是因為創建線程代價很高,線程池可以有效地利用有限的線程來啟動任務。

本文發於://antoniopeng.com

Tags: