RocketMQ Broker啟動流程梳理
- 2019 年 10 月 6 日
- 筆記
org.apache.rocketmq.broker.BrokerStartup:
public static void main(String[] args) { start(createBrokerController(args)); }
1.創建配置類
初始化配置主要任務是根據 properties 文件以及命令行參數值,創建了以下配置類:
•nettyServerConfig:封裝了作為消息隊列伺服器的配置資訊•nettyClientConfig:封裝了作為NameServer客戶端配置資訊•brokerConfig:封裝了 Broker 配置資訊•messageStoreConfig:封裝了 RocketMQ 存儲系統的配置資訊
1.Broker 初始化
2.1 配置文件載入
•主題配置載入:
result = result && this.consumerOffsetManager.load();
這一步主要是載入 topics.json 文件,並解析生成 TopicConfigSerializerWrapper 對象,並 set 進 topicConfigTable 中。
•消費者位移管理載入:
result = result && this.subscriptionGroupManager.load();
這一步主要是載入 consumerOffset.json 文件,並解析生成 ConsumerOffsetManager 對象,並替換 offsetTable 成員值。
•消費者訂閱組載入:
result = result && this.consumerFilterManager.load();
這一步主要是載入 subscriptionGroup.json 文件,並解析生成 SubscriptionGroupManager 對象,並放進 subscriptionGroupTable 中。
•消費者過濾管理載入:
result = result && this.consumerFilterManager.load();
這一步主要是載入 consumerFilter.json 文件,並解析生成 ConsumerFilterManager 對象
•messageStore 消息存儲初始化:
if (result) { try { this.messageStore = new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener, this.brokerConfig); this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore); //load plugin MessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig); this.messageStore = MessageStoreFactory.build(context, this.messageStore); this.messageStore.getDispatcherList().addFirst(new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager)); } catch (IOException e) { result = false; e.printStackTrace(); } }
這一步主要是創建了 DefaultMessageStore 對象,這是 Broker 消息寸處的核心實現,創建該對象時也會啟動很多相關服務執行緒,用於管理 store 的存儲。
•messageStore載入:
result = result && this.messageStore.load();
1)延遲消息載入:載入 delayOffset.json 文件,解析生成DelayOffsetSerializerWrapper,並加入offsetTable中
2)commitLog載入:MappfileQueue映射文件隊列載入,載入定義的storePath目錄文件
3)consumeQueue載入
2.2 初始化執行緒池
•創建nettyRemotingServer:根據前面初始化好的nettyConfig創建遠程通訊服務
•根據brokerConfig初始化各種執行緒池:
1)初始化發送消息執行緒池
2)初始化拉取消息執行緒池
3)初始化broker管理執行緒池
4)初始化client管理執行緒池
5)初始化消費者管理執行緒池
•把這些執行緒池註冊到nettyRemotingServer中
2.3 初始化定時任務:
在執行緒池註冊完後,就會開啟各種定時任務:
•開啟定時記錄 Broker 的狀態(消息拉取時間總和、消息發送總和等)
BrokerController.this.getBrokerStats().record();
•消息位移持久化,定時向 consumerOffset.json 文件中寫入消費者偏移量
BrokerController.this.consumerOffsetManager.persist();
•消息過濾持久化,定時向 consumerFilter.json 文件寫入消費者過濾器資訊
BrokerController.this.consumerFilterManager.persist();
•定時禁用消費慢的消費者以保護 Broker,可以設置 disableConsumeIfConsumerReadSlowly 屬性,默認 false
BrokerController.this.protectBroker();
•定時列印 Send、Pull、Query、Transaction 資訊
BrokerController.this.printWaterMark();
•定時列印已存儲在提交日誌中但尚未調度到消費隊列的位元組數
rokerController.this.getMessageStore().dispatchBehindBytes())
•定時獲取 namserver 地址
BrokerController.this.brokerOuterAPI.fetchNameServerAddr();
如果是從伺服器:
•定時從主伺服器獲取 TopicConfig、ConsumerOffset、DelayOffset、SubscriptionGroupConfig 等資訊
BrokerController.this.slaveSynchronize.syncAll();
如果是主伺服器:
•定時列印從伺服器落後的位元組數
BrokerController.this.printMasterAndSlaveDiff();
2.4 添加進程退出時關閉broker資源的鉤子函數
1.Broker 啟動
3.1 messageStore啟動:
•啟動各類執行緒服務:
1)啟動刷盤任務執行緒 2)啟動commitLog執行緒 3)啟動存儲存儲統計服務執行緒storeStateService 4)啟動延遲定時消息服務執行緒 5)啟動消息分發到各中Consumer queue服務執行緒reputMessageService 6)啟動HA主從同步執行緒
•啟動各類定時任務
3.2 啟動netty服務:
remotingServer啟動:啟動遠程通訊服務 fastRemotingServer啟動:啟動遠程通訊服務 broker對外API啟動:啟動client遠程通訊服務
3.3 pullRequestHolderService使拉取消息保持長輪詢任務啟動
3.4 ClientHouseKeepingService執行緒定時清除不活動鏈接任務啟動
3.5 過濾伺服器任務啟動
3.6 向NameServer註冊broker資訊
3.7 開啟定時向NameServer註冊broker資訊任務
