RocketMQ入門基礎-環境&整合
- 2020 年 4 月 7 日
- 筆記
消息隊列作為高並發系統的核心組件之一,能夠幫助業務系統解構提升開發效率和系統穩定性。主要用於三種典型場景:應用解耦、流量消峰、消息分發。
目前主流的MQ主要是Rocketmq、kafka、Rabbitmq,Rocketmq相比於Rabbitmq、kafka具有主要優勢特性有:
- 支持事務型消息(消息發送和DB操作保持兩方的最終一致性,rabbitmq和kafka不支持)
- 支持結合rocketmq的多個系統之間數據最終一致性(多方事務,二方事務是前提)
- 支持18個級別的延遲消息(rabbitmq和kafka不支持)
- 支持指定次數和時間間隔的失敗消息重發(kafka不支持,rabbitmq需要手動確認)
- 支持consumer端tag過濾,減少不必要的網絡傳輸(rabbitmq和kafka不支持)
- 支持重複消費(rabbitmq不支持,kafka支持)
本文主要介紹RocketMQ的單機安裝、雙機主從高可用安裝配置、運維管理平台搭建、與SpringBoot整合幾個知識點,具備相關知識技能的同學請直接拉到最後點個 「在看」 即可。

文章開始之前需要先準備好JDK1.8或以上的服務器環境以及從rocketmq官網下載好二進制安裝包,下載地址http://rocketmq.apache.org/dowloading/releases/
單機安裝配置
工欲善其事必先利其器,要想深入了解RocketMQ得先把環境安裝好,咱們先開始單機版RocketMQ的安裝!
- 解壓安裝
unzip rocketmq-all-4.7.0-bin-release.zip
- 啟動 Name Server
> nohup sh bin/mqnamesrv &
- 查看 Name Server啟動日誌
> tail -f ~/logs/rocketmqlogs/namesrv.log

- 啟動 Broker Server
> nohup sh bin/mqbroker -n localhost:9876 &
- 查看 Broker Server 啟動日誌
> tail -f ~/logs/rocketmqlogs/broker.log

單機情況下安裝使用RocketMQ很簡單,只需要分別啟動NameServer和Broker Server即可!
關閉RockerMQ需要使用下面的命令:
# 先關閉Broker Server > sh bin/mqshutdown broker # 再關閉NameServer > sh bin/mqshutdown namesrv
雙機主從高可用搭建
為了消除單機故障,增加可靠性或增大吞吐量,可以在多台服務器上部署多個NameServer和Broker,並為每個Broker部署一個或多個Slave。本節將說明使用兩台機器,搭建雙主、雙從、無單點故障的高可用RocketMQ集群。假設現在有兩台服務器,IP地址分別為:192.168.100.43和192.168.100.44,部署架構如下:

啟動多個NameServer 和 Broker
首先需要在兩台服務器上分別啟動NameServer(nohup sh bin/mqnamesrv &),這樣我們就得到了一個無單點的NameServer服務,服務地址為192.168.100.43:9876和192.168.100.44:9876。
然後在兩台服務器中RocketMQ的conf目錄分別建立兩個文件 broker-master.properties
,broker-slave.properties
,下面是不同服務器的配置說明:
- 192.168.100.43 機器上的broker-master.properties文件:
namesrvAddr = 192.168.100.43:9876;192.168.100.44:9876 brokerClusterName = DefaultCluster brokerName = broker-a brokerId = 0 deleteWhen = 04 fileReservedTime = 48 brokerRole = SYNC_MASTER flushDiskType = ASYNC_FLUSH listenPort = 10911 storePathRootDir = /app/rocketmq/store-a
- 192.168.100.43 機器上的broker-slave.properties文件:
namesrvAddr = 192.168.100.43:9876;192.168.100.44:9876 brokerClusterName = DefaultCluster brokerName = broker-b brokerId = 1 deleteWhen = 04 fileReservedTime = 48 brokerRole = SLAVE flushDiskType = ASYNC_FLUSH listenPort = 11011 storePathRootDir = /app/rocketmq/store-b
- 192.168.100.44 機器上的broker-master.properties文件:
namesrvAddr = 192.168.100.43:9876;192.168.100.44:9876 brokerClusterName = DefaultCluster brokerName = broker-b brokerId = 0 deleteWhen = 04 fileReservedTime = 48 brokerRole = SYNC_MASTER flushDiskType = ASYNC_FLUSH listenPort = 10911 storePathRootDir = /app/rocketmq/store-b
- 192.168.100.44 機器上的broker-slave.properties文件:
namesrvAddr = 192.168.100.43:9876;192.168.100.44:9876 brokerClusterName = DefaultCluster brokerName = broker-a brokerId = 1 deleteWhen = 04 fileReservedTime = 48 brokerRole = SLAVE flushDiskType = ASYNC_FLUSH listenPort = 11011 storePathRootDir = /app/rocketmq/store-a
然後分別使用如下命令啟動兩台服務器的主節點和從節點 nohup sh bin/mqbroker -c conf/broker-master.properties &
nohup sh bin/mqbroker -c conf/broker-slave.properties &
這樣一個高可用的RockerMQ集群就搭建好了,我們登陸可視化運維管理界面查看集群狀態,集群正常啟動。

重要參數說明
本節主要是對Broker的配置文件中用到的參數進行說明
namesrvAddr = 192.168.100.43:9876;192.168.100.44:9876
指定NameServer的地址,可以是多個。brokerClusterName = DefaultCluster
Cluster地址,如果集群數量比較多,可以分成多個Cluster,每個Cluster供一個業務群使用。brokerName = broker-a
Broker的名稱,Master 和Slave 通過使用相同的 Broker 名稱來表明相互關係,以說明某個Slave 是哪個Master 的 Slave。brokerId = 1
一個Master可以有多個Slave,0表示Master,大於0的表示不同Slave的ID。fileReservedTime = 48
在磁盤上保存消息的時長,單位是小時,自動刪除超時的消息。deleteWhen = 04
與 fileReservedTime 參數對應,表明在幾點做消息刪除動作,默認是凌晨4點。brokerRole = SYNC_MASTER
brokerRole
的可選參數有SYNC_MASTER
,ASYNC_MASTER
,SLAVE
三種。SYNC
和ASYNC
表示MASTER
和SLAVE
之間同步消息的機制,SYNC
的意思是當Slave
和Master
的消息同步完成後再返回發送成功的狀態。flushDiskType = ASYNC_FLUSH
flushDiskType
表示刷盤策略,可選值有ASYNC_FLUSH 和 SYNC_FLUSH兩種,分別代表同步刷盤和異步刷盤。同步情況下,消息只有真正寫入磁盤才返回成功狀態;異步情況下,消息寫入page_cache後就返回成功狀態。listenPort = 11011
Broker監聽的端口,一台服務器啟動多個Broker,需要設置不同的監聽端口避免端口衝突。storePathRootDir = /app/rocketmq/store-a
存儲消息以及配置信息的根目錄。
可視化管理平台
RocketMQ可以使用rocketmq-externals
作為運維管理平台,Github地址https://github.com/apache/rocketmq-externals,我們需要將源碼下載下來後再進行手動編譯,過程如下:
- 下載 從github(https://github.com/apache/rocketmq-externals) 下載RocketMQ可視化管理工具
rocketmq-externals
的源碼; - 打包 下載完成後切換進rocketmq-console目錄,使用maven命令對其打包
mvn clean package -Dmaven.test.skip=true
,打包完成後生成可執行文件rocketmq-console-ng-1.0.1.jar - 運行 使用
java -jar rocketmq-console-ng-1.0.1.jar --server.port=8080 --rocketmq.config.namesrvAddr=xxxx.xxx.xxx.xxx:9876
命令啟動,這裡注意需要設置兩個參數:--server.port
為運行的這個web應用的端口,如果不設置的話默認為8080;--rocketmq.config.namesrvAddr
為RocketMQ命名服務地址,若NameServer為集群則使用英文 ; 分割 - 訪問 瀏覽器訪問
xxx.xxx.xxx.xxx:8080
進入控制台界面,效果如下

SpringBoot整合RocketMQ
在SpringBoot中整合RocketMQ主要用到 rocketmq-spring-boot-starter
組件,下面是詳細整合過程。
- 引入組件
rocketmq-spring-boot-starter
依賴
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.1.0</version> </dependency>
- 修改application.yml,添加RocketMQ相關配置
rocketmq: name-server: 192.168.100.43:9876;192.168.100.44:9876 producer: group: test-group send-message-timeout: 3000
如果是集群,多個name-server使用英文 ; 分割。
- 編寫消息生產者
MessageProduce
/** * Description: * rocketMQ消息發送方法 * @author javadaily */ @Component public class MessageProduce { @Autowired private RocketMQTemplate rocketMQTemplate; /** * 發送消息 * @param topic 主題 * @param message 消息體 */ public void sendMessage(String topic,String message){ this.rocketMQTemplate.convertAndSend(topic,message); } }
使用RocketMQTemplate發送消息
- 編寫消息消費者
MessageConsumer
@Slf4j @Component @RocketMQMessageListener( topic = "test-topic", consumerGroup = "test-group", selectorExpression = "*" ) public class MessageConsumer implements RocketMQListener<String> { @Override public void onMessage(String message) { log.info("received message is {}", message); } }
消費者只需要繼承RocketMQListener類即可,主要關注實現類上的 @RocketMQMessageListener
註解,配置的 topic
和 consumerGroup
需要跟消息生產者的配置保持一致。
- 編寫單元測試發送消息
@RunWith(SpringRunner.class) @SpringBootTest public class MessageProduceTest { @Autowired private MessageProduce messageProduce; @Test public void testSendMessage() { messageProduce.sendMessage("test-topic","Hello,JAVA日知錄"); } }
- 測試 先啟動springboot應用,再執行測試用例。
