RabbitMQ實現延時消息的兩種方法

RabbitMQ實現延時消息的兩種方法

1、死信隊列

1.1消息什麼時候變為死信(dead-letter)

  1. 消息被否定接收,消費者使用basic.reject 或者 basic.nack並且requeue 重回隊列屬性設為false。
  2. 消息在隊列里得時間超過了該消息設置的過期時間(TTL)。
  3. 消息隊列到達了它的最大長度,之後再收到的消息。

1.2死信隊列的原理

當一個消息再隊列里變為死信時,它會被重新publish到另一個exchange交換機上,這個exchange就為DLX。因此我們只需要在聲明正常的業務隊列時添加一個可選的”x-dead-letter-exchange”參數,值為死信交換機,死信就會被rabbitmq重新publish到配置的這個交換機上,我們接着監聽這個交換機就可以了。

1.3 代碼實現

  1. 引入amqp依賴
  2. 聲明交換機,隊列
package com.lank.demo.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;

/**
 * @author lank
 * @since 2020/12/14 10:44
 */
@Configuration
public class RabbitmqConfig {

    //死信交換機,隊列,路由相關配置
    public static final String DLK_EXCHANGE = "dlk.exchange";
    public static final String DLK_ROUTEKEY = "dlk.routeKey";
    public static final String DLK_QUEUE = "dlk.queue";

    //業務交換機,隊列,路由相關配置
    public static final String DEMO_EXCHANGE = "demo.exchange";
    public static final String DEMO_QUEUE = "demo.queue";
    public static final String DEMO_ROUTEKEY = "demo.routeKey";

    //延時插件DelayedMessagePlugin的交換機,隊列,路由相關配置
    public static final String DMP_EXCHANGE = "dmp.exchange";
    public static final String DMP_ROUTEKEY = "dmp.routeKey";
    public static final String DMP_QUEUE = "dmp.queue";

    @Bean
    public DirectExchange demoExchange(){
        return new DirectExchange(DEMO_EXCHANGE,true,false);
    }

    @Bean
    public Queue demoQueue(){
        //只需要在聲明業務隊列時添加x-dead-letter-exchange,值為死信交換機
        Map<String,Object> map = new HashMap<>(1);
        map.put("x-dead-letter-exchange",DLK_EXCHANGE);
        //該參數x-dead-letter-routing-key可以修改該死信的路由key,不設置則使用原消息的路由key
        map.put("x-dead-letter-routing-key",DLK_ROUTEKEY);
        return new Queue(DEMO_QUEUE,true,false,false,map);
    }

    @Bean
    public Binding demoBind(){
        return BindingBuilder.bind(demoQueue()).to(demoExchange()).with(DEMO_ROUTEKEY);
    }

    @Bean
    public DirectExchange dlkExchange(){
        return new DirectExchange(DLK_EXCHANGE,true,false);
    }

    @Bean
    public Queue dlkQueue(){
        return new Queue(DLK_QUEUE,true,false,false);
    }

    @Bean
    public Binding dlkBind(){
        return BindingBuilder.bind(dlkQueue()).to(dlkExchange()).with(DLK_ROUTEKEY);
    }


    //延遲插件使用
    //1、聲明一個類型為x-delayed-message的交換機
    //2、參數添加一個x-delayed-type值為交換機的類型用於路由key的映射
    @Bean
    public CustomExchange dmpExchange(){
        Map<String, Object> arguments = new HashMap<>(1);
        arguments.put("x-delayed-type", "direct");
        return new CustomExchange(DMP_EXCHANGE,"x-delayed-message",true,false,arguments);
    }

    @Bean
    public Queue dmpQueue(){
        return new Queue(DMP_QUEUE,true,false,false);
    }

    @Bean
    public Binding dmpBind(){
        return BindingBuilder.bind(dmpQueue()).to(dmpExchange()).with(DMP_ROUTEKEY).noargs();
    }
    

}
  1. 聲明一個類用於發送帶過期時間的消息
package com.lank.demo.rabbitmq;

import com.lank.demo.config.RabbitmqConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * @author lank
 * @since 2020/12/14 10:33
 */
@Component
@Slf4j
public class MessageSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    //使用死信隊列發送消息方法封裝
    public void send(String message,Integer time){
        String ttl = String.valueOf(time*1000);
        //exchange和routingKey都為業務的就可以,只需要設置消息的過期時間
        rabbitTemplate.convertAndSend(RabbitmqConfig.DEMO_EXCHANGE, RabbitmqConfig.DEMO_ROUTEKEY,message, new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                //設置消息的過期時間,是以毫秒為單位的
                message.getMessageProperties().setExpiration(ttl);
                return message;
            }
        });
        log.info("使用死信隊列消息:{}發送成功,過期時間:{}秒。",message,time);
    }

    //使用延遲插件發送消息方法封裝
    public void send2(String message,Integer time){
        rabbitTemplate.convertAndSend(RabbitmqConfig.DMP_EXCHANGE, RabbitmqConfig.DMP_ROUTEKEY,message, new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
            //使用延遲插件只需要在消息的header中添加x-delay屬性,值為過期時間,單位毫秒
                message.getMessageProperties().setHeader("x-delay",time*1000);
                return message;
            }
        });
        log.info("使用延遲插件發送消息:{}發送成功,過期時間:{}秒。",message,time);
    }
}
  1. 編寫一個類用於消費消息
package com.lank.demo.rabbitmq;

import com.lank.demo.config.RabbitmqConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @author lank
 * @since 2020/12/15 15:57
 */

@Component
@Slf4j
public class MessageReceiver {

    @RabbitHandler
    @RabbitListener(queues = RabbitmqConfig.DLK_QUEUE)
    public void onMessage(Message message){
        log.info("使用死信隊列,收到消息:{}",new String(message.getBody()));
    }

    @RabbitHandler
    @RabbitListener(queues = RabbitmqConfig.DMP_QUEUE)
    public void onMessage2(Message message){
        log.info("使用延遲插件,收到消息:{}",new String(message.getBody()));
    }
}
  1. 編寫Controller調用發送消息方法測試結果
package com.lank.demo.controller;
import com.lank.demo.rabbitmq.MessageSender;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author lank
 * @since 2020/12/14 17:05
 */
@RestController
public class MessageController {

    @Autowired
    public MessageSender messageSender;

    //死信隊列controller
    @GetMapping("/send")
    public String send(@RequestParam String msg,Integer time){
        messageSender.send(msg,time);
        return "ok";
    }

    //延遲插件controller
    @GetMapping("/send2")
    public String sendByPlugin(@RequestParam String msg,Integer time){
        messageSender.send2(msg,time);
        return "ok";
    }

}
  1. 配置文件application.properties
server.port=4399
#virtual-host使用默認的/就好,如果需要/demo需自己在控制台添加
spring.rabbitmq.virtual-host=/demo
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
  1. 啟動項目,打開rabbitmq控制台,可以看到交換機和隊列已經創建好。
    image
    rabbitmq2
  2. 在瀏覽器中請求//localhost:4399/send?msg=hello&time=5,從控制台的輸出來看,剛好5s後接收到消息。
2020-12-16 22:47:28.071  INFO 13304 --- [nio-4399-exec-1] c.l.rabbitmqdlk.rabbitmq.MessageSender   : 使用死信隊列消息:hello發送成功,過期時間:5秒。
2020-12-16 22:47:33.145  INFO 13304 --- [ntContainer#0-1] c.l.r.rabbitmq.MessageReceiver           : 使用死信隊列,收到消息:hello

1.4死信隊列的一個小坑

當我往死信隊列中發送兩條不同過期時間的消息時,如果先發送的消息A的過期時間大於後發送的消息B的過期時間時,由於消息的順序消費,消息B過期後並不會立即重新publish到死信交換機,而是會等到消息A過期後一起被消費。

依次發送兩個請求//localhost:4399/send?msg=消息A&time=30和//localhost:4399/send?msg=消息B&time=10,消息A先發送,過期時間30S,消息B後發送,過期時間10S,我們想要的結果應該是10S收到消息B,30S後收到消息A,但結果並不是,控制台輸出如下:

2020-12-16 22:54:47.339  INFO 13304 --- [nio-4399-exec-5] c.l.rabbitmqdlk.rabbitmq.MessageSender   : 使用死信隊列消息:消息A發送成功,過期時間:30秒。
2020-12-16 22:54:54.278  INFO 13304 --- [nio-4399-exec-6] c.l.rabbitmqdlk.rabbitmq.MessageSender   : 使用死信隊列消息:消息B發送成功,過期時間:10秒。
2020-12-16 22:55:17.356  INFO 13304 --- [ntContainer#0-1] c.l.r.rabbitmq.MessageReceiver           : 使用死信隊列,收到消息:消息A
2020-12-16 22:55:17.357  INFO 13304 --- [ntContainer#0-1] c.l.r.rabbitmq.MessageReceiver           : 使用死信隊列,收到消息:消息B

消息A30S後被成功消費,緊接着消息B被消費。因此當我們使用死信隊列時應該注意是否消息的過期時間都是一樣的,比如訂單超過10分鐘未支付修改其狀態。如果當一個隊列各個消息的過期時間不一致時,使用死信隊列就可能達不到延時的作用。這時候我們可以使用延時插件來實現這需求。

2 、延時插件

RabbitMQ Delayed Message Plugin是一個rabbitmq的插件,所以使用前需要安裝它,可以參考的GitHub地址://github.com/rabbitmq/rabbitmq-delayed-message-exchange

2.1如何實現

  1. 安裝好插件後只需要聲明一個類型type為”x-delayed-message”的exchange,並且在其可選參數下配置一個key為”x-delayed-typ”,值為交換機類型(topic/direct/fanout)的屬性。
  2. 聲明一個隊列綁定到該交換機
  3. 在發送消息的時候消息的header里添加一個key為”x-delay”,值為過期時間的屬性,單位毫秒。
  4. 代碼就在上面,配置類為DMP開頭的,發送消息的方法為send2()。
  5. 啟動後在rabbitmq控制台可以看到一個類型為x-delayed-message的交換機。
    rabbitmq3
    rabbitmq4
  6. 繼續在瀏覽器中發送兩個請求//localhost:4399/send2?msg=消息A&time=30和//localhost:4399/send2?msg=消息B&time=10,控制台輸出如下,不會出現死信隊列出現的問題:
2020-12-16 23:31:19.819  INFO 13304 --- [nio-4399-exec-9] c.l.rabbitmqdlk.rabbitmq.MessageSender   : 使用延遲插件發送消息:消息A發送成功,過期時間:30秒。
2020-12-16 23:31:27.673  INFO 13304 --- [io-4399-exec-10] c.l.rabbitmqdlk.rabbitmq.MessageSender   : 使用延遲插件發送消息:消息B發送成功,過期時間:10秒。
2020-12-16 23:31:37.833  INFO 13304 --- [ntContainer#1-1] c.l.r.rabbitmq.MessageReceiver           : 使用延遲插件,收到消息:消息B
2020-12-16 23:31:49.917  INFO 13304 --- [ntContainer#1-1] c.l.r.rabbitmq.MessageReceiver           : 使用延遲插件,收到消息:消息A

死信交換機官網介紹://www.rabbitmq.com/dlx.html
延時插件GitHub://github.com/rabbitmq/rabbitmq-delayed-message-exchange