RocketMQ 源碼分析之路由中心(NameServer)
- 2020 年 4 月 10 日
- 筆記
你可能沒有看過 RocketMQ 的架構圖,沒關係,一起來學習一下,RocketMQ 架構圖如下:
在 RocketMQ 中,有四個角色:
- Producer:消息的生產者,每個 MQ 中間件都有。
- Consumer:消息的消費者,每個 MQ 中間件都有。
- NameServer:RocketMQ 的路由中心,跟 ZooKeeper 差不多。
- Broker:消息伺服器,RocketMQ 的消息全部存儲在這裡。
Producer 發送消息之前,先從 NameServer 中獲取到 Broker 伺服器列表,然後根據負載均衡策略選擇一台 Broker 發送,消息消費時也是同樣的道理。可以說 NameServer 是 RocketMQ 的大腦,想要實現路由分發的功能,那麼在 NameServer 必然要維護著 Broker 伺服器資訊,這中間就會涉及到 Broker 伺服器服務狀態管理問題,這篇文章就來聊一聊 RocketMQ 是如何做服務狀態管理的。
在聊服務狀態管理之前,先來講一講為何不用 ZooKeeper 來做路由中心?
聽聞早期的 RocketMQ 是使用 ZooKeeper 來做路由中心。我們知道 ZooKeeper 功能比較強大,包括自動 Master 選舉等,強大的同時部署維護就變得複雜了,但是 ZooKeeper 的很多功能 RocketMQ 並不需要,RocketMQ 只需要一個輕量級的元數據伺服器就夠了。所以就造了 NameServer 這個輪子。
還有一個原因就是中間件對穩定性要求比較高,使用 ZooKeeper 作為註冊和路由中心的話,就依賴了另一個中間件,提高了系統複雜性和維護成本,而 NameServer 只是 RocketMQ 中的一個模組,且只有少量程式碼,維護起來簡單,穩定性也提高了。
好了,說回服務狀態管理問題,其實這個並不陌生,在微服務領域有大量的中間件都涉及到了這個問題。對於服務狀態管理,一般有兩種解決思路。
第一種思路是主動探測,如圖:
主動探測是由路由方(比如 NameServer)發起的,每一個被路由方(比如 Broker)需要打開一個埠,然後路由方每隔一段時間(比如 30 秒)探測這些埠是否可用,如果可用就認為伺服器正常,否則認為服務不可用,就把服務從列表中刪除。
這種方式存在的問題就路由方壓力可能過大,如果被路由方部署的實例較多時,那麼每次探測的成本會比較高,探測的時間也比較長,可能會導致路由方可能不能正常工作。
第二種思路是心跳模式,如圖:
心跳模式不在是路由方發起了,改成被路由方每隔一段時間向路由方發送心跳包,路由方記錄被路由方的心跳包,包括伺服器IP、上報時間等。每一次上報後,更新對應的資訊。路由方啟動一個定時器,定期檢測當前時間和節點,最近續約時間的差值,如果達到一個閾值(比如說90秒),那麼認為這個服務節點不可用。
現在大部分需要服務狀態管理的中間件,都採用心跳模式,沒有太多的缺陷,也不會對伺服器造成多大的壓力。在 RocketMQ 中 NameServer 與 Broker 的通訊也是採用 心跳模式。
心跳模式中,有上報心跳、保存心跳資訊、定時檢測這個步驟。我們從上報心跳和定時檢測這兩個方面,從源碼的角度,看看 RocketMQ 是如何實現心跳模式的。
先從上報心跳開始,在 RocketMQ 中,默認情況下,Broker 伺服器會每間隔 30秒向集群中的所有 NameServer 發送心跳包。源程式碼是BrokerController#start()
,如下程式碼:
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister()); } catch (Throwable e) { log.error("registerBrokerAll Exception", e); } } // brokerConfig.getRegisterNameServerPeriod() 默認是 30 秒 }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
其中上報心跳的時間用戶是可以自定義的,但是不會低於 10秒高於 60秒。當然這只是一個定時器,具體發送心跳包的方法是org.apache.rocketmq.broker.out.BrokerOuterAPI#registerBrokerAll()
,程式碼如下:
public List<RegisterBrokerResult> registerBrokerAll( final String clusterName, final String brokerAddr, final String brokerName, final long brokerId, final String haServerAddr, final TopicConfigSerializeWrapper topicConfigWrapper, final List<String> filterServerList, final boolean oneway, final int timeoutMills, final boolean compressed) { final List<RegisterBrokerResult> registerBrokerResultList = Lists.newArrayList(); // 獲取所有 NameServer 伺服器 List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList(); if (nameServerAddressList != null && nameServerAddressList.size() > 0) { // 構建 broker 資訊 final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader(); requestHeader.setBrokerAddr(brokerAddr); requestHeader.setBrokerId(brokerId); requestHeader.setBrokerName(brokerName); requestHeader.setClusterName(clusterName); requestHeader.setHaServerAddr(haServerAddr); requestHeader.setCompressed(compressed); RegisterBrokerBody requestBody = new RegisterBrokerBody(); requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper); requestBody.setFilterServerList(filterServerList); final byte[] body = requestBody.encode(compressed); final int bodyCrc32 = UtilAll.crc32(body); requestHeader.setBodyCrc32(bodyCrc32); final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size()); // 向 NameServer 逐個上報 for (final String namesrvAddr : nameServerAddressList) { brokerOuterExecutor.execute(new Runnable() { @Override public void run() { try { RegisterBrokerResult result = registerBroker(namesrvAddr,oneway, timeoutMills,requestHeader,body); if (result != null) { registerBrokerResultList.add(result); } log.info("register broker[{}]to name server {} OK", brokerId, namesrvAddr); } catch (Exception e) { log.warn("registerBroker Exception, {}", namesrvAddr, e); } finally { countDownLatch.countDown(); } } }); } try { countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { } } return registerBrokerResultList; }
心跳包發送完之後,就是 NameServer 處理心跳包了,NameServer 會將心跳資訊保存起來,保存心跳資訊的源程式碼我就不貼了,涉及的東西比較多,有興趣的可以查看org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#processRequest()#RequestCode.REGISTER_BROKER
,一步一步 Debug 就知道保存過程。
來看看最後一個操作定時檢測,NameServer 會開啟一個探測執行緒,源程式碼在org.apache.rocketmq.namesrv.NamesrvController#initialize()
下,程式碼如下:
// 檢測 broker this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { NamesrvController.this.routeInfoManager.scanNotActiveBroker(); } }, 5, 10, TimeUnit.SECONDS);
NameServer 每 10秒會發起一次檢測。具體檢測源程式碼是org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#scanNotActiveBroker()
,程式碼如下:
/** * 檢測 broker 狀態 */ public void scanNotActiveBroker() { // 遍歷 broker 存活列表 Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator(); while (it.hasNext()) { Entry<String, BrokerLiveInfo> next = it.next(); long last = next.getValue().getLastUpdateTimestamp(); // 如果最後一次上報時間已經超過兩分鐘,則移出 if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) { RemotingUtil.closeChannel(next.getValue().getChannel()); it.remove(); log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME); this.onChannelDestroy(next.getKey(), next.getValue().getChannel()); } } }
NameServer 會遍歷 Broker 存活列表,如果最後一次發送心跳包的時間超過 120秒,則認為 Broker 伺服器不可用,將 Broker 從各種配置列表中移出。
到此為止,RocketMQ 的心跳模式實現就完成了,上面的源程式碼都是一些粗略的,具體的實現細節還是比較繁瑣的,有興趣的可以深入研究源碼,獲取更多詳細資訊。
關於RocketMQ 解決服務狀態管理的分享就這些,感謝您的閱讀,希望這篇文章對您的學習或者工作有一點幫助。有收穫的話,也可以幫忙推薦給其他的小夥伴,讓更多的人受益,萬分感謝。
最後
目前互聯網上很多大佬都有 RocketMQ 相關文章,如有雷同,請多多包涵了。原創不易,碼字不易,還希望大家多多支援。若文中有所錯誤之處,還望提出,謝謝。
歡迎關注公眾號【互聯網平頭哥】。這裡有職場感悟、Java 技術,雖然不高大上,但通俗易懂。今天最好的是明天最低的要求,願你我共同進步。