用RocketMQ這麼久,才知道消息可以這樣玩

在上一章節中,我們講解了RocketMQ的基本介紹,作為MQ最重要的就是消息的使用了,今天我們就來帶大家如何玩轉MQ的消息。

消息中間件,英文Message Queue,簡稱MQ。它沒有標準定義,一般認為:消息中間件屬於分散式系統中一個子系統,關注於數據的發送和接收,利用高效可靠的非同步消息傳遞機制對分散式系統中的其餘各個子系統進行集成。

高效: 對於消息的處理處理速度快,RocketMQ可以達到單機10萬+的並發。

可靠: 一般消息中間件都會有消息持久化機制和其他的機制確保消息不丟失。

非同步: 指發送完一個請求,不需要等待返回,隨時可以再發送下一個請求,既不需要等待。

消息中間件不生產消息,只是消息的搬運工。

首先Message包含的內容主要有幾個方面組成:id(MQ自動生成)、Topic、tag、proerties、內容。

消息的發送分為:

  • 普通消息
  • 順序消息
  • 延時消息
  • 批量消息
  • 分散式消息

普通消息

普通消息的發送方式主要有三種:發送同步消息、發送非同步消息、單向發送

我們可以先使用 RocketMQ 提供的原生客戶端的API,在 SpringBoot、SpringCloudStream 也進行了集成,但本質上這些也是基於原生API的封裝,所以我們只需要掌握原生API的時候,其他的也就無師自通了。

想要使用 RocketMQ中的API,就需要先導入對應的客戶端依賴

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.9.2</version>
</dependency>

消息發送者的步驟分為:

  1. 創建消息生產者 producer,執行生產者組名
  2. 指定Nameserver地址
  3. 啟動producer
  4. 創建消息對象,指定Topic、Tag和消息體
  5. 發送消息
  6. 關閉生產者producer

消息消費者的步驟分為:

  1. 創建消費者 Consumer,指定消費者組名
  2. 指定Nameserver地址
  3. 訂閱主題Topic和Tag
  4. 設置回調函數,處理消息
  5. 啟動消費者consumer

發送同步消息

發送同步消息是說消息發送方發出數據後,同步等待,一直等收到接收方發迴響應之後才發下一個請求。這種可靠性同步地發送方式使用的比較廣泛,比如:重要的消息通知,簡訊通知。

流程如下所示:

package org.apache.rocketmq.example.quickstart;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
/**
 * 同步發送
 */
public class SyncProducer {
    public static void main(String[] args) throws Exception{
        // 實例化消息生產者Producer
        DefaultMQProducer producer = new DefaultMQProducer("group_test");

        // 設置NameServer的地址
        producer.setNamesrvAddr("127.0.0.1:9876");
        //producer.setSendLatencyFaultEnable(true);

        // 啟動Producer實例
        producer.start();


        for (int i = 0; i < 10; i++) {
            // 創建消息,並指定Topic,Tag和消息體
            Message msg = new Message("TopicTest" /* Topic */,
                    "TagA" /* Tag */,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
            );
            // 發送消息到一個Broker
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        }
        //如果不再發送消息,關閉Producer實例。
        producer.shutdown();
    }
}

響應結果如下所示:

  • msgId: 消息的全局唯一標識(RocketMQ的ID生成是使用機器IP和消息偏移量的組成),由消息隊列 MQ 系統自動生成,唯一標識某條消息。

  • sendStatus: 發送的標識:成功,失敗等

  • queueId: queueId是Topic的分區;Producer發送具體一條消息的時,對應選擇的該Topic下的某一個Queue的標識ID。

  • queueOffset: Message queue是無限長的數組。一條消息進來下標就會漲1,而這個數組的下標就是queueOffset,queueOffset是從0開始遞增。

在上面代表的是四個queue,而maxOffset代表我們發送消息的數量,之前發送過消息,所以大家現在看到的數量是17、18…這種,當你在運行一次發送消息時,就會看到十條消息會分布在不同機器上

發送非同步消息

非同步消息通常用在對響應時間敏感的業務場景,即發送端不能容忍長時間地等待Broker的響應。消息發送方在發送了一條消息後,不等接收方發迴響應,接著進行第二條消息發送。發送方通過回調介面的方式接收伺服器響應,並對響應結果進行處理。

流程如下:

package com.muxiaonong.normal;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

/**
 * 非同步發送--生產者
 */
public class AsyncProducer {
    public static void main(String[] args) throws Exception{
        // 實例化消息生產者Producer
        DefaultMQProducer producer = new DefaultMQProducer("group_test");
        // 設置NameServer的地址
        producer.setNamesrvAddr("127.0.0.1:9876");
        // 啟動Producer實例
        producer.start();
        for (int i = 0; i < 10; i++) {
            final int index = i;
            // 創建消息,並指定Topic,Tag和消息體
            Message msg = new Message("TopicTest", "TagA", "OrderID888",
                    "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
            // SendCallback接收非同步返回結果的回調
            producer.send(msg, new SendCallback() {
                //發送成功
                @Override
                public void onSuccess(SendResult sendResult) {
                    System.out.printf("%s%n", sendResult);
                }
                //發送異常
                @Override
                public void onException(Throwable e) {
                    System.out.printf("%-10d Exception %s %n", index, e);
                    e.printStackTrace();
                }
            });
        }
        Thread.sleep(10000);
        // 如果不再發送消息,關閉Producer實例。
        producer.shutdown();
    }
}

發送成功報文:

我們在dashbord下看到已經成功拿到消息了

單向發送

這種方式不需要我們特別關心發送結果的場景,比如日誌發送、單向發送特點是發送方只需要負責發送消息,不需要等待伺服器回應且沒有回調函數觸發,發送請求不需要等待應答,只管發,這种放松方式過程耗時很短,一般在微妙級別。

流程如下:

package com.muxiaonong.normal;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

/**
 * 單向發送
 */
public class OnewayProducer {
    public static void main(String[] args) throws Exception{
        // 實例化消息生產者Producer對象
        DefaultMQProducer producer = new DefaultMQProducer("group_test");
        // 設置NameServer的地址
        producer.setNamesrvAddr("127.0.0.1:9876");
        // 啟動Producer實例
        producer.start();
        for (int i = 0; i < 10; i++) {
            // 創建消息,並指定Topic,Tag和消息體
            Message msg = new Message("TopicTest" /* Topic */,
                    "TagA" /* Tag */,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
            );
            // 發送單向消息,沒有任何返回結果
            producer.sendOneway(msg);

        }
        // 如果不再發送消息,關閉Producer實例。
        producer.shutdown();
    }
}

返回報文:


這種發送方式,我們客戶端不會感受到發送結果,發送完成之後,我們並不知道到底有沒有發送成功,我們只能在 top status 中去查看

普通消息發送對比:

發送方式 發送TPS 可靠性 結果回饋 使用場景
同步消息發送 不丟失 重要通知(郵件、簡訊通知、)等
非同步消息發送 不丟失 用戶文件上傳自動解析服務,完成後通知其結果
單向發送 超快 可能丟失 適用於 耗時非常短,但是對於可靠性要求不高的場景,比如日誌收集

消息的消費方式

普通消息的消費方式主要有三種:集群消費、廣播消費

一、集群消費模式

集群消費方式下,一個分組(Group) 下的多個消費者共同消費隊列消息,每一個消費者出來處理的消息不一樣,一個Consumer Group 中的各個Consumer 實例分攤去消費消息,一條消息只會投遞到一個Consumer Group 下的一個實例,如果一個Topic有三個隊列,其中一個 Consumer Group 有三個實例,那麼每個實例只會消費其中一個隊列,集群消費模式是消費者默認的消費方式。

實例程式碼:

package com.muxiaonong.normal.consumer;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

import java.util.List;


/**
 * 集群消費模式
 */
public class BalanceConsumer {
    public static void main(String[] args) throws Exception {
        // 實例化消費者,指定組名:  TopicTest  10條消息 group_consumer  ,  lijin 8(2)
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_consumer");
        // 指定Namesrv地址資訊.
        consumer.setNamesrvAddr("127.0.0.1:9876");
        // 訂閱Topic
        consumer.subscribe("TopicTest", "*"); //tag  tagA|TagB|TagC
        //consumer.setConsumeFromWhere();

        //集群模式消費
        consumer.setMessageModel(MessageModel.CLUSTERING);

        //取消
        consumer.unsubscribe("TopicTest");
        //再次訂閱Topic即可
        consumer.subscribe("TopicTest", "*"); //tag  tagA|TagB|TagC

        // 註冊回調函數,處理消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                try {
                    for(MessageExt msg : msgs) {
                        String topic = msg.getTopic();
                        String msgBody = new String(msg.getBody(), "utf-8");
                        String tags = msg.getTags();
                        Thread.sleep(1000);
                        System.out.println("收到消息:" + " topic :" + topic + " ,tags : " + tags + " ,msg : " + msgBody);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;

                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        //啟動消息者
        consumer.start();
        //註銷Consumer
        //consumer.shutdown();
        System.out.printf("Consumer Started.%n");
    }
}

我們啟動兩個實例對象,分別為BalanceConsumer2和BalanceConsumer,我們再去生產者生產十條消息後,我們再去看consumer,分別均攤了這十條消息

二、廣播消費模式

廣播消費模式中消息將對一個Consumer Group下的各個Consumer實例都投遞一遍。即使這些 Consumer屬於同一個Consumer Group,消息也會被Consumer Group 中的每個Consumer都消費一次。因為一個消費組下的每個消費者實例都獲取到了topic下面的每個Message Queue去拉取消費。所以消息會投遞到每個消費者實例。每一個消費者下面的消費實例,都會去拿到我們Topic下的每一條消息,但是這種消費進度的保存,不會放在broker裡面,而是持久化到我們的本地實例

流程圖如下:

具體程式碼

package com.muxiaonong.normal.consumer;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

import java.util.List;


/**
 * 廣播消費模式
 */
public class BroadcastConsumer {
    public static void main(String[] args) throws Exception {
        // 實例化消費者,指定組名:  TopicTest  10條消息 group_consumer  ,  lijin 8(2)
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_consumer");
        // 指定Namesrv地址資訊.
        consumer.setNamesrvAddr("127.0.0.1:9876");
        // 訂閱Topic
        consumer.subscribe("TopicTest", "*"); //tag  tagA|TagB|TagC
        //consumer.setConsumeFromWhere();

        //廣播模式消費
        consumer.setMessageModel(MessageModel.BROADCASTING);

        //取消
        consumer.unsubscribe("TopicTest");
        //再次訂閱Topic即可
        consumer.subscribe("TopicTest", "*"); //tag  tagA|TagB|TagC

        // 註冊回調函數,處理消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                try {
                    for(MessageExt msg : msgs) {
                        String topic = msg.getTopic();
                        String msgBody = new String(msg.getBody(), "utf-8");
                        String tags = msg.getTags();
                        Thread.sleep(1000);
                        System.out.println("收到消息:" + " topic :" + topic + " ,tags : " + tags + " ,msg : " + msgBody);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;

                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        //啟動消息者
        consumer.start();
        //註銷Consumer
        //consumer.shutdown();
        System.out.printf("Consumer Started.%n");
    }
}

我們先啟動 BroadcastConsumer和BroadcastConsumer2,生產十條消息以後,我們會看到不管是哪個消費者,都會接收到十條消息,這個就是廣播消費模式

消息消費的權衡

負載均衡模式: 消費端集群化部署,每條消息只需要被處理一次,由於消費進度在服務端維護,可靠性更高。

集群消費模式下,不能保證每一次失敗重投的消息路由到同一台機器上,因此處理消息時不應該做任何確定性假設。每一條消息都只會被分發到一台機器上處理,如果需要被集群下的每一台機器都處理,只能使用廣播模式。

廣播模式: 每條消息都需要被相同邏輯的多台機器處理,消費進度在客戶端維護,出現重複的概率稍大於集群模式。

廣播模式下,消息隊列 RocketMQ 保證每條消息至少被每台客戶端消費一次,但是並不會對消費失敗的消息進行失敗重投,因此需要關注消費失敗的情況,客戶端每一次重啟都會從最新消息消費。客戶端在被停止期間發送至服務端的消息會被自動跳過,這一點是需要注意的地方

每條消息都會被大量的客戶端重複處理,因此推薦儘可能使用集群模式。目前僅 Java 客戶端支援廣播模式,不支援順序消息且服務端不維護消費進度,所以消息隊列 RocketMQ 控制台不支援消息堆積查詢、消息堆積報警和訂閱關係查詢功能。

順序消息

順序消息指的是可以按照消息的發送順序來消費(FIFO)。RocketMQ 可以嚴格的保證消息有序,可以分為 分區有序 或者 全局有序。

生產消息時在默認的情況下消息發送會採取 Round Robin 輪詢方式把消息發送到不同的 queue ( 分區隊列);而消費消息的時候從多個 queue 上拉取消息,這種情況發送和消費是不能保證順序。但是如果控制發送的順序消息只依次發送到同一個 queue 中,消費的時候只從這個 queue 上依次拉取,則就保證了順序。當發送和消費參與的 queue 只有一個,則是全局有序;如果多個 queue 參與,則為分區有序,即相對每個 queue ,消息都是有序的。

全局有序

全局有序主要控制在於創建Topic指定只有一個隊列,同步確保生產者與消費者都只有一個實例進行即可

分區有序

在電商業務場景中,訂單的流程是:創建、付款、推送、完成。 在加入 RocketMQ 後,一個訂單會分別產生對於這個訂單的創建、付款、推送、完成等消息,如果我們把所有消息全部送入到 RocketMQ 中的一個主題中,如何實現針對一個訂單的消息順序性呢!如下圖:

要完成分區有序性,在生產者環節使用自定義的消息隊列選擇策略,確保訂單號尾數相同的消息會被先後發送到同一個隊列中(案例中主題有3個隊列,生產環境中可設定成10個滿足全部尾數的需求),然後再消費端開啟負載均衡模式,最終確保一個消費者拿到的消息對於一個訂單來說是有序的。

/** @Author 牧小農
 * @Description // 訂單消息生產
 * @Date 16:47 2022/8/20
 * @Param 
 * @return 
 **/
public class OrderProducer {

    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("OrderProducer");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();
        // 訂單列表
        List<Order> orderList = new OrderProducer().buildOrders();
        for (int i = 0; i < orderList.size(); i++) {
            String body = orderList.get(i).toString();
            Message msg = new Message("PartOrder", null, "KEY" + i, body.getBytes());
            SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                @Override
                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                    Long id = (Long) arg;  //根據訂單id選擇發送queue
                    long index = id % mqs.size();
                    return mqs.get((int) index);
                }
            }, orderList.get(i).getOrderId());//訂單id
            System.out.println(String.format("SendResult status:%s, queueId:%d, body:%s",
                    sendResult.getSendStatus(),
                    sendResult.getMessageQueue().getQueueId(),
                    body));
        }
        producer.shutdown();
    }

    /**
     * 訂單
     */
    private static class Order {
        private long orderId;
        private String desc;
        .....
    }

    /**
     * 生成模擬訂單數據  3個訂單   每個訂單4個狀態
     * 每個訂單 創建->付款->推送->完成
     */
    private List<Order> buildOrders() {
        List<Order> orderList = new ArrayList<Order>();
        Order orderDemo = new Order();
        orderDemo.setOrderId(001);
        orderDemo.setDesc("創建");
        orderList.add(orderDemo);

       //...............
        return orderList;
    }
}

訂單消費者

/** @Author 牧小農
 * @Description // 訂單消息消費
 * @Date 16:46 2022/8/20
 * @Param
 * @return
 **/
public class OrderConsumer {

    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("OrderConsumer2");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        consumer.subscribe("PartOrder", "*");
        consumer.registerMessageListener(new MessageListenerOrderly() {
            Random random = new Random();
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                context.setAutoCommit(true);
                for (MessageExt msg : msgs) {
                    // 可以看到每個queue有唯一的consume執行緒來消費, 訂單對每個queue(分區)有序
                    System.out.println("consumeThread=" + Thread.currentThread().getName()
                            + ",queueId=" + msg.getQueueId() + ", content:" + new String(msg.getBody()));
                }
                try {
                    //模擬業務邏輯處理中...
                    TimeUnit.MILLISECONDS.sleep(random.nextInt(300));
                } catch (Exception e) {
                    e.printStackTrace();
                    //一會再處理這批消息,而不是放到重試隊列里
                    return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        consumer.start();
        System.out.println("Consumer Started.");
    }

}

消息生產者

消息消費者:

我們可以看到消息按照順序進行了消費。使用順序消息:首先要保證消息是有序進入MQ的,消息放入MQ之前,對id等關鍵字進行取模,放入指定 messageQueue ,同時 consumer 消費消息失敗時,不能返回 reconsume——later ,這樣會導致亂序,所以應該返回 suspend_current_queue_a_moment ,意思是先等一會,一會兒再處理這批消息,而不是放到重試隊列里。

延時消息

Producer 將消息發送到消息隊列 RocketMQ 服務端,但並不期望這條消息立馬投遞(被消費者消費),而是延遲一定時間後才投遞到 Consumer 進行消費,該消息即延時消息。

消息生產和消費有時間窗口要求的場景下,比如在電商交易中超時未支付關閉訂單的場景,在訂單創建時向 RocketMQ 發送一條延時消息。這條消息將會在30分鐘以後投遞給消費者,消費者收到此消息後需要判斷對應的訂單是否已完成支付。如支付未完成,則取消訂單、釋放庫存。如已完成支付則忽略。

Apache RocketMQ 目前只支援固定精度(MQ自己規定的時間段)的定時消息,因為如果要支援任意的時間精度,在 Broker 層面,必須要做消息排序,如果再涉及到持久化,消息排序不可避免的產生巨大性能開銷。(RocketMQ 的商業版本 Aliware MQ 提供了任意時刻的定時消息功能,Apache的 RocketMQ 並沒有,阿里並沒有開源)

Apache RocketMQ 發送延時消息是設置在每一個消息體上的,在創建消息時設定一個延時時間長度,消息將從當前發送時間點開始延遲固定時間之後才開始投遞。

RocketMQ 延時消息的延遲時長不支援隨意時長的延遲,是通過特定的延遲等級來指定的。默認支援18個等級的延遲消息,延時等級定義在 RocketMQ 服務端的 MessageStoreConfig 類中。

具體如下所示:

Level 延遲時間 Level 延遲時間
1 1S 10 6m
2 5S 11 7m
3 10S 12 8m
4 30S 13 9m
5 1m 14 10m
6 2m 15 20m
7 3m 16 30m
8 4m 17 1h
9 5m 18 2h

延時消息生產者:

/** @Author 牧小農
 * @Description // 延時消息-生產者
 * @Date 10:00 2022/8/21
 * @Param 
 * @return 
 **/
public class ScheduledProducer {
    public static void main(String[] args) throws Exception {
        // 實例化一個生產者來產生延時消息
        DefaultMQProducer producer = new DefaultMQProducer("ScheduledProducer");
        // 設置NameServer的地址
        producer.setNamesrvAddr("127.0.0.1:9876");
        // 啟動Producer實例
        producer.start();
        int totalMessagesToSend = 10;
        for (int i = 0; i < totalMessagesToSend; i++) {
            Message message = new Message("ScheduledTopic", ("Hello scheduled message " + i).getBytes());
            // 設置延時等級4,這個消息將在10s之後投遞給消費者(詳看delayTimeLevel)
            // delayTimeLevel:(1~18個等級)"1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"
            message.setDelayTimeLevel(3);
            // 發送消息
            producer.send(message);
        }
        // 關閉生產者
        producer.shutdown();
    }
}

延時消息消費者:

/** @Author 牧小農
 * @Description // 延時消息-消費者
 * @Date 10:00 2022/8/21
 * @Param 
 * @return 
 **/
public class ScheduledConsumer {
    public static void main(String[] args) throws Exception {
        // 實例化消費者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ScheduledConsumer");
        // 指定Namesrv地址資訊.
        consumer.setNamesrvAddr("127.0.0.1:9876");
        // 訂閱Topics
        consumer.subscribe("ScheduledTopic", "*");
        // 註冊消息監聽者
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
                for (MessageExt message : messages) {
                    System.out.println("Receive message[msgId=" + message.getMsgId() + "] "
                            + (message.getStoreTimestamp()-message.getBornTimestamp()) + "ms later");
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 啟動消費者
        consumer.start();
    }
}

當我們生產消息後,查看消費者資訊,延時10秒後,消息才發送完成後,之後進行了消息的消費

批量消息

批量消息發送: 能顯著提高傳遞小消息的性能。限制是這些批量消息有相同的 topic,相同的 waitStoreMsgOK,而且不能是延時消息。此外,這一批消息的總大小不應超過 4MB。批量消息是一個 Collection集合,所以送入消息只要是集合就行。

批量接收消息: 能提高傳遞小消息的性能,同時與順序消息配合的情況下,還能根據業務主鍵對順序消息進行去重(是否可去重,需要業務來決定),減少消費者對消息的處理。

如果我們需要發送10萬元素的數組,怎麼快速發送完?這裡可以使用批量發送,同時每一批控制在1M左右確保不超過消息大小限制。批量切分發送.

批量消息生產者:

/** @Author 牧小農
 * @Description // 批量消息-生產者  list不要超過4m
 * @Date 10:38 2022/8/21
 * @Param 
 * @return 
 **/
public class BatchProducer {

    public static void main(String[] args) throws Exception {
        // 實例化消息生產者Producer
        DefaultMQProducer producer = new DefaultMQProducer("BatchProducer");
        // 設置NameServer的地址
        producer.setNamesrvAddr("127.0.0.1:9876");
        // 啟動Producer實例
        producer.start();

        String topic = "BatchTest";
        List<Message> messages = new ArrayList<>();
        messages.add(new Message(topic, "Tag", "OrderID001", "Hello world 1".getBytes()));
        messages.add(new Message(topic, "Tag", "OrderID002", "Hello world 2".getBytes()));
        messages.add(new Message(topic, "Tag", "OrderID003", "Hello world 3".getBytes()));
        messages.add(new Message(topic, "Tag", "OrderID004", "Hello world 4".getBytes()));
        messages.add(new Message(topic, "Tag", "OrderID005", "Hello world 5".getBytes()));
        messages.add(new Message(topic, "Tag", "OrderID006", "Hello world 6".getBytes()));
        try {
            producer.send(messages);
        } catch (Exception e) {
            producer.shutdown();
            e.printStackTrace();
        }
        // 如果不再發送消息,關閉Producer實例。
        producer.shutdown();
    }
}

批量消息消費者

/** @Author 牧小農
 * @Description // 批量消息-消費者
 * @Date 10:38 2022/8/21
 * @Param 
 * @return 
 **/
public class BatchComuser {
    public static void main(String[] args) throws Exception {
        // 實例化消息生產者,指定組名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("BatchComsuer");
        // 指定Namesrv地址資訊.
        consumer.setNamesrvAddr("127.0.0.1:9876");
        // 訂閱Topic
        consumer.subscribe("BatchTest", "*");
        //負載均衡模式消費
        consumer.setMessageModel(MessageModel.CLUSTERING);
        // 註冊回調函數,處理消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n",
                        Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //啟動消息者
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

這樣我們就實現了批量消息的發送,如果我們消息超過了,4M的時候,這個時候可以考慮消息的分割,具體程式碼如下:

public class ListSplitter implements Iterator<List<Message>> {
    private int sizeLimit = 1000 * 1000;//1M
    private final List<Message> messages;
    private int currIndex;
    public ListSplitter(List<Message> messages) { this.messages = messages; }
    @Override
    public boolean hasNext() { return currIndex < messages.size(); }
    @Override
    public List<Message> next() {
        int nextIndex = currIndex;
        int totalSize = 0;
        for (; nextIndex < messages.size(); nextIndex++) {
            Message message = messages.get(nextIndex);
            int tmpSize = message.getTopic().length() + message.getBody().length;
            Map<String, String> properties = message.getProperties();
            for (Map.Entry<String, String> entry : properties.entrySet()) {
                tmpSize += entry.getKey().length() + entry.getValue().length();
            }
            tmpSize = tmpSize + 20; // 增加日誌的開銷20位元組
            if (tmpSize > sizeLimit) {
                if (nextIndex - currIndex == 0) {//單個消息超過了最大的限制(1M),否則會阻塞進程
                    nextIndex++; //假如下一個子列表沒有元素,則添加這個子列表然後退出循環,否則退出循環
                }
                break;
            }
            if (tmpSize + totalSize > sizeLimit) { break; }
            else { totalSize += tmpSize; }
        }
        List<Message> subList = messages.subList(currIndex, nextIndex);
        currIndex = nextIndex;
        return subList;
    }
}

消息的過濾

效率的過濾主要分為兩種: Tag過濾和SQL語法過濾

在實際的開發應用中,對於一類消息儘可能使用一個Topic進行存儲,但在消費時需要選擇想要的消息,這時可以使用 RocketMQ 的消息過濾功能,具體實現是利用消息的Tag和Key。

Key 一般用於消息在業務層面的唯一標識。對發送的消息設置好 Key,根據這個 Key 來查找消息。比如消息異常,消息丟失,進行查找會很方便。RocketMQ 會創建專門的索引文件,用來存儲 Key與消息的映射,由於底層實現是 Hash 索引,應盡量使 Key唯一,避免潛在的哈希衝突。

Tag: 可以理解為是二級分類。以電商交易平台為例,訂單消息和支付消息屬於不同業務類型的消息,分別創建 OrderTopic 和PayTopic ,其中訂單消息根據不同的商品品類以不同的 Tag 再進行細分,如手機類、家電類、男裝類、女裝類、化妝品類,最後它們都被各個不同的系統所接收。通過合理的使用 Topic 和 Tag,可以讓業務結構清晰,更可以提高效率。

Key和Tag的主要差別是使用場景不同,Key主要用於通過命令行命令查詢消息,而Tag用於在消息端的程式碼中,用來進行服務端消息過濾。

Tag過濾

使用Tag過濾的方式是在消息生產時傳入感興趣的Tag標籤,然後在消費端就可以根據Tag來選擇您想要的消息。具體的操作是在創建Message的時候添加,一個Message只能有一個Tag。

使用案例:

/** @Author 牧小農
 * @Description // tag過濾-生產者
 * @Date 10:51 2022/8/21
 * @Param 
 * @return 
 **/
public class TagFilterProducer {

    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("TagFilterProducer");

        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();
        // 設定三種標籤
        String[] tags = new String[] {"TagA", "TagB", "TagC"};

        for (int i = 0; i < 3; i++) {
            Message msg = new Message("TagFilterTest",
                tags[i % tags.length],
                "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        }
        producer.shutdown();
    }
}

消費者

/** @Author 牧小農
 * @Description // tag過濾-消費者
 * @Date 10:51 2022/8/21
 * @Param 
 * @return 
 **/
public class TagFilterConsumer {

    public static void main(String[] args) throws InterruptedException, MQClientException, IOException {

        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("TagFilterComsumer");
        //指定Namesrv地址資訊.
        consumer.setNamesrvAddr("127.0.0.1:9876");
        //只有TagA 或者TagB 的消息
        consumer.subscribe("TagFilterTest", "TagA || TagB");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                try {
                    for(MessageExt msg : msgs) {
                        String topic = msg.getTopic();
                        String msgBody = new String(msg.getBody(), "utf-8");
                        String msgPro = msg.getProperty("a");
                        String tags = msg.getTags();
                        System.out.println("收到消息:" + " topic :" + topic + " ,tags : " + tags +  " ,a : "
                                + msgPro +" ,msg : " + msgBody);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

我們生成了 TagA\b\c 三條消息,但是消費者只想接收 TagA或B, 那麼我們可以在消費者端進行消息過濾

Tag過濾的形式非常簡單。

|| 代表或 * 代表所有

因此Tag過濾對於複雜的場景可能不能進行覆蓋。在這種情況下,可以使用SQL表達式篩選消息。

SQL語法

SQL基本語法:

  • 數值比較: >,>=,<,<=,BETWEEN,=
  • 字元比較: =,<>,IN
  • 非空比較: IS NULL 或者 IS NOT NULL
  • 邏輯符號: AND,OR,NOT
  • 常量支援類型為: 數值(123,3.1415)、字元(’abc’)單引號包裹起來、NULL、布爾值(TRUE 或 FALSE)

Sql過濾需要 Broker 開啟這項功能,需要修改Broker.conf配置文件。加入enablePropertyFilter=true 然後重啟Broker服務。

消息生產者,發送消息時加入消息屬性,通過 putUserProperty 來設置消息的屬性,生產者發送10條消息,

生產者:

/** @Author 牧小農
 * @Description // sql過濾 -消息生產者(加入消息屬性)
 * @Date 11:04 2022/8/21
 * @Param 
 * @return 
 **/
public class SqlFilterProducer {

    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("SqlFilterProducer");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();

        String[] tags = new String[] {"TagA", "TagB", "TagC"};

        for (int i = 0; i < 10; i++) {
            Message msg = new Message("SqlFilterTest",
                tags[i % tags.length],
                ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
            );
            // 設置SQL過濾的屬性
            msg.putUserProperty("a", String.valueOf(i));
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        }
        producer.shutdown();
    }
}

消費者:

/** @Author 牧小農
 * @Description // sql過濾-消費者
 * @Date 11:04 2022/8/21
 * @Param 
 * @return 
 **/
public class SqlFilterConsumer {

    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("SqlFilterConsumer");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.subscribe("SqlFilterTest",
            //bySql:通過 SQL過濾
            // 1. TAGS不為空且TAGS 在('TagA', 'TagB')
            // 2. 同時 a 不等於空並且a在0-3之間
            MessageSelector.bySql("(TAGS is not null and TAGS in ('TagA', 'TagB'))" +
                "and (a is not null and a between 0 and 3)"));
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
                try {
                    for(MessageExt msg : msgs) {
                        String topic = msg.getTopic();
                        String msgBody = new String(msg.getBody(), "utf-8");
                        String msgPro = msg.getProperty("a");
                        String tags = msg.getTags();
                        System.out.println("收到消息:" + " topic :" + topic + " ,tags : " + tags +  " ,a : " + msgPro +" ,msg : " + msgBody);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

消費結果:按照Tag和SQL過濾消費3條消息。

第一個消息是TagA ,消息的屬性(a)是3

第一個消息是TagB ,消息的屬性(a)是1

第一個消息是TagA ,消息的屬性(a)是0

注意哦!

公眾號後台回復:rocketMQ 獲取案例源碼

到這裡有關於RocketMQ基本消息的講解,就結束了,雖然不舍,但是可以關注我,我們下期見。你是不怕被打嗎?(手動狗頭)

在上面的消息類型講述中,可以滿足絕大部分業務場景,同學們可以根據自己實際的業務場景,去選擇合適的消息類型方式進行學習和了解,關注我,後續精彩內容第一時間收到,下期,小農會帶大家了解關於分散式事務消息和 Request-Reply 消息以及後續RocketMQ集群架構方面的知識。本篇點贊過百,就是中暑,也出下篇。

我是牧小農怕什麼無窮,進一步有進一步的歡喜,大家加油!

關注我,下期更精彩。

Tags: