RabbitMQ 入門系列:7、保障消息不重複消費:產生消息的唯一ID。

系列目錄

RabbitMQ 入門系列:1、MQ的應用場景的選擇與RabbitMQ安裝。

RabbitMQ 入門系列:2、基礎含義:鏈接、通道、隊列、交換機。

RabbitMQ 入門系列:3、基礎含義:持久化、排它性、自動刪除、強制性、路由鍵。

RabbitMQ 入門系列:4、基礎編碼:官方SDK使用:鏈接創建、單例改造、發送消息、接收消息。

RabbitMQ 入門系列:5、基礎編碼:交換機的進階介紹及編碼方式。

RabbitMQ 入門系列:6、保障消息:不丟失:發送方、Rabbit存儲端、接收方。

RabbitMQ 入門系列:7、保障消息:不重複消費:產生消息的唯一ID。

RabbitMQ 入門系列:8、擴展內容:接收資訊時:可否根據RoutingKey過濾監聽資訊,答案是不能。

RabbitMQ 入門系列:9、擴展內容:死信隊列:真不適合當延時隊列。

RabbitMQ 入門系列:10、擴展內容:延時隊列:延時隊列插件及其有限的適用場景。

前言:

上一篇介紹了消息如何確保不丟失,本篇簡單介紹如何保障消息不重複消費的處理方式。

對於接收端而言,對消息的確認來往是需要時間的。

如果同一個隊列,同時存在多個客戶端監聽,那麼多個客戶端有一定概率能收到相同的資訊。

這時候就會產生消息重複的問題的:

1、處理消息重複消費的幾種方式:

網上人們說的主要兩種方式:

方式一、Redis:用setnx命令,做消息id判斷。

方式三、資料庫:做唯一索引,消息id能插入就可以處理,所以重複消費就會失敗。

優缺點說明:

方式一:Redis setnx 分散式鎖:

不推薦使用:一種不太靠譜的方式,如果項目對數據允許出錯,可以嘗試使用。

為啥不靠譜,可以參考文章://zhuanlan.zhihu.com/p/418268774

方式二:資料庫 的鎖:

推薦使用:可以用唯一索引,也可以用事務鎖,使用起來成熟又簡單。

方式三:獨佔文件、剪貼版

推薦場景:單機場景下

1、可以通過對文件的獨佔打開和釋放,來達到進程間的鎖。

2、可以通過對剪貼版的讀寫,來達到進程間的鎖。

前面幾種方式,都有一個要素,就是需要消息的唯一ID。

2、如何產生消息的唯一ID:

A:對消息進行hash,取hash值做為唯一ID(無法避免,有一定概念產生相同的hash)。

B:對每一個發送的消息,都產生一個GUID,這樣在獲取消息的時候,就可以拿到消息對應的唯一ID。

下面看程式碼演示:

3、發送消息時,帶唯一ID:

using (var channel = Rabbit.Instance.DefaultConnection.CreateModel())
{
    channel.BasicReturn += (sender, e) =>
    {
        //通過交換機發送過去,但沒發送到指定的隊列,數據丟失
        //do .....
        Console.WriteLine("消息沒發到指定隊列:" + Encoding.UTF8.GetString(e.Body.ToArray()));
    };
    channel.ConfirmSelect();
    channel.QueueDeclare("FirstQueue", false, false, false);
    var pro = channel.CreateBasicProperties();
    pro.MessageId = Guid.NewGuid().ToString();
    channel.BasicPublish("", "FirstQueue", true, pro, Encoding.UTF8.GetBytes("這是要發送的內容"));
    if (channel.WaitForConfirms(TimeSpan.FromSeconds(10)))
    {
        //發送確認成功
    }
    else
    {
        //超時或失敗,需要處理是否重發消息。
    }
}

在消息發送前,生成一個屬性,這個屬性可以附帶很多資訊,其中一個是MessageId。

然後發送的時候,第4個參數,把屬性帶過去即可。

4、接收消息:獲取消息唯一ID:

var channel = Rabbit.Instance.DefaultConnection.CreateModel();
 
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
    var message = Encoding.UTF8.GetString(ea.Body.ToArray());
   
    Console.WriteLine("收到默認消息 {0}", message);
    Console.WriteLine("收到默認消息GUID {0}", ea.BasicProperties.MessageId);
    try
    {
        channel.BasicAck(ea.DeliveryTag, false);
    }
    catch (Exception err)
    {

       //處理確認失敗的情況。
    }
   
   
};
channel.BasicConsume(queue: "FirstQueue",
                      autoAck: false,
                      consumer: consumer);

 

總結:

本篇介紹如何保障消息不重複消費以及如何產生消息的唯一ID,除了網上的基本兩種方式,個人還奉獻了單機版的場景方式。