MQ系列5:RocketMQ消息的發送模式

MQ系列1:消息中間件執行原理
MQ系列2:消息中間件的技術選型
MQ系列3:RocketMQ 架構分析
MQ系列4:NameServer 原理解析

在之前的篇章中,我們學習了RocketMQ的原理,以及RocketMQ中 命名服務 ServiceName 的運行流程,本篇從消息的生產、消費來理解一條消息的生命周期。

1 消息生產

在RocketMQ中,消息生產指的是 消息生產者往消息隊列中寫入數據的過程。因為業務場景的複雜性,RocketMQ架構設計了多種不同的發送策略。下面先討論幾種常見的場景:
-** 同步發送:** 整個過程業務是阻塞等待的,消息發送之後等待 Broker 響應,得到響應結果之後再傳遞給業務執行緒。

  • 非同步發送: 調用RocketMQ 的 Async API,消息生產者只要把消息發送任務放進執行緒池就返回給業務執行緒。所有的邏輯處理、IO操作、網路請求 都由執行緒池處理,處理完成之後,調用業務程式定義好的回調函數來告知業務最終的結果。
  • OneWay(單向)發送: 只負責觸發對消息的發送,發送出即完成任務,不需要對發送的狀態、結果負責。
  • 延遲發送: 指定延遲的時間,在延遲時間到達之後再進行消息的發送。
  • 批量發送: 對於同類型、同特徵的消息,可以聚合進行批量發送,減少MQ的連接發送次數,能夠顯著提升性能。
    以下是生產者實例化啟動的過程:
    image

1.1 消息發送步驟

一般情況下,我們發送消息,會使用默認的DefaultMQProducer類,經過以下幾個步驟實現:

  • 創建消息生產者Producer,並設置Producer的GroupName(生產組)。
  • 設置InstanceName(實例名稱),當你的業務需要啟用多個Producer的時候,使用不同的InstanceName來區分。
  • 設置NameServer地址,這樣Producer才能從NameServer中得到路由資訊
  • 完成其他的初始化配置,比如配置異常重試次數(降低消息丟失的可能性),通訊模組初始化等。
  • 組裝消息對象,指定主題Topic、Tag和消息體Message 等資訊。
  • 通過NameServer獲取到的Broker路由地址,將消息發送。

1.2 消息發生返回狀態

消息發送之後,會相應的拿到回執。返回對象中的狀態(SendResult.SendStatus)有4種,如下:

  • FLUSH_DISK_TIMEOUT 刷盤超時
    如果將Broker的刷盤策略設置成SYNC_FLUSH,那麼沒有在規定的時間完成刷盤則會報該錯誤。
  • FLUSH_SLAVE_TIMEOUT 主從同步超時
    主從模式下(也可以叫主備),Broker配置為SYNC_MASTER模式,如果沒有在設定時間內完成主從同步,則會報該錯誤。
  • SLAVE_NOT_AVAILABLE 未找到Slave Broker
    主從模式下,且Broker配置為SYNC_MASTER,如果未找到Slave的Broker,則會報該錯誤。
  • SEND_OK
    表示發送成功。

1.3 發送同步消息

實時同步消息是一種對可靠性、實時性要求比較高的場景,使用的也比較廣泛,比如:

  • 重要的消息通知,比如驗證碼,不能超過太長時間推送,那樣可能失效
  • 消費記錄確認
  • 數據實時處理和推送 等等
public class SyncProducerApplication {
    public static void main(String[] args) throws Exception {
        // 1、創建生產者producer,並指定生產者組名為 testSyncGroup
        DefaultMQProducer producer = new DefaultMQProducer("testSyncGroup");
        // 2、指定NameServer的地址,以獲取Broker路由地址
        producer.setNamesrvAddr("192.168.139.1:9876");
        // 3、啟動producer
        producer.start();
        // 4、創建消息,並指定Topic,Tag和消息體
        Message msg = new Message("testTopic","sync", "測試同步消息".getBytes("UTF-8"));
        // 5、發送消息到一個Broker
        SendResult sendResult = producer.send(msg);
        // 6、通過sendResult返回消息是否成功送達
        System.out.printf("%s%n", sendResult);
        // 7、如果不再發送消息,關閉生產者Producer
        producer.shutdown();
    }
}

image

1.4 發送非同步消息

我們知道,非同步主要用於那些對實時響應不敏感的業務,可以容忍一定時間的等待,只要能達到最終一致性即可。
有時候為了在流量高峰期進行削峰和分流,緩解壓力,我們經常採用非同步消息的發送模式。這種業務場景也很常見,比如:

  • 消費資訊的推送,可能在你買單之後的幾分鐘才送達
  • 數據統計、文件打包下載等需要長耗時的任務
public class AsyncProducerApplication {
    public static void main(String[] args) throws Exception {
        // 1、創建生產者producer,並指定生產者組名為 testAsyncGroup
        DefaultMQProducer producer = new DefaultMQProducer("testAsyncGroup");
        // 2、指定NameServer的地址,以獲取Broker路由地址
        producer.setNamesrvAddr("192.168.139.1:9876");
        // 3、啟動producer
        producer.start();
        // 4、創建消息,並指定Topic,Tag和消息體
        Message msg = new Message("testTopic","async", "測試非同步消息".getBytes("UTF-8"));
        // 5、發送消息到一個Broker
        SendResult sendResult = producer.send(msg);
        // 6. 發送非同步消息,SendCallback是處理非同步回調的方法
        producer.send(msg, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {  // 成功回調
                System.out.println("success: " + sendResult);
            }
            @Override
            public void onException(Throwable throwable) {  // 失敗回調
                System.out.println("fail: " + throwable);
            }
        });
        // 7、如果不再發送消息,關閉生產者Producer
        producer.shutdown();
    }
}

image

1.5 單向發送消息

OneWay的模式主要用在Care發送結果的場景,只要消息發送出去即完成任務,不需要對發送的狀態、結果負責。常見的使用場景如

  • 普通日誌記錄
  • 非核心的埋點上報等
public class OneWayProducerApplication {
    public static void main(String[] args) throws Exception {
        // 1、創建生產者producer,並指定生產者組名為 testOneWayGroup
        DefaultMQProducer producer = new DefaultMQProducer("testOneWayGroup");
        // 2、指定NameServer的地址,以獲取Broker路由地址
        producer.setNamesrvAddr("192.168.139.1:9876");
        // 3、啟動producer
        producer.start();
        // 4、創建消息,並指定Topic,Tag和消息體
        Message msg = new Message("testTopic","oneway", "測試單向發送消息".getBytes("UTF-8"));
        // 5、發送消息到一個Broker
        producer.sendOneway(msg);
        // 6、如果不再發送消息,關閉生產者Producer
        producer.shutdown();
    }
}

image

1.6 發送延時消息

指定延遲的時間,在延遲時間到達之後再進行消息的發送。這種的使用場景也很多:

  • 比如火車票訂購,提交了一個訂單就把車票給佔位了,這時候可以發送一個延時確認的消息,15m 未付款,就要把該車票釋放,讓其他人去購買。
  • 還比如購買了電影票,可以發送一個核銷資訊,在電影開場前15分鐘就無法退票了。

1.6.1 延時時間的使用限制

延時時間並不是隨意指定的,Rocket源碼中指定了18種等級,分別代表不同的時間時長,如下:

// org/apache/rocketmq/store/config/MessageStoreConfig.java 的第198行
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
  • RocketMq不支援任意時間延時,需設置固定的延時等級,從1s到2h分別對應著等級1到18
  • 可以使用setDelayTimeLevel(int level) 方法設置延時等級,level 從 0 開始

1.6.2 發送延時消息具體實現

通過下面的程式碼,可以得到的結果是消費的時間點比資訊記錄的時間點延遲了1分鐘,這是因為我們在send的時候做了delay。

public class DelayProducerApplication {
    public static void main(String[] args) throws InterruptedException, RemotingException, MQClientException, MQBrokerException , UnsupportedEncodingException {
        // 1、創建生產者producer,並指定生產者組名為 testDelayGroup
        DefaultMQProducer producer = new DefaultMQProducer("testDelayGroup");
        // 2、指定NameServer的地址,以獲取Broker路由地址
        producer.setNamesrvAddr("192.168.139.1:9876");
        // 3、啟動producer
        producer.start();
        // 4、創建消息,並指定Topic,Tag和消息體
        Message msg = new Message("testTopic","delay", "測試延遲發送消息".getBytes("UTF-8"));
        // 5、設置延時等級4,對應1m,所以這個消息在一分鐘之後發送
        msg.setDelayTimeLevel(4);
        // 6、發送消息到一個Broker
        SendResult sendResult = producer.send(msg);
        // 7、通過sendResult返回消息是否成功送達
        System.out.printf("%s%n", sendResult);
        // 8、如果不再發送消息,關閉生產者Producer
        producer.shutdown();
    }
}

image

1.7 發送批量消息

  • 對於同類型、同特徵的消息,可以聚合進行批量發送,減少MQ的連接發送次數,能夠顯著提升性能。
  • 批量發送消息須有相同的topic,相同的waitStoreMsgOK,且不能是延時消息。

waitStoreMsgOK: 消息發送時是否等消息存儲完成後再返回。

  • 一批次的消息總大小不應超過4MB。
public class BatchProducerApplication {
    public static void main(String[] args) throws InterruptedException, RemotingException, MQClientException, MQBrokerException, UnsupportedEncodingException {
        // 1、創建生產者producer,並指定生產者組名為 testBatchGroup
        DefaultMQProducer producer = new DefaultMQProducer("testBatchGroup");
        // 2、指定NameServer的地址,以獲取Broker路由地址
        producer.setNamesrvAddr("192.168.139.1:9876");
        // 3、啟動producer
        producer.start();
        // 4、創建消息列表,並指定Topic,Tag和消息體
        List<Message> messages = new ArrayList<>();
        String topic = "testTopic";
        messages.add(new Message(topic, "batch", "測試批量發送消息 0".getBytes("UTF-8")));
        messages.add(new Message(topic, "batch", "測試批量發送消息 1".getBytes("UTF-8")));
        messages.add(new Message(topic, "batch", "測試批量發送消息 2".getBytes("UTF-8")));

        // 5、發送消息到一個Broker
        SendResult sendResult = producer.send(messages);
        // 6、通過sendResult返回消息是否成功送達
        System.out.printf("%s%n", sendResult);
        // 7、如果不再發送消息,關閉生產者Producer
        producer.shutdown();
    }
}

1.8 如何提升消息生產的性能

消息的發送一般是經過 client發送、Broker伺服器接收並處理、Broker伺服器返回應答 三個步驟。
如果我們想要提高消息生產的效率,一般有如下方法:

  • Oneway方式發送
    Oneway方式發送用在一些性能要求高,可靠性要求低的場景下,比如日誌採集,非核心的埋點上報等。Oneway方式發送請求無需應答,即將數據寫入客戶端的Socket緩衝區就返回,不等待結果的返回。
    所以這種模式是極快的,可以把發送消息時長縮短至微秒級。
  • 增加Producer的並發量,使用多個Producer同時發送
    RocketMQ引入了一個並發窗口,在窗口內消息可以並發地寫入DirectMem中,然後非同步地將連續數據寫入文件系統。
    順序執行CommitLog讓RocketMQ可以保持較高的寫入性能。
  • 恰當的批量發送
    對於同類型、同特徵的消息,可以聚合進行批量發送,減少MQ的連接發送次數,能夠顯著提升性能。批量發送消息須有相同的topic,相同的waitStoreMsgOK,且不能是延時消息。
    對於消息體的大小也要注意不能超過4MB。

根據阿里內部調優後的性能測試報告,消息的寫入性能達到90萬+的TPS,我們可以朝著這個指標進行優化。

2 總結

本篇介紹了RocketMQ 消息生產與發送的幾種模式:

  • 同步發送:整個過程業務是阻塞等待的,消息發送之後等待 Broker 響應,得到響應結果之後再傳遞給業務執行緒。
  • 非同步發送:調用RocketMQ 的 Async API,消息生產者只要把消息發送任務放進執行緒池就返回給業務執行緒。所有的邏輯處理、IO操作、網路請求 都由執行緒池處理,處理完成之後,調用業務程式定義好的回調函數來告知業務最終的結果。
  • OneWay(單向)發送:只負責觸發對消息的發送,發送出即完成任務,不需要對發送的狀態、結果負責。
  • 延遲發送:指定延遲的時間,在延遲時間到達之後再進行消息的發送。
  • 批量發送:對於同類型、同特徵的消息,可以聚合進行批量發送,減少MQ的連接發送次數,能夠顯著提升性能。
    可以根據實際的業務場景選擇適當的發送模式。