Java多執行緒-Furetue介面源程式碼詳解

  • 2020 年 2 月 19 日
  • 筆記

一、Furetue介面源程式碼詳解

1.1 Future的應用場景

 不管是繼承thread類重寫run方法還是實現runnable介面實例對象後作為參數輸入至Thread類的構造器中,都無法保證獲取到之前的執行結果。通過實現Callback介面,並用Future可以來接收多執行緒的執行結果,而這就是我在上一篇關於Callable和Runnabe介面對比中的部落格中所提到的,要徹底理解它們兩者之間的差別,必須拿到Future的使用中來。

 Future表示一個可能還沒有完成的非同步任務的結果,針對這個結果可以添加Callback以便在任務執行成功或失敗後作出相應的操作。這裡注意一下非同步以及並行之間的區別:

  1. 並行:其在Java中指的是一般指並行計算,是說同一時刻有多條指令同時被執行,這些指令執行於同一CPU的多核上,或者多個不同CPU上的核上,或者多個物理主機甚至多個網路中。
  2. 非同步:其與同步相對應,非同步指的是讓CPU暫時擱置當前請求的響應(CPU等待某個任務執行完畢等待期間就完全可以進行處理其他請求),處理下一個請求,當通過輪詢或其他方式得到回調通知後,開始運行。多執行緒將非同步操作放入另一執行緒中運行,通過輪詢或回調方法得到完成通知,但是完成埠,由作業系統接管非同步操作的調度,通過硬體中斷,在完成時觸發回調方法,此方式不需要佔用額外執行緒。(說白了就是一個等,一個則會通知,而負責通知的對象就是Future介面的子類對象)

1.2 Future介面所提供的抽象方法

1.boolean cancel(boolean mayInterruptIfRunning);

 此方法用於嘗試將此任務取消。如果當前任務還沒有被執行,那麼此任務就永遠得不到執行。如果當前任務已經開始執行了,那麼會根據參數:mayInterruptIfRunning來決定當前執行緒是否會中斷。如果任務已經完成或者已經取消或者因為某些原因不能被取消,那麼就返回flase,且取消操作失敗;

2.boolean isCancelled();

 判斷當前任務是否被取消了;如果成功取消,那麼返回true;

3.boolean isDone();

判斷當前任務是否被執行完畢,若執行完畢,則返回true;

4.V get() throws InterruptedException, ExecutionException;

 方法可以當任務結束後返回一個結果,如果調用時,工作還沒有結束,則會阻塞執行緒,直到任務執行完畢。如果計算被取消了,那麼返回異常:ancellationException,如果計算本身拋出了異常,則拋出ExecutionException,如果在當前執行緒等待計算完成的過程中當前執行緒被中斷了,則拋出:InterruptedException

5.V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;

 這就是給普通的get方法設定了一個時間閾值,如果超時則不再等待當前執行緒。並且也多了一個當時間閾值達到的時候執行緒還未執行完畢會拋出異常:TimeoutException異常的機制。

1.3 Future介面下的繼承關係圖

RunnableFuture 這個介面同時繼承Future介面和Runnable介面,在成功執行run()方法後,可以通過Future訪問執行結果。這個介面都實現類是FutureTask,一個可取消的非同步計算,這個類提供了Future的基本實現;

FutureTask

 能用來包裝一個Callable或Runnable對象,因為它實現了Runnable介面,而且它能被傳遞到Executor進行執行。為了提供單例類,這個類在創建自定義的工作類時提供了protected構造函數。

SchedualFuture 這個介面表示一個延時的行為可以被取消。通常一個安排好的future是定時任務SchedualedExecutorService的結果

CompleteFuture 一個Future類是顯示地完成,而且能被用作一個完成等級,通過它的完成觸發支援的依賴函數和行為。當兩個或多個執行緒要執行完成或取消操作時,只有一個能夠成功。

ForkJoinTask 基於任務的抽象類,可以通過ForkJoinPool來執行。一個ForkJoinTask是類似於執行緒實體,但是相對於執行緒實體是輕量級的。大量的任務和子任務會被ForkJoinPool池中的真實執行緒掛起來,以某些使用限制為代價。


二、FutureTask類重要方法詳解

 由關於Future介面的繼承關係圖我們可知此類實現了RunnableFuture介面,我們還是從父類到子類的分析順序,所以接下來就該分析RunnableFuture介面了:

public interface RunnableFuture<V> extends Runnable, Future<V> {      /**       * Sets this Future to the result of its computation       * unless it has been cancelled.       */      void run();  }

 由上述程式碼可知,RunnableFuture只是繼承了兩個介面Runnable以及Future,沒有實現父介面的任何方法(這是使用繼承而不是實現的原因)。而FutureTask類又是實現了RunnableFuture介面,在Java中又是允許類實現多個介面的,所以完全可以認為FutureTask實現了Runnable以及Future兩個介面。下面就開始解讀FutureTask的JDK源碼。

**注意事項:**由於FutureTask類實現了Runnable介面,所以所有FutureTask對象總是能夠被作為參數傳入要求為Runnable介面的方法:比如說Thread的構造方法以及執行緒池對象的sunmmit()方法。

2.1 類中的屬性值分析

首先我們來看此類中的一些總要狀態屬性值:

     * Possible state transitions:       * NEW -> COMPLETING -> NORMAL       * NEW -> COMPLETING -> EXCEPTIONAL       * NEW -> CANCELLED       * NEW -> INTERRUPTING -> INTERRUPTED       */      private volatile int state;      private static final int NEW          = 0;      private static final int COMPLETING   = 1;      private static final int NORMAL       = 2;      private static final int EXCEPTIONAL  = 3;      private static final int CANCELLED    = 4;      private static final int INTERRUPTING = 5;      private static final int INTERRUPTED  = 6;

注意事項:一個FutureTask對象只能最多包含一個任務,其內部的等待隊列節點是描述等待此任務完成而執行緒阻塞的執行緒節點,而不是放置任務的,在2.2節中會詳細描述。

狀態名

含義

int值

state

當前任務的狀態

取以下幾種值大小

NEW

準備開始執行任務的狀態

0

COMPLETING

正在進行任務處理中

1

NORMAL

任務執行結束,並且任務處理結果正常,沒有異常出現

2

EXCEPTIONAL

執行任務的過程中出現了異常

3

CANCELLED

任務被取消了

4

INTERRUPTING

任務在執行過程中被中斷,是一個中間狀態

5

INTERRUPTED

中斷結束

6

JDK注釋給出了四種可能的任務狀態:

  1. NEW(準備開始執行任務的狀態)-> COMPLETING(正在進行任務處理中)-> NORMAL(任務執行結束,並且任務處理結果正常,沒有異常出現)
  2. NEW -> COMPLETING -> EXCEPTIONAL(執行任務的過程中出現了異常)
  3. NEW -> CANCELLED(任務被取消了)
  4. NEW -> INTERRUPTING(任務在執行過程中被中斷,是一個中間狀態) -> INTERRUPTED(中斷結束)

2.2 WaitNode以及等待機制案例分析

 jdk源程式碼:

    /**       * Simple linked list nodes to record waiting threads in a Treiber       * stack.  See other classes such as Phaser and SynchronousQueue       * for more detailed explanation.       */      static final class WaitNode {          volatile Thread thread;          volatile WaitNode next;          //注意其構造器,thread指向了調用此方法的當前執行緒對象          WaitNode() { thread = Thread.currentThread(); }      }

 上述程式碼表示的是如果有等待當前FutureTask計算完成的執行緒節點,這些多個節點以單向鏈表的形式構成。接下來就讓我們使用案例說明:如果當前FutureTask對象中所含的唯一任務沒有被執行完畢,而其他執行緒調用了FutureTask任務,那麼調用的調查其就會進行等待,直至當前FutureTask對象的計算完成比如以下的案例:

import java.util.concurrent.*;    /**   * @author Fisherman   * @date 2019/9/19   */  public class TestWaittingByGetting {        public static void main(String[] args) {          int totalSum = 0;            MyCallable integerMyCallable1 = new MyCallable(1, 30, "c1");          MyCallable integerMyCallable2 = new MyCallable(31, 60, "c2");          MyCallable integerMyCallable3 = new MyCallable(61, 100, "c3");            FutureTask<Integer> integerFutureTask1 = new FutureTask<>(integerMyCallable1);          FutureTask<Integer> integerFutureTask2 = new FutureTask<>(integerMyCallable2);          FutureTask<Integer> integerFutureTask3 = new FutureTask<>(integerMyCallable3);            new Thread(integerFutureTask1).start();          new Thread(integerFutureTask2).start();          new Thread(integerFutureTask3).start();            try {              totalSum = integerFutureTask1.get() + integerFutureTask2.get() + integerFutureTask3.get();          } catch (InterruptedException e) {              e.printStackTrace();          } catch (ExecutionException e) {              e.printStackTrace();          }          System.out.println(totalSum);      }          static class MyCallable implements Callable<Integer> {          private int start;          private int end;          private String name;            public MyCallable(int start, int end, String name) {              this.start = start;              this.end = end;              this.name = name;          }            @Override          public Integer call() throws Exception {                Thread.sleep(1000);              int sum = 0;                for (int i = start; i <= end; i++) {                  sum += i;              }              System.out.println(toString() + "執行完畢,可以返回計算的值了!");              return sum;          }            @Override          public String toString() {              return name;          }      }    }

 控制台輸出:(注意事項:執行緒執行完畢的順序並不一定是這般)

c2執行完畢,可以返回計算的值了!  c1執行完畢,可以返回計算的值了!  c3執行完畢,可以返回計算的值了!  5050

 我們以此證明了FutureTask對象交給Thread構造器後進行執行,或者交給執行緒池進行執行,main執行緒在調用FutureTask對象.get()方法,如果計算過程未完成,會導致main執行緒等待此結果計算出。如果從另一個方面來閱讀這個例子,可以發現此例是FutureJoinPool的基本模型,如果你把最大的任務是計算1-100的整數和,並最終將其輸出,那麼此任務本質上就是將0-100的任務分接為三個小任務:0-30,31-60,61-100來計算。如果我們進一步將小任務分配給多個核去執行,那麼就相當於實現了FutureJoinPool的重要並行運行功能。正是因為FutureTask對象.get()方法調用在未完成計算的時候會導致當前執行緒等待,所以它是實現FutureJoinPool類實現的重要依據。

2.3 FutureTask類的構造器分析

 FutureTask類有兩個構造器,一個參數入口是Runnable對象,另一個參數入口是Callable對象。

1.入口參數為Runnable對象的構造器

public FutureTask(Runnable runnable, V result) {          this.callable = Executors.callable(runnable, result);          this.state = NEW;       // ensure visibility of callable      }

 通過調用靜態方法Executors.callable(runnable, result);將runnable對象轉換為Callable對象,其中方法中result參數代表著runnbale方法將會計算出的結果值;

2.入口參數為Callable對象的構造器

public FutureTask(Callable<V> callable) {          if (callable == null)              throw new NullPointerException();          this.callable = callable;          this.state = NEW;       // ensure visibility of callable      }

 可見無論入口參數為何種類型,最終總是統一地將其轉換或直接賦值給內部的Callable參數進行後續執行。

2.4 get()方法

jdk1.8源程式碼:

    /**       * @throws CancellationException {@inheritDoc}       */      public V get() throws InterruptedException, ExecutionException {//此方法會拋出執行異常          int s = state;//得到當前FutureTask對象的狀態          if (s <= COMPLETING)//說明當前任務未執行完畢(可能正在執行,可能還沒準備好執行)              s = awaitDone(false, 0L);//那麼就調用awaitDone方法,得到更像的狀態s          return report(s);//將狀態s作為入口參數調用report方法      }

2.5 get(long timeout, TimeUnit unit)方法

jdk1.8源程式碼:

    /**       * @throws CancellationException {@inheritDoc}       */      public V get(long timeout, TimeUnit unit)          throws InterruptedException, ExecutionException, TimeoutException {          //此方法會拋出中斷異常、執行異常、取消異常          if (unit == null)//如果時間為空,那麼就拋出空指針異常              throw new NullPointerException();          int s = state;//得到當前FutureTask對象的狀態          if (s <= COMPLETING &&//如果未完成,那麼才會進行等待,即調用awaitDone方法              (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)              //如果超時了,等待返回的狀態表面當前任務還是沒有執行完畢,那麼就拋出超時異常              throw new TimeoutException();          return report(s);//這裡只有任務完成了,才會將狀態值作為參數入口調用report方法,並返回      }

 可以看到get()方法和get(long timeout, TimeUnit unit)實際上內部邏輯類似,但是需要注意的一點是在Java中邏輯&的運算順序,a&&b的邏輯判斷過程中只有a為真,那麼才會繼續判斷b是否為真,如果a==false,那麼就直接返回false,就不在繼續判斷b是否為真了。所以要特別主要只有s <= COMPLETING成立,才會調用awaitDone(true, unit.toNanos(timeout))方法,這裡隱藏的if邏輯是要特別注意

2.6 awaitDone()方法

jdk1.8源程式碼:

    /**       * Awaits completion or aborts on interrupt or timeout.       * 此方法會導致等待直至任務完成或者此方法在中斷和時間結束和被廢棄;       * @param timed true if use timed waits  timed為true意味著調用了時間閾值等待       * @param nanos time to wait, if timed //如果需要計時,那麼這就是時間單位       * @return state upon completion //返回狀態,直到任務完成       */      private int awaitDone(boolean timed, long nanos)          throws InterruptedException {          //如果timed為false,那麼時間為0L意味著沒有時間等待機制;否則deadline等於當前系統時間加上輸入的等待閾值時間nanos;          final long deadline = timed ? System.nanoTime() + nanos : 0L;            WaitNode q = null;//創建一個等待節點,但是使其指向為null          boolean queued = false;          for (;;) {//進入自旋              if (Thread.interrupted()) {                  //如果當前執行緒中斷了,那麼就將當前在等待隊列中的節點移出隊列,並拋出中斷異常                  removeWaiter(q);                  throw new InterruptedException();              }                int s = state;              if (s > COMPLETING) {//如果當前FutureTask對象的狀態已經是完成狀態,或是取消狀態                  if (q != null) //如果當前等待節點不會空,則將等待節點中的執行緒指向置為null                      q.thread = null;                  return s;//結束自旋,返回FutureTask任務執行的狀態              }              else if (s == COMPLETING) // cannot time out yet                  Thread.yield();//當前執行緒調用yield方法,在FutureTask的任務處於正在執行狀態              else if (q == null)//如果等待節點為null,那麼引向一個等待節點。                  q = new WaitNode();              else if (!queued)//如果當前等待節點還沒有插入到隊列中,那麼利用CAS機制插入等待隊列                  queued = UNSAFE.compareAndSwapObject(this, waitersOffset,                                                       q.next = waiters, q);              else if (timed) {//如果模式是計時的,就進行下面的執行邏輯                  nanos = deadline - System.nanoTime();//得到還有多久達到規定的截止時間                  if (nanos <= 0L) {                      //如果達到截止時間了,那麼使當前執行緒的等待節點出列,並返回FutureTask的狀態                      removeWaiter(q);                      return state;                  }                  LockSupport.parkNanos(this, nanos);//時間未到,那麼有時間限制地使當前執行緒休眠              }              else//這裡是沒有時間閾值調用的awaitDone方法的執行邏輯,直接使當前執行緒對象沒有時間限制地休眠                  LockSupport.park(this);          }      }

注意事項:

  1. 有上述程式碼的邏輯可知,如果當前執行緒中斷標誌位被置為true時調用了awaitDone()方法,那麼會導致當前執行緒直接跳過等待過程,並且拋出異常,但是FutureTask對象中的任務並不會因此而取消,而是繼續執行;
  2. 上述程式碼中當FutureTask對象的狀態滿足:s==COMPLETING時,說明當前FutureTask的任務處於正在執行狀態,所以調用Thread.yield(),是為了嘗試讓當前執行緒讓出資源CPU資源(從運行狀態轉為就緒狀態),而不會使當前執行緒休眠。如果當前執行緒再次搶佔到CPU資源,那麼又會開始自旋判斷。所以可以得出一個結論,如果第一次調用awaitDone方法時,FutureTask狀態為0,才會進入等待隊列,並使當前執行緒掛起,否則只會進入狀態為COMPLETING的自旋;
  3. if (s > COMPLETING) {//如果當前FutureTask對象的狀態已經是完成狀態,或是取消狀態 if (q != null) //如果當前等待節點不會空,則將等待節點中的執行緒指向置為null q.thread = null; return s;//結束自旋,返回FutureTask任務執行的狀態 } 此處程式碼之所以沒有使當前等待節點出列的操作,是由於如果其並沒有入隊,並且通過使其參數指向null操作,最終會被JVM回收;如果其已經入隊了,那麼則會由finishCompletion()負責移除節點,而此方法不會進行此操作,而是交給finishCompletion()方法來完成這個工作;

2.7 report(int s)方法

jdk1.8源程式碼:

    /**       * Returns result or throws exception for completed task.       *       * @param s completed state value       */      @SuppressWarnings("unchecked")      private V report(int s) throws ExecutionException {          Object x = outcome;          if (s == NORMAL)//如果任務被正常執行了,則返回任務的執行結果              return (V)x;          if (s >= CANCELLED)//如果任務被取消了,則拋出任務取消異常              throw new CancellationException();          throw new ExecutionException((Throwable)x);//除了以上的異常錯誤,那麼就是判斷為執行異常,並拋出;      }

2.8 finishCompletion()方法

jdk1.8源程式碼:

    /**       * Removes and signals all waiting threads, invokes done(), and       * nulls out callable.       */      private void finishCompletion() {          // assert state > COMPLETING;          for (WaitNode q; (q = waiters) != null;) {              if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {                  for (;;) {                      Thread t = q.thread;//得到等待隊列第一個節點對象所指向的執行緒對象                      //如果執行緒對象不為空,是當前節點中的執行緒對象指向null,並喚醒該執行緒                      if (t != null) {                          q.thread = null;                          LockSupport.unpark(t);                      }                      WaitNode next = q.next;//得到下一個節點對象                      //如果下一個節點對象為null,表明已經到了隊列末尾,那麼跳出自旋                      if (next == null)                          break;                      q.next = null; // unlink to help gc,就是為了GC方便垃圾回收此對象q                      q = next;//q指向下一個等待節點,進行下一輪的自旋                  }                  break;              }          }            done();            callable = null;        // to reduce footprint      }

 方法finishCompletion()作用是在FutureTask任務後,將FutureTask對象中等待隊列中的執行緒對象喚醒,並且清空等待隊列中的節點。done()方法是提供給子類重寫的,其本身實現為空,這樣一來每次任務執行完畢都能夠進行執行done()方法。所以在2.2中提供的案例完全可以改成如下形式:


三、state狀態值更新的詳解

 實際上在第二章中講述了許多方法都是一個執行緒嘗試調用FutureTask的get(),嘗試獲得其內部任務執行結果過程中可能涉及的相關方法,實際上還有很多其他方法可以學習,但是實現邏輯很類似,的確沒有必要一一列舉。但是FutureTask對象狀態如何改變一定是不同的,相信讀者朋友和我一樣都很好奇其是如何實現的,下面我們就以正常任務的執行,不涉及取消cancel()方法的調用為例來分析此過程。

3.1FutureTask中的CAS機制學習

 首先我們需要學一下CAS機制,這裡只談用法,不談內部cpp實現,所以並不需要額外的知識儲備,但是我們得理解為何如此調用CAS方法,如此調用CAS方法的內部執行邏輯是什麼:

    // Unsafe mechanics      private static final sun.misc.Unsafe UNSAFE;      private static final long stateOffset;      private static final long runnerOffset;      private static final long waitersOffset;      static {          try {              UNSAFE = sun.misc.Unsafe.getUnsafe();              Class<?> k = FutureTask.class;              stateOffset = UNSAFE.objectFieldOffset                  (k.getDeclaredField("state"));              runnerOffset = UNSAFE.objectFieldOffset                  (k.getDeclaredField("runner"));              waitersOffset = UNSAFE.objectFieldOffset                  (k.getDeclaredField("waiters"));          } catch (Exception e) {              throw new Error(e);          }      }

上面是Future』Task對象中CAS機制的初始化工作,利用Java反射機制完成了系列指向工作:

  1. stateOffset在方法compareAndSwapObject()代表對象當前的state屬性
  2. runnerOffset在方法compareAndSwapObject()代表對象當前的runner屬性
  3. waitersOffset在方法compareAndSwapObject()代表對象當前的的waiters屬性
public final native boolean compareAndSwapObject(Object var1, long var2, Object var4, Object var5);

這是CAS機制的設置屬性的方法之一,下面解釋各個方法入口的含義:

var1 操作的對象  var2 操作的對象屬性  var4 var4與var2比較,如果當前對象屬性var2正好等於var4,那麼才會將其更新為var5  var5 更新值

3.2 以set/setException兩類方法分析FutureTask對象狀態更新的原理

正常的執行緒任務運行過程中狀態變換一定是如此順序:

NEW -> COMPLETING -> NORMAL

 首先,New一定是發生在FutureTask對象的構造器中出現,下面是其源程式碼:

    public FutureTask(Callable<V> callable) {          if (callable == null)              throw new NullPointerException();          this.callable = callable;          this.state = NEW;       // ensure visibility of callable      }

 由於同一個對象的構造器只能在一個執行緒中被調用,所以一定是執行緒安全的,所以就沒有使用CAS機制來確保狀態屬性state的賦值。

 其次,NEW -> COMPLETING的變化一定是run方法被調用時,run方法需要第一時間進行的操作,下面為run方法的jdk1.8源程式碼:

public void run() {              if (state != NEW ||              !UNSAFE.compareAndSwapObject(this, runnerOffset,                                           null, Thread.currentThread()))              return;              try {              Callable<V> c = callable;              if (c != null && state == NEW) {                  V result;                  boolean ran;                  try {                      result = c.call();                      ran = true;                  } catch (Throwable ex) {                      result = null;                      ran = false;                      setException(ex);                  }                  if (ran)                      set(result);              }          } finally {              // runner must be non-null until state is settled to              // prevent concurrent calls to run()              runner = null;              // state must be re-read after nulling runner to prevent              // leaked interrupts              int s = state;              if (s >= INTERRUPTING)                  handlePossibleCancellationInterrupt(s);          }      }

注意事項:

  1. 作為一個FutureTask對象,有可能同一時間被多個執行緒調用其run方法,而為了確保只能有一個執行緒啟動它內部的任務,其餘執行緒都沒有成功調用run方法,所以使用了以下的CAS機制,把FutureTask對象的runner更新為當前執行緒: if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return;
  2. 如果在result = c.call();方法執行過程中,那麼就調用setException(ex);來進行FutureTask對象的狀態值更新,否則使用set方法進行狀態值的更新(利用了布爾值ran進行相關控制);
  3. finnaly{}囊括了必定會執行的程式碼,主要是runner=null,以防止泄露中斷,以及調用 handlePossibleCancellationInterrupt(s);方法進行中斷處理;

我們在分別來看set方法以及setEXception方法的jdk源程式碼:首先是set方法的jdk源碼:

 /**       * Sets the result of this future to the given value unless       * this future has already been set or has been cancelled.       *       * <p>This method is invoked internally by the {@link #run} method       * upon successful completion of the computation.       *       * @param v the value       */      protected void set(V v) {          if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {              outcome = v;              UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state              finishCompletion();          }      }

set();方法入口參數為c.call()方法執行的結果,即任務執行的結果。

 但是一個令人深思的問題是:**為何此處執行緒狀態的更新是執行緒不安全的,需要CAS機制來保證執行緒的安全執行?**這是由於如果當set方法是可以被其他方法調用的,不僅僅是run方法調用,比如說done方法;

執行步驟:

  1. UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING): 使當前FutureTask對象狀態更新:NEW -> COMPLETING
  2. outcome = v; UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state finishCompletion(); 若1返回flase則什麼都不做,否則得到run方法的指向結果賦值給對象的全局變數outcome供report方法調用,即會被get方法調用;接著更新FutureTask對象的狀態值,不加檢驗地將狀態值變為NORMAL;最後調用finishCompletion進行等待隊列的喚醒以及釋放節點記憶體資源。
    /**       * Causes this future to report an {@link ExecutionException}       * with the given throwable as its cause, unless this future has       * already been set or has been cancelled.       *       * <p>This method is invoked internally by the {@link #run} method       * upon failure of the computation.       *       * @param t the cause of failure       */      protected void setException(Throwable t) {          if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {              outcome = t;              UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state              finishCompletion();          }      }

執行步驟:

  1. UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING): 使當前FutureTask對象狀態更新:NEW -> COMPLETING
  2. outcome = t; UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state finishCompletion(); 若1返回flase則什麼都不做,否則得到run方法的指向結果賦值給對象的全局變數outcome供report方法調用,即會被get方法調用;接著更新FutureTask對象的狀態值,不加檢驗地將狀態值變為EXCEPTIONAL;最後調用finishCompletion進行等待隊列的喚醒以及釋放節點記憶體資源。

狀態更新的小結:

  1. 非取消情況下FutureTask狀態的更新的確有:NEW -> COMPLETING -> NORMAL/EXCEPTIONAL,但是可以看到,中間狀態COMPLETING是抓轉瞬即逝的;
  2. set以及setException方法的內部邏輯十分相似,關鍵是CAS操作的理解。

四、Future和FutureTask小結

相信如果這般閱讀Future介面以及FutureTask類的源程式碼,一定對Future設計模式有所理解。下面做個小總結。

 Future介面的關鍵是其要求其子類實現get()方法,要求某個執行緒調用子類對象的get()方法時若值當前不可得,那麼就阻塞那個執行緒,直至計算結果便可得。

 FutureTask類實現了Future介面,即實現了上述Future介面所要求的get性質,藉此性質我們可以來實現大任務變小任務,最後匯總的操作(此文中中的2.2小節就舉了這個例子)。按照此思路,Fork/Join框架,ForkJoinPool類的負責執行任務的最小單元就是FutureTask類對象。詳細的ForkJoinPool類和Fork/Join框架請看下一章分析。