Alibaba-技術專區-RocketMQ 延遲消息實現原理和源碼分析

痛點背景

業務場景

假設有這麼一個需求,用戶下單後如果30分鐘未支付,則該訂單需要被關閉。你會怎麼做?

之前方案

最簡單的做法,可以服務端啟動個定時器,隔個幾秒掃描資料庫中待支付的訂單,如果(當前時間-訂單創建時間)>30分鐘,則關閉訂單。

方案評估
  • 優點:是實現簡單,缺點呢?

  • 缺點:定時掃描意味著隔個幾秒就得查一次資料庫,頻率高的情況下,如果資料庫中訂單總量特別大,這種高頻掃描會對資料庫帶來一定壓力,待付款訂單特別多時(做個爆品秒殺活動,或者啥促銷活動),若一次性查到記憶體中,容易引起宕機,需要分頁查詢,多少也會有一定資料庫層面壓力。

延時隊列出現
  • 能夠在指定時間間隔後觸發某個業務操作

  • 能夠應對業務數據量特別大的特殊場景

RocketMQ延時消息能夠完美的解決上述需求,正常的消息在投遞後會立馬被消費者所消費,而延時消息在投遞時,需要設置指定的延時級別(不同延遲級別對應不同延遲時間),即等到特定的時間間隔後消息才會被消費者消費,這樣就將資料庫層面的壓力轉移到了MQ中,也不需要手寫定時器,降低了業務複雜度,同時MQ自帶削峰功能,能夠很好的應對業務高峰。

功能特點

  • RocketMQ支援發送延遲消息,但不支援任意時間的延遲消息的設置,僅支援內置預設值的延遲時間間隔的延遲消息;

  • 預設值的延遲時間間隔為:1s、 5s、 10s、 30s、 1m、 2m、 3m、 4m、 5m、 6m、 7m、 8m、 9m、 10m、 20m、 30m、 1h、 2h;

  • 在消息創建的時候,調用 setDelayTimeLevel(int level) 方法設置延遲時間;

  • broker在接收到延遲消息的時候會把對應延遲級別的消息先存儲到對應的延遲隊列中,等延遲消息時間到達時,會把消息重新存儲到對應的topic的queue裡面。

Broker處理延遲消息

延時隊列生產者端:

延時消息的關鍵點在於Producer生產者需要給消息設置特定延時級別,消費端程式碼與正常消費者沒有差別。

public class Producer {
	private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
    public static void main(String[] args) throws MQClientException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        //設置namesrv地址
        producer.setNamesrvAddr("111.231.110.149:9876");
        //啟動生產者
        producer.start();
        //發送10條消息
        for (int i = 0; i < 10; i++) {
            try {
                Message msg = new Message("TopicTest" /* Topic */,
                    "TagA" /* Tag */,
                    ("test message" + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                );
                //設置消息延時級別  3對應10秒後發送
                //延時級別1對應延時1秒後發送消息
                //延時級別2對應延時5秒後發送消息
                //延時級別3對應延時10秒後發送消息
                //以此類推。
                msg.setDelayTimeLevel(3);
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(1000);
            }
        }
        /*
         * Shut down once the producer instance is not longer in use.
         */
        producer.shutdown();
    }
}
初始化

DefaultMessageStore在啟動時,會調用ScheduleMessageService#load()方法來載入消息消費進度和初始化延遲級別對應map,然後調用ScheduleMessageService#start()方法來啟動類

load方法

public boolean load() {
        boolean result = super.load();
        result = result && this.parseDelayLevel();
        return result;
}

ScheduleMessageService繼承自ConfigManager類,super.load()方法對應

public boolean load() {
        String fileName = null;
        try {
            fileName = this.configFilePath();
            String jsonString = MixAll.file2String(fileName);

            if (null == jsonString || jsonString.length() == 0) {
                return this.loadBak();
            } else {
                this.decode(jsonString);
                log.info("load " + fileName + " OK");
                return true;
            }
        } catch (Exception e) {
            log.error("load " + fileName + " failed, and try to load backup file", e);
            return this.loadBak();
        }
}

延時隊列源碼分析:

先從延時消息延遲級別設置與broker端消息持久化入手。

具體實現

RocketMQ發送延時消息時先把消息按照延遲時間段發送到指定的隊列中(rocketmq把每種延遲時間段的消息都存放到同一個隊列中)然後通過一個定時器進行輪訓這些隊列,查看消息是否到期,如果到期就把這個消息發送到指定topic的隊列中,這樣的好處是同一隊列中的消息延時時間是一致的,還有一個好處是這個隊列中的消息時按照消息到期時間進行遞增排序的,說的簡單直白就是隊列中消息越靠前的到期時間越早。

啟動延遲消息定時任務

如果想要深入了解的可以看一下ScheduleMessageService這個類

內部變數含義

延時消息定時投遞相關具體實現程式碼在ScheduleMessageService中,先看下變數定義

  • delayLevelTable定義了延遲級別和延遲時間的對應關係

  • offsetTable存放延延遲級別對應的隊列消費的offset

ScheduleMessageService.start()

延遲消息投遞

其中根據,delayLevel獲取消費隊列id的方法如下,即queueId = delayLevel-1

public static int delayLevel2QueueId(final int delayLevel) {
        return delayLevel - 1;
}

核心邏輯就是取出tagCode(延時消息持久化時,tagsCode存儲的是消息投遞時間),解析成消息投遞時間,與當前時間戳做差,判斷是否應該進行消息投遞,具體進行消息投遞的方法,在if (countdown <= 0)中,看下程式碼

每個掃描任務主要是把隊列中所有到期的消息都拿出來,並發送到指定的topic下,並把延遲隊列中的消息刪除

重新投遞實現

重新構建投遞消息的關鍵點在於messageTimeup中,其構建了一個新的消息,並從延時消息屬性中恢復出了原有的topic,queueId,再調用putMessage重新進行投遞。

總結

  • 優點:設計簡單,把所有相同延遲時間的消息都先放到一個隊列中,定時掃描,可以保證消息消費的有序性

  • 缺點:定時器採用了timer,timer是單執行緒運行,如果延遲消息數量很大的情況下,可能單執行緒處理不過來,造成消息到期後也沒有發送出去的情況

  • 改進點:可以在每個延遲隊列上各採用一個timer,或者使用timer進行掃描,加一個執行緒池對消息進行處理,這樣可以提供效率

基本思路已經介紹完,梳理下延時消息實現思路

  • producer端設置消息delayLevel延遲級別,消息屬性DELAY中存儲了對應了延時級別
  • broker端收到消息後,判斷延時消息延遲級別,如果大於0,則備份消息原始topic,queueId,並將消息topic改為延時消息隊列特定topic(SCHEDULE_TOPIC),queueId改為延時級別-1
  • mq服務端ScheduleMessageService中,為每一個延遲級別單獨設置一個定時器,定時(每隔1秒)拉取對應延遲級別的消費隊列
  • 根據消費偏移量offset從commitLog中解析出對應消息
  • 從消息tagsCode中解析出消息應當被投遞的時間,與當前時間做比較,判斷是否應該進行投遞
  • 若到達了投遞時間,則構建一個新的消息,並從消息屬性中恢復出原始的topic,queueId,並清除消息延遲屬性,從新進行消息投遞