netty源碼解解析(4.0)-20 ChannelHandler: 自己實現一個自定義協議的伺服器和客戶端
- 2019 年 10 月 3 日
- 筆記
本章不會直接分析Netty源碼,而是通過使用Netty的能力實現一個自定義協議的伺服器和客戶端。通過這樣的實踐,可以更深刻地理解Netty的相關程式碼,同時可以了解,在設計實現自定義協議的過程中需要解決的一些關鍵問題。
本周章涉及到的程式碼可以從github上下載: https://github.com/brandonlyg/tinytransport.git。
設計協議
本章要設計的協議是基於TCP的應用層協議。在設計一個協議之前需要先回答以下幾個問題:
- 使用場景是什麼?
- 這個協議有哪些功能?
- 性能上有什麼要求?
- 對網路頻寬有什麼要求?
- 安全上有哪些要求?
接下來依次回答這些問題:
使用場景
在可信任的內部網路中,不同進程之間高速交換消息。
功能
- 在客戶端和伺服器進行消息交換。
- 發送消息然後非同步接收響應。
- 客戶端和伺服器之間可以保持長連接。
- 傳輸大量的數據。
性能
數據包的提取性能接近記憶體copy。
擴展性
可以通過擴展header欄位,進而擴展協議的功能。
頻寬
盡量少的冗餘數據,佔用盡量小的頻寬。
安全
由於是在可信任的內網中交互消息,沒有特別端安全性要求。
這些問題的答案,就是整個協議的設計要求。下面就按照這些設計要求來設計一套完整的協議,具體類容包括以下兩個部分:
- 數據包的格式。
- 客戶端和伺服器端消息的交互規則。
數據包格式的設計
設計自己的數據包格式之前,我們先來回顧以下LengthFieldBasedFrameDecoder能夠處理的數據包格式:
| header | contentLength | conent |
這個類把header的設計留給了子類,現在我們的注意力只需要集中在header欄位上即可。下面是header設計:
| begin | version | cmd | contentType | compression | sequenceId | resCode |
整個數據包的格式就是:
| begin | version | cmd | contentType | compression | sequenceId | resCode | contentLength | content |
現在來看一下這個數據包能實現哪些設計要求。
begin
類型: 32位無符號整數(uint32),這欄位是一個常量,用來準確第定位到數據包的開始位置,這樣就能更準確地分離出數據包,進而保證了“客戶端和伺服器端進行消息交換”。它的設計還要平衡數據包提取性能和準確性。嚴格來說,數據包中只能有一個begin,形式化描述如下:
1. 設一個數據包P的長度是L,P(i)表示數據包中任意一個Byte,begin=0XADEF4BC9(這個值可以任意選擇,盡量不選擇有意義的數字)。
2. 設反序列化一個uint32的演算法是ui=deserUint32(i), i>=0 && i < L。
3. 必須滿足: deserUint32(0) == begin, 且deserUint32(i) != begin, i > 0 && i < L。
要在(1)(2)兩個前提條件下滿足第(3)點,需要設計一個轉義符EC=0xFF, 對P中除begin以外的部分進行轉義,轉義演算法是:
如果deserUint32(i)==begin或P(i)==EC, 在P(i)前面插入EC。
找到begin的演算法是:
如果deserUint32(i)==begin且P(i-1)!=EC。
逆轉義演算法是:
如果P(i)==EC, P(i+1)==EC或deserUint32(i+1)==begin, 刪除P(i)。
以上使用轉義符的方案,雖然能夠準確地找到begin,但演算法複雜度是O(L),顯然不能滿足“接近記憶體copy”這個要求。但是如果不使用轉義符,就可以達到這個性能要求。如果仔細計算一下begin重複的概率就會發現, 它的重複概率只有1/0x100000000,如果再結合length欄位一起檢查數據包的正確性,得到錯誤數據包的概率就會更低。不使用轉義符,以極小的出錯概率換取性能大幅提升是一筆合適的買賣。
總的來說,begin可以滿足兩個設計要求: 消息交換,數據包的提取性能接近記憶體copy。
version
類型:uint8。協議的版本號,這個欄位用來滿足“擴展性”要求。每個version對應一種不同的header結構,換言之,知道了版本號,就知道怎樣解析header。
cmd
類型: uint8。這個欄位用來定義不同數據包的功能。可以使用這個欄位定義心跳數據包,使用心跳數據包讓”伺服器和客戶端保持長連接”。此外業務層可使用這個欄位定義自己需要的數據包。
contentType
類型: uint8。這個欄位是content的類型。使用這個欄位可以在content數據交給業務層之前,對他進行一下特殊的處理。用戶可以定義自己的的消息類型。它可以加”消息交換”的能力。
compression
類型: uint8。 壓縮演算法。這個欄位可以用來表示content使用的壓縮演算法。通過使用適當的壓縮演算法,壓縮滿足”傳輸大量數據”和”頻寬”的要求。
sequenceId
類型: uint32。這個欄位是數據包的唯一序列號。只需要保證在一個socket連接建立-斷開周期內保證它的唯一性即可。使用這個ID,可以實現“發送消息然後非同步接收響應”。
resCode
類型: uint8。響應數據包的狀態碼,用來在響應數據包中附帶異常資訊。
至此數據包的格式已經設計完畢。接下來設計必要的交互規則。
協議交互規則設計
使用心跳保持長連接
cmd: PING(0x01), PONG(0x02)。客戶端連接到伺服器之後,每隔一段時間發送一個PING包,伺服器端收到之後立即響應PONG包。伺服器端在一個超時時間後沒有收到PING就認為TCP連接不可用,主動端開。客戶端在發送PING之後,經過一個超時時間後沒有收到PONG就認為連接不可用,重新建立連接。
消息的請求和響應
cmd: REQUEST(0x10), RESPONSE(0x02)。客戶端使用REQUEST包向伺服器發送請求,服務使用RESPONSE包響應。請求和響應的sequenceId一致。
推送消息
cmd: PUSH(0x20)。使用PUSH向對方推送消息,不需要響應。
程式碼分析
這個輕量級的客戶端和伺服器框架在架構上分為4個部分:
- 數據包: Frame, FrameDecoder, FrameEncoder, FrameGzipCodec。
- 消息: FMessage, FrameToMessageDecoder, MessageToFrameEncode, FMessageHandler, FMessageTrait, FMTraits。
- 客戶端框架: TcpConnector, TcpClient。
- 伺服器端框架: TcpServer。
由於前面已經詳細講解了設計原理,這裡只重點分析一下關鍵程式碼。
Frame
Frame是數據包類型,它的主要功能是數據包的序列化(encode方法)和反序列化(decode)。
序列化方法:
1 /** 2 * 把Frame對象編碼成數據包 3 * @param out 4 */ 5 public void encode(ByteBuf out){ 6 out.writeInt(BEGIN); 7 out.writeByte(header.getVersion()); 8 out.writeByte(header.getCmd().getValue()); 9 out.writeByte(header.getContentType()); 10 out.writeByte(header.getCompression()); 11 out.writeInt(header.getSequenceId()); 12 out.writeByte(header.getResCode()); 13 14 int contentLength = 0; 15 if(null != content){ 16 contentLength = content.readableBytes(); 17 } 18 if(contentLength > MAX_CONTENT_LENGTH){ 19 throw new TooLongFrameException("content too long. contentLength:"+contentLength); 20 } 21 out.writeShort(contentLength); 22 if(null != content){ 23 out.writeBytes(content); 24 } 25 }
6-12行,序列化header中除contentLength的其他欄位。
14-21行,序列化contentLength欄位。
22-24行,序列content。
反序列化方法
1 /** 2 * 從數據包解碼得到Frame 3 * @param in 一個完整的數據包 4 * @return Frame對象 5 */ 6 public static Frame decode(ByteBuf in){ 7 if(in.readableBytes() < HEADER_LENGTH){ 8 throw new CorruptedFrameException("pack length less than header length("+HEADER_LENGTH+")"); 9 } 10 11 //得到header 12 Header header = new Header(); 13 in.readInt(); 14 header.setVersion(in.readByte()); 15 header.setCmd(Command.valueOf(in.readByte() & 0xFF)); 16 header.setContentType((byte)(in.readByte() & 0xFF)); 17 header.setCompression((byte)(in.readByte() & 0xFF)); 18 header.setSequenceId(in.readInt()); 19 header.setResCode((byte)(in.readByte() & 0xFF)); 20 21 //讀出content 22 int contentLength = in.readShort() & 0xFFFF; 23 if(in.readableBytes() != contentLength){ 24 throw new CorruptedFrameException("content is not match."+in.readableBytes() + "-" + contentLength); 25 } 26 27 ByteBuf content = contentLength > 0 ? in.retainedSlice(in.readerIndex(), contentLength) : null; 28 in.skipBytes(contentLength); 29 30 //創建Frame對象 31 Frame frame = new Frame(); 32 frame.setHeader(header); 33 frame.setContent(content); 34 35 if(null != content) content.release(); 36 37 return frame; 38 }
這段程式碼,注釋已經比較清晰了,這裡就不再多說。
FrameDecoder
這個類繼承了LengthFieldBasedFrameDecoder,所以只需要很少的程式碼就可以從Byte流中分離出數據包。
1 public FrameDecoder(){ 2 super(Frame.MAX_LENGTH, Frame.HEADER_LENGTH - 2, 2); 3 } 4 5 @Override 6 protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception { 7 //找到begin位置 8 int start = in.readerIndex(); 9 int begin = in.getInt(start + 0); 10 if(begin != Frame.BEGIN){ 11 dropFailedData(in); 12 } 13 14 //解碼得到Frame對象 15 ByteBuf dataPack = null; 16 try{ 17 dataPack = (ByteBuf)super.decode(ctx, in); 18 Frame frame = Frame.decode(dataPack); 19 return frame; 20 }finally { 21 if(null != dataPack){ 22 dataPack.release(); 23 } 24 } 25 }
2行,設置了數據包的最大長度Frame.MAX_LENGTH, 數據包header除contentLength之外的長度Frame.HEADER_LENGTH-2, contentLength欄位的長度。這樣,只要正確地找到數據包的開始位置就能LengthFieldBasedFrameDecoder就能幫助我們把數據包提取出來。
8-12行,確定數據包的開始位置。
17-18行,提取數據包,並把數據包反序列化成Frame。
FMessageTrait
為了能夠靈活地處理FMessage的content, 框架中定義了FMessageTrait介面,可以使用不同個FMessageTrait實現處理不同的content類型。
1 /** 2 * FMessage消息特徵介面,根據不同的contentType進行Frame和FMessage之間的轉換 3 */ 4 public interface FMessageTrait { 5 6 /** 7 * 得到匹配的contentType 8 * @return contentType的值 9 */ 10 int getContentType(); 11 12 /** 13 * 把FMessage轉換成Frame 14 * @param fmsg 15 * @return 16 * @throws EncoderException 17 */ 18 Frame encode(FMessage fmsg) throws EncoderException; 19 20 /** 21 * 把Frame轉換成FMessage 22 * @param frame 23 * @return 24 * @throws DecoderException 25 */ 26 FMessage decode(Frame frame) throws DecoderException; 27 }
FrameToMessageDecoder和MessageToFrameEncoder使用FMessageTrait進行FMessage和Frame之間的轉換。
1 /** 2 * 把Frame轉換成FMessage 3 */ 4 @ChannelHandler.Sharable 5 public class FrameToMessageDecoder extends MessageToMessageDecoder<Frame> { 6 7 private Map<Integer, FMessageTrait> fmTraits = new HashMap<>(); 8 9 10 public void addFMessageTrait(FMessageTrait trait){ 11 fmTraits.put(trait.getContentType(), trait); 12 } 13 14 @Override 15 protected void decode(ChannelHandlerContext ctx, Frame frame, List<Object> out) throws Exception { 16 int contentType = frame.getHeader().getContentType(); 17 FMessageTrait trait = fmTraits.get(contentType); 18 if(null == trait){ 19 throw new EncoderException("can't find trait. contentType:"+contentType); 20 } 21 22 FMessage fmsg = trait.decode(frame); 23 out.add(fmsg); 24 } 25 }
10-12行,把FMessageTrait放入map中。構建contentType-FMessageTrait之間的映射。
17行,從map中得到FMessageTrait。
22行,使用FMessageTrait把Frame轉換成FMessage。
MessageToFrameEncoder的實現類似。不同的是在22處調用FMessageTrait的encode方法把FMessage轉換成Frame。
FMTraits中給出了幾種常見的FMessageTrait實現:
- FMTraitBytes: 處理byte array類型的content。
- FMTraitString: 處理String類型的content。
- FMTraitJson: 處理Json格式是content。
- FMTraitProtobuf: 處理protobuf格式的content。
他們都有一個共同的祖先AbstractFMTrait, 這個抽象類實現FMessageTrait的encode和decode方法,定義了兩個抽象方法encodeContent和decodeContent,子類只需專註於content的處理就可以了。
下面以FMTraitBytes為例,講解一下FMessageTrait的具體實現。FMTraitBytes處理的FMessage類型要求conent是byte[]類型。
1 public static final int BYTES = 0x01; 2 public static final FMessageTrait FMTBytes = new FMTraitBytes(); 3 public static class FMTraitBytes extends AbstractFMTrait { 4 protected int contentType; 5 6 public FMTraitBytes(){ 7 this(BYTES); 8 } 9 10 public FMTraitBytes(int contentType){ 11 this.contentType = contentType; 12 } 13 14 @Override 15 public int getContentType() { 16 return contentType; 17 } 18 19 @Override 20 protected ByteBuf encodeContent(FMessage fmsg) throws EncoderException{ 21 byte[] bytes = (byte[])fmsg.getContent(); 22 23 ByteBuf buf = null; 24 if(null != bytes && bytes.length > 0){ 25 buf = ByteBufAllocator.DEFAULT.buffer(bytes.length); 26 buf.writeBytes(bytes); 27 } 28 29 return buf; 30 } 31 32 @Override 33 protected Object decodeContent(Frame frame) throws DecoderException { 34 ByteBuf buf = frame.getContent(); 35 byte[] bytes = null; 36 if(null != buf && buf.readableBytes() > 0){ 37 bytes = new byte[buf.readableBytes()]; 38 buf.readBytes(bytes); 39 } 40 41 return bytes; 42 } 43 }
6-17行,實現了contentType的設置和獲取。
21-29行,把FMessage的content轉換成ByteBuf。
34-42行, 發Frame的content轉換成byte[]。
FMessageHandler
這是一個專門用來處理FMessage的ChannelInboundHandler。channelRead0方法負責把不同cmd的FMessage派發到專用方法處理,這些方法有:
- onPing: 收到PING, 會自動響應一個PONG。
- onPong: 收到PONG。
- onRequest: 收到REQUEST。
- onResponse: 收到RESPONSE。
- onPush: 收到PUSH。
客戶端框架
TcpConnector功能是發起連接,它的主要功能集中在以下三個方法中。
1 public void addFMessageTrait(FMessageTrait trait){ 2 fmEncoder.addFMessageTrait(trait); 3 fmDecoder.addFMessageTrait(trait); 4 } 5 6 public TcpClient connect(InetSocketAddress address) throws Exception{ 7 ChannelFuture future = bootstrap.connect(address); 8 Channel channel = future.channel(); 9 10 TcpClient client = new TcpClient(channel, workerElg.next()); 11 channel.attr(TcpClient.CLIENT).set(client); 12 13 future.sync(); 14 15 return client; 16 } 17 18 protected void doInitChannel(SocketChannel ch) throws Exception { 19 ChannelPipeline pl = ch.pipeline(); 20 21 pl.addLast(H_FRAME_DECODER, new FrameDecoder()); 22 pl.addLast(H_FRAME_ENCODER, frameEncoder); 23 24 pl.addLast(H_READ_TIMEOUT, new ReadTimeoutHandler(readTimeout, TimeUnit.SECONDS)); 25 26 pl.addLast(H_FM_DECODER, fmDecoder); 27 pl.addLast(H_FM_ENCODER, fmEncoder); 28 29 pl.addLast(H_FM_HANDLER, clientHandler); 30 }
addFMessageTrait設置FMessageTrait,開發者可以根據需要訂製FMessage的處理能力,FMTraitBytes會默認添加。
connect用來發起連接,創建TcpClient對象。
doInitChannel初始化Channel, 開發者可以覆蓋這個方法,訂製channel的ChannelHandler。
另外,TcpConnector內部實現了一個FMessageHandler的派生類ClientHandler。這個類的channelActive方法中啟動一個定時任務定時發送PING。onResponse方法負責調用TcpClient的onResponse方法。
TcpClient是客戶端連接對象,它主要有兩個方法:
public boolean send(FMessage msg);
public Promise<FMessage> send(FMessage msg, TimeUnit timeUnit, long timeout);
第一個不處理響應。第二個可以非同步數量響應。
另外還有一個給TcpConnector使用的onResponse方法,用來觸發第二個send返回Promise對象的回調。
伺服器端框架
TcpServer是伺服器端框架,它比較簡單。開發者只需要覆蓋doInitChannel,添加自己的ChannelHandler,就可以實現伺服器端的訂製。