一张图进阶 RocketMQ – 通信机制
前 言
三此君看了好几本书,看了很多遍源码整理的 一张图进阶 RocketMQ 图片,关于 RocketMQ 你只需要记住这张图!觉得不错的话,记得点赞关注哦。
【重要】视频在 B 站同步更新,欢迎围观,轻轻松松涨姿势。一张图进阶 RocketMQ-通信机制(视频版)
点击查看【bilibili】
本文是“一张图进阶 RocketMQ”第 4 篇,对 RocketMQ 不了解的同学可以先看看前面三期:
上一期分享了 RocketMQ 生产者启动流程及同步消息发送流程,我们知道了在通信层是基于 Netty 将消息传递给 Broker 进行存储的。如果对 Netty 完全不了解我们就很难真正理解 RocketMQ,所以今天我们简单的聊一聊 Netty 基本流程,然后分析 RocketMQ 的通信机制,最后通过异步消息发送来串联 RocketMQ 通信机制。
Netty 介绍
Netty 有很多概念,等介绍完概念大家都困了,我们就不过多介绍了,直接结合示例来看看 Netty 的基础流程,能够帮助我们更好的理解 RocketMQ 即可。
- Netty 服务端启动初始化两个线程组 BossGroup & WorkerGroup,分别用于处理客户端连接及网络读写。
- Netty 客户端启动初始化一个线程组, 用于处理请求及返回结果。
- 客户端 connect 到 Netty 服务端,创建用于 传输数据的 Channel。
- Netty 服务端的 BossGroup 处理客户端的连接请求,然后把剩下的工作交给 WorkerGroup。
- 连接建立好了,客户端就可以利用这个连接发送数据给 Netty 服务端。
- Netty WorkerGroup 中的线程使用 Pipeline(包含多个处理器 Handler) 对数据进行处理。
- Netty 服务端的处理完请求后,返回结果也经过 Pipeline 处理。
- Netty 服务端通过 Channel 将数据返回给客户端。
- 客户端通过 Channel 接收到数据,也经过 Pipeline 进行处理。
Netty 示例
我们先用 Netty 实现一个简单的 服务端/客户端 通信示例,我们是这样使用的,那 RocketMQ 基于 Netty 的通信也应该是这样使用的,不过是在这个基础上封装了一层。主要关注以下几个点:服务端什么时候初始化的,服务端实现的 Handler 做了什么事?客户端什么时候初始化的,客户端实现的 Handler 做了什么事?
Netty 服务端初始化:初始化的代码很关键,我们要从源码上理解 RocketMQ 的通信机制,那肯定会看到类似的代码。根据上面的流程来看,首先是实例化 bossGroup 和 workerGroup,然后初始化 Channel,从代码可以看出我们是在 Pipeline 中添加了自己实现的 Handler,这个 Handler 就是业务自己的逻辑了,那 RocketMQ 要处理数据应该也需要实现相应的 Handler。
public class MyServer {
public static void main(String[] args) throws Exception {
//创建两个线程组 boosGroup、workerGroup
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
//创建服务端的启动对象,设置参数
ServerBootstrap bootstrap = new ServerBootstrap();
//设置两个线程组boosGroup和workerGroup
bootstrap.group(bossGroup, workerGroup)
//设置服务端通道实现类型
.channel(NioServerSocketChannel.class)
//使用匿名内部类的形式初始化Channel对象
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
//给pipeline管道添加处理器
socketChannel.pipeline().addLast(new MyServerHandler());
}
});//给workerGroup的EventLoop对应的管道设置处理器
//绑定端口号,启动服务端
ChannelFuture channelFuture = bootstrap.bind(6666).sync();
//对关闭通道进行监听
channelFuture.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
实现自定义的服务端处理器 Handler:自定义的 Handler 需要实现 Netty 定义的 HandlerAdapter,当有可读事件时就会调用这里的 channelRead() 方法。等下我们看 RocketMQ 通信机制的时候留意RocketMQ 自定义了哪些 Handler,这些 Handler 有做了什么事。
/**
* 自定义的Handler需要继承Netty规定好的 HandlerAdapter 才能被Netty框架所关联,有点类似SpringMVC的适配器模式
**/
public class MyServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//获取客户端发送过来的消息
ByteBuf byteBuf = (ByteBuf) msg;
System.out.println("收到" + ctx.channel().remoteAddress() + "发送的消息:" + byteBuf.toString(CharsetUtil.UTF_8));
//发送消息给客户端
ctx.writeAndFlush(Unpooled.copiedBuffer("服务端已收到消息,记得关注三此君,记得三连", CharsetUtil.UTF_8));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
//发生异常,关闭通道
ctx.close();
}
}
Netty 客户端初始化:Netty 客户端,在 RocketMQ 中对应了 Producer/Consumer。在 Producer 启动中有一步是启动通信模块服务,其实就是初始化 Netty 客户端。客户端也需要先实例化一个 NioEventLoopGroup,然后将自定义的 handler 添加到 Pipeline,还有很重要的一步是我们需要 connect 连接到 Netty 服务端。
public class MyClient {
public static void main(String[] args) throws Exception {
NioEventLoopGroup eventExecutors = new NioEventLoopGroup();
try {
//创建bootstrap启动引导对象,配置参数
Bootstrap bootstrap = new Bootstrap();
//设置线程组
bootstrap.group(eventExecutors)
//设置客户端的Channel实现类型
.channel(NioSocketChannel.class)
//使用匿名内部类初始化 Pipeline
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//添加客户端Channel的处理器
ch.pipeline().addLast(new MyClientHandler());
}
})
//connect连接服务端
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6666).sync();
//对Channel关闭进行监听
channelFuture.channel().closeFuture().sync();
} finally {
//关闭线程组
eventExecutors.shutdownGracefully();
}
}
}
实现自定义的客户端处理器 Handler:客户端处理器也继承自 Netty 定义的 HandlerAdapter,当 Channel 变得可读的时候(服务端数据返回)会调用我们自己实现的 channelRead()。
public class MyClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//发送消息到服务端
ctx.writeAndFlush(Unpooled.copiedBuffer("三此君,我正在看 RocketMQ 生产者发送消息~", CharsetUtil.UTF_8));
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//接收服务端发送过来的消息
ByteBuf byteBuf = (ByteBuf) msg;
System.out.println("收到三此君的消息,我一定会三连的" + ctx.channel().remoteAddress() + byteBuf.toString(CharsetUtil.UTF_8));
}
}
RocketMQ 通信流程
RocketMQ 通信模块基于 Netty 实现,总体代码量不多。主要是 NettyRemotingServer和NettyRemotingClient,分别对应通信的服务端和客户端。根据前面的 Netty 示例,我们要理解 RocketMQ 如何基于 Netty 通信,只需要知道 4 个地方:NettyRemotingServer 如何初始化,NettyRemotingClient 初始化,如何基于 NettyRemotingClient 发送消息,无论是客户端还是服务端收到数据后都需要 Handler 来处理。
- Broker/NameServer 需要启动 Netty 服务端。Broker 我们后面会进一步分析,只需要知道 Broker 启动的时候会调用 NettyRemotingServer.start() 方法初始化 Netty 服务器。主要做了 4 件事:配置 BossGroup/WorkerGroup NioEventLoopGroup 线程组,配置 Channel,添加 NettyServerHandler,调用 serverBootstrap.bind() 监听端口等待客户端连接。
- Producer/Consumer 需要启动 Netty 客户端,在生产者启动流程中 MQClientInstantce 启动通信服务模块,其实就是调用NettyRemotingClient.start() 初始化 Netty 客户端。主要做了 3 件事:配置客户端 NioEventLoopGroup 线程组,配置 Channel,添加 NettyClientHandler。
- 客户端配置了 Channel,但是 Channel 还没有创建,因为 Channel 肯定要和具体的 Server IP Addr 关联。在同步消息发送流程中,调用 NettyRemoteClient.invokeSync(),从 channelTables 缓存中获取或者创建一个新的 Channel,其实就是调用 bootstrap.connect() 连接到 NettyServer,创建用于通信的 Channel。
- 有了 Channel 后,Producer 调用 Channel.writeAndFlush() 将数据发送给服务器。NettyRemotingServer WorkerGroup 处理可读事件,调用 NettyServerHandler 处理数据。
- NettyServerHandler 调用 processMessageReceived方法。processMessageReceived 方法做了什么呢?通过传入的请求码 RequestCode 区别不同的请求,不同的请求定义了不同的 Processor。例如,是生产者存入消息使用 SendMessageProcessor,查询消息使用 QueryMessageProcessor,拉取消息使用 PullMessageProcessor。这些 Processor 在服务端初始化的时候,以 RequestCode 为 Key 添加到 Processor 缓存中。processMessageReceived 就是根据 RequeseCode 获取不同的 Processor,处理完后把结果返回给 NettyRemotingClient。
- NettyRemotingClient 收到可读事件,调用 NettyClientHandler 处理返回结果。NettyClientHandler也调用processMessageReceived 处理返回结果。processMessageReceived 从以 opaque 为 key ResponseTables 缓存冲取出 ResponseFuture,将返回结果设置到 ResponseFuture。同步消息则执行 responseFuture.putResponse(),异步调用执行回调。
异步发送
除了同步消息发送,RocketMQ 还支持异步发送。我们只需要在前面是示例中稍作修改就会得到一个异步发送示例,最大的不同在于发送的时候传入 SendCallback 接收异步返回结果回调。
public class AsyncProducer {
public static void main(String[] args) throws Exception {
// 实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
// 设置NameServer的地址
producer.setNamesrvAddr("localhost:9876");
// 启动Producer实例
producer.start();
// 创建消息,并指定Topic,Tag和消息体
Message msg = new Message("Topic1","Tag", "Key", "Hello world".getBytes("UTF-8"));
// SendCallback 接收异步返回结果的回调
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.printf("关注呀!!!%-10d OK %s %n", index,sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
System.out.printf("三连呀!!!%-10d Exception %s %n", index, e);
e.printStackTrace();
}
});
// 如果不再发送消息,关闭Producer实例。
producer.shutdown();
}
}
同步发送个异步发送主要的过程都是一样的,不同点在于同步消息调用 Netty Channel.writeAndFlush 之后是 waitResponse 等待 Broker 返回,而异步消息是调用预先定义好的回调函数。
异步消息和同步消息整体差不多,可以说在基于 Netty 实现异步消息比同步消息还要简单一下,我们这里主要来看一些不同点:
- 调用 DefaultMQProducer 异步发送接口需要我们定义 SendCallback 回调函数,在执行成功或者执行失败后回调。
- DefaultMQProducerImp 中的 send 方法会将异步发送请求封装成 Runable 提交到线程池,然后业务线程就直接返回了。
- sendDefaultImpl 计算重试同步和异步消息有区别,异步消息在这里不会重试,而是在后面结果返回的时候通过递归重试。
- 跟着调用链到 sendMessageAsync 方法,需要注意的是这里构建了 InvokeCallback 实例,ResponseFuture 会持有该实例,Netty 结果返回后调用该实例的方法。
- 下面就是正常的 Netty 数据发送流程,直到 Broker 处理完请求,返回结果。NettyRemotingClient 处理可读事件,NettyClientHandler 处理返回结果,调用 ResponseFuture.executeInokeCallback,进而调用 InvokeCallback.operationComplete.
- 如果 Broker 返回结果是成功的,则封装返回结果 SendResult,并回调业务实现的 SendCallback.onSucess 方法,更新容错项。
- 如果 Broker 返回失败,或出现任何异常则执行重试,重试超过 retryTimesWhenSendFailed 次则回调业务定义的 SendCallback.onException方法。
总结
以上就是 RocketMQ 消息发送的主要内容,我们简单的总结下:
- Netty:BossGroup 处理客户端连接请求,生成 ServerSocketChannel 注册到 WorkerGroup,WorkerGroup 处理网络读写请求,调用 Channel 对应的 Pipeline 处理请求,Pipeline 中有很多 ChannelHandler 对请求进行处理。
- 通信机制:基于 Netty 实现,只需要留意 NettyRemotingServer/NettyRemotingClient 的初始化,并且在通道变得可读/可写时,会调用 NettyServerHandler/NettyClienthandler 进行处理。
- 同步异步:同步和异步消息大同小异,只是同步消息通过 Netty 发送请求后会执行 ResponseFuture.waitResponse() 阻塞等待,而异步消息发送请求后不会等待,请求返回回调用 SendCallback 相应的方法。
以上就是今天全部的内容,如果觉得本期的内容对你有用的话记得点赞、关注、转发收藏,这将是对我最大的支持。如果你需要 RocketMQ 相关的所有资料,可以评论区留言,或者关注三此君的公众号,回复 mq 即可。
消息已经发送给了 Broker,下一期我们将来看看Broker 是如何存储消息的,RocketMQ 如何支持百万级的吞吐量?感谢观看,我们下期再见
参考文献
- RocketMQ 官方文档
- RocketMQ 源码
- 丁威, 周继锋. RocketMQ技术内幕:RocketMQ架构设计与实现原理. 机械工业出版社, 2019-01.
- 李伟. RocketMQ分布式消息中间件:核心原理与最佳实践. 电子工业出版社, 2020-08.
- 杨开元. RocketMQ实战与原理解析. 机械工业出版社, 2018-06.