深度解讀 java 執行緒池設計思想及源碼實現

  • 2019 年 10 月 7 日
  • 筆記

轉自:https://javadoop.com/2017/09/05/java-thread-pool

我相信大家都看過很多的關於執行緒池的文章,基本上也是面試必問的,好像我寫這篇文章其實是沒有什麼意義的,不過,我相信你也和我一樣,看了很多文章還是一知半解,甚至可能看了很多瞎說的文章。希望大家看過這篇文章以後,就可以完全掌握 Java 執行緒池了。

我發現好些人都是因為這篇文章來到本站的,希望這篇讓人留下第一眼印象的文章能給你帶來收穫。

本文一大重點是源碼解析,不過執行緒池設計思想以及作者實現過程中的一些巧妙用法是我想傳達給讀者的。本文還是會一行行關鍵程式碼進行分析,目的是為了讓那些自己看源碼不是很理解的同學可以得到參考。

執行緒池是非常重要的工具,如果你要成為一個好的工程師,還是得比較好地掌握這個知識。即使你為了謀生,也要知道,這基本上是面試必問的題目,而且面試官很容易從被面試者的回答中捕捉到被面試者的技術水平。

本文略長,建議在 pc 上閱讀,邊看文章邊翻源碼(Java7 和 Java8 都一樣),建議想好好看的讀者抽出至少 15 至 30 分鐘的整塊時間來閱讀。當然,如果讀者僅為面試準備,可以直接滑到最後的總結部分。

目錄

  • 總覽
  • Executor 介面
  • ExecutorService
  • FutureTask
  • AbstractExecutorService
  • ThreadPoolExecutor
  • Executors
  • 總結

總覽

開篇來一些廢話。下圖是 java 執行緒池幾個相關類的繼承結構:

先簡單說說這個繼承結構,Executor 位於最頂層,也是最簡單的,就一個 execute(Runnable runnable) 介面方法定義。

ExecutorService 也是介面,在 Executor 介面的基礎上添加了很多的介面方法,所以一般來說我們會使用這個介面。

然後再下來一層是 AbstractExecutorService,從名字我們就知道,這是抽象類,這裡實現了非常有用的一些方法供子類直接使用,之後我們再細說。

然後才到我們的重點部分 ThreadPoolExecutor 類,這個類提供了關於執行緒池所需的非常豐富的功能。

另外,我們還涉及到下圖中的這些類:

同在並發包中的 Executors 類,類名中帶字母 s,我們猜到這個是工具類,裡面的方法都是靜態方法,如以下我們最常用的用於生成 ThreadPoolExecutor 的實例的一些方法:

public static ExecutorService newCachedThreadPool() {      return new ThreadPoolExecutor(0, Integer.MAX_VALUE,                                    60L, TimeUnit.SECONDS,                                    new SynchronousQueue<Runnable>());  }  public static ExecutorService newFixedThreadPool(int nThreads) {      return new ThreadPoolExecutor(nThreads, nThreads,                                    0L, TimeUnit.MILLISECONDS,                                    new LinkedBlockingQueue<Runnable>());  }

另外,由於執行緒池支援獲取執行緒執行的結果,所以,引入了 Future 介面,RunnableFuture 繼承自此介面,然後我們最需要關心的就是它的實現類 FutureTask。到這裡,記住這個概念,在執行緒池的使用過程中,我們是往執行緒池提交任務(task),使用過執行緒池的都知道,我們提交的每個任務是實現了 Runnable 介面的,其實就是先將 Runnable 的任務包裝成 FutureTask,然後再提交到執行緒池。這樣,讀者才能比較容易記住 FutureTask 這個類名:它首先是一個任務(Task),然後具有 Future 介面的語義,即可以在將來(Future)得到執行的結果。

當然,執行緒池中的 BlockingQueue 也是非常重要的概念,如果執行緒數達到 corePoolSize,我們的每個任務會提交到等待隊列中,等待執行緒池中的執行緒來取任務並執行。這裡的 BlockingQueue 通常我們使用其實現類 LinkedBlockingQueue、ArrayBlockingQueue 和 SynchronousQueue,每個實現類都有不同的特徵,使用場景之後會慢慢分析。想要詳細了解各個 BlockingQueue 的讀者,可以參考我的前面的一篇對 BlockingQueue 的各個實現類進行詳細分析的文章。

把事情說完整:除了上面說的這些類外,還有一個很重要的類,就是定時任務實現類 ScheduledThreadPoolExecutor,它繼承自本文要重點講解的 ThreadPoolExecutor,用於實現定時執行。不過本文不會介紹它的實現,我相信讀者看完本文後可以比較容易地看懂它的源碼。

以上就是本文要介紹的知識,廢話不多說,開始進入正文。

Executor 介面

/*   * @since 1.5   * @author Doug Lea   */  public interface Executor {      void execute(Runnable command);  }

我們可以看到 Executor 介面非常簡單,就一個 voidexecute(Runnablecommand) 方法,代表提交一個任務。為了讓大家理解 java 執行緒池的整個設計方案,我會按照 Doug Lea 的設計思路來多說一些相關的東西。

我們經常這樣啟動一個執行緒:

new Thread(new Runnable(){    // do something  }).start();

用了執行緒池 Executor 後就可以像下面這麼使用:

Executor executor = anExecutor;  executor.execute(new RunnableTask1());  executor.execute(new RunnableTask2());

如果我們希望執行緒池同步執行每一個任務,我們可以這麼實現這個介面:

class DirectExecutor implements Executor {      public void execute(Runnable r) {          r.run();// 這裡不是用的new Thread(r).start(),也就是說沒有啟動任何一個新的執行緒。      }  }

我們希望每個任務提交進來後,直接啟動一個新的執行緒來執行這個任務,我們可以這麼實現:

class ThreadPerTaskExecutor implements Executor {      public void execute(Runnable r) {          new Thread(r).start();  // 每個任務都用一個新的執行緒來執行      }  }

我們再來看下怎麼組合兩個 Executor 來使用,下面這個實現是將所有的任務都加到一個 queue 中,然後從 queue 中取任務,交給真正的執行器執行,這裡採用 synchronized 進行並發控制:

class SerialExecutor implements Executor {      // 任務隊列      final Queue<Runnable> tasks = new ArrayDeque<Runnable>();      // 這個才是真正的執行器      final Executor executor;      // 當前正在執行的任務      Runnable active;        // 初始化的時候,指定執行器      SerialExecutor(Executor executor) {          this.executor = executor;      }        // 添加任務到執行緒池: 將任務添加到任務隊列,scheduleNext 觸發執行器去任務隊列取任務      public synchronized void execute(final Runnable r) {          tasks.offer(new Runnable() {              public void run() {                  try {                      r.run();                  } finally {                      scheduleNext();                  }              }          });          if (active == null) {              scheduleNext();          }      }        protected synchronized void scheduleNext() {          if ((active = tasks.poll()) != null) {              // 具體的執行轉給真正的執行器 executor              executor.execute(active);          }      }  }

當然了,Executor 這個介面只有提交任務的功能,太簡單了,我們想要更豐富的功能,比如我們想知道執行結果、我們想知道當前執行緒池有多少個執行緒活著、已經完成了多少任務等等,這些都是這個介面的不足的地方。接下來我們要介紹的是繼承自 Executor 介面的 ExecutorService 介面,這個介面提供了比較豐富的功能,也是我們最常使用到的介面。

ExecutorService

一般我們定義一個執行緒池的時候,往往都是使用這個介面:

ExecutorService executor = Executors.newFixedThreadPool(args...);  ExecutorService executor = Executors.newCachedThreadPool(args...);

因為這個介面中定義的一系列方法大部分情況下已經可以滿足我們的需要了。

那麼我們簡單初略地來看一下這個介面中都有哪些方法:

public interface ExecutorService extends Executor {        // 關閉執行緒池,已提交的任務繼續執行,不接受繼續提交新任務      void shutdown();        // 關閉執行緒池,嘗試停止正在執行的所有任務,不接受繼續提交新任務      // 它和前面的方法相比,加了一個單詞「now」,區別在於它會去停止當前正在進行的任務      List<Runnable> shutdownNow();        // 執行緒池是否已關閉      boolean isShutdown();        // 如果調用了 shutdown() 或 shutdownNow() 方法後,所有任務結束了,那麼返回true      // 這個方法必須在調用shutdown或shutdownNow方法之後調用才會返回true      boolean isTerminated();        // 等待所有任務完成,並設置超時時間      // 我們這麼理解,實際應用中是,先調用 shutdown 或 shutdownNow,      // 然後再調這個方法等待所有的執行緒真正地完成,返回值意味著有沒有超時      boolean awaitTermination(long timeout, TimeUnit unit)              throws InterruptedException;        // 提交一個 Callable 任務      <T> Future<T> submit(Callable<T> task);        // 提交一個 Runnable 任務,第二個參數將會放到 Future 中,作為返回值,      // 因為 Runnable 的 run 方法本身並不返回任何東西      <T> Future<T> submit(Runnable task, T result);        // 提交一個 Runnable 任務      Future<?> submit(Runnable task);        // 執行所有任務,返回 Future 類型的一個 list      <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)              throws InterruptedException;        // 也是執行所有任務,但是這裡設置了超時時間      <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,                                    long timeout, TimeUnit unit)              throws InterruptedException;        // 只有其中的一個任務結束了,就可以返回,返回執行完的那個任務的結果      <T> T invokeAny(Collection<? extends Callable<T>> tasks)              throws InterruptedException, ExecutionException;        // 同上一個方法,只有其中的一個任務結束了,就可以返回,返回執行完的那個任務的結果,      // 不過這個帶超時,超過指定的時間,拋出 TimeoutException 異常      <T> T invokeAny(Collection<? extends Callable<T>> tasks,                      long timeout, TimeUnit unit)              throws InterruptedException, ExecutionException, TimeoutException;  }

這些方法都很好理解,一個簡單的執行緒池主要就是這些功能,能提交任務,能獲取結果,能關閉執行緒池,這也是為什麼我們經常用這個介面的原因。

FutureTask

在繼續往下層介紹 ExecutorService 的實現類之前,我們先來說說相關的類 FutureTask。

Future   -> RunnableFuture -> FutureTask  Runnable -> RunnableFuture    FutureTask 通過 RunnableFuture 間接實現了 Runnable 介面,  所以每個 Runnable 通常都先包裝成 FutureTask,  然後調用 executor.execute(Runnable command) 將其提交給執行緒池

我們知道,Runnable 的 void run() 方法是沒有返回值的,所以,通常,如果我們需要的話,會在 submit 中指定第二個參數作為返回值:

<T> Future<T> submit(Runnable task, T result);

其實到時候會通過這兩個參數,將其包裝成 Callable。

Callable 也是因為執行緒池的需要,所以才有了這個介面。它和 Runnable 的區別在於 run() 沒有返回值,而 Callable 的 call() 方法有返回值,同時,如果運行出現異常,call() 方法會拋出異常。

public interface Callable<V> {        V call() throws Exception;  }

在這裡,就不展開說 FutureTask 類了,因為本文篇幅本來就夠大了,這裡我們需要知道怎麼用就行了。

下面,我們來看看 ExecutorService 的抽象實現 AbstractExecutorService

AbstractExecutorService

AbstractExecutorService 抽象類派生自 ExecutorService 介面,然後在其基礎上實現了幾個實用的方法,這些方法提供給子類進行調用。

這個抽象類實現了 invokeAny 方法和 invokeAll 方法,這裡的兩個 newTaskFor 方法也比較有用,用於將任務包裝成 FutureTask。定義於最上層介面 Executor中的 voidexecute(Runnablecommand) 由於不需要獲取結果,不會進行 FutureTask 的包裝。

需要獲取結果(FutureTask),用 submit 方法,不需要獲取結果,可以用 execute 方法。

下面,我將一行一行源碼地來分析這個類,跟著源碼來看看其實現吧:

Tips: invokeAny 和 invokeAll 方法佔了這整個類的絕大多數篇幅,讀者可以選擇適當跳過,因為它們可能在你的實踐中使用的頻次比較低,而且它們不帶有承前啟後的作用,不用擔心會漏掉什麼導致看不懂後面的程式碼。

public abstract class AbstractExecutorService implements ExecutorService {        // RunnableFuture 是用於獲取執行結果的,我們常用它的子類 FutureTask      // 下面兩個 newTaskFor 方法用於將我們的任務包裝成 FutureTask 提交到執行緒池中執行      protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {          return new FutureTask<T>(runnable, value);      }        protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {          return new FutureTask<T>(callable);      }        // 提交任務      public Future<?> submit(Runnable task) {          if (task == null) throw new NullPointerException();          // 1. 將任務包裝成 FutureTask          RunnableFuture<Void> ftask = newTaskFor(task, null);          // 2. 交給執行器執行,execute 方法由具體的子類來實現          // 前面也說了,FutureTask 間接實現了Runnable 介面。          execute(ftask);          return ftask;      }        public <T> Future<T> submit(Runnable task, T result) {          if (task == null) throw new NullPointerException();          // 1. 將任務包裝成 FutureTask          RunnableFuture<T> ftask = newTaskFor(task, result);          // 2. 交給執行器執行          execute(ftask);          return ftask;      }        public <T> Future<T> submit(Callable<T> task) {          if (task == null) throw new NullPointerException();          // 1. 將任務包裝成 FutureTask          RunnableFuture<T> ftask = newTaskFor(task);          // 2. 交給執行器執行          execute(ftask);          return ftask;      }        // 此方法目的:將 tasks 集合中的任務提交到執行緒池執行,任意一個執行緒執行完後就可以結束了      // 第二個參數 timed 代表是否設置超時機制,超時時間為第三個參數,      // 如果 timed 為 true,同時超時了還沒有一個執行緒返回結果,那麼拋出 TimeoutException 異常      private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,                              boolean timed, long nanos)          throws InterruptedException, ExecutionException, TimeoutException {          if (tasks == null)              throw new NullPointerException();          // 任務數          int ntasks = tasks.size();          if (ntasks == 0)              throw new IllegalArgumentException();          //          List<Future<T>> futures= new ArrayList<Future<T>>(ntasks);            // ExecutorCompletionService 不是一個真正的執行器,參數 this 才是真正的執行器          // 它對執行器進行了包裝,每個任務結束後,將結果保存到內部的一個 completionQueue 隊列中          // 這也是為什麼這個類的名字裡面有個 Completion 的原因吧。          ExecutorCompletionService<T> ecs =              new ExecutorCompletionService<T>(this);          try {              // 用於保存異常資訊,此方法如果沒有得到任何有效的結果,那麼我們可以拋出最後得到的一個異常              ExecutionException ee = null;              long lastTime = timed ? System.nanoTime() : 0;              Iterator<? extends Callable<T>> it = tasks.iterator();                // 首先先提交一個任務,後面的任務到下面的 for 循環一個個提交              futures.add(ecs.submit(it.next()));              // 提交了一個任務,所以任務數量減 1              --ntasks;              // 正在執行的任務數(提交的時候 +1,任務結束的時候 -1)              int active = 1;                for (;;) {                  // ecs 上面說了,其內部有一個 completionQueue 用於保存執行完成的結果                  // BlockingQueue 的 poll 方法不阻塞,返回 null 代表隊列為空                  Future<T> f = ecs.poll();                  // 為 null,說明剛剛提交的第一個執行緒還沒有執行完成                  // 在前面先提交一個任務,加上這裡做一次檢查,也是為了提高性能                  if (f == null) {                      if (ntasks > 0) {                          --ntasks;                          futures.add(ecs.submit(it.next()));                          ++active;                      }                      // 這裡是 else if,不是 if。這裡說明,沒有任務了,同時 active 為 0 說明                      // 任務都執行完成了。其實我也沒理解為什麼這裡做一次 break?                      // 因為我認為 active 為 0 的情況,必然從下面的 f.get() 返回了                        // 2018-02-23 感謝讀者 newmicro 的 comment,                      //  這裡的 active == 0,說明所有的任務都執行失敗,那麼這裡是 for 循環出口                      else if (active == 0)                          break;                      // 這裡也是 else if。這裡說的是,沒有任務了,但是設置了超時時間,這裡檢測是否超時                      else if (timed) {                          // 帶等待的 poll 方法                          f = ecs.poll(nanos, TimeUnit.NANOSECONDS);                          // 如果已經超時,拋出 TimeoutException 異常,這整個方法就結束了                          if (f == null)                              throw new TimeoutException();                          long now = System.nanoTime();                          nanos -= now - lastTime;                          lastTime = now;                      }                      // 這裡是 else。說明,沒有任務需要提交,但是池中的任務沒有完成,還沒有超時(如果設置了超時)                      // take() 方法會阻塞,直到有元素返回,說明有任務結束了                      else                          f = ecs.take();                  }                  /*                   * 我感覺上面這一段並不是很好理解,這裡簡單說下。                   * 1. 首先,這在一個 for 循環中,我們設想每一個任務都沒那麼快結束,                   *     那麼,每一次都會進到第一個分支,進行提交任務,直到將所有的任務都提交了                   * 2. 任務都提交完成後,如果設置了超時,那麼 for 循環其實進入了「一直檢測是否超時」                         這件事情上                   * 3. 如果沒有設置超時機制,那麼不必要檢測超時,那就會阻塞在 ecs.take() 方法上,                         等待獲取第一個執行結果                   * 4. 如果所有的任務都執行失敗,也就是說 future 都返回了,                         但是 f.get() 拋出異常,那麼從 active == 0 分支出去(感謝 newmicro 提出)                           // 當然,這個需要看下面的 if 分支。                   */                    // 有任務結束了                  if (f != null) {                      --active;                      try {                          // 返回執行結果,如果有異常,都包裝成 ExecutionException                          return f.get();                      } catch (ExecutionException eex) {                          ee = eex;                      } catch (RuntimeException rex) {                          ee = new ExecutionException(rex);                      }                  }              }// 注意看 for 循環的範圍,一直到這裡                if (ee == null)                  ee = new ExecutionException();              throw ee;            } finally {              // 方法退出之前,取消其他的任務              for (Future<T> f : futures)                  f.cancel(true);          }      }        public <T> T invokeAny(Collection<? extends Callable<T>> tasks)          throws InterruptedException, ExecutionException {          try {              return doInvokeAny(tasks, false, 0);          } catch (TimeoutException cannotHappen) {              assert false;              return null;          }      }        public <T> T invokeAny(Collection<? extends Callable<T>> tasks,                             long timeout, TimeUnit unit)          throws InterruptedException, ExecutionException, TimeoutException {          return doInvokeAny(tasks, true, unit.toNanos(timeout));      }        // 執行所有的任務,返回任務結果。      // 先不要看這個方法,我們先想想,其實我們自己提交任務到執行緒池,也是想要執行緒池執行所有的任務      // 只不過,我們是每次 submit 一個任務,這裡以一個集合作為參數提交      public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)          throws InterruptedException {          if (tasks == null)              throw new NullPointerException();          List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());          boolean done = false;          try {              // 這個很簡單              for (Callable<T> t : tasks) {                  // 包裝成 FutureTask                  RunnableFuture<T> f = newTaskFor(t);                  futures.add(f);                  // 提交任務                  execute(f);              }              for (Future<T> f : futures) {                  if (!f.isDone()) {                      try {                          // 這是一個阻塞方法,直到獲取到值,或拋出了異常                          // 這裡有個小細節,其實 get 方法簽名上是會拋出 InterruptedException 的                          // 可是這裡沒有進行處理,而是拋給外層去了。此異常發生於還沒執行完的任務被取消了                          f.get();                      } catch (CancellationException ignore) {                      } catch (ExecutionException ignore) {                      }                  }              }              done = true;              // 這個方法返回,不像其他的場景,返回 List<Future>,其實執行結果還沒出來              // 這個方法返回是真正的返回,任務都結束了              return futures;          } finally {              // 為什麼要這個?就是上面說的有異常的情況              if (!done)                  for (Future<T> f : futures)                      f.cancel(true);          }      }        // 帶超時的 invokeAll,我們找不同吧      public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,                                           long timeout, TimeUnit unit)          throws InterruptedException {          if (tasks == null || unit == null)              throw new NullPointerException();          long nanos = unit.toNanos(timeout);          List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());          boolean done = false;          try {              for (Callable<T> t : tasks)                  futures.add(newTaskFor(t));                long lastTime = System.nanoTime();                Iterator<Future<T>> it = futures.iterator();              // 提交一個任務,檢測一次是否超時              while (it.hasNext()) {                  execute((Runnable)(it.next()));                  long now = System.nanoTime();                  nanos -= now - lastTime;                  lastTime = now;                  // 超時                  if (nanos <= 0)                      return futures;              }                for (Future<T> f : futures) {                  if (!f.isDone()) {                      if (nanos <= 0)                          return futures;                      try {                          // 調用帶超時的 get 方法,這裡的參數 nanos 是剩餘的時間,                          // 因為上面其實已經用掉了一些時間了                          f.get(nanos, TimeUnit.NANOSECONDS);                      } catch (CancellationException ignore) {                      } catch (ExecutionException ignore) {                      } catch (TimeoutException toe) {                          return futures;                      }                      long now = System.nanoTime();                      nanos -= now - lastTime;                      lastTime = now;                  }              }              done = true;              return futures;          } finally {              if (!done)                  for (Future<T> f : futures)                      f.cancel(true);          }      }    }

到這裡,我們發現,這個抽象類包裝了一些基本的方法,可是像 submit、invokeAny、invokeAll 等方法,它們都沒有真正開啟執行緒來執行任務,它們都只是在方法內部調用了 execute 方法,所以最重要的 execute(Runnable runnable) 方法還沒出現,需要等具體執行器來實現這個最重要的部分,這裡我們要說的就是 ThreadPoolExecutor 類了。