MQ系列7:消息通訊,追求極致性能
MQ系列1:消息中間件執行原理
MQ系列2:消息中間件的技術選型
MQ系列3:RocketMQ 架構分析
MQ系列4:NameServer 原理解析
MQ系列5:RocketMQ消息的發送模式
MQ系列6:消息的消費
1 介紹
前面的章節我學習了 NameServer的原理,消息的生產發送,以及消息的消費的全過程。
我們來回顧一下:
RocketMQ 消息隊列架構主要包括NameServe、Broker(Master/Slave)、Producer、Consumer 4個核心部件,基本執行流程如下:
- NameServer 優先啟動。NameServer 是整個 RocketMQ 的「中央大腦」 ,作為 RocketMQ 的服務註冊中心,所以 RocketMQ 需要先啟動 NameServer 再啟動 Rocket 中的 Broker。
- Broker 啟動後,需要將自己註冊至NameServer中,並 保持長連接,每 30s 發送一次發送心跳包,來確保Broker是否存活。並將 Broker 資訊 ( IP+、埠等資訊)以及Broker中存儲的Topic資訊上報。註冊成功後,NameServer 集群中就有 Topic 跟 Broker 的映射關係。
- NameServer 如果檢測到Broker 宕機(因為使用心跳機制, 如果檢測超120s(兩分鐘)無響應),則從路由註冊表中將其移除。
- 生產者在發送某個主題的消息之前先從 NamerServer 獲取 Broker 伺服器地址列表(Broker可能是Cluster模式),然後根據負載均衡演算法從列表中選擇1台Broker ,建立連接通道,進行消息發送。
- 消費者在訂閱某個topic的消息之前從 NamerServer 獲取 Broker 伺服器地址列表(Broker可能是Cluster模式),包括關聯的全部Topic隊列資訊。進而獲取當前訂閱 Topic 存在哪些 Broker 上,然後直接跟 Broker 建立連接通道,開始消費數據。
- 生產者和消費者默認每30s 從 NamerServer 獲取 Broker 伺服器地址列表,以及關聯的所有Topic隊列資訊,更新到Client本地。
2 ~ 4 步驟實際上是 Producer、Broker 以及NameServer 之間整個進行數據通訊的過程,面對複雜的消息隊列系統,一個性能優良,穩定性高的網路通訊模組是非常重要的,它體現了RocketMQ集群消息的整體吞吐和負載能力。也是RocketMQ保證高性能、高穩定性的基石。
2 網路通訊過程分析
2.1 通訊類(rocketmq-remoting )的結構解析
通過上圖可以看到,在整個RocketMQ隊列系統中,rocketmq-remoting 這個module是專門用來負責網路通訊職能的。
並且從模組依賴關係中可以看出 ,rocketmq-client(client)、rocketmq-broker(broker)、rocketmq-namesrv(namesrc 命名服務) 等模組均依賴了它。
通訊層是基於 Netty 進行擴展的,並自定義了通訊協議,用於將消息傳遞給 Broker 進行存儲。實現Client與Server之間高效的數據請求與接收。
2.2 協議結構設計
因為是基於Netty進行擴展的,所以自定義了RocketMQ的消息協議,在傳輸過程的數據進行結構制定、封裝、編解碼的過程。
在RocketMQ中,負責這個工作的就是RemotingCommand類,我們來看看這個類的幾個重要屬性:
欄位 | 類型 | Request維度 | Response維度 |
---|---|---|---|
code | int | 請求操作碼,依據不同的請求碼做不同的業務處理 | 應答響應碼:0成功,非0標識對應的錯誤 |
language | LanguageCode | 枚舉(JAVA、CPP、PYThON、GO等):請求方實現的編碼語言 | 應答方實現的編碼語言 |
version | int | 請求方程式的版本 | 應答方版本 |
opaque | int | 類似請求ID:reqeustId,唯一識別碼,區分每一個獨立的請求 | response的時候直接返回 |
flag | int | 區分是普通還是oneway的RPC:RPC_ONEWAY = 1; RPC = 0。 | 區分是普通還是oneway RPC |
remark | String | 自定義備註資訊 | 自定義備註資訊 |
extFields | HashMap<String, String> | Request自定義擴展的欄位屬性 | Response自定義擴展的欄位屬性 |
2.3 消息內容的組成結構
傳輸的消息內容主要由一下幾個部分組成:
組成部分 | 說明 |
---|---|
消息長度 | 消息的總長度,int類型,四個位元組存儲 |
序列化類型+消息頭length | int類型,位元組1表示序列化類型,位元組2~4表示消息頭長度 |
消息頭的數據 | 序列化後的消息頭數據 |
消息主體數據 | 消息主體數據內容,二進位位元組 |
2.4 RocketMQ 消息通訊流程
在RocketMQ消息隊列中支援通訊的模式主要有
- sync 同步發送模式
- async 非同步發送模式
- oneway 單向模式,無需關注Response
2.4.1 通訊流程說明
下圖從 NettyRemotingClient 初始化,NettyRemotingServer 初始化,基於 NettyRemotingClient 的消息發送,以及Handler 處理過程來說明。
- Broker 和 NameServer 啟動時同步調用 NettyRemotingServer.start() 方法, 初始化 Netty 伺服器
- 配置 BossGroup/WorkerGroup NioEventLoopGroup 執行緒組
- 配置 Channel
- 添加 NettyServerHandler
- 調用 serverBootstrap.bind() 監聽埠,等待client的connection
- Producer 和 Consumer 同樣需要啟動 Netty 的客戶端,通過調用NettyRemotingClient.start() 初始化 Netty 客戶端
- 配置客戶端 NioEventLoopGroup 執行緒組
- 配置 Channel
- 添加 NettyClientHandler
- 發送同步消息時,調用 NettyRemoteClient.invokeSync(),從 channelTables 快取中獲取或者創建用於通訊的 Channel 通道。
- 創建完 Channel 後,生產者 Producer 調用 Channel.writeAndFlush() 發送數據
- NettyRemotingServer 服務端執行緒組 處理可讀事件,調用 NettyServerHandler 處理數據。
- 下一步,NettyServerHandler 調用 processMessageReceived方法,接收並處理傳送過來的數據。
- 根據請求碼 RequestCode 區別不同的請求,來執行不同的 Processor。
- 說明:Processor 在服務端初始化的時候,將 RequestCode 添加到 Processor 快取中。消息的存、查、拉取都是不同的請求碼。
- processMessageReceived 從ResponseTables(key 為 opaque) 快取中取出 ResponseFuture,並將將返回結果設置到 ResponseFuture。同步模式下執行 responseFuture.putResponse()方法,非同步調用執行回調方法。
- NettyRemotingClient 收到可讀事件,調用 NettyClientHandler 讀取並處理返回事件。
2.4.2 Reactor多執行緒設計
上面我們說過了,RocketMQ的通訊是採用Netty組件作為底層通訊庫。同樣的,它也遵循Reactor多執行緒模型,並在此基礎上做了一些優化。
上面圖中四個圖形可以大致說明NettyRemotingServer的Reactor 多執行緒模型,在RocketMQ中的存在形式。
- M:1個 Reactor 主執行緒:eventLoopGroupBoss,它的職能是負責監聽 TCP網路連接請求,有連接請求過來時候,創建SocketChannel,並註冊到selector上。
- S:RocketMQ的源碼中會選擇NIO或Epoll,來監聽網路數據,當監聽到網路數據過來時,讀取數據並丟給Worker執行緒池:eventLoopGroupSelector,Rocket源碼中默認設置執行緒數為3。
- M1:執行業務之前的各種雜事(SSL認證、空閑檢查、網路連接檢查、編解碼、序列化反序列化 等等),交付給 這些工作交給defaultEventExecutorGroup 去處理,RocketMQ源碼中默認執行緒數設置為8。
- M2:剩下處理業務的操作,就直接放在業務執行緒池中執行了。按照之前說的,依據RequestCode去processorTable 本地快取中找到對應的 processor,並封裝成task任務,在丟給對應的業務processor執行緒池來處理。
執行緒數標識 | 執行緒名 | 說明 |
---|---|---|
1 | NettyBoss | Reactor 主執行緒,默認1 |
N | NettyServerEPOLLSelector | Reactor 執行緒池,默認3 |
M1 | NettyServerCodecThread | Worker 執行緒池,默認8 |
M2 | RemotingExecutorThread | Processor執行緒池,處理業務邏輯 |
完整的可以參照官網的這張圖:
總結
上面介紹了 RocketMQ 消息通訊的主要內容,我們用幾句話總結下:
- 整個RocketMQ隊列系統中,rocketmq-remoting Module是專門用來負責網路通訊職能的。
- 網路通訊模組基於Netty進行擴展的,所以自定義了RocketMQ的消息協議,在傳輸過程的數據進行結構制定、封裝、編解碼的過程。
- 理解 NettyRemotingServer/NettyRemotingClient 的初始化過程,以及調用 NettyServerHandler/NettyClienthandler 進行處理的執行流程。
- 同步非同步:同步和非同步消核心區別是 同步消息通過 Netty 發送請求後會執行 ResponseFuture.waitResponse() 阻塞等待,非同步的請求則 SendCallback 相應的方法進行回調處理。
- 多執行緒模式下會通過1個Reactor 主執行緒(監聽連接),以及Reactor 執行緒池(監聽數據)、Worker 執行緒池(處理前置工作)、Processor執行緒池(處理業務邏輯) 來處理通訊過程。