Netty实现远程调用RPC功能

  • 2019 年 11 月 6 日
  • 筆記

添加依赖

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.2.Final</version>
</dependency>

<dependency>
    <groupId>org.reflections</groupId>
    <artifactId>reflections</artifactId>
    <version>0.9.10</version>
</dependency>

组织架构

服务端

封装类信息

/**
 * Serializable description of a single remote call: the target class name,
 * the method name, the parameter types and the argument values.
 */
public class ClassInfo implements Serializable {

    private static final long serialVersionUID = 1L;

    /** Class name. */
    private String className;

    /** Method name. */
    private String methodName;

    /** Parameter types. */
    private Class<?>[] types;

    /** Argument list. */
    private Object[] objects;

    // ---- class name ----

    public void setClassName(String className) {
        this.className = className;
    }

    public String getClassName() {
        return this.className;
    }

    // ---- method name ----

    public void setMethodName(String methodName) {
        this.methodName = methodName;
    }

    public String getMethodName() {
        return this.methodName;
    }

    // ---- parameter types ----

    public void setTypes(Class<?>[] types) {
        this.types = types;
    }

    public Class<?>[] getTypes() {
        return this.types;
    }

    // ---- argument values ----

    public void setObjects(Object[] objects) {
        this.objects = objects;
    }

    public Object[] getObjects() {
        return this.objects;
    }
}

服务端网络处理服务器

public class NettyRPCServer {      private int port;      public NettyRPCServer(int port) {          this.port = port;      }        public void start() {          EventLoopGroup bossGroup = new NioEventLoopGroup();          EventLoopGroup workerGroup = new NioEventLoopGroup();          try {              ServerBootstrap serverBootstrap = new ServerBootstrap();              serverBootstrap.group(bossGroup, workerGroup)                      .channel(NioServerSocketChannel.class)                      .option(ChannelOption.SO_BACKLOG, 128)                      .childOption(ChannelOption.SO_KEEPALIVE, true)                      .localAddress(port).childHandler(                              new ChannelInitializer<SocketChannel>() {                                  @Override                                  protected void initChannel(SocketChannel ch) throws Exception {                                      ChannelPipeline pipeline = ch.pipeline();                                      //编码器                                      pipeline.addLast("encoder", new ObjectEncoder());                                      //解码器                                      pipeline.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));                                      //服务器端业务处理类                                      pipeline.addLast(new InvokeHandler());                                  }                              });              ChannelFuture future = serverBootstrap.bind(port).sync();              System.out.println("......server is ready......");              future.channel().closeFuture().sync();          } catch (Exception e) {              bossGroup.shutdownGracefully();              workerGroup.shutdownGracefully();          }      }        public static void main(String[] args) throws Exception {          new NettyRPCServer(9999).start();      }  }  

服务器端业务处理类

public class InvokeHandler extends ChannelInboundHandlerAdapter {      //得到某接口下某个实现类的名字      private String getImplClassName(ClassInfo classInfo) throws Exception{          //服务方接口和实现类所在的包路径          String interfacePath="com.lyz.server";          int lastDot = classInfo.getClassName().lastIndexOf(".");          String interfaceName=classInfo.getClassName().substring(lastDot);          Class superClass=Class.forName(interfacePath+interfaceName);          Reflections reflections = new Reflections(interfacePath);          //得到某接口下的所有实现类          Set<Class> ImplClassSet=reflections.getSubTypesOf(superClass);          if(ImplClassSet.size()==0){              System.out.println("未找到实现类");              return null;          }else if(ImplClassSet.size()>1){              System.out.println("找到多个实现类,未明确使用哪一个");              return null;          }else {              //把集合转换为数组              Class[] classes=ImplClassSet.toArray(new Class[0]);              return classes[0].getName(); //得到实现类的名字          }      }        @Override  //读取客户端发来的数据并通过反射调用实现类的方法      public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {          ClassInfo classInfo = (ClassInfo) msg;          System.out.println(classInfo);          Object clazz = Class.forName(getImplClassName(classInfo)).newInstance();          Method method = clazz.getClass().getMethod(classInfo.getMethodName(), classInfo.getTypes());          //通过反射调用实现类的方法          Object result = method.invoke(clazz, classInfo.getObjects());          ctx.writeAndFlush(result);      }  }  

服务端接口及实现类

// 无参接口  public interface HelloNetty {      String hello();  }    // 实现类  public class HelloNettyImpl implements HelloNetty {      @Override      public String hello() {          return "hello,netty";      }  }    // 带参接口  public interface HelloRPC {      String hello(String name);  }    // 实现类  public class HelloRPCImpl implements HelloRPC {      @Override      public String hello(String name) {          return "hello," + name;      }  }  

客户端

代理类

public class NettyRPCProxy {      //根据接口创建代理对象      public static Object create(Class target) {          return Proxy.newProxyInstance(target.getClassLoader(), new Class[]{target}, new InvocationHandler() {              @Override              public Object invoke(Object proxy, Method method, Object[] args)                      throws Throwable {                  //封装ClassInfo                  ClassInfo classInfo = new ClassInfo();                  classInfo.setClassName(target.getName());                  classInfo.setMethodName(method.getName());                  classInfo.setObjects(args);                  classInfo.setTypes(method.getParameterTypes());                    //开始用Netty发送数据                  EventLoopGroup group = new NioEventLoopGroup();                  ResultHandler resultHandler = new ResultHandler();                  try {                      Bootstrap b = new Bootstrap();                      b.group(group)                              .channel(NioSocketChannel.class)                              .handler(new ChannelInitializer<SocketChannel>() {                                  @Override                                  public void initChannel(SocketChannel ch) throws Exception {                                      ChannelPipeline pipeline = ch.pipeline();                                      //编码器                                      pipeline.addLast("encoder", new ObjectEncoder());                                      //解码器  构造方法第一个参数设置二进制数据的最大字节数  第二个参数设置具体使用哪个类解析器                                      pipeline.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));                                      //客户端业务处理类                                      pipeline.addLast("handler", resultHandler);                                  }                              });                      ChannelFuture future = b.connect("127.0.0.1", 9999).sync();                      
future.channel().writeAndFlush(classInfo).sync();                      future.channel().closeFuture().sync();                  } finally {                      group.shutdownGracefully();                  }                  return resultHandler.getResponse();              }          });      }  }  

客户端业务处理类

public class ResultHandler extends ChannelInboundHandlerAdapter {        private Object response;      public Object getResponse() {          return response;      }        @Override //读取服务器端返回的数据(远程调用的结果)      public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {          response = msg;          ctx.close();      }  }  

客户端接口

// 无参接口  public interface HelloNetty {      String hello();  }    // 带参接口  public interface HelloRPC {      String hello(String name);  }写一个

测试类 服务调用方

public class TestNettyRPC {      public static void main(String [] args){            //第1次远程调用          HelloNetty helloNetty=(HelloNetty) NettyRPCProxy.create(HelloNetty.class);          System.out.println(helloNetty.hello());            //第2次远程调用          HelloRPC helloRPC =  (HelloRPC) NettyRPCProxy.create(HelloRPC.class);          System.out.println(helloRPC.hello("RPC"));        }  }  

输出结果

服务端

......server is ready......  com.lyz.serverStub.ClassInfo@2b894733  com.lyz.serverStub.ClassInfo@167bfa9

客户端

hello,netty  hello,RPC  

下一篇通过netty实现线上聊天功能

 

 

/**
 * Client-side factory for RPC stubs. Each call on a stub is packed into a
 * {@link ClassInfo}, sent to the server over a fresh Netty connection, and
 * the object the server sends back is returned as the call result.
 */
public class NettyRPCProxy {

    /** Host of the RPC server. */
    private static final String SERVER_HOST = "127.0.0.1";

    /** Port of the RPC server. */
    private static final int SERVER_PORT = 9999;

    /**
     * Creates a proxy object for the given interface.
     *
     * @param target the service interface to proxy
     * @return a proxy implementing {@code target}; cast it to the interface type
     */
    public static Object create(Class<?> target) {
        return Proxy.newProxyInstance(target.getClassLoader(), new Class<?>[]{target}, new InvocationHandler() {
            @Override
            public Object invoke(Object proxy, Method method, Object[] args)
                    throws Throwable {
                // equals/hashCode/toString are also routed here; answer them locally
                // rather than turning them into remote calls.
                if (method.getDeclaringClass() == Object.class) {
                    return invokeObjectMethod(target, proxy, method, args);
                }

                // Build the ClassInfo request
                ClassInfo classInfo = new ClassInfo();
                classInfo.setClassName(target.getName());
                classInfo.setMethodName(method.getName());
                classInfo.setObjects(args);
                classInfo.setTypes(method.getParameterTypes());

                return sendRequest(classInfo);
            }
        });
    }

    /** Handles {@code equals}, {@code hashCode} and {@code toString} using proxy identity. */
    private static Object invokeObjectMethod(Class<?> target, Object proxy, Method method, Object[] args) {
        String name = method.getName();
        if ("equals".equals(name)) {
            return proxy == args[0];
        }
        if ("hashCode".equals(name)) {
            return System.identityHashCode(proxy);
        }
        return target.getName() + "@" + Integer.toHexString(System.identityHashCode(proxy));
    }

    /** Sends one request with Netty and blocks until the connection is closed. */
    private static Object sendRequest(ClassInfo classInfo) throws InterruptedException {
        EventLoopGroup group = new NioEventLoopGroup();
        final ResultHandler resultHandler = new ResultHandler();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            // Encoder
                            pipeline.addLast("encoder", new ObjectEncoder());
                            // Decoder: the first constructor argument is the maximum size in bytes of the
                            // binary data, the second selects which class resolver to use
                            pipeline.addLast("decoder",
                                    new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));
                            // Client-side business handler
                            pipeline.addLast("handler", resultHandler);
                        }
                    });
            ChannelFuture future = b.connect(SERVER_HOST, SERVER_PORT).sync();
            future.channel().writeAndFlush(classInfo).sync();
            // ResultHandler closes the channel once the response has arrived.
            future.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
        return resultHandler.getResponse();
    }
}