RabbitMQ實現商城訂單超時處理
- 2019 年 11 月 1 日
- 筆記
背景
- 為什麼需要使用延遲隊列?適用於什麼場景? 場景一:訂單下單之後30分鐘後,如果用戶沒有付錢,則系統自動取消訂單。 這樣類似的需求是我們經常會遇見的問題。最常用的方法是定期輪訓資料庫,設置狀態。在數據量小的時候並沒有什麼大的問題,但是數據量一大輪訓資料庫的方式就會變得特別耗資源。當面對千萬級、上億級數據量時,本身寫入的IO就比較高,導致長時間查詢或者根本就查不出來。通過使用延遲隊列來解決這種問題
- 使用RabbitMQ來實現延遲任務必須先了解RabbitMQ的兩個概念:消息的Time To Live(TTL)和Dead Letter Exchanges(DLX),利用兩者的組合來實現延遲隊列 簡述一下:A.消息的TTL就是消息的存活時間,B.DLX是死信路由 實現原理:先發送一個消息到隊列中,設置存活時間,超時後會轉發到死信路由中,客戶端消費死信路由中的消息,消息中包裝好需要轉發的隊列名,再根據此隊列名發送消息,這樣間接中轉的方式實現了延遲隊列。
實現
新建SpringBoot項目,添加 amqp 引用
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.47</version> </dependency>
在配置文件application.properties中配置好mq的連接地址
#rabbitmq spring.rabbitmq.host=127.0.0.1 spring.rabbitmq.port=5672 spring.rabbitmq.username=root spring.rabbitmq.password=root spring.rabbitmq.virtual-host=ykh_vhosts
創建配置類,使用配置文件中的連接
/** * 讀取application.properties中的連接配置 */ @Configuration public class RabbitMQConfiguration { private static Logger logger = Logger.getLogger("RabbitMQConfiguration"); @Value("${spring.rabbitmq.host}") public String host; @Value("${spring.rabbitmq.port}") public int port; @Value("${spring.rabbitmq.username}") public String username; @Value("${spring.rabbitmq.password}") public String password; @Value("${spring.rabbitmq.virtual-host}") public String virtualHost; @Bean public ConnectionFactory connectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host, port); connectionFactory.setUsername(username); connectionFactory.setPassword(password); connectionFactory.setVirtualHost(virtualHost); connectionFactory.setPublisherConfirms(true); logger.info("Create ConnectionFactory bean .."); return connectionFactory; } @Bean @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) public RabbitTemplate rabbitTemplate() { RabbitTemplate template = new RabbitTemplate(connectionFactory()); return template; } }
創建一個常量類,定義隊列名稱
/** * Rabbit消息隊列相關常量 */ public final class MQConstant { private MQConstant(){} //exchange name public static final String DEFAULT_EXCHANGE = "ZyChange"; //TTL QUEUE public static final String DEFAULT_DEAD_LETTER_QUEUE_NAME = "zy.dead.letter.queue"; //DLX repeat QUEUE 死信轉發隊列 public static final String DEFAULT_REPEAT_TRADE_QUEUE_NAME = "zy.repeat.trade.queue"; //Hello 測試消息隊列名稱 public static final String HELLO_QUEUE_NAME = "HELLO"; }
創建一個隊列配置類,作用是信道配置,隊列配置,隊列綁定
/** * 隊列配置,所有配置@Bean的隊列名稱,由系統啟動時創建隊列,並綁定到Exchane上 */ @Configuration public class QueueConfiguration { //信道配置 @Bean public DirectExchange defaultExchange() { return new DirectExchange(MQConstant.DEFAULT_EXCHANGE, true, false); } /********************* 業務隊列定義與綁定 hello 測試 *****************/ @Bean public Queue queue() { Queue queue = new Queue(MQConstant.HELLO_QUEUE_NAME,true); return queue; } @Bean public Binding binding() { //隊列綁定到exchange上,再綁定好路由鍵 return BindingBuilder.bind(queue()).to(defaultExchange()).with(MQConstant.HELLO_QUEUE_NAME); } /********************* 業務隊列定義與綁定 hello 測試 *****************/ //下面是延遲隊列的配置 //轉發隊列 @Bean public Queue repeatTradeQueue() { Queue queue = new Queue(MQConstant.DEFAULT_REPEAT_TRADE_QUEUE_NAME,true,false,false); return queue; } //綁定轉發隊列 @Bean public Binding drepeatTradeBinding() { return BindingBuilder.bind(repeatTradeQueue()).to(defaultExchange()).with(MQConstant.DEFAULT_REPEAT_TRADE_QUEUE_NAME); } //死信隊列 -- 消息在死信隊列上堆積,消息超時時,會把消息轉發到轉發隊列,轉發隊列根據消息內容再把轉發到指定的隊列上 @Bean public Queue deadLetterQueue() { Map<String, Object> arguments = new HashMap<>(); arguments.put("x-dead-letter-exchange", MQConstant.DEFAULT_EXCHANGE); arguments.put("x-dead-letter-routing-key", MQConstant.DEFAULT_REPEAT_TRADE_QUEUE_NAME); Queue queue = new Queue(MQConstant.DEFAULT_DEAD_LETTER_QUEUE_NAME,true,false,false,arguments); return queue; } //綁定死信隊列 @Bean public Binding deadLetterBinding() { return BindingBuilder.bind(deadLetterQueue()).to(defaultExchange()).with(MQConstant.DEFAULT_DEAD_LETTER_QUEUE_NAME); } }
創建消息生成介面和實現
public interface IMessageService { /** * 發送消息到隊列 * @param queueName 隊列名稱 * @param message 消息內容 */ public void send(String queueName,String message); /** * 延遲發送消息到隊列 * @param queueName 隊列名稱 * @param message 消息內容 * @param times 延遲時間 單位毫秒 */ public void send(String queueName,String message,long times); } /** * 消息隊列服務介面實現 */ @Service("messageService") public class MessageService implements IMessageService { @Autowired private RabbitTemplate rabbitTemplate; /** * 發送消息到隊列 * @param queueName 隊列名稱 * @param message 消息內容 */ @Override public void send(String queueName, String message) { rabbitTemplate.convertAndSend(MQConstant.DEFAULT_EXCHANGE,queueName, message); } /** * 延遲發送消息到隊列 * @param queueName 隊列名稱 * @param message 消息內容 * @param times 延遲時間 單位毫秒 */ @Override public void send(String queueName, String message, long times) { //消息發送到死信隊列上,當消息超時時,會發生到轉發隊列上,轉發隊列根據下面封裝的queueName,把消息轉發的指定隊列上 //發送前,把消息進行封裝,轉發時應轉發到指定 queueName 隊列上 DLXMessage dlxMessage = new DLXMessage(MQConstant.DEFAULT_EXCHANGE,queueName,message,times); MessagePostProcessor processor = new MessagePostProcessor(){ @Override public Message postProcessMessage(Message message) throws AmqpException { message.getMessageProperties().setExpiration(times + ""); return message; } }; rabbitTemplate.convertAndSend(MQConstant.DEFAULT_EXCHANGE,MQConstant.DEFAULT_DEAD_LETTER_QUEUE_NAME, JSON.toJSONString(dlxMessage), processor); } }
死信消息載體
DLXMessage是一個消息封裝對象,很關鍵,發送延遲隊列時,先把消息存在此對象中,在加上目的地隊列名稱,然後再發到死信隊列中,當消息超時時,轉發到轉發隊列,添加對轉發隊列的監聽,消費轉發隊列,獲取需要延遲發送的資訊,該資訊就是DLXMessage對象,這樣就拿到了目的地隊列名稱,然後再發送一次消息,就完成了延遲隊列的發送。
/** * rabbit 死信消息載體 */ public class DLXMessage implements Serializable { private static final long serialVersionUID = 9956432152000L; private String exchange; private String queueName; private String content; private long times; public DLXMessage() { super(); } public DLXMessage(String queueName, String content, long times) { super(); this.queueName = queueName; this.content = content; this.times = times; } public DLXMessage(String exchange, String queueName, String content, long times) { super(); this.exchange = exchange; this.queueName = queueName; this.content = content; this.times = times; } public static long getSerialVersionUID() { return serialVersionUID; } public String getExchange() { return exchange; } public void setExchange(String exchange) { this.exchange = exchange; } public String getQueueName() { return queueName; } public void setQueueName(String queueName) { this.queueName = queueName; } public String getContent() { return content; } public void setContent(String content) { this.content = content; } public long getTimes() { return times; } public void setTimes(long times) { this.times = times; } }
添加消息消費者監聽,當有消息時進行消費
//監聽hello隊列,有消息時進行消費 @Component @RabbitListener(queues = MQConstant.HELLO_QUEUE_NAME) public class ReceiverMessage { @RabbitHandler public void process(String content) { System.out.println("接受時間:"+ System.currentTimeMillis()); System.out.println("接受消息:" + content); } } //監聽轉發隊列,有消息時,把消息轉發到目標隊列 @Component @RabbitListener(queues = MQConstant.DEFAULT_REPEAT_TRADE_QUEUE_NAME) public class ReceiverDelayMessage { @Autowired private IMessageService messageService; @RabbitHandler public void process(String content) { //此時,才把消息發送到指定隊列,而實現延遲功能 DLXMessage message = JSON.parseObject(content, DLXMessage.class); messageService.send(message.getQueueName(), message.getContent()); } }
測試,啟動項目,會執行發送消息程式碼
/** * 啟動啟動時執行 */ @Component public class SysInitLoad implements ApplicationRunner { @Autowired private IMessageService messageService; @Override public void run(ApplicationArguments args) throws Exception { System.out.println("發送時間:"+ System.currentTimeMillis()); String message = "測試延遲消息"; messageService.send(MQConstant.HELLO_QUEUE_NAME,message,6000); message = "測試普通消息"; messageService.send(MQConstant.HELLO_QUEUE_NAME,message); } }
普通消息馬上就接收到了,延遲消息6s後收到。