SpringBoot整合RabbitMQ實戰附加死信交換機
前言
使用springboot,實現以下功能,有兩個隊列1、2,往裡面發送消息,如果處理失敗發生異常,可以重試3次,重試3次均失敗,那麼就將消息發送到死信隊列進行統一處理,例如記錄資料庫、報警等
完整demo項目程式碼//gitee.com/daenmax/rabbit-mq-demo
環境
Windows10,IDEA,otp_win64_25.0,rabbitmq-server-3.10.4
1.雙擊C:\Program Files\RabbitMQ Server\rabbitmq_server-3.10.4\sbin\rabbitmq-server.bat啟動MQ服務
2.然後訪問//localhost:15672/,默認帳號密碼均為guest,
3.手動添加一個虛擬主機為admin_host,手動創建一個用戶帳號密碼均為admin
pom.xml
<!-- RabbitMQ -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.7.0</version>
</dependency>
配置
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: admin
password: admin
virtual-host: admin_host
publisher-confirm-type: correlated
publisher-returns: true
listener:
simple:
acknowledge-mode: manual
retry:
enabled: true #開啟失敗重試
max-attempts: 3 #最大重試次數
initial-interval: 1000 #重試間隔時間 毫秒
配置文件
RabbitConfig
package com.example.rabitmqdemo.mydemo.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
/**
* Broker:它提供一種傳輸服務,它的角色就是維護一條從生產者到消費者的路線,保證數據能按照指定的方式進行傳輸,
* Exchange:消息交換機,它指定消息按什麼規則,路由到哪個隊列。
* Queue:消息的載體,每個消息都會被投到一個或多個隊列。
* Binding:綁定,它的作用就是把exchange和queue按照路由規則綁定起來.
* Routing Key:路由關鍵字,exchange根據這個關鍵字進行消息投遞。
* vhost:虛擬主機,一個broker里可以有多個vhost,用作不同用戶的許可權分離。
* Producer:消息生產者,就是投遞消息的程式.
* Consumer:消息消費者,就是接受消息的程式.
* Channel:消息通道,在客戶端的每個連接里,可建立多個channel.
*/
@Slf4j
@Component
public class RabbitConfig {
//業務交換機
public static final String EXCHANGE_PHCP = "phcp";
//業務隊列1
public static final String QUEUE_COMPANY = "company";
//業務隊列1的key
public static final String ROUTINGKEY_COMPANY = "companyKey";
//業務隊列2
public static final String QUEUE_PROJECT = "project";
//業務隊列2的key
public static final String ROUTINGKEY_PROJECT = "projectKey";
//死信交換機
public static final String EXCHANGE_PHCP_DEAD = "phcp_dead";
//死信隊列1
public static final String QUEUE_COMPANY_DEAD = "company_dead";
//死信隊列2
public static final String QUEUE_PROJECT_DEAD = "project_dead";
//死信隊列1的key
public static final String ROUTINGKEY_COMPANY_DEAD = "companyKey_dead";
//死信隊列2的key
public static final String ROUTINGKEY_PROJECT_DEAD = "projectKey_dead";
// /**
// * 解決重複確認報錯問題,如果沒有報錯的話,就不用啟用這個
// *
// * @param connectionFactory
// * @return
// */
// @Bean
// public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
// SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
// factory.setConnectionFactory(connectionFactory);
// factory.setMessageConverter(new Jackson2JsonMessageConverter());
// factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
// return factory;
// }
/**
* 聲明業務交換機
* 1. 設置交換機類型
* 2. 將隊列綁定到交換機
* FanoutExchange: 將消息分發到所有的綁定隊列,無routingkey的概念
* HeadersExchange :通過添加屬性key-value匹配
* DirectExchange:按照routingkey分發到指定隊列
* TopicExchange:多關鍵字匹配
*/
@Bean("exchangePhcp")
public DirectExchange exchangePhcp() {
return new DirectExchange(EXCHANGE_PHCP);
}
/**
* 聲明死信交換機
* 1. 設置交換機類型
* 2. 將隊列綁定到交換機
* FanoutExchange: 將消息分發到所有的綁定隊列,無routingkey的概念
* HeadersExchange :通過添加屬性key-value匹配
* DirectExchange:按照routingkey分發到指定隊列
* TopicExchange:多關鍵字匹配
*/
@Bean("exchangePhcpDead")
public DirectExchange exchangePhcpDead() {
return new DirectExchange(EXCHANGE_PHCP_DEAD);
}
/**
* 聲明業務隊列1
*
* @return
*/
@Bean("queueCompany")
public Queue queueCompany() {
Map<String,Object> arguments = new HashMap<>(2);
arguments.put("x-dead-letter-exchange",EXCHANGE_PHCP_DEAD);
//綁定該隊列到死信交換機的隊列1
arguments.put("x-dead-letter-routing-key",ROUTINGKEY_COMPANY_DEAD);
return QueueBuilder.durable(QUEUE_COMPANY).withArguments(arguments).build();
}
/**
* 聲明業務隊列2
*
* @return
*/
@Bean("queueProject")
public Queue queueProject() {
Map<String,Object> arguments = new HashMap<>(2);
arguments.put("x-dead-letter-exchange",EXCHANGE_PHCP_DEAD);
//綁定該隊列到死信交換機的隊列2
arguments.put("x-dead-letter-routing-key",ROUTINGKEY_PROJECT_DEAD);
return QueueBuilder.durable(QUEUE_PROJECT).withArguments(arguments).build();
}
/**
* 聲明死信隊列1
*
* @return
*/
@Bean("queueCompanyDead")
public Queue queueCompanyDead() {
return new Queue(QUEUE_COMPANY_DEAD);
}
/**
* 聲明死信隊列2
*
* @return
*/
@Bean("queueProjectDead")
public Queue queueProjectDead() {
return new Queue(QUEUE_PROJECT_DEAD);
}
/**
* 綁定業務隊列1和業務交換機
* @param queue
* @param directExchange
* @return
*/
@Bean
public Binding bindingQueueCompany(@Qualifier("queueCompany") Queue queue, @Qualifier("exchangePhcp") DirectExchange directExchange) {
return BindingBuilder.bind(queue).to(directExchange).with(RabbitConfig.ROUTINGKEY_COMPANY);
}
/**
* 綁定業務隊列2和業務交換機
* @param queue
* @param directExchange
* @return
*/
@Bean
public Binding bindingQueueProject(@Qualifier("queueProject") Queue queue, @Qualifier("exchangePhcp") DirectExchange directExchange) {
return BindingBuilder.bind(queue).to(directExchange).with(RabbitConfig.ROUTINGKEY_PROJECT);
}
/**
* 綁定死信隊列1和死信交換機
* @param queue
* @param directExchange
* @return
*/
@Bean
public Binding bindingQueueCompanyDead(@Qualifier("queueCompanyDead") Queue queue, @Qualifier("exchangePhcpDead") DirectExchange directExchange) {
return BindingBuilder.bind(queue).to(directExchange).with(RabbitConfig.ROUTINGKEY_COMPANY_DEAD);
}
/**
* 綁定死信隊列2和死信交換機
* @param queue
* @param directExchange
* @return
*/
@Bean
public Binding bindingQueueProjectDead(@Qualifier("queueProjectDead") Queue queue, @Qualifier("exchangePhcpDead") DirectExchange directExchange) {
return BindingBuilder.bind(queue).to(directExchange).with(RabbitConfig.ROUTINGKEY_PROJECT_DEAD);
}
}
生產者
RabbltProducer
package com.example.rabitmqdemo.mydemo.producer;
import com.example.rabitmqdemo.mydemo.config.RabbitConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
@Component
@Slf4j
public class RabbltProducer implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback{
@Resource
private RabbitTemplate rabbitTemplate;
/**
* 初始化消息確認函數
*/
@PostConstruct
public void init() {
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnsCallback(this);
rabbitTemplate.setMandatory(true);
}
/**
* 發送消息伺服器確認函數
* @param correlationData
* @param ack
* @param cause
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
System.out.println("消息發送成功" + correlationData);
} else {
System.out.println("消息發送失敗:" + cause);
}
}
/**
* 消息發送失敗,消息回調函數
* @param returnedMessage
*/
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
String str = new String(returnedMessage.getMessage().getBody());
System.out.println("消息發送失敗:" + str);
}
/**
* 處理消息發送到隊列1
* @param str
*/
public void sendCompany(String str){
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType("application/json");
Message message = new Message(str.getBytes(StandardCharsets.UTF_8),messageProperties);
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
this.rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_PHCP,RabbitConfig.ROUTINGKEY_COMPANY,message,correlationData);
//也可以用下面的方式
//CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
//this.rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_PHCP,RabbitConfig.ROUTINGKEY_COMPANY,str,correlationData);
}
/**
* 處理消息發送到隊列2
* @param str
*/
public void sendProject(String str){
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType("application/json");
Message message = new Message(str.getBytes(StandardCharsets.UTF_8),messageProperties);
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
this.rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_PHCP,RabbitConfig.ROUTINGKEY_PROJECT,message,correlationData);
//也可以用下面的方式
//CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
//this.rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_PHCP,RabbitConfig.ROUTINGKEY_PROJECT,str,correlationData);
}
}
業務消費者
RabbitConsumer
package com.example.rabitmqdemo.mydemo.consumer;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* 監聽業務交換機
* @author JeWang
*/
@Component
@Slf4j
public class RabbitConsumer {
/**
* 監聽業務隊列1
* @param message
* @param channel
* @throws IOException
*/
@RabbitListener(queues = "company")
public void company(Message message, Channel channel) throws IOException {
try{
System.out.println("次數" + message.getMessageProperties().getDeliveryTag());
channel.basicQos(1);
Thread.sleep(2000);
String s = new String(message.getBody());
log.info("處理消息"+s);
//下面兩行是嘗試手動拋出異常,用來測試重試次數和發送到死信交換機
//String str = null;
//str.split("1");
//處理成功,確認應答
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}catch (Exception e){
log.error("處理消息時發生異常:"+e.getMessage());
Boolean redelivered = message.getMessageProperties().getRedelivered();
if(redelivered){
log.error("異常重試次數已到達設置次數,將發送到死信交換機");
channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
}else {
log.error("消息即將返回隊列處理重試");
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
}
}
/**
* 監聽業務隊列2
* @param message
* @param channel
* @throws IOException
*/
@RabbitListener(queues = "project")
public void project(Message message, Channel channel) throws IOException {
try{
System.out.println("次數" + message.getMessageProperties().getDeliveryTag());
channel.basicQos(1);
Thread.sleep(2000);
String s = new String(message.getBody());
log.info("處理消息"+s);
//下面兩行是嘗試手動拋出異常,用來測試重試次數和發送到死信交換機
//String str = null;
//str.split("1");
//處理成功,確認應答
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}catch (Exception e){
log.error("處理消息時發生異常:"+e.getMessage());
Boolean redelivered = message.getMessageProperties().getRedelivered();
if(redelivered){
log.error("異常重試次數已到達設置次數,將發送到死信交換機");
channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
}else {
log.error("消息即將返回隊列處理重試");
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
}
}
}
死信消費者
RabbitConsumer
package com.example.rabitmqdemo.mydemo.consumer;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* 監聽死信交換機
* @author JeWang
*/
@Component
@Slf4j
public class RabbitConsumerDead {
/**
* 處理死信隊列1
* @param message
* @param channel
* @throws IOException
*/
@RabbitListener(queues = "company_dead")
public void company_dead(Message message, Channel channel) throws IOException {
try{
channel.basicQos(1);
String s = new String(message.getBody());
log.info("處理死信"+s);
//在此處記錄到資料庫、報警之類的操作
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}catch (Exception e){
log.error("接收異常:"+e.getMessage());
}
}
/**
* 處理死信隊列2
* @param message
* @param channel
* @throws IOException
*/
@RabbitListener(queues = "project_dead")
public void project_dead(Message message, Channel channel) throws IOException {
try{
channel.basicQos(1);
String s = new String(message.getBody());
log.info("處理死信"+s);
//在此處記錄到資料庫、報警之類的操作
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}catch (Exception e){
log.error("接收異常:"+e.getMessage());
}
}
}
測試
MqController
package com.example.rabitmqdemo.mydemo.controller;
import com.example.rabitmqdemo.mydemo.producer.RabbltProducer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
@RequestMapping("/def")
@RestController
@Slf4j
public class MsgController {
@Resource
private RabbltProducer rabbltProducer;
@RequestMapping("/handleCompany")
public void handleCompany(@RequestBody String jsonStr){
rabbltProducer.sendCompany(jsonStr);
}
}