精華!一張圖進階 RocketMQ

前 言

大家好,我是三此君,一個在自我救贖之路上的非典型程式設計師。

「一張圖」系列旨在通過「一張圖」系統性的解析一個板塊的知識點:

  • 三此君向來不喜歡零零散散的知識點,通過一張圖將零散的知識點連接起來,能夠讓我們對一個板塊有更深入、更系統的理解。
  • 同時本系列儘可能的精鍊,希望能夠讓大家花 20%的時間,快速理解這個板塊下 80% 的內容。

本文是「一張圖」系列的第一個板塊:一張圖解析 RocketMQ。

  • 為了敘述的方便,繪圖的時候將整個系列分為許多小的模組,講解的時候也是按照模組循序漸進的。一張圖解析 RocketMQ 原圖
  • 一張圖解析 RocketMQ 是會深入到源碼層面,但是文中不會粘貼源碼。三此君在看源碼的時候寫了很多備註,可以降低大家看源碼的難度,需要的同學自行到三此君的倉庫中 Fork:rocketmq release-4.3.0

一張圖進階 RocketMQ

本文是《一張圖解析 RocketMQ》系列的第 1 篇,今天的內容主要分為三個部分:

  • 整體架構:會從大家熟悉的「生產者-消費者模式」逐步推出 RocketMQ 完整架構,只需要記住一張完整的架構圖即可。
  • 元數據管理:我把 RocketMQ 集群的元數據整理成一張圖,方便大家直觀的了解都有哪些元數據,各有什麼用。
  • 消息收發示例:通過 Docker 部署 RocketMQ,並用簡單的示例串起 RocketMQ 消息收發流程。

整體架構

什麼是消息隊列?顧名思義,首先得有一個隊列,這個隊列用來存儲消息。那有了消息隊列就得有人往裡面放,有人往裡面取。有沒有似曾相識燕歸來的感 jio,這莫非就是連小學生都知道的,經典的「生產者-消費者模式」?接下來我們就來看看它裡面穿了什麼?

img

別急,先來回顧一下 「生產者-消費者模式」 這個老朋友。簡單來說,這個模型是由兩類執行緒和一個隊列構成:

  • 生產者執行緒:生產產品,並把產品放到隊列里。
  • 消費者執行緒:從隊列裡面獲取產品,並消費。

生產者-消費者

有了這個隊列,生產者就只需要關注生產,而不用管消費者的消費行為,更不用等待消費者執行緒執行完;消費者也只管消費,不用管生產者是怎麼生產的,更不用等著生產者生產。

這意味著什麼呢,生產者和消費者之間實現解藕非同步。這就厲害了,因為我們生活中很多都是非同步的。比如最近新冠疫情捲土重來,我點的外賣只能送到小區門口的外賣隊列裡面,而我只能去外賣隊列裡面取外賣,然後一頓狼吞虎咽。

具體 「生產者-消費者模式」 怎麼實現,想必各位小學都學過了,我們來看看這個模式還有什麼問題吧。最大的問題就是我們小學學的 「生產者-消費者模式」 是個單機版的,只能自嗨。這就相當於,我就是外賣騎手,我點了個外賣放到外賣隊列,然後我再從外賣隊列裡面去取,一頓操作猛如虎呀!於是就有了進化版,我們把消費者,隊列,生產者放到不同的伺服器上,這就是傳說中的分散式消息隊列了。

image

生產者生產的消息通過網路傳遞給隊列存儲,消費者通過網路從隊列獲取消息。但是還有問題,消息可能有很多種,全都放在一起豈不是亂套了?我點的外賣和快遞全都放在一起,太難找了吧。於是我們就需要區分不同類型消息,相同類型的消息稱為一個 Topic。同時,騎手不可能只有一個,點外賣的也不會只有我一個人,於是就有了生產者組消費者組

image

但還是有問題呀,小區那麼大,一個隊列放不下。我住在小區南門,點個外賣還要跑去北門拿,那真的是 eggs hurt。於是物業在東南西北門各設了一個外賣快遞放置點。也就是我們有多個隊列,組成 隊列集群

image

可是,問題又雙叒叕來了(還有完沒完),一個小區那麼多個外賣快遞隊列,騎手怎麼知道送到哪裡去,我又怎麼知道去哪裡取?很簡單,導航呀。我們把導航的資訊稱為路由資訊,這些資訊需要有一個管理的地方,它告訴生產者,某這個 Topic 的消息可以發給哪些隊列,同時告訴消費者你需要的消息可以從哪些隊列裡面取。RocketMQ 為這些路由資訊的設置了管理員 NameServer,當然 NameServer 也可以有很多個,組成 NameServer 集群。

到這裡,你就應該知道 RocketMQ 裡面都穿了什麼啦。包括了生產者(Producer),消費者(Consumer),NameServer 以及隊列本身(Broker)。Broker 是代理的意思,負責隊列的存取等操作,我們可以把 Broker 理解為隊列本身。

RocketMQ整體架構

  • NameServer:我們可以同時部署很多台 NameServer 伺服器,並且這些伺服器是無狀態的,節點之間無任何資訊同步。
    NameServer 起來後監聽 埠,等待 Broker、Producer、Consumer 連上來,NameServer 是集群元數據管理中心。

  • Broker:Broker 啟動,跟所有的 NameServer 保持長連接,每 30s 發送一次發送心跳包(像心跳一樣持續穩定的發送請求)。心跳包中包含當前 Broker 資訊 ( IP+ 埠等)以及存儲所有 Topic 資訊。註冊成功後,NameServer 集群中就有 Topic 跟 Broker 的映射關係。

    我們可以同時部署多個 Master 和多個 Slave,一個 Master 可以對應多個 Slave,但是一個 Slave 只能對應一個 Master。Master 與 Slave 的需要有相同的 BrokerName,不同的 BrokerId 。BrokerId 為 0 表示 Master,非 0 表示 Slave,但只有 BrokerId=1 的從伺服器才會參與消息的讀負載。(可以暫時忽略 Broker 的主從角色)

  • Topic:收發消息前,先創建 Topic,創建 Topic 時需要指定該 Topic 要存儲在哪些 Broker 上,也可以在發送消息時自動創建 Topic。

  • Producer:Producer 發送消息,啟動時先跟 NameServer 集群中的其中一台建立長連接,並從 NameServer 中獲取當前發送的 Topic 存在哪些 Broker 上,採用輪詢策略從選擇一個隊列,然後與隊列所在的 Broker 建立長連接,並向 Broker 發消息。

  • Consumer:Consumer 跟 Producer 類似,跟其中一台 NameServer 建立長連接,獲取當前訂閱 Topic 存在哪些 Broker 上,然後直接跟 Broker 建立連接通道,開始消費消息。

    我們剛剛提到騎手不止一個,取外賣快遞的也不止我一個,所以會有生產者組合消費者組的概念。這裡需要補充說明一下,消息分為集群消息和廣播消息:

    • 集群消息:一個 Topic 的一條消息,一個消費者組只能有一個消費者實例消費。例如,同樣是外賣 Topic,一份外賣,我們整個小區也只有一個人消費,就是集群消費。

    • 廣播消息:一個 Topic 的一條消息,一個消費者組所有消費者實例都會消費。例如,如果是因為疫情,政府發放食品,那我們小區每個人都會消費,就是廣播消費。

元數據管理

因為 Producer、Consumer 和 Broker 都需要和 NameServer 交互,負責的三此君不得不先和大家嘮嘮 NameServer 是何方神聖。上面有說道 NameServer 是集群的元數據管理中心,那它到底管理了哪些元數據?我們來看看 NameServer 裡面又穿了什麼,看完了記得關注、轉發、點贊、收藏哦。

img

簡單來說,NameServer 是我們的整個 RocketMQ 集群的元數據管理中心,負責集群元數據的增刪改查。先不管這個增刪改查是怎麼實現的,我們甚至可以理解就是資料庫的增刪改查,但是我們一定要知道這些元數據都長什麼樣子。才能知道 Producer、Consumer 及 Broker 是如何根據這些數據進行消息收發的。

集群元數據

如圖所示,二主二從的 Broker 集群相關的元數據資訊,包括 topicQueueTable、BrokerAddrTable、ClusterAddrTable、brokerLiveInfo、FilterServer (暫時不用關注,圖中未畫出)。

  • HashMap<String topic, List<QueueData>> topicQueueTable:Key 是 Topic 的名稱,它存儲了所有 Topic 的屬性資訊。Value 是個 QueueData 列表,長度等於這個 Topic 數據存儲的 Master Broker 的個數,QueueData 里存儲著 Broker 的名稱、讀寫 queue 的數量、同步標識等。
  • HashMap<String BrokerName, BrokerData> brokerAddrTable:這個結構存儲著一個 BrokerName 對應的屬性資訊,包括所屬的 Cluster 名稱,一個 Master Broker 和多個 Slave Broker 的地址資訊
  • HashMap<String ClusterName, Set<String BrokerName>> clusterAddrTable:存儲的是集群中 Cluster 的資訊,就是一個 Cluster 名稱對應一個由 BrokerName 組成的集合。
  • HashMap<String BrokerAddr, BrokerLiveInfo> brokerLiveTable:Key 是 BrokerAddr 對應著一台機器,BrokerLiveTable 存儲的內容是這台 Broker 機器的實時狀態,包括上次更新狀態的時間戳,NameServer 會定期檢查這個時間戳,超時沒有更新就認為這個 Broker 無效了,將其從 Broker 列表裡清除。
  • HashMap<String BrokerAddr, List<String> FilterServer> filterServerTable:Key 是 Broker 的地址,Value 是和這個 Broker 關聯的多個 FilterServer 的地址。Filter Server 是過濾伺服器,是 RocketMQ 的一種服務端過濾方式,一個 Broker 可以有一個或多個 Filter Server。

其他角色會主動向 NameServer 上報狀態,根據上報消息里的請求碼做相應的處理,更新存儲的對應資訊。

  • Broker 接到創建 Topic 的請求後向 NameServer 發送註冊資訊,NameServer 收到註冊資訊後首先更新 Broker 資訊,然後對每個 Master 角色的 Broker,創建一個 QueueData 對象。如果是新建 Topic,就是添加 QueueData 對象;如果是修改 Topic,就是把舊的 QueueData 刪除,加入新的 QueueData。
  • Broker 向 NameServer 發送的心跳會更新時間戳,NameServer 每 10 秒檢查一次檢查時間戳,檢查到時間戳超過 2 分鐘則認為 Broker 已失效,便會觸發清理邏輯。
  • 連接斷開的事件也會觸髮狀態更新,當 NameServer 和 Broker 的長連接斷掉以後,onChannelDestroy 函數會被調用,把這個 Broker 的資訊清理出去。
  • Producer/Consumer 啟動之後會和 NameServer 建立長連接,定時從 NameServer 獲取路由資訊保存到本地。消息的發送與拉取都會用到上面的數據。

那麼多數據,相信大家看的有點暈,三此君簡單總結下:NameServer 通過 brokerLiveInfo 來維護存活的 Broker。Producer 會獲取上面的路由資訊,發送消息的時候指定發送到哪個 Topic,根據 Topic 可以從 topicQueueTable 選擇一個 Broker,根據 BrokerName 可以從 BrokerAddrTable 獲取到Broker IP 地址。有了地址 Producer 就可以將消息通過網路傳遞給 Broker。

消息收發示例

RocketMQ 部署

剛剛我們了解 RocketMQ 整體架構,那怎麼樣通過 RocketMQ 收發消息呢?需要先通過 Docker 部署一套 RocketMQ:

如果你沒有安裝 Docker,可以根據菜鳥教程 MacOS Docker 安裝/Windows Docker 安裝 進行安裝。然後,通過 docker-compose 部署 RocketMQ:

  • 克隆 docker-middleware 倉庫,打開 RocketMQ 目錄;
  • 修改broker.conf中的brokerIP1 參數為本機 IP;
  • 進入docker-compose.yml文件所在路徑,執行docker-compose up命令即可;

注意:如果你現在不了解 Docker 不重要,只需要按照步驟部署好 RocketMQ 即可,並不會阻礙我們理解 RocketMQ 相關內容。

Docker 部署 RocketMQ

部署完成後我們就可以在 Docker Dashboard 中看到 RocketMQ 相關容器,包括 Broker、NameServer 及 Console(RocketMQ 控制台),到這裡我們就可以使用部署的 RocketMQ 收發消息了。

RocketMQ 已經部署好了,接下來先來看一個簡單的消息收發示例,可以說是 RocketMQ 的 “Hello World”。

消息發送

public class SyncProducer {
    public static void main(String[] args) throws Exception {
        // 實例化消息生產者Producer
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        // 設置NameServer的地址
        producer.setNamesrvAddr("localhost:9876");
        // 啟動Producer實例
        producer.start();
        // 創建消息,並指定Topic,Tag和消息體
        Message msg = new Message("Topic1","Tag", "Key",
                                    "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); 
        // 發送消息到一個Broker
        SendResult sendResult = producer.send(msg);
      	// 通過sendResult返回消息是否成功送達
        System.out.printf("%s%n", sendResult);
        // 如果不再發送消息,關閉Producer實例。
        producer.shutdown();
    }
}
  • 首先,實例化一個生產者 producer,並告訴它 NameServer 的地址,這樣生產者才能從 NameServer 獲取路由資訊。
  • 然後 producer 得做一些初始化(這是很關鍵的步驟),它要和 NameServer 通訊,要先建立通訊連接等。
  • producer 已經準備好了,那得準備好要發的內容,把 “Hello World” 發送到 Topic1。
  • 內容準備好,那 producer 就可以把消息發送出去了。producer 怎麼知道 Broker 地址呢?他就會去 NameServer 獲取路由資訊,得到 Broker 的地址是 localhost:10909,然後通過網路通訊將消息發送給 Broker。
  • 生產者發送的消息通過網路傳輸給 Broker,Broker 需要對消息按照一定的結構進行存儲。存儲完成之後,把存儲結果告知生產者。

消息接收

public class Consumer {

	public static void main(String[] args) throws InterruptedException, MQClientException {
    	// 實例化消費者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
    	// 設置NameServer的地址
        consumer.setNamesrvAddr("localhost:9876");
    	// 訂閱一個或者多個Topic,以及Tag來過濾需要消費的消息
        erbconsumerijun.subscribe("sancijun", "*");
    	// 註冊回調實現類來處理從broker拉取回來的消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(
              List<MessageExt> msgs,ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                // 標記該消息已經被成功消費
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 啟動消費者實例
        consumer.start();
	}
}
  • 首先,實例化一個消費者 consumer,告訴它 NameServer 的地址,這樣消費者才能從 NameServer 獲取路由資訊。
  • 然後這個消費者需要知道自己可以消費哪些 Topic 的消息,也就是每個消費者需要訂閱一個或多個 Topic。
  • 消費者也需要做一些初始化,業務本身並沒有理會怎麼從 Broker 拉取消息,這些都是消費者默默無聞的奉獻。所以,我們需要啟動消費者,消費者會從 NameServer 拉取路由資訊,並不斷從 Broker 拉取消息。拉取回來的消息提供給業務定義的 MessageListener。
  • 消息拉取回來後,消費這需要怎麼處理呢?每個消費者都不一樣(業務本身決定),由我們業務定義的 MessageListener 處理。處理完之後,消費者也需要確認收貨,就是告訴 Broker 消費成功了。

以上就是本文的全部內容,本文沒有堆砌太多無意義的概念,沒有講什麼削峰解耦,非同步通訊。這些內容網上也很多,看了和沒看沒什麼兩樣。最後的最後,看懂的點贊,沒看懂的收藏,順便在分享給你的小夥伴。還沒有關注的朋友記得關注,入股不虧。

banner

參考文獻

  • RocketMQ 官方文檔

  • RocketMQ 源碼

  • 丁威, 周繼鋒. RocketMQ技術內幕:RocketMQ架構設計與實現原理. 機械工業出版社, 2019-01.

  • 李偉. RocketMQ分散式消息中間件:核心原理與最佳實踐. 電子工業出版社, 2020-08.

  • 楊開元. RocketMQ實戰與原理解析. 機械工業出版社, 2018-06.

轉載請註明出處