MQ系列6:消息的消費

MQ系列1:消息中間件執行原理
MQ系列2:消息中間件的技術選型
MQ系列3:RocketMQ 架構分析
MQ系列4:NameServer 原理解析
MQ系列5:RocketMQ消息的發送模式

在之前的文章中,我們學習了RocketMQ的原理;RocketMQ中 命名服務 ServiceName 的運行流程;以及消息生產、發送的原理和模式。這一篇,就讓我們從消息消費的角度去進一步的學習。

1 消息消費

消息的消費主要是由如下幾個核心能力組成的:

  • 消費方式:Push(推) 或者 Pull(拉)
  • 消費模式:廣播模式和集群模式
  • 消息消費反饋
  • 流量控制(包括消費並發線程數設置)
  • 消息的過濾(Tag, Key),過濾標籤 TagA||TagB||TagC

1.1 消費方式Push or Pull

RocketMQ消息訂閱有方式:

  • Push方式(MQPushConsumer),MQ Server主動向消費端推送;
    這種模式不考慮消費端是否有能力處理消費數據,實時性比較高,能夠及時推送數據,適合大部分業務場景。但同時存在一個問題,如果遇到峰值期,瞬間推送過多消息,會導致積壓,甚至客戶端雪崩。
  • Pull方式(MQPullConsumer),消費端在有需要時,主動從MQ Server拉取數據。
    消費端比較靈活,可以根據自己的吞吐能力,消費的節奏,主動安排消息拉取。適合消費和計算耗時比較大的消費場景。
    缺點就是如何從代碼層面精準地控制拉取的頻率,過短對消費端有壓力,並且有可能空拉照成資源拉菲;過長可能對消息及時性有影響,可以採用長輪詢的方式進行處理。
  • Push模式與Pull模式的區別
    Push方式的做法是,Consumer封裝了長輪詢的操作,並註冊MessageListener監聽器,當MessageListener監聽到有新的消息的時候,消費端便被喚醒,讀取消息進行消費。從用戶角度上,訂閱消息並消費感覺消息是推過來的。
    Pull方式的做法是,消費端主動去拉取數據,獲取相應的Topic的,遍歷MessageQueue集合,取數,重新標記offset,再取數,直至消費完成。

1.2 消費模式 集群 or 廣播

RocketMQ 目前支持集群模式和廣播消費模式,其中集群模式使用範圍比較大,即點對點,消息消費了即完成。

  • 集群負載均衡消費模式(默認)
    集群模式是一個主題下的單條消息只允許被同一消費組內的一個消費者消費,消費完即完成,即P2P。
    在集群模式下,消息隊列負載的模式:一個MessageQueue集合同一個時間內只允許被同一消費組內的單個消費者消費一次(這種模式不允許重複消費,如付款,訂單提交),單個Consumer可以消費多個遍歷MessageQueue集合。
  • 廣播消費模式
    廣播模式指的是當前主題下的消費組所有消費者都可以消費並處理消息一次,達到廣播的目的。很多業務場景,比如航班延遲的消息通知,告知客戶端緩存信息過期需要重新拉起等。

1.3 消費進度反饋

RocketMQ客戶端消費數據之後,需要向Broker反饋消息的消費進度,Broker獲取到消費進度記錄下來。這樣保證 隊列rebalance和客戶端消費者重啟動的時候,可以獲取到準確的消費進度。

消息消費以及進度反饋的主步驟如下:

  • 消費線程池消費完數據之後,將消息消費進度緩存在內存中。
  • 定時調度任務 5s 一次將消息隊里的消費 offset 提交至Broker。
  • Broker接受到消息之後,存儲在內存中,如果有新的過來,可以更行,同樣的每5s將offset持久化下來。
  • 消費客戶端從Broker拉取消息時,同步將MessageQueue的消費偏移量提交到Broker。

綜合上面的內容,需要注意的點如下:

  • RocketMQ以Consumer Group(消費者小組)和 Queue(隊列)為標準對消費刻度進行管理的
  • Consumer Offset標記消費組在消息隊列(Queue)上的消費進度。
  • 消費成功後,消費進度暫時更新到本地緩存,調度任務會定時(默認5s)將進度同步到broker(需注意如果宕機,消費進度未提交則可能導致被重複消費),Broker最終將消費進度持久化到磁盤。
  • RocketMQ支持並發消費,所以是多個線程並行處理,每次記錄消費進度的時候,把線程中最小的offset值作為消費進度值,這樣避免了消息丟失,但有重複消費的風險,業務中需保證操作冪等性。
  • offset存儲模式:集群模式,消息進度存儲於Broker上;廣播模式,消息消費進度在消費端即可。

image

1.4 消費端流量控制

可以在DefaultMQPushConsumer 對象中配置各種屬性來對消費流量進行控制:

  • PullInterval: 配置消費端拉取MQ消息的間隔時間。間隔時間是按照上次消費完成之後(比如rocketMQ收到Ack回復消息之後)。
    PullInterval=20s,比如上次rocketMq服務收到Ack消息是12:15:15,則 12:15:35再去拉消息。

  • PullBatchSize: 消費端每個隊列一次拉取多少個消息,若該消費端分賠了N個監控隊列,每次拉取M個,那麼消費端每次去rocketMq拉取的消息為N * M。
    消費端每次pull到消息總數=PullBatchSize * 監聽隊列數,如 PullBatchSize = 2, 監聽隊列=5,則 消息總數量 = 2 * 5 = 10。

  • ThreadMin和ThreadMax: 消費端消費pull到的消息需要的線程數量。

    • ThreadMin:消費端拉取到消息後分配消費的線程數
    • ThreadMax:最大消費線程,如果默認隊列滿了,則啟用新的線程
  • RocketMq 邏輯消費隊列數量的配置
    rocketMq 可以配置消費隊列,如 queue Read1 ,queue Read2,配置數量決定每次pull到的消息總數。Rocket MQ 提供了讀寫隊列數量的配置。

  • 消費端節點部署數量
    多節點消費端線程數量要比單節點消費線程數量多,理論上消費速度大於單節點,分治思維。

1.5 消息的過濾

在過濾消息的時候,標籤模式簡單而是用,可以篩選出你需要的數據。如下:

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("groupTest");
consumer.subscribe("testTopic", MessageSelector.byTag("Tag1  || Tag2 || Tag3").bySql("sex = 'male' AND name = 'brand'));

這種情況下,消息中帶有 Tag1 、Tag2、Tag3 標籤就會被過濾出來,但是單個消限制息只能有一個標籤,這就遠遠滿足不了各種複雜的並交集場景的需要了。
這時候Rocket MQ可以在消息中設置一些屬性,再使用SQL表達式篩選屬性來過濾出需要的數據。 如下

------------
| message  |
|----------|  sex = male AND name = 'brand' , Gotten
| name = 'brand' |  
| sex = 'male'|
| age = 21|
------------

------------
| message  |
|----------|   sex = male AND name = 'brand', Gotten , Missed
| name = 'Anny'    | 
| sex = 'female'|
| age = 20 |
------------

1.8 提高Consumer的處理能力 :看情況

  1. 提高消費並行度
    在同一個ConsumerGroup下(Clustering方式),可以通過增加Consumer實例的數量來提高並行度。
    通過加機器,或者在已有機器中啟動多個Consumer進程都可以增加Consumer實例數。
    注意:總的Consumer數量不要超過Topic下Read Queue數量,超過的Consumer實例接收不到消息。
    此外,通過提高單個Consumer實例中的並行處理的線程數,可以在同一個Consumer內增加並行度來提高吞吐量(設置方法是修改consumeThreadMin和consumeThreadMax)。

  2. 以批量方式進行消費
    某些業務場景下,多條消息同時處理的時間會大大小於逐個處理的時間總和,比如消費消息中涉及update某個數據庫,一次update10條的時間會大大小於十次update1條數據的時間。
    可以通過批量方式消費來提高消費的吞吐量。實現方法是設置Consumer的consumeMessageBatchMaxSize這個參數,默認是1,如果設置為N,在消息多的時候每次收到的是個長度為N的消息鏈表。

  3. 檢測延時情況,跳過非重要消息
    Consumer在消費的過程中,如果發現由於某種原因發生嚴重的消息堆積,短時間無法消除堆積,這個時候可以選擇丟棄不重要的消息,使Consumer儘快追上Producer的進度。

2 消息消費的模式

2.1 基本信息消費

消費者的基本實現,連接 NameServer的地址,指定Topic和Tag,讀取到需要消費的數據,然後輪詢並處理。

public class SimpleConsumerApplication {
    public static void main(String[] args) throws MQClientException {
        // 1.創建消費者Consumer,並指定消費者組名為 testConsumGroup
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("testConsumGroup");
        // 2.指定NameServer的地址,以獲取Broker路由地址
        consumer.setNamesrvAddr("192.168.139.1:9876");
        // 3.指定Topic和Tag 信息。* 代表所有
        consumer.subscribe("testTopic", "*");

        // 4.設置回調函數,用來處理讀取到的消息, MessageListenerOrderly 用單個線程處理處理隊列的數據
        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgList, ConsumeOrderlyContext context) {
                for (MessageExt msg : msgList) {
                    System.out.println("線程 " + Thread.currentThread().getName() + " : " + msg.getBody().toString());
                    // Todo,具體的業務邏輯
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        // 5.消費者開始執行消費任務
        consumer.start();
    }
}

2.2 順序消費

相比與基本消費,多了一個 ConsumeFromWhere的設置。代表消費者從哪個位置開始消費,枚舉如下:

  • CONSUME_FROM_LAST_OFFSET:第一次啟動從隊列最後位置消費,非第一次啟動接着上次消費的進度繼續消費
  • CONSUME_FROM_FIRST_OFFSET:第一次啟動從隊列初始位置消費,非第一次啟動接着上次消費的進度繼續消費
  • CONSUME_FROM_TIMESTAMP:第一次啟動從指定時間點位置消費,非第一次啟動接着上次消費的進度繼續消費
    以上所說的第一次啟動是指從來沒有消費過的消費者,如果該消費者消費過,那麼會在broker端記錄該消費者的消費位置,消費者掛了再啟動,則從上次消費進度繼續執行。
public class SimpleOrderApplication {
    public static void main(String[] args) throws MQClientException {
        // 1.創建消費者Consumer,並指定消費者組名為 testConsumGroup
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("testConsumGroup");
        // 2.指定NameServer的地址,以獲取Broker路由地址
        consumer.setNamesrvAddr("192.168.139.1:9876");

        /**
         * 設置Consumer第一次啟動是從隊列頭部、隊列尾部、還是指定時間戳節點開始消費
         * 非第一次啟動接着上次消費的進度繼續消費
         */
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        // 3.指定Topic和Tag 信息。* 代表所有
        consumer.subscribe("testTopic", "*");

        // 4.設置回調函數,用來處理讀取到的消息, MessageListenerOrderly 用單個線程處理處理隊列的數據
        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgList, ConsumeOrderlyContext context) {
                for (MessageExt msg : msgList) {
                    System.out.println("線程 " + Thread.currentThread().getName() + " : " + msg.getBody().toString());
                    // Todo,具體的業務邏輯
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        // 5.消費者開始執行消費任務
        consumer.start();
    }
}

2.3 過濾消息消費

可以使用MessageSelector.byTag來進行標籤篩選;或者使用MessageSelector.bySql 來進行消息屬性篩選;或者混合使用。
參考下面代碼,注釋說明的比較清楚。

public class FilterConsumerApplication {
    public static void main(String[] args) throws MQClientException {
        // 1.創建消費者Consumer,並指定消費者組名為 testConsumGroup
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("testConsumGroup");
        // 2.指定NameServer的地址,以獲取Broker路由地址
        consumer.setNamesrvAddr("192.168.139.1:9876");

        // 3.指定Topic和Tag 信息。只有訂閱的消息有 sex 和 name 屬性, 並且年齡為 18 歲以上的男性
        // consumer.subscribe("testTopic", MessageSelector.byTag("userTag1 || userTag2"));
        consumer.subscribe("testTopic", MessageSelector.bySql("sex = 'male' AND age > 18"));

        // 4.設置回調函數,用來處理讀取到的消息, MessageListenerOrderly 用單個線程處理處理隊列的數據
        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgList, ConsumeOrderlyContext context) {
                for (MessageExt msg : msgList) {
                    System.out.println("線程 " + Thread.currentThread().getName() + " : " + msg.getBody().toString());
                    // Todo,具體的業務邏輯
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        // 5.消費者開始執行消費任務
        consumer.start();
    }
}

3 總結

  • 消費方式:Push(推) 或者 Pull(拉)
  • 消費模式:廣播模式和集群模式
  • 消息消費反饋
  • 流量控制(包括消費並發線程數設置)
  • 消息的過濾(Tag, Key),過濾標籤 TagA||TagB||TagC