8.源碼分析—從設計模式中看SOFARPC中的EventBus?

  • 2019 年 10 月 3 日
  • 筆記

我們在前面分析客戶端引用的時候會看到如下這段程式碼:

// 產生開始調用事件  if (EventBus.isEnable(ClientStartInvokeEvent.class)) {      EventBus.post(new ClientStartInvokeEvent(request));  }

這裡用EventBus調用了一下post方法之後就什麼也沒做了,就方法名來看是發送了一個post請求,也不知道發給誰,到底有什麼用。

所以這一節我們來分析一下EventBus這個類的作用。

首先我們來看一下這個類的方法

從EventBus的方法中我們是不是應該想到了這是使用了什麼設計模式?

沒錯,這裡用到的是訂閱發布模式(Subscribe/Publish)。訂閱發布模式定義了一種一對多的依賴關係,讓多個訂閱者對象同時監聽某一個主題對象。這個主題對象在自身狀態變化時,會通知所有訂閱者對象,使它們能夠自動更新自己的狀態。

我們先分析源碼,分析完源碼之後再來總結一下。

EventBus發送事件

根據上面的示例,我們先看EventBus#post是裡面是怎麼做的。
EventBus#post

  private final static ConcurrentMap<Class<? extends Event>, CopyOnWriteArraySet<Subscriber>> SUBSCRIBER_MAP = new ConcurrentHashMap<Class<? extends Event>, CopyOnWriteArraySet<Subscriber>>();    public static void post(final Event event) {      //是否開啟匯流排      if (!isEnable()) {          return;      }      //根據傳入得event獲取到相應的Subscriber      CopyOnWriteArraySet<Subscriber> subscribers = SUBSCRIBER_MAP.get(event.getClass());      if (CommonUtils.isNotEmpty(subscribers)) {          for (final Subscriber subscriber : subscribers) {              //如果事件訂閱者是同步的,那麼直接調用              if (subscriber.isSync()) {                  handleEvent(subscriber, event);              } else { // 非同步                  final RpcInternalContext context = RpcInternalContext.peekContext();                  //使用執行緒池啟動一個執行緒一部執行任務                  final ThreadPoolExecutor asyncThreadPool = AsyncRuntime.getAsyncThreadPool();                  try {                      asyncThreadPool.execute(                              new Runnable() {                                  @Override                                  public void run() {                                      try {                                          RpcInternalContext.setContext(context);                                          //調用訂閱者的event事件                                          handleEvent(subscriber, event);                                      } finally {                                          RpcInternalContext.removeContext();                                      }                                  }                              });                  } catch (RejectedExecutionException e) {                      LOGGER                              .warn("This queue is full when post event to async execute, queue size is " +                                      asyncThreadPool.getQueue().size() +                                      ", please optimize this async thread pool of eventbus.");                  }              }          }      }  }    private static void handleEvent(final Subscriber subscriber, final Event event) {      try {          subscriber.onEvent(event);      } catch (Throwable e) {          if (LOGGER.isWarnEnabled()) {              LOGGER.warn("Handle " + event.getClass() + " error", e);          }      }  }

這個post方法主要做了這麼幾件事:

  1. 根據傳入的Event獲取對應的訂閱列表subscribers
  2. 遍歷subscribers
  3. 如果訂閱者是非同步的,那麼就使用執行緒池啟動執行任務
    4, 如果是同步的那麼就調用handleEvent方法向訂閱者發布消息

我們再來看看訂閱者是怎樣的:

Subscriber

public abstract class Subscriber {      /**       * 接到事件是否同步執行       */      protected boolean sync = true;        /**       * 事件訂閱者       */      protected Subscriber() {      }        /**       * 事件訂閱者       *       * @param sync 是否同步       */      protected Subscriber(boolean sync) {          this.sync = sync;      }        /**       * 是否同步       *       * @return 是否同步       */      public boolean isSync() {          return sync;      }        /**       * 事件處理,請處理異常       *       * @param event 事件       */      public abstract void onEvent(Event event);    }

Subscriber是一個抽象類,默認是同步的方式進行訂閱。總共有下面四個實現類:
LookoutSubscriber
FaultToleranceSubscriber
RestTracerSubscriber
SofaTracerSubscriber

這裡我不打算每個都進行分析,到時候打算用到了再詳細說明,這樣不會那麼抽象。

由於我們前面講到了,在客戶端引用的時候會發送一個產生開始調用事件給匯流排,那一定要有訂閱者這個發送事件才有意義。所以我們接下來看看是在哪裡進行事件的註冊的。

訂閱者註冊到EventBus

通過上面的繼承關係圖可以看到,在ConsumerConfig是AbstractIdConfig的子類,所以在初始化ConsumerConfig的時候AbstractIdConfig靜態程式碼塊也會被初始化。

public abstract class AbstractIdConfig<S extends AbstractIdConfig> implements Serializable {        static {          RpcRuntimeContext.now();      }  }

在調用RpcRuntimeContext#now方法的時候,會調用到RpcRuntimeContext的靜態程式碼塊

RpcRuntimeContext

public class RpcRuntimeContext {        static {          if (LOGGER.isInfoEnabled()) {              LOGGER.info("Welcome! Loading SOFA RPC Framework : {}, PID is:{}", Version.BUILD_VERSION, PID);          }          put(RpcConstants.CONFIG_KEY_RPC_VERSION, Version.RPC_VERSION);          // 初始化一些上下文          initContext();          // 初始化其它模組          ModuleFactory.installModules();          // 增加jvm關閉事件          if (RpcConfigs.getOrDefaultValue(RpcOptions.JVM_SHUTDOWN_HOOK, true)) {              Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {                  @Override                  public void run() {                      if (LOGGER.isWarnEnabled()) {                          LOGGER.warn("SOFA RPC Framework catch JVM shutdown event, Run shutdown hook now.");                      }                      destroy(false);                  }              }, "SOFA-RPC-ShutdownHook"));          }      }        public static long now() {          return System.currentTimeMillis();      }  }

在RpcRuntimeContext靜態程式碼塊里主要做了以下幾件事:

  1. 初始化一些上下文的東西,例如:應用Id,應用名稱,當前所在文件夾地址等
  2. 初始化一些模組,等下分析
  3. 增加jvm關閉時的鉤子

我們直接看installModules方法就好了,其他的方法和主流程無關。

ModuleFactory#installModules

public static void installModules() {      //通過SPI載入Module模組      ExtensionLoader<Module> loader = ExtensionLoaderFactory.getExtensionLoader(Module.class);      //moduleLoadList 默認是 *      String moduleLoadList = RpcConfigs.getStringValue(RpcOptions.MODULE_LOAD_LIST);      for (Map.Entry<String, ExtensionClass<Module>> o : loader.getAllExtensions().entrySet()) {          String moduleName = o.getKey();          Module module = o.getValue().getExtInstance();          // judge need load from rpc option          if (needLoad(moduleLoadList, moduleName)) {              // judge need load from implement              if (module.needLoad()) {                  if (LOGGER.isInfoEnabled()) {                      LOGGER.info("Install Module: {}", moduleName);                  }                  //安裝模板                  module.install();                  INSTALLED_MODULES.put(moduleName, module);              } else {                  if (LOGGER.isInfoEnabled()) {                      LOGGER.info("The module " + moduleName + " does not need to be loaded.");                  }              }          } else {              if (LOGGER.isInfoEnabled()) {                  LOGGER.info("The module " + moduleName + " is not in the module load list.");              }          }      }  }
  1. 這個方法裡面一開始獲取Module的擴展類,Module的擴展類有如下幾個:
    FaultToleranceModule
    LookoutModule
    RestTracerModule
    SofaTracerModule

  2. 然後會去獲取MODULE_LOAD_LIST配置類,多個配置用「;」分割。
  3. 調用loader.getAllExtensions()獲取所有的擴展類。遍歷擴展類。
  4. 接著調用needLoad方法:
static boolean needLoad(String moduleLoadList, String moduleName) {      //用;拆分      String[] activatedModules = StringUtils.splitWithCommaOrSemicolon(moduleLoadList);      boolean match = false;      for (String activatedModule : activatedModules) {          //ALL 就是 *          if (StringUtils.ALL.equals(activatedModule)) {              match = true;          } else if (activatedModule.equals(moduleName)) {              match = true;          } else if (match && (activatedModule.equals("!" + moduleName)                  || activatedModule.equals("-" + moduleName))) {              match = false;              break;          }      }      return match;  }

這個方法會傳入配置的moduleLoadList和當前遍歷到的moduleName,moduleLoadList默認是*所以會返回true,如果配置了moduleLoadList不為*的話,如果moduleName是配置中的之一便會返回true。

  1. 調用module的install進行模板的裝配

這裡我們進入到SofaTracerModule#install中

SofaTracerModule#install

public void install() {      Tracer tracer = TracerFactory.getTracer("sofaTracer");      if (tracer != null) {          subscriber = new SofaTracerSubscriber();          EventBus.register(ClientStartInvokeEvent.class, subscriber);          EventBus.register(ClientBeforeSendEvent.class, subscriber);          EventBus.register(ClientAfterSendEvent.class, subscriber);          EventBus.register(ServerReceiveEvent.class, subscriber);          EventBus.register(ServerSendEvent.class, subscriber);          EventBus.register(ServerEndHandleEvent.class, subscriber);          EventBus.register(ClientSyncReceiveEvent.class, subscriber);          EventBus.register(ClientAsyncReceiveEvent.class, subscriber);          EventBus.register(ClientEndInvokeEvent.class, subscriber);      }  }

這裡我們可以看到文章一開始被發送的ClientStartInvokeEvent在這裡被註冊了。訂閱者的實現類是SofaTracerSubscriber。

訂閱者被調用

在上面我們分析到在註冊到EventBus之後,會發送一個post請求,然後EventBus會遍歷所有的Subscriber,調用符合條件的Subscriber的onEvent方法。

SofaTracerSubscriber#onEvent

public void onEvent(Event originEvent) {       if (!Tracers.isEnable()) {          return;      }      Class eventClass = originEvent.getClass();        if (eventClass == ClientStartInvokeEvent.class) {          ClientStartInvokeEvent event = (ClientStartInvokeEvent) originEvent;          Tracers.startRpc(event.getRequest());      }        else if (eventClass == ClientBeforeSendEvent.class) {              ClientBeforeSendEvent event = (ClientBeforeSendEvent) originEvent;              Tracers.clientBeforeSend(event.getRequest());      }      .....  }

這個方法裡面主要就是對不同的event做出不同的反應。ClientStartInvokeEvent所做的請求就是調用一下Tracers#startRpc,Tracers是用來做鏈路追蹤的,這篇文章不涉及。

總結

我們首先上一張圖,來說明一下訂閱發布模式整體的結構。

在我們這個例子里EventBus的職責就是調度中心,subscriber的具體實現註冊到EventBus中後,會保存到EventBus的SUBSCRIBER_MAP集合中。

發布者在發布消息的時候會調用EventBus的post方法傳入一個具體的event來調用訂閱者的事件。一個事件有多個訂閱者,消息的發布者不會直接的去調用訂閱者來發布消息,而是通過EventBus來進行觸發。

通過EventBus來觸發不同的訂閱者的事件可以在觸發事件之前同一的為其做一些操作,比如是同步還是非同步,要不要過濾部分訂閱者等。

SOFARPC源碼解析系列:

1. 源碼分析—SOFARPC可擴展的機制SPI

2. 源碼分析—SOFARPC客戶端服務引用

3. 源碼分析—SOFARPC客戶端服務調用

4. 源碼分析—SOFARPC服務端暴露

5.源碼分析—SOFARPC調用服務

6.源碼分析—和dubbo相比SOFARPC是如何實現負載均衡的?

7.源碼分析—SOFARPC是如何實現連接管理與心跳?