消息队列的一些场景及源码分析,RocketMQ使用相关问题及性能优化

 前文目录链接参考:

 消息队列的一些场景及源码分析,RocketMQ使用相关问题及性能优化 //www.cnblogs.com/yizhiamumu/p/16694126.html

消息队列的对比测试与RocketMQ使用扩展 //www.cnblogs.com/yizhiamumu/p/16677881.html

消息队列为什么选用redis?聊聊如何做技术方案选型?//www.cnblogs.com/yizhiamumu/p/16573472.html

分布式事务原理及解决方案案例 //www.cnblogs.com/yizhiamumu/p/16662412.html

分布式事务实战方案汇总 //www.cnblogs.com/yizhiamumu/p/16625677.html

消息队列初见:一起聊聊引入系统mq 之后的问题 //www.cnblogs.com/yizhiamumu/p/16573472.html

 

 

参考:消息队列为什么选用redis?聊聊如何做技术方案选型?//www.cnblogs.com/yizhiamumu/p/16573472.html

 

上文,我们把 Redis 当作队列来使用时,始终面临的 2 个问题:

  1. Redis 本身可能会丢数据
  2. 面对消息积压,Redis 内存资源紧张

如果你的业务场景足够简单,对于数据丢失不敏感,而且消息积压概率比较小的情况下,把 Redis 当作队列是完全可以的。

 一:消息队列的一些场景

1.1 为什么有各种各样的 MQ?

近几年,确实出现了很多消息队列解决方案,但其实去分析每种消息队列,会发现他们诞生的背景和要针对性解决的问题是不一样的。

  • RabbitMQ 诞生于标准化与开源,打破了商业化消息队列的技术壁垒,但应用场景其实没变,定位为异步与解耦;
  • Kafka 诞生的背景是大数据,以批量,高吞吐等核心能力抢占了大数据管道的心智,随后非常自然地定位到 Streaming 领域;
  • EMQ 重点聚焦的领域在物联网,物联网的挑战跟其他领域是大相径庭的,超大规模的设备与连接数,规则引擎,甚者边缘段需要有一整套完整的解决方案;
  • Pulsar 作为后起之秀尝试在多个领域发力,包括 Messaging、Function、Streaming 等多领域都有相应布局。

回到 RocketMQ,大家能从近两年 RocketMQ 在社区的一系列动作中发现,RocketMQ 同时在消息、事件、流三个领域都有发力,逐渐演进至一个超融合处理平台。作为一个融合的数据处理平台,RocketMQ 当前在开源的布局看起来是与业界多个 MQ 趋同,在 RocketMQ 开源的背后其实是商业上真实的需求驱动。

 

1.2 从性能上来讲,相关基准测试数据是什么水平?

一般讲性能,其实就是吞吐量和延迟两个指标。

对于吞吐量来讲,RocketMQ 在 2017 年就能优化到单机 50W 的 TPS。如果是在批量的场景,实际上从生产环境的稳定性,以及业务消息的重要性来讲,各个消息队列都能轻易地打满网络带宽或者磁盘资源。

也就是说,性能一般情况下差异都不大,是很难作为一个产品的核心竞争力的,除非是架构层面有限制。

延迟就是一个非常重要的指标了,在线业务对于是 2ms 延时和 5ms 延时基本上都能接受,但非常难以接受的是经常性有秒级的毛刺(在延迟这个指标后面长尾延迟)。

除了上述两点,弹性和可扩展能力也是非常重要的。

 

1.3 消息如何存储

消息我们可以直接在内存中使用数组或者队列来存储数据即可。性能非常高。

但是有几方面的缺点

  1. 数据丢失,比如异常情况服务器宕机重启后内存的消息会被丢失掉
  2. 数据量大的时候,内存放不下,或者需要高昂的成本。如果 面对一些业务系统是不能容忍消息丢失的情况,单纯放内存存储也不太可能,所以需要一款持久化的消息系统。

既然要存储数据,就需要解决数据存哪里?从存储方式来看,主要有几个方面:

  • 关系型数据库,比如mysql
  • 分布式KV存储,比如采用rocketdb实现的
  • 文件系统,log 的方式直接追加

性能,吞吐量,本质上就是数据结构的设计决定的。我们看看上面数据存储方式对应的数据结构

存储 数据结构 写放大 读放大
mysql B+ tree 写一条数据需要两次写入1、数据写入是按页为单位进行写的,假设页的大小为B 字节,那么写放大为Θ(B)(最坏的结果)2、为了避免在写页的过程中出现故障,需要写入redo log(WAL) 既支持随机读取又支持范围查找的系统。读放大为O(logBN/B),数据量大的适合性能会急剧下降,常规是b+ tree 超过4层,大约2000万记录是临界点
rocketdb LSM tree Memtable/SSTable实现,写的话也变成顺序写了(这一点是极大的优化点),但是后台会出现多路归并算法来合并,这个过程占用磁盘IO 会到当前消息的读写有扰动,写放大Θ(klogkN/B) 读的顺序是MemTable->分层的sst ,性能会比B+ tree 略差,读放大Θ((log2N/B)/logk)
文件系统 append only log 直接在文件末尾追加,所有的的写都是顺序的,因此性能极高 不支持根据内容进行检索,只能根据文件偏移量执行查询

 

mysql 在大数据量的情况,性能会急剧下降,并且扩展性非常不友好。

分布式KV 存储 天然的分布式系统,对大数据量和未来的扩展都问题不大,LSM tree 对写性能和吞吐都比mysql 要好。查询其实是可以通过缓存等手段去优化,可以考虑。

但是,满足性能和吞吐量最优的毫无疑问是使用文件系统,因为消息不需要修改,读和写都是顺序读写,性能极高。

但是现实中的需求我们可能需要使用多个队列来完成不同的业务。比如一个队列来处理订单相关的业务,一个队列来处理商品相关的业务等等。那么我们该如何调整呢?

我们都知道文件 append only log 的方式是不支持根据消息的内容来搜索的,如果所有的队列的数据存在一个文件中,是没办法满足需求的。

换个思路,一个队列一个文件我们就可以绕开根据内容检索的需求,kafka 就是这么玩的。

这个时候,每个队列一个文件,读写还是顺序的吗

 

我们现在面临的问题是,作为一款面向业务的高性能消息中间件,随着业务的复杂度变高,队列数量是急剧变大的。

如果要保证写入的吞吐量和性能,还需要得所有的队列都写在同一个文件。

但是,按照队列消费的场景就意味着要根据消息内容(队列名字)来进行消费,append only log 是不支持检索的,如何解决这个问题。

 

我们会增加一个索引来处理慢sql 。我们是否也可以建立一个队列的索引,每一个队列就是一个索引文件。

读取数据的时候,先从索引队列找到消息在文件的偏移量后,在到数据文件去读取。

 

那么,索引的文件的数量变大的之后,那么对索引文件的读写不就是又变成随机读写了吗?性能又会急剧下降?

一个一个来解决:

  1. 写索引文件的时候,我们可以改成异步写,也就是写完数据文件,可以直接返回给客户端成功了,后台再由一个线程不停的从数据文件获取数据来构建索引,这样就可以解决写的性能瓶颈了
  2. 读的问题,我们要尽量避免直接从磁盘读,改成从内存读。放在内存就意味着索引的内容要足够小,不然根本放不下。所以问题就变成尽量控制索引文件的大小,放在内存里面来避开磁盘读从而提高性能

 (rocketmq 中数据文件称为:commitlog,   topic索引文件称为 consumeQueue)

方案 优点 缺点
每一个queue 都单独一个文件 消费的时候不需要独立建立一个索引,系统复杂度降低,并且性能高 当queue 很多的时候,并且每个queue 的数据量都不是很大情况,就会存在很多小文件,写和读都讲变成随机读,性能会受到影响
所有queue 共享一个文件 所有的写都是顺序写的,性能比较高,可以支撑大量queue 性能也不至于下降的厉害 1、需要建立独立的索引文件,查询数据的链路变长,需要先从索引查到数据再到数据文件查询2、索引队列本身也是小文件,好在因为数据量少,基本可以常驻内存3、读变成随时读,不过整体还是顺序读

 

我们得出结论:选择文件系统,append only log.根据消息队列即时消费和顺序读写的特点,刚写入的内容还在page cache ,就被读走了,甚至都不需要回到磁盘,性能会非常高。

 

1.5 数据量大了存储怎么办

本地切割,大文件变小文件

如果所有的数据都存在一个commitlog 文件的话,随着数据量变大,文件必然会非常大。

解决方案是,我们大文件切换成小文件,每个文件固定大小1G,写满了就切换到一个新的文件

分布式存储

消息队列的第一个特点就是数据量大,一台机器容易面临瓶颈,因此我们需要把数据均衡的分发到各个机器上。

解决方案是,一段很长的队列平均切成N份,把这N份分别放到不同的机器上

1.6 消息高可靠

虽然消息已经分成切分成为多份放到不同的机器了,但是每一份都是都只有一个副本,也就意味着,任何一台机器的硬盘坏掉的话,该机器上的消息就会丢失掉了,这是不可接受的。

行业通常的做法一份数据存多个副本,并且确保所有的副本不能全都在同一台机器。

问题来了,那么这多份数据是同步双写还是异步双写呢?

方案 优点 缺点
同步双写 数据不会丢失 性能会降低,单个RT变长
异步双写 单个RT 更加小,性能更高,吞吐量更大 数据可能会丢失

 

其实每个业务场景需求是不一样的,RocketMq 是支持可配置的

 

1.7 Broker是怎么保存数据的呢?

RocketMQ主要的存储文件包括CommitLog文件、ConsumeQueue文件、Indexfile文件

 

RocketMQ采用的是混合型的存储结构,即为Broker单个实例下所有的队列共用一个日志数据文件(即为CommitLog)来存储。

RocketMQ的混合型存储结构(多个Topic的消息实体内容都存储于一个CommitLog中)针对ProducerConsumer分别采用了数据索引部分相分离的存储结构,Producer发送消息至Broker端,然后Broker端使用同步或者异步的方式对消息刷盘持久化,保存至CommitLog中。

只要消息被刷盘持久化至磁盘文件CommitLog中,那么Producer发送的消息就不会丢失。正因为如此,Consumer也就肯定有机会去消费这条消息。当无法拉取到消息后,可以等下一次消息拉取,同时服务端也支持长轮询模式,如果一个消息拉取请求未拉取到消息,Broker允许等待30s的时间,只要这段时间内有新消息到达,将直接返回给消费端。
这里,RocketMQ的具体做法是,使用Broker端的后台服务线程—ReputMessageService不停地分发请求并异步构建ConsumeQueue(逻辑消费队列)和IndexFile(索引文件)数据

 

所以,Broker是怎么保存数据的呢?Broker在收到消息之后,会把消息保存到commitlog的文件当中,而同时在分布式的存储当中,每个broker都会保存一部分topic的数据,同时,每个topic对应的messagequeue下都会生成consumequeue文件用于保存commitlog的物理位置偏移量offset,indexfile中会保存key和offset的对应关系。

 
Broker

CommitLog文件保存于${Rocket_Home}/store/commitlog目录中,从图中我们可以明显看出来文件名的偏移量,每个文件默认1G,写满后自动生成一个新的文件。

 
log

 

由于同一个topic的消息并不是连续的存储在commitlog中,消费者如果直接从commitlog获取消息效率非常低,所以通过consumequeue保存commitlog中消息的偏移量的物理地址,这样消费者在消费的时候先从consumequeue中根据偏移量定位到具体的commitlog物理文件,然后根据一定的规则(offset和文件大小取模)在commitlog中快速定位。

 
log
 

1.8 RocketMQ怎么对文件进行读写

RocketMQ对文件的读写巧妙地利用了操作系统的一些高效文件读写方式——PageCache顺序读写零拷贝

1.8.1 PageCache、顺序读取

RocketMQ中,ConsumeQueue逻辑消费队列存储的数据较少,并且是顺序读取,在page cache机制的预读取作用下,Consume Queue文件的读性能几乎接近读内存,即使在有消息堆积情况下也不会影响性能。而对于CommitLog消息存储的日志数据文件来说,读取消息内容时候会产生较多的随机访问读取,严重影响性能。如果选择合适的系统IO调度算法,比如设置调度算法为Deadline(此时块存储采用SSD的话),随机读的性能也会有所提升。

页缓存(PageCache)是OS对文件的缓存,用于加速对文件的读写。一般来说,程序对文件进行顺序读写的速度几乎接近于内存的读写速度,主要原因就是由于OS使用PageCache机制对读写访问操作进行了性能优化,将一部分的内存用作PageCache。对于数据的写入,OS会先写入至Cache内,随后通过异步的方式由pdflush内核线程将Cache内的数据刷盘至物理磁盘上。对于数据的读取,如果一次读取文件时出现未命中PageCache的情况,OS从物理磁盘上访问读取文件的同时,会顺序对其他相邻块的数据文件进行预读取

1.8.2 零拷贝

RocketMQ主要通过MappedByteBuffer对文件进行读写操作。其中,利用了NIO中的FileChannel模型将磁盘上的物理文件直接映射到用户态的内存地址中(这种Mmap的方式减少了传统IO,将磁盘文件数据在操作系统内核地址空间的缓冲区,和用户应用程序地址空间的缓冲区之间来回进行拷贝的性能开销),将对文件的操作转化为直接对内存地址进行操作,从而极大地提高了文件的读写效率(正因为需要使用内存映射机制,故RocketMQ的文件存储都使用定长结构来存储,方便一次将整个文件映射至内存)。

什么是零拷贝
在操作系统中,使用传统的方式,数据需要经历几次拷贝,还要经历用户态/内核态切换

  1. 从磁盘复制数据到内核态内存;
  2. 从内核态内存复制到用户态内存;
  3. 然后从用户态内存复制到网络驱动的内核态内存;
  4. 最后是从网络驱动的内核态内存复制到网卡中进行传输。
 

 

 传统文件传输示意图

 

所以,可以通过零拷贝的方式,减少用户态与内核态的上下文切换和内存拷贝的次数,用来提升I/O的性能。零拷贝比较常见的实现方式是mmap,这种机制在Java中是通过MappedByteBuffer实现的。

 

 

 mmap示意图

 

 

1.9 消息刷盘怎么实现

RocketMQ提供了两种刷盘策略:同步刷盘异步刷盘

  • 同步刷盘:在消息达到Broker的内存之后,必须刷到commitLog日志文件中才算成功,然后返回Producer数据已经发送成功。
  • 异步刷盘:异步刷盘是指消息达到Broker内存后就返回Producer数据已经发送成功,会唤醒一个线程去将数据持久化到CommitLog日志文件中

Broker在消息的存取时直接操作的是内存(内存映射文件),这可以提供系统的吞吐量,但是无法避免机器掉电时数据丢失,所以需要持久化到磁盘中

刷盘的最终实现都是使用NIO中的 MappedByteBuffer.force() 将映射区的数据写入到磁盘,如果是同步刷盘的话,在Broker把消息写到CommitLog映射区后,就会等待写入完成

异步而言,只是唤醒对应的线程,不保证执行的时机,

 

 

1.10 RocketMQ的负载均衡

RocketMQ中的负载均衡都在Client端完成,具体来说的话,主要可以分为Producer端发送消息时候的负载均衡和Consumer端订阅消息的负载均衡。

1.10.1 Producer的负载均衡

Producer端在发送消息的时候,会先根据Topic找到指定的TopicPublishInfo,在获取了TopicPublishInfo路由信息后,RocketMQ的客户端在默认方式下selectOneMessageQueue()方法会从TopicPublishInfo中的messageQueueList中选择一个队列(MessageQueue)进行发送消息。具这里有一个sendLatencyFaultEnable开关变量,如果开启,在随机递增取模的基础上,再过滤掉not availableBroker代理。

 
Producer负载均衡:索引递增随机取模
public MessageQueue selectOneMessageQueue(){
    //索引递增
    int index = this.sendWhichQueue.incrementAndGet();
    //利用索引取随机数,取余数
    int pos = Math.abs(index) % this.messageQueueList.size();
    if(pos<0){
        pos=0;  
    }
    return this.messageQueueList.get(pos);
}

 

所谓的latencyFaultTolerance,是指对之前失败的,按一定的时间做退避。例如,如果上次请求的latency超过550Lms,就退避3000Lms;超过1000L,就退避60000L;如果关闭,采用随机递增取模的方式选择一个队列(MessageQueue)来发送消息,latencyFaultTolerance机制是实现消息发送高可用的核心关键所在。

1.10.2 Consumer的负载均衡

RocketMQ中,Consumer端的两种消费模式(Push/Pull)都是基于拉模式来获取消息的,而在Push模式只是对pull模式的一种封装,其本质实现为消息拉取线程在从服务器拉取到一批消息后,然后提交到消息消费线程池后,又“马不停蹄”的继续向服务器再次尝试拉取消息。如果未拉取到消息,则延迟一下又继续拉取。在两种基于拉模式的消费方式(Push/Pull)中,均需要Consumer端知道从Broker端的哪一个消息队列中去获取消息。因此,有必要在Consumer端来做负载均衡,即Broker端中多个MessageQueue分配给同一个ConsumerGroup中的哪些Consumer消费。

  1. Consumer端的心跳包发送
    Consumer启动后,它就会通过定时任务不断地向RocketMQ集群中的所有Broker实例发送心跳包(其中包含了,消息消费分组名称、订阅关系集合、消息通信模式和客户端id的值等信息)。Broker端在收到Consumer的心跳消息后,会将它维护在ConsumerManager的本地缓存变量—consumerTable,同时并将封装后的客户端网络通道信息保存在本地缓存变量—channelInfoTable中,为之后做Consumer端的负载均衡提供可以依据的元数据信息。

  2. Consumer端实现负载均衡的核心类—RebalanceImpl
    Consumer实例的启动流程中的启动MQClientInstance实例部分,会完成负载均衡服务线程—RebalanceService的启动(每隔20s执行一次)。
    通过查看源码可以发现,RebalanceService线程的run()方法最终调用的是RebalanceImpl类的rebalanceByTopic()方法,这个方法是实现Consumer端负载均衡的核心。
    rebalanceByTopic()方法会根据消费者通信类型为广播模式还是集群模式做不同的逻辑处理

 

1.11 RocketMQ消息长轮询

所谓的长轮询,就是Consumer拉取消息,如果对应的Queue如果没有数据,Broker不会立即返回,而是把 PullReuqest hold起来,等待 queue消息后,或者长轮询阻塞时间到了,再重新处理该 queue 上的所有 PullRequest

 //如果没有拉到数据
case ResponseCode.PULL_NOT_FOUND:
// broker 和 consumer 都允许 suspend,默认开启
if (brokerAllowSuspend && hasSuspendFlag) {
    long pollingTimeMills = suspendTimeoutMillisLong;
    if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {
          pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills();
     }

    String topic = requestHeader.getTopic();
    long offset = requestHeader.getQueueOffset();
    int queueId = requestHeader.getQueueId();
    //封装一个PullRequest
    PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,
    this.brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter);
   //把PullRequest挂起来
   this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);
     response = null;
     break;
}

 

 

挂起的请求,有一个服务线程会不停地检查,看queue中是否有数据,或者超时。

PullRequestHoldService#run()

@Override
public void run() {
   log.info("{} service started", this.getServiceName());
   while (!this.isStopped()) {
       try {
          if (this.brokerController.getBrokerConfig().isLongPollingEnable()) {
                 this.waitForRunning(5 * 1000);
          } else {
                    this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills());
                }

         long beginLockTimestamp = this.systemClock.now();
         //检查hold住的请求
         this.checkHoldRequest();
         long costTime = this.systemClock.now() - beginLockTimestamp;
         if (costTime > 5 * 1000) {
              log.info("[NOTIFYME] check hold request cost {} ms.", costTime);
         }
        } catch (Throwable e) {
            log.warn(this.getServiceName() + " service has exception. ", e);
            }
        }
    log.info("{} service end", this.getServiceName());
    }

 

1.12 RocketMQ为什么速度快?

是因为使用了顺序存储、Page Cache和异步刷盘。

1、我们在写入commitlog的时候是顺序写入的,这样比随机写入的性能就会提高很多。

2、写入commitlog的时候并不是直接写入磁盘,而是先写入操作系统的PageCache。

3、最后由操作系统异步将缓存中的数据刷到磁盘。

 

二:RocketMQ的基本架构

 RocketMQ 一共有四个部分组成:NameServerBrokerProducer 生产者Consumer 消费者,它们对应了:发现,为了保证高可用,一般每一部分都是集群部署的

1.1 NameServer

NameServer 是一个无状态的服务器,角色类似于 Kafka使用的 Zookeeper,但比 Zookeeper 更轻量。

特点:
每个 NameServer 结点之间是相互独立,彼此没有任何信息交互。
Nameserver 被设计成几乎是无状态的,通过部署多个结点来标识自己是一个伪集群,Producer 在发送消息前从 NameServer中获取 Topic 的路由信息也就是发往哪个 BrokerConsumer 也会定时从 NameServer获取 Topic的路由信息,Broker 在启动时会向 NameServer 注册,并定时进行心跳连接,且定时同步维护的 TopicNameServer
功能主要有两个:

  • Broker 结点保持长连接。
  • 维护 Topic 的路由信息。

1.2 Broker

消息存储和中转角色,负责存储和转发消息

Broker 内部维护着一个个 Consumer Queue,用来存储消息的索引,真正存储消息的地方是 CommitLog(日志文件)

单个 Broker 与所有的 Nameserver 保持着长连接和心跳,并会定时将 Topic 信息同步到 NameServer,和 NameServer 的通信底层是通过 Netty 实现的。

1.3 Producer

消息生产者,业务端负责发送消息,由用户自行实现和分布式部署。

Producer由用户进行分布式部署,消息由Producer通过多种负载均衡模式发送到Broker集群,发送低延时,支持快速失败。
RocketMQ 提供了三种方式发送消息:同步异步单向

  • 同步发送:同步发送指消息发送方发出数据后会在收到接收方发回响应之后才发下一个数据包。一般用于重要通知消息,例如重要通知邮件、营销短信。
  • 异步发送:异步发送指发送方发出数据后,不等接收方发回响应,接着发送下个数据包,一般用于可能链路耗时较长而对响应时间敏感的业务场景,例如用户视频上传后通知启动转码服务。
  • 单向发送:单向发送是指只负责发送消息而不等待服务器回应且没有回调函数触发,适用于某些耗时非常短但对可靠性要求并不高的场景,例如日志收集

1.4 Consumer

消息消费者,负责消费消息,一般是后台系统负责异步消费。

Consumer也由用户部署,支持PUSHPULL两种消费模式,支持集群消费和广播消费,提供实时的消息订阅机制。

  • Pull:拉取型消费者(Pull Consumer)主动从消息服务器拉取信息,只要批量拉取到消息,用户应用就会启动消费过程,所以 Pull 称为主动消费型
  • Push:推送型消费者(Push Consumer)封装了消息的拉取、消费进度和其他的内部维护工作,将消息到达时执行的回调接口留给用户应用程序来实现。所以 Push 称为被动消费类型,但其实从实现上看还是从消息服务器中拉取消息,不同于 Pull 的是 Push 首先要注册消费监听器,当监听器处触发后才开始消费消息

2 RocketMQ 原理

2.1 RocketMQ整体工作流程

RocketMQ是一个分布式消息队列,也就是消息队列+分布式系统

作为消息队列,它是发-存-收的一个模型,对应的就是Producer、Broker、Cosumer;作为分布式系统,它要有服务端、客户端、注册中心,对应的就是Broker、Producer/Consumer、NameServer

主要的工作流程:RocketMQNameServer注册中心集群、Producer生产者集群、Consumer消费者集群和若干BrokerRocketMQ进程)组成:

  1. Broker在启动的时候去向所有的NameServer注册,并保持长连接,每30s发送一次心跳
  2. Producer在发送消息的时候从NameServer获取Broker服务器地址,根据负载均衡算法选择一台服务器来发送消息
  3. Conusmer消费消息的时候同样从NameServer获取Broker地址,然后主动拉取消息来消费

2.2 如何保证RocketMQ的高可用?

NameServer因为是无状态,且不相互通信的,所以只要集群部署就可以保证高可用。

RocketMQ的高可用主要是在体现在Broker的读和写的高可用,Broker的高可用是通过集群主从实现的。

Broker可以配置两种角色:Master和Slave,Master角色的Broker支持读和写,Slave角色的Broker只支持读,Master会向Slave同步消息。

也就是说Producer只能向Master角色的Broker写入消息,Cosumer可以从Master和Slave角色的Broker读取消息。

Consumer 的配置文件中,并不需要设置是从 Master 读还是从 Slave读,当 Master 不可用或者繁忙的时候, Consumer 的读请求会被自动切换到从 Slave。有了自动切换 Consumer 这种机制,当一个 Master 角色的机器出现故障后,Consumer 仍然可以从 Slave 读取消息,不影响 Consumer 读取消息,这就实现了读的高可用

如何达到发送端写的高可用性呢?

在创建 Topic 的时候,把 Topic 的多个Message Queue 创建在多个 Broker 组上(相同 Broker 名称,不同 brokerId机器组成 Broker 组),这样当 Broker 组的 Master 不可用后,其他组Master 仍然可用, Producer 仍然可以发送消息 RocketMQ 目前还不支持把Slave自动转成 Master ,如果机器资源不足,需要把 Slave 转成 Master ,则要手动停止 Slave 色的 Broker ,更改配置文件,用新的配置文件启动 Broker。

 

2.3 Master和Slave之间是怎么同步数据的呢?

而消息在master和slave之间的同步是根据raft协议来进行的:

1、在broker收到消息后,会被标记为uncommitted状态
2、然后会把消息发送给所有的slave
3、slave在收到消息之后返回ack响应给master
4、master在收到超过半数的ack之后,把消息标记为committed
5、发送committed消息给所有slave,slave也修改状态为committed

 

2.4 为什么RocketMQ不使用Zookeeper作为注册中心呢?

Kafka采用Zookeeper作为注册中心(也开始逐渐去Zookeeper),

RocketMQ不使用Zookeeper其实主要可能从这几方面来考虑:

  1. 基于可用性的考虑,根据CAP理论【Consistency(一致性)、Availability(可用性)、Partition Tolerance(分区容错性),不能同时成立】,Zookeeper满足的是CP,并不能保证服务的可用性,Zookeeper在进行选举的时候,整个选举的时间太长,期间整个集群都处于不可用的状态,而这对于一个注册中心来说肯定是不能接受的,作为服务发现来说就应该是为可用性而设计。
  2. 基于性能的考虑,NameServer本身的实现非常轻量,而且可以通过增加机器的方式水平扩展,增加集群的抗压能力,而Zookeeper的写是不可扩展的,Zookeeper要解决这个问题只能通过划分领域,划分多个Zookeeper集群来解决,首先操作起来太复杂,其次这样还是又违反了CAP中的A的设计,导致服务之间是不连通的。
  3. 持久化的机制来带的问题,ZooKeeper 的 ZAB 协议对每一个写请求,会在每个 ZooKeeper  节点上保持写一个事务日志,同时再加上定期的将内存数据镜像(Snapshot)到磁盘来保证数据的一致性和持久性,而对于一个简单的服务发现的场景来说,这其实没有太大的必要,这个实现方案太重了。而且本身存储的数据应该是高度定制化的。
  4. 消息发送应该弱依赖注册中心,这也是RocketMQ的设计理念,生产者在第一次发送消息的时候从NameServer获取到Broker地址后缓存到本地,如果NameServer整个集群不可用,短时间内对于生产者和消费者并不会产生太大影响。

 

 三:RocketMQ使用相关问题

1. 如何保证消息的可用性/可靠性/不丢失呢?

消息的一个处理方式是异步发送,那消息可靠性怎么保证?

消息丢失可能发生在生产者发送消息、MQ本身丢失消息、消费者丢失消息3个方面。

 

 

1.1 生产者丢失

生产者丢失消息的可能点在于程序发送失败抛异常了没有重试处理,或者发送的过程成功但是过程中网络闪断MQ没收到,消息就丢失了。

由于同步发送的一般不会出现这样使用方式。

异步发送的场景下,一般分为两个方式:异步有回调和异步无回调,无回调的方式,生产者发送完后不管结果可能就会造成消息丢失,而通过异步发送+回调通知+本地消息表的形式我们就可以做出一个解决方案。

 

所以在生产阶段,主要通过请求确认机制,来保证消息的可靠传递

  • 1、同步发送的时候,要注意处理响应结果和异常。如果返回响应OK,表示消息成功发送到了Broker,如果响应失败,或者发生其它异常,都应该重试。
  • 2、异步发送的时候,应该在回调方法里检查,如果发送失败或者异常,都应该进行重试。
  • 3、如果发生超时的情况,也可以通过查询日志的API,来检查是否在Broker存储成功

 

以下单的场景举例。

1、下单后先保存本地数据和MQ消息表,这时候消息的状态是发送中,如果本地事务失败,那么下单失败,事务回滚(订单数据、MQ消息记录都不会保存)。

2、下单成功,直接返回客户端成功,异步发送MQ消息。

3、MQ回调通知消息发送结果,对应更新数据库MQ发送状态。

4、JOB轮询超过一定时间(时间根据业务配置)还未发送成功的消息去重试
在监控平台配置或者JOB程序处理超过一定次数一直发送不成功的消息,告警,人工介入。

 
MQ

异步回调的形式是适合大部分场景下的一种解决方案。

1.2 MQ 存储丢失

如果生产者保证消息发送到MQ,而MQ收到消息后还在内存中,这时候宕机了又没来得及同步给从节点,就有可能导致消息丢失。

比如RocketMQ:

RocketMQ分为同步刷盘和异步刷盘两种方式,默认的是异步刷盘,就有可能导致消息还未刷到硬盘上就丢失了,可以通过设置为同步刷盘的方式来保证消息可靠性,这样即使MQ挂了,恢复的时候也可以从磁盘中去恢复消息。

比如Kafka也可以通过配置做到:

 
acks=all 只有参与复制的所有节点全部收到消息,才返回生产者成功。这样的话除非所有的节点都挂了,消息才会丢失。
replication.factor=N,设置大于1的数,这会要求每个partion至少有2个副本
min.insync.replicas=N,设置大于1的数,这会要求leader至少感知到一个follower还保持着连接
retries=N,设置一个非常大的值,让生产者发送失败一直重试

虽然我们可以通过配置的方式来达到MQ本身高可用的目的,但是都对性能有损耗,怎样配置需要根据业务做出权衡。

 

所以存储阶段,可以通过配置可靠性优先的 Broker 参数来避免因为宕机丢消息,简单说就是可靠性优先的场景都应该使用同步。

  • 1、消息只要持久化到CommitLog(日志文件)中,即使Broker宕机,未消费的消息也能重新恢复再消费。
  • 2、Broker的刷盘机制:同步刷盘和异步刷盘,不管哪种刷盘都可以保证消息一定存储在pagecache中(内存中),但是同步刷盘更可靠,它是Producer发送消息后等数据持久化到磁盘之后再返回响应给Producer。
  • 3、Broker通过主从模式来保证高可用,Broker支持Master和Slave同步复制、Master和Slave异步复制模式,生产者的消息都是发送给Master,但是消费既可以从Master消费,也可以从Slave消费。同步复制模式可以保证即使Master宕机,消息肯定在Slave中有备份,保证了消息不会丢失。

 

 图:同步刷盘和异步刷盘

 

1.3 消费者丢失

消费者丢失消息的场景1:消费者刚收到消息,此时服务器宕机,MQ认为消费者已经消费,不会重复发送消息,消息丢失。

RocketMQ默认是需要消费者回复ack确认,而kafka需要手动开启配置关闭自动offset。

消费方不返回ack确认,重发的机制根据MQ类型的不同发送时间间隔、次数都不尽相同,如果重试超过次数之后会进入死信队列,需要手工来处理了。(Kafka没有这些)

 
MQ

消费者丢失消息的场景2:消费者收到消息,但消费业务逻辑出错,消费失败。
解决:利用前面提到的MQ本地表,消费者收到消息且业务逻辑执行完毕后再更新MQ消息的状态(更新为已消费)

 

所以从Consumer角度分析,如何保证消息被成功消费?

  • Consumer保证消息成功消费的关键在于确认的时机,不要在收到消息后就立即发送消费确认,而是应该在执行完所有消费业务逻辑之后,再发送消费确认

因为消息队列维护了消费的位置,逻辑执行失败了,没有确认,再去队列拉取消息,就还是之前的一条。

 

2 如何处理消息重复的问题呢?

对分布式消息队列来说,同时做到确保一定投递和不重复投递是很难的,就是所谓的“有且仅有一次” 。RocketMQ择了确保一定投递,保证消息不丢失,但有可能造成消息重复。

处理消息重复问题,主要有业务端自己保证,主要的方式有两种:业务幂等消息去重

 

 

业务幂等:第一种是保证消费逻辑的幂等性,也就是多次调用和一次调用的效果是一样的。这样一来,不管消息消费多少次,对业务都没有影响。

消息去重:第二种是业务端,对重复的消息就不再消费了。这种方法,需要保证每条消息都有一个惟一的编号,通常是业务相关的,比如订单号,消费的记录需要落库,而且需要保证和消息确认这一步的原子性。

具体做法是可以建立一个消费记录表,拿到这个消息做数据库的insert操作。给这个消息做一个唯一主键(primary key)或者唯一约束,那么就算出现重复消费的情况,就会导致主键冲突,那么就不再处理这条消息。

 

 

3 怎么处理消息积压?

3.1 消息积压

发生了消息积压,这时候就得想办法赶紧把积压的消息消费完,就得考虑提高消费能力,一般有两种办法:

  • 消费者扩容:如果当前Topic的Message Queue的数量大于消费者数量,就可以对消费者进行扩容,增加消费者,来提高消费能力,尽快把积压的消息消费玩。
  • 消息迁移Queue扩容:如果当前Topic的Message Queue的数量小于或者等于消费者数量,这种情况,再扩容消费者就没什么用,就得考虑扩容Message Queue。可以新建一个临时的Topic,临时的Topic多设置一些Message Queue,然后先用一些消费者把消费的数据丢到临时的Topic,因为不用业务处理,只是转发一下消息,还是很快的。接下来用扩容的消费者去消费新的Topic里的数据,消费完了之后,恢复原状。

 

3.2 如果消费者一直消费失败导致消息积压怎么处理?

我们可以从以下几个角度来考虑:

1、消费者出错,肯定是程序或者其他问题导致的,如果容易修复,先把问题修复,让consumer恢复正常消费。

2、如果时间来不及处理很麻烦,做转发处理,写一个临时的consumer消费方案,先把消息消费,然后再转发到一个新的topic和MQ资源,这个新的topic的机器资源单独申请,要能承载住当前积压的消息。

3、处理完积压数据后,修复consumer,去消费新的MQ和现有的MQ数据,新MQ消费完成后恢复原状。

 
MQ

3.3 那如果消息积压达到磁盘上限,消息被删除了怎么办?

最初,我们发送的消息记录是落库保存了的,而转发发送的数据也保存了,那么我们就可以通过这部分数据来找到丢失的那部分数据,再单独跑个脚本重发就可以了。

如果转发的程序没有落库,那就和消费方的记录去做对比,只是过程会更艰难一点。

 

 4 顺序消息如何实现?

顺序消息是指消息的消费顺序和产生顺序相同,在有些业务逻辑下,必须保证顺序,比如订单的生成、付款、发货,这个消息必须按顺序处理才行。

 

顺序消息分为全局顺序消息和部分顺序消息,全局顺序消息指某个 Topic 下的所有消息都要保证顺序;

部分顺序消息只要保证每一组消息被顺序消费即可,比如订单消息,只要保证同一个订单 ID 个消息能按顺序消费即可。

部分顺序消息

部分顺序消息相对比较好实现,生产端需要做到把同 ID 的消息发送到同一个 Message Queue ;在消费过程中,要做到从同一个Message Queue读取的消息顺序处理——消费端不能并发处理顺序消息,这样才能达到部分有序。

 发送端使用 MessageQueueSelector 类来控制 把消息发往哪个 Message Queue 。
 消费端通过使用 MessageListenerOrderly 来解决单 Message Queue 的消息被并发处理的问题。

 

全局顺序消息

RocketMQ 默认情况下不保证顺序,比如创建一个 Topic ,默认八个写队列,八个读队列,这时候一条消息可能被写入任意一个队列里;在数据的读取过程中,可能有多个 Consumer ,每个 Consumer 也可能启动多个线程并行处理,所以消息被哪个 Consumer 消费,被消费的顺序和写人的顺序是否一致是不确定的。

要保证全局顺序消息, 需要先把 Topic 的读写队列数设置为 一,然后Producer Consumer 的并发设置,也要是一。简单来说,为了保证整个 Topic全局消息有序,只能消除所有的并发处理,各部分都设置成单线程处理 ,这时候就完全牺牲RocketMQ的高并发、高吞吐的特性了。

 

5 如何实现消息过滤?

有两种方案:

  • 一种是在 Broker 端按照 Consumer 的去重逻辑进行过滤,这样做的好处是避免了无用的消息传输到 Consumer 端,缺点是加重了 Broker 的负担,实现起来相对复杂。
  • 另一种是在 Consumer 端过滤,比如按照消息设置的 tag 去重,这样的好处是实现起来简单,缺点是有大量无用的消息到达了 Consumer 端只能丢弃不处理。

一般采用Cosumer端过滤,如果希望提高吞吐量,可以采用Broker过滤。

对消息的过滤有三种方式:

  • 根据Tag过滤:这是最常见的一种,用起来高效简单

    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_EXAMPLE");
    consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");
     
  • SQL 表达式过滤:SQL表达式过滤更加灵活

    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
    // 只有订阅的消息有这个属性a, a >=0 and a <= 3
    consumer.subscribe("TopicTest", MessageSelector.bySql("a between 0 and 3");
    consumer.registerMessageListener(new MessageListenerConcurrently() {
       @Override
       public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
           return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
       }
    });
    consumer.start();
     
  • Filter Server 方式:最灵活,也是最复杂的一种方式,允许用户自定义函数进行过滤

6 RocketMQ怎么实现延时消息的?

电商的订单超时自动取消,就是一个典型的利用延时消息的例子,用户提交了一个订单,就可以发送一个延时消息,1h后去检查这个订单的状态,如果还是未付款就取消订单释放库存。

RocketMQ是支持延时消息的,只需要在生产消息的时候设置消息的延时级别:

      // 实例化一个生产者来产生延时消息
      DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
      // 启动生产者
      producer.start();
      int totalMessagesToSend = 100;
      for (int i = 0; i < totalMessagesToSend; i++) {
          Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());
          // 设置延时等级3,这个消息将在10s之后发送(现在只支持固定的几个时间,详看delayTimeLevel)
          message.setDelayTimeLevel(3);
          // 发送消息
          producer.send(message);
      }
 

但是目前RocketMQ支持的延时级别是有限的:

private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
 

RocketMQ怎么实现延时消息的:临时存储+定时任务

Broker收到延时消息了,会先发送到主题(SCHEDULE_TOPIC_XXXX)的相应时间段的Message Queue中,然后通过一个定时任务轮询这些队列,到期后,把消息投递到目标Topic的队列中,然后消费者就可以正常消费这些消息。

7 什么是事务消息、半事务消息?怎么实现的?

事务消息就是MQ提供的类似XA的分布式事务能力,通过事务消息可以达到分布式事务的最终一致性。

半事务消息:是指暂时还不能被 Consumer 消费的消息,Producer 成功发送到 Broker 端的消息,但是此消息被标记为 “暂不可投递” 状态,只有等 Producer 端执行完本地事务后经过二次确认了之后,Consumer 才能消费此条消息。就是MQ收到了生产者的消息,但是没有收到二次确认,不能投递的消息。

实现原理如下:

 
事务
 
依赖半消息,可以实现分布式消息事务,其中的关键在于二次确认以及消息回查。
  • 1、Producer 向 broker 发送半消息
  • 2、Producer 端收到响应,消息发送成功,此时消息是半消息,标记为 “不可投递” 状态,Consumer 消费不了。
  • 3、Producer 端执行本地事务。
  • 4、如果本地事务执行完成,Producer 向 Broker 发送 Commit/Rollback,如果是 Commit,Broker 端将半消息标记为正常消息,Consumer 可以消费,如果是 Rollback,Broker 丢弃此消息。
  • 5、如果发生异常情况,Broker 端迟迟等不到二次确认。在一定时间后,MQ对生产者发起消息回查,到 Producer 端查询半消息的执行情况。
  • 6、Producer 端查询本地事务的状态
  • 7、根据事务的状态提交 commit/rollback 到 broker 端,再次提交二次确认。(5,6,7 是消息回查)
  • 8、最终,消费者消费到消息,二次确认commit,就可以把消息投递给消费者,执行本地事务。反之如果是rollback,消息会保存下来并且在3天后被删除

 

8 什么是死信队列?

死信队列用于处理无法被正常消费的消息,即死信消息。

当一条消息初次消费失败,消息队列 RocketMQ 会自动进行消息重试

达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息,此时,消息队列 RocketMQ 不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中,该特殊队列称为死信队列

死信消息的特点

  • 不会再被消费者正常消费。
  • 有效期与正常消息相同,均为 3 天,3 天后会被自动删除。因此,需要在死信消息产生后的 3 天内及时处理。

死信队列的特点

  • 一个死信队列对应一个 Group ID, 而不是对应单个消费者实例。
  • 如果一个 Group ID 未产生死信消息,消息队列 RocketMQ 不会为其创建相应的死信队列。
  • 一个死信队列包含了对应 Group ID 产生的所有死信消息,不论该消息属于哪个 Topic。

RocketMQ 控制台提供对死信消息的查询、导出和重发的功能。

 

四:RocketMQ性能优化

1.JVM层面

(1)STW

监控暂停

 rocketmq-console 这个是官方提供了一个 WEB 项目,可以查看 rocketmq数据和执行一些操作。但是这个监控界面又没有权限控制,并且还有一些消 耗性能的查询操作,如果要提高性能,建议这个可以暂停

消除偏向锁

 -XX:-UseBiasedLocking: 禁用偏向锁

(2)垃圾回收

RocketMQ 推荐使用 G1 垃圾回收器

    -Xms8g -Xmx8g -Xmn4g:这个就是很关键的一块参数了,也是重点需要调整的,就是默认的堆大小是 8g 内存,新生代是 4g 内存。

    如果是内存比较大,比如有 48g 的内存,所以这里完全可以给他们翻几倍,比如给堆内存 20g,其中新生代给 10g,甚至可以更多些,当然要留一些内存给操作系统来用

   -XX:+UseG1GC -XX:G1HeapRegionSize=16m:这几个参数也是至关重要的,这是选用了G1垃圾回收器来做分代回收,对新生代和老年代都是用G1来回收。这里把G1的region大小设置为了16m,这个因为机器内存比较多,所以region 大小可以调大一些给到16m,不然用2m的region, 会导致region数量过多。

    -XX:G1ReservePercent=25:这个参数是说,在 G1 管理的老年代里预留 25%的空闲内存,保证新生代对象晋升到老年代的时候有足够空间,避免老年代内存都满了,新生代有对象要进入老年代没有充足内存了。默认值是 10%,略微偏少,这里 RocketMQ 给调大了一些。

   -XX:initiatingHeapOccupancyPercent= :30:这个参数是说,当堆内存的使用率达到 30%之后就会自动启动 G1 的并发垃圾回收,开始尝试回收一些垃圾对象。默认值是 45%,这里调低了一些,也就是提高了 GC 的频率,但是避免了垃圾对象过多,一次垃圾回收耗时过长的问题。

   -XX:-OmitStackTraceInFastThrow:这个参数是说,有时候 JVM 会抛弃-些异常堆栈信息,因此这个参数设置之后,就是禁用这个特性,要把完整的异常堆栈信息打印出来。

   -XX:+AIwaysPreTouch:这个参数的意思是我们刚开始指定 JVM 用多少内存,不会真正分配给他,会在实际需要使用的时候再分配给他。所以使用这个参数之后,就是强制让 JVM 启动的时候直接分配我们指定的内存,不要等到使用内存的时候再分配。

   -XX:-UseLargePages:这个参数的意思是禁用大内存页,某些情况下会导致内存浪费或实例无法启动。默认启动。

2.操作系统层面

(1)基本参数

# vim /etc/sysctl.conf

 

(2)网络接口控制器 NIC – network interface controller

一个请求到 RocketMQ 的应用,一般会经过网卡、内核空间、用户空间

 

 

(3)Kernel

 

 

在操作系统级别,是可以做软中断聚合的优化。

网卡队列 CPU 绑定

缓冲区调整

队列大小调整等

 

 

文: 一只阿木木

 

Exit mobile version