RabbitMQ從零到集群高可用(.NetCore5.0) – 死信隊列,延時隊列

 系列文章:

RabbitMQ從零到集群高可用(.NetCore5.0) – RabbitMQ簡介和六種工作模式詳解

RabbitMQ從零到集群高可用(.NetCore5.0) – 死信隊列,延時隊列

 

一、死信隊列

 

 

 

 

 

 描述:Q1隊列綁定了x-dead-letter-exchange(死信交換機)為X2,x-dead-letter-routing-key(死信路由key)指向Q2(隊列2)

            P(生產者)發送消息經X1(交換機1)路由到Q1(隊列1),Q1的消息觸發特定情況,自動把消息經X2(交換機2)路由到Q2(隊列2),C(消費者)直接消息Q2的消息。

特定情況有哪些呢:

  1. 消息被拒(basic.reject or basic.nack)並且沒有重新入隊(requeue=false);
  2. 當前隊列中的消息數量已經超過最大長度(創建隊列時指定” x-max-length參數設置隊列最大消息數量)。
  3. 消息在隊列中過期,即當前消息在隊列中的存活時間已經超過了預先設置的TTL(Time To Live)時間;

這裡演示情況1:

假如場景:Q1中隊列數據不完整,就算從新處理也會報錯,那就可以不ack,把這個消息轉到死信隊列另外處理。

生產者:

 public static void SendMessage()
        {
            //死信交換機
            string dlxexChange = "dlx.exchange";
            //死信隊列
            string dlxQueueName = "dlx.queue";

            //消息交換機
            string exchange = "direct-exchange";
            //消息隊列
            string queueName = "queue_a";

            using (var connection = RabbitMQHelper.GetConnection())
            {
                using (var channel = connection.CreateModel())
                {

                    //創建死信交換機
                    channel.ExchangeDeclare(dlxexChange, type: ExchangeType.Direct, durable: true, autoDelete: false);
                    //創建死信隊列
                    channel.QueueDeclare(dlxQueueName, durable: true, exclusive: false, autoDelete: false);
                    //死信隊列綁定死信交換機
                    channel.QueueBind(dlxQueueName, dlxexChange, routingKey: dlxQueueName);

                    // 創建消息交換機
                    channel.ExchangeDeclare(exchange, type: ExchangeType.Direct, durable: true, autoDelete: false);
                    //創建消息隊列,並指定死信隊列
                    channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false, arguments:
                                        new Dictionary<string, object> {
                                             { "x-dead-letter-exchange",dlxexChange}, //設置當前隊列的DLX(死信交換機)
                                             { "x-dead-letter-routing-key",dlxQueueName}, //設置DLX的路由key,DLX會根據該值去找到死信消息存放的隊列
                                         });
                    //消息隊列綁定消息交換機
                    channel.QueueBind(queueName, exchange, routingKey: queueName);

                    string message = "hello rabbitmq message";
                    var properties = channel.CreateBasicProperties();
                    properties.Persistent = true;
                    //發布消息
                    channel.BasicPublish(exchange: exchange,
                                         routingKey: queueName,
                                         basicProperties: properties,
                                         body: Encoding.UTF8.GetBytes(message));
                    Console.WriteLine($"向隊列:{queueName}發送消息:{message}");
                }
            }
        }

 

消費者:

  public static void Consumer()
        {
            //死信交換機
            string dlxexChange = "dlx.exchange";
            //死信隊列
            string dlxQueueName = "dlx.queue";

            //消息交換機
            string exchange = "direct-exchange";
            //消息隊列
            string queueName = "queue_a";
            var connection = RabbitMQHelper.GetConnection();
            {
                //創建信道
                var channel = connection.CreateModel();
                {

                    //創建死信交換機
                    channel.ExchangeDeclare(dlxexChange, type: ExchangeType.Direct, durable: true, autoDelete: false);
                    //創建死信隊列
                    channel.QueueDeclare(dlxQueueName, durable: true, exclusive: false, autoDelete: false);
                    //死信隊列綁定死信交換機
                    channel.QueueBind(dlxQueueName, dlxexChange, routingKey: dlxQueueName);

                    // 創建消息交換機
                    channel.ExchangeDeclare(exchange, type: ExchangeType.Direct, durable: true, autoDelete: false);
                    //創建消息隊列,並指定死信隊列
                    channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false, arguments:
                                        new Dictionary<string, object> {
                                             { "x-dead-letter-exchange",dlxexChange}, //設置當前隊列的DLX
                                             { "x-dead-letter-routing-key",dlxQueueName}, //設置DLX的路由key,DLX會根據該值去找到死信消息存放的隊列
                                         });
                    //消息隊列綁定消息交換機
                    channel.QueueBind(queueName, exchange, routingKey: queueName);


                    var consumer = new EventingBasicConsumer(channel);
                    channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: true);
                    consumer.Received += (model, ea) =>
                    {
                        //處理業務
                        var message = Encoding.UTF8.GetString(ea.Body.ToArray());
                        Console.WriteLine($"隊列{queueName}消費消息:{message},不做ack確認");
                        //channel.BasicAck(ea.DeliveryTag, false);
                        //不ack(BasicNack),且不把消息放回隊列(requeue:false)
                        channel.BasicNack(ea.DeliveryTag, false, requeue: false);
                    };
                    channel.BasicConsume(queueName, autoAck: false, consumer);
                }
            }
        }

 

 

消費者加上channel.BasickNack()模擬消息處理不了,不ack確認。

執行結果:

 

 RabbitMQ管理介面:

 

 

看到消息隊列為queue_a,特性有DLX(死信交換機),DLK(死信路由)。因為消費端不nack,觸發了死信,被轉發到了死信隊列dlx.queue。

二、延時隊列

延時隊列其實也是配合死信隊列一起用,其實就是上面死信隊列的第二中情況。給隊列添加消息過時時間(TTL),變成延時隊列。

 

 簡單的描述就是:P(生產者)發送消息到Q1(延時隊列),Q1的消息有過期時間,比如10s,那10s後消息過期就會觸發死信,從而把消息轉發到Q2(死信隊列)。

解決問題場景:像商城下單,未支付時取消訂單場景。下單時寫一條記錄入Q1,延時30分鐘後轉到Q2,消費Q2,檢查訂單,支付則不做操作,沒支付則取消訂單,恢復庫存。

生產者程式碼:

 public static void SendMessage()
        {
            //死信交換機
            string dlxexChange = "dlx.exchange";
            //死信隊列
            string dlxQueueName = "dlx.queue";

            //消息交換機
            string exchange = "direct-exchange";
            //消息隊列
            string queueName = "delay_queue";

            using (var connection = RabbitMQHelper.GetConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    //創建死信交換機
                    channel.ExchangeDeclare(dlxexChange, type: ExchangeType.Direct, durable: true, autoDelete: false);
                    //創建死信隊列
                    channel.QueueDeclare(dlxQueueName, durable: true, exclusive: false, autoDelete: false);
                    //死信隊列綁定死信交換機
                    channel.QueueBind(dlxQueueName, dlxexChange, routingKey: dlxQueueName);

                    // 創建消息交換機
                    channel.ExchangeDeclare(exchange, type: ExchangeType.Direct, durable: true, autoDelete: false);
                    //創建消息隊列,並指定死信隊列,和設置這個隊列的消息過期時間為10s
                    channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false, arguments:
                                        new Dictionary<string, object> {
                                             { "x-dead-letter-exchange",dlxexChange}, //設置當前隊列的DLX(死信交換機)
                                             { "x-dead-letter-routing-key",dlxQueueName}, //設置DLX的路由key,DLX會根據該值去找到死信消息存放的隊列
                                             { "x-message-ttl",10000} //設置隊列的消息過期時間
                                        });
                    //消息隊列綁定消息交換機
                    channel.QueueBind(queueName, exchange, routingKey: queueName);

                    string message = "hello rabbitmq message";
                    var properties = channel.CreateBasicProperties();
                    properties.Persistent = true;
                    //發布消息
                    channel.BasicPublish(exchange: exchange,
                                         routingKey: queueName,
                                         basicProperties: properties,
                                         body: Encoding.UTF8.GetBytes(message));
                    Console.WriteLine($"{DateTime.Now},向隊列:{queueName}發送消息:{message}");
                }
            }
        }

消費者程式碼:

 public static void Consumer()
        {
            //死信交換機
            string dlxexChange = "dlx.exchange";
            //死信隊列
            string dlxQueueName = "dlx.queue";
            var connection = RabbitMQHelper.GetConnection();
            {
                //創建信道
                var channel = connection.CreateModel();
                {
                    //創建死信交換機
                    channel.ExchangeDeclare(dlxexChange, type: ExchangeType.Direct, durable: true, autoDelete: false);
                    //創建死信隊列
                    channel.QueueDeclare(dlxQueueName, durable: true, exclusive: false, autoDelete: false);
                    //死信隊列綁定死信交換機
                    channel.QueueBind(dlxQueueName, dlxexChange, routingKey: dlxQueueName);

                    var consumer = new EventingBasicConsumer(channel);
                    channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: true);
                    consumer.Received += (model, ea) =>
                    {
                        //處理業務
                        var message = Encoding.UTF8.GetString(ea.Body.ToArray());
                        Console.WriteLine($"{DateTime.Now},隊列{dlxQueueName}消費消息:{message}");
                        channel.BasicAck(ea.DeliveryTag, false);
                    };
                    channel.BasicConsume(dlxQueueName, autoAck: false, consumer);
                }
            }
        }

 

執行程式碼:

 

 向延時隊列發送消息,監聽死信隊列,發送和收到消息時間剛好是設置的10s。

RabbitMQ管理介面:

 

三、延時隊列消息設置不同過期時間

上面的延時隊列能解決消息過期時間都是相同的場景,能不能解決消息的過期時間是不一樣的呢?

例如場景:機器人客服,為了更像人為操作,收到消息後要隨機3-10秒回復客戶。

1)隊列不設置TTL(消息過期時間),把過期時間設置在消息上。

生產者程式碼:

public static void SendMessage()
        {
            //死信交換機
            string dlxexChange = "dlx.exchange";
            //死信隊列
            string dlxQueueName = "dlx.queue";

            //消息交換機
            string exchange = "direct-exchange";
            //消息隊列
            string queueName = "delay_queue";

            using (var connection = RabbitMQHelper.GetConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    //創建死信交換機
                    channel.ExchangeDeclare(dlxexChange, type: ExchangeType.Direct, durable: true, autoDelete: false);
                    //創建死信隊列
                    channel.QueueDeclare(dlxQueueName, durable: true, exclusive: false, autoDelete: false);
                    //死信隊列綁定死信交換機
                    channel.QueueBind(dlxQueueName, dlxexChange, routingKey: dlxQueueName);

                    // 創建消息交換機
                    channel.ExchangeDeclare(exchange, type: ExchangeType.Direct, durable: true, autoDelete: false);
                    //創建消息隊列,並指定死信隊列,和設置這個隊列的消息過期時間為10s
                    channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false, arguments:
                                        new Dictionary<string, object> {
                                             { "x-dead-letter-exchange",dlxexChange}, //設置當前隊列的DLX(死信交換機)
                                             { "x-dead-letter-routing-key",dlxQueueName}, //設置DLX的路由key,DLX會根據該值去找到死信消息存放的隊列
                                             //{ "x-message-ttl",10000} //設置隊列的消息過期時間
                                        });
                    //消息隊列綁定消息交換機
                    channel.QueueBind(queueName, exchange, routingKey: queueName);

                    string message = "hello rabbitmq message 10s後處理";
                    var properties = channel.CreateBasicProperties();
                    properties.Persistent = true;
                    properties.Expiration = "10000";

                    //發布消息,延時10s
                    channel.BasicPublish(exchange: exchange,
                                         routingKey: queueName,
                                         basicProperties: properties,
                                         body: Encoding.UTF8.GetBytes(message));
                    Console.WriteLine($"{DateTime.Now},向隊列:{queueName}發送消息:{message},延時:10s");



                    string message2 = "hello rabbitmq message 5s後處理";
                    var properties2 = channel.CreateBasicProperties();
                    properties2.Persistent = true;
                    properties2.Expiration = "5000";

                    //發布消息,延時5s
                    channel.BasicPublish(exchange: exchange,
                                         routingKey: queueName,
                                         basicProperties: properties2,
                                         body: Encoding.UTF8.GetBytes(message2));
                    Console.WriteLine($"{DateTime.Now},向隊列:{queueName}發送消息:{message2},延時:5s");


                }
            }
        }

 

消費者程式碼還是上面延時隊列的不變,先試下效果。

 

 生產者向隊列中發送一條延時10s的消息再發一條延時5秒的消息,但消費者卻先拿到延時10s的,再拿到延時5秒的,我想要的結果是先拿到延時5s的再拿到延時10s的,是什麼原因呢。

原因是:隊列是先進先出的,而RabbitMQ只會對首位第一條消息做檢測,第一條沒過期,那麼後面的消息就會阻塞住等待前面的過期。

解決辦法:增加一個消費者對延時隊列消費,不ack,把第一條消息放到隊列尾部。一直讓消息在流動,這樣就能檢測到了。

2)新增消費者程式碼:

public static void DelayConsumer()
        {
            //延時隊列
            string queueName = "delay_queue";
            var connection = RabbitMQHelper.GetConnection();
            {
                //創建信道
                var channel = connection.CreateModel();
                {
                    var consumer = new EventingBasicConsumer(channel);
                    channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: true);
                    consumer.Received += (model, ea) =>
                    {
                        //處理業務
                        var message = Encoding.UTF8.GetString(ea.Body.ToArray());
                        Thread.Sleep(20);//消息少的時候可以加個睡眠時間減少IO
                        channel.BasicNack(ea.DeliveryTag, false, requeue: true);
                    };
                    channel.BasicConsume(queueName, autoAck: false, consumer);

                }
            }
        }

 

執行效果:

 

 

這會得到了想要的效果。

RabbitMQ管理介面: