手寫RPC-簡陋版
- 2022 年 1 月 7 日
- 筆記
- Learn Java
前言
最近不小心被隔離,放假思考一番,決定開始在手寫序列。這個序列在之前看Nacous和網關源碼的時候就有想法,只是一直沒落實下來,趁着隔離行動起來。
必備知識介紹
序列化與反序列化
序列化是把對象的狀態信息轉化為可存儲或傳輸的形式過程,也就是把對象轉化為位元組序列的過程稱為對象的序列化;
反序列化是序列化的逆向過程,把位元組數組反序列化為對象,把位元組序列恢復為對象的過程成為對象的反序列化;
在Java中通過 JDK 提供了 Java 對象的序列化方式實現對象序列化傳輸,主要通過輸出流java.io.ObjectOutputStream和對象輸入流java.io.ObjectInputStream來實現;
java.io.ObjectOutputStream:表示對象輸出流 , 它的 writeObject(Object obj)方法可以對參數指定的 obj 對象進行序列化,把得到的位元組序列寫到一個目標輸出流中;
java.io.ObjectInputStream:表示對象輸入流 ,它的 readObject()方法源輸入流中讀取位元組序列,再把它們反序列化成為一個對象,並將其返回;
需要注意的是,被序列化的對象需要實現 java.io.Serializable 接口。Java 的序列化機制是通過判斷類的 serialVersionUID 來驗證版本一致性的。在進行反序列化時,JVM 會把傳來的位元組流中的 serialVersionUID 與本地相應實體類的 serialVersionUID 進行比較,如果相同就認為是一致的,可以進行反序列化,否則就會出現序列化版本不一致的異常,即是 InvalidCastException。
另外一個需要注意的就是transient關鍵字,被transient修飾的屬性不會被序列化,如果從重寫writeobject和readobject則可以重新被序列化。在JDK中的案例就是ArryList中修飾Object[]的數組使用transient關節字,保證傳輸過程中不照成浪費,只傳輸有用的值。本質是是通過反射來實現調用writeobject和readobject。
什麼是Socket通信
Socket 的原意是「插座」,在計算機通信領域,Socket 被翻譯為「套接字」,它是計算機之間進行通信的一種約定或一種方式。通過 Socket 這種約定,一台計算機可以接收其他計算機的數據,也可以向其他計算機發送數據。

RPC原理介紹
RPC是什麼
所謂的RPC其實是為了不同主機的兩個進程間通信而產生的,通常不同的主機之間的進程通信,程序編寫需要考慮到網絡通信的功能,這樣程序的編寫將會變得複雜。RPC就來解決這一問題的,一台主機上的進程對另外一台主機的進程發起請求時,內核會將請求轉交給RPC client,RPC client經過報文的封裝轉交給目標主機的RPC server,RPC server就將報文進行解析,還原成正常的請求,轉交給目標主機上的目標進程。在我們看來在就像是在同一台主機上的兩個進程通信一樣,完全沒有意識到是在不同的主機上。因此RPC其實也可以看做是一種協議或者是編程框架,目的是為了簡化分佈式程序的編寫。
RPC基本流程

-
Rpc Client通過傳入的IP、端口號、調用類以及方法的參數,通過動態代理找到具體的調用類的方法,將請求的類、方法序列化,傳輸到服務端;
-
當Rpc Service收到請求以後,將傳入類和方法反序列化,通過反射找到對應的類的方法進行調用,最後將返回結果進行序列化,返回客戶端;
-
Rpc Client收到返回值以後,進行反序列化,最後將結果展示;
手擼RPC
從RPC的基本流程可以看到,對於RPC性能來說可以提升的主要兩個地方分別是序列化工具以及通信框架,在我們整個手擼系列裏面會一步一步將其中的組件提升為高性能的組件,從阻塞IO到NIO,從JDK原始序列化框架到現在五花把門序列化框架,從手動的創建對象到Spring自動化創建對象,註冊中心引入等等,整個過程還會伴隨知識介紹,讓我們一起攜手共進。
邁出第一步
第一步我們只做到支持一個類的遠程調用,採用JDK攜帶的序列化和反序列的工具以及阻塞連接的方式。

整體項目結構分為三部分,rpc-api作為Api提供,rpc-common主要是提供公共封裝供client和service調用,rpc-v1包括rpc-v1-client主要是客戶端調用封裝,rpc-v1-service作為Api實現以及暴露對應方法,以後每次做的更改我都會新增一個版本,這樣會方便新手進行學習。
Service端

服務端的實現採用ServerSocket監聽某個端口,循環接收連接請求,如果發來了請求就創建一個線程,在新線程中處理調用,核心類就是RpcProxyService和ProcessorHandler,實現如下:
RpcProxyService
@Slf4j
public class RpcProxyService {
private ExecutorService threadPool;
public RpcProxyService() {
int corePoolSize = 5;
int maximumPoolSize = 200;
long keepAliveTime = 60;
BlockingQueue<Runnable> workingQueue = new ArrayBlockingQueue<>(100);
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("socket-pool-").build();
threadPool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.SECONDS, workingQueue, threadFactory);
}
/**
* 暴露方法,在註冊完成以後服務後立刻開始監聽
*
* @param service
* @param port
*/
public void register(Object service, int port) {
try (ServerSocket serverSocket = new ServerSocket(port);) {
Socket socket;
while ((socket = serverSocket.accept()) != null) {
log.info("客戶端連接IP為:" + socket.getInetAddress());
threadPool.execute(new ProcessorHandler(socket, service));
}
} catch (IOException e) {
log.error("連接異常", e);
}
}
}
ProcessorHandler
@Slf4j
public class ProcessorHandler implements Runnable {
private Socket socket;
private Object service;
public ProcessorHandler(Socket socket, Object service) {
this.service = service;
this.socket = socket;
}
@Override
public void run() {
try (ObjectInputStream inputStream = new ObjectInputStream(socket.getInputStream())) {
ObjectOutputStream outputStream = new ObjectOutputStream(socket.getOutputStream());
//從輸入流讀取參數
RpcRequest rpcRequest = (RpcRequest) inputStream.readObject();
//通過反射獲取到方法
Method method = service.getClass().getMethod(rpcRequest.getMethodName(), rpcRequest.getParamTypes());
//執行方法
Object result = method.invoke(service, rpcRequest.getParameters());
outputStream.writeObject(RpcResponse.ok(result));
outputStream.flush();
} catch (IOException | ClassNotFoundException | NoSuchMethodException | IllegalAccessException | InvocationTargetException exception) {
//此處可再次進行包裝將異常情況分類
log.error("調用時發生錯誤", exception);
}
}
}
Client端

Client端通過RpcClientProxy動態代理(採用JDK動態代理)生成代理對象,然後通過執行RemoteInvocationHandler的invoke來確定調用具體的類和方法,也就是構建RpcRequest對象,最後通過RpcClient發起遠程調用。
RpcClientProxy
public class RpcClientProxy {
public <T> T getProxy(Class<T> interfaceClass, String host, int port) {
return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(),
new Class<?>[]{interfaceClass},
new RemoteInvocationHandler(host, port));
}
}
RemoteInvocationHandler
public class RemoteInvocationHandler implements InvocationHandler {
private String host;
private int port;
public RemoteInvocationHandler(String host, int port) {
this.host = host;
this.port = port;
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
//構造請求參數
RpcRequest rpcRequest = RpcRequest.builder()
.interfaceName(method.getDeclaringClass().getName())
.methodName(method.getName())
.parameters(args)
.paramTypes(method.getParameterTypes())
.build();
//發送請求
RpcClient rpcClient = new RpcClient();
return ((RpcResponse) rpcClient.send(rpcRequest, host, port)).getData();
}
}
RpcClient
@Slf4j
public class RpcClient {
public Object send(RpcRequest rpcRequest, String host, int port) {
try (Socket socket = new Socket(host, port)) {
ObjectOutputStream outputStream = new ObjectOutputStream(socket.getOutputStream());
ObjectInputStream inputStream = new ObjectInputStream(socket.getInputStream());
//序列化
outputStream.writeObject(rpcRequest);
outputStream.flush();
return inputStream.readObject();
} catch (IOException | ClassNotFoundException e) {
log.error("調用時發生異常", e);
return null;
}
}
}
Common端
Common端目前做入參和出參封裝,代碼如下:
RpcRequest
@Data
@Builder
public class RpcRequest implements Serializable {
/**
* 接口名稱
*/
private String interfaceName;
/**
* 方法名稱
*/
private String methodName;
/**
* 參數
*/
private Object[] parameters;
/**
* 參數類型
*/
private Class<?>[] paramTypes;
}
RpcResponse
@Data
public class RpcResponse<T> implements Serializable {
/**
* 狀態碼
*/
private Integer code;
/**
* 提醒信息
*/
private String message;
/**
* 返回信息
*/
private T data;
public static <T> RpcResponse<T> ok(T data) {
RpcResponse<T> rpcResponse = new RpcResponse<>();
rpcResponse.setCode(ResponseCode.SUCCESS.getCode());
rpcResponse.setData(data);
rpcResponse.setMessage(rpcResponse.getMessage());
return rpcResponse;
}
public static <T> RpcResponse<T> error(int code, String message) {
RpcResponse<T> rpcResponse = new RpcResponse<>();
rpcResponse.setCode(code);
rpcResponse.setMessage(message);
return rpcResponse;
}
}
整體代碼我已經上傳github,對於初學者一定要聯調一下,理解清楚整體的RPC流程。
歡迎大家點點關注,點點贊!