RabbitMQ的消息持久化处理
- 2019 年 11 月 5 日
- 筆記
1、RabbitMQ的消息持久化处理,消息的可靠性是 RabbitMQ 的一大特色,那么 RabbitMQ 是如何保证消息可靠性的呢——消息持久化。
2、autoDelete属性的理解。
1)、@Queue: 当autoDelete属性设置到该注解的时候,含义即是,当所有消费者客户端连接断开后,是否自动删除队列,当设置值是true的时候删除该队列,当值是false的时候不删除该队列。
2)、@Exchange:当autoDelete属性设置到该注解的时候,含义即是,当所有绑定队列都不在使用时,是否自动删除交换器,当设置值是true的时候删除该交换器,当值是false的时候不删除该交换器。
3、之前写过RabbitMQ的交换器Exchange之direct(发布与订阅 完全匹配),这里借助这个进行消息持久化测试。生产者的代码不发生改变,这里只是将消费者的autoDelete = "true"属性修改为autoDelete = "false",进行对比测试。
Info级别的日志进行消息的持久化操作,即队列不进行自动删除。将autoDelete = "false"即可。
1 package com.example.bie.consumer; 2 3 import org.springframework.amqp.core.ExchangeTypes; 4 import org.springframework.amqp.rabbit.annotation.Exchange; 5 import org.springframework.amqp.rabbit.annotation.Queue; 6 import org.springframework.amqp.rabbit.annotation.QueueBinding; 7 import org.springframework.amqp.rabbit.annotation.RabbitHandler; 8 import org.springframework.amqp.rabbit.annotation.RabbitListener; 9 import org.springframework.stereotype.Component; 10 11 /** 12 * 13 * @author biehl 14 * 15 * 消息接收者 16 * 17 * 1、@RabbitListener bindings:绑定队列 18 * 19 * 2、@QueueBinding 20 * value:绑定队列的名称、exchange:配置交换器、key:路由键routing-key绑定队列和交换器 21 * 22 * 3、@Queue value:配置队列名称、autoDelete:是否是一个可删除的临时队列 23 * 24 * 4、@Exchange value:为交换器起个名称、type:指定具体的交换器类型 25 * 26 * 27 */ 28 @Component 29 @RabbitListener(bindings = @QueueBinding( 30 31 value = @Queue(value = "${rabbitmq.config.queue.info}", autoDelete = "false"), 32 33 exchange = @Exchange(value = "${rabbitmq.config.exchange}", type = ExchangeTypes.DIRECT), 34 35 key = "${rabbitmq.config.queue.info.routing.key}")) 36 public class LogInfoConsumer { 37 38 /** 39 * 接收消息的方法,采用消息队列监听机制. 40 * 41 * @RabbitHandler意思是将注解@RabbitListener配置到类上面 42 * 43 * @RabbitHandler是指定这个方法可以进行消息的接收并且消费. 44 * 45 * @param msg 46 */ 47 @RabbitHandler 48 public void consumer(String msg) { 49 // 打印消息 50 System.out.println("INFO消费者===>消费: " + msg); 51 } 52 53 }
Error级别的日志进行消息的持久化操作,即队列进行自动删除。将autoDelete = "true"即可。
1 package com.example.bie.consumer; 2 3 import org.springframework.amqp.core.ExchangeTypes; 4 import org.springframework.amqp.rabbit.annotation.Exchange; 5 import org.springframework.amqp.rabbit.annotation.Queue; 6 import org.springframework.amqp.rabbit.annotation.QueueBinding; 7 import org.springframework.amqp.rabbit.annotation.RabbitHandler; 8 import org.springframework.amqp.rabbit.annotation.RabbitListener; 9 import org.springframework.stereotype.Component; 10 11 /** 12 * 13 * @author biehl 14 * 15 * 消息接收者 16 * 17 * 1、@RabbitListener bindings:绑定队列 18 * 19 * 2、@QueueBinding 20 * value:绑定队列的名称、exchange:配置交换器、key:路由键routing-key绑定队列和交换器 21 * 22 * 3、@Queue value:配置队列名称、autoDelete:是否是一个可删除的临时队列 23 * 24 * 4、@Exchange value:为交换器起个名称、type:指定具体的交换器类型 25 * 26 * 27 */ 28 @Component 29 @RabbitListener(bindings = @QueueBinding( 30 31 value = @Queue(value = "${rabbitmq.config.queue.error}", autoDelete = "true"), 32 33 exchange = @Exchange(value = "${rabbitmq.config.exchange}", type = ExchangeTypes.DIRECT), 34 35 key = "${rabbitmq.config.queue.error.routing.key}")) 36 public class LogErrorConsumer { 37 38 /** 39 * 接收消息的方法,采用消息队列监听机制. 40 * 41 * @RabbitHandler意思是将注解@RabbitListener配置到类上面 42 * 43 * @RabbitHandler是指定这个方法可以进行消息的接收并且消费. 44 * 45 * @param msg 46 */ 47 @RabbitHandler 48 public void consumer(String msg) { 49 // 打印消息 50 System.out.println("ERROR消费者===>消费<===消息message: " + msg); 51 } 52 53 }
4、启动你的生产者,启动你的消费者,观察RabbitMQ的图形化界面。未生产消息、未消费消息的界面如下所示:
![](https://ask.qcloudimg.com/http-save/6430374/as9f9250hp.png)
生产消息、消费消息的界面如下所示,我这里还使用浏览器访问控制层触发生产者生产消息,消费者消费消息:
![](https://ask.qcloudimg.com/http-save/6430374/y80b3v490z.png)
现在停止你的消费者,记录消息到第几条消息了。方便再次启动消费者进行观察。
![](https://ask.qcloudimg.com/http-save/6430374/qv9q0clsi8.png)
![](https://ask.qcloudimg.com/http-save/6430374/e7oiyxq79g.png)
启动你的消费者,观察,看看是从第几条开始消费的。可以看到消息从第82条开始消费的。
![](https://ask.qcloudimg.com/http-save/6430374/w5l5i6ns72.png)
RabbitMQ的消息持久化处理,Ready是对未接收到的数据状态表示,如果RabbitMQ在队列里面存放的消息未被消费者所消费,那么会给未消费的消息加一个标记,表示当前这个消息未被消费。消息持久化处理解决了丢失消息的这种状况,我们可以接收到消息,就是因为队列一直存在着呢,但是手动删除队列,消息也就丢失了,所以要慎重操作。当消费者停止以后,生产者生产的消息存储在RabbitMQ的服务器内存中,队列也存在内存中,数据在队列中,即数据保存在内存中。但是如果RabbitMQ的服务都停止了,队列也就消失了,队列消失了,数据也就丢失了。