消息隊列常見問題分析

一、簡介

很久以前也寫過一篇關於消息隊列的文章,這裡的文章,這篇文章是對消息隊列使用場景,以及一些模型做過一點介紹。

這篇文章將分析消息隊列常見問題。

消息隊列:利用高效可靠的消息傳遞機制進行與平台無關的數據交流,並基於數據通訊來進行分散式系統集成。

從定義看:它是一種數據交流平台,也是數據通訊平台。
然而,數據通訊我們可以用http,RPC來進行通訊,這些與消息隊列有什麼區別呢?
最大的區別就是同步和非同步。http和RPC一般都是同步,而消息隊列是非同步。

二、為什麼要用消息隊列

1.解耦
雙方不在基於對方直接通訊了,而是基於消息隊列來通訊,通過MQ解耦了客戶端和服務端通訊。處理數據的雙方關注的點不同了,比如說一個事務,我們只關心核心流程,而需要依賴其他系統但不是那麼重要的事情,有通知即可,不需要等待結果。這種消息模型,關心的是通知,而不在意處理過程。也可以用消息隊列。
上下游開發人員也可以基於消息隊列發送消息,而不需要同步的處理消息了。

2.非同步處理
傳統的業務邏輯都是基於同步的方式進行處理的。而有了消息隊列,就可以把消息存放在MQ里,消息隊列的消費者就可以從消息隊列中獲取數據並進行處理。它不一定要實時處理,可以隔幾分鐘處理消息隊列里的數據。

3.削峰和流控
這裡有點像電腦中的硬體,比如CPU和記憶體,CPU運算速度比記憶體高N個數量級,那怎麼才能緩解兩者之間的差異?中間加一個快取來緩解兩者速度的差異。
同理,MQ也可以起到這種作用。對於上下游軟體不同的處理速度的差異進行調節。

比如,我們常見的秒殺應用,前端瞬間湧入成千上萬的請求,前端可以承受這麼大的請求壓力,但是複雜的後端系統,肯定會被壓垮,從而導致秒殺服務不可以用的情況。為了解決這種前後端處理速度不平衡的差異,導致的服務問題,可以引入消息隊列來調節,用消息隊列來快取用戶的請求,等待後端系統來消費。

上面就是消息隊列的主要功能,當然還有其他一些功能,比如消息廣播,最終一致性等。

使用MQ後的問題

當然使用了消息隊列,會增加系統的複雜性,一致性延遲,可用性降低等問題。
可用性降低是指系統可用性降低,如果MQ掛了,那麼肯定會影響到整個系統了。
因為上下游系統可能都會與MQ交互。

三、什麼時候引入MQ?

這個要看業務系統功能需求,一個是系統處理是否到達了瓶頸,需要消息隊列來緩解;
還有,業務系統一致性要求是不是特別高。通常業務系統不會要求那麼高的一致性要求。當然一些高頻交易系統,一致性要求特別高,就不適合用了。

引入任何一個新的軟體必然會增加原有系統的複雜性,還是要根據業務特性進行合理的選擇。

四、消息隊列常見問題

1.如何保證消息不被重複消費(怎麼保證冪等)

為什麼會重複消費

  • 生產者:也就是客戶端,可能會重複推送一條數據到MQ中。有可能是客戶端超時重複推送,也有可能是網路比較慢客戶端重複推送了數據到MQ中。
  • MQ:消費者消費完了一條數據,發送ACK資訊表示消費成功時,這時候,MQ突然掛了,導致MQ以為消費者還未消費該條消息,MQ恢復後再次推送了該條消息,導致重複消費。
  • 消費者:與上面MQ掛掉情況類似,消費者已經消費完了一條消息,正準備給MQ發送ACK消息但還未發送時,這時候消費者掛了,服務重啟後MQ以為消費者還沒有消費該條消息,再次推送該條消息。

怎麼處理重複消費

每個消息都帶一個唯一的消息id。消費端保證不重複消費就可以了,即使生產端產生了重複的數據,當然生產端也最好控制下重複數據。

消費端保證不重複消費:
通常方法都是存儲消費了的消息,然後判斷消息是否存在。

1.先保存在查詢
每次保存數據前,先查詢下,不存在就插入。這種是並發不高的情況下可以使用。

2.資料庫添加唯一約束條件
比如唯一索引

3.增加一個消息表
已經消費的消息,把消息id插入到消息表裡面。
為了保證高並發,消息表可以用Redis來存。

2.如何處理消息丟失的問題

消息丟失的原因

  • 生產者:生產者推送消息到MQ中,但是網路出現了故障,比如網路超時,網路抖動,導致消息沒有推送到MQ中,在網路中丟失了。又或者推送到MQ中了,但是這時候MQ內部出錯導致消息丟失。

  • MQ:MQ自己內部發生了錯誤,導致消息丟失。

  • 消費者:有時處理消息的消費者處理不當,還沒等消息處理完,就給MQ發送確認資訊,但是這時候消費者自身出問題,掛了,確認消息已經發送給MQ告訴MQ自己已經消費完了,導致消息丟失。

如何保證消息不丟失呢? 下面談談這方面的做法。

3.如何保證消息可靠性傳輸

整個消息從生產到消費一般分為三個階段:生產者-生產階段,MQ-存儲階段,消費者-消費階段

3.1 生產者-生產階段
在這個階段,一般通過請求確認機制,來保證消息可靠性傳輸。 與TCP/IP協議里ACK機制有點像。
客戶端發送消息到消息隊列,消息隊列給客戶端一個確認響應,表示消息已經收到,客戶端收到響應,表示一次正常消息發送完畢。

3.2 MQ-存儲階段
消息隊列給客戶端發送確認消息。存儲完成後,才發送確認消息。

3.3 消費者-消費階段
跟生產階段相同,消費完了,給消息隊列發送確認消息。

4.如何保證消息的順序性

我們日常說的順序性是什麼呢?

比如說小孩早上上學過程,他先起床,然後洗漱,吃早餐,最後上學。我們認為他做的事情是有先後順序的,及是時間的先後順序,我們用時間來標記他的順序。
更抽象的理解,這些發生的事件有一個相同的參考系,即他們的時間是對應同一個物理時鐘的時間。

如果沒有絕對的時間作為參考系,那他們之間還能確定順序嗎?
如果事件之間有因果關係,比如A、B兩個事件是因果關係,那麼A一定發生在B之前(前應後果)。相反,在沒有一個絕對的時間的參考的情況下,若A、B之間沒有因果關係,那麼A、B之間就沒有順序關係。跟java里的happen before很像。

總結一下,我們說順序時,其實說的是

  • 在有絕對時間作為參考系的情況下,事件發生的時間先後關係;
  • 在沒有絕對時間作為參考系的情況下,一種由因果關係推斷出來的happening before的關係;

在分散式系統領域,有一篇關於時間,時鐘和事件的順序的很有名的一篇論文
Time, Clocks, and the Ordering of Events in a Distributed System
,可以看一看,上面舉例情況都是參考這篇論文。

參考上面的結論,在消息隊列中,我們也是以時間作為參考系,讓消息有序。

但是,在消息隊列中,消息有序會遇到一些問題,下面讓我們來討論這些問題。

消息的順序性的一些問題

在電腦系統中,有一個比較棘手的問題是,它可以是多執行緒執行的,而且哪個執行緒先運行,哪個執行緒後運行,完全是由作業系統決定的,完全沒有規律,是亂序執行。顯然與消息隊列中的消息有序相悖。

還有,在消息隊列中,涉及到生產者,MQ,消費者,還有網路,這4者之間的關係。然後他們又涉及到消息的順序性,就有很多種情況需要考慮。可以參考這篇文章
分散式開放消息系統(RocketMQ)的原理與實踐
(作者:CHUAN.CHEN),各種情況討論的很全面。

最後的結論就是:消息的順序性,不僅僅是MQ本身存儲消息要保證順序性,還需要生產者和消費者一同來保證順序性。

順序性保證

在消息隊列中,消息的順序性需要3方面來保證:
1、生產者發送消息時要保證順序
2、消息被消息隊列存儲時要保持和發送的順序一致
3、消息被消費時保持和存儲的順序一致

生產者:發送時要求用戶在同一個執行緒中採用同步的方式發送。
消息隊列:存儲保持和發送的順序一致。一般是在一個分區中保持順序性。
消費者:一個分區的消息由一個執行緒來處理消費消息。

//www.hicsc.com/post/2020041566 這個鏈接中,作者分析了RocketMQ順序消息的程式碼實現。

5.消息隊列中消息延遲問題

你說的 消息的延遲 是延遲消息隊列嗎? 啊,並不是,是完全2個不同的概念。延遲消息隊列是MQ提供的一個功能。消息的延遲,是指消費端消費的速度跟不上生產端產生消息的速度,可能導致消費端丟失數據,也可能導致消息積壓在MQ中。所以這裡說的消息的延遲,指的是消費端消費消息的延遲。

消息隊列的消費模型pull和push:

1、push模式

這種模式是消息隊列主動將消息推送給消費者。

  • 優點:儘可能實時的將消息發送給消費者進行消費。
  • 缺點:如果消費端消費能力弱,消費端的消費速度趕不上生產端,而MQ又不斷的給消費端推送消息,消費端的快取滿了導致快取溢出,就會產生錯誤或丟失數據的可能。
2、pull模式

這種模式是由消費端主動向消息隊列拉取消息。

  • 優點:可以自主可控的拉取消息。
  • 缺點:拉取消息的頻率不好控制。

a、如果每次pull時間間隔比較久,會增加消息延遲,消息到達消費者時間會加長。這樣時間一長會導致MQ中消息的堆積,而消息長時間堆積就會導致一系列的問題:

  • 1、如果積壓了幾個小時的數據,有幾千萬的數據量,消費端處理的壓力會越來越大。
  • 2、如果是帶有過期時間的消息,可能這些消息已經到了過期時間,因為積壓時間太長,但還沒被消費端消費掉,消費端來不及消費。
  • 3、如果持續的積壓,達到了MQ能存儲消息數量的上限,也就是說MQ滿了,存不下了,會導致MQ丟掉數據,導致數據丟失。
    想一下,上面的情形是不是跟TCP/IP協議的流量控制和擁塞控制遇到的一些問題很像,也有很多不同。

b、如果每次pull的時間間隔比較短,在一段時間內MQ中沒有可消費的消息,會產生很多無效的pull請求,導致一定的網路開銷。

所以解決問題的辦法最主要就是優化消費端的消費性能。1.優化消費邏輯 2.水平擴容,增加消費端並發。

延遲問題處理

如果消息堆積已經發生了,導致了上面的3個問題,這時怎麼辦?
1、積壓了幾個小時幾千萬的數據
第一:肯定要找到積壓數據的原因,一般都是消費端的問題。
第二:如果可以的,擴大消費端的數量,快速消費掉消息。
第三:擴容,增加多機器消費。新建一個topic,partition是原來10倍,建立原先10倍的queue。然後寫一個臨時的消費程式,這個消費程式去轉移積壓的數據,把積壓的數據均勻輪詢寫入建立好的10倍數量的queue。然後在徵用10倍機器的消費端來消費這個queue。這種做法相當於臨時將 queue 資源和 consumer 資源擴大 10 倍,以正常的 10 倍速度來消費數據。消費完了,恢復原來的部署。這是大廠做法。

2、積壓時間過長,帶有過期時間的消息過期失效了
這個沒有好的辦法處理,只能通過程式找出丟失的數據,然後也是通過程式把丟失的數據重新導入到MQ里,重新消費。

3、長時間積壓倒是MQ寫滿了
這個也沒啥好辦法處理,只能快速消費掉MQ里的數據,快速消費指消費一個,丟掉一個,不要這些數據了,然後重新導入數據。用戶少的時候在補回數據。

6.消息隊列高可用

6.1 kafka

kafka基本架構:

  • Broker:一個kafka節點就是一個broker,多個broker組成一個kafka集群。一個broker可以是一個單機器kafka伺服器。
  • Topic:存放消息的主題,相當於一個隊列。可以理解為存放消息的分類,比如你可以有前端日誌的Topic,後端日誌的Topic。可以理解為MySQL里的表。
  • Partition:一個topic可以劃分為多個partition,每個partition都是一個有序隊列。把topic主題中的消息進行分拆,均攤到kafka集群中不同機器上。partition是topic的進一步拆分。
  • Replica:副本消息。kafka可以以partition為單位,保存多個副本,分散在不同的broker上。副本數是可以設置的。
  • Segment: 一個Partition被切分為多個Segment,每個Segment包含索引文件和數據文件。
  • Message:kafka里最基本消息單元。

一個kafka集群可以由多個broker組成,每個broker是一個節點,你創建一個topic,這個topic可以劃分為多個partition,每個partition可以存儲在不同的broker上,每個partition存放一部分數據。

6.2 RocketMQ

在 RocketMQ 4.5 版本之前,RocketMQ 只有 Master/Slave 一種部署方式來實現高可用。
一組 Broker 中有一個 Master,有零到多個 Slave,Slave 通過同步複製或非同步複製方式去同步 Master 的數據。Master/Slave 部署模式,提供了一定的高可用性。

上面主從高可用架構有一個缺點:
主節點掛了後需要人為的進行重啟或者切換。為了解決這個問題,後續引入了raft,用raft協議來完成自動選主。RocketMQ的DLedger 就是一個基於 raft 協議的 commitlog 存儲庫,也是 RocketMQ 實現新的高可用多副本架構的關鍵。

還可以多master多slave部署,防止單點故障。

五、參考