Netty實現心跳
- 2020 年 2 月 17 日
- 筆記
心跳機制
心跳機制是常用的一個健康監測的機制,說白了就是每隔一段時間向伺服器發送一個心跳的報文,服務收到報文後,就認為當前的客戶端在活動的狀態,否則會進入異常的機制,比如說主從切換
。
既然存在一個通訊,就一定會有服務端和客戶端。服務端開啟監聽,客戶端發起心跳報文,然後服務就再次響應。
系統的設計
- 消息的類型 在服務端和客戶端進行通訊的時候,需要區分消息的類型,根據消息的類型分別進行不同的處理。
public enum MessageType { SERVICE_REQ((byte) 0),/*業務請求消息*/ SERVICE_RESP((byte) 1), /*業務應答消息*/ ONE_WAY((byte) 2), /*無需應答的消息*/ LOGIN_REQ((byte) 3), /*登錄請求消息*/ LOGIN_RESP((byte) 4), /*登錄響應消息*/ HEARTBEAT_REQ((byte) 5), /*心跳請求消息*/ HEARTBEAT_RESP((byte) 6);/*心跳應答消息*/ private byte code; MessageType(byte code) { this.code = code; } public byte getValue() { return code; } public static MessageType getMessageType(String typeName){ for (MessageType mt :MessageType.values()) { if(mt.toString().equals(typeName.trim())){ return mt; } } return null; } }
- 內容的類型 在設計這個傳輸的模型的時候考慮的文件的傳輸(當然也可以作為消息的類型),所以還需要定義一個內容的類型
public enum ContentType { Default((byte) 0), File((byte) 1), Other((byte) 2); private byte code; ContentType(byte code) { this.code = code; } public byte getValue() { return code; } public static ContentType getContentType(String typeName){ for (ContentType mt :ContentType.values()) { if(mt.toString().equals(typeName.trim())){ return mt; } } return null; } }
- 消息頭 消息頭包含了消息的認證資訊和長度,用來認證資訊的合法來源和消息的截取。定義如下:
public class MessageHead { private int headData = DEFAULT_MAGIC_START_CODE;//協議開始標誌 private int length;//包的長度 private String token;//認證的Token,可以設置時效 private LocalDateTime createDate; private String messageId; private MessageType messageType; private ContentType contentType; }
- 自定義傳輸Encoder和Decoder 在Netty中幾乎所有的業務邏輯在Handler中,Encoder和Decoder是特殊的handler,用於對消息的編碼和反編碼。類似序列號和反序列號。
public class RzEncoder extends MessageToByteEncoder<Message> { @Override protected void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception { // TODO Auto-generated method stub // 寫入開頭的標誌 out.writeInt(msg.getHeader().getHeadData()); // 寫入包的的長度 out.writeInt(msg.getContent().length); /** * token定長50個位元組 * 第一個參數 原數組 * 第二個參數 原數組位置 * 第三個參數 目標數組 * 第四個參數 目標數組位置 * 第五個參數 copy多少個長度 */ byte[] indexByte = msg.getHeader().getToken().getBytes(); writeByte(out, indexByte, 50); byte[] createTimeByte = msg.getHeader().getCreateDate().toString().getBytes(); writeByte(out, createTimeByte, 50); byte[] idByte = msg.getHeader().getMessageId().getBytes(); writeByte(out, idByte, 50); byte[] msgType = new byte[]{msg.getHeader().getMessageType().getValue()}; out.writeBytes(msgType); byte[] contentType = new byte[]{msg.getHeader().getContentType().getValue()}; out.writeBytes(contentType); out.writeBytes(msg.getContent()); } private void writeByte(ByteBuf out, byte[] bytes, int length) { byte[] writeArr = new byte[length]; /** * * 第一個參數 原數組 * 第二個參數 原數組位置 * 第三個參數 目標數組 * 第四個參數 目標數組位置 * 第五個參數 copy多少個長度 */ System.arraycopy(bytes, 0, writeArr, 0, bytes.length > writeArr.length ? writeArr.length : bytes.length); out.writeBytes(writeArr); } private void writeByte(ByteBuf out, String content, int length) { if (StringUtils.isEmpty(content)) { content = ""; } writeByte(out, content.getBytes(), length); } } public class RzDecoder extends ByteToMessageDecoder { private int BASE_LENGTH = 4 + 4 + 50 + 50 + 50 + 1 +1 ;//協議頭 類型 int+length 4個位元組+令牌和 令牌生成時間50個位元組 private int headData = DEFAULT_MAGIC_START_CODE;//協議開始標誌 @Override protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) { // 刻度長度必須大於基本長度 if (buffer.readableBytes() >= BASE_LENGTH) { /** * 粘包 發送頻繁 可能多次發送黏在一起 需要考慮 不過一個客戶端發送太頻繁也可以推斷是否是攻擊 */ //防止soket流攻擊。客戶端傳過來的數據太大不合理 if (buffer.readableBytes() > 1024*1024*10) { buffer.skipBytes(buffer.readableBytes()); } } int beginIndex;//記錄包開始位置 while (true) { // 獲取包頭開始的index beginIndex = buffer.readerIndex(); //如果讀到開始標記位置 結束讀取避免拆包和粘包 if (buffer.readInt() == headData) { break; } //初始化讀的index為0 buffer.resetReaderIndex(); // 當略過,一個位元組之後, //如果當前buffer數據小於基礎數據 返回等待下一次讀取 if (buffer.readableBytes() < BASE_LENGTH) { return; } } // 消息的長度 int length = buffer.readInt(); // 判斷請求數據包數據是否到齊 if ((buffer.readableBytes() - 100) < length) { //沒有到期 返回讀的指針 等待下一次數據到期再讀 buffer.readerIndex(beginIndex); return; } //讀取令牌 byte[] tokenByte = new byte[50]; buffer.readBytes(tokenByte); //讀取令牌生成時間 byte[] createDateByte = new byte[50]; buffer.readBytes(createDateByte); //讀取Id byte[] messageIdByte = new byte[50]; buffer.readBytes(messageIdByte); byte[] messageTypeByte = new byte[1]; buffer.readBytes(messageTypeByte); byte[] contentTypeByte = new byte[1]; buffer.readBytes(contentTypeByte); ContentType contentType = ContentType.values()[contentTypeByte[0]]; //讀取content byte[] data = new byte[length]; buffer.readBytes(data); MessageHead head = new MessageHead(); head.setHeadData(headData); head.setToken(new String(tokenByte).trim()); head.setCreateDate(LocalDateTime.parse(new String(createDateByte).trim())); head.setLength(length); head.setMessageId(new String(messageIdByte).trim()); head.setMessageType(MessageType.values()[messageTypeByte[0]]); head.setContentType(contentType); Message message = new Message(head, data); //認證不通過 if (!message.authorization(message.buidToken())) { ctx.close(); return; } out.add(message); buffer.discardReadBytes();//回收已讀位元組 } }
- 心跳的發送 心跳的發送就只剩下生成消息和發送了,此處略。
(本文完)
作者:付威 部落格地址:http://blog.laofu.online
如有任何知識產權、版權問題或理論錯誤,還請指正。 本文是付威的網路部落格原創,自由轉載-非商用-非衍生-保持署名,請遵循:創意共享3.0許可證