簡單實現通過netty通信,後續提供基於protobuf傳輸協議的rpc框架
- 2019 年 10 月 10 日
- 筆記
後續也會提供service-mesh簡單的代碼實現 netty通信和socket通信大致是類似的,在socket的基礎上對其進行封裝,當然你也可以實現netty功能,但是我給你一句話。


package org.gfu.base.netty; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import io.netty.util.CharsetUtil; import java.util.Date; /** * netty client * * @author [email protected] |[email protected] |gfu * @date 2019/9/27 */ class NettyClient { private String host; private int port; private String jsonStr; NettyClient(String host, int port) { this.host = host; this.port = port; } NettyClient setMessage(String jsonStr) { this.jsonStr = jsonStr; return this; } void run() { EventLoopGroup workerGroup = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); bootstrap.group(workerGroup) .channel(NioSocketChannel.class) .option(ChannelOption.SO_KEEPALIVE, true) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8)); ch.pipeline().addLast(new StringDecoder(CharsetUtil.UTF_8)); ch.pipeline().addLast(new NettyClientHandler(jsonStr)); } }); ChannelFuture channelFuture = bootstrap.connect(host, port).addListener( f -> { if (f.isSuccess()) { System.out.println("連接成功:" + host + ":" + port); } else { System.out.println(new Date() + "-- 連接失敗:" + host + ":" + port); } } ).sync(); channelFuture.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } finally { workerGroup.shutdownGracefully(); } } }
package org.gfu.base.netty; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.SimpleChannelInboundHandler; import java.util.Scanner; /** * netty server handler * * @author [email protected] |[email protected] |gfu * @date 2019/9/27 */ public class NettyClientHandler extends ChannelInboundHandlerAdapter { private ChannelHandlerContext ctx; private String jsonStr; public NettyClientHandler(String jsonStr) { this.jsonStr = jsonStr; } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { this.ctx = ctx; ctx.writeAndFlush(jsonStr); } }
package org.gfu.base.netty; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.sctp.nio.NioSctpServerChannel; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import io.netty.util.CharsetUtil; import java.util.Date; /** * netty server * * @author [email protected] |[email protected] |gfu * @date 2019/9/27 */ class NettyServer { private String host; private int port; NettyServer(String host, int port) { this.host = host; this.port = port; } void run() { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8)); ch.pipeline().addLast(new StringDecoder(CharsetUtil.UTF_8)); ch.pipeline().addLast(new NettyServerHandler()); } }).option(ChannelOption.SO_BACKLOG, 128) .childOption(ChannelOption.SO_KEEPALIVE, true); ChannelFuture channelFuture = serverBootstrap.bind(host, port). addListener(f -> { if (f.isSuccess()) { System.out.println("綁定成功:" + host + ":" + port); } else { System.out.println(new Date() + "--綁定失敗:" + host + ":" + port); } }).sync(); channelFuture.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
package org.gfu.base.netty; import io.netty.buffer.ByteBuf; import io.netty.channel.*; /** * netty server handler * * @author [email protected] |[email protected] |gfu * @date 2019/9/27 */ public class NettyServerHandler extends ChannelInboundHandlerAdapter { private String msg; @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println(msg); this.msg = msg.toString(); this.channelActive(ctx); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { super.exceptionCaught(ctx, cause); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println(msg); ctx.writeAndFlush(msg + "server accept success"); } }