FutureTask是怎樣獲取到異步執行結果的?
- 2019 年 10 月 13 日
- 筆記
所謂異步任務,就是不在當前線程中進行執行,而是另外起一個線程讓其執行。那麼當前線程如果想拿到其執行結果,該怎麼辦呢?
如果我們使用一個公共變量作為結果容器,兩個線程共用這個值,那麼應該是可以拿到結果的,但是這樣一來,對業務就會造成侵入干擾了,因為你始終得考慮將這個共享變量傳入到這個異步線程中去且要維持其安全性。
我們知道,Future.get() 可以獲取異步執行的結果,那麼它是怎麼做到的呢?
要實現線程的數據交換,我們按照進程間的通信方式可知有: 管道、共享內存、Socket套接字。而同一個jvm的兩個線程通信,所有線程共享內存區域,則一定是通過共享內存再簡單不過了。
本文將以 ThreadPoolExecutor 來解釋這個過程。
首先,如果想要獲取一個線程的執行結果,需要調用 ThreadPoolExecutor.submit(Callable); 方法。然後該方法會返回一個 Future 對象,通過 Future.get(); 即可獲取結果了。
它具體是怎麼實現的呢?
一、首先,我們來看一下 submit 過程
僅為返回了一個 Future<?> 的對象供下游調用!
// AbstractExecutorService public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); // 包裝一層結果,RunnableFuture, 也實現了 Runnable 接口 // 實際上就是 FutureTask RunnableFuture<T> ftask = newTaskFor(task); // 然後交由 線程池進行調用任務了,即由 jvm 調用執行 Thread // 具體執行邏輯,在我之前的文章中也已經闡述,自行搜索 execute(ftask); // 最後,把包裝對象返回即可 return ftask; } /** * Returns a {@code RunnableFuture} for the given callable task. * * @param callable the callable task being wrapped * @param <T> the type of the callable's result * @return a {@code RunnableFuture} which, when run, will call the * underlying callable and which, as a {@code Future}, will yield * the callable's result as its result and provide for * cancellation of the underlying task * @since 1.6 */ protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { return new FutureTask<T>(callable); } // FutureTask 實例化 /** * Creates a {@code FutureTask} that will, upon running, execute the * given {@code Callable}. * * @param callable the callable task * @throws NullPointerException if the callable is null */ public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; // ensure visibility of callable }
二、異步線程如何執行?
通過上面的分析,我們可以看到,異步線程的執行被包裝成了 FutureTask, 而java的異步線程執行都是由jvm調用Thread.run()進行, 所以異步起點也應該從這裡去找:
// FutureTask.run() public void run() { // 不允許多次執行 if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { // 直接調用 call() 方法,獲取返回結果 result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; // 執行異常,包裝異常信息 setException(ex); } // 將結果設置到當前的 FutureTask 實例變量 outcome 中,這樣當前線程就可以獲取了 // 設置結果時,會將 state 同時變更 if (ran) set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } } /** * Sets the result of this future to the given value unless * this future has already been set or has been cancelled. * * <p>This method is invoked internally by the {@link #run} method * upon successful completion of the computation. * * @param v the value */ protected void set(V v) { // 設置結果時,還不代表可以直接獲取了,還有後續工作,所以設置為 COMPLETING 中間態 if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = v; UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state // 通知線程執行完成等後續工作 finishCompletion(); } } /** * Removes and signals all waiting threads, invokes done(), and * nulls out callable. */ private void finishCompletion() { // assert state > COMPLETING; // 外部看起來是一個 for, 實際上只會執行一次, 目的是為了保證內部的鎖獲取成功 // 如果有其他線程成功後, waiters也就會為null, 從而自身也一起退出了 for (WaitNode q; (q = waiters) != null;) { // 保證更新的線程安全性 // 只要鎖獲取成功,就會一次性更新完成,不會失敗 if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { for (;;) { Thread t = q.thread; // 依次喚醒等待的線程 if (t != null) { q.thread = null; LockSupport.unpark(t); } WaitNode next = q.next; // 只有把所有 wait 線程都通知完後,才可以退出 if (next == null) break; q.next = null; // unlink to help gc q = next; } break; } } // 完成後鉤子方法,默認為空,如果需要做特殊操作可以自行複寫即可 done(); callable = null; // to reduce footprint } // 簡單看一下異常信息的包裝,與 正常結束方法類似,只是將 outcome 設置為了異常信息,完成狀態設置為 EXCEPTIONAL /** * Causes this future to report an {@link ExecutionException} * with the given throwable as its cause, unless this future has * already been set or has been cancelled. * * <p>This method is invoked internally by the {@link #run} method * upon failure of the computation. * * @param t the cause of failure */ protected void setException(Throwable t) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = t; UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state finishCompletion(); } }
在上面這些實現中,我們也會有點迷糊,我幹啥來了?
不管怎麼樣,你明白一點,所有的執行結果都被放到 FutureTask 的 outcome 變量中了,我們如果想要知道結果,那麼,只需要獲取這個變量就可以了。
當然,也不可能這麼簡單了,起碼你得知道什麼時候獲取該變量是合適的才行!接下來!
三、如何獲取異步執行結果?
當然是用戶調用 future.get() 獲取了!
// Future.get() public V get() throws InterruptedException, ExecutionException { int s = state; // 只要狀態值小於 COMPLETING, 就說明任務還未完成, 去等待完成 if (s <= COMPLETING) s = awaitDone(false, 0L); // 只要等待完成, 再去把結果取回即可 return report(s); } // 我們先看一下結果的取回邏輯 report(), 果然不出意外的簡單, 只管取 outcome 即可 /** * Returns result or throws exception for completed task. * * @param s completed state value */ @SuppressWarnings("unchecked") private V report(int s) throws ExecutionException { Object x = outcome; // 正常執行完成, 直接返回 if (s == NORMAL) return (V)x; // 此處會包含 CANCELLED/INTERRUPTING/INTERRUPTED if (s >= CANCELLED) throw new CancellationException(); // 業務異常則會被包裝成 ExecutionException throw new ExecutionException((Throwable)x); } // 看到取結果這麼簡單,那麼 等待結束的邏輯的呢?看起來好像沒那麼簡單了 /** * Awaits completion or aborts on interrupt or timeout. * * @param timed true if use timed waits * @param nanos time to wait, if timed * @return state upon completion */ private int awaitDone(boolean timed, long nanos) throws InterruptedException { final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; for (;;) { // 中斷則退出 if (Thread.interrupted()) { // 因 q 是鏈表的頭,所以會移除所有的等待隊列,即中斷是對所有線程的 removeWaiter(q); throw new InterruptedException(); } int s = state; // 執行完成後,將線程置空即可,刪除工作會有其他地方完成 if (s > COMPLETING) { if (q != null) q.thread = null; return s; } // 正在處理結果,稍作等待即可 else if (s == COMPLETING) // cannot time out yet Thread.yield(); // 其他情況,先創建自己的等待標識,以便在下一次循環中進行入隊等待 else if (q == null) q = new WaitNode(); // 進行一次入隊等待,將 q 作為頭節點 else if (!queued) queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); // 限時的等待,等待超時後,直接返回當前狀態即可 else if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { removeWaiter(q); return state; } // 最長等待預定時間 LockSupport.parkNanos(this, nanos); } // 此處進行無限期等待,但當被喚醒時,一定有狀態變更的時候,應該會在下一個周期結束循環 else LockSupport.park(this); } }
可以看到,等待邏輯還是有點多的,畢竟場景多。至此,我們已經完全看到了一個,如何獲取異步線程的執行結果實現了。總結下:
1. 實現Runnable接口,由jvm進行線程調用;
2. 包裝 Callable.call()方法,帶返回值,當線程被調起時,轉給 call() 方法執行,並返回結果;
3. 將結果封裝到當前future實例中,以備查;
4. 當用戶調用get()方法時,保證狀態完成情況下,最快速地返回結果;
四、擴展: Future.get() vs Thread.join()
Future.get()方法,一方面是為了獲取異步線程的執行結果,另一方面也做到了等待線程執行完成的效果。
而 Thread.join() 則純粹是為了等待異步線程執行完成,那它們有什麼異曲同工之妙嗎?來看下
// Thread.join(), 通過 isAlive() 判斷是否完成 /** * Waits for this thread to die. * * <p> An invocation of this method behaves in exactly the same * way as the invocation * * <blockquote> * {@linkplain #join(long) join}{@code (0)} * </blockquote> * * @throws InterruptedException * if any thread has interrupted the current thread. The * <i>interrupted status</i> of the current thread is * cleared when this exception is thrown. */ public final void join() throws InterruptedException { join(0); } /** * Waits at most {@code millis} milliseconds for this thread to * die. A timeout of {@code 0} means to wait forever. * * <p> This implementation uses a loop of {@code this.wait} calls * conditioned on {@code this.isAlive}. As a thread terminates the * {@code this.notifyAll} method is invoked. It is recommended that * applications not use {@code wait}, {@code notify}, or * {@code notifyAll} on {@code Thread} instances. * * @param millis * the time to wait in milliseconds * * @throws IllegalArgumentException * if the value of {@code millis} is negative * * @throws InterruptedException * if any thread has interrupted the current thread. The * <i>interrupted status</i> of the current thread is * cleared when this exception is thrown. */ public final synchronized void join(long millis) throws InterruptedException { long base = System.currentTimeMillis(); long now = 0; if (millis < 0) { throw new IllegalArgumentException("timeout value is negative"); } // 無限期等待 if (millis == 0) { while (isAlive()) { // 這是個 native 方法,即由jvm進行控制 // Thread 任務執行完成後,將進行 notifyAll() // 同理下面的限時等待 wait(0); } } else { // 限時等待 while (isAlive()) { long delay = millis - now; if (delay <= 0) { break; } wait(delay); now = System.currentTimeMillis() - base; } } }
可以看到, Thread.join() 的等待邏輯是依賴於 jvm 的調度的, 通過 wait/notify 機制實現。與 Future.get() 相比,它是在 之後的,且無法獲取結果。
五、Runnable如何包裝成Callable ?
Callable 其實就只是實現了一個 call() 方法而已,如果我們只實現了 Runnable, 是否就拿不到返回值呢?並不是,我們可以直接指定返回值對象或者不指定,使用Runnable進行submit();
// 不指定返回值的 Runnable, 此處的返回值一定 void public Future<?> submit(Runnable task); // 指定返回值的 Runnable, 由 T 進行返回值接收 public <T> Future<T> submit(Runnable task, T result);
但是 Runnable 是怎麼變成 Callable 的呢?其實就是一個 適配器模式的應用,我們來看一下!
// AbstractExecutorService.submit() public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); // 明確返回值為 Void RunnableFuture<Void> ftask = newTaskFor(task, null); execute(ftask); return ftask; } // 同樣使用 FutureTask 進行封裝,只是調用了不同的構造器 protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { return new FutureTask<T>(runnable, value); } // FutureTask, 使用 Executors 工具類生成一個 callable, 屏蔽掉 Callable 與 Runnable 的差異 public FutureTask(Runnable runnable, V result) { this.callable = Executors.callable(runnable, result); this.state = NEW; // ensure visibility of callable } // Executors 使用一個適合器類將 Runnable 封裝成 Callable /** * Returns a {@link Callable} object that, when * called, runs the given task and returns the given result. This * can be useful when applying methods requiring a * {@code Callable} to an otherwise resultless action. * @param task the task to run * @param result the result to return * @param <T> the type of the result * @return a callable object * @throws NullPointerException if task null */ public static <T> Callable<T> callable(Runnable task, T result) { if (task == null) throw new NullPointerException(); return new RunnableAdapter<T>(task, result); } // 而 RunnableAdapter 也是很簡單, 僅將 call() 轉而調用 run() 方法即可 /** * A callable that runs given task and returns given result */ static final class RunnableAdapter<T> implements Callable<T> { final Runnable task; final T result; RunnableAdapter(Runnable task, T result) { this.task = task; this.result = result; } public T call() { task.run(); return result; } }
簡單不失優雅!這就是,大巧不工啊!
但是有一個點我們可以看到,那就是 result 的獲取,其實就是傳入什麼值,就返回值。而如果想在想要改變其結果,唯一的辦法是使 result 變量 對 Runnable.run() 可見,從而在 run() 方法中改變其值。這就看你怎麼用了!