MQ系列7:消息通信,追求極致性能
MQ系列1:消息中間件執行原理
MQ系列2:消息中間件的技術選型
MQ系列3:RocketMQ 架構分析
MQ系列4:NameServer 原理解析
MQ系列5:RocketMQ消息的發送模式
MQ系列6:消息的消費
1 介紹
前面的章節我學習了 NameServer的原理,消息的生產發送,以及消息的消費的全過程。
我們來回顧一下:
RocketMQ 消息隊列架構主要包括NameServe、Broker(Master/Slave)、Producer、Consumer 4個核心部件,基本執行流程如下:
- NameServer 優先啟動。NameServer 是整個 RocketMQ 的「中央大腦」 ,作為 RocketMQ 的服務註冊中心,所以 RocketMQ 需要先啟動 NameServer 再啟動 Rocket 中的 Broker。
- Broker 啟動後,需要將自己註冊至NameServer中,並 保持長連接,每 30s 發送一次發送心跳包,來確保Broker是否存活。並將 Broker 信息 ( IP+、端口等信息)以及Broker中存儲的Topic信息上報。註冊成功後,NameServer 集群中就有 Topic 跟 Broker 的映射關係。
- NameServer 如果檢測到Broker 宕機(因為使用心跳機制, 如果檢測超120s(兩分鐘)無響應),則從路由註冊表中將其移除。
- 生產者在發送某個主題的消息之前先從 NamerServer 獲取 Broker 服務器地址列表(Broker可能是Cluster模式),然後根據負載均衡算法從列表中選擇1台Broker ,建立連接通道,進行消息發送。
- 消費者在訂閱某個topic的消息之前從 NamerServer 獲取 Broker 服務器地址列表(Broker可能是Cluster模式),包括關聯的全部Topic隊列信息。進而獲取當前訂閱 Topic 存在哪些 Broker 上,然後直接跟 Broker 建立連接通道,開始消費數據。
- 生產者和消費者默認每30s 從 NamerServer 獲取 Broker 服務器地址列表,以及關聯的所有Topic隊列信息,更新到Client本地。
2 ~ 4 步驟實際上是 Producer、Broker 以及NameServer 之間整個進行數據通信的過程,面對複雜的消息隊列系統,一個性能優良,穩定性高的網絡通信模塊是非常重要的,它體現了RocketMQ集群消息的整體吞吐和負載能力。也是RocketMQ保證高性能、高穩定性的基石。
2 網絡通信過程分析
2.1 通信類(rocketmq-remoting )的結構解析
通過上圖可以看到,在整個RocketMQ隊列系統中,rocketmq-remoting 這個module是專門用來負責網絡通信職能的。
並且從模塊依賴關係中可以看出 ,rocketmq-client(client)、rocketmq-broker(broker)、rocketmq-namesrv(namesrc 命名服務) 等模塊均依賴了它。
通信層是基於 Netty 進行擴展的,並自定義了通信協議,用於將消息傳遞給 Broker 進行存儲。實現Client與Server之間高效的數據請求與接收。
2.2 協議結構設計
因為是基於Netty進行擴展的,所以自定義了RocketMQ的消息協議,在傳輸過程的數據進行結構制定、封裝、編解碼的過程。
在RocketMQ中,負責這個工作的就是RemotingCommand類,我們來看看這個類的幾個重要屬性:
字段 | 類型 | Request維度 | Response維度 |
---|---|---|---|
code | int | 請求操作碼,依據不同的請求碼做不同的業務處理 | 應答響應碼:0成功,非0標識對應的錯誤 |
language | LanguageCode | 枚舉(JAVA、CPP、PYThON、GO等):請求方實現的編碼語言 | 應答方實現的編碼語言 |
version | int | 請求方程序的版本 | 應答方版本 |
opaque | int | 類似請求ID:reqeustId,唯一識別碼,區分每一個獨立的請求 | response的時候直接返回 |
flag | int | 區分是普通還是oneway的RPC:RPC_ONEWAY = 1; RPC = 0。 | 區分是普通還是oneway RPC |
remark | String | 自定義備註信息 | 自定義備註信息 |
extFields | HashMap<String, String> | Request自定義擴展的字段屬性 | Response自定義擴展的字段屬性 |
2.3 消息內容的組成結構
傳輸的消息內容主要由一下幾個部分組成:
組成部分 | 說明 |
---|---|
消息長度 | 消息的總長度,int類型,四個位元組存儲 |
序列化類型+消息頭length | int類型,位元組1表示序列化類型,位元組2~4表示消息頭長度 |
消息頭的數據 | 序列化後的消息頭數據 |
消息主體數據 | 消息主體數據內容,二進制位元組 |
2.4 RocketMQ 消息通信流程
在RocketMQ消息隊列中支持通信的模式主要有
- sync 同步發送模式
- async 異步發送模式
- oneway 單向模式,無需關注Response
2.4.1 通信流程說明
下圖從 NettyRemotingClient 初始化,NettyRemotingServer 初始化,基於 NettyRemotingClient 的消息發送,以及Handler 處理過程來說明。
- Broker 和 NameServer 啟動時同步調用 NettyRemotingServer.start() 方法, 初始化 Netty 服務器
- 配置 BossGroup/WorkerGroup NioEventLoopGroup 線程組
- 配置 Channel
- 添加 NettyServerHandler
- 調用 serverBootstrap.bind() 監聽端口,等待client的connection
- Producer 和 Consumer 同樣需要啟動 Netty 的客戶端,通過調用NettyRemotingClient.start() 初始化 Netty 客戶端
- 配置客戶端 NioEventLoopGroup 線程組
- 配置 Channel
- 添加 NettyClientHandler
- 發送同步消息時,調用 NettyRemoteClient.invokeSync(),從 channelTables 緩存中獲取或者創建用於通信的 Channel 通道。
- 創建完 Channel 後,生產者 Producer 調用 Channel.writeAndFlush() 發送數據
- NettyRemotingServer 服務端線程組 處理可讀事件,調用 NettyServerHandler 處理數據。
- 下一步,NettyServerHandler 調用 processMessageReceived方法,接收並處理傳送過來的數據。
- 根據請求碼 RequestCode 區別不同的請求,來執行不同的 Processor。
- 說明:Processor 在服務端初始化的時候,將 RequestCode 添加到 Processor 緩存中。消息的存、查、拉取都是不同的請求碼。
- processMessageReceived 從ResponseTables(key 為 opaque) 緩存中取出 ResponseFuture,並將將返回結果設置到 ResponseFuture。同步模式下執行 responseFuture.putResponse()方法,異步調用執行回調方法。
- NettyRemotingClient 收到可讀事件,調用 NettyClientHandler 讀取並處理返回事件。
2.4.2 Reactor多線程設計
上面我們說過了,RocketMQ的通信是採用Netty組件作為底層通信庫。同樣的,它也遵循Reactor多線程模型,並在此基礎上做了一些優化。
上面圖中四個圖形可以大致說明NettyRemotingServer的Reactor 多線程模型,在RocketMQ中的存在形式。
- M:1個 Reactor 主線程:eventLoopGroupBoss,它的職能是負責監聽 TCP網絡連接請求,有連接請求過來時候,創建SocketChannel,並註冊到selector上。
- S:RocketMQ的源碼中會選擇NIO或Epoll,來監聽網絡數據,當監聽到網絡數據過來時,讀取數據並丟給Worker線程池:eventLoopGroupSelector,Rocket源碼中默認設置線程數為3。
- M1:執行業務之前的各種雜事(SSL認證、空閑檢查、網絡連接檢查、編解碼、序列化反序列化 等等),交付給 這些工作交給defaultEventExecutorGroup 去處理,RocketMQ源碼中默認線程數設置為8。
- M2:剩下處理業務的操作,就直接放在業務線程池中執行了。按照之前說的,依據RequestCode去processorTable 本地緩存中找到對應的 processor,並封裝成task任務,在丟給對應的業務processor線程池來處理。
線程數標識 | 線程名 | 說明 |
---|---|---|
1 | NettyBoss | Reactor 主線程,默認1 |
N | NettyServerEPOLLSelector | Reactor 線程池,默認3 |
M1 | NettyServerCodecThread | Worker 線程池,默認8 |
M2 | RemotingExecutorThread | Processor線程池,處理業務邏輯 |
完整的可以參照官網的這張圖:
總結
上面介紹了 RocketMQ 消息通信的主要內容,我們用幾句話總結下:
- 整個RocketMQ隊列系統中,rocketmq-remoting Module是專門用來負責網絡通信職能的。
- 網絡通信模塊基於Netty進行擴展的,所以自定義了RocketMQ的消息協議,在傳輸過程的數據進行結構制定、封裝、編解碼的過程。
- 理解 NettyRemotingServer/NettyRemotingClient 的初始化過程,以及調用 NettyServerHandler/NettyClienthandler 進行處理的執行流程。
- 同步異步:同步和異步消核心區別是 同步消息通過 Netty 發送請求後會執行 ResponseFuture.waitResponse() 阻塞等待,異步的請求則 SendCallback 相應的方法進行回調處理。
- 多線程模式下會通過1個Reactor 主線程(監聽連接),以及Reactor 線程池(監聽數據)、Worker 線程池(處理前置工作)、Processor線程池(處理業務邏輯) 來處理通信過程。