RabbitMQ核心知識總結!

本文已經收錄到github倉庫,此倉庫用於分享Java相關知識總結,包括Java基礎、MySQL、Spring Boot、MyBatis、Redis、RabbitMQ、計算機網絡、數據結構與算法等等,歡迎大家提pr和star!

github地址://github.com/Tyson0314/Java-learning

如果github訪問不了,可以訪問gitee倉庫。

gitee地址://gitee.com/tysondai/Java-learning

文章目錄:

簡介

RabbitMQ是一個由erlang開發的消息隊列。消息隊列用於應用間的異步協作。

基本概念

Message:由消息頭和消息體組成。消息體是不透明的,而消息頭則由一系列的可選屬性組成,這些屬性包括routing-key、priority、delivery-mode(是否持久性存儲)等。

Publisher:消息的生產者。

Exchange:接收消息並將消息路由到一個或多個Queue。default exchange 是默認的直連交換機,名字為空字符串,每個新建隊列都會自動綁定到默認交換機上,綁定的路由鍵名稱與隊列名稱相同。

Binding:通過Binding將Exchange和Queue關聯,這樣Exchange就知道將消息路由到哪個Queue中。

Queue:存儲消息,隊列的特性是先進先出。一個消息可分發到一個或多個隊列。

Virtual host:每個 vhost 本質上就是一個 mini 版的 RabbitMQ 服務器,擁有自己的隊列、交換器、綁定和權限機制。vhost 是 AMQP 概念的基礎,必須在連接時指定,RabbitMQ 默認的 vhost 是 / 。當多個不同的用戶使用同一個RabbitMQ server提供的服務時,可以劃分出多個vhost,每個用戶在自己的vhost創建exchange和queue。

Broker:消息隊列服務器實體。

什麼時候使用MQ

對於一些不需要立即生效的操作,可以拆分出來,異步執行,使用消息隊列實現。

以常見的訂單系統為例,用戶點擊下單按鈕之後的業務邏輯可能包括:扣減庫存、生成相應單據、發短訊通知。這種場景下就可以用 MQ 。將短訊通知放到 MQ 異步執行,在下單的主流程(比如扣減庫存、生成相應單據)完成之後發送一條消息到 MQ, 讓主流程快速完結,而由另外的線程消費MQ的消息。

優缺點

缺點:使用erlang實現,不利於二次開發和維護;性能較kafka差,持久化消息和ACK確認的情況下生產和消費消息單機吞吐量大約在1-2萬左右,kafka單機吞吐量在十萬級別。

優點:有管理界面,方便使用;可靠性高;功能豐富,支持消息持久化、消息確認機制、多種消息分發機制。

Exchange 類型

Exchange分發消息時根據類型的不同分發策略不同,目前共四種類型:direct、fanout、topic、headers 。headers 模式根據消息的headers進行路由,此外 headers 交換器和 direct 交換器完全一致,但性能差很多。

Exchange規則。

類型名稱 類型描述
fanout 把所有發送到該Exchange的消息路由到所有與它綁定的Queue中
direct Routing Key==Binding Key
topic 模糊匹配
headers Exchange不依賴於routing key與binding key的匹配規則來路由消息,而是根據發送的消息內容中的header屬性進行匹配。

direct

direct交換機會將消息路由到binding key 和 routing key完全匹配的隊列中。它是完全匹配、單播的模式。

fanout

所有發到 fanout 類型交換機的消息都會路由到所有與該交換機綁定的隊列上去。fanout 類型轉發消息是最快的。

topic

topic交換機使用routing key和binding key進行模糊匹配,匹配成功則將消息發送到相應的隊列。routing key和binding key都是句點號「. 」分隔的字符串,binding key中可以存在兩種特殊字符「*」與「#」,用於做模糊匹配,其中「*」用於匹配一個單詞,「#」用於匹配多個單詞。

headers

headers交換機是根據發送的消息內容中的headers屬性進行路由的。在綁定Queue與Exchange時指定一組鍵值對;當消息發送到Exchange時,RabbitMQ會取到該消息的headers(也是一個鍵值對的形式),對比其中的鍵值對是否完全匹配Queue與Exchange綁定時指定的鍵值對;如果完全匹配則消息會路由到該Queue,否則不會路由到該Queue。

消息丟失

消息丟失場景:生產者生產消息到RabbitMQ Server消息丟失、RabbitMQ Server存儲的消息丟失和RabbitMQ Server到消費者消息丟失。

消息丟失從三個方面來解決:生產者確認機制、消費者手動確認消息和持久化。

生產者確認機制

生產者發送消息到隊列,無法確保發送的消息成功的到達server。

解決方法:

  1. 事務機制。在一條消息發送之後會使發送端阻塞,等待RabbitMQ的回應,之後才能繼續發送下一條消息。性能差。
  2. 開啟生產者確認機制,只要消息成功發送到交換機之後,RabbitMQ就會發送一個ack給生產者(即使消息沒有Queue接收,也會發送ack)。如果消息沒有成功發送到交換機,就會發送一條nack消息,提示發送失敗。

在 Springboot 是通過 publisher-confirms 參數來設置 confirm 模式:

spring:
    rabbitmq:   
        #開啟 confirm 確認機制
        publisher-confirms: true

在生產端提供一個回調方法,當服務端確認了一條或者多條消息後,生產者會回調這個方法,根據具體的結果對消息進行後續處理,比如重新發送、記錄日誌等。

// 消息是否成功發送到Exchange
final RabbitTemplate.ConfirmCallback confirmCallback = (CorrelationData correlationData, boolean ack, String cause) -> {
            log.info("correlationData: " + correlationData);
            log.info("ack: " + ack);
            if(!ack) {
                log.info("異常處理....");
            }
    };

rabbitTemplate.setConfirmCallback(confirmCallback);

路由不可達消息

生產者確認機制只確保消息正確到達交換機,對於從交換機路由到Queue失敗的消息,會被丟棄掉,導致消息丟失。

對於不可路由的消息,有兩種處理方式:Return消息機制和備份交換機。

Return消息機制

Return消息機制提供了回調函數 ReturnCallback,當消息從交換機路由到Queue失敗才會回調這個方法。需要將mandatory 設置為 true ,才能監聽到路由不可達的消息。

spring:
    rabbitmq:
        #觸發ReturnCallback必須設置mandatory=true, 否則Exchange沒有找到Queue就會丟棄掉消息, 而不會觸發ReturnCallback
        template.mandatory: true

通過 ReturnCallback 監聽路由不可達消息。

    final RabbitTemplate.ReturnCallback returnCallback = (Message message, int replyCode, String replyText, String exchange, String routingKey) ->
            log.info("return exchange: " + exchange + ", routingKey: "
                    + routingKey + ", replyCode: " + replyCode + ", replyText: " + replyText);
rabbitTemplate.setReturnCallback(returnCallback);

當消息從交換機路由到Queue失敗時,會返回 return exchange: , routingKey: MAIL, replyCode: 312, replyText: NO_ROUTE

備份交換機

備份交換機alternate-exchange 是一個普通的exchange,當你發送消息到對應的exchange時,沒有匹配到queue,就會自動轉移到備份交換機對應的queue,這樣消息就不會丟失。

消費者手動消息確認

有可能消費者收到消息還沒來得及處理MQ服務就宕機了,導致消息丟失。因為消息者默認採用自動ack,一旦消費者收到消息後會通知MQ Server這條消息已經處理好了,MQ 就會移除這條消息。

解決方法:消費者設置為手動確認消息。消費者處理完邏輯之後再給broker回復ack,表示消息已經成功消費,可以從broker中刪除。當消息者消費失敗的時候,給broker回復nack,根據配置決定重新入隊還是從broker移除,或者進入死信隊列。只要沒收到消費者的 acknowledgment,broker 就會一直保存着這條消息,但不會 requeue,也不會分配給其他 消費者。

消費者設置手動ack:

#設置消費端手動 ack
spring.rabbitmq.listener.simple.acknowledge-mode=manual

消息處理完,手動確認:

    @RabbitListener(queues = RabbitMqConfig.MAIL_QUEUE)
    public void onMessage(Message message, Channel channel) throws IOException {

        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        //手工ack;第二個參數是multiple,設置為true,表示deliveryTag序列號之前(包括自身)的消息都已經收到,設為false則表示收到一條消息
        channel.basicAck(deliveryTag, true);
        System.out.println("mail listener receive: " + new String(message.getBody()));
    }

當消息消費失敗時,消費端給broker回復nack,如果consumer設置了requeue為false,則nack後broker會刪除消息或者進入死信隊列,否則消息會重新入隊。

持久化

如果RabbitMQ服務異常導致重啟,將會導致消息丟失。RabbitMQ提供了持久化的機制,將內存中的消息持久化到硬盤上,即使重啟RabbitMQ,消息也不會丟失。

消息持久化需要滿足以下條件:

  1. 消息設置持久化。發佈消息前,設置投遞模式delivery mode為2,表示消息需要持久化。
  2. Queue設置持久化。
  3. 交換機設置持久化。

當發佈一條消息到交換機上時,Rabbit會先把消息寫入持久化日誌,然後才向生產者發送響應。一旦從隊列中消費了一條消息的話並且做了確認,RabbitMQ會在持久化日誌中移除這條消息。在消費消息前,如果RabbitMQ重啟的話,服務器會自動重建交換機和隊列,加載持久化日誌中的消息到相應的隊列或者交換機上,保證消息不會丟失。

鏡像隊列

當MQ發生故障時,會導致服務不可用。引入RabbitMQ的鏡像隊列機制,將queue鏡像到集群中其他的節點之上。如果集群中的一個節點失效了,能自動地切換到鏡像中的另一個節點以保證服務的可用性。

通常每一個鏡像隊列都包含一個master和多個slave,分別對應於不同的節點。發送到鏡像隊列的所有消息總是被直接發送到master和所有的slave之上。除了publish外所有動作都只會向master發送,然後由master將命令執行的結果廣播給slave,從鏡像隊列中的消費操作實際上是在master上執行的。

重複消費

消息重複的原因有兩個:1.生產時消息重複,2.消費時消息重複。

生產者發送消息給MQ,在MQ確認的時候出現了網絡波動,生產者沒有收到確認,這時候生產者就會重新發送這條消息,導致MQ會接收到重複消息。

消費者消費成功後,給MQ確認的時候出現了網絡波動,MQ沒有接收到確認,為了保證消息不丟失,MQ就會繼續給消費者投遞之前的消息。這時候消費者就接收到了兩條一樣的消息。由於重複消息是由於網絡原因造成的,無法避免。

解決方法:發送消息時讓每個消息攜帶一個全局的唯一ID,在消費消息時先判斷消息是否已經被消費過,保證消息消費邏輯的冪等性。具體消費過程為:

  1. 消費者獲取到消息後先根據id去查詢redis/db是否存在該消息
  2. 如果不存在,則正常消費,消費完畢後寫入redis/db
  3. 如果存在,則證明消息被消費過,直接丟棄

消費端限流

當 RabbitMQ 服務器積壓大量消息時,隊列里的消息會大量湧入消費端,可能導致消費端服務器奔潰。這種情況下需要對消費端限流。

Spring RabbitMQ 提供參數 prefetch 可以設置單個請求處理的消息個數。如果消費者同時處理的消息到達最大值的時候,則該消費者會阻塞,不會消費新的消息,直到有消息 ack 才會消費新的消息。

開啟消費端限流:

#在單個請求中處理的消息個數,unack的最大數量
spring.rabbitmq.listener.simple.prefetch=2

原生 RabbitMQ 還提供 prefetchSize 和 global 兩個參數。Spring RabbitMQ沒有這兩個參數。

//單條消息大小限制,0代表不限制
//global:限制限流功能是channel級別的還是consumer級別。當設置為false,consumer級別,限流功能生效,設置為true沒有了限流功能,因為channel級別尚未實現。
void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;

死信隊列

消費失敗的消息存放的隊列。

消息消費失敗的原因:

  • 消息被拒絕並且消息沒有重新入隊(requeue=false)
  • 消息超時未消費
  • 達到最大隊列長度

設置死信隊列的 exchange 和 queue,然後進行綁定:

	@Bean
    public DirectExchange dlxExchange() {
        return new DirectExchange(RabbitMqConfig.DLX_EXCHANGE);
    }

    @Bean
    public Queue dlxQueue() {
        return new Queue(RabbitMqConfig.DLX_QUEUE, true);
    }

    @Bean
    public Binding bindingDeadExchange(Queue dlxQueue, DirectExchange deadExchange) {
        return BindingBuilder.bind(dlxQueue).to(deadExchange).with(RabbitMqConfig.DLX_QUEUE);
    }

在普通隊列加上兩個參數,綁定普通隊列到死信隊列。當消息消費失敗時,消息會被路由到死信隊列。

    @Bean
    public Queue sendSmsQueue() {
        Map<String,Object> arguments = new HashMap<>(2);
        // 綁定該隊列到私信交換機
        arguments.put("x-dead-letter-exchange", RabbitMqConfig.DLX_EXCHANGE);
        arguments.put("x-dead-letter-routing-key", RabbitMqConfig.DLX_QUEUE);
        return new Queue(RabbitMqConfig.MAIL_QUEUE, true, false, false, arguments);
    }

生產者完整代碼:

@Component
@Slf4j
public class MQProducer {

    @Autowired
    RabbitTemplate rabbitTemplate;

    @Autowired
    RandomUtil randomUtil;

    @Autowired
    UserService userService;

    final RabbitTemplate.ConfirmCallback confirmCallback = (CorrelationData correlationData, boolean ack, String cause) -> {
            log.info("correlationData: " + correlationData);
            log.info("ack: " + ack);
            if(!ack) {
                log.info("異常處理....");
            }
    };


    final RabbitTemplate.ReturnCallback returnCallback = (Message message, int replyCode, String replyText, String exchange, String routingKey) ->
            log.info("return exchange: " + exchange + ", routingKey: "
                    + routingKey + ", replyCode: " + replyCode + ", replyText: " + replyText);

    public void sendMail(String mail) {
        //貌似線程不安全 範圍100000 - 999999
        Integer random = randomUtil.nextInt(100000, 999999);
        Map<String, String> map = new HashMap<>(2);
        String code = random.toString();
        map.put("mail", mail);
        map.put("code", code);

        MessageProperties mp = new MessageProperties();
        //在生產環境中這裡不用Message,而是使用 fastJson 等工具將對象轉換為 json 格式發送
        Message msg = new Message("tyson".getBytes(), mp);
        msg.getMessageProperties().setExpiration("3000");
        //如果消費端要設置為手工 ACK ,那麼生產端發送消息的時候一定發送 correlationData ,並且全局唯一,用以唯一標識消息。
        CorrelationData correlationData = new CorrelationData("1234567890"+new Date());

        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setConfirmCallback(confirmCallback);
        rabbitTemplate.setReturnCallback(returnCallback);
        rabbitTemplate.convertAndSend(RabbitMqConfig.MAIL_QUEUE, msg, correlationData);

        //存入redis
        userService.updateMailSendState(mail, code, MailConfig.MAIL_STATE_WAIT);
    }
}

消費者完整代碼:

@Slf4j
@Component
public class DeadListener {

    @RabbitListener(queues = RabbitMqConfig.DLX_QUEUE)
    public void onMessage(Message message, Channel channel) throws IOException {

        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        //手工ack
        channel.basicAck(deliveryTag,false);
        System.out.println("receive--1: " + new String(message.getBody()));
    }
}

當普通隊列中有死信時,RabbitMQ 就會自動的將這個消息重新發佈到設置的死信交換機去,然後被路由到死信隊列。可以監聽死信隊列中的消息做相應的處理。

其他

pull模式

pull模式主要是通過channel.basicGet方法來獲取消息,示例代碼如下:

GetResponse response = channel.basicGet(QUEUE_NAME, false);
System.out.println(new String(response.getBody()));
channel.basicAck(response.getEnvelope().getDeliveryTag(),false);

消息過期時間

在生產端發送消息的時候可以給消息設置過期時間,單位為毫秒(ms)

Message msg = new Message("tyson".getBytes(), mp);
msg.getMessageProperties().setExpiration("3000");

也可以在創建隊列的時候指定隊列的ttl,從消息入隊列開始計算,超過該時間的消息將會被移除。

參考鏈接

RabbitMQ基礎

Springboot整合RabbitMQ

RabbitMQ之消息持久化

RabbitMQ發送郵件代碼

線上rabbitmq問題

最後給大家分享一個github倉庫,上面放了200多本經典的計算機書籍,包括C語言、C++、Java、Python、前端、數據庫、操作系統、計算機網絡、數據結構和算法、機器學習、編程人生等,可以star一下,下次找書直接在上面搜索,倉庫持續更新中~

github地址://github.com/Tyson0314/java-books

如果github訪問不了,可以訪問gitee倉庫。

gitee地址://gitee.com/tysondai/java-books

Tags: