netty系列之:netty中的核心MessageToMessage編碼器

簡介

在netty中我們需要傳遞各種類型的消息,這些message可以是字符串,可以是數組,也可以是自定義的對象。不同的對象之間可能需要互相轉換,這樣就需要一個可以自由進行轉換的轉換器,為了統一編碼規則和方便用戶的擴展,netty提供了一套消息之間進行轉換的框架。本文將會講解這個框架的具體實現。

框架簡介

netty為消息和消息之間的轉換提供了三個類,這三個類都是抽象類,分別是MessageToMessageDecoder,MessageToMessageEncoder和MessageToMessageCodec。

先來看下他們的定義:

public abstract class MessageToMessageEncoder<I> extends ChannelOutboundHandlerAdapter
public abstract class MessageToMessageDecoder<I> extends ChannelInboundHandlerAdapter
public abstract class MessageToMessageCodec<INBOUND_IN, OUTBOUND_IN> extends ChannelDuplexHandler 

MessageToMessageEncoder繼承自ChannelOutboundHandlerAdapter,負責向channel中寫消息。

MessageToMessageDecoder繼承自ChannelInboundHandlerAdapter,負責從channel中讀取消息。

MessageToMessageCodec繼承自ChannelDuplexHandler,它是一個雙向的handler,可以從channel中讀取消息,也可以向channel中寫入消息。

有了這三個抽象類,我們再看下這三個類的具體實現。

MessageToMessageEncoder

先看一下消息的編碼器MessageToMessageEncoder,編碼器中最重要的方法就是write,看下write的實現:

    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        CodecOutputList out = null;
        try {
            if (acceptOutboundMessage(msg)) {
                out = CodecOutputList.newInstance();
                @SuppressWarnings("unchecked")
                I cast = (I) msg;
                try {
                    encode(ctx, cast, out);
                } finally {
                    ReferenceCountUtil.release(cast);
                }

                if (out.isEmpty()) {
                    throw new EncoderException(
                            StringUtil.simpleClassName(this) + " must produce at least one message.");
                }
            } else {
                ctx.write(msg, promise);
            }
        } catch (EncoderException e) {
            throw e;
        } catch (Throwable t) {
            throw new EncoderException(t);
        } finally {
            if (out != null) {
                try {
                    final int sizeMinusOne = out.size() - 1;
                    if (sizeMinusOne == 0) {
                        ctx.write(out.getUnsafe(0), promise);
                    } else if (sizeMinusOne > 0) {
                        if (promise == ctx.voidPromise()) {
                            writeVoidPromise(ctx, out);
                        } else {
                            writePromiseCombiner(ctx, out, promise);
                        }
                    }
                } finally {
                    out.recycle();
                }
            }
        }
    }

write方法接受一個需要轉換的原始對象msg,和一個表示channel讀寫進度的ChannelPromise。

首先會對msg進行一個類型判斷,這個判斷方法是在acceptOutboundMessage中實現的。

    public boolean acceptOutboundMessage(Object msg) throws Exception {
        return matcher.match(msg);
    }

這裡的matcher是一個TypeParameterMatcher對象,它是一個在MessageToMessageEncoder構造函數中初始化的屬性:

    protected MessageToMessageEncoder() {
        matcher = TypeParameterMatcher.find(this, MessageToMessageEncoder.class, "I");
    }

這裡的I就是要匹配的msg類型。

如果不匹配,則繼續調用ctx.write(msg, promise); 將消息不做任何轉換的寫入到channel中,供下一個handler調用。

如果匹配成功,則會調用核心的encode方法:encode(ctx, cast, out);

注意,encode方法在MessageToMessageEncoder中是一個抽象方法,需要用戶在繼承類中自行擴展。

encode方法實際上是將msg對象轉換成為要轉換的對象,然後添加到out中。這個out是一個list對象,具體而言是一個CodecOutputList對象,作為一個list,out是一個可以存儲多個對象的列表。

那麼out是什麼時候寫入到channel中去的呢?

別急,在write方法中最後有一個finally代碼塊,在這個代碼塊中,會將out寫入到channel裏面。

因為out是一個List,可能會出現out中的對象部分寫成功的情況,所以這裡需要特別處理。

首先判斷out中是否只有一個對象,如果是一個對象,那麼直接寫到channel中即可。如果out中多於一個對象,那麼又分成兩種情況,第一種情況是傳入的promise是一個voidPromise,那麼調用writeVoidPromise方法。

什麼是voidPromise呢?

我們知道Promise有多種狀態,可以通過promise的狀態變化了解到數據寫入的情況。對於voidPromise來說,它只關心一種失敗的狀態,其他的狀態都不關心。

如果用戶關心promise的其他狀態,則會調用writePromiseCombiner方法,將多個對象的狀態合併為一個promise返回。

事實上,在writeVoidPromise和writePromiseCombiner中,out中的對象都是一個一個的取出來,寫入到channel中的,所以才會生成多個promise和需要將promise進行合併的情況:

    private static void writeVoidPromise(ChannelHandlerContext ctx, CodecOutputList out) {
        final ChannelPromise voidPromise = ctx.voidPromise();
        for (int i = 0; i < out.size(); i++) {
            ctx.write(out.getUnsafe(i), voidPromise);
        }
    }

    private static void writePromiseCombiner(ChannelHandlerContext ctx, CodecOutputList out, ChannelPromise promise) {
        final PromiseCombiner combiner = new PromiseCombiner(ctx.executor());
        for (int i = 0; i < out.size(); i++) {
            combiner.add(ctx.write(out.getUnsafe(i)));
        }
        combiner.finish(promise);
    }

MessageToMessageDecoder

和encoder對應的就是decoder了,MessageToMessageDecoder的邏輯和MessageToMessageEncoder差不多。

首先也是需要判斷讀取的消息類型,這裡也定義了一個TypeParameterMatcher對象,用來檢測傳入的消息類型:

    protected MessageToMessageDecoder() {
        matcher = TypeParameterMatcher.find(this, MessageToMessageDecoder.class, "I");
    }

decoder中重要的方法是channelRead方法,我們看下它的實現:

    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        CodecOutputList out = CodecOutputList.newInstance();
        try {
            if (acceptInboundMessage(msg)) {
                @SuppressWarnings("unchecked")
                I cast = (I) msg;
                try {
                    decode(ctx, cast, out);
                } finally {
                    ReferenceCountUtil.release(cast);
                }
            } else {
                out.add(msg);
            }
        } catch (DecoderException e) {
            throw e;
        } catch (Exception e) {
            throw new DecoderException(e);
        } finally {
            try {
                int size = out.size();
                for (int i = 0; i < size; i++) {
                    ctx.fireChannelRead(out.getUnsafe(i));
                }
            } finally {
                out.recycle();
            }
        }
    }

首先檢測msg的類型,只有接受的類型才進行decode處理,否則將msg加入到CodecOutputList中。

最後在finally代碼塊中將out中的對象一個個取出來,調用ctx.fireChannelRead進行讀取。

消息轉換的關鍵方法是decode,這個方法也是一個抽象方法,需要在繼承類中實現具體的功能。

MessageToMessageCodec

前面講解了一個編碼器和一個解碼器,他們都是單向的。最後要講解的codec叫做MessageToMessageCodec,這個codec是一個雙向的,即可以接收消息,也可以發送消息。

先看下它的定義:

public abstract class MessageToMessageCodec<INBOUND_IN, OUTBOUND_IN> extends ChannelDuplexHandler

MessageToMessageCodec繼承自ChannelDuplexHandler,接收兩個泛型參數分別是INBOUND_IN和OUTBOUND_IN。

它定義了兩個TypeParameterMatcher,分別用來過濾inboundMsg和outboundMsg:

    protected MessageToMessageCodec() {
        inboundMsgMatcher = TypeParameterMatcher.find(this, MessageToMessageCodec.class, "INBOUND_IN");
        outboundMsgMatcher = TypeParameterMatcher.find(this, MessageToMessageCodec.class, "OUTBOUND_IN");
    }

分別實現了channelRead和write方法,用來讀寫消息:

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        decoder.channelRead(ctx, msg);
    }

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        encoder.write(ctx, msg, promise);
    }

這裡的decoder和encoder實際上就是前面我們講到的MessageToMessageDecoder和MessageToMessageEncoder:

    private final MessageToMessageEncoder<Object> encoder = new MessageToMessageEncoder<Object>() {

        @Override
        public boolean acceptOutboundMessage(Object msg) throws Exception {
            return MessageToMessageCodec.this.acceptOutboundMessage(msg);
        }

        @Override
        @SuppressWarnings("unchecked")
        protected void encode(ChannelHandlerContext ctx, Object msg, List<Object> out) throws Exception {
            MessageToMessageCodec.this.encode(ctx, (OUTBOUND_IN) msg, out);
        }
    };

    private final MessageToMessageDecoder<Object> decoder = new MessageToMessageDecoder<Object>() {

        @Override
        public boolean acceptInboundMessage(Object msg) throws Exception {
            return MessageToMessageCodec.this.acceptInboundMessage(msg);
        }

        @Override
        @SuppressWarnings("unchecked")
        protected void decode(ChannelHandlerContext ctx, Object msg, List<Object> out) throws Exception {
            MessageToMessageCodec.this.decode(ctx, (INBOUND_IN) msg, out);
        }
    };

可以看到MessageToMessageCodec實際上就是對MessageToMessageDecoder和MessageToMessageEncoder的封裝,如果需要對MessageToMessageCodec進行擴展的話,需要實現下面兩個方法:

    protected abstract void encode(ChannelHandlerContext ctx, OUTBOUND_IN msg, List<Object> out)
            throws Exception;

    protected abstract void decode(ChannelHandlerContext ctx, INBOUND_IN msg, List<Object> out)
            throws Exception;

總結

netty中提供的MessageToMessage的編碼框架是後面對編碼解碼器進行擴展的基礎。只有深入了解其中的原理,我們對於新的編碼解碼器運用起來才能得心應手。

本文已收錄於 //www.flydean.com/14-0-1-netty-codec-msg-to-msg/

最通俗的解讀,最深刻的乾貨,最簡潔的教程,眾多你不知道的小技巧等你來發現!

歡迎關注我的公眾號:「程序那些事」,懂技術,更懂你!