Java多執行緒類FutureTask源碼閱讀以及淺析

  FutureTask是一個具體的實現類,實現了RunnableFuture介面,RunnableFuture分別繼承了Runnable和Future介面,因此FutureTask類既可以被執行緒執行,又可以拿到執行緒執行的結果。FutrueTask應用於多執行緒中非同步處理並得到處理結果的場景,比如:加入有個流程需要調用遠程介面拿到相關數據在本地進行處理,但是這個介面花費時間比較長。如果使用傳統的阻塞執行緒去處理的話,那麼就會一直阻塞在調用介面這裡,其它的事情都幹不了,這樣操作顯然效率相對較低的。因此,我們可以使用FutureTask來解決這個問題,FutureTask可以非同步調用遠端介面,那麼當前執行緒就可以做與遠端介面無關的數據,雙管齊下提高效率。

  FutureTask UML類圖:

 

   FutureTask類簡單的使用示例:

 public static void main(String[] args) throws InterruptedException, ExecutionException {
        FutureTask<String> futureTask = new FutureTask<>(() -> {
            System.out.println("非同步處理");
            Thread.sleep(3000);
            return "ok";
        });
        new Thread(futureTask).start();
        System.out.println("同步處理其它事情");
        Thread.sleep(1000);
        System.out.println("等待非同步處理結果:" + futureTask.get());
        System.out.println("處理完成");
    }

  

  一、成員變數

  FutureTask類有state,callable,outcome,runner和waiters 5個成員變數

  

 

 

  1.state

    執行緒運行狀態,有以下幾種狀態:

      NEW:初始狀態,在初始化時的狀態,狀態值為0;

      COMPLETING: 完成中狀態,run方法被調用時,對返回值進行賦值欠的狀態,值為1;

      NORMAL: 正常狀態,執行緒正常執行,在返回值被賦值被賦值成功後的狀態,值為2;

      EXCEPTIONAL:異常狀態,在執行用戶回調方式call的過程中出現異常,值為3;

      CANCELLED: 取消狀態,用戶調用cancel(false)方法時的狀態,值為4;

      INTERRUPTING:打斷中狀態,用戶調用cancel(true)方法時的狀態,值為5;

      INTERRUPTED: 被打斷狀態,用戶調用cancel(true)方法時,runner執行緒執行打斷方法完成後的狀態,值為6;

 

    運行狀態轉換:

      NEW -> COMPLETING -> NORMAL

      NEW -> COMPLETING -> EXCEPTIONAL

      NEW -> CANCELLED

      NEW -> INTERRUPTING -> INTERRUPTED

    

 

 

  2.callable

  該成員變數用於非同步執行用戶自定義業務程式碼,當futureTask獲得cpu時間片後調用run方法,在run方法中調用callable.call(),獲取到執行結果。

  

  3.outcome

  非同步執行輸出結果,類型為object。賦值時機時在callable.call()方式執行完成後。

 

  4.runner

  用於執行callable介面,在futureTask被cpu調度時會使用cas賦值為當前執行緒。當前執行緒執行完成後設置為null,等待gc回收。

 

  5.waiters

  內部類實現的單向鏈表,用於等待獲取執行結果。每次調用get()方法時都會將該執行緒放入等待隊列的頭部,當該執行緒被打斷後,或者get(timeout)方法過期後就會重這個等待隊列中移除。當callable.call()執行完成後會從頭部開始遍歷逐個喚醒等待執行緒,並將執行結果返回。

 

  二、核心方法

  1.run方法

  run方法間接實現於Runnable的介面,所以當futureTask執行緒獲得cpu資源後會調用該方法。

  1.首先先判斷當前狀態是否為初始化狀態,如果不是初始狀態直接結束該方法。否則使用cas方式給成員變數runner賦值,賦值為當前執行緒。用cas方式能夠保證多執行緒環境下賦值是執行緒安全的。不懂cas的同學自行查閱相關資料。

  2.如果callable不為null並且state狀態為NEW,則執行callable.call()方法,並得到該方法的返回值。

  3.如果執行call方法出現異常時,執行setException方法,該方法將state的NEW狀態使用cas方式修改為COMPLETING狀態,修改成功後outcome設置為當前拋出的異常,狀態再次改為EXCEPTIONAL狀態。然後將等待隊列中的執行緒都喚醒,並從隊列中移除。調用鉤子done()方法,將callable擲為null。

  4.如果call方式執行成功,下一步則調用set方法,該方法首先將NEW狀態用cas修改成COMPLETING狀態,修改成功後將call執行結果賦值到outcome變數,COMPLETING狀態修改為NORMAL,喚醒等待執行緒並從隊列移除,調用狗子方法。

  5.執行finally程式碼的程式碼,將runner擲為null,如果當前狀態為打斷中,那麼會將當前資源讓出,直到執行緒最終被打斷。

 

 /**
     * Runnable#run();
     * 執行緒獲得cpu資源後會執行該方法
     */
    public void run() {
        //判斷當前狀態是不是初始狀態
        //將runner賦值為當前執行緒
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    //調用用戶業務流程
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    //拋出異常,修改響應的狀態
                    setException(ex);
                }
                if (ran)
                    set(result);
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }

  

/**
     * 1.修改狀態值  NEW  --> COMPLETING -->  EXCEPTIONAL
     * 2.移除並喚醒所有等待中的執行緒
     * @param t
     */
    protected void setException(Throwable t) {
        //將state修改為COMPLETING
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = t;
            //將state修改為EXCEPTIONAL
            UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
            //對返回值進行處理
            finishCompletion();
        }
    }

/**
     * 1.狀態值  NEW  --> COMPLETING  --> NORMAL
     * 2.設置執行結果值
     * 3.喚醒所有等待中的執行緒
     * @param v
     */
    protected void set(V v) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = v;
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
            finishCompletion();
        }
    }

  

  2.get() 和 get(long timeout,TimeUnit timeUnit);

  兩個方法的區別在於前者沒有超時時間,後者由超時時間,流程是基本差不多的。

  1.如果state為COMPLETED,進入report方法,該方法會判斷當前的狀態為NORMAL時將outcome返回,否則拋出異常。

  2.如果state不為COMPLETED進入awaitDone方法。

  3.awaitDone方法顧名思義就是等待操作結果。方法裡面是一個死循環,在循環過程中如果執行緒被打斷,就會拋出異常,並將剛創建的等待執行緒從隊列中移除。

  4.如果狀態已完成,將等待執行緒綁定的執行緒設為null,並將狀態返回。

  5.如果當前狀態為COMPLETING則將當前cpu資源讓出給其它執行緒。

  6.如果等待節點為null,就創建一個新的節點,該節點綁定了當前的執行緒。

  7.如果新創建的節點還沒有與等待隊列進行綁定,那麼就將該節點放入隊列頭部。

  8.如果調用的是由過期時間的方法,那麼判斷如果已經到期了則將該節點從隊列中移除,並返回狀態。否則進入有過期的等待。

  9.執行緒進入等待狀態,執行緒會阻塞在這裡,等待run方法執行完成後調用unPark方法。

  10.執行緒被喚醒後,進入report方法。

  

/**
     * @throws CancellationException {@inheritDoc}
     */
    /**
     * 獲取執行結果
     * @return
     * @throws InterruptedException
     * @throws ExecutionException
     */
    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    }

    /**
     * @throws CancellationException {@inheritDoc}
     */
    /**
     * 獲取執行結果
     * @param timeout
     * @param unit
     * @return
     * @throws InterruptedException
     * @throws ExecutionException
     * @throws TimeoutException
     */
    public V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        if (unit == null)
            throw new NullPointerException();
        int s = state;
        if (s <= COMPLETING &&
            (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
            throw new TimeoutException();
        return report(s);
    }

  

  3.awaitDone(boolean timed, long nanos)

  該方法是等待完成,或者執行緒被打斷而拋出異常退出,有或者是經過nanos 這麼多納秒後退出。方法內部是一個死循環,通過各種條件判斷是否滿足條件退出,否則執行緒進入等待狀態,直到被其他執行緒喚醒。

  1.首先會判斷當前執行緒是否有打斷標記,如果被打斷過,刪除剛創建出來的等待節點,並拋出InterruptedException異常。

  2.如果當前任務是已完成狀態,直接將當前狀態返回。

  3.如果當前任務狀態為完成中,說明其他執行緒正在操作,當前執行緒無需要重複操作,只需要將cpu資源讓出來。

  4.如果前三個條件均未滿足,則會創建等待節點,然後進入第二輪循環。

  5.第二輪循環,將第二輪循環創建的等待節點放入等待鏈表的頭部,並使用cas方式給waiters賦值,保證多執行緒下正常正確的賦值。

  6.第三輪循環,如果用戶調用的是有過期時間的get方法,則會計算當前剩餘時間,1)如果剩餘時間小於等於0,則說明已經過期,那麼就會移除當前等待中的節點,將當前任務狀態返回。2)否則調用LockSupport的有過期時間的parkNanos,該方法會讓執行緒進入等待狀態,也即執行緒會阻塞在這裡,過期時間不會超過用戶傳入的過期時間。如果用戶調用的是沒有過期時間的方法,那麼調用LockSupport的有無過期時間的parkNanos,該方法會讓執行緒無限的等待下午,知道有其他執行緒將他喚醒。

  源碼:

/**
     * Awaits completion or aborts on interrupt or timeout.
     *
     * 等待完成或打斷退出或超時退出
     * @param timed true if use timed waits 是否有超時時間
     * @param nanos time to wait, if timed 等待時間
     * @return state upon completion 狀態碼
     */
    private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
        for (;;) {
            //返回true,執行緒被打斷過,但並不會直接拋出異常
            //而是等其他執行緒將執行緒喚醒之後,發現該執行緒在等待過程中執行了打斷操作
            if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            }

            int s = state;
            //這裡的意思是任務已完成,又可以能是正常結束,也有可以能是用戶取消,或者異常,打斷
            if (s > COMPLETING) {
                if (q != null)
                    q.thread = null;
                return s;
            }
            //讓出cpu資源
            else if (s == COMPLETING) // cannot time out yet
                Thread.yield();
            //第一次循環會進入這個條件創建節點
            else if (q == null)
                q = new WaitNode();
            //第二次循環給新創建的q節點放在waiters鏈表的頭部
            else if (!queued)
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
            else if (timed) {
                nanos = deadline - System.nanoTime();
                //過期退出
                if (nanos <= 0L) {
                    removeWaiter(q);
                    return state;
                }
                //有限時間掛起執行緒
                LockSupport.parkNanos(this, nanos);
            }
            else
                //無限時間掛起執行緒
                LockSupport.park(this);
        }
    }

  

  三、總結

  FutureTask是運用了高並發設計思想的Future設計模式。它很好的處理了高並發下處理多件獲取或創建數據並無相關聯的操作耗時長的問題。設計者可以將耗時比較長的操作(比如遠程調用介面等)使用非同步的方式(即創建一個新的執行緒)去處理,那麼主執行緒就可以做其他的事情了,這樣可以大大減少整體的處理時間。這個模式適用於多個無關聯的時間,如果A操作的進行需要B操作的結果才可以開始,那麼A其實是一直帶阻塞等待B的結果的,這個串列執行的耗時差不多,使用future模式意義不大。

  FutureTask的get方法在用戶邏輯程式碼未返回結果時仍然後進入阻塞,但是用戶業務程式碼的執行並不受主執行緒(創建FutureTask的執行緒)的影響。我們可以通過重寫done方法來獲取到完成動作,這樣我們再調用get方法時就不會阻塞。

  在現實生活中就有很多類似Future模式的例子。比如你的生日快到了,你需要去蛋糕店訂蛋糕,同時還需要買其他的禮品,開party所需的東西等,假設蛋糕店製作蛋糕需要花費1個小時,購買其他物品需要2小時。用傳統的串列的方式就是你去蛋糕店跟老闆說你要訂蛋糕,老闆根據你的需求開始製作蛋糕,你就在店裡坐著等製作完成。1個小時後終於製作好蛋糕了,然後你才能拿著蛋糕去買其他東西,買完其他東西有需要耗費2小時,最後你總共花費了3小時。當使用Future模式時,你事先寫好你需要訂多大的,什麼口味的蛋糕,然後去到蛋糕跟老闆說你先去買其他東西,一會再過來拿。但是你忘記留聯繫方式給蛋糕店老闆了(沒有重寫done方法),所以你並不知道蛋糕什麼時候做好,提前過去拿,那你還得在店裡等蛋糕做好。如果重寫了done方法,相當於給店老闆留了電話號碼,等蛋糕做好老闆就會打電話給你,你過拿蛋糕時就不會說太早過去要等一會或太晚過去了。我們用最壞的情況來計算,你買其他東西花了兩個小時,製作蛋糕花了1個小時。由於製作蛋糕和你沒其他東西是分開同時進行的,所以最終你只花了2個小時,比串列的方式快了1個小時。

  以上就是我在看FutureTask源碼過程中的總結,如有錯漏歡迎提出。