死磕 java執行緒系列之執行緒池深入解析——未來任務執行流程

  • 2019 年 11 月 4 日
  • 筆記

threadpool_futuretask

(手機橫屏看源碼更方便)


註:java源碼分析部分如無特殊說明均基於 java8 版本。

註:執行緒池源碼部分如無特殊說明均指ThreadPoolExecutor類。

簡介

前面我們一起學習了執行緒池中普通任務的執行流程,但其實執行緒池中還有一種任務,叫作未來任務(future task),使用它您可以獲取任務執行的結果,它是怎麼實現的呢?

建議學習本章前先去看看彤哥之前寫的《死磕 java執行緒系列之自己動手寫一個執行緒池(續)》,有助於理解本章的內容,且那邊的程式碼比較短小,學起來相對容易一些。

問題

(1)執行緒池中的未來任務是怎麼執行的?

(2)我們能學到哪些比較好的設計模式?

(3)對我們未來學習別的框架有什麼幫助?

來個栗子

我們還是從一個例子入手,來講解來章的內容。

我們定義一個執行緒池,並使用它提交5個任務,這5個任務分別返回0、1、2、3、4,在未來的某一時刻,我們再取用它們的返回值,做一個累加操作。

public class ThreadPoolTest02 {      public static void main(String[] args) throws ExecutionException, InterruptedException {          // 新建一個固定5個執行緒的執行緒池          ExecutorService threadPool = Executors.newFixedThreadPool(5);            List<Future<Integer>> futureList = new ArrayList<>();          // 提交5個任務,分別返回0、1、2、3、4          for (int i = 0; i < 5; i++) {              int num = i;                // 任務執行的結果用Future包裝              Future<Integer> future = threadPool.submit(() -> {                  try {                      Thread.sleep(1000);                  } catch (InterruptedException e) {                      e.printStackTrace();                  }                  System.out.println("return: " + num);                  // 返回值                  return num;              });                // 把future添加到list中              futureList.add(future);          }            // 任務全部提交完再從future中get返回值,並做累加          int sum = 0;          for (Future<Integer> future : futureList) {              sum += future.get();          }            System.out.println("sum=" + sum);      }  }

這裡我們思考兩個問題:

(1)如果這裡使用普通任務,要怎麼寫,時間大概是多少?

如果使用普通任務,那麼就要把累加操作放到任務裡面,而且並不是那麼好寫(final的問題),總時間大概是1秒多一點。但是,這樣有一個缺點,就是累加操作跟任務本身的內容耦合到一起了,後面如果改成累乘,還要修改任務的內容。

(2)如果這裡把future.get()放到for循環裡面,時間大概是多少?

這個問題我們先不回答,先來看源碼分析。

submit()方法

submit方法,它是提交有返回值任務的一種方式,內部使用未來任務(FutureTask)包裝,再交給execute()去執行,最後返回未來任務本身。

public <T> Future<T> submit(Callable<T> task) {      // 非空檢測      if (task == null) throw new NullPointerException();      // 包裝成FutureTask      RunnableFuture<T> ftask = newTaskFor(task);      // 交給execute()方法去執行      execute(ftask);      // 返回futureTask      return ftask;  }  protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {      // 將普通任務包裝成FutureTask      return new FutureTask<T>(callable);  }

這裡的設計很巧妙,實際上這兩個方法都是在AbstractExecutorService這個抽象類中完成的,這是模板方法的一種運用。

我們來看看FutureTask的繼承體系:

threadpool_futuretask

FutureTask實現了RunnableFuture介面,而RunnableFuture介面組合了Runnable介面和Future介面的能力,而Future介面提供了get任務返回值的能力。

問題:submit()方法返回的為什麼是Future介面而不是RunnableFuture介面或者FutureTask類呢?

答:這是因為submit()返回的結果,對外部調用者只想暴露其get()的能力(Future介面),而不想暴露其run()的能力(Runaable介面)。

FutureTask類的run()方法

經過上一章的學習,我們知道execute()方法最後調用的是task的run()方法,上面我們傳進去的任務,最後被包裝成了FutureTask,也就是說execute()方法最後會調用到FutureTask的run()方法,所以我們直接看這個方法就可以了。

public void run() {      // 狀態不為NEW,或者修改為當前執行緒來運行這個任務失敗,則直接返回      if (state != NEW ||          !UNSAFE.compareAndSwapObject(this, runnerOffset,                                       null, Thread.currentThread()))          return;        try {          // 真正的任務          Callable<V> c = callable;          // state必須為NEW時才運行          if (c != null && state == NEW) {              // 運行的結果              V result;              boolean ran;              try {                  // 任務執行的地方【本文由公從號「彤哥讀源碼」原創】                  result = c.call();                  // 已執行完畢                  ran = true;              } catch (Throwable ex) {                  result = null;                  ran = false;                  // 處理異常                  setException(ex);              }              if (ran)                  // 處理結果                  set(result);          }      } finally {          // 置空runner          runner = null;          // 處理中斷          int s = state;          if (s >= INTERRUPTING)              handlePossibleCancellationInterrupt(s);      }  }

可以看到程式碼也比較簡單,先做狀態的檢測,再執行任務,最後處理結果或異常。

執行任務這裡沒啥問題,讓我們看看處理結果或異常的程式碼。

protected void setException(Throwable t) {      // 將狀態從NEW置為COMPLETING      if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {          // 返回值置為傳進來的異常(outcome為調用get()方法時返回的)          outcome = t;          // 最終的狀態設置為EXCEPTIONAL          UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state          // 調用完成方法          finishCompletion();      }  }  protected void set(V v) {      // 將狀態從NEW置為COMPLETING      if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {          // 返回值置為傳進來的結果(outcome為調用get()方法時返回的)          outcome = v;          // 最終的狀態設置為NORMAL          UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state          // 調用完成方法          finishCompletion();      }  }

咋一看,這兩個方法似乎差不多,不同的是出去的結果不一樣且狀態不一樣,最後都調用了finishCompletion()方法。

private void finishCompletion() {      // 如果隊列不為空(這個隊列實際上為調用者執行緒)      for (WaitNode q; (q = waiters) != null;) {          // 置空隊列          if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {              for (;;) {                  // 調用者執行緒                  Thread t = q.thread;                  if (t != null) {                      q.thread = null;                      // 如果調用者執行緒不為空,則喚醒它                      // 【本文由公從號「彤哥讀源碼」原創】                      LockSupport.unpark(t);                  }                  WaitNode next = q.next;                  if (next == null)                      break;                  q.next = null; // unlink to help gc                  q = next;              }              break;          }      }      // 鉤子方法,子類重寫      done();      // 置空任務      callable = null;        // to reduce footprint  }

整個run()方法總結下來:

(1)FutureTask有一個狀態state控制任務的運行過程,正常運行結束state從NEW->COMPLETING->NORMAL,異常運行結束state從NEW->COMPLETING->EXCEPTIONAL;

(2)FutureTask保存了運行任務的執行緒runner,它是執行緒池中的某個執行緒;

(3)調用者執行緒是保存在waiters隊列中的,它是什麼時候設置進去的呢?

(4)任務執行完畢,除了設置狀態state變化之外,還要喚醒調用者執行緒。

調用者執行緒是什麼時候保存在FutureTask中(waiters)的呢?查看構造方法:

public FutureTask(Callable<V> callable) {      if (callable == null)          throw new NullPointerException();      this.callable = callable;      this.state = NEW;       // ensure visibility of callable  }

發現並沒有相關資訊,我們再試想一下,如果調用者不調用get()方法,那麼這種未來任務是不是跟普通任務沒有什麼區別?確實是的哈,所以只有調用get()方法了才有必要保存調用者執行緒到FutureTask中。

所以,我們來看看get()方法中是什麼鬼。

FutureTask類的get()方法

get()方法調用時如果任務未執行完畢,會阻塞直到任務結束。

public V get() throws InterruptedException, ExecutionException {      int s = state;      // 如果狀態小於等於COMPLETING,則進入隊列等待      if (s <= COMPLETING)          s = awaitDone(false, 0L);      // 返回結果(異常)      return report(s);  }

是不是很清楚,如果任務狀態小於等於COMPLETING,則進入隊列等待。

private int awaitDone(boolean timed, long nanos)      throws InterruptedException {      // 我們這裡假設不帶超時      final long deadline = timed ? System.nanoTime() + nanos : 0L;      WaitNode q = null;      boolean queued = false;      for (;;) {          // 處理中斷          if (Thread.interrupted()) {              removeWaiter(q);              throw new InterruptedException();          }          // 4. 如果狀態大於COMPLETING了,則跳出循環並返回          // 這是自旋的出口          int s = state;          if (s > COMPLETING) {              if (q != null)                  q.thread = null;              return s;          }          // 如果狀態等於COMPLETING,說明任務快完成了,就差設置狀態到NORMAL或EXCEPTIONAL和設置結果了          // 這時候就讓出CPU,優先完成任務          else if (s == COMPLETING) // cannot time out yet              Thread.yield();          // 1. 如果隊列為空          else if (q == null)              // 初始化隊列(WaitNode中記錄了調用者執行緒)              q = new WaitNode();          // 2. 未進入隊列          else if (!queued)              // 嘗試入隊              queued = UNSAFE.compareAndSwapObject(this, waitersOffset,                                                   q.next = waiters, q);          // 超時處理          else if (timed) {              nanos = deadline - System.nanoTime();              if (nanos <= 0L) {                  removeWaiter(q);                  return state;              }              LockSupport.parkNanos(this, nanos);          }          // 3. 阻塞當前執行緒(調用者執行緒)          else              // 【本文由公從號「彤哥讀源碼」原創】              LockSupport.park(this);      }  }

這裡我們假設調用get()時任務還未執行,也就是其狀態為NEW,我們試著按上面標示的1、2、3、4走一遍邏輯:

(1)第一次循環,狀態為NEW,直接到1處,初始化隊列並把調用者執行緒封裝在WaitNode中;

(2)第二次循環,狀態為NEW,隊列不為空,到2處,讓包含調用者執行緒的WaitNode入隊;

(3)第三次循環,狀態為NEW,隊列不為空,且已入隊,到3處,阻塞調用者執行緒;

(4)假設過了一會任務執行完畢了,根據run()方法的分析最後會unpark調用者執行緒,也就是3處會被喚醒;

(5)第四次循環,狀態肯定大於COMPLETING了,退出循環並返回;

問題:為什麼要在for循環中控制整個流程呢,把這裡的每一步單獨拿出來寫行不行?

答:因為每一次動作都需要重新檢查狀態state有沒有變化,如果拿出去寫也是可以的,只是程式碼會非常冗長。這裡只分析了get()時狀態為NEW,其它的狀態也可以自行驗證,都是可以保證正確的,甚至兩個執行緒交叉運行(斷點的技巧)。

OK,這裡返回之後,再看看是怎麼處理最終的結果的。

private V report(int s) throws ExecutionException {      Object x = outcome;      // 任務正常結束      if (s == NORMAL)          return (V)x;      // 被取消了      if (s >= CANCELLED)          throw new CancellationException();      // 執行異常      throw new ExecutionException((Throwable)x);  }

還記得前面分析run的時候嗎,任務執行異常時是把異常放在outcome裡面的,這裡就用到了。

(1)如果正常執行結束,則返回任務的返回值;

(2)如果異常結束,則包裝成ExecutionException異常拋出;

通過這種方式,執行緒中出現的異常也可以返回給調用者執行緒了,不會像執行普通任務那樣調用者是不知道任務執行到底有沒有成功的。

其它

FutureTask除了可以獲取任務的返回值以外,還能夠取消任務的執行。

public boolean cancel(boolean mayInterruptIfRunning) {      if (!(state == NEW &&            UNSAFE.compareAndSwapInt(this, stateOffset, NEW,                mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))          return false;      try {    // in case call to interrupt throws exception          if (mayInterruptIfRunning) {              try {                  Thread t = runner;                  if (t != null)                      t.interrupt();              } finally { // final state                  UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);              }          }      } finally {          finishCompletion();      }      return true;  }

這裡取消任務是通過中斷執行執行緒來處理的,有興趣的同學可以自己分析一下。

回答開篇

如果這裡把future.get()放到for循環裡面,時間大概是多少?

答:大概會是5秒多一點,因為每提交一個任務,都要阻塞調用者執行緒直到任務執行完畢,每個任務執行都是1秒多,所以總時間就是5秒多點。

總結

(1)未來任務是通過把普通任務包裝成FutureTask來實現的。

(2)通過FutureTask不僅能夠獲取任務執行的結果,還有感知到任務執行的異常,甚至還可以取消任務;

(3)AbstractExecutorService中定義了很多模板方法,這是一種很重要的設計模式;

(4)FutureTask其實就是典型的異常調用的實現方式,後面我們學習到Netty、Dubbo的時候還會見到這種設計思想的。

彩蛋

RPC框架中非同步調用是怎麼實現的?

答:RPC框架常用的調用方式有同步調用、非同步調用,其實它們本質上都是非同步調用,它們就是用FutureTask的方式來實現的。

一般地,通過一個執行緒(我們叫作遠程執行緒)去調用遠程介面,如果是同步調用,則直接讓調用者執行緒阻塞著等待遠程執行緒調用的結果,待結果返回了再返回;如果是非同步調用,則先返回一個未來可以獲取到遠程結果的東西FutureXxx,當然,如果這個FutureXxx在遠程結果返回之前調用了get()方法一樣會阻塞著調用者執行緒。

有興趣的同學可以先去預習一下dubbo的非同步調用(它是把Future扔到RpcContext中的)。


歡迎關注我的公眾號「彤哥讀源碼」,查看更多源碼系列文章, 與彤哥一起暢遊源碼的海洋。

qrcode