【RabbitMQ】如何進行消息可靠投遞【上篇】
- 2019 年 10 月 3 日
- 筆記
說明
前幾天,突然發生線上報警,釘釘連發了好幾條消息,一看是RabbitMQ相關的消息,心頭一緊,難道翻車了?
[橙色報警] 應用[xxx]在[08-15 16:36:04]發生[錯誤日誌異常],alertId=[xxx]。由[org.springframework.amqp.rabbit.listener.BlockingQueueConsumer:start:620]觸發。 應用xxx 可能原因如下 服務名為: 異常為:org.springframework.amqp.rabbit.listener.BlockingQueueConsumer:start:620 產生原因如下: 1.org.springframework.amqp.rabbit.listener.QueuesNotAvailableException: Cannot prepare queue for listener. Either the queue doesn't exist or the broker will not allow us to use it. ||Consumer received fatal=false exception on startup: ... 應用xxx 可能原因如下 服務名為: 異常為:org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer:run:1160 產生原因如下: 1.Stopping container from aborted consumer||Stopping container from aborted consumer:
定睛一看,看樣子像是消費者莫名其妙斷開了連接,正逢公司搬家之際,難道是機房又雙叒叕。。。。斷電了?於是趕緊聯繫了運維,諮詢RabbitMQ是否發生了調整。幾分鐘後,得到了運維的回復,由於一些不可描述的原因,RabbitMQ進行了重啟,emmmm,雖然重啟只持續了10分鐘,但是導致該集群下所有消費者都掛了,需要將項目重啟後才能正常進行消費。
項目重啟後,一切似乎又正常運轉起來,但好景不長,沒過多久,工單就找上了門來,經過排查,發現是生產者在RabbitMQ重啟期間消息投遞失敗,導致消息丟失,需要手動處理和恢復。
於是,我開始思考,如何才能進行RabbitMQ的消息可靠投遞呢?特別是在這樣比較極端的情況,RabbitMQ集群不可用的時候,無法投遞的消息該如何處理呢?
可靠投遞
先來說明一個概念,什麼是可靠投遞呢?在RabbitMQ中,一個消息從生產者發送到RabbitMQ伺服器,需要經歷這麼幾個步驟:
- 生產者準備好需要投遞的消息。
- 生產者與RabbitMQ伺服器建立連接。
- 生產者發送消息。
- RabbitMQ伺服器接收到消息,並將其路由到指定隊列。
- RabbitMQ伺服器發起回調,告知生產者消息發送成功。
所謂可靠投遞,就是確保消息能夠百分百從生產者發送到伺服器。
為了避免爭議,補充說明一下,如果沒有設置Mandatory參數,是不需要先路由消息才發起回調的,伺服器收到消息後就會進行回調確認。
2、3、5步都是通過TCP連接進行交互,有網路調用的地方就會有事故,網路波動隨時都有可能發生,不管是內部機房停電,還是外部光纜被切,網路事故無法預測,雖然這些都是小概率事件,但對於訂單等敏感數據處理來說,這些情況下導致消息丟失都是不可接受的。
RabbitMQ中的消息可靠投遞
默認情況下,發送消息的操作是不會返回任何資訊給生產者的,也就是說,默認情況下生產者是不知道消息有沒有正確地到達伺服器。
那麼如何解決這個問題呢?
對此,RabbitMQ中有一些相關的解決方案:
- 使用事務機制來讓生產者感知消息被成功投遞到伺服器。
- 通過生產者確認機制實現。
在RabbitMQ中,所有確保消息可靠投遞的機制都會對性能產生一定影響,如使用不當,可能會對吞吐量造成重大影響,只有通過執行性能基準測試,才能在確定性能與可靠投遞之間的平衡。
在使用可靠投遞前,需要先思考以下問題:
- 消息發布時,保證消息進入隊列的重要性有多高?
- 如果消息無法進行路由,是否應該將該消息返回給發布者?
- 如果消息無法被路由,是否應該將其發送到其他地方稍後再重新進行路由?
- 如果RabbitMQ伺服器崩潰了,是否可以接受消息丟失?
- RabbitMQ在處理新消息時是否應該確認它已經為發布者執行了所有請求的路由和持久化?
- 消息發布者是否可以批量投遞消息?
- 在可靠投遞上是否有可以接受的平衡性?是否可以接受一部分的不可靠性來提升性能?
只考慮平衡性不考慮性能是不行的,至於這個平衡的度具體如何把握,就要具體情況具體分析了,比如像訂單數據這樣敏感的資訊,對可靠性的要求自然要比一般的業務消息對可靠性的要求高的多,因為訂單數據是跟錢直接相關的,可能會導致直接的經濟損失。
所以不僅應該知道有哪些保證消息可靠性的解決方案,還應該知道每種方案對性能的影響程度,以此來進行方案的選擇。
RabbitMQ的事務機制
RabbitMQ是支援AMQP事務機制的,在生產者確認機制之前,事務是確保消息被成功投遞的唯一方法。
在SpringBoot項目中,使用RabbitMQ事務其實很簡單,只需要聲明一個事務管理的Bean,並將RabbitTemplate的事務設置為true即可。
配置文件如下:
spring: rabbitmq: host: localhost password: guest username: guest listener: type: simple simple: default-requeue-rejected: false acknowledge-mode: manual
先來配置一下交換機和隊列,以及事務管理器。
@Configuration public class RabbitMQConfig { public static final String BUSINESS_EXCHANGE_NAME = "rabbitmq.tx.demo.simple.business.exchange"; public static final String BUSINESS_QUEUEA_NAME = "rabbitmq.tx.demo.simple.business.queue"; // 聲明業務Exchange @Bean("businessExchange") public FanoutExchange businessExchange(){ return new FanoutExchange(BUSINESS_EXCHANGE_NAME); } // 聲明業務隊列 @Bean("businessQueue") public Queue businessQueue(){ return QueueBuilder.durable(BUSINESS_QUEUEA_NAME).build(); } // 聲明業務隊列綁定關係 @Bean public Binding businessBinding(@Qualifier("businessQueue") Queue queue, @Qualifier("businessExchange") FanoutExchange exchange){ return BindingBuilder.bind(queue).to(exchange); } /** * 配置啟用rabbitmq事務 * @param connectionFactory * @return */ @Bean public RabbitTransactionManager rabbitTransactionManager(CachingConnectionFactory connectionFactory) { return new RabbitTransactionManager(connectionFactory); } }
然後創建一個消費者,來監聽消息,用以判斷消息是否成功發送。
@Slf4j @Component public class BusinessMsgConsumer { @RabbitListener(queues = BUSINESS_QUEUEA_NAME) public void receiveMsg(Message message, Channel channel) throws IOException { String msg = new String(message.getBody()); log.info("收到業務消息:{}", msg); channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); } }
然後是消息生產者:
@Slf4j @Component public class BusinessMsgProducer{ @Autowired private RabbitTemplate rabbitTemplate; @PostConstruct private void init() { rabbitTemplate.setChannelTransacted(true); } @Transactional public void sendMsg(String msg) { rabbitTemplate.convertAndSend(BUSINESS_EXCHANGE_NAME, "key", msg); log.info("msg:{}", msg); if (msg != null && msg.contains("exception")) throw new RuntimeException("surprise!"); log.info("消息已發送 {}" ,msg); } }
這裡有兩個注意的地方:
- 在初始化方法里,通過使用
rabbitTemplate.setChannelTransacted(true);
來開啟事務。 - 在發送消息的方法上加上
@Transactional
註解,這樣在該方法中發生異常時,消息將不會發送。
在controller中加一個介面來生產消息:
@RestController public class BusinessController { @Autowired private BusinessMsgProducer producer; @RequestMapping("send") public void sendMsg(String msg){ producer.sendMsg(msg); } }
來驗證一下:
msg:1 消息已發送 1 收到業務消息:1 msg:2 消息已發送 2 收到業務消息:2 msg:3 消息已發送 3 收到業務消息:3 msg:exception Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is java.lang.RuntimeException: surprise!] with root cause java.lang.RuntimeException: surprise! at com.mfrank.rabbitmqdemo.producer.BusinessMsgProducer.sendMsg(BusinessMsgProducer.java:30) ...
當 msg
的值為 exception
時, 在調用rabbitTemplate.convertAndSend
方法之後,程式拋出了異常,消息並沒有發送出去,而是被當前事務回滾了。
當然,你可以將事務管理器注釋掉,或者將初始化方法的開啟事務注釋掉,這樣事務就不會生效,即使在調用了發送消息方法之後,程式發生了異常,消息也會被正常發送和消費。
RabbitMQ中的事務使用起來雖然簡單,但是對性能的影響是不可忽視的,因為每次事務的提交都是阻塞式的等待伺服器處理返回結果,而默認模式下,客戶端是不需要等待的,直接發送就完事了,除此之外,事務消息需要比普通消息多4次與伺服器的交互,這就意味著會佔用更多的處理時間,所以如果對消息處理速度有較高要求時,盡量不要採用事務機制。
RabbitMQ的生產者確認機制
RabbitMQ中的生產者確認功能是AMQP規範的增強功能,當生產者發布給所有隊列的已路由消息被消費者應用程式直接消費時,或者消息被放入隊列並根據需要進行持久化時,一個Basic.Ack請求會被發送到生產者,如果消息無法路由,代理伺服器將發送一個Basic.Nack RPC請求用於表示失敗。然後由生產者決定該如何處理該消息。
也就是說,通過生產者確認機制,生產者可以在消息被伺服器成功接收時得到回饋,並有機會處理未被成功接收的消息。
在Springboot中開啟RabbitMQ的生產者確認模式也很簡單,只多了一行配置:
spring: rabbitmq: host: localhost password: guest username: guest listener: type: simple simple: default-requeue-rejected: false acknowledge-mode: manual publisher-confirms: true
publisher-confirms: true
即表示開啟生產者確認模式。
然後將消息生產者的代表進行部分修改:
@Slf4j @Component public class BusinessMsgProducer implements RabbitTemplate.ConfirmCallback{ @Autowired private RabbitTemplate rabbitTemplate; @PostConstruct private void init() { // rabbitTemplate.setChannelTransacted(true); rabbitTemplate.setConfirmCallback(this); } public void sendCustomMsg(String exchange, String msg) { CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); log.info("消息id:{}, msg:{}", correlationData.getId(), msg); rabbitTemplate.convertAndSend(exchange, "key", msg, correlationData); } @Override public void confirm(CorrelationData correlationData, boolean b, String s) { String id = correlationData != null ? correlationData.getId() : ""; if (b) { log.info("消息確認成功, id:{}", id); } else { log.error("消息未成功投遞, id:{}, cause:{}", id, s); } } }
讓生產者繼承自RabbitTemplate.ConfirmCallback
類,然後實現其confirm
方法,即可用其接收伺服器回調。
需要注意的是,在發送消息時,程式碼也進行了調整:
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); rabbitTemplate.convertAndSend(exchange, "key", msg, correlationData);
這裡我們為消息設置了消息ID,以便在回調時通過該ID來判斷是對哪個消息的回調,因為在回調函數中,我們是無法直接獲取到消息內容的,所以需要將消息先暫存起來,根據消息的重要程度,可以考慮使用本地快取,或者存入Redis中,或者Mysql中,然後在回調時更新其狀態或者從快取中移除,最後使用定時任務對一段時間內未發送的消息進行重新投遞。
以下是我盜來的圖,原諒我偷懶不想畫了[手動狗頭]:
另外,還需要注意的是,如果將消息發布到不存在的交換機上,那麼發布用的信道將會被RabbitMQ關閉。
此外,生產者確認機制跟事務是不能一起工作的,是事務的輕量級替代方案。因為事務和發布者確認模式都是需要先跟伺服器協商,對信道啟用的一種模式,不能對同一個信道同時使用兩種模式。
在生產者確認模式中,消息的確認可以是非同步和批量的,所以相比使用事務,性能會更好。
使用事務機制和生產者確認機制都能確保消息被正確的發送至RabbitMQ,這裡的「正確發送至RabbitMQ」說的是消息成功被交換機接收,但如果找不到能接收該消息的隊列,這條消息也會丟失。至於如何處理那些無法被投遞到隊列的消息,將會在下篇進行說明。
結題
所以當公司機房「斷電」時,如何處理那些需要發送的消息呢?相信看完上文之後,你的心中已經有了答案。
一般來說,這種「斷電」不會持續較長時間,一般幾分鐘到半小時之間,很快能夠恢復,所以如果是重要消息,可以保存到資料庫中,如果是非重要消息,可以使用redis進行保存,當然,還要根據消息的數量級來進行判斷。
如果消息量比較大,可以考慮將消息發送到另一個集群的死信隊列中,事實上,所在公司就有兩個RabbitMQ集群,所以當一個集群不可用時,可以往另一個集群發消息,emmm,如果兩個機房都停電了的話,當我沒說。