­

死磕 java執行緒系列之執行緒池深入解析——普通任務執行流程

  • 2019 年 11 月 3 日
  • 筆記

threadpool_task

(手機橫屏看源碼更方便)


註:java源碼分析部分如無特殊說明均基於 java8 版本。

註:執行緒池源碼部分如無特殊說明均指ThreadPoolExecutor類。

簡介

前面我們一起學習了Java中執行緒池的體系結構、構造方法和生命周期,本章我們一起來學習執行緒池中普通任務到底是怎麼執行的。

建議學習本章前先去看看彤哥之前寫的《死磕 java執行緒系列之自己動手寫一個執行緒池》那兩章,有助於理解本章的內容,且那邊的程式碼比較短小,學起來相對容易一些。

問題

(1)執行緒池中的普通任務是怎麼執行的?

(2)任務又是在哪裡被執行的?

(3)執行緒池中有哪些主要的方法?

(4)如何使用Debug模式一步一步調試執行緒池?

使用案例

我們創建一個執行緒池,它的核心數量為5,最大數量為10,空閑時間為1秒,隊列長度為5,拒絕策略列印一句話。

如果使用它運行20個任務,會是什麼結果呢?

public class ThreadPoolTest01 {      public static void main(String[] args) {          // 新建一個執行緒池          // 核心數量為5,最大數量為10,空閑時間為1秒,隊列長度為5,拒絕策略列印一句話          ExecutorService threadPool = new ThreadPoolExecutor(5, 10,                  1, TimeUnit.SECONDS, new ArrayBlockingQueue<>(5),                  Executors.defaultThreadFactory(), new RejectedExecutionHandler() {              @Override              public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {                  System.out.println(currentThreadName() + ", discard task");              }          });            // 提交20個任務,注意觀察num          for (int i = 0; i < 20; i++) {              int num = i;              threadPool.execute(()->{                  try {                      System.out.println(currentThreadName() + ", "+ num + " running, " + System.currentTimeMillis());                      Thread.sleep(2000);                  } catch (InterruptedException e) {                      e.printStackTrace();                  }              });          }        }        private static String currentThreadName() {          return Thread.currentThread().getName();      }  }

構造方法的7個參數我們就不詳細解釋了,有興趣的可以看看《死磕 java執行緒系列之執行緒池深入解析——構造方法》那章。

我們一起來看看一次運行的結果:

pool-1-thread-1, 0 running, 1572678434411  pool-1-thread-3, 2 running, 1572678434411  pool-1-thread-2, 1 running, 1572678434411  pool-1-thread-4, 3 running, 1572678434411  pool-1-thread-5, 4 running, 1572678434411  pool-1-thread-6, 10 running, 1572678434412  pool-1-thread-7, 11 running, 1572678434412  pool-1-thread-8, 12 running, 1572678434412  main, discard task  main, discard task  main, discard task  main, discard task  main, discard task  // 【本文由公從號「彤哥讀源碼」原創】  pool-1-thread-9, 13 running, 1572678434412  pool-1-thread-10, 14 running, 1572678434412  pool-1-thread-3, 5 running, 1572678436411  pool-1-thread-1, 6 running, 1572678436411  pool-1-thread-6, 7 running, 1572678436412  pool-1-thread-2, 8 running, 1572678436412  pool-1-thread-7, 9 running, 1572678436412

注意,觀察num值的列印資訊,先是列印了0~4,再列印了10~14,最後列印了5~9,竟然不是按順序列印的,為什麼呢?

讓我們一步一步debug進去查看。

execute()方法

execute()方法是執行緒池提交任務的方法之一,也是最核心的方法。

// 提交任務,任務並非立即執行,所以翻譯成執行任務似乎不太合適  public void execute(Runnable command) {      // 任務不能為空      if (command == null)          throw new NullPointerException();        // 控制變數(高3位存儲狀態,低29位存儲工作執行緒的數量)      int c = ctl.get();      // 1. 如果工作執行緒數量小於核心數量      if (workerCountOf(c) < corePoolSize) {          // 就添加一個工作執行緒(核心)          if (addWorker(command, true))              return;          // 重新獲取下控制變數          c = ctl.get();      }      // 2. 如果達到了核心數量且執行緒池是運行狀態,任務入隊列      if (isRunning(c) && workQueue.offer(command)) {          int recheck = ctl.get();          // 再次檢查執行緒池狀態,如果不是運行狀態,就移除任務並執行拒絕策略          if (! isRunning(recheck) && remove(command))              reject(command);          // 容錯檢查工作執行緒數量是否為0,如果為0就創建一個          else if (workerCountOf(recheck) == 0)              addWorker(null, false);      }      // 3. 任務入隊列失敗,嘗試創建非核心工作執行緒      else if (!addWorker(command, false))          // 非核心工作執行緒創建失敗,執行拒絕策略          reject(command);  }

關於執行緒池狀態的內容,我們這裡不拿出來細講了,有興趣的可以看看《死磕 java執行緒系列之執行緒池深入解析——生命周期》那章。

提交任務的過程大致如下:

(1)工作執行緒數量小於核心數量,創建核心執行緒;

(2)達到核心數量,進入任務隊列;

(3)任務隊列滿了,創建非核心執行緒;

(4)達到最大數量,執行拒絕策略;

其實,就是三道坎——核心數量、任務隊列、最大數量,這樣就比較好記了。

流程圖大致如下:

threadpool_task

任務流轉的過程我們知道了,但是任務是在哪裡執行的呢?繼續往下看。

addWorker()方法

這個方法主要用來創建一個工作執行緒,並啟動之,其中會做執行緒池狀態、工作執行緒數量等各種檢測。

private boolean addWorker(Runnable firstTask, boolean core) {      // 判斷有沒有資格創建新的工作執行緒      // 主要是一些狀態/數量的檢查等等      // 這段程式碼比較複雜,可以先跳過      retry:      for (;;) {          int c = ctl.get();          int rs = runStateOf(c);            // 執行緒池狀態檢查          if (rs >= SHUTDOWN &&              ! (rs == SHUTDOWN &&                 firstTask == null &&                 ! workQueue.isEmpty()))              return false;            // 工作執行緒數量檢查          for (;;) {              int wc = workerCountOf(c);              if (wc >= CAPACITY ||                  wc >= (core ? corePoolSize : maximumPoolSize))                  return false;              // 數量加1並跳出循環              if (compareAndIncrementWorkerCount(c))                  break retry;              c = ctl.get();  // Re-read ctl              if (runStateOf(c) != rs)                  continue retry;              // else CAS failed due to workerCount change; retry inner loop          }      }        // 如果上面的條件滿足,則會把工作執行緒數量加1,然後執行下面創建執行緒的動作        boolean workerStarted = false;      boolean workerAdded = false;      Worker w = null;      try {          // 創建工作執行緒          w = new Worker(firstTask);          final Thread t = w.thread;          if (t != null) {              final ReentrantLock mainLock = this.mainLock;              mainLock.lock();              try {                  // 再次檢查執行緒池的狀態                  int rs = runStateOf(ctl.get());                  if (rs < SHUTDOWN ||                      (rs == SHUTDOWN && firstTask == null)) {                        if (t.isAlive()) // precheck that t is startable                          throw new IllegalThreadStateException();                        // 添加到工作執行緒隊列                      workers.add(w);                      // 還在池子中的執行緒數量(只能在mainLock中使用)                      int s = workers.size();                      if (s > largestPoolSize)                          largestPoolSize = s;                        // 標記執行緒添加成功                      workerAdded = true;                  }              } finally {                  mainLock.unlock();              }              if (workerAdded) {                  // 執行緒添加成功之後啟動執行緒                  t.start();                  workerStarted = true;              }          }      } finally {          // 執行緒啟動失敗,執行失敗方法(執行緒數量減1,執行tryTerminate()方法等)          if (! workerStarted)              addWorkerFailed(w);      }      return workerStarted;  }

這裡其實還沒到任務執行的地方,上面我們可以看到執行緒是包含在Worker這個類中的,那麼,我們就跟蹤到這個類中看看。

Worker內部類

Worker內部類可以看作是對工作執行緒的包裝,一般地,我們說工作執行緒就是指Worker,但實際上是指其維護的Thread實例。

// Worker繼承自AQS,自帶鎖的屬性  private final class Worker      extends AbstractQueuedSynchronizer      implements Runnable  {      // 真正工作的執行緒      final Thread thread;      // 第一個任務,從構造方法傳進來      Runnable firstTask;      // 完成任務數      volatile long completedTasks;        // 構造方法// 【本文由公從號「彤哥讀源碼」原創】      Worker(Runnable firstTask) {          setState(-1); // inhibit interrupts until runWorker          this.firstTask = firstTask;          // 使用執行緒工廠生成一個執行緒          // 注意,這裡把Worker本身作為Runnable傳給執行緒          this.thread = getThreadFactory().newThread(this);      }        // 實現Runnable的run()方法      public void run() {          // 調用ThreadPoolExecutor的runWorker()方法          runWorker(this);      }        // 省略鎖的部分  }

這裡要能夠看出來工作執行緒Thread啟動的時候實際是調用的Worker的run()方法,進而調用的是ThreadPoolExecutor的runWorker()方法。

runWorker()方法

runWorker()方法是真正執行任務的地方。

final void runWorker(Worker w) {      // 工作執行緒      Thread wt = Thread.currentThread();      // 任務      Runnable task = w.firstTask;      w.firstTask = null;      // 強制釋放鎖(shutdown()裡面有加鎖)      // 這裡相當於無視那邊的中斷標記      w.unlock(); // allow interrupts      boolean completedAbruptly = true;      try {          // 取任務,如果有第一個任務,這裡先執行第一個任務          // 只要能取到任務,這就是個死循環          // 正常來說getTask()返回的任務是不可能為空的,因為前面execute()方法是有空判斷的          // 那麼,getTask()什麼時候才會返回空任務呢?          while (task != null || (task = getTask()) != null) {              w.lock();              // 檢查執行緒池的狀態              if ((runStateAtLeast(ctl.get(), STOP) ||                   (Thread.interrupted() &&                    runStateAtLeast(ctl.get(), STOP))) &&                  !wt.isInterrupted())                  wt.interrupt();                try {                  // 鉤子方法,方便子類在任務執行前做一些處理                  beforeExecute(wt, task);                  Throwable thrown = null;                  try {                      // 真正任務執行的地方                      task.run();                      // 異常處理                  } catch (RuntimeException x) {                      thrown = x; throw x;                  } catch (Error x) {                      thrown = x; throw x;                  } catch (Throwable x) {                      thrown = x; throw new Error(x);                  } finally {                      // 鉤子方法,方便子類在任務執行後做一些處理                      afterExecute(task, thrown);                  }              } finally {                  // task置為空,重新從隊列中取                  task = null;                  // 完成任務數加1                  w.completedTasks++;                  w.unlock();              }          }          completedAbruptly = false;      } finally {          // 到這裡肯定是上面的while循環退出了          processWorkerExit(w, completedAbruptly);      }  }

這個方法比較簡單,忽略狀態檢測和鎖的內容,如果有第一個任務,就先執行之,之後再從任務隊列中取任務來執行,獲取任務是通過getTask()來進行的。

getTask()

從隊列中獲取任務的方法,裡面包含了對執行緒池狀態、空閑時間等的控制。

private Runnable getTask() {      // 是否超時      boolean timedOut = false;        // 死循環      for (;;) {          int c = ctl.get();          int rs = runStateOf(c);            // 執行緒池狀態是SHUTDOWN的時候會把隊列中的任務執行完直到隊列為空          // 執行緒池狀態是STOP時立即退出          if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {              decrementWorkerCount();              return null;          }            // 工作執行緒數量// 【本文由公從號「彤哥讀源碼」原創】          int wc = workerCountOf(c);            // 是否允許超時,有兩種情況:          // 1. 是允許核心執行緒數超時,這種就是說所有的執行緒都可能超時          // 2. 是工作執行緒數大於了核心數量,這種肯定是允許超時的          // 注意,非核心執行緒是一定允許超時的,這裡的超時其實是指取任務超時          boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;            // 超時判斷(還包含一些容錯判斷)          if ((wc > maximumPoolSize || (timed && timedOut))              && (wc > 1 || workQueue.isEmpty())) {              // 超時了,減少工作執行緒數量,並返回null              if (compareAndDecrementWorkerCount(c))                  return null;              // 減少工作執行緒數量失敗,則重試              continue;          }            try {              // 真正取任務的地方              // 默認情況下,只有當工作執行緒數量大於核心執行緒數量時,才會調用poll()方法觸發超時調用                Runnable r = timed ?                  workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :                  workQueue.take();                // 取到任務了就正常返回              if (r != null)                  return r;              // 沒取到任務表明超時了,回到continue那個if中返回null              timedOut = true;          } catch (InterruptedException retry) {              // 捕獲到了中斷異常              // 中斷標記是在調用shutDown()或者shutDownNow()的時候設置進去的              // 此時,會回到for循環的第一個if處判斷狀態是否要返回null              timedOut = false;          }      }  }

注意,這裡取任務會根據工作執行緒的數量判斷是使用BlockingQueue的poll(timeout, unit)方法還是take()方法。

poll(timeout, unit)方法會在超時時返回null,如果timeout<=0,隊列為空時直接返回null。

take()方法會一直阻塞直到取到任務或拋出中斷異常。

所以,如果keepAliveTime設置為0,當任務隊列為空時,非核心執行緒取不出來任務,會立即結束其生命周期。

默認情況下,是不允許核心執行緒超時的,但是可以通過下面這個方法設置使核心執行緒也可超時。

public void allowCoreThreadTimeOut(boolean value) {      if (value && keepAliveTime <= 0)          throw new IllegalArgumentException("Core threads must have nonzero keep alive times");      if (value != allowCoreThreadTimeOut) {          allowCoreThreadTimeOut = value;          if (value)              interruptIdleWorkers();      }  }

至此,執行緒池中任務的執行流程就結束了。

再看開篇問題

觀察num值的列印資訊,先是列印了0~4,再列印了10~14,最後列印了5~9,竟然不是按順序列印的,為什麼呢?

執行緒池的參數:核心數量5個,最大數量10個,任務隊列5個。

答:執行前5個任務執行時,正好還不到核心數量,所以新建核心執行緒並執行了他們;

執行中間的5個任務時,已達到核心數量,所以他們先入隊列;

執行後面5個任務時,已達核心數量且隊列已滿,所以新建非核心執行緒並執行了他們;

再執行最後5個任務時,執行緒池已達到滿負荷狀態,所以執行了拒絕策略。

總結

本章通過一個例子並結合執行緒池的重要方法我們一起分析了執行緒池中普通任務執行的流程。

(1)execute(),提交任務的方法,根據核心數量、任務隊列大小、最大數量,分成四種情況判斷任務應該往哪去;

(2)addWorker(),添加工作執行緒的方法,通過Worker內部類封裝一個Thread實例維護工作執行緒的執行;

(3)runWorker(),真正執行任務的地方,先執行第一個任務,再源源不斷從任務隊列中取任務來執行;

(4)getTask(),真正從隊列取任務的地方,默認情況下,根據工作執行緒數量與核心數量的關係判斷使用隊列的poll()還是take()方法,keepAliveTime參數也是在這裡使用的。

彩蛋

核心執行緒和非核心執行緒有什麼區別?

答:實際上並沒有什麼區別,主要是根據corePoolSize來判斷任務該去哪裡,兩者在執行任務的過程中並沒有任何區別。有可能新建的時候是核心執行緒,而keepAliveTime時間到了結束了的也可能是剛開始創建的核心執行緒。

Worker繼承自AQS有何意義?

前面我們看了Worker內部類的定義,它繼承自AQS,天生自帶鎖的特性,那麼,它的鎖是用來幹什麼的呢?跟任務的執行有關係嗎?

答:既然是跟鎖(同步)有關,說明Worker類跨執行緒使用了,此時我們查看它的lock()方法發現只在runWorker()方法中使用了,但是其tryLock()卻是在interruptIdleWorkers()方法中使用的。

private void interruptIdleWorkers(boolean onlyOne) {      final ReentrantLock mainLock = this.mainLock;      mainLock.lock();      try {          for (Worker w : workers) {              Thread t = w.thread;              if (!t.isInterrupted() && w.tryLock()) {                  try {                      t.interrupt();                  } catch (SecurityException ignore) {                  } finally {                      w.unlock();                  }              }              if (onlyOne)                  break;          }      } finally {          mainLock.unlock();      }  }

interruptIdleWorkers()方法的意思是中斷空閑執行緒的意思,它只會中斷BlockingQueue的poll()或take()方法,而不會中斷正在執行的任務。

一般來說,interruptIdleWorkers()方法的調用不是在本工作執行緒,而是在主執行緒中調用的,還記得《死磕 java執行緒系列之執行緒池深入解析——生命周期》中說過的shutdown()和shutdownNow()方法嗎?

觀察兩個方法中中斷執行緒的方法,shutdown()中就是調用了interruptIdleWorkers()方法,這裡tryLock()獲取到鎖了再中斷,如果沒有獲取到鎖則不中斷,沒獲取到鎖只有一種情況,也就是lock()所在的地方,也就是有任務正在執行。

而shutdownNow()中中斷執行緒則很暴力,並沒有tryLock(),而是直接中斷了執行緒,所以調用shutdownNow()可能會中斷正在執行的任務。

所以,Worker繼承自AQS實際是要使用其鎖的能力,這個鎖主要是用來控制shutdown()時不要中斷正在執行任務的執行緒。


歡迎關注我的公眾號「彤哥讀源碼」,查看更多源碼系列文章, 與彤哥一起暢遊源碼的海洋。

qrcode