SpringBoot RabbitMQ 註解版 基本概念與基本案例
- 2022 年 8 月 24 日
- 筆記
- 【中間件系列】RabbitMQ
前言
人間清醒
Windows安裝RabbitMQ
環境工具下載
rabbitMQ是Erlang語言開發的所以先下載Erlang;
RabbitMQ官網地址: //www.rabbitmq.com/
Erlang下載: //www.erlang.org/downloads
如果不 想使用最新版本可以使用我已經下載保存在雲盤的環境: //www.aliyundrive.com/s/Yts3ufpyk1W(otp_win64_23.0,rabbitmq-server-3.8.8)
Erlang環境安裝
直接運行: otp_win64_23.0.exe 程序一直next即可,如需改變安裝位置自行選擇,安裝完成後對系統環境變量新增ERLANG_HOME地址為:
C:\Program Files\erl-23.0

雙擊系統變量path,點擊「新建」,將%ERLANG_HOME%\bin加入到path中。
win+R鍵,輸入cmd,再輸入erl,看到erlang版本號就說明erlang安裝成功了。
RabbitMQ安裝
直接運行: rabbitmq-server-3.8.8 程序一直next即可,如需改變安裝位置自行選擇.
RabbitMQ Web管理端安裝
進入安裝後的RabbitMQ的sbin目錄中(C:\Program Files\RabbitMQ Server\rabbitmq_server-3.8.8\sbin)
Cmd命令執行: rabbitmq-plugins enable rabbitmq_managementr
C:\Program Files\RabbitMQ Server\rabbitmq_server-3.8.8\sbin>rabbitmq-plugins enable rabbitmq_management
Enabling plugins on node rabbit@LX-P1DMPLUV:
rabbitmq_management
The following plugins have been configured:
rabbitmq_management
rabbitmq_management_agent
rabbitmq_web_dispatch
Applying plugin configuration to rabbit@LX-P1DMPLUV...
Plugin configuration unchanged
常用命令:
# 啟動RabbitMQ
rabbitmq-service start
# 停止RabbitMQ
rabbitmq-service stop
# 啟用RabbitMQ Web可視化界面插件
rabbitmq-plugins enable rabbitmq_management
# 停用RabbitMQ Web可視化界面插件
rabbitmq-plugins disable rabbitmq_management
# 查看RabbitMQ狀態
rabbitmqctl status
訪問管理端頁面,默認賬號密碼為: guest
可視化界面: //127.0.0.1:15672/#/
RabbitMQ新增超級管理員
進入安裝後的RabbitMQ的sbin目錄中(C:\Program Files\RabbitMQ Server\rabbitmq_server-3.8.8\sbin)
# 創建用戶root用戶 密碼為123456
rabbitmqctl add_user root 123456
# 為該用戶分配所有權限
rabbitmqctl set_permissions -p / root ".*" ".*" ".*"
# 設置該用戶為管理員角色
rabbitmqctl set_user_tags root administrator
RabbitMQ特點
RabbitMQ是一款使用Erlang語言開發的,實現AMQP(高級消息隊列協議)的開源消息中間件。首先要知道一些RabbitMQ的特點:
- 可靠性:支持持久化,傳輸確認,發佈確認等保證了MQ的可靠性。
- 靈活的分發消息策略:在消息進入MQ前由Exchange(交換機)進行路由消息。
- 分發消息策略:簡單模式、工作隊列模式、發佈訂閱模式、路由模式、通配符模式。
- 支持集群:多台RabbitMQ服務器可以組成一個集群,形成一個邏輯Broker。
- 多種協議:RabbitMQ支持多種消息隊列協議,比如 STOMP、MQTT 等等。
- 支持多種語言客戶端:RabbitMQ幾乎支持所有常用編程語言,包括 Java、.NET、Ruby 等等。
- 可視化管理界面:RabbitMQ提供了一個易用的用戶界面,使得用戶可以監控和管理消息 Broker。
- 插件機制:RabbitMQ提供了許多插件,可以通過插件進行擴展,也可以編寫自己的插件。
RabbitMQ 3種常用交換機
- Direct Exchange 直連型交換機:根據消息攜帶的路由鍵將消息投遞給對應隊列。
- Fanout Exchange 扇型交換機:這個交換機沒有路由鍵概念,就算你綁了路由鍵也是無視的。 這個交換機在接收到消息後,會直接轉發到綁定到它上面的所有隊列。
- Topic Exchange 主題交換機:這個交換機其實跟直連交換機流程差不多,但是它的特點就是在它的路由鍵和綁定鍵之間是有規則的
RabbitMQ 5種常用模式
- Simple Work Queue 簡單工作隊列:該模式是很少用到的一個場景,一般都會通過Exchange進行消息分配到隊列從而為以後擴展預留一個入口。
- Publish/Subscribe 發佈訂閱模式:該模式性能最好,拿到消息直接放入隊列。
- Routing 路由模式:該模式通過routing key 進行全字匹配,匹配上將相關消息放入相關隊列。
- Topics 主題模式:該模式通過routng key進行模糊匹配,匹配上將相關信息放入相關隊列。
- Header 模式:通過message header頭部信息進行比對,可以根據定義全匹配、部分匹配等規則。
RabbitMQ名詞解釋
- Producer/Publisher:生產者,投遞消息的一方。
- Consumer:消費者,接收消息的一方。
- Message消息:實際的數據,如demo中的order訂單消息載體。
- Queue隊列:是RabbitMQ的內部對象,用於存儲消息,最終將消息傳輸到消費者。
- Exchange交換機:在RabbitMQ中,生產者發送消息到交換機,由交換機將消息路由到一個或者多個隊列中
- RoutingKey路由鍵:生產者將消息發給交換器的時候,一般會指定一個RoutingKey,用來指定這個消息的路由規則。
- Binding綁定:RabbitMQ中通過綁定將交換器與隊列關聯起來,在綁定的時候一般會指定一個綁定鍵(BindingKey),這樣RabbitMQ就知道如何正確地將消息路由到隊列。
MQ適用場景
異步處理場景
如當一個站點新增用戶時需要走以下流程:驗證賬號信息->用戶入庫->發送註冊成功歡迎郵箱給用戶;
從該流程中分析用戶註冊成功後首先期望的是能夠成功登錄上站點,而對於能否收到註冊成功的郵件對於用戶而言並不重要,
而郵件發送對於如遇到網絡問題可能導致發送郵件緩慢從來導致整個用戶註冊流程響應很慢;
對於通知郵件發送對於功能而言並不重要的時候,這個時候就可以將該業務放在MQ中異步執行從而可以從一定程度上提升整個流程的性能。
應用解耦
如當一個站點新增用戶時需要走以下流程:驗證賬號信息->用戶入庫->發送註冊成功歡迎郵箱給用戶;
通常通過系統劃分會劃分為:用戶模塊,消息模塊;
以Spring Cloud的為例按照原始做法會在用戶入庫成功後會通過Feign調用消息模塊的發送郵件接口,但是如果消息模塊全集群宕機就會導致Feign請求失敗從而導致業務不可用;
使用MQ就不會造成上述的問題,因為我們在用戶註冊完成後想消息模塊對應的郵件發送業務隊列去發送消息即可,隊列會監督消息模塊完成,如果完不成隊列會一直監督,直到完成為止
流量削峰
秒殺和搶購等場景經常使用 MQ 進行流量削峰。活動開始時流量暴增,用戶的請求寫入 MQ,超過 MQ 最大長度丟棄請求,業務系統接收 MQ 中的消息進行處理,達到流量削峰、保證系統可用性的目的。
影響:MQ是排隊執行的所以對性能有一定的影響,並且請求過多後會導致請求被丟棄問題
消息通訊
點對點或者訂閱發佈模式,通過消息進行通訊。如微信的消息發送與接收、聊天室等。
SpringBoot中使用RabbitMQ
工程創建&準備
說明該工程按照包區分同時擔任生產者與消費者
POM導入依賴:
<dependencies>
<!-- RabbitMQ依賴 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- 導入Web服務方便測試 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- 代碼簡化工具 -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>
創建SpringBoot啟動類:
@SpringBootApplication
public class SimpleRabbitMQCaseApplication {
public static void main(String[] args) {
SpringApplication.run(SimpleRabbitMQCaseApplication.class,args);
}
}
創建applicatin.yaml:
server:
port: 8021
spring:
application:
name: rabbitmq-simple-case
#配置rabbitMq 服務器
rabbitmq:
host: 127.0.0.1
port: 5672
username: root
password: 123456
virtual-host: / # 虛擬host 可以不設置,使用server默認host
listener:
simple:
concurrency: 10 # 消費端的監聽個數(即@RabbitListener開啟幾個線程去處理數據。)
max-concurrency: 10 # 消費端的監聽最大個數
prefetch: 5
acknowledge-mode: auto # MANUAL:手動處理 AUTO:自動處理
default-requeue-rejected: true # 消費不成功的消息拒絕入隊
retry:
enabled: true # 開啟消息重試
max-attempts: 5 # 重試次數
max-interval: 10000 # 重試最大間隔時間
initial-interval: 2000 # 重試初始間隔時間
簡單隊列生產消費
生產者:
/**
* 簡單隊列消息生產
* @author wuwentao
*/
@RestController
@RequestMapping("/simple/queue")
@AllArgsConstructor
public class SimpleQueueProducer {
private RabbitTemplate rabbitTemplate;
// 發送到的隊列名稱
public static final String AMQP_SIMPLE_QUEUE = "amqp.simple.queue";
/**
* 發送簡單消息
* @param message 消息內容
*/
@GetMapping("/sendMessage")
public String sendMessage(@RequestParam(value = "message") String message){
rabbitTemplate.convertAndSend(AMQP_SIMPLE_QUEUE, message);
return "OK";
}
}
消費者:
/**
* 簡單隊列消息消費者
* @author wuwentao
*/
@Component
@Slf4j
public class SimpleQueueConsumer {
/**
* 監聽一個簡單的隊列,隊列不存在時候會創建
* @param content 消息
*/
@RabbitListener(queuesToDeclare = @Queue(name = SimpleQueueProducer.AMQP_SIMPLE_QUEUE))
public void consumerSimpleMessage(String content, Message message, Channel channel) throws IOException {
// 通過Message對象解析消息
String messageStr = new String(message.getBody());
log.info("通過參數形式接收的消息:{}" ,content);
//log.info("通過Message:{}" ,messageStr); // 可通過Meessage對象解析消息
// channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); // 手動確認消息消費成功
// channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); // 手動確認消息消費失敗
}
}
測試生成消息訪問接口地址:
//localhost:8021/simple/queue/sendMessage?message=這是一條簡單的消息序號1
//localhost:8021/simple/queue/sendMessage?message=這是一條簡單的消息序號2
//localhost:8021/simple/queue/sendMessage?message=這是一條簡單的消息序號3
控制台打印消費信息:
2022-08-22 09:45:26.846 INFO 14400 --- [ntContainer#0-1] c.g.b.s.consumer.SimpleQueueConsumer : 通過參數形式接收的消息:這是一條簡單的消息序號1
2022-08-22 09:45:29.064 INFO 14400 --- [tContainer#0-10] c.g.b.s.consumer.SimpleQueueConsumer : 通過參數形式接收的消息:這是一條簡單的消息序號2
2022-08-22 09:45:31.441 INFO 14400 --- [ntContainer#0-4] c.g.b.s.consumer.SimpleQueueConsumer : 通過參數形式接收的消息:這是一條簡單的消息序號3
注意事項:在YAML中開啟的配置acknowledge-mode為auto也是默認的所以消息不需要手動確認默認沒有異常則消費成功,如果需要定製ACK方式可以將acknowledge-mode修改為MANUAL則要在消費完成後自行ACK或NACK否則將導致消息重複消費
Fanout Exchange 扇形交換機 廣播模式
fanout模式也叫廣播模式,每一條消息可以被綁定在同一個交換機上的所有隊列的消費者消費
生產者:
@RestController
@RequestMapping("/exchange/fanout")
@AllArgsConstructor
public class ExchangeFanoutProducer {
private RabbitTemplate rabbitTemplate;
// 扇形交換機定義
public static final String EXCHANGE_FANOUT = "exchange.fanout";
// 綁定扇形交換機的隊列1
public static final String EXCHANGE_FANOUT_QUEUE_1 = "exchange.fanout.queue1";
// 綁定扇形交換機的隊列2
public static final String EXCHANGE_FANOUT_QUEUE_2 = "exchange.fanout.queue2";
/**
* 發送扇形消息消息能夠被所有綁定該交換機的隊列給消費
* @param message 消息內容
*/
@GetMapping("/sendMessage")
public String sendMessage(@RequestParam(value = "message") String message){
// routingkey 在fanout模式不使用,會在direct和topic模式使用,所以這裡給空
rabbitTemplate.convertAndSend(EXCHANGE_FANOUT,"", message);
return "OK";
}
}
消費者:
這裡定義兩個消費者同時綁定同一個扇形交換機,這裡主要聲明交換機Type為ExchangeTypes.FANOUT
/**
* 扇形交換機隊列消費者
* @author wuwentao
*/
@Component
@Slf4j
public class ExchangeFanoutConsumer {
/**
* 創建交換機並且綁定隊列(隊列1)
*
* @param content 內容
* @param channel 通道
* @param message 消息
* @throws IOException ioexception
* @throws TimeoutException 超時異常
*/
@RabbitListener(bindings = @QueueBinding(
exchange = @Exchange(value = ExchangeFanoutProducer.EXCHANGE_FANOUT, durable = "true", type = ExchangeTypes.FANOUT),
value = @Queue(value = ExchangeFanoutProducer.EXCHANGE_FANOUT_QUEUE_1, durable = "true")
))
@RabbitHandler
public void exchangeFanoutQueue1(String content, Channel channel, Message message) {
log.info("EXCHANGE_FANOUT_QUEUE_1隊列接收到消息:{}",content);
}
/**
* 創建交換機並且綁定隊列(隊列2)
*/
@RabbitListener(bindings = @QueueBinding(
exchange = @Exchange(value = ExchangeFanoutProducer.EXCHANGE_FANOUT, durable = "true", type = ExchangeTypes.FANOUT),
value = @Queue(value = ExchangeFanoutProducer.EXCHANGE_FANOUT_QUEUE_2, durable = "true")
))
@RabbitHandler
public void exchangeFanoutQueue2(String content, Channel channel, Message message) {
log.info("EXCHANGE_FANOUT_QUEUE_2隊列接收到消息:{}",content);
}
}
測試生成消息訪問接口地址:
//localhost:8021/exchange/fanout/sendMessage?message=這是一條扇形交換機中的消息序號1
//localhost:8021/exchange/fanout/sendMessage?message=這是一條扇形交換機中的消息序號2
//localhost:8021/exchange/fanout/sendMessage?message=這是一條扇形交換機中的消息序號3
控制台打印消費信息:
2022-08-22 10:10:43.285 INFO 12016 --- [ntContainer#1-2] c.g.b.s.consumer.ExchangeFanoutConsumer : EXCHANGE_FANOUT_QUEUE_2隊列接收到消息:這是一條扇形交換機中的消息序號1
2022-08-22 10:10:43.285 INFO 12016 --- [ntContainer#0-7] c.g.b.s.consumer.ExchangeFanoutConsumer : EXCHANGE_FANOUT_QUEUE_1隊列接收到消息:這是一條扇形交換機中的消息序號1
2022-08-22 10:10:49.151 INFO 12016 --- [tContainer#0-10] c.g.b.s.consumer.ExchangeFanoutConsumer : EXCHANGE_FANOUT_QUEUE_1隊列接收到消息:這是一條扇形交換機中的消息序號2
2022-08-22 10:10:49.151 INFO 12016 --- [ntContainer#1-4] c.g.b.s.consumer.ExchangeFanoutConsumer : EXCHANGE_FANOUT_QUEUE_2隊列接收到消息:這是一條扇形交換機中的消息序號2
2022-08-22 10:10:54.254 INFO 12016 --- [ntContainer#0-6] c.g.b.s.consumer.ExchangeFanoutConsumer : EXCHANGE_FANOUT_QUEUE_1隊列接收到消息:這是一條扇形交換機中的消息序號3
2022-08-22 10:10:54.255 INFO 12016 --- [ntContainer#1-3] c.g.b.s.consumer.ExchangeFanoutConsumer : EXCHANGE_FANOUT_QUEUE_2隊列接收到消息:這是一條扇形交換機中的消息序號3
Direct Exchange 直連型交換機,
直連交換機與扇形交換機的區別在於,隊列都是綁定同一個交換機,但是在隊列上會添加routingkey標識,消費者會根據對應的不同routingkey消費對應的消息。
生產者:
@RestController
@RequestMapping("/exchange/direct")
@AllArgsConstructor
public class ExchangeDirectProducer {
private RabbitTemplate rabbitTemplate;
// 直連交換機定義
public static final String EXCHANGE_DIRECT = "exchange.direct";
// 直連交換機隊列定義1
public static final String EXCHANGE_DIRECT_QUEUE_1 = "exchange.direct.queue1";
// 直連交換機隊列定義2
public static final String EXCHANGE_DIRECT_QUEUE_2 = "exchange.direct.queue2";
// 直連交換機路由KEY定義1
public static final String EXCHANGE_DIRECT_ROUTING_KEY_1 = "exchange.direct.routing.key1";
// 直連交換機路由KEY定義2
public static final String EXCHANGE_DIRECT_ROUTING_KEY_2 = "exchange.direct.routing.key2";
/**
* 發送消息到直連交換機並且指定對應routingkey
* @param message 消息內容
*/
@GetMapping("/sendMessage")
public String sendMessage(@RequestParam(value = "message") String message,
@RequestParam(value = "routingkey") int routingkey){
if(routingkey == 1){
rabbitTemplate.convertAndSend(EXCHANGE_DIRECT,EXCHANGE_DIRECT_ROUTING_KEY_1, message);
} else if (routingkey == 2){
rabbitTemplate.convertAndSend(EXCHANGE_DIRECT,EXCHANGE_DIRECT_ROUTING_KEY_2, message);
}else{
return "非法參數!";
}
return "OK";
}
}
消費者:
這裡定義多個消費者同時綁定同一個直連交換機,這裡主要聲明交換機Type為ExchangeTypes.DIRECT,不聲明則默認為DIRECT。
/**
* 直連交換機隊列消費者
* @author wuwentao
*/
@Component
@Slf4j
public class ExchangeDirectConsumer {
/**
* 創建交換機並且綁定隊列1(綁定routingkey1)
*
* @param content 內容
* @param channel 通道
* @param message 消息
* @throws IOException ioexception
* @throws TimeoutException 超時異常
*/
@RabbitListener(bindings = @QueueBinding(
exchange = @Exchange(value = ExchangeDirectProducer.EXCHANGE_DIRECT, durable = "true", type = ExchangeTypes.DIRECT),
value = @Queue(value = ExchangeDirectProducer.EXCHANGE_DIRECT_QUEUE_1, durable = "true"),
key = ExchangeDirectProducer.EXCHANGE_DIRECT_ROUTING_KEY_1
))
@RabbitHandler
public void exchangeDirectRoutingKey1(String content, Channel channel, Message message) {
log.info("隊列1 KEY1接收到消息:{}",content);
}
/**
* 創建交換機並且綁定隊列2(綁定routingkey2)
*/
@RabbitListener(bindings = @QueueBinding(
exchange = @Exchange(value = ExchangeDirectProducer.EXCHANGE_DIRECT, durable = "true", type = ExchangeTypes.DIRECT),
value = @Queue(value = ExchangeDirectProducer.EXCHANGE_DIRECT_QUEUE_2, durable = "true"),
key = ExchangeDirectProducer.EXCHANGE_DIRECT_ROUTING_KEY_2
))
@RabbitHandler
public void exchangeDirectRoutingKey2(String content, Channel channel, Message message) {
log.info("隊列2 KEY2接收到消息:{}",content);
}
}
測試生成消息訪問接口地址:
//localhost:8021/exchange/direct/sendMessage?routingkey=1&message=這是一條發給路由key為1的消息
//localhost:8021/exchange/direct/sendMessage?routingkey=2&message=這是一條發給路由key為2的消息
控制台打印消費信息:
2022-08-22 10:37:22.173 INFO 4380 --- [ntContainer#0-1] c.g.b.s.consumer.ExchangeDirectConsumer : 隊列1 KEY1接收到消息:這是一條發給路由key為1的消息
2022-08-22 10:37:26.882 INFO 4380 --- [ntContainer#1-3] c.g.b.s.consumer.ExchangeDirectConsumer : 隊列2 KEY2接收到消息:這是一條發給路由key為2的消息
Topic Exchange 主題交換機
這個交換機其實跟直連交換機流程差不多,但是它的特點就是在它的路由鍵和綁定鍵之間是有規則的;規則如下:
Topic交換機接收的消息RoutingKey必須是多個單詞,以 . 分割
Topic交換機與隊列綁定時的routingKey可以指定通配符
#:代表0個或多個詞
*:代表1個詞
生產者:
@RestController
@RequestMapping("/exchange/topic")
@AllArgsConstructor
public class ExchangeTopicProducer {
private RabbitTemplate rabbitTemplate;
// 主題交換機定義
public static final String EXCHANGE_TOPIC = "exchange.topic";
// 主題交換機隊列定義1
public static final String EXCHANGE_TOPIC_QUEUE_1 = "exchange.topic.queue1";
// 主題交換機隊列定義1
public static final String EXCHANGE_TOPIC_QUEUE_2 = "exchange.topic.queue2";
// 主題交換機隊列路由Key定義1
public static final String EXCHANGE_TOPIC_ROUTING1_KEY_1 = "#.routingkey.#";
// 主題交換機隊列路由Key定義2
public static final String EXCHANGE_TOPIC_ROUTING2_KEY_2 = "routingkey.*";
// 案例KEY1 可以被EXCHANGE_TOPIC_ROUTING1_KEY_1匹配不能被EXCHANGE_TOPIC_ROUTING2_KEY_2匹配
public static final String EXCHANGE_TOPIC_CASE_KEY_1 = "topic.routingkey.case1";
// 案例KEY2 同時可以被EXCHANGE_TOPIC_ROUTING1_KEY_1和EXCHANGE_TOPIC_ROUTING2_KEY_2匹配
public static final String EXCHANGE_TOPIC_CASE_KEY_2 = "routingkey.case2";
/**
* 發送消息到主題交換機並且指定對應可通配routingkey
* @param message 消息內容
*/
@GetMapping("/sendMessage")
public String sendMessage(@RequestParam(value = "message") String message,
@RequestParam(value = "routingkey") int routingkey){
if(routingkey == 1){
// 同時匹配 topic.routingkey.case1 和 routingkey.case2
rabbitTemplate.convertAndSend(EXCHANGE_TOPIC,EXCHANGE_TOPIC_CASE_KEY_1, message);
} else if (routingkey == 2){
// 只能匹配 routingkey.case2
rabbitTemplate.convertAndSend(EXCHANGE_TOPIC,EXCHANGE_TOPIC_CASE_KEY_2, message);
}else{
return "非法參數!";
}
return "OK";
}
}
消費者:
這裡定義多個消費者同時綁定同一個直主題交換機,這裡主要聲明交換機Type為ExchangeTypes.TOPIC,當routingkey發送的消息能夠被消費者給匹配僅能夠接收到消息。
@Component
@Slf4j
public class ExchangeTopicConsumer {
/**
* 創建交換機並且綁定隊列1(綁定routingkey1)
*
* @param content 內容
* @param channel 通道
* @param message 消息
* @throws IOException ioexception
* @throws TimeoutException 超時異常
*/
@RabbitListener(bindings = @QueueBinding(
exchange = @Exchange(value = ExchangeTopicProducer.EXCHANGE_TOPIC, durable = "true", type = ExchangeTypes.TOPIC),
value = @Queue(value = ExchangeTopicProducer.EXCHANGE_TOPIC_QUEUE_1, durable = "true"),
key = ExchangeTopicProducer.EXCHANGE_TOPIC_ROUTING1_KEY_1
))
@RabbitHandler
public void exchangeTopicRoutingKey1(String content, Channel channel, Message message) {
log.info("#號統配符號隊列1接收到消息:{}",content);
}
/**
* 創建交換機並且綁定隊列2(綁定routingkey2)
*/
@RabbitListener(bindings = @QueueBinding(
exchange = @Exchange(value = ExchangeTopicProducer.EXCHANGE_TOPIC, durable = "true", type = ExchangeTypes.TOPIC),
value = @Queue(value = ExchangeTopicProducer.EXCHANGE_TOPIC_QUEUE_2, durable = "true"),
key = ExchangeTopicProducer.EXCHANGE_TOPIC_ROUTING2_KEY_2
))
@RabbitHandler
public void exchangeTopicRoutingKey2(String content, Channel channel, Message message) {
log.info("*號統配符號隊列2接收到消息:{}",content);
}
}
測試生成消息訪問接口地址:
//localhost:8021/exchange/topic/sendMessage?routingkey=1&message=前後多重匹配
//localhost:8021/exchange/topic/sendMessage?routingkey=2&message=後一個詞匹配
控制台打印消費信息:
2022-08-22 15:10:50.444 INFO 1376 --- [ntContainer#4-8] c.g.b.s.consumer.ExchangeTopicConsumer : #號統配符號隊列1接收到消息:前後多重匹配
2022-08-22 15:10:55.118 INFO 1376 --- [ntContainer#5-8] c.g.b.s.consumer.ExchangeTopicConsumer : *號統配符號隊列2接收到消息:後一個詞匹配
2022-08-22 15:10:55.119 INFO 1376 --- [ntContainer#4-9] c.g.b.s.consumer.ExchangeTopicConsumer : #號統配符號隊列1接收到消息:後一個詞匹配
手動ACK與消息確認機制
新增SpringBoot配置文件YAML,這裡主要將自動ACK修改為手工ACK並且開啟消息確認模式與消息回退:
spring:
rabbitmq:
listener:
acknowledge-mode: manual # MANUAL:手動處理 AUTO:自動處理
# NONE值是禁用發佈確認模式,是默認值
# CORRELATED值是發佈消息成功到交換器後會觸發回調方法,如1示例
# SIMPLE值經測試有兩種效果,其一效果和CORRELATED值一樣會觸發回調方法,其二在發佈消息成功後使用rabbitTemplate調用waitForConfirms或waitForConfirmsOrDie方法等待broker節點返回發送結果,根據返回結果來判定下一步的邏輯,要注意的點是waitForConfirmsOrDie方法如果返回false則會關閉channel,則接下來無法發送消息到broker;
publisher-confirm-type: simple #消息確認機制
publisher-returns: true # 消息回退確認機制
定義消息回調確認實現類:
/**
* 消費者確認收到消息後,手動ack回執回調處理
* @author wuwentao
*/
@Slf4j
@Component
public class MessageConfirmCallback implements RabbitTemplate.ConfirmCallback {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
log.info("===================================================");
log.info("消息確認機制回調函數參數信息如下:");
log.info("ACK狀態:{}",ack);
log.info("投遞失敗原因:{}",cause);
log.info("===================================================");
}
}
消費者:
/**
* RabbitMQ Message 回調地址消費者測試
* @author wuwentao
*/
@Component
@Slf4j
public class MessagesCallbackConsumer {
@RabbitListener(bindings = @QueueBinding(
exchange = @Exchange(value = MessagesCallbackProducer.MESSAGE_CALLBACK_EXCHANGE, durable = "true", type = ExchangeTypes.DIRECT),
value = @Queue(value = MessagesCallbackProducer.MESSAGE_CALLBACK_QUEUE, durable = "true"),
key = MessagesCallbackProducer.MESSAGE_CALLBACK_ROUTINGKEY
))
@RabbitHandler
public void consumer(String content, Channel channel, Message message) throws IOException {
if("成功".equals(content)){
log.info("消息處理成功:{}",content);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); // 手動確認消息消費成功
}else{
if(message.getMessageProperties().getRedelivered()){
log.info("消息已被處理過了請勿重複處理消息:{}",content);
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 拒絕該消息,消息會被丟棄,不會重回隊列
}else{
log.info("消息處理失敗等待重新處理:{}",content);
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
}
}
}
生產者:
/**
* 消息回調機制測試
* @author wuwentao
*/
@RestController
@RequestMapping("/message/callback")
@AllArgsConstructor
public class MessagesCallbackProducer {
private RabbitTemplate rabbitTemplate;
private MessageConfirmCallback messageConfirmCallback;
// 發送到的隊列名稱
public static final String MESSAGE_CALLBACK_QUEUE = "amqp.message.callback.queue";
public static final String MESSAGE_CALLBACK_EXCHANGE = "amqp.message.callback.exchange";
public static final String MESSAGE_CALLBACK_ROUTINGKEY = "amqp.message.callback.routingkey";
/**
* 測試消息確認機制
* @param message 消息內容
*/
@GetMapping("/sendMessage")
public String sendMessage(@RequestParam(value = "message") String message){
// 設置失敗和確認回調函數
rabbitTemplate.setMandatory(true);
rabbitTemplate.setConfirmCallback(messageConfirmCallback);
//構建回調id為uuid
String callBackId = UUID.randomUUID().toString();
CorrelationData correlationData = new CorrelationData(callBackId);
if("失敗的消息".equals(message)){
// 寫一個不存的交換機器 和不存在的路由KEY
rabbitTemplate.convertAndSend("sdfdsafadsf","123dsfdasf",message,
msg -> {
msg.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
return msg;
},correlationData);
}else{
rabbitTemplate.convertAndSend(MESSAGE_CALLBACK_EXCHANGE,MESSAGE_CALLBACK_ROUTINGKEY,message,
msg -> {
msg.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
return msg;
},correlationData);
}
return "OK";
}
}
測試生成消息訪問接口地址:
# 發送找不到交換機的消息
//localhost:8021/message/callback/sendMessage?message=失敗的消息
# 發送手動ACK成功的消息
//localhost:8021/message/callback/sendMessage?message=成功
# 發送手動ACK失敗的消息
//localhost:8021/message/callback/sendMessage?message=失敗
控制台打印消費信息:
2022-08-24 09:11:50.122 INFO 11440 --- [nectionFactory2] c.g.b.s.callback.MessageConfirmCallback : ===================================================
2022-08-24 09:11:50.122 INFO 11440 --- [nectionFactory2] c.g.b.s.callback.MessageConfirmCallback : 消息確認機制回調函數參數信息如下:
2022-08-24 09:11:50.123 INFO 11440 --- [nectionFactory2] c.g.b.s.callback.MessageConfirmCallback : ACK狀態:false
2022-08-24 09:11:50.127 INFO 11440 --- [nectionFactory2] c.g.b.s.callback.MessageConfirmCallback : 投遞失敗原因:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'sdfdsafadsf' in vhost '/', class-id=60, method-id=40)
2022-08-24 09:11:50.127 INFO 11440 --- [nectionFactory2] c.g.b.s.callback.MessageConfirmCallback : ===================================================
2022-08-24 09:12:02.704 INFO 11440 --- [nectionFactory2] c.g.b.s.callback.MessageConfirmCallback : ===================================================
2022-08-24 09:12:02.705 INFO 11440 --- [nectionFactory2] c.g.b.s.callback.MessageConfirmCallback : 消息確認機制回調函數參數信息如下:
2022-08-24 09:12:02.705 INFO 11440 --- [nectionFactory2] c.g.b.s.callback.MessageConfirmCallback : ACK狀態:true
2022-08-24 09:12:02.705 INFO 11440 --- [nectionFactory2] c.g.b.s.callback.MessageConfirmCallback : 投遞失敗原因:null
2022-08-24 09:12:02.705 INFO 11440 --- [nectionFactory2] c.g.b.s.callback.MessageConfirmCallback : ===================================================
2022-08-24 09:12:02.735 INFO 11440 --- [ntContainer#6-1] c.g.b.s.c.MessagesCallbackConsumer : 消息處理成功:成功
2022-08-24 09:12:16.680 INFO 11440 --- [ntContainer#6-4] c.g.b.s.c.MessagesCallbackConsumer : 消息處理失敗等待重新處理:失敗
2022-08-24 09:12:16.688 INFO 11440 --- [nectionFactory2] c.g.b.s.callback.MessageConfirmCallback : ===================================================
2022-08-24 09:12:16.689 INFO 11440 --- [nectionFactory2] c.g.b.s.callback.MessageConfirmCallback : 消息確認機制回調函數參數信息如下:
2022-08-24 09:12:16.689 INFO 11440 --- [nectionFactory2] c.g.b.s.callback.MessageConfirmCallback : ACK狀態:true
2022-08-24 09:12:16.689 INFO 11440 --- [nectionFactory2] c.g.b.s.callback.MessageConfirmCallback : 投遞失敗原因:null
2022-08-24 09:12:16.689 INFO 11440 --- [nectionFactory2] c.g.b.s.callback.MessageConfirmCallback : ===================================================
2022-08-24 09:12:16.693 INFO 11440 --- [ntContainer#6-7] c.g.b.s.c.MessagesCallbackConsumer : 消息已被處理過了請勿重複處理消息:失敗
案例源代碼
//gitee.com/SimpleWu/blogs-examples/tree/master/rabbitmq-simple-case


