「查缺補漏」鞏固你的RocketMQ知識體系
Windows安裝部署
下載
地址:[//www.apache.org/dyn/closer.cgi?path=rocketmq/4.5.2/rocketmq-all-4.5.2-bin-release.zip]
選擇『Binary』進行下載
解壓已下載工程
配置
新增系統變量
ROCKETMQ_HOME -> F:\RocketMQ\rocketmq-4.5.2JAVA_HOME -> F:\Java_JDK\JDK1.8
Path 系統變量新增:Maven/bin目錄
PS:RocketMQ 消息存儲在C:\Users\Administrator\store store目錄中
文件佔用較大,注意刪除不必要的內容
啟動
start mqnamesrv.cmd
start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true
Rocket集成可視化監控插件
-
任意目錄(拉取項目,隨便哪裡都行)git clone //github.com/apache/rocketmq-externals.git
-
進入『rocketmq-externals\rocketmq-console\src\main\resources』文件夾,打開『application.properties』進行配置
-
其實就是一個SpringBoot服務,確定好端口,別重複即可
server.port=8100
rocketmq.config.namesrvAddr=127.0.0.1:9876
-
進入『\rocketmq-externals\rocketmq-console』文件夾
執行『mvn clean package -Dmaven.test.skip=true』,編譯生成target
java -jar rocketmq-console-ng-1.0.1.jar
-
根據配置地址訪問: //127.0.0.1:8100
Rocket可視化監控插件 增加Topic | 自動增加Topic(4.5.2版本)
4.5.2 版本 支持自動創建Topic
4.3.0 版本 必須通過監控程序配置Topic,否則執行程序報錯,沒有此路由
SpringBoot集成 RocketMQ
<!--RocketMQ-->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.5.2</version>
</dependency>
RocketMQ基本概念
概覽
基於RocketMQ的分佈式系統,一般可以分為四個集群:Name server、broker、producer、consumer
-
name server
-
提供輕量級的服務發現和路由服務;
-
每個節點都存放了全部的路由信息和對應的讀寫服務;
-
存儲支持水平擴展
-
-
broker
- 提供滿足TOPIC和QUEUE機制的消息存儲服務;
- 有推和拉兩種模式;
- 通過2或3拷貝實現高可用;
- 提供上億消息的堆積能力;
- 提供故障恢復、統計功能和告警功能;
-
producer
- 支持分佈式部署,通過負載平衡模塊給broker發消息
- 支持快速失敗
- 低延遲
-
consumer
- 支持推和拉兩種模式
- 支持集群消費和廣播消費
核心模塊
Name Server
提供Broker管理;Routing管理(路由管理)
NameServer,很多時候稱為命名發現服務,其在RocketMQ中起着中轉承接的作用,是一個無狀態的服務,多個NameServer之間不通信。任何Producer,Consumer,Broker與所有NameServer通信,向NameServer請求或者發送數據。而且都是單向的,Producer和Consumer請求數據,Broker發送數據。正是因為這種單向的通信,RocketMQ水平擴容變得很容易
- 提供輕量級的服務發現和路由服務;
- 每個節點都存放了全部的路由信息和對應的讀寫服務;
- 存儲支持水平擴展
總結:相比於ZooKeeper提供的分佈式鎖,發佈和訂閱,數據一致性,選舉等,在RocketMQ是不適用的,因此重寫了一套更加輕量級的發現服務,主要用以存儲 Broker相關信息以及當前Broker上的topic信息,路由信息等
Broker Server
提供Remoting Module、客戶端管理、存儲服務、HA服務(主從)、索引服務
- 提供滿足TOPIC和QUEUE機制的消息存儲服務;
- 有推和拉兩種模式;
- 通過2或3拷貝實現高可用;
- 提供上億消息的堆積能力;
- 提供故障恢復、統計功能和告警功能;
producer
- 支持分佈式部署,通過負載平衡模塊給broker發消息
- 支持快速失敗
- 低延遲
consumer
- 支持推和拉兩種模式
- 支持集群消費和廣播消費
核心角色介紹
生產者
生產者發送業務系統產生的消息給broker, RocketMQ提供了多種發送方式:同步的、異步的、單向的
生產者組
具有相同角色的生產者被分到一組, 假如原始的生產者在事務後崩潰,broker會聯繫 同一生產者組中的不同生產者實例,繼續提交或回滾事務
消費者
一個消費者從broker拉取信息,並將信息返還給應用。為了我們應用的正確性,提供了兩種消費者類型:
拉式消費者:拉式消費者從broker拉取消息,一旦一批消息被拉取,用戶應用系統將發起消費過程。
推式消費者:推式消費者,從另一方面講,囊括了消息的拉取、消費過程,並保持了內部的其他工作,留下了一個回調 接口給終端用戶去實現,實現在消息到達時要執行的內容。
消費者組
具有相同角色的消費者被組在一起,稱為消費者組,它完成了負載均衡和容錯的目標
一個消費組中的消費者實例必須有確定的相同的訂閱topic
Topic(主題)
Topic是一個消息的目錄,在這個目錄中,生產者傳送消息,消費者拉取消息,可以多個消費者訂閱同一個topic,一個生產者也可以發送多個topic
PS:RocketMQ 基於發佈訂閱模式,發佈訂閱的核心即 Topic 主題
Message(消息)
消息是被傳遞的信息。一個消息必須有一個Topic,它可以理解為信件上的地址。一個消息也可以有一個可選的tag,和額外的key-value對。 例如:你可以設置業務中的鍵到你的消息中,在broker服務中查找消息,以便在開發期間診斷問題
消息隊列
Topic被分割成一個或多個消息隊列。隊列分為3中角色:異步主、同步主、從。如果你不能容忍消息丟失,我們建議你部署同步主,並加一個從隊列。 如果你容忍丟失,但你希望隊列總是可用,你可以部署異步主和從隊列。如果你想最簡單,你只需要一個異步主,不需要從隊列。 消息保存磁盤的方式也有兩種,推薦使用的是異步保存,同步保存是昂貴的並會導致性能損失,如果你想要可靠性,我們推薦你使用同步主+從的方式。
Tag(標籤)
標籤,用另外一個詞來說,就是子主題,為用戶提供額外的靈活性。具有相同Topic的消息可以有不同的tag。
Broker(隊列)
Broker是RocketMQ的一個主要組件,它接收生產者發送的消息,存儲它們並準備處理消費者的拉取請求。它也存儲消息相關的元數據, 包括消費組,消費成功的偏移量,主題、隊列的信息。
名稱服務
名稱服務主要提供路由信息。生產者/消費者客戶端尋找topic,並找到通信的隊列列表。
消息的順序
當DefaultMQPushConsumer
被使用,你就要決定消費消息時,是順序消費還是同時消費
- 順序消費
順序消費消息的意思是 消息將按照生產者發送到隊列時的順序被消費掉。如果你被強制要求使用全局的順序,你要確保你的topic只有一個消息隊列。
如果指定順序消費,消息被同時消費的數量就是訂閱這個topic的消費組的數量。
- 同時消費
當同時消費消息時,消息同時消費的最大數量取決於消費客戶端指定的線程池的大小。
最佳實踐
Producer最佳實踐
-
一個應用儘可能用一個 Topic,消息子類型用 tags 來標識,tags 可以由應用自由設置。只有發送消息設置了tags,消費方在訂閱消息時,才可以利用 tags 在 broker 做消息過濾。
-
每個消息在業務層面的唯一標識碼,要設置到 keys 字段,方便將來定位消息丟失問題。由於是哈希索引,請務必保證 key 儘可能唯一,這樣可以避免潛在的哈希衝突。
消息發送成功或者失敗,要打印消息日誌,務必要打印 sendresult 和 key 字段。
-
對於消息不可丟失應用,務必要有消息重發機制。例如:消息發送失敗,存儲到數據庫,能有定時程序嘗試重發或者人工觸發重發。
-
某些應用如果不關注消息是否發送成功,請直接使用sendOneWay方法發送消息。
Consumer最佳實踐
- 消費過程要做到冪等(即消費端去重)
- 盡量使用批量方式消費方式,可以很大程度上提高消費吞吐量。
- 優化每條消息消費過程
MQ核心問題
1.消息隊列適合解決的問題
解決的核心問題主要是:異步、解耦、削峰
但是引入消息隊列也會有很多額外的問題,比如系統複雜性會大大增加,同時需要解決重複下發,重複消費,消費順序,消息丟失,重試機制等等問題,因此不能濫用,合適的場景用合適的技術
2.消息模型:主題和隊列的區別
一、消息隊列的演進
1、初始階段
最初的消息隊列,就是一個嚴格意義上的隊列。隊列是一種數據結構,先進先出,在消息入隊出隊過程中,保證這些消息嚴格有序。早期的消息隊列就是按照「隊列」的數據結構設計的。
隊列模型:
生產者(Producer)發消息就是入隊操作,消費者(Consumer)收消息就是出隊也就是刪除操作,服務端存放消息的容器自然就稱為「隊列」。
- 如果有多個生產者往同一個隊列裏面發送消息,這個隊列中可以消費到的消息,就是這些生產者生產的所有消息的合集。消息的順序就是這些生產者發送消息的自然順序。
- 如果有多個消費者接收同一個隊列的消息,這些消費者之間實際上是競爭的關係,每個消費者只能收到隊列中的一部分消息,也就是說任何一條消息只能被其中的一個消費者收到。
2、發佈 – 訂閱模型階段
如果需要將一份消息數據分發給多個消費者,要求每個消費者都能收到全量的消息,例如,對於一份訂單數據,風控系統、分析系統、支付系統等都需要接收消息。
這個時候,單個隊列就滿足不了需求,一個可行的解決方式是,為每個消費者創建一個單獨的隊列,讓生產者發送多份。但是同樣的一份消息數據被複制到多個隊列中會浪費資源,更重要的是,生產者必須知道有多少個消費者。為每個消費者單獨發送一份消息,這實際上違背了消息隊列「解耦」這個設計初衷。
為了解決這個問題,演化出了另外一種消息模型:發佈 – 訂閱模型(Publish-Subscribe Pattern)
消息的發送方稱為發佈者(Publisher),消息的接收方稱為訂閱者(Subscriber),服務端存放消息的容器稱為主題(Topic)。
- 發佈者將消息發送到主題中,訂閱者在接收消息之前需要先「訂閱主題」。
- 每份訂閱中,訂閱者都可以接收到主題的所有消息。
3、總結:
- 在很長的一段時間,隊列模式和發佈 – 訂閱模式是並存的。
- 有些消息隊列同時支持這兩種消息模型,比如 ActiveMQ。
- 對比這兩種模型,生產者就是發佈者,消費者就是訂閱者,隊列就是主題,並沒有本質的區別。它們最大的區別是:一份消息數據能不能被消費多次的問題。
- 實際上,在這種發佈 – 訂閱模型中,如果只有一個訂閱者,那它和隊列模型就基本是一樣的了。也就是說,發佈 – 訂閱模型在功能層面上是可以兼容隊列模型的。
二、RabbitMQ 的消息模型
少數依然堅持使用隊列模型的產品之一。
RabbitMQ 使用 Exchange 模塊解決多個消費者的問題。Exchange 位於生產者和隊列之間,生產者並不關心將消息發送給哪個隊列,而是將消息發送給 Exchange,由 Exchange 上配置的策略來決定將消息投遞到哪些隊列中。
- 同一份消息如果需要被多個消費者來消費,需要配置 Exchange 將消息發送到多個隊列,每個隊列中都存放一份完整的消息數據,可以為一個消費者提供消費服務。
三、RocketMQ 的消息模型
RocketMQ 使用的消息模型是標準的發佈 – 訂閱模型。在 RocketMQ 也有隊列(Queue)這個概念。
消息隊列的消費機制:
幾乎所有的消息隊列產品都使用一種非常樸素的「請求 – 確認」機制,確保消息不會在傳遞過程中由於網絡或服務器故障丟失。
在生產端,生產者先將消息發送給服務端,也就是 Broker,服務端在收到消息並將消息寫入主題或者隊列中後,會給生產者發送確認的響應。如果生產者沒有收到服務端的確認或者收到失敗的響應,則會重新發送消息。
在消費端,消費者在收到消息並完成自己的消費業務邏輯(比如,將數據保存到數據庫中)後,也會給服務端發送消費成功的確認,服務端只有收到消費確認後,才認為一條消息被成功消費,否則它會給消費者重新發送這條消息,直到收到對應的消費成功確認。
這個確認機制很好地保證了消息傳遞過程中的可靠性,但是,引入這個機制在消費端帶來了一個問題:為了確保消息的有序性,在某一條消息被成功消費之前,下一條消息是不能被消費的,也就是說,每個主題在任意時刻,至多只能有一個消費者實例在進行消費,那就沒法通過水平擴展消費者的數量來提升消費端總體的消費性能。
為了解決這個問題,RocketMQ 在主題下面增加了隊列的概念:
- 每個主題包含多個隊列,通過多個隊列來實現多實例並行生產和消費。需要注意的是,RocketMQ 只在隊列上保證消息的有序性,主題層面是無法保證消息的嚴格順序的。
- 生產者會往所有隊列發消息,但不是「同一條消息每個隊列都發一次」,每條消息只會往某個隊列裏面發送一次。
- 一個消費組,每個隊列上只能串行消費,多個隊列加一起就是並行消費了,並行度就是隊列數量,隊列數量越多並行度越大,所以水平擴展可以提升消費性能。
- 每隊列每消費組維護一個消費位置(offset),記錄這個消費組在這個隊列上消費到哪兒了。
- 訂閱者是通過消費組(Consumer Group)來體現的。每個消費組都消費主題中一份完整的消息,不同消費組之間消費進度彼此不受影響,也就是說,一條消息被 Consumer Group1 消費過,也會再給 Consumer Group2 消費。
- 消費組中包含多個消費者,同一個組內的消費者是競爭消費的關係,每個消費者負責消費組內的一部分消息。如果一條消息被消費者 Consumer1 消費了,那同組的其他消費者就不會再收到這條消息。
- 由於消息需要被不同的組進行多次消費,所以消費完的消息並不會立即被刪除,這就需要 RocketMQ 為每個消費組在每個隊列上維護一個消費位置(Consumer Offset),這個位置之前的消息都被消費過,之後的消息都沒有被消費過,每成功消費一條消息,消費位置就加一。我們在使用消息隊列的時候,丟消息的原因大多是由於消費位置處理不當導致的。
四、Kafka 的消息模型
Kafka 的消息模型和 RocketMQ 是完全一樣的,唯一的區別是,在 Kafka 中,隊列這個概念的名稱不一樣,Kafka 中對應的名稱是「分區(Partition)」,含義和功能是沒有任何區別的。
五、總結
- 常用的消息隊列中,RabbitMQ 採用的是隊列模型,但是它一樣可以實現發佈 – 訂閱的功能。RocketMQ 和 Kafka 採用的是發佈 – 訂閱模型,並且二者的消息模型是基本一致的。
3.消息丟失怎麼辦? 如何保證消息的可靠性傳輸?
首先如何驗證消息是否丟失?
- 如果是 IT 基礎設施比較完善的公司,一般都有分佈式鏈路追蹤系統,使用類似的追蹤系統可以很方便地追蹤每一條消息。
- 如果沒有這樣的追蹤系統,我們可以利用消息隊列的有序性來驗證是否有消息丟失
即保證消息消費順序的情況下,根據消息的序號,在消費段判斷是否連續
解決方案:
消息從生產到消費的過程中,可以劃分三個階段:
1、生產階段
消息隊列通過最常用的請求確認機制,來保證消息的可靠傳遞:當你代碼調用發消息方法時,消息隊列客戶端會把消息發送到Broker,Broker收到消息後,會給客戶端返回一個確認響應,表明消息已收到。客戶端收到響應後,完成了一次正常消息的發送。
有些消息隊列在長時間沒收到發送確認響應後,會自動重試,如果重試失敗,就會以返回值或者異常的方式告知用戶。在編寫發送消息的代碼時,需要注意,正確處理返回值或者捕獲異常,就可以保證這個階段的消息不會丟失。
同步發送時,只要注意捕獲異常即可。
異步發送時,則需要在回調方法里進行檢查。這個地方需要特別注意,很多丟消息的原因就是,我們使用了異步發送,卻沒有在回調中檢查發送結果。
2、存儲階段
在存儲階段正常情況下,只要Broker在正常運行,就不會出現丟消息的問題;但是如果Broker出現故障,比如進程死掉或者服務器宕機,還是可能會丟失消息的。
如果對消息的可靠性要求非常高,可以通過配置Broker參數來避免因為宕機丟消息:
- 對於單個節點的 Broker,需要配置 Broker 參數,在收到消息後,將消息寫入磁盤後再給 Producer 返回確認響應,這樣即使發生宕機,由於消息已經被寫入磁盤,就不會丟失消息,恢復後還可以繼續消費。例如,在 RocketMQ 中,需要將刷盤方式 flushDiskType 配置為 SYNC_FLUSH 同步刷盤。
- 對於 Broker 是由多個節點組成的集群,需要將 Broker 集群配置成:至少將消息發送到 2 個以上的節點,再給客戶端回複發送確認響應。這樣當某個 Broker 宕機時,其他的 Broker 可以替代宕機的 Broker,也不會發生消息丟失。
3、消息階段
消費階段採用和生產階段類似的確認機制來保證消息的可靠傳遞,客戶端從 Broker 拉取消息後,執行用戶的消費業務邏輯,成功後,才會給 Broker 發送消費確認響應。如果 Broker 沒有收到消費確認響應,下次拉消息的時候還會返回同一條消息,確保消息不會在網絡傳輸過程中丟失,也不會因為客戶端在執行消費邏輯中出錯導致丟失。
在編寫消費代碼時需要注意的是:不要在收到消息後就立即發送消費確認,而是應該在執行完所有消費業務邏輯之後,再發送消費確認。
4.處理消費過程中的重複消息
在消息傳遞過程中,如果出現傳遞失敗的情況,發送方會執行重試,重試過程中就有可能產生重複的消息。如果沒有對重複消息進行處理,就可能導致系統的數據出現錯誤。
比如,一個消費訂單消息,統計下單金額的微服務,如果沒有正確處理重複消息,那就會出現重複統計,導致統計結果錯誤。
一、消息重複的情況必然存在
在MQTT協議中,給出了三種傳遞消息時能夠提供的服務質量標準:
- At most once:至多一次。最多會被送達一次,也就是說沒有消息可靠性保證,允許丟消息。一般都是一些對消息可靠性要求不高的監控場景使用,比如每分鐘上報一次機房溫度數據,可以接受數據少量丟失。
- At least once:至少一次。至少會被送達一次,也就是說不允許丟消息,但是允許有少量重複消息出現。
- Exactly once:恰好一次。只會被送達一次,不允許丟失也不允許重複,這個是最高等級。
這個服務質量標準不僅適用於 MQTT,對所有的消息隊列都是適用的。常用的絕大部分消息隊列提供的服務質量都是 At least once,包括 RocketMQ、RabbitMQ 和 Kafka 。也就是說,消息隊列很難保證消息不重複。
注意:Kafka 支持的「Exactly once」和我們剛剛提到的消息傳遞的服務質量標準「Exactly once」是不一樣的,它是 Kafka 提供的另外一個特性,Kafka 中支持的事務也和我們通常意義理解的事務有一定的差異。在 Kafka 中,事務和 Excactly once 主要是為了配合流計算使用的特性。
二、用冪等性解決重複消息問題
冪等本來是一個數學上的概念,它的定義是:如果一個函數f(x)滿足:f(f(x)) = f(x),則函數f(x)滿足米冪等性。擴展到計算機領域,被用來描述一個操作、方法或者服務。
- 一個冪等操作的特點是,其任意多次執行所產生的影響均與一次執行的影響相同。
- 一個冪等方法,使用同樣的參數,對它進行多次調用和一次調用,對系統產生的影響是一樣的。所以不用擔心重複執行會對系統造成任何改變。
舉例:
1、在不考慮並發的情況下,「將賬戶 X 的餘額設置為 100 元」,執行一次後對系統的影響是,賬戶 X 的餘額變成了 100 元。只要提供的參數 100 元不變,那即使再執行多少次,賬戶 X 的餘額始終都是 100 元,不會變化,這個操作就是一個冪等的操作。
2、「將賬戶 X 的餘額加 100 元」,這個操作它就不是冪等的,每執行一次,賬戶餘額就會增加 100 元,執行多次和執行一次對系統的影響(也就是賬戶的餘額)是不一樣的。
如果消費消息的業務邏輯具備冪等性,那就不用擔心消息重複的問題,因為同一條消息,消費一次和消費多次對系統的影響是完全一樣的。消費多次等於消費一次。從對系統的影響結果來說:At least once + 冪等消費 = Exactly once。
實現冪等操作最好的方式是,從業務邏輯設計上入手,將消費的業務邏輯設計成具備冪等性的操作。
常用的設計冪等操作的方法:
(1)利用數據庫的唯一約束實現冪等
上面提到的那個不具備冪等特性的轉賬的例子:將賬戶 X 的餘額加 100 元。在這個例子中,我們可以通過改造業務邏輯,讓它具備冪等性。
首先,我們可以限定,對於每個轉賬單每個賬戶只可以執行一次變更操作,在分佈式系統中,這個限制實現的方法非常多,最簡單的是我們在數據庫中建一張轉賬流水表,這個表有三個字段:轉賬單 ID、賬戶 ID 和變更金額,然後給轉賬單 ID 和賬戶 ID 這兩個字段聯合起來創建一個唯一約束,這樣對於相同的轉賬單 ID 和賬戶 ID,表裡至多只能存在一條記錄。
這樣,我們消費消息的邏輯可以變為:「在轉賬流水表中增加一條轉賬記錄,然後再根據轉賬記錄,異步操作更新用戶餘額即可。」在轉賬流水表增加一條轉賬記錄這個操作中,由於我們在這個表中預先定義了「賬戶 ID 轉賬單 ID」的唯一約束,對於同一個轉賬單同一個賬戶只能插入一條記錄,後續重複的插入操作都會失敗,這樣就實現了一個冪等的操作。
基於這個思路,不光是可以使用關係型數據庫,只要是支持類似「INSERT IF NOT EXIST」語義的存儲類系統都可以用於實現冪等,比如,你可以用 Redis 的 SETNX 命令來替代數據庫中的唯一約束,來實現冪等消費。
(2)為更新的數據設置前置條件
給數據變更設置一個前置條件,如果滿足條件就更新數據,否則拒絕更新數據,在更新數據的時候,同時變更前置條件中需要判斷的數據。這樣,重複執行這個操作時,由於第一次更新數據的時候已經變更了前置條件中需要判斷的數據,不滿足前置條件,則不會重複執行更新數據操作。
比如,「將賬戶 X 的餘額增加 100 元」這個操作並不滿足冪等性,我們可以把這個操作加上一個前置條件,變為:「如果賬戶 X 當前的餘額為 500 元,將餘額加 100 元」,這個操作就具備了冪等性。對應到消息隊列中的使用時,可以在發消息時在消息體中帶上當前的餘額,在消費的時候進行判斷數據庫中,當前餘額是否與消息中的餘額相等,只有相等才執行變更操作。
但是,如果我們要更新的數據不是數值,或者我們要做一個比較複雜的更新操作怎麼辦?用什麼作為前置判斷條件呢?更加通用的方法是,給你的數據增加一個版本號屬性,每次更數據前,比較當前數據的版本號是否和消息中的版本號一致,如果不一致就拒絕更新數據,更新數據的同時將版本號 +1,一樣可以實現冪等更新。
(3)記錄並檢查操作
如果上面提到的兩種實現冪等方法都不能適用於你的場景,還有一種通用性最強,適用範圍最廣的實現冪等性方法:記錄並檢查操作,也稱為「Token 機制或者 GUID(全局唯一 ID)機制」,實現的思路特別簡單:在執行數據更新操作之前,先檢查一下是否執行過這個更新操作。這種方法適用範圍最廣,但是實現難度和複雜度也比較高,一般不推薦使用。
具體的實現方法是,在發送消息時,給每條消息指定一個全局唯一的 ID,消費時,先根據這個 ID 檢查這條消息是否有被消費過,如果沒有消費過,才更新數據,然後將消費狀態置為已消費。
在分佈式系統中,這個方法其實是非常難實現的。首先,給每個消息指定一個全局唯一的 ID 就是一件不那麼簡單的事兒,方法有很多,但都不太好同時滿足簡單、高可用和高性能,或多或少都要有些犧牲。更加麻煩的是,在「檢查消費狀態,然後更新數據並且設置消費狀態」中,三個操作必須作為一組操作保證原子性,才能真正實現冪等,否則就會出現 Bug。
比如說,對於同一條消息:「全局 ID 為 8,操作為:給 ID 為 666 賬戶增加 100 元」,有可能出現這樣的情況:
- t0 時刻:Consumer A 收到條消息,檢查消息執行狀態,發現消息未處理過,開始執行「賬戶增加 100 元」;
- t1 時刻:Consumer B 收到條消息,檢查消息執行狀態,發現消息未處理過,因為這個時刻,Consumer A 還未來得及更新消息執行狀態。
這樣就會導致賬戶被錯誤地增加了兩次 100 元,這是一個在分佈式系統中非常容易犯的錯誤,一定要引以為戒。對於這個問題,當然我們可以用事務來實現,也可以用鎖來實現,但是在分佈式系統中,無論是分佈式事務還是分佈式鎖都是比較難解決問題。
5.利用事務消息實現分佈式事務
一、消息事務
其實很多場景下,我們「發消息」這個過程,目的往往是通知另外一個系統或者模塊去更新數據,消息隊列中的「事務」,主要解決消息生產者和消息消費者的數據一致性問題。
用戶在電商APP上購物時,先把商品加到購物車裡,然後幾件商品一起下單,最後支付,完成購物流程。
這個過程中有一個需要用到消息隊列的步驟,訂單系統創建訂單後,發消息給購物車系統,將已下單的商品從購物車中刪除。因為從購物車刪除已下單商品這個步驟,並不是用戶下單支付這個主要流程中必要的步驟,使用消息隊列來異步清理購物車是更加合理。
對於訂單系統,它創建訂單的過程實際執行了2個步驟的操作:
- 在訂單庫中插入一條訂單數據,創建訂單;
- 發消息給消息隊列,消息的內容就是剛剛創建的訂單
對於購物車系統:
- 訂閱相應的主題,接收訂單創建的消息,然後清理購物車,在購物車中刪除訂單的商品。
在分佈式系統中,上面提到的步驟,任何一個都有可能失敗,如果不做任何處理,那就有可能出現訂單數據與購物車數據不一致的情況,比如:
- 創建了訂單,沒有清理購物車;
- 訂單沒創建成功,購物車裏面的商品卻被清掉了。
所以我們需要解決的問題為:在上述任意步驟都有可能失敗的情況下,還要保證訂單庫和購物車庫這兩個庫的數據一致性。
二、分佈式事務
分佈式事務就是要在分佈式系統中實現事務。在分佈式系統中,在保證可用性和不嚴重犧牲性能的前提下,光是要實現數據的一致性就已經非常困難了,顯然實現嚴格的分佈式事務是更加不可能完成的任務。所以目前大家所說的分佈式事務,更多情況下,是在分佈式系統中事務的不完整實現,在不同的應用場景中,有不同的實現,目的都是通過一些妥協來解決實際問題。
常見的分佈式事務實現:
- 2PC(Two-phase Commit,也叫二階段提交)
- TCC(Try-Confirm-Cancel)
- 事務消息
每一種實現都有其特定的使用場景,也有各自的問題,都不是完美的解決方案。
事務消息適用的場景主要是那些需要異步更新數據,並且對數據實時性要求不太高的場景。比如在創建訂單後,如果出現短暫的幾秒,購物車裡的商品沒有被及時情況,也不是完全不可接受的,只要最終購物車的數據和訂單數據保持一致就可。
三、消息隊列實現分佈式事務
事務消息需要消息隊列提供相應的功能才能實現,kafka和RocketMQ都提供了事務相關功能。
對於訂單系統:
- 首先,訂單系統在消息隊列上開啟一個事務。
- 然後訂單系統給消息服務器發送一個「半消息」,這個半消息不是說消息內容不完整,它包含的內容就是完整的消息內容,半消息和普通消息的唯一區別是,在事務提交之前,對於消費者來說,這個消息是不可見的。
- 半消息發送成功後,訂單系統就可以執行本地事務了,在訂單庫中創建一條訂單記錄,並提交訂單庫的數據庫事務。
- 然後根據本地事務的執行結果決定提交或者回滾事務消息。如果訂單創建成功,那就提交事務消息,購物車系統就可以消費到這條消息繼續後續的流程。如果訂單創建失敗,那就回滾事務消息,購物車系統就不會收到這條消息。這樣就基本實現了「要麼都成功,要麼都失敗」的一致性要求。
對於購物車系統:
- 對於購物車系統收到訂單創建成功消息清理購物車這個操作來說,失敗的處理比較簡單,只要成功執行購物車清理後再提交消費確認即可,如果失敗,由於沒有提交消費確認,消息隊列會自動重試。
如果在第四步提交事務消息時失敗了怎麼辦?Kafka 和 RocketMQ 給出了 2 種不同的解決方案:
1、Kafka 的解決方案:
直接拋出異常,讓用戶自行處理。我們可以在業務代碼中反覆重試提交,直到提交成功,或者刪除之前創建的訂單進行補償。
2、RocketMQ 的解決方案:
在 RocketMQ 中的事務實現中,增加了事務反查的機制來解決事務消息提交失敗的問題。如果 Producer 也就是訂單系統,在提交或者回滾事務消息時發生網絡異常,RocketMQ 的 Broker 沒有收到提交或者回滾的請求,Broker 會定期去 Producer 上反查這個事務對應的本地事務的狀態,然後根據反查結果決定提交或者回滾這個事務。為了支撐這個事務反查機制,我們的業務代碼需要實現一個反查本地事務狀態的接口,告知 RocketMQ 本地事務是成功還是失敗。
綜合上面講的通用事務消息的實現和 RocketMQ 的事務反查機制,使用 RocketMQ 事務消息功能實現分佈式事務的流程如下圖:
6.消息隊列中的順序問題
當我們說順序時,我們在說什麼?
日常思維中,順序大部分情況會和時間關聯起來,即時間的先後表示事件的順序關係。
比如事件A發生在下午3點一刻,而事件B發生在下午4點,那麼我們認為事件A發生在事件B之前,他們的順序關係為先A後B。
上面的例子之所以成立是因為他們有相同的參考系,即他們的時間是對應的同一個物理時鐘的時間。如果A發生的時間是北京時間,而B依賴的時間是東京時間,那麼先A後B的順序關係還成立嗎?
如果沒有一個絕對的時間參考,那麼A和B之間還有順序嗎,或者說怎麼斷定A和B的順序?
顯而易見的,如果A、B兩個事件之間如果是有因果關係的,那麼A一定發生在B之前(前因後果,有因才有果)。相反,在沒有一個絕對的時間的參考的情況下,若A、B之間沒有因果關係,那麼A、B之間就沒有順序關係。
那麼,我們在說順序時,其實說的是:
- 有絕對時間參考的情況下,事件的發生時間的關係;
- 和沒有時間參考下的,一種由因果關係推斷出來的happening before的關係;
在分佈式環境中討論順序
當把順序放到分佈式環境(多線程、多進程都可以認為是一個分佈式的環境)中去討論時:
- 同一線程上的事件順序是確定的,可以認為他們有相同的時間作為參考
- 不同線程間的順序只能通過因果關係去推斷
(點表示事件,波浪線箭頭表示事件間的消息)
上圖中,進程P中的事件順序為p1->p2->p3->p4(時間推斷)。而因為p1給進程Q的q2發了消息,那麼p1一定在q2之前(因果推斷)。但是無法確定p1和q1之間的順序關係。
推薦閱讀《Time, Clocks, and the Ordering of Events in a Distributed System》,會透徹的分析分佈式系統中的順序問題。
消息中間件中的順序消息
什麼是順序消息
有了上述的基礎之後,我們回到本篇文章的主題中,聊一聊消息中間件中的順序消息。
順序消息(FIFO 消息)是 MQ 提供的一種嚴格按照順序進行發佈和消費的消息類型。順序消息由兩個部分組成:順序發佈和順序消費。
順序消息包含兩種類型:
分區順序:一個Partition內所有的消息按照先進先出的順序進行發佈和消費
全局順序:一個Topic內所有的消息按照先進先出的順序進行發佈和消費
這是阿里雲上對順序消息的定義,把順序消息拆分成了順序發佈和順序消費。那麼多線程中發送消息算不算順序發佈?
如上一部分介紹的,多線程中若沒有因果關係則沒有順序。那麼用戶在多線程中去發消息就意味着用戶不關心那些在不同線程中被發送的消息的順序。即多線程發送的消息,不同線程間的消息不是順序發佈的,同一線程的消息是順序發佈的。這是需要用戶自己去保障的。
而對於順序消費,則需要保證哪些來自同一個發送線程的消息在消費時是按照相同的順序被處理的(為什麼不說他們應該在一個線程中被消費呢?)。
全局順序其實是分區順序的一個特例,即使Topic只有一個分區(以下不在討論全局順序,因為全局順序將面臨性能的問題,而且絕大多數場景都不需要全局順序)。
如何保證順序
在MQ的模型中,順序需要由3個階段去保障:
- 消息被發送時保持順序
- 消息被存儲時保持和發送的順序一致
- 消息被消費時保持和存儲的順序一致
發送時保持順序意味着對於有順序要求的消息,用戶應該在同一個線程中採用同步的方式發送。存儲保持和發送的順序一致則要求在同一線程中被發送出來的消息A和B,存儲時在空間上A一定在B之前。而消費保持和存儲一致則要求消息A、B到達Consumer之後必須按照先A後B的順序被處理。
如下圖所示:
對於兩個訂單的消息的原始數據:a1、b1、b2、a2、a3、b3(絕對時間下發生的順序):
-
在發送時,a訂單的消息需要保持a1、a2、a3的順序,b訂單的消息也相同,但是a、b訂單之間的消息沒有順序關係,這意味着a、b訂單的消息可以在不同的線程中被發送出去
-
在存儲時,需要分別保證a、b訂單的消息的順序,但是a、b訂單之間的消息的順序可以不保證
-
- a1、b1、b2、a2、a3、b3是可以接受的
- a1、a2、b1、b2、a3、b3也是可以接受的
- a1、a3、b1、b2、a2、b3是不能接受的
-
消費時保證順序的簡單方式就是「什麼都不做」,不對收到的消息的順序進行調整,即只要一個分區的消息只由一個線程處理即可;當然,如果a、b在一個分區中,在收到消息後也可以將他們拆分到不同線程中處理,不過要權衡一下收益
開源RocketMQ中順序的實現
上圖是RocketMQ順序消息原理的介紹,將不同訂單的消息路由到不同的分區中。文檔只是給出了Producer順序的處理,Consumer消費時通過一個分區只能有一個線程消費的方式來保證消息順序,具體實現如下。
Producer端
Producer端確保消息順序唯一要做的事情就是將消息路由到特定的分區,在RocketMQ中,通過MessageQueueSelector來實現分區的選擇。
- List
mqs:消息要發送的Topic下所有的分區 - Message msg:消息對象
- 額外的參數:用戶可以傳遞自己的參數
比如如下實現就可以保證相同的訂單的消息被路由到相同的分區:
long orderId = ((Order) object).getOrderId;
return mqs.get(orderId % mqs.size());
Consumer端
RocketMQ消費端有兩種類型:MQPullConsumer和MQPushConsumer。
MQPullConsumer由用戶控制線程,主動從服務端獲取消息,每次獲取到的是一個MessageQueue中的消息。PullResult中的List msgFoundList自然和存儲順序一致,用戶需要再拿到這批消息後自己保證消費的順序。
對於PushConsumer,由用戶註冊MessageListener來消費消息,在客戶端中需要保證調用MessageListener時消息的順序性。RocketMQ中的實現如下:
- PullMessageService單線程的從Broker獲取消息
- PullMessageService將消息添加到ProcessQueue中(ProcessMessage是一個消息的緩存),之後提交一個消費任務到ConsumeMessageOrderService
- ConsumeMessageOrderService多線程執行,每個線程在消費消息時需要拿到MessageQueue的鎖
- 拿到鎖之後從ProcessQueue中獲取消息
保證消費順序的核心思想是:
- 獲取到消息後添加到ProcessQueue中,單線程執行,所以ProcessQueue中的消息是順序的
- 提交的消費任務時提交的是「對某個MQ進行一次消費」,這次消費請求是從ProcessQueue中獲取消息消費,所以也是順序的(無論哪個線程獲取到鎖,都是按照ProcessQueue中消息的順序進行消費)
順序和異常的關係
順序消息需要Producer和Consumer都保證順序。Producer需要保證消息被路由到正確的分區,消息需要保證每個分區的數據只有一個線程消息,那麼就會有一些缺陷:
- 發送順序消息無法利用集群的Failover特性,因為不能更換MessageQueue進行重試
- 因為發送的路由策略導致的熱點問題,可能某一些MessageQueue的數據量特別大
- 消費的並行讀依賴於分區數量
- 消費失敗時無法跳過
不能更換MessageQueue重試就需要MessageQueue有自己的副本,通過Raft、Paxos之類的算法保證有可用的副本,或者通過其他高可用的存儲設備來存儲MessageQueue。
熱點問題好像沒有什麼好的解決辦法,只能通過拆分MessageQueue和優化路由方法來盡量均衡的將消息分配到不同的MessageQueue。
消費並行度理論上不會有太大問題,因為MessageQueue的數量可以調整。
消費失敗的無法跳過是不可避免的,因為跳過可能導致後續的數據處理都是錯誤的。不過可以提供一些策略,由用戶根據錯誤類型來決定是否跳過,並且提供重試隊列之類的功能,在跳過之後用戶可以在「其他」地方重新消費到這條消息。
鳴謝
感謝極客時間所屬的《消息隊列高手課》 鏈接
最後
本篇是一篇大合集,中間肯定參考了許多其他人的文章內容或圖片,但由於時間比較久遠,當時並沒有一一記錄,為此表示歉意,如果有作者發現了自己的文章或圖片,可以私聊我,我會進行補充。
如果你發現寫的還不錯,可以搜索公眾號「是Kerwin啊」,一起進步!
也可以查看Kerwin的GitHub主頁