ThreadPoolExcutor 原理探究

概論

執行緒池(英語:thread pool):一種執行緒使用模式。執行緒過多會帶來調度開銷,進而影響快取局部性和整體性能。而執行緒池維護著多個執行緒,等待著監督管理者分配可並發執行的任務。這避免了在處理短時間任務時創建與銷毀執行緒的代價。執行緒池不僅能夠保證內核的充分利用,還能防止過分調度。可用執行緒數量應該取決於可用的並發處理器、處理器內核、記憶體、網路 sockets 等的數量。 例如,執行緒數一般取 cpu 數量 +2 比較合適,執行緒數過多會導致額外的執行緒切換開銷。

Java 中的執行緒池是用 ThreadPoolExecutor 類來實現的. 本文就對該類的源碼來分析一下這個類內部對於執行緒的創建, 管理以及後台任務的調度等方面的執行原理。

先看一下執行緒池的類圖:

執行緒池的類圖

上圖的目的主要是為了讓大家知道執行緒池相關類之間的關係,至少賺個眼熟,以後看到不會有害怕的感覺。


 

Executor 框架介面

Executor 框架是一個根據一組執行策略調用,調度,執行和控制的非同步任務的框架,目的是提供一種將”任務提交”與”任務如何運行”分離開來的機制。

下面是 ThreadPoolExeCutor 類圖。Executors 其實是一個工具類,裡面提供了好多靜態方法,這些方法根據用戶選擇返回不同的執行緒實例。

從上圖也可以看出來,ThreadPoolExeCutor 是執行緒池的核心。

J.U.C 中有三個 Executor 介面:

  • Executor:一個運行新任務的簡單介面;

  • ExecutorService:擴展了 Executor 介面。添加了一些用來管理執行器生命周期和任務生命周期的方法;

  • ScheduledExecutorService:擴展了 ExecutorService。支援 Future 和定期執行任務。

其實通過這些介面就可以看到一些設計思想,每個介面的名字和其任務是完全匹配的。不會因為 Executor 中只有一個方法,就將其放到其他介面中。這也是很重要的單一原則。


 

ThreadPoolExeCutor 分析

在去具體分析 ThreadPoolExeCutor 運行邏輯前,先看下面的流程圖:

該圖是 ThreadPoolExeCutor 整個運行過程的一個概括,整個源碼的核心邏輯總結起來就是:

  1. 創建執行緒:要知道如何去創建執行緒,控制執行緒數量,執行緒的存活與銷毀;

  2. 添加任務:任務添加後如何處理,是立刻執行,還是先保存;

  3. 執行任務:如何獲取任務,任務執行失敗後如何處理?

下面將進入源碼分析,來深入理解 ThreadPoolExeCutor 的設計思想。


 

構造函數

先來看構造函數:

    public ThreadPoolExecutor(int corePoolSize,                                int maximumPoolSize,                                long keepAliveTime,                                TimeUnit unit,                                BlockingQueue<Runnable> workQueue,                                ThreadFactory threadFactory,                                RejectedExecutionHandler handler) {          if (corePoolSize < 0 ||              maximumPoolSize <= 0 ||              maximumPoolSize < corePoolSize ||              keepAliveTime < 0)              throw new IllegalArgumentException();
     // 注意 workQueue, threadFactory, handler 是不可以為null 的,為空會直接拋出錯誤
if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }

  1. corePoolSize 核心執行緒數表示核心執行緒池的大小。當提交一個任務時,如果當前核心執行緒池的執行緒個數沒有達到 corePoolSize,則會創建新的執行緒來執行所提交的任務,即使當前核心執行緒池有空閑的執行緒。如果當前核心執行緒池的執行緒個數已經達到了corePoolSize,則不再重新創建執行緒。如果調用了 prestartCoreThread() 或者 prestartAllCoreThreads(),執行緒池創建的時候所有的核心執行緒都會被創建並且啟動。若 corePoolSize == 0,則任務執行完之後,沒有任何請求進入時,銷毀執行緒池的執行緒。若 corePoolSize > 0,即使本地任務執行完畢,核心執行緒也不會被銷毀。corePoolSize 其實可以理解為可保留的空閑執行緒數。

  2. maximumPoolSize: 表示執行緒池能夠容納同時執行的最大執行緒數。如果當阻塞隊列已滿時,並且當前執行緒池執行緒個數沒有超過 maximumPoolSize 的話,就會創建新的執行緒來執行任務。注意 maximumPoolSize >= 1 必須大於等於 1。maximumPoolSize == corePoolSize ,即是固定大小執行緒池。實際上最大容量是由 CAPACITY 控制

  3. keepAliveTime: 執行緒空閑時間。當空閑時間達到 keepAliveTime值時,執行緒會被銷毀,直到只剩下 corePoolSize 個執行緒為止,避免浪費記憶體和句柄資源。默認情況,當執行緒池的執行緒數 > corePoolSize 時,keepAliveTime 才會起作用。但當 ThreadPoolExecutor 的 allowCoreThreadTimeOut 變數設置為 true 時,核心執行緒超時後會被回收。

  4. unit時間單位。為 keepAliveTime 指定時間單位。

  5. workQueue 快取隊列。當請求的執行緒數 > maximumPoolSize時,執行緒進入 BlockingQueue 阻塞隊列。可以使用 ArrayBlockingQueue, LinkedBlockingQueue, SynchronousQueue, PriorityBlockingQueue。

  6. threadFactory 創建執行緒的工程類。可以通過指定執行緒工廠為每個創建出來的執行緒設置更有意義的名字,如果出現並發問題,也方便查找問題原因。

  7. handler 執行拒絕策略的對象。當執行緒池的阻塞隊列已滿和指定的執行緒都已經開啟,說明當前執行緒池已經處於飽和狀態了,那麼就需要採用一種策略來處理這種情況。採用的策略有這幾種:
    • AbortPolicy: 直接拒絕所提交的任務,並拋出 RejectedExecutionException 異常;

    • CallerRunsPolicy:只用調用者所在的執行緒來執行任務;

    • DiscardPolicy:不處理直接丟棄掉任務;

    • DiscardOldestPolicy:丟棄掉阻塞隊列中存放時間最久的任務,執行當前任務


屬性定義

看完構造函數之後,再來看下該類裡面的變數,有助於進一步理解整個程式碼運行邏輯,下面是一些比較重要的變數:

// 用來標記執行緒池狀態(高3位),執行緒個數(低29位)  // 默認是 RUNNING 狀態,執行緒個數為0  private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));    // 執行緒個數掩碼位數,整型最大位數-3,可以適用於不同平台  private static final int COUNT_BITS = Integer.SIZE - 3;    //執行緒最大個數(低29位)00011111111111111111111111111111  private static final int CAPACITY   = (1 << COUNT_BITS) - 1;    //(高3位):11100000000000000000000000000000  private static final int RUNNING    = -1 << COUNT_BITS;    //(高3位):00000000000000000000000000000000  private static final int SHUTDOWN   =  0 << COUNT_BITS;    //(高3位):00100000000000000000000000000000  private static final int STOP       =  1 << COUNT_BITS;    //(高3位):01000000000000000000000000000000  private static final int TIDYING    =  2 << COUNT_BITS;    //(高3位):01100000000000000000000000000000  private static final int TERMINATED =  3 << COUNT_BITS;    // 獲取高三位 運行狀態  private static int runStateOf(int c)     { return c & ~CAPACITY; }    //獲取低29位 執行緒個數  private static int workerCountOf(int c)  { return c & CAPACITY; }    //計算ctl新值,執行緒狀態 與 執行緒個數  private static int ctlOf(int rs, int wc) { return rs | wc; }

這裡需要對一些操作做些解釋。 

  • Integer.SIZE:對於不同平台,其位數不一樣,目前常見的是 32 位;

  • (1 << COUNT_BITS) – 1:首先是將 1 左移 COUNT_BITS 位,也就是第 COUNT_BITS + 1 位是1,其餘都是 0;-1 操作則是將後面前面的 COUNT_BITS 位都變成 1。

  • -1 << COUNT_BITS:-1 的原碼是 10000000 00000000 00000000 00000001 ,反碼是 111111111 11111111 11111111 11111110 ,補碼 +1,然後左移 29 位是 11100000 00000000 00000000 00000000;這裡轉為十進位是負數。

  • ~CAPACITY取反,最高三位是1;

總結:這裡巧妙利用 bit 操作來將執行緒數量和運行狀態聯繫在一起,減少了變數的存在和記憶體的佔用。其中五種狀態的十進位排序:RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED


 

執行緒池狀態

執行緒池狀態含義:

  • RUNNING:接受新任務並且處理阻塞隊列里的任務;

  • SHUTDOWN:拒絕新任務但是處理阻塞隊列里的任務;

  • STOP:拒絕新任務並且拋棄阻塞隊列里的任務同時會中斷正在處理的任務;

  • TIDYING:所有任務都執行完(包含阻塞隊列裡面任務)當前執行緒池活動執行緒為 0,將要調用 terminated 方法

  • TERMINATED:終止狀態。terminated 方法調用完成以後的狀態;

執行緒池狀態轉換:

  • RUNNING -> SHUTDOWN:顯式調用 shutdown() 方法,或者隱式調用了 finalize(),它裡面調用了shutdown()方法。

  • RUNNING or SHUTDOWN)-> STOP:顯式 shutdownNow() 方法;

  • SHUTDOWN -> TIDYING:當執行緒池和任務隊列都為空的時候;

  • STOP -> TIDYING:當執行緒池為空的時候;

  • TIDYING -> TERMINATED:當 terminated() hook 方法執行完成時候;


 原碼,反碼,補碼知識小劇場:

1. 原碼:原碼就是符號位加上真值的絕對值, 即用第一位表示符號,其餘位表示值. 比如如果是 8 位二進位:

[+1] = 0000 0001

[-1] = 1000 0001

負數原碼第一位是符號位. 

 

2. 反碼:反碼的表示方法是,正數的反碼是其本身,負數的反碼是在其原碼的基礎上, 符號位不變,其餘各個位取反.

[+1] = [0000 0001] = [0000 0001]

[-1] = [1000 0001] = [1111 1110]

 

3. 補碼:補碼的表示方法是,正數的補碼就是其本身,負數的補碼是在其原碼的基礎上, 符號位不變, 其餘各位取反, 最後 +1. (即在反碼的基礎上 +1)

[+1] = [0000 0001] = [0000 0001] = [0000 0001]

[-1] = [1000 0001] = [1111 1110] = [1111 1111]

4. 總結
在知道一個數原碼的情況下:
正數:反碼,補碼 就是本身自己
負數:反碼是高位符號位不變,其餘位取反。補碼:反碼+1

 

 5. 左移:當數值左、右移時,先將數值轉化為其補碼形式,移完後,再轉換成對應的原碼

     左移:高位丟棄,低位補零

     [+1]  = [00000001]

     [0000 0001] << 1 = [0000 0010] = [0000 0010] = [+2]

     [-1]  = [1000 0001] = [1111 1111]

     [1111 1111] << 1 = [1111 1110] = [1000 0010] = [-2]

其中,再次提醒,負數的補碼是反碼+1;負數的反碼是補碼-1;

 

 6. 右移:高位保持不變,低位丟棄

     [+127] = [0111 1111] = [0111 1111]

     [0111 1111]補 >> 1 = [0011 1111] = [0011 1111] = [+63]

     [-127] = [1111 1111] = [1000 0001]

     [1000 0001] >> 1 = [1100 0000] = [1100 0000]原 = [-64]


execute 方法分析

通過 ThreadPoolExecutor 創建執行緒池後,提交任務後執行過程是怎樣的,下面來通過源碼來看一看。execute 方法源碼如下:

public void execute(Runnable command) {      if (command == null)          throw new NullPointerException();        // 返回包含執行緒數及執行緒池狀態(頭3位)      int c = ctl.get();        // 如果工作執行緒數小於核心執行緒數,則創建執行緒任務執行      if (workerCountOf(c) < corePoolSize) {            if (addWorker(command, true))              return;            // 如果創建失敗,防止外部已經在執行緒池中加入新任務,重新獲取          c = ctl.get();      }        // 只有執行緒池處於 RUNNING 狀態,且 入隊列成功      if (isRunning(c) && workQueue.offer(command)) {        // 後面的操作屬於double-check          int recheck = ctl.get();            // 如果執行緒池不是 RUNNING 狀態,則將剛加入隊列的任務移除          if (! isRunning(recheck) && remove(command))              reject(command);            // 如果之前的執行緒已被消費完,新建一個執行緒          else if (workerCountOf(recheck) == 0)              addWorker(null, false);      }      // 核心池和隊列都滿了,嘗試創建一個新執行緒      else if (!addWorker(command, false))          // 如果 addWorker 返回是 false,即創建失敗,則喚醒拒絕策略          reject(command);  } 

execute 方法執行邏輯有這樣幾種情況:

  1. 如果當前運行的執行緒少於 corePoolSize,則會創建新的執行緒來執行新的任務;

  2. 如果運行的執行緒個數等於或者大於 corePoolSize,則會將提交的任務存放到阻塞隊列 workQueue 中;

  3. 如果當前 workQueue 隊列已滿的話,則會創建新的執行緒來執行任務;

  4. 如果執行緒個數已經超過了 maximumPoolSize,則會使用飽和策略 RejectedExecutionHandler 來進行處理。

這裡要注意一下 addWorker(null, false) 也就是創建一個執行緒,但並沒有傳入任務,因為任務已經被添加到 workQueue 中了,所以 worker 在執行的時候,會直接從 workQueue 中獲取任務。所以,在 workerCountOf(recheck) == 0 時執行 addWorker(null, false) 也是為了保證執行緒池在 RUNNING 狀態下必須要有一個執行緒來執行任務。

需要注意的是,執行緒池的設計思想就是使用了核心執行緒池 corePoolSize,阻塞隊列 workQueue 和執行緒池 maximumPoolSize,這樣的快取策略來處理任務,實際上這樣的設計思想在需要框架中都會使用。

需要注意執行緒和任務之間的區別,任務是保存在 workQueue 中的,執行緒是從執行緒池裡面取的,由 CAPACITY 控制容量。


addWorker 方法分析

addWorker 方法的主要工作是在執行緒池中創建一個新的執行緒並執行,firstTask 參數用於指定新增的執行緒執行的第一個任務,core 參數為 true 表示在新增執行緒時會判斷當前活動執行緒數是否少於 corePoolSize,false 表示新增執行緒前需要判斷當前活動執行緒數是否少於 maximumPoolSize,程式碼如下:

private boolean addWorker(Runnable firstTask, boolean core) {      retry:      for (;;) {          int c = ctl.get();          // 獲取運行狀態          int rs = runStateOf(c);            /*           * 這個if判斷           * 如果rs >= SHUTDOWN,則表示此時不再接收新任務;           * 接著判斷以下3個條件,只要有1個不滿足,則返回false:           * 1. rs == SHUTDOWN,這時表示關閉狀態,不再接受新提交的任務,但卻可以繼續處理阻塞隊列中已保存的任務           * 2. firsTask為空           * 3. 阻塞隊列不為空           *           * 首先考慮rs == SHUTDOWN的情況           * 這種情況下不會接受新提交的任務,所以在firstTask不為空的時候會返回false;           * 然後,如果firstTask為空,並且workQueue也為空,則返回false,           * 因為隊列中已經沒有任務了,不需要再添加執行緒了           */          // Check if queue empty only if necessary.          if (rs >= SHUTDOWN &&              ! (rs == SHUTDOWN &&                 firstTask == null &&                 ! workQueue.isEmpty()))              return false;          for (;;) {              // 獲取執行緒數              int wc = workerCountOf(c);              // 如果wc超過CAPACITY,也就是ctl的低29位的最大值(二進位是29個1),返回false;              // 這裡的core是addWorker方法的第二個參數,如果為true表示根據corePoolSize來比較,              // 如果為false則根據maximumPoolSize來比較。              //              if (wc >= CAPACITY ||                  wc >= (core ? corePoolSize : maximumPoolSize))                  return false;              // 嘗試增加workerCount,如果成功,則跳出第一個for循環              if (compareAndIncrementWorkerCount(c))                  break retry;              // 如果增加workerCount失敗,則重新獲取ctl的值              c = ctl.get();  // Re-read ctl              // 如果當前的運行狀態不等於rs,說明狀態已被改變,返回第一個for循環繼續執行              if (runStateOf(c) != rs)                  continue retry;              // else CAS failed due to workerCount change; retry inner loop          }      }      boolean workerStarted = false;      boolean workerAdded = false;      Worker w = null;      try {          // 根據firstTask來創建Worker對象          w = new Worker(firstTask);          // 每一個Worker對象都會創建一個執行緒          final Thread t = w.thread;          if (t != null) {              final ReentrantLock mainLock = this.mainLock;              mainLock.lock();              try {                  // Recheck while holding lock.                  // Back out on ThreadFactory failure or if                  // shut down before lock acquired.                  int rs = runStateOf(ctl.get());                  // rs < SHUTDOWN表示是RUNNING狀態;                  // 如果rs是RUNNING狀態或者rs是SHUTDOWN狀態並且firstTask為null,向執行緒池中添加執行緒。                  // 因為在SHUTDOWN時不會在添加新的任務,但還是會執行workQueue中的任務                  if (rs < SHUTDOWN ||                      (rs == SHUTDOWN && firstTask == null)) {                      if (t.isAlive()) // precheck that t is startable                          throw new IllegalThreadStateException();                      // workers是一個HashSet                      workers.add(w);                      int s = workers.size();                      // largestPoolSize記錄著執行緒池中出現過的最大執行緒數量                      if (s > largestPoolSize)                          largestPoolSize = s;                      workerAdded = true;                  }              } finally {                  mainLock.unlock();              }              if (workerAdded) {                  // 啟動執行緒                  t.start();                  workerStarted = true;              }          }      } finally {          if (! workerStarted)              addWorkerFailed(w);      }      return workerStarted;  }

這裡需要注意有以下幾點:

  1. 在獲取鎖後重新檢查執行緒池的狀態,這是因為其他執行緒可可能在本方法獲取鎖前改變了執行緒池的狀態,比如調用了shutdown方法。添加成功則啟動任務執行。

  2.  t.start()會調用 Worker 類中的 run 方法,Worker 本身實現了 Runnable 介面。原因在創建執行緒得時候,將 Worker 實例傳入了 t 當中,可參見 Worker 類的構造函數。

  3. wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) 每次調用 addWorker 來添加執行緒會先判斷當前執行緒數是否超過了CAPACITY,然後再去判斷是否超 corePoolSize 或 maximumPoolSize,說明執行緒數實際上是由 CAPACITY 來控制的。


內部類 Worker 分析

上面分析過程中,提到了一個 Worker 類,對於某些對源碼不是很熟悉得同學可能有點不清楚,下面就來看看 Worker 的源碼:

 private final class Worker          extends AbstractQueuedSynchronizer          implements Runnable      {          /**           * This class will never be serialized, but we provide a           * serialVersionUID to suppress a javac warning.           */          private static final long serialVersionUID = 6138294804551838833L;            /** Thread this worker is running in.  Null if factory fails. */          final Thread thread;          /** Initial task to run.  Possibly null. */          Runnable firstTask;          /** Per-thread task counter */          volatile long completedTasks;            /**           * Creates with given first task and thread from ThreadFactory.           * @param firstTask the first task (null if none)           */          Worker(Runnable firstTask) {              setState(-1); // inhibit interrupts until runWorker              this.firstTask = firstTask;
       // 注意此處傳入的是this
this.thread = getThreadFactory().newThread(this); } /** Delegates main run loop to outer runWorker. */
     // 這裡其實會調用外部的 runWorker 方法來執行自己。
public void run() { runWorker(this); } // Lock methods // // The value 0 represents the unlocked state. // The value 1 represents the locked state. protected boolean isHeldExclusively() { return getState() != 0; } protected boolean tryAcquire(int unused) {
       // 如果已經設置過1了,這時候在設置1就會返回false,也就是不可重入
if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } public void lock() { acquire(1); } public boolean tryLock() { return tryAcquire(1); } public void unlock() { release(1); } public boolean isLocked() { return isHeldExclusively(); }      // 提供安全中斷執行緒得方法 void interruptIfStarted() { Thread t;
       // 一開始 setstate(-1) 避免了還沒開始運行就被中斷可能
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } }

首先看到的是 Worker 繼承了(AbstractQueuedSynchronizer) AQS,並實現了 Runnable 介面,說明 Worker 本身也是執行緒。然後看其構造函數可以發現,內部有兩個屬性變數分別是 Runnable 和 Thread 實例,該類其實就是對傳進來得屬性做了一個封裝,並加入了獲取鎖的邏輯(繼承了 AQS )。具體可參考文章:透過 ReentrantLock 分析 AQS 的實現原理

Worker 繼承了 AQS,使用 AQS 來實現獨佔鎖的功能。為什麼不使用 ReentrantLock 來實現呢?可以看到 tryAcquire 方法,它是不允許重入的,而 ReentrantLock 是允許重入的:

  1. lock 方法一旦獲取了獨佔鎖,表示當前執行緒正在執行任務中;

  2. 如果正在執行任務,則不應該中斷執行緒;

  3. 如果該執行緒現在不是獨佔鎖的狀態,也就是空閑的狀態,說明它沒有在處理任務,這時可以對該執行緒進行中斷;

  4. 執行緒池在執行 shutdown 方法或 tryTerminate 方法時會調用 interruptIdleWorkers 方法來中斷空閑的執行緒,interruptIdleWorkers 方法會使用 tryLock 方法來判斷執行緒池中的執行緒是否是空閑狀態;

  5. 之所以設置為不可重入,是因為我們不希望任務在調用像 setCorePoolSize 這樣的執行緒池控制方法時重新獲取鎖。如果使用 ReentrantLock,它是可重入的,這樣如果在任務中調用了如 setCorePoolSize 這類執行緒池控制的方法,會中斷正在運行的執行緒,因為 size 小了,需要中斷一些執行緒 。

所以,Worker 繼承自 AQS,用於判斷執行緒是否空閑以及是否可以被中斷。

此外,在構造方法中執行了 setState(-1);,把 state 變數設置為 -1,為什麼這麼做呢?是因為 AQS 中默認的 state 是 0,如果剛創建了一個 Worker 對象,還沒有執行任務時,這時就不應該被中斷,看一下 tryAquire 方法: 

protected boolean tryAcquire(int unused) {      if (compareAndSetState(0, 1)) {          setExclusiveOwnerThread(Thread.currentThread());          return true;      }      return false;  }

正因為如此,在 runWorker 方法中會先調用 Worker 對象的 unlock 方法將 state 設置為 0。tryAcquire 方法是根據 state 是否是 0 來判斷的,所以,setState(-1);將 state 設置為 -1 是為了禁止在執行任務前對執行緒進行中斷。


 runWorker 方法分析

前面提到了內部類 Worker 的 run 方法調用了外部類 runWorker,下面來看下 runWork 的具體邏輯。

final void runWorker(Worker w) {         Thread wt = Thread.currentThread();         Runnable task = w.firstTask;         w.firstTask = null;         w.unlock(); // status 設置為0,允許中斷,也可以避免再次加鎖失敗         boolean completedAbruptly = true;         try {             while (task != null || (task = getTask()) != null) {                 // 要派發task的時候,需要上鎖                 w.lock();                 // 如果執行緒池當前狀態至少是stop,則設置中斷標誌;                 // 如果執行緒池當前狀態是RUNNININ,則重置中斷標誌,重置後需要重新                 //檢查下執行緒池狀態,因為當重置中斷標誌時候,可能調用了執行緒池的shutdown方法                 //改變了執行緒池狀態。                 if ((runStateAtLeast(ctl.get(), STOP) ||                      (Thread.interrupted() &&                       runStateAtLeast(ctl.get(), STOP))) &&                     !wt.isInterrupted())                     wt.interrupt();                   try {                     //任務執行前干一些事情                     beforeExecute(wt, task);                     Throwable thrown = null;                     try {                         task.run();//執行任務                     } catch (RuntimeException x) {                         thrown = x; throw x;                     } catch (Error x) {                         thrown = x; throw x;                     } catch (Throwable x) {                         thrown = x; throw new Error(x);                     } finally {                         //任務執行完畢後干一些事情                         afterExecute(task, thrown);                     }                 } finally {                     task = null;                     //統計當前worker完成了多少個任務                     w.completedTasks++;                     w.unlock();                 }             }             completedAbruptly = false;         } finally {               //執行清了工作             processWorkerExit(w, completedAbruptly);         }     }

總結一下 runWorker 方法的執行過程:

  1. while 循環不斷地通過 getTask() 方法從阻塞隊列中取任務;

  2. 如果執行緒池正在停止,那麼要保證當前執行緒是中斷狀態,否則要保證當前執行緒不是中斷狀態;

  3. 調用 task.run()執行任務;

  4. 如果 task 為 null 則跳出循環,執行 processWorkerExit 方法;

  5. runWorker 方法執行完畢,也代表著 Worker 中的 run 方法執行完畢,銷毀執行緒。

這裡的 beforeExecute 方法和 afterExecute 方法在 ThreadPoolExecutor 類中是空的,留給子類來實現。

completedAbruptly 變數來表示在執行任務過程中是否出現了異常,在 processWorkerExit 方法中會對該變數的值進行判斷。


 

getTask 方法分析

getTask 方法是從阻塞隊列裡面獲取任務,具體程式碼邏輯如下:

private Runnable getTask() {      // timeOut變數的值表示上次從阻塞隊列中取任務時是否超時      boolean timedOut = false; // Did the last poll() time out?      for (;;) {          int c = ctl.get();          int rs = runStateOf(c);          // Check if queue empty only if necessary.          /*           * 如果執行緒池狀態rs >= SHUTDOWN,也就是非RUNNING狀態,再進行以下判斷:           * 1. rs >= STOP,執行緒池是否正在stop;           * 2. 阻塞隊列是否為空。           * 如果以上條件滿足,則將workerCount減1並返回null。           * 因為如果當前執行緒池狀態的值是SHUTDOWN或以上時,不允許再向阻塞隊列中添加任務。           */          if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {              decrementWorkerCount();              return null;          }          int wc = workerCountOf(c);          // Are workers subject to culling?          // timed變數用於判斷是否需要進行超時控制。          // allowCoreThreadTimeOut默認是false,也就是核心執行緒不允許進行超時;          // wc > corePoolSize,表示當前執行緒池中的執行緒數量大於核心執行緒數量;          // 對於超過核心執行緒數量的這些執行緒,需要進行超時控制          boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;            /*           * wc > maximumPoolSize的情況是因為可能在此方法執行階段同時執行了setMaximumPoolSize方法;           * timed && timedOut 如果為true,表示當前操作需要進行超時控制,並且上次從阻塞隊列中獲取任務發生了超時           * 接下來判斷,如果有效執行緒數量大於1,或者阻塞隊列是空的,那麼嘗試將workerCount減1;           * 如果減1失敗,則返回重試。           * 如果wc == 1時,也就說明當前執行緒是執行緒池中唯一的一個執行緒了。           */          if ((wc > maximumPoolSize || (timed && timedOut))              && (wc > 1 || workQueue.isEmpty())) {              if (compareAndDecrementWorkerCount(c))                  return null;              continue;          }          try {              /*               * 根據timed來判斷,如果為true,則通過阻塞隊列的poll方法進行超時控制,如果在keepAliveTime時間內沒有獲取到任務,則返回null;               * 否則通過take方法,如果這時隊列為空,則take方法會阻塞直到隊列不為空。               *               */              Runnable r = timed ?                  workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :                  workQueue.take();              if (r != null)                  return r;              // 如果 r == null,說明已經超時,timedOut設置為true              timedOut = true;          } catch (InterruptedException retry) {              // 如果獲取任務時當前執行緒發生了中斷,則設置timedOut為false並返回循環重試              timedOut = false;          }      }  }

其實到這裡後,你會發現在 ThreadPoolExcute 內部有幾個重要的檢驗:

  • 判斷當前的運行狀態,根據運行狀態來做處理,如果當前都停止運行了,那很多操作也就沒必要了;

  • 判斷當前執行緒池的數量,然後將該數據和 corePoolSize 以及 maximumPoolSize 進行比較,然後再去決定下一步該做啥;

首先是第一個 if 判斷,當運行狀態處於非 RUNNING 狀態,此外 rs >= STOP(執行緒池是否正在 stop)或阻塞隊列是否為空。則將 workerCount 減 1 並返回 null。為什麼要減 1 呢,因為此處其實是去獲取一個 task,但是發現處於停止狀態了,也就是沒必要再去獲取運行任務了,那這個執行緒就沒有存在的意義了。後續也會在 processWorkerExit 將該執行緒移除。

第二個 if 條件目的是控制執行緒池的有效執行緒數量。由上文中的分析可以知道,在執行 execute 方法時,如果當前執行緒池的執行緒數量超過了 corePoolSize 且小於 maximumPoolSize,並且 workQueue 已滿時,則可以增加工作執行緒,但這時如果超時沒有獲取到任務,也就是 timedOut 為 true 的情況,說明 workQueue 已經為空了,也就說明了當前執行緒池中不需要那麼多執行緒來執行任務了,可以把多於 corePoolSize 數量的執行緒銷毀掉,保持執行緒數量在 corePoolSize 即可。

什麼時候會銷毀?當然是 runWorker 方法執行完之後,也就是 Worker 中的 run 方法執行完,由 JVM 自動回收。

getTask 方法返回 null 時,在 runWorker 方法中會跳出 while 循環,然後會執行 processWorkerExit 方法。


 

processWorkerExit 方法

下面在看 processWorkerExit 方法的具體邏輯:

private void processWorkerExit(Worker w, boolean completedAbruptly) {      // 如果completedAbruptly值為true,則說明執行緒執行時出現了異常,需要將workerCount減1;      // 如果執行緒執行時沒有出現異常,說明在getTask()方法中已經已經對workerCount進行了減1操作,這裡就不必再減了。        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted          decrementWorkerCount();      final ReentrantLock mainLock = this.mainLock;      mainLock.lock();      try {          //統計完成的任務數          completedTaskCount += w.completedTasks;          // 從workers中移除,也就表示著從執行緒池中移除了一個工作執行緒          workers.remove(w);      } finally {          mainLock.unlock();      }      // 根據執行緒池狀態進行判斷是否結束執行緒池      tryTerminate();      int c = ctl.get();      /*       * 當執行緒池是RUNNING或SHUTDOWN狀態時,如果worker是異常結束,那麼會直接addWorker;       * 如果allowCoreThreadTimeOut=true,並且等待隊列有任務,至少保留一個worker;       * 如果allowCoreThreadTimeOut=false,workerCount不少於corePoolSize。       */      if (runStateLessThan(c, STOP)) {          if (!completedAbruptly) {              int min = allowCoreThreadTimeOut ? 0 : corePoolSize;              if (min == 0 && ! workQueue.isEmpty())                  min = 1;              if (workerCountOf(c) >= min)                  return; // replacement not needed          }          addWorker(null, false);      }  }

至此,processWorkerExit 執行完之後,工作執行緒被銷毀,以上就是整個工作執行緒的生命周期。但是這有兩點需要注意:

  1. 大家想想什麼時候才會調用這個方法,任務幹完了才會調用。那麼沒事做了,就需要看下是否有必要結束執行緒池,這時候就會調用 tryTerminate。

  2. 如果此時執行緒處於 STOP 狀態以下,那麼就會判斷核心執行緒數是否達到了規定的數量,沒有的話,就會繼續創建一個執行緒。


tryTerminate方法

tryTerminate 方法根據執行緒池狀態進行判斷是否結束執行緒池,程式碼如下:

final void tryTerminate() {      for (;;) {          int c = ctl.get();          /*           * 當前執行緒池的狀態為以下幾種情況時,直接返回:           * 1. RUNNING,因為還在運行中,不能停止;           * 2. TIDYING或TERMINATED,因為執行緒池中已經沒有正在運行的執行緒了;           * 3. SHUTDOWN並且等待隊列非空,這時要執行完workQueue中的task;           */          if (isRunning(c) ||              runStateAtLeast(c, TIDYING) ||              (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))              return;          // 如果執行緒數量不為0,則中斷一個空閑的工作執行緒,並返回          if (workerCountOf(c) != 0) { // Eligible to terminate              interruptIdleWorkers(ONLY_ONE);              return;          }          final ReentrantLock mainLock = this.mainLock;          mainLock.lock();          try {              // 這裡嘗試設置狀態為TIDYING,如果設置成功,則調用terminated方法              if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {                  try {                      // terminated方法默認什麼都不做,留給子類實現                      terminated();                  } finally {                      // 設置狀態為TERMINATED                      ctl.set(ctlOf(TERMINATED, 0));                      termination.signalAll();                  }                  return;              }          } finally {              mainLock.unlock();          }          // else retry on failed CAS      }  }

interruptIdleWorkers(boolean onlyOne) 如果 ONLY_ONE = true 那麼就的最多讓一個空閑執行緒發生中斷,ONLY_ONE = false 時是所有空閑執行緒都會發生中斷。那執行緒什麼時候會處於空閑狀態呢?

一是執行緒數量很多,任務都完成了;二是執行緒在 getTask 方法中執行 workQueue.take() 時,如果不執行中斷會一直阻塞。

所以每次在工作執行緒結束時調用 tryTerminate 方法來嘗試中斷一個空閑工作執行緒,避免在隊列為空時取任務一直阻塞的情況。


 

shutdown方法

shutdown 方法要將執行緒池切換到 SHUTDOWN 狀態,並調用 interruptIdleWorkers 方法請求中斷所有空閑的 worker,最後調用 tryTerminate 嘗試結束執行緒池。

public void shutdown() {      final ReentrantLock mainLock = this.mainLock;      mainLock.lock();      try {          // 安全策略判斷          checkShutdownAccess();          // 切換狀態為SHUTDOWN          advanceRunState(SHUTDOWN);          // 中斷空閑執行緒          interruptIdleWorkers();          onShutdown(); // hook for ScheduledThreadPoolExecutor      } finally {          mainLock.unlock();      }      // 嘗試結束執行緒池      tryTerminate();  }

這裡思考一個問題:在 runWorker 方法中,執行任務時對 Worker 對象 w 進行了 lock 操作,為什麼要在執行任務的時候對每個工作執行緒都加鎖呢?

下面仔細分析一下:

  • 在 getTask 方法中,如果這時執行緒池的狀態是 SHUTDOWN 並且 workQueue 為空,那麼就應該返回 null 來結束這個工作執行緒,而使執行緒池進入 SHUTDOWN 狀態需要調用shutdown 方法;

  • shutdown 方法會調用 interruptIdleWorkers 來中斷空閑的執行緒,interruptIdleWorkers 持有 mainLock,會遍歷 workers 來逐個判斷工作執行緒是否空閑。但 getTask 方法中沒有mainLock;

  • 在 getTask 中,如果判斷當前執行緒池狀態是 RUNNING,並且阻塞隊列為空,那麼會調用 workQueue.take() 進行阻塞;

  • 如果在判斷當前執行緒池狀態是 RUNNING 後,這時調用了 shutdown 方法把狀態改為了 SHUTDOWN,這時如果不進行中斷,那麼當前的工作執行緒在調用了 workQueue.take() 後會一直阻塞而不會被銷毀,因為在 SHUTDOWN 狀態下不允許再有新的任務添加到 workQueue 中,這樣一來執行緒池永遠都關閉不了了;

  • 由上可知,shutdown 方法與 getTask 方法(從隊列中獲取任務時)存在競態條件;

  • 解決這一問題就需要用到執行緒的中斷,也就是為什麼要用 interruptIdleWorkers 方法。在調用 workQueue.take() 時,如果發現當前執行緒在執行之前或者執行期間是中斷狀態,則會拋出 InterruptedException,解除阻塞的狀態;

  • 但是要中斷工作執行緒,還要判斷工作執行緒是否是空閑的,如果工作執行緒正在處理任務,就不應該發生中斷;

  • 所以 Worker 繼承自 AQS,在工作執行緒處理任務時會進行 lock,interruptIdleWorkers 在進行中斷時會使用 tryLock 來判斷該工作執行緒是否正在處理任務,如果 tryLock 返回 true,說明該工作執行緒當前未執行任務,這時才可以被中斷。

下面就來分析一下 interruptIdleWorkers 方法。

interruptIdleWorkers方法

private void interruptIdleWorkers() {      interruptIdleWorkers(false);  }  private void interruptIdleWorkers(boolean onlyOne) {      final ReentrantLock mainLock = this.mainLock;      mainLock.lock();      try {          for (Worker w : workers) {              Thread t = w.thread;              if (!t.isInterrupted() && w.tryLock()) {                  try {                      t.interrupt();                  } catch (SecurityException ignore) {                  } finally {                      w.unlock();                  }              }              if (onlyOne)                  break;          }      } finally {          mainLock.unlock();      }  }

interruptIdleWorkers 遍歷 workers 中所有的工作執行緒,若執行緒沒有被中斷 tryLock 成功,就中斷該執行緒。

為什麼需要持有 mainLock ?因為 workers 是 HashSet 類型的,不能保證執行緒安全。


 

shutdownNow方法

public List<Runnable> shutdownNow() {      List<Runnable> tasks;      final ReentrantLock mainLock = this.mainLock;      mainLock.lock();      try {          checkShutdownAccess();          advanceRunState(STOP);          // 中斷所有工作執行緒,無論是否空閑          interruptWorkers();          // 取出隊列中沒有被執行的任務          tasks = drainQueue();      } finally {          mainLock.unlock();      }      tryTerminate();      return tasks;  }

shutdownNow 方法與 shutdown 方法類似,不同的地方在於:

  1. 設置狀態為 STOP;

  2. 中斷所有工作執行緒,無論是否是空閑的;

  3. 取出阻塞隊列中沒有被執行的任務並返回。

shutdownNow 方法執行完之後調用 tryTerminate 方法,該方法在上文已經分析過了,目的就是使執行緒池的狀態設置為 TERMINATED。


 

執行緒池的監控

通過執行緒池提供的參數進行監控。執行緒池裡有一些屬性在監控執行緒池的時候可以使用

  • getTaskCount:執行緒池已經執行的和未執行的任務總數;

  • getCompletedTaskCount:執行緒池已完成的任務數量,該值小於等於 taskCount;

  • getLargestPoolSize:執行緒池曾經創建過的最大執行緒數量。通過這個數據可以知道執行緒池是否滿過,也就是達到了maximumPoolSize;

  • getPoolSize:執行緒池當前的執行緒數量;

  • getActiveCount:當前執行緒池中正在執行任務的執行緒數量。

通過這些方法,可以對執行緒池進行監控,在 ThreadPoolExecutor 類中提供了幾個空方法,如 beforeExecute 方法,afterExecute 方法和 terminated 方法,可以擴展這些方法在執行前或執行後增加一些新的操作,例如統計執行緒池的執行任務的時間等,可以繼承自 ThreadPoolExecutor 來進行擴展。

到此,關於 ThreadPoolExecutor 的內容就講完了。

  

 參考文獻

Java中執行緒池ThreadPoolExecutor原理探究

【Java】 之ThreadPoolExcutor源碼淺析

執行緒池ThreadPoolExecutor實現原理

深入理解Java執行緒池:ThreadPoolExecutor