曹工說mini-dubbo(1)–為了實踐動態代理,我寫了個簡單的rpc框架
- 2020 年 3 月 16 日
- 筆記
相關背景及資源:
之前本來一直在寫spring源碼解析這塊,如下,aop部分剛好寫完。以前零散看過一些文章,知道rpc調用基本就是使用動態代理,比如rmi,dubbo,feign調用等。自己也就想着試一下,於是有了mini-dubbo這個東西,暫時也不能稱為一個框架,因為還不是生產級的,目前只是實現了一部分小功能,也沒有監控,也沒有xxx,反正就是缺的比較多。
曹工說Spring Boot源碼(22)– 你說我Spring Aop依賴AspectJ,我依賴它什麼了
我就說下,裏面用到的知識點吧,有興趣的,可以克隆源碼下來看看:
- 動態代理
- 服務註冊和消費,使用redis作為註冊中心,其中使用了redisson作為redis客戶端,其中涉及到BeanFactoryPostProcessor的使用
- 因為傳輸層使用netty和mina,是異步的,但是上層又需要等待結果,所以用到了同步轉異步
- spring的xml解析,bean definition註冊,spring 擴展xml 命名空間
- 自定義的spi的相關知識
- 分層思想,從dubbo借鑒了其分層,但是mini-dubbo要少幾層,因為我暫時不是很清楚dubbo的每一層的具體職責,所以我按我自己理解分的層。上層依賴下層,只通過下層的接口,查找下層接口時,直接在spring容器中查找bean即可,類似於spring mvc的設計。當下層有多個實現時,通過類似spi機制來指定具體要使用的下層實現。
- 基於第5點,所以本框架非常容易替換各層的實現,只要自己自定義一個spring bean,實現對應的接口,然後在spi文件中指定本實現的類名即可。
- netty和mina的tcp粘包拆包工作。
概要
代碼我放在了如下位置:
https://gitee.com/ckl111/mini-dubbo
我介紹下代碼的整體結構:
服務端聚合工程比較簡單,目前也沒時間去仔細弄,包含了如下module:
<modules> <!--業務層api--> <module>../mini-dubbo-api</module> <!--業務層,服務端demo--> <module>../mini-dubbo-server</module> <!--配置層,解析xml的工作,在本層完成--> <module>../mini-dubbo-core</module> <module>../mini-dubbo-common</module> </modules>
目前的大部分實現,是在客戶端,包含了如下module:
<modules> <!--業務層api--> <module>../mini-dubbo-api</module> <!--業務層,測試demo--> <module>../mini-dubbo-client</module> <!--配置層,解析xml的工作,在本層完成--> <module>../mini-dubbo-core</module> <module>../mini-dubbo-common</module> <!--註冊中心層--> <module>../mini-dubbo-registry-layer</module> <!--集群層,完成事情:負載均衡策略,集群容錯策略等--> <module>../mini-dubbo-cluster-layer</module> <!--信息交換層,主要完成同步轉異步的操作,因為下層的mina和netty為異步,本層同步等待結果--> <module>../mini-dubbo-exchange-layer</module> <!--傳輸層如使用netty實現,則需包含如下module--> <module>../mini-dubbo-transport-layer-netty</module> <!--傳輸層如使用mina實現,則需包含如下module--> <module>../mini-dubbo-transport-layer-mina</module> </modules>
其中,模塊間的依賴關係如下:
業務模塊,一般只需要依賴mini-dubbo-core模塊,mini-dubbo-core主要依賴了如下模塊:
為什麼這麼劃分,因為mini-dubbo-core模塊,其實主要是完成解析業務模塊(比如client)中的xml,根據其xml配置,註冊對應的bean到spring 容器中,而具體的bean實現,就是放在各個模塊的,比如,xml里配置netty作為傳輸層實現,那麼mini-dubbo-core就得解析為mini-dubbo-transport-layer-netty中的一個實現類作為bean,註冊到spring容器,供上層使用。
目前的分層,只是暫時的,後續可能會略有調整。
一次客戶端調用的大體思路
-
業務module中,配置xml,示例如下:
<dubbo:registry address="redis://127.0.0.1:6379"/> <dubbo:reference id="gpsLocationUpdateService" interface="dubbo.learn.IGpsLocationUpdateService"/> <context:component-scan base-package="dubbo"></context:component-scan>
其中的dubbo:reference就代表了一個遠端的服務,業務代碼中可以自動注入該接口,當調用該接口時,實際就會發起rpc調用。
熟悉的同學已經知道了,這塊肯定是生成了一個動態代理。
-
繼續之前,我們看看dubbo的十層架構:
可以看到,我們這邊是比dubbo少了幾層,首先proxy,目前直接用了jdk動態代理,沒有其他技術,所以就沒有抽出一層;然後monitor層,現在肯定是沒有的,這部分其實才是一個框架的重頭戲,但是我也不會前端,所以這塊估計暫時沒有;接下來是protocol層,我暫時不太清楚dubbo的設計,所以就沒弄這層。
-
知道了分層結構後,我們可以回到第一點,即動態代理那裡,我們的動態代理,只依賴下層的接口。目前,各層之間的接口,放在mini-dubbo-common模塊中,定義如下:
-
註冊中心層,負責接收上層傳來的調用參數等上下文,並返回結果
/** * 註冊中心層的rpc調用者 * 1:接收上層傳下來的業務參數,並返回結果 * * 本層:會根據不同實現,去相應的註冊中心,獲取匹配的服務提供者列表,傳輸給下一層 */ public interface RegistryLayerRpcInvoker { Object invoke(RpcContext rpcContext); }
-
集群層,接收上層註冊中心層傳來的服務提供者列表和rpc調用上下文,並返回最終結果
public interface ClusterLayerRpcInvoker { /** * 由註冊中心層提供對應service的服務提供者列表,本方法可以根據負載均衡策略,進行篩選 * @param providerList * @param rpcContext * @return */ Object invoke(List<ProviderHostAndPort> providerList, RpcContext rpcContext); }
-
exchange層,上層集群層,會替我們選好某一台具體的服務提供者,然後讓我們去調用,本層完成同步轉異步
public interface ExchangeLayerRpcInvoker { /** * * @param providerHostAndPort 要調用的服務提供者的地址 * @param rpcContext rpc上下文,包含了要調用的參數等 * @return rpc調用的結果 */ Object invoke(ProviderHostAndPort providerHostAndPort, RpcContext rpcContext); }
-
傳輸層,本層目前有兩個簡單實現,netty和mina。
/** * * 本層為傳輸層,上層為exchange層。 * 上層exchange,目前有一個默認實現,主要是完成同步轉異步的操作。 * 上層將具體的傳輸工作交給底層的傳輸層,比如netty和mina,然後在一個future上等待傳輸層完成工作 * * 本層會完成實際的發送工作和接收返迴響應的工作 */ public interface TransportLayerRpcInvoker { /** * * @param providerHostAndPort 要調用的服務提供者的地址 * @param rpcContext rpc上下文,包含了要調用的參數等 * @return rpc調用的結果 */ Object invoke(ProviderHostAndPort providerHostAndPort, RpcContext rpcContext); }
其中,我們的最上邊的動態代理層,只依賴於下層,其中,示例代碼如下:
@Override public Object invoke(Object proxy, Method method, Object[] args) { // 1.從spring容器中,獲取下層的實現bean;如果有多個,則根據spi文件中指定的為準 RegistryLayerRpcInvoker registryLayerRpcInvoker = SpiServiceLoader.loadService(RegistryLayerRpcInvoker.class); RpcContext rpcContext = new RpcContext(); rpcContext.setProxy(proxy); rpcContext.setMethod(method); rpcContext.setArgs(args); rpcContext.setServiceName(method.getDeclaringClass().getName()); // 2.調用下層 Object o = registryLayerRpcInvoker.invoke(rpcContext); return o; }
這裡1處,可以看到,我們通過SpiServiceLoader.loadService(RegistryLayerRpcInvoker.class)去獲取具體的下層實現,這是我們自定義的一個工具類,其內部實現一會再說。
2處調用下層實現,獲取結果。
-
-
registry,註冊中心層的實現
@Service public class RedisRegistryRpcInvoker implements RegistryLayerRpcInvoker { @Autowired private RedisRegistry redisRegistry; @Override public Object invoke(RpcContext rpcContext) { //1.獲取集群層實現 ClusterLayerRpcInvoker clusterLayerRpcInvoker = SpiServiceLoader.loadService(ClusterLayerRpcInvoker.class); //2.從redis中,根據服務名,獲取服務提供者列表 List<ProviderHostAndPort> list = redisRegistry.getServiceProviderList(rpcContext.getServiceName()); if (CollectionUtils.isEmpty(list)) { throw new RuntimeException(); } //2.調用集群層實現,獲取結果 Object o = clusterLayerRpcInvoker.invoke(list, rpcContext); return o; } }
-
集群層實現,本層我也不算懂,模仿dubbo實現了一下。
主要實現了以下兩種:
- Failover,出現失敗,立即重試其他服務器。可以設置重試次數。
- Failfast,請求失敗以後,返回異常結果,不進行重試。
以failover為例:
@Slf4j @Service public class FailoverClusterLayerRpcInvoker implements ClusterLayerRpcInvoker { @Autowired private LoadBalancePolicy loadBalancePolicy; @Override public Object invoke(List<ProviderHostAndPort> providerList, RpcContext rpcContext) { ExchangeLayerRpcInvoker exchangeLayerRpcInvoker = SpiServiceLoader.loadService(ExchangeLayerRpcInvoker.class); int retryTimes = 3; for (int i = 0; i < retryTimes; i++) { // 1.根據負載均衡策略,選擇1台服務提供者 ProviderHostAndPort providerHostAndPort = loadBalancePolicy.selectOne(providerList); try { // 調用下層,獲取結果 Object o = exchangeLayerRpcInvoker.invoke(providerHostAndPort, rpcContext); return o; } catch (Exception e) { log.error("fail to invoke {},exception:{},will try another", providerHostAndPort,e); // 2.如果調用失敗,進入下一次循環 continue; } } throw new RuntimeException("fail times extend"); } }
其中,一共會嘗試3次,每次的邏輯:根據負載均衡策略,選擇1台去調用;如果有問題,則換一台。
調用下層時,獲取了下層的接口:ExchangeLayerRpcInvoker
-
exchange層,這層完成同步轉異步的操作,目前只有一個實現:
@Service public class Sync2AsyncExchangeImpl implements ExchangeLayerRpcInvoker { public static ConcurrentHashMap<String, CompletableFuture<Object>> requestId2futureMap = new ConcurrentHashMap<>(); @Override public Object invoke(ProviderHostAndPort providerHostAndPort, RpcContext rpcContext) { String requestId = UUID.randomUUID().toString(); rpcContext.setRequestId(requestId); rpcContext.setRequestId2futureMap(requestId2futureMap); CompletableFuture<Object> completableFuture = new CompletableFuture<>(); requestId2futureMap.put(requestId, completableFuture); /** * 交給具體的底層去解決 */ TransportLayerRpcInvoker transportLayerRpcInvoker = SpiServiceLoader.loadService(TransportLayerRpcInvoker .class); transportLayerRpcInvoker.invoke(providerHostAndPort, rpcContext); Object s = null; try { s = completableFuture.get(); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } return s; } }
這層大家可以簡單理解為:主線程調用傳輸層之前,生成一個id和一個completablefuture,放到一個全局map,然後將id傳給下層,然後在completablefuture上阻塞;下層拿到id後,在消息里傳輸;服務端再將id傳輸回來,然後客戶端拿着id找到completablefuture,並喚醒主線程。
-
信息傳輸層,以netty為例,具體的netty相關的知識,大家就得自己先學習一下:
簡單步驟如下:
//1.初始化客戶端連接 public void initChannel() { Bootstrap b = configBootStrap(); ChannelFuture future = null; try { future = b.connect(providerHostAndPort.getHost(), providerHostAndPort.getPort()).sync(); if (future.isSuccess()) { channel = future.channel(); return; } } catch (InterruptedException e) { ... } throw new RuntimeException(); } private Bootstrap configBootStrap() { EventLoopGroup group = new NioEventLoopGroup(); Bootstrap b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); p.addLast("lengthFieldPrepender", new LengthFieldPrepender(2)); p.addLast("lengthFieldBasedFrameDecoder", new LengthFieldBasedFrameDecoder( 65536, 0, 2, 0, 2)); p.addLast("decoder", new StringDecoder()); p.addLast("encoder", new StringEncoder()); p.addLast(new ClientHandler()); }//攔截器設置 }); return b; }
使用連接的channle,發送數據:
public void sendMessage(String messageContent) { synchronized (lockObj) { if (channel == null) { initChannel(); } } ChannelFuture channelFuture = channel.writeAndFlush(messageContent); channelFuture.addListener(new GenericFutureListener<Future<? super Void>>() { @Override public void operationComplete(Future<? super Void> future) throws Exception { System.out.println("發送請求消息成功"); } }); }
-
netty接收到服務端相應後,根據requestId來獲取future,喚醒上層線程
@Slf4j public class ClientHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext cx) { log.info("channelActive,local address:{},remote address:{}", cx.channel().localAddress(),cx.channel().remoteAddress()); } /** * 讀取信息 * * @param ctx 渠道連接對象 * @param msg 信息 * @throws Exception */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ResponseVO responseVO = JSONObject.parseObject((String) msg, ResponseVO.class); String requestId = responseVO.getRequestId(); //1.獲取future CompletableFuture<Object> completableFuture = Netty4ClientRpcInvoker.requestId2futureMap .get(requestId); //2.將結果塞進future,在此future上阻塞的線程被喚醒 completableFuture.complete(responseVO.getContent()); log.info("client channelRead,thread:{}", Thread.currentThread()); log.info("客戶端端讀寫遠程地址是-----------" + ctx.channel().remoteAddress() + "信息是:" + msg.toString()); } }
如何根據spi進行切換
之前我們提到了可以根據spi,隨意切換實現,比如我們想使用mina來傳輸的話:
這裡的spi的原理也很簡單:
dubbo.learn.common.spi.SpiServiceLoader#loadService public static <T> T loadService(Class<T> clazz) { //先查找緩存 Object cached = spiName2ServiceMap.get(clazz.getName()); if (cached != null) { return (T) cached; } //2.從spring容器獲取該class的全部實現bean Map<String, T> map = applicationContext.getBeansOfType(clazz); if (CollectionUtils.isEmpty(map)) { return null; } if (map.size() == 1) { Object o = map.values().iterator().next(); return clazz.cast(o); } //讀取spi文件,獲取用戶指定的實現 String s = SpiParser.getSpiForSpecifiedService(clazz); if (StringUtils.isEmpty(s)) { log.error("發現多個服務實現bean:{},且在spi中未指定要使用的bean",map); throw new RuntimeException(); } // 根據用戶spi中的實現,來返回相應的bean Object specifiedServiceInSpiFile = map.values().stream().filter(v -> Objects.equals(v.getClass().getName(), s)) .findFirst().orElse(null); if (specifiedServiceInSpiFile == null) { log.error("spi中指定的服務在bean集合中未找到。" + "發現多個服務實現bean:{},在spi中指定的服務為:{}",map,s); throw new RuntimeException(); } spiName2ServiceMap.put(clazz.getName(),specifiedServiceInSpiFile); return (T) specifiedServiceInSpiFile; }
總結
裏面細節比較多,最近工作比較忙,所以,大家可以先把代碼弄下來,直接自己運行下,依賴的就只有一個redis而已。
後續我會接着優化該框架,歡迎大家加進來,一起開發;如果覺得還不錯,就star一下吧。
源碼路徑:
https://gitee.com/ckl111/mini-dubbo