.Net Core&RabbitMQ限制循環消費
前言
當消費者端接收消息處理業務時,如果出現異常或是拒收消息將消息又變更為等待投遞再次推送給消費者,這樣一來,則形成循環的條件。
循環場景
生產者發送100條消息到RabbitMQ中,消費者設定讀取到第50條消息時,設置拒收,同時設定是否還留存在當前隊列中(當requeue為false時,設置了死信隊列則進入死信隊列,否則移除消息)。
consumer.Received += (model, ea) =>
{
var message = ea.Body;
Console.WriteLine("接收到資訊為:" + Encoding.UTF8.GetString(message.ToArray()));
if (Encoding.UTF8.GetString(message.ToArray()).Contains("50"))
{
Console.WriteLine("拒收");
((EventingBasicConsumer)model).Model.BasicReject(ea.DeliveryTag, requeue: true);
return;
}
((EventingBasicConsumer)model).Model.BasicAck(ea.DeliveryTag, multiple: false);
};
當第50條消息拒收,則仍在隊列中且處在隊列頭部,重新推送給消費者,再次拒收,再次推送,反反覆復。
最終其他消息全部消費完畢,僅剩第50條消息往複間不斷消費,拒收,消費,這將可能導致RabbitMQ出現記憶體泄漏問題。
解決方案
RabbitMQ及AMQP協議本身沒有提供這類重試功能,但可以利用一些已有的功能來間接實現重試限定(以下只考慮基於手動確認模式情況)。此處只想到或是只查到了如下幾種方案解決消息循環消費問題。
- 一次消費
- 無論成功與否,消費者都對外返回ack,將拒收原因或是異常資訊catch存入本地或是新隊列中另作重試。
- 消費者拒絕消息或是出現異常,返回Nack或Reject,消息進入死信隊列或丟棄(requeue設定為false)。
- 限定重試次數
- 在消息的頭中添加重試次數,並將消息重新發送出去,再每次重新消費時從頭中判斷重試次數,遞增或遞減該值,直到達到限制,requeue改為false,最終進入死信隊列或丟棄。
- 可以在Redis、Memcache或其他存儲中存儲消息唯一鍵(例如Guid、雪花Id等,但必須在發布消息時手動設置它),甚至在mysql中連同重試次數一起存儲,然後在每次重新消費時遞增/遞減該值,直到達到限制,requeue改為false,最終進入死信隊列或丟棄。
- 隊列使用Quorum類型,限制投遞次數,超過次數消息被刪除。
- 隊列消息過期
- 設置過期時間,給隊列或是消息設置TTL,重試一定次數消息達到過期時間後進入死信隊列或丟棄(requeue設定為true)。
- 也許還有更多好的方案…
一次消費
對外總是Ack
消息到達了消費端,可因某些原因消費失敗了,對外可以發送Ack,而在內部走額外的方式去執行補償操作,比如將消息轉發到內部的RabbitMQ或是其他處理方式,終歸是只消費一次。
var queueName = "alwaysack_queue";
channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
channel.BasicQos(0, 5, false);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
try
{
var message = ea.Body;
Console.WriteLine("接收到資訊為:" + Encoding.UTF8.GetString(message.ToArray()));
if (Encoding.UTF8.GetString(message.ToArray()).Contains("50"))
{
throw new Exception("模擬異常");
}
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
}
finally
{
((EventingBasicConsumer)model).Model.BasicAck(ea.DeliveryTag, multiple: false);
}
};
channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
當消費端收到消息,處理時出現異常,可以另想辦法去處理,而對外保持著ack的返回,以避免消息的循環消費。
消息不重入隊列
在消費者端,因異常或是拒收消息時,對requeue設置為false時,如果設置了死信隊列,則符合「消息被拒絕且不重入隊列」這一進入死信隊列的情況,從而避免消息反覆重試。如未設置死信隊列,則消息被丟失。
此處假定接收100條消息,在接收到第50條消息時設置拒收,並且設置了requeue為false。
var dlxExchangeName = "dlx_exchange";
channel.ExchangeDeclare(exchange: dlxExchangeName, type: "fanout", durable: false, autoDelete: false, arguments: null);
var dlxQueueName = "dlx_queue";
channel.QueueDeclare(queue: dlxQueueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
channel.QueueBind(queue: dlxQueueName, exchange: dlxExchangeName, routingKey: "");
var queueName = "nackorreject_queue";
var arguments = new Dictionary<string, object>
{
{ "x-dead-letter-exchange", dlxExchangeName }
};
channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: arguments);
channel.BasicQos(0, 5, false);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var message = ea.Body;
Console.WriteLine("接收到資訊為:" + Encoding.UTF8.GetString(message.ToArray()));
if (Encoding.UTF8.GetString(message.ToArray()).Contains("50"))
{
Console.WriteLine("拒收");
((EventingBasicConsumer)model).Model.BasicReject(ea.DeliveryTag, requeue: false);//關鍵在於requeue=false
return;
}
((EventingBasicConsumer)model).Model.BasicAck(ea.DeliveryTag, multiple: false);
};
channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
如此一來,拒收消息不會重入隊列,並且現有隊列綁定了死信交換機,因此,消息進入到死信隊列中,如不綁定,則消息丟失。
限定重試次數
設置重試次數,限定循環消費的次數,允許短暫的循環,但最終打破循環。
消息頭設定次數
在消息頭中設置次數記錄作為標記,但是,消費端無法對接收到的消息修改消息頭然後將原消息送回MQ,因此,需要將原消息內容重新發送消息到MQ,具體步驟如下
- 原消息設置不重入隊列。
- 再發送新的消息其內容與原消息一致,可設置新消息的消息頭來攜帶重試次數。
- 消費端再次消費時,便可從消息頭中查看消息被消費的次數。
此處假定接收10條消息,在接收到第5條消息時設置拒收, 當消息頭中重試次數未超過設定的3次時,消息可以重入隊列,再次被消費。
var queueName = "messageheaderretrycount_queue";
channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
channel.BasicQos(0, 5, false);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var message = ea.Body;
Console.WriteLine("接收到資訊為:" + Encoding.UTF8.GetString(message.ToArray()));
if (Encoding.UTF8.GetString(message.ToArray()).Contains("5"))
{
var maxRetryCount = 3;
Console.WriteLine($"拒收 {DateTime.Now}");
//初次消費
if (ea.BasicProperties.Headers == null)
{
//原消息設置為不重入隊列
((EventingBasicConsumer)model).Model.BasicReject(ea.DeliveryTag, requeue: false);
//發送新消息到隊列中
RetryPublishMessage(channel, queueName, message.ToArray(), 1);
return;
}
//獲取重試次數
var retryCount = ParseRetryCount(ea);
if (retryCount < maxRetryCount)
{
//原消息設置為不重入隊列
((EventingBasicConsumer)model).Model.BasicReject(ea.DeliveryTag, requeue: false);
//發送新消息到隊列中
RetryPublishMessage(channel, queueName, message.ToArray(), retryCount + 1);
return;
}
//到達最大次數,不再重試消息
((EventingBasicConsumer)model).Model.BasicReject(ea.DeliveryTag, requeue: false);
return;
}
((EventingBasicConsumer)model).Model.BasicAck(ea.DeliveryTag, multiple: false);
};
channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
static void RetryPublishMessage(IModel channel, string queueName, byte[] body, int retryCount)
{
var basicProperties = channel.CreateBasicProperties();
basicProperties.Headers = new Dictionary<string, object>();
basicProperties.Headers.Add("retryCount", retryCount);
channel.BasicPublish(exchange: "", routingKey: queueName, basicProperties: basicProperties, body: body);
}
static int ParseRetryCount(BasicDeliverEventArgs ea)
{
var existRetryRecord = ea.BasicProperties.Headers.TryGetValue("retryCount", out object retryCount);
if (!existRetryRecord)
{
throw new Exception("沒有設置重試次數");
}
return (int)retryCount;
}
消息被拒收後,再重新發送消息到原有交換機或是隊列下中,以使得消息像是消費失敗回到了隊列中,如此來控制消費次數,但是這種場景下,新消息排在了隊列的尾部,而不是原消息排在隊列頭部。
存儲重試次數
在存儲服務中存儲消息的唯一標識與對應重試次數,消費消息前對消息進行判斷是否存在。
與消息頭判斷一致,只是消息重試次數的存儲從消息本身挪入存儲服務中了。需要注意的是,消息發送端需要設置消息的唯一標識(MessageId屬性)
//模擬外部存儲服務
var MessageRetryCounts = new Dictionary<ulong, int>();
var queueName = "storageretrycount_queue";
channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
channel.BasicQos(0, 5, false);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var message = ea.Body;
Console.WriteLine("接收到資訊為:" + Encoding.UTF8.GetString(message.ToArray()));
if (Encoding.UTF8.GetString(message.ToArray()).Contains("50"))
{
var maxRetryCount = 3;
Console.WriteLine("拒收");
//重試次數判斷
var existRetryRecord = MessageRetryCounts.ContainsKey(ea.BasicProperties.MessageId);
if (!existRetryRecord)
{
//重入隊列,繼續重試
MessageRetryCounts.Add(ea.BasicProperties.MessageId, 1);
((EventingBasicConsumer)model).Model.BasicReject(ea.DeliveryTag, requeue: true);
return;
}
if (MessageRetryCounts[ea.BasicProperties.MessageId] < maxRetryCount)
{
//重入隊列,繼續重試
MessageRetryCounts[ea.BasicProperties.MessageId] = MessageRetryCounts[ea.BasicProperties.MessageId] + 1;
((EventingBasicConsumer)model).Model.BasicReject(ea.DeliveryTag, requeue: true);
return;
}
//到達最大次數,不再重試消息
((EventingBasicConsumer)model).Model.BasicReject(ea.DeliveryTag, requeue: false);
return;
}
((EventingBasicConsumer)model).Model.BasicAck(ea.DeliveryTag, multiple: false);
};
channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
除第一次拒收外,允許三次重試機會,三次重試完畢後,設置requeue為false,消息丟失或進入死信隊列(如有設置的話)。
隊列使用Quorum類型
第一種和第二種分別是消息自身、外部存儲服務來管理消息重試次數,使用Quorum,由MQ來限定消息的投遞次數,也就控制了重試次數。
設置隊列類型為quorum,設置投遞最大次數,當超過投遞次數後,消息被丟棄。
var queueName = "quorumtype_queue";
var arguments = new Dictionary<string, object>()
{
{ "x-queue-type", "quorum"},
{ "x-delivery-limit", 3 }
};
channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: arguments);
channel.BasicQos(0, 5, false);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var message = ea.Body;
Console.WriteLine("接收到資訊為:" + Encoding.UTF8.GetString(message.ToArray()));
if (Encoding.UTF8.GetString(message.ToArray()).Contains("50"))
{
Console.WriteLine($"拒收 {DateTime.Now}");
((EventingBasicConsumer)model).Model.BasicReject(ea.DeliveryTag, requeue: true);
return;
}
((EventingBasicConsumer)model).Model.BasicAck(ea.DeliveryTag, multiple: false);
};
channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
第一次消費被拒收重入隊列後,經最大三次投遞後,消費端不再收到消息,如此一來也限制了消息的循環消費。
隊列消息過期
當為消息設置了過期時間時,當消息沒有受到Ack,且還在隊列中,受到過期時間的限制,反覆消費但未能成功時,消息將走向過期,進入死信隊列或是被丟棄。
聚焦於過期時間的限制,因此在消費者端,因異常或是拒收消息時,需要對requeue設置為true,將消息再次重入到原隊列中。
設定消費者端第五十條消息會被拒收,且隊列的TTL設置為5秒。
//死信交換機和死信隊列
var dlxExchangeName = "dlx_exchange";
channel.ExchangeDeclare(exchange: dlxExchangeName, type: "fanout", durable: false, autoDelete: false, arguments: null);
var dlxQueueName = "dlx_queue";
channel.QueueDeclare(queue: dlxQueueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
channel.QueueBind(queue: dlxQueueName, exchange: dlxExchangeName, routingKey: "");
//常規隊列
var queueName = "normalmessage_queue";
var arguments = new Dictionary<string, object>
{
{ "x-message-ttl", 5000},
{ "x-dead-letter-exchange", dlxExchangeName }
};
channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: arguments);
channel.BasicQos(0, 5, false);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var message = ea.Body;
Console.WriteLine("接收到資訊為:" + Encoding.UTF8.GetString(message.ToArray()));
if (Encoding.UTF8.GetString(message.ToArray()).Contains("50"))
{
Console.WriteLine($"拒收 {DateTime.Now}");
((EventingBasicConsumer)model).Model.BasicReject(ea.DeliveryTag, requeue: true);
return;
}
((EventingBasicConsumer)model).Model.BasicAck(ea.DeliveryTag, multiple: false);
};
channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
當消費者端拒收消息後消息重入隊列,再次消費,反覆進行超過5秒後,消息在隊列中達到了過期時間,則被挪入到死信隊列中。
從Web管理中死信隊列中可查看該條過期的消息。
參考資料
- //www.jianshu.com/p/f77a0b10c140
- //www.jianshu.com/p/4904c609632f
- //stackoverflow.com/questions/23158310/how-do-i-set-a-number-of-retry-attempts-in-rabbitmq
2022-10-29,望技術有成後能回來看見自己的腳步