RabbitMQ實現訂單超時案例
- 2022 年 8 月 26 日
- 筆記
- 【Spring系列】SpringBoot, 【中間件系列】RabbitMQ
前言
人間清醒
業務場景
用戶在購買商品的時候通常會預購然後沒付款,沒付款的訂單通常會被設置一個自動超時時間如30分鐘後超時,所以我們要在訂單到30分鐘後自動將超時的訂單取消。
JUC(DelayQueue)方案
DelayQueue簡介
- DelayQueue是java並發包下的延時阻塞隊列,常用於實現定時任務。
- DelayQueue是一個支持延時獲取元素的無界阻塞隊列。裏面的元素全部都是「可延期」的元素,列頭的元素是最先「到期」的元素,
如果隊列裏面沒有元素到期,是不能從列頭獲取元素的,哪怕有元素也不行。也就是說只有在延遲期到時才能夠從隊列中取元素。 - DelayQueue主要用於兩個方面:- 緩存:清掉緩存中超時的緩存數據- 任務超時處理
- DelayQueue實現了BlockingQueue,所以它是一個阻塞隊列。
- DelayQueue還組合了一個叫做Delayed的接口,DelayQueue中存儲的所有元素必須實現Delayed接口。
JUC DelayQueue實現訂單超時案例代碼 案例代碼
定義訂單超時對象:
/**
* juc 定義延遲對象信息
* @author wuwentao
*/
@Data
public class Order implements Delayed {
public Order(String orderId,long second){
this.orderId = orderId;
second = second * 1000;
this.timeout = System.currentTimeMillis() + second;
}
private String orderId; // 訂單號
private long timeout; // 具體的超時時間
/**
* 延遲任務會自動調用該方法如果是負數則說明對象到了時間
*/
@Override
public long getDelay(TimeUnit unit) {
return this.timeout - System.currentTimeMillis();
}
/**
* 定義排序規則
*/
@Override
public int compareTo(Delayed o) {
return (int) (getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
}
}
定義DelayQueue端作為消費者:
/**
* 延遲隊列消費者
*/
@Slf4j
@Component
public class JavaDelayQueueConsumer {
// 訂單超時對象存儲的定時隊列
private final DelayQueue<OrderTimeoutDelayed> delayQueue = new DelayQueue<>();
// 為true的時候啟動線程,全局只啟動一次
private final AtomicBoolean start = new AtomicBoolean(false);
// 任務處理線程
private Thread thread;
/**
* 將需要自動過期的訂單放到隊列
* @param orderTimeoutDelayed
*/
public void monitor(OrderTimeoutDelayed orderTimeoutDelayed){
delayQueue.add(orderTimeoutDelayed);
}
/**
* 啟動過期訂單處理
*/
@PostConstruct
public void start(){
if(!start.getAndSet(true)){
this.thread = new Thread(()->{
while (true) {
try {
// 獲取已超時的訂單
OrderTimeoutDelayed take = delayQueue.take();
if(take != null){
log.info("JUC延遲隊列訂單號:[{}]已超時當前時間為:[{}]",take.getOrderId(), DateUtil.getCuurentDateStr());
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
this.thread.start();
log.info("已啟動JUC延遲隊列消費!");
}
}
}
定義入口請求用於向隊列中添加需要自動超時的訂單信息:
/**
* JUC實現延遲隊列
* @author wuwentao
*/
@Slf4j
@RestController
@RequestMapping("java/dealy/queue")
@AllArgsConstructor
public class JavaDealyQueueProducer {
private JavaDelayQueueConsumer javaDelayQueueConsumer;
private final int defaultTimoutSecond = 10; // 過期秒數
/**
* 發送消息到JUC延遲隊列中
* @param message 消息內容
*/
@GetMapping("/sendMessage")
public String sendMessage(@RequestParam(value = "message") String message){
// 監聽訂單10秒後過期
javaDelayQueueConsumer.monitor(new OrderTimeoutDelayed(message,defaultTimoutSecond));
log.info("當前時間為:[{}] 訂單號為:[{}] 超時秒數:[{}]", DateUtil.getCuurentDateStr(),message,defaultTimoutSecond);
return "OK";
}
}
測試生成消息訪問接口地址(每一秒訪問次一個生成5個需要過期的訂單):
//localhost:8022/java/dealy/queue/sendMessage?message=100000001
//localhost:8022/java/dealy/queue/sendMessage?message=100000002
//localhost:8022/java/dealy/queue/sendMessage?message=100000003
//localhost:8022/java/dealy/queue/sendMessage?message=100000004
//localhost:8022/java/dealy/queue/sendMessage?message=100000005
控制台打印消費信息:
2022-08-24 15:55:14.435 INFO 18876 --- [nio-8022-exec-1] c.g.b.d.c.JavaDealyQueueProducer : 當前時間為:[2022-08-24 15:55:14] 訂單號為:[100000001] 超時秒數:[10]
2022-08-24 15:55:16.184 INFO 18876 --- [nio-8022-exec-2] c.g.b.d.c.JavaDealyQueueProducer : 當前時間為:[2022-08-24 15:55:16] 訂單號為:[100000002] 超時秒數:[10]
2022-08-24 15:55:17.626 INFO 18876 --- [nio-8022-exec-3] c.g.b.d.c.JavaDealyQueueProducer : 當前時間為:[2022-08-24 15:55:17] 訂單號為:[100000003] 超時秒數:[10]
2022-08-24 15:55:19.165 INFO 18876 --- [nio-8022-exec-4] c.g.b.d.c.JavaDealyQueueProducer : 當前時間為:[2022-08-24 15:55:19] 訂單號為:[100000004] 超時秒數:[10]
2022-08-24 15:55:20.811 INFO 18876 --- [nio-8022-exec-5] c.g.b.d.c.JavaDealyQueueProducer : 當前時間為:[2022-08-24 15:55:20] 訂單號為:[100000005] 超時秒數:[10]
2022-08-24 15:55:24.434 INFO 18876 --- [ Thread-8] c.g.b.d.java.JavaDelayQueueConsumer : JUC延遲隊列訂單號:[100000001]已超時當前時間為:[2022-08-24 15:55:24]
2022-08-24 15:55:26.184 INFO 18876 --- [ Thread-8] c.g.b.d.java.JavaDelayQueueConsumer : JUC延遲隊列訂單號:[100000002]已超時當前時間為:[2022-08-24 15:55:26]
2022-08-24 15:55:27.625 INFO 18876 --- [ Thread-8] c.g.b.d.java.JavaDelayQueueConsumer : JUC延遲隊列訂單號:[100000003]已超時當前時間為:[2022-08-24 15:55:27]
2022-08-24 15:55:29.164 INFO 18876 --- [ Thread-8] c.g.b.d.java.JavaDelayQueueConsumer : JUC延遲隊列訂單號:[100000004]已超時當前時間為:[2022-08-24 15:55:29]
2022-08-24 15:55:30.810 INFO 18876 --- [ Thread-8] c.g.b.d.java.JavaDelayQueueConsumer : JUC延遲隊列訂單號:[100000005]已超時當前時間為:[2022-08-24 15:55:30]
Redis Key過期事件方案
簡介
這裡主要使用Redis Key過期事件來實現訂單超時案例
Rabbit Key過期時間實現訂單超時案例代碼
Redis使用的時候將redis配置文件中的該屬性從””修改為”Ex”
notify-keyspace-events "Ex"
定義Redis序列化與Key過期監聽容器:
/**
* Redis SpringBoot 配置
* @author wuwentao
*/
@Configuration
@AllArgsConstructor
public class RedisConfiguration {
private RedisConnectionFactory redisConnectionFactory;
/**
* 模板方法序列化防止亂碼
* @return
*/
@Bean
public RedisTemplate<String, Object> redisTemplate() {
RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
redisTemplate.setKeySerializer(new StringRedisSerializer());
redisTemplate.setHashKeySerializer(new StringRedisSerializer());
redisTemplate.setHashValueSerializer(new StringRedisSerializer());
redisTemplate.setValueSerializer(new StringRedisSerializer());
redisTemplate.setConnectionFactory(redisConnectionFactory);
return redisTemplate;
}
/**
* 配置Redis消息監聽容器
* @return {@link:org.springframework.data.redis.listener.RedisMessageListenerContainer}
*/
@Bean
public RedisMessageListenerContainer redisMessageListenerContainer() {
RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer();
redisMessageListenerContainer.setConnectionFactory(redisConnectionFactory);
return redisMessageListenerContainer;
}
}
定義Key過期監聽處理:
/**
* Redis過期Key監聽
*/
@Slf4j
@Component
public class RedisKeyExpiredListener extends KeyExpirationEventMessageListener {
public RedisKeyExpiredListener(RedisMessageListenerContainer listenerContainer) {
super(listenerContainer);
}
@Override
public void onMessage(Message message, byte[] pattern) {
String key = message.toString();
if(key.startsWith(RedisKeyExpiredController.ORDER_TIMEOUT_PREFIX)){
String orderId = key.replace(RedisKeyExpiredController.ORDER_TIMEOUT_PREFIX,"");
log.info("過期時間:[{}] Redis-key:[{}] 訂單號:[{}]", DateUtil.getCuurentDateStr(),key,orderId);
}
}
}
定義入口請求用於向Redis中保存需要過期的訂單Key:
@Slf4j
@RestController
@RequestMapping("redis/expired/key")
@AllArgsConstructor
public class RedisKeyExpiredController {
private RedisTemplate redisTemplate;
// Redis需要需要處理的Key前綴
public static final String ORDER_TIMEOUT_PREFIX = "ORDERTIMEOUT-";
// 過期秒數
private final int defaultTimoutSecond = 10;
/**
* Redis保存訂單號與過期事件
* @param orderId 訂單編號
*/
@GetMapping("/send")
public String send(@RequestParam(value = "orderId") String orderId){
ValueOperations valueOperations = redisTemplate.opsForValue();
String key = ORDER_TIMEOUT_PREFIX + orderId;
String value = orderId;
long timeout = defaultTimoutSecond;
TimeUnit seconds = TimeUnit.SECONDS;
valueOperations.set(key,value,timeout, seconds);
log.info("當前時間:[{}] 訂單編號:[{}] Redis-Key:[{}] 超時秒數:[{}] ", DateUtil.getCuurentDateStr(),value,key,timeout);
return "OK";
}
}
測試生成消息訪問接口地址(每一秒訪問次一個生成5個需要過期的訂單):
//localhost:8022/redis/expired/key/send?orderId=100000001
//localhost:8022/redis/expired/key/send?orderId=100000002
//localhost:8022/redis/expired/key/send?orderId=100000003
//localhost:8022/redis/expired/key/send?orderId=100000004
//localhost:8022/redis/expired/key/send?orderId=100000005
控制台打印消費信息:
2022-08-24 16:26:49.626 INFO 20028 --- [nio-8022-exec-1] c.g.b.d.c.RedisKeyExpiredController : 當前時間:[2022-08-24 16:26:49] 訂單編號:[100000001] Redis-Key:[ORDERTIMEOUT-100000001] 超時秒數:[10]
2022-08-24 16:26:53.124 INFO 20028 --- [nio-8022-exec-4] c.g.b.d.c.RedisKeyExpiredController : 當前時間:[2022-08-24 16:26:53] 訂單編號:[100000002] Redis-Key:[ORDERTIMEOUT-100000002] 超時秒數:[10]
2022-08-24 16:26:55.468 INFO 20028 --- [nio-8022-exec-5] c.g.b.d.c.RedisKeyExpiredController : 當前時間:[2022-08-24 16:26:55] 訂單編號:[100000003] Redis-Key:[ORDERTIMEOUT-100000003] 超時秒數:[10]
2022-08-24 16:26:57.717 INFO 20028 --- [nio-8022-exec-6] c.g.b.d.c.RedisKeyExpiredController : 當前時間:[2022-08-24 16:26:57] 訂單編號:[100000004] Redis-Key:[ORDERTIMEOUT-100000004] 超時秒數:[10]
2022-08-24 16:26:59.703 INFO 20028 --- [nio-8022-exec-7] c.g.b.d.c.RedisKeyExpiredController : 當前時間:[2022-08-24 16:26:59] 訂單編號:[100000005] Redis-Key:[ORDERTIMEOUT-100000005] 超時秒數:[10]
2022-08-24 16:26:59.885 INFO 20028 --- [enerContainer-2] c.g.b.d.redis.RedisKeyExpiredListener : 過期時間:[2022-08-24 16:26:59] Redis-key:[ORDERTIMEOUT-100000001] 訂單號:[100000001]
2022-08-24 16:27:03.210 INFO 20028 --- [enerContainer-3] c.g.b.d.redis.RedisKeyExpiredListener : 過期時間:[2022-08-24 16:27:03] Redis-key:[ORDERTIMEOUT-100000002] 訂單號:[100000002]
2022-08-24 16:27:05.537 INFO 20028 --- [enerContainer-4] c.g.b.d.redis.RedisKeyExpiredListener : 過期時間:[2022-08-24 16:27:05] Redis-key:[ORDERTIMEOUT-100000003] 訂單號:[100000003]
2022-08-24 16:27:07.771 INFO 20028 --- [enerContainer-5] c.g.b.d.redis.RedisKeyExpiredListener : 過期時間:[2022-08-24 16:27:07] Redis-key:[ORDERTIMEOUT-100000004] 訂單號:[100000004]
2022-08-24 16:27:09.780 INFO 20028 --- [enerContainer-6] c.g.b.d.redis.RedisKeyExpiredListener : 過期時間:[2022-08-24 16:27:09] Redis-key:[ORDERTIMEOUT-100000005] 訂單號:[100000005]
JUC與Redis的不足
- JUC是存內存操作一旦系統宕機數據將全部丟失。
- JUC因為是純內存操作所以不支持集群。
- Redis Key過期前程序突然宕機將造成數據丟失。
- 應用程序在集群環境是多個程序都能夠監聽到這個過期的Key,如果處理不好可能導致重複消費。
- Redis緩存所有的Key都會被監聽需要自己處理Key去匹配不夠靈活
為什麼使用RabbitMQ來實現?
單獨看這點就能夠解決前者的不足: 可靠性:支持持久化,傳輸確認,發佈確認等保證了MQ的可靠性。
更多特性請參考: //www.cnblogs.com/SimpleWu/p/16618662.html
RabbitMQ死信隊列方案
死信隊列實現訂單超時案例代碼
DLX(dead-letter-exchange),死信隊列也是一般的隊列,當消息變成死信時,消息會投遞到死信隊列中,經過死信隊列進行消費的一種形式,對應的交換機叫死信交換機DLX。
死信隊列配置信息定義:
public interface DeadLetterQueueConfig {
// 訂單超時處理隊列
public static final String ORDER_TIMEOUT_QUEUE = "order.timeout.queue";
// 訂單超時處理隊列交換機
public static final String ORDER_TIMEOUT_EXCHANGE = "order.timeout.exchange";
// 訂單超時處理RoutingKey
public static final String ORDER_TIMEOUT_ROUTING_KEY = "order.timeout.routing.key";
// 死信隊列
public static final String DEAD_LETTER_QUEUE = "dead.letter.queue";
// 死信隊列交換機
public static final String DEAD_LETTER_EXCHANGE = "dead.letter.exchange";
// 死信隊列RoutingKey
public static final String DEAD_LETTER_ROUTING_KEY = "dead.letter.routing.key";
// 死信隊列超時時間(10秒)
public static final String X_MESSAGE_TTL ="10000";
}
消費者定義,死信隊列消費者定義與訂單超時消費隊列定義:
@Slf4j
@Component
public class DeadLetterConsumerAnnotatedEdition {
/**
* 該隊列的消息為死信始終消費不會成功等到到了超時時間則會將消息投遞到x-dead-letter-exchange交換機中由綁定的隊列來進行處理
*/
@RabbitListener(bindings = {
@QueueBinding(value = @Queue(value = DeadLetterQueueConfig.DEAD_LETTER_QUEUE, arguments =
{@Argument(name = "x-dead-letter-exchange", value = DeadLetterQueueConfig.ORDER_TIMEOUT_EXCHANGE),
@Argument(name = "x-dead-letter-routing-key", value = DeadLetterQueueConfig.ORDER_TIMEOUT_ROUTING_KEY),
@Argument(name = "x-message-ttl", value = DeadLetterQueueConfig.X_MESSAGE_TTL, type = "java.lang.Long")
// ,@Argument(name = "x-max-length",value = "5",type = "java.lang.Integer")隊列最大長度
}),//可以指定多種屬性
exchange = @Exchange(value = DeadLetterQueueConfig.DEAD_LETTER_EXCHANGE),
key = {DeadLetterQueueConfig.DEAD_LETTER_ROUTING_KEY}
)
})
@RabbitHandler
public void deadLetterConsumer(Message message, Channel channel) throws Exception {
/*
* deliveryTag:該消息的index
* multiple: ture確認本條消息以及之前沒有確認的消息(批量),false僅確認本條消息
* requeue: true該條消息重新返回MQ queue,MQ broker將會重新發送該條消息
*/
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
/**
* 處理死信隊列超時的訂單
*/
@RabbitListener(bindings = @QueueBinding(
exchange = @Exchange(value = DeadLetterQueueConfig.ORDER_TIMEOUT_EXCHANGE, durable = "true", type = "direct"),
value = @Queue(value = DeadLetterQueueConfig.ORDER_TIMEOUT_QUEUE, durable = "true"),
key = DeadLetterQueueConfig.ORDER_TIMEOUT_ROUTING_KEY
))
public void canleOrder(String context, Message message, Channel channel) throws IOException {
log.info("當前時間:{} 訂單取消訂單號:{}", DateUtil.getCuurentDateStr(),context);
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);//僅確認本條消息
}
}
定義入口請求用於向死信隊列投遞消息:
@Slf4j
@RestController
@RequestMapping("rabbit/deadletter/queue")
@AllArgsConstructor
public class RabbitDeadLetterQueueProducer {
private RabbitTemplate rabbitTemplate;
/**
* 投遞訂單到死信隊列中
* @param orderId 訂單ID
*/
@GetMapping("/sendMessage")
public String sendMessage(@RequestParam(value = "orderId") String orderId){
rabbitTemplate.convertAndSend(DeadLetterQueueConfig.DEAD_LETTER_EXCHANGE,DeadLetterQueueConfig.DEAD_LETTER_ROUTING_KEY,orderId);
log.info("當前時間:{} 訂單號:{}", DateUtil.getCuurentDateStr(),orderId);
return "OK";
}
}
測試生成消息訪問接口地址(每一秒訪問次一個生成5個需要過期的訂單):
//localhost:8022/rabbit/deadletter/queue/sendMessage?orderId=100000001
//localhost:8022/rabbit/deadletter/queue/sendMessage?orderId=100000002
//localhost:8022/rabbit/deadletter/queue/sendMessage?orderId=100000003
//localhost:8022/rabbit/deadletter/queue/sendMessage?orderId=100000004
//localhost:8022/rabbit/deadletter/queue/sendMessage?orderId=100000005
控制台打印消費信息:
2022-08-25 16:41:25.150 INFO 19956 --- [nio-8022-exec-9] c.g.b.d.c.RabbitDeadLetterQueueProducer : 當前時間:2022-08-25 16:41:25 訂單號:100000001
2022-08-25 16:41:26.824 INFO 19956 --- [io-8022-exec-10] c.g.b.d.c.RabbitDeadLetterQueueProducer : 當前時間:2022-08-25 16:41:26 訂單號:100000002
2022-08-25 16:41:28.689 INFO 19956 --- [nio-8022-exec-1] c.g.b.d.c.RabbitDeadLetterQueueProducer : 當前時間:2022-08-25 16:41:28 訂單號:100000003
2022-08-25 16:41:30.453 INFO 19956 --- [nio-8022-exec-2] c.g.b.d.c.RabbitDeadLetterQueueProducer : 當前時間:2022-08-25 16:41:30 訂單號:100000004
2022-08-25 16:41:33.256 INFO 19956 --- [nio-8022-exec-3] c.g.b.d.c.RabbitDeadLetterQueueProducer : 當前時間:2022-08-25 16:41:33 訂單號:100000005
2022-08-25 16:41:35.153 INFO 19956 --- [ntContainer#1-5] r.t.c.DeadLetterConsumerAnnotatedEdition : 當前時間:2022-08-25 16:41:35 訂單取消訂單號:100000001
2022-08-25 16:41:36.825 INFO 19956 --- [ntContainer#1-4] r.t.c.DeadLetterConsumerAnnotatedEdition : 當前時間:2022-08-25 16:41:36 訂單取消訂單號:100000002
2022-08-25 16:41:38.699 INFO 19956 --- [ntContainer#1-6] r.t.c.DeadLetterConsumerAnnotatedEdition : 當前時間:2022-08-25 16:41:38 訂單取消訂單號:100000003
2022-08-25 16:41:40.458 INFO 19956 --- [ntContainer#1-7] r.t.c.DeadLetterConsumerAnnotatedEdition : 當前時間:2022-08-25 16:41:40 訂單取消訂單號:100000004
2022-08-25 16:41:43.267 INFO 19956 --- [ntContainer#1-3] r.t.c.DeadLetterConsumerAnnotatedEdition : 當前時間:2022-08-25 16:41:43 訂單取消訂單號:100000005
延遲消息插件方案
延遲消息插件安裝
我這裡安裝的RabbitMQ版本為3.8.8,這裡我在發行版本中下載版本為: rabbitmq-delayed-message-exchange v3.8.x
github地址: //github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
同版本地址: //github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.8.0/rabbitmq_delayed_message_exchange-3.8.0.ez
將下載好的插件(rabbitmq_delayed_message_exchange-3.8.0.ez)複製到plugins目錄下(C:\Program Files\RabbitMQ Server\rabbitmq_server-3.8.8\plugins);
進入到sbin目錄(C:\Program Files\RabbitMQ Server\rabbitmq_server-3.8.8\sbin)打開cmd窗口執行命令開啟插件:
C:\Program Files\RabbitMQ Server\rabbitmq_server-3.8.8\sbin>rabbitmq-plugins enable rabbitmq_delayed_message_exchange
Enabling plugins on node rabbit@LX-P1DMPLUV:
rabbitmq_delayed_message_exchange
The following plugins have been configured:
rabbitmq_delayed_message_exchange
rabbitmq_management
rabbitmq_management_agent
rabbitmq_web_dispatch
Applying plugin configuration to rabbit@LX-P1DMPLUV...
The following plugins have been enabled:
rabbitmq_delayed_message_exchange
started 1 plugins.
繼續輸入命令重啟RabbitMQ:rabbitmq-service restart
延遲消息實現訂單超時案例代碼
延遲消息配置信息定義:
public class DelayedQueueConfig {
// 延遲隊列
public static final String DELAYED_QUEUE = "delayed.queue";
// 延遲隊列交換機
public static final String DELAYED_EXCHANGE = "delayed.exchange";
// 延遲隊列路由KEY
public static final String DELAYED_ROUTING_KEY = "delayed.routing.key";
// 訂單過期時間
public static final String ORDER_OUTIME ="10000";
}
消費者定義,延遲消息訂單超時消費隊列定義:
@Slf4j
@Component
public class DelayedConsumerAnnotatedEdition {
/**
* 延遲隊列交換機類型必須為:x-delayed-message
* x-delayed-type 必須設置否則將會報錯
*/
@RabbitListener(bindings = {
@QueueBinding(value = @Queue(value = DelayedQueueConfig.DELAYED_QUEUE),
exchange = @Exchange(value = DelayedQueueConfig.DELAYED_EXCHANGE,type = "x-delayed-message",
arguments = {@Argument(name = "x-delayed-type", value = ExchangeTypes.DIRECT)}),
key = {DelayedQueueConfig.DELAYED_ROUTING_KEY}
)
})
@RabbitHandler
public void delayedConsumer(String context, Message message, Channel channel) throws Exception {
log.info("當前時間:{} 訂單取消訂單號:{}", DateUtil.getCuurentDateStr(),context);
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);//僅確認本條消息
}
}
定義入口請求用於向延遲消息隊列投遞消息:
@Slf4j
@RestController
@RequestMapping("rabbit/dealyed/queue")
@AllArgsConstructor
public class RabbitDelayedQueueProducer {
private RabbitTemplate rabbitTemplate;
/**
* 投遞訂單到延遲消息隊列中
* @param orderId 訂單ID
*/
@GetMapping("/sendMessage")
public String sendMessage(@RequestParam(value = "orderId") String orderId){
rabbitTemplate.convertAndSend(DelayedQueueConfig.DELAYED_EXCHANGE, DelayedQueueConfig.DELAYED_ROUTING_KEY, orderId, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
// 設置消息超時時間
message.getMessageProperties().setHeader("x-delay", DelayedQueueConfig.ORDER_OUTIME);
return message;
}
});
log.info("當前時間:{} 訂單號:{}", DateUtil.getCuurentDateStr(),orderId);
return "OK";
}
}
測試生成消息訪問接口地址(每一秒訪問次一個生成5個需要過期的訂單):
//localhost:8022/rabbit/dealyed/queue/sendMessage?orderId=100000001
//localhost:8022/rabbit/dealyed/queue/sendMessage?orderId=100000002
//localhost:8022/rabbit/dealyed/queue/sendMessage?orderId=100000003
//localhost:8022/rabbit/dealyed/queue/sendMessage?orderId=100000004
//localhost:8022/rabbit/dealyed/queue/sendMessage?orderId=100000005
控制台打印消費信息:
2022-08-25 17:24:36.324 INFO 17212 --- [nio-8022-exec-1] c.g.b.d.c.RabbitDelayedQueueProducer : 當前時間:2022-08-25 17:24:36 訂單號:100000001
2022-08-25 17:24:38.011 INFO 17212 --- [nio-8022-exec-2] c.g.b.d.c.RabbitDelayedQueueProducer : 當前時間:2022-08-25 17:24:38 訂單號:100000002
2022-08-25 17:24:39.606 INFO 17212 --- [nio-8022-exec-3] c.g.b.d.c.RabbitDelayedQueueProducer : 當前時間:2022-08-25 17:24:39 訂單號:100000003
2022-08-25 17:24:41.109 INFO 17212 --- [nio-8022-exec-4] c.g.b.d.c.RabbitDelayedQueueProducer : 當前時間:2022-08-25 17:24:41 訂單號:100000004
2022-08-25 17:24:42.547 INFO 17212 --- [nio-8022-exec-5] c.g.b.d.c.RabbitDelayedQueueProducer : 當前時間:2022-08-25 17:24:42 訂單號:100000005
2022-08-25 17:24:46.395 INFO 17212 --- [ntContainer#2-1] .d.r.d.c.DelayedConsumerAnnotatedEdition : 當前時間:2022-08-25 17:24:46 訂單取消訂單號:100000001
2022-08-25 17:24:48.037 INFO 17212 --- [tContainer#2-10] .d.r.d.c.DelayedConsumerAnnotatedEdition : 當前時間:2022-08-25 17:24:48 訂單取消訂單號:100000002
2022-08-25 17:24:49.626 INFO 17212 --- [ntContainer#2-9] .d.r.d.c.DelayedConsumerAnnotatedEdition : 當前時間:2022-08-25 17:24:49 訂單取消訂單號:100000003
2022-08-25 17:24:51.127 INFO 17212 --- [ntContainer#2-7] .d.r.d.c.DelayedConsumerAnnotatedEdition : 當前時間:2022-08-25 17:24:51 訂單取消訂單號:100000004
2022-08-25 17:24:52.549 INFO 17212 --- [ntContainer#2-8] .d.r.d.c.DelayedConsumerAnnotatedEdition : 當前時間:2022-08-25 17:24:52 訂單取消訂單號:100000005
案例源代碼
//gitee.com/SimpleWu/blogs-examples/tree/master/rabbitmq-delay-queue-case