【雜談】從實現角度看ChannelFuture

JDK中的Future特性

在介紹Netty的ChannelFuture之前,我們先來看看JDK中的Future是如何實現的。總的來說就是任務提交的時候會使用裝飾器模式,將任務包裝成一個FutureTask。當執行器執行該Task的時候,不僅僅會執行用戶提交的任務,還會執行裝飾器添加的額外操作,例如在執行之前記錄當前執行線程、執行完成後將任務結果保存在FutureTask對象內部等。

  • Thread runner   =>  裝飾器添加的,在執行任務之前,會在對象內保存當前執行線程的引用,用於中斷任務執行
  • Object outcome => 任務執行結果(返回值或異常對象),任務執行完成後會將結果set到此對象的outcome,後續可通過Future的get接口取出
  • Callable<V> callble => 用戶提交的實際任務
  • WaitNode waiters => 用於保存等待線程,任務完成後會喚醒這些線程

詳細請看本人過去整理的隨筆:

Netty中的ChannelFuture

ChannelFuture是在Future基礎上的完善,它支持添加監聽器,在任務完成後自動執行相關操作。

這個其實就是觀察者模式,個人認為就是在前面的C部分添加監聽的方法調用,即執行線程執行完成後,如果發現該任務上有監聽器,則該執行線程還會調用監聽器接口。

話不多說,我們來看看它代碼到底是怎麼寫的,跟Future的實現有何異同。相關實現關鍵內容在DefaultPromise類中,相對於前面的FutureTask。

1.對象屬性

 private volatile Object result;
 private final EventExecutor executor;
 private Object listeners;
 private short waiters;
 private boolean notifyingListeners;

從對象屬性中我們可以得知,該對象沒有保存用戶任務。實際上個人任務這是它跟Future最大的不同,Future所引用的本質上是一個任務,而ChannelFuture所引用的只有任務狀態和任務結果。所以這個DefaultPromise對象不會被作為任務提交到執行器中。

2.任務完成時的線程喚醒與監聽器觸發

private boolean setValue0(Object objResult) {
        if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
            RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
            if (checkNotifyWaiters()) {
                notifyListeners();
            }
            return true;
        }
        return false;
    }

當任務執行完成,通過setValue將值傳入DefaultPromise對象時,喚醒等待的線程,並觸發監聽器。

2.線程的等待與喚醒方式

public Promise<V> await() throws InterruptedException {
        if (isDone()) {
            return this;
        }

        if (Thread.interrupted()) {
            throw new InterruptedException(toString());
        }

        checkDeadLock();

        synchronized (this) {
            while (!isDone()) {
                incWaiters(); 
                try {
                    wait(); 
                } finally {
                    decWaiters();
                }
            }
        }
        return this;
    }
private synchronized boolean checkNotifyWaiters() {
    if (waiters > 0) {
        notifyAll();
    }
    return listeners != null;
}

從上述代碼可知,這裡的線程等待與喚醒方式使用的內置的wait()和notify()方法,而FutureTask的等待隊列是單獨實現的。

註:這裡尚不清楚這兩種實現方式之間的優劣。

3.設置監聽器

public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
        checkNotNull(listener, "listener");

        synchronized (this) {
            addListener0(listener);
        }

        //添加監聽器時,如果發現已經完成,則直接調用觸發監聽器
        if (isDone()) {
            notifyListeners();
        }

        return this;
    }

如果任務已經完成,則直接觸發監聽器。防止出現”調用setLisener的時候,任務已經完成,導致監聽器不被觸發”。

4.任務取消

public boolean cancel(boolean mayInterruptIfRunning) {
        if (RESULT_UPDATER.compareAndSet(this, null, CANCELLATION_CAUSE_HOLDER)) {
            if (checkNotifyWaiters()) {
                notifyListeners();
            }
            return true;
        }
        return false;
    }

從代碼可以看出,它的實現與FutureTask有所不同,它並不會嘗試調用執行線程的interrupt()方法來中斷線程,只是將等待線程喚醒,並觸發監聽器。所以這個「取消」操作並不會影響代碼的實際執行。

事實上「中斷」也只是一種協作方式,它只是設置中斷狀態並將線程喚醒(如果該線程正處於掛起狀態),如果用戶代碼中沒有對中斷狀態進行判斷,也沒有使用wait()、sleep()等方法,中斷操作也是不會實際「打斷」代碼執行的。

詳細可看:【雜談】線程中斷——Interrupt