RabbitMQ-如何保證消息在99.99%的情況下不丟失

1. 簡介

MQ雖然幫我們解決了很多問題,但是也帶來了很多問題,其中最麻煩的就是,如何保證消息的可靠性傳輸

我們在聊如何保證消息的可靠性傳輸之前,先考慮下哪些情況下會出現消息丟失的情況。

首先,上圖中完整的展示了消息從生產到被消費的完整鏈路,我們通過圖列舉下各種情況。

  1. Producer在把Message發送到Broker的過程中,因為網路不可靠的原因,可能會出現Message還未發送到Broker就丟失,或者Message發送到了Broker,但是由於某種原因,消息未保存到Broker。
  2. Broker接收到Message數據存儲在記憶體,Consumer還沒消費,Broker宕機了。
  3. Consumer接收到了MessageMessage相關業務還沒來得及處理,程式報錯或者宕機了,Broker會認為Consunmer消息正常消費了,就把當前消息從隊列中移除了。這種情況也算是消息丟失。

從上述的問題中我們可以總結出想要消息被正常消費,就得保證:

  1. 消息成功被Broker接收到。
  2. 消息可以被Broker持久化。
  3. 消息成功被Consumer接收並且當消費失敗時,消息可以重回隊列。
  4. 要有相應的補償機制。(當任何一個環節出錯時,可以進行消息 補償)。

2. 消息的可靠投遞

我們在使用MQ的時候,為了避免消息丟失或者投遞失敗。RabbitMQ為我們提供了兩種方式來控制消息的投遞可靠性。

  1. confirm 確認模式
  2. return 退回模式

如圖所示:

消息從 producer 到 exchange 則會返回一個confirmCallback 。
消息從 exchange 到 queue 投遞失敗則會返回一個 ReturnsCallback 資訊,其內容為ReturnedMessage實例資訊。
我們將利用這兩個 callback 控制消息的可靠性投遞。

2.1 confirm

2.1.1 引入所需依賴

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit-test</artifactId>
    <scope>test</scope>
</dependency>

2.1.2 application.yaml

spring:
  rabbitmq:
    host: localhost
    port: 5672
    # rabbit 默認的虛擬主機
    virtual-host: /
    # rabbit 用戶名密碼
    username: admin
    password: admin123
    # 開啟消息發送確認功能
    publisher-confirm-type: correlated
    # 高版本已棄用
#    publisher-confirms: true

2.1.3 ConfirmCallBack

package com.ldx.rabbitmq.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;

/**
 * 生產者消息確認回調方法
 *
 * @author ludangxin
 * @date 2021/9/11
 */
@Slf4j
public class MsgSendConfirmCallBack implements RabbitTemplate.ConfirmCallback {

    /**
     *
     * @param correlationData 相關配置資訊
     * @param ack   exchange交換機 是否成功收到了消息。true 成功,false代表失敗
     * @param cause 失敗原因
     */
    @Override
  public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        log.info("MsgSendConfirmCallBack , 回調id: {}", correlationData);
        if(ack) {
            log.info("消息發送成功");
        }else {
            log.info("消息發送失敗: {}", cause);
        }
    }
}

2.1.3 RabbitConfig

package com.ldx.rabbitmq.config;

import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig implements InitializingBean {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * 設置一個簡單的隊列
      */
    @Bean(name = "durableQueue")
    public Queue queue() {
        /*
         * 參數1:隊列名稱
         * 參數2:是否定義持久化隊列
         * 參數3:是否獨佔本次連接
         * 參數4:是否在不使用的時候自動刪除隊列
         * 參數5:隊列其它參數
         */
        return new Queue("helloRabbitMQ", true, false, false, null);
    }

    /**
     * bean 初始化後執行
     */
    @Override
    public void afterPropertiesSet() {
        // 設置消息確認回調類
        rabbitTemplate.setConfirmCallback(new MsgSendConfirmCallBack());
    }
}

2.1.4 測試方法

這裡兩個測試方法,sentMsg()使用默認的Exchange,而sentMsg2()設置一個不存在的Exchange測試失敗情況。

package com.ldx.rabbitmq;

import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import java.util.UUID;

@SpringBootTest
public class ProducerTest {

    @Autowired
    RabbitTemplate rabbitTemplate;

    @Test
    public void sentMsg(){
        String uuid = UUID.randomUUID().toString();
        CorrelationData correlationId = new CorrelationData(uuid);
        rabbitTemplate.convertAndSend("", "helloRabbitMQ","Hello RabbitMQ ~ ", correlationId);
    }

    @Test
    public void sentMsg2(){
        String uuid = UUID.randomUUID().toString();
        CorrelationData correlationId = new CorrelationData(uuid);
        // 設置一個不存在的exchange 測試失敗情況
        rabbitTemplate.convertAndSend("abc", "helloRabbitMQ","Hello RabbitMQ ~ ", correlationId);
    }
}

2.1.5 啟動測試

sendMsg()方法日誌如下:

2021-09-11 21:30:38.336  INFO 63112 --- [nectionFactory1] c.l.r.config.MsgSendConfirmCallBack : MsgSendConfirmCallBack , 回調id: CorrelationData [id=8e9fc4b8-aa32-4e1b-a165-8a83457636ed]
2021-09-11 21:30:38.339  INFO 63112 --- [nectionFactory1] c.l.r.config.MsgSendConfirmCallBack : 消息發送成功

sendMsg2()方法日誌如下:

2021-09-11 21:32:27.377  INFO 63139 --- [nectionFactory2] c.l.r.config.MsgSendConfirmCallBack : MsgSendConfirmCallBack , 回調id: CorrelationData [id=399c8d85-f010-433f-946c-419d9b9396c2]
2021-09-11 21:32:27.379  INFO 63139 --- [nectionFactory2] c.l.r.config.MsgSendConfirmCallBack : 消息發送失敗: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'abc' in vhost '/', class-id=60, method-id=40)

2.1.6 小結

  1. Confirm 確認模式 是從ProducerExchange
  2. Producer發送的消息正常或失敗時都會進入Confirm Callback方法。
  3. Producer發送消息的Exchange不存在時,Confirm Callback中的 Ack為false且Cause為發送失敗原因。

2.2 return

2.2.1 application.yaml

spring:
  rabbitmq:
    host: localhost
    port: 5672
    # rabbit 默認的虛擬主機
    virtual-host: /
    # rabbit 用戶名密碼
    username: admin
    password: admin123
    # 開啟消息發送確認功能
    publisher-confirm-type: correlated
    # 高版本已棄用
#    publisher-confirms: true
    # 開啟失敗退回功能
    publisher-returns: true

2.2.2 ReturnCallback

這裡注意下,網上很多提到的ReturnCallback(少了個s)介面已經棄用,注釋中也提到了,棄用是為了更好的使用ReturnedMessage類,因為對象的方式可以更好的支援lambda表達式。

package com.ldx.rabbitmq.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.core.RabbitTemplate;

/**
 * 發生異常時的消息返回提醒
 *
 * @author ludangxin
 * @date 2021/9/11
 */
@Slf4j
public class RabbitReturnCallback implements RabbitTemplate.ReturnsCallback {

    /**
     * Returned message callback.
     *
     * @param returned the returned message and metadata.
     */
    @Override
    public void returnedMessage(ReturnedMessage returned) {
        log.info("消息主體: {}", returned.getMessage());
        log.info("回復編碼: {}", returned.getReplyCode());
        log.info("回復內容: {}", returned.getReplyText());
        log.info("交換器: {}", returned.getExchange());
        log.info("路由鍵: {}", returned.getRoutingKey());
    }
}

2.2.3 RabbitConfig

RabbitReturnCallback設置到RabbitTemplate中。

/**
 * bean 初始化後執行
 */
@Override
public void afterPropertiesSet() {
    // 設置消息確認回調類
    rabbitTemplate.setConfirmCallback(new MsgSendConfirmCallBack());
    // 設置消息回退回調類
    rabbitTemplate.setReturnsCallback(new RabbitReturnCallback());
}

2.2.4 測試方法

@Test
public void sentMsg3(){
    String uuid = UUID.randomUUID().toString();
    CorrelationData correlationId = new CorrelationData(uuid);
    // 設置一個不存在的routingkey 測試失敗情況
    rabbitTemplate.convertAndSend("", "helloRabbitMQ1", "Hello RabbitMQ ~ ", correlationId);
}

2.2.5 啟動測試

# sentMsg()
2021-09-11 22:12:24.079  INFO 63803 --- [nectionFactory1] c.l.r.config.MsgSendConfirmCallBack : MsgSendConfirmCallBack , 回調id: CorrelationData [id=fb471c69-6c7b-48bc-89aa-ae70ac1ed6f8]
2021-09-11 22:12:24.081  INFO 63803 --- [nectionFactory1] c.l.r.config.MsgSendConfirmCallBack : 消息發送成功
# sentMsg2()
2021-09-11 22:13:42.910  INFO 63825 --- [nectionFactory2] c.l.r.config.MsgSendConfirmCallBack : MsgSendConfirmCallBack , 回調id: CorrelationData [id=0e3211ee-a1ba-45e4-90f6-296be79def07]
2021-09-11 22:13:42.912  INFO 63825 --- [nectionFactory2] c.l.r.config.MsgSendConfirmCallBack : 消息發送失敗: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'abc' in vhost '/', class-id=60, method-id=40)
# sentMsg3()
2021-09-11 22:14:23.600  INFO 63841 --- [nectionFactory1] c.l.r.config.RabbitReturnCallback : 消息主體: (Body:'Hello RabbitMQ ~ ' MessageProperties [headers={spring_returned_message_correlation=0a8db922-ff7c-4b13-86a3-04957a7359bc}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0])
2021-09-11 22:14:23.602  INFO 63841 --- [nectionFactory1] c.l.r.config.RabbitReturnCallback : 回復編碼: 312
2021-09-11 22:14:23.603  INFO 63841 --- [nectionFactory1] c.l.r.config.RabbitReturnCallback : 回復內容: NO_ROUTE
2021-09-11 22:14:23.603  INFO 63841 --- [nectionFactory1] c.l.r.config.RabbitReturnCallback : 交換器: 
2021-09-11 22:14:23.603  INFO 63841 --- [nectionFactory1] c.l.r.config.RabbitReturnCallback : 路由鍵: helloRabbitMQ1
2021-09-11 22:14:23.603  INFO 63841 --- [nectionFactory2] c.l.r.config.MsgSendConfirmCallBack : MsgSendConfirmCallBack , 回調id: CorrelationData [id=0a8db922-ff7c-4b13-86a3-04957a7359bc]
2021-09-11 22:14:23.603  INFO 63841 --- [nectionFactory2] c.l.r.config.MsgSendConfirmCallBack : 消息發送成功

2.2.6 小節

  1. Return 退回模式是從ExchangeQueue
  2. Return 給了 Producer
  3. Producer發送的消息即使Routing Key不正確,當Exchange接收失敗後直接觸發Confirm Callback,不會進入到Return Callback,因為還沒到Exchange
  4. Exchange正確接收消息,但是Routing Key設置錯誤, 觸發Return Callback方法。

3. 消息的可靠消費

上文中我們提到了一種消息丟失的情況,即 Consumer接收到了MessageMessage相關業務還沒來得及處理,程式報錯或者宕機了,Broker會認為Consunmer消息正常消費了,就把當前消息從隊列中移除了。這種情況也算是消息丟失。

那能不能消息消費成功後再將消息從queue中移除呢?

答案肯定是可以的。

3.1 ACK確認機制

ACK指Acknowledge,確認。 表示消費端收到消息後的確認方式。

  1. 作用:
  • 確認消息是否被消費者消費,消息通過ACK機制確認是否被正確接收,每個消息都要被確認。
  • 默認情況下,一個消息被消費者正確消費就會從隊列中移除
  1. ACK確認模式
  • AcknowledgeMode.NONE :不確認
    1. 默認所有消息消費成功,會不斷的向消費者推送消息。
    2. 因為RabbitMQ認為所有推送的消息已被成功消費,所以推送出去的消息不會暫存在broker,消息存在丟失的危險。
  • AcknowledgeMode.AUTO:自動確認
    1. 由spring-rabbit依據消息處理邏輯是否拋出異常自動發送ack(無異常)或nack(異常)到broker
    2. 使用自動確認模式時,需要考慮的另一件事是消費者過載,因為broker會暫存沒有收到ack的消息,等消費端ack後才會丟掉;如果收到消費端的nack(消費失敗的標識)或connection斷開沒收到回饋,會將消息放回到原隊列頭部,導致消費者反覆的在消費這條消息。
  • AcknowledgeMode.MANUAL:手動確認
    1. 手動確認則當消費者調用 acknackreject 幾種方法進行確認,手動確認可以在業務失敗後進行一些操作,如果消息未被 ACK 則會發送到下一個消費者。
    2. 手動確認模式可以使用 prefetch,限制通道上未完成的(「正在進行中的」)發送的數量。也就是Consumer一次可以從Broker取幾條消息。
    3. 如果忘記進行ACK確認
      忘記通過basicAck返回確認資訊是常見的錯誤。這個錯誤非常嚴重,將導致消費者客戶端退出或者關閉後,消息會被退回RabbitMQ伺服器,這會使RabbitMQ伺服器記憶體爆滿,而且RabbitMQ也不會主動刪除這些被退回的消息。只要程式還在運行,沒確認的消息就一直是 Unacked 狀態,無法被 RabbitMQ 重新投遞。更厲害的是,RabbitMQ 消息消費並沒有超時機制,也就是說,程式不重啟,消息就永遠是 Unacked 狀態。處理運維事件時不要忘了這些 Unacked 狀態的消息。當程式關閉時(實際只要 消費者 關閉就行),消息會恢復為 Ready 狀態。

3.2 配置application.yaml

spring:
  rabbitmq:
    host: localhost
    port: 5672
    # rabbit 默認的虛擬主機
    virtual-host: /
    # rabbit 用戶名密碼
    username: admin
    password: admin123
    listener:
      simple:
        # manual 手動確認
        acknowledge-mode: manual

3.3 Consumer

package com.ldx.rabbitmq.consumer;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;

/**
 * 消費者
 *
 * @author ludangxin
 * @date 2021/9/12
 */
@Slf4j
@Component
public class RabbitMQListener {

    @RabbitListener(queues = "helloRabbitMQ")
    public void helloRabbitMq(Message message, Channel channel) throws IOException {
        MessageProperties messageProperties = message.getMessageProperties();
        log.info(messageProperties.toString());
        try {
            log.info(message.toString());
            log.info(new String(message.getBody()));
            int a = 1/0;
            channel.basicAck(messageProperties.getDeliveryTag(), false);
        } catch (Exception e) {
            // 當前的消息是否重新投遞的消息,也就是該消息是重新回到隊列里的消息
            if (messageProperties.getRedelivered()) {
                log.info("消息已重複處理失敗,拒絕再次接收...");
                // 拒絕消息
                channel.basicReject(messageProperties.getDeliveryTag(), false);
            } else {
                log.info("消息即將再次返回隊列處理...");
                channel.basicNack(messageProperties.getDeliveryTag(), false, true);
            }
        }
    }
}

消費消息有三種回執方法,接下來先看下每個方法參數的含義。

3.3.1 basicAck

/**
 * Acknowledge one or several received
 * messages. Supply the deliveryTag from the {@link com.rabbitmq.client.AMQP.Basic.GetOk}
 * or {@link com.rabbitmq.client.AMQP.Basic.Deliver} method
 * containing the received message being acknowledged.
 * @see com.rabbitmq.client.AMQP.Basic.Ack
 * @param deliveryTag the tag from the received 
 * @param multiple true to acknowledge all messages up to and
 * including the supplied delivery tag; false to acknowledge just
 * the supplied delivery tag.
 * @throws java.io.IOException if an error is encountered
 */
void basicAck(long deliveryTag, boolean multiple) throws IOException;

deliveryTag:消息投遞的標籤號,每次消費消息或者消息重新投遞後,deliveryTag都會增加。手動消息確認模式下,我們可以對指定deliveryTag的消息進行acknackreject等操作。

multiple:是否批量確認,值為 true 則會一次性 ack所有小於當前消息 deliveryTag 的消息。

舉個栗子: 假設我先發送三條消息deliveryTag分別是5、6、7,可它們都沒有被確認,當我發第四條消息此時deliveryTag為8,multiple設置為 true,會將5、6、7、8的消息全部進行確認。

3.3.2 basicNack

/**
 * Reject one or several received messages.
 *
 * Supply the <code>deliveryTag</code> from the {@link com.rabbitmq.client.AMQP.Basic.GetOk}
 * or {@link com.rabbitmq.client.AMQP.Basic.GetOk} method containing the message to be rejected.
 * @see com.rabbitmq.client.AMQP.Basic.Nack
 * @param deliveryTag the tag from the received {@link com.rabbitmq.client.AMQP.Basic.GetOk} or {@link com.rabbitmq.client.AMQP.Basic.Deliver}
 * @param multiple true to reject all messages up to and including
 * the supplied delivery tag; false to reject just the supplied
 * delivery tag.
 * @param requeue true if the rejected message(s) should be requeued rather
 * than discarded/dead-lettered
 * @throws java.io.IOException if an error is encountered
 */
void basicNack(long deliveryTag, boolean multiple, boolean requeue)
        throws IOException;

deliveryTag:表示消息投遞序號。

multiple:是否批量確認。

requeue:值為 true 消息將重新入隊列。

3.3.3 basicReject

basicNack :表示失敗確認,一般在消費消息業務異常時用到此方法,可以將消息重新投遞入隊列。

/**
 * Reject a message. Supply the deliveryTag from the {@link com.rabbitmq.client.AMQP.Basic.GetOk}
 * or {@link com.rabbitmq.client.AMQP.Basic.Deliver} method
 * containing the received message being rejected.
 * @see com.rabbitmq.client.AMQP.Basic.Reject
 * @param deliveryTag the tag from the received {@link com.rabbitmq.client.AMQP.Basic.GetOk} or {@link com.rabbitmq.client.AMQP.Basic.Deliver}
 * @param requeue true if the rejected message should be requeued rather than discarded/dead-lettered
 * @throws java.io.IOException if an error is encountered
 */
void basicReject(long deliveryTag, boolean requeue) throws IOException;

deliveryTag:表示消息投遞序號。

requeue:值為 true 消息將重新入隊列。

3.4 啟動測試

@Test
public void sentMsg() throws IOException {
    String uuid = UUID.randomUUID().toString();
    CorrelationData correlationId = new CorrelationData(uuid);
    rabbitTemplate.convertAndSend("","helloRabbitMQ","Hello RabbitMQ111 ~ ", correlationId);
    // 為了使進程阻塞
    System.in.read();
}

在這裡我們執行sentMsg()方法,輸出日誌如下:

從日誌資訊中我們可以看出,消息已成功被消費,並且當第一次消費失敗後消息被重新放回了隊列,並進行了再此消費,當再次失敗後則放棄該條消息。

2021-09-12 00:47:03.451  INFO 66160 --- [nectionFactory1] c.l.r.config.MsgSendConfirmCallBack : MsgSendConfirmCallBack , 回調id: CorrelationData [id=eb06a986-0e51-464a-8b8c-d2a8271c0008]
2021-09-12 00:47:03.452  INFO 66160 --- [nectionFactory1] c.l.r.config.MsgSendConfirmCallBack : 消息發送成功
2021-09-12 00:47:04.142  INFO 66160 --- [ntContainer#3-1] o.s.a.r.l.SimpleMessageListenerContainer : Restarting Consumer@75181b50: tags=[[amq.ctag-C1o5ZRm1g0fxX-Q53CCZcw]], channel=Cached Rabbit Channel: PublisherCallbackChannelImpl: AMQChannel(amqp://[email protected]:5672/,4), conn: Proxy@52f57666 Shared Rabbit Connection: SimpleConnection@3d96fa9e [delegate=amqp://[email protected]:5672/, localPort= 58094], acknowledgeMode=AUTO local queue size=0
2021-09-12 00:47:04.157  INFO 66160 --- [ntContainer#3-2] c.l.rabbitmq.consumer.RabbitMQListener : MessageProperties [headers={spring_listener_return_correlation=7252a3e3-77d5-4985-a93c-0ee7a977d1a8, spring_returned_message_correlation=eb06a986-0e51-464a-8b8c-d2a8271c0008}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=helloRabbitMQ, deliveryTag=1, consumerTag=amq.ctag-GMJHJuVr22w1so4vhSp-dQ, consumerQueue=helloRabbitMQ]
2021-09-12 00:47:04.157  INFO 66160 --- [ntContainer#3-2] c.l.rabbitmq.consumer.RabbitMQListener : (Body:'Hello RabbitMQ111 ~ ' MessageProperties [headers={spring_listener_return_correlation=7252a3e3-77d5-4985-a93c-0ee7a977d1a8, spring_returned_message_correlation=eb06a986-0e51-464a-8b8c-d2a8271c0008}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=helloRabbitMQ, deliveryTag=1, consumerTag=amq.ctag-GMJHJuVr22w1so4vhSp-dQ, consumerQueue=helloRabbitMQ])
2021-09-12 00:47:04.158  INFO 66160 --- [ntContainer#3-2] c.l.rabbitmq.consumer.RabbitMQListener : Hello RabbitMQ111 ~ 
2021-09-12 00:47:04.158  INFO 66160 --- [ntContainer#3-2] c.l.rabbitmq.consumer.RabbitMQListener : 消息即將再次返回隊列處理...
2021-09-12 00:47:04.162 ERROR 66160 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory : Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60, method-id=80)
2021-09-12 00:47:05.163  INFO 66160 --- [ntContainer#3-2] o.s.a.r.l.SimpleMessageListenerContainer : Restarting Consumer@37695b29: tags=[[amq.ctag-GMJHJuVr22w1so4vhSp-dQ]], channel=Cached Rabbit Channel: PublisherCallbackChannelImpl: AMQChannel(amqp://[email protected]:5672/,8), conn: Proxy@52f57666 Shared Rabbit Connection: SimpleConnection@3d96fa9e [delegate=amqp://[email protected]:5672/, localPort= 58094], acknowledgeMode=AUTO local queue size=0
2021-09-12 00:47:05.186  INFO 66160 --- [ntContainer#3-3] c.l.rabbitmq.consumer.RabbitMQListener : MessageProperties [headers={spring_listener_return_correlation=7252a3e3-77d5-4985-a93c-0ee7a977d1a8, spring_returned_message_correlation=eb06a986-0e51-464a-8b8c-d2a8271c0008}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=true, receivedExchange=, receivedRoutingKey=helloRabbitMQ, deliveryTag=1, consumerTag=amq.ctag-0XT90qJ0AYEzyDr-cztV8g, consumerQueue=helloRabbitMQ]
2021-09-12 00:47:05.186  INFO 66160 --- [ntContainer#3-3] c.l.rabbitmq.consumer.RabbitMQListener : (Body:'Hello RabbitMQ111 ~ ' MessageProperties [headers={spring_listener_return_correlation=7252a3e3-77d5-4985-a93c-0ee7a977d1a8, spring_returned_message_correlation=eb06a986-0e51-464a-8b8c-d2a8271c0008}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=true, receivedExchange=, receivedRoutingKey=helloRabbitMQ, deliveryTag=1, consumerTag=amq.ctag-0XT90qJ0AYEzyDr-cztV8g, consumerQueue=helloRabbitMQ])
2021-09-12 00:47:05.186  INFO 66160 --- [ntContainer#3-3] c.l.rabbitmq.consumer.RabbitMQListener : Hello RabbitMQ111 ~ 
2021-09-12 00:47:05.186  INFO 66160 --- [ntContainer#3-3] c.l.rabbitmq.consumer.RabbitMQListener : 消息已重複處理失敗,拒絕再次接收...

3.5 小節

消費方的ACK機制可以有效的解決消息從BrokerConsumer丟失的問題。但也要注意一點:消息的無限消費。

3.6 消息無限消費

如果消費端程式碼就像下邊這樣寫的,思路很簡單:處理完業務邏輯後確認消息, int a = 1 / 0 發生異常後將消息重新投入隊列。

@RabbitHandler
    public void processHandler(String msg, Channel channel, Message message) throws IOException {

        try {
            log.info("消費者 2 號收到:{}", msg);
            int a = 1 / 0;
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
        }
    }

但是有個問題是,業務程式碼一旦出現 bug 99.9%的情況是不會自動修復,一條消息會被無限投遞進隊列,消費端無限執行,導致了死循環,CPU被瞬間打滿了,而且rabbitmq management 只有一條未被確認的消息。

經過測試分析發現,當消息重新投遞到消息隊列時,這條消息不會回到隊列尾部,仍是在隊列頭部。

消費者會立刻消費這條消息,業務處理再拋出異常,消息再重新入隊,如此反覆進行。導致消息隊列處理出現阻塞,導致正常消息也無法運行,那該怎麼處理呢?

第一種方法:是根據異常類型來選擇是否重新放入隊列。

第二種方法: 先將消息進行應答,此時消息隊列會刪除該條消息,然後通過channel.basicPublish()重新發布這個消息,異常消息就放在了消息隊列尾部,,進而不會影響已經進入隊列的消息處理。

channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
// 重新發送消息到隊尾
channel.basicPublish(message.getMessageProperties().getReceivedExchange(),
                    message.getMessageProperties().getReceivedRoutingKey(), MessageProperties.PERSISTENT_TEXT_PLAIN,
                    JSON.toJSONBytes(msg));

但這種方法並沒有解決根本問題,錯誤消息還是會時不時報錯,後面優化設置了消息重試次數,達到了重試上限以後,手動確認,隊列刪除此消息,並將消息持久化入MySQL並推送報警,進行人工處理和定時任務做補償。

4. 總結

4.1 持久化

  1. Exchange 要持久化 通過durable屬性控制,true:持久化, 預設:true。
  2. queue 要持久化 通過durable屬性控制,true:持久化, 預設:true。
  3. message 要持久化

在springboot環境下,message模式也是持久化。

4.2 生產方確認Confirm

4.3 消費方確認Ack

4.4 Broker 高可用