簡單實現通過netty通信,後續提供基於protobuf傳輸協議的rpc框架

  • 2019 年 10 月 10 日
  • 筆記

後續也會提供service-mesh的簡單代碼實現。netty通信和socket通信大致類似:netty是在socket的基礎上對其進行封裝。當然你也可以自己實現netty的功能,但是我給你一句話。

package org.gfu.base.netty;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;

import java.util.Date;

/**
 * Minimal Netty client: connects to {@code host:port}, installs UTF-8 string
 * codecs and a {@link NettyClientHandler} carrying the configured message, and
 * then blocks until the channel is closed.
 *
 * @author gfu
 * @date 2019/9/27
 */
class NettyClient {

    /** Remote host to connect to; fixed at construction time. */
    private final String host;
    /** Remote port to connect to; fixed at construction time. */
    private final int port;
    /** Message handed to the client handler; may be {@code null} if never set. */
    private String jsonStr;

    /**
     * @param host remote host name or address
     * @param port remote TCP port
     */
    NettyClient(String host, int port) {
        this.host = host;
        this.port = port;
    }

    /**
     * Sets the message that is passed to {@link NettyClientHandler}.
     *
     * @param jsonStr the message text
     * @return this client, to allow call chaining
     */
    NettyClient setMessage(String jsonStr) {
        this.jsonStr = jsonStr;
        return this;
    }

    /**
     * Connects to the remote endpoint and blocks the calling thread until the
     * channel's close future completes. The event loop group is always shut
     * down before this method returns.
     */
    void run() {
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(workerGroup)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.SO_KEEPALIVE, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8));
                            ch.pipeline().addLast(new StringDecoder(CharsetUtil.UTF_8));
                            ch.pipeline().addLast(new NettyClientHandler(jsonStr));
                        }
                    });
            ChannelFuture channelFuture = bootstrap.connect(host, port).addListener(
                    f -> {
                        if (f.isSuccess()) {
                            System.out.println("連接成功:" + host + ":" + port);
                        } else {
                            System.out.println(new Date() + "-- 連接失敗:" + host + ":" + port);
                        }
                    }
            ).sync();
            // Block until the connection is closed by either side.
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up the stack can
            // still observe that this thread was asked to stop.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }

}
package org.gfu.base.netty;

import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.SimpleChannelInboundHandler;

import java.util.Scanner;

/**
 * Netty client handler: writes a single pre-configured message to the channel
 * as soon as the channel becomes active.
 *
 * @author gfu
 * @date 2019/9/27
 */
public class NettyClientHandler extends ChannelInboundHandlerAdapter {

    /** Message to send once the channel is active; may be {@code null}. */
    private final String jsonStr;

    /**
     * @param jsonStr the message to write when the channel becomes active
     */
    public NettyClientHandler(String jsonStr) {
        this.jsonStr = jsonStr;
    }

    /**
     * Writes and flushes the configured message when the channel becomes active.
     * If no message was configured, nothing is written.
     *
     * @param ctx the context of the channel that just became active
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        if (jsonStr == null) {
            // writeAndFlush rejects a null message; with nothing to send,
            // skip the write instead of raising an exception in the pipeline.
            return;
        }
        ctx.writeAndFlush(jsonStr);
    }
}
package org.gfu.base.netty;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.sctp.nio.NioSctpServerChannel;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;

import java.util.Date;

/**
 * Minimal Netty server: binds to {@code host:port}, installs UTF-8 string
 * codecs and a {@link NettyServerHandler} on every accepted connection, and
 * then blocks until the server channel is closed.
 *
 * @author gfu
 * @date 2019/9/27
 */
class NettyServer {

    /** Local host name or address to bind to; fixed at construction time. */
    private final String host;
    /** Local TCP port to bind to; fixed at construction time. */
    private final int port;

    /**
     * @param host local host name or address to bind to
     * @param port local TCP port to bind to
     */
    NettyServer(String host, int port) {
        this.host = host;
        this.port = port;
    }

    /**
     * Binds the server socket and blocks the calling thread until the server
     * channel's close future completes. Both event loop groups are always shut
     * down before this method returns.
     */
    void run() {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8));
                            ch.pipeline().addLast(new StringDecoder(CharsetUtil.UTF_8));
                            ch.pipeline().addLast(new NettyServerHandler());
                        }
                    }).option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true);
            ChannelFuture channelFuture = serverBootstrap.bind(host, port).
                    addListener(f -> {
                        if (f.isSuccess()) {
                            System.out.println("綁定成功:" + host + ":" + port);
                        } else {
                            System.out.println(new Date() + "--綁定失敗:" + host + ":" + port);
                        }
                    }).sync();
            // Block until the server channel is closed.
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up the stack can
            // still observe that this thread was asked to stop.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
package org.gfu.base.netty;    import io.netty.buffer.ByteBuf;  import io.netty.channel.*;    /**   * netty server handler   *   * @author [email protected] |[email protected] |gfu   * @date 2019/9/27   */  public class NettyServerHandler extends ChannelInboundHandlerAdapter {        private String msg;        @Override      public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {          System.out.println(msg);          this.msg = msg.toString();          this.channelActive(ctx);      }        @Override      public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {          super.exceptionCaught(ctx, cause);      }        @Override      public void channelActive(ChannelHandlerContext ctx) throws Exception {          System.out.println(msg);          ctx.writeAndFlush(msg + "server accept success");      }  }