《高性能利器》-32張圖帶你解決RocketMQ所有場景問題

  • 2021 年 8 月 15 日
  • 筆記

一、RocketMQ的基本原理

RocketMQ基本架構圖如下

image.png

從這個架構圖上我們可以知道,RocketMQ有4塊核心部分:

  • NameServer:管理Broker的資訊,讓使用MQ的系統感知到集群裡面的broker

  • Broker:主從架構實現數據多副本存儲和高可用

  • producer:生產者

  • consumer:消費者

二、NameServer

2.1 Broker資訊註冊到哪個NameServer?

每台broker機器需要向所有的NameServer機器上註冊自己的資訊,防止單台NameServer掛掉導致Broker資訊不全,保證NameServer的集群高可用。

2.2 Broker資訊怎麼註冊?

基於Netty的網路通訊。

2.3 Broker掛了如何感知?

  • NameServer感知:30s心跳機制和120s故障感知機制

image.png

broker會每隔30秒向NameServer發送一個的心跳 ,NameServer收到一個心跳會更新對應broker的最近一次心跳事件,然後NamServer會每隔十秒運行一個任務,去檢查一下各個broker的最近一次心跳的時間,如果超過120s沒有收到相應broker的心跳,則判定對應的broker已經掛掉。

三、Broker

3.1 Master-Slave模式

為了保證MQ的數據不丟失而且具備一定的高可用性,我們採用的是主從複製模式。

RocketMQ自身的Master-Slave模式主採取的是Slave主動從Master拉取消息。

3.2 究竟是如何從Master-Slave中進行讀寫呢?

image.png

  • 生產者在寫入消息時,一般寫入到Master

  • 消費者在拉取消息時,可能從Master拉取,也可能從Slave拉取,根據Master的負載情況和Slave的同步情況, 由Master給出建議

    • Master負載過高,建議下次從Slave獲取消息
    • Slave未同步完全,建議下次從Master獲取消息

3.3 Broker宕機分析

3.3.1 Slave宕機

對系統會存在一點影響,但是影響不大,只不過少了Slave Broker,會導致所有的讀寫壓力都集中在Master Broker上

3.3.2 Master宕機:基於Dledger實現RocketMQ高可用自動切換

選舉方式這裡不做重點介紹。

image.png

四、生產者

4.1 MessageQueue是什麼?

我們先看看Topic、Broker、Message之間的關係。

如圖比如說一個TopicA有n條消息,然後一個TopicA中的n條數據分配放入給4個MessageQueue1-4。

image.png

所以本質上來說就是一個數據分片機制,通過MessageQueue將一個Topic的數據拆分為很多數據分片,在每個Broker機器上都存儲一些MessageQueue。通過這個方法可以實現分散式存儲。

4.2 生產者發送消息寫入哪個MessageQueue?

image.png

因為從前面我們知道,生產者會跟NameServer通訊獲取相應Topic的路由數據,從而知道,一個Topic有幾個MessageQueue,哪些MessageQueue在哪台Broker機器上,通過對應的規則寫入對應的MessageQueue。

4.2.1 Master Broker故障分析

當MasterBroker宕機,此時SlaveBroker正在切換過程中,有一組Broker就沒有Master可以寫入。

此時我們可以打開Producer的自動容錯機制開關:sendLatencyFaultEnable,比如說訪問其中一個Broker發現網路延遲有1000ms還無法訪問,我們會自動迴避這個Broker一段時間,比如接下來3000ms內,就不會訪問這個Broker。

過一段時間之後,MasterBroker修復好了,或者說SlaveBroker選舉成功了,就可以提供給別人訪問了。

image.png

4.3 Broker數據存儲(核心環節)

Broker數據存儲實際上是MQ最核心的環節:

  • 消息吞吐量
  • 消息不丟失

4.3.1 磁碟日誌文件CommitLog

image.png

首先,Producer發送消息給Broker,Broker接收到消息後,把這個消息直接順序寫入寫入到磁碟上的一個日誌文件,叫做CommitLog。

  • CommitLog是由很多磁碟文件組成
  • 每個文件限定最多1GB

4.3.2 ConsumeQueue存儲對應消息的偏移量

在Broker中,每一個Topic下的每一個MessageQueue都會有對應一系列的ConsumeQueue文件。

Broker磁碟存儲類似於文件樹的形式存在:

image.png

ConsumeQueue中存儲著對應MessageQueue中的消息在CommitLog中的物理偏移量地址offset。

image.png

如圖:

  1. Broker接受消息,順序寫入消息到CommitLog中

  2. 同時找到對應的TopicA/MessageQueue1/ConsumeQueue0寫入對應的物理地址

  3. TopicA/MessageQueue1/ConsumeQueue0的物理地址,即為CommitLog文件中一個消息的引用

即:Topic的每個MessageQueue都對應了Broker機器上的多個ConsumeQueue文件,這些ConsumeQueue共同組成保存了MessageQueue的所有消息在CommitLog文件中的物理offset偏移量。

4.3.3 Broker寫入磁碟CommitLog怎麼近乎記憶體寫性能?

磁碟文件順序寫+OS PageCache寫入+OS非同步刷盤的策略

image.png

如圖:

  1. 數據寫入CommitLog時候,不是直接寫入磁碟,而是寫入OS的PageCache記憶體緩衝中
  2. 後台開啟執行緒,非同步刷盤到CommitLog中

這樣的話基本上可以讓消息寫入CommitLog的性能跟直接寫入記憶體裡面是差不多的,所以Broker才能具有高吞吐量。

4.3.4 非同步刷盤和同步刷盤

  • 非同步刷盤:高吞吐寫入+丟失數據風險
  • 同步刷盤:吞吐量下降+數據不丟失

對於日誌類型這種場景,可以允許數據的丟失,但是要求比較高的吞吐量,可以採用非同步刷盤的方式。另外非核心的業務場景,不涉及重要核心數據變更的場景,也可以使用非同步刷盤,比如訂單支付成功,發送簡訊這種場景。但是對於涉及到核心的數據變更的場景,就需要使用同步刷盤,比如訂單支付成功後扣減庫存。

五、消費者

5.1 一個Topic上多個MessageQueue怎麼被消費?

image.png

原則:一個Consumer機器可以消費處理多個MessageQueue,一個MessageQueue只能被一個相同ConsumerGroup中的同一個Consumer消費。

5.2 Broker收到消息拉取請求,返回給消費者處理提交消費進度

Broker收到消息拉取請求後,會找到對應的MessageQueue中開始消費的位置,在ConsumeQueue讀取裡面對應位置的的消息在CommitLog中的offset

image.png

如圖:

  1. consumer找到要消費的MessageQueue對應的ConsumeQueue對應要消費的位置

  2. 消費完成之後消費者返回一個消費狀態,broker會存儲我們的消費位置

  3. 接下來可以根據這個消費位置進行下一步消費,不需要從頭拉取

5.3 消費者消費消息的性能問題

生產者是基於os cache提升寫性能的,broker收到一條消息,會寫入CommitLog文件,但是會先把CommitLog文件中的數據寫入os cache(作業系統管理的快取中),然後os開啟後台執行緒,非同步的將os cache快取中的CommitLog文件的數據刷入磁碟。

在消費者消費資訊的時候:

第一步,我們會去讀取ConsumeQueue中的offset偏移量,此時大量的讀取壓力全部都在ConsumeQueue,ConsumeQueue文件的讀性能是很大程度上會影響消息拉取的性能和吞吐量。

所以,Broker對ConsumeQueue文件也是基於os cache來進行優化的。

image.png

實際上,ConsumeQueue主要只是存放消息的offset,所以每個文件很小,占不了多少磁碟空間,完全可以被os快取在記憶體里。所以幾乎可以說消息的讀取性能達到記憶體級別。

第二步,根據讀取到的offset去CommitLog里讀取消息的完整數據。此時會有兩種可能

  • 第一種:如果讀取的是剛剛寫入到CommitLog的數據,那麼大概率他們還停留在os cache中,此時可以順利的直接從os cache中讀取CommitLog中的數據,這個就是直接讀取記憶體,性能很高。
  • 第二種:讀取較早之前的CommitLog的數據,已經被刷入磁碟不在os cache裡面了,此時只能從磁碟上的文件讀取了,這個性能稍微差一點。

這兩種狀態很好區分,比如說消費者一直在快速的拉取和消費處理,跟上了broker的消息寫入速率,這麼來說os cache中每次CommitLog的消息還沒來得及被刷入磁碟中的時候就被消費者消費了;但是比如說broker負載很高,拉取消息的性能很低,跟不上生產者的速率,那麼數據會保存在磁碟中進行讀取。

5.4 Master Broker什麼時候通知你去Slave Broker讀取?

根據以上,我們可以判斷了什麼時候Master Broker負載會高,也就是當消費者讀取消息的時候,要從磁碟中載入大量的數據出來,此時Master Broker就會知道本次的負載會比較高,通知消費者下次從Slave Broker去拉取數據。

本質上就是對比當前沒有拉取消息的數量和大小,以及最多可以存放在os cache記憶體里的消****息的大小,如果沒有拉取的消息超過了最大能使用的記憶體的量,那麼之後會頻繁的從磁碟載入數據,此時就讓你從slave broker去載入數據了!

六、問題分析

舉一個簡單的例子作為分析的入口,將從各個環節可能發生的問題進行深入分析,如圖:

image.png

  1. 用戶進行一筆生活繳費

  2. 訂單系統推送繳費訂單支付消息到RocketMQ

  3. 紅包系統接受訂單消息

  4. 發紅包給用戶

6.1 消息發送失敗

消息發送失敗的原因多種多樣,存在於多個環節,我們一一分析。

6.1.1 系統推送消息丟失

image.png

第一個環節就是,訂單系統推送消息到MQ的過程中,由於網路等因素導致消息丟失。

6.1.2 RocketMQ的事務消息原理分析

為了解決系統推送消息丟失問題,RocketMQ有一個非常強悍的功能就是事務消息,能夠確保我們消息一定會成功寫入MQ裡面,不會半路搞丟。

如圖是在本系統中的一個基本事務消息的流程圖。

image.png

  1. 訂單系統先發送half消息到MQ中,試探MQ是否正常

如果此階段,half消息發送給MQ失敗,會執行一系列回滾操作,關閉這個訂單的狀態,因為後續的消息都操作不了

  1. 當half消息成功被RocketMQ接收時

    1. 返回half消息的成功響應,進入第3步
    2. 返回的響應未收到,但是此時MQ已經存儲下來了一條half消息,進入第5步
  2. 得知half消息發送成功之後,訂單系統可以更新資料庫,此時會有兩種情況對應兩種不同的提交

    1. 更新資料庫等操作一切順利,向RocketMQ發送一個commit請求
    2. 由於網路異常或者資料庫掛了等,為了執行資料庫更新等操作,更新不了訂單狀態,發送rollback請求
    3. 發送rollback或者commit失敗,跳轉到第5步
  3. RocketMQ收到commit或者rollback請求

    1. 收到rollback請求刪除half消息
    2. 收到commit請求改變half消息狀態為已提交,紅包系統可以開始消費消息
  4. 未收到commit和rollback請求的消息,RocketMQ會有補償機制,回調介面去判斷訂單的狀態是已關閉,則發送rollback進行回滾。

6.1.3 RocketMQ的事務消息底層分析

image.png

如圖解釋如下:

  • 消費系統對half消息不可見的原因: 我們知道,消費者是是通過ConsumeQueue獲取到對應的CommitLog裡面的消息,如圖,消費系統對half消息不可見的原因是因為half消息在未提交的時候,MQ維護了一個內部的TRANS_HALF_TOPIC,此時紅包系統只獲取TopicA中的MessageQueue中ConsumeQueue。

  • 返回half消息成功的響應時機: 當half消息寫入成功到TRANS_HALF_TOPIC中的ConsumeQueue的時候,就回認為寫入消息成功,返回給對應的訂單系統成功響應。

  • 補償機制: RocketMQ會啟動一個定時任務,定時掃描half消息狀態,如果還是為half消息,則回調訂單系統介面,判斷狀態。

  • 如何標記消息回滾或提交: 消息回滾並不是直接刪除,而是內部維護了一個OP_TOPIC,用一個OP操作來標記half消息的狀態。

  • 執行commit操作後消費系統可見: 執行commit操作之後,OP操作會標記half為commit狀態,並且把對應消息在TRANS_HALF_TOPIC中的消息offset寫入到TOPICA中,此時消息可見

6.1.4 思考:一定要用事務消息嗎?

上面這麼複雜的事務消息機制可能導致整體的性能比較差,而且吞吐量會比較低,我們一定要用事務消息嗎?

可以基於同步發送消息+反覆多次重試的方案

6.1.5 消息成功發送到MQ中了,就一定不會丟了嗎?

我們可以分析的到,事務消息能夠保證我們的消息從生產者成功發送到broker中對應的消費者需要消費的Topic中,我們認為他的消息推送成功。

問題一:

但是這個消息推送僅僅先是推送到os cache快取中,僅僅只是可以被消費系統看到,由於消息積壓等原因,還沒來得及去獲取這條消息,還沒來得及刷到ConsumeQueue的磁碟文件中去,此時萬一機器突然宕機,os cache中的數據全部丟失,此時消息必然丟失,消費系統無法讀到這條消息。

如圖示意:image.png

解決

為了解決這個問題,一定要確保消息零丟失的話,我們的解決辦法就是將非同步刷盤調整為同步刷盤

放棄了非同步刷盤的高吞吐量,確保消息數據的零丟失,也就是說只要MQ返迴響應half消息發送成功了,此時消息就已經進入了磁碟文件了。

問題二:

就算os cache的消息寫入ConsumeQueue的磁碟文件了,紅包沒來得及消費這條消息的時候,磁碟突然就壞了,一樣會導致消息丟失。

image.png

所以說,無論是通過同步發送消息+反覆多次重試的方案,還是事務消息的方案,哪怕保證寫入MQ成功了,消息未必不會丟失。

解決:

對Broker使用主從架構的模式,每一個MasterBroker至少有一個SlaveBroker去同步他的數據,而且一條消息寫入成功,必須讓SlaveBroker也寫入成功,保證數據有多個副本的冗餘。

6.1.6 紅包系統拿到了消息就一定會消費消息嗎?

不一定。

問題分析:

因為當紅包系統拿到消息數據進記憶體里時,此時還沒有執行發紅包的邏輯,然後此時紅包系統就已經提交了這條消息的offset到broker中告訴broker已經消費掉了這條消息,消息位置會往後移。然後此時紅包系統宕機,這條消息就會丟失,永遠執行不了發紅包的邏輯。

RocketMQ解決方案: 利用消息監聽器同步處理消息

image.png

在RocketMQ的Consumer的默認消費模式下,我們在消息監聽器中接收到一批消息之後,會執行處理消息的邏輯,處理完成之後才會返回SUCCESS狀態提交offset到broker中,如果處理時宕機,不會返回SUCCESS狀態給broker,broker會繼續將這個消息給下一個Consumer消費。

6.2 消息發送全鏈路零丟失方案總結

6.2.1 發送消息到MQ的零丟失

  • 同步發送消息+反覆多次嘗試
  • 事務消息機制

6.2.2 MQ收到消息之後的零丟失

  • 同步刷盤策略:解決os cache未能刷入磁碟問題
  • 主從架構同步機制:解決單個broker磁碟文件損壞問題

6.2.3 消費消息的零丟失

  • 採用同步處理消息方式

6.2.4 適用場景

首先,消息零丟失方案會必然的導致從頭到尾的性能下降和MQ的吞吐量下降

一般和金錢、交易以及核心數據相關的系統和核心鏈路,可以用這套零消息丟失方案:比如支付系統、訂單系統等。

6.3 消息發送重複

重複發紅包等問題

6.3.1 發送方重複發送

如圖:

image.png

  • 用戶支付繳費訂單時候,會通知訂單系統發送訂單支付消息

  • 此時訂單系統響應超時

  • 支付系統再次重試調用訂單介面通知發送消息

  • 兩個訂單都成功,推送兩條相同的消息到MQ

  • 紅包系統收到兩條消息發送兩個紅包

有類似很多這種消息重試,介面重試的情況都會有消息重複發送的可能性,還比如說當你發送消息成功到MQ,MQ返回的SUCCESS的響應由於網路原因未收到,重試機制會再次發送消息,導致消息重複。

解決方案:冪等性機制

  • 業務判斷法:RocketMQ支援消息查詢功能

image.png

  1. 由於訂單系統調用超時,重試調用介面

  2. 當訂單系統發消息之前,發送請求到MQ查詢是否存在這條消息

  3. 如果MQ已經存在,則不重複發送

  • Redis快取:

image.png

Redis快取思想也比較簡單,只需要根據對應的訂單資訊去快取裡面查詢一下是否已經發送給MQ了。

但是這種解決方案也不是絕對的安全,因為你消息發送成功給MQ了還沒來得及寫Redis系統就掛了,之後也會被重複調用。

總結以上兩種解決方案,我們不建議在消息的發送環節保證消息的不重複發送,會影響介面性能。

6.3.2 消費方重複消費

image.png

  • 消費方消費消息,執行完了發紅包邏輯後,應該返回SUCCESS狀態,提交消費進度

  • 但是剛發完紅包,沒來得及提交offset消費進度,紅包系統重啟

  • MQ沒收到offset消費進度返回,將這個消息繼續發送到消費系統進行消費

  • 二次發送紅包。

解決方案:

  • 依據在生產者方設置消息的messageKey,然後每一條消息在消費方依據這個唯一的messageKey,進行冪等判斷:

  • 業務判斷,判斷這個業務的環節是否執行成功,如果沒有成功則消費,成功則捨棄消息

  • 維護一個消息表,當新的消息到達的時候,根據新消息的id在該表中查詢是否已經存在該id,如果存在則表明消息已經被消費過,那麼丟棄該消息不再進行業務操作即可

  • 若是消息,然後執行insert資料庫方法,可以建立一個唯一主鍵,插入會保證不會重複

6.4 死信隊列

通過以上的學習,我已經基本解決了MQ消息不丟失以及不會重複處理消息的問題,在正常流程下基本上沒有什麼問題。但是會出現以下問題。

我們一直都是假設的一個場景就是紅包系統的MessageListener監聽回調函數接收到消息都能順利的處理好消息邏輯,發送紅包,落庫等操作,返回SUCCESS,提交offset到broker中去,然後從broker中獲取下一批消息來處理。

如圖:

image.png

問題:

但是如果在我們MessageListener處理消息邏輯時候,紅包資料庫宕機了,沒辦法完成發紅包的邏輯,此時出現對消息處理的異常,我們應該怎麼處理呢?

解決方案:

在MessageListener中,除了返回SUCCESS狀態,我們還可以返回RECONSUME_LATER狀態,也就是用try-cache包裹住我們的業務程式碼,成功則返回SUCCESS狀態,順利進行後續操作,如果出現異常則返回RECONSUME_LATER狀態。

當RocketMQ收到我們返回的RECONSUME_LATER狀態之後,會將這批消息放到對應消費組的重試隊列中。

image.png

重試隊列裡面的消息會再次發給消費組,默認最多重試16次,如果重試16次失敗則進入死信隊列。

死信隊列:

對於死信隊列,一般我們可以專門開一個後台執行緒,訂閱這個死信隊列,對死信隊列中的消息,一直不停的嘗試。

6.5 消息亂序

6.5.1 業務場景

大數據團隊要獲取到訂單系統的binlog,然後保存一份在自己的大數據存儲系統中

資料庫binlog:記錄資料庫的增刪改查操作。

大數據團隊不能直接跑複雜的大SQL在訂單系統的資料庫中跑出來一些數據報表,這樣會嚴重影響訂單系統的性能,為了優化方案,我們採用類似基於Canal這樣的中間件去監聽訂單數據的binlog,然後把這個binlog發到MQ中去,然後我們的大數據系統自己用MQ里獲取binlog,自己在自己的大數據存儲中執行增刪改查操作,得到我們需要的報表,如圖下:

image.png

6.5.2 亂序問題原理分析

image.png

  • Canal監聽到binlog日誌中,操作資料庫的順序為先執行insert插入操作,後update更新操作。

  • 因為我們消息可能會發送到不同MessageQueue中的不同的ConsumeQueue中去

  • 然後同一個消費組的大數據消費系統獲取到insert binlog和update binlog,這兩個是並行操作的,所以不能確定哪個消息先獲取到執行,可能會出現消息亂序。

6.5.3 消息亂序解決方案

出現上面問題的原因,根本問題就是一個訂單binlog分別進入了兩個MessageQueue中,解決這個問題的方法其實非常簡單,就是得想辦法讓同一個訂單的binlog進入到一個MessageQueue裡面去。

方法很簡單:我們可以根據訂單id對MessageQueue的數量取模來對應每個訂單究竟去哪個MessageQueue。

消息亂序解決方案不能和重試隊列混用。

6.6 延遲消息

6.6.1 業務場景

大量訂單點擊提交未支付,超過30min需要自動退款,我們研究需要定時退款掃描問題。

如圖:

image.png

當一個訂單下單之後,沒有支付會進入訂單資料庫保存,如果30分鐘內沒有支付,就必須訂單系統自動關閉這個訂單。

可能我們就需要有一個後台的執行緒,不停的去掃描訂單資料庫里所有的未支付狀態的訂單,超過30分鐘了就必須把訂單狀態更新為關閉。這裡會有一個問題,訂單系統的後台執行緒必須不停的掃描各種未支付的訂單,可能每個未支付的訂單在30分鐘之內會被掃描很多遍。這個掃描訂單的服務是一個麻煩的問題。

針對這種場景,RocketMQ的延遲消息就登場了。

如圖:

image.png

  • 創建一個訂單,發送一條延遲消息到MQ中去

  • 需要等待30分鐘之後,才能被訂單掃描服務消費

  • 當訂單掃描服務在30分鐘後消費了一條消息,就針對這條消息查詢訂單資料庫

  • 看過了30分鐘了,他的支付狀態如果是未支付,則關閉,這樣只會被掃描到一次

所以RocketMQ的延遲消息,是非常常用並且非常有用的一個功能

6.7 經驗總結

6.7.1 運用tags過濾數據

在一些真正的生產項目中,我們需要合理的規劃Topic和裡面的tags,一個Topic代表了某一類的業務消息類型數據,我們可以通過裡面的tags來對同一個topic的一些消息進行過濾。

6.7.2 基於消息key來定位消息是否丟失

我們在消息零丟失方案中,萬一消息真的丟失了,我們怎麼去排查呢?在RocketMQ中我們可以給消息設置對應的Key值,比如設置一個訂單ID:message.setKeys(orderId),這樣這個消息就和一個key綁定起來,當這個消息發送到broker中去,會根據對應message的數量構建hash索引,存放在IndexFile索引文件中,我們可以通過MQ提供的命令去查詢。

6.7.3 消息零丟失方案的補充

在我們這種大型的金融級的系統,或者跟錢有關的支付系統等等,需要有超高級別的高可用保障機制,所以對於零消息丟失解決方案來說,萬一一整個MQ集群徹底崩潰了,我們需要有更完善的措施來保證我們消息不會丟失。

此時生產者發送不了消息到MQ,所以我們生產者就應該把消息在本地進行持久化,可以是存在本地磁碟,也可以是在資料庫里去存起來,MQ恢復之後,再把持久化的消息投遞到MQ中去。

6.7.4 提高消費者的吞吐量

最簡單的方法去提高消費者的吞吐量,就是提高消費者的並行度,比如說部署更多的Consumer機器去消費消息。但是我們需要明確的一點就是對應的MessageQueue也要增加,因為一個MessageQueue只能被一個Consumer機器消費。

第二個辦法是我們可以增加Consumer的執行緒數量,消費執行緒樂隊,消費速度越快。

第三個辦法是我們可以開啟消費者的批量消費功能(有對應的參數設置)。

6.7.5 要不要消費歷史記錄

Consumer是支援設置在哪裡開始消費的,常見的有兩種:從Topic的第一條數據消費(CONSUME_FROM_LAST_OFFSET),或者是從最後一次消費過的消息之後開始消費(CONSUME_FROM_FIRTST_OFFSET),我們一般都是設置選擇後者。

七、百萬消息積壓問題

7.1 業務場景

如圖所示:在一個系統中,由生產者系統和消費者系統兩個環節組成,生產者不斷的向MQ裡面寫入消息,消費者不斷的從MQ中消費消息。突然有一天消費者依賴的一些資料庫掛了,消費者就處理不了當下的業務邏輯,消息也不能正常的被消費,此時生產者還在正常的向MQ中寫入消息,結果在高峰期內,就往MQ中寫入了百萬條消息,都積壓在了MQ裡面了。

image.png

7.2 解決方案

第一, 最簡單粗暴的方法,如果我們的消息能夠容忍百萬消息的丟失,那麼我們可以直接修改消費者系統的程式碼,丟棄所有的消息,那麼百萬消息很快就被處理完了,但是往往對於絕大多數系統而言,我們不能使用這種辦法。

第二, 我們需要等待消費者依賴的資料庫恢復之後,根據線上的Topic的MEssageQueue來判斷後續如何處理。

MessageQueue數量多:

  • 比如說我現在一個Topic中有20個MessageQueue,有4個消費者系統在消費,那麼我每個消費者就會從5個MessageQueue中獲取消息進行消費,畢竟積壓了百萬消息,僅僅依賴4個消費者是遠遠不夠的。

  • 所以我們可以臨時申請16台機器多部署16個消費者,這樣20個消費者去同時消費20個MessageQueue,速度提高了5倍,積壓的百萬消息很快就能處理完畢。

  • 但是此時我們要考慮的是,增加了5倍的消費能力,那麼資料庫的壓力就增加了5倍,這個是我們需要考慮的

如圖:

image.png

MessageQueue數量少:

  • 比如說一個Topic總共就只有4個MessageQueue,然後就只有4個消費者系統,這時候沒辦法擴容消費系統

  • 所以此時我們可以臨時修改那4個消費者系統的程式碼,讓他們獲取的消息不寫入資料庫,而是寫入一個新的topic

  • 新的Topic有新增的20個MessageQUeue,部署20台臨時增加的消費者系統去消費新的Topic中的Message。

如圖:

image.png

八、MQ集群數據遷移問題:雙讀+雙寫

要做MQ的集群遷移,不是簡單的粗暴的把Producer更新停機,新的程式碼重新上線發到新的MQ中去。

一般來說,我們需要做到兩件事情:

  • 雙寫: 要遷移的時候,我們需要在所有的Producer系統中,要引入一個雙寫的程式碼,讓他同時往新老兩個MQ中寫入消息,多寫幾天,起碼要持續一個星期,我們會發現這兩個MQ的數據幾乎一模一樣了,但是雙寫肯定是不夠的的,我們還要同時進行雙讀。

  • 雙讀: 也就是說我在雙寫的時候,所有的Consumer系統都需要同時從新老兩個MQ裡面獲取消息,分別都用一模一樣的邏輯處理,只不過從老MQ中還是去走核心邏輯處理,可以落庫存儲之類的操作,但是新的MQ我們可以用一樣的邏輯處理,但是不能把處理的結果具體落庫,我們可以寫入一個臨時的存儲中。

  • 觀察: 雙寫和雙讀一段時間之後,我們通過消費端對比,發現處理消息數量一致。

  • 切換: 正式實施切換,停機Producer系統,再重新修改上線,全部修改為新MQ,此時他數據並不會丟失,因為之前已經雙寫一段時間了,然後Consumer系統程式碼修改上線。

點關注,不迷路

好了各位,以上就是這篇文章的全部內容了,我後面會每周都更新幾篇高品質的大廠面試和常用技術棧相關的文章。感謝大夥能看到這裡,如果這個文章寫得還不錯, 求三連!!! 感謝各位的支援和認可,我們下篇文章見!

我是 九靈 ,有需要交流的童鞋可以關注公眾號:Java 補習課! 如果本篇部落格有任何錯誤,請批評指教,不勝感激 !