RabbitMQ实现商城订单超时处理

  • 2019 年 11 月 1 日
  • 筆記

背景

  1. 为什么需要使用延迟队列?适用于什么场景? 场景一:订单下单之后30分钟后,如果用户没有付钱,则系统自动取消订单。 这样类似的需求是我们经常会遇见的问题。最常用的方法是定期轮训数据库,设置状态。在数据量小的时候并没有什么大的问题,但是数据量一大轮训数据库的方式就会变得特别耗资源。当面对千万级、上亿级数据量时,本身写入的IO就比较高,导致长时间查询或者根本就查不出来。通过使用延迟队列来解决这种问题
  2. 使用RabbitMQ来实现延迟任务必须先了解RabbitMQ的两个概念:消息的Time To Live(TTL)和Dead Letter Exchanges(DLX),利用两者的组合来实现延迟队列 简述一下:A.消息的TTL就是消息的存活时间,B.DLX是死信路由 实现原理:先发送一个消息到队列中,设置存活时间,超时后会转发到死信路由中,客户端消费死信路由中的消息,消息中包装好需要转发的队列名,再根据此队列名发送消息,这样间接中转的方式实现了延迟队列。

实现

新建SpringBoot项目,添加 amqp 引用

<dependency>      <groupId>org.springframework.boot</groupId>      <artifactId>spring-boot-starter-amqp</artifactId>  </dependency>  <dependency>      <groupId>com.alibaba</groupId>      <artifactId>fastjson</artifactId>      <version>1.2.47</version>  </dependency>  

在配置文件application.properties中配置好mq的连接地址

#rabbitmq  spring.rabbitmq.host=127.0.0.1  spring.rabbitmq.port=5672  spring.rabbitmq.username=root  spring.rabbitmq.password=root  spring.rabbitmq.virtual-host=ykh_vhosts  

创建配置类,使用配置文件中的连接

/**   * 读取application.properties中的连接配置   */  @Configuration  public class RabbitMQConfiguration {        private static Logger logger = Logger.getLogger("RabbitMQConfiguration");        @Value("${spring.rabbitmq.host}")      public String host;        @Value("${spring.rabbitmq.port}")      public int port;        @Value("${spring.rabbitmq.username}")      public String username;        @Value("${spring.rabbitmq.password}")      public String password;        @Value("${spring.rabbitmq.virtual-host}")      public String virtualHost;        @Bean      public ConnectionFactory connectionFactory() {          CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host, port);          connectionFactory.setUsername(username);          connectionFactory.setPassword(password);          connectionFactory.setVirtualHost(virtualHost);          connectionFactory.setPublisherConfirms(true);          logger.info("Create ConnectionFactory bean ..");          return connectionFactory;      }        @Bean      @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)      public RabbitTemplate rabbitTemplate() {          RabbitTemplate template = new RabbitTemplate(connectionFactory());          return template;      }  }  

创建一个常量类,定义队列名称

/**   * Rabbit消息队列相关常量   */  public final class MQConstant {        private MQConstant(){}        //exchange name      public static final String DEFAULT_EXCHANGE = "ZyChange";        //TTL QUEUE      public static final String DEFAULT_DEAD_LETTER_QUEUE_NAME = "zy.dead.letter.queue";        //DLX repeat QUEUE 死信转发队列      public static final String DEFAULT_REPEAT_TRADE_QUEUE_NAME = "zy.repeat.trade.queue";        //Hello 测试消息队列名称      public static final String HELLO_QUEUE_NAME = "HELLO";    }  

创建一个队列配置类,作用是信道配置,队列配置,队列绑定

/**   * 队列配置,所有配置@Bean的队列名称,由系统启动时创建队列,并绑定到Exchane上   */  @Configuration  public class QueueConfiguration {        //信道配置      @Bean      public DirectExchange defaultExchange() {          return new DirectExchange(MQConstant.DEFAULT_EXCHANGE, true, false);      }        /*********************    业务队列定义与绑定 hello 测试    *****************/      @Bean      public Queue queue() {          Queue queue = new Queue(MQConstant.HELLO_QUEUE_NAME,true);          return queue;      }        @Bean      public Binding binding() {          //队列绑定到exchange上,再绑定好路由键          return BindingBuilder.bind(queue()).to(defaultExchange()).with(MQConstant.HELLO_QUEUE_NAME);      }      /*********************    业务队列定义与绑定 hello 测试    *****************/        //下面是延迟队列的配置      //转发队列      @Bean      public Queue repeatTradeQueue() {          Queue queue = new Queue(MQConstant.DEFAULT_REPEAT_TRADE_QUEUE_NAME,true,false,false);          return queue;      }      //绑定转发队列      @Bean      public Binding  drepeatTradeBinding() {          return BindingBuilder.bind(repeatTradeQueue()).to(defaultExchange()).with(MQConstant.DEFAULT_REPEAT_TRADE_QUEUE_NAME);      }        //死信队列  -- 消息在死信队列上堆积,消息超时时,会把消息转发到转发队列,转发队列根据消息内容再把转发到指定的队列上      @Bean      public Queue deadLetterQueue() {          Map<String, Object> arguments = new HashMap<>();          arguments.put("x-dead-letter-exchange", MQConstant.DEFAULT_EXCHANGE);          arguments.put("x-dead-letter-routing-key", MQConstant.DEFAULT_REPEAT_TRADE_QUEUE_NAME);          Queue queue = new Queue(MQConstant.DEFAULT_DEAD_LETTER_QUEUE_NAME,true,false,false,arguments);          return queue;      }      //绑定死信队列      @Bean      public Binding  deadLetterBinding() {          return BindingBuilder.bind(deadLetterQueue()).to(defaultExchange()).with(MQConstant.DEFAULT_DEAD_LETTER_QUEUE_NAME);      }  }  

创建消息生成接口和实现

public interface IMessageService {        /**       * 发送消息到队列       * @param queueName 队列名称       * @param message 消息内容       */      public void send(String queueName,String message);          /**       * 延迟发送消息到队列       * @param queueName 队列名称       * @param message 消息内容       * @param times 延迟时间 单位毫秒       */      public void send(String queueName,String message,long times);  }    /**   * 消息队列服务接口实现   */  @Service("messageService")  public class MessageService implements IMessageService {        @Autowired      private RabbitTemplate rabbitTemplate;        /**       * 发送消息到队列       * @param queueName 队列名称       * @param message 消息内容       */      @Override      public void send(String queueName, String message) {          rabbitTemplate.convertAndSend(MQConstant.DEFAULT_EXCHANGE,queueName, message);      }        /**       * 延迟发送消息到队列       * @param queueName 队列名称       * @param message 消息内容       * @param times 延迟时间 单位毫秒       */      @Override      public void send(String queueName, String message, long times) {          //消息发送到死信队列上,当消息超时时,会发生到转发队列上,转发队列根据下面封装的queueName,把消息转发的指定队列上          //发送前,把消息进行封装,转发时应转发到指定 queueName 队列上          DLXMessage dlxMessage = new DLXMessage(MQConstant.DEFAULT_EXCHANGE,queueName,message,times);          MessagePostProcessor processor = new MessagePostProcessor(){              @Override              public Message postProcessMessage(Message message) throws AmqpException {                  message.getMessageProperties().setExpiration(times + "");                  return message;              }          };          rabbitTemplate.convertAndSend(MQConstant.DEFAULT_EXCHANGE,MQConstant.DEFAULT_DEAD_LETTER_QUEUE_NAME, JSON.toJSONString(dlxMessage), processor);      }  }  

死信消息载体

DLXMessage是一个消息封装对象,很关键,发送延迟队列时,先把消息存在此对象中,在加上目的地队列名称,然后再发到死信队列中,当消息超时时,转发到转发队列,添加对转发队列的监听,消费转发队列,获取需要延迟发送的信息,该信息就是DLXMessage对象,这样就拿到了目的地队列名称,然后再发送一次消息,就完成了延迟队列的发送。

/**   * rabbit 死信消息载体   */  public class DLXMessage implements Serializable {        private static final long serialVersionUID = 9956432152000L;      private String exchange;      private String queueName;      private String content;      private long times;        public DLXMessage() {          super();      }        public DLXMessage(String queueName, String content, long times) {          super();          this.queueName = queueName;          this.content = content;          this.times = times;      }        public DLXMessage(String exchange, String queueName, String content, long times) {          super();          this.exchange = exchange;          this.queueName = queueName;          this.content = content;          this.times = times;      }          public static long getSerialVersionUID() {          return serialVersionUID;      }        public String getExchange() {          return exchange;      }        public void setExchange(String exchange) {          this.exchange = exchange;      }        public String getQueueName() {          return queueName;      }        public void setQueueName(String queueName) {          this.queueName = queueName;      }        public String getContent() {          return content;      }        public void setContent(String content) {          this.content = content;      }        public long getTimes() {          return times;      }        public void setTimes(long times) {          this.times = times;      }  }  

添加消息消费者监听,当有消息时进行消费

//监听hello队列,有消息时进行消费  @Component  @RabbitListener(queues = MQConstant.HELLO_QUEUE_NAME)  public class ReceiverMessage {        @RabbitHandler      public void process(String content) {          System.out.println("接受时间:"+ System.currentTimeMillis());          System.out.println("接受消息:" + content);      }  }    //监听转发队列,有消息时,把消息转发到目标队列  @Component  @RabbitListener(queues = MQConstant.DEFAULT_REPEAT_TRADE_QUEUE_NAME)  public class ReceiverDelayMessage {        @Autowired      private IMessageService messageService;        @RabbitHandler      public void process(String content) {          //此时,才把消息发送到指定队列,而实现延迟功能          DLXMessage message = JSON.parseObject(content, DLXMessage.class);          messageService.send(message.getQueueName(), message.getContent());      }    }  

测试,启动项目,会执行发送消息代码

/**   * 启动启动时执行   */  @Component  public class SysInitLoad implements ApplicationRunner {        @Autowired      private IMessageService messageService;        @Override      public void run(ApplicationArguments args) throws Exception {          System.out.println("发送时间:"+ System.currentTimeMillis());          String message = "测试延迟消息";          messageService.send(MQConstant.HELLO_QUEUE_NAME,message,6000);            message = "测试普通消息";          messageService.send(MQConstant.HELLO_QUEUE_NAME,message);      }  }  

普通消息马上就接收到了,延迟消息6s后收到。