RocketMQ 4.7.1 環境搭建、集群、MQ整合SpringBoot
- 2020 年 10 月 26 日
- 筆記
- mq, Spring Boot, 分散式架構, 技術乾貨
導讀
之前學過ActiveMQ但是並發量不是很大點我直達,所以又學阿里開源的RocketMQ,據說隊列可以堆積億級別。下面是網上找的消息隊列對比圖,僅供參考
部署
官網
前置條件
- 推薦使用64位作業系統,建議使用Linux / Unix / Mac;
- 64位JDK 1.8+;
- Maven 3.2.x;
- Git;
- 適用於Broker伺服器的記憶體4G +可用磁碟
下載
地址://downloads.apache.org/rocketmq/4.7.1/rocketmq-all-4.7.1-source-release.zip
百度雲盤:
鏈接: //pan.baidu.com/s/1luq_MwxSn8k_bugrnQSJWg 密碼: varj
安裝依賴項
export JAVA_HOME=/opt/soft/jdk1.8.0_202
export PATH=$JAVA_HOME/bin:$PATH
export CLASPATH=.:$JAVA_home/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export JAVA_HOME PATH CLASSPATH
export MAVEN_HOME=/opt/soft/apache-maven-3.6.3
export PATH=$PATH:$MAVEN_HOME/bin
mq上傳至linux
解壓
maven編譯
啟動NameServer
後台啟動方式
nohup sh bin/mqnamesrv &
NameServer啟動時記憶體不足(問題解決)
找到runserver.sh 修改JAVA_OPT
vim /bin/runserver.sh配置
啟動Broker
nohup sh bin/mqbroker -n localhost:9876 &
語法:nohup sh bin/mqbroker -n NameServer服務ip地址
Broker記憶體不足(問題解決)
找到runbroker.sh 修改JAVA_OPT
vim /bin/runbroker.sh配置
服務都啟動成功
模擬消費
export NAMESRV_ADDR=localhost:9876
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
開2個控制台,連接通一台linux
注意
NameServer默認埠號:9876;broker默認埠號:10911
可視化控制台
官網地址
百度雲盤
鏈接: //pan.baidu.com/s/1mdEGkq-JBTy1wtNmFPkmDg 密碼: v6bq
解壓
安裝編譯
進入:/opt/soft/rocketmq-externals-master/rocketmq-console
編譯: mvn clean package -Dmaven.test.skip=true
修改appliccation.properties的rocketmq.config.namesrvAddr
編譯打包
啟動
進入target目錄,啟動java -jar
守護進程啟動: nohup java -jar rocketmq-console-ng-2.0.0.jar &
SpringBoot整合RocketMQ(生產者)
創建SpringBoot項目
項目結構
加入依賴
pom.xml
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="//maven.apache.org/POM/4.0.0" xmlns:xsi="//www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="//maven.apache.org/POM/4.0.0 //maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.3.1.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.ybchen</groupId> <artifactId>ybchen-mq</artifactId> <version>0.0.1-SNAPSHOT</version> <name>ybchen-mq</name> <description>Demo project for Spring Boot</description> <properties> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> <exclusions> <exclusion> <groupId>org.junit.vintage</groupId> <artifactId>junit-vintage-engine</artifactId> </exclusion> </exclusions> </dependency> <!--注意: 這裡的版本,要和部署在伺服器上的版本號一致--> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.7.1</version> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-common</artifactId> <version>4.7.1</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
PayProducer.java
package com.ybchen.ybchenmq.jms; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.springframework.stereotype.Component; /** * 消息生產者 */ @Component public class PayProducer { /** * 生產者所屬的組 */ private String producerGroup = "pay_group"; /** * MQ的地址,注意需開放埠號或者關閉防火牆 */ private String nameServerAddr = "192.168.199.100:9876"; private DefaultMQProducer producer; public PayProducer() { producer = new DefaultMQProducer(producerGroup); //指定NameServer地址,多個地址以;隔開 //如 producer.setNamesrvAddr("192.168.199.100:9876;192.168.199.101:9876;192.168.199.102:9876") producer.setNamesrvAddr(nameServerAddr); start(); } /** * 獲取生產者 * @return */ public DefaultMQProducer getProducer() { return this.producer; } /** * 開啟,對象在使用之前必須要調用一次,只能初始化一次 */ public void start() { try { this.producer.start(); } catch (MQClientException e) { e.printStackTrace(); } } /** * 關閉,一般在應用上下文,使用上下文監聽器,進行關閉 */ public void shutdown() { this.producer.shutdown(); } }
PayController.java
package com.ybchen.ybchenmq.controller; import com.ybchen.ybchenmq.jms.PayProducer; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.exception.RemotingException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; /** * @ClassName:PayController * @Description:支付 * @Author:chenyb * @Date:2020/10/18 2:47 下午 * @Versiion:1.0 */ @RestController @RequestMapping("/api/v1") public class PayController { @Autowired private PayProducer payProducer; private static final String TOPIC = "ybchen_pay_topic"; /** * 支付回調 * * @param text * @return */ @RequestMapping("pay_cb") public Object callback(String text) { /** * String topic:話題 * String tags:二級分類 * byte[] body:body消息位元組數組 */ Message message = new Message(TOPIC,"tag_a",("hello ybchen ==>"+text).getBytes()); try { SendResult send = payProducer.getProducer().send(message); System.out.println("send------>"+send); } catch (MQClientException e) { e.printStackTrace(); } catch (RemotingException e) { e.printStackTrace(); } catch (MQBrokerException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } return "ok"; } }
測試
常見錯誤
錯誤一
org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException:
sendDefaultImpl call timeout
原因:阿里雲存在多網卡,rocketmq會根據當前網卡選擇一個IP使用,當你的機器有多塊網卡時,很可能會有問題,比如,機器上有兩個ip,一個公網ip,一個私網ip,因此需要配置broker.conf指定當前公網的ip,然後重啟broker
修改配置:/opt/soft/rocketmq-all-4.7.1-source-release/distribution/target/rocketmq-4.7.1/rocketmq-4.7.1/conf/broker.conf
新增這個配置:brokerIP1=xxx.xxx.xxx.xxx
啟動命令:nohup sh bin/mqbroker -n localhost:9876 -c ./conf/broker.conf &
錯誤2
MQClientException: No route info of this topic, TopicTest1
原因:Broker 緊追自動創建Topic,且用戶沒有通過手工方式創建此Topic,或者broker和Nameserver網路不通
解決:
通過sh bin/mqbroker -m 查看配置
autoCreateTopicEnable=true 則自動創建Topic
Centos 7 關閉防火牆:systemctl stop firewalld
錯誤3
控制台查看不了數據,提示連接10909錯誤
原因:Rocket默認開啟了VIP通道,VPI通道埠號為10911-2=10909
解決:阿里雲安全組添加一個埠:10909
錯誤4
無法自動創建topic:客戶端版本要和服務端版本保持一致
伺服器上裝的是4.7.1 引入依賴項時 <!--注意: 這裡的版本,要和部署在伺服器上的版本號一致--> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.7.1</version> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-common</artifactId> <version>4.7.1</version> </dependency>
檢索消息發送
SpringBoot整合RocketMQ(消費者)
創建SpringBoot項目
項目結構
加入依賴
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="//maven.apache.org/POM/4.0.0" xmlns:xsi="//www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="//maven.apache.org/POM/4.0.0 //maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.3.1.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.ybchen</groupId> <artifactId>ybchen-mq</artifactId> <version>0.0.1-SNAPSHOT</version> <name>ybchen-mq</name> <description>Demo project for Spring Boot</description> <properties> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> <exclusions> <exclusion> <groupId>org.junit.vintage</groupId> <artifactId>junit-vintage-engine</artifactId> </exclusion> </exclusions> </dependency> <!--注意: 這裡的版本,要和部署在伺服器上的版本號一致--> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.7.1</version> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-common</artifactId> <version>4.7.1</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
PayConsumer.java
package com.ybchen.ybchenmqconsumer.jms; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageExt; import org.springframework.stereotype.Component; import java.io.UnsupportedEncodingException; import java.util.List; /** * @ClassName:PayConsumer * @Description:消費者 * @Author:chenyb * @Date:2020/10/18 4:13 下午 * @Versiion:1.0 */ @Component public class PayConsumer { /** * 生產者所屬的組 */ private String producerGroup = "pay_consumer_group"; /** * MQ的地址,注意需開放埠號或者關閉防火牆 */ private String nameServerAddr = "192.168.199.100:9876"; /** * 訂閱主題 */ private String topic = "ybchen_pay_topic"; private DefaultMQPushConsumer consumer; public PayConsumer() throws MQClientException { consumer = new DefaultMQPushConsumer(producerGroup); //指定NameServer地址,多個地址以;隔開 //如 producer.setNamesrvAddr("192.168.199.100:9876;192.168.199.101:9876;192.168.199.102:9876") consumer.setNamesrvAddr(nameServerAddr); //設置消費地點,從最後一個開始消費 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); //訂閱主題,監聽主題下的那些標籤 consumer.subscribe(topic, "*"); //註解一個監聽器 //lambda方式 // consumer.registerMessageListener((MessageListenerConcurrently) (msg, context) -> { // try { // Message message = msg.get(0); // System.out.printf("%s Receive New Messages: %s %n", // Thread.currentThread().getName(), new String(msg.get(0).getBody())); // //主題 // String topic = message.getTopic(); // //消息內容 // String body = null; // body = new String(message.getBody(), "utf-8"); // //二級分類 // String tags = message.getTags(); // //鍵 // String keys = message.getKeys(); // System.out.println("topic=" + topic + ", tags=" + tags + ", keys=" + keys + ", msg=" + body); // return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; // } catch (UnsupportedEncodingException e) { // e.printStackTrace(); // return ConsumeConcurrentlyStatus.RECONSUME_LATER; // } // }); //一般方式 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { try { Message message = list.get(0); System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(list.get(0).getBody(),"utf-8")); //主題 String topic = message.getTopic(); //消息內容 String body = null; body = new String(message.getBody(), "utf-8"); //二級分類 String tags = message.getTags(); //鍵 String keys = message.getKeys(); System.out.println("topic=" + topic + ", tags=" + tags + ", keys=" + keys + ", msg=" + body); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } catch (UnsupportedEncodingException e) { e.printStackTrace(); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } } }); consumer.start(); System.out.println("consumer start .........."); } }
application.properties
server.port=8081
測試生產者消費者
MQ集群架構模式分析
單節點
優點
本地開發測試,配置簡單,同步刷盤消息一條都不會丟
缺點
不可靠,如果宕機,會導致服務不可用
主從(非同步、同步雙寫)
優點
同步雙寫消息不丟失,非同步複製存在少量丟失你,主節點宕機,從節點可以對外提供消息的消費,但是不支援寫入
缺點
主備有短暫消息延遲,毫秒級,目前不支援自動切換,需要腳本或者其他程式進行檢測然後停止broker,重啟讓從節點成為主節點
雙主
優點
配置簡單,可以靠配置RAID磁碟陣列保證消息可靠,非同步刷盤丟失少量消息
缺點
master宕機期間,未被消費的消息在機器恢復之前不可消息,實時性會受到影響
雙主雙從,多主多從模式(非同步複製)
優點
磁碟損壞,消息丟失的非常小,消息實時性不會受影響,Master宕機後,消費者仍然可以從Slave消費
缺點
主備有短暫消息延遲,毫秒級,如果Master宕機,磁碟損壞情況,會丟失你少量消息
雙主雙從,多主多從模式(同步雙寫)
優點
同步雙寫方式,主備都寫成功,才嚮應用返回成功,服務可用性與數據可用性非常高
缺點
性能比非同步複製模式略低,主宕機後,備機不能自動切換為主機
推薦
- 主從(非同步、同步雙寫)
- 雙主雙從,多主多從模式(非同步複製)
- 雙主雙從,多主多從模式(同步雙寫)
主從集群搭建
準備工作
準備2台機器,ip地址分別為:192.168.199.100;192.168.199.101;
環境:RocketMQ4.7.1+jdk8+Maven+Centos 7
啟動兩台nameserver
啟動兩個機器的nameserver
路徑:/opt/soft/rocketmq-all-4.7.1-source-release/distribution/target/rocketmq-4.7.1/rocketmq-4.7.1
啟動:nohup sh bin/mqnamesrc &
編輯並啟動roccketmq
主節點
進入:/opt/soft/rocketmq-all-4.7.1-source-release/distribution/target/rocketmq-4.7.1/rocketmq-4.7.1/conf/2m-2s-async
編輯並修改如下:vim broker-a.properties
namesrvAddr=192.168.199.100:9876;192.168.199.101:9876
brokerClusterName=YbChenCluster
brokerName=broker-a
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=ASYNC_MASTER
flushDiskType=ASYNC_FLUSH
啟動:nohup sh bin/mqbroker -c conf/2m-2s-async/broker/broker-a.properties &
從節點
進入:/opt/soft/rocketmq-all-4.7.1-source-release/distribution/target/rocketmq-4.7.1/rocketmq-4.7.1/conf/2m-2s-async
編輯並修改如下:vim broker-a-s.properties
namesrvAddr=192.168.199.100:9876;192.168.199.101:9876
brokerClusterName=YbChenCluster
brokerName=broker-a
brokerId=1
deleteWhen=04
fileReservedTime=48
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH
啟動:nohup sh bin/mqbroker -c conf/2m-2s-async/broker/broker-a-s.properties &
注意事項
- namesrvAddr:相同
- brokerClusterName:相同
- brokerName:相同
- brokerId:不同,0是主節點
- deleteWhen:相同
- fileReservedTime:相同
- brokerRole:不同,分ASYNC_MASTER、SLAVE
- flushDiskType:相同
啟動broker
使用管控台
使用192.168.199.100這台伺服器,修改配置
192.168.199.100這台伺服器
進入:/opt/soft/rocketmq-externals-master/rocketmq-console/src/main/resources
修改配置文件:vim application.properties
rocketmq.config.namesrvAddr=192.168.199.100:9876;192.168.199.101:9876
編譯
切換到:/opt/soft/rocketmq-externals-master/rocketmq-console
打包:
mvn clean
mvn install -Dmaven.test.skip=true
啟動
進入:/opt/soft/rocketmq-externals-master/rocketmq-console/target
守護進程方式啟動:nohup java -jar rocketmq-console-ng-2.0.0.jar &
集群測試
故障演練
模擬主掛了,但是從還可以被消費,此時不能寫入,等主重啟後,可以繼續寫入(數據不會被重複消費),以下內容是連續的
總結
好了,到目前為止,主從已經搭建完成了。
Broker分為Master和Slave,一個Master可以對應多個Slave,但一個Slave只能對應一個Master,Master與Slave通過相同的Broker Name來匹配,不同的Broker id來定義時Master還是Slave
Broker向所有的NameServer節點建立長連接,定時註冊Topic和發送元數據資訊
NameServer定時掃描(默認2分鐘)所有存活Broker的連接,如果超過時間沒響應,則斷開連接(心跳檢測),但是Consumer客戶端不能感知,Consumer定時(30秒)從NameServer獲取topic的最新資訊,所以broker不可用時,Consumer最多需要30秒才能發現
只有Master才能進行寫入操作,Slave不允許寫入只能同步,同步策略取決於Master配置
客戶端消費可以從Master和Slave消費,默認消費者都從Master消費,如果在Master掛了之後,客戶端從NameServer中感知Broker宕機,就會從Slave消費,感知非實時,存在一定的滯後性,Slave不能保證Master的100%都同步過來,會有少量的消息丟失。一旦Master恢復,未同步過去的消息會被最終消費掉。
如果Consumer實例的數量比Message Queue的總數量還多的話,多出來的Consumer實例將無法分到Queue,也就無法消費到消息,也就無法起到分攤負載的作用,所以需要控制讓Queue的總數量大於Consumer的數量。
場景模擬
生產和消費重試及處理
生產者重試
- 消息重試(保證數據的高可靠性),本身內部支援重試,默認次數是2
- 如果網路情況較差,或者跨集群則建議多改幾次
生產者設置重試次數,並設置唯一的key(一般唯一標識符)
消費者重試
- 原因:消息處理異常,broker端到consumer端各種問題,如網路原因閃斷,消費處理失敗,ACK返回失敗等
- 注意
- 重試間隔時間配置,默認每條消息最多重試16次
- 超過重試次數人工補償
- 消費端去重
- 一條消息無論重試多少次,這些重試消息的Message ID,key不會改變
- 消費重試只針對集群消費方式生效;廣播方式不提供失敗重試特性,即消費失敗後,失敗消息不再重試,繼續消費新的消息
設置廣播方式
模擬消息重發
非同步發送消息和回調實戰
應用場景
比如12306付完錢💰後,非同步出票,對性能要求高,可以支援更高的並發,回調成功後觸發相應的業務(onSuccess)
官方例子
改造生產者
演示
onSuccess:因為是非同步方式,這裡可以記錄日誌啥的
onException:補償機制,根據實際情況使用,看是否進行重試
OneWay(無需等待)
應用場景
主要做日誌收集,適用於對性能要求高,但可靠性並不高的場景。
延遲消息實戰
什麼是延遲消息
- Producer將消息發送到消息隊列RocketMQ服務端,但並不期望這條消息立馬投遞,而是推遲在當前時間點之後的某一個時間投遞到Consumer進行消費,該消息即定時消息,目前支援固定精度的消息
- 延遲消息級別,1….18
1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
應用場景
- 通過消息觸發一些定時任務,比如在某一固定時間點向用戶發送提醒消息
- 消息生產和消費有時間窗口要求:比如在天貓電商交易中超時未支付關閉訂單的場景,在訂單創建時會發送一條延遲消息。這條消息將會在30分鐘以後投遞給消費者,消費者收到此消息後需要判斷對應的訂單是否已完成支付。如支付未完成,則關閉訂單。如已完成支付則忽略。
改生產者
生產者MessageQueueSelector實戰
簡介
生產消息使用MessageQueueSelector投遞到Topic下指定的Queue
應用場景
- 順序消息
- 分攤負載
默認topic下的queue數量是4,可以配置
支援同步,非同步發送指定的MessageQueue
選擇的queue數量必須小於配置的,否則會出錯
好處
如果隊列中某個產品,流量暴增,隨機分配的話,會導致整個Topic都不能使用,指定到隊列的話,如果這個隊列壞了,其他隊列不影響使用。
改造生產者
同步發送
發送結果=SEND_OK,msg=SendResult [sendStatus=SEND_OK, msgId=AC1068013E3F18B4AAC276723EAC0000, offsetMsgId=C0A8C76400002A9F000000000009B536, messageQueue=MessageQueue [topic=ybchen_pay_topic, brokerName=broker-a, queueId=0], queueOffset=1]
發送結果=SEND_OK,msg=SendResult [sendStatus=SEND_OK, msgId=AC1068013E3F18B4AAC27672BCD50001, offsetMsgId=C0A8C76400002A9F000000000009B602, messageQueue=MessageQueue [topic=ybchen_pay_topic, brokerName=broker-a, queueId=0], queueOffset=2]
發送結果=SEND_OK,msg=SendResult [sendStatus=SEND_OK, msgId=AC1068013E3F18B4AAC27672CAA20002, offsetMsgId=C0A8C76400002A9F000000000009B6CF, messageQueue=MessageQueue [topic=ybchen_pay_topic, brokerName=broker-a, queueId=0], queueOffset=3]
可以看到列印出來的,queueId=0
非同步發送
生產者端程式碼修改
@Autowired private PayProducer payProducer; private static final String TOPIC = "ybchen_pay_topic"; /** * 支付回調 * * @param text * @return */ @RequestMapping("pay_cb") public Object callback(String text) { /** * String topic:話題 * String tags:二級分類 * byte[] body:body消息位元組數組 */ Message message = new Message(TOPIC, "tag_a", text.getBytes()); //生產者使用MessageQueueSelector投遞到Topic下指定的Queue,arg只能小於等於4 // try { // SendResult sendResult = payProducer.getProducer().send(message, new MessageQueueSelector() { // @Override // public MessageQueue select(List<MessageQueue> list, Message message, Object o) { // int queueNum=Integer.parseInt(o.toString()); // return list.get(queueNum); // } // }, 0); // System.out.printf("發送結果=%s,msg=%s",sendResult.getSendStatus(),sendResult); // } catch (MQClientException e) { // e.printStackTrace(); // } catch (RemotingException e) { // e.printStackTrace(); // } catch (MQBrokerException e) { // e.printStackTrace(); // } catch (InterruptedException e) { // e.printStackTrace(); // } //非同步發送到指定的queue try { payProducer.getProducer().send(message, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> list, Message message, Object o) { int queueNum = Integer.parseInt(o.toString()); return list.get(queueNum); } }, 3, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { System.out.printf("發送結果=%s,msg=%s", sendResult.getSendStatus(), sendResult); } @Override public void onException(Throwable e) { e.printStackTrace(); } }); } catch (MQClientException e) { e.printStackTrace(); } catch (RemotingException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } //1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h //message.setDelayTimeLevel(2); // try { // SendResult send = payProducer.getProducer().send(message); // System.out.println("send------>"+send); // } catch (MQClientException e) { // e.printStackTrace(); // } catch (RemotingException e) { // e.printStackTrace(); // } catch (MQBrokerException e) { // e.printStackTrace(); // } catch (InterruptedException e) { // e.printStackTrace(); // } //非同步發送 // try { // payProducer.getProducer().send(message, new SendCallback() { // @Override // public void onSuccess(SendResult sendResult) { // System.out.printf("發送結果=%s,msg=%s",sendResult.getSendStatus(),sendResult); // } // // @Override // public void onException(Throwable e) { // e.printStackTrace(); // //補償機制,根據實際情況使用,看是否進行重試 // } // }); // } catch (MQClientException e) { // e.printStackTrace(); // } catch (RemotingException e) { // e.printStackTrace(); // } catch (InterruptedException e) { // e.printStackTrace(); // } return "ok"; }
順序消息的應用場景
簡介
順序消息可以應用到電商和證券系統,訂單系統。
什麼是順序系統?
消息的生產和消費順序一致
全局順序
topic下面全部消息都要有序(很少用)
- 性能要求不高,所有的消息嚴格按照FIFO(先進先出)原則進行消息發布和消費的場景,並行度成為消息系統的瓶頸,吞吐量不夠
- 在證券處理中,以人民幣兌換美元為例,在價格相同的情況下,先出價者優先處理,則可以通過全局順序的方式進行發布和消費
局部順序
只要保證一組消息被順序消費即可(RocketMQ中使用)
- 性能要求高
- 電商的訂單創建,同一訂單相關的創建訂單消息、訂單支付消息、訂單退款消息、訂單物流消息、訂單交易成功消息都會按照先後順序來發布和消費
順序發布
對於指定的一個Topic,客戶端按照一定的先後順序發送消息
順序消費
對於指定的一個Topic,按照一定的先後順序接收消息,即先發送的消息一定先會被客戶端接收到
注意事項
- 順序消息不支援非同步發送,否則將無法保證順序消費
- 順序消息暫不支援廣播模式
官方例子
改造生產者程式碼
創建ProductOrder.java
package com.ybchen.ybchenmq.entity; import java.io.Serializable; import java.util.ArrayList; import java.util.List; /** * @ClassName:ProductOrder * @Description:訂單 * @Author:chenyb * @Date:2020/10/25 12:56 下午 * @Versiion:1.0 */ public class ProductOrder implements Serializable { /** * 訂單id */ private long orderIdl; /** * 訂單操作類型 */ private String type; public long getOrderIdl() { return orderIdl; } public void setOrderIdl(long orderIdl) { this.orderIdl = orderIdl; } public String getType() { return type; } public void setType(String type) { this.type = type; } public ProductOrder() { } public ProductOrder(long orderIdl, String type) { this.orderIdl = orderIdl; this.type = type; } @Override public String toString() { return "ProductOrder{" + "orderIdl=" + orderIdl + ", type='" + type + '\'' + '}'; } /** * 模擬批量創建實體類 * @return */ public static List<ProductOrder> getOrderList(){ List<ProductOrder> list=new ArrayList<>(); list.add(new ProductOrder(111L,"創建訂單")); list.add(new ProductOrder(222L,"創建訂單")); list.add(new ProductOrder(333L,"創建訂單")); list.add(new ProductOrder(111L,"支付訂單")); list.add(new ProductOrder(222L,"支付訂單")); list.add(new ProductOrder(111L,"完成訂單")); list.add(new ProductOrder(222L,"完成訂單")); list.add(new ProductOrder(333L,"支付訂單")); list.add(new ProductOrder(333L,"完成訂單")); return list; } }
控制層:PayController.java
@Autowired private PayProducer payProducer; private static final String TOPIC = "ybchen_pay_topic"; private static final String TOPIC_ORDER = "ybchen_pay_order_topic"; @RequestMapping("pay_order") public Object payOrder() throws Exception{ //獲取訂單號 List<ProductOrder> list=ProductOrder.getOrderList(); for (int i = 0; i < list.size(); i++) { ProductOrder order=list.get(i); Message message=new Message(TOPIC_ORDER, "", order.getOrderIdl()+"", order.toString().getBytes()); //發送,同一個訂單id進入同一個隊列中 SendResult sendResult =payProducer.getProducer().send(message, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> mqs, Message message, Object arg) { Long id=(Long)arg; long index=id%mqs.size(); return mqs.get((int) index); } },order.getOrderIdl()); //列印輸出結果 System.out.printf("發送結果=%s,sendResult=%s,orderId=%s,type=%s\n", sendResult.getSendStatus(), sendResult.toString(), order.getOrderIdl(), order.getType()); } return "ok"; }
改造消費者
package com.ybchen.ybchenmqconsumer.jms; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.*; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import org.springframework.stereotype.Component; import java.util.List; /** * @ClassName:PayOrderConsumer * @Description:消費者-訂單 * @Author:chenyb * @Date:2020/10/18 4:13 下午 * @Versiion:1.0 */ @Component public class PayOrderConsumer { /** * 生產者所屬的組 */ private String producerGroup = "pay_order_consumer_group"; /** * MQ的地址,注意需開放埠號或者關閉防火牆 */ private String nameServerAddr = "192.168.199.100:9876;192.168.199.101:9876"; /** * 訂閱主題,訂單 */ private static final String TOPIC_ORDER = "ybchen_pay_order_topic"; private DefaultMQPushConsumer consumer; public PayOrderConsumer() throws MQClientException { consumer = new DefaultMQPushConsumer(producerGroup); //指定NameServer地址,多個地址以;隔開 //如 producer.setNamesrvAddr("192.168.199.100:9876;192.168.199.101:9876;192.168.199.102:9876") consumer.setNamesrvAddr(nameServerAddr); //設置消費地點,從最後一個開始消費 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); //訂閱主題,監聽主題下的那些標籤 consumer.subscribe(TOPIC_ORDER, "*"); //默認是集群方式,廣播方式不支援重試 consumer.setMessageModel(MessageModel.CLUSTERING); //註解一個監聽器 consumer.registerMessageListener(new MessageListenerOrderly() { @Override public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) { MessageExt msg=list.get(0); System.out.printf("%s Receive New Messages: %s %n",Thread.currentThread().getName(), new String(msg.getBody())); return ConsumeOrderlyStatus.SUCCESS; } }); consumer.start(); System.out.println("consumer order start .........."); } }
測試順序消息
一個生產者一個消費者
可以看到消費的時候,有點慢,因為我本地安裝了2個虛擬機做一主一從,消費的順序是正確的,都是按照:創建訂單、支付訂單、完成訂單
2020-10-25 13:52:31.822 INFO 1473 --- [nio-8080-exec-1] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring DispatcherServlet 'dispatcherServlet' 2020-10-25 13:52:31.822 INFO 1473 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet : Initializing Servlet 'dispatcherServlet' 2020-10-25 13:52:31.825 INFO 1473 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet : Completed initialization in 3 ms 發送結果=SEND_OK,sendResult=SendResult [sendStatus=SEND_OK, msgId=AC10680105C118B4AAC27E92D46F0000, offsetMsgId=C0A8C76400002A9F000000000009C8B2, messageQueue=MessageQueue [topic=ybchen_pay_order_topic, brokerName=broker-a, queueId=3], queueOffset=6],orderId=111,type=創建訂單 發送結果=SEND_OK,sendResult=SendResult [sendStatus=SEND_OK, msgId=AC10680105C118B4AAC27E92D4930001, offsetMsgId=C0A8C76400002A9F000000000009C9A5, messageQueue=MessageQueue [topic=ybchen_pay_order_topic, brokerName=broker-a, queueId=2], queueOffset=6],orderId=222,type=創建訂單 發送結果=SEND_OK,sendResult=SendResult [sendStatus=SEND_OK, msgId=AC10680105C118B4AAC27E92D4A90002, offsetMsgId=C0A8C76400002A9F000000000009CA98, messageQueue=MessageQueue [topic=ybchen_pay_order_topic, brokerName=broker-a, queueId=1], queueOffset=6],orderId=333,type=創建訂單 發送結果=SEND_OK,sendResult=SendResult [sendStatus=SEND_OK, msgId=AC10680105C118B4AAC27E92D4C00003, offsetMsgId=C0A8C76400002A9F000000000009CB8B, messageQueue=MessageQueue [topic=ybchen_pay_order_topic, brokerName=broker-a, queueId=3], queueOffset=7],orderId=111,type=支付訂單 發送結果=SEND_OK,sendResult=SendResult [sendStatus=SEND_OK, msgId=AC10680105C118B4AAC27E92D4CC0004, offsetMsgId=C0A8C76400002A9F000000000009CC7E, messageQueue=MessageQueue [topic=ybchen_pay_order_topic, brokerName=broker-a, queueId=2], queueOffset=7],orderId=222,type=支付訂單 發送結果=SEND_OK,sendResult=SendResult [sendStatus=SEND_OK, msgId=AC10680105C118B4AAC27E92D4D00005, offsetMsgId=C0A8C76400002A9F000000000009CD71, messageQueue=MessageQueue [topic=ybchen_pay_order_topic, brokerName=broker-a, queueId=3], queueOffset=8],orderId=111,type=完成訂單 發送結果=SEND_OK,sendResult=SendResult [sendStatus=SEND_OK, msgId=AC10680105C118B4AAC27E92D4D30006, offsetMsgId=C0A8C76400002A9F000000000009CE64, messageQueue=MessageQueue [topic=ybchen_pay_order_topic, brokerName=broker-a, queueId=2], queueOffset=8],orderId=222,type=完成訂單 發送結果=SEND_OK,sendResult=SendResult [sendStatus=SEND_OK, msgId=AC10680105C118B4AAC27E92D4DE0007, offsetMsgId=C0A8C76400002A9F000000000009CF57, messageQueue=MessageQueue [topic=ybchen_pay_order_topic, brokerName=broker-a, queueId=1], queueOffset=7],orderId=333,type=支付訂單 發送結果=SEND_OK,sendResult=SendResult [sendStatus=SEND_OK, msgId=AC10680105C118B4AAC27E92D4F80008, offsetMsgId=C0A8C76400002A9F000000000009D04A, messageQueue=MessageQueue [topic=ybchen_pay_order_topic, brokerName=broker-a, queueId=1], queueOffset=8],orderId=333,type=完成訂單
ConsumeMessageThread_1 Receive New Messages: ProductOrder{orderIdl=333, type='創建訂單'} ConsumeMessageThread_1 Receive New Messages: ProductOrder{orderIdl=333, type='支付訂單'} ConsumeMessageThread_1 Receive New Messages: ProductOrder{orderIdl=333, type='完成訂單'} ConsumeMessageThread_2 Receive New Messages: ProductOrder{orderIdl=222, type='創建訂單'} ConsumeMessageThread_2 Receive New Messages: ProductOrder{orderIdl=222, type='支付訂單'} ConsumeMessageThread_2 Receive New Messages: ProductOrder{orderIdl=222, type='完成訂單'} ConsumeMessageThread_3 Receive New Messages: ProductOrder{orderIdl=111, type='創建訂單'} ConsumeMessageThread_3 Receive New Messages: ProductOrder{orderIdl=111, type='支付訂單'} ConsumeMessageThread_3 Receive New Messages: ProductOrder{orderIdl=111, type='完成訂單'}
一個生產者3個消費者
消費者會平均分配queue的數量,消費者數量小於等於4!!!
本地在線模擬,一個生產者、3個消費者場景,看看消費的順序,內容較長,被分割3塊
消費者核心配置
setConsumeFromWhere
- CONSUME_FORM_FIRST_OFFSET:初次從消息隊列頭部開始消費,即歷史消息(還存儲在broker的)全部消費一遍,後續在啟動接著上次消費的進度開始消費
- CONSUME_FROM_LAST_OFFSET:默認策略,初次從該隊列尾開始消費,即跳過歷史小心,後續在啟動接著上次消費的進度開始消費
- CCONSUME_FROM_TIMESTAMP:從某個時間點開始消費,默認是半小時以前,後續在啟動接著上次消費的進度開始消費
setAllocateMessageQueueStrategy
- 負載均衡策略演算法,即消費者分配到queue的演算法,默認值AllocateMessageQueueAveragely即取模平均分配
setOffsetStore
- 消息消費進度存儲器,2個策略
- LocalFileOffsetStore(廣播模式默認使用)
- RemoteBrokerOffsetStore(集群模式默認使用)
setConsumeThreadMin
- 最小消費執行緒池數量
setConsumeThreadMax
- 最大消費執行緒池數量
setPullBatchSize
- 消費者去broker拉取消息時,一次次拉取多少條
setConsumeMessageBatchMaxSize
- 單次消費時一次性消費多少條消息
setMessageModel
- 消費者消費模式
- CLUSTERING:默認是集群模式
- BROADCASTING:廣播模式
Topic下隊列的奇偶數會影響Customer個數裡面的消費數量
- 如果是4個隊列(默認隊列為4),8個消息,4個節點則各會消費2條,如果不對等,則負載均衡會分配不均勻
- 如果consumer實例數量比message queue的總數量還多的話,多出來的consumer實例將無法分到queue,也就無法消費達到消息,也就無法起到分攤負載的作用,所以需要控制讓queue的總數量大於consumer的數量
集群模式(默認)
- Consumer實例平均分攤消費生產者發送的消息
- 例如:訂單消息,只能被消費一次
廣播模式
- 廣播模式下消費消息,投遞到Broker的消息會被每個Consumer進行消費,一條消息被多個Consumer消費,廣播消費中ConsumerGroup暫時無用
- 例如:QQ群,群主發一條消息,所有人都可以看到
消息存儲
ConsumeQueue
邏輯隊列,默認存儲位置:/root/store/consumequeue
CommitLog
真正存儲消息文件的,默認存儲位置:/root/store/commitlog
常見面試題
為什麼消息隊列?
優點
- 非同步:例如秒殺,可以使用,點我直達
- 解耦
- 削峰:秒殺情況下,一個個入隊,一個個出隊,有序進行
缺點
- 系統可用性越低:外部依賴越多,依賴越多,出問題風險越大
- 系統複雜性提高:需要考慮多種場景,比如消息重複消費、消息丟失
- 需要更多的機器和人力:消息隊列一般集群部署,需要運維和監控
如何避免重複消費?
RocketMQ不保證消息不重複,如果業務保證嚴格的不能重複消費,需要自己去業務端去重
資料庫表去重
指定某個欄位唯一值
setNX
利用Redis的特性分散式鎖,下面是我之前的程式碼,待改造
package com.cyb.redis.utils; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; public class jedisUtils { private static String ip = "192.168.31.200"; private static int port = 6379; private static JedisPool pool; static { pool = new JedisPool(ip, port); } public static Jedis getJedis() { return pool.getResource(); } public static boolean getLock(String lockKey, String requestId, int timeout) { //獲取jedis對象,負責和遠程redis伺服器進行連接 Jedis je=getJedis(); //參數3:NX和XX //參數4:EX和PX String result = je.set(lockKey, requestId, "NX", "EX", timeout); if (result=="ok") { return true; } return false; } public static synchronized boolean getLock2(String lockKey, String requestId, int timeout) { //獲取jedis對象,負責和遠程redis伺服器進行連接 Jedis je=getJedis(); //參數3:NX和XX //參數4:EX和PX Long result = je.setnx(lockKey, requestId); if (result==1) { je.expire(lockKey, timeout); //設置有效期 return true; } return false; } }
Redis原子遞增
利用Redis的incr特性,如果大於0說明消費過了(需要設置過期時間)
如何保證消息的可靠性傳輸?
producer端
- 不採用oneway發送,使用同步或者一部方式發送,做好重試,但是重試的Message key必須唯一
- 投遞的日誌需要保存,關鍵欄位、投遞時間、投遞狀態、重試次數、請求體、響應體等
broker端
- 雙主雙從架構,NameServer需要多節點
- 同步雙寫,非同步刷盤
consumer端
- 消息消費保存日誌文件中
大量堆積到broker裡面,如何處理?
- 臨時topic隊列擴容,提高消費者能力
- 編寫臨時處理分發程式,從舊topic快速讀取到臨時新topic中,新topic的queue數量擴容多倍,然後再啟動更多consumer進行臨時新的topic消費
RocketMQ高性能的原因?
MQ架構配置
- 順序寫
- 隨機讀
- 零拷貝
發送端高可用
- 雙主雙從架構:創建Topic的時候,MessageQueue創建在多個Broker上,即相同的Broker名稱,不同brokerid;當一個Master不可用時,組內其他的Master仍然可用
消費高可用
- 主從架構:Broker角色,Master提供讀寫,Slave只支援讀
- Consumer不用配置,當Master不可用或者繁忙的時候,Consumer會自動切換到Slave節點進行讀取
提升消息的消費能力
- 增加多個消費者
- 修改消費者的執行緒池最小/大數量
項目源碼
案例源碼
鏈接: https://pan.baidu.com/s/1Q8iL0lH-bdFEycYGq61hQg 密碼: rww2
Linux下RocketMQ安裝包
鏈接: //pan.baidu.com/s/1dkE7sAs9E4TjwDQ38Pv4_A 密碼: mkjm
尾聲
過幾天搭建RocketMQ雙主雙從集群,今天先到這兒~