RocketMQ 平滑升級到主從切換(實戰篇)

  • 2019 年 11 月 6 日
  • 筆記

本文主要介紹如何將 RocketMQ 集群從原先的主從同步升級到主從切換。

本文首先介紹與 DLedger 多副本即 RocketMQ 主從切換相關的核心配置屬性,然後嘗試搭建一個主從同步集群,最後將原先的 RocketMQ 集群平滑升級到 DLedger 集群的示例,並簡單測試一下主從切換功能。

1、RocketMQ 主從切換核心配置參數詳解


其主要的配置參數如下所示:

  • enableDLegerCommitLog 是否啟用 DLedger,即是否啟用 RocketMQ 主從切換,默認值為 false。如果需要開啟主從切換,則該值需要設置為 true 。
  • dLegerGroup 節點所屬的 raft 組,建議與 brokerName 保持一致,例如 broker-a。
  • dLegerPeers 集群節點信息,示例配置如下:n0-127.0.0.1:40911;n1-127.0.0.1:40912;n2-127.0.0.1:40913,多個節點用英文冒號隔開,單個條目遵循 legerSlefId-ip:端口,這裡的端口用作 dledger 內部通信。
  • dLegerSelfId 當前節點id。取自 legerPeers 中條目的開頭,即上述示例中的 n0,並且特別需要強調,只能第一個字符為英文,其他字符需要配置成數字。
  • storePathRootDir DLedger 日誌文件的存儲根目錄,為了能夠支持平滑升級,該值與 storePathCommitLog 設置為不同的目錄。

2、搭建主從同步環境


首先先搭建一個傳統意義上的主從同步架構,往集群中灌一定量的數據,然後升級到 DLedger 集群。

在 Linux 服務器上搭建一個 rocketmq 主從同步集群我想不是一件很難的事情,故本文就不會詳細介紹按照過程,只貼出相關配置。

實驗環境的部署結構採取 一主一次,其部署圖如下:

下面我就重點貼一下 broker 的配置文件。

220 上的 broker 配置文件如下:

brokerClusterName = DefaultCluster  brokerName = broker-a  brokerId =  deleteWhen =  fileReservedTime =  brokerRole = ASYNC_MASTER  flushDiskType = ASYNC_FLUSH  brokerIP1=192.168.0.220  brokerIP2=192.168.0.220  namesrvAddr=192.168.0.221:;192.168.0.220:  storePathRootDir=/opt/application/rocketmq-all-4.5.2-bin-release/store  storePathCommitLog=/opt/application/rocketmq-all-4.5.2-bin-release/store/commitlog  autoCreateTopicEnable=false  autoCreateSubscriptionGroup=false  

221 上 broker 的配置文件如下:

brokerClusterName = DefaultCluster  brokerName = broker-a  brokerId =  deleteWhen =  fileReservedTime =  brokerRole = SLAVE  flushDiskType = ASYNC_FLUSH  brokerIP1=192.168.0.221  brokerIP2=192.168.0.221  namesrvAddr=192.168.0.221:;192.168.0.220:  storePathRootDir=/opt/application/rocketmq-all-4.5.2-bin-release/store  storePathCommitLog=/opt/application/rocketmq-all-4.5.2-bin-release/store/commitlog  autoCreateTopicEnable=false  autoCreateSubscriptionGroup=false  

相關的啟動命令如下:

nohup bin/mqnamesrv  /dev/null  >& &  nohup bin/mqbroker -c conf/broker.conf  /dev/null  >& &  

安裝後的集群信息如圖所示:

3、主從同步集群升級到DLedger


3.1 部署架構

DLedger 集群至少需要3台機器,故搭建 DLedger 還需要再引入一台機器,其部署結構圖如下:

從主從同步集群升級到 DLedger 集群,用戶最關心的還是升級後的集群是否能夠兼容原先的數據,即原先存儲在消息能否能被消息消費者消費端,甚至於能否查詢到。 為了方便後續驗證,首先我使用下述程序向 mq 集群中添加了一篇方便查詢的消息(設置消息的key)。

public class Producer {      public static void main(String[] args) throws MQClientException, InterruptedException {          DefaultMQProducer producer = new DefaultMQProducer("producer_dw_test");          producer.setNamesrvAddr("192.168.0.220:9876;192.168.0.221:9876");          producer.start();          for(int i =; i < ; i ++) {              try {                  Message msg = new Message("topic_dw_test_by_order_01",null , "m" + i,("Hello RocketMQ" + i ).getBytes(RemotingHelper.DEFAULT_CHARSET));                  SendResult sendResult = producer.send(msg);                 //System.out.printf("%s%n", sendResult);              } catch (Exception e) {                  e.printStackTrace();                  Thread.sleep();              }          }          producer.shutdown();          System.out.println("end");      }  }  

消息的查詢結果示例如下:

3.2 升級步驟

Step1:將 192.168.0.220 的 rocketmq 拷貝到 192.168.0.222,可以使用如下命令進行操作。在 192.168.0.220 上敲如下命令:

 scp -r rocketmq-all-4.5.2-bin-release/ [email protected]:/opt/application/rocketmq-all-4.5.2-bin-release  

溫馨提示:示例中由於版本是一樣,實際過程中,版本需要升級,故需先下載最新的版本,然後將老集群中的 store 目錄完整的拷貝到新集群的 store 目錄。

Step2:依次在三台服務器的 broker.conf 配置文件中添加與 dledger 相關的配置屬性。

192.168.0.220 broker配置文件如下:

brokerClusterName = DefaultCluster  brokerId =  deleteWhen =  fileReservedTime =  brokerRole = ASYNC_MASTER  flushDiskType = ASYNC_FLUSH  brokerIP1=192.168.0.220  brokerIP2=192.168.0.220  namesrvAddr=192.168.0.221:;192.168.0.220:  storePathRootDir=/opt/application/rocketmq-all-4.5.2-bin-release/store  storePathCommitLog=/opt/application/rocketmq-all-4.5.2-bin-release/store/commitlog  autoCreateTopicEnable=false  autoCreateSubscriptionGroup=false  # 與 dledger 相關的屬性  enableDLegerCommitLog=true  storePathRootDir=/opt/application/rocketmq-all-4.5.2-bin-release/store/dledger_store  dLegerGroup=broker-a  dLegerPeers=n0-192.168.0.220:;n1-192.168.0.221:;n2-192.168.0.222:  dLegerSelfId=n0  

192.168.0.221 broker配置文件如下:

brokerClusterName = DefaultCluster  brokerName = broker-a  brokerId =  deleteWhen =  fileReservedTime =  brokerRole = SLAVE  flushDiskType = ASYNC_FLUSH  brokerIP1=192.168.0.221  brokerIP2=192.168.0.221  namesrvAddr=192.168.0.221:;192.168.0.220:  storePathRootDir=/opt/application/rocketmq-all-4.5.2-bin-release/store  storePathCommitLog=/opt/application/rocketmq-all-4.5.2-bin-release/store/commitlog  autoCreateTopicEnable=false  autoCreateSubscriptionGroup=false  # 與dledger 相關的配置屬性  enableDLegerCommitLog=true  storePathRootDir=/opt/application/rocketmq-all-4.5.2-bin-release/store/dledger_store  dLegerGroup=broker-a  dLegerPeers=n0-192.168.0.220:;n1-192.168.0.221:;n2-192.168.0.222:  dLegerSelfId=n1  

192.168.0.222 broker配置文件如下:

brokerClusterName = DefaultCluster  brokerName = broker-a  brokerId =  deleteWhen =  fileReservedTime =  brokerRole = ASYNC_MASTER  flushDiskType = ASYNC_FLUSH  brokerIP1=192.168.0.222  brokerIP2=192.168.0.222  namesrvAddr=192.168.0.221:;192.168.0.220:  storePathRootDir=/opt/application/rocketmq-all-4.5.2-bin-release/store  storePathCommitLog=/opt/application/rocketmq-all-4.5.2-bin-release/store/commitlog  autoCreateTopicEnable=false  autoCreateSubscriptionGroup=false  # 與 dledger 相關的配置  enableDLegerCommitLog=true  storePathRootDir=/opt/application/rocketmq-all-4.5.2-bin-release/store/dledger_store  dLegerGroup=broker-a  dLegerPeers=n0-192.168.0.220:;n1-192.168.0.221:;n2-192.168.0.222:  dLegerSelfId=n2  

溫馨提示:legerSelfId 分別為 n0、n1、n2。在真實的生產環境中,broker配置文件中的 storePathRootDir、storePathCommitLog 盡量使用單獨的根目錄,這樣判斷其磁盤使用率時才不會相互影響。

Step3:將 store/config 下的 所有文件拷貝到 dledger store 的 congfig 目錄下。

cd /opt/application/rocketmq-all-4.5.2-bin-release/store/  cp config/* dledger_store/config/    

溫馨提示:該步驟按照各自按照時配置的目錄進行複製即可。

Step4:依次啟動三台 broker。

nohup bin/mqbroker -c conf/broker.conf  /dev/null  >& &  

如果啟動成功,則在 rocketmq-console 中看到的集群信息如下:

3.3 驗證消息發送與消息查找

首先我們先驗證升級之前的消息是否能查詢到,那我們還是查找key 為 m600000 的消息,查找結果如圖所示:

然後我們來測試一下消息發送。測試代碼如下:

public class Producer {      public static void main(String[] args) throws MQClientException, InterruptedException {          DefaultMQProducer producer = new DefaultMQProducer("producer_dw_test");          producer.setNamesrvAddr("192.168.0.220:9876;192.168.0.221:9876");          producer.start();          for(int i =; i < ; i ++) {              try {                  Message msg = new Message("topic_dw_test_by_order_01",null , "m" + i,("Hello RocketMQ" + i ).getBytes(RemotingHelper.DEFAULT_CHARSET));                  SendResult sendResult = producer.send(msg);                  System.out.printf("%s%n", sendResult);              } catch (Exception e) {                  e.printStackTrace();                  Thread.sleep();              }          }          producer.shutdown();          System.out.println("end");      }  }  

執行結果如下:

再去控制台查詢一下消息,其結果也表明新的消息也能查詢到。

最後我們再來驗證一下主節點宕機,消息發送是否會受影響。

在消息發送的過程中,去關閉主節點,其截圖如下:

再來看一下集群的狀態:

等待該複製組重新完成主服務器選舉後,即可繼續處理消息發送。

溫馨提示:由於本示例是一主一從,故在選舉期間,消息不可用,但在真實的生產環境上,其部署架構是多主主從,即一個複製組在 leader 選舉期間,其他複製組可以接替該複製組完成消息的發送,實現消息服務的高可用。

與 DLedger 相關的日誌,默認存儲在 broker_default.log 文件中。

本文就介紹到這裡了,如果覺得文章對您有幫助的話,還希望幫忙點個【在看】,謝謝。