基於rabbitmq延遲插件實現分散式延遲任務

承接上文基於redis,redisson的延遲隊列實踐,今天介紹下基於rabbitmq延遲插件rabbitmq_delayed_message_exchange實現延遲任務。

一、延遲任務的使用場景

1、下單成功,30分鐘未支付。支付超時,自動取消訂單

2、訂單簽收,簽收後7天未進行評價。訂單超時未評價,系統默認好評

3、下單成功,商家5分鐘未接單,訂單取消

4、配送超時,推送簡訊提醒

5、三天會員試用期,三天到期後準時準點通知用戶,試用產品到期了

……

對於延時比較長的場景、實時性不高的場景,我們可以採用任務調度的方式定時輪詢處理。如:xxl-job。

 

今天我們講解延遲隊列的實現方式,而延遲隊列有很多種實現方式,普遍會採用如下等方式,如:

  • 1.如基於RabbitMQ的隊列ttl+死信路由策略:通過設置一個隊列的超時未消費時間,配合死信路由策略,到達時間未消費後,回會將此消息路由到指定隊列
  • 2.基於RabbitMQ延遲隊列插件(rabbitmq-delayed-message-exchange):發送消息時通過在請求頭添加延時參數(headers.put(“x-delay”, 5000))即可達到延遲隊列的效果。(順便說一句阿里雲的收費版rabbitMQ當前可支援一天以內的延遲消息),局限性:目前該插件的當前設計並不真正適合包含大量延遲消息(例如數十萬或數百萬)的場景,詳情參見 #/issues/72 另外該插件的一個可變性來源是依賴於 Erlang 計時器,在系統中使用了一定數量的長時間計時器之後,它們開始爭用調度程式資源。
  1. 3.使用redis的zset有序性,輪詢zset中的每個元素,到點後將內容遷移至待消費的隊列,(redisson已有實現)
  • 4.使用redis的key的過期通知策略,設置一個key的過期時間為延遲時間,過期後通知客戶端(此方式依賴redis過期檢查機制key多後延遲會比較嚴重;Redis的pubsub不會被持久化,伺服器宕機就會被丟棄)。

二、組件安裝

安裝rabbitMQ需要依賴erlang語言環境,所以需要我們下載erlang的環境安裝程式。網上有很多安裝教程,這裡不再貼圖累述,需要注意的是:該延遲插件支援的版本匹配。

插件Git官方地址://github.com/rabbitmq/rabbitmq-delayed-message-exchange

 

 

當你成功安裝好插件後運行起rabbitmq管理後台,在新建exchange里就可以看到type類型中多出了這個選項

三、RabbitMQ延遲隊列插件的延遲隊列實現

1、基本原理

 

 

  通過 x-delayed-message 聲明的交換機,它的消息在發布之後不會立即進入隊列,先將消息保存至 Mnesia(一個分散式資料庫管理系統,適合於電信和其它需要持續運行和具備軟實時特性的 Erlang 應用。目前資料介紹的不是很多)

  這個插件將會嘗試確認消息是否過期,首先要確保消息的延遲範圍是 Delay > 0, Delay =< ?ERL_MAX_T(在 Erlang 中可以被設置的範圍為 (2^32)-1 毫秒),如果消息過期通過 x-delayed-type 類型標記的交換機投遞至目標隊列,整個消息的投遞過程也就完成了。

2、核心組件開發走起

引入maven依賴

 <dependency>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-starter-amqp</artifactId>
 </dependency>

application.yml簡單配置

  rabbitmq:
    host: localhost
    port: 5672
    virtual-host: /

 

RabbitMqConfig配置文件

package com.example.code.bot_monomer.config;


import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.ExchangeBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * @author: shf description: date: 2022/1/5 15:00
 */
@Configuration
public class RabbitMQConfig {

    /**
     * 普通
     */
    public static final String EXCHANGE_NAME = "test_exchange";
    public static final String QUEUE_NAME = "test001_queue";
    public static final String NEW_QUEUE_NAME = "test002_queue";
    /**
     * 延遲
     */
    public static final String DELAY_EXCHANGE_NAME = "delay_exchange";
    public static final String DELAY_QUEUE_NAME = "delay001_queue";
    public static final String DELAY_QUEUE_ROUT_KEY = "key001_delay";
    //由於阿里rabbitmq增加隊列要額外收費,現改為各業務延遲任務共同使用一個queue:delay001_queue
    //public static final String NEW_DELAY_QUEUE_NAME = "delay002_queue";

    
    @Bean
    public CustomExchange delayMessageExchange() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", "direct");
        //自定義交換機
        return new CustomExchange(DELAY_EXCHANGE_NAME, "x-delayed-message", true, false, args);
    }

    @Bean
    public Queue delayMessageQueue() {
        return new Queue(DELAY_QUEUE_NAME, true, false, false);
    }

    @Bean
    public Binding bindingDelayExchangeAndQueue(Queue delayMessageQueue, Exchange delayMessageExchange) {
        return new Binding(DELAY_QUEUE_NAME, Binding.DestinationType.QUEUE, DELAY_EXCHANGE_NAME, DELAY_QUEUE_ROUT_KEY, null);
        //return BindingBuilder.bind(delayMessageQueue).to(delayMessageExchange).with("key001_delay").noargs();
    }
    
    /**
     * 交換機
     */
    @Bean
    public Exchange orderExchange() {
        return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
        //return new TopicExchange(EXCHANGE_NAME, true, false);
    }

    /**
     * 隊列
     */
    @Bean
    public Queue orderQueue() {
        //return QueueBuilder.durable(QUEUE_NAME).build();
        return new Queue(QUEUE_NAME, true, false, false, null);
    }

    /**
     * 隊列
     */
    @Bean
    public Queue orderQueue1() {
        //return QueueBuilder.durable(NEW_QUEUE_NAME).build();
        return new Queue(NEW_QUEUE_NAME, true, false, false, null);
    }

    /**
     * 交換機和隊列綁定關係
     */
    @Bean
    public Binding orderBinding(Queue orderQueue, Exchange orderExchange) {
        //return BindingBuilder.bind(queue).to(exchange).with("#.delay").noargs();
        return new Binding(QUEUE_NAME, Binding.DestinationType.QUEUE, EXCHANGE_NAME, "test001_common", null);
    }

    /**
     * 交換機和隊列綁定關係
     */
    @Bean
    public Binding orderBinding1(Queue orderQueue1, Exchange orderExchange) {
        //return BindingBuilder.bind(queue).to(exchange).with("#.delay").noargs();
        return new Binding(NEW_QUEUE_NAME, Binding.DestinationType.QUEUE, EXCHANGE_NAME, "test001_common", null);
    }

}

MqDelayQueueEnum枚舉類

package com.example.code.bot_monomer.enums;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;

/**
 * @author: shf description: 延遲隊列業務枚舉類
 * date: 2021/8/27 14:03
 */
@Getter
@NoArgsConstructor
@AllArgsConstructor
public enum MqDelayQueueEnum {
    /**
     * 業務0001
     */
    YW0001("yw0001", "測試0001", "yw0001"),
    /**
     * 業務0002
     */
    YW0002("yw0002", "測試0002", "yw0002");

    /**
     * 延遲隊列業務區分唯一Key
     */
    private String code;

    /**
     * 中文描述
     */
    private String name;

    /**
     * 延遲隊列具體業務實現的 Bean 可通過 Spring 的上下文獲取
     */
    private String beanId;

    public static String getBeanIdByCode(String code) {
        for (MqDelayQueueEnum queueEnum : MqDelayQueueEnum.values()) {
            if (queueEnum.code.equals(code)) {
                return queueEnum.beanId;
            }
        }
        return null;
    }
}

 

模板介面處理類:MqDelayQueueHandle

package com.example.code.bot_monomer.service.mqDelayQueue;

/**
 * @author: shf description: RabbitMQ延遲隊列方案處理介面
 * date: 2022/1/10 10:46
 */
public interface MqDelayQueueHandle<T> {

    void execute(T t);
}

 

具體業務實現處理類

@Slf4j
@Component("yw0001")
public class MqTaskHandle01 implements MqDelayQueueHandle<String> {

    @Override
    public void execute(String s) {
        log.info("MqTaskHandle01.param=[{}]",s);
        //TODO
    }
}

 

注意:@Component(“yw0001“) 要和業務枚舉類MqDelayQueueEnum中對應的beanId保持一致。

統一消息體封裝類

/**
 * @author: shf description: date: 2022/1/10 10:51
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class MqDelayMsg<T> {

    /**
     * 業務區分唯一key
     */
    @NonNull
    String businessCode;

    /**
     * 消息內容
     */
    @NonNull
    T content;
}

 

統一消費分發處理Consumer

package com.example.code.bot_monomer.service.mqConsumer;

import com.alibaba.fastjson.JSONObject;
import com.example.code.bot_monomer.config.common.MqDelayMsg;
import com.example.code.bot_monomer.enums.MqDelayQueueEnum;
import com.example.code.bot_monomer.service.mqDelayQueue.MqDelayQueueHandle;

import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Component;

import lombok.extern.slf4j.Slf4j;


/**
 * @author: shf description: date: 2022/1/5 15:12
 */
@Slf4j
@Component
//@RabbitListener(queues = "test001_queue")
@RabbitListener(queues = "delay001_queue")
public class TestConsumer {

    @Autowired
    ApplicationContext context;

    /**
     * RabbitHandler 會自動匹配 消息類型(消息自動確認)
     *
     * @param msgStr
     * @param message
     */
    @RabbitHandler
    public void taskHandle(String msgStr, Message message) {
        try {
            MqDelayMsg msg = JSONObject.parseObject(msgStr, MqDelayMsg.class);
            log.info("TestConsumer.taskHandle:businessCode=[{}],deliveryTag=[{}]", msg.getBusinessCode(), message.getMessageProperties().getDeliveryTag());
            String beanId = MqDelayQueueEnum.getBeanIdByCode(msg.getBusinessCode());
            if (StringUtils.isNotBlank(beanId)) {
                MqDelayQueueHandle<Object> handle = (MqDelayQueueHandle<Object>) context.getBean(beanId);
                handle.execute(msg.getContent());
            } else {
                log.warn("TestConsumer.taskHandle:MQ延遲任務不存在的beanId,businessCode=[{}]", msg.getBusinessCode());
            }
        } catch (Exception e) {
            log.error("TestConsumer.taskHandle:MQ延遲任務Handle異常:", e);
        }
    }
}

 

最後簡單封裝個工具類

package com.example.code.bot_monomer.utils;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.example.code.bot_monomer.config.RabbitMQConfig;
import com.example.code.bot_monomer.config.common.MqDelayMsg;

import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.lang.NonNull;
import org.springframework.stereotype.Component;

import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
import java.util.Objects;

import lombok.extern.slf4j.Slf4j;

/**
 * @author: shf description: MQ分散式延遲隊列工具類 date: 2022/1/10 15:20
 */
@Slf4j
@Component
public class MqDelayQueueUtil {

    @Autowired
    private RabbitTemplate template;

    @Value("${mqdelaytask.limit.days:2}")
    private Integer mqDelayLimitDays;

    /**
     * 添加延遲任務
     *
     * @param bindId 業務綁定ID,用於關聯具體消息
     * @param businessCode 業務區分唯一標識
     * @param content      消息內容
     * @param delayTime    設置的延遲時間 單位毫秒
     * @return 成功true;失敗false
     */
    public boolean addDelayQueueTask(@NonNull String bindId, @NonNull String businessCode, @NonNull Object content, @NonNull Long delayTime) {
        log.info("MqDelayQueueUtil.addDelayQueueTask:bindId={},businessCode={},delayTime={},content={}", bindId, businessCode, delayTime, JSON.toJSONString(content));
        if (StringUtils.isAnyBlank(bindId, businessCode) || Objects.isNull(content) || Objects.isNull(delayTime)) {
            return false;
        }
        try {
            //TODO 延時時間大於2天的先加入資料庫表記錄,後由定時任務每天拉取2次將低於2天的延遲記錄放入MQ中等待到期執行
            if (ChronoUnit.DAYS.between(LocalDateTime.now(), LocalDateTime.now().plus(delayTime, ChronoUnit.MILLIS)) >= mqDelayLimitDays) {
                //TODO
            } else {
                this.template.convertAndSend(
                    RabbitMQConfig.DELAY_EXCHANGE_NAME,
                    RabbitMQConfig.DELAY_QUEUE_ROUT_KEY,
                    JSONObject.toJSONString(MqDelayMsg.<Object>builder().businessCode(businessCode).content(content).build()),
                    message -> {
                        //注意這裡時間可使用long類型,毫秒單位,設置header
                        message.getMessageProperties().setHeader("x-delay", delayTime);
                        return message;
                    }
                );
            }
        } catch (Exception e) {
            log.error("MqDelayQueueUtil.addDelayQueueTask:bindId={}businessCode={}異常:", bindId, businessCode, e);
            return false;
        }
        return true;
    }

    /**
     * 撤銷延遲消息
     * @param bindId 業務綁定ID,用於關聯具體消息
     * @param businessCode 業務區分唯一標識
     * @return 成功true;失敗false
     */
    public boolean cancelDelayQueueTask(@NonNull String bindId, @NonNull String businessCode) {
        if (StringUtils.isAnyBlank(bindId,businessCode)) {
            return false;
        }
        try {
            //TODO 查詢DB,如果消息還存在即可刪除
        } catch (Exception e) {
            log.error("MqDelayQueueUtil.cancelDelayQueueTask:bindId={}businessCode={}異常:", bindId, businessCode, e);
            return false;
        }
        return true;
    }

    /**
     * 修改延遲消息
     * @param bindId 業務綁定ID,用於關聯具體消息
     * @param businessCode 業務區分唯一標識
     * @param content      消息內容
     * @param delayTime    設置的延遲時間 單位毫秒
     * @return 成功true;失敗false
     */
    public boolean updateDelayQueueTask(@NonNull String bindId, @NonNull String businessCode, @NonNull Object content, @NonNull Long delayTime) {
        if (StringUtils.isAnyBlank(bindId, businessCode) || Objects.isNull(content) || Objects.isNull(delayTime)) {
            return false;
        }
        try {
            //TODO 查詢DB,消息不存在返回false,存在判斷延遲時長入庫或入mq
            //TODO 延時時間大於2天的先加入資料庫表記錄,後由定時任務每天拉取2次將低於2天的延遲記錄放入MQ中等待到期執行
            if (ChronoUnit.DAYS.between(LocalDateTime.now(), LocalDateTime.now().plus(delayTime, ChronoUnit.MILLIS)) >= mqDelayLimitDays) {
                //TODO
            } else {
                this.template.convertAndSend(
                    RabbitMQConfig.DELAY_EXCHANGE_NAME,
                    RabbitMQConfig.DELAY_QUEUE_ROUT_KEY,
                    JSONObject.toJSONString(MqDelayMsg.<Object>builder().businessCode(businessCode).content(content).build()),
                    message -> {
                        //注意這裡時間可使用long類型,毫秒單位,設置header
                        message.getMessageProperties().setHeader("x-delay", delayTime);
                        return message;
                    }
                );
            }
        } catch (Exception e) {
            log.error("MqDelayQueueUtil.updateDelayQueueTask:bindId={}businessCode={}異常:", bindId, businessCode, e);
            return false;
        }
        return true;
    }

}

 

附上測試類:

/**
 * description: 延遲隊列測試
 *
 * @author: shf date: 2021/8/27 14:18
 */
@RestController
@RequestMapping("/mq")
@Slf4j
public class MqQueueController {

    @Autowired
    private MqDelayQueueUtil mqDelayUtil;

    @PostMapping("/addQueue")
    public String addQueue() {
        mqDelayUtil.addDelayQueueTask("00001",MqDelayQueueEnum.YW0001.getCode(),"delay0001測試",3000L);
        return "SUCCESS";
    }

}

 

貼下DB記錄表的欄位設置

 

 

配合xxl-job定時任務即可。

  由於投遞後的消息無法修改,設置延遲消息需謹慎!並需要與業務方配合,如:延遲時間在2天以內(該時間天數可調整,你也可以設置閾值單位為小時,看業務需求)的消息不支援修改與撤銷。2天之外的延遲消息支援撤銷與修改,需要注意的是,需要綁定關聯具體操作業務唯一標識ID以對應關聯操作撤銷或修改。(PS:延遲時間設置在2天以外的會先保存到DB記錄表由定時任務每天拉取到時2天內的投放到延遲對列)。

  再穩妥點,為了防止進入DB記錄的消息有操作時間誤差導致的不一致問題,可在消費統一Consumer消費分發前,查詢DB記錄表,該消息是否已被撤銷刪除(增加個刪除標記欄位記錄),並且當前時間大於等於DB表中記錄的到期執行時間才能分發出去執行,否則棄用。


 

此外,利用rabbitmq的死信隊列機制也可以實現延遲任務,有時間再附上實現案例。