Netty实现心跳
- 2020 年 2 月 17 日
- 筆記
心跳机制
心跳机制是常用的一个健康监测的机制,说白了就是每隔一段时间向服务器发送一个心跳的报文,服务收到报文后,就认为当前的客户端在活动的状态,否则会进入异常的机制,比如说主从切换
。
既然存在一个通信,就一定会有服务端和客户端。服务端开启监听,客户端发起心跳报文,然后服务就再次响应。
系统的设计
- 消息的类型 在服务端和客户端进行通信的时候,需要区分消息的类型,根据消息的类型分别进行不同的处理。
public enum MessageType { SERVICE_REQ((byte) 0),/*业务请求消息*/ SERVICE_RESP((byte) 1), /*业务应答消息*/ ONE_WAY((byte) 2), /*无需应答的消息*/ LOGIN_REQ((byte) 3), /*登录请求消息*/ LOGIN_RESP((byte) 4), /*登录响应消息*/ HEARTBEAT_REQ((byte) 5), /*心跳请求消息*/ HEARTBEAT_RESP((byte) 6);/*心跳应答消息*/ private byte code; MessageType(byte code) { this.code = code; } public byte getValue() { return code; } public static MessageType getMessageType(String typeName){ for (MessageType mt :MessageType.values()) { if(mt.toString().equals(typeName.trim())){ return mt; } } return null; } }
- 内容的类型 在设计这个传输的模型的时候考虑的文件的传输(当然也可以作为消息的类型),所以还需要定义一个内容的类型
public enum ContentType { Default((byte) 0), File((byte) 1), Other((byte) 2); private byte code; ContentType(byte code) { this.code = code; } public byte getValue() { return code; } public static ContentType getContentType(String typeName){ for (ContentType mt :ContentType.values()) { if(mt.toString().equals(typeName.trim())){ return mt; } } return null; } }
- 消息头 消息头包含了消息的认证信息和长度,用来认证信息的合法来源和消息的截取。定义如下:
public class MessageHead { private int headData = DEFAULT_MAGIC_START_CODE;//协议开始标志 private int length;//包的长度 private String token;//认证的Token,可以设置时效 private LocalDateTime createDate; private String messageId; private MessageType messageType; private ContentType contentType; }
- 自定义传输Encoder和Decoder 在Netty中几乎所有的业务逻辑在Handler中,Encoder和Decoder是特殊的handler,用于对消息的编码和反编码。类似序列号和反序列号。
public class RzEncoder extends MessageToByteEncoder<Message> { @Override protected void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception { // TODO Auto-generated method stub // 写入开头的标志 out.writeInt(msg.getHeader().getHeadData()); // 写入包的的长度 out.writeInt(msg.getContent().length); /** * token定长50个字节 * 第一个参数 原数组 * 第二个参数 原数组位置 * 第三个参数 目标数组 * 第四个参数 目标数组位置 * 第五个参数 copy多少个长度 */ byte[] indexByte = msg.getHeader().getToken().getBytes(); writeByte(out, indexByte, 50); byte[] createTimeByte = msg.getHeader().getCreateDate().toString().getBytes(); writeByte(out, createTimeByte, 50); byte[] idByte = msg.getHeader().getMessageId().getBytes(); writeByte(out, idByte, 50); byte[] msgType = new byte[]{msg.getHeader().getMessageType().getValue()}; out.writeBytes(msgType); byte[] contentType = new byte[]{msg.getHeader().getContentType().getValue()}; out.writeBytes(contentType); out.writeBytes(msg.getContent()); } private void writeByte(ByteBuf out, byte[] bytes, int length) { byte[] writeArr = new byte[length]; /** * * 第一个参数 原数组 * 第二个参数 原数组位置 * 第三个参数 目标数组 * 第四个参数 目标数组位置 * 第五个参数 copy多少个长度 */ System.arraycopy(bytes, 0, writeArr, 0, bytes.length > writeArr.length ? writeArr.length : bytes.length); out.writeBytes(writeArr); } private void writeByte(ByteBuf out, String content, int length) { if (StringUtils.isEmpty(content)) { content = ""; } writeByte(out, content.getBytes(), length); } } public class RzDecoder extends ByteToMessageDecoder { private int BASE_LENGTH = 4 + 4 + 50 + 50 + 50 + 1 +1 ;//协议头 类型 int+length 4个字节+令牌和 令牌生成时间50个字节 private int headData = DEFAULT_MAGIC_START_CODE;//协议开始标志 @Override protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) { // 刻度长度必须大于基本长度 if (buffer.readableBytes() >= BASE_LENGTH) { /** * 粘包 发送频繁 可能多次发送黏在一起 需要考虑 不过一个客户端发送太频繁也可以推断是否是攻击 */ //防止soket流攻击。客户端传过来的数据太大不合理 if (buffer.readableBytes() > 1024*1024*10) { buffer.skipBytes(buffer.readableBytes()); } } int beginIndex;//记录包开始位置 while (true) { // 获取包头开始的index beginIndex = buffer.readerIndex(); //如果读到开始标记位置 结束读取避免拆包和粘包 if (buffer.readInt() == headData) { break; } //初始化读的index为0 buffer.resetReaderIndex(); // 当略过,一个字节之后, //如果当前buffer数据小于基础数据 返回等待下一次读取 if (buffer.readableBytes() < BASE_LENGTH) { return; } } // 消息的长度 int length = buffer.readInt(); // 判断请求数据包数据是否到齐 if ((buffer.readableBytes() - 100) < length) { //没有到期 返回读的指针 等待下一次数据到期再读 buffer.readerIndex(beginIndex); return; } //读取令牌 byte[] tokenByte = new byte[50]; buffer.readBytes(tokenByte); //读取令牌生成时间 byte[] createDateByte = new byte[50]; buffer.readBytes(createDateByte); //读取Id byte[] messageIdByte = new byte[50]; buffer.readBytes(messageIdByte); byte[] messageTypeByte = new byte[1]; buffer.readBytes(messageTypeByte); byte[] contentTypeByte = new byte[1]; buffer.readBytes(contentTypeByte); ContentType contentType = ContentType.values()[contentTypeByte[0]]; //读取content byte[] data = new byte[length]; buffer.readBytes(data); MessageHead head = new MessageHead(); head.setHeadData(headData); head.setToken(new String(tokenByte).trim()); head.setCreateDate(LocalDateTime.parse(new String(createDateByte).trim())); head.setLength(length); head.setMessageId(new String(messageIdByte).trim()); head.setMessageType(MessageType.values()[messageTypeByte[0]]); head.setContentType(contentType); Message message = new Message(head, data); //认证不通过 if (!message.authorization(message.buidToken())) { ctx.close(); return; } out.add(message); buffer.discardReadBytes();//回收已读字节 } }
- 心跳的发送 心跳的发送就只剩下生成消息和发送了,此处略。
(本文完)
作者:付威 博客地址:http://blog.laofu.online
如有任何知识产权、版权问题或理论错误,还请指正。 本文是付威的网络博客原创,自由转载-非商用-非衍生-保持署名,请遵循:创意共享3.0许可证