.Net Core&RabbitMQ死信隊列

過期時間

RabbitMQ可以為消息和隊列設置過期時間Time To Live(TTL)。其目的即過期。

消息過期時間

消息存儲在隊列中時,如果想為其設置一個有限的生命周期,而不是一直存儲著,可以為其設置過期時間。比如,一條消息,我想要三分鐘內有效,三分鐘後再接收到該消息就算過時了,如果在隊列中存儲已經超過三分鐘,消費者再去接收就是過時了,那便沒有意義了。

為消息設置過期時間可以從兩方面著手,一是為消息本身設置過期時間,二是為消息的承載體隊列設置過期時間。兩者同時設置情況下取最短生命周期。

為消息設置

在BasicPublish方法中,可以設置BasicProperties中的Expiration來設置過期時間(單位為毫秒)。

var connFactory = new ConnectionFactory
{
    HostName = "xxx.xxx.xxx.xxx",
    Port = 5672,
    UserName = "rabbitmqdemo",
    Password = "rabbitmqdemo@test",
    VirtualHost = "rabbitmqdemo"
};
using (var conn = connFactory.CreateConnection())
{
    using (var channel = conn.CreateModel())
    {
        var queueName = "messagettl_queue";
        channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);

        while (true)
        {
            Console.WriteLine("消息內容(exit退出):");
            var message = Console.ReadLine();
            if (message.Trim().ToLower() == "exit")
            {
                break;
            }

            var body = Encoding.UTF8.GetBytes(message);
            var basicProperties = channel.CreateBasicProperties();
            basicProperties.Expiration = "10000";//10秒
            channel.BasicPublish(exchange: "", routingKey: queueName, basicProperties: basicProperties, body: body);
            Console.WriteLine("消息內容發送完畢:" + message);
        }
    }
}

如此一來,設置消息生命周期為10秒,當超過該時間後,再由消費者去獲取該消息,則獲取不到,可以直接從Web介面看到,經過10秒後,消息過期(未啟動消費者)。

為隊列設置

為每個消息設置過期時間可能不符合一些特定的場景,當需要設定特定隊列中的消息都是指定的過期時間時,可以為隊列中的消息統一設置過期時間。

隊列聲明時可以指定參數,其中設置x-message-ttl參數。

var arguments = new Dictionary<string, object>
{
    { "x-message-ttl", 10000 }
};
channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: arguments);

如此一來,該隊列發送消息時,如消息本身沒有設置過期時間,則使用隊列的過期時間。
生產者發送的7條消息過期時間都為10s,一段時間後,消息全部過期。

當ttl設置為0時,僅當消息能夠立即被消費,否則消息立馬過期,Web面板中只能見到消息發送,隊列中沒有消息,都被立馬過期了。

過期策略

為隊列中的消息統一設置過期時間時,當超出了過期時間,消息立馬過期挪出隊列。先發送的消息在隊列頭部,先過期的也在隊列頭部,因此可從頭部掃描清除過期消息。

而為直接為消息設置過期時間時,各個消息的過期時間不盡相同,掃描時得隊列全局掃描才能識別哪些消息是過期的。

因此,設定在投遞給消費者前判斷是否過期,超出過期時間消息仍在隊列中。

隊列過期時間

此處的隊列過期時間與消息中的為隊列設置過期時間不同,此處是為隊列本身考慮,隊列自身沒有消費者超過一段時間內且沒有重新生命該隊列,則無需考慮存在。

隊列聲明時指定參數,其中設置x-expires參數,需大於0,單位毫秒。

var arguments = new Dictionary<string, object>
{
    { "x-expires", 20000 }
};
channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: arguments);

生產者啟動應用創建隊列,發送消息。

空閑20s不發送消息後隊列被刪除,再次發送消息匹配不到隊列,消息被回退。

死信隊列

死信交換機即Dead-Letter-Exchange(DLX),和備份交換機一樣,沒有什麼特殊,只是屬性上標記了下,其綁定的隊列稱之為死信隊列。當消息在一個隊列中變成死信之後,被重發到死信交換機,存儲到死信隊列中。

消息變為死信情況

  • 消息被拒絕且不重入隊列
  • 消息超出過期時間
  • 隊列達到最大長度

設置死信隊列

聲明隊列時可以給定參數,其中設置x-dead-letter-exchange來指明該隊列對應的死信交換機。

//死信交換機和死信隊列
var dlxExchangeName = "dlx_exchange";
channel.ExchangeDeclare(exchange: dlxExchangeName, type: "fanout", durable: false, autoDelete: false, arguments: null);
var dlxQueueName = "dlx_queue";
channel.QueueDeclare(queue: dlxQueueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
channel.QueueBind(queue: dlxQueueName, exchange: dlxExchangeName, routingKey: "");

//常規隊列
var queueName = "nornalmessage_queue";
var arguments = new Dictionary<string, object>
{
    { "x-message-ttl", 10000},
    { "x-dead-letter-exchange", dlxExchangeName }
};
channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: arguments);

如下可見到,生產者初期發送的消息過期後經死信交換機路由進入到死信隊列中,後期發送的暫未過期的消息仍在原隊列中。當有該部分過期消息的需要時,消費者可以監聽死信隊列獲取消息。

當死信交換機的類型為direct時,可以指定RoutingKey(不指定默認使用原隊列RoutingKey)。

可在聲明常規隊列的屬性中設置x-dead-letter-routing-key,以能夠匹配上死信交換機與死信隊列綁定時的routingkey。

//死信交換機和死信隊列
var dlxExchangeName = "dlxroutingkey_exchange";
channel.ExchangeDeclare(exchange: dlxExchangeName, type: "direct", durable: false, autoDelete: false, arguments: null);
var dlxQueueName1 = "dlx_queue1";
channel.QueueDeclare(queue: dlxQueueName1, durable: false, exclusive: false, autoDelete: false, arguments: null);
channel.QueueBind(queue: dlxQueueName1, exchange: dlxExchangeName, routingKey: "waring");
var dlxQueueName2 = "dlx_queue2";
channel.QueueDeclare(queue: dlxQueueName2, durable: false, exclusive: false, autoDelete: false, arguments: null);
channel.QueueBind(queue: dlxQueueName2, exchange: dlxExchangeName, routingKey: "info");
var dlxQueueName3 = "dlx_queue1";
channel.QueueDeclare(queue: dlxQueueName3, durable: false, exclusive: false, autoDelete: false, arguments: null);
channel.QueueBind(queue: dlxQueueName3, exchange: dlxExchangeName, routingKey: "error");

//常規隊列
var queueName = "normalmessage_queue";
var arguments = new Dictionary<string, object>
{
    { "x-message-ttl", 10000},
    { "x-dead-letter-exchange", dlxExchangeName },
    { "x-dead-letter-routing-key", "info" }
};
channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: arguments);

當消息過期時,死信交換機根據常規隊列綁定的routingkey,匹配到相應的死信隊列存儲。先發送的消息過期後存入到對應隊列中,後續暫未過期消息仍然保持在原隊列。

延遲隊列

延遲隊列算是死信隊列的一種應用場景,其本身並不在RabbitMQ或是AMQP協議中有所體現。當不想消費者立馬獲取到消息,而是等待一段時間才讓消費者消費時,比如訂單限時支付,超出時間未支付則取消訂單,那麼可以使用到延遲隊列來處理這一場景。

消息過期時間

發送消息時設置過期時間場景下,消息可能並不會在過期後立馬從隊列中刪除,而是要等到消費時候才會判斷該消息是否過時。當出現以下場景時則會有點問題。

第一個消息的過期時間很長,而後續的消息的過期時間很短,後續的消息過期後不會立馬刪除,而是要等到第一個消息過期刪除後才會被刪除,那麼對應延遲隊列來說會有點問題,時間超出了設定的延遲時間。

註:對隊列設定消息過期時間不存在該問題。

解決方案

RabbitMQ提供了延遲隊列的插件,提供延遲隊列類型交換機,其不會根據第一個消息是否過期來判斷,解決了如上提到的第一個沒有過期,後續消息過期的場景,不會受消息先後順序的影響,而是關注過期時間,先過期的先發送。

生產者程式碼

聲明延遲交換機時類型使用x-delayed-message,需要注意聲明交換機類型時需要給定參數x-delayed-type,至於值是哪種類型可依據匹配隊列的需要選擇。延遲交換機和延遲隊列都需要持久化。消息發送時需要使用消息頭並設置x-delay來設置延遲時間。

var connFactory = new ConnectionFactory
{
    HostName = "xxx.xxx.xxx.xxx",
    Port = 5672,
    UserName = "rabbitmqdemo",
    Password = "rabbitmqdemo@test",
    VirtualHost = "rabbitmqdemo"
};
using (var conn = connFactory.CreateConnection())
{
    using (var channel = conn.CreateModel())
    {
        //延遲交換機
        var delayExchangeName = "delay_exchange";
        var delayArguments = new Dictionary<string, object>
        {
            { "x-delayed-type", "direct" } //x-delayed-type必須,否則啟動報錯
        };
        channel.ExchangeDeclare(exchange: delayExchangeName, type: "x-delayed-message", durable: true, autoDelete: false, arguments: delayArguments); //持久化必須,否則啟動報錯

        //延遲隊列
        var delayQueueName = "delay_queue";
        channel.QueueDeclare(delayQueueName, durable: true, exclusive: false, autoDelete: false);//持久化必須,否則啟動報錯
        channel.QueueBind(queue: delayQueueName, exchange: delayExchangeName, routingKey: delayQueueName);

        channel.BasicReturn += new EventHandler<RabbitMQ.Client.Events.BasicReturnEventArgs>((sender, e) =>
        {
            var message = Encoding.UTF8.GetString(e.Body.ToArray());
            Console.WriteLine($"收到回退消息:{message}");
        });

        while (true)
        {
            Console.WriteLine("消息內容(exit退出):");
            var message = Console.ReadLine();
            if (message.Trim().ToLower() == "exit")
            {
                break;
            }

            var body = Encoding.UTF8.GetBytes(message);
            var basicProperties = channel.CreateBasicProperties();
            basicProperties.Headers = new Dictionary<string, object>
            {
                { "x-delay", message == "aaa" ? 30000 : 10000 }//延時時間從header賦值
            };
            channel.BasicPublish(exchange: delayExchangeName, routingKey: delayQueueName, mandatory: true, basicProperties: basicProperties, body: body);
            Console.WriteLine("消息內容發送完畢:" + message + $" {DateTime.Now}");
        }
    }
}

生產者發送消息,到延遲交換機,消息將在設定的延遲時間後路由到相應的延遲隊列。

此處設置了mandatory為true,消息最終到了延遲隊列,但又被回退到了生產者,實際上對於該延遲交換機插件並不支援mandatory,官方不建議使用該參數

2022-08-29,望技術有成後能回來看見自己的腳步

Tags: