曹工說mini-dubbo(1)–為了實踐動態代理,我寫了個簡單的rpc框架

  • 2020 年 3 月 16 日
  • 筆記

相關背景及資源:

之前本來一直在寫spring源碼解析這塊,如下,aop部分剛好寫完。以前零散看過一些文章,知道rpc調用基本就是使用動態代理,比如rmi,dubbo,feign調用等。自己也就想着試一下,於是有了mini-dubbo這個東西,暫時也不能稱為一個框架,因為還不是生產級的,目前只是實現了一部分小功能,也沒有監控,也沒有xxx,反正就是缺的比較多。

曹工說Spring Boot源碼(22)– 你說我Spring Aop依賴AspectJ,我依賴它什麼了

我就說下,裏面用到的知識點吧,有興趣的,可以克隆源碼下來看看:

  1. 動態代理
  2. 服務註冊和消費,使用redis作為註冊中心,其中使用了redisson作為redis客戶端,其中涉及到BeanFactoryPostProcessor的使用
  3. 因為傳輸層使用netty和mina,是異步的,但是上層又需要等待結果,所以用到了同步轉異步
  4. spring的xml解析,bean definition註冊,spring 擴展xml 命名空間
  5. 自定義的spi的相關知識
  6. 分層思想,從dubbo借鑒了其分層,但是mini-dubbo要少幾層,因為我暫時不是很清楚dubbo的每一層的具體職責,所以我按我自己理解分的層。上層依賴下層,只通過下層的接口,查找下層接口時,直接在spring容器中查找bean即可,類似於spring mvc的設計。當下層有多個實現時,通過類似spi機制來指定具體要使用的下層實現。
  7. 基於第5點,所以本框架非常容易替換各層的實現,只要自己自定義一個spring bean,實現對應的接口,然後在spi文件中指定本實現的類名即可。
  8. netty和mina的tcp粘包拆包工作。

概要

代碼我放在了如下位置:

https://gitee.com/ckl111/mini-dubbo

我介紹下代碼的整體結構:

服務端聚合工程比較簡單,目前也沒時間去仔細弄,包含了如下module:

    <modules>          <!--業務層api-->          <module>../mini-dubbo-api</module>          <!--業務層,服務端demo-->          <module>../mini-dubbo-server</module>            <!--配置層,解析xml的工作,在本層完成-->          <module>../mini-dubbo-core</module>            <module>../mini-dubbo-common</module>      </modules>  

目前的大部分實現,是在客戶端,包含了如下module:

<modules>         <!--業務層api-->         <module>../mini-dubbo-api</module>         <!--業務層,測試demo-->         <module>../mini-dubbo-client</module>           <!--配置層,解析xml的工作,在本層完成-->         <module>../mini-dubbo-core</module>           <module>../mini-dubbo-common</module>           <!--註冊中心層-->         <module>../mini-dubbo-registry-layer</module>         <!--集群層,完成事情:負載均衡策略,集群容錯策略等-->         <module>../mini-dubbo-cluster-layer</module>         <!--信息交換層,主要完成同步轉異步的操作,因為下層的mina和netty為異步,本層同步等待結果-->         <module>../mini-dubbo-exchange-layer</module>           <!--傳輸層如使用netty實現,則需包含如下module-->         <module>../mini-dubbo-transport-layer-netty</module>         <!--傳輸層如使用mina實現,則需包含如下module-->         <module>../mini-dubbo-transport-layer-mina</module>  </modules>

其中,模塊間的依賴關係如下:

業務模塊,一般只需要依賴mini-dubbo-core模塊,mini-dubbo-core主要依賴了如下模塊:

為什麼這麼劃分,因為mini-dubbo-core模塊,其實主要是完成解析業務模塊(比如client)中的xml,根據其xml配置,註冊對應的bean到spring 容器中,而具體的bean實現,就是放在各個模塊的,比如,xml里配置netty作為傳輸層實現,那麼mini-dubbo-core就得解析為mini-dubbo-transport-layer-netty中的一個實現類作為bean,註冊到spring容器,供上層使用。

目前的分層,只是暫時的,後續可能會略有調整。

一次客戶端調用的大體思路

  1. 業務module中,配置xml,示例如下:

    <dubbo:registry address="redis://127.0.0.1:6379"/>    <dubbo:reference id="gpsLocationUpdateService" interface="dubbo.learn.IGpsLocationUpdateService"/>    <context:component-scan base-package="dubbo"></context:component-scan>

    其中的dubbo:reference就代表了一個遠端的服務,業務代碼中可以自動注入該接口,當調用該接口時,實際就會發起rpc調用。

    熟悉的同學已經知道了,這塊肯定是生成了一個動態代理。

  2. 繼續之前,我們看看dubbo的十層架構:

    可以看到,我們這邊是比dubbo少了幾層,首先proxy,目前直接用了jdk動態代理,沒有其他技術,所以就沒有抽出一層;然後monitor層,現在肯定是沒有的,這部分其實才是一個框架的重頭戲,但是我也不會前端,所以這塊估計暫時沒有;接下來是protocol層,我暫時不太清楚dubbo的設計,所以就沒弄這層。

  3. 知道了分層結構後,我們可以回到第一點,即動態代理那裡,我們的動態代理,只依賴下層的接口。目前,各層之間的接口,放在mini-dubbo-common模塊中,定義如下:

    • 註冊中心層,負責接收上層傳來的調用參數等上下文,並返回結果

      /**   * 註冊中心層的rpc調用者   * 1:接收上層傳下來的業務參數,並返回結果   *   * 本層:會根據不同實現,去相應的註冊中心,獲取匹配的服務提供者列表,傳輸給下一層   */  public interface RegistryLayerRpcInvoker {        Object invoke(RpcContext rpcContext);  }
    • 集群層,接收上層註冊中心層傳來的服務提供者列表和rpc調用上下文,並返回最終結果

      public interface ClusterLayerRpcInvoker {        /**       * 由註冊中心層提供對應service的服務提供者列表,本方法可以根據負載均衡策略,進行篩選       * @param providerList       * @param rpcContext       * @return       */      Object invoke(List<ProviderHostAndPort> providerList, RpcContext rpcContext);  }
    • exchange層,上層集群層,會替我們選好某一台具體的服務提供者,然後讓我們去調用,本層完成同步轉異步

      public interface ExchangeLayerRpcInvoker {        /**       *       * @param providerHostAndPort 要調用的服務提供者的地址       * @param rpcContext   rpc上下文,包含了要調用的參數等       * @return  rpc調用的結果       */      Object invoke(ProviderHostAndPort providerHostAndPort, RpcContext rpcContext);  }
    • 傳輸層,本層目前有兩個簡單實現,netty和mina。

      /**   *   * 本層為傳輸層,上層為exchange層。   * 上層exchange,目前有一個默認實現,主要是完成同步轉異步的操作。   * 上層將具體的傳輸工作交給底層的傳輸層,比如netty和mina,然後在一個future上等待傳輸層完成工作   *   * 本層會完成實際的發送工作和接收返迴響應的工作   */  public interface TransportLayerRpcInvoker {        /**       *       * @param providerHostAndPort 要調用的服務提供者的地址       * @param rpcContext   rpc上下文,包含了要調用的參數等       * @return  rpc調用的結果       */      Object invoke(ProviderHostAndPort providerHostAndPort, RpcContext rpcContext);  }

      其中,我們的最上邊的動態代理層,只依賴於下層,其中,示例代碼如下:

            @Override      public Object invoke(Object proxy, Method method, Object[] args) {          // 1.從spring容器中,獲取下層的實現bean;如果有多個,則根據spi文件中指定的為準          RegistryLayerRpcInvoker registryLayerRpcInvoker =                  SpiServiceLoader.loadService(RegistryLayerRpcInvoker.class);            RpcContext rpcContext = new RpcContext();          rpcContext.setProxy(proxy);          rpcContext.setMethod(method);          rpcContext.setArgs(args);          rpcContext.setServiceName(method.getDeclaringClass().getName());        // 2.調用下層          Object o = registryLayerRpcInvoker.invoke(rpcContext);          return o;      }

      這裡1處,可以看到,我們通過SpiServiceLoader.loadService(RegistryLayerRpcInvoker.class)去獲取具體的下層實現,這是我們自定義的一個工具類,其內部實現一會再說。

      2處調用下層實現,獲取結果。

  4. registry,註冊中心層的實現

    @Service  public class RedisRegistryRpcInvoker implements RegistryLayerRpcInvoker {        @Autowired      private RedisRegistry redisRegistry;          @Override      public Object invoke(RpcContext rpcContext) {          //1.獲取集群層實現          ClusterLayerRpcInvoker clusterLayerRpcInvoker = SpiServiceLoader.loadService(ClusterLayerRpcInvoker.class);          //2.從redis中,根據服務名,獲取服務提供者列表          List<ProviderHostAndPort> list = redisRegistry.getServiceProviderList(rpcContext.getServiceName());          if (CollectionUtils.isEmpty(list)) {              throw new RuntimeException();          }       //2.調用集群層實現,獲取結果          Object o = clusterLayerRpcInvoker.invoke(list, rpcContext);          return o;      }  }
  5. 集群層實現,本層我也不算懂,模仿dubbo實現了一下。

    主要實現了以下兩種:

    • Failover,出現失敗,立即重試其他服務器。可以設置重試次數。
    • Failfast,請求失敗以後,返回異常結果,不進行重試。

    以failover為例:

    @Slf4j  @Service  public class FailoverClusterLayerRpcInvoker implements ClusterLayerRpcInvoker {        @Autowired      private LoadBalancePolicy loadBalancePolicy;        @Override      public Object invoke(List<ProviderHostAndPort> providerList, RpcContext rpcContext) {          ExchangeLayerRpcInvoker exchangeLayerRpcInvoker =                  SpiServiceLoader.loadService(ExchangeLayerRpcInvoker.class);            int retryTimes = 3;          for (int i = 0; i < retryTimes; i++) {              // 1.根據負載均衡策略,選擇1台服務提供者              ProviderHostAndPort providerHostAndPort = loadBalancePolicy.selectOne(providerList);              try {                  // 調用下層,獲取結果                  Object o = exchangeLayerRpcInvoker.invoke(providerHostAndPort, rpcContext);                  return o;              } catch (Exception e) {                  log.error("fail to invoke {},exception:{},will try another",                          providerHostAndPort,e);                  // 2.如果調用失敗,進入下一次循環                  continue;              }          }            throw new RuntimeException("fail times extend");      }  }

    其中,一共會嘗試3次,每次的邏輯:根據負載均衡策略,選擇1台去調用;如果有問題,則換一台。

    調用下層時,獲取了下層的接口:ExchangeLayerRpcInvoker

  6. exchange層,這層完成同步轉異步的操作,目前只有一個實現:

    @Service  public class Sync2AsyncExchangeImpl implements ExchangeLayerRpcInvoker {        public static ConcurrentHashMap<String, CompletableFuture<Object>> requestId2futureMap =              new ConcurrentHashMap<>();          @Override      public Object invoke(ProviderHostAndPort providerHostAndPort, RpcContext rpcContext) {          String requestId = UUID.randomUUID().toString();          rpcContext.setRequestId(requestId);          rpcContext.setRequestId2futureMap(requestId2futureMap);            CompletableFuture<Object> completableFuture = new CompletableFuture<>();          requestId2futureMap.put(requestId, completableFuture);              /**           * 交給具體的底層去解決           */          TransportLayerRpcInvoker  transportLayerRpcInvoker =                  SpiServiceLoader.loadService(TransportLayerRpcInvoker .class);            transportLayerRpcInvoker.invoke(providerHostAndPort, rpcContext);            Object s = null;          try {              s = completableFuture.get();          } catch (InterruptedException | ExecutionException e) {              e.printStackTrace();          }            return s;      }  }

    這層大家可以簡單理解為:主線程調用傳輸層之前,生成一個id和一個completablefuture,放到一個全局map,然後將id傳給下層,然後在completablefuture上阻塞;下層拿到id後,在消息里傳輸;服務端再將id傳輸回來,然後客戶端拿着id找到completablefuture,並喚醒主線程。

  7. 信息傳輸層,以netty為例,具體的netty相關的知識,大家就得自己先學習一下:

    簡單步驟如下:

      //1.初始化客戶端連接  public void initChannel() {      Bootstrap b = configBootStrap();      ChannelFuture future = null;      try {          future = b.connect(providerHostAndPort.getHost(), providerHostAndPort.getPort()).sync();          if (future.isSuccess()) {              channel = future.channel();              return;          }      } catch (InterruptedException e) {          ...      }        throw new RuntimeException();  }    private Bootstrap configBootStrap() {      EventLoopGroup group = new NioEventLoopGroup();      Bootstrap b = new Bootstrap();      b.group(group)              .channel(NioSocketChannel.class)              .option(ChannelOption.TCP_NODELAY, true)              .handler(new ChannelInitializer<SocketChannel>() {                  @Override                  public void initChannel(SocketChannel ch) throws Exception {                      ChannelPipeline p = ch.pipeline();                      p.addLast("lengthFieldPrepender", new LengthFieldPrepender(2));                      p.addLast("lengthFieldBasedFrameDecoder",                              new LengthFieldBasedFrameDecoder(                                      65536, 0,                                      2, 0, 2));                      p.addLast("decoder", new StringDecoder());                      p.addLast("encoder", new StringEncoder());                      p.addLast(new ClientHandler());                    }//攔截器設置              });      return b;  }

    使用連接的channle,發送數據:

    public void sendMessage(String messageContent) {      synchronized (lockObj) {          if (channel == null) {              initChannel();          }      }      ChannelFuture channelFuture = channel.writeAndFlush(messageContent);      channelFuture.addListener(new GenericFutureListener<Future<? super Void>>() {          @Override          public void operationComplete(Future<? super Void> future) throws Exception {              System.out.println("發送請求消息成功");          }      });  }
  8. netty接收到服務端相應後,根據requestId來獲取future,喚醒上層線程

    @Slf4j  public class ClientHandler extends ChannelInboundHandlerAdapter {      @Override      public void channelActive(ChannelHandlerContext cx) {          log.info("channelActive,local address:{},remote address:{}",                  cx.channel().localAddress(),cx.channel().remoteAddress());      }        /**       * 讀取信息       *       * @param ctx 渠道連接對象       * @param msg 信息       * @throws Exception       */      @Override      public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {          ResponseVO responseVO = JSONObject.parseObject((String) msg, ResponseVO.class);          String requestId = responseVO.getRequestId();            //1.獲取future          CompletableFuture<Object> completableFuture = Netty4ClientRpcInvoker.requestId2futureMap                  .get(requestId);          //2.將結果塞進future,在此future上阻塞的線程被喚醒          completableFuture.complete(responseVO.getContent());          log.info("client channelRead,thread:{}", Thread.currentThread());          log.info("客戶端端讀寫遠程地址是-----------"                  + ctx.channel().remoteAddress() + "信息是:" + msg.toString());        }  }

如何根據spi進行切換

之前我們提到了可以根據spi,隨意切換實現,比如我們想使用mina來傳輸的話:

這裡的spi的原理也很簡單:

dubbo.learn.common.spi.SpiServiceLoader#loadService  public static <T> T loadService(Class<T> clazz) {      //先查找緩存      Object cached = spiName2ServiceMap.get(clazz.getName());      if (cached != null) {          return (T) cached;      }      //2.從spring容器獲取該class的全部實現bean      Map<String, T> map = applicationContext.getBeansOfType(clazz);      if (CollectionUtils.isEmpty(map)) {          return null;      }        if (map.size() == 1) {          Object o = map.values().iterator().next();          return clazz.cast(o);      }      //讀取spi文件,獲取用戶指定的實現      String s = SpiParser.getSpiForSpecifiedService(clazz);      if (StringUtils.isEmpty(s)) {          log.error("發現多個服務實現bean:{},且在spi中未指定要使用的bean",map);          throw new RuntimeException();      }      // 根據用戶spi中的實現,來返回相應的bean      Object specifiedServiceInSpiFile = map.values().stream().filter(v -> Objects.equals(v.getClass().getName(), s))              .findFirst().orElse(null);      if (specifiedServiceInSpiFile == null) {          log.error("spi中指定的服務在bean集合中未找到。" +                  "發現多個服務實現bean:{},在spi中指定的服務為:{}",map,s);          throw new RuntimeException();      }        spiName2ServiceMap.put(clazz.getName(),specifiedServiceInSpiFile);      return (T) specifiedServiceInSpiFile;  }

總結

裏面細節比較多,最近工作比較忙,所以,大家可以先把代碼弄下來,直接自己運行下,依賴的就只有一個redis而已。

後續我會接着優化該框架,歡迎大家加進來,一起開發;如果覺得還不錯,就star一下吧。
源碼路徑:
https://gitee.com/ckl111/mini-dubbo