一張圖進階 RocketMQ – 通信機制

前 言

三此君看了好幾本書,看了很多遍源碼整理的 一張圖進階 RocketMQ 圖片,關於 RocketMQ 你只需要記住這張圖!覺得不錯的話,記得點贊關注哦。
一張圖進階 RocketMQ.jpg
【重要】視頻在 B 站同步更新,歡迎圍觀,輕輕鬆鬆漲姿勢。一張圖進階 RocketMQ-通信機制(視頻版)
點擊查看【bilibili】

本文是「一張圖進階 RocketMQ」第 4 篇,對 RocketMQ 不了解的同學可以先看看前面三期:

  1. 一張圖進階 RocketMQ-整體架構
  2. 一張圖進階 RocketMQ – NameServer
  3. 一張圖進階 RocketMQ – 消息發送

上一期分享了 RocketMQ 生產者啟動流程及同步消息發送流程,我們知道了在通信層是基於 Netty 將消息傳遞給 Broker 進行存儲的。如果對 Netty 完全不了解我們就很難真正理解 RocketMQ,所以今天我們簡單的聊一聊 Netty 基本流程,然後分析 RocketMQ 的通信機制,最後通過異步消息發送來串聯 RocketMQ 通信機制。

Netty 介紹

Netty 有很多概念,等介紹完概念大家都困了,我們就不過多介紹了,直接結合示例來看看 Netty 的基礎流程,能夠幫助我們更好的理解 RocketMQ 即可。
image.png

  1. Netty 服務端啟動初始化兩個線程組 BossGroup & WorkerGroup,分別用於處理客戶端連接及網絡讀寫
  2. Netty 客戶端啟動初始化一個線程組, 用於處理請求及返回結果。
  3. 客戶端 connect 到 Netty 服務端,創建用於 傳輸數據的 Channel
  4. Netty 服務端的 BossGroup 處理客戶端的連接請求,然後把剩下的工作交給 WorkerGroup。
  5. 連接建立好了,客戶端就可以利用這個連接發送數據給 Netty 服務端。
  6. Netty WorkerGroup 中的線程使用 Pipeline(包含多個處理器 Handler) 對數據進行處理。
  7. Netty 服務端的處理完請求後,返回結果也經過 Pipeline 處理。
  8. Netty 服務端通過 Channel 將數據返回給客戶端。
  9. 客戶端通過 Channel 接收到數據,也經過 Pipeline 進行處理。

Netty 示例

我們先用 Netty 實現一個簡單的 服務端/客戶端 通信示例,我們是這樣使用的,那 RocketMQ 基於 Netty 的通信也應該是這樣使用的,不過是在這個基礎上封裝了一層。主要關注以下幾個點:服務端什麼時候初始化的,服務端實現的 Handler 做了什麼事?客戶端什麼時候初始化的,客戶端實現的 Handler 做了什麼事?
Netty 服務端初始化:初始化的代碼很關鍵,我們要從源碼上理解 RocketMQ 的通信機制,那肯定會看到類似的代碼。根據上面的流程來看,首先是實例化 bossGroup 和 workerGroup,然後初始化 Channel,從代碼可以看出我們是在 Pipeline 中添加了自己實現的 Handler,這個 Handler 就是業務自己的邏輯了,那 RocketMQ 要處理數據應該也需要實現相應的 Handler。

public class MyServer {
    public static void main(String[] args) throws Exception {
        //創建兩個線程組 boosGroup、workerGroup
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            //創建服務端的啟動對象,設置參數
            ServerBootstrap bootstrap = new ServerBootstrap();
            //設置兩個線程組boosGroup和workerGroup
            bootstrap.group(bossGroup, workerGroup)
                //設置服務端通道實現類型    
                .channel(NioServerSocketChannel.class)
                //使用匿名內部類的形式初始化Channel對象    
                .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            //給pipeline管道添加處理器
                            socketChannel.pipeline().addLast(new MyServerHandler());
                        }
                    });//給workerGroup的EventLoop對應的管道設置處理器
            //綁定端口號,啟動服務端
            ChannelFuture channelFuture = bootstrap.bind(6666).sync();
            //對關閉通道進行監聽
            channelFuture.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

實現自定義的服務端處理器 Handler:自定義的 Handler 需要實現 Netty 定義的 HandlerAdapter,當有可讀事件時就會調用這裡的 channelRead() 方法。等下我們看 RocketMQ 通信機制的時候留意RocketMQ 自定義了哪些 Handler,這些 Handler 有做了什麼事。

/**
 * 自定義的Handler需要繼承Netty規定好的 HandlerAdapter 才能被Netty框架所關聯,有點類似SpringMVC的適配器模式
 **/
public class MyServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //獲取客戶端發送過來的消息
        ByteBuf byteBuf = (ByteBuf) msg;
        System.out.println("收到" + ctx.channel().remoteAddress() + "發送的消息:" + byteBuf.toString(CharsetUtil.UTF_8));
        //發送消息給客戶端
        ctx.writeAndFlush(Unpooled.copiedBuffer("服務端已收到消息,記得關注三此君,記得三連", CharsetUtil.UTF_8));
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        //發生異常,關閉通道
        ctx.close();
    }
}

Netty 客戶端初始化:Netty 客戶端,在 RocketMQ 中對應了 Producer/Consumer。在 Producer 啟動中有一步是啟動通信模塊服務,其實就是初始化 Netty 客戶端。客戶端也需要先實例化一個 NioEventLoopGroup,然後將自定義的 handler 添加到 Pipeline,還有很重要的一步是我們需要 connect 連接到 Netty 服務端。

public class MyClient {
    public static void main(String[] args) throws Exception {
        NioEventLoopGroup eventExecutors = new NioEventLoopGroup();
        try {
            //創建bootstrap啟動引導對象,配置參數
            Bootstrap bootstrap = new Bootstrap();
            //設置線程組
            bootstrap.group(eventExecutors)
                //設置客戶端的Channel實現類型    
                .channel(NioSocketChannel.class)
                //使用匿名內部類初始化 Pipeline
                .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            //添加客戶端Channel的處理器
                            ch.pipeline().addLast(new MyClientHandler());
                        }
                    })
            //connect連接服務端
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6666).sync();
            //對Channel關閉進行監聽
            channelFuture.channel().closeFuture().sync();
        } finally {
            //關閉線程組
            eventExecutors.shutdownGracefully();
        }
    }
}

實現自定義的客戶端處理器 Handler:客戶端處理器也繼承自 Netty 定義的 HandlerAdapter,當 Channel 變得可讀的時候(服務端數據返回)會調用我們自己實現的 channelRead()。

public class MyClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        //發送消息到服務端
        ctx.writeAndFlush(Unpooled.copiedBuffer("三此君,我正在看 RocketMQ 生產者發送消息~", CharsetUtil.UTF_8));
    }
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //接收服務端發送過來的消息
        ByteBuf byteBuf = (ByteBuf) msg;
        System.out.println("收到三此君的消息,我一定會三連的" + ctx.channel().remoteAddress() + byteBuf.toString(CharsetUtil.UTF_8));
    }
}

RocketMQ 通信流程

RocketMQ 通信模塊基於 Netty 實現,總體代碼量不多。主要是 NettyRemotingServer和NettyRemotingClient,分別對應通信的服務端和客戶端。根據前面的 Netty 示例,我們要理解 RocketMQ 如何基於 Netty 通信,只需要知道 4 個地方:NettyRemotingServer 如何初始化,NettyRemotingClient 初始化,如何基於 NettyRemotingClient 發送消息,無論是客戶端還是服務端收到數據後都需要 Handler 來處理。
image.png

  • Broker/NameServer 需要啟動 Netty 服務端。Broker 我們後面會進一步分析,只需要知道 Broker 啟動的時候會調用 NettyRemotingServer.start() 方法初始化 Netty 服務器。主要做了 4 件事:配置 BossGroup/WorkerGroup NioEventLoopGroup 線程組,配置 Channel,添加 NettyServerHandler,調用 serverBootstrap.bind() 監聽端口等待客戶端連接。
  • Producer/Consumer 需要啟動 Netty 客戶端,在生產者啟動流程中 MQClientInstantce 啟動通信服務模塊,其實就是調用NettyRemotingClient.start() 初始化 Netty 客戶端。主要做了 3 件事:配置客戶端 NioEventLoopGroup 線程組,配置 Channel,添加 NettyClientHandler。
  • 客戶端配置了 Channel,但是 Channel 還沒有創建,因為 Channel 肯定要和具體的 Server IP Addr 關聯。在同步消息發送流程中,調用 NettyRemoteClient.invokeSync(),從 channelTables 緩存中獲取或者創建一個新的 Channel,其實就是調用 bootstrap.connect() 連接到 NettyServer,創建用於通信的 Channel。
  • 有了 Channel 後,Producer 調用 Channel.writeAndFlush() 將數據發送給服務器。NettyRemotingServer WorkerGroup 處理可讀事件,調用 NettyServerHandler 處理數據。
  • NettyServerHandler 調用 processMessageReceived方法。processMessageReceived 方法做了什麼呢?通過傳入的請求碼 RequestCode 區別不同的請求,不同的請求定義了不同的 Processor。例如,是生產者存入消息使用 SendMessageProcessor,查詢消息使用 QueryMessageProcessor,拉取消息使用 PullMessageProcessor。這些 Processor 在服務端初始化的時候,以 RequestCode 為 Key 添加到 Processor 緩存中。processMessageReceived 就是根據 RequeseCode 獲取不同的 Processor,處理完後把結果返回給 NettyRemotingClient。
  • NettyRemotingClient 收到可讀事件,調用 NettyClientHandler 處理返回結果。NettyClientHandler也調用processMessageReceived 處理返回結果。processMessageReceived 從以 opaque 為 key ResponseTables 緩存沖取出 ResponseFuture,將返回結果設置到 ResponseFuture。同步消息則執行 responseFuture.putResponse(),異步調用執行回調。

異步發送

除了同步消息發送,RocketMQ 還支持異步發送。我們只需要在前面是示例中稍作修改就會得到一個異步發送示例,最大的不同在於發送的時候傳入 SendCallback 接收異步返回結果回調。

public class AsyncProducer {
    public static void main(String[] args) throws Exception {
        // 實例化消息生產者Producer
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        // 設置NameServer的地址
        producer.setNamesrvAddr("localhost:9876");
        // 啟動Producer實例
        producer.start();
        // 創建消息,並指定Topic,Tag和消息體
        Message msg = new Message("Topic1","Tag", "Key", "Hello world".getBytes("UTF-8")); 
        // SendCallback 接收異步返回結果的回調
        producer.send(msg, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.printf("關注呀!!!%-10d OK %s %n", index,sendResult.getMsgId());
            }
            @Override
            public void onException(Throwable e) {
                System.out.printf("三連呀!!!%-10d Exception %s %n", index, e);
                e.printStackTrace();
            }
        });
        // 如果不再發送消息,關閉Producer實例。
        producer.shutdown();
    }
}

同步發送個異步發送主要的過程都是一樣的,不同點在於同步消息調用 Netty Channel.writeAndFlush 之後是 waitResponse 等待 Broker 返回,而異步消息是調用預先定義好的回調函數。
image.png
異步消息和同步消息整體差不多,可以說在基於 Netty 實現異步消息比同步消息還要簡單一下,我們這裡主要來看一些不同點:

  • 調用 DefaultMQProducer 異步發送接口需要我們定義 SendCallback 回調函數,在執行成功或者執行失敗後回調。
  • DefaultMQProducerImp 中的 send 方法會將異步發送請求封裝成 Runable 提交到線程池,然後業務線程就直接返回了。
  • sendDefaultImpl 計算重試同步和異步消息有區別,異步消息在這裡不會重試,而是在後面結果返回的時候通過遞歸重試。
  • 跟着調用鏈到 sendMessageAsync 方法,需要注意的是這裡構建了 InvokeCallback 實例,ResponseFuture 會持有該實例,Netty 結果返回後調用該實例的方法。
  • 下面就是正常的 Netty 數據發送流程,直到 Broker 處理完請求,返回結果。NettyRemotingClient 處理可讀事件,NettyClientHandler 處理返回結果,調用 ResponseFuture.executeInokeCallback,進而調用 InvokeCallback.operationComplete.
  • 如果 Broker 返回結果是成功的,則封裝返回結果 SendResult,並回調業務實現的 SendCallback.onSucess 方法,更新容錯項。
  • 如果 Broker 返回失敗,或出現任何異常則執行重試,重試超過 retryTimesWhenSendFailed 次則回調業務定義的 SendCallback.onException方法。

總結

以上就是 RocketMQ 消息發送的主要內容,我們簡單的總結下:

  • Netty:BossGroup 處理客戶端連接請求,生成 ServerSocketChannel 註冊到 WorkerGroup,WorkerGroup 處理網絡讀寫請求,調用 Channel 對應的 Pipeline 處理請求,Pipeline 中有很多 ChannelHandler 對請求進行處理。
  • 通信機制:基於 Netty 實現,只需要留意 NettyRemotingServer/NettyRemotingClient 的初始化,並且在通道變得可讀/可寫時,會調用 NettyServerHandler/NettyClienthandler 進行處理。
  • 同步異步:同步和異步消息大同小異,只是同步消息通過 Netty 發送請求後會執行 ResponseFuture.waitResponse() 阻塞等待,而異步消息發送請求後不會等待,請求返回回調用 SendCallback 相應的方法。

以上就是今天全部的內容,如果覺得本期的內容對你有用的話記得點贊、關注、轉發收藏,這將是對我最大的支持。如果你需要 RocketMQ 相關的所有資料,可以評論區留言,或者關注三此君的公眾號,回復 mq 即可。
消息已經發送給了 Broker,下一期我們將來看看Broker 是如何存儲消息的,RocketMQ 如何支持百萬級的吞吐量?感謝觀看,我們下期再見
image.png

參考文獻

  • RocketMQ 官方文檔
  • RocketMQ 源碼
  • 丁威, 周繼鋒. RocketMQ技術內幕:RocketMQ架構設計與實現原理. 機械工業出版社, 2019-01.
  • 李偉. RocketMQ分佈式消息中間件:核心原理與最佳實踐. 電子工業出版社, 2020-08.
  • 楊開元. RocketMQ實戰與原理解析. 機械工業出版社, 2018-06.