高並發非同步解耦利器:RocketMQ究竟強在哪裡?
上篇文章消息隊列那麼多,為什麼建議深入了解下RabbitMQ?我們講到了消息隊列的發展史:
並且詳細介紹了RabbitMQ,其功能也是挺強大的,那麼,為啥又要搞一個RocketMQ出來呢?是重複造輪子嗎?本文我們就帶大家來詳細探討RocketMQ究竟好在哪裡。
RocketMQ是一個分散式消息中間件,具有低延遲、高性能和可靠性、萬億級別的容量和靈活的可擴展性。它是阿里巴巴於2012年開源的第三代分散式消息中間件。
隨著阿里巴巴的電商業務不斷發展,需要一款更高性能的消息中間件,RocketMQ就是這個業務背景的產物。RocketMQ是一個分散式消息中間件,具有低延遲、高性能和可靠性、萬億級別的容量和靈活的可擴展性,它是阿里巴巴於2012年開源的第三代分散式消息中間件。RocketMQ經歷了多年雙十一的洗禮,在可用性、可靠性以及穩定性等方面都有出色的表現。值得一提的是,RocketMQ最初就是借鑒了Kafka進行改造開發而來的,所以熟悉Kafka的朋友,會發現RocketMQ的原理和Kafka有很多相似之處。
RocketMQ前身叫做MetaQ,在MeataQ發布3.0版本的時候改名為RocketMQ,其本質上的設計思路和Kafka類似,因為最初就是基於Kafka改造而來,經過不斷的迭代與版本升級,2016年11月21日,阿里巴巴向Apache軟體基金會捐贈了RocketMQ 。近年來被越來越多的中國企業使用。
本文帶大家從以下幾個方面詳細了解RocketMQ:
- RocketMQ如何保證消息存儲的可靠性?
- RocketMQ如何保證消息隊列服務的高可用?
- 如何構建一個高可用的RocketMQ雙主雙從最小集群?
- RocketMQ消息是如何存儲的?
- RocketMQ是如何保證存取消息的效率的?
- 如何實現基於Message Key的高效查詢?
- 如何實現基於Message Id的高效查詢?
- RocketMQ的Topic在集群中是如何存儲的?
- Broker自動創建Topic會有什麼問題?
- RocketMQ如何保證消息投遞的順序性?
- RocketMQ如何保證消息消費的順序性?
- 實現分散式事務的手段有哪些?
- RocketMQ如何實現事務消息?
- RocketMQ事務消息是如何存儲的?
1. RocketMQ技術架構
RocketMQ的架構主要分為四部分,如下圖所示:
Producer
:消息生產者,支援集群方式部署;Consumer
:消息消費者,支援集群方式部署,支援pull,push模式獲取消息進行消費,支援集群和廣播方式消費;NameServer
:Topic路由註冊中心,類似於Dubbo中的zookeeper,支援Broker的動態註冊與發現;- 提供心跳檢測機制,檢查Broker是否存活;
- 接收Broker集群的註冊資訊,作為路由資訊的基本數據;
- NameServier各個實例不相互進行通訊,每個NameServer都保存了一份完整的路由資訊,這與zookeeper有所區別,不用作複雜的節點數據同步與選主過程;
BrokerServer
:主要負責消息的存儲、投遞和查詢,以及服務高可用保證。BrokerServer包含以下幾個重要的子模組:- Remoting Module:整個Broker的實體,負責處理來自clients端的請求;
- Client Manager:負責管理客戶端(Producer/Consumer)和維護Consumer的Topic訂閱資訊;
- StoreService:提供方便簡單的API介面處理消息存儲到物理硬碟和查詢功能;
- HA Service:高可用服務,提供Master Broker 和 Slave Broker之間的數據同步功能;
- Index Service:根據特定的Message key對投遞到Broker的消息進行索引服務,以提供消息的快速查詢。
2. RocketMQ執行原理
RocketMQ執行原理如下圖所示:
- 首先,啟動每個NameServer節點,共同構成一個NameServer Cluster。NameServer啟動後,監聽埠,等待Broker、Producer、Consumer的連接;
- 然後啟動Broker的主從節點,這個時候Broker會與所有的NameServer建立並保持長連接,定時發送心跳包,把自己的資訊(IP+埠號)以及存儲的所有Topic資訊註冊到每個NameServer中。這樣NameServer集群中就有Topic和Broker的映射關係了;
- 收發消息前,先創建Topic,創建Topic時需要指定該Topic要存儲在哪些Broker上,也可以在發送消息時自動創建Topic,每個Topic默認會分配4個Queue;
- 啟動生產者,這個時候生產者會把資訊註冊到NameServer中,並且從NameServer獲取Broker伺服器,Queue等資訊;
- 啟動消費者,這個時候消費者會把資訊註冊到NameServer中,並且從NameServer獲取Broker伺服器,Queue等資訊;
- 生產者發送消息到Broker集群中的時候,會從所有的Master節點的對應Topic中選擇一個Queue,然後與Queue所在的Broker建立長連接從而向Broker投遞消息。消息實際上是存儲在了CommitLog文件中,而Queue文件裡面存儲的實際是消息在CommitLog中的存儲位置資訊;
- 消費者從Broker集群中消費消息的時候,會通過特定的負載均衡演算法,綁定一個消息隊列進行消費;
- 消費者會定時(或者kill階段)把Queue的消費進度offset提交到Broker的consumerOffset.json文件中記錄起來;
- 主節點和從節點之間可以是同步或者非同步的進行數據複製,相關配置參數:
brokerRole
,可選值:ASYNC_MASTER
:非同步複製方式(非同步雙寫),生產者寫入消息到Master之後,無需等到消息複製到Slave即可返回,消息的複製由旁路執行緒進行非同步複製;SYNC_MASTER
:同步複製方式(同步雙寫),生產者寫入消息到Master之後,需要等到Slave複製成功才可以返回。如果有多個Slave,只需要有一個Slave複製成功,並成功應答,就算複製成功了。這裡是否持久化到磁碟依賴於另一個參數:flushDiskType
;SLAVE
:從節點
3. RocketMQ集群
本節我們來看看一個雙主雙從的RocketMQ是如何搭建的。
集群配置參數說明:
在討論集群前,我們需要了解兩個關鍵的集群配置參數:brokerRole,flushDiskType。brokerRole在前一節已經介紹了,而flushDiskType則是刷盤方式的配置,主要有:
- ASYNC_FLUSH: 非同步刷盤
- SYNC_FLUSH: 同步刷盤
3.1 如何保證消息存儲的可靠性?
brokerRole確定了主從同步是非同步的還是同步的,flushDiskType確定了數據刷盤的方式是同步的還是非同步的。
如果業務場景對消息丟失容忍度很低,可以採用SYNC_MASTER + ASYNC_FLUSH的方式,這樣只有master和slave在刷盤前同時掛掉,消息才會丟失,也就是說即使有一台機器出故障,仍然能保證數據不丟;
如果業務場景對消息丟失容忍度比較高,則可以採用ASYNC_MASTER + ASYNC_FLUSH的方式,這樣可以儘可能的提高消息的吞吐量。
3.2 如何保證消息隊列服務的高可用?
消費端的高可用
Master Broker支援讀和寫,Slave Broker只支援讀。
當Master不可用的時候,Consumer會自動切換到Slave進行讀,也就是說,當Master節點的機器出現故障後,Consumer仍然可以從Slave節點讀取消息,不影響消費端的消費程式。
生產端的高可用
集群配置參數說明:
- brokerName: broker的名稱,需要把Master和Slave節點配置成相同的名稱,表示他們的主從關係,相同的brokerName的一組broker,組成一個broker組;
- brokerId: broker的id,0表示Master節點的id,大於0表示Slave節點的id。
在RocketMQ中,機器的主從節點關係是提前配置好的,沒有類似Kafka的Master動態選主功能。
如果一個Master宕機了,要讓生產端程式繼續可以生產消息,您需要部署多個Master節點,組成多個broker組。這樣在創建Topic的時候,就可以把Topic的不同消息隊列分布在多個broker組中,即使某一個broker組的Master節點不可用了,其他組的Master節點仍然可用,保證了Producer可以繼續發送消息。
3.3 如何構建一個高可用的RocketMQ雙主雙從最小集群?
為了儘可能的保證消息不丟失
,並且保證生產者和消費者的可用性
,我們可以構建一個雙主雙從的集群,搭建的架構圖如下所示:
部署架構說明:
- 兩個Broker組,保證了其中一個Broker組的Master節點掛掉之後,另一個Master節點仍然可以接受某一個Topic的消息投遞;
- 主從同步採用SYNC_MASTER,保證了生產者寫入消息到Master之後,需要等到Slave也複製成功,才返回消息投遞成功。這樣即使主節點或者從節點掛掉了,也不會導致丟數據;
- 由於主節點有了從節點做備份,所以,落盤策略可以使用ASYNC_FLUSH,從而儘可能的提高消息的吞吐量;
- 如果只提供兩台伺服器,要部署這個集群的情況下,可以把Broker Master1和Broker Slave2部署在一台機器,Broker Master2和Broker Slave1部署在一台機器。
關鍵配置參數
以下是關鍵的配置參數:
Broker Master1
# NameServer地址
namesrvAddr=192.168.1.100:9876;192.168.1.101:9876
# 集群名稱
brokerClusterName=itzhai-com-cluster
# brokerIP地址
brokerIP1=192.168.1.100
# broker通訊埠
listenPort=10911
# broker名稱
brokerName=broker‐1
# 0表示主節點
brokerId=0
# 2點進行消息刪除
deleteWhen=02
# 消息在磁碟上保留48小時
fileReservedTime=48
# 主從同步複製
brokerRole=SYNC_MASTER
# 非同步刷盤
flushDiskType=ASYNC_FLUSH
# 自動創建Topic
autoCreateTopicEnable=true
# 消息存儲根目錄
storePathRootDir=/data/rocketmq/store‐m
Broker Slave1
# NameServer地址
namesrvAddr=192.168.1.100:9876;192.168.1.101:9876
# 集群名稱
brokerClusterName=itzhai-com-cluster
# brokerIP地址
brokerIP1=192.168.1.101
# broker通訊埠
listenPort=10911
# broker名稱
brokerName=broker‐1
# 非0表示從節點
brokerId=1
# 2點進行消息刪除
deleteWhen=02
# 消息在磁碟上保留48小時
fileReservedTime=48
# 從節點
brokerRole=SLAVE
# 非同步刷盤
flushDiskType=ASYNC_FLUSH
# 自動創建Topic
autoCreateTopicEnable=true
# 消息存儲根目錄
storePathRootDir=/data/rocketmq/store‐s
Broker Master2
# NameServer地址
namesrvAddr=192.168.1.100:9876;192.168.1.101:9876
# 集群名稱
brokerClusterName=itzhai-com-cluster
# brokerIP地址
brokerIP1=192.168.1.102
# broker通訊埠
listenPort=10911
# broker名稱
brokerName=broker‐2
# 0表示主節點
brokerId=0
# 2點進行消息刪除
deleteWhen=02
# 消息在磁碟上保留48小時
fileReservedTime=48
# 主從同步複製
brokerRole=SYNC_MASTER
# 非同步刷盤
flushDiskType=ASYNC_FLUSH
# 自動創建Topic
autoCreateTopicEnable=true
# 消息存儲根目錄
storePathRootDir=/data/rocketmq/store‐m
Broker Slave2
# NameServer地址
namesrvAddr=192.168.1.100:9876;192.168.1.101:9876
# 集群名稱
brokerClusterName=itzhai-com-cluster
# brokerIP地址
brokerIP1=192.168.1.103
# broker通訊埠
listenPort=10911
# broker名稱
brokerName=broker‐2
# 非0表示從節點
brokerId=1
# 2點進行消息刪除
deleteWhen=02
# 消息在磁碟上保留48小時
fileReservedTime=48
# 從節點
brokerRole=SLAVE
# 非同步刷盤
flushDiskType=ASYNC_FLUSH
# 自動創建Topic
autoCreateTopicEnable=true
# 消息存儲根目錄
storePathRootDir=/data/rocketmq/store‐s
寫了那麼多頂層架構圖,不寫寫底層內幕,就不是IT宅(itzhai.com)的文章風格,接下來,我們就來看看底層存儲架構。
4. RocketMQ存儲架構
我們在broker.conf
文件中配置了消息存儲的根目錄:
# 消息存儲根目錄
storePathRootDir=/data/rocketmq/store‐m
進入這個目錄,我們可以發現如下的目錄結構:
其中:
- abort:該文件在broker啟動時創建,關閉時刪除,如果broker異常退出,則文件會存在,在下次啟動時會走修複流程;
- checkpoint:檢查點,主要存放以下內容:
- physicMsgTimestamp:commitlog文件最後一次落盤時間;
- logicsMsgTimestamp:consumequeue最後一次落盤時間;
- indexMsgTimestamp:索引文件最後一次落盤時間;
- commitlog:存放消息的完整內容,所有的topic消息都會通過文件追加的形式寫入到該文件中;
- config:消息隊列的配置文件,包括了topic配置,消費的偏移量等資訊。其中consumerOffset.json文件存放消息隊列消費的進度;
- consumequeue:topic的邏輯隊列,在消息存放到commitlog之後,會把消息的存放位置記錄到這裡,只有記錄到這裡的消息,才能被消費者消費;
- index:消息索引文件,通過Message Key查詢消息時,是通過該文件進行檢索查詢的。
4.1 RocketMQ消息是如何存儲的
下面我們來看看關鍵的commitlog以及consumequeue:
消息投遞到Broker之後,是先把實際的消息內容存放到CommitLog中的,然後再把消息寫入到對應主題的ConsumeQueue中。其中:
CommitLog:消息的物理存儲文件,存儲實際的消息內容。每個Broker上面的CommitLog被該Broker上所有的ConsumeQueue共享。
單個文件大小默認為1G,文件名長度為20位,左邊補零,剩餘為起始偏移量。預分配好空間,消息順序寫入日誌文件。當文件滿了,則寫入下一個文件,下一個文件的文件名基於文件第一條消息的偏移量進行命名;
ConsumeQueue:消息的邏輯隊列,相當於CommitLog的索引文件。RocketMQ是基於Topic主題訂閱模式實現的,每個Topic下會創建若干個邏輯上的消息隊列ConsumeQueue,在消息寫入到CommitLog之後,通過Broker的後台服務執行緒(ReputMessageService)不停地分發請求並非同步構建ConsumeQueue和IndexFile(索引文件,後面介紹),然後把每個ConsumeQueue需要的消息記錄到各個ConsumeQueue中。
ConsumeQueue主要記錄8個位元組的commitLogOffset(消息在CommitLog中的物理偏移量), 4個位元組的msgSize(消息大小), 8個位元組的TagHashcode,每個元素固定20個位元組。
ConsumeQueue相當於CommitLog文件的索引,可以通過ConsumeQueue快速從很大的CommitLog文件中快速定位到需要的消息。
ConsumeQueue的存儲結構
主題消息隊列:在consumequeue目錄下,按照topic的維度存儲消息隊列。
重試消息隊列:如果topic中的消息消費失敗,則會把消息發到重試隊列,重新隊列按照消費端的GroupName來分組,命名規則:%RETRY%ConsumerGroupName
死信消息隊列:如果topic中的消息消費失敗,並且超過了指定重試次數之後,則會把消息發到死信隊列,死信隊列按照消費端的GroupName來分組,命名規則:%DLQ%ConsumerGroupName
假設我們現在有一個topic:itzhai-test
,消費分組:itzhai_consumer_group
,當消息消費失敗之後,我們查看consumequeue目錄,會發現多處了一個重試隊列:
我們可以在RocketMQ的控制台看到這個重試消息隊列的主題和消息:
如果一直重試失敗,達到一定次數之後(默認是16次,重試時間:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h),就會把消息投遞到死信隊列:
4.2 RocketMQ是如何保證存取消息的效率的
4.2.1 如何保證高效寫
每條消息的長度是不固定的,為了提高寫入的效率,RocketMQ預先分配好1G空間的CommitLog文件,採用順序寫的方式寫入消息,大大的提高寫入的速度。
RocketMQ中消息刷盤主要可以分為同步刷盤和非同步刷盤兩種,通過flushDiskType參數進行配置。如果需要提高寫消息的效率,降低延遲,提高MQ的性能和吞吐量,並且不要求消息數據存儲的高可靠性,可以把刷盤策略設置為非同步刷盤。
4.2.2 如何保證高效讀
為了提高讀取的效率,RocketMQ使用ConsumeQueue作為消費消息的索引,使用IndexFile作為基於消息key的查詢的索引。下面來詳細介紹下。
4.2.2.1 ConsumeQueue
讀取消息是隨機讀的,為此,RocketMQ專門建立了ConsumeQueue索引文件,每次先從ConsumeQueue中獲取需要的消息的地址,消息大小,然後從CommitLog文件中根據地址直接讀取消息內容。在讀取消息內容的過程中,也盡量利用到了作業系統的頁快取機制,進一步加速讀取速度。
ConsumeQueue由於每個元素大小是固定的,因此可以像訪問數組一樣訪問每個消息元素。並且佔用空間很小,大部分的ConsumeQueue能夠被全部載入記憶體,所以這個索引查找的速度很快。每個ConsumeQueue文件由30w個元素組成,佔用空間在6M以內。每個文件默認大小為600萬個位元組,當一個ConsumeQueue類型的文件寫滿之後,則寫入下一個文件。
4.2.2.2 IndexFile為什麼按照Message Key查詢效率高?
我們在RocketMQ的store目錄中可以發現有一個index目錄,這個是一個用於輔助提高查詢消息效率的索引文件。通過該索引文件實現基於消息key來查詢消息的功能。
物理存儲結構
IndexFile索引文件物理存儲結構如下圖所示:
- Header:索引頭文件,40 bytes,包含以下資訊:
beginTimestamp
:索引文件中第一個索引消息存入Broker的時間戳;endTimestamp
:索引文件中最後一個索引消息存入Broker的時間戳beginPHYOffset
:索引文件中第一個索引消息在CommitLog中的偏移量;endPhyOffset
:索引文件中最後一個索引消息在CommitLog中的偏移量;hashSlotCount
:構建索引使用的slot數量;indexCount
:索引的總數;
- Slot Table:槽位表,類似於Redis的Slot,或者哈希表的key,使用消息的key的hashcode與slotNum取模可以得到具體的槽的位置。每個槽位佔4 bytes,一個IndexFile可以存儲500w個slot;
- Index Linked List:消息的索引內容,如果哈希取模後發生槽位碰撞,則構建成鏈表,一個IndexFile可以存儲2000w個索引:
Key Hash
:消息的哈希值;Commit Log Offset
:消息在CommitLog中的偏移量;Timestamp
:消息存儲的時間戳;Next Index Offset
:下一個索引的位置,如果消息取模後發生槽位槽位碰撞,則通過此欄位把碰撞的消息構成鏈表。
每個IndexFile文件的大小:40b + 4b * 5000000 + 20b * 20000000 = 420000040b,約為400M。
邏輯存儲結構
IndexFile索引文件的邏輯存儲結構如下圖所示:
IndexFile邏輯上是基於哈希表來實現的,Slot Table為哈希鍵,Index Linked List中存儲的為哈希值。
4.2.2.3 為什麼按照MessageId查詢效率高?
RocketMQ中的MessageId的長度總共有16位元組,其中包含了:消息存儲主機地址(IP地址和埠),消息Commit Log offset。「
按照MessageId查詢消息的流程:Client端從MessageId中解析出Broker的地址(IP地址和埠)和Commit Log的偏移地址後封裝成一個RPC請求後通過Remoting通訊層發送(業務請求碼:VIEW_MESSAGE_BY_ID)。Broker端走的是QueryMessageProcessor,讀取消息的過程用其中的 commitLog offset 和 size 去 commitLog 中找到真正的記錄並解析成一個完整的消息返回。
4.3 RocketMQ集群是如何做數據分區的?
我們繼續看看在集群模式下,RocketMQ的Topic數據是如何做分區的。IT宅(itzhai.com)提醒大家,實踐出真知。這裡我們部署兩個Master節點:
4.3.1 RocketMQ的Topic在集群中是如何存儲的
我們通過手動配置每個Broker中的Topic,以及ConsumeQueue數量,來實現Topic的數據分片,如,我們到集群中手動配置這樣的Topic:
broker-a
創建itzhai-com-test-1
,4個隊列;broker-b
創建itzhai-com-test-1
,2個隊列。
創建完成之後,Topic分片集群分布如下:
即:
可以發現,RocketMQ是把Topic分片存儲到各個Broker節點中,然後在把Broker節點中的Topic繼續分片為若干等分的ConsumeQueue,從而提高消息的吞吐量。ConsumeQueue是作為負載均衡資源分配的基本單元。
這樣把Topic的消息分區到了不同的Broker上,從而增加了消息隊列的數量,從而能夠支援更塊的並發消費速度(只要有足夠的消費者)。
4.3.2 Broker自動創建Topic會有什麼問題?
假設設置為通過Broker自動創建Topic(autoCreateTopicEnable=true),並且Producer端設置Topic消息隊列數量設置為4,也就是默認值:
producer.setDefaultTopicQueueNums(4);
嘗試往一個新的 topic itzhai-test-queue-1
連續發送10條消息,發送完畢之後,查看Topic狀態:
我們可以發現,在兩個broker上面都創建了itzhai-test-queue-a
,並且每個broker上的消息隊列數量都為4。怎麼回事,我配置的明明是期望創建4個隊列,為什麼加起來會變成了8個?如下圖所示:
由於時間關係,本文我們不會帶大家從源碼方面去解讀為啥會出現這種情況,接下來我們通過一種更加直觀的方式來驗證下這個問題:繼續做實驗。
我們繼續嘗試往一個新的 topic itzhai-test-queue-10
發送1條消息,注意,這一次不做並發發送了,只發送一條,發送完畢之後,查看Topic狀態:
可以發現,這次創建的消息隊列數量又是對的了,並且都是在broker-a上面創建的。接下來,無論怎麼並發發送消息,消息隊列的數量都不會繼續增加了。
其實這也是並發請求Broker,觸發自動創建Topic的bug。
為了更加嚴格的管理Topic的創建和分片配置,一般在生產環境都是配置為手動創建Topic,通過提交運維工單申請創建Topic以及Topic的數據分配。
接下來我們來看看RocketMQ的特性。更多其他技術的底層架構內幕分析,請訪問我的部落格IT宅(itzhai.com)或者關注Java架構雜談公眾號。
5. RocketMQ特性
5.1 生產端
5.1.1 消息發布
RocketMQ中定義了如下三種消息通訊的方式:
public enum CommunicationMode {
SYNC,
ASYNC,
ONEWAY,
}
SYNC
:同步發送,生產端會阻塞等待發送結果;- 應用場景:這種方式應用場景非常廣泛,如重要業務事件通知。
ASYNC
:非同步發送,生產端調用發送API之後,立刻返回,在拿到Broker的響應結果後,觸發對應的SendCallback回調;- 應用場景:一般用於鏈路耗時較長,對 RT 較為敏感的業務場景;
ONEWAY
:單向發送,發送方只負責發送消息,不等待伺服器回應且沒有回調函數觸發,即只發送請求不等待應答。 此方式發送消息的過程耗時非常短,一般在微秒級別;- 應用場景:適用於耗時非常短,對可靠性要求不高的場景,如日誌收集。
SYNC和ASYNC關注發送結果,ONEWAY不關注發送結果。發送結果如下:
public enum SendStatus {
SEND_OK,
FLUSH_DISK_TIMEOUT,
FLUSH_SLAVE_TIMEOUT,
SLAVE_NOT_AVAILABLE,
}
SEND_OK
:消息發送成功。SEND_OK並不意味著投遞是可靠的,要確保消息不丟失,需要開啟SYNC_MASTER同步或者SYNC_FLUSH同步寫;FLUSH_DISK_TIMEOUT
:消息發送成功,但是刷盤超時。如果Broker的flushDiskType=SYNC_FLUSH,並且5秒內沒有完成消息的刷盤,則會返回這個狀態;FLUSH_SLAVE_TIMEOUT
:消息發送成功,但是伺服器同步到Slave時超時。如果Broker的brokerRole=SYNC_MASTER,並且5秒內沒有完成同步,則會返回這個狀態;SLAVE_NOT_AVAILABLE
:消息發送成功,但是無可用的Slave節點。如果Broker的brokerRole=SYNC_MASTER,但是沒有發現SLAVE節點或者SLAVE節點掛掉了,那麼會返回這個狀態。
源碼內容更精彩,歡迎大家進一步閱讀源碼詳細了解消息發送的內幕:
- 同步發送:org.apache.rocketmq.client.producer.DefaultMQProducer#send(org.apache.rocketmq.common.message.Message)
- 非同步發送:org.apache.rocketmq.client.producer.DefaultMQProducer#send(org.apache.rocketmq.common.message.Message, org.apache.rocketmq.client.producer.SendCallback)
- 單向發送:org.apache.rocketmq.client.producer.DefaultMQProducer#sendOneway(org.apache.rocketmq.common.message.Message)
5.1.2 順序消費
消息的有序性指的是一類消息消費的時候,可以按照發送順序來消費,比如:在Java架構雜談
茶餐廳吃飯產生的消息:進入餐廳、點餐、下單、上菜、付款,消息要按照這個順序消費才有意義,但是多個顧客產生的消息是可以並行消費的。順序消費又分為全局順序消費和分區順序消費:
全局順序
:同一個Topic下的消息,所有消息按照嚴格的FIFO順序進行發布和消費。適用於:性能要求不高,所有消息嚴格按照FIFO進行發布和消費的場景;分區順序
:同一個Topic下,根據消息的特定業務ID進行sharding key分區,同一個分區內的消息按照嚴格的FIFO順序進行發布和消費。適用於:性能要求高,在同一個分區中嚴格按照FIFO進行發布和消費的場景。
一般情況下,生產者是會以輪訓的方式把消息發送到Topic的消息隊列中的:
在同一個Queue裡面,消息的順序性是可以得到保證的,但是如果一個Topic有多個Queue,以輪訓的方式投遞消息,那麼就會導致消息亂序了。
為了保證消息的順序性,需要把保持順序性的消息投遞到同一個Queue中。
5.1.2.1 如何保證消息投遞的順序性
RocketMQ提供了MessageQueueSelector
介面,可以用來實現自定義的選擇投遞的消息隊列的演算法:
for (int i = 0; i < orderList.size(); i++) {
String content = "Hello itzhai.com. Java架構雜談," + new Date();
Message msg = new Message("topic-itzhai-com", tags[i % tags.length], "KEY" + i,
content.getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Long orderId = (Long) arg;
// 訂單號與消息隊列個數取模,保證讓同一個訂單號的消息落入同一個消息隊列
long index = orderId % mqs.size();
return mqs.get((int) index);
}
}, orderList.get(i).getOrderId());
System.out.printf("content: %s, sendResult: %s%n", content, sendResult);
}
如上圖,我們實現了MessageQueueSelector
介面,並在實現的select方法裡面,指定了選擇消息隊列的演算法:訂單號與消息隊列個數取模,保證讓同一個訂單號的消息落入同一個消息隊列:
有個異常場景需要考慮:假設某一個Master節點掛掉了,導致Topic的消息隊列數量發生了變化,那麼繼續使用以上的選擇演算法,就會導致在這個過程中同一個訂單的消息會分散到不同的消息隊列裡面,最終導致消息不能順序消費。
為了避免這種情況,只能選擇犧牲failover特性了。
現在投遞到消息隊列中的消息保證了順序,那如何保證消費也是順序的呢?
5.1.2.2 如何保證消息消費的順序性?
RocketMQ中提供了MessageListenerOrderly
,該對象用於有順序收非同步傳遞的消息,一個隊列對應一個消費執行緒,使用方法如下:
consumer.registerMessageListener(new MessageListenerOrderly() {
// 消費次數,用於輔助模擬各種消費結果
AtomicLong consumeTimes = new AtomicLong(0);
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
context.setAutoCommit(true);
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
this.consumeTimes.incrementAndGet();
if ((this.consumeTimes.get() % 2) == 0) {
return ConsumeOrderlyStatus.SUCCESS;
} else if ((this.consumeTimes.get() % 3) == 0) {
return ConsumeOrderlyStatus.ROLLBACK;
} else if ((this.consumeTimes.get() % 4) == 0) {
return ConsumeOrderlyStatus.COMMIT;
} else if ((this.consumeTimes.get() % 5) == 0) {
context.setSuspendCurrentQueueTimeMillis(3000);
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
如果您使用的是MessageListenerConcurrently
,表示並發消費,為了保證消息消費的順序性,需要設置為單執行緒模式。
使用
MessageListenerOrderly
的問題:如果遇到某條消息消費失敗,並且無法跳過,那麼消息隊列的消費進度就會停滯。
5.1.3 延遲隊列(定時消息)
定時消費是指消息發送到Broker之後不會立即被消費,而是等待特定的時間之後才投遞到Topic中。定時消息會暫存在名為SCHEDULE_TOPIC_XXXX
的topic中,並根據delayTimeLevel存入特定的queue,queueId=delayTimeLevel-1,一個queue只存相同延遲的消息,保證具有相同延遲的消息能夠順序消費。比如,我們設置1秒後把消息投遞到topic-itzhai-com
topic,則存儲的文件目錄如下所示:
Broker會調度地消費SCHEDULE_TOPIC_XXXX,將消息寫入真實的topic。
定時消息的副作用:定時消息會在第一次寫入Topic和調度寫入實際的topic都會進行計數,因此發送數量,tps都會變高。
使用延遲隊列的場景:提交了訂單之後,如果等待超過約定的時間還未支付,則把訂單設置為超時狀態。
RocketMQ提供了以下幾個固定的延遲級別:
public class MessageStoreConfig {
...
// 10個level,level:1~18
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
...
}
level = 0 表示不使用延遲消息。
另外,消息消費失敗也會進入延遲隊列,消息發送時間與設置的延遲級別和重試次數有關。
以下是發送延遲消息的程式碼:
public class ScheduledMessageProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("TestProducerGroup");
producer.start();
int totalMessagesToSend = 100;
for (int i = 0; i < totalMessagesToSend; i++) {
Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());
// 指定該消息在10秒後被消費者消費
message.setDelayTimeLevel(3);
producer.send(message);
}
producer.shutdown();
}
}
5.1.4 數據完整性與事務消息
通過消息對系統進行解耦之後,勢必會遇到分散式系統數據完整性的問題。
5.1.4.1 實現分散式事務的手段有哪些?
我們可以通過以下手段解決分散式系統數據最終一致性問題:
- 資料庫層面的
2PC(Two-phase commit protocol)
,二階段提交,同步阻塞,效率低下,存在協調者單點故障問題,極端情況下存在數據不一致的風險。對應技術上的XA、JTA/JTS。這是分散式環境下事務處理的典型模式; - 資料庫層面的
3PC
,三階段提交,引入了參與者超時機制,增加了預提交階段,使得故障恢復之後協調者的決策複雜度降低,但整體的交互過程變得更長了,性能有所下降,仍舊會存在數據不一致的問題; - 業務層面的TCC ,
Try - Confirm - Cancel
。對業務的侵入較大,和業務緊耦合,對於每一個操作都需要定義三個動作分別對應:Try - Confirm - Cancel
,將資源層的兩階段提交協議轉換到業務層,成為業務模型中的一部分; - 本地消息表;
- 事務消息;
RocketMQ事務消息(Transactional Message)則是通過事務消息來實現分散式事務的最終一致性。下面看看RocketMQ是如何實現事務消息的。
5.1.4.2 RocketMQ如何實現事務消息?
如下圖:
事務消息有兩個流程:
- 事務消息發送及提交:
- 發送half消息;
- 服務端響應half消息寫入結果;
- 根據half消息的發送結果執行本地事務。如果發送失敗,此時half消息對業務不可見,本地事務不執行;
- 根據本地事務狀態執行Commit或者Rollback。Commit操作會觸發生成ConsumeQueue索引,此時消息對消費者可見;
- 補償流程:
5. 對於沒有Commit/Rollback的事務消息,會處於pending狀態,這對這些消息,MQ Server發起一次回查;
6. Producer收到回查消息,檢查回查消息對應的本地事務的轉塔體;
7. 根據本地事務狀態,重新執行Commit或者Rollback。
補償階段主要用於解決消息的Commit或者Rollback發生超時或者失敗的情況。
half消息:並不是發送了一半的消息,而是指消息已經發送到了MQ Server,但是該消息未收到生產者的二次確認,此時該消息暫時不能投遞到具體的ConsumeQueue中,這種狀態的消息稱為half消息。
5.1.4.3 RocketMQ事務消息是如何存儲的?
發送到MQ Server的half消息對消費者是不可見的,為此,RocketMQ會先把half消息的Topic和Queue資訊存儲到消息的屬性中,然後把該half消息投遞到一個專門的處理事務消息的隊列中:RMQ_SYS_TRANS_HALF_TOPIC
,由於消費者沒有訂閱該Topic,所以無法消息half類型的消息。
生產者執行Commit half消息的時候,會存儲一條專門的Op消息,用於標識事務消息已確定的狀態,如果一條事務消息還沒有對應的Op消息,說明這個事務的狀態還無法確定。RocketMQ會開啟一個定時任務,對於pending狀態的消息,會先向生產者發送回查事務狀態請求,根據事務狀態來決定是否提交或者回滾消息。
當消息被標記為Commit狀態之後,會把half消息的Topic和Queue相關屬性還原為原來的值,最終構建實際的消費索引(ConsumeQueue)。
RocketMQ並不會無休止的嘗試消息事務狀態回查,默認查找15次,超過了15次還是無法獲取事務狀態,RocketMQ默認回滾該消息。並列印錯誤日誌,可以通過重寫AbstractTransactionalMessageCheckListener類修改這個行為。
可以通過Broker的配置參數:transactionCheckMax來修改此值。
5.1.5 消息重投
如果消息發布方式是同步發送會重投,如果是非同步發送會重試。
消息重投可以儘可能保證消息投遞成功,但是可能會造成消息重複。
什麼情況會造成重複消費消息?
- 出現消息量大,網路抖動的時候;
- 生產者主動重發;
- 消費負載發生變化。
可以使用的消息重試策略:
retryTimesWhenSendFailed
:設置同步發送失敗的重投次數,默認為2。所以生產者最多會嘗試發送retryTimesWhenSendFailed+1次。- 為了最大程度保證消息不丟失,重投的時候會嘗試向其他broker發送消息;
- 超過重投次數,拋出異常,讓客戶端自行處理;
- 觸發重投的異常:RemotingException、MQClientException和部分MQBrokerException;
retryTimesWhenSendAsyncFailed
:設置非同步發送失敗重試次數,非同步重試不會選擇其他Broker,不保證消息不丟失;retryAnotherBrokerWhenNotStoreOK
:消息刷盤(主或備)超時或slave不可用(返回狀態非SEND_OK),是否嘗試發送到其他broker,默認false。重要的消息可以開啟此選項。
oneway發布方式不支援重投。
5.1.6 批量消息
為了提高系統的吞吐量,提高發送效率,可以使用批量發送消息。
批量發送消息的限制:
- 同一批批量消息的topic,waitStoreMsgOK屬性必須保持一致;
- 批量消息不支援延遲隊列;
- 批量消息一次課發送的上限是4MB。
發送批量消息的例子:
String topic = "itzhai-test-topic";
List<Message> messages = new ArrayList<>();
messages.add(new Message(topic, "TagA", "OrderID001", "Hello world itzhai.com 0".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID002", "Hello world itzhai.com 1".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID003", "Hello world itzhai.com 2".getBytes()));
producer.send(messages);
如果發送的消息比較多,會增加複雜性,為此,可以對大消息進行拆分。以下是拆分的例子:
public class ListSplitter implements Iterator<List<Message>> {
// 限制最大大小
private final int SIZE_LIMIT = 1024 * 1024 * 4;
private final List<Message> messages;
private int currIndex;
public ListSplitter(List<Message> messages) {
this.messages = messages;
}
@Override public boolean hasNext() {
return currIndex < messages.size();
}
@Override public List<Message> next() {
int startIndex = getStartIndex();
int nextIndex = startIndex;
int totalSize = 0;
for (; nextIndex < messages.size(); nextIndex++) {
Message message = messages.get(nextIndex);
int tmpSize = calcMessageSize(message);
if (tmpSize + totalSize > SIZE_LIMIT) {
break;
} else {
totalSize += tmpSize;
}
}
List<Message> subList = messages.subList(startIndex, nextIndex);
currIndex = nextIndex;
return subList;
}
private int getStartIndex() {
Message currMessage = messages.get(currIndex);
int tmpSize = calcMessageSize(currMessage);
while(tmpSize > SIZE_LIMIT) {
currIndex += 1;
Message message = messages.get(curIndex);
tmpSize = calcMessageSize(message);
}
return currIndex;
}
private int calcMessageSize(Message message) {
int tmpSize = message.getTopic().length() + message.getBody().length();
Map<String, String> properties = message.getProperties();
for (Map.Entry<String, String> entry : properties.entrySet()) {
tmpSize += entry.getKey().length() + entry.getValue().length();
}
tmpSize = tmpSize + 20; // Increase the log overhead by 20 bytes
return tmpSize;
}
}
// then you could split the large list into small ones:
ListSplitter splitter = new ListSplitter(messages);
while (splitter.hasNext()) {
try {
List<Message> listItem = splitter.next();
producer.send(listItem);
} catch (Exception e) {
e.printStackTrace();
// handle the error
}
}
5.1.7 消息過濾
RocketMQ的消費者可以根據Tag進行消息過濾來獲取自己感興趣的消息,也支援自定義屬性過濾。
Tags是Topic下的次級消息類型/二級類型(註:Tags也支援TagA || TagB
這樣的表達式),可以在同一個Topic下基於Tags進行消息過濾。
消息過濾是在Broker端實現的,減少了對Consumer無用消息的網路傳輸,缺點是增加了Broker負擔,實現相對複雜。
5.2 消費端
5.2.1 消費模型
消費端有兩周消費模型:集群消費和廣播消費。
集群消費
集群消費模式下,相同Consumer Group的每個Consumer實例平均分攤消息。
廣播消費
廣播消費模式下,相同Consumer Group的每個Consumer實例都接收全量的消息。
5.2.2 消息重試
RocketMQ會為每個消費組都設置一個Topic名稱為%RETRY%consumerGroupName
的重試隊列(這裡需要注意的是,這個Topic的重試隊列是針對消費組,而不是針對每個Topic設置的),用於暫時保存因為各種異常而導致Consumer端無法消費的消息。
考慮到異常恢復起來需要一些時間,會為重試隊列設置多個重試級別,每個重試級別都有與之對應的重新投遞延時,重試次數越多投遞延時就越大。
RocketMQ對於重試消息的處理是先保存至Topic名稱為SCHEDULE_TOPIC_XXXX
的延遲隊列中,後台定時任務按照對應的時間進行Delay後重新保存至%RETRY%consumerGroupName
的重試隊列中。
比如,我們設置1秒後把消息投遞到topic-itzhai-com
topic,則存儲的文件目錄如下所示:
5.2.3 死信隊列
當一條消息初次消費失敗,消息隊列會自動進行消息重試;達到最大重試次數後,若消費依然失敗,則表明消費者在正常情況下無法正確地消費該消息,此時,消息隊列不會立刻將消息丟棄,而是將其發送到該消費者對應的特殊隊列中。
RocketMQ將這種正常情況下無法被消費的消息稱為死信消息(Dead-Letter Message)
,將存儲死信消息的特殊隊列稱為死信隊列(Dead-Letter Queue)
。
在RocketMQ中,可以通過使用console控制台對死信隊列中的消息進行重發來使得消費者實例再次進行消費。
由於RocketMQ是使用Java寫的,所以它的程式碼特別適合拿來閱讀消遣,我們繼續來看看RocketMQ的源碼結構…
不不,還是算了,一下子又到周末晚上了,時間差不多了,今天就寫到這裡了。有空再聊。
我精心整理了一份Redis寶典給大家,涵蓋了Redis的方方面面,面試官懂的裡面有,面試官不懂的裡面也有,有了它,不怕面試官連環問,就怕面試官一上來就問你Redis的Redo Log是幹啥的?畢竟這種問題我也不會。
在Java架構雜談
公眾號發送Redis
關鍵字獲取pdf文件:
本文作者: arthinking
部落格鏈接: //www.itzhai.com/articles/deep-understanding-of-rocketmq.html
高並發非同步解耦利器:RocketMQ究竟強在哪裡?
版權聲明: 版權歸作者所有,未經許可不得轉載,侵權必究!聯繫作者請加公眾號。
References
apache/rocketmq. Retrieved from //github.com/apache/rocketmq