RabbitMQ-如何保證消息在99.99%的情況下不丟失
- 2021 年 9 月 12 日
- 筆記
- Middleware, RabbitMQ, springboot
1. 簡介
MQ雖然幫我們解決了很多問題,但是也帶來了很多問題,其中最麻煩的就是,如何保證消息的可靠性傳輸。
我們在聊如何保證消息的可靠性傳輸之前,先考慮下哪些情況下會出現消息丟失的情況。
首先,上圖中完整的展示了消息從生產到被消費的完整鏈路,我們通過圖列舉下各種情況。
Producer
在把Message
發送到Broker
的過程中,因為網路不可靠的原因,可能會出現Message
還未發送到Broker
就丟失,或者Message
發送到了Broker
,但是由於某種原因,消息未保存到Broker。Broker
接收到Message
數據存儲在記憶體,Consumer
還沒消費,Broker
宕機了。Consumer
接收到了Message
,Message
相關業務還沒來得及處理,程式報錯或者宕機了,Broker
會認為Consunmer
消息正常消費了,就把當前消息從隊列中移除了。這種情況也算是消息丟失。
從上述的問題中我們可以總結出想要消息被正常消費,就得保證:
- 消息成功被
Broker
接收到。 - 消息可以被
Broker
持久化。 - 消息成功被
Consumer
接收並且當消費失敗時,消息可以重回隊列。 - 要有相應的補償機制。(當任何一個環節出錯時,可以進行消息 補償)。
2. 消息的可靠投遞
我們在使用MQ的時候,為了避免消息丟失或者投遞失敗。RabbitMQ為我們提供了兩種方式來控制消息的投遞可靠性。
- confirm 確認模式
- return 退回模式
如圖所示:
消息從 producer 到 exchange 則會返回一個confirmCallback 。
消息從 exchange 到 queue 投遞失敗則會返回一個 ReturnsCallback 資訊,其內容為ReturnedMessage實例資訊。
我們將利用這兩個 callback 控制消息的可靠性投遞。
2.1 confirm
2.1.1 引入所需依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
2.1.2 application.yaml
spring:
rabbitmq:
host: localhost
port: 5672
# rabbit 默認的虛擬主機
virtual-host: /
# rabbit 用戶名密碼
username: admin
password: admin123
# 開啟消息發送確認功能
publisher-confirm-type: correlated
# 高版本已棄用
# publisher-confirms: true
2.1.3 ConfirmCallBack
package com.ldx.rabbitmq.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
/**
* 生產者消息確認回調方法
*
* @author ludangxin
* @date 2021/9/11
*/
@Slf4j
public class MsgSendConfirmCallBack implements RabbitTemplate.ConfirmCallback {
/**
*
* @param correlationData 相關配置資訊
* @param ack exchange交換機 是否成功收到了消息。true 成功,false代表失敗
* @param cause 失敗原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
log.info("MsgSendConfirmCallBack , 回調id: {}", correlationData);
if(ack) {
log.info("消息發送成功");
}else {
log.info("消息發送失敗: {}", cause);
}
}
}
2.1.3 RabbitConfig
package com.ldx.rabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitConfig implements InitializingBean {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 設置一個簡單的隊列
*/
@Bean(name = "durableQueue")
public Queue queue() {
/*
* 參數1:隊列名稱
* 參數2:是否定義持久化隊列
* 參數3:是否獨佔本次連接
* 參數4:是否在不使用的時候自動刪除隊列
* 參數5:隊列其它參數
*/
return new Queue("helloRabbitMQ", true, false, false, null);
}
/**
* bean 初始化後執行
*/
@Override
public void afterPropertiesSet() {
// 設置消息確認回調類
rabbitTemplate.setConfirmCallback(new MsgSendConfirmCallBack());
}
}
2.1.4 測試方法
這裡兩個測試方法,sentMsg()
使用默認的Exchange
,而sentMsg2()
設置一個不存在的Exchange
測試失敗情況。
package com.ldx.rabbitmq;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import java.util.UUID;
@SpringBootTest
public class ProducerTest {
@Autowired
RabbitTemplate rabbitTemplate;
@Test
public void sentMsg(){
String uuid = UUID.randomUUID().toString();
CorrelationData correlationId = new CorrelationData(uuid);
rabbitTemplate.convertAndSend("", "helloRabbitMQ","Hello RabbitMQ ~ ", correlationId);
}
@Test
public void sentMsg2(){
String uuid = UUID.randomUUID().toString();
CorrelationData correlationId = new CorrelationData(uuid);
// 設置一個不存在的exchange 測試失敗情況
rabbitTemplate.convertAndSend("abc", "helloRabbitMQ","Hello RabbitMQ ~ ", correlationId);
}
}
2.1.5 啟動測試
sendMsg()
方法日誌如下:
2021-09-11 21:30:38.336 INFO 63112 --- [nectionFactory1] c.l.r.config.MsgSendConfirmCallBack : MsgSendConfirmCallBack , 回調id: CorrelationData [id=8e9fc4b8-aa32-4e1b-a165-8a83457636ed]
2021-09-11 21:30:38.339 INFO 63112 --- [nectionFactory1] c.l.r.config.MsgSendConfirmCallBack : 消息發送成功
sendMsg2()
方法日誌如下:
2021-09-11 21:32:27.377 INFO 63139 --- [nectionFactory2] c.l.r.config.MsgSendConfirmCallBack : MsgSendConfirmCallBack , 回調id: CorrelationData [id=399c8d85-f010-433f-946c-419d9b9396c2]
2021-09-11 21:32:27.379 INFO 63139 --- [nectionFactory2] c.l.r.config.MsgSendConfirmCallBack : 消息發送失敗: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'abc' in vhost '/', class-id=60, method-id=40)
2.1.6 小結
Confirm 確認模式
是從Producer
到Exchange
。Producer
發送的消息正常或失敗時都會進入Confirm Callback
方法。Producer
發送消息的Exchange
不存在時,Confirm Callback
中的Ack
為false且Cause
為發送失敗原因。
2.2 return
2.2.1 application.yaml
spring:
rabbitmq:
host: localhost
port: 5672
# rabbit 默認的虛擬主機
virtual-host: /
# rabbit 用戶名密碼
username: admin
password: admin123
# 開啟消息發送確認功能
publisher-confirm-type: correlated
# 高版本已棄用
# publisher-confirms: true
# 開啟失敗退回功能
publisher-returns: true
2.2.2 ReturnCallback
這裡注意下,網上很多提到的ReturnCallback
(少了個s)介面已經棄用,注釋中也提到了,棄用是為了更好的使用ReturnedMessage
類,因為對象的方式可以更好的支援lambda
表達式。
package com.ldx.rabbitmq.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
/**
* 發生異常時的消息返回提醒
*
* @author ludangxin
* @date 2021/9/11
*/
@Slf4j
public class RabbitReturnCallback implements RabbitTemplate.ReturnsCallback {
/**
* Returned message callback.
*
* @param returned the returned message and metadata.
*/
@Override
public void returnedMessage(ReturnedMessage returned) {
log.info("消息主體: {}", returned.getMessage());
log.info("回復編碼: {}", returned.getReplyCode());
log.info("回復內容: {}", returned.getReplyText());
log.info("交換器: {}", returned.getExchange());
log.info("路由鍵: {}", returned.getRoutingKey());
}
}
2.2.3 RabbitConfig
將RabbitReturnCallback
設置到RabbitTemplate
中。
/**
* bean 初始化後執行
*/
@Override
public void afterPropertiesSet() {
// 設置消息確認回調類
rabbitTemplate.setConfirmCallback(new MsgSendConfirmCallBack());
// 設置消息回退回調類
rabbitTemplate.setReturnsCallback(new RabbitReturnCallback());
}
2.2.4 測試方法
@Test
public void sentMsg3(){
String uuid = UUID.randomUUID().toString();
CorrelationData correlationId = new CorrelationData(uuid);
// 設置一個不存在的routingkey 測試失敗情況
rabbitTemplate.convertAndSend("", "helloRabbitMQ1", "Hello RabbitMQ ~ ", correlationId);
}
2.2.5 啟動測試
# sentMsg()
2021-09-11 22:12:24.079 INFO 63803 --- [nectionFactory1] c.l.r.config.MsgSendConfirmCallBack : MsgSendConfirmCallBack , 回調id: CorrelationData [id=fb471c69-6c7b-48bc-89aa-ae70ac1ed6f8]
2021-09-11 22:12:24.081 INFO 63803 --- [nectionFactory1] c.l.r.config.MsgSendConfirmCallBack : 消息發送成功
# sentMsg2()
2021-09-11 22:13:42.910 INFO 63825 --- [nectionFactory2] c.l.r.config.MsgSendConfirmCallBack : MsgSendConfirmCallBack , 回調id: CorrelationData [id=0e3211ee-a1ba-45e4-90f6-296be79def07]
2021-09-11 22:13:42.912 INFO 63825 --- [nectionFactory2] c.l.r.config.MsgSendConfirmCallBack : 消息發送失敗: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'abc' in vhost '/', class-id=60, method-id=40)
# sentMsg3()
2021-09-11 22:14:23.600 INFO 63841 --- [nectionFactory1] c.l.r.config.RabbitReturnCallback : 消息主體: (Body:'Hello RabbitMQ ~ ' MessageProperties [headers={spring_returned_message_correlation=0a8db922-ff7c-4b13-86a3-04957a7359bc}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0])
2021-09-11 22:14:23.602 INFO 63841 --- [nectionFactory1] c.l.r.config.RabbitReturnCallback : 回復編碼: 312
2021-09-11 22:14:23.603 INFO 63841 --- [nectionFactory1] c.l.r.config.RabbitReturnCallback : 回復內容: NO_ROUTE
2021-09-11 22:14:23.603 INFO 63841 --- [nectionFactory1] c.l.r.config.RabbitReturnCallback : 交換器:
2021-09-11 22:14:23.603 INFO 63841 --- [nectionFactory1] c.l.r.config.RabbitReturnCallback : 路由鍵: helloRabbitMQ1
2021-09-11 22:14:23.603 INFO 63841 --- [nectionFactory2] c.l.r.config.MsgSendConfirmCallBack : MsgSendConfirmCallBack , 回調id: CorrelationData [id=0a8db922-ff7c-4b13-86a3-04957a7359bc]
2021-09-11 22:14:23.603 INFO 63841 --- [nectionFactory2] c.l.r.config.MsgSendConfirmCallBack : 消息發送成功
2.2.6 小節
Return 退回模式
是從Exchange
到Queue
。Return
給了Producer
。Producer
發送的消息即使Routing Key
不正確,當Exchange
接收失敗後直接觸發Confirm Callback
,不會進入到Return Callback
,因為還沒到Exchange
。- 當
Exchange
正確接收消息,但是Routing Key
設置錯誤, 觸發Return Callback
方法。
3. 消息的可靠消費
上文中我們提到了一種消息丟失的情況,即 Consumer
接收到了Message
,Message
相關業務還沒來得及處理,程式報錯或者宕機了,Broker
會認為Consunmer
消息正常消費了,就把當前消息從隊列中移除了。這種情況也算是消息丟失。
那能不能消息消費成功後再將消息從queue中移除呢?
答案肯定是可以的。
3.1 ACK確認機制
ACK指Acknowledge,確認。 表示消費端收到消息後的確認方式。
- 作用:
- 確認消息是否被消費者消費,消息通過ACK機制確認是否被正確接收,每個消息都要被確認。
- 默認情況下,一個消息被消費者正確消費就會從隊列中移除
- ACK確認模式
- AcknowledgeMode.NONE :不確認
- 默認所有消息消費成功,會不斷的向消費者推送消息。
- 因為RabbitMQ認為所有推送的消息已被成功消費,所以推送出去的消息不會暫存在
broker
,消息存在丟失的危險。
- AcknowledgeMode.AUTO:自動確認
- 由spring-rabbit依據消息處理邏輯是否拋出異常自動發送ack(無異常)或nack(異常)到
broker
。 - 使用自動確認模式時,需要考慮的另一件事是消費者過載,因為
broker
會暫存沒有收到ack
的消息,等消費端ack
後才會丟掉;如果收到消費端的nack
(消費失敗的標識)或connection
斷開沒收到回饋,會將消息放回到原隊列頭部,導致消費者反覆的在消費這條消息。
- 由spring-rabbit依據消息處理邏輯是否拋出異常自動發送ack(無異常)或nack(異常)到
- AcknowledgeMode.MANUAL:手動確認
- 手動確認則當消費者調用
ack
、nack
、reject
幾種方法進行確認,手動確認可以在業務失敗後進行一些操作,如果消息未被 ACK 則會發送到下一個消費者。 - 手動確認模式可以使用 prefetch,限制通道上未完成的(「正在進行中的」)發送的數量。也就是
Consumer
一次可以從Broker
取幾條消息。 - 如果忘記進行ACK確認
忘記通過basicAck返回確認資訊是常見的錯誤。這個錯誤非常嚴重,將導致消費者客戶端退出或者關閉後,消息會被退回RabbitMQ伺服器,這會使RabbitMQ伺服器記憶體爆滿,而且RabbitMQ也不會主動刪除這些被退回的消息。只要程式還在運行,沒確認的消息就一直是 Unacked 狀態,無法被 RabbitMQ 重新投遞。更厲害的是,RabbitMQ 消息消費並沒有超時機制,也就是說,程式不重啟,消息就永遠是 Unacked 狀態。處理運維事件時不要忘了這些 Unacked 狀態的消息。當程式關閉時(實際只要 消費者 關閉就行),消息會恢復為 Ready 狀態。
- 手動確認則當消費者調用
3.2 配置application.yaml
spring:
rabbitmq:
host: localhost
port: 5672
# rabbit 默認的虛擬主機
virtual-host: /
# rabbit 用戶名密碼
username: admin
password: admin123
listener:
simple:
# manual 手動確認
acknowledge-mode: manual
3.3 Consumer
package com.ldx.rabbitmq.consumer;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* 消費者
*
* @author ludangxin
* @date 2021/9/12
*/
@Slf4j
@Component
public class RabbitMQListener {
@RabbitListener(queues = "helloRabbitMQ")
public void helloRabbitMq(Message message, Channel channel) throws IOException {
MessageProperties messageProperties = message.getMessageProperties();
log.info(messageProperties.toString());
try {
log.info(message.toString());
log.info(new String(message.getBody()));
int a = 1/0;
channel.basicAck(messageProperties.getDeliveryTag(), false);
} catch (Exception e) {
// 當前的消息是否重新投遞的消息,也就是該消息是重新回到隊列里的消息
if (messageProperties.getRedelivered()) {
log.info("消息已重複處理失敗,拒絕再次接收...");
// 拒絕消息
channel.basicReject(messageProperties.getDeliveryTag(), false);
} else {
log.info("消息即將再次返回隊列處理...");
channel.basicNack(messageProperties.getDeliveryTag(), false, true);
}
}
}
}
消費消息有三種回執方法,接下來先看下每個方法參數的含義。
3.3.1 basicAck
/**
* Acknowledge one or several received
* messages. Supply the deliveryTag from the {@link com.rabbitmq.client.AMQP.Basic.GetOk}
* or {@link com.rabbitmq.client.AMQP.Basic.Deliver} method
* containing the received message being acknowledged.
* @see com.rabbitmq.client.AMQP.Basic.Ack
* @param deliveryTag the tag from the received
* @param multiple true to acknowledge all messages up to and
* including the supplied delivery tag; false to acknowledge just
* the supplied delivery tag.
* @throws java.io.IOException if an error is encountered
*/
void basicAck(long deliveryTag, boolean multiple) throws IOException;
deliveryTag
:消息投遞的標籤號,每次消費消息或者消息重新投遞後,deliveryTag
都會增加。手動消息確認模式下,我們可以對指定deliveryTag
的消息進行ack
、nack
、reject
等操作。
multiple
:是否批量確認,值為 true
則會一次性 ack
所有小於當前消息 deliveryTag
的消息。
舉個栗子: 假設我先發送三條消息deliveryTag
分別是5、6、7,可它們都沒有被確認,當我發第四條消息此時deliveryTag
為8,multiple
設置為 true,會將5、6、7、8的消息全部進行確認。
3.3.2 basicNack
/**
* Reject one or several received messages.
*
* Supply the <code>deliveryTag</code> from the {@link com.rabbitmq.client.AMQP.Basic.GetOk}
* or {@link com.rabbitmq.client.AMQP.Basic.GetOk} method containing the message to be rejected.
* @see com.rabbitmq.client.AMQP.Basic.Nack
* @param deliveryTag the tag from the received {@link com.rabbitmq.client.AMQP.Basic.GetOk} or {@link com.rabbitmq.client.AMQP.Basic.Deliver}
* @param multiple true to reject all messages up to and including
* the supplied delivery tag; false to reject just the supplied
* delivery tag.
* @param requeue true if the rejected message(s) should be requeued rather
* than discarded/dead-lettered
* @throws java.io.IOException if an error is encountered
*/
void basicNack(long deliveryTag, boolean multiple, boolean requeue)
throws IOException;
deliveryTag
:表示消息投遞序號。
multiple
:是否批量確認。
requeue
:值為 true
消息將重新入隊列。
3.3.3 basicReject
basicNack
:表示失敗確認,一般在消費消息業務異常時用到此方法,可以將消息重新投遞入隊列。
/**
* Reject a message. Supply the deliveryTag from the {@link com.rabbitmq.client.AMQP.Basic.GetOk}
* or {@link com.rabbitmq.client.AMQP.Basic.Deliver} method
* containing the received message being rejected.
* @see com.rabbitmq.client.AMQP.Basic.Reject
* @param deliveryTag the tag from the received {@link com.rabbitmq.client.AMQP.Basic.GetOk} or {@link com.rabbitmq.client.AMQP.Basic.Deliver}
* @param requeue true if the rejected message should be requeued rather than discarded/dead-lettered
* @throws java.io.IOException if an error is encountered
*/
void basicReject(long deliveryTag, boolean requeue) throws IOException;
deliveryTag
:表示消息投遞序號。
requeue
:值為 true
消息將重新入隊列。
3.4 啟動測試
@Test
public void sentMsg() throws IOException {
String uuid = UUID.randomUUID().toString();
CorrelationData correlationId = new CorrelationData(uuid);
rabbitTemplate.convertAndSend("","helloRabbitMQ","Hello RabbitMQ111 ~ ", correlationId);
// 為了使進程阻塞
System.in.read();
}
在這裡我們執行sentMsg()
方法,輸出日誌如下:
從日誌資訊中我們可以看出,消息已成功被消費,並且當第一次消費失敗後消息被重新放回了隊列,並進行了再此消費,當再次失敗後則放棄該條消息。
2021-09-12 00:47:03.451 INFO 66160 --- [nectionFactory1] c.l.r.config.MsgSendConfirmCallBack : MsgSendConfirmCallBack , 回調id: CorrelationData [id=eb06a986-0e51-464a-8b8c-d2a8271c0008]
2021-09-12 00:47:03.452 INFO 66160 --- [nectionFactory1] c.l.r.config.MsgSendConfirmCallBack : 消息發送成功
2021-09-12 00:47:04.142 INFO 66160 --- [ntContainer#3-1] o.s.a.r.l.SimpleMessageListenerContainer : Restarting Consumer@75181b50: tags=[[amq.ctag-C1o5ZRm1g0fxX-Q53CCZcw]], channel=Cached Rabbit Channel: PublisherCallbackChannelImpl: AMQChannel(amqp://[email protected]:5672/,4), conn: Proxy@52f57666 Shared Rabbit Connection: SimpleConnection@3d96fa9e [delegate=amqp://[email protected]:5672/, localPort= 58094], acknowledgeMode=AUTO local queue size=0
2021-09-12 00:47:04.157 INFO 66160 --- [ntContainer#3-2] c.l.rabbitmq.consumer.RabbitMQListener : MessageProperties [headers={spring_listener_return_correlation=7252a3e3-77d5-4985-a93c-0ee7a977d1a8, spring_returned_message_correlation=eb06a986-0e51-464a-8b8c-d2a8271c0008}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=helloRabbitMQ, deliveryTag=1, consumerTag=amq.ctag-GMJHJuVr22w1so4vhSp-dQ, consumerQueue=helloRabbitMQ]
2021-09-12 00:47:04.157 INFO 66160 --- [ntContainer#3-2] c.l.rabbitmq.consumer.RabbitMQListener : (Body:'Hello RabbitMQ111 ~ ' MessageProperties [headers={spring_listener_return_correlation=7252a3e3-77d5-4985-a93c-0ee7a977d1a8, spring_returned_message_correlation=eb06a986-0e51-464a-8b8c-d2a8271c0008}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=helloRabbitMQ, deliveryTag=1, consumerTag=amq.ctag-GMJHJuVr22w1so4vhSp-dQ, consumerQueue=helloRabbitMQ])
2021-09-12 00:47:04.158 INFO 66160 --- [ntContainer#3-2] c.l.rabbitmq.consumer.RabbitMQListener : Hello RabbitMQ111 ~
2021-09-12 00:47:04.158 INFO 66160 --- [ntContainer#3-2] c.l.rabbitmq.consumer.RabbitMQListener : 消息即將再次返回隊列處理...
2021-09-12 00:47:04.162 ERROR 66160 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory : Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60, method-id=80)
2021-09-12 00:47:05.163 INFO 66160 --- [ntContainer#3-2] o.s.a.r.l.SimpleMessageListenerContainer : Restarting Consumer@37695b29: tags=[[amq.ctag-GMJHJuVr22w1so4vhSp-dQ]], channel=Cached Rabbit Channel: PublisherCallbackChannelImpl: AMQChannel(amqp://[email protected]:5672/,8), conn: Proxy@52f57666 Shared Rabbit Connection: SimpleConnection@3d96fa9e [delegate=amqp://[email protected]:5672/, localPort= 58094], acknowledgeMode=AUTO local queue size=0
2021-09-12 00:47:05.186 INFO 66160 --- [ntContainer#3-3] c.l.rabbitmq.consumer.RabbitMQListener : MessageProperties [headers={spring_listener_return_correlation=7252a3e3-77d5-4985-a93c-0ee7a977d1a8, spring_returned_message_correlation=eb06a986-0e51-464a-8b8c-d2a8271c0008}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=true, receivedExchange=, receivedRoutingKey=helloRabbitMQ, deliveryTag=1, consumerTag=amq.ctag-0XT90qJ0AYEzyDr-cztV8g, consumerQueue=helloRabbitMQ]
2021-09-12 00:47:05.186 INFO 66160 --- [ntContainer#3-3] c.l.rabbitmq.consumer.RabbitMQListener : (Body:'Hello RabbitMQ111 ~ ' MessageProperties [headers={spring_listener_return_correlation=7252a3e3-77d5-4985-a93c-0ee7a977d1a8, spring_returned_message_correlation=eb06a986-0e51-464a-8b8c-d2a8271c0008}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=true, receivedExchange=, receivedRoutingKey=helloRabbitMQ, deliveryTag=1, consumerTag=amq.ctag-0XT90qJ0AYEzyDr-cztV8g, consumerQueue=helloRabbitMQ])
2021-09-12 00:47:05.186 INFO 66160 --- [ntContainer#3-3] c.l.rabbitmq.consumer.RabbitMQListener : Hello RabbitMQ111 ~
2021-09-12 00:47:05.186 INFO 66160 --- [ntContainer#3-3] c.l.rabbitmq.consumer.RabbitMQListener : 消息已重複處理失敗,拒絕再次接收...
3.5 小節
消費方的ACK機制可以有效的解決消息從Broker
到Consumer
丟失的問題。但也要注意一點:消息的無限消費。
3.6 消息無限消費
如果消費端程式碼就像下邊這樣寫的,思路很簡單:處理完業務邏輯後確認消息, int a = 1 / 0
發生異常後將消息重新投入隊列。
@RabbitHandler
public void processHandler(String msg, Channel channel, Message message) throws IOException {
try {
log.info("消費者 2 號收到:{}", msg);
int a = 1 / 0;
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
}
但是有個問題是,業務程式碼一旦出現 bug
99.9%的情況是不會自動修復,一條消息會被無限投遞進隊列,消費端無限執行,導致了死循環,CPU被瞬間打滿了,而且rabbitmq management
只有一條未被確認的消息。
經過測試分析發現,當消息重新投遞到消息隊列時,這條消息不會回到隊列尾部,仍是在隊列頭部。
消費者會立刻消費這條消息,業務處理再拋出異常,消息再重新入隊,如此反覆進行。導致消息隊列處理出現阻塞,導致正常消息也無法運行,那該怎麼處理呢?
第一種方法:是根據異常類型來選擇是否重新放入隊列。
第二種方法: 先將消息進行應答,此時消息隊列會刪除該條消息,然後通過channel.basicPublish()重新發布這個消息,異常消息就放在了消息隊列尾部,,進而不會影響已經進入隊列的消息處理。
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
// 重新發送消息到隊尾
channel.basicPublish(message.getMessageProperties().getReceivedExchange(),
message.getMessageProperties().getReceivedRoutingKey(), MessageProperties.PERSISTENT_TEXT_PLAIN,
JSON.toJSONBytes(msg));
但這種方法並沒有解決根本問題,錯誤消息還是會時不時報錯,後面優化設置了消息重試次數,達到了重試上限以後,手動確認,隊列刪除此消息,並將消息持久化入MySQL
並推送報警,進行人工處理和定時任務做補償。
4. 總結
4.1 持久化
- Exchange 要持久化 通過
durable
屬性控制,true:持久化, 預設:true。 - queue 要持久化 通過
durable
屬性控制,true:持久化, 預設:true。 - message 要持久化
在springboot環境下,message模式也是持久化。