RabbitMQ 入门系列:6、保障消息:不丢失:发送方、Rabbit存储端、接收方。
系列目录
前言:
本篇简单介绍如何保障消息不丢失的处理方式。
1、保障消息不丢失:发送方
主要是通过消息确认或事务,来保障这个过程,下面见具体代码:
1、通过确认机制处理的代码:
using RabbitMQ.Client;
using System.Text;using (var channel = Rabbit.Instance.DefaultConnection.CreateModel())
{
channel.ConfirmSelect();//开启确认
channel.QueueDeclare("FirstQueue", false, false, false);
channel.BasicPublish("", "FirstQueue", false, null, Encoding.UTF8.GetBytes("这是要发送的内容"));
if (channel.WaitForConfirms(TimeSpan.FromSeconds(10)))//设置最长超时时间
{
//发送确认成功
}
else
{
//超时或失败,需要处理是否重发消息。
}
}
2、通过事务机制处理的代码:
using RabbitMQ.Client;
using System.Text;
using (var channel = Rabbit.Instance.DefaultConnection.CreateModel())
{
channel.TxSelect();
channel.QueueDeclare("FirstQueue", false, false, false);
channel.BasicPublish("", "FirstQueue", false, null, Encoding.UTF8.GetBytes("这是要发送的内容"));
try
{
channel.TxCommit();
}
catch (Exception)
{
channel.TxRollback();
//处理事务提交失败的逻辑。
}
}
2、保障消息不丢失:RabbitMQ端
对于RabbitMQ端的消息保障,我们人为可以处理的是,设置创建的队列或消息是否持久化。
通过创建持久化的队列或消息,可以保障消息写入硬盘,重启时仍能还原信息。
//第二个参数:是否持久化
channel.QueueDeclare("FirstQueue", true, false, false);
3、保障消息不丢失:接收方
接收方主要是通过消息确认,来指示是否收到信息。
var channel = Rabbit.Instance.DefaultConnection.CreateModel();
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var message = Encoding.UTF8.GetString(ea.Body.ToArray());
Console.WriteLine("收到默认消息 {0}", message);
};
channel.BasicConsume(queue: "FirstQueue",
autoAck: true,
consumer: consumer);
在以上代码中,通过指定authAck可以自动回应收到信息。
当然,有需要也可以手动回应:
var channel = Rabbit.Instance.DefaultConnection.CreateModel();
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var message = Encoding.UTF8.GetString(ea.Body.ToArray());
Console.WriteLine("收到默认消息 {0}", message);
try
{
channel.BasicAck(ea.DeliveryTag, false);
}
catch (Exception err)
{
//处理确认失败的情况。
}
};
channel.BasicConsume(queue: "FirstQueue",
autoAck: false,
consumer: consumer);
说明:
为了避免消息丢失问题,消息的确认,最好在是业务处理完再进行确认。
否则会出现第三方中介出问题时,或业务处理出问题时,或刚确认好消息,业务还没处理就系统异常,导致消息未消费就丢失的问题。
4、发送方:还有一种情况:通过交换机发送过去,但交换机没送到指定的队列时
这时候应答也是正常的,但数据丢失,这种情况,是这样处理的:

就两点:
1、发送信息BasicPublish方法的第三个参数:mandatory设置为true。
2、定义接收的回调:BasicReturn事件。
总结:
本篇简单介绍如何使用RabbitMQ消息时,做到消息的可靠性,不丢失。