RabbitMQ消息積壓的幾種解決思路

在日常工作中使用RabbitMQ偶爾會遇不可預料的情況導致的消息積壓,一般出現消息積壓基本上分為幾種情況:

  1. 消費者消費消息的速度趕不上生產速度,這總問題主要是業務邏輯沒設計好消費者和生產者之間的平衡,需要改業務流程或邏輯已保證消費度跟上生產消息的速,譬如增加消費者的數量等。

  2. 消費者出現異常,導致一直無法接收新的消息,這種問題需要排查消費的邏輯是不是又問題,需要優化程式。

除了上面的者兩種問題,還有一些其他情況會導致消息積壓,譬如一些系統是無法預計成產消息的速度和頻率,又或者消費者的速度已經被限制,不能通過加新的消費者來解決,譬如不同的系統間的API對接,對接那一方就做了請求頻率的限制,或者對方系統承受不了太大的並發,還有一些系統如果是面對企業客戶,譬如電商,物流,倉儲等類似平台系統的客戶的下單是沒有規律的或者集中某一個時間段下單的,這種就不能簡單的通過加消費者來解決,就需要分析具體業務來避免消息積壓。

針對這種情況,我想到了4中解決思路:

  1. 拆分MQ,生產者一個MQ,消費者一個MQ,寫一個程式監聽生產者的MQ模擬消費速度(譬如執行緒休眠),然後發送到消費者的MQ,如果消息積壓則只需要處理生產者的MQ的積壓消息,不影響消費者MQ

  2. 拆分MQ,生產者一個MQ,消費者一個MQ,寫一個程式監聽生產者的MQ,定義一個全局靜態變數記錄上一次消費的時間,如果上一次時間和當前時間只差小於消費者的處理時間,則發送到一個延遲隊列(可以使用死信隊列實現)發送到消費者的MQ,如果消息積壓則只需要處理生產者的MQ的積壓消息,不影響消費者MQ

  3. 使用Redis的List或ZSET做接收消息快取,寫一個程式按照消費者處理時間定時從Redis取消息發送到MQ

  4. 設置消息過期時間,過期後轉入死信隊列,寫一個程式處理死信消息(重新如隊列或者即使處理或記錄到資料庫延後處理)

其中使用延時隊列會相對來說邏輯簡單,業務邏輯變更也不大,在RabbitMQ中,可使用死信來及延時隊列插件rabbitmq_delayed_message_exchange兩種方式實現延時隊列。
使用插件可以在官網找到://www.rabbitmq.com/community-plugins.html

插件的安裝及使用方式就不做介紹了,主要介紹下使用死信來實現延時隊列,原理就是將消息發送到一個死信隊列,並設置過期時間,過期後將死信轉發到要處理的消息隊列。
生產者相關程式碼:

          /// <summary>
        /// 發送延時隊列消息
        /// </summary>
        /// <param name="message"></param>
        /// <param name="queueName"></param>
        /// <param name="prefetchCount">默認20</param>
        public void SendDelayQueues(string message, string queueName,double delayMilliseconds,string beDeadLetterPrefix="beDeadLetter_")
        {
            #region 死信到期後轉入的交換機及隊列
            //死信轉入新的隊列的路由鍵(消費者使用的路由鍵)
            var routingKey = queueName;
            var exchangeName = queueName;
            //定義隊列
            Channel.QueueDeclare(queue: queueName,
                durable: true,
                exclusive: false,
                autoDelete: false,
                arguments: null);
            //定義交換機
            Channel.ExchangeDeclare(exchange: exchangeName,
                type: "direct");
            //隊列綁定到交換機
            Channel.QueueBind(queue: queueName,
                exchange: exchangeName,
                routingKey: routingKey);
            #endregion

            //將變成死信的隊列名
            var beDeadLetterQueueName = beDeadLetterPrefix + queueName;
            //將變成死信的交換機名
            var beDeadLetterExchangeName = beDeadLetterPrefix + queueName;

            //定義一個有延遲的交換機來做死信(該消息不能有消費者,不然無法變成死信)
            Channel.ExchangeDeclare(exchange:beDeadLetterExchangeName ,
                type: "direct");
            
            //定義該延遲消息過期變成死信後轉入的交換機(消費者需要綁定的交換機)
            //Channel.ExchangeDeclare(exchange: queueName,type: "direct");

            var dic = new Dictionary<string, object>();
            //dic.Add("x-expires", 30000);
            //dic.Add("x-message-ttl", 12000);//隊列上消息過期時間,應小於隊列過期時間  
            dic.Add("x-dead-letter-exchange", queueName);//變成死信後轉向的交換機
            dic.Add("x-dead-letter-routing-key",routingKey);//變成死信後轉向的路由鍵
            //定義將變成死信的隊列
            Channel.QueueDeclare(queue: beDeadLetterQueueName,
                durable: true,
                exclusive: false,
                autoDelete: false,
                arguments: dic);

            //隊列綁定到交換機
            Channel.QueueBind(queue: beDeadLetterQueueName,
                exchange: beDeadLetterExchangeName,
                routingKey: routingKey);

            //不要同時給一個消費者推送多於prefetchCount個消息, ushort prefetchCount = 20
            //Channel.BasicQos(prefetchSize: 0, prefetchCount: prefetchCount, global: false);
            var body = Encoding.UTF8.GetBytes(message);
            var properties = Channel.CreateBasicProperties();
            properties.Persistent = true;
            properties.DeliveryMode = 2;//持久化消息
            //過期時間
            properties.Expiration = delayMilliseconds.ToString();
            Channel.BasicPublish(exchange: beDeadLetterExchangeName,
                routingKey: routingKey,
                basicProperties: properties,
                body: body);
        }

消費者相關程式碼:

        /// <summary>
        /// 設置延遲隊列接收的事件
        /// </summary>
        /// <param name="action"></param>
        /// <param name="queueName"></param>
        /// <param name="prefetchCount">默認1</param>
        /// <param name="autoAck"></param>
        /// <param name="consumerCount"></param>
        public void SetDelayQueuesReceivedAction(Action<string> action, string queueName, ushort prefetchCount = 1,
            bool autoAck = false, int consumerCount = 1)
        {
            if (prefetchCount < 1)
            {
                throw new Exception("consumerCount must be greater than 1 !");
            }

            var exchangeName = queueName;
            var routingKey = queueName;
            for (int i = 0; i < consumerCount; i++)
            {
                var Channel = Connection.CreateModel();
                //定義隊列
                Channel.QueueDeclare(queue: queueName,
                    durable: true,
                    exclusive: false,
                    autoDelete: false,
                    arguments: null);
                //定義交換機
                Channel.ExchangeDeclare(exchange: exchangeName,
                    type: "direct");
                //隊列綁定到交換機
                Channel.QueueBind(queue: queueName,
                    exchange: exchangeName,
                    routingKey: routingKey);
                //不要同時給一個消費者推送多於prefetchCount個消息
                Channel.BasicQos(prefetchSize: 0, prefetchCount: prefetchCount, global: false);
                ChannelList.Add(Channel);
                var consumer = new EventingBasicConsumer(Channel);
                consumer.Received += (model, ea) =>
                {
                    var body = ea.Body.ToArray();
                    var message = Encoding.UTF8.GetString(body);
                    //Console.WriteLine("處理消費者ConsumerTag:" + ea.ConsumerTag);
                    action(message);
                    //手動確認消息應答
                    Channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                };
                //autoACK自動消息應答設置為false
                Channel.BasicConsume(queue: queueName, autoAck: autoAck, consumer: consumer);
            }
        }

完整程式碼實現放到了Github://github.com/tanyongzheng/TZ.RabbitMQ