Java+Netty自定义RPC框架

  • 2020 年 7 月 21 日
  • 笔记
本次利用Java+netty实现自定义rpc框架,共分为三个工程,公共模块+服务提供者+服务消费者:
 

rpc-common工程
 
pom.xml
 
<project xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.rpc.common</groupId>
    <artifactId>rpc-common</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>
    <dependencies>
 
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.16.Final</version>
        </dependency>
        <dependency>
 
            <groupId>com.alibaba</groupId>
 
            <artifactId>fastjson</artifactId>
 
            <version>1.2.41</version>
 
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.0</version>
        </dependency>
    </dependencies>
 
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>utf-8</encoding>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
 
 
RpcDecoder.java
 
 
package com.rpc.decoder;
 
import java.util.List;
 
import com.rpc.util.SerializationUtil;
 
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
/**
 * Netty inbound decoder for the length-prefixed RPC frame format: one
 * {@code int} holding the payload length, followed by that many bytes of
 * serialized payload (the layout produced by {@code RpcEncoder}).
 *
 * <p>SECURITY NOTE(review): the payload is handed to
 * {@link SerializationUtil#toClass}, which uses Java native deserialization.
 * That is unsafe for data coming from untrusted peers.
 *
 * @author linxu
 *
 */
public class RpcDecoder extends ByteToMessageDecoder {
    /** Size in bytes of the length header that precedes every payload. */
    private static final int HEADER_LENGTH = 4;

    /** Type the payload is deserialized into. */
    private final Class<?> genericClass;

    /**
     * @param genericClass type passed to {@link SerializationUtil#toClass} for every frame
     */
    public RpcDecoder(Class<?> genericClass) {
        this.genericClass = genericClass;
    }

    /**
     * Decodes at most one complete frame from {@code in}.
     *
     * <p>If the header or the payload has not fully arrived yet, nothing is
     * consumed and the method returns so that it can be retried once more
     * bytes are available. A negative length closes the channel and discards
     * the buffered bytes.
     *
     * @param ctx channel context, used to close the channel on a corrupt frame
     * @param in  accumulated inbound bytes
     * @param out receives the deserialized message, if a full frame was available
     * @throws Exception if deserialization fails
     */
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        if (in.readableBytes() < HEADER_LENGTH) {
            return;
        }
        in.markReaderIndex();
        int dataLength = in.readInt();
        if (dataLength < 0) {
            // Corrupt header: drop whatever is buffered and stop. Without the
            // return the code below would attempt new byte[negative].
            in.skipBytes(in.readableBytes());
            ctx.close();
            return;
        }
        if (in.readableBytes() < dataLength) {
            // Partial frame: rewind to the header and wait for the rest.
            // Without the return readBytes would throw IndexOutOfBoundsException.
            in.resetReaderIndex();
            return;
        }
        byte[] data = new byte[dataLength];
        in.readBytes(data);
        Object obj = SerializationUtil.toClass(genericClass, data);
        if (obj != null) {
            // The output list must not receive null elements.
            out.add(obj);
        }
    }

}
 
 
RpcEncoder.java
 
package com.rpc.decoder;
 
import com.rpc.util.SerializationUtil;
 
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
/**
 * Netty outbound encoder that writes a message as an {@code int} length
 * followed by the bytes produced by {@link SerializationUtil#toByte}.
 * Messages that are not instances of the configured class produce no output.
 *
 * @author linxu
 *
 */
@SuppressWarnings("rawtypes")
public class RpcEncoder extends MessageToByteEncoder {
    /** Only messages of this type are encoded. */
    private Class<?> genericClass;

    /**
     * @param genericClass type of message this encoder accepts
     */
    public RpcEncoder(Class<?> genericClass) {
        this.genericClass = genericClass;
    }

    /**
     * Serializes {@code msg} and appends the length-prefixed frame to {@code out}.
     *
     * @param ctx channel context (unused)
     * @param msg outbound message
     * @param out buffer receiving the frame
     * @throws Exception declared by the overridden method
     */
    @Override
    public void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
        if (!genericClass.isInstance(msg)) {
            return;
        }
        final byte[] payload = SerializationUtil.toByte(msg);
        out.writeInt(payload.length).writeBytes(payload);
    }

}
 
SrpcRequest.java
 
package com.rpc.message;
 
import java.io.Serializable;
import java.util.Arrays;
/**
 * Serializable value object describing one remote call: which interface and
 * method to invoke, and with which argument types and values.
 *
 * @author linxu
 *
 */
public class SrpcRequest implements Serializable {
    private static final long serialVersionUID = 6132853628325824727L;

    /** Request id. */
    private String requestId;
    /** Name of the remote interface to call. */
    private String interfaceName;
    /** Name of the remote method to call. */
    private String methodName;
    /** Parameter types. */
    private Class<?>[] parameterTypes;
    /** Parameter values. */
    private Object[] parameters;

    public String getRequestId() {
        return this.requestId;
    }

    public void setRequestId(String requestId) {
        this.requestId = requestId;
    }

    public String getInterfaceName() {
        return this.interfaceName;
    }

    public void setInterfaceName(String interfaceName) {
        this.interfaceName = interfaceName;
    }

    public String getMethodName() {
        return this.methodName;
    }

    public void setMethodName(String methodName) {
        this.methodName = methodName;
    }

    public Class<?>[] getParameterTypes() {
        return this.parameterTypes;
    }

    public void setParameterTypes(Class<?>[] parameterTypes) {
        this.parameterTypes = parameterTypes;
    }

    public Object[] getParameters() {
        return this.parameters;
    }

    public void setParameters(Object[] parameters) {
        this.parameters = parameters;
    }

    /**
     * @return a one-line rendering of all fields, arrays expanded via {@link Arrays#toString}
     */
    @Override
    public String toString() {
        final StringBuilder text = new StringBuilder("SrpcRequest [requestId=");
        text.append(requestId);
        text.append(", interfaceName=").append(interfaceName);
        text.append(", methodName=").append(methodName);
        text.append(", parameterTypes=").append(Arrays.toString(parameterTypes));
        text.append(", parameters=").append(Arrays.toString(parameters));
        text.append("]");
        return text.toString();
    }

}
 
 
 
SrpcResponse.java
 
package com.rpc.message;
 
import java.io.Serializable;
 
/**
 * Serializable value object carrying the outcome of one remote call: the id
 * of the request it answers, plus either a result or an error.
 *
 * @author linxu
 *
 */
public class SrpcResponse implements Serializable {
    private static final long serialVersionUID = -5934073769679010930L;

    /** Id of the request this response belongs to. */
    private String requestId;
    /** Exception raised while handling the request, if any. */
    private Throwable error;
    /** Value returned by the invoked method. */
    private Object result;

    public String getRequestId() {
        return this.requestId;
    }

    public void setRequestId(String requestId) {
        this.requestId = requestId;
    }

    public Throwable getError() {
        return this.error;
    }

    public void setError(Throwable error) {
        this.error = error;
    }

    public Object getResult() {
        return this.result;
    }

    public void setResult(Object result) {
        this.result = result;
    }

    /**
     * @return a one-line rendering of all three fields
     */
    @Override
    public String toString() {
        final StringBuilder text = new StringBuilder("SrpcResponse [requestId=");
        text.append(requestId);
        text.append(", error=").append(error);
        text.append(", result=").append(result);
        text.append("]");
        return text.toString();
    }

}
 
 
 
SerializationUtil.java
 
package com.rpc.util;
 
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
/**
 * Helpers that convert objects to and from byte arrays using Java native
 * serialization ({@link ObjectOutputStream} / {@link ObjectInputStream}).
 *
 * <p>SECURITY NOTE(review): Java native deserialization must not be applied
 * to bytes from untrusted sources. Consider a data-only format or a
 * deserialization filter before exposing this beyond a demo.
 *
 * @author 86136
 *
 */
public class SerializationUtil {
    /**
     * Serializes an object to a byte array.
     *
     * @param t object to serialize; {@code null} is serialized as a null reference
     * @return the serialized form, never {@code null}
     * @throws java.io.UncheckedIOException if the object cannot be serialized, for
     *         example because it (or something it references) is not
     *         {@link java.io.Serializable}. Previously this case printed a stack
     *         trace and returned {@code null}, which only failed later at the caller.
     */
    public static <T> byte[] toByte(T t) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream objects = new ObjectOutputStream(bytes)) {
            objects.writeObject(t);
            // Push any buffered data into the byte array before snapshotting it.
            objects.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            String type = (t == null) ? "null" : t.getClass().getName();
            // Fully qualified so that the file's import list does not need to change.
            throw new java.io.UncheckedIOException("Failed to serialize object of type " + type, e);
        }
    }

    /**
     * Deserializes a byte array produced by {@link #toByte(Object)}.
     *
     * @param clazz  expected type of the deserialized object
     * @param buffer serialized bytes
     * @return the deserialized object, or {@code null} if a null reference was serialized
     * @throws IOException            if the bytes are not a valid serialization stream
     * @throws ClassNotFoundException if the serialized class is not on the classpath
     * @throws ClassCastException     if the deserialized object is not an instance of {@code clazz}
     * @throws Exception              kept in the signature for source compatibility
     */
    public static <T> T toClass(Class<T> clazz, byte[] buffer) throws Exception {
        try (ObjectInputStream objects = new ObjectInputStream(new ByteArrayInputStream(buffer))) {
            // Checked cast: a payload of the wrong type is rejected here rather
            // than surfacing later as heap pollution.
            return clazz.cast(objects.readObject());
        }
    }
}
 
 
DeptService.java
 
package com.user.service;
 
/**
 * Demo service contract shared by the RPC provider and consumer.
 */
public interface DeptService {

    /**
     * @param d input text
     * @return the text produced by the implementation
     */
    String selectDept(String d);

}
 
 
UserService.java
 
package com.user.service;
 
/**
 * Demo service contract shared by the RPC provider and consumer.
 */
public interface UserService {

    /**
     * @param word input text
     * @return the text produced by the implementation
     */
    String sayHello(String word);
}
 
 
 
rpc-consumer工程
 
ClientBootstrap.java
 
package com.rpc;
 
import com.user.service.DeptService;
import com.user.service.UserService;
 
/**
 * Invocation test: obtains proxies for the two demo services through
 * {@link RpcConsumer} and prints what each call returns.
 *
 * @author linxu
 *
 */
public class ClientBootstrap {

    public static void main(String[] args) throws InterruptedException {
        test1();
        test2();
    }

    /** Calls {@code UserService.sayHello} through a proxy and prints the reply. */
    public static void test1() {
        final UserService userService = (UserService) new RpcConsumer().createProxy(UserService.class);
        final String reply = userService.sayHello("are you ok 001 ?");
        System.out.println(reply);
    }

    /** Calls {@code DeptService.selectDept} through a proxy and prints the reply. */
    public static void test2() {
        final DeptService deptService = (DeptService) new RpcConsumer().createProxy(DeptService.class);
        final String reply = deptService.selectDept("are you ok 002 ?");
        System.out.println(reply);
    }
}
 
ClientHandler.java
 
package com.rpc;
 
import java.util.concurrent.Callable;
 
import com.rpc.message.SrpcRequest;
import com.rpc.message.SrpcResponse;
 
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
 
/**
 * Client-side channel handler that also acts as the task performing one
 * request/response exchange: {@link #call()} writes the pending request and
 * blocks until {@link #channelRead} delivers a response.
 *
 * <p>All state is guarded by this object's monitor. The handler holds a single
 * request and a single response slot, so it supports only one exchange in
 * flight at a time; callers must not overlap {@link #setPara} / {@link #call()}
 * pairs.
 */
public class ClientHandler extends ChannelInboundHandlerAdapter implements Callable<Object> {
    /** Context of the connected channel; null until {@link #channelActive} has run. */
    private ChannelHandlerContext context;
    /** Response to the request currently in flight; null while waiting. */
    private SrpcResponse result;
    /** Request that the next {@link #call()} will send. */
    private SrpcRequest  para;

    /**
     * Records the channel context and wakes any {@link #call()} that started
     * before the channel became active.
     */
    @Override
    public synchronized void channelActive(ChannelHandlerContext ctx) {
        context = ctx;
        notifyAll();
    }

    /**
     * Stores the received response and wakes the waiting {@link #call()}.
     */
    @Override
    public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) {
        result = (SrpcResponse)msg;
        notifyAll();
    }

    /**
     * Sends the request set via {@link #setPara} and blocks until a response arrives.
     *
     * @return the received {@link SrpcResponse}
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    @Override
    public synchronized Object call() throws InterruptedException {
        // The connect future may complete before channelActive has run, so the
        // context is not guaranteed to be set yet.
        while (context == null) {
            wait();
        }
        // Drop the previous exchange's response so it cannot be returned again.
        result = null;
        context.writeAndFlush(para);
        // Loop on the condition: wait() may return without a notification.
        while (result == null) {
            wait();
        }
        return result;
    }

    /**
     * @param para request to send on the next {@link #call()}
     */
    synchronized void setPara(SrpcRequest  para) {
        this.para = para;
    }
}
 
 
RpcConsumer.java
 
package com.rpc;
 
import java.lang.reflect.Proxy;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
 
import org.apache.commons.lang3.StringUtils;
 
import com.rpc.decoder.RpcDecoder;
import com.rpc.decoder.RpcEncoder;
import com.rpc.message.SrpcRequest;
import com.rpc.message.SrpcResponse;
 
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
 
/**
 * Creates dynamic proxies whose method calls are sent as {@link SrpcRequest}s
 * to the provider at {@code localhost:8888} and answered with
 * {@link SrpcResponse}s.
 *
 * <p>All proxies share one lazily created connection and one
 * {@link ClientHandler}. NOTE(review): the handler holds a single request
 * slot, so overlapping calls from several threads are not supported.
 */
public class RpcConsumer {
    private static ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
    private static ClientHandler client;

    /**
     * Returns a proxy for {@code interfaceClass}; every method invoked on it
     * performs a remote call over the shared connection.
     *
     * <p>A proxied call returns {@code null} when no response is received or
     * when the response's request id does not match the request, and rethrows
     * the error carried by the response if there is one.
     *
     * @param interfaceClass service interface to proxy
     * @return a proxy instance implementing {@code interfaceClass}
     */
    public Object createProxy(final Class<?> interfaceClass) {
        return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class<?>[] { interfaceClass },
                (proxy, method, arguments) -> {
                    ClientHandler handler = getClient();
                    // Build the request
                    SrpcRequest request = new SrpcRequest();
                    request.setRequestId(UUID.randomUUID().toString());
                    request.setInterfaceName(interfaceClass.getName());
                    request.setMethodName(method.getName());
                    request.setParameterTypes(method.getParameterTypes());
                    request.setParameters(arguments);
                    handler.setPara(request);
                    // Run the exchange on the executor and wait for the response
                    SrpcResponse response = (SrpcResponse) executor.submit(handler).get();

                    if (response == null || !StringUtils.equals(request.getRequestId(), response.getRequestId())) {
                        return null;
                    }
                    if (response.getError() != null) {
                        throw response.getError();
                    }
                    return response.getResult();
                });
    }

    /**
     * Returns the shared handler, connecting on first use. Synchronized so
     * that concurrent first calls open only one connection. The handler is
     * published only after the connect succeeded, so a failed attempt can be
     * retried by the next call.
     */
    private static synchronized ClientHandler getClient() {
        if (client == null) {
            client = initClient();
        }
        return client;
    }

    /**
     * Opens the connection to the provider and returns the handler installed
     * at the end of its pipeline.
     *
     * @throws IllegalStateException if the thread is interrupted while connecting
     */
    private static ClientHandler initClient() {
        final ClientHandler handler = new ClientHandler();
        EventLoopGroup group = new NioEventLoopGroup();
        boolean connected = false;
        try {
            Bootstrap b = new Bootstrap();
            b.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline p = ch.pipeline();
                            p.addLast(new RpcEncoder(SrpcRequest.class));
                            p.addLast(new RpcDecoder(SrpcResponse.class));
                            p.addLast(handler);
                        }
                    });
            b.connect("localhost", 8888).sync();
            connected = true;
            return handler;
        } catch (InterruptedException e) {
            // Preserve the interrupt for the caller instead of swallowing it.
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while connecting to RPC provider at localhost:8888", e);
        } finally {
            if (!connected) {
                // Do not leave event loop threads running for a connection that never opened.
                group.shutdownGracefully();
            }
        }
    }
}
 
pom.xml
 
<project xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.rpc.consumer</groupId>
    <artifactId>rpc-consumer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>
    <dependencies>
 
        <dependency>
 
            <groupId>com.rpc.common</groupId>
            <artifactId>rpc-common</artifactId>
            <version>0.0.1-SNAPSHOT</version>
        </dependency>
    </dependencies>
 
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>utf-8</encoding>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
 
 
 
rpc-provider工程
 
Bootstrap.java
 
package com.rpc.bootstrap;
 
import java.util.HashMap;
import java.util.Map;
 
import com.rpc.decoder.RpcDecoder;
import com.rpc.decoder.RpcEncoder;
import com.rpc.message.SrpcRequest;
import com.rpc.message.SrpcResponse;
 
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
 
/**
 * Entry point of the RPC provider: instantiates the configured service
 * implementations and starts the Netty server that dispatches requests to them.
 */
public class Bootstrap {
    /**
     * Service container: interface name mapped to the implementation instance.
     * Filled by {@link #registryService()} and read by {@code RpcServerHandler}.
     */
    public static final Map<String, Object> serviceRegistry = new HashMap<String, Object>();

    /**
     * Binds the RPC server to the given address and returns once the bind has
     * completed. If the bind fails or the thread is interrupted, the event
     * loop group is shut down so that its threads do not outlive the failure.
     *
     * @param hostName host name or address to bind to
     * @param port     TCP port to listen on
     */
    public static void startServer(String hostName, int port) {
        NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        boolean started = false;
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();

            bootstrap.group(eventLoopGroup).channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline p = ch.pipeline();
                            p.addLast(new RpcDecoder(SrpcRequest.class));
                            p.addLast(new RpcEncoder(SrpcResponse.class));
                            p.addLast(new RpcServerHandler());
                        }
                    });
            bootstrap.bind(hostName, port).sync();
            started = true;
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            if (!started) {
                eventLoopGroup.shutdownGracefully();
            }
        }
    }

    public static void main(String[] args) {
        // Register the services
        registryService();
        // Start Netty
        startServer("localhost", 8888);
    }

    /**
     * Instantiates every implementation class listed by
     * {@link ServiceRegistry#registry()} through its no-argument constructor
     * and stores it in {@link #serviceRegistry} under the interface name.
     * An entry that cannot be instantiated is reported and skipped.
     */
    public static void registryService() {
        final Map<String, String> serviceInterfaceConfiguration = ServiceRegistry.registry();
        if (serviceInterfaceConfiguration != null && !serviceInterfaceConfiguration.isEmpty()) {
            serviceInterfaceConfiguration.forEach((k, v) -> {
                try {
                    // Replacement for the deprecated Class.newInstance().
                    Object object = Class.forName(v).getDeclaredConstructor().newInstance();
                    serviceRegistry.put(k, object);
                } catch (ReflectiveOperationException e) {
                    System.err.println("Failed to register service " + k + " -> " + v);
                    e.printStackTrace();
                }

            });
        }

    }
}
 
RpcServerHandler.java
 
package com.rpc.bootstrap;
 
import java.lang.reflect.Method;
 
import com.rpc.message.SrpcRequest;
import com.rpc.message.SrpcResponse;
 
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
 
/**
 * Server-side handler: invokes the requested method on the registered service
 * instance via reflection and writes back an {@link SrpcResponse} carrying
 * either the result or the error.
 */
public class RpcServerHandler extends SimpleChannelInboundHandler<SrpcRequest> {
    /**
     * Handles one request and always answers with a response that echoes the
     * request id.
     *
     * @param ctx channel context used to write the response
     * @param msg decoded request
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, SrpcRequest msg) throws Exception {
        SrpcResponse response = new SrpcResponse();
        response.setRequestId(msg.getRequestId());
        try {
            response.setResult(handle(msg));
        } catch (java.lang.reflect.InvocationTargetException e) {
            // Report the exception the service method actually threw, not the
            // reflection wrapper around it. (Fully qualified so that the file's
            // import list does not need to change.)
            Throwable cause = (e.getCause() != null) ? e.getCause() : e;
            response.setError(cause);
            cause.printStackTrace();
        } catch (Exception e) {
            response.setError(e);
            e.printStackTrace();
        }
        ctx.writeAndFlush(response);
    }

    /**
     * Invokes the requested service method.
     *
     * @param request request naming the interface, method, parameter types and arguments
     * @return the value returned by the invoked method
     * @throws IllegalArgumentException if no service is registered for the requested interface
     * @throws Exception if the method cannot be found or invoked, or if it throws
     */
    private Object handle(SrpcRequest request) throws Exception {
        Object service = Bootstrap.serviceRegistry.get(request.getInterfaceName());
        if (service == null) {
            // Explicit message instead of a bare NullPointerException.
            throw new IllegalArgumentException(
                    "No service registered for interface " + request.getInterfaceName());
        }
        Method method = service.getClass().getMethod(request.getMethodName(), request.getParameterTypes());
        return method.invoke(service, request.getParameters());
    }
}
 
 
ServiceRegistry.java
 
package com.rpc.bootstrap;
 
import java.util.HashMap;
import java.util.Map;
 
/**
 * Static table mapping service interface names to the names of their
 * implementation classes.
 */
public class ServiceRegistry {

    /** Interface name mapped to implementation class name. */
    public static Map<String, String> serviceInterfaceConfiguration = new HashMap<String, String>();

    /**
     * Adds or replaces one mapping.
     *
     * @param k interface name
     * @param v implementation class name
     */
    public static void put(String k, String v) {
        serviceInterfaceConfiguration.put(k, v);
    }

    /**
     * Registers the built-in service mappings and returns the table.
     *
     * @return the shared mapping table (not a copy)
     */
    public static Map<String, String> registry() {
        // Service interface registrations
        final String[][] bindings = {
            { "com.user.service.UserService", "com.user.serviceimp.UserServiceImpl" },
            { "com.user.service.DeptService", "com.user.serviceimp.DeptServiceImpl" },
        };
        for (String[] binding : bindings) {
            put(binding[0], binding[1]);
        }
        return serviceInterfaceConfiguration;
    }

}
 
 
DeptServiceImpl.java
 
package com.user.serviceimp;
 
import com.user.service.DeptService;
 
/**
 * Demo implementation of {@link DeptService}.
 */
public class DeptServiceImpl implements DeptService {

    /**
     * @param d input text
     * @return {@code d} followed by {@code ":successful"}
     */
    @Override
    public String selectDept(String d) {
        final StringBuilder reply = new StringBuilder();
        reply.append(d).append(":successful");
        return reply.toString();
    }

}
 
UserServiceImpl.java
 
package com.user.serviceimp;
 
import com.user.service.UserService;
 
/**
 * Demo implementation of {@link UserService}.
 */
public class UserServiceImpl implements UserService {

    /**
     * Prints the reply to standard error and returns it.
     *
     * @param word input text
     * @return the fixed prefix followed by {@code word}
     */
    @Override
    public String sayHello(String word) {
        final String reply = "调用成功" + word;
        System.err.println(reply);
        return reply;
    }

}
 
pom.xml
 
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.rpc.provider</groupId>
  <artifactId>rpc-provider</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>
     
    <dependencies>
 
        <dependency>
 
            <groupId>com.rpc.common</groupId>
            <artifactId>rpc-common</artifactId>
            <version>0.0.1-SNAPSHOT</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>utf-8</encoding>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>