FutrueTask原理及源碼分析

  • 2019 年 10 月 3 日
  • 筆記

1.前言

相信很多人了解到FutureTask是因為ThreadPoolExecutor.submit方法,根據ThreadPoolExecutor.submit的使用,我們可以先猜一下FutureTask的原理。

public static void main(String[] args) throws ExecutionException, InterruptedException {

FutureTask<Integer> futureTask = new FutureTask<>(new Callable<Integer>() {

@Override
public Integer call() throws Exception {
TimeUnit.SECONDS.sleep(3);
return 1;
}
});
new Thread(futureTask).start();

System.out.println(futureTask.get());
}

上面這個程式碼會在啟動後三秒列印出1,FutureTask.get()方法調用時會直到Callable中的程式碼執行完才會返回,所以FutureTask需要在這裡阻塞。因為可能多個執行緒進行get,所以需要一個阻塞隊列。

如果Callable三秒執行完,調用方過了五秒才調用get的話,FutureTask就需要把Callable中的執行結果存起來,並且也要把異常catch住存起來,所以需要一個變數存放結果。使用一個api然後想去研究它的原理,源碼時,其實可以

先想一下,它可能是怎麼做的,如果是我寫應該怎樣設計,這樣能提高自己的設計能力。

2.原理

FutureTask的原理其實和前言中的猜想類似,下面簡述一下FutureTask的原理。

FutureTask有兩個非常重要的方法,run方法和get方法,run方法是實現了Runnable然後在run裡面跑Callable的程式碼,

get方法就是我們常用的獲取數據的方法。run方法運行Callable中的程式碼然後catch住異常,然後將正常結果或者異常結果

存起來,並且喚醒因為調用get方法阻塞的執行緒。get方法是去判斷是否已經計算出結果,如果計算完成,返回結果否則進行

阻塞。

3.源碼分析

建議大家在閱讀源碼時,先看一下文檔,雖然文檔是英文的,但是自己讀一下搭配翻譯看懂應該不難,這裡給大家介紹一個IDEA的功能,點擊View->QuickDocumentation能讓文檔讀起來更加方便。

下面我就分析一下源碼:

  
   FutureTask中的狀態維護      private volatile int state;      private static final int NEW          = 0; //初始狀態      private static final int COMPLETING   = 1; //執行完成但是執行結果沒有保存      private static final int NORMAL       = 2; //執行完成並且保存了結果      private static final int EXCEPTIONAL  = 3; //出現了異常      private static final int CANCELLED    = 4; //取消      private static final int INTERRUPTING = 5; //打斷中,可以進行打斷執行緒了      private static final int INTERRUPTED  = 6; //執行緒已經被置成打斷狀態        private Callable<V> callable; //入參        private Object outcome; //執行成功結果保存到這個變數        private volatile Thread runner; //正在執行的執行緒        private volatile WaitNode waiters;//等待隊列
 如果你嘗試用idea追蹤者這些變數在哪裡賦值了,你會發現你找不到,這是因為這些變數的賦值都是通過Unsafe類完成的,這個類會直接改這些變數記憶體地址上對應的值。
  Unsafe可以通過對象+欄位的offset找到欄位對應的記憶體地址從而修改數據,了解了這些,在去看FutureTask的程式碼就很容易了
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long stateOffset;
private static final long runnerOffset;
private static final long waitersOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = FutureTask.class;
stateOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("state"));
runnerOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("runner"));
waitersOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("waiters"));
} catch (Exception e) {
throw new Error(e);
}
}

下面看一下run方法是怎樣執行的

 public void run() {
      //runner置成當前執行緒
if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran;
          //成功設置result失敗設置Exception
try { result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }

下面看看成功都做了些什麼

protected void set(V v) {
      //執行成功後狀態扭轉成完成中,扭轉成功後將值存入outcome然後執行finishCompletion
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = v; UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state finishCompletion(); } }

下面看看失敗做了什麼

    protected void setException(Throwable t) {
      //與成功類似不再多講
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = t; UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state finishCompletion(); } }

成功和失敗都執行了finishCompletion,下面看看這個方法里幹了什麼

    /**       * Removes and signals all waiting threads, invokes done(), and       * nulls out callable.       */

  注釋已經非常清楚了。喚醒等待的節點,執行done,將callable置成null
private void finishCompletion() { // assert state > COMPLETING; for (WaitNode q; (q = waiters) != null;) { if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { for (;;) { Thread t = q.thread; if (t != null) { q.thread = null; LockSupport.unpark(t); } WaitNode next = q.next; if (next == null) break; q.next = null; // unlink to help gc //這裡為什麼能幫助gc呢,如果q在老年代,q.next在年輕代的話就可以了,詳情看https://bugs.java.com/bugdatabase/view_bug.do?bug_id=6806875 q = next; } break; } } done(); callable = null; // to reduce footprint }

到這裡run方法已經很清楚了,下面看一下get方法

public V get() throws InterruptedException, ExecutionException {          int s = state;          if (s <= COMPLETING)
        //很明顯需要看這個方法,記住這個傳參false s
= awaitDone(false, 0L); return report(s); }

private int awaitDone(boolean timed, long nanos)          throws InterruptedException {
    //timed = false
final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; for (;;) {
        //執行緒已經被打斷了
if (Thread.interrupted()) { removeWaiter(q); throw new InterruptedException(); }        int s = state;
        // 已經完成了返回狀態
if (s > COMPLETING) { if (q != null) q.thread = null; return s; } else if (s == COMPLETING) // cannot time out yet //這裡直接讓出執行緒,讓runner去賦值 Thread.yield(); else if (q == null) q = new WaitNode(); else if (!queued) //加入隊列 queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); else if (timed) { //ture的話 等待一段時間。false的話直接阻塞 nanos = deadline - System.nanoTime(); if (nanos <= 0L) { removeWaiter(q); return state; } LockSupport.parkNanos(this, nanos); } else LockSupport.park(this); }

這裡如果阻塞了,就等run方法執行完成的釋放了,程式碼邏輯很清晰,jdk並發包中的實現用了很多for(;;)這其實是作者寫C的習慣的while(true)會多一些指令,在java中編譯成

位元組碼這兩個是完全一樣的。下面看一下獲取到狀態後執行的report方法

  //正常直接返回結果,異常封裝一下拋出,這裡有個退出,退出的程式碼這裡就不再繼續分析了,看完上述的分析,相信你也能快速看懂退出的程式碼
private V report(int s) throws ExecutionException { Object x = outcome; if (s == NORMAL) return (V)x; if (s >= CANCELLED) throw new CancellationException(); throw new ExecutionException((Throwable)x); }

4.使用

在實際開發中,大部分情況都要用到db,http,rpc這些IO操作,在一個方法中需要多次進行這些操作時,如果沒有前後關聯,可以使用Future充分

使用多核cpu,比如你需要查多個表拼接成一個VO返回給前端,就可以用Future提高介面的響應時間。