RabbitMQ 入門系列:6、保障消息:不丟失:發送方、Rabbit存儲端、接收方。

系列目錄

RabbitMQ 入門系列:1、MQ的應用場景的選擇與RabbitMQ安裝。

RabbitMQ 入門系列:2、基礎含義:鏈接、通道、隊列、交換機。

RabbitMQ 入門系列:3、基礎含義:持久化、排它性、自動刪除、強制性、路由鍵。

RabbitMQ 入門系列:4、基礎編碼:官方SDK使用:鏈接創建、單例改造、發送消息、接收消息。

RabbitMQ 入門系列:5、基礎編碼:交換機的進階介紹及編碼方式。

RabbitMQ 入門系列:6、保障消息:不丟失:發送方、Rabbit存儲端、接收方。

RabbitMQ 入門系列:7、保障消息:不重複消費:產生消息的唯一ID。

RabbitMQ 入門系列:8、擴展內容:接收信息時:可否根據RoutingKey過濾監聽信息,答案是不能。

RabbitMQ 入門系列:9、擴展內容:死信隊列:真不適合當延時隊列。

RabbitMQ 入門系列:10、擴展內容:延時隊列:延時隊列插件及其有限的適用場景。

前言:

本篇簡單介紹如何保障消息不丟失的處理方式。

1、保障消息不丟失:發送方

主要是通過消息確認或事務,來保障這個過程,下面見具體代碼:

1、通過確認機制處理的代碼:

using RabbitMQ.Client;
using System.Text;using (var channel = Rabbit.Instance.DefaultConnection.CreateModel())
{
    channel.ConfirmSelect();//開啟確認
    channel.QueueDeclare("FirstQueue", false, false, false);
    channel.BasicPublish("", "FirstQueue", false, null, Encoding.UTF8.GetBytes("這是要發送的內容"));
    if (channel.WaitForConfirms(TimeSpan.FromSeconds(10)))//設置最長超時時間
    {
        //發送確認成功
    }
    else
    {
        //超時或失敗,需要處理是否重發消息。
    }
}

2、通過事務機制處理的代碼:

using RabbitMQ.Client;
using System.Text;

using (var channel = Rabbit.Instance.DefaultConnection.CreateModel())
{
    channel.TxSelect();
    channel.QueueDeclare("FirstQueue", false, false, false);
    channel.BasicPublish("", "FirstQueue", false, null, Encoding.UTF8.GetBytes("這是要發送的內容"));
    try
    {
        channel.TxCommit();
    }
    catch (Exception)
    {
        channel.TxRollback();
        //處理事務提交失敗的邏輯。
    }
}

2、保障消息不丟失:RabbitMQ端

對於RabbitMQ端的消息保障,我們人為可以處理的是,設置創建的隊列或消息是否持久化。

通過創建持久化的隊列或消息,可以保障消息寫入硬盤,重啟時仍能還原信息。

//第二個參數:是否持久化
channel.QueueDeclare("FirstQueue", true, false, false);

3、保障消息不丟失:接收方

接收方主要是通過消息確認,來指示是否收到信息。

var channel = Rabbit.Instance.DefaultConnection.CreateModel();

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
    var message = Encoding.UTF8.GetString(ea.Body.ToArray());
    Console.WriteLine("收到默認消息 {0}", message);
};
channel.BasicConsume(queue: "FirstQueue",
                      autoAck: true,
                      consumer: consumer);

在以上代碼中,通過指定authAck可以自動回應收到信息。

當然,有需要也可以手動回應:

var channel = Rabbit.Instance.DefaultConnection.CreateModel();

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
    var message = Encoding.UTF8.GetString(ea.Body.ToArray());
    Console.WriteLine("收到默認消息 {0}", message);
  try
    {
        channel.BasicAck(ea.DeliveryTag, false);
    }
    catch (Exception err)
    {

       //處理確認失敗的情況。
    }
};
channel.BasicConsume(queue: "FirstQueue",
                      autoAck: false,
                      consumer: consumer);

說明:

為了避免消息丟失問題,消息的確認,最好在是業務處理完再進行確認。

否則會出現第三方中介出問題時,或業務處理出問題時,或剛確認好消息,業務還沒處理就系統異常,導致消息未消費就丟失的問題。

 

4、發送方:還有一種情況:通過交換機發送過去,但交換機沒送到指定的隊列時

這時候應答也是正常的,但數據丟失,這種情況,是這樣處理的:

 

就兩點:

1、發送信息BasicPublish方法的第三個參數:mandatory設置為true。

2、定義接收的回調:BasicReturn事件。

總結:

本篇簡單介紹如何使用RabbitMQ消息時,做到消息的可靠性,不丟失。