分布式消息队列

消息队列

什么是消息队列

消息队列(Message Queue),简称为MQ,是分布式应用程序之间的通讯方法

常用的消息队列 RabbitMQ ActiveMQ RocketMQ Kafka

一 RabbitMQ

  RabbitMQ 2007年发布,是一个在AMQP(高级消息队列协议)基础上完成的,可复用的企业消息系统,是当前最主流的消息中间件之一。

  主要特性:

  1. 可靠性: 提供了多种技术可以让你在性能和可靠性之间进行权衡。这些技术包括持久性机制、投递确认、发布者证实和高可用性机制;

  2. 灵活的路由:消息在到达队列前是通过交换机进行路由的。RabbitMQ为典型的路由逻辑提供了多种内置交换机类型。如果你有更复杂的路由需求,可以将这些交换机组合起来使用,你甚至可以实现自己的交换机类型,并且当做RabbitMQ的插件来使用;

  3. 消息集群:在相同局域网中的多个RabbitMQ服务器可以聚合在一起,作为一个独立的逻辑代理来使用;

  4. 队列高可用:队列可以在集群中的机器上进行镜像,以确保在硬件问题下还保证消息安全;

  5. 多种协议的支持:支持多种消息队列协议;

  6. 服务器端用Erlang语言编写,支持只要是你能想到的所有编程语言;

  7. 管理界面: RabbitMQ有一个易用的用户界面,使得用户可以监控和管理消息Broker的许多方面;

  8. 跟踪机制:如果消息异常,RabbitMQ提供消息跟踪机制,使用者可以找出发生了什么;

  9. 插件机制:官方啥都没有,全靠第三方,但同时也增强了,插件的扩展性(个人以为)!

优点:
    1. 由于erlang语言的特性,mq 性能较好,高并发;

    2. 健壮、稳定、易用、跨平台、支持多种语言、文档齐全;

    3. 有消息确认机制和持久化机制,可靠性高;

    4. 高度可定制的路由;

    5. 管理界面较丰富,在互联网公司也有较大规模的应用;

    6. 社区活跃度高;
缺点:
    1. 尽管结合erlang语言本身的并发优势,性能较好,但是不利于做二次开发和维护;

    2. 实现了代理架构,意味着消息在发送到客户端之前可以在中央节点上排队。此特性使得RabbitMQ易于使用和部署,但是使得其运行速度较慢,因为中央节点增加了延迟,消息封装后也比较大;

    3. 需要学习比较复杂的接口和协议,学习和维护成本较高;

二 ActiveMQ

 ActiveMQ是由Apache出品,ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现。它非常快速,支持多种语言的客户端和协议,而且可以非常容易的嵌入到企业的应用环境中,并有许多高级功能。

  主要特性:

  1. 服从 JMS 规范:JMS 规范提供了良好的标准和保证,包括:同步或异步的消息分发,一次和仅一次的消息分发,消息接收和订阅等等。遵从 JMS 规范的好处在于,不论使用什么 JMS 实现提供者,这些基础特性都是可用的;

  2. 连接性:ActiveMQ 提供了广泛的连接选项,支持的协议有:HTTP/S,IP 多播,SSL,STOMP,TCP,UDP,XMPP等等。对众多协议的支持让 ActiveMQ 拥有了很好的灵活性。

  3. 支持的协议种类多:OpenWire、STOMP、REST、XMPP、AMQP ;

  4. 持久化插件和安全插件:ActiveMQ 提供了多种持久化选择。而且,ActiveMQ 的安全性也可以完全依据用户需求进行自定义鉴权和授权;

  5. 支持的客户端语言种类多:除了 Java 之外,还有:C/C++,.NET,Perl,PHP,Python,Ruby;

  6. 代理集群:多个 ActiveMQ 代理可以组成一个集群来提供服务;

  1. 异常简单的管理:ActiveMQ 是以开发者思维被设计的。所以,它并不需要专门的管理员,因为它提供了简单又使用的管理特性。有很多中方法可以监控 ActiveMQ 不同层面的数据,包括使用在 JConsole 或者 ActiveMQ 的Web Console 中使用 JMX,通过处理 JMX 的告警消息,通过使用命令行脚本,甚至可以通过监控各种类型的日志。
优点:
        1. 跨平台(JAVA编写与平台无关有,ActiveMQ几乎可以运行在任何的JVM上)

        2. 可以用JDBC:可以将数据持久化到数据库。虽然使用JDBC会降低ActiveMQ的性能,但是数据库一直都是开发人员最熟悉的存储介质。将消息存到数据库,看得见摸得着。而且公司有专门的DBA去对数据库进行调优,主从分离;

        3. 支持JMS :支持JMS的统一接口;

        4. 支持自动重连;

        5. 有安全机制:支持基于shiro,jaas等多种安全配置机制,可以对Queue/Topic进行认证和授权。

        6. 监控完善:拥有完善的监控,包括Web Console,JMX,Shell命令行,Jolokia的REST API;

        7. 界面友善:提供的Web Console可以满足大部分情况,还有很多第三方的组件可以使用,如hawtio;
缺点:
        1. 社区活跃度不及RabbitMQ高;

        2. 根据其他用户反馈,会出莫名其妙的问题,会丢失消息;

        3. 目前重心放到activemq6.0产品-apollo,对5.x的维护较少;

   4. 不适合用于上千个队列的应用场景;

三 RocketMQ

      RocketMQ出自 阿里公司的开源产品,用 Java 语言实现,在设计时参考了 Kafka,并做出了自己的一些改进,消息可靠性上比 Kafka 更好。RocketMQ在阿里集团被广泛应用在订单,交易,充值,流计算,消息推送,日志流式处理,binglog分发等场景。

        主要特性:

        1. 是一个队列模型的消息中间件,具有高性能、高可靠、高实时、分布式特点;

        2. Producer、Consumer、队列都可以分布式;

        3. Producer向一些队列轮流发送消息,队列集合称为Topic,Consumer如果做广播消费,则一个consumer实例消费这个Topic对应的所有队列,如果做集群消费,则多个Consumer实例平均消费这个topic对应的队列集合;

        4. 能够保证严格的消息顺序;

        5. 提供丰富的消息拉取模式;

        6. 高效的订阅者水平扩展能力;

        7. 实时的消息订阅机制;

        8. 亿级消息堆积能力;

        9. 较少的依赖;
优点:
        1. 单机支持 1 万以上持久化队列

        2. RocketMQ 的所有消息都是持久化的,先写入系统 PAGECACHE,然后刷盘,可以保证内存与磁盘都有一份数据,访问时,直接从内存读取。

        3. 模型简单,接口易用(JMS 的接口很多场合并不太实用);

        4. 性能非常好,可以大量堆积消息在broker中;

        5. 支持多种消费,包括集群消费、广播消费等。

        6. 各个环节分布式扩展设计,主从HA;

        7. 开发度较活跃,版本更新很快。
缺点:
        1. 支持的客户端语言不多,目前是java及c++,其中c++不成熟;

        2. RocketMQ社区关注度及成熟度也不及前两者;

        3. 没有web管理界面,提供了一个CLI(命令行界面)管理工具带来查询、管理和诊断各种问题;

        4. 没有在 mq 核心中去实现JMS等接口;

四 Kafka

Apache Kafka是一个分布式消息发布订阅系统。它最初由LinkedIn公司基于独特的设计实现为一个分布式的提交日志系统( adistributed commit log),,之后成为Apache项目的一部分。Kafka系统快速、可扩展并且可持久化。它的分区特性,可复制和可容错都是其不错的特性。

  主要特性:

  1. 快速持久化,可以在O(1)的系统开销下进行消息持久化;

  2. 高吞吐,在一台普通的服务器上既可以达到10W/s的吞吐速率;

  3. 完全的分布式系统,Broker、Producer、Consumer都原生自动支持分布式,自动实现负载均衡;

  4. 支持同步和异步复制两种HA;

  5. 支持数据批量发送和拉取;

  6. zero-copy:减少IO操作步骤;

  7. 数据迁移、扩容对用户透明;

  8. 无需停机即可扩展机器;

  9. 其他特性:严格的消息顺序、丰富的消息拉取模型、高效订阅者水平扩展、实时的消息订阅、亿级的消息堆积能力、定期删除机制;

优点
1. 客户端语言丰富,支持java、.net、php、ruby、python、go等多种语言;

  2. 性能卓越,单机写入TPS约在百万条/秒,消息大小10个字节;

  3. 提供完全分布式架构, 并有replica机制, 拥有较高的可用性和可靠性, 理论上支持消息无限堆积;

  4. 支持批量操作;

  5. 消费者采用Pull方式获取消息, 消息有序, 通过控制能够保证所有消息被消费且仅被消费一次;

  6. 有优秀的第三方Kafka Web管理界面Kafka-Manager;

  7. 在日志领域比较成熟,被多家公司和多个开源项目使用;
缺点
 1. Kafka单机超过64个队列/分区,Load会发生明显的飙高现象,队列越多,load越高,发送消息响应时间变长

  2. 使用短轮询方式,实时性取决于轮询间隔时间;

  3. 消费失败不支持重试;

  4. 支持消息顺序,但是一台代理宕机后,就会产生消息乱序;

  5. 社区更新较慢;

  2. RabbitMQ/ActiveMQ/RocketMQ/Kafka对比

  这里列举了上述四种消息队列的差异对比:
四种消息队列差异对比图

为什么使用消息队列,消息队列的好处

消息队列的好处就6个字,异步、程序解耦、销峰

一 解耦
传统模式:

缺点:耦合性,太强,每一个消费者接入,都要修改代码

说到这里要说一下生产者消费者这两个概念,我一直没有注意过这连个词,因为消费者和生产者,出现往往就伴随着分布式,但是今天画这张图的时候,我才有了这想法,其实假如有两个线程,其中线程a生产了一条数据,然后线程b,那走了这条数据,那么其实线程a就是生产者,线程b就是消费者,消费者和和生产者,只是一种供求方式,并不是专属于,分布式!

消息队列作为中间件

优点:生产者将消息发送到消息队列中,消费者去消息队列中取,实现了解耦,生产者无需修改代码!

二 异步
传统方式注册

缺点:耦合性高及一些非必要的业务逻辑以同步的方式运行,太耗费时间。

消息队列作为中间件

优点“:即实现了解耦同时异步处理一些非必要的逻辑,加快了页面响应速度!

三 消峰
传统模式

消息队列作为中间件模式

使用消息队列的缺点:

一 系统可用性降低

原来的系统如果没问题那就一直没问题,但是当你加入了消息队列,那么当消息队列挂了呢?那系统就蹦了!

二 提高系统复杂性

要多考虑很多方面的问题,比如一致性问题、如何保证消息不被重复消费,如何保证保证消息可靠传输。因此,需要考虑的东西更多,系统复杂性增大。

如何保证消息不被重复

消息重复是如何引起的

其实无论是那种消息队列,造成重复消费原因其实都是类似的。正常情况下,消费者在消费消息时候,消费完毕后,会发送一个确认信息给消息队列,消息队列就知道该消息被消费了,就会将该消息从消息队列中删除。只是不同的消息队列发送的确认信息形式不同,例如RabbitMQ是发送一个ACK确认消息,当消息消费后需要提交ack但是当RabbitMQ确认收到,并将相应 message 从 RabbitMQ 的消息缓存中移除。但是在实际业务处理中,很可能消息接收到,业务处理出现异常,那么该消息就会丢失此时就需要其中手动确认方式则需要在业务处理成功后,调用channel.basicAck(),手动签收,如果手动签收后因为网络传输等等故障,确认信息没有传送到消息队列,导致消息队列不知道自己已经消费过该消息了,再次将该消息分发给其他的消费者。

幂等性指一次和多次请求某一个资源,对于资源本身应该具有同样的结果。也就是说,其任意多次执行对资源本身所产生的影响均与一次执行的影响相同。在MQ中指,消费多条相同的消息,得到与消费该消息一次相同的结果。

以转账为例:
1.发送消息 
2.消息内容包含了id 和 版本和 金额
3.消费者接收到消息,则根据ID 和版本执行sql语句,
update account set money=money-?,version=version+1 where id=? and version=?
4.如果消费第二次,那么同一个消息内容是修改不成功的。

以上大意就是在,支付数据提交到消息中间件后,由消息中间件将支付成功发送给消费放,当消息方因为网络故障或者其他情况收不到消息,就有中间件多次发送,当消费方接受到消息后,在插入数据库之前进行判断如果数据库显示支付成功那么则不修改,如果数据库显示为支付则修改!

如何保证消息队列高可用

前文又说,消息队列的缺点 系统可用性降低,一旦只有一台消息队列,一旦他挂了那么整个系统就崩了!

所以说一台消息队列就是demo级别的,自己拿来玩玩就行!

![](/attachment/20201202/耿直的微笑.jpg

说下第二种

普通集群模式

普通集群模式就是在多台机器上启动多个rabbitmq实例,每个机器启动一个。但是创建的queue只会放在一个rabbitmq实例上面,但是其他的实例都同步了这个queue的元数据。在你消费的时候,如果连接到了另一个实例,他会从拥有queue的那个实例获取消息然后再返回给你。

这种呢和所谓的分布式,高可用毫无关系!

直接说第三种

镜像队列

什么是镜像队列

默认情况下,rabbitmq集群中的队列内容位于单个节点(声明队列的节点)上。这与交换器和绑定形成了对比,交换器和绑定始终可视为位于所有节点上。队列可以选择性地跨多个节点进行镜像。

每个镜像队列由一个master和一个或多个mirrors组成。主节点位于一个通常称为master的节点上。每个队列都有自己的主节点。给定队列的所有操作首先应用于队列的主节点,然后传播到镜像。这包括队列发布(enqueueing publishes)、向消费者传递消息、跟踪消费者的确认等等。

队列镜像意味着是一个集群内的节点。因此,不建议跨广域网使用它(当然,客户机仍然可以根据需要尽可能近地连接)。

发布到队列的消息将复制到所有镜像。不管消费者连接到哪个节点,都会连接到master,镜像会删除在master上已确认的消息。因此,队列镜像提高了可用性,但不会在节点之间分配负载(所有参与节点都完成所有工作)。

如果承载队列master的节点出现故障,则最旧的镜像将升级为新的master,只要它已同步。根据队列镜像参数,也可以升级未同步的镜像。

在分布式系统中,通常使用多个术语来标识主副本和辅助副本。本指南通常使用“master”来指代队列的主副本,使用“mirror”指代辅助副本。但是,你会发现很多地方都使用了”slave”。这是因为rabbitmq CLI工具在历史上一直使用术语“slave”来指代辅助设备。因此,这两个术语目前都可以互换使用,但我们希望最终摆脱遗留术语。

在任意的节点(A或者B)中执行如下命令:

rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all"}'

解释
rabbitmqctl set_policy 
	用于设置策略
ha-all 
	表示设置为镜像队列并策略为所有节点可用 ,意味着 队列会被(同步)到所有的节点,当一个节点被加入到集群中时,也会同步到新的节点中,此策略比较保守,性能相对低,对接使用半数原则方式设置(N/2+1),例如:有3个结点 此时可以设置为:ha-two 表示同步到2个结点即可。
"^"  表示针对的队列的名称的正则表达式,此处表示匹配所有的队列名称
'{"ha-mode":"all"}' 设置一组key/value的JSON 设置为高可用模式 匹配所有exchange

此时查看web管理界面:添加一个队列itcast111,如下图已经可以出现结果为有一个结点,并且是ha-all模式(镜像队列模式)

如何处理消息丢失的问题

其实无论是那种消息队列,造成重复消费原因其实都是类似的。正常情况下,消费者在消费消息时候,消费完毕后,会发送一个确认信息给消息队列,消息队列就知道该消息被消费了,就会将该消息从消息队列中删除。只是不同的消息队列发送的确认信息形式不同,例如RabbitMQ是发送一个ACK确认消息,当消息消费后需要提交ack但是当RabbitMQ确认收到,并将相应 message 从 RabbitMQ 的消息缓存中移除。但是在实际业务处理中,很可能消息接收到,业务处理出现异常,那么该消息就会丢失此时就需要其中手动确认方式则需要在业务处理成功后,调用channel.basicAck(),手动签收如果出现异常,则调用channel.basicNack()等方法,让其按照业务功能进行处理,比如:重新发送,比如拒绝签收进入死信队列等等。!这个方式可以消息丢失但是不能解决消息重复

何保证消息的顺序性

针对这个问题,通过某种算法,将需要保持先后顺序的消息放到同一个消息队列中比如rabbitMq中queue然后只用一个消费者去消费该队列。

那如果为了吞吐量,有多个消费者去消费怎么办?

下面的就有点抽象了

RabbitMQ:一个 queue,多个 consumer。比如,生产者向 RabbitMQ 里发送了三条数据,顺序依次是 data1/data2/data3,压入的是 RabbitMQ 的一个内存队列。有三个消费者分别从 MQ 中消费这三条数据中的一条,结果消费者2先执行完操作,把 data2 存入数据库,然后是 data1/data3。这不明显乱了。

RabbitMQ解决方案

拆分多个 queue,每个 queue 一个 consumer,就是多一些 queue 而已,确实是麻烦点;或者就一个 queue 但是对应一个 consumer,然后这个 consumer 内部用内存队列做排队,然后分发给底层不同的 worker 来处理。

比如说发朋友圈,评论,删除,三个异步操作,你朋友圈还没有发出去,评论显然是执行不了的,所以等,等朋友圈发完了,再去评论!

总而言之呢保证入队有序,出队让消费者自行决定!

如何解决消息队列的延时以及过期失效问题?消息队列满了以后该怎么处理

首先分析一下什么情况下回出现这个问题

可能你的消费端出了问题,不消费了,或者消费的极其极其慢。可能你的消息队列集群的磁盘都快写满了,都没人消费,这个时候怎么办?或者是整个这就积压了几个小时,你这个时候怎么办?或者是你积压的时间太长了,导致比如rabbitmq设置了消息过期时间后就没了怎么办?

所以就这事儿,其实线上挺常见的,一般不出,一出就是大case,一般常见于,举个例子,消费端每次消费之后要写mysql,结果mysql挂了,消费端hang那儿了,不动了。或者是消费端出了个什么叉子,导致消费速度极其慢。

一般这个时候,只能操作临时紧急扩容了,具体操作步骤和思路如下:

第一种情况

先修复consumer的问题,确保其恢复消费速度,然后将现有cnosumer都停掉

新建一个topic,partition是原来的10倍,临时建立好原先10倍或者20倍的queue数量

然后写一个临时的分发数据的consumer程序,这个程序部署上去消费积压的数据,消费之后不做耗时的处理,直接均匀轮询写入临时建立好的10倍数量的queue

接着临时征用10倍的机器来部署consumer,每一批consumer消费一个临时queue的数据

这种做法相当于是临时将queue资源和consumer资源扩大10倍,以正常的10倍速度来消费数据

等快速消费完积压数据之后,得恢复原先部署架构,重新用原先的consumer机器来消费消息

第二种情况

如果是rabbitmq,rabbitmq是可以设置过期时间的,就是TTL,如果消息在queue中积压超过一定的时间就会被rabbitmq给清理掉,这个数据就没了。那这就是第二个坑了。这就不是说数据会大量积压在mq里,而是大量的数据会直接搞丢。

这个情况下,就不是说要增加consumer消费积压的消息,因为实际上没啥积压,而是丢了大量的消息。我们可以采取一个方案,就是批量重导,这个我们之前线上也有类似的场景干过。就是大量积压的时候,我们当时就直接丢弃数据了,然后等过了高峰期以后,比如打局绝地求生英雄联盟啥都很快就到凌晨了,用户都睡觉了。

这个时候我们就开始写程序,将丢失的那批数据,写个临时程序,一点一点的查出来,然后重新灌入mq里面去,把白天丢的数据给他补回来。也只能是这样了。

假设1万个订单积压在mq里面,没有处理,其中1000个订单都丢了,你只能手动写程序把那1000个订单给查出来,手动发到mq里去再补一次

第三种情况

如果走的方式是消息积压在mq里,那么如果你很长时间都没处理掉,此时导致mq都快写满了,咋办?这个还有别的办法吗?没有,谁让你第一个方案执行的太慢了,你临时写程序,接入数据来消费,消费一个丢弃一个,都不要了,快速消费掉所有的消息。然后走第二个方案,到了晚上再补数据吧。

图解