rocketmq 精華

(ps:)通過本人語雀文檔閱讀體驗更好哦–有目錄

介紹

rocket mq 翻譯成中文就是火箭消息隊列,從名字就可以看出來,它是一個很快的消息隊列… rocket mq 是 阿里巴巴研製的後面貢獻給 apache 基金會,其設計思想很多都是來自 kafka,所以和 kafka 有不少類似的地方,但是也是有很多 kafka 沒有的新特性,比如:廣播消費(這個其實 kafka 也是可以通過設置消費組來實現,但是 rocket mq 比較方便)、延遲消費、多執行緒消費、擁有自己的 nameservice 伺服器等具體的看下錶。

消息產品 客戶端SDK 協議和規範 有序消息 延遲消息 批量消息 廣播消息 消息過濾器 伺服器觸發重新投遞 消息存儲 消息追溯 高可用性和故障轉移 消息跟蹤 配置 管理和操作工具
kafka Java、Scala 等 拉模式,支援TCP 確保分區內的消息排序 不支援 支援,帶有非同步生產者 不支援 支援,可以使用Kafka Streams過濾消息 不支援 高性能文件存儲 支援的偏移指示 支援,需要 ZooKeeper 伺服器 不支援 Kafka 使用鍵值對格式進行配置。這些值可以從文件或以編程方式提供。 支援,使用終端命令公開核心指標
火箭MQ Java、C++、Go 拉模型,支援TCP、JMS、OpenMessaging 確保消息的嚴格排序,並可以優雅地橫向擴展 支援的 支援,同步模式避免消息丟失 支援的 支援,基於 SQL92 的屬性過濾器表達式 支援的 高性能和低延遲的文件存儲 支援的時間戳和偏移量兩個表示 受支援的主從模型,無需其他套件 支援的 開箱即用,用戶只需要注意幾個配置 支援的、豐富的網路和終端命令來公開核心指標

整體架構

技術架構

img

  • Producer:消息發布的角色,支援分散式集群方式部署。Producer通過MQ的負載均衡模組選擇相應的Broker集群隊列進行消息投遞,投遞的過程支援快速失敗並且低延遲。

  • Consumer:消息消費的角色,支援分散式集群方式部署。支援以push推,pull拉兩種模式對消息進行消費。同時也支援集群方式和廣播方式的消費,它提供實時消息訂閱機制,可以滿足大多數用戶的需求。

  • NameServer:NameServer是一個非常簡單的Topic路由註冊中心,其角色類似Dubbo中的zookeeper,支援Broker的動態註冊與發現。主要包括兩個功能:Broker管理,NameServer接受Broker集群的註冊資訊並且保存下來作為路由資訊的基本數據。然後提供心跳檢測機制,檢查Broker是否還存活;路由資訊管理,每個NameServer將保存關於Broker集群的整個路由資訊和用於客戶端查詢的隊列資訊。然後Producer和Conumser通過NameServer就可以知道整個Broker集群的路由資訊,從而進行消息的投遞和消費。NameServer通常也是集群的方式部署,各實例間相互不進行資訊通訊。Broker是向每一台NameServer註冊自己的路由資訊,所以每一個NameServer實例上面都保存一份完整的路由資訊。當某個NameServer因某種原因下線了,Broker仍然可以向其它NameServer同步其路由資訊,Producer,Consumer仍然可以動態感知Broker的路由的資訊。

  • BrokerServer:Broker主要負責消息的存儲、投遞和查詢以及服務高可用保證

部署架構

img

  • NameServer是一個幾乎無狀態節點,可集群部署,節點之間無任何資訊同步。

  • Broker部署相對複雜,Broker分為Master與Slave,一個Master可以對應多個Slave,但是一個Slave只能對應一個Master,Master與Slave 的對應關係通過指定相同的BrokerName,不同的BrokerId 來定義,BrokerId為0表示Master,非0表示Slave。Master也可以部署多個。每個Broker與NameServer集群中的所有節點建立長連接,定時註冊Topic資訊到所有NameServer。 注意:當前RocketMQ版本在部署架構上支援一Master多Slave,但只有BrokerId=1的從伺服器才會參與消息的讀負載。

  • Producer與NameServer集群中的其中一個節點(隨機選擇)建立長連接,定期從NameServer獲取Topic路由資訊,並向提供Topic 服務的Master建立長連接,且定時向Master發送心跳。Producer完全無狀態,可集群部署。

  • Consumer與NameServer集群中的其中一個節點(隨機選擇)建立長連接,定期從NameServer獲取Topic路由資訊,並向提供Topic服務的Master、Slave建立長連接,且定時向Master、Slave發送心跳。Consumer既可以從Master訂閱消息,也可以從Slave訂閱消息,消費者在向Master拉取消息時,Master伺服器會根據拉取偏移量與最大偏移量的距離(判斷是否讀老消息,產生讀I/O),以及從伺服器是否可讀等因素建議下一次是從Master還是Slave拉取。

集群工作流程

  • 啟動NameServer,NameServer起來後監聽埠,等待Broker、Producer、Consumer連上來,相當於一個路由控制中心。

  • Broker啟動,跟所有的NameServer保持長連接,定時發送心跳包。心跳包中包含當前Broker資訊(IP+埠等)以及存儲所有Topic資訊。註冊成功後,NameServer集群中就有Topic跟Broker的映射關係。

  • 收發消息前,先創建Topic,創建Topic時需要指定該Topic要存儲在哪些Broker上,也可以在發送消息時自動創建Topic。

  • Producer發送消息,啟動時先跟NameServer集群中的其中一台建立長連接,並從NameServer中獲取當前發送的Topic存在哪些Broker上,輪詢從隊列列表中選擇一個隊列,然後與隊列所在的Broker建立長連接從而向Broker發消息。

  • Consumer跟Producer類似,跟其中一台NameServer建立長連接,獲取當前訂閱Topic存在哪些Broker上,然後直接跟Broker建立連接通道,開始消費消息。

一些概念

  • 生產者:消息發送方

  • 生產者組:同一類Producer的集合

  • topic :一類消息的集合,可以根據業務場景來指定 topic

  • tag:一個應用儘可能用一個Topic,而消息子類型則可以用tags來標識。tags可以由應用自由設置,只有生產者在發送消息設置了tags,消費方在訂閱消息時才可以利用tags通過broker做消息過濾。

  • keys:每個消息在業務層面的唯一標識碼要設置到keys欄位,方便將來定位消息丟失問題,應用可以通過topic、key來查詢這條消息內容,以及消息被誰消費。

  • broker:消息中轉角色,負責存儲消息、轉發消息。

  • queue(隊列):一個邏輯概念,通過 queue 來對消費者進行消息的並發消費

  • NameServer:名稱伺服器,功能類似 kafka 中 zk 中代表的角色。名稱服務充當路由消息的提供者。

  • consumer:消費者,消費消息

  • consumer group:消費者組,同一類消費者,消費相同的 topic 資訊

生產者

和大多數的消息中間件一樣,生產者就是消息發送方,事件觸發後,生產者將消息發送到 mq 伺服器,以便消費組進行消息消費處理。

rocket mq 消息發送有三種方式

  • 同步發送

  • 非同步發送

  • 單向發送

前兩者消息發送是可靠的,會有消息應答和重試,單向發送沒有應答也沒有重試機制,消息可能會丟失。

Producer發送消息,啟動時先跟NameServer集群中的其中一台建立長連接,並從NameServer中獲取當前發送的Topic存在哪些Broker上(會定時拉取這些元素據資訊),輪詢(默認是輪詢,但是可以在程式碼中指定queue選擇策略)從隊列列表中選擇一個隊列,然後與隊列所在的Broker建立長連接從而向Broker發消息。

消息發送的時候可以通過指定 tag 來區分具體的場景,便於消費者指定消費哪些 tag。可以通過指定 key 來檢索消息。

同步發送

消息是發送到 broker 上的,reocket mq 有個隊列的概念,類似於 kafka 的分區,但是和分區這種物理概念不同,隊列是個邏輯概念,這裡先把它當成分區來理解即可。

消息是發送到 broker 上的隊列的,由於一個 topic 可能有多個隊列,因此由負載均衡策略或者自己指定的策略發送到特定的隊列上。

public class SyncProducer {
	public static void main(String[] args) throws Exception {
    	// 實例化消息生產者Producer
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
    	// 設置NameServer的地址
    	producer.setNamesrvAddr("localhost:9876");
    	// 啟動Producer實例
        producer.start();
    	for (int i = 0; i < 100; i++) {
    	    // 創建消息,並指定Topic,Tag和消息體
    	    Message msg = new Message("TopicTest" /* Topic */,
        	"TagA" /* Tag */,
        	("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
        	);
        	// 發送消息到一個Broker
            SendResult sendResult = producer.send(msg);
            // 通過sendResult返回消息是否成功送達
            System.out.printf("%s%n", sendResult);
    	}
    	// 如果不再發送消息,關閉Producer實例。
    	producer.shutdown();
    }
}

發送失敗策略

對於普通消息,消息發送默認採用輪詢策略來選擇所發送到的隊列,如果發送失敗,默認重試 2 次,但是重試時會選擇其他 broker,不會選擇之前失敗的那台 broker,當然,若只有一個 broker,也只能發送到這台 broker 了,但是會盡量發送到該 broker 上的其他 queue。

如果超過重試次數,則拋出異常,由程式設計師保證消息不丟,當然當生產者出現 RemotingException、MQClientException 和 MQBrokerException時,Producer 會自動重投消息,重投消息可能會導致消息發送重複,這是不可避免的。

以上策略也是在一定程度上保證了消息可以發送成功。如果業務對消息可靠性要求比較高,建議應用增加相應的重試邏輯:比如調用 send 同步方法發送失敗時,則嘗試將消息存儲到 db,然後由後台執行緒定時重試,確保消息一定到達 Broker。

保證

rocket mq 通過同步消息發送可以保證消息不丟,但是無法保證消息不重複,如果對消息重複有要求的在消費的時候需要做冪等處理。這也是 rocket mq 整體的保證:我可以不丟消息,但是消息可能會重複。

非同步發送

默認 send(msg) 將阻塞,直到返迴響應。因此,如果您關心性能,我們建議您使用以非同步方式運行的 send(msg, callback)。非同步也可以獲取響應。

public class AsyncProducer {
	public static void main(String[] args) throws Exception {
    	// 實例化消息生產者Producer
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
    	// 設置NameServer的地址
        producer.setNamesrvAddr("localhost:9876");
    	// 啟動Producer實例
        producer.start();
        producer.setRetryTimesWhenSendAsyncFailed(0);
	
	int messageCount = 100;
        // 根據消息數量實例化倒計時計算器
	final CountDownLatch2 countDownLatch = new CountDownLatch2(messageCount);
    	for (int i = 0; i < messageCount; i++) {
                final int index = i;
            	// 創建消息,並指定Topic,Tag和消息體
                Message msg = new Message("TopicTest",
                    "TagA",
                    "OrderID188",
                    "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                // SendCallback接收非同步返回結果的回調
                producer.send(msg, new SendCallback() {
                    @Override
                    public void onSuccess(SendResult sendResult) {
                        countDownLatch.countDown();
                        System.out.printf("%-10d OK %s %n", index,
                            sendResult.getMsgId());
                    }
                    @Override
                    public void onException(Throwable e) {
                        countDownLatch.countDown();
      	                System.out.printf("%-10d Exception %s %n", index, e);
      	                e.printStackTrace();
                    }
            	});
    	}
	// 等待5s
	countDownLatch.await(5, TimeUnit.SECONDS);
    	// 如果不再發送消息,關閉Producer實例。
    	producer.shutdown();
    }
}

失敗重試

非同步發送失敗重試時,不會選擇其他 broker,僅在同一台 broker 上重試,所以該策略無法保證消息不丟。

單向發送

單向發送一般是用於不關心發送是否成功的場景,單項發送無法獲取響應,也不進行重試,常用於日誌發送場景,失敗了也不會造成什麼影響。

public class OnewayProducer {
	public static void main(String[] args) throws Exception{
    	// 實例化消息生產者Producer
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
    	// 設置NameServer的地址
        producer.setNamesrvAddr("localhost:9876");
    	// 啟動Producer實例
        producer.start();
    	for (int i = 0; i < 100; i++) {
        	// 創建消息,並指定Topic,Tag和消息體
        	Message msg = new Message("TopicTest" /* Topic */,
                "TagA" /* Tag */,
                ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
        	);
        	// 發送單向消息,沒有任何返回結果
        	producer.sendOneway(msg);

    	}
    	// 如果不再發送消息,關閉Producer實例。
    	producer.shutdown();
    }
}

NameServer

名稱服務充當路由消息的提供者。生產者或消費者能夠通過名字服務查找各主題相應的 Broker IP 列表。多個Nameserver 實例組成集群,但相互獨立,沒有資訊交換。

NameService 是無狀態的,互相沒有通訊,所以也就沒有所謂的 leader 和 follower 的概念,非要有的話,那就是每台實例都是 leader,都可以提供服務。

工作方式

所有 broker 伺服器都需要和每個 NameService 維持一個長連接,定時發送心跳,NameServer 維護著所有 broker 資訊。生產者和消費組客戶端也會和其中某一台 NameServer 建立一個長連接,定時獲取最新的 broker 資訊。

客戶端 NameServer 選擇策略

客戶端首先會選擇一個隨機數,然後對 NameServer 節點數取模,得到的就是要連接的節點索引,如果連接失敗,就採用輪詢策略,去嘗試連接其他節點。

存在的問題

雖然使用 NameServer 伺服器而不適用 zk 可以降低對外部系統的耦合度,並且一台伺服器既可以是 NameServer 伺服器,也可以是 broker。但是由於 NameServer 是無狀態的,互相沒有消息同步,那麼在某一個瞬間可能會導致彼此資訊不一致的情況。但是最終資訊是會一致的。

因為是無狀態的,因此 NameServer 擴容的時候必須在客戶端配置中把擴容的機器的地址新增上,可以說擴容既方便又麻煩。

Broker

消息中轉角色,負責存儲消息、轉發消息。代理伺服器在 RocketMQ 系統中負責接收從生產者發送來的消息並存儲、同時為消費者的拉取請求作準備。代理伺服器也存儲消息相關的元數據,包括消費者組、消費進度偏移和主題和隊列消息等。

模組

  1. Remoting Module:整個Broker的實體,負責處理來自clients端的請求。

  2. Client Manager:負責管理客戶端(Producer/Consumer)和維護Consumer的Topic訂閱資訊

  3. Store Service:提供方便簡單的API介面處理消息存儲到物理硬碟和查詢功能。

  4. HA Service:高可用服務,提供Master Broker 和 Slave Broker之間的數據同步功能。

  5. Index Service:根據特定的Message key對投遞到Broker的消息進行索引服務,以提供消息的快速查詢。

img

消息存儲

img

(圖一)

正常來說,我們 broker 是部署多台的,以便 broker 的負載均衡,降低壓力。多台 broker 之間是怎麼分配消息的呢,這個和 kafka 差不多,是按照隊列來的,如下圖。有一個 broker 集群,有兩台 broker master,一共有四個隊列,每台 broker master 分配兩個隊列,生產者根據發送方負載均衡策略發送到指定的隊列上。這裡的 q1,q2,q3,q4 其實是個邏輯概念,並沒有存儲真正的數據,他們就是我們下面要將的 consume queue。真正數據其實是存儲在 commit log 里。

img

(圖二)

commit log

到現在為止,我們一直在說消息是生產者發送到隊列,消費者消費隊列里的消息,會讓我們誤以為消息就是存儲在隊列里的,如果消息就是存儲在隊列里,那就和 kafka 每什麼區別了,kafka 將消息存儲在分區里。但其實,隊列只是一個邏輯概念。消息實際上是存儲在 commit log 文件里。

commit log 是消息主體以及元數據的存儲主體,存儲 Producer 端寫入的消息主體內容,消息內容不是定長的。單個文件大小默認1G, 文件名長度為20位,左邊補零,剩餘為起始偏移量,比如00000000000000000000代表了第一個文件,起始偏移量為0,文件大小為1G=1073741824;當第一個文件寫滿了,第二個文件為00000000001073741824,起始偏移量為1073741824,以此類推。消息主要是順序寫入日誌文件,當文件滿了,寫入下一個文件;

所有的消息,即使是不同的 topic 以及不同的隊列,都是存儲在 commit log里的,不做區分。那麼問題來了,既然所有 topic 和隊列的消息都存儲在裡面,消費者怎麼知道怎麼消費呢,難道 broker 要遍歷所有消息,找到滿足符合要求的數據然後推送給消費者嗎,這樣就太低效了,這時候就是 consume queue 登場的時候了。

consumeQueue

消息消費隊列,引入的目的主要是提高消息消費的性能,由於 RocketMQ 是基於主題 topic 的訂閱模式,消息消費是針對主題進行的,如果要遍歷 commitlog 文件中根據 topic 檢索消息是非常低效的。Consumer 即可根據 ConsumeQueue 來查找待消費的消息。其中,ConsumeQueue(邏輯消費隊列)作為消費消息的索引,保存了指定 Topic 下的隊列消息在 CommitLog 中的起始物理偏移量offset,消息大小 size 和消息 Tag 的 HashCode 值。consumequeue 文件可以看成是基於 topi c的commitlog 索引文件,故 consumequeue 文件夾的組織方式如下:topic/queue/file三層組織結構,具體存儲路徑為:$HOME/store/consumequeue/{topic}/{queueId}/{fileName}。同樣consumequeue 文件採取定長設計,每一個條目共20個位元組,分別為8位元組的 commitlog 物理偏移量、4位元組的消息長度、8位元組tag hashcode,單個文件由30W個條目組成,可以像數組一樣隨機訪問每一個條目,每個ConsumeQueue文件大小約5.72M;

說白了 consume Queue 就是一個索引,用來定位具體消息的位置。

indexLog

index log 提供了一種可以通過key或時間區間來查詢消息的方法。IndexFile的底層存儲設計為在文件系統中實現HashMap結構,故rocketmq的索引文件其底層實現為hash索引。

消息存儲總結

rocket mq 採用這種混合型的存儲結構,主要是為了一個低延遲的讀取,雖然 kafka 的順序讀也是很快的,但是 rocket mq 採用預讀的形式將數據放入記憶體,對記憶體進行操作,會更快一些

弊端

這種存儲架構有幾個弊端:

  • 提交日誌和消費隊列需要在邏輯上保持一致,這給編程模型帶來了額外的複雜性。
  • 每次都是通過 comsume queue 獲取到消息在 commit log 文件的位置,會產生大量的隨機讀。

設計目的

但是官方也出了這樣的設計目的,同時這也是為了保證 rocket mq 的低延遲和高可靠性:

  1. 隨機閱讀。盡量多讀,提高頁快取命中率,減少讀IO操作。所以大記憶體還是比較可取的。如果大量的消息堆積,讀取性能會不會很差?答案是否定的,原因如下:
  • 即使消息大小只有1KB,系統也會提前讀取更多數據。這意味著對於後續數據讀取,將執行對主記憶體的訪問,而不是慢速磁碟 IO 讀取。
  • 從磁碟隨機訪問 CommitLog。如果在SSD的情況下將I/O調度器設置為NOOP,則讀取qps將大大加速。

預讀應該是很多針對讀頻繁場景的正常操作,

  1. 鑒於 ConsumeQueue 僅存儲固定大小的元數據,主要用於記錄消費進度,因此很好地支援隨機讀取。利用page cache prefetch,訪問 ConsumeQueue 和訪問主存一樣高效,即使是在海量消息堆積的情況下。因此,ConsumeQueue 不會對讀取性能帶來明顯的損失。
  2. CommitLog 存儲幾乎所有資訊,包括消息數據。類似於關係型資料庫的redo log,只要commit log存在,消費隊列、消息鍵索引等所有需要的數據都可以完全恢復。

消息刷盤

img

同步刷盤

同步刷盤是消息寫入到磁碟後才會給生產者返回 ack 消息,這種刷盤模式對可靠性來講是不錯的保障,但是對效率來說就比較低了。

非同步刷盤

非同步刷盤是只要消息寫入PageCache即可將成功的ACK返回給Producer端,又後台執行緒進行 pageCache 刷到磁碟。這種降低了讀寫延遲,提高了吞吐量,但是如果消息沒有即使刷入磁碟,機房斷電了,消息會丟失。

消息過濾

rocket mq 可以針對 tag 和 sql 表達式進行過濾,由於 consume queue 存儲著 tag 的 hash code,因此可以直接在 queue 這裡進行 tag 過濾,而無需進入到 commit log,但是可能會存在 hash 值一致的情況,因此拿到具體消息後還是要進行一遍過濾。

具體的消息過濾細節這裡就不詳細說明了(因為我不太關心,所以沒有深入學習…)

消息篩選

前面我們說過有個索引文件 index log 以及發送的時候可以指定 key,這個就是為消息篩選做的一些準備,有興趣的可以自己看官方文檔

事務機制

rocket mq 也支援事務的,但是我感覺使用場景不多。前面的 produce group 其實就是為事務服務的,當某一台生產者掛掉了,broker 會通知相同的 group 的其他生產者進行回滾,有興趣的自己翻閱下官方文檔。

消費者

消費者無非就是消費誰,怎麼消費,怎樣才算消費完成,接下來也將從這幾個方面進行介紹。

消費誰

前面其實已經有了很多鋪墊,這裡再啰嗦一下。消費的時候從 NameService 伺服器獲取到 broker ip 列表,然後獲取到自己要消費的 topic 隊列所在的 broker 的地址。根據 broker 記憶體儲的偏移量,從偏移量 + 1的位置開始拉取消費。

因此,消費者是以隊列為基本單位進行消費的

怎麼消費

public class Consumer {

	public static void main(String[] args) throws InterruptedException, MQClientException {

    	// 實例化消費者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");

    	// 設置NameServer的地址
        consumer.setNamesrvAddr("localhost:9876");

    	// 訂閱一個或者多個Topic,以及Tag來過濾需要消費的消息
        consumer.subscribe("TopicTest", "*");
    	// 註冊回調實現類來處理從broker拉取回來的消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                // 標記該消息已經被成功消費
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 啟動消費者實例
        consumer.start();
        System.out.printf("Consumer Started.%n");
	}
}

消費者組

和 kafka 一樣,rocket mq 也有消費組的概念,一個消費組負責一起消費一個 topic 的所有消息,如果是集群消費,每個消費者負責 n 個隊列。也就是隊列和消費者是多對1的狀態,一個消費者可以消費多個隊列,一個隊列只能被一個消費者消費。當然,如果消費組裡的消費者數量大於隊列的數量,那麼就有一些消費者閑置,沒有消息可以消費。如果是廣播消費,那麼消費組裡的所有消費者都全量消費所有隊列消息。

消費執行緒

和 kafka 不同的是,kafka 的消費並發度最高等於分區數,假如有20個分區,那麼最多只能有20個執行緒一起消費。 rocket mq 不同,它可以指定一個消費者開多個執行緒去消費,因此並發度最高等於一個消費者的執行緒數 * 隊列數。

這也是 rocket mq 的一個賣點,消費速度更快了。

同時,當消息擠壓的時候不一定非要擴容,只需要增加消費者的執行緒數,當執行緒數到達一個瓶頸了,再進行擴容,變相的節省了資源。

消費方式

rocket mq 的消費方式有兩種,廣播消費和集群消費,集群消費就是一個消費者負責n個隊列,廣播消費就是每個消費者都全量消費所有隊列消息。

和 kafka 不同的是,kafka 的集群消費需要指定多個消費者組,每個消費者組一個消費者,但是 rocket mq 支援一個消費者組裡的所有消費者都全量消費 topic 消息,這樣也更方便許多,無需申請多個消費組。

廣播消費

廣播消費就是一個消費組裡的每個消費者都全量消費 topic 資訊。由於每個消費者的消費進度可能都不一樣,消費者之間互不關心各自的消費進度,因此,消費進度,也就是消息的偏移量保存在每台消費者自己身上。

集群消費

集群消費是一個消費組裡的消費者各自負責幾個隊列,消費者數量如果大於隊列數量,那麼會造成有幾個消費者被閑置,因此最好的情況下就是消費組裡的消費者數量與隊列保持一致。

集群消費有種情況就是消費過程中,可能組裡來了新的消費者,或者有消費者離開了,那麼就需要進行再均衡,也就是我們常說的 rebalence,就需要給組裡的消費者重新分配負責的隊列。那麼每個消費者就需要知道隊列里的消息被消費到哪裡了,以前消費過的消息就不再消費了,直接消費還沒消費的。這時候消費進度,也就是隊列偏移量就不能保存在本地了,因為如果你掛了,其他消費者不知道你消費到哪。所以消費進度就需要保存在 broker 上。具體的消息格式是map<topic&consumergroup,map<queue,offset>>,就是一個map嵌套map,外層map key 為 topic 與 消費組的結合,value為內層map,內層 map key 為特定隊列,value 為消費到的偏移量。整個map代表,某個 topic 的各個隊列被某個消費組消費到了哪裡。通過這個消息,當組裡有消費者加入或者離開後,隊列重新分配的時候,消費者就知道要從哪裡開始消費了。

rebalance

rebalabce 也就是再均衡

觸發時機
  • 某個主題的隊列數量發生了變化
  • 某個消費組的消費者數量發生了變化
影響
  • 消費暫停:在再均衡的那一瞬間,消費者是沒有拉取消息進行消費的

  • 消息重複:如果採用非同步提交偏移量的方式,可能在再均衡前提交的偏移量丟失了,這時候在均衡後,新的消費者在消費對應的隊列時就會重上一次成功提交的偏移量處開始消費,導致消息重複

  • 消息峰刺:由於再均衡的時候消費是暫停的,所以消息會積壓一點,再均衡後,消費壓力就大了些,會出現峰刺的情況

如何再均衡

當隊列數量改變或者消費組裡的消費者數量改變的時候,由於 broker 內部維護著多個topic、消費組、消費者、隊列關係的數據結構,因此 broker 伺服器能夠感知到這個變化,感知到變化後,會通知消費者,消費者拿到最新的隊列元素據後,自己會採用 queue 分配演算法計算得出自己要消費的隊列,然後開始消費。

與 kafka 的不同

kafka 每個消費組都有一個 broker 負責,這個 broker 稱為分區協調器,每個消費組可以由不同的 broker 負責。消費組裡還有個概念叫組長,組長往往都是最新加入消費組的那個消費者。當分區數量發生變化或者消費組的消費者數量發生變化的時候,分區協調器會把最新的分區和消費者資訊告訴組長,由組長計算得出每個消費者應該負責的分區。然後告訴協調器,協調器再下發給每個消費者,告訴他們應該再均衡了,要開始消費新分區了。

因此,再均衡 kafka 和 rocket mq 最大的不同就是一個是由組長計算得出,一個是自己計算自己的。

怎樣才算消費完成

同步提交偏移量

同步提交偏移量也就是只有噹噹前的消息偏移量成功提交後,才會拉取下一批消息進行消費,會重試

非同步提交偏移量

非同步提交偏移量是直接發送消息偏移量,不關心是否提交成功,直接拉取下一批消息。

非同步提交可能會導致消費重複消息。當非同步提交後,broker 由於各種原因,沒有收到偏移量,這時候如果發生了再均衡,消息會從上一次提交的偏移量處開始消費,導致消息重複。

但是非同步提交的方式消費速度更快,性能更高。

消費總結

消費方要注意的其實就是三個方面:

  • 提前計算好需要開幾台消費者,每台消費者要開幾個執行緒進行消費,避免後期的擴縮容導致分區再均衡

  • 如果對消息重複有要求的話處理邏輯需要做好消息冪等

  • 提交偏移量需要根據自己的業務場景,如果不能容忍消息重複,那就同步提交,如果可以容忍消息重複那就非同步提交

延遲消息

如果對延遲消息有興趣的話可以自己看官網,這裡不做太多的描述

消息事務

rocket mq 對事務的支援是採用的 XA,如果對消息事務有興趣的話可以自己看官網,這裡不做太多描述

順序消息

順序消息分為兩種:全局順序和隊列順序

全局順序

全局順序是採用一個 topic 一個 queue 實現的,這種可以保證嚴格的消息生產和消費順序,由於只有一個 queue 因此只能單個消費者消費。如果要保證嚴格的消費順序,也只能開一個執行緒消費。

這裡需要注意的點是,如果發送的是全局順序消費,那麼生產者不會進行消息重試。因為順序發送必然是同步的,如果是非同步的也就有可能在發送的時候是順序發的,但是到達 broker 就不一定是按順序到達了。前面也提到過,同步發送的重試會選擇不同的 broker 和 queue,由於全局順序只有一個 queue,所以也只能分布在一台 broker 上,所以不會進行消息重發。

隊列順序

隊列順序是天然就支援的,但是如果正常發送消費的話,消息還真不一定是按照真實的順序進行存儲和消費的。如果生產者採用的是非同步發送,那麼有可能同一個 queue,由於網路原因,後一條消息先於前一條消息存儲進 queue 了,那就不是順序了。如果消費的時候指定多執行緒消費,那也不能保證順序了。

因此如果對隊列中的消息有消費順序要求的話,那最好就是發送的時候採用同步發送,消費的時候採用單執行緒消費。

即使這樣,從整個 topic 視角來看,消息的消費也不是順序的。由於有多個 queue,queueA的消息A可能在queueB的消息B之前發送的。但是由於有多個消費者分別消費不同的 queue,每台消費者的消費能力不同,會導致queueB的消息B先消費。

總結

整個 rocket mq 分為 四個大模組:生產者,消費者,broker 代理伺服器,NameServer

生產者具有三種發送方式:同步發送,非同步發送,單向發送,其中同步發送和非同步發送有消息失敗重試機制

消費者有可以同步消費,非同步消費,也可以開多執行緒進行消費。消費並發度和執行緒數和消費者數有關。當然消費方式還可以分為廣播消費和集群消費。當隊列數或者消費者數量發生變化的時候,會產生再均衡機制。

broker 主要負責消息的存儲,轉發,過濾等。消息存儲有 commit log 文件存儲實際消息資訊,consume queue 存儲邏輯消費資訊。index log 存儲索引資訊。它也負責給消費者提供資訊,這裡的資訊包括實際的消費資訊和偏移量等。

NameServer在 rocket mq 中的角色類似於 zk 在 kafka 中的角色,負責管理元資訊。NameServer 雖然也是一個集群,但是每台伺服器之間沒有相互通訊,所有 broker 都和每台 NameServer 伺服器建立長連接,定時發送心跳和自己負責的 topic 以及隊列資訊。consumer 和 producer 選一台 NameServer 伺服器進行長連接,定時獲取 broker 資訊,以便指定自己要從哪台 broker 消費和發送消息到哪台 broker

每個模組都有同步和非同步的方式,如果採用全同步的話,消息的保證行最高,可以不丟消息,消息重複數也會變少很多,但是全同步會導致性能和吞吐低,除非對消息有嚴格的要求不能丟失,並且對性能和吞吐要求不大,可以全同步。採用全非同步的話吞吐量和性能是最高的,但是消息可能會丟,也會重複,所以對少量消息丟失沒影響,但是對性能要求高的可以全非同步。

其實還是得根據自己的業務場景選擇具體的方式,看是注重性能,還是注重消息的保證。

ps

文章為本人學習過程中的一些個人見解,漏洞是必不可少的,希望各位大佬多多指教,幫忙修復修復漏洞!!!

參考資料

官方中文文檔

官方文檔

Tags: