RocketMQ架构原理解析(四):消息生产端(Producer)

RocketMQ架构原理解析(一):整体架构
RocketMQ架构原理解析(二):消息存储(CommitLog)
RocketMQ架构原理解析(三):消息索引(ConsumeQueue & IndexFile)
RocketMQ架构原理解析(四):消息生产端(Producer)

一、概述

如果你曾经使用过RocketMQ,那么一定对以下发送消息的代码不陌生

DefaultMQProducer producer = new DefaultMQProducer("producerGroup");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
Message message = new Message(topic, new byte[] {'hello, world'});
producer.send(message);

寥寥几行代码,便是本文要论述的全部。阿里有句土话,叫“把复杂留给自己,把简单交给别人”用在这里可能最合适不过了,这5行代码中,最重要的是producer.start()producer.send(),也就是producer启动及消息发送

二、Producer启动

对应代码producer.start()

其实仅仅一行代码,在produer端的后台启动了多个线程来协同工作,接下来我们逐一阐述

2.1、Netty

我们都知道,RocketMQ是一个集群部署、跨网络的产品,除了producer、consumer需要网络传输外,数据还需要在集群中流转。所以一个高效、可靠的网络组件是必不可少的。而RocketMQ选择了netty

使用netty首先需要考虑的便是业务上的数据粘包问题,netty提供了一些较为常用的解决方案,如:固定长度(比如每次发送的消息长度均为1024byte)、固定分隔符(比如每次发送的消息长度均为1024byte)等。而RocketMQ使用的则是最为通用的head、body分离方式,即head存储消息的长度,body存储真正的消息数据,具体实现可参见类o.a.r.r.n.NettyRemotingClient

而消息收发这块,RocketMQ将所有的消息都收敛到同一个协议类o.a.r.r.p.RemotingCommand中,即消息发送、接收都会将其封装在该类中,这样做的好处是不言而喻的,即统一规范,减轻网络协议适配不同的消息类型带来的负担

其中较为重要的2个 ChannelHanlder 如下

  • org.apache.rocketmq.remoting.netty.NettyEncoder
    • 消息编码,向 broker 或 nameServer 发送消息时使用,将RemotingCommand转换为byte[]形式
  • org.apache.rocketmq.remoting.netty.NettyDecoder
    • 消息解码,将byte[]转换为RemotingCommand对象,接收 broker 返回的消息时,进行解码操作

2.2、消息格式

消息格式是什么概念?在《消息存储》章节不是已经阐述过消息格式了吗?其实这是两个概念,《消息存储》章节是消息真正落盘时候的存储格式,本小节的消息格式是指消息以什么样的形态交给netty从而在网络上进行传输

消息格式由MsgHeader及MsgBody组成,而消息的长度、标记、版本等重要参数都放在 header 中,body 中仅仅存储数据,没有额外字段;我们主要看一下 header 的数据格式

消息header格式

而站在 netty 视角来看,不论是 msgHeader 还是 msgBody,都属于 netty 网络消息的body部分,所以我们可以简单画一张 netty 视角的消息格式

netty视角的消息格式

2.2.1、Msg Header的自动适配

上文得知,RocketMQ将所有的消息类型、收发都收敛到类RemotingCommand中,但RocketMQ消息类型众多,除了常见的消息发送、接收外,还有通过msgID查询消息、msgKey查询消息、获取broker配置、清理不再使用的topic等等,用一个类适配如此多的类型,具体是如何实现的呢?当新增、修改一种类型又该怎么应对呢?

翻看源码便发现,RemotingCommand的消息头定义为一个接口org.apache.rocketmq.remoting.CommandCustomHeader,不同类型的请求都实现这个接口,并在自己的子类中定义成员变量;那RemotingCommand的消息头又是如何自动解析呢?

public void makeCustomHeaderToNet() {
    if (this.customHeader != null) {
        Field[] fields = getClazzFields(customHeader.getClass());
        if (null == this.extFields) {
            this.extFields = new HashMap<String, String>();
        }
        for (Field field : fields) {
            if (!Modifier.isStatic(field.getModifiers())) {
                String name = field.getName();
                if (!name.startsWith("this")) {
                    Object value = null;
                    try {
                        field.setAccessible(true);
                        value = field.get(this.customHeader);
                    } catch (Exception e) {
                        log.error("Failed to access field [{}]", name, e);
                    }

                    if (value != null) {
                        this.extFields.put(name, value.toString());
                    }
                }
            }
        }
    }
}

答案就是反射,通过反射获取子类的全部成员属性,并放入变量extFields中,makeCustomHeaderToNet()通过牺牲少量性能的方式,换取了程序极大的灵活性与扩展性,当新增请求类型时,仅需要编写新请求的encode、decode,不用修改其他类型请求的代码

消息编解码适配

2.3、Topic路由信息

2.3.1、Topic创建

发送消息的前置是需要创建一个topic,创建topic的admin命令如下

updateTopic -b <> -t <> -r <> -w <> -p <> -o <> -u <> -s <>

例如:
updateTopic -b 127.0.0.1:10911 -t testTopic -r 8 -w 8 -p 6 -o false -u false -s false

简单介绍下每个参数的作用

  • -b broker 地址,表示 topic 所在 Broker,只支持单台Broker,地址为ip:port
  • -c cluster 地址,表示 topic 所在 cluster,会向 cluster 中所有的 broker 发送请求
  • -t topic 名称
  • -r 可读队列数(默认为 8,后文还会展开)
  • -w 可写队列数(默认为 8,后文还会展开)
  • -p 指定新topic的读写权限 (W=2|R=4|WR=6)2表示当前topic仅可写入数据,4表示仅可读,6表示可读可写
  • -o set topic’s order(true|false)
  • -u is unit topic (true|false)
  • -s has unit sub (true|false)

创建流程为 admin -> broker -> nameServer

如果执行命令updateTopic -b 127.0.0.1:8899 -t testTopic -r 8 -w 8 意味着会在127.0.0.1:8899对应的broker下创建一个topic,这个topic的读写队列都是 8

那如果是这样的场景呢:集群A有3个master节点,当执行命令updateTopic -c clusterName -t testTopic -r 8 -w 8 后,站在集群A角度来看,当前topic总共创建了多少个写队列?其实 RocketMQ 接到这条命令后,会向3个 broker 分别发送创建 topic 的命令,这样每个broker上都会有8个读队列,8个写队列,所以站在集群的视角,这个 topic 总共会有 24 个读队列,24 个写队列

2.3.2、writeQueueNum VS readQueueNum

首选需要明确的是,读、写队列,这两个概念是 RocketMQ 独有的,而 kafka 中只有一个partition的概念,不区分读写。一般情况下,这两个值建议设置为相等;我们分别看一下 client 端对它们的处理 (均在类MQClientInstance.java

producer端

for (int i = 0; i < qd.getWriteQueueNums(); i++) {
    MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);
    info.getMessageQueueList().add(mq);
}

consumer端

for (int i = 0; i < qd.getReadQueueNums(); i++) {
    MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);
    mqList.add(mq);
}

如果2个队列设置不相等,例如我们设置6个写队列,4个读队列的话:

writeQueueNum_and_writeQueueNum

这样,4、5号队列中的数据一定不会被消费。

  • writeQueueNum > readQueueNum
    • 大于 readQueueNum 部分的队列永远不会被消费
  • writeQueueNum < readQueueNum
    • 所有队列中的数据都会被消费,但部分读队列数据一直是空的

这样设计有什么好处呢?其实是更精细的控制了读写操作,例如当我们要迁移 broker 时,可以首先将写入队列设置为0,将客户端引流至其他 broker 节点,等读队列数据也处理完毕后,再关闭 read 操作

2.3.3、路由数据格式

topic的路由数据如何由Admin发起创建,再被各个broker响应,继而被nameServer统一组织创建的流程我们暂且不讨论,为防止发散,我们直接从producer从nameServer获取路由数据开始。从nameServer获取到的路由数据格式如下

public class TopicRouteData extends RemotingSerializable {
    private String orderTopicConf;
    private List<QueueData> queueDatas;
    private List<BrokerData> brokerDatas;
    private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
}

而存放路由数据的结构是queueDatasbrokerDatas

public class QueueData implements Comparable<QueueData> {
    private String brokerName;
    private int readQueueNums;
    private int writeQueueNums;
}

public class BrokerData implements Comparable<BrokerData> {
    private String cluster;
    private String brokerName;
    private HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs;
}

在此,简单阐述一下RocketMQ的cluster、brokerName、brokerId的概念
cluster_brokerName_brokerId概念

上图描述了一个cluster下有3个broker,每个broker又有1个master,2个slave组成;这也就是为什么类BrokerData中有HashMap<Long, String> brokerAddrs变量的原因,因为可能同一个brokerName下由多个节点组成。注:master节点的编号始终为0

2.3.4、Topic路由信息何时发生变化

这些路由信息什么时候发生变化呢?我们举例说明

举例1:某集群有3台 master,分别向其中的2台发送了创建topic的命令,此时所有的clent端都知道这个topic的数据在这两个broker上;这个时候通过admin向第3台broker发送创建topic命令,nameServer的路由信息便发生了变更,等client端30秒轮训后,便可以更新到最新的topic路由信息

举例2:某集群有3台 master,topic分别在3台broker上都创建了,此时某台broker宕机,nameServer将其摘除,等待30秒轮询后,client拿到最新路由信息

思考:client 端路由信息的变化是依托于30秒的轮询,如果路由信息已经发生变化,且轮询未发生,client端拿着旧的topic路由信息访问集群,一定会有短暂报错,此处也是待优化的点

2.3.5、定时更新Topic路由信息

RocketMQ会每隔30秒更新topic的路由信息

定时更新topic路由信息

此处简单留意一下TopicRouteDataTopicPublishInfo,其实TopicPublishInfo是由TopicRouteData变种而来,多了一个messageQueueList的属性,在producer端,该属性为写入队列,即某个topic所有的可写入的队列集合

此处抛出一个问题,如果producer只想某个topic发送了一条消息,后续再没有发送过,这种设计会带来哪些问题?如果这种场景频繁发生呢?

2.4、与Broker心跳

主要分为两部分:

  • 1、清空无效broker
  • 2、向有效的broker发送心跳

2.4.1、清空无效的broker

由上节得知,RocketMQ会获取所有已经注册的topic所在的broker信息,并将这些信息存储在变量brokerAddrTable中,brokerAddrTable的存储结构如下

ConcurrentMap<String, HashMap<Long, String>> brokerAddrTable ;
  • key: brokerName,例如一个master带2个slave都属于同一个brokerName
  • val: HashMap<Long, String>,key为brokerId(其中master的brokerId固定为0),val为ip地址

如何判断某个broker有效无效呢?判断依据便是MQClientInstance#topicRouteTable,这个变量是上节中从nameServer中同步过来的,如果brokerAddrTable中有broker A、B、C,而topicRouteTable只有A、B的话,那么就需要从brokerAddrTable中删除C。

需要注意的是,在整个check及替换过程中都添加了独占锁lockNamesrv,而上节中维护更新topic路由信息也是指定的该锁

2.4.2、发送心跳数据

发送心跳数据

此处目的仅为与broker保持网络心跳,如果连接失败或发生异常,仅会打印日志,并不会有额外操作

三、消息发送

消息发送流程

消息发送比较重要的是2点内容

  • 发送数据的负载均衡问题;RocketMQ默认采用的是轮训的方式
  • 消息发送的方式;分同步、异步、单向

3.1、负载均衡

默认的发送策略为轮询的方式

不过RocketMQ也支持了比较灵活的队列选择,可以使用MessageQueueSelector

producer.send(zeroMsg, (mqs, msg, arg) -> {
    int index = msg.getKeys().hashCode() % mqs.size();
    return mqs.get(index);
}, 1000);

上例便是将msgKey取模,这样同样msgKey的消息必定会落在同一个队列中

3.2、消息发送的3种方式

RocketMQ的rpc组件采用的是netty,而netty的网络请求设计是完全异步的,所以一个请求避免一定可以拆成以下3个步骤

  • a、客户端发送请求到服务器(由于完全异步,所以请求数据可能只放在了socket缓冲区,并没有出网卡
  • b、服务器端处理请求(此过程不涉及网络开销,不过通常也是比较耗时的
  • c、服务器向客户端返回应答(请求的response

3.2.1、同步发送消息

SendResult result = producer.send(zeroMsg);

此过程比较好理解,即完成a、b、c所有步骤后才会返回,耗时也是 a + b + c 的总和

3.2.2、异步发送消息

通常在业务中发送消息的代码如下:

SendCallback sendCallback = new SendCallback() {
    @Override
    public void onSuccess(SendResult sendResult) {
        // doSomeThing;
    }
    @Override
    public void onException(Throwable e) {
        // doSomeThing;
    }
};
producer.send(zeroMsg, sendCallback);

而RocketMQ处理异步消息的逻辑是,直接启动一个线程,而最终的结果异步回调SendCallback

ExecutorService executor = this.getAsyncSenderExecutor();
try {
    executor.submit(new Runnable() {
        @Override
        public void run() {
            try {
                sendDefaultImpl(msg, CommunicationMode.ASYNC, sendCallback, timeout - costTime);
            } catch (Exception e) {
                sendCallback.onException(e);
            }
        }

    });
} catch (RejectedExecutionException e) {
    throw new MQClientException("executor rejected ", e);
}

3.2.2、单向发送消息

producer.sendOneway(zeroMsg);

此模式与sync模式类似,都要经过producer端在数据发送前的数据组装工作,不过在将数据交给netty,netty调用操作系统函数将数据放入socket缓冲区后,所有的过程便已结束。什么场景会用到此模式呢?比如对可靠性要求并不高,但要求耗时非常短的场景,比如日志收集等

三个请求哪个更快呢?如果单论一个请求的话,肯定是async异步的方式最快,因为它直接把工作交给另外一个线程去完成,主线程直接返回了;但不论是async还是sync,它们都是需要将 a、b、c 3个步骤都走完的,所以总开销并不会减少。但oneWay因为只需将数据放入socket缓冲区后,client 端就直接返回了,少了监听并解析 server 端 response 的过程,所以可以得到最好的性能

四、总结

本章阐述了producer端相对重要的一些功能点,感觉比较核心的还是队列相关的概念;但RocketMQ发展迭代了这么多年,也涵盖了很多及细小的特性,本文不能穷尽,比如“消息的压缩”、“规避发送延迟较长的broker”、“超时异常”等等,这些功能点独立且零碎,读源码时可以带着问题跟进,这样针对性强,效率也会高很多

Tags: