Netty编解码之ProtoBuf案例二

  • 2020 年 2 月 10 日
  • 筆記

  前面我们介绍了Protobuf的基本使用,但是我们是一个POJO对象创建一个proto文件,那么在实际环境中将要创建多个文件,会非常的不方便,本文我们来看看怎么根据类型来动态处理

Protobuf案例二

proto文件

  在proto文件中我们通过message来管理类型,具体如下

syntax = "proto3";  option optimize_for = SPEED; // 加快解析  option java_package="com.dpb.netty.codec2";   //指定生成到哪个包下  option java_outer_classname="MyDataInfo"; // 外部类名, 文件名    //protobuf 可以使用message 管理其他的message  message MyMessage {        //定义一个枚举类型      enum DataType {          StudentType = 0; //在proto3 要求enum的编号从0开始          WorkerType = 1;      }        //用data_type 来标识传的是哪一个枚举类型      DataType data_type = 1;        //表示每次枚举类型最多只能出现其中的一个, 节省空间      oneof dataBody {          Student student = 2;          Worker worker = 3;      }    }      message Student {      int32 id = 1;//Student类的属性      string name = 2; //  }  message Worker {      string name=1;      int32 age=2;  }

对应的POJO文件

  通过proto.exe文件来动态生成pojo文件。

服务端代码

  将生成的文件拷贝进项目中,然后编写服务端代码。

package com.dpb.netty.codec2;    import com.dpb.netty.codec.StudentPojo;  import io.netty.bootstrap.ServerBootstrap;  import io.netty.channel.ChannelFuture;  import io.netty.channel.ChannelInitializer;  import io.netty.channel.ChannelOption;  import io.netty.channel.EventLoopGroup;  import io.netty.channel.nio.NioEventLoopGroup;  import io.netty.channel.socket.SocketChannel;  import io.netty.channel.socket.nio.NioServerSocketChannel;  import io.netty.handler.codec.protobuf.ProtobufDecoder;    /**   * @program: netty4demo   * @description:   * @author: 波波烤鸭   * @create: 2019-12-23 11:15   */  public class NettyServerDemo {      public static void main(String[] args) {          // 创建对应的 线程池          // 创建Boss group          EventLoopGroup boosGroup = new NioEventLoopGroup(1);          // 创建 workgroup          EventLoopGroup workGroup = new NioEventLoopGroup();          // 创建对应的启动类          ServerBootstrap bootstrap = new ServerBootstrap();          try{              // 设置相关的配置信息              bootstrap.group(boosGroup,workGroup) // 设置对应的线程组                      .channel(NioServerSocketChannel.class) // 设置对应的通道                      .option(ChannelOption.SO_BACKLOG,1024) // 设置线程的连接个数                      .childHandler(new ChannelInitializer<SocketChannel>() { // 设置                          /**                           * 给pipeline 设置处理器                           * @param socketChannel                           * @throws Exception                           */                          @Override                          protected void initChannel(SocketChannel socketChannel) throws Exception {                              // 指定Protobuf解码                              socketChannel.pipeline().addLast("decoder"                                      ,new ProtobufDecoder(MyDataInfo.MyMessage.getDefaultInstance()));                              socketChannel.pipeline().addLast(new NettyServerHandler());                          }                      });              System.out.println("服务启动了....");              // 绑定端口  启动服务              ChannelFuture channelFuture = bootstrap.bind(6668).sync();              // 对关闭通道进行监听              channelFuture.channel().closeFuture().sync();          }catch (Exception e){            }finally {              // 优雅停服              boosGroup.shutdownGracefully();              workGroup.shutdownGracefully();          }        }  }
package com.dpb.netty.codec2;    import com.dpb.netty.codec.StudentPojo;  import io.netty.buffer.Unpooled;  import io.netty.channel.ChannelHandlerContext;  import io.netty.channel.SimpleChannelInboundHandler;  import io.netty.util.CharsetUtil;    /**   * @program: netty4demo   * @description:   * @author: 波波烤鸭   * @create: 2019-12-23 11:24   */  public class NettyServerHandler extends SimpleChannelInboundHandler<MyDataInfo.MyMessage> {            @Override      protected void channelRead0(ChannelHandlerContext channelHandlerContext, MyDataInfo.MyMessage data) throws Exception {          // 得根据不同的类型来获取对应的数据          MyDataInfo.MyMessage.DataType type = data.getDataType();          if(type == MyDataInfo.MyMessage.DataType.StudentType){              // 表示传递过来的是 Student类型              System.out.println("学生信息:" + data.getStudent().getId() + " " + data.getStudent().getName());          }else if(type == MyDataInfo.MyMessage.DataType.WorkerType){              // 表示传递的是 worker类型              System.out.println("worker信息:" + data.getWorker().getName() + " " + data.getWorker().getAge());          }else{              System.out.println("类型不匹配.... ");          }      }        /**       * 读取客户端发送数据完成后的方法       *    在本方法中可以发送返回的数据       * @param ctx       * @throws Exception       */      @Override      public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {          // writeAndFlush 是组合方法          ctx.writeAndFlush(Unpooled.copiedBuffer("你好啊,客户端....^_^",CharsetUtil.UTF_8));      }  }

注意解码器的位置

客户端代码

package com.dpb.netty.codec2;    import io.netty.bootstrap.Bootstrap;  import io.netty.channel.ChannelFuture;  import io.netty.channel.ChannelInitializer;  import io.netty.channel.EventLoopGroup;  import io.netty.channel.nio.NioEventLoopGroup;  import io.netty.channel.socket.SocketChannel;  import io.netty.channel.socket.nio.NioSocketChannel;  import io.netty.handler.codec.protobuf.ProtobufEncoder;    /**   * @program: netty4demo   * @description:   * @author: 波波烤鸭   * @create: 2019-12-23 11:31   */  public class NettyClientDemo {        public static void main(String[] args) {          // 客户端就只需要创建一个 线程组了          EventLoopGroup loopGroup = new NioEventLoopGroup();          // 创建 启动器          Bootstrap bootstrap = new Bootstrap();          try{              // 设置相关的参数              bootstrap.group(loopGroup)                      .channel(NioSocketChannel.class)                      .handler(new ChannelInitializer<SocketChannel>() {                          @Override                          protected void initChannel(SocketChannel socketChannel) throws Exception {                              // 指定protobu编码                              socketChannel.pipeline().addLast("encoder",new ProtobufEncoder());                              socketChannel.pipeline().addLast(new NettyClientHandler());                          }                      });              // 连接服务              ChannelFuture future = bootstrap.connect("localhost",6668).sync();              // 对服务关闭 监听              future.channel().closeFuture().sync();          }catch (Exception e){            }finally {              loopGroup.shutdownGracefully();          }        }  }
package com.dpb.netty.codec2;    import com.dpb.netty.codec.StudentPojo;  import io.netty.buffer.ByteBuf;  import io.netty.channel.ChannelHandlerContext;  import io.netty.channel.ChannelInboundHandlerAdapter;  import io.netty.util.CharsetUtil;    import java.util.Random;    /**   * @program: netty4demo   * @description:   * @author: 波波烤鸭   * @create: 2019-12-23 11:36   */  public class NettyClientHandler extends ChannelInboundHandlerAdapter {        /**       * 连接上服务的回调方法       * @param ctx       * @throws Exception       */      @Override      public void channelActive(ChannelHandlerContext ctx) throws Exception {          // 发送数据          //随机的发送Student 或者 Workder 对象          int random = new Random().nextInt(3);          MyDataInfo.MyMessage myMessage = null;            if(0 == random) { //发送Student 对象                myMessage = MyDataInfo.MyMessage.newBuilder().setDataType(MyDataInfo.MyMessage.DataType.StudentType).setStudent(MyDataInfo.Student.newBuilder().setId(666).setName("波波烤鸭").build()).build();          } else { // 发送一个Worker 对象                myMessage = MyDataInfo.MyMessage.newBuilder().setDataType(MyDataInfo.MyMessage.DataType.WorkerType).setWorker(MyDataInfo.Worker.newBuilder().setAge(20).setName("邓师傅").build()).build();          }            ctx.writeAndFlush(myMessage);      }        /**       * 读取服务端返回的信息       * @param ctx       * @param msg       * @throws Exception       */      @Override      public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {          ByteBuf buf  = (ByteBuf) msg;          System.out.println("服务端返回的信息:" + buf.toString(CharsetUtil.UTF_8));      }  }

测试

  先启动服务器,然后启动多个客户端。

通过输出结果我们可以看到,服务器可以根据不同的类型获取到对应的POJO对象中的数据,会比原来单一的处理要更加的灵活些!