聊聊消息隊列(MQ)那些事

image

每年的雙十一期間,各大電商平台流量暴增,同時,電商平台系統的負載壓力也會很大。譬如訂單支付的場景,每個訂單支付成功後,服務器可能要完成扣減積分、扣減優惠券、扣減商品庫存、發短訊等一系列操作。單個用戶請求,服務器處理起來並沒有什麼問題,但是,瞬時並發的多個請求到了服務器,數據庫壓力上來了,導致請求響應慢,甚至宕機。

為了解決這個問題,我們可能會想到,讓數據庫處理完一個請求後再處理下一個請求不就好了么。就這樣,消息隊列出來了。消息隊列,又稱為MQ(Message Queue),它實現了讓多個請求以消息的形式排好隊,讓消息處理程序一個一個的處理,有效防止了高並發給服務器帶來的壓力。

消息隊列應用場景

MQ應用的典型場景有異步、削峰、解耦三種。

異步
譬如說一個系統A,它有一個操作處理完自己的邏輯以後需要調用其他系統的接口,如下圖:
image
這時候,代碼是這樣:

public class SystemA {

	@Resource
	SystemBapi systemBapi;

	@Resource
	SystemCapi systemCapi;

	@Resource
	SystemDapi systemDapi;

	public void doSomething() {

		//產生一個id
		long id = doSomethingAction();

		//調用其他系統接口  
		systemBapi.doSomething(id);
		systemCapi.doSomething(id);
		systemDapi.doSomething(id);
	}
}

上面的代碼,系統A產生id的邏輯需要50ms,調用系統B的接口需要300ms,調用系統C的接口需要300ms,調用系統D的接口需要300ms。一個這樣的操作就需要50+300+300+300=950ms。如果後面還要對接其他系統,這個操作的時間會更長。

如果調用其他系統接口實時性要求不高,那麼,為了提高用戶體驗和吞吐量,調用其他系統接口的操作就可以交給MQ實現異步操作。如下圖:
image
系統A執行完了以後,將id給到消息隊列中,然後就直接返回了。

削峰
譬如有3台服務器組成集群,每台服務器的處理能力是1000個QPS/S,合起來就是3000個QPS/S。遇上了流量高峰,達到了5000個QPS/S,請求數量已經超過所有服務器總的處理能力,這時候就可以考慮利用MQ來控制並發數,以免服務器崩潰。

具體做法是所有請求先進入到MQ,然後每個服務器根據自己的能夠處理的請求數去消費消息,也就是無論每秒多個QPS,系統只處理能力範圍內的請求數,剩下的請求等有資源再去處理,這就是「削峰填谷」,如下圖:
image

解耦
解耦就是降低了消息生產者與消費者的耦合度。耦合度高,程序維護起來就會很麻煩。譬如,系統A產生了一個id後,需要把id交給系統B、系統C、系統D去處理。如果由系統A直接去調用其他系統接口,系統A的程序代碼需要寫上調用系統B、系統C、系統D接口的代碼。如果某一天系統C說不需要處理系統A的id了,讓系統A不要調用系統C的接口,那系統A要改代碼。又某一天系統E說我要處理系統A的id,讓系統A調用系統E的接口,系統A又得改代碼。系統A的程序員這樣子搞煩不煩?

系統A程序員有一天開竅了,把程序里所有調用外部系統的代碼都屏蔽,弄了個MQ中間件,讓系統A產生id以後就給到MQ。然後發個公告告訴所有其他系統的程序員,你們誰想要我這邊產生的id你們自己去MQ拿,別來煩我。image
這樣一來,系統A跟其他系統就解耦了,代碼也不用改來改去。

消息隊列要注意的問題

問題一:可用性
MQ作為整個整個分佈式架構的重要部件,如果MQ服務不可用,那整個系統都掛了。因此,MQ必須要支持集群。當下主流的MQ中間件都能夠不同程度的支持集群,實現了MQ服務的高可用。

問題二:消息丟失
消息丟失有可能發生在生產者丟失消息、MQ本身丟失消息、消費者丟失消息3個方面。

  • 生產者丟失消息
    生產者丟失消息一般是在發送消息的時候出現異常(譬如網絡異常),導致MQ無法接收到消息。這個問題可以採用本地消息表+回調通知+定時任務的方式解決。

    就以系統A發送消息,系統B消費消息為例,具體解決方案如下:
    1、系統A執行本地事務業務邏輯,並且往本地消息表插入一條數據(代表準備要發送的消息),消息狀態為「未發送」。本地事務成功,提交保存本地數據,失敗則回滾。
    2、本地事務成功後,發送消息給MQ。
    3、MQ接收到消息後,回調通知系統A,系統A把本地消息表對應的消息記錄狀態變為「已發送」。
    4、定時任務輪詢本地消息表,超過一定時間狀態為「未發送」的消息重新發送給MQ。
    5、定時任務處理超過一定次數一直發送不成功的消息告警,人工介入。

  • MQ丟失消息
    消息成功發送到MQ,是先放到內存里的,如果還沒來得及給消費者消費消息,MQ服務就掛了,就會丟失消息。這個問題一方面可以做集群,但集群的數據同步也需要一定時間,如果在同步數據之前就MQ服務掛了,消息也會丟失。還有一個方法就是MQ接收到消息的同時,把消息數據持久化到磁盤,這樣,MQ服務恢復的時候就可以從磁盤獲取數據重新給消費者消費。可能有人會問,那消息還沒來得及持久化到磁盤MQ服務就掛了咋辦?如果是這樣,就可以用到前面說到的本地消息表,把本地消息表裡的數據重新發一遍。

  • 消費者丟失消息
    消費者從MQ拉取消息,還沒來得及處理消息,消費者服務器掛了。此時,可能造成消費者丟失消息。這種情況,可以讓消費者處理完消息時給MQ一個確認消息來解決。如果MQ沒有收到確認消息,就會有重試的機制,最終確保消息給到消費者消費。當然了,如果重試超過一定次數,就應該告警,人工介入。

問題三:重複消費
因為在網絡延遲的情況下,消息重複發送的問題不可避免的發生。譬如,生產者發送消息的時候使用了重試機制,發送消息後由於網絡原因沒有收到MQ的確認信息,然後又去重新發送了一次消息。但其實MQ已經接到了消息,並返回了響應,只是因為網絡原因導致生產者沒有收到MQ的確認信息。這種情況下,生產者的消息重試機制就會繼續就這個消息重新發送,從而導致同一條消息多次發送,這樣消費者也會重複消費這條消息。當然,這只是列舉了一種情況,實際上還有其他情況會導致消息被重複消費。

解決重複消費的關鍵就是在消費者端引入冪等性機制。什麼是冪等性機制呢?我們可以把它理解成,假如一個接口被重複調用,依然可以保證數據的準確性。舉個例子,比如每條消息都會有一條唯一的id,消費者處理完這個消息會存儲這個id,如果處理消息之前能找到這個id,就說明這條消息已經處理過了,就不做處理並且返回給MQ一個確認信息。

消息隊列中間件

為什麼要用消息隊列中間件?自己寫不行嗎?我們之所以要用中間件,是因為這些中間件已經解決了很多消息隊列常見的問題(高可用、消息丟失、重複消費……),而且各種中間件都有各自的特性,已經做得非常成熟了,你確定你寫的有這些中間件好用嗎?

目前在市面上比較主流的MQ中間件主要有,Kafka、ActiveMQ、RabbitMQ、RocketMQ 等這幾種。網上找來這幾個中間件的對比,如下表:

特性 ActiveMQ RabbitMQ Kafka RocketMQ
所屬社區/公司 Apache Mozilla Public License Apache Apache/Ali
單機呑吐量 萬級(最差) 萬級 十萬級 十萬級(最高)
時效性 毫秒級 微秒級 毫秒級 毫秒級
可用性 高(主從) 高(主從) 非常高(分佈式) 非常高(分佈式)
功能特性 MQ領域功能極其完備 基礎erlang開發,所以並發能力很強,性能極其好,延時很低 功能較為簡單,主要支持簡單的MQ功能,在大數據領域的實時計算以及日誌採集被大規模使用 MQ功能比較完備,擴展性佳
消息可靠性 有較低的概率丟失數據 基本不丟 經過參數優化配置,可以做到 0 丟失 同 Kafka
事務 支持 不支持 支持 支持
broker端消息過濾 支持 不支持 不支持 可以支持Tag標籤過濾和SQL表達式過濾
消息查詢 支持 根據消息id查詢 不支持 支持Message id或Key查詢
消息回溯 支持 不支持 理論上可以支持時間或offset回溯,但是得修改代碼。 支持按時間來回溯消息,精度毫秒,例如從一天之前的某時某分某秒開始重新消費消息。
路由邏輯 基於交換機,可配置複雜路由邏輯 根據topic 根據topic,可以配置過濾消費
持久化 內存、文件、數據庫 隊列基於內存,只能少量堆積 磁盤,大量堆積 磁盤,大量堆積
順序消息 支持 不支持 支持 支持
社區活躍度
適用場景 主要場景就是解耦和異步調用,較少在大規模吞吐的場景中使用 數據量沒有那麼大,小公司 一般配合大數據類的系統來進行實時數據計算、日誌採集等場景。 目前在阿里被廣泛應用在訂單、交易、充值、流計算、消息推送、日誌流式處理、binglog分發消息等場景。

根據上表,我個人認為對性能要求比較高的,推薦選擇RocketMQ,畢竟經歷了多年阿里雙十一極端並發的場景。如果是大數據領域的,可以選擇Kafka。