RabbitMQ 入門系列:7、保障消息不重複消費:產生消息的唯一ID。
系列目錄
前言:
上一篇介紹了消息如何確保不丟失,本篇簡單介紹如何保障消息不重複消費的處理方式。
對於接收端而言,對消息的確認來往是需要時間的。
如果同一個隊列,同時存在多個客戶端監聽,那麼多個客戶端有一定概率能收到相同的資訊。
這時候就會產生消息重複的問題的:
1、處理消息重複消費的幾種方式:
網上人們說的主要兩種方式:
方式一、Redis:用setnx命令,做消息id判斷。
方式三、資料庫:做唯一索引,消息id能插入就可以處理,所以重複消費就會失敗。
優缺點說明:
方式一:Redis setnx 分散式鎖:
不推薦使用:一種不太靠譜的方式,如果項目對數據允許出錯,可以嘗試使用。
為啥不靠譜,可以參考文章://zhuanlan.zhihu.com/p/418268774
方式二:資料庫 的鎖:
推薦使用:可以用唯一索引,也可以用事務鎖,使用起來成熟又簡單。
方式三:獨佔文件、剪貼版
推薦場景:單機場景下
1、可以通過對文件的獨佔打開和釋放,來達到進程間的鎖。
2、可以通過對剪貼版的讀寫,來達到進程間的鎖。
前面幾種方式,都有一個要素,就是需要消息的唯一ID。
2、如何產生消息的唯一ID:
A:對消息進行hash,取hash值做為唯一ID(無法避免,有一定概念產生相同的hash)。
B:對每一個發送的消息,都產生一個GUID,這樣在獲取消息的時候,就可以拿到消息對應的唯一ID。
下面看程式碼演示:
3、發送消息時,帶唯一ID:
using (var channel = Rabbit.Instance.DefaultConnection.CreateModel())
{
channel.BasicReturn += (sender, e) =>
{
//通過交換機發送過去,但沒發送到指定的隊列,數據丟失
//do .....
Console.WriteLine("消息沒發到指定隊列:" + Encoding.UTF8.GetString(e.Body.ToArray()));
};
channel.ConfirmSelect();
channel.QueueDeclare("FirstQueue", false, false, false);
var pro = channel.CreateBasicProperties();
pro.MessageId = Guid.NewGuid().ToString();
channel.BasicPublish("", "FirstQueue", true, pro, Encoding.UTF8.GetBytes("這是要發送的內容"));
if (channel.WaitForConfirms(TimeSpan.FromSeconds(10)))
{
//發送確認成功
}
else
{
//超時或失敗,需要處理是否重發消息。
}
}
在消息發送前,生成一個屬性,這個屬性可以附帶很多資訊,其中一個是MessageId。
然後發送的時候,第4個參數,把屬性帶過去即可。
4、接收消息:獲取消息唯一ID:
var channel = Rabbit.Instance.DefaultConnection.CreateModel();
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var message = Encoding.UTF8.GetString(ea.Body.ToArray());
Console.WriteLine("收到默認消息 {0}", message);
Console.WriteLine("收到默認消息GUID {0}", ea.BasicProperties.MessageId);
try
{
channel.BasicAck(ea.DeliveryTag, false);
}
catch (Exception err)
{
//處理確認失敗的情況。
}
};
channel.BasicConsume(queue: "FirstQueue",
autoAck: false,
consumer: consumer);

總結:
本篇介紹如何保障消息不重複消費以及如何產生消息的唯一ID,除了網上的基本兩種方式,個人還奉獻了單機版的場景方式。