RocketMQ掃盲篇

  • 2020 年 10 月 20 日
  • 筆記

本篇博客主要參考:
《淺入淺出》-RocketMQ 敖丙
APACHE-RocketMQ Gitee RocketMQ官方文檔
RocketMQ 實戰與進階 GitChat

又是好久沒有寫博客了,雖然可以找出無數個沒有寫的博客的理由,但是說到底,還是一個字「懶」。今天我終於吃了一顆治療懶癌的藥丸,決定寫一篇博客。介紹什麼好呢,思來想去,還是介紹下RocketMQ吧,畢竟寫了30多篇博客,還沒有好好寫過關於MQ的博客呢。本篇博客比較基礎,不涉及到源碼分析,只是掃盲。

MQ有什麼用

解耦

我覺得從某種角度來說,微服務促進了MQ的蓬勃發展,本來一個系統有N多個模塊,所有模塊都強耦合在一起,現在微服務了,一個模塊就是一個系統,系統之間肯定需要交互,交互有三種常見的方法,一種是RPC,一種是HTTP,一種就是MQ了。

異步

原本一個業務分為N步,要一步一步處理,才能把最終的結果返回給用戶,現在有了MQ,先把最關鍵的部分處理完畢,然後發送消息到MQ,直接返回給用戶OK,至於後面的步驟在後台慢慢處理吧,真乃提升用戶體驗的神器。

削峰

某個接口的請求量突然飆升,勢必會對應用服務器、數據庫服務器造成很大的壓力,現在有了MQ,來多少請求都不在怕的,後台慢慢處理唄。

RocketMQ簡介

RocketMQ是用Java編寫的,是阿里開源的消息中間件,吸收了Kafka很多優點。Kafka也是比較熱門的消息中間件,不過Kafka是用Scala編寫的,不利於Java程序員去閱讀源碼,也不利於Java程序員做一些定製化的開發。接觸過Kafka的小夥伴都知道,要用好Kafka實屬不易,相對來說,RocketMQ簡單多了,而且RocketMQ有阿里加持,經歷了N次雙11的考驗,比較適合國內互聯網公司,所以國內使用RocketMQ的公司很多。

RocketMQ四大組件

image.png
圖片來自//gitee.com/mirrors/rocketmq/blob/master/docs/cn/architecture.md

可以看到RocketMQ主要有四個組件:

NameServer

  • 無狀態服務,註冊中心,可集群部署,但是NameServer節點之間沒有任何數據交互。
  • Borker會以定時把Topic路由信息上報給所有的NameServer。Producer、Consumer會隨機選擇一個NameServer定時Topic更新路由信息。
  • Topic路由信息在NameServer集群中採用最終一致性。
  • 保證AP。

Borker

  • RocketMQ的服務端,用於存儲消息、分發消息。
  • Borker會定時把自身擁有的所有的Topic路由信息上報給NameServer。
  • Borker有兩個角色:Master、Follower,Master承擔讀(消費消息)寫(生產消息)操作,如果Master比較忙,或者不可用,Follower可以承擔讀操作。BorkerId=0,代表是Matser,BorkerId!=0,代表是Follower,需要注意的有兩點:
    其一,目前為止,BorkerId=1的Follower才可以承擔讀操作;
    其二,只有較高版本的RocketMQ才支持當Master節點掛掉,Follower自動升級到Master。

Producer

生產者,每隔一定時間向NameServer發起Topic的路由信息查詢。

Consumer

消費者,每隔一定時間向NameServer發起Topic的路由信息查詢。

為什麼註冊中心不選用Zookeeper

其實,在低版本的RocketMQ中,確實是選用Zookeeper作為註冊中心的,但是後面改成了現在的NameServer,猜想主要原因是:

  • RocketMQ已經是一個中間件了,不想再依賴其他中間件。
  • Zookeeper比較重,有很多功能RocketMQ是用不到的,不如寫一個輕量級的註冊中心。
  • Zookeeper是CP,一旦觸發領導選舉,那麼註冊中心就不可用了,而RocketMQ的註冊中心,不需要強一致性,只要保證最終一致性。

RocketMQ消息領域模型

Message

  • 傳輸的消息。
  • 消息必須有Topic。
  • 消息可以有多個Tag和多個Key,可以看做消息的附加屬性。

Topic

  • 一類消息的集合。
  • 每個消息必須有一個Topic。
  • 消息的第一級類型。

Tag

  • 一個消息除了有Topic之外,還可以有Tag,用來細分同一個Topic下的不同種類的消息。
  • Tag不是必須的。
  • 消息的第二級類型。

Group

分為ProducerGroup,ConsumerGroup,我們更多的是關注ConsumerGroup,ConsumerGroup包含多個Consumer。

在集群消費模式下,一個ConsumerGroup下的Consumer共同消費一個Topic,且每個Consumer會被分配到N個隊列,但是一個隊列只會被一個Consumer消費,不同的ConsumerGroup可以消費同一個Topic,一條消息會被訂閱此Topic的所有ConsumerGroup消費。

Queue

  • 一個Topic默認包含四個Queue。
  • 在集群消費模式下,同一個ConsumerGroup中的Consumer可以消費多個Queue的消息,但是一個Queue只能被一個Consumer消費。
  • Queue中的消息是有序的。
  • 分為讀Queue和寫Queue,一般來說,讀Queue的數量和寫Queue的數量是一致的,否則很容易出問題。

消費模式

消費模式有兩種:Clustering(集群消費)和Broadcasting(廣播消費)。

和其他MQ不同,其他MQ是在發送消息的時候,指定是集群消費還是廣播消費,RocketMQ是在消費者端設置是集群消費還是廣播消費。

Clustering(集群消費)

默認情況下是集群消費模式,該模式下,ConsumerGroup所有的Consumer共同消費一個Topic的消息,每個Consumer負責消費N個隊列的消息(N也可能為1,甚至是0,沒有分配到隊列),但是一個隊列只會被一個Consumer消費。如果某個Consumer掛掉,ConsumerGroup下的其他Consumer會接替掛掉的Consumer繼續消費。

集群消費模式下,消費進度維護在Borker端,存儲路徑為${ROCKET_HOME}/store/config/ consumerOffset.json,如下圖所示:
image.png
使用topicName@consumerGroupName為Key,消費進度為Value,Value的形式是queueId:offset ,說明如果有多個ConsumerGroup,每個ConsumerGroup的消費進度是不同的,需要分開來存儲。

Broadcasting(廣播消費)

廣播消費消息會發給ConsumerGroup中所有的Consumer。

廣播消費模式下,消費進度維護在Consumer端。

消費隊列負載算法與重平衡機制

消費隊列負載算法

我們知道了在集群消費模式下,ConsumerGroup下所有的Consumer共同消費一個Topic的消息,每個Consumer負責消費N個隊列的消息,那麼具體是如何分配的呢?這就涉及到消費隊列負載算法了。

RocketMQ提供了眾多的消費隊列負載算法,其中最常用的是兩種算法,即AllocateMessageQueueAveragely、AllocateMessageQueueAveragelyByCircle。下面我們來看下這兩個算法的區別。

假設,現在一個Topic有16個隊列,用q0~q15表示,有3個Consumer,用c0-c2表示。

用AllocateMessageQueueAveragely消費隊列負載算法的結果如下:

  • c0:q0 q1 q2 q3 q4 q5
  • c1:q6 q7 q8 q9 q10
  • c2:q11 q12 q13 q14 q15

用AllocateMessageQueueAveragelyByCircle消費隊列負載算法的結果如下:

  • c0:q0 q3 q6 q9 q12 q15
  • c1:q1 q4 q7 q10 q13
  • c2:q2 q5 q8 q11 q14

ConsumerGroup下所有的Consumer共同消費一個Topic的消息,每個Consumer負責消費N個隊列的消息,但是一個隊列不能同時被N個Consumer消費,這意味着什麼?

聰明的你一定可以想到,如果一個Topic只有4個隊列,而有5個Consumer,那麼有一個Consumer將不能分配到任何隊列,所以在RocketMQ中,Topic下隊列的個數直接決定了Consumer的最大個數,也就說明,不能光靠增加Consumer來提高消費速度。

重平衡

雖然建議在創建Topic的時候,就應該充分考慮隊列的個數,但是實際情況往往是不盡人意的,哪怕隊列數沒有發生改變,Consumer的數量也一定會發生改變,比如Consumer的上下線,比如某個Consumer掛了,比如新增了Consumer。隊列的擴容、縮容,Consumer的擴容、縮容都會導致重平衡,也就是為Consumer重新分配消費的隊列。

在RocketMQ中,Consumer會定時查詢Topic的隊列的個數,Consumer的個數,如果發生了改變,就會觸發重平衡。

重平衡是RocketMQ內部實現的,程序員無需關心。

Pull OR Push?

一般來說,MQ有兩種方法獲取消息:

  • Pull:Consumer主動拉取消息,好處是Consumer可以控制拉取消息的頻率,條數,Consumer知道自身的消費能力,所以在Consumer端不容易造成消息堆積,但是實時性不是太好,效率相對較低。
  • Push:Broker主動發送消息,好處是實時性、效率比較高,但是Broker無法知道Consumer端的消費能力,如果發給Consumer的消息過多,會造成Consumer端的消息堆積;如果發給Consumer的數據太少,又會造成Consumer端的空閑。

不管是Pull,還是Push,Consumer總會與Broker產生交互,交互的方式一般有短連接、長連接、輪詢三種方式。

看起來,RocketMQ支持既支持Pull,也支持Push,但是實際上Push也是用Pull實現的,那麼Consumer是怎麼與Broker產生交互的呢?

這就是RocketMQ設計的巧妙的地方了,既不是短連接,也不是長連接,也不是輪詢,而是採用的長輪詢。

長輪詢

Consumer發起拉取消息的請求,分為兩種情況:

  • 有消息:Consumer拿到消息後,連接斷開。
  • 沒有消息:Borker Hold(保持)住連接一定時間,每隔5秒,檢查下是否有消息,如果有消息,給Consumer,連接斷開。

事務消息

RocketMQ支持事務消息,Producer把事務消息發送給Broker後,Broker會把消息存儲在系統Topic:RMQ_SYS_TRANS_HALF_TOPIC,這樣Consumer就無法消費到這條消息了。

Broker會有一個定時任務,消費RMQ_SYS_TRANS_HALF_TOPIC的消息,向Producer發起回查,回查的狀態有三種:提交、回滾、未知。

  • 如果回查的狀態是提交,回滾,會觸發消息的提交和回滾;
  • 如果是未知,會等待下一次回查,RocketMQ可以設置一條消息的回查間隔與回查次數,超過一定的回查次數,消息會自動回滾。

延遲消息

延遲消息是指息發到Broker後,不能立刻被Consumer消費,需要等待一定的時間才可以被消費到,RocketMQ只支持特定的延遲時間:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

消費形式

RocketMQ支持兩種消費形式:並發消費、順序消費。
如果是順序消費,需要保證排序的消息在同一個隊列。如何選擇隊列發送呢,RocketMQ發送消息的方法有好幾個重載,其中有一個重載方法支持隊列的選擇。

同步刷盤、異步刷盤

Producer把消息發送到Borker中,Borker是需要把消息持久化的,RocketMQ支持兩種持久化策略:

  • 同步刷盤:Borker把消息持久後才返回ACK給Producer,好處是消息可靠性高,但是效率較慢。
  • 異步刷盤:Broker把消息寫入到PageCache中,就返回ACK給Producer。好處是效率極高,但是如果服務器掛了,消息可能會丟失,如果只是RocketMQ服務掛了,不會造成消息丟失。

同步複製、異步複製

為了MQ的可靠性、可用性,在生產環境,一般會部署Follower節點,Follower節點會複製Master的數據,RocketMQ支持兩種持複製策略:

  • 同步複製:Master、Follower都把消息寫入成功,才返回ACK給Producer,可靠性較高,但是效率較慢。
  • 異步複製:只要Master寫入成功,就返回ACK給Producer,效率較高,但是可能會丟失消息。

“寫入”是寫入PageCache,還是寫入硬盤,要看Follower Broker的配置。

再談談Producer

RocketMQ提供了三種發送消息的方法:

  • oneway:fire and forget,單向消息,指消息發送出去後,就不管了,這個方法是沒有返回值的。
  • 同步:消息發送出去後,同步等待Borker的響應。
  • 異步:消息發送出去後,立即返回,收到Boker的響應後,會執行函調方法。

在實際開發中,一般選用同步方法,如果要提高RocketMQ的性能,一般都是修改Borker端的參數,特別是刷盤策略和複製策略。

發送消息重試

消息發送時,如果使用了MessageQueueSelector,那消息發送的重試機制將會失效。

發送消息響應可能為以下四種:

public enum SendStatus {
    SEND_OK,
    FLUSH_DISK_TIMEOUT,
    FLUSH_SLAVE_TIMEOUT,
    SLAVE_NOT_AVAILABLE,
}

除了第一種,其他情況都是有問題的,為了保證消息不丟失,需要設置Producer參數:RetryAnotherBrokerWhenNotStoreOK為true。

故障規避機制

如果消息發送失敗了,重試的時候,還是發送給這個Borker,那麼大概率發送還是失敗的,RockteMQ設計精巧之處在於,重試的時候,會自動避開這個Borker,而選擇其他Borker,但是目前為止,異步發送沒有那麼智能,只會在一個Borker上重試,所以強烈建議選擇同步發送方式。

RocketMQ提供了兩種故障規避機制。用參數SendLatencyFaultEnable來控制。

  • false:默認值,只有在重試的時候,才會啟用故障規避機制,比如發送消息給BorkerA失敗了,重試的時候,會選擇BorkerB,但是下次發送消息,還是會選擇發送給BorkerA。
  • true:開啟延遲退避機制,一旦消息發送給BorkerA失敗,就會悲觀的認為在一段時間內,BorkerA不可用,在將來的一段時間內,不會再向BorkerA發送消息。

延遲退避機制看起來很好用,但是一般來說Borker端繁忙,導致Borker不可用或者網絡不可用只是一瞬間的事情,馬上就可以恢復,如果開啟了延遲退避機制,本來可用的Borker在一段時間內卻被規避了,其他Borker更加繁忙,那可能情況更糟糕。

再談談Consumer

Consumer線程注意事項

Consumer有兩個參數,可以消費的並行度,即ConsumeThreadMinConsumeThreadMax,看起來給人的感覺是,如果Consumer端堆積消息比較少,消費線程數為ConsumeThreadMin;如果Consumer端堆積消息比較多,就自動開啟新的線程來消費,直到消費線程數為ConsumeThreadMax。但是並不是這樣,Consumer內部持有一個線程池,選用的是無界隊列,也就是ConsumeThreadMax參數是無效的,所以在實際開發中,ConsumeThreadMinConsumeThreadMax往往設置成一樣。

ConsumeFromWhere

如果查詢不到消費進度的時候,Consumer從哪裡開始消費,RocketMQ支持從最新消息、最早消息、指定時間戳這三種方式進行消費。

消費消息重試

RocketMQ會為每個ConsumerGroup都設置一個Topic名稱為%RETRY%+consumerGroup的重試隊列,用來保存需要給ConsumerGroup重試的消息,但是重試需要一定的延時時間,RocketMQ對於重試消息的處理是先保存至Topic名稱為SCHEDULE_TOPIC_XXXX的延遲隊列中,後台定時任務按照對應的時間進行Delay後重新保存至%RETRY%+consumerGroup的重試隊列中。

消息堆積、消費能力不夠,怎麼辦

  • 提高消費進度,這是最好的辦法。
  • 增加隊列,增加Consumer。
  • 原先的Consumer作為搬磚工,根據一定的規則把消息「搬」到多個新的Topic,再開幾個ConsumerGroup去消費不同的Topic。
  • 新開一個ConsumerGroup去消費,也就是兩個ConsumerGroup同時消費一個Topic,但是需要注意offset的判斷,比如一個ConsumerGroup消費offset為奇數的消息,一個ConsumerGroup消費offset為偶數的消息。

本來以為寫掃盲文,應該會寫的很順,但是還是想多了,因為是掃盲文,面向的是沒有怎麼接觸過RocketMQ的小夥伴,但是RocketMQ有沒有那麼簡單,不可能用一篇博客,就讓沒有怎麼接觸過RocketMQ的小夥伴順利入門,所以在寫博客的時候,一直在想,這個東西重要嗎,需要仔細描述嗎;這個東西可以忽視,可以不介紹嗎 等等,大家可以看到本文基本都是在介紹各種概念,幾乎沒有涉及到API的層面,因為一旦涉及到API,那麼估計寫兩個星期也寫不完。

End