RocketMQ Broker啟動流程梳理

  • 2019 年 10 月 6 日
  • 筆記

Broker 啟動的主函數入口:

org.apache.rocketmq.broker.BrokerStartup:

public static void main(String[] args) {    start(createBrokerController(args));  }

1.創建配置類

初始化配置主要任務是根據 properties 文件以及命令行參數值,創建了以下配置類:

•nettyServerConfig:封裝了作為消息隊列伺服器的配置資訊•nettyClientConfig:封裝了作為NameServer客戶端配置資訊•brokerConfig:封裝了 Broker 配置資訊•messageStoreConfig:封裝了 RocketMQ 存儲系統的配置資訊

1.Broker 初始化

2.1 配置文件載入

•主題配置載入:

result = result && this.consumerOffsetManager.load();

這一步主要是載入 topics.json 文件,並解析生成 TopicConfigSerializerWrapper 對象,並 set 進 topicConfigTable 中。

•消費者位移管理載入:

result = result && this.subscriptionGroupManager.load();

這一步主要是載入 consumerOffset.json 文件,並解析生成 ConsumerOffsetManager 對象,並替換 offsetTable 成員值。

•消費者訂閱組載入:

result = result && this.consumerFilterManager.load();

這一步主要是載入 subscriptionGroup.json 文件,並解析生成 SubscriptionGroupManager 對象,並放進 subscriptionGroupTable 中。

•消費者過濾管理載入:

result = result && this.consumerFilterManager.load();

這一步主要是載入 consumerFilter.json 文件,並解析生成 ConsumerFilterManager 對象

•messageStore 消息存儲初始化:

if (result) {    try {      this.messageStore =        new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,                                this.brokerConfig);      this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore);      //load plugin      MessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig);      this.messageStore = MessageStoreFactory.build(context, this.messageStore);      this.messageStore.getDispatcherList().addFirst(new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager));    } catch (IOException e) {      result = false;      e.printStackTrace();    }  }

這一步主要是創建了 DefaultMessageStore 對象,這是 Broker 消息寸處的核心實現,創建該對象時也會啟動很多相關服務執行緒,用於管理 store 的存儲。

•messageStore載入:

result = result && this.messageStore.load();

1)延遲消息載入:載入 delayOffset.json 文件,解析生成DelayOffsetSerializerWrapper,並加入offsetTable中

2)commitLog載入:MappfileQueue映射文件隊列載入,載入定義的storePath目錄文件

3)consumeQueue載入

2.2 初始化執行緒池

•創建nettyRemotingServer:根據前面初始化好的nettyConfig創建遠程通訊服務

•根據brokerConfig初始化各種執行緒池:

1)初始化發送消息執行緒池

2)初始化拉取消息執行緒池

3)初始化broker管理執行緒池

4)初始化client管理執行緒池

5)初始化消費者管理執行緒池

•把這些執行緒池註冊到nettyRemotingServer中

2.3 初始化定時任務:

在執行緒池註冊完後,就會開啟各種定時任務:

•開啟定時記錄 Broker 的狀態(消息拉取時間總和、消息發送總和等)

BrokerController.this.getBrokerStats().record();

•消息位移持久化,定時向 consumerOffset.json 文件中寫入消費者偏移量

BrokerController.this.consumerOffsetManager.persist();

•消息過濾持久化,定時向 consumerFilter.json 文件寫入消費者過濾器資訊

BrokerController.this.consumerFilterManager.persist();

•定時禁用消費慢的消費者以保護 Broker,可以設置 disableConsumeIfConsumerReadSlowly 屬性,默認 false

BrokerController.this.protectBroker();

•定時列印 Send、Pull、Query、Transaction 資訊

BrokerController.this.printWaterMark();

•定時列印已存儲在提交日誌中但尚未調度到消費隊列的位元組數

rokerController.this.getMessageStore().dispatchBehindBytes())

•定時獲取 namserver 地址

BrokerController.this.brokerOuterAPI.fetchNameServerAddr();

如果是從伺服器:

•定時從主伺服器獲取 TopicConfig、ConsumerOffset、DelayOffset、SubscriptionGroupConfig 等資訊

BrokerController.this.slaveSynchronize.syncAll();

如果是主伺服器:

•定時列印從伺服器落後的位元組數

BrokerController.this.printMasterAndSlaveDiff();

2.4 添加進程退出時關閉broker資源的鉤子函數

1.Broker 啟動

3.1 messageStore啟動:

•啟動各類執行緒服務:

 1)啟動刷盤任務執行緒   2)啟動commitLog執行緒   3)啟動存儲存儲統計服務執行緒storeStateService   4)啟動延遲定時消息服務執行緒   5)啟動消息分發到各中Consumer queue服務執行緒reputMessageService   6)啟動HA主從同步執行緒

•啟動各類定時任務

3.2 啟動netty服務:

remotingServer啟動:啟動遠程通訊服務 fastRemotingServer啟動:啟動遠程通訊服務 broker對外API啟動:啟動client遠程通訊服務

3.3 pullRequestHolderService使拉取消息保持長輪詢任務啟動

3.4 ClientHouseKeepingService執行緒定時清除不活動鏈接任務啟動

3.5 過濾伺服器任務啟動

3.6 向NameServer註冊broker資訊

3.7 開啟定時向NameServer註冊broker資訊任務