RabbitMQ實現商城訂單超時處理

  • 2019 年 11 月 1 日
  • 筆記

背景

  1. 為什麼需要使用延遲隊列?適用於什麼場景? 場景一:訂單下單之後30分鐘後,如果用戶沒有付錢,則系統自動取消訂單。 這樣類似的需求是我們經常會遇見的問題。最常用的方法是定期輪訓資料庫,設置狀態。在數據量小的時候並沒有什麼大的問題,但是數據量一大輪訓資料庫的方式就會變得特別耗資源。當面對千萬級、上億級數據量時,本身寫入的IO就比較高,導致長時間查詢或者根本就查不出來。通過使用延遲隊列來解決這種問題
  2. 使用RabbitMQ來實現延遲任務必須先了解RabbitMQ的兩個概念:消息的Time To Live(TTL)和Dead Letter Exchanges(DLX),利用兩者的組合來實現延遲隊列 簡述一下:A.消息的TTL就是消息的存活時間,B.DLX是死信路由 實現原理:先發送一個消息到隊列中,設置存活時間,超時後會轉發到死信路由中,客戶端消費死信路由中的消息,消息中包裝好需要轉發的隊列名,再根據此隊列名發送消息,這樣間接中轉的方式實現了延遲隊列。

實現

新建SpringBoot項目,添加 amqp 引用

<dependency>      <groupId>org.springframework.boot</groupId>      <artifactId>spring-boot-starter-amqp</artifactId>  </dependency>  <dependency>      <groupId>com.alibaba</groupId>      <artifactId>fastjson</artifactId>      <version>1.2.47</version>  </dependency>  

在配置文件application.properties中配置好mq的連接地址

#rabbitmq  spring.rabbitmq.host=127.0.0.1  spring.rabbitmq.port=5672  spring.rabbitmq.username=root  spring.rabbitmq.password=root  spring.rabbitmq.virtual-host=ykh_vhosts  

創建配置類,使用配置文件中的連接

/**   * 讀取application.properties中的連接配置   */  @Configuration  public class RabbitMQConfiguration {        private static Logger logger = Logger.getLogger("RabbitMQConfiguration");        @Value("${spring.rabbitmq.host}")      public String host;        @Value("${spring.rabbitmq.port}")      public int port;        @Value("${spring.rabbitmq.username}")      public String username;        @Value("${spring.rabbitmq.password}")      public String password;        @Value("${spring.rabbitmq.virtual-host}")      public String virtualHost;        @Bean      public ConnectionFactory connectionFactory() {          CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host, port);          connectionFactory.setUsername(username);          connectionFactory.setPassword(password);          connectionFactory.setVirtualHost(virtualHost);          connectionFactory.setPublisherConfirms(true);          logger.info("Create ConnectionFactory bean ..");          return connectionFactory;      }        @Bean      @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)      public RabbitTemplate rabbitTemplate() {          RabbitTemplate template = new RabbitTemplate(connectionFactory());          return template;      }  }  

創建一個常量類,定義隊列名稱

/**   * Rabbit消息隊列相關常量   */  public final class MQConstant {        private MQConstant(){}        //exchange name      public static final String DEFAULT_EXCHANGE = "ZyChange";        //TTL QUEUE      public static final String DEFAULT_DEAD_LETTER_QUEUE_NAME = "zy.dead.letter.queue";        //DLX repeat QUEUE 死信轉發隊列      public static final String DEFAULT_REPEAT_TRADE_QUEUE_NAME = "zy.repeat.trade.queue";        //Hello 測試消息隊列名稱      public static final String HELLO_QUEUE_NAME = "HELLO";    }  

創建一個隊列配置類,作用是信道配置,隊列配置,隊列綁定

/**   * 隊列配置,所有配置@Bean的隊列名稱,由系統啟動時創建隊列,並綁定到Exchane上   */  @Configuration  public class QueueConfiguration {        //信道配置      @Bean      public DirectExchange defaultExchange() {          return new DirectExchange(MQConstant.DEFAULT_EXCHANGE, true, false);      }        /*********************    業務隊列定義與綁定 hello 測試    *****************/      @Bean      public Queue queue() {          Queue queue = new Queue(MQConstant.HELLO_QUEUE_NAME,true);          return queue;      }        @Bean      public Binding binding() {          //隊列綁定到exchange上,再綁定好路由鍵          return BindingBuilder.bind(queue()).to(defaultExchange()).with(MQConstant.HELLO_QUEUE_NAME);      }      /*********************    業務隊列定義與綁定 hello 測試    *****************/        //下面是延遲隊列的配置      //轉發隊列      @Bean      public Queue repeatTradeQueue() {          Queue queue = new Queue(MQConstant.DEFAULT_REPEAT_TRADE_QUEUE_NAME,true,false,false);          return queue;      }      //綁定轉發隊列      @Bean      public Binding  drepeatTradeBinding() {          return BindingBuilder.bind(repeatTradeQueue()).to(defaultExchange()).with(MQConstant.DEFAULT_REPEAT_TRADE_QUEUE_NAME);      }        //死信隊列  -- 消息在死信隊列上堆積,消息超時時,會把消息轉發到轉發隊列,轉發隊列根據消息內容再把轉發到指定的隊列上      @Bean      public Queue deadLetterQueue() {          Map<String, Object> arguments = new HashMap<>();          arguments.put("x-dead-letter-exchange", MQConstant.DEFAULT_EXCHANGE);          arguments.put("x-dead-letter-routing-key", MQConstant.DEFAULT_REPEAT_TRADE_QUEUE_NAME);          Queue queue = new Queue(MQConstant.DEFAULT_DEAD_LETTER_QUEUE_NAME,true,false,false,arguments);          return queue;      }      //綁定死信隊列      @Bean      public Binding  deadLetterBinding() {          return BindingBuilder.bind(deadLetterQueue()).to(defaultExchange()).with(MQConstant.DEFAULT_DEAD_LETTER_QUEUE_NAME);      }  }  

創建消息生成介面和實現

public interface IMessageService {        /**       * 發送消息到隊列       * @param queueName 隊列名稱       * @param message 消息內容       */      public void send(String queueName,String message);          /**       * 延遲發送消息到隊列       * @param queueName 隊列名稱       * @param message 消息內容       * @param times 延遲時間 單位毫秒       */      public void send(String queueName,String message,long times);  }    /**   * 消息隊列服務介面實現   */  @Service("messageService")  public class MessageService implements IMessageService {        @Autowired      private RabbitTemplate rabbitTemplate;        /**       * 發送消息到隊列       * @param queueName 隊列名稱       * @param message 消息內容       */      @Override      public void send(String queueName, String message) {          rabbitTemplate.convertAndSend(MQConstant.DEFAULT_EXCHANGE,queueName, message);      }        /**       * 延遲發送消息到隊列       * @param queueName 隊列名稱       * @param message 消息內容       * @param times 延遲時間 單位毫秒       */      @Override      public void send(String queueName, String message, long times) {          //消息發送到死信隊列上,當消息超時時,會發生到轉發隊列上,轉發隊列根據下面封裝的queueName,把消息轉發的指定隊列上          //發送前,把消息進行封裝,轉發時應轉發到指定 queueName 隊列上          DLXMessage dlxMessage = new DLXMessage(MQConstant.DEFAULT_EXCHANGE,queueName,message,times);          MessagePostProcessor processor = new MessagePostProcessor(){              @Override              public Message postProcessMessage(Message message) throws AmqpException {                  message.getMessageProperties().setExpiration(times + "");                  return message;              }          };          rabbitTemplate.convertAndSend(MQConstant.DEFAULT_EXCHANGE,MQConstant.DEFAULT_DEAD_LETTER_QUEUE_NAME, JSON.toJSONString(dlxMessage), processor);      }  }  

死信消息載體

DLXMessage是一個消息封裝對象,很關鍵,發送延遲隊列時,先把消息存在此對象中,在加上目的地隊列名稱,然後再發到死信隊列中,當消息超時時,轉發到轉發隊列,添加對轉發隊列的監聽,消費轉發隊列,獲取需要延遲發送的資訊,該資訊就是DLXMessage對象,這樣就拿到了目的地隊列名稱,然後再發送一次消息,就完成了延遲隊列的發送。

/**   * rabbit 死信消息載體   */  public class DLXMessage implements Serializable {        private static final long serialVersionUID = 9956432152000L;      private String exchange;      private String queueName;      private String content;      private long times;        public DLXMessage() {          super();      }        public DLXMessage(String queueName, String content, long times) {          super();          this.queueName = queueName;          this.content = content;          this.times = times;      }        public DLXMessage(String exchange, String queueName, String content, long times) {          super();          this.exchange = exchange;          this.queueName = queueName;          this.content = content;          this.times = times;      }          public static long getSerialVersionUID() {          return serialVersionUID;      }        public String getExchange() {          return exchange;      }        public void setExchange(String exchange) {          this.exchange = exchange;      }        public String getQueueName() {          return queueName;      }        public void setQueueName(String queueName) {          this.queueName = queueName;      }        public String getContent() {          return content;      }        public void setContent(String content) {          this.content = content;      }        public long getTimes() {          return times;      }        public void setTimes(long times) {          this.times = times;      }  }  

添加消息消費者監聽,當有消息時進行消費

//監聽hello隊列,有消息時進行消費  @Component  @RabbitListener(queues = MQConstant.HELLO_QUEUE_NAME)  public class ReceiverMessage {        @RabbitHandler      public void process(String content) {          System.out.println("接受時間:"+ System.currentTimeMillis());          System.out.println("接受消息:" + content);      }  }    //監聽轉發隊列,有消息時,把消息轉發到目標隊列  @Component  @RabbitListener(queues = MQConstant.DEFAULT_REPEAT_TRADE_QUEUE_NAME)  public class ReceiverDelayMessage {        @Autowired      private IMessageService messageService;        @RabbitHandler      public void process(String content) {          //此時,才把消息發送到指定隊列,而實現延遲功能          DLXMessage message = JSON.parseObject(content, DLXMessage.class);          messageService.send(message.getQueueName(), message.getContent());      }    }  

測試,啟動項目,會執行發送消息程式碼

/**   * 啟動啟動時執行   */  @Component  public class SysInitLoad implements ApplicationRunner {        @Autowired      private IMessageService messageService;        @Override      public void run(ApplicationArguments args) throws Exception {          System.out.println("發送時間:"+ System.currentTimeMillis());          String message = "測試延遲消息";          messageService.send(MQConstant.HELLO_QUEUE_NAME,message,6000);            message = "測試普通消息";          messageService.send(MQConstant.HELLO_QUEUE_NAME,message);      }  }  

普通消息馬上就接收到了,延遲消息6s後收到。