深入理解RocketMQ延遲消息
- 2020 年 2 月 11 日
- 筆記

延遲消息是實際開發中一個非常有用的功能,本文第一部分從整體上介紹秒級精度延遲消息的實現思路,在第二部分結合RocketMQ的延遲消息實現,進行細緻的講解,點出關鍵部分的源碼。第三步介紹延遲消息與消息重試的關係。
1 延遲消息介紹
基本概念:延遲消息是指生產者發送消息發送消息後,不能立刻被消費者消費,需要等待指定的時間後才可以被消費。
場景案例:用戶下了一個訂單之後,需要在指定時間內(例如30分鐘)進行支付,在到期之前可以發送一個消息提醒用戶進行支付。
一些消息中間件的Broker端內置了延遲消息支援的能力,如:
- NSQ:這是一個go語言的消息中間件,其通過記憶體中的優先順序隊列來保存延遲消息,支援秒級精度,最多2個小時延遲。Java中也有對應的實現,如ScheduledThreadPoolExecutor內部實際上也是使用了優先順序隊列。
- QMQ:採用雙重時間輪實現。可參考:任意時間延時消息原理講解:設計與實現
- RabbitMQ:需要安裝一個rabbitmq_delayed_message_exchange插件。
- RocketMQ:RocketMQ 開源版本延遲消息臨時存儲在一個內部主題中,不支援任意時間精度,支援特定的 level,例如定時 5s,10s,1m 等。
Broker端內置延遲消息處理能力,核心實現思路都是一樣:將延遲消息通過一個臨時存儲進行暫存,到期後才投遞到目標Topic中。如下圖所示:

步驟說明如下:
- producer要將一個延遲消息發送到某個Topic中
- Broker判斷這是一個延遲消息後,將其通過臨時存儲進行暫存。
- Broker內部通過一個延遲服務(delay service)檢查消息是否到期,將到期的消息投遞到目標Topic中。這個的延遲服務名字為delay service,不同消息中間件的延遲服務模組名稱可能不同。
- 消費者消費目標topic中的延遲投遞的消息
顯然,臨時存儲模組和延遲服務模組,是延遲消息實現的關鍵。上圖中,臨時存儲和延遲服務都是在Broker內部實現,對業務透明。
此外, 還有一些消息中間件原生並不支援延遲消息,如Kafka。在這種情況下,可以選擇對Kafka進行改造,但是成本較大。另外一種方式是使用第三方臨時存儲,並加一層代理。
第三方存儲選型要求:
對於第三方臨時存儲,其需要滿足以下幾個特點:
- 高性能:寫入延遲要低,MQ的一個重要作用是削峰填谷,在選擇臨時存儲時,寫入性能必須要高,關係型資料庫(如Mysql)通常不滿足需求。
- 高可靠:延遲消息寫入後,不能丟失,需要進行持久化,並進行備份
- 支援排序:支援按照某個欄位對消息進行排序,對於延遲消息需要按照時間進行排序。普通消息通常先發送的會被先消費,延遲消息與普通消息不同,需要進行排序。例如先發一條延遲10s的消息,再發一條延遲5s的消息,那麼後發送的消息需要被先消費。
- 支援長時間保存:一些業務的延遲消息,需要延遲幾個月,甚至更長,所以延遲消息必須能長時間保留。不過通常不建議延遲太長時間,存儲成本比較大,且業務邏輯可能已經發生變化,已經不需要消費這些消息。
例如,滴滴開源的消息中間件DDMQ,底層消息中間件的基礎上加了一層代理,獨立部署延遲服務模組,使用rocksdb進行臨時存儲。rocksdb是一個高性能的KV存儲,並支援排序。
此時對於延遲消息的流轉如下圖所示:

說明如下:
- 生產者將發送給producer proxy,proxy判斷是延遲消息,將其投遞到一個緩衝Topic中;
- delay service啟動消費者,用於從緩衝topic中消費延遲消息,以時間為key,存儲到rocksdb中;
- delay service判斷消息到期後,將其投遞到目標Topic中。
- 消費者消費目標topic中的數據
這種方式的好處是,因為delay service的延遲投遞能力是獨立於broker實現的,不需要對broker做任何改造,對於任意MQ類型都可以提供支援延遲消息的能力。例如DDMQ對RocketMQ、Kafka都提供了秒級精度的延遲消息投遞能力,但是Kafka本身並不支援延遲消息,而RocketMQ雖然支援延遲消息,但不支援秒級精度。
事實上,DDMQ還提供了很多其他功能,僅僅從延遲消息的角度,完全沒有必要使用這個proxy,直接將消息投遞到緩衝Topic中,之後通過delay service完成延遲投遞邏輯即可。
具體到delay service模組的實現上,也有一些重要的細節:
- 為了保證服務的高可用,delay service也是需要部署多個節點。
- 為了保證數據不丟失,每個delay service節點都需要消費緩衝Topic中的全量數據,保存到各自的持久化存儲中,這樣就有了多個備份,並需要以時間為key。不過因為是各自拉取,並不能保證強一致。如果一定要強一致,那麼delay service就不需要內置存儲實現,可以藉助於其他支援強一致的存儲。
- 為了避免重複投遞,delay service需要進行選主,可以藉助於zookeeper、etcd等實現。只有master可以通過生產者投遞到目標Topic中,其他節點處於備用狀態。否則,如果每個節點進行都投遞,那麼延遲消息就會被投遞多次,造成消費重複。
- master要記錄自己當前投遞到的時間到一個共享存儲中,如果master掛了,從slave節點中選出一個新的master節點,從之前記錄時間繼續開始投遞。
- 延遲消息的取消:一些延遲消息在未到期之前,可能希望進行取消。通常取消邏輯實現較為複雜,且不夠精確。對於那些已經快要到期的消息,可能還未取消之前,已經發送出去了,因此需要在消費者端做檢查,才能萬無一失。
2 RocketMQ中的延遲消息
開源RocketMQ支援延遲消息,但是不支援秒級精度。默認支援18個level的延遲消息,這是通過broker端的messageDelayLevel配置項確定的,如下:
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
Broker在啟動時,內部會創建一個內部主題:SCHEDULE_TOPIC_XXXX,根據延遲level的個數,創建對應數量的隊列,也就是說18個level對應了18個隊列。注意,這並不是說這個內部主題只會有18個隊列,因為Broker通常是集群模式部署的,因此每個節點都有18個隊列。
延遲級別的值可以進行修改,以滿足自己的業務需求,可以修改/添加新的level。例如:你想支援2天的延遲,修改最後一個level的值為2d,這個時候依然是18個level;也可以增加一個2d,這個時候總共就有19個level。
可以看到這裡並不支援秒級精度,按照《rocketmq developer guide》中的說法,是為了避免在broker對消息進行排序,造成性能影響。不過筆者考慮,之所以不支援,更多應該是商業上的考慮。
生產者發送延遲消息:
生產者在發送延遲消息非常簡單,只需要設置一個延遲級別即可,注意不是具體的延遲時間,如:
Message msg=new Message(); msg.setTopic("TopicA"); msg.setTags("Tag"); msg.setBody("this is a delay message".getBytes()); //設置延遲level為5,對應延遲1分鐘 msg.setDelayTimeLevel(5); producer.send(msg);
如果設置的延遲level超過最大值,那麼將會重置最最大值。
Broker端存儲延遲消息:
延遲消息在RocketMQ Broker端的流轉如下圖所示:

可以看到,總共有6個步驟,下面會對這6個步驟進行詳細的講解:
- 修改消息Topic名稱和隊列資訊
- 轉發消息到延遲主題的CosumeQueue中
- 延遲服務消費SCHEDULE_TOPIC_XXXX消息
- 將資訊重新存儲到CommitLog中
- 將消息投遞到目標Topic中
- 消費者消費目標topic中的數據
第一步:修改消息Topic名稱和隊列資訊
RocketMQ Broker端在存儲生產者寫入的消息時,首先都會將其寫入到CommitLog中。之後根據消息中的Topic資訊和隊列資訊,將其轉發到目標Topic的指定隊列(ConsumeQueue)中。
由於消息一旦存儲到ConsumeQueue中,消費者就能消費到,而延遲消息不能被立即消費,所以這裡將Topic的名稱修改為SCHEDULE_TOPIC_XXXX,並根據延遲級別確定要投遞到哪個隊列下。
同時,還會將消息原來要發送到的目標Topic和隊列資訊存儲到消息的屬性中。相關源碼如下所示:
org.apache.rocketmq.store.CommitLog#putMessage

第二步:轉發消息到延遲主題的CosumeQueue中
CommitLog中的消息轉發到CosumeQueue中是非同步進行的。在轉發過程中,會對延遲消息進行特殊處理,主要是計算這條延遲消息需要在什麼時候進行投遞。
投遞時間=消息存儲時間(storeTimestamp) + 延遲級別對應的時間
需要注意的是,會將計算出的投遞時間當做消息Tag的哈希值存儲到CosumeQueue中,CosumeQueue單個存儲單元組成結構如下圖所示:

其中:
- Commit Log Offset:記錄在CommitLog中的位置。
- Size:記錄消息的大小
- Message Tag HashCode:記錄消息Tag的哈希值,用於消息過濾。特別的,對於延遲消息,這個欄位記錄的是消息的投遞時間戳。這也是為什麼java中hashCode方法返回一個int型,只佔用4個位元組,而這裡Message Tag HashCode欄位卻設計成8個位元組的原因。
相關源碼參見:
CommitLog#checkMessageAndReturnSize

第三步:延遲服務消費SCHEDULE_TOPIC_XXXX消息
Broker內部有一個ScheduleMessageService類,其充當延遲服務,消費SCHEDULE_TOPIC_XXXX中的消息,並投遞到目標Topic中。
ScheduleMessageService在啟動時,其會創建一個定時器Timer,並根據延遲級別的個數,啟動對應數量的TimerTask,每個TimerTask負責一個延遲級別的消費與投遞。
相關源碼如下所示:
ScheduleMessageService#start

需要注意的是,每個TimeTask在檢查消息是否到期時,首先檢查對應隊列中尚未投遞第一條消息,如果這條消息沒到期,那麼之後的消息都不會檢查。如果到期了,則進行投遞,並檢查之後的消息是否到期。
第四步:將資訊重新存儲到CommitLog中
在將消息到期後,需要投遞到目標Topic。由於在第一步已經記錄了原來的Topic和隊列資訊,因此這裡重新設置,再存儲到CommitLog即可。此外,由於之前Message Tag HashCode欄位存儲的是消息的投遞時間,這裡需要重新計算tag的哈希值後再存儲。
源碼參見:DeliverDelayedMessageTimerTask的messageTimeup方法。
第五步:將消息投遞到目標Topic中
這一步與第二步類似,不過由於消息的Topic名稱已經改為了目標Topic。因此消息會直接投遞到目標Topic的ConsumeQueue中,之後消費者即消費到這條消息。
3 延遲消息與消費重試的關係
RocketMQ提供了消息重試的能力,在併發模式消費消費失敗的情況下,可以返回一個枚舉值RECONSUME_LATER,那麼消息之後將會進行重試。如:
consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { //處理消息,失敗,返回RECONSUME_LATER,進行重試 return ConsumeConcurrentlyStatus.RECONSUME_LATER; } });
重試默認會進行重試16次。使用過RocketMQ消息重試功能的用戶,可能看到過以下這張圖:
|
第幾次重試 |
與上次重試的間隔時間 |
第幾次重試 |
與上次重試的間隔時間 |
|---|---|---|---|
|
1 |
10 秒 |
9 |
7 分鐘 |
|
2 |
30 秒 |
10 |
8 分鐘 |
|
3 |
1 分鐘 |
11 |
9 分鐘 |
|
4 |
2 分鐘 |
12 |
10 分鐘 |
|
5 |
3 分鐘 |
13 |
20 分鐘 |
|
6 |
4 分鐘 |
14 |
30 分鐘 |
|
7 |
5 分鐘 |
15 |
1 小時 |
|
8 |
6 分鐘 |
16 |
2 小時 |
細心地的讀者發現了,消息重試的16個級別,實際上是把延遲消息18個級別的前兩個level去掉了。事實上,RocketMQ的消息重試也是基於延遲消息來完成的。在消息消費失敗的情況下,將其重新當做延遲消息投遞迴Broker。
在投遞迴去時,會跳過前兩個level,因此只重試16次。當然,消息重試還有一些其他的設計邏輯,在之後的文章將會進行分析。

