RocketMQ入門到入土(二)事務消息&順序消息
接上一篇:RocketMQ入門到入土(一)新手也能看懂的原理和實戰!
一、事務消息的由來
1、案例
引用官方的購物案例:
小明購買一個100元的東西,賬戶扣款100元的同時需要保證在下游的積分系統給小明這個帳號增加100積分。帳號系統和積分系統是兩個獨立是系統,一個要減少100元,一個要增加100積分。如下圖:
2、問題
-
帳號服務扣款成功了,通知積分系統也成功了,但是積分增加的時候失敗了,數據不一致了。
-
帳號服務扣款成功了,但是通知積分系統失敗了,所以積分不會增加,數據不一致了。
3、方案
RocketMQ針對第一個問題解決方案是:如果消費失敗了,是會自動重試的,如果重試幾次後還是消費失敗,那麼這種情況就需要人工解決了,比如放到死信隊列里然後手動查原因進行處理等。
RocketMQ針對第二個問題解決方案是:如果你扣款成功了,但是往mq寫消息的時候失敗了,那麼RocketMQ會進行回滾消息的操作,這時候我們也能回滾我們扣款的操作。
二、事務消息的原理
1、原理圖解
2、詳細過程
1.Producer發送半消息(Half Message)到broker。
我真想吐槽一句為啥叫半消息,難以理解,其實這就是prepare message,預發送消息。
-
Half Message發送成功後開始執行本地事務。
-
如果本地事務執行成功的話則返回commit,如果執行失敗則返回rollback。(這個是在事務消息的回調方法里由開發者自己決定commit or rollback)
Producer發送上一步的commit還是rollback到broker,這裡有兩種情況:
1.如果broker收到了commit/rollback消息 :
-
如果收到了commit,則broker認為整個事務是沒問題的,執行成功的。那麼會下發消息給Consumer端消費。
-
如果收到了rollback,則broker認為本地事務執行失敗了,broker將會刪除Half Message,不下發給Consumer端。
2.如果broker未收到消息(如果執行本地事務突然宕機了,相當本地事務執行結果返回unknow,則和broker未收到確認消息的情況一樣處理。):
-
broker會定時回查本地事務的執行結果:如果回查結果是本地事務已經執行則返回commit,若未執行,則返回rollback。
-
Producer端回查的結果發送給Broker。Broker接收到的如果是commit,則broker視為整個事務執行成功,如果是rollback,則broker視為本地事務執行失敗,broker刪除Half Message,不下發給consumer。如果broker未接收到回查的結果(或者查到的是unknow),則broker會定時進行重複回查,以確保查到最終的事務結果。重複回查的時間間隔和次數都可配。
三、事務消息實現流程
1、實現流程
簡單來看就是:事務消息是個監聽器,有回調函數,回調函數里我們進行業務邏輯的操作,比如給賬戶-100元,然後發消息到積分的mq里,這時候如果賬戶-100成功了,且發送到mq成功了,則設置消息狀態為commit,這時候broker會將這個半消息發送到真正的topic中。一開始發送他是存到半消息隊列里的,並沒存在真實topic的隊列里。只有確認commit後才會轉移。
2、補救方案
如果事務因為中斷,或是其他的網路原因,導致無法立即響應的,RocketMQ當做UNKNOW處理,RocketMQ事務消息還提供了一個補救方案:定時查詢事務消息的事務狀態。這也是一個回調函數,這裡面可以做補償,補償邏輯開發者自己寫,成功的話自己返回commit就完事了。
四、事務消息程式碼實例
1、程式碼
package com.chentongwei.mq.rocketmq; import org.apache.rocketmq.client.producer.LocalTransactionState; import org.apache.rocketmq.client.producer.TransactionListener; import org.apache.rocketmq.client.producer.TransactionMQProducer; import org.apache.rocketmq.client.producer.TransactionSendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageExt; import java.util.Date; /** * Description: * * @author TongWei.Chen 2020-06-21 11:32:58 */ public class ProducerTransaction2 { public static void main(String[] args) throws Exception { TransactionMQProducer producer = new TransactionMQProducer("my-transaction-producer"); producer.setNamesrvAddr("124.57.180.156:9876"); // 回調 producer.setTransactionListener(new TransactionListener() { @Override public LocalTransactionState executeLocalTransaction(Message message, Object arg) { LocalTransactionState state = null; //msg-4返回COMMIT_MESSAGE if(message.getKeys().equals("msg-1")){ state = LocalTransactionState.COMMIT_MESSAGE; } //msg-5返回ROLLBACK_MESSAGE else if(message.getKeys().equals("msg-2")){ state = LocalTransactionState.ROLLBACK_MESSAGE; }else{ //這裡返回unknown的目的是模擬執行本地事務突然宕機的情況(或者本地執行成功發送確認消息失敗的場景) state = LocalTransactionState.UNKNOW; } System.out.println(message.getKeys() + ",state:" + state); return state; } /** * 事務消息的回查方法 */ @Override public LocalTransactionState checkLocalTransaction(MessageExt messageExt) { if (null != messageExt.getKeys()) { switch (messageExt.getKeys()) { case "msg-3": System.out.println("msg-3 unknow"); return LocalTransactionState.UNKNOW; case "msg-4": System.out.println("msg-4 COMMIT_MESSAGE"); return LocalTransactionState.COMMIT_MESSAGE; case "msg-5": //查詢到本地事務執行失敗,需要回滾消息。 System.out.println("msg-5 ROLLBACK_MESSAGE"); return LocalTransactionState.ROLLBACK_MESSAGE; } } return LocalTransactionState.COMMIT_MESSAGE; } }); producer.start(); //模擬發送5條消息 for (int i = 1; i < 6; i++) { try { Message msg = new Message("transactionTopic", null, "msg-" + i, ("測試,這是事務消息! " + i).getBytes()); producer.sendMessageInTransaction(msg, null); } catch (Exception e) { e.printStackTrace(); } } } }
2、結果
msg-1,state:COMMIT_MESSAGE
msg-2,state:ROLLBACK_MESSAGE
msg-3,state:UNKNOW
msg-4,state:UNKNOW
msg-5,state:UNKNOW
msg-3 unknow
msg-3 unknow
msg-5 ROLLBACK_MESSAGE
msg-4 COMMIT_MESSAGE
msg-3 unknow
msg-3 unknow
msg-3 unknow
msg-3 unknow
3、管控台
4、結果分析
-
只有msg-1和msg-4發送成功了。msg-4在msg-1前面是因為msg-1先成功的,msg-4是回查才成功的。按時間倒序來的。
-
先來輸出五個結果,對應五條消息
msg-1,state:COMMIT_MESSAGE
msg-2,state:ROLLBACK_MESSAGE
msg-3,state:UNKNOW
msg-4,state:UNKNOW
msg-5,state:UNKNOW
-
然後進入了回查,msg-3還是unknow,msg-5回滾了,msg-4提交了事務。所以這時候msg-4在管控台里能看到了。
-
過了一段時間再次回查msg-3,發現還是unknow,所以一直回查。
回查的時間間隔和次數都是可配的,默認是回查15次還失敗的話就會把這個消息丟掉了。
五、疑問
疑問:Spring事務、常規的分散式事務不行嗎?Rocketmq的事務是否多此一舉了呢?
MQ用於解耦,之前是分散式事務直接操作了帳號系統和積分系統。但是他兩就是強耦合的存在,如果中間插了個mq,帳號系統操作完發消息到mq,這時候只要保證發送成功就提交,發送失敗則回滾,這步怎麼保證,就是靠事務了。而且用RocketMQ做分散式事務的也蠻多的。
六、順序消息解釋
1、概述
RocketMQ的消息是存儲到Topic的queue裡面的,queue本身是FIFO(First Int First Out)先進先出隊列。所以單個queue是可以保證有序性的。
但問題是1個topic有N個queue,作者這麼設計的好處也很明顯,天然支援集群和負載均衡的特性,將海量數據均勻分配到各個queue上,你發了10條消息到同一個topic上,這10條消息會自動分散在topic下的所有queue中,所以消費的時候不一定是先消費哪個queue,後消費哪個queue,這就導致了無序消費。
2、圖解
3、再次分析
一個Producer發送了m1、m2、m3、m4四條消息到topic上,topic有四個隊列,由於自帶的負載均衡策略,四個隊列上分別存儲了一條消息。queue1上存儲的m1,queue2上存儲的m2,queue3上存儲的m3,queue4上存儲的m4,Consumer消費的時候是多執行緒消費,所以他無法保證先消費哪個隊列或者哪個消息,比如發送的時候順序是m1,m2,m3,m4,但是消費的時候由於Consumer內部是多執行緒消費的,所以可能先消費了queue4隊列上的m4,然後才是m1,這就導致了無序。
七、順序消息解決方案
1、方案一
很簡單,問題產生的關鍵在於多個隊列都有消息,我消費的時候又不知道哪個隊列的消息是最新的。那麼思路就有了,發消息的時候你要想保證有序性的話,就都給我發到一個queue上,然後消費的時候因為只有那一個queue上有消息且queue是FIFO,先進先出,所以正常消費就完了。
很完美。而且RocketMQ也給我們提供了這種發消息的時候選擇queue的api(MessageQueueSelector)。直接上程式碼。
2、程式碼一
2.1、生產者
import java.util.List; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.MessageQueueSelector; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageQueue; /** * 消息發送者 */ public class Producer5 { public static void main(String[] args)throws Exception { DefaultMQProducer producer = new DefaultMQProducer("my-order-producer"); producer.setNamesrvAddr("124.57.180.156:9876"); producer.start(); for (int i = 0; i < 5; i++) { Message message = new Message("orderTopic", ("hello!" + i).getBytes()); producer.send( // 要發的那條消息 message, // queue 選擇器 ,向 topic中的哪個queue去寫消息 new MessageQueueSelector() { // 手動 選擇一個queue @Override public MessageQueue select( // 當前topic 裡面包含的所有queue List<MessageQueue> mqs, // 具體要發的那條消息 Message msg, // 對應到 send() 里的 args,也就是2000前面的那個0 // 實際業務中可以把0換成實際業務系統的主鍵,比如訂單號啥的,然後這裡做hash進行選擇queue等。能做的事情很多,我這裡做演示就用第一個queue,所以不用arg。 Object arg) { // 向固定的一個queue里寫消息,比如這裡就是向第一個queue里寫消息 MessageQueue queue = mqs.get(0); // 選好的queue return queue; } }, // 自定義參數:0 // 2000代表2000毫秒超時時間 0, 2000); } } }
2.2、消費者
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.*; import org.apache.rocketmq.common.message.MessageExt; import java.util.List; /** * Description: * * @author TongWei.Chen 2020-06-22 11:17:47 */ public class ConsumerOrder { public static void main(String[] args) throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my-consumer"); consumer.setNamesrvAddr("124.57.180.156:9876"); consumer.subscribe("orderTopic", "*"); consumer.registerMessageListener(new MessageListenerOrderly() { @Override public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) { for (MessageExt msg : msgs) { System.out.println(new String(msg.getBody()) + " Thread:" + Thread.currentThread().getName() + " queueid:" + msg.getQueueId()); } return ConsumeOrderlyStatus.SUCCESS; } }); consumer.start(); System.out.println("Consumer start..."); } }
2.3、輸出結果
Consumer start...
hello!0 Thread:ConsumeMessageThread_1 queueid:0
hello!1 Thread:ConsumeMessageThread_1 queueid:0
hello!2 Thread:ConsumeMessageThread_1 queueid:0
hello!3 Thread:ConsumeMessageThread_1 queueid:0
hello!4 Thread:ConsumeMessageThread_1 queueid:0
很完美,有序輸出!
3、情況二
比如你新需求:把未支付的訂單都放到queue1里,已支付的訂單都放到queue2里,支付異常的訂單都放到queue3里,然後你消費的時候要保證每個queue是有序的,不能消費queue1一條直接跑到queue2去了,要逐個queue去消費。
這時候思路是發消息的時候利用自定義參數arg,消息體里肯定包含支付狀態,判斷是未支付的則選擇queue1,以此類推。這樣就保證了每個queue里只包含同等狀態的消息。那麼消費者目前是多執行緒消費的,肯定亂序。三個queue隨機消費。解決方案更簡單,直接將消費端的執行緒數改為1個,這樣隊列是FIFO,他就逐個消費了。RocketMQ也為我們提供了這樣的api,如下兩句:
// 最大執行緒數1 consumer.setConsumeThreadMax(1); // 最小執行緒數 consumer.setConsumeThreadMin(1);