RabbitMQ的高级特性概念理解

  • 2019 年 11 月 26 日
  • 笔记

1、RabbitMQ中的消息如何保障百分之百的投递成功?

  答:百分之百的投递成功,方案可以参考下面的2、3。

2、什么是生产者端的可靠性投递?

  答:第一步,生产者保障消息的成功发出。第二步,保障RabbitMQ的节点成功接收到生产者发送的消息。第三步,发送端收到RabbitMQ节点(即Broker)确认应答。第四步,完善的消息进行补偿机制。

3、如何实现生产端的可靠性投递,解决方案,如下所示。

  答:第一种,消息落库,对消息状态进行打标。意思是发送消息的时候,将消息持久化到数据库中,将消息设置一个状态,比如,刚发送出去,消息状态叫做发送中,当消息到达Broker端,Broker端返回给你一个响应,当你收到这个响应,代表了Broker端已经收到了该条消息(即应答确认),此时将消息的状态设置为发送成功,做一个打标。此时,针对没有进行响应的消息状态,可以做轮询操作,将未返回给你响应的消息进行重新发送(最大努力尝试值,可以手动设置)。直到所有消息发送成功并返回给你响应。

第二种,消息的延迟投递,做二次确认,回调检查。此种方式和减少和数据库的操作。第一种方式在高并发的场景下可能不是很适合。

4、RabbitMQ的幂等性概念。幂等性是什么,幂等性的概念。

  答:可能对一件事情进行一个操作,这个操作可能执行一百次或者一千次,最终执行结果都是相同的,即执行一百次或者一千次结果都是相同的。

5、消费端,幂等性保障,比如,在海量订单产生的业务高峰期,如何避免消息的重复消费问题?比如在高并发的情况下,会有好多消息到达RabbitMQ的Broker,消费端监听大量的消息,难免出现消息的重复投递,网络闪断导致的消息重复投递。如果不做消费端,幂等就会出现消息重复消费。

  答:消费端实现幂等性,就意味着,我们的消息永远不会消费多次,只能消费一次,即使我们收到了多条一样的消息。

6、主流的幂等性操作。

  方案一:唯一ID(全局生成的id) + 指纹码机制,利用数据库主键去重方案。好处是实现比较简单。坏处是高并发下有数据库写入的性能瓶颈。解决方案跟进ID进行分库分表进行算法路由。

  方案二:利用Redis的原子性去实现,实现去重,实现幂等性操作。需要考虑的问题,第一个问题是我们是否要进行数据落库,如果落库的话,关键解决的问题是数据库和缓存如何做到原子性。第二个问题,如果不进行落库,那么都存储到缓存中,如何设置定时同步的策略。

7、Confirm确认消息详解,理解Confirm消息确认机制。

  答:消息的确认,是指生产者投递消息后,如果Broker收到消息,则会给我们生产者一个应答。生产者进行接收应答,用来确定这条消息是否正常的发送到Broker,这种方式也是消息的可靠性投递的核心保障。

Confirm确认消息实现,如何实现Confirm确认消息?   第一步,在channel上开启确认模式,即channel.confirmSelect()。调用方法进行确认模式的选择。   第二部,在channel上添加监听,addConfirmListener,当broker回送应答的时候,根据addConfirmListener添加的监听进行接收broker回送的结果,根据回送的结果。监听成功和失败的返回结果,根据具体的结果对消息进行重新发送,或者记录日志等后续处理。

8、Return消息机制,Return Listener用于处理一些不可路由的消息。也是生产段添加的一个监听。

  我们的消息生产者,通过指定一个Exchange和Routingkey,把消息送达到某一个队列中去,然后我们的消费者监听队列,进行消息处理操作。但是在某些情况下,如果我们在发送消息的时候,当前的exchange不存在或者指定的路由key路由不到,这个时候如果我们需要监听这种不可达的消息,就要使用Return Listener。

  Return消息机制,在基础api中有一个关键的配置项。Mandatory,如果为true,则监听器会接收到路由不可达的消息,然后进行后续处理,如果为false,那么broker端自动删除该消息。

9、RabbitMQ消费端的限流策略,什么是消费端的限流。假设一个场景,首先,我们RabbitMQ服务器有上万条未处理的消息,我们随便打开一个消费者客户端,会出现下面的情况。巨量的消息瞬间全部推送过来,但是我们单个客户端无法同时处理这么多数据。

  1)、消费端限流,RabbitMQ提供了一种Qos(quality of service服务质量保证)功能,即在非自动确认消息的前提下,如果有一定数据的消息堆积,可以设置一个限制,我们不去更新消费,不去做ack,不去做确认机制。如果有一定数目的消息(通过基于consume或者channel设置Qos的值),消息未被确认前,不进行消费新的消息。Rabbitmq有两种签收模式,一种是自动签收,一种是手动签收。如果做消费端限流的话,不能设置自动签收模式,即autoack=false。

  2)、消费端的方法void BasicQos(unit prefetchSize,ushort prefetchCount,bool global);

    a、参数1是prefetchSize消息大小的限制,设置为0,不做大小限制。

    b、参数2是prefetchCount一次最多可以处理的消息,会告诉RabbitMQ不要同时给一个消费者推送多于N个消息,即一旦有N个消息还没有ack,则该consume将block掉,直到有消息ack。

    c、参数3是global是什么方式应用的,true表示channel的级别,false表示consume进行限制,true或者false是否将上面设置应用于channel简单点说,就是上面限制是channel级别的还是consumer级别。

  3)、注意:prefetchSize和global这两项,rabbitmq没有实现,暂且不研究prefetch_count在no_ask=false的情况下生效,即在自动应答(自动签收)的情况下这两个值是不生效的。

  4)、消费端的限流策略,开发步骤如下所示:

第一步,限流方式,第一件事就是autoAck设置为false。     第二步,channel.basicQos(0,1,false);     第三步,手动确认,channel.basicAck(envelope.getDeliveryTag(),flase);参数1是deliveryTag,表示这一条消息已经处理完了,可以给一下条了,参数2是不批量签收,我们一条一条进行签收。

10、Rabbitmq的消费端ACK与重回队列。

  答:消费端可以进行手动的ACK和NACK。区分与ack自动确认签收,手动的ACK是代表消息确认了,消息已经收到了,确认了会给Broker端发送一个请求,说我已经收到了。手动NACK是代表了消息未进行确认,消息未收到或者处理失败了,Broker端将未收到的消息重新发送一遍。消费端进行消费的时候,如果由于业务异常我们可以进行日志的记录,然后进行补偿。如果由于服务器宕机等严重问题,那我们就需要手工进行ACK保障消费端消息成功。

  消费端的重回队列,消息未被处理成功,将该消息重新发送给Broker,消费端重回队列是为了对没有处理成功的消息,把消息重新会递给Broker。一般我们在实际应用中,都会关闭重回队列,也就是设置为False。

11、RabbitMQ的TTL队列或者消息,TTL是Time To Live的缩写,也就是生存时间。

  RabbitMQ支持消息的过期时间,在消息发送的时候可以进行指定。RabbitMQ支持队列的过期时间,从消息入队列开始计算,只要超过了队列的超时时间配置,那么消息会自动的清除。

12、死信队列,DLX,Dead-Letter-Exchange。

  1)、RabbitMQ的死信队列是路由到交换机上面的,RabbitMQ的死信队列是和Exchange、队列息息相关的。利用DLX,当消息在一个队列中变成死信(dead message,即这个消息没有任何消费者消费)之后,它能被重新publish到另一个Exchange,这个Exchange就是DLX(死信队列,Dead-Letter-Exchange)。   2)、消息变成死信有以下几种情况。

    a、当消息被拒绝(basic.reject/basic.nack)并且requeue=false。     b、消息TTL(TTL是Time To Live的缩写,也就是生存时间)过期。     c、当队列达到了最大长度。

  3)、DLX也是一个正常的Exchange,和一般的Exchange没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性。当这个队列中有死信的时候,RabbitMQ就会自动的将这个消息重新发布到设置的Exchange上去,进而被路由到另一个队列。可以监听这个队列中消息做相应的处理,这个特性可以弥补RabbitMQ3.0以前支持的immediate参数的功能。

  4)、死信队列设置,首先需要设置死信队列的exchange和queue,然后进行绑定。

    第一步、Exchange:dlx.exchange。死信队列就是一个正常的交换机Exchange。     第二步、Queue:dlx.queue。需要将队列和这个交换机Exchange进行绑定。     第三步、RoutingKey:#。任何的路由key都可以进行绑定。然后我们进行正常声明交换机,队列,绑定,只不过我们需要在队列加上一个参数即可:arguments.put("x-dead-letter-exchange","dlx.exchange");最后需要设置一个监听来监听这个队列。这样消息在过期、requeue、队列在达到最大长度的时候,消息就可以直接路由到死信队列了。