【雜談】從實現角度看ChannelFuture
- 2020 年 5 月 1 日
- 筆記
JDK中的Future特性
在介紹Netty的ChannelFuture之前,我們先來看看JDK中的Future是如何實現的。總的來說就是任務提交的時候會使用裝飾器模式,將任務包裝成一個FutureTask。當執行器執行該Task的時候,不僅僅會執行用戶提交的任務,還會執行裝飾器添加的額外操作,例如在執行之前記錄當前執行線程、執行完成後將任務結果保存在FutureTask對象內部等。
- Thread runner => 裝飾器添加的,在執行任務之前,會在對象內保存當前執行線程的引用,用於中斷任務執行
- Object outcome => 任務執行結果(返回值或異常對象),任務執行完成後會將結果set到此對象的outcome,後續可通過Future的get接口取出
- Callable<V> callble => 用戶提交的實際任務
- WaitNode waiters => 用於保存等待線程,任務完成後會喚醒這些線程
詳細請看本人過去整理的隨筆:
Netty中的ChannelFuture
ChannelFuture是在Future基礎上的完善,它支持添加監聽器,在任務完成後自動執行相關操作。
這個其實就是觀察者模式,個人認為就是在前面的C部分添加監聽的方法調用,即執行線程執行完成後,如果發現該任務上有監聽器,則該執行線程還會調用監聽器接口。
話不多說,我們來看看它代碼到底是怎麼寫的,跟Future的實現有何異同。相關實現關鍵內容在DefaultPromise類中,相對於前面的FutureTask。
1.對象屬性
private volatile Object result; private final EventExecutor executor; private Object listeners; private short waiters; private boolean notifyingListeners;
從對象屬性中我們可以得知,該對象沒有保存用戶任務。實際上個人任務這是它跟Future最大的不同,Future所引用的本質上是一個任務,而ChannelFuture所引用的只有任務狀態和任務結果。所以這個DefaultPromise對象不會被作為任務提交到執行器中。
2.任務完成時的線程喚醒與監聽器觸發
private boolean setValue0(Object objResult) { if (RESULT_UPDATER.compareAndSet(this, null, objResult) || RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) { if (checkNotifyWaiters()) { notifyListeners(); } return true; } return false; }
當任務執行完成,通過setValue將值傳入DefaultPromise對象時,喚醒等待的線程,並觸發監聽器。
2.線程的等待與喚醒方式
public Promise<V> await() throws InterruptedException { if (isDone()) { return this; } if (Thread.interrupted()) { throw new InterruptedException(toString()); } checkDeadLock(); synchronized (this) { while (!isDone()) { incWaiters(); try { wait(); } finally { decWaiters(); } } } return this; } private synchronized boolean checkNotifyWaiters() { if (waiters > 0) { notifyAll(); } return listeners != null; }
從上述代碼可知,這裡的線程等待與喚醒方式使用的內置的wait()和notify()方法,而FutureTask的等待隊列是單獨實現的。
註:這裡尚不清楚這兩種實現方式之間的優劣。
3.設置監聽器
public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) { checkNotNull(listener, "listener"); synchronized (this) { addListener0(listener); } //添加監聽器時,如果發現已經完成,則直接調用觸發監聽器 if (isDone()) { notifyListeners(); } return this; }
如果任務已經完成,則直接觸發監聽器。防止出現”調用setLisener的時候,任務已經完成,導致監聽器不被觸發”。
4.任務取消
public boolean cancel(boolean mayInterruptIfRunning) { if (RESULT_UPDATER.compareAndSet(this, null, CANCELLATION_CAUSE_HOLDER)) { if (checkNotifyWaiters()) { notifyListeners(); } return true; } return false; }
從代碼可以看出,它的實現與FutureTask有所不同,它並不會嘗試調用執行線程的interrupt()方法來中斷線程,只是將等待線程喚醒,並觸發監聽器。所以這個「取消」操作並不會影響代碼的實際執行。
事實上「中斷」也只是一種協作方式,它只是設置中斷狀態並將線程喚醒(如果該線程正處於掛起狀態),如果用戶代碼中沒有對中斷狀態進行判斷,也沒有使用wait()、sleep()等方法,中斷操作也是不會實際「打斷」代碼執行的。
詳細可看:【雜談】線程中斷——Interrupt