聊聊消息隊列(MQ)那些事
每年的雙十一期間,各大電商平台流量暴增,同時,電商平台系統的負載壓力也會很大。譬如訂單支付的場景,每個訂單支付成功後,服務器可能要完成扣減積分、扣減優惠券、扣減商品庫存、發短訊等一系列操作。單個用戶請求,服務器處理起來並沒有什麼問題,但是,瞬時並發的多個請求到了服務器,數據庫壓力上來了,導致請求響應慢,甚至宕機。
為了解決這個問題,我們可能會想到,讓數據庫處理完一個請求後再處理下一個請求不就好了么。就這樣,消息隊列出來了。消息隊列,又稱為MQ(Message Queue),它實現了讓多個請求以消息的形式排好隊,讓消息處理程序一個一個的處理,有效防止了高並發給服務器帶來的壓力。
消息隊列應用場景
MQ應用的典型場景有異步、削峰、解耦三種。
異步
譬如說一個系統A,它有一個操作處理完自己的邏輯以後需要調用其他系統的接口,如下圖:
這時候,代碼是這樣:
public class SystemA {
@Resource
SystemBapi systemBapi;
@Resource
SystemCapi systemCapi;
@Resource
SystemDapi systemDapi;
public void doSomething() {
//產生一個id
long id = doSomethingAction();
//調用其他系統接口
systemBapi.doSomething(id);
systemCapi.doSomething(id);
systemDapi.doSomething(id);
}
}
上面的代碼,系統A產生id的邏輯需要50ms,調用系統B的接口需要300ms,調用系統C的接口需要300ms,調用系統D的接口需要300ms。一個這樣的操作就需要50+300+300+300=950ms。如果後面還要對接其他系統,這個操作的時間會更長。
如果調用其他系統接口實時性要求不高,那麼,為了提高用戶體驗和吞吐量,調用其他系統接口的操作就可以交給MQ實現異步操作。如下圖:
系統A執行完了以後,將id給到消息隊列中,然後就直接返回了。
削峰
譬如有3台服務器組成集群,每台服務器的處理能力是1000個QPS/S,合起來就是3000個QPS/S。遇上了流量高峰,達到了5000個QPS/S,請求數量已經超過所有服務器總的處理能力,這時候就可以考慮利用MQ來控制並發數,以免服務器崩潰。
具體做法是所有請求先進入到MQ,然後每個服務器根據自己的能夠處理的請求數去消費消息,也就是無論每秒多個QPS,系統只處理能力範圍內的請求數,剩下的請求等有資源再去處理,這就是「削峰填谷」,如下圖:
解耦
解耦就是降低了消息生產者與消費者的耦合度。耦合度高,程序維護起來就會很麻煩。譬如,系統A產生了一個id後,需要把id交給系統B、系統C、系統D去處理。如果由系統A直接去調用其他系統接口,系統A的程序代碼需要寫上調用系統B、系統C、系統D接口的代碼。如果某一天系統C說不需要處理系統A的id了,讓系統A不要調用系統C的接口,那系統A要改代碼。又某一天系統E說我要處理系統A的id,讓系統A調用系統E的接口,系統A又得改代碼。系統A的程序員這樣子搞煩不煩?
系統A程序員有一天開竅了,把程序里所有調用外部系統的代碼都屏蔽,弄了個MQ中間件,讓系統A產生id以後就給到MQ。然後發個公告告訴所有其他系統的程序員,你們誰想要我這邊產生的id你們自己去MQ拿,別來煩我。
這樣一來,系統A跟其他系統就解耦了,代碼也不用改來改去。
消息隊列要注意的問題
問題一:可用性
MQ作為整個整個分佈式架構的重要部件,如果MQ服務不可用,那整個系統都掛了。因此,MQ必須要支持集群。當下主流的MQ中間件都能夠不同程度的支持集群,實現了MQ服務的高可用。
問題二:消息丟失
消息丟失有可能發生在生產者丟失消息、MQ本身丟失消息、消費者丟失消息3個方面。
-
生產者丟失消息
生產者丟失消息一般是在發送消息的時候出現異常(譬如網絡異常),導致MQ無法接收到消息。這個問題可以採用本地消息表+回調通知+定時任務的方式解決。
就以系統A發送消息,系統B消費消息為例,具體解決方案如下:
1、系統A執行本地事務業務邏輯,並且往本地消息表插入一條數據(代表準備要發送的消息),消息狀態為「未發送」。本地事務成功,提交保存本地數據,失敗則回滾。
2、本地事務成功後,發送消息給MQ。
3、MQ接收到消息後,回調通知系統A,系統A把本地消息表對應的消息記錄狀態變為「已發送」。
4、定時任務輪詢本地消息表,超過一定時間狀態為「未發送」的消息重新發送給MQ。
5、定時任務處理超過一定次數一直發送不成功的消息告警,人工介入。
-
MQ丟失消息
消息成功發送到MQ,是先放到內存里的,如果還沒來得及給消費者消費消息,MQ服務就掛了,就會丟失消息。這個問題一方面可以做集群,但集群的數據同步也需要一定時間,如果在同步數據之前就MQ服務掛了,消息也會丟失。還有一個方法就是MQ接收到消息的同時,把消息數據持久化到磁盤,這樣,MQ服務恢復的時候就可以從磁盤獲取數據重新給消費者消費。可能有人會問,那消息還沒來得及持久化到磁盤MQ服務就掛了咋辦?如果是這樣,就可以用到前面說到的本地消息表,把本地消息表裡的數據重新發一遍。
-
消費者丟失消息
消費者從MQ拉取消息,還沒來得及處理消息,消費者服務器掛了。此時,可能造成消費者丟失消息。這種情況,可以讓消費者處理完消息時給MQ一個確認消息來解決。如果MQ沒有收到確認消息,就會有重試的機制,最終確保消息給到消費者消費。當然了,如果重試超過一定次數,就應該告警,人工介入。
問題三:重複消費
因為在網絡延遲的情況下,消息重複發送的問題不可避免的發生。譬如,生產者發送消息的時候使用了重試機制,發送消息後由於網絡原因沒有收到MQ的確認信息,然後又去重新發送了一次消息。但其實MQ已經接到了消息,並返回了響應,只是因為網絡原因導致生產者沒有收到MQ的確認信息。這種情況下,生產者的消息重試機制就會繼續就這個消息重新發送,從而導致同一條消息多次發送,這樣消費者也會重複消費這條消息。當然,這只是列舉了一種情況,實際上還有其他情況會導致消息被重複消費。
解決重複消費的關鍵就是在消費者端引入冪等性機制。什麼是冪等性機制呢?我們可以把它理解成,假如一個接口被重複調用,依然可以保證數據的準確性。舉個例子,比如每條消息都會有一條唯一的id,消費者處理完這個消息會存儲這個id,如果處理消息之前能找到這個id,就說明這條消息已經處理過了,就不做處理並且返回給MQ一個確認信息。
消息隊列中間件
為什麼要用消息隊列中間件?自己寫不行嗎?我們之所以要用中間件,是因為這些中間件已經解決了很多消息隊列常見的問題(高可用、消息丟失、重複消費……),而且各種中間件都有各自的特性,已經做得非常成熟了,你確定你寫的有這些中間件好用嗎?
目前在市面上比較主流的MQ中間件主要有,Kafka、ActiveMQ、RabbitMQ、RocketMQ 等這幾種。網上找來這幾個中間件的對比,如下表:
特性 | ActiveMQ | RabbitMQ | Kafka | RocketMQ |
---|---|---|---|---|
所屬社區/公司 | Apache | Mozilla Public License | Apache | Apache/Ali |
單機呑吐量 | 萬級(最差) | 萬級 | 十萬級 | 十萬級(最高) |
時效性 | 毫秒級 | 微秒級 | 毫秒級 | 毫秒級 |
可用性 | 高(主從) | 高(主從) | 非常高(分佈式) | 非常高(分佈式) |
功能特性 | MQ領域功能極其完備 | 基礎erlang開發,所以並發能力很強,性能極其好,延時很低 | 功能較為簡單,主要支持簡單的MQ功能,在大數據領域的實時計算以及日誌採集被大規模使用 | MQ功能比較完備,擴展性佳 |
消息可靠性 | 有較低的概率丟失數據 | 基本不丟 | 經過參數優化配置,可以做到 0 丟失 | 同 Kafka |
事務 | 支持 | 不支持 | 支持 | 支持 |
broker端消息過濾 | 支持 | 不支持 | 不支持 | 可以支持Tag標籤過濾和SQL表達式過濾 |
消息查詢 | 支持 | 根據消息id查詢 | 不支持 | 支持Message id或Key查詢 |
消息回溯 | 支持 | 不支持 | 理論上可以支持時間或offset回溯,但是得修改代碼。 | 支持按時間來回溯消息,精度毫秒,例如從一天之前的某時某分某秒開始重新消費消息。 |
路由邏輯 | 基於交換機,可配置複雜路由邏輯 | 根據topic | 根據topic,可以配置過濾消費 | |
持久化 | 內存、文件、數據庫 | 隊列基於內存,只能少量堆積 | 磁盤,大量堆積 | 磁盤,大量堆積 |
順序消息 | 支持 | 不支持 | 支持 | 支持 |
社區活躍度 | 低 | 中 | 高 | 高 |
適用場景 | 主要場景就是解耦和異步調用,較少在大規模吞吐的場景中使用 | 數據量沒有那麼大,小公司 | 一般配合大數據類的系統來進行實時數據計算、日誌採集等場景。 | 目前在阿里被廣泛應用在訂單、交易、充值、流計算、消息推送、日誌流式處理、binglog分發消息等場景。 |
根據上表,我個人認為對性能要求比較高的,推薦選擇RocketMQ,畢竟經歷了多年阿里雙十一極端並發的場景。如果是大數據領域的,可以選擇Kafka。