基於RabbltMQ延遲插件實現延遲隊列程式碼示例
- 2022 年 6 月 5 日
- 筆記
上一篇文章寫了docker安裝RabbitMQ及延遲插件的安裝如果沒有安裝插件的可以去上一個文章去安裝一下,這篇的話是基於RabbitMQ延遲插件實現延遲隊列的示例
那麼廢話不多說 直接上程式碼!!
首先創建延遲隊列配置類 DelayedQueueConfig
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class DelayedQueueConfig {
//交換機
public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
//隊列
public static final String DELAYED_QUEUE_NAME = "delayed.queue";
//routingkey
public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";
//聲明交換機
@Bean
public CustomExchange delayedExchange() {
Map<String, Object> arguments = new HashMap<>();
//設置延遲交換機為直接類型
arguments.put("x-delayed-type", "direct");
/**
* 參數說明
* 1.交換機名字
* 2.交換機類型
* 3.是否持久化
* 4.是否自動刪除
* 5.其他參數
*/
return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, arguments);
}
//聲明隊列
@Bean
public Queue delayedQueue() {
return new Queue(DELAYED_QUEUE_NAME);
}
//綁定
@Bean
public Binding delayedQueueBindingdelayedExchange(
@Qualifier("delayedQueue") Queue delayedQueue,
@Qualifier("delayedExchange") CustomExchange delayedExchange) {
return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
}
}
接著編寫生產者程式碼
//基於插件發送延遲消息
@GetMapping("/senddelayedMsg/{message}/{delayedTime}")
public void sendMsg(@PathVariable String message, @PathVariable Integer delayedTime) {
log.info("當前時間:{},發送一條時長{}毫秒給延遲隊列delayed.queue:{}", new Date().toString(), delayedTime, message);
/**
* convertAndSend 方法參數說明
* 1.交換機名稱
* 2.routingkey
* 3.發送的消息
* 4.發送完消息後的回調
*/
rabbitTemplate.convertAndSend(DELAYED_EXCHANGE_NAME, DELAYED_ROUTING_KEY, message, msg -> {
msg.getMessageProperties().setDelay(delayedTime);
return msg;
});
}
最後編寫消費者程式碼
import com.zfd.config.DelayedQueueConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Date;
@Slf4j
@Component
public class DelayedqueueConsumer {
@RabbitListener(queues = DelayedQueueConfig.DELAYED_QUEUE_NAME)
public void receiveDelayQueue(Message message) {
String msg = new String(message.getBody());
log.info("當前時間:{},收到延遲隊列的消息:{}", new Date().toString(), msg);
}
}
接著運行程式 發送請求
到這裡我們基於插件實現RabbitMQ延遲隊列就寫完了!!