RocketMQ入門基礎-環境&整合

概述&選型

消息隊列作為高並發系統的核心組件之一,能夠幫助業務系統解構提升開發效率和系統穩定性。主要用於三種典型場景:應用解耦流量消峰消息分發

目前主流的MQ主要是Rocketmq、kafka、Rabbitmq,Rocketmq相比於Rabbitmq、kafka具有主要優勢特性有:

  • 支持事務型消息(消息發送和DB操作保持兩方的最終一致性,rabbitmq和kafka不支持)
  • 支持結合rocketmq的多個系統之間數據最終一致性(多方事務,二方事務是前提)
  • 支持18個級別的延遲消息(rabbitmq和kafka不支持)
  • 支持指定次數和時間間隔的失敗消息重發(kafka不支持,rabbitmq需要手動確認)
  • 支持consumer端tag過濾,減少不必要的網絡傳輸(rabbitmq和kafka不支持)
  • 支持重複消費(rabbitmq不支持,kafka支持)

本文主要介紹RocketMQ的單機安裝、雙機主從高可用安裝配置、運維管理平台搭建、與SpringBoot整合幾個知識點,具備相關知識技能的同學請直接拉到最後點個 「在看」 即可。

文章開始之前需要先準備好JDK1.8或以上的服務器環境以及從rocketmq官網下載好二進制安裝包,下載地址http://rocketmq.apache.org/dowloading/releases/

單機安裝配置

工欲善其事必先利其器,要想深入了解RocketMQ得先把環境安裝好,咱們先開始單機版RocketMQ的安裝!

  • 解壓安裝 unzip rocketmq-all-4.7.0-bin-release.zip
  • 啟動 Name Server > nohup sh bin/mqnamesrv &
  • 查看 Name Server啟動日誌 > tail -f ~/logs/rocketmqlogs/namesrv.log
  • 啟動 Broker Server > nohup sh bin/mqbroker -n localhost:9876 &
  • 查看 Broker Server 啟動日誌 > tail -f ~/logs/rocketmqlogs/broker.log

單機情況下安裝使用RocketMQ很簡單,只需要分別啟動NameServer和Broker Server即可!

關閉RockerMQ需要使用下面的命令:

# 先關閉Broker Server  > sh bin/mqshutdown broker  # 再關閉NameServer  > sh bin/mqshutdown namesrv  

雙機主從高可用搭建

為了消除單機故障,增加可靠性或增大吞吐量,可以在多台服務器上部署多個NameServer和Broker,並為每個Broker部署一個或多個Slave。本節將說明使用兩台機器,搭建雙主、雙從、無單點故障的高可用RocketMQ集群。假設現在有兩台服務器,IP地址分別為:192.168.100.43和192.168.100.44,部署架構如下:

啟動多個NameServer 和 Broker

首先需要在兩台服務器上分別啟動NameServer(nohup sh bin/mqnamesrv &),這樣我們就得到了一個無單點的NameServer服務,服務地址為192.168.100.43:9876和192.168.100.44:9876。

然後在兩台服務器中RocketMQ的conf目錄分別建立兩個文件 broker-master.propertiesbroker-slave.properties,下面是不同服務器的配置說明:

  • 192.168.100.43 機器上的broker-master.properties文件:
namesrvAddr = 192.168.100.43:9876;192.168.100.44:9876  brokerClusterName = DefaultCluster  brokerName = broker-a  brokerId = 0  deleteWhen = 04  fileReservedTime = 48  brokerRole = SYNC_MASTER  flushDiskType = ASYNC_FLUSH  listenPort = 10911  storePathRootDir = /app/rocketmq/store-a  
  • 192.168.100.43 機器上的broker-slave.properties文件:
namesrvAddr = 192.168.100.43:9876;192.168.100.44:9876  brokerClusterName = DefaultCluster  brokerName = broker-b  brokerId = 1  deleteWhen = 04  fileReservedTime = 48  brokerRole = SLAVE  flushDiskType = ASYNC_FLUSH  listenPort = 11011  storePathRootDir = /app/rocketmq/store-b  
  • 192.168.100.44 機器上的broker-master.properties文件:
namesrvAddr = 192.168.100.43:9876;192.168.100.44:9876  brokerClusterName = DefaultCluster  brokerName = broker-b  brokerId = 0  deleteWhen = 04  fileReservedTime = 48  brokerRole = SYNC_MASTER  flushDiskType = ASYNC_FLUSH  listenPort = 10911  storePathRootDir = /app/rocketmq/store-b  
  • 192.168.100.44 機器上的broker-slave.properties文件:
namesrvAddr = 192.168.100.43:9876;192.168.100.44:9876  brokerClusterName = DefaultCluster  brokerName = broker-a  brokerId = 1  deleteWhen = 04  fileReservedTime = 48  brokerRole = SLAVE  flushDiskType = ASYNC_FLUSH  listenPort = 11011  storePathRootDir = /app/rocketmq/store-a  

然後分別使用如下命令啟動兩台服務器的主節點和從節點 nohup sh bin/mqbroker -c conf/broker-master.properties & nohup sh bin/mqbroker -c conf/broker-slave.properties &

這樣一個高可用的RockerMQ集群就搭建好了,我們登陸可視化運維管理界面查看集群狀態,集群正常啟動。

重要參數說明

本節主要是對Broker的配置文件中用到的參數進行說明

  • namesrvAddr = 192.168.100.43:9876;192.168.100.44:9876 指定NameServer的地址,可以是多個。
  • brokerClusterName = DefaultCluster Cluster地址,如果集群數量比較多,可以分成多個Cluster,每個Cluster供一個業務群使用。
  • brokerName = broker-a Broker的名稱,Master 和Slave 通過使用相同的 Broker 名稱來表明相互關係,以說明某個Slave 是哪個Master 的 Slave。
  • brokerId = 1 一個Master可以有多個Slave,0表示Master,大於0的表示不同Slave的ID。
  • fileReservedTime = 48 在磁盤上保存消息的時長,單位是小時,自動刪除超時的消息。
  • deleteWhen = 04 與 fileReservedTime 參數對應,表明在幾點做消息刪除動作,默認是凌晨4點。
  • brokerRole = SYNC_MASTER brokerRole的可選參數有SYNC_MASTERASYNC_MASTERSLAVE三種。SYNCASYNC 表示MASTERSLAVE 之間同步消息的機制,SYNC的意思是當SlaveMaster 的消息同步完成後再返回發送成功的狀態。
  • flushDiskType = ASYNC_FLUSH flushDiskType 表示刷盤策略,可選值有ASYNC_FLUSH 和 SYNC_FLUSH兩種,分別代表同步刷盤和異步刷盤。同步情況下,消息只有真正寫入磁盤才返回成功狀態;異步情況下,消息寫入page_cache後就返回成功狀態。
  • listenPort = 11011 Broker監聽的端口,一台服務器啟動多個Broker,需要設置不同的監聽端口避免端口衝突。
  • storePathRootDir = /app/rocketmq/store-a 存儲消息以及配置信息的根目錄。

可視化管理平台

RocketMQ可以使用rocketmq-externals作為運維管理平台,Github地址https://github.com/apache/rocketmq-externals,我們需要將源碼下載下來後再進行手動編譯,過程如下:

  • 下載 從github(https://github.com/apache/rocketmq-externals) 下載RocketMQ可視化管理工具 rocketmq-externals 的源碼;
  • 打包 下載完成後切換進rocketmq-console目錄,使用maven命令對其打包 mvn clean package -Dmaven.test.skip=true ,打包完成後生成可執行文件rocketmq-console-ng-1.0.1.jar
  • 運行 使用 java -jar rocketmq-console-ng-1.0.1.jar --server.port=8080 --rocketmq.config.namesrvAddr=xxxx.xxx.xxx.xxx:9876 命令啟動,這裡注意需要設置兩個參數: --server.port 為運行的這個web應用的端口,如果不設置的話默認為8080; --rocketmq.config.namesrvAddr 為RocketMQ命名服務地址,若NameServer為集群則使用英文 ; 分割
  • 訪問 瀏覽器訪問 xxx.xxx.xxx.xxx:8080 進入控制台界面,效果如下

SpringBoot整合RocketMQ

在SpringBoot中整合RocketMQ主要用到 rocketmq-spring-boot-starter 組件,下面是詳細整合過程。

  • 引入組件rocketmq-spring-boot-starter 依賴
<dependency>  	<groupId>org.apache.rocketmq</groupId>  	<artifactId>rocketmq-spring-boot-starter</artifactId>  	<version>2.1.0</version>  </dependency>  
  • 修改application.yml,添加RocketMQ相關配置
rocketmq:    name-server: 192.168.100.43:9876;192.168.100.44:9876    producer:      group: test-group      send-message-timeout: 3000  

如果是集群,多個name-server使用英文 ; 分割。

  • 編寫消息生產者 MessageProduce
/**   * Description:   * rocketMQ消息發送方法   * @author javadaily   */  @Component  public class MessageProduce {        @Autowired      private RocketMQTemplate rocketMQTemplate;        /**       * 發送消息       * @param topic 主題       * @param message 消息體       */      public void sendMessage(String topic,String message){          this.rocketMQTemplate.convertAndSend(topic,message);      }  }  

使用RocketMQTemplate發送消息

  • 編寫消息消費者 MessageConsumer
@Slf4j  @Component  @RocketMQMessageListener(          topic = "test-topic",          consumerGroup = "test-group",          selectorExpression = "*"  )  public class MessageConsumer implements RocketMQListener<String> {      @Override      public void onMessage(String message) {          log.info("received message is {}", message);      }  }  

消費者只需要繼承RocketMQListener類即可,主要關注實現類上的 @RocketMQMessageListener 註解,配置的 topicconsumerGroup 需要跟消息生產者的配置保持一致。

  • 編寫單元測試發送消息
@RunWith(SpringRunner.class)  @SpringBootTest  public class MessageProduceTest {      @Autowired      private MessageProduce messageProduce;        @Test      public void testSendMessage() {          messageProduce.sendMessage("test-topic","Hello,JAVA日知錄");      }  }  
  • 測試 先啟動springboot應用,再執行測試用例。