­

Netty實現心跳

  • 2020 年 2 月 17 日
  • 筆記

心跳機制

心跳機制是常用的一個健康監測的機制,說白了就是每隔一段時間向伺服器發送一個心跳的報文,服務收到報文後,就認為當前的客戶端在活動的狀態,否則會進入異常的機制,比如說主從切換

既然存在一個通訊,就一定會有服務端和客戶端。服務端開啟監聽,客戶端發起心跳報文,然後服務就再次響應。

系統的設計

  1. 消息的類型 在服務端和客戶端進行通訊的時候,需要區分消息的類型,根據消息的類型分別進行不同的處理。
public enum MessageType {    SERVICE_REQ((byte) 0),/*業務請求消息*/    SERVICE_RESP((byte) 1), /*業務應答消息*/    ONE_WAY((byte) 2), /*無需應答的消息*/    LOGIN_REQ((byte) 3), /*登錄請求消息*/    LOGIN_RESP((byte) 4), /*登錄響應消息*/    HEARTBEAT_REQ((byte) 5), /*心跳請求消息*/    HEARTBEAT_RESP((byte) 6);/*心跳應答消息*/    private byte code;      MessageType(byte code) {         this.code = code;    }      public byte getValue() {         return code;    }      public static MessageType getMessageType(String typeName){         for (MessageType mt :MessageType.values()) {              if(mt.toString().equals(typeName.trim())){                   return mt;              }           }         return null;    }    }   
  1. 內容的類型 在設計這個傳輸的模型的時候考慮的文件的傳輸(當然也可以作為消息的類型),所以還需要定義一個內容的類型
 public enum ContentType {         Default((byte) 0),         File((byte) 1),         Other((byte) 2);         private byte code;         ContentType(byte code) {              this.code = code;         }           public byte getValue() {              return code;         }           public static ContentType getContentType(String typeName){              for (ContentType mt :ContentType.values()) {                   if(mt.toString().equals(typeName.trim())){                        return mt;                   }                }              return null;         }    }   
  1. 消息頭 消息頭包含了消息的認證資訊和長度,用來認證資訊的合法來源和消息的截取。定義如下:
public class MessageHead {         private int headData = DEFAULT_MAGIC_START_CODE;//協議開始標誌         private int length;//包的長度         private String token;//認證的Token,可以設置時效         private LocalDateTime createDate;         private String messageId;         private MessageType messageType;         private ContentType  contentType;    }   
  1. 自定義傳輸Encoder和Decoder 在Netty中幾乎所有的業務邏輯在Handler中,Encoder和Decoder是特殊的handler,用於對消息的編碼和反編碼。類似序列號和反序列號。
public class RzEncoder extends MessageToByteEncoder<Message> {         @Override         protected void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception {              // TODO Auto-generated method stub              // 寫入開頭的標誌              out.writeInt(msg.getHeader().getHeadData());              // 寫入包的的長度              out.writeInt(msg.getContent().length);              /** * token定長50個位元組 * 第一個參數 原數組 * 第二個參數 原數組位置 * 第三個參數 目標數組 * 第四個參數 目標數組位置 * 第五個參數 copy多少個長度 */              byte[] indexByte = msg.getHeader().getToken().getBytes();              writeByte(out, indexByte, 50);                  byte[] createTimeByte = msg.getHeader().getCreateDate().toString().getBytes();              writeByte(out, createTimeByte, 50);                byte[] idByte = msg.getHeader().getMessageId().getBytes();              writeByte(out, idByte, 50);                byte[] msgType = new byte[]{msg.getHeader().getMessageType().getValue()};              out.writeBytes(msgType);              byte[] contentType = new byte[]{msg.getHeader().getContentType().getValue()};              out.writeBytes(contentType);                  out.writeBytes(msg.getContent());           }           private void writeByte(ByteBuf out, byte[] bytes, int length) {              byte[] writeArr = new byte[length];              /** * * 第一個參數 原數組 * 第二個參數 原數組位置 * 第三個參數 目標數組 * 第四個參數 目標數組位置 * 第五個參數 copy多少個長度 */              System.arraycopy(bytes, 0, writeArr, 0, bytes.length > writeArr.length ? writeArr.length : bytes.length);              out.writeBytes(writeArr);         }           private void writeByte(ByteBuf out, String content, int length) {              if (StringUtils.isEmpty(content)) {                   content = "";              }              writeByte(out, content.getBytes(), length);         }      }      public class RzDecoder extends ByteToMessageDecoder {         private int BASE_LENGTH = 4 + 4 + 50 + 50 + 50 + 1 +1 ;//協議頭 類型 int+length 4個位元組+令牌和 令牌生成時間50個位元組         private int headData = DEFAULT_MAGIC_START_CODE;//協議開始標誌           @Override protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) {              // 刻度長度必須大於基本長度              if (buffer.readableBytes() >= BASE_LENGTH) {                   /** * 粘包 發送頻繁 可能多次發送黏在一起 需要考慮 不過一個客戶端發送太頻繁也可以推斷是否是攻擊 */                   //防止soket流攻擊。客戶端傳過來的數據太大不合理                   if (buffer.readableBytes() > 1024*1024*10) {                        buffer.skipBytes(buffer.readableBytes());                     }              }              int beginIndex;//記錄包開始位置              while (true) {                   // 獲取包頭開始的index                   beginIndex = buffer.readerIndex();                   //如果讀到開始標記位置 結束讀取避免拆包和粘包                   if (buffer.readInt() == headData) {                        break;                   }                     //初始化讀的index為0                   buffer.resetReaderIndex();                   // 當略過,一個位元組之後,                   //如果當前buffer數據小於基礎數據 返回等待下一次讀取                   if (buffer.readableBytes() < BASE_LENGTH) {                        return;                   }              }              // 消息的長度              int length = buffer.readInt();              // 判斷請求數據包數據是否到齊              if ((buffer.readableBytes() - 100) < length) {                   //沒有到期 返回讀的指針 等待下一次數據到期再讀                   buffer.readerIndex(beginIndex);                   return;              }              //讀取令牌              byte[] tokenByte = new byte[50];              buffer.readBytes(tokenByte);                  //讀取令牌生成時間              byte[] createDateByte = new byte[50];              buffer.readBytes(createDateByte);                //讀取Id              byte[] messageIdByte = new byte[50];              buffer.readBytes(messageIdByte);                byte[] messageTypeByte = new byte[1];              buffer.readBytes(messageTypeByte);              byte[] contentTypeByte = new byte[1];              buffer.readBytes(contentTypeByte);              ContentType contentType = ContentType.values()[contentTypeByte[0]];                //讀取content              byte[] data = new byte[length];              buffer.readBytes(data);              MessageHead head = new MessageHead();              head.setHeadData(headData);              head.setToken(new String(tokenByte).trim());              head.setCreateDate(LocalDateTime.parse(new String(createDateByte).trim()));              head.setLength(length);              head.setMessageId(new String(messageIdByte).trim());              head.setMessageType(MessageType.values()[messageTypeByte[0]]);              head.setContentType(contentType);              Message message = new Message(head, data);              //認證不通過              if (!message.authorization(message.buidToken())) {                   ctx.close();                   return;              }              out.add(message);              buffer.discardReadBytes();//回收已讀位元組         }    }     
  1. 心跳的發送 心跳的發送就只剩下生成消息和發送了,此處略。

(本文完)

作者:付威 部落格地址:http://blog.laofu.online

如有任何知識產權、版權問題或理論錯誤,還請指正。 本文是付威的網路部落格原創,自由轉載-非商用-非衍生-保持署名,請遵循:創意共享3.0許可證