netty系列之:netty中的核心MessageToMessage編碼器
簡介
在netty中我們需要傳遞各種類型的消息,這些message可以是字符串,可以是數組,也可以是自定義的對象。不同的對象之間可能需要互相轉換,這樣就需要一個可以自由進行轉換的轉換器,為了統一編碼規則和方便用戶的擴展,netty提供了一套消息之間進行轉換的框架。本文將會講解這個框架的具體實現。
框架簡介
netty為消息和消息之間的轉換提供了三個類,這三個類都是抽象類,分別是MessageToMessageDecoder,MessageToMessageEncoder和MessageToMessageCodec。
先來看下他們的定義:
public abstract class MessageToMessageEncoder<I> extends ChannelOutboundHandlerAdapter
public abstract class MessageToMessageDecoder<I> extends ChannelInboundHandlerAdapter
public abstract class MessageToMessageCodec<INBOUND_IN, OUTBOUND_IN> extends ChannelDuplexHandler
MessageToMessageEncoder繼承自ChannelOutboundHandlerAdapter,負責向channel中寫消息。
MessageToMessageDecoder繼承自ChannelInboundHandlerAdapter,負責從channel中讀取消息。
MessageToMessageCodec繼承自ChannelDuplexHandler,它是一個雙向的handler,可以從channel中讀取消息,也可以向channel中寫入消息。
有了這三個抽象類,我們再看下這三個類的具體實現。
MessageToMessageEncoder
先看一下消息的編碼器MessageToMessageEncoder,編碼器中最重要的方法就是write,看下write的實現:
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
CodecOutputList out = null;
try {
if (acceptOutboundMessage(msg)) {
out = CodecOutputList.newInstance();
@SuppressWarnings("unchecked")
I cast = (I) msg;
try {
encode(ctx, cast, out);
} finally {
ReferenceCountUtil.release(cast);
}
if (out.isEmpty()) {
throw new EncoderException(
StringUtil.simpleClassName(this) + " must produce at least one message.");
}
} else {
ctx.write(msg, promise);
}
} catch (EncoderException e) {
throw e;
} catch (Throwable t) {
throw new EncoderException(t);
} finally {
if (out != null) {
try {
final int sizeMinusOne = out.size() - 1;
if (sizeMinusOne == 0) {
ctx.write(out.getUnsafe(0), promise);
} else if (sizeMinusOne > 0) {
if (promise == ctx.voidPromise()) {
writeVoidPromise(ctx, out);
} else {
writePromiseCombiner(ctx, out, promise);
}
}
} finally {
out.recycle();
}
}
}
}
write方法接受一個需要轉換的原始對象msg,和一個表示channel讀寫進度的ChannelPromise。
首先會對msg進行一個類型判斷,這個判斷方法是在acceptOutboundMessage中實現的。
public boolean acceptOutboundMessage(Object msg) throws Exception {
return matcher.match(msg);
}
這裡的matcher是一個TypeParameterMatcher對象,它是一個在MessageToMessageEncoder構造函數中初始化的屬性:
protected MessageToMessageEncoder() {
matcher = TypeParameterMatcher.find(this, MessageToMessageEncoder.class, "I");
}
這裡的I就是要匹配的msg類型。
如果不匹配,則繼續調用ctx.write(msg, promise);
將消息不做任何轉換的寫入到channel中,供下一個handler調用。
如果匹配成功,則會調用核心的encode方法:encode(ctx, cast, out);
注意,encode方法在MessageToMessageEncoder中是一個抽象方法,需要用戶在繼承類中自行擴展。
encode方法實際上是將msg對象轉換成為要轉換的對象,然後添加到out中。這個out是一個list對象,具體而言是一個CodecOutputList對象,作為一個list,out是一個可以存儲多個對象的列表。
那麼out是什麼時候寫入到channel中去的呢?
別急,在write方法中最後有一個finally代碼塊,在這個代碼塊中,會將out寫入到channel裏面。
因為out是一個List,可能會出現out中的對象部分寫成功的情況,所以這裡需要特別處理。
首先判斷out中是否只有一個對象,如果是一個對象,那麼直接寫到channel中即可。如果out中多於一個對象,那麼又分成兩種情況,第一種情況是傳入的promise是一個voidPromise,那麼調用writeVoidPromise方法。
什麼是voidPromise呢?
我們知道Promise有多種狀態,可以通過promise的狀態變化了解到數據寫入的情況。對於voidPromise來說,它只關心一種失敗的狀態,其他的狀態都不關心。
如果用戶關心promise的其他狀態,則會調用writePromiseCombiner方法,將多個對象的狀態合併為一個promise返回。
事實上,在writeVoidPromise和writePromiseCombiner中,out中的對象都是一個一個的取出來,寫入到channel中的,所以才會生成多個promise和需要將promise進行合併的情況:
private static void writeVoidPromise(ChannelHandlerContext ctx, CodecOutputList out) {
final ChannelPromise voidPromise = ctx.voidPromise();
for (int i = 0; i < out.size(); i++) {
ctx.write(out.getUnsafe(i), voidPromise);
}
}
private static void writePromiseCombiner(ChannelHandlerContext ctx, CodecOutputList out, ChannelPromise promise) {
final PromiseCombiner combiner = new PromiseCombiner(ctx.executor());
for (int i = 0; i < out.size(); i++) {
combiner.add(ctx.write(out.getUnsafe(i)));
}
combiner.finish(promise);
}
MessageToMessageDecoder
和encoder對應的就是decoder了,MessageToMessageDecoder的邏輯和MessageToMessageEncoder差不多。
首先也是需要判斷讀取的消息類型,這裡也定義了一個TypeParameterMatcher對象,用來檢測傳入的消息類型:
protected MessageToMessageDecoder() {
matcher = TypeParameterMatcher.find(this, MessageToMessageDecoder.class, "I");
}
decoder中重要的方法是channelRead方法,我們看下它的實現:
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
CodecOutputList out = CodecOutputList.newInstance();
try {
if (acceptInboundMessage(msg)) {
@SuppressWarnings("unchecked")
I cast = (I) msg;
try {
decode(ctx, cast, out);
} finally {
ReferenceCountUtil.release(cast);
}
} else {
out.add(msg);
}
} catch (DecoderException e) {
throw e;
} catch (Exception e) {
throw new DecoderException(e);
} finally {
try {
int size = out.size();
for (int i = 0; i < size; i++) {
ctx.fireChannelRead(out.getUnsafe(i));
}
} finally {
out.recycle();
}
}
}
首先檢測msg的類型,只有接受的類型才進行decode處理,否則將msg加入到CodecOutputList中。
最後在finally代碼塊中將out中的對象一個個取出來,調用ctx.fireChannelRead進行讀取。
消息轉換的關鍵方法是decode,這個方法也是一個抽象方法,需要在繼承類中實現具體的功能。
MessageToMessageCodec
前面講解了一個編碼器和一個解碼器,他們都是單向的。最後要講解的codec叫做MessageToMessageCodec,這個codec是一個雙向的,即可以接收消息,也可以發送消息。
先看下它的定義:
public abstract class MessageToMessageCodec<INBOUND_IN, OUTBOUND_IN> extends ChannelDuplexHandler
MessageToMessageCodec繼承自ChannelDuplexHandler,接收兩個泛型參數分別是INBOUND_IN和OUTBOUND_IN。
它定義了兩個TypeParameterMatcher,分別用來過濾inboundMsg和outboundMsg:
protected MessageToMessageCodec() {
inboundMsgMatcher = TypeParameterMatcher.find(this, MessageToMessageCodec.class, "INBOUND_IN");
outboundMsgMatcher = TypeParameterMatcher.find(this, MessageToMessageCodec.class, "OUTBOUND_IN");
}
分別實現了channelRead和write方法,用來讀寫消息:
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
decoder.channelRead(ctx, msg);
}
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
encoder.write(ctx, msg, promise);
}
這裡的decoder和encoder實際上就是前面我們講到的MessageToMessageDecoder和MessageToMessageEncoder:
private final MessageToMessageEncoder<Object> encoder = new MessageToMessageEncoder<Object>() {
@Override
public boolean acceptOutboundMessage(Object msg) throws Exception {
return MessageToMessageCodec.this.acceptOutboundMessage(msg);
}
@Override
@SuppressWarnings("unchecked")
protected void encode(ChannelHandlerContext ctx, Object msg, List<Object> out) throws Exception {
MessageToMessageCodec.this.encode(ctx, (OUTBOUND_IN) msg, out);
}
};
private final MessageToMessageDecoder<Object> decoder = new MessageToMessageDecoder<Object>() {
@Override
public boolean acceptInboundMessage(Object msg) throws Exception {
return MessageToMessageCodec.this.acceptInboundMessage(msg);
}
@Override
@SuppressWarnings("unchecked")
protected void decode(ChannelHandlerContext ctx, Object msg, List<Object> out) throws Exception {
MessageToMessageCodec.this.decode(ctx, (INBOUND_IN) msg, out);
}
};
可以看到MessageToMessageCodec實際上就是對MessageToMessageDecoder和MessageToMessageEncoder的封裝,如果需要對MessageToMessageCodec進行擴展的話,需要實現下面兩個方法:
protected abstract void encode(ChannelHandlerContext ctx, OUTBOUND_IN msg, List<Object> out)
throws Exception;
protected abstract void decode(ChannelHandlerContext ctx, INBOUND_IN msg, List<Object> out)
throws Exception;
總結
netty中提供的MessageToMessage的編碼框架是後面對編碼解碼器進行擴展的基礎。只有深入了解其中的原理,我們對於新的編碼解碼器運用起來才能得心應手。
本文已收錄於 //www.flydean.com/14-0-1-netty-codec-msg-to-msg/
最通俗的解讀,最深刻的乾貨,最簡潔的教程,眾多你不知道的小技巧等你來發現!
歡迎關注我的公眾號:「程序那些事」,懂技術,更懂你!