擼完這篇執行緒池,我快咳血了!

我們知道,執行緒需要的時候要進行創建,不需要的時候需要進行銷毀,但是執行緒的創建和銷毀都是一個開銷比較大的操作。

為什麼開銷大呢?

雖然我們程式設計師創建一個執行緒很容易,直接使用 new Thread() 創建就可以了,但是作業系統做的工作會多很多,它需要發出 系統調用,陷入內核,調用內核 API 創建執行緒,為執行緒分配資源等,這一些操作有很大的開銷。

所以,在高並發大流量的情況下,頻繁的創建和銷毀執行緒會大大拖慢響應速度,那麼有什麼能夠提高響應速度的方式嗎?方式有很多,盡量避免執行緒的創建和銷毀是一種提升性能的方式,也就是把執行緒 復用 起來,因為性能是我們日常最關注的因素。

本篇文章我們先來通過認識一下 Executor 框架、然後通過描述執行緒池的基本概念入手、逐步認識執行緒池的核心類,然後慢慢進入執行緒池的原理中,帶你一步一步理解執行緒池。

在 Java 中可以通過執行緒池來達到這樣的效果。今天我們就來詳細講解一下 Java 的執行緒池

Executor 框架

為什麼要先說一下 Executor 呢?因為我認為 Executor 是執行緒池的一個驅動,我們平常創建並執行執行緒用的一般都是 new Thread().start() 這個方法,這個方法更多強調 創建一個執行緒並開始運行。而我們後面講到創建執行緒池更多體現在驅動執行上。

Executor 的總體框架如下,我們下面會對 Executor 框架中的每個類進行介紹。

我們首先來認識一下 Executor

Executor 介面

Executor 是 java.util.concurrent 的頂級介面,這個介面只有一個方法,那就是 execute 方法。我們平常創建並啟動執行緒會使用 new Thread().start() ,而 Executor 中的 execute 方法替代了顯示創建執行緒的方式。Executor 的設計初衷就是將任務提交和任務執行細節進行解藕。使用 Executor 框架,你可以使用如下的方式創建執行緒

Executor executor = Executors.xxx // xxx 其實就是 Executor 的實現類,我們後面會說
executor.execute(new RunnableTask1());
executor.execute(new RunnableTask2());

execute方法接收一個 Runnable 實例,它用來執行一個任務,而任務就是一個實現了 Runnable 介面的類,但是 execute 方法不能接收實現了 Callable 介面的類,也就是說,execute 方法不能接收具有返回值的任務。

execute 方法創建的執行緒是非同步執行的,也就是說,你不用等待每個任務執行完畢後再執行下一個任務。

比如下面就是一個簡單的使用 Executor 創建並執行執行緒的示例

public class RunnableTask implements Runnable{

    @Override
    public void run() {
        System.out.println("running");
    }

    public static void main(String[] args) {
        Executor executor = Executors.newSingleThreadExecutor(); // 你可能不太理解這是什麼意思,我們後面會說。
        executor.execute(new RunnableTask());
    }
}

Executor 就相當於是族長,大佬只發號令,族長讓你非同步執行你就得非同步執行,族長說不用彙報任務你就不用回報,但是這個族長管的事情有點少,所以除了 Executor 之外,我們還需要認識其他管家,比如說管你這個執行緒啥時候終止,啥時候暫停,判斷你這個執行緒當前的狀態等,ExecutorService 就是一位大管家。

ExecutorService 介面

ExecutorService 也是一個介面,它是 Executor 的拓展,提供了一些 Executor 中沒有的方法,下面我們來介紹一下這些方法

void shutdown();

shutdown 方法調用後,ExecutorService 會有序關閉正在執行的任務,但是不接受新任務。如果任務已經關閉,那麼這個方法不會產生任何影響。

ExecutorService 還有一個和 shutdown 方法類似的方法是

List<Runnable> shutdownNow();

shutdownNow 會嘗試停止關閉所有正在執行的任務,停止正在等待的任務,並返回正在等待執行的任務列表。

既然 shutdown 和 shutdownNow 這麼相似,那麼二者有啥區別呢?

  • shutdown 方法只是會將執行緒池的狀態設置為 SHUTWDOWN ,正在執行的任務會繼續執行下去,執行緒池會等待任務的執行完畢,而沒有執行的執行緒則會中斷。
  • shutdownNow 方法會將執行緒池的狀態設置為 STOP,正在執行和等待的任務則被停止,返回等待執行的任務列表

ExecutorService 還有三個判斷執行緒狀態的方法,分別是

boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;
  • isShutdown 方法表示執行器是否已經關閉,如果已經關閉,返回 true,否則返回 false。
  • isTerminated 方法表示判斷所有任務再關閉後是否已完成,如果完成返回 false,這個需要注意一點,除非首先調用 shutdown 或者 shutdownNow 方法,否則 isTerminated 方法永遠不會為 true。
  • awaitTermination 方法會阻塞,直到發出調用 shutdown 請求後所有的任務已經完成執行後才會解除。這個方法不是非常容易理解,下面通過一個小例子來看一下。
public static ExecutorService executorService = Executors.newFixedThreadPool(10);

public static void main(String[] args) throws InterruptedException {
  for (int i = 0; i < 10; i++) {
    executorService.submit(() -> {
      System.out.println(Thread.currentThread().getName());
      try {
        Thread.sleep(10);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    });

  }

  executorService.shutdown();
  System.out.println("Waiting...");
  boolean isTermination = executorService.awaitTermination(3, TimeUnit.SECONDS);
  System.out.println("Waiting...Done");
  if(isTermination){
    System.out.println("All Thread Done");
  }
  System.out.println(Thread.currentThread().getName());
}

如果在調用 executorService.shutdown() 之後,所有執行緒完成任務,isTermination 返回 true,程式才會列印出 All Thread Done ,如果注釋掉 executorService.shutdown() 或者在任務沒有完成後 awaitTermination 就超時了,那麼 isTermination 就會返回 false。

ExecutorService 當大管家還有一個原因是因為它不僅能夠包容 Runnable 對象,還能夠接納 Callable 對象。在 ExecutorService 中,submit 方法扮演了這個角色。

<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);

submit 方法會返回一個 Future對象,<T> 表示範型,它是對 Callable 產生的返回值來說的,submit 方法提交的任務中的 call 方法如果返回 Integer,那麼 submit 方法就返回 Future<Integer>,依此類推。

<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
        throws InterruptedException;

invokeAll 方法用於執行給定的任務結合,執行完成後會返回一個任務列表,任務列表每一項是一個任務,每個任務會包括任務狀態和執行結果,同樣 invokeAll 方法也會返回 Future 對象。

<T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;

invokeAny 會獲得最先完成任務的結果,即Callable<T> 介面中的 call 的返回值,在獲得結果時,會中斷其他正在執行的任務,具有阻塞性

大管家的職責相對於組長來說標準更多,管的事情也比較寬,但是大管家畢竟也是家族的中流砥柱,他不會做具體的活,他的下面有各個幹將,幹將是一個家族的核心,他負責完成大管家的工作。

AbstractExecutorService 抽象類

AbstractExecutorService 是一個抽象類,它實現了 ExecutorService 中的部分方法,它相當一個幹將,會分析大管家有哪些要做的工作,然後針對大管家的要求做一些具體的規劃,然後找他的得力助手 ThreadPoolExecutor 來完成目標。

AbstractExecutorService 這個抽象類主要實現了 invokeAllinvokeAny 方法,關於這兩個方法的源碼分析我們會在後面進行解釋。

ScheduledExecutorService 介面

ScheduledExecutorService 也是一個介面,它擴展了 ExecutorService 介面,提供了 ExecutorService 介面所沒有的功能,ScheduledExecutorService 顧名思義就是一個定時執行器,定時執行器可以安排命令在一定延遲時間後運行或者定期執行。

它主要有三個介面方法,一個重載方法。下面我們先來看一下這兩個重載方法。

public ScheduledFuture<?> schedule(Runnable command,
                                       long delay, TimeUnit unit);
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                           long delay, TimeUnit unit);

schedule 方法能夠延遲一定時間後執行任務,並且只能執行一次。可以看到,schedule 方法也返回了一個 ScheduledFuture 對象,ScheduledFuture 對象擴展了 Future 和 Delayed 介面,它表示非同步延遲計算的結果。schedule 方法支援零延遲和負延遲,這兩類值都被視為立即執行任務。

還有一點需要說明的是,schedule 方法能夠接收相對的時間和周期作為參數,而不是固定的日期,你可以使用 date.getTime – System.currentTimeMillis() 來得到相對的時間間隔。

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit);

scheduleAtFixedRate 表示任務會根據固定的速率在時間 initialDelay 後不斷地執行。

public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit);

這個方法和上面的方法很類似,它表示的是以固定延遲時間的方式來執行任務。

scheduleAtFixedRate 和 scheduleWithFixedDelay 這兩個方法容易混淆,下面我們通過一個示例來說明一下這兩個方法的區別。

public class ScheduleTest {

    public static void main(String[] args) {
        Runnable command = () -> {
            long startTime = System.currentTimeMillis();
            System.out.println("current timestamp = " + startTime);
            try {
                TimeUnit.MILLISECONDS.sleep(new Random().nextInt(100));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("time spend = " + (System.currentTimeMillis() - startTime));
        };

        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(10);
        scheduledExecutorService.scheduleAtFixedRate(command,100,1000,TimeUnit.MILLISECONDS);
    }
}

輸出結果大致如下

可以看到,沒次列印出來 current timestamp 的時間間隔大約等於 1000 毫秒,所以可以斷定 scheduleAtFixedRate 是以恆定的速率來執行任務的。

然後我們再看一下 scheduleWithFixedDelay 方法,和上面測試類一樣,只不過我們把 scheduleAtFixedRate 換為了 scheduleWithFixedDelay 。

scheduledExecutorService.scheduleWithFixedDelay(command,10,1000,TimeUnit.MILLISECONDS);

然後觀察一下輸出結果

可以看到,兩個 current timestamp 之間的間隔大約等於 1000(固定時間) + delay(time spend) 的總和,由此可以確定 scheduleWithFixedDelay 是以固定時延來執行的。

執行緒池的描述

下面我們先來認識一下什麼是執行緒池,執行緒池從概念上來看就是一個池子,什麼池子呢?是指管理同一組工作執行緒的池子,也就是說,執行緒池會統一管理內部的工作執行緒。

wiki 上說,執行緒池其實就是一種軟體設計模式,這種設計模式用於實現電腦程式中的並發。

![image-20210202200016478](/Users/mr.l/Library/Application Support/typora-user-images/image-20210202200016478.png)

比如下面就是一個簡單的執行緒池概念圖。

注意:這個圖只是一個概念模型,不是真正的執行緒池實現,希望讀者不要混淆。

可以看到,這種其實也相當於是生產者-消費者模型,任務隊列中的執行緒會進入到執行緒池中,由執行緒池進行管理,執行緒池中的一個個執行緒就是工作執行緒,工作執行緒執行完畢後會放入完成隊列中,代表已經完成的任務。

上圖有個缺點,那就是隊列中的執行緒執行完畢後就會銷毀,銷毀就會產生性能損耗,降低響應速度,而我們使用執行緒池的目的往往是需要把執行緒重用起來,提高程式性能。

所以我們應該把執行完成後的工作執行緒重新利用起來,等待下一次使用。

執行緒池創建

我們上面大概聊了一下什麼執行緒池的基本執行機制,你知道了執行緒是如何復用的,那麼任何事物不可能是憑空出現的,執行緒也一樣,那麼它是如何創建出來的呢?下面就不得不提一個工具類,那就是 Executors

Executors 也是java.util.concurrent 包下的成員,它是一個創建執行緒池的工廠,可以使用靜態工廠方法來創建執行緒池,下面就是 Executors 所能夠創建執行緒池的具體類型。

  • newFixedThreadPool:newFixedThreadPool 將會創建固定數量的執行緒池,這個數量可以由程式設計師通過創建 Executors.newFixedThreadPool(int nThreads)時手動指定,每次提交一個任務就會創建一個執行緒,在任何時候,nThreads 的值是最多允許活動的執行緒。如果在所有執行緒都處於活躍狀態時有額外的任務被創建,這些新創建的執行緒會進入等待隊列等待執行緒調度。如果有任何執行緒由於執行期間出現意外導致執行緒終止,那麼在執行後續任務時會使用等待隊列中的執行緒進行替代。

  • newWorkStealingPool:newWorkStealingPool 是 JDK1.8 新增加的執行緒池,它是基於 fork-join 機制的一種執行緒池實現,使用了 Work-Stealing 演算法。newWorkStealingPool 會創建足夠的執行緒來支援並行度,會使用多個隊列來減少競爭。work-stealing pool 執行緒池不會保證提交任務的執行順序。

  • newSingleThreadExecutor:newSingleThreadExecutor 是一個單執行緒的執行器,它只會創建單個執行緒來執行任務,如果這個執行緒異常結束,則會創建另外一個執行緒來替代。newSingleThreadExecutor 會確保任務在任務隊列中的執行次序,也就是說,任務的執行是 有序的

  • newCachedThreadPool:newCachedThreadPool 會根據實際需要創建一個可快取的執行緒池。如果執行緒池的執行緒數量超過實際需要處理的任務,那麼 newCachedThreadPool 將會回收多餘的執行緒。如果實際需要處理的執行緒不能滿足任務的數量,則回你添加新的執行緒到執行緒池中,執行緒池中執行緒的數量不存在任何限制。

  • newSingleThreadScheduledExecutor:newSingleThreadScheduledExecutor 和 newSingleThreadExecutor 很類似,只不過帶有 scheduled 的這個執行器哥們能夠在一定延遲後執行或者定期執行任務。

  • newScheduledThreadPool:這個執行緒池和上面的 scheduled 執行器類似,只不過 newSingleThreadScheduledExecutor 比 newScheduledThreadPool 多加了一個 DelegatedScheduledExecutorService 代理,這其實包裝器設計模式的體現。

上面這些執行緒池的底層實現都是由 ThreadPoolExecutor 來提供支援的,所以要理解這些執行緒池的工作原理,你就需要先把 ThreadPoolExecutor 搞明白,下面我們就來聊一聊 ThreadPoolExecutor。

ThreadPoolExecutor 類

ThreadPoolExecutor 位於 java.util.concurrent 工具類下,可以說它是執行緒池中最核心的一個類了。如果你要想把執行緒池理解透徹的話,就要首先了解一下這個類。

如果我們再拿上面家族舉例子的話,ThreadPoolExecutor 就是一個家族的骨幹人才,家族頂樑柱。ThreadPoolExecutor 做的工作真是太多太多了。

首先,ThreadPoolExecutor 提供了四個構造方法,然而前三個構造方法最終都會調用最後一個構造方法進行初始化

public class ThreadPoolExecutor extends AbstractExecutorService {
    .....
      // 1
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
            BlockingQueue<Runnable> workQueue);
 			// 2
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
            BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory);
 			// 3
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
            BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler);
 			// 4
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
        BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);
    ...
}

所以我們直接就來看一波最後這個執行緒池,看看參數都有啥,如果我沒數錯的話,應該是有 7 個參數(小學數學水平。。。。。。)

  • 首先,一個非常重要的參數就是 corePoolSize,核心執行緒池的容量/大小,你叫啥我覺得都沒毛病。只不過你得理解這個參數的意義,它和執行緒池的實現原理有非常密切的關係。你剛開始創建了一個執行緒池,此時是沒有任何執行緒的,這個很好理解,因為我現在沒有任務可以執行啊,創建執行緒幹啥啊?而且創建執行緒還有開銷啊,所以等到任務過來時再創建執行緒也不晚。但是!我要說但是了,如果調用了 prestartAllCoreThreads 或者 prestartCoreThread 方法,就會在沒有任務到來時創建執行緒,前者是創建 corePoolSize 個執行緒,後者是只創建一個執行緒。Lea 爺爺本來想讓我們程式設計師當個懶漢,等任務來了再干;可是你非要當個餓漢,提前完成任務。如果我們想當個懶漢的話,在創建了執行緒池後,執行緒池中的執行緒數為 0,當有任務來之後,就會創建一個執行緒去執行任務,當執行緒池中的執行緒數目達到 corePoolSize 後,就會把到達的任務放到快取隊列當中。

  • maximumPoolSize :又來一個執行緒池的容量,只不過這個是執行緒池的最大容量,也就是執行緒池所能容納最大的執行緒,而上面的 corePoolSize 只是核心執行緒容量。

我知道你此時會有疑問,那就是不知道如何核心執行緒的容量和執行緒最大容量的區別是吧?我們後面會解釋這點。

  • keepAliveTime:這個參數是執行緒池的保活機制,表示執行緒在沒有任務執行的情況下保持多久會終止。在默認情況下,這個參數只在執行緒數量大於 corePoolSize 時才會生效。當執行緒數量大於 corePoolSize 時,如果任意一個空閑的執行緒的等待時間 > keepAliveTime 後,那麼這個執行緒會被剔除,直到執行緒數量等於 corePoolSize 為止。如果調用了 allowCoreThreadTimeOut 方法,執行緒數量在 corePoolSize 範圍內也會生效,直到執行緒減為 0。

  • unit :這個參數好說,它就是一個 TimeUnit 的變數,unit 表示的是 keepAliveTime 的時間單位。unit 的類型有下面這幾種

    TimeUnit.DAYS;               //天
    TimeUnit.HOURS;             //小時
    TimeUnit.MINUTES;           //分鐘
    TimeUnit.SECONDS;           //秒
    TimeUnit.MILLISECONDS;      //毫秒
    TimeUnit.MICROSECONDS;      //微妙
    TimeUnit.NANOSECONDS;       //納秒
    
  • workQueue:這個參數表示的概念就是等待隊列,我們上面說過,如果核心執行緒 > corePoolSize 的話,就會把任務放入等待隊列,這個等待隊列的選擇也是一門學問。Lea 爺爺給我們展示了三種等待隊列的選擇

    • SynchronousQueue: 基於阻塞隊列(BlockingQueue)的實現,它會直接將任務交給消費者,必須等隊列中的添加元素被消費後才能繼續添加新的元素。使用 SynchronousQueue 阻塞隊列一般要求maximumPoolSizes 為無界,也就是 Integer.MAX_VALUE,避免執行緒拒絕執行操作。
    • LinkedBlockingQueue:LinkedBlockingQueue 是一個無界快取等待隊列。當前執行的執行緒數量達到 corePoolSize 的數量時,剩餘的元素會在阻塞隊列里等待。
    • ArrayBlockingQueue:ArrayBlockingQueue 是一個有界快取等待隊列,可以指定快取隊列的大小,當正在執行的執行緒數等於 corePoolSize 時,多餘的元素快取在 ArrayBlockingQueue 隊列中等待有空閑的執行緒時繼續執行,當 ArrayBlockingQueue 已滿時,加入 ArrayBlockingQueue 失敗,會開啟新的執行緒去執行,當執行緒數已經達到最大的 maximumPoolSizes 時,再有新的元素嘗試加入 ArrayBlockingQueue時會報錯
  • threadFactory:執行緒工廠,這個參數主要用來創建執行緒;

  • handler :拒絕策略,拒絕策略主要有以下取值

    • AbortPolicy:丟棄任務並拋出 RejectedExecutionException 異常。
    • DiscardPolicy: 直接丟棄任務,但是不拋出異常。
    • DiscardOldestPolicy:直接丟棄隊列最前面的任務,然後重新嘗試執行任務(重複此過程)。
    • CallerRunsPolicy:由調用執行緒處理該任務。

深入理解執行緒池

上面我和你簡單聊了一下執行緒池的基本構造,執行緒池有幾個非常重要的參數可以細細品味,但是哥們醒醒,接下來才是刺激的地方。

執行緒池狀態

首先我們先來聊聊執行緒池狀態,執行緒池狀態是一個非常有趣的設計點,ThreadPoolExecutor 使用 ctl 來存儲執行緒池狀態,這些狀態也叫做執行緒池的生命周期。想想也是,執行緒池作為一個存儲管理執行緒的資源池,它自己也要有這些狀態,以及狀態之間的變更才能更好的滿足我們的需求。ctl 其實就是一個 AtomicInteger 類型的變數,保證原子性

ctl 除了存儲執行緒池狀態之外,它還存儲 workerCount 這個概念,workerCount 指示的是有效執行緒數,workerCount 表示的是已經被允許啟動但不允許停止的工作執行緒數量。workerCount 的值與實際活動執行緒的數量不同。

ctl 高低位來判斷是執行緒池狀態還是工作執行緒數量,執行緒池狀態位於高位

這裡有個設計點,為什麼使用 AtomicInteger 而不是存儲上線更大的 AtomicLong 之類的呢?

Lea 並非沒有考慮過這個問題,為了表示 int 值,目前 workerCount 的大小是(2 ^ 29)-1(約 5 億個執行緒),而不是(2 ^ 31)-1(20億個)可表示的執行緒。如果將來有問題,可以將該變數更改為 AtomicLong。但是在需要之前,使用 int 可以使此程式碼更快,更簡單,int 存儲佔用存儲空間更小。

runState 具有如下幾種狀態

private static final int RUNNING    = -1 << COUNT_BITS; 
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

我們先上狀態輪轉圖,然後根據狀態輪轉圖做詳細的解釋。

這幾種狀態的解釋如下

  • RUNNING: 如果執行緒池處於 RUNNING 狀態下的話,能夠接收新任務,也能處理正在運行的任務。可以從 ctl 的初始化得知,執行緒池一旦創建出來就會處於 RUNNING 狀態,並且執行緒池中的有效執行緒數為 0。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
  • SHUTDOWN: 在調用 shutdown 方法後,執行緒池的狀態會由 RUNNING -> SHUTDOWN 狀態,位於 SHUTDOWN 狀態的執行緒池能夠處理正在運行的任務,但是不能接受新的任務,這和我們上面說的對與 shutdown 的描述一致。
  • STOP: 和 shutdown 方法類似,在調用 shutdownNow 方法時,程式會從 RUNNING/SHUTDOWN -> STOP 狀態,處於 STOP 狀態的執行緒池,不接收新任務,不處理已添加的任務,並且會中斷正在處理的任務。
  • TIDYING:TIDYING 狀態有個前置條件,分為兩種:一種是是當執行緒池位於 SHUTDOWN 狀態下,阻塞隊列和執行緒池中的執行緒數量為空時,會由 SHUTDOWN -> TIDYING;另一種是當執行緒池位於 STOP 狀態下時,執行緒池中的數量為空時,會由 STOP -> TIDYING 狀態。轉換為 TIDYING 的執行緒池會調用 terminated這個鉤子方法,terminated 在 ThreadPoolExecutor 類中是空實現,若用戶想在執行緒池變為 TIDYING 時,進行相應的處理,可以通過重載 terminated 函數來實現。
  • TERMINATED:TERMINATED 狀態是執行緒池的最後一個狀態,執行緒池處在 TIDYING 狀態時,執行完terminated 方法之後,就會由 TIDYING -> TERMINATED 狀態。此時表示執行緒池的徹底終止。

重要變數

下面我們一起來了解一下執行緒池中的重要變數。

private final BlockingQueue<Runnable> workQueue;

阻塞隊列,這個和我們上面說的阻塞隊列的參數是一個意思,因為在構造 ThreadPoolExecutor 時,會把參數的值賦給 this.workQueue。

private final ReentrantLock mainLock = new ReentrantLock(); 

執行緒池的主要狀態鎖,對執行緒池的狀態(比如執行緒池大小、運行狀態)的改變都需要使用到這個鎖

private final HashSet<Worker> workers = new HashSet<Worker>();

workers 持有執行緒池中所有執行緒的集合,只有持有上面 mainLock 的鎖才能夠訪問。

private final Condition termination = mainLock.newCondition();

等待條件,用來支援 awaitTermination 方法。Condition 和 Lock 一起使用可以實現通知/等待機制。

private int largestPoolSize;

largestPoolSize 表示執行緒池中最大池的大小,只有持有 mainLock 才能訪問

private long completedTaskCount;

completedTaskCount 表示任務完成的計數,它僅僅在任務終止時更新,需要持有 mainLock 才能訪問。

private volatile ThreadFactory threadFactory;

threadFactory 是創建執行緒的工廠,所有的執行緒都會使用這個工廠,調用 addWorker 方法創建。

private volatile RejectedExecutionHandler handler;

handler 表示拒絕策略,handler 會在執行緒飽和或者將要關閉的時候調用。

private volatile long keepAliveTime;

保活時間,它指的是空閑執行緒等待工作的超時時間,當存在多個 corePoolSize 或 allowCoreThreadTimeOut 時,執行緒將使用這個超時時間。

下面是一些其他變數,這些變數比較簡單,我就直接給出注釋了。

private volatile boolean allowCoreThreadTimeOut;   //是否允許為核心執行緒設置存活時間
private volatile int   corePoolSize;     //核心池的大小(即執行緒池中的執行緒數目大於這個參數時,提交的任務會被放進任務快取隊列)
private volatile int   maximumPoolSize;   //執行緒池最大能容忍的執行緒數
private static final RejectedExecutionHandler defaultHandler =
        new AbortPolicy(); // 默認的拒絕策略

任務提交

現在我們知道了 ThreadPoolExecutor 創建出來就會處於運行狀態,此時執行緒數量為 0 ,等任務到來時,執行緒池就會創建執行緒來執行任務,而下面我們的關注點就會放在任務提交這個過程上。

通常情況下,我們會使用

executor.execute() 

來執行任務,我在很多書和部落格教程上都看到過這個執行過程,下面是一些書和部落格教程所畫的 ThreadPoolExecutor 的執行示意圖和執行流程圖

執行示意圖

處理流程圖

ThreadPoolExecutor 的執行 execute 的方法分為下面四種情況

  1. 如果當前運行的工作執行緒少於 corePoolSize 的話,那麼會創建新執行緒來執行任務 ,這一步需要獲取 mainLock 全局鎖
  2. 如果運行執行緒不小於 corePoolSize,則將任務加入 BlockingQueue 阻塞隊列。
  3. 如果無法將任務加入 BlockingQueue 中,此時的現象就是隊列已滿,此時需要創建新的執行緒來處理任務,這一步同樣需呀獲取 mainLock 全局鎖。
  4. 如果創建新執行緒會使當前運行的執行緒超過 maximumPoolSize 的話,任務將被拒絕,並且使用 RejectedExecutionHandler.rejectEExecution() 方法拒絕新的任務。

ThreadPoolExecutor 採取上面的整體設計思路,是為了在執行 execute 方法時,避免獲取全局鎖,因為頻繁獲取全局鎖會是一個嚴重的可伸縮瓶頸,所以,幾乎所有的 execute 方法調用都是通過執行步驟2。

上面指出了 execute 的運行過程,整體上來說這個執行過程把非常重要的點講解出來了,但是不夠細緻,我查閱 ThreadPoolExecute 和部分源碼分析文章後,發現這事其實沒這麼簡單,先來看一下 execute 的源碼,我已經給出了中文注釋

public void execute(Runnable command) {
  if (command == null)
    throw new NullPointerException();
  // 獲取 ctl 的值
  int c = ctl.get();
  // 判斷 ctl 的值是否小於核心執行緒池的數量
  if (workerCountOf(c) < corePoolSize) {
    // 如果小於,增加工作隊列,command 就是一個個的任務
    if (addWorker(command, true))
      // 執行緒創建成功,直接返回
      return;
    // 執行緒添加不成功,需要再次判斷,每需要一次判斷都會獲取 ctl 的值
    c = ctl.get();
  }
  // 如果執行緒池處於運行狀態並且能夠成功的放入阻塞隊列
  if (isRunning(c) && workQueue.offer(command)) {
    // 再次進行檢查
    int recheck = ctl.get();
    // 如果不是運行態並且成功的從阻塞隊列中刪除
    if (! isRunning(recheck) && remove(command))
      // 執行拒絕策略
      reject(command);
    // worker 執行緒數量是否為 0
    else if (workerCountOf(recheck) == 0)
      // 增加工作執行緒
      addWorker(null, false);
  }
  // 如果不能增加工作執行緒的數量,就會直接執行拒絕策略
  else if (!addWorker(command, false))
    reject(command);
}

下面是我根據源碼畫出的執行流程圖

下面我們針對 execute 流程進行分析,可能有點啰嗦,因為幾個核心流程上面已經提過了,不過為了流程的完整性,我們再在這裡重新提一下。

  1. 如果執行緒池的核心數量少於 corePoolSize,那麼就會使用 addWorker 創建新執行緒,addworker 的流程我們會在下面進行分析。如果創建成功,那麼 execute 方法會直接返回。如果沒創建成功,可能是由於執行緒池已經 shutdown,可能是由於並發情況下 workerCountOf(c) < corePoolSize ,別的執行緒先創建了 worker 執行緒,導致 workerCoun t>= corePoolSize。
  2. 如果執行緒池還在 Running 狀態,會將 task 加入阻塞隊列,加入成功後會進行 double-check 雙重校驗,繼續下面的步驟,如果加入失敗,可能是由於隊列執行緒已滿,此時會判斷是否能夠加入執行緒池中,如果執行緒池也滿了的話,就會直接執行拒絕策略,如果執行緒池能加入,execute 方法結束。
  3. 步驟 2 中的 double-check 主要是為了判斷進入 workQueue 中的 task 是否能被執行:如果執行緒池已經不是 Running 狀態,則應該拒絕添加任務,從 workQueue 隊列中刪除任務。如果執行緒池是 Running,但是從 workQueue 中刪除失敗了,此時的原因可能是由於其他執行緒執行了這個任務,此時會直接執行拒絕策略。
  4. 如果執行緒是 Running 狀態,並且不能把任務從隊列中移除,進而判斷工作執行緒是否為 0 ,如果不為 0 ,execute 執行完畢,如果工作執行緒是 0 ,則會使用 addWorker 增加工作執行緒,execute 執行完畢。

添加 worker 執行緒

從上面的執行流程可以看出,添加一個 worker 涉及的工作也非常多,這也是一個比價難啃的點,我們一起來分析下,這是 worker 的源碼

private boolean addWorker(Runnable firstTask, boolean core) {
  // retry 的用法相當於 goto
  retry:
  for (;;) {
    int c = ctl.get();
    int rs = runStateOf(c);

    // Check if queue empty only if necessary.
    // 僅在必要時檢查隊列是否為空。
    // 執行緒池狀態有五種,state 越小越是運行狀態
    // rs >= SHUTDOWN,表示此時執行緒池狀態可能是 SHUTDOWN、STOP、TIDYING、TERMINATED
    // 默認 rs >= SHUTDOWN,如果 rs = SHUTDOWN,直接返回 false
    // 默認 rs < SHUTDOWN,是 RUNNING,如果任務不是空,返回 false
    // 默認 RUNNING,任務是空,如果工作隊列為空,返回 false
    //
    if (rs >= SHUTDOWN &&
        ! (rs == SHUTDOWN &&
           firstTask == null &&
           ! workQueue.isEmpty()))
      return false;


    // 執行循環
    for (;;) {
      // 統計工作執行緒數量
      int wc = workerCountOf(c);
      // 如果 worker 數量>執行緒池最大上限 CAPACITY(即使用int低29位可以容納的最大值)
      // 或者 worker數量 > corePoolSize 或 worker數量>maximumPoolSize ),即已經超過了給定的邊界
      if (wc >= CAPACITY ||
          wc >= (core ? corePoolSize : maximumPoolSize))
        return false;

      // 使用 CAS 增加 worker 數量,增加成功,跳出循環。
      if (compareAndIncrementWorkerCount(c))
        break retry;

      // 檢查 ctl
      c = ctl.get();  // Re-read ctl
      // 如果狀態不等於之前獲取的 state,跳出內層循環,繼續去外層循環判斷
      if (runStateOf(c) != rs)
        continue retry;
      // else CAS failed due to workerCount change; retry inner loop
    }
  }

  /*
          worker數量+1成功的後續操作
        * 添加到 workers Set 集合,並啟動 worker 執行緒
         */
  boolean workerStarted = false;
  boolean workerAdded = false;
  Worker w = null;
  try {
    // 包裝 Runnable 對象
    // 設置 firstTask 的值為 -1
    // 賦值給當前任務
    // 使用 worker 自身這個 runnable,調用 ThreadFactory 創建一個執行緒,並設置給worker的成員變數thread
    w = new Worker(firstTask);
    final Thread t = w.thread;
    if (t != null) {
      final ReentrantLock mainLock = this.mainLock;
      mainLock.lock();
      try {
        // 在持有鎖的時候重新檢查
        // 如果 ThreadFactory 失敗或在獲得鎖之前關閉,請回退。
        int rs = runStateOf(ctl.get());

        //如果執行緒池在運行 running<shutdown 或者 執行緒池已經 shutdown,且firstTask==null
        // (可能是 workQueue 中仍有未執行完成的任務,創建沒有初始任務的 worker 執行緒執行)
        //worker 數量 -1 的操作在 addWorkerFailed()
        if (rs < SHUTDOWN ||
            (rs == SHUTDOWN && firstTask == null)) {
          if (t.isAlive()) // precheck that t is startable
            throw new IllegalThreadStateException();

          // workers 就是一個 HashSet 集合
          workers.add(w);

          // 設置最大的池大小 largestPoolSize,workerAdded 設置為true
          int s = workers.size();
          if (s > largestPoolSize)
            largestPoolSize = s;
          workerAdded = true;
        }
      } finally {
        mainLock.unlock();
      }
      if (workerAdded) {
        t.start();
        workerStarted = true;
      }
    }
    //如果啟動執行緒失敗
    // worker 數量 -1
  } finally {
    if (! workerStarted)
      addWorkerFailed(w);
  }
  return workerStarted;
}

真長的一個方法,有點想吐血,其實我肝到現在已經肝不動了,但我一想到看這篇文章的讀者們能給我一個關注,就算咳出一口老血也值了。

這個方法的執行流程圖如下

這裡我們就不再文字描述了,但是上面流程圖中有一個對象引起了我的注意,那就是 worker 對象,這個對象就代表了執行緒池中的工作執行緒,那麼這個 worker 對象到底是啥呢?

worker 對象

Worker 位於 ThreadPoolExecutor 內部,它繼承了 AQS 類並且實現了 Runnable 介面。Worker 類主要維護了執行緒運行過程中的中斷控制狀態。它提供了鎖的獲取和釋放操作。在 worker 的實現中,我們使用了非重入的互斥鎖而不是使用重複鎖,因為 Lea 覺得我們不應該在調用諸如 setCorePoolSize 之類的控制方法時能夠重新獲取鎖。

worker 對象的源碼比較簡單和標準,這裡我們只說一下 worker 對象的構造方法,也就是

Worker(Runnable firstTask) {
  setState(-1); 
  this.firstTask = firstTask;
  this.thread = getThreadFactory().newThread(this);
}

構造一個 worker 對象需要做三步操作:

  • 初始 AQS 狀態為 -1,此時不允許中斷 interrupt(),只有在 worker 執行緒啟動了,執行了 runWorker() 方法後,將 state 置為0,才能進行中斷。
  • 將 firstTask 賦值給為當前類的全局變數
  • 通過 ThreadFactory 創建一個新的執行緒。

任務運行

我們前面的流程主要分析了執行緒池的 execute 方法的執行過程,這個執行過程相當於是任務提交過程,而我們下面要說的是從隊列中獲取任務並運行的這個工作流程。

一般情況下,我們會從初始任務開始運行,所以我們不需要獲取第一個任務。否則,只要執行緒池還處於 Running 狀態,我們會調用 getTask 方法獲取任務。getTask 方法可能會返回 null,此時可能是由於執行緒池狀態改變或者是配置參數更改而導致的退出。還有一種情況可能是由於 異常 而引發的,這個我們後面會細說。

下面來看一下 runWorker 方法的源碼:

final void runWorker(Worker w) {
  Thread wt = Thread.currentThread();
  Runnable task = w.firstTask;
  w.firstTask = null;
  // 允許打斷
  //  new Worker() 是 state==-1,此處是調用 Worker 類的 tryRelease() 方法,
  //  將 state 置為0
  w.unlock();
  boolean completedAbruptly = true;
  try {
    // 調用 getTask() 獲取任務
    while (task != null || (task = getTask()) != null) {
      // 獲取全局鎖
      w.lock();
      // 確保只有在執行緒 STOPING 時,才會被設置中斷標誌,否則清除中斷標誌。
      // 如果一開始判斷執行緒池狀態 < STOPING,但 Thread.interrupted() 為 true,
      // 即執行緒已經被中斷,又清除了中斷標示,再次判斷執行緒池狀態是否 >= stop
      // 是,再次設置中斷標示,wt.interrupt()
      // 否,不做操作,清除中斷標示後進行後續步驟
      if ((runStateAtLeast(ctl.get(), STOP) ||
           (Thread.interrupted() &&
            runStateAtLeast(ctl.get(), STOP))) &&
          !wt.isInterrupted())
        wt.interrupt();
      try {
        // 執行前需要調用的方法,交給程式設計師自己來實現
        beforeExecute(wt, task);
        Throwable thrown = null;
        try {
          task.run();
        } catch (RuntimeException x) {
          thrown = x; throw x;
        } catch (Error x) {
          thrown = x; throw x;
        } catch (Throwable x) {
          thrown = x; throw new Error(x);
        } finally {
          // 執行後需要調用的方法,交給程式設計師自己來實現
          afterExecute(task, thrown);
        }
      } finally {
        // 把 task 置為 null,完成任務數 + 1,並進行解鎖
        task = null;
        w.completedTasks++;
        w.unlock();
      }
    }
    completedAbruptly = false;
    // 最後處理 worker 的退出
  } finally {
    processWorkerExit(w, completedAbruptly);
  }
}

下面是 runWorker 的執行流程圖

這裡需要注意一下最後的 processWorkerExit 方法,這裡面其實也做了很多事情,包括判斷 completedAbruptly 的布爾值來表示是否完成任務,獲取鎖,嘗試從隊列中移除 worker,然後嘗試中斷,接下來會判斷一下中斷狀態,在執行緒池當前狀態小於 STOP 的情況下會創建一個新的 worker 來替換被銷毀的 worker。

任務獲取

任務獲取就是 getTask 方法的執行過程,這個環節主要用來獲取任務和剔除任務。下面進入源碼分析環節

private Runnable getTask() {
  // 判斷最後一個 poll 是否超時。
  boolean timedOut = false; // Did the last poll() time out?

  for (;;) {
    int c = ctl.get();
    int rs = runStateOf(c);

    // Check if queue empty only if necessary.
    // 必要時檢查隊列是否為空
    // 對執行緒池狀態的判斷,兩種情況會 workerCount-1,並且返回 null
    // 執行緒池狀態為 shutdown,且 workQueue 為空(反映了 shutdown 狀態的執行緒池還是要執行 workQueue 中剩餘的任務的)
    // 執行緒池狀態為 stop(shutdownNow() 會導致變成 STOP)(此時不用考慮 workQueue 的情況)
    if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
      decrementWorkerCount();
      return null;
    }

    int wc = workerCountOf(c);

    // Are workers subject to culling?
    // 是否需要定時從 workQueue 中獲取
    boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

    // 如果工作執行緒的數量大於 maximumPoolSize 會進行執行緒剔除
    // 如果使用了 allowCoreThreadTimeOut ,並且工作執行緒不為0或者隊列有任務的話,會直接進行執行緒剔除
    if ((wc > maximumPoolSize || (timed && timedOut))
        && (wc > 1 || workQueue.isEmpty())) {
      if (compareAndDecrementWorkerCount(c))
        return null;
      continue;
    }
		
    try {
      Runnable r = timed ?
        workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
      workQueue.take();
      if (r != null)
        return r;
      timedOut = true;
    } catch (InterruptedException retry) {
      timedOut = false;
    }
  }
}

getTask 方法的執行流程圖如下

工作執行緒退出

工作執行緒退出是 runWorker 的最後一步,這一步會判斷工作執行緒是否突然終止,並且會嘗試終止執行緒,以及是否需要增加執行緒來替換原工作執行緒。

private void processWorkerExit(Worker w, boolean completedAbruptly) {
  // worker數量 -1
  // completedAbruptly 是 true,突然終止,說明是 task 執行時異常情況導致,即run()方法執行時發生了異常,那麼正在工作的 worker 執行緒數量需要-1
  // completedAbruptly 是 false 是突然終止,說明是 worker 執行緒沒有 task 可執行了,不用-1,因為已經在 getTask() 方法中-1了
  if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
    decrementWorkerCount();

  // 從 Workers Set 中移除 worker
  final ReentrantLock mainLock = this.mainLock;
  mainLock.lock();
  try {
    completedTaskCount += w.completedTasks;
    workers.remove(w);
  } finally {
    mainLock.unlock();
  }

  // 嘗試終止執行緒,
  tryTerminate();

  // 是否需要增加 worker 執行緒
  // 執行緒池狀態是 running 或 shutdown
  // 如果當前執行緒是突然終止的,addWorker()
  // 如果當前執行緒不是突然終止的,但當前執行緒數量 < 要維護的執行緒數量,addWorker()
  // 故如果調用執行緒池 shutdown(),直到workQueue為空前,執行緒池都會維持 corePoolSize 個執行緒,
  // 然後再逐漸銷毀這 corePoolSize 個執行緒
  int c = ctl.get();
  if (runStateLessThan(c, STOP)) {
    if (!completedAbruptly) {
      int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
      if (min == 0 && ! workQueue.isEmpty())
        min = 1;
      if (workerCountOf(c) >= min)
        return; // replacement not needed
    }
    addWorker(null, false);
  }
}

源碼搞的有點頭大了,可能一時半會無法理解上面這些源碼,不過你可以先把注釋粘過去,等有時間了需要反覆刺激,加深印象!

其他執行緒池

下面我們來了解一下其他執行緒池的構造原理,主要涉及 FixedThreadPool、SingleThreadExecutor、CachedThreadPool

newFixedThreadPool

newFixedThreadPool 被稱為可重用固定執行緒數的執行緒池,下面是 newFixedThreadPool 的源碼

public static ExecutorService newFixedThreadPool(int nThreads) {
  return new ThreadPoolExecutor(nThreads, nThreads,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>());
}

可以看到,newFixedThreadPool 的 corePoolSize 和 maximumPoolSize 都被設置為創建 FixedThreadPool 時指定的參數 nThreads,也就是說,在 newFiexedThreadPool 中,核心執行緒數就是最大執行緒數。

下面是 newFixedThreadPool 的執行示意圖

newFixedThreadPool 的工作流程如下

  • 如果當前運行的執行緒數少於 corePoolSize,則會創建新執行緒 addworker 來執行任務
  • 如果當前執行緒的執行緒數等於 corePoolSize,會將任務直接加入到 LinkedBlockingQueue 無界阻塞隊列中,LinkedBlockingQueue 的上限如果沒有制定,默認為 Integer.MAX_VALUE 大小。
  • 等到執行緒池中的任務執行完畢後,newFixedThreadPool 會反覆從 LinkedBlockingQueue 中獲取任務來執行。

相較於 ThreadPoolExecutor,newFixedThreadPool 主要做了以下改變

  • 核心執行緒數等於最大執行緒數,因此 newFixedThreadPool 只有兩個最大容量,一個是執行緒池的執行緒容量,還有一個是 LinkedBlockingQueue 無界阻塞隊列的執行緒容量。

  • 這裡可以看到還有一個變化是 0L,也就是 keepAliveTime = 0L,keepAliveTime 就是到達工作執行緒最大容量後的執行緒等待時間,0L 就意味著當執行緒池中的執行緒數大於 corePoolsize 時,空餘的執行緒會被立即終止。

  • 由於使用無界隊列,運行中的 newFixedThreadPool 不會拒絕任務,也就是不會調用 RejectedExecutionHandler.rejectedExecution 方法。

newSingleThreadExecutor

newSingleThreadExecutor 中只有單個工作執行緒,也就是說它是一個只有單個 worker 的 Executor。

public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
  return new FinalizableDelegatedExecutorService
    (new ThreadPoolExecutor(1, 1,
                            0L, TimeUnit.MILLISECONDS,
                            new LinkedBlockingQueue<Runnable>(),
                            threadFactory));
}

可以看到,在 newSingleThreadExecutor 中,corePoolSize 和 maximumPoolSize 都被設置為 1,也不存在超時情況,同樣使用了 LinkedBlockingQueue 無界阻塞隊列,除了 corePoolSize 和 maximumPoolSize 外,其他幾乎和 newFixedThreadPool 一模一樣。

下面是 newSingleThreadExecutor 的執行示意圖

newSingleThreadExecutor 的執行過程和 newFixedThreadPool 相同,只是 newSingleThreadExecutor 的工作執行緒數為 1。

newCachedThreadPool

newCachedThreadPool 是一個根據需要創建工作執行緒的執行緒池,newCachedThreadPool 執行緒池最大數量是 Integer.MAX_VALUE,保活時間是 60 秒,使用的是SynchronousQueue 無緩衝阻塞隊列。

public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
  return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                60L, TimeUnit.SECONDS,
                                new SynchronousQueue<Runnable>(),
                                threadFactory);
}

它的執行示意圖如下

  • 首先會先執行 SynchronousQueue.offer 方法,如果當前 maximumPool 中有空閑執行緒正在執行 SynchronousQueue.poll ,就會把任務交給空閑執行緒來執行,execute 方法執行完畢,否則的話,繼續向下執行。
  • 如果 maximumPool 中沒有執行緒執行 SynchronousQueue.poll 方法,這種情況下 newCachedThreadPool 會創建一個新執行緒執行任務,execute 方法執行完成。
  • 執行完成的執行緒將執行 poll 操作,這個 poll 操作會讓空閑執行緒最多在 SynchronousQueue 中等待 60 秒鐘。如果 60 秒鐘內提交了一個新任務,那麼空閑執行緒會執行這個新提交的任務,否則空閑執行緒將會終止。

這裡的關鍵點在於 SynchronousQueue 隊列,它是一個沒有容量的阻塞隊列。每個插入操作必須等待另一個執行緒對應的移除操作。這其實就是一種任務傳遞,如下圖所示

其實還有一個執行緒池 ScheduledThreadPoolExecutor ,就先不在此篇文章做詳細贅述了。

執行緒池實踐考量因素

下面介紹幾種在實踐過程中使用執行緒池需要考慮的幾個點

  • 避免任務堆積,比如我們上面提到的 newFixedThreadPool,它是創建指定數目的執行緒,但是工作隊列是無界的,這就導致如果工作隊列執行緒太少,導致處理速度跟不上入隊速度,這種情況下很可能會導致 OOM,診斷時可以使用 jmap 檢查是否有大量任務入隊。
  • 生產實踐中很可能由於邏輯不嚴謹或者工作執行緒不能及時釋放導致 執行緒泄漏,這個時候最好檢查一下執行緒棧
  • 避免死鎖等同步問題
  • 盡量避免在使用執行緒池時操作 ThreadLocal,因為工作執行緒的生命周期可能會超過任務的生命周期。

執行緒池大小的設置

執行緒池大小的設置也是面試官經常會考到的一個點,一般需要根據任務類型來配置執行緒池大小

  • 如果是 CPU 密集型任務,那麼就意味著 CPU 是稀缺資源,這個時候我們通常不能通過增加執行緒數來提高計算能力,因為執行緒數量太多,會導致頻繁的上下文切換,一般這種情況下,建議合理的執行緒數值是 N(CPU)數 + 1
  • 如果是 I/O 密集型任務,就說明需要較多的等待,這個時候可以參考 Brain Goetz 的推薦方法 執行緒數 = CPU核數 × (1 + 平均等待時間/平均工作時間)。參考值可以是 N(CPU) 核數 * 2。

當然,這只是一個參考值,具體的設置還需要根據實際情況進行調整,比如可以先將執行緒池大小設置為參考值,再觀察任務運行情況和系統負載、資源利用率來進行適當調整。

後記

這篇文章真的寫了很久,因為之前對執行緒池認識不是很深,所以花了大力氣來研究,希望這篇文章對你有所幫助。

另外,添加我的微信 becomecxuan,加入每日一題群,每天一道面試題分享,更多內容請參見我的 Github,成為最好的 bestJavaer,已經收錄此篇文章,詳情見原文鏈接

我自己肝了六本 PDF,微信搜索「程式設計師cxuan」關注公眾號後,在後台回復 cxuan ,領取全部 PDF,這些 PDF 如下

六本 PDF 鏈接