MQ系列7:消息通信,追求極致性能

MQ系列1:消息中間件執行原理
MQ系列2:消息中間件的技術選型
MQ系列3:RocketMQ 架構分析
MQ系列4:NameServer 原理解析
MQ系列5:RocketMQ消息的發送模式
MQ系列6:消息的消費

1 介紹

前面的章節我學習了 NameServer的原理,消息的生產發送,以及消息的消費的全過程。
我們來回顧一下:
RocketMQ 消息隊列架構主要包括NameServe、Broker(Master/Slave)、Producer、Consumer 4個核心部件,基本執行流程如下:
image

  1. NameServer 優先啟動。NameServer 是整個 RocketMQ 的「中央大腦」 ,作為 RocketMQ 的服務註冊中心,所以 RocketMQ 需要先啟動 NameServer 再啟動 Rocket 中的 Broker。
  2. Broker 啟動後,需要將自己註冊至NameServer中,並 保持長連接,每 30s 發送一次發送心跳包,來確保Broker是否存活。並將 Broker 信息 ( IP+、端口等信息)以及Broker中存儲的Topic信息上報。註冊成功後,NameServer 集群中就有 Topic 跟 Broker 的映射關係。
  3. NameServer 如果檢測到Broker 宕機(因為使用心跳機制, 如果檢測超120s(兩分鐘)無響應),則從路由註冊表中將其移除。
  4. 生產者在發送某個主題的消息之前先從 NamerServer 獲取 Broker 服務器地址列表(Broker可能是Cluster模式),然後根據負載均衡算法從列表中選擇1台Broker ,建立連接通道,進行消息發送。
  5. 消費者在訂閱某個topic的消息之前從 NamerServer 獲取 Broker 服務器地址列表(Broker可能是Cluster模式),包括關聯的全部Topic隊列信息。進而獲取當前訂閱 Topic 存在哪些 Broker 上,然後直接跟 Broker 建立連接通道,開始消費數據。
  6. 生產者和消費者默認每30s 從 NamerServer 獲取 Broker 服務器地址列表,以及關聯的所有Topic隊列信息,更新到Client本地。
    2 ~ 4 步驟實際上是 Producer、Broker 以及NameServer 之間整個進行數據通信的過程,面對複雜的消息隊列系統,一個性能優良,穩定性高的網絡通信模塊是非常重要的,它體現了RocketMQ集群消息的整體吞吐和負載能力。也是RocketMQ保證高性能、高穩定性的基石。

2 網絡通信過程分析

2.1 通信類(rocketmq-remoting )的結構解析

image
通過上圖可以看到,在整個RocketMQ隊列系統中,rocketmq-remoting 這個module是專門用來負責網絡通信職能的。
並且從模塊依賴關係中可以看出 ,rocketmq-client(client)、rocketmq-broker(broker)、rocketmq-namesrv(namesrc 命名服務) 等模塊均依賴了它。
image
通信層是基於 Netty 進行擴展的,並自定義了通信協議,用於將消息傳遞給 Broker 進行存儲。實現Client與Server之間高效的數據請求與接收。

2.2 協議結構設計

因為是基於Netty進行擴展的,所以自定義了RocketMQ的消息協議,在傳輸過程的數據進行結構制定、封裝、編解碼的過程。
在RocketMQ中,負責這個工作的就是RemotingCommand類,我們來看看這個類的幾個重要屬性:

字段 類型 Request維度 Response維度
code int 請求操作碼,依據不同的請求碼做不同的業務處理 應答響應碼:0成功,非0標識對應的錯誤
language LanguageCode 枚舉(JAVA、CPP、PYThON、GO等):請求方實現的編碼語言 應答方實現的編碼語言
version int 請求方程序的版本 應答方版本
opaque int 類似請求ID:reqeustId,唯一識別碼,區分每一個獨立的請求 response的時候直接返回
flag int 區分是普通還是oneway的RPC:RPC_ONEWAY = 1; RPC = 0。 區分是普通還是oneway RPC
remark String 自定義備註信息 自定義備註信息
extFields HashMap<String, String> Request自定義擴展的字段屬性 Response自定義擴展的字段屬性

2.3 消息內容的組成結構

傳輸的消息內容主要由一下幾個部分組成:

組成部分 說明
消息長度 消息的總長度,int類型,四個位元組存儲
序列化類型+消息頭length int類型,位元組1表示序列化類型,位元組2~4表示消息頭長度
消息頭的數據 序列化後的消息頭數據
消息主體數據 消息主體數據內容,二進制位元組

image

2.4 RocketMQ 消息通信流程

在RocketMQ消息隊列中支持通信的模式主要有

  • sync 同步發送模式
  • async 異步發送模式
  • oneway 單向模式,無需關注Response

2.4.1 通信流程說明

下圖從 NettyRemotingClient 初始化,NettyRemotingServer 初始化,基於 NettyRemotingClient 的消息發送,以及Handler 處理過程來說明。
image

  • Broker 和 NameServer 啟動時同步調用 NettyRemotingServer.start() 方法, 初始化 Netty 服務器
    • 配置 BossGroup/WorkerGroup NioEventLoopGroup 線程組
    • 配置 Channel
    • 添加 NettyServerHandler
    • 調用 serverBootstrap.bind() 監聽端口,等待client的connection
  • Producer 和 Consumer 同樣需要啟動 Netty 的客戶端,通過調用NettyRemotingClient.start() 初始化 Netty 客戶端
    • 配置客戶端 NioEventLoopGroup 線程組
    • 配置 Channel
    • 添加 NettyClientHandler
  • 發送同步消息時,調用 NettyRemoteClient.invokeSync(),從 channelTables 緩存中獲取或者創建用於通信的 Channel 通道。
  • 創建完 Channel 後,生產者 Producer 調用 Channel.writeAndFlush() 發送數據
  • NettyRemotingServer 服務端線程組 處理可讀事件,調用 NettyServerHandler 處理數據。
  • 下一步,NettyServerHandler 調用 processMessageReceived方法,接收並處理傳送過來的數據。
  • 根據請求碼 RequestCode 區別不同的請求,來執行不同的 Processor。
    • 說明:Processor 在服務端初始化的時候,將 RequestCode 添加到 Processor 緩存中。消息的存、查、拉取都是不同的請求碼。
  • processMessageReceived 從ResponseTables(key 為 opaque) 緩存中取出 ResponseFuture,並將將返回結果設置到 ResponseFuture。同步模式下執行 responseFuture.putResponse()方法,異步調用執行回調方法。
  • NettyRemotingClient 收到可讀事件,調用 NettyClientHandler 讀取並處理返回事件。

2.4.2 Reactor多線程設計

上面我們說過了,RocketMQ的通信是採用Netty組件作為底層通信庫。同樣的,它也遵循Reactor多線程模型,並在此基礎上做了一些優化。
image

上面圖中四個圖形可以大致說明NettyRemotingServer的Reactor 多線程模型,在RocketMQ中的存在形式。

  • M:1個 Reactor 主線程:eventLoopGroupBoss,它的職能是負責監聽 TCP網絡連接請求,有連接請求過來時候,創建SocketChannel,並註冊到selector上。
  • S:RocketMQ的源碼中會選擇NIO或Epoll,來監聽網絡數據,當監聽到網絡數據過來時,讀取數據並丟給Worker線程池:eventLoopGroupSelector,Rocket源碼中默認設置線程數為3。
  • M1:執行業務之前的各種雜事(SSL認證、空閑檢查、網絡連接檢查、編解碼、序列化反序列化 等等),交付給 這些工作交給defaultEventExecutorGroup 去處理,RocketMQ源碼中默認線程數設置為8。
  • M2:剩下處理業務的操作,就直接放在業務線程池中執行了。按照之前說的,依據RequestCode去processorTable 本地緩存中找到對應的 processor,並封裝成task任務,在丟給對應的業務processor線程池來處理。
線程數標識 線程名 說明
1 NettyBoss Reactor 主線程,默認1
N NettyServerEPOLLSelector Reactor 線程池,默認3
M1 NettyServerCodecThread Worker 線程池,默認8
M2 RemotingExecutorThread Processor線程池,處理業務邏輯

完整的可以參照官網的這張圖:
image

總結

上面介紹了 RocketMQ 消息通信的主要內容,我們用幾句話總結下:

  • 整個RocketMQ隊列系統中,rocketmq-remoting Module是專門用來負責網絡通信職能的。
  • 網絡通信模塊基於Netty進行擴展的,所以自定義了RocketMQ的消息協議,在傳輸過程的數據進行結構制定、封裝、編解碼的過程。
  • 理解 NettyRemotingServer/NettyRemotingClient 的初始化過程,以及調用 NettyServerHandler/NettyClienthandler 進行處理的執行流程。
  • 同步異步:同步和異步消核心區別是 同步消息通過 Netty 發送請求後會執行 ResponseFuture.waitResponse() 阻塞等待,異步的請求則 SendCallback 相應的方法進行回調處理。
  • 多線程模式下會通過1個Reactor 主線程(監聽連接),以及Reactor 線程池(監聽數據)、Worker 線程池(處理前置工作)、Processor線程池(處理業務邏輯) 來處理通信過程。