一張圖進階 RocketMQ – 通信機制
前 言
三此君看了好幾本書,看了很多遍源碼整理的 一張圖進階 RocketMQ 圖片,關於 RocketMQ 你只需要記住這張圖!覺得不錯的話,記得點贊關注哦。

【重要】視頻在 B 站同步更新,歡迎圍觀,輕輕鬆鬆漲姿勢。一張圖進階 RocketMQ-通信機制(視頻版)
點擊查看【bilibili】
本文是「一張圖進階 RocketMQ」第 4 篇,對 RocketMQ 不了解的同學可以先看看前面三期:
上一期分享了 RocketMQ 生產者啟動流程及同步消息發送流程,我們知道了在通信層是基於 Netty 將消息傳遞給 Broker 進行存儲的。如果對 Netty 完全不了解我們就很難真正理解 RocketMQ,所以今天我們簡單的聊一聊 Netty 基本流程,然後分析 RocketMQ 的通信機制,最後通過異步消息發送來串聯 RocketMQ 通信機制。
Netty 介紹
Netty 有很多概念,等介紹完概念大家都困了,我們就不過多介紹了,直接結合示例來看看 Netty 的基礎流程,能夠幫助我們更好的理解 RocketMQ 即可。

- Netty 服務端啟動初始化兩個線程組 BossGroup & WorkerGroup,分別用於處理客戶端連接及網絡讀寫。
- Netty 客戶端啟動初始化一個線程組, 用於處理請求及返回結果。
- 客戶端 connect 到 Netty 服務端,創建用於 傳輸數據的 Channel。
- Netty 服務端的 BossGroup 處理客戶端的連接請求,然後把剩下的工作交給 WorkerGroup。
- 連接建立好了,客戶端就可以利用這個連接發送數據給 Netty 服務端。
- Netty WorkerGroup 中的線程使用 Pipeline(包含多個處理器 Handler) 對數據進行處理。
- Netty 服務端的處理完請求後,返回結果也經過 Pipeline 處理。
- Netty 服務端通過 Channel 將數據返回給客戶端。
- 客戶端通過 Channel 接收到數據,也經過 Pipeline 進行處理。
Netty 示例
我們先用 Netty 實現一個簡單的 服務端/客戶端 通信示例,我們是這樣使用的,那 RocketMQ 基於 Netty 的通信也應該是這樣使用的,不過是在這個基礎上封裝了一層。主要關注以下幾個點:服務端什麼時候初始化的,服務端實現的 Handler 做了什麼事?客戶端什麼時候初始化的,客戶端實現的 Handler 做了什麼事?
Netty 服務端初始化:初始化的代碼很關鍵,我們要從源碼上理解 RocketMQ 的通信機制,那肯定會看到類似的代碼。根據上面的流程來看,首先是實例化 bossGroup 和 workerGroup,然後初始化 Channel,從代碼可以看出我們是在 Pipeline 中添加了自己實現的 Handler,這個 Handler 就是業務自己的邏輯了,那 RocketMQ 要處理數據應該也需要實現相應的 Handler。
public class MyServer {
public static void main(String[] args) throws Exception {
//創建兩個線程組 boosGroup、workerGroup
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
//創建服務端的啟動對象,設置參數
ServerBootstrap bootstrap = new ServerBootstrap();
//設置兩個線程組boosGroup和workerGroup
bootstrap.group(bossGroup, workerGroup)
//設置服務端通道實現類型
.channel(NioServerSocketChannel.class)
//使用匿名內部類的形式初始化Channel對象
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
//給pipeline管道添加處理器
socketChannel.pipeline().addLast(new MyServerHandler());
}
});//給workerGroup的EventLoop對應的管道設置處理器
//綁定端口號,啟動服務端
ChannelFuture channelFuture = bootstrap.bind(6666).sync();
//對關閉通道進行監聽
channelFuture.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
實現自定義的服務端處理器 Handler:自定義的 Handler 需要實現 Netty 定義的 HandlerAdapter,當有可讀事件時就會調用這裡的 channelRead() 方法。等下我們看 RocketMQ 通信機制的時候留意RocketMQ 自定義了哪些 Handler,這些 Handler 有做了什麼事。
/**
* 自定義的Handler需要繼承Netty規定好的 HandlerAdapter 才能被Netty框架所關聯,有點類似SpringMVC的適配器模式
**/
public class MyServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//獲取客戶端發送過來的消息
ByteBuf byteBuf = (ByteBuf) msg;
System.out.println("收到" + ctx.channel().remoteAddress() + "發送的消息:" + byteBuf.toString(CharsetUtil.UTF_8));
//發送消息給客戶端
ctx.writeAndFlush(Unpooled.copiedBuffer("服務端已收到消息,記得關注三此君,記得三連", CharsetUtil.UTF_8));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
//發生異常,關閉通道
ctx.close();
}
}
Netty 客戶端初始化:Netty 客戶端,在 RocketMQ 中對應了 Producer/Consumer。在 Producer 啟動中有一步是啟動通信模塊服務,其實就是初始化 Netty 客戶端。客戶端也需要先實例化一個 NioEventLoopGroup,然後將自定義的 handler 添加到 Pipeline,還有很重要的一步是我們需要 connect 連接到 Netty 服務端。
public class MyClient {
public static void main(String[] args) throws Exception {
NioEventLoopGroup eventExecutors = new NioEventLoopGroup();
try {
//創建bootstrap啟動引導對象,配置參數
Bootstrap bootstrap = new Bootstrap();
//設置線程組
bootstrap.group(eventExecutors)
//設置客戶端的Channel實現類型
.channel(NioSocketChannel.class)
//使用匿名內部類初始化 Pipeline
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//添加客戶端Channel的處理器
ch.pipeline().addLast(new MyClientHandler());
}
})
//connect連接服務端
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6666).sync();
//對Channel關閉進行監聽
channelFuture.channel().closeFuture().sync();
} finally {
//關閉線程組
eventExecutors.shutdownGracefully();
}
}
}
實現自定義的客戶端處理器 Handler:客戶端處理器也繼承自 Netty 定義的 HandlerAdapter,當 Channel 變得可讀的時候(服務端數據返回)會調用我們自己實現的 channelRead()。
public class MyClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//發送消息到服務端
ctx.writeAndFlush(Unpooled.copiedBuffer("三此君,我正在看 RocketMQ 生產者發送消息~", CharsetUtil.UTF_8));
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//接收服務端發送過來的消息
ByteBuf byteBuf = (ByteBuf) msg;
System.out.println("收到三此君的消息,我一定會三連的" + ctx.channel().remoteAddress() + byteBuf.toString(CharsetUtil.UTF_8));
}
}
RocketMQ 通信流程
RocketMQ 通信模塊基於 Netty 實現,總體代碼量不多。主要是 NettyRemotingServer和NettyRemotingClient,分別對應通信的服務端和客戶端。根據前面的 Netty 示例,我們要理解 RocketMQ 如何基於 Netty 通信,只需要知道 4 個地方:NettyRemotingServer 如何初始化,NettyRemotingClient 初始化,如何基於 NettyRemotingClient 發送消息,無論是客戶端還是服務端收到數據後都需要 Handler 來處理。

- Broker/NameServer 需要啟動 Netty 服務端。Broker 我們後面會進一步分析,只需要知道 Broker 啟動的時候會調用 NettyRemotingServer.start() 方法初始化 Netty 服務器。主要做了 4 件事:配置 BossGroup/WorkerGroup NioEventLoopGroup 線程組,配置 Channel,添加 NettyServerHandler,調用 serverBootstrap.bind() 監聽端口等待客戶端連接。
- Producer/Consumer 需要啟動 Netty 客戶端,在生產者啟動流程中 MQClientInstantce 啟動通信服務模塊,其實就是調用NettyRemotingClient.start() 初始化 Netty 客戶端。主要做了 3 件事:配置客戶端 NioEventLoopGroup 線程組,配置 Channel,添加 NettyClientHandler。
- 客戶端配置了 Channel,但是 Channel 還沒有創建,因為 Channel 肯定要和具體的 Server IP Addr 關聯。在同步消息發送流程中,調用 NettyRemoteClient.invokeSync(),從 channelTables 緩存中獲取或者創建一個新的 Channel,其實就是調用 bootstrap.connect() 連接到 NettyServer,創建用於通信的 Channel。
- 有了 Channel 後,Producer 調用 Channel.writeAndFlush() 將數據發送給服務器。NettyRemotingServer WorkerGroup 處理可讀事件,調用 NettyServerHandler 處理數據。
- NettyServerHandler 調用 processMessageReceived方法。processMessageReceived 方法做了什麼呢?通過傳入的請求碼 RequestCode 區別不同的請求,不同的請求定義了不同的 Processor。例如,是生產者存入消息使用 SendMessageProcessor,查詢消息使用 QueryMessageProcessor,拉取消息使用 PullMessageProcessor。這些 Processor 在服務端初始化的時候,以 RequestCode 為 Key 添加到 Processor 緩存中。processMessageReceived 就是根據 RequeseCode 獲取不同的 Processor,處理完後把結果返回給 NettyRemotingClient。
- NettyRemotingClient 收到可讀事件,調用 NettyClientHandler 處理返回結果。NettyClientHandler也調用processMessageReceived 處理返回結果。processMessageReceived 從以 opaque 為 key ResponseTables 緩存沖取出 ResponseFuture,將返回結果設置到 ResponseFuture。同步消息則執行 responseFuture.putResponse(),異步調用執行回調。
異步發送
除了同步消息發送,RocketMQ 還支持異步發送。我們只需要在前面是示例中稍作修改就會得到一個異步發送示例,最大的不同在於發送的時候傳入 SendCallback 接收異步返回結果回調。
public class AsyncProducer {
public static void main(String[] args) throws Exception {
// 實例化消息生產者Producer
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
// 設置NameServer的地址
producer.setNamesrvAddr("localhost:9876");
// 啟動Producer實例
producer.start();
// 創建消息,並指定Topic,Tag和消息體
Message msg = new Message("Topic1","Tag", "Key", "Hello world".getBytes("UTF-8"));
// SendCallback 接收異步返回結果的回調
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.printf("關注呀!!!%-10d OK %s %n", index,sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
System.out.printf("三連呀!!!%-10d Exception %s %n", index, e);
e.printStackTrace();
}
});
// 如果不再發送消息,關閉Producer實例。
producer.shutdown();
}
}
同步發送個異步發送主要的過程都是一樣的,不同點在於同步消息調用 Netty Channel.writeAndFlush 之後是 waitResponse 等待 Broker 返回,而異步消息是調用預先定義好的回調函數。

異步消息和同步消息整體差不多,可以說在基於 Netty 實現異步消息比同步消息還要簡單一下,我們這裡主要來看一些不同點:
- 調用 DefaultMQProducer 異步發送接口需要我們定義 SendCallback 回調函數,在執行成功或者執行失敗後回調。
- DefaultMQProducerImp 中的 send 方法會將異步發送請求封裝成 Runable 提交到線程池,然後業務線程就直接返回了。
- sendDefaultImpl 計算重試同步和異步消息有區別,異步消息在這裡不會重試,而是在後面結果返回的時候通過遞歸重試。
- 跟着調用鏈到 sendMessageAsync 方法,需要注意的是這裡構建了 InvokeCallback 實例,ResponseFuture 會持有該實例,Netty 結果返回後調用該實例的方法。
- 下面就是正常的 Netty 數據發送流程,直到 Broker 處理完請求,返回結果。NettyRemotingClient 處理可讀事件,NettyClientHandler 處理返回結果,調用 ResponseFuture.executeInokeCallback,進而調用 InvokeCallback.operationComplete.
- 如果 Broker 返回結果是成功的,則封裝返回結果 SendResult,並回調業務實現的 SendCallback.onSucess 方法,更新容錯項。
- 如果 Broker 返回失敗,或出現任何異常則執行重試,重試超過 retryTimesWhenSendFailed 次則回調業務定義的 SendCallback.onException方法。
總結
以上就是 RocketMQ 消息發送的主要內容,我們簡單的總結下:
- Netty:BossGroup 處理客戶端連接請求,生成 ServerSocketChannel 註冊到 WorkerGroup,WorkerGroup 處理網絡讀寫請求,調用 Channel 對應的 Pipeline 處理請求,Pipeline 中有很多 ChannelHandler 對請求進行處理。
- 通信機制:基於 Netty 實現,只需要留意 NettyRemotingServer/NettyRemotingClient 的初始化,並且在通道變得可讀/可寫時,會調用 NettyServerHandler/NettyClienthandler 進行處理。
- 同步異步:同步和異步消息大同小異,只是同步消息通過 Netty 發送請求後會執行 ResponseFuture.waitResponse() 阻塞等待,而異步消息發送請求後不會等待,請求返回回調用 SendCallback 相應的方法。
以上就是今天全部的內容,如果覺得本期的內容對你有用的話記得點贊、關注、轉發收藏,這將是對我最大的支持。如果你需要 RocketMQ 相關的所有資料,可以評論區留言,或者關注三此君的公眾號,回復 mq 即可。
消息已經發送給了 Broker,下一期我們將來看看Broker 是如何存儲消息的,RocketMQ 如何支持百萬級的吞吐量?感謝觀看,我們下期再見

參考文獻
- RocketMQ 官方文檔
- RocketMQ 源碼
- 丁威, 周繼鋒. RocketMQ技術內幕:RocketMQ架構設計與實現原理. 機械工業出版社, 2019-01.
- 李偉. RocketMQ分佈式消息中間件:核心原理與最佳實踐. 電子工業出版社, 2020-08.
- 楊開元. RocketMQ實戰與原理解析. 機械工業出版社, 2018-06.


