RocketMQ 常用消息類型
文章首發於公眾號《程式設計師果果》
地址 : //mp.weixin.qq.com/s/dYqGd9zi2mNelsNNLIribg
消息發送示例
導入依賴:
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.1.1</version>
</dependency>
application.yml:
rocketmq:
name-server: 172.16.250.129:9876
producer:
group: myGroup
普通消息
同步發送
原理:
同步發送是指消息發送方發出一條消息後,會在收到服務端返迴響應之後才發下一條消息的通訊方式。
應用場景:
這種可靠性同步地發送方式應用場景非常廣泛,例如重要通知郵件、報名簡訊通知、營銷簡訊系統等。
示例程式碼:
public void sendMsg() throws Exception {
Message message = new Message(
// 普通消息所屬的Topic
"Topic-Normal",
// Message Tag可理解為Gmail中的標籤,對消息進行再歸類,方便Consumer指定過濾條件在消息隊列 RocketMQ 的伺服器過濾。
"TagA",
// Message Body可以是任何二進位形式的數據。
"Hello MQ".getBytes()
);
rocketMQTemplate.getProducer().send( message );
// 等同於上面的方式(常用)
//rocketMQTemplate.convertAndSend("Topic-Normal:TagA","Hello MQ".getBytes());
}
非同步發送
原理:
非同步發送是指發送方發出一條消息後,不等服務端返迴響應,接著發送下一條消息的通訊方式。RocketMQ非同步發送,需要實現非同步發送回調介面(SendCallback)。消息發送方在發送了一條消息後,不需要等待服務端響應即可發送第二條消息。發送方通過回調介面接收服務端響應,並處理響應結果。
應用場景:
非同步發送一般用於鏈路耗時較長,對響應時間較為敏感的業務場景,例如,您影片上傳後通知啟動轉碼服務,轉碼完成後通知推送轉碼結果等。
示例程式碼:
public void sendAsyncMsg() {
Map<String , Object> map = new HashMap<>();
map.put( "name" , "zs" );
map.put( "age" , 20);
rocketMQTemplate.asyncSend( "Topic-Normal", map , new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
// 消息發送成功。
log.info( "async send success" );
}
@Override
public void onException(Throwable throwable) {
// 消息發送失敗,需要進行重試處理,可重新發送這條消息或持久化這條數據進行補償處理。
log.info( "async send fail" );
}
} );
}
順序消息
全局順序消息
- 概念:對於指定的一個Topic,所有消息按照嚴格的先入先出(FIFO)的順序來發布和消費。
- 適用場景:適用於性能要求不高,所有的消息嚴格按照FIFO原則來發布和消費的場景。
- 示例:在證券處理中,以人民幣兌換美元為Topic,在價格相同的情況下,先出價者優先處理,則可以按照FIFO的方式發布和消費全局順序消息。
分區順序消息
- 概念:對於指定的一個Topic,所有消息根據Sharding Key進行區塊分區。同一個分區內的消息按照嚴格的FIFO順序進行發布和消費。Sharding Key是順序消息中用來區分不同分區的關鍵欄位,和普通消息的Key是完全不同的概念。
- 適用場景:適用於性能要求高,以Sharding Key作為分區欄位,在同一個區塊中嚴格地按照FIFO原則進行消息發布和消費的場景。
- 示例:
- 用戶註冊需要發送發驗證碼,以用戶ID作為Sharding Key,那麼同一個用戶發送的消息都會按照發布的先後順序來消費。
- 電商的訂單創建,以訂單ID作為Sharding Key,那麼同一個訂單相關的創建訂單消息、訂單支付消息、訂單退款消息、訂單物流消息都會按照發布的先後順序來消費。
無序消息、全局順序消息、分區順序消息的對比
示例程式碼
public void sendOrderlyMsg() {
//根據指定的hashKey按順序發送
for (int i = 0; i < 1000; i++) {
String orderId = "biz_" + i % 10;
// 分區順序消息中區分不同分區的關鍵欄位,Sharding Key與普通消息的key是完全不同的概念。
// 全局順序消息,該欄位可以設置為任意非空字元串。
String shardingKey = String.valueOf(orderId);
try {
SendResult sendResult = rocketMQTemplate.syncSendOrderly( "Topic-Order", "send order msg".getBytes(), shardingKey );
// 發送消息,只要不拋異常就是成功。
if (sendResult != null) {
System.out.println(new Date() + " Send mq message success . msgId is:" + sendResult.getMsgId());
}
}
catch (Exception e) {
// 消息發送失敗,需要進行重試處理,可重新發送這條消息或持久化這條數據進行補償處理。
System.out.println(new Date() + " Send mq message failed");
e.printStackTrace();
}
}
}
延時消息
概念:
Producer將消息發送到消息隊列RocketMQ服務端,但並不期望立馬投遞這條消息,而是延遲一定時間後才投遞到Consumer進行消費,該消息即延時消息。
適用場景:
消息生產和消費有時間窗口要求,例如在電商交易中超時未支付關閉訂單的場景,在訂單創建時會發送一條延時消息。這條消息將會在30分鐘以後投遞給消費者,消費者收到此消息後需要判斷對應的訂單是否已完成支付。如支付未完成,則關閉訂單。如已完成支付則忽略。
示例程式碼:
public void sendDelayMsg() {
rocketMQTemplate.syncSend( "Topic-Delay",
MessageBuilder.withPayload( "Hello MQ".getBytes() ).build(),
3000,
//設置延時等級3,這個消息將在10s之後發送(現在只支援固定的幾個時間,詳看delayTimeLevel)
//messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
3 );
}
事務消息
概念:
- 事務消息:消息隊列RocketMQ提供類似X/Open XA的分散式事務功能,通過消息隊列RocketMQ事務消息能達到分散式事務的最終一致。
- 半事務消息:暫不能投遞的消息,發送方已經成功地將消息發送到了消息隊列RocketMQ服務端,但是服務端未收到生產者對該消息的二次確認,此時該消息被標記成「暫不能投遞」狀態,處於該種狀態下的消息即半事務消息。
- 消息回查:由於網路閃斷、生產者應用重啟等原因,導致某條事務消息的二次確認丟失,消息隊列RocketMQ服務端通過掃描發現某條消息長期處於「半事務消息」時,需要主動向消息生產者詢問該消息的最終狀態(Commit或是Rollback),該詢問過程即消息回查。
分散式事務消息的優勢:
消息隊列RocketMQ分散式事務消息不僅可以實現應用之間的解耦,又能保證數據的最終一致性。同時,傳統的大事務可以被拆分為小事務,不僅能提升效率,還不會因為某一個關聯應用的不可用導致整體回滾,從而最大限度保證核心系統的可用性。在極端情況下,如果關聯的某一個應用始終無法處理成功,也只需對當前應用進行補償或數據訂正處理,而無需對整體業務進行回滾。
典型場景:
在電商購物車下單時,涉及到購物車系統和交易系統,這兩個系統之間的數據最終一致性可以通過分散式事務消息的非同步處理實現。在這種場景下,交易系統是最為核心的系統,需要最大限度地保證下單成功。而購物車系統只需要訂閱消息隊列RocketMQ的交易訂單消息,做相應的業務處理,即可保證最終的數據一致性。
事務消息交互流程如下圖所示:
事務消息發送步驟如下:
- 發送方將半事務消息發送至消息隊列RocketMQ服務端。
- 消息隊列RocketMQ服務端將消息持久化成功之後,向發送方返回Ack確認消息已經發送成功,此時消息為半事務消息。
- 發送方開始執行本地事務邏輯。
- 發送方根據本地事務執行結果向服務端提交二次確認(Commit或是Rollback),服務端收到Commit狀態則將半事務消息標記為可投遞,訂閱方最終將收到該消息;服務端收到Rollback狀態則刪除半事務消息,訂閱方將不會接受該消息。
事務消息回查步驟如下:
- 在斷網或者是應用重啟的特殊情況下,上述步驟4提交的二次確認最終未到達服務端,經過固定時間後服務端將對該消息發起消息回查。
- 發送方收到消息回查後,需要檢查對應消息的本地事務執行的最終結果。
- 發送方根據檢查得到的本地事務的最終狀態再次提交二次確認,服務端仍按照步驟4對半事務消息進行操作。
示例程式碼:
發送事務消息包含以下兩個步驟:
-
- 發送半事務消息(Half Message,示例程式碼如下
/**
* 事務消息
*/
public void sendTransactionMsg() {
TransactionSendResult transactionSendResult = rocketMQTemplate.sendMessageInTransaction(
"Topic-Tx:TagA",
MessageBuilder.withPayload( "Hello MQ transaction===".getBytes() ).build(),
null );
SendStatus sendStatus = transactionSendResult.getSendStatus();
LocalTransactionState localTransactionState = transactionSendResult.getLocalTransactionState();
System.out.println( new Date() + " Send mq message status "+ sendStatus +" , localTransactionState "+ localTransactionState );
}
-
- 發送方開始執行本地事務邏輯
@Component
@RocketMQTransactionListener
public class TxProducerListener implements RocketMQLocalTransactionListener {
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
// 執行本地事務
System.out.println("TX message listener execute local transaction");
RocketMQLocalTransactionState result;
try {
// 業務程式碼( 例如下訂單 )
result = RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
System.out.println("execute local transaction error");
result = RocketMQLocalTransactionState.UNKNOWN;
}
return result;
}
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
// 檢查本地事務( 例如檢查下訂單是否成功 )
System.out.println("TX message listener check local transaction");
RocketMQLocalTransactionState result;
try {
//業務程式碼( 根據檢查結果,決定是COMMIT或ROLLBACK )
result = RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
// 異常就回滾
System.out.println("check local transaction error");
result = RocketMQLocalTransactionState.ROLLBACK;
}
return result;
}
}
-
- 發送方在本地事務執行後,若向服務端提交二次確認是Commit,RocketMQ服務端收到Commit狀態則將半事務消息標記為可投遞,訂閱方最終將收到該消息;訂閱方程式碼如下
@Component
@Slf4j
@RocketMQMessageListener(topic = "Topic-Tx",consumerGroup = "consumer-tx-group")
public class TxConsumerListener implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
log.info("Receive message:{}" , message);
}
}
源碼
//github.com/gf-huanchupk/SpringBootLearning