一张图进阶 RocketMQ – 通信机制

前 言

三此君看了好几本书,看了很多遍源码整理的 一张图进阶 RocketMQ 图片,关于 RocketMQ 你只需要记住这张图!觉得不错的话,记得点赞关注哦。
一张图进阶 RocketMQ.jpg
【重要】视频在 B 站同步更新,欢迎围观,轻轻松松涨姿势。一张图进阶 RocketMQ-通信机制(视频版)
点击查看【bilibili】

本文是“一张图进阶 RocketMQ”第 4 篇,对 RocketMQ 不了解的同学可以先看看前面三期:

  1. 一张图进阶 RocketMQ-整体架构
  2. 一张图进阶 RocketMQ – NameServer
  3. 一张图进阶 RocketMQ – 消息发送

上一期分享了 RocketMQ 生产者启动流程及同步消息发送流程,我们知道了在通信层是基于 Netty 将消息传递给 Broker 进行存储的。如果对 Netty 完全不了解我们就很难真正理解 RocketMQ,所以今天我们简单的聊一聊 Netty 基本流程,然后分析 RocketMQ 的通信机制,最后通过异步消息发送来串联 RocketMQ 通信机制。

Netty 介绍

Netty 有很多概念,等介绍完概念大家都困了,我们就不过多介绍了,直接结合示例来看看 Netty 的基础流程,能够帮助我们更好的理解 RocketMQ 即可。
image.png

  1. Netty 服务端启动初始化两个线程组 BossGroup & WorkerGroup,分别用于处理客户端连接及网络读写
  2. Netty 客户端启动初始化一个线程组, 用于处理请求及返回结果。
  3. 客户端 connect 到 Netty 服务端,创建用于 传输数据的 Channel
  4. Netty 服务端的 BossGroup 处理客户端的连接请求,然后把剩下的工作交给 WorkerGroup。
  5. 连接建立好了,客户端就可以利用这个连接发送数据给 Netty 服务端。
  6. Netty WorkerGroup 中的线程使用 Pipeline(包含多个处理器 Handler) 对数据进行处理。
  7. Netty 服务端的处理完请求后,返回结果也经过 Pipeline 处理。
  8. Netty 服务端通过 Channel 将数据返回给客户端。
  9. 客户端通过 Channel 接收到数据,也经过 Pipeline 进行处理。

Netty 示例

我们先用 Netty 实现一个简单的 服务端/客户端 通信示例,我们是这样使用的,那 RocketMQ 基于 Netty 的通信也应该是这样使用的,不过是在这个基础上封装了一层。主要关注以下几个点:服务端什么时候初始化的,服务端实现的 Handler 做了什么事?客户端什么时候初始化的,客户端实现的 Handler 做了什么事?
Netty 服务端初始化:初始化的代码很关键,我们要从源码上理解 RocketMQ 的通信机制,那肯定会看到类似的代码。根据上面的流程来看,首先是实例化 bossGroup 和 workerGroup,然后初始化 Channel,从代码可以看出我们是在 Pipeline 中添加了自己实现的 Handler,这个 Handler 就是业务自己的逻辑了,那 RocketMQ 要处理数据应该也需要实现相应的 Handler。

public class MyServer {
    public static void main(String[] args) throws Exception {
        //创建两个线程组 boosGroup、workerGroup
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            //创建服务端的启动对象,设置参数
            ServerBootstrap bootstrap = new ServerBootstrap();
            //设置两个线程组boosGroup和workerGroup
            bootstrap.group(bossGroup, workerGroup)
                //设置服务端通道实现类型    
                .channel(NioServerSocketChannel.class)
                //使用匿名内部类的形式初始化Channel对象    
                .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            //给pipeline管道添加处理器
                            socketChannel.pipeline().addLast(new MyServerHandler());
                        }
                    });//给workerGroup的EventLoop对应的管道设置处理器
            //绑定端口号,启动服务端
            ChannelFuture channelFuture = bootstrap.bind(6666).sync();
            //对关闭通道进行监听
            channelFuture.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

实现自定义的服务端处理器 Handler:自定义的 Handler 需要实现 Netty 定义的 HandlerAdapter,当有可读事件时就会调用这里的 channelRead() 方法。等下我们看 RocketMQ 通信机制的时候留意RocketMQ 自定义了哪些 Handler,这些 Handler 有做了什么事。

/**
 * 自定义的Handler需要继承Netty规定好的 HandlerAdapter 才能被Netty框架所关联,有点类似SpringMVC的适配器模式
 **/
public class MyServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //获取客户端发送过来的消息
        ByteBuf byteBuf = (ByteBuf) msg;
        System.out.println("收到" + ctx.channel().remoteAddress() + "发送的消息:" + byteBuf.toString(CharsetUtil.UTF_8));
        //发送消息给客户端
        ctx.writeAndFlush(Unpooled.copiedBuffer("服务端已收到消息,记得关注三此君,记得三连", CharsetUtil.UTF_8));
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        //发生异常,关闭通道
        ctx.close();
    }
}

Netty 客户端初始化:Netty 客户端,在 RocketMQ 中对应了 Producer/Consumer。在 Producer 启动中有一步是启动通信模块服务,其实就是初始化 Netty 客户端。客户端也需要先实例化一个 NioEventLoopGroup,然后将自定义的 handler 添加到 Pipeline,还有很重要的一步是我们需要 connect 连接到 Netty 服务端。

public class MyClient {
    public static void main(String[] args) throws Exception {
        NioEventLoopGroup eventExecutors = new NioEventLoopGroup();
        try {
            //创建bootstrap启动引导对象,配置参数
            Bootstrap bootstrap = new Bootstrap();
            //设置线程组
            bootstrap.group(eventExecutors)
                //设置客户端的Channel实现类型    
                .channel(NioSocketChannel.class)
                //使用匿名内部类初始化 Pipeline
                .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            //添加客户端Channel的处理器
                            ch.pipeline().addLast(new MyClientHandler());
                        }
                    })
            //connect连接服务端
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6666).sync();
            //对Channel关闭进行监听
            channelFuture.channel().closeFuture().sync();
        } finally {
            //关闭线程组
            eventExecutors.shutdownGracefully();
        }
    }
}

实现自定义的客户端处理器 Handler:客户端处理器也继承自 Netty 定义的 HandlerAdapter,当 Channel 变得可读的时候(服务端数据返回)会调用我们自己实现的 channelRead()。

public class MyClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        //发送消息到服务端
        ctx.writeAndFlush(Unpooled.copiedBuffer("三此君,我正在看 RocketMQ 生产者发送消息~", CharsetUtil.UTF_8));
    }
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //接收服务端发送过来的消息
        ByteBuf byteBuf = (ByteBuf) msg;
        System.out.println("收到三此君的消息,我一定会三连的" + ctx.channel().remoteAddress() + byteBuf.toString(CharsetUtil.UTF_8));
    }
}

RocketMQ 通信流程

RocketMQ 通信模块基于 Netty 实现,总体代码量不多。主要是 NettyRemotingServer和NettyRemotingClient,分别对应通信的服务端和客户端。根据前面的 Netty 示例,我们要理解 RocketMQ 如何基于 Netty 通信,只需要知道 4 个地方:NettyRemotingServer 如何初始化,NettyRemotingClient 初始化,如何基于 NettyRemotingClient 发送消息,无论是客户端还是服务端收到数据后都需要 Handler 来处理。
image.png

  • Broker/NameServer 需要启动 Netty 服务端。Broker 我们后面会进一步分析,只需要知道 Broker 启动的时候会调用 NettyRemotingServer.start() 方法初始化 Netty 服务器。主要做了 4 件事:配置 BossGroup/WorkerGroup NioEventLoopGroup 线程组,配置 Channel,添加 NettyServerHandler,调用 serverBootstrap.bind() 监听端口等待客户端连接。
  • Producer/Consumer 需要启动 Netty 客户端,在生产者启动流程中 MQClientInstantce 启动通信服务模块,其实就是调用NettyRemotingClient.start() 初始化 Netty 客户端。主要做了 3 件事:配置客户端 NioEventLoopGroup 线程组,配置 Channel,添加 NettyClientHandler。
  • 客户端配置了 Channel,但是 Channel 还没有创建,因为 Channel 肯定要和具体的 Server IP Addr 关联。在同步消息发送流程中,调用 NettyRemoteClient.invokeSync(),从 channelTables 缓存中获取或者创建一个新的 Channel,其实就是调用 bootstrap.connect() 连接到 NettyServer,创建用于通信的 Channel。
  • 有了 Channel 后,Producer 调用 Channel.writeAndFlush() 将数据发送给服务器。NettyRemotingServer WorkerGroup 处理可读事件,调用 NettyServerHandler 处理数据。
  • NettyServerHandler 调用 processMessageReceived方法。processMessageReceived 方法做了什么呢?通过传入的请求码 RequestCode 区别不同的请求,不同的请求定义了不同的 Processor。例如,是生产者存入消息使用 SendMessageProcessor,查询消息使用 QueryMessageProcessor,拉取消息使用 PullMessageProcessor。这些 Processor 在服务端初始化的时候,以 RequestCode 为 Key 添加到 Processor 缓存中。processMessageReceived 就是根据 RequeseCode 获取不同的 Processor,处理完后把结果返回给 NettyRemotingClient。
  • NettyRemotingClient 收到可读事件,调用 NettyClientHandler 处理返回结果。NettyClientHandler也调用processMessageReceived 处理返回结果。processMessageReceived 从以 opaque 为 key ResponseTables 缓存冲取出 ResponseFuture,将返回结果设置到 ResponseFuture。同步消息则执行 responseFuture.putResponse(),异步调用执行回调。

异步发送

除了同步消息发送,RocketMQ 还支持异步发送。我们只需要在前面是示例中稍作修改就会得到一个异步发送示例,最大的不同在于发送的时候传入 SendCallback 接收异步返回结果回调。

public class AsyncProducer {
    public static void main(String[] args) throws Exception {
        // 实例化消息生产者Producer
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        // 设置NameServer的地址
        producer.setNamesrvAddr("localhost:9876");
        // 启动Producer实例
        producer.start();
        // 创建消息,并指定Topic,Tag和消息体
        Message msg = new Message("Topic1","Tag", "Key", "Hello world".getBytes("UTF-8")); 
        // SendCallback 接收异步返回结果的回调
        producer.send(msg, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.printf("关注呀!!!%-10d OK %s %n", index,sendResult.getMsgId());
            }
            @Override
            public void onException(Throwable e) {
                System.out.printf("三连呀!!!%-10d Exception %s %n", index, e);
                e.printStackTrace();
            }
        });
        // 如果不再发送消息,关闭Producer实例。
        producer.shutdown();
    }
}

同步发送个异步发送主要的过程都是一样的,不同点在于同步消息调用 Netty Channel.writeAndFlush 之后是 waitResponse 等待 Broker 返回,而异步消息是调用预先定义好的回调函数。
image.png
异步消息和同步消息整体差不多,可以说在基于 Netty 实现异步消息比同步消息还要简单一下,我们这里主要来看一些不同点:

  • 调用 DefaultMQProducer 异步发送接口需要我们定义 SendCallback 回调函数,在执行成功或者执行失败后回调。
  • DefaultMQProducerImp 中的 send 方法会将异步发送请求封装成 Runable 提交到线程池,然后业务线程就直接返回了。
  • sendDefaultImpl 计算重试同步和异步消息有区别,异步消息在这里不会重试,而是在后面结果返回的时候通过递归重试。
  • 跟着调用链到 sendMessageAsync 方法,需要注意的是这里构建了 InvokeCallback 实例,ResponseFuture 会持有该实例,Netty 结果返回后调用该实例的方法。
  • 下面就是正常的 Netty 数据发送流程,直到 Broker 处理完请求,返回结果。NettyRemotingClient 处理可读事件,NettyClientHandler 处理返回结果,调用 ResponseFuture.executeInokeCallback,进而调用 InvokeCallback.operationComplete.
  • 如果 Broker 返回结果是成功的,则封装返回结果 SendResult,并回调业务实现的 SendCallback.onSucess 方法,更新容错项。
  • 如果 Broker 返回失败,或出现任何异常则执行重试,重试超过 retryTimesWhenSendFailed 次则回调业务定义的 SendCallback.onException方法。

总结

以上就是 RocketMQ 消息发送的主要内容,我们简单的总结下:

  • Netty:BossGroup 处理客户端连接请求,生成 ServerSocketChannel 注册到 WorkerGroup,WorkerGroup 处理网络读写请求,调用 Channel 对应的 Pipeline 处理请求,Pipeline 中有很多 ChannelHandler 对请求进行处理。
  • 通信机制:基于 Netty 实现,只需要留意 NettyRemotingServer/NettyRemotingClient 的初始化,并且在通道变得可读/可写时,会调用 NettyServerHandler/NettyClienthandler 进行处理。
  • 同步异步:同步和异步消息大同小异,只是同步消息通过 Netty 发送请求后会执行 ResponseFuture.waitResponse() 阻塞等待,而异步消息发送请求后不会等待,请求返回回调用 SendCallback 相应的方法。

以上就是今天全部的内容,如果觉得本期的内容对你有用的话记得点赞、关注、转发收藏,这将是对我最大的支持。如果你需要 RocketMQ 相关的所有资料,可以评论区留言,或者关注三此君的公众号,回复 mq 即可。
消息已经发送给了 Broker,下一期我们将来看看Broker 是如何存储消息的,RocketMQ 如何支持百万级的吞吐量?感谢观看,我们下期再见
image.png

参考文献

  • RocketMQ 官方文档
  • RocketMQ 源码
  • 丁威, 周继锋. RocketMQ技术内幕:RocketMQ架构设计与实现原理. 机械工业出版社, 2019-01.
  • 李伟. RocketMQ分布式消息中间件:核心原理与最佳实践. 电子工业出版社, 2020-08.
  • 杨开元. RocketMQ实战与原理解析. 机械工业出版社, 2018-06.