RabbitMQ入門到進階(Spring整合RabbitMQ&SpringBoot整合RabbitMQ)
1.MQ簡介
MQ 全稱為 Message Queue,是在消息的傳輸過程中保存消息的容器。多用於分散式系統 之間進行通訊。
2.為什麼要用 MQ
1.流量消峰
沒使用MQ
使用了MQ
2.應用解耦
3.非同步處理
沒使用MQ
使用了MQ
3.常見的MQ對比
先學習RabbitMQ,後面可以再學學RocketMQ和Kafka
4.RabbitMQ的安裝(linux:centos7環境,我使用的是docker容器進行安裝的,也可以使用其他方式 >>>> 非docker方式安裝RabbitMQ)
一、下載鏡像
docker search RabbitMQ
進入docker hub鏡像倉庫地址://hub.docker.com/
搜索rabbitMq,進入官方的鏡像,可以看到以下幾種類型的鏡像;我們選擇帶有「mangement」的版本(包含web管理頁面);
拉取鏡像
docker pull rabbitmq:management
二、安裝和web介面啟動
鏡像創建和啟動容器
docker run -d -p 5672:5672 -p 15672:15672 --name rabbitmq rabbitmq:management
說明:
- -d 後台運行容器;
- –name 指定容器名;
- -p 指定服務運行的埠(5672:應用訪問埠;15672:控制台Web埠號);
- –hostname 主機名(RabbitMQ的一個重要注意事項是它根據所謂的 「節點名稱」 存儲數據,默認為主機名);
查看所有正在運行容器
docker ps -a
刪除指定容器
docker rm ID/NAME
刪除所有閑置容器
docker container prune
重啟docker
systemctl restart docker
重啟啟動RabbitMQ
docker run -d -p 5672:5672 -p 15672:15672 --name rabbitmq rabbitmq:management
開啟防火牆15672埠
firewall-cmd --zone=public --add-port=15672/tcp --permanent
firewall-cmd --reload
停止RabbitMQ容器
- 命令: docker stop rabbitmq
啟動RabbitMQ容器
- 命令:docker start rabbitmq
重啟RabbitMQ容器
- 命令:docker restart rabbitmq
三、測試
//linuxip地址:15672,這裡的用戶名和密碼默認都是guest
四、進入rabbitmq容器
docker exec -it rabbitmq /bin/bash
五、添加新的用戶
創建帳號
rabbitmqctl add_user 【用戶名】 【密碼】
設置用戶角色
rabbitmqctl set_user_tags admin administrator
設置用戶許可權
rabbitmqctl set_permissions -p "/" qbb ".*"".*"".*"
查看當前用戶角色、許可權
rabbitmqctl list_users
安裝好RabbitMQ後如果需要熟悉裡面的操作,大家可以參考官方網站
5.RabbitMQ提供了7種工作模式
6.RabbitMQ入門之簡單模式(Java操作RabbitMQ)
1.創建一個普通的maven項目
2.在pom.xml中導入相關依賴
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="//maven.apache.org/POM/4.0.0"
xmlns:xsi="//www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="//maven.apache.org/POM/4.0.0 //maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.qbb</groupId>
<artifactId>java-mq-producer</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.14.2</version>
</dependency>
</dependencies>
</project>
3.編寫生產者發送消息
package com.qbb.simple;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* @author QiuQiu&LL (個人部落格://www.cnblogs.com/qbbit)
* @version 1.0
* @date 2022-03-28 16:25
* @Description:生產者
*/
public class SimpleProducer {
public static void main(String[] args) {
try {
// 創建連接工廠
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.137.72");
factory.setPort(5672);
factory.setUsername("qbb");
factory.setPassword("qbb");
factory.setVirtualHost("/");
// 獲取連接對象
Connection connection = factory.newConnection();
// 獲取channel
Channel channel = connection.createChannel();
// 我們將消息發送到隊列中,前提是我們要有一個隊列,所以先聲明一個隊列
/**
* String queue : 隊列名稱
* boolean durable : 隊列是否持久化
* boolean exclusive : 是否獨佔本次連接,默認true
* boolean autoDelete : 是否自動刪除,最後一個消費者斷開連接以後,該隊列是否自動刪除
* Map<String, Object> arguments : 隊列其它參數
*/
channel.queueDeclare("simple-queue", false, false, false, null);
// 發送消息
/**
* String exchange : 交換機名稱,發送到哪個交換機
* String routingKey : 路由key是哪個
* BasicProperties props : 其他參數資訊
* byte[] body : 要發送的消息
*/
String message = "hello QiuQiu RabbitMQ";
channel.basicPublish("", "simple-queue", null, message.getBytes());
System.out.println("消息發送完畢");
// 釋放資源
channel.close();
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
4.編寫消費者接收消息
package com.qbb.simple;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author QiuQiu&LL (個人部落格://www.cnblogs.com/qbbit)
* @version 1.0
* @date 2022-03-28 18:11
* @Description:消費者
*/
public class SimpleConsumer {
public static void main(String[] args) {
try {
// 創建連接工廠
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.137.72");
factory.setPort(5672);
factory.setUsername("qbb");
factory.setPassword("qbb");
factory.setVirtualHost("/");
// 獲取連接對象
Connection connection = factory.newConnection();
// 獲取channel通道
Channel channel = connection.createChannel();
// 聲明隊列
/**
* String queue,
* boolean autoAck,
* Consumer callback
*/
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body);
System.out.println(msg);
}
};
//監聽隊列,第二個參數false,手動進行ACK
channel.basicConsume("simple-queue", true, consumer);
// 注意消費者端不要釋放資源,需要一直監控著隊列中的消息
} catch (Exception e) {
e.printStackTrace();
}
}
}
注意:我們可以看到控制台報了一個錯,應該是少了個slf4j的依賴,我們導入就好了
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.25</version>
<scope>compile</scope>
</dependency>
7.消息確認機制
我們查詢圖形化介面發現消息一經消費,就被刪除了.
那麼RabbitMQ怎麼知道消息已經被我們消費了呢?
如果消費者領取消息後,還沒執行操作就掛掉了呢?
或者拋出了異常?消息消費失敗,但是 RabbitMQ 無從得知,這樣消息就丟失了!
因此,RabbitMQ 有一個 ACK 機制。
當消費者獲取消息後,會向 RabbitMQ 發送回執 ACK, 告知消息已經被接收。
不過這種回執 ACK 分兩種情況:
- 自動 ACK:消息一旦被接收,消費者自動發送 ACK
- 手動 ACK:消息接收後,不會發送 ACK,需要手動調用
- 如果消息不太重要,丟失也沒有影響,那麼自動 ACK 會比較方便
- 如果消息非常重要,不容丟失。那麼最好在消費完成後手動 ACK,否則接收消息後 就自動 ACK,RabbitMQ 就會把消息從隊列中刪除。如果此時消費者宕機,那麼消 息就丟失了。
手動在consumer中製造一個異常,發現消息依舊被消費了
測試一下手動ACK
// 修改consumer端的程式碼
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body);
int a = 1 / 0;
System.out.println(msg);
//手動進行ACK
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
//監聽隊列,第二個參數false,手動進行ACK
channel.basicConsume("simple-queue", false, consumer);
可以看出即使出現了異常消息依舊不會被消費丟失
去掉異常重新啟動consumer發現消息又被消費了
8.RabbitMQ入門之工作隊列模式(Java操作RabbitMQ)
與入門程式的簡單模式相比,多了一個或一些消費端,多個消費端共同消費同一個隊列中的消息
應用場景:對於任務過重或任務較多情況使用工作隊列可以提高任務處理的速度。
在前面的工程基礎上創建兩個包,繼續編寫程式碼
我們把獲取connection對象抽取一個utils工具類
1.編寫生產者發送消息
package com.qbb.workqueue;
import com.qbb.utils.MQUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
/**
* @author QiuQiu&LL (個人部落格://www.cnblogs.com/qbbit)
* @version 1.0
* @date 2022-03-28 19:09
* @Description:
*/
public class WorkQueueProducer {
public static void main(String[] args) {
try {
Connection connection = MQUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("work-queue", false, false, false, null);
// 發送消息
for (int i = 0; i < 20; i++) {
String message = "hello QiuQiu work-queue:"+i;
channel.basicPublish("", "work-queue", null, message.getBytes());
}
// 釋放資源
channel.close();
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
2.編寫消費者接收消息
**消費者1**
package com.qbb.workqueue;
import com.qbb.utils.MQUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.TimeUnit;
/**
* @author QiuQiu&LL (個人部落格://www.cnblogs.com/qbbit)
* @version 1.0
* @date 2022-03-28 19:21
* @Description:
*/
public class WorkQueueConsumer1 {
public static void main(String[] args) {
try {
Connection connection = MQUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("work-queue", false, false, false, null);
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// 消費者1消費消息
try {
// 睡50ms秒模擬,此服務性能差一點
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
String msg = new String(body);
System.out.println("消費者1消費消息 = " + msg);
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume("work-queue", false, consumer);
} catch (Exception e) {
e.printStackTrace();
}
}
}
**消費者2**
package com.qbb.workqueue;
import com.qbb.utils.MQUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
/**
* @author QiuQiu&LL (個人部落格://www.cnblogs.com/qbbit)
* @version 1.0
* @date 2022-03-28 19:21
* @Description:
*/
public class WorkQueueConsumer2 {
public static void main(String[] args) {
try {
Connection connection = MQUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("work-queue", false, false, false, null);
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// 消費者2消費消息
String msg = new String(body);
System.out.println("消費者2消費消息 = " + msg);
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume("work-queue", false, consumer);
} catch (Exception e) {
e.printStackTrace();
}
}
}
可以發現,兩個消費者各自消費了 25 條消息,而且各不相同,這就實現了任務的分發。
但是我現在想讓性能差一點的伺服器少處理點消息,實現能者多勞怎麼辦呢? 好辦
在比較慢的消費者創建隊列後我們可以使用 basicQos 方法和 prefetchCount = n ,告訴RabbitMQ每次給我發送一個消息等我處理完這個消息再給我發一個,一次一個的發消息
... WorkQueueConsumer1.java ...
// 設置每次拉取一條消息消費
channel.basicQos(1);
這樣就解決了伺服器性能差異問題
8.RabbitMQ入門之發布訂閱模式|Publish/Subscribe(Java操作RabbitMQ)
一次同時向多個消費者發送消息,一條消息可以被多個消費者消費
在訂閱模型中,多了一個 exchange 角色,而且過程略有變化:
- P:生產者,也就是要發送消息的程式,但是不再發送到隊列中,而是發給 X(交換機)
- C:消費者,消息的接受者,會一直等待消息到來。
- Queue:消息隊列,接收消息、快取消息。
- Exchange:交換機,圖中的 X。
一方面,接收生產者發送的消息。另一方面,知道如 何處理消息,例如遞交給某個特別隊列、遞交給所有隊列、或是將消息丟棄。到底如何 操作,取決於 Exchange 的類型。
Exchange 有常見以下 3 種類型:
- Fanout:廣播,將消息交給所有綁定到交換機的隊列
- Direct:定向,把消息交給符合指定 routing key 的隊列
- Topic:通配符,把消息交給符合 routing pattern(路由模式) 的隊列 Exchange(交換機)只負責轉發消息,不具備存儲消息的能力,因此如果沒有任何隊列與 Exchange 綁定,或者沒有符合路由規則的隊列,那麼消息會丟失!
在廣播模式下,消息發送流程是這樣的:
- 可以有多個消費者 -每個消費者有自己的 queue(隊列)
- 每個隊列都要綁定到 Exchange(交換機)
- 生產者發送的消息,只能發送到交換機,交換機來決定要發給哪個隊列,生產者無法決定
- 交換機把消息發送給綁定過的所有隊列
- 隊列的消費者都能拿到消息。實現一條消息被多個消費者消費
Fanout 交換機
1.隊列在綁定到交換機的時候不需要指定 routing key
2.發送消息的時候也不需要指定 routing key
3.凡是發送給交換機的消息都會廣播發送到所有與交換機綁定的隊列中。
1.編寫生產者發送消息
package com.qbb.pubsub;
import com.qbb.utils.MQUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
/**
* @author QiuQiu&LL (個人部落格://www.cnblogs.com/qbbit)
* @version 1.0
* @date 2022-03-28 19:56
* @Description:發布訂閱模式
*/
public class PubSubProducer {
public static void main(String[] args) {
try {
Connection connection = MQUtil.getConnection();
Channel channel = connection.createChannel();
// 聲明交換機
/**
* 參數1:交換機名
* 參數2:交換機類型
*/
channel.exchangeDeclare("fanout-exchange","fanout");
String message = "hello QiuQiu pubsub";
channel.basicPublish("fanout-exchange", "pubsub-queue", null, message.getBytes());
channel.close();
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
2.編寫消費者接收消息
**消費者1**
package com.qbb.pubsub;
import com.qbb.utils.MQUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @author QiuQiu&LL (個人部落格://www.cnblogs.com/qbbit)
* @version 1.0
* @date 2022-03-28 20:02
* @Description:發布訂閱消費者
*/
public class PubSubConsumer1 {
public static void main(String[] args) {
try {
// 獲取連接
Connection connection = MQUtil.getConnection();
// 獲取channel通道
Channel channel = connection.createChannel();
// 聲明隊列
channel.queueDeclare("fanout-queue1", false, false, false, null);
// 將隊列綁定到交換機
/**
* 參數1:隊列名稱
* 參數2:交換機名稱
* 參數3:路由key
*/
channel.queueBind("fanout-queue1", "fanout-exchange", "pubsub-queue");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消費者唯一標識 = " + consumerTag);
System.out.println("交換機名稱 = " + envelope.getExchange());
System.out.println("消息唯一標識 = " + envelope.getDeliveryTag());
System.out.println("路由key = " + envelope.getRoutingKey());
System.out.println("消費者1消費消息Message = " + new String(body));
// 手動ACK
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume("fanout-queue1", false, consumer);
} catch (Exception e) {
e.printStackTrace();
}
}
}
**消費者2**
package com.qbb.pubsub;
import com.qbb.utils.MQUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @author QiuQiu&LL (個人部落格://www.cnblogs.com/qbbit)
* @version 1.0
* @date 2022-03-28 20:02
* @Description:發布訂閱消費者
*/
public class PubSubConsumer2 {
public static void main(String[] args) {
try {
// 獲取連接
Connection connection = MQUtil.getConnection();
// 獲取channel通道
Channel channel = connection.createChannel();
// 聲明隊列
channel.queueDeclare("fanout-queue2", false, false, false, null);
// 將隊列綁定到交換機
/**
* 參數1:隊列名稱
* 參數2:交換機名稱
* 參數3:路由key
*/
channel.queueBind("fanout-queue2", "fanout-exchange", "pubsub-queue");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消費者唯一標識 = " + consumerTag);
System.out.println("交換機名稱 = " + envelope.getExchange());
System.out.println("消息唯一標識 = " + envelope.getDeliveryTag());
System.out.println("路由key = " + envelope.getRoutingKey());
System.out.println("消費者2消費消息Message = " + new String(body));
// 手動ACK
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume("fanout-queue2", false, consumer);
} catch (Exception e) {
e.printStackTrace();
}
}
}
測試結果:
發布訂閱模式與工作隊列模式的區別
1、工作隊列模式不用定義交換機,而發布/訂閱模式需要定義交換機。
2、發布/訂閱模式的生產方是面向交換機發送消息,工作隊列模式的生產方是面向隊列發 送消息(底層使用默認交換機)。
3、發布/訂閱模式需要設置隊列和交換機的綁定,工作隊列模式不需要設置,實際上工作 隊列模式會將隊列綁 定到默認的交換機 。
9.RabbitMQ入門之Routing 路由模式(Java操作RabbitMQ)
有選擇性的接收消息
- 在某些場景下,我們希望不同的消息被不同的隊列消費。這時就要用到 Direct 類 型的 Exchange。
路由模式特點:
- 隊列與交換機的綁定,不能是任意綁定了,而是要指定一個
RoutingKey
(路由 key)- 消息的發送方在 向 Exchange 發送消息時,也必須指定消息的
RoutingKey
。- Exchange 不再把消息交給每一個綁定的隊列,而是根據消息的
Routing Key
進行 判斷,只有隊列的Routingkey
與消息的Routing key
完全一致,才會接收到消息
- P:生產者,向 Exchange 發送消息,發送消息時,會指定一個 routing key。
- X:Exchange(交換機),接收生產者的消息,然後把消息遞交給 與 routing key 完全匹配的隊列
- C1:消費者,其所在隊列指定了需要 routing key 為 error 的消息
- C2:消費者,其所在隊列指定了需要 routing key 為 info、error、warning 的 消息
可以看出routing模式和發布訂閱模式沒多大區別,只是交換機不同而已
1.編寫生產者發送消息(發送增 刪 改消息)
package com.qbb.routing;
import com.qbb.utils.MQUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
/**
* @author QiuQiu&LL (個人部落格://www.cnblogs.com/qbbit)
* @version 1.0
* @date 2022-03-28 20:39
* @Description:
*/
public class RoutingProducer {
public static void main(String[] args) {
try {
Connection connection = MQUtil.getConnection();
Channel channel = connection.createChannel();
// 聲明交換機
/**
* 參數1:交換機名
* 參數2:交換機類型
*/
channel.exchangeDeclare("routing-exchange", "direct");
String message = "hello QiuQiu 添加商品";
channel.basicPublish("routing-exchange", "insert", null, message.getBytes());
// String message1 = "hello QiuQiu 刪除商品";
// channel.basicPublish("routing-exchange", "delete", null, message1.getBytes());
// String message2 = "hello QiuQiu 修改商品";
// channel.basicPublish("routing-exchange", "update", null, message2.getBytes());
channel.close();
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
2.編寫消費者接收消息
**消費者1**
package com.qbb.routing;
import com.qbb.utils.MQUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @author QiuQiu&LL (個人部落格://www.cnblogs.com/qbbit)
* @version 1.0
* @date 2022-03-28 20:36
* @Description:routing模式
*/
public class RoutingComsumer1 {
public static void main(String[] args) {
try {
// 獲取連接
Connection connection = MQUtil.getConnection();
// 獲取channel通道
Channel channel = connection.createChannel();
// 聲明隊列
channel.queueDeclare("routing-queue1", false, false, false, null);
// 將隊列綁定到交換機
/**
* 參數1:隊列名稱
* 參數2:交換機名稱
* 參數3:路由key
*/
channel.queueBind("routing-queue1", "routing-exchange", "insert");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消費者唯一標識 = " + consumerTag);
System.out.println("交換機名稱 = " + envelope.getExchange());
System.out.println("消息唯一標識 = " + envelope.getDeliveryTag());
System.out.println("路由key = " + envelope.getRoutingKey());
System.out.println("消費者1消費消息Message = " + new String(body));
}
};
channel.basicConsume("routing-queue1", true, consumer);
} catch (Exception e) {
e.printStackTrace();
}
}
}
**消費者2**
package com.qbb.routing;
import com.qbb.utils.MQUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @author QiuQiu&LL (個人部落格://www.cnblogs.com/qbbit)
* @version 1.0
* @date 2022-03-28 20:36
* @Description:
*/
public class RoutingComsumer2 {
public static void main(String[] args) {
try {
// 獲取連接
Connection connection = MQUtil.getConnection();
// 獲取channel通道
Channel channel = connection.createChannel();
// 聲明隊列
channel.queueDeclare("routing-queue2", false, false, false, null);
// 將隊列綁定到交換機
/**
* 參數1:隊列名稱
* 參數2:交換機名稱
* 參數3:路由key
*/
channel.queueBind("routing-queue2", "routing-exchange", "insert");
channel.queueBind("routing-queue2", "routing-exchange", "delete");
channel.queueBind("routing-queue2", "routing-exchange", "update");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消費者唯一標識 = " + consumerTag);
System.out.println("交換機名稱 = " + envelope.getExchange());
System.out.println("消息唯一標識 = " + envelope.getDeliveryTag());
System.out.println("路由key = " + envelope.getRoutingKey());
System.out.println("消費者2消費消息Message = " + new String(body));
}
};
channel.basicConsume("routing-queue2", true, consumer);
} catch (Exception e) {
e.printStackTrace();
}
}
}
測試結果:
10.RabbitMQ入門之Topics通配符模式(Java操作RabbitMQ)
Topic 類型與 Direct 相比,都是可以根據RoutingKey
把消息路由到不同的隊列。只 不過Topic
類型Exchange
可以讓隊列在綁定Routing key
的時候使用通配符! Routingkey 一般都是有一個或多個單片語成,多個單詞之間以」.」分割
通配符規則:
#
:匹配一個或多個詞
*
:匹配不多不少恰好 1 個詞
1.編寫生產者發送消息(發送消息的 routing key 有 3 種: item.insert
、 item.update
、item.delete
)
package com.qbb.topic;
import com.qbb.utils.MQUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
/**
* @author QiuQiu&LL (個人部落格://www.cnblogs.com/qbbit)
* @version 1.0
* @date 2022-03-28 20:39
* @Description:
*/
public class TopicProducer {
public static void main(String[] args) {
try {
Connection connection = MQUtil.getConnection();
Channel channel = connection.createChannel();
// 聲明交換機
/**
* 參數1:交換機名
* 參數2:交換機類型
*/
channel.exchangeDeclare("topic-exchange", "topic");
// String message = "hello QiuQiu 添加商品";
// channel.basicPublish("topic-exchange", "item.insert", null, message.getBytes());
// String message1 = "hello QiuQiu 刪除商品";
// channel.basicPublish("topic-exchange", "item.delete", null, message1.getBytes());
String message2 = "hello QiuQiu 修改商品";
channel.basicPublish("topic-exchange", "item.update.do", null, message2.getBytes());
channel.close();
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
2.編寫消費者接收消息
**消費者1**
package com.qbb.topic;
import com.qbb.utils.MQUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @author QiuQiu&LL (個人部落格://www.cnblogs.com/qbbit)
* @version 1.0
* @date 2022-03-28 20:36
* @Description:routing模式
*/
public class TopicConsumer1 {
public static void main(String[] args) {
try {
// 獲取連接
Connection connection = MQUtil.getConnection();
// 獲取channel通道
Channel channel = connection.createChannel();
// 聲明隊列
channel.queueDeclare("topic-queue1", false, false, false, null);
// 將隊列綁定到交換機
/**
* 參數1:隊列名稱
* 參數2:交換機名稱
* 參數3:路由key
*/
channel.queueBind("topic-queue1", "topic-exchange", "#.insert");
channel.queueBind("topic-queue1", "topic-exchange", "#.update.#");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消費者唯一標識 = " + consumerTag);
System.out.println("交換機名稱 = " + envelope.getExchange());
System.out.println("消息唯一標識 = " + envelope.getDeliveryTag());
System.out.println("路由key = " + envelope.getRoutingKey());
System.out.println("消費者1消費消息Message = " + new String(body));
}
};
channel.basicConsume("topic-queue1", true, consumer);
} catch (Exception e) {
e.printStackTrace();
}
}
}
**消費者2**
package com.qbb.topic;
import com.qbb.utils.MQUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @author QiuQiu&LL (個人部落格://www.cnblogs.com/qbbit)
* @version 1.0
* @date 2022-03-28 20:36
* @Description:
*/
public class TopicConsumer2 {
public static void main(String[] args) {
try {
// 獲取連接
Connection connection = MQUtil.getConnection();
// 獲取channel通道
Channel channel = connection.createChannel();
// 聲明隊列
channel.queueDeclare("topic-queue2", false, false, false, null);
// 將隊列綁定到交換機
/**
* 參數1:隊列名稱
* 參數2:交換機名稱
* 參數3:路由key
*/
channel.queueBind("topic-queue2", "topic-exchange", "item.*");
channel.queueBind("topic-queue2", "topic-exchange", "#.delete");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消費者唯一標識 = " + consumerTag);
System.out.println("交換機名稱 = " + envelope.getExchange());
System.out.println("消息唯一標識 = " + envelope.getDeliveryTag());
System.out.println("路由key = " + envelope.getRoutingKey());
System.out.println("消費者2消費消息Message = " + new String(body));
}
};
channel.basicConsume("topic-queue2", true, consumer);
} catch (Exception e) {
e.printStackTrace();
}
}
}
測試結果:
Topic 主題模式可以實現 Publish/Subscribe 發布與訂閱模式
和 Routing 路 由模式
的功能;只是 Topic 在配置 routing key 的時候可以使用通配符,顯得更加靈 活。
11.持久化(避免消息丟失)
為了避免消息丟失,我們可以將消息持久化!如何持久化消息呢?
要將消息持久化,前提是:隊列、Exchange 都持久化
1.持久化交換機
/**
* 參數1:交換機名
* 參數2:交換機類型
* 參數3:是否持久化
*/
channel.exchangeDeclare("topic-exchange", "topic",true);
2.持久化隊列
// 聲明隊列
channel.queueDeclare("topic-queue1", true, false, false, null);
3.持久化消息
channel.basicPublish("topic-exchange", "item.update.do", MessageProperties.PERSISTENT_TEXT_PLAIN, message2.getBytes());
12.RabbitMQ 工作模式總結
- 1、簡單模式 HelloWorld 一個生產者、一個消費者,不需要設置交換機(使用默認的交換機)
- 2、工作隊列模式 Work Queue 一個生產者、多個消費者(競爭關係),不需要設置交換機(使用默認的交換機)
- 3、發布訂閱模式 Publish/subscribe 需要設置類型為 fanout 的交換機,並且交換機和隊列進行綁定,當發送消息到交換機後, 交換機會將消息發送到綁定的隊列
- 4、路由模式 Routing 需要設置類型為 direct 的交換機,交換機和隊列進行綁定,並且指定 routing key,當 發送消息到交換機後,交換機會根據 routing key 將消息發送到對應的隊列
- 5、通配符模式 Topic 需要設置類型為 topic 的交換機,交換機和隊列進行綁定,並且指定通配符方式的 routing key,當發送消息到交換機後,交換機會根據 routing key 將消息發送到對應 的隊列 消息的可靠性投遞 RabbitMQ 集群 消息百分百投遞(confirm 和 return、消費者確認 ack 機制)
13.Spring 整合 RabbitMQ(簡單模式)
前面我們使用java程式碼操作了RabbitMQ,其實操作起來感覺還是有點繁瑣,下面使用Spring來整合RabbitMQ,看看能否有不一樣的體驗
先寫producer消息提供方
1.創建一個maven項目
2.導入相關依賴
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="//maven.apache.org/POM/4.0.0"
xmlns:xsi="//www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="//maven.apache.org/POM/4.0.0 //maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.qbb</groupId>
<artifactId>spring-mq-producer</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>5.3.16</version>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>2.4.2</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.2</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>5.3.16</version>
</dependency>
</dependencies>
</project>
3.編寫rabbitmq.properties配置文件
rabbitmq.host=192.168.137.72
rabbitmq.port=5672
rabbitmq.username=qbb
rabbitmq.password=qbb
rabbitmq.virtual-host=/
4.編寫spring-rabbitmq-producer.xml配置文件
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="//www.springframework.org/schema/beans"
xmlns:xsi="//www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="//www.springframework.org/schema/rabbit"
xmlns:context="//www.springframework.org/schema/context"
xsi:schemaLocation="//www.springframework.org/schema/beans //www.springframework.org/schema/beans/spring-beans.xsd //www.springframework.org/schema/rabbit //www.springframework.org/schema/rabbit/spring-rabbit.xsd //www.springframework.org/schema/context //www.springframework.org/schema/context/spring-context.xsd">
<!--載入rabbitmq.properties-->
<context:property-placeholder location="classpath:rabbitmq.properties"/>
<!--配置連接工廠-->
<rabbit:connection-factory
id="connectionFactory"
host="${rabbitmq.host}"
port="${rabbitmq.port}"
username="${rabbitmq.username}"
password="${rabbitmq.password}"
virtual-host="${rabbitmq.virtual-host}"/>
<!--配置監聽器-->
<bean id="simpleListener" class="com.qbb.listener.SimpleListener"/>
<!--將監聽器放入rabbit容器-->
<rabbit:listener-container connection-factory="connectionFactory">
<rabbit:listener ref="simpleListener" queue-names="spring-simple-queue"/>
</rabbit:listener-container>
</beans>
5.在test測試包下創建測試類
package com.qbb;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
/**
* @author QiuQiu&LL (個人部落格://www.cnblogs.com/qbbit)
* @version 1.0
* @date 2022-03-28 23:56
* @Description:
*/
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq-producer.xml")
public class MQTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSimple() {
rabbitTemplate.convertAndSend("spring-simple-queue", "hello QiuQiu Spring-MQ-Simple");
}
}
再寫consumer消息消費方
1.創建一個maven項目
2.導入相關依賴
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="//maven.apache.org/POM/4.0.0"
xmlns:xsi="//www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="//maven.apache.org/POM/4.0.0 //maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.qbb</groupId>
<artifactId>spring-mq-producer</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>5.3.16</version>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>2.4.2</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.2</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>5.3.16</version>
</dependency>
</dependencies>
</project>
3.編寫rabbitmq.properties配置文件
rabbitmq.host=192.168.137.72
rabbitmq.port=5672
rabbitmq.username=qbb
rabbitmq.password=qbb
rabbitmq.virtual-host=/
4.編寫spring-rabbitmq-producer.xml配置文件
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="//www.springframework.org/schema/beans"
xmlns:xsi="//www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="//www.springframework.org/schema/rabbit"
xmlns:context="//www.springframework.org/schema/context"
xsi:schemaLocation="//www.springframework.org/schema/beans //www.springframework.org/schema/beans/spring-beans.xsd //www.springframework.org/schema/rabbit //www.springframework.org/schema/rabbit/spring-rabbit.xsd //www.springframework.org/schema/context //www.springframework.org/schema/context/spring-context.xsd">
<!--載入rabbitmq.properties-->
<context:property-placeholder location="classpath:rabbitmq.properties"/>
<!--配置連接工廠-->
<rabbit:connection-factory
id="connectionFactory"
host="${rabbitmq.host}"
port="${rabbitmq.port}"
username="${rabbitmq.username}"
password="${rabbitmq.password}"
virtual-host="${rabbitmq.virtual-host}"/>
<!--RabbitAdmin 用於遠程創建、管理交換機、隊列-->
<rabbit:admin connection-factory="connectionFactory"/>
<!--聲明隊列:
id 屬性方便下面引用(當然 id 屬性可以省略,通過 name 屬性引用也行)
name 屬性執行創建隊列的名稱(name 屬性不可省略,否則無法定義隊列名稱),
auto-declare 屬性為 true 表示不存在則自動創建-->
<rabbit:queue id="spring-queue" name="spring-queue" auto-declare="true"></rabbit:queue>
<!--定義 rabbitTemplate 對象操作可以在程式碼中方便發送消息-->
<rabbit:template connection-factory="connectionFactory" id="rabbitTemplate"/>
<!--==================簡單模式==================-->
<rabbit:queue id="spring-simple-queue" name="spring-simple-queue" durable="false" auto-delete="false" auto-declare="true"/>
</beans>
5.創建一個SimpleListener監聽類實現MessageListener監聽消息
package com.qbb.listener;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
/**
* @author QiuQiu&LL (個人部落格://www.cnblogs.com/qbbit)
* @version 1.0
* @date 2022-03-29 0:09
* @Description:簡單模式
*/
public class SimpleListener implements MessageListener {
@Override
public void onMessage(Message message) {
System.out.println("消費者唯一標識 =" + message.getMessageProperties().getConsumerTag());
System.out.println("消息唯一標識 =" + message.getMessageProperties().getDeliveryTag());
System.out.println("交換機名稱 =" + message.getMessageProperties().getReceivedExchange());
System.out.println("路由key =" + message.getMessageProperties().getReceivedRoutingKey());
System.out.println("消息 =" + new String(message.getBody()));
}
}
6.在test測試包下創建測試類
package com.qbb;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
/**
* @author QiuQiu&LL (個人部落格://www.cnblogs.com/qbbit)
* @version 1.0
* @date 2022-03-29 0:14
* @Description:
*/
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq-consumer.xml")
public class MQTest {
@Test
public void test01() {
while (true) {
}
}
}
測試結果:
13.Spring 整合 RabbitMQ(工作隊列模式)
1.修改spring-rabbitmq-producer.xml配置文件
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="//www.springframework.org/schema/beans"
xmlns:xsi="//www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="//www.springframework.org/schema/rabbit"
xmlns:context="//www.springframework.org/schema/context"
xsi:schemaLocation="//www.springframework.org/schema/beans //www.springframework.org/schema/beans/spring-beans.xsd //www.springframework.org/schema/rabbit //www.springframework.org/schema/rabbit/spring-rabbit.xsd //www.springframework.org/schema/context //www.springframework.org/schema/context/spring-context.xsd">
<!--載入rabbitmq.properties-->
<context:property-placeholder location="classpath:rabbitmq.properties"/>
<!--配置連接工廠-->
<rabbit:connection-factory
id="connectionFactory"
host="${rabbitmq.host}"
port="${rabbitmq.port}"
username="${rabbitmq.username}"
password="${rabbitmq.password}"
virtual-host="${rabbitmq.virtual-host}"/>
<!--RabbitAdmin 用於遠程創建、管理交換機、隊列-->
<rabbit:admin connection-factory="connectionFactory"/>
<!--聲明隊列:
id 屬性方便下面引用(當然 id 屬性可以省略,通過 name 屬性引用也行)
name 屬性執行創建隊列的名稱(name 屬性不可省略,否則無法定義隊列名稱),
auto-declare 屬性為 true 表示不存在則自動創建-->
<rabbit:queue id="spring-queue" name="spring-queue" auto-declare="true"></rabbit:queue>
<!--定義 rabbitTemplate 對象操作可以在程式碼中方便發送消息-->
<rabbit:template connection-factory="connectionFactory" id="rabbitTemplate"/>
<!--==================簡單模式==================-->
<rabbit:queue id="spring-simple-queue" name="spring-simple-queue" durable="false" auto-delete="false" auto-declare="true"/>
<!--==================工作隊列模式==================-->
<rabbit:queue id="spring-work-queue" name="spring-work-queue" durable="false" auto-delete="false" auto-declare="true"/>
</beans>
2.修改producer測試類
package com.qbb;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
/**
* @author QiuQiu&LL (個人部落格://www.cnblogs.com/qbbit)
* @version 1.0
* @date 2022-03-28 23:56
* @Description:
*/
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq-producer.xml")
public class MQTest {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 簡單模式
*/
@Test
public void testSimple() {
rabbitTemplate.convertAndSend("spring-simple-queue", "hello QiuQiu Spring-MQ-Simple");
}
/**
* 工作隊列模式
*/
@Test
public void testWorkQueue() {
for (int i = 0; i < 10; i++) {
rabbitTemplate.convertAndSend("spring-work-queue", "hello QiuQiu Spring-MQ-WorkQueue"+i);
}
}
}
3.修改spring-rabbitmq-consumer.xml配置文件
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="//www.springframework.org/schema/beans"
xmlns:xsi="//www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="//www.springframework.org/schema/rabbit"
xmlns:context="//www.springframework.org/schema/context"
xsi:schemaLocation="//www.springframework.org/schema/beans //www.springframework.org/schema/beans/spring-beans.xsd //www.springframework.org/schema/rabbit //www.springframework.org/schema/rabbit/spring-rabbit.xsd //www.springframework.org/schema/context //www.springframework.org/schema/context/spring-context.xsd">
<!--載入rabbitmq.properties-->
<context:property-placeholder location="classpath:rabbitmq.properties"/>
<!--配置連接工廠-->
<rabbit:connection-factory
id="connectionFactory"
host="${rabbitmq.host}"
port="${rabbitmq.port}"
username="${rabbitmq.username}"
password="${rabbitmq.password}"
virtual-host="${rabbitmq.virtual-host}"/>
<!--配置監聽器-->
<!--簡單模式-->
<bean id="simpleListener" class="com.qbb.listener.SimpleListener"/>
<!--工作隊列模式-->
<bean id="workQueueListener1" class="com.qbb.listener.WorkQueueListener1"/>
<bean id="workQueueListener2" class="com.qbb.listener.WorkQueueListener2"/>
<!--將監聽器放入rabbit容器-->
<rabbit:listener-container connection-factory="connectionFactory">
<!--簡單模式-->
<rabbit:listener ref="simpleListener" queue-names="spring-simple-queue"/>
<!--工作隊列模式-->
<rabbit:listener ref="workQueueListener1" queue-names="spring-work-queue"/>
<rabbit:listener ref="workQueueListener2" queue-names="spring-work-queue"/>
</rabbit:listener-container>
</beans>
4.創建兩個監聽類
package com.qbb.listener;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
/**
* @author QiuQiu&LL (個人部落格://www.cnblogs.com/qbbit)
* @version 1.0
* @date 2022-03-29 0:09
* @Description:消息隊列模式
*/
public class WorkQueueListener2 implements MessageListener {
@Override
public void onMessage(Message message) {
System.out.println("消費者2唯一標識 =" + message.getMessageProperties().getConsumerTag());
System.out.println("消費者2消息唯一標識 =" + message.getMessageProperties().getDeliveryTag());
System.out.println("消費者2交換機名稱 =" + message.getMessageProperties().getReceivedExchange());
System.out.println("消費者2路由key =" + message.getMessageProperties().getReceivedRoutingKey());
System.out.println("消費者2消費的消息 =" + new String(message.getBody()));
}
}
------------
package com.qbb.listener;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
/**
* @author QiuQiu&LL (個人部落格://www.cnblogs.com/qbbit)
* @version 1.0
* @date 2022-03-29 0:09
* @Description:消息隊列模式
*/
public class WorkQueueListener1 implements MessageListener {
@Override
public void onMessage(Message message) {
System.out.println("消費者1唯一標識 =" + message.getMessageProperties().getConsumerTag());
System.out.println("消費者1消息唯一標識 =" + message.getMessageProperties().getDeliveryTag());
System.out.println("消費者1交換機名稱 =" + message.getMessageProperties().getReceivedExchange());
System.out.println("消費者1路由key =" + message.getMessageProperties().getReceivedRoutingKey());
System.out.println("消費者1消費的消息 =" + new String(message.getBody()));
}
}
執行測試類測試結果:
消費者1唯一標識 =amq.ctag-Jh86rHgn7_CftQS9Klseew
消費者1消息唯一標識 =1
消費者1交換機名稱 =
消費者1路由key =spring-work-queue
消費者2唯一標識 =amq.ctag-CP-q5LpFxWo9RY4yOpgMGQ
消費者2消息唯一標識 =1
消費者2交換機名稱 =
消費者1消費的消息 =hello QiuQiu Spring-MQ-WorkQueue0
消費者2路由key =spring-work-queue
消費者2消費的消息 =hello QiuQiu Spring-MQ-WorkQueue1
消費者2唯一標識 =amq.ctag-CP-q5LpFxWo9RY4yOpgMGQ
消費者2消息唯一標識 =2
消費者2交換機名稱 =
消費者2路由key =spring-work-queue
消費者2消費的消息 =hello QiuQiu Spring-MQ-WorkQueue3
消費者2唯一標識 =amq.ctag-CP-q5LpFxWo9RY4yOpgMGQ
消費者2消息唯一標識 =3
消費者2交換機名稱 =
消費者2路由key =spring-work-queue
消費者2消費的消息 =hello QiuQiu Spring-MQ-WorkQueue5
消費者2唯一標識 =amq.ctag-CP-q5LpFxWo9RY4yOpgMGQ
消費者2消息唯一標識 =4
消費者2交換機名稱 =
消費者2路由key =spring-work-queue
消費者2消費的消息 =hello QiuQiu Spring-MQ-WorkQueue7
消費者2唯一標識 =amq.ctag-CP-q5LpFxWo9RY4yOpgMGQ
消費者2消息唯一標識 =5
消費者2交換機名稱 =
消費者2路由key =spring-work-queue
消費者2消費的消息 =hello QiuQiu Spring-MQ-WorkQueue9
消費者1唯一標識 =amq.ctag-Jh86rHgn7_CftQS9Klseew
消費者1消息唯一標識 =2
消費者1交換機名稱 =
消費者1路由key =spring-work-queue
消費者1消費的消息 =hello QiuQiu Spring-MQ-WorkQueue2
消費者1唯一標識 =amq.ctag-Jh86rHgn7_CftQS9Klseew
消費者1消息唯一標識 =3
消費者1交換機名稱 =
消費者1路由key =spring-work-queue
消費者1消費的消息 =hello QiuQiu Spring-MQ-WorkQueue4
消費者1唯一標識 =amq.ctag-Jh86rHgn7_CftQS9Klseew
消費者1消息唯一標識 =4
消費者1交換機名稱 =
消費者1路由key =spring-work-queue
消費者1消費的消息 =hello QiuQiu Spring-MQ-WorkQueue6
消費者1唯一標識 =amq.ctag-Jh86rHgn7_CftQS9Klseew
消費者1消息唯一標識 =5
消費者1交換機名稱 =
消費者1路由key =spring-work-queue
消費者1消費的消息 =hello QiuQiu Spring-MQ-WorkQueue8
可以看出10條消息平均分配個兩個消費者
14.Spring 整合 RabbitMQ(發布訂閱模式,routing(路由模式),topic模式),這裡我就把三種情況寫一起啦,程式碼和配置文件中都有詳細的注釋.不然太長了閱讀也不方便
spring-rabbitmq-producer.xml配置文件
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="//www.springframework.org/schema/beans"
xmlns:xsi="//www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="//www.springframework.org/schema/rabbit"
xmlns:context="//www.springframework.org/schema/context"
xsi:schemaLocation="//www.springframework.org/schema/beans //www.springframework.org/schema/beans/spring-beans.xsd //www.springframework.org/schema/rabbit //www.springframework.org/schema/rabbit/spring-rabbit.xsd //www.springframework.org/schema/context //www.springframework.org/schema/context/spring-context.xsd">
<!--載入rabbitmq.properties-->
<context:property-placeholder location="classpath:rabbitmq.properties"/>
<!--配置連接工廠-->
<rabbit:connection-factory
id="connectionFactory"
host="${rabbitmq.host}"
port="${rabbitmq.port}"
username="${rabbitmq.username}"
password="${rabbitmq.password}"
virtual-host="${rabbitmq.virtual-host}"/>
<!--RabbitAdmin 用於遠程創建、管理交換機、隊列-->
<rabbit:admin connection-factory="connectionFactory"/>
<!--聲明隊列:
id 屬性方便下面引用(當然 id 屬性可以省略,通過 name 屬性引用也行)
name 屬性執行創建隊列的名稱(name 屬性不可省略,否則無法定義隊列名稱),
auto-declare 屬性為 true 表示不存在則自動創建-->
<rabbit:queue id="spring-queue" name="spring-queue" auto-declare="true"></rabbit:queue>
<!--定義 rabbitTemplate 對象操作可以在程式碼中方便發送消息-->
<rabbit:template connection-factory="connectionFactory" id="rabbitTemplate"/>
<!--==================簡單模式==================-->
<rabbit:queue id="spring-simple-queue" name="spring-simple-queue" durable="false" auto-delete="false" auto-declare="true"/>
<!--==================工作隊列模式==================-->
<rabbit:queue id="spring-work-queue" name="spring-work-queue" durable="false" auto-delete="false" auto-declare="true"/>
<!--==================發布訂閱模式==================-->
<rabbit:queue id="spring-fanout-queue1" name="spring-fanout-queue1" durable="false" auto-delete="false" auto-declare="true"/>
<rabbit:queue id="spring-fanout-queue2" name="spring-fanout-queue2" durable="false" auto-delete="false" auto-declare="true"/>
<!--創建交換機-->
<rabbit:fanout-exchange name="spring-fanout-exchange">
<!--綁定隊列-->
<rabbit:bindings>
<rabbit:binding queue="spring-fanout-queue1"/>
<rabbit:binding queue="spring-fanout-queue2"/>
</rabbit:bindings>
</rabbit:fanout-exchange>
<!--==================routing模式==================-->
<rabbit:queue id="spring-routing-queue1" name="spring-routing-queue1" durable="false" auto-delete="false" auto-declare="true"/>
<rabbit:queue id="spring-routing-queue2" name="spring-routing-queue2" durable="false" auto-delete="false" auto-declare="true"/>
<!--創建交換機-->
<rabbit:direct-exchange name="spring-routing-exchange">
<!--綁定隊列-->
<rabbit:bindings>
<rabbit:binding queue="spring-routing-queue1" key="error"/>
<rabbit:binding queue="spring-routing-queue2" key="error"/>
<rabbit:binding queue="spring-routing-queue2" key="info"/>
<rabbit:binding queue="spring-routing-queue2" key="warning"/>
</rabbit:bindings>
</rabbit:direct-exchange>
<!--==================topic模式==================-->
<rabbit:queue id="spring-topic-queue1" name="spring-topic-queue1" durable="false" auto-delete="false" auto-declare="true"/>
<rabbit:queue id="spring-topic-queue2" name="spring-topic-queue2" durable="false" auto-delete="false" auto-declare="true"/>
<!--創建交換機-->
<rabbit:topic-exchange name="spring-topic-exchange">
<!--綁定隊列-->
<rabbit:bindings>
<rabbit:binding pattern="*.orange.*" queue="spring-topic-queue1"></rabbit:binding>
<rabbit:binding pattern="*.*.rabbit" queue="spring-topic-queue2"></rabbit:binding>
<rabbit:binding pattern="lazy.#" queue="spring-topic-queue2"></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>
</beans>
producer生產者的MQTest.java
package com.qbb;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
/**
* @author QiuQiu&LL (個人部落格://www.cnblogs.com/qbbit)
* @version 1.0
* @date 2022-03-28 23:56
* @Description:
*/
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq-producer.xml")
public class MQTest {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 簡單模式
*/
@Test
public void testSimple() {
rabbitTemplate.convertAndSend("spring-simple-queue", "hello QiuQiu Spring-MQ-Simple");
}
/**
* 工作隊列模式
*/
@Test
public void testWorkQueue() {
for (int i = 0; i < 10; i++) {
rabbitTemplate.convertAndSend("spring-work-queue", "hello QiuQiu Spring-MQ-WorkQueue" + i);
}
}
/**
* 發布訂閱模式
*/
@Test
public void testFanout() {
rabbitTemplate.convertSendAndReceive("spring-fanout-exchange", "", "hello QiuQiu Spring-MQ-PubSub");
}
/**
* routing模式
*/
@Test
public void testRouting() {
rabbitTemplate.convertSendAndReceive("spring-routing-exchange", "error", "hello QiuQiu Spring-MQ-Routing-AAA");
rabbitTemplate.convertSendAndReceive("spring-routing-exchange", "info", "hello QiuQiu Spring-MQ-Routing-BBB");
}
/**
* topic模式
*/
@Test
public void testTopic() {
rabbitTemplate.convertSendAndReceive("spring-topic-exchange", "lazy.orange.qiu", "hello QiuQiu Spring-MQ-Topic-AAA");
rabbitTemplate.convertSendAndReceive("spring-topic-exchange", "qiu.ll.rabbit", "hello QiuQiu Spring-MQ-Topic-BBB");
}
}
spring-rabbitmq-consumer.xml配置文件
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="//www.springframework.org/schema/beans"
xmlns:xsi="//www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="//www.springframework.org/schema/rabbit"
xmlns:context="//www.springframework.org/schema/context"
xsi:schemaLocation="//www.springframework.org/schema/beans //www.springframework.org/schema/beans/spring-beans.xsd //www.springframework.org/schema/rabbit //www.springframework.org/schema/rabbit/spring-rabbit.xsd //www.springframework.org/schema/context //www.springframework.org/schema/context/spring-context.xsd">
<!--載入rabbitmq.properties-->
<context:property-placeholder location="classpath:rabbitmq.properties"/>
<!--配置連接工廠-->
<rabbit:connection-factory
id="connectionFactory"
host="${rabbitmq.host}"
port="${rabbitmq.port}"
username="${rabbitmq.username}"
password="${rabbitmq.password}"
virtual-host="${rabbitmq.virtual-host}"/>
<!--配置監聽器-->
<!--簡單模式-->
<bean id="simpleListener" class="com.qbb.listener.SimpleListener"/>
<!--工作隊列模式-->
<bean id="workQueueListener1" class="com.qbb.listener.WorkQueueListener1"/>
<bean id="workQueueListener2" class="com.qbb.listener.WorkQueueListener2"/>
<!--發布訂閱模式-->
<bean id="fanoutListener1" class="com.qbb.listener.FanoutListener1"/>
<bean id="fanoutListener2" class="com.qbb.listener.FanoutListener2"/>
<!--routing模式-->
<bean id="routingListener1" class="com.qbb.listener.RoutingListener1"/>
<bean id="routingListener2" class="com.qbb.listener.RoutingListener2"/>
<!--topic模式-->
<bean id="topicListener1" class="com.qbb.listener.TopicListener1"/>
<bean id="topicListener2" class="com.qbb.listener.TopicListener2"/>
<!--將監聽器放入rabbit容器-->
<rabbit:listener-container connection-factory="connectionFactory">
<!--簡單模式-->
<rabbit:listener ref="simpleListener" queue-names="spring-simple-queue"/>
<!--工作隊列模式-->
<rabbit:listener ref="workQueueListener1" queue-names="spring-work-queue"/>
<rabbit:listener ref="workQueueListener2" queue-names="spring-work-queue"/>
<!--發布訂閱模式-->
<rabbit:listener ref="fanoutListener1" queue-names="spring-fanout-queue1"/>
<rabbit:listener ref="fanoutListener2" queue-names="spring-fanout-queue2"/>
<!--routing模式-->
<rabbit:listener ref="routingListener1" queue-names="spring-routing-queue1"/>
<rabbit:listener ref="routingListener2" queue-names="spring-routing-queue2"/>
<!--topic模式-->
<rabbit:listener ref="topicListener1" queue-names="spring-topic-queue1"/>
<rabbit:listener ref="topicListener2" queue-names="spring-topic-queue2"/>
</rabbit:listener-container>
</beans>
FanoutListener1監聽器
package com.qbb.listener;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
/**
* @author QiuQiu&LL (個人部落格://www.cnblogs.com/qbbit)
* @version 1.0
* @date 2022-03-29 0:09
* @Description:發布訂閱模式
*/
public class FanoutListener1 implements MessageListener {
@Override
public void onMessage(Message message) {
System.out.println("消費者1唯一標識 =" + message.getMessageProperties().getConsumerTag());
System.out.println("消費者1消息唯一標識 =" + message.getMessageProperties().getDeliveryTag());
System.out.println("消費者1交換機名稱 =" + message.getMessageProperties().getReceivedExchange());
System.out.println("消費者1路由key =" + message.getMessageProperties().getReceivedRoutingKey());
System.out.println("消費者1消費的消息 =" + new String(message.getBody()));
}
}
FanoutListener2監聽器
package com.qbb.listener;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
/**
* @author QiuQiu&LL (個人部落格://www.cnblogs.com/qbbit)
* @version 1.0
* @date 2022-03-29 0:09
* @Description:發布訂閱模式
*/
public class FanoutListener2 implements MessageListener {
@Override
public void onMessage(Message message) {
System.out.println("消費者2唯一標識 =" + message.getMessageProperties().getConsumerTag());
System.out.println("消費者2消息唯一標識 =" + message.getMessageProperties().getDeliveryTag());
System.out.println("消費者2交換機名稱 =" + message.getMessageProperties().getReceivedExchange());
System.out.println("消費者2路由key =" + message.getMessageProperties().getReceivedRoutingKey());
System.out.println("消費者2消費的消息 =" + new String(message.getBody()));
}
}
RoutingListener1監聽器
package com.qbb.listener;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
/**
* @author QiuQiu&LL (個人部落格://www.cnblogs.com/qbbit)
* @version 1.0
* @date 2022-03-29 0:09
* @Description:routing模式
*/
public class RoutingListener1 implements MessageListener {
@Override
public void onMessage(Message message) {
System.out.println("消費者1唯一標識 =" + message.getMessageProperties().getConsumerTag());
System.out.println("消費者1消息唯一標識 =" + message.getMessageProperties().getDeliveryTag());
System.out.println("消費者1交換機名稱 =" + message.getMessageProperties().getReceivedExchange());
System.out.println("消費者1路由key =" + message.getMessageProperties().getReceivedRoutingKey());
System.out.println("消費者1消費的消息 =" + new String(message.getBody()));
}
}
RoutingListener2監聽器
package com.qbb.listener;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
/**
* @author QiuQiu&LL (個人部落格://www.cnblogs.com/qbbit)
* @version 1.0
* @date 2022-03-29 0:09
* @Description:routing模式
*/
public class RoutingListener2 implements MessageListener {
@Override
public void onMessage(Message message) {
System.out.println("消費者2唯一標識 =" + message.getMessageProperties().getConsumerTag());
System.out.println("消費者2消息唯一標識 =" + message.getMessageProperties().getDeliveryTag());
System.out.println("消費者2交換機名稱 =" + message.getMessageProperties().getReceivedExchange());
System.out.println("消費者2路由key =" + message.getMessageProperties().getReceivedRoutingKey());
System.out.println("消費者2消費的消息 =" + new String(message.getBody()));
}
}
TopicListener1監聽器
package com.qbb.listener;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
/**
* @author QiuQiu&LL (個人部落格://www.cnblogs.com/qbbit)
* @version 1.0
* @date 2022-03-29 0:09
* @Description:topic模式
*/
public class TopicListener1 implements MessageListener {
@Override
public void onMessage(Message message) {
System.out.println("消費者1唯一標識 =" + message.getMessageProperties().getConsumerTag());
System.out.println("消費者1消息唯一標識 =" + message.getMessageProperties().getDeliveryTag());
System.out.println("消費者1交換機名稱 =" + message.getMessageProperties().getReceivedExchange());
System.out.println("消費者1路由key =" + message.getMessageProperties().getReceivedRoutingKey());
System.out.println("消費者1消費的消息 =" + new String(message.getBody()));
}
}
TopicListener2監聽器
package com.qbb.listener;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
/**
* @author QiuQiu&LL (個人部落格://www.cnblogs.com/qbbit)
* @version 1.0
* @date 2022-03-29 0:09
* @Description:topic模式
*/
public class TopicListener2 implements MessageListener {
@Override
public void onMessage(Message message) {
System.out.println("消費者2唯一標識 =" + message.getMessageProperties().getConsumerTag());
System.out.println("消費者2消息唯一標識 =" + message.getMessageProperties().getDeliveryTag());
System.out.println("消費者2交換機名稱 =" + message.getMessageProperties().getReceivedExchange());
System.out.println("消費者2路由key =" + message.getMessageProperties().getReceivedRoutingKey());
System.out.println("消費者2消費的消息 =" + new String(message.getBody()));
}
}
consumer消息消費者的MQTest.java
package com.qbb;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
/**
* @author QiuQiu&LL (個人部落格://www.cnblogs.com/qbbit)
* @version 1.0
* @date 2022-03-29 0:14
* @Description:
*/
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq-consumer.xml")
public class MQTest {
@Test
public void test01() {
while (true) {
}
}
}
發布訂閱模式測試據結果:
routing路由模式測試結果:
topic模式測試結果:
RabbitMQ 高級特性
15.消息的可靠性投遞
在使用 RabbitMQ 的時候,我們當然希望杜絕任何消息丟失或者投遞失敗情況。 RabbitMQ 為我們提供了兩種方式用來控制消息的投遞可靠性模式
- confirm 確認模式
- return 退回模式
rabbitmq 整個消息投遞的路徑為:
producer—>rabbitmq broker—>exchange—>queue—>consumer
l.消息從 producer 到 exchange 則會返回一個 confirmCallback 。
2.消息從 exchange–>queue 投遞失敗則會返回一個 returnCallback 。
confirm 確認模式
修改spring-rabbitmq-producer.xml
<!--配置連接工廠-->
<rabbit:connection-factory
id="connectionFactory"
host="${rabbitmq.host}"
port="${rabbitmq.port}"
username="${rabbitmq.username}"
password="${rabbitmq.password}"
virtual-host="${rabbitmq.virtual-host}"
添加如下兩行設置,開啟confirm和return模式
publisher-returns="true"
confirm-type="CORRELATED"/>
修改測試類MQTest.java
/**
* topic模式
*/
@Test
public void testTopic() {
// 發送消息之前設置ConfirmCallBack回調方法
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
/**
* CorrelationData correlationData
* boolean ack : 當消費者成功把消息發送給交換機 ack=true 發送失敗 ack=false
* String cause : 消息發送失敗的原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
System.out.println("消息發送成功:cause="+cause);
}else {
// 發送失敗我們可以做其他的補救措施,例如發送給其他的交換機
System.out.println("消息發送失敗:cause=" + cause);
}
}
});
rabbitTemplate.convertSendAndReceive("spring-topic-exchange", "lazy.orange.qiu", "hello QiuQiu Spring-MQ-Topic-AAA");
// rabbitTemplate.convertSendAndReceive("spring-topic-exchange", "qiu.ll.rabbit", "hello QiuQiu Spring-MQ-Topic-BBB");
}
上面看到的是發送成功的情況,我們把交換機名字故意寫錯,看看會有什麼效果
rabbitTemplate.convertSendAndReceive("spring-topic-exchange-111", "lazy.orange.qiu", "hello QiuQiu Spring-MQ-Topic-AAA");
return 退回模式
開啟return 退回模式支援,上面我們已經開啟了
發送消息之前設置ReturnCallBack回調方法
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
// 出錯了可以指定發送給其他的queue
System.out.println("returnedMessage.getExchange() = " + returnedMessage.getExchange());
System.out.println("returnedMessage.getMessage() = " + returnedMessage.getMessage());
System.out.println("returnedMessage.getReplyCode() = " + returnedMessage.getReplyCode());
System.out.println("returnedMessage.getReplyText() = " + returnedMessage.getReplyText());
System.out.println("returnedMessage.getRoutingKey() = " + returnedMessage.getRoutingKey());
}
});
設置交換機把消息發送給隊列失敗時,強制把消息回退給消息發送者(默認為false即丟失消息)
rabbitTemplate.setMandatory(true);
前面兩種模式我們是確保了producer->exchange和exchange->queue的消息可靠性,但是我們消息從queue->consumer我們怎麼辦證消息一定投遞成功呢?下面我們就解決一下這個問題
其實也簡單,我們只需要關閉自動ACK,然後再處理完業務邏輯後手動ACK即可
- 修改spring-rabbitmq-consumer.xml
...
<bean id="manualAckListener" class="com.qbb.listener.ManualAckListener"/>
...
<rabbit:listener ref="manualAckListener" queue-names="spring-topic-queue1"/>
- 實現ChannelAwareMessageListener監聽器
package com.qbb.listener;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import java.io.IOException;
/**
* @author QiuQiu&LL (個人部落格://www.cnblogs.com/qbbit)
* @version 1.0
* @date 2022-03-29 23:27
* @Description:
*/
public class ManualAckListener implements ChannelAwareMessageListener {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
try {
System.out.println("消費者消費的消息為:"+new String(message.getBody()));
// ....業務邏輯... 此處有可能出現異常從而導致消息無法正常手動確認
// 手動確認
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (IOException e) {
e.printStackTrace();
/**
* 參數1: 消息唯一標識
* 參數2: 是否重新入隊列
*/
// channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
/**
* 參數1: 消息唯一標識
* 參數2: 不需要多個消費與隊列確認,只要有一個消費者消費了就證明消息被消費了
* 參數3: 是否重新入隊列,注意如果設置為true則會出現反覆死循環般的消費消息
*/
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
}
}
}
測試結果:消息即可有正常消費,出現錯誤了也可以進行響應的補救措施,保證了消息從queue->consumer的可靠性
消息可靠性總結
1.持久化 exchange和queue持久化設置: durable=”true”,Spring整合RabbitMQ消息本身就是持久化的
2.生產方確認 ConfirmCallBack 和 returnCallBack
3.消費方確認 手動Ack
4.Broker 高可用,搭建集群
RabbitMQ 應用性問題
- 消息百分百投遞
假如在發送的過程中出現了網路抖動或者其他的不可逆因素,如何保證消息不丟失呢?
從上圖我們可以將要消費的消息存入一個MSGDB的資料庫,給它設置一個狀態status=0代表未消費,當出現消費成功則修改狀態為status=1,如果出現了網路故障status=0我們編寫一個定時任務,指定時間把status=0的消息查詢出來再次執行即可
上面的定時任務和存入將消息資料庫確實可以解決一些問題,但是同時也帶來了消息重複消費的問題,也就是消息冪等性問題,如何解決消息冪等性問題呢?
- 業務ID
- 樂觀鎖
16.消費端限流
修改配置文件
prefetch="1"
package com.qbb.listener;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
/**
* @author QiuQiu&LL (個人部落格://www.cnblogs.com/qbbit)
* @version 1.0
* @date 2022-03-30 0:40
* @Description:
*/
public class LimitListener1 implements ChannelAwareMessageListener {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
System.out.println("消費者1消息為:" + new String(message.getBody()));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
package com.qbb.listener;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
/**
* @author QiuQiu&LL (個人部落格://www.cnblogs.com/qbbit)
* @version 1.0
* @date 2022-03-30 0:40
* @Description:
*/
public class LimitListener2 implements ChannelAwareMessageListener {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
System.out.println("消費者2消息為:" + new String(message.getBody()));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
測試結果
17.TTL消息過期時間
控制台方式操作:添加相應的隊列設置過期時間,發送消息測試
程式碼方式操作之指定所有消息過期時間
<!--==================TTL-QUEUE==================-->
<rabbit:queue id="ttl-queue1" name="ttl-queue2" auto-declare="true">
<rabbit:queue-arguments>
<entry key="x-message-ttl" value="10000" value-type="java.lang.Integer"></entry>
</rabbit:queue-arguments>
</rabbit:queue>
程式碼方式操作之指定某個消息過期時間
@Test
public void testTTL2() {
MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setExpiration("10000");
return message;
}
};
rabbitTemplate.convertAndSend("ttl-queue2", (Object) "qiuqiu", messagePostProcessor);
}
注意:RabbitMQ只會檢查隊列頭部的那個資訊是否過期,過期及剔除,隊列後面的消息即使過期了也不會剔除
18.死信隊列
死信,顧名思義就是無法被消費的消息,字面意思可以這 樣理解,一般來說,producer 將消息投遞到 broker 或者直接到 queue 里了,consumer 從 queue 取出消息進行消費,但某些時候由於特定的原因導致 queue 中的某些消息無法被 消費,這樣的消息如果沒有後續的處理,就變成了死信,有死信,自然就有了死信隊列;
消息成為死信的三種情況:
1.隊列消息數量到達限制;比如給隊列最大只能存儲10條消息,當第11條消息進來的時候存 不下了,第1條消息就被稱為死信
2.消費者拒接消費消息,basicNack/basicReject,並且不把消息重新放入原目標隊列, requeue=false;
3.原隊列存在消息過期設置,消息到達超時時間未被消費;
<!--==================正常QUEUE EXCHANGE==================-->
<rabbit:queue id="normal-queue" name="normal-queue">
<rabbit:queue-arguments>
<!--綁定死信交換機-->
<entry key="x-dead-letter-exchange" value="dead-exchange"/>
<!--綁定routing-key-->
<entry key="x-dead-letter-routing-key" value="b.c"/>
<!--設置消息容量-->
<entry key="x-max-length" value="10" value-type="java.lang.Integer"/>
<!--統一的過期時間-->
<entry key="x-message-ttl" value="10000" value-type="java.lang.Integer"/>
</rabbit:queue-arguments>
</rabbit:queue>
<rabbit:topic-exchange name="normal-exchange">
<rabbit:bindings>
<rabbit:binding pattern="a.#" queue="normal-queue"></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>
<!--==================死信QUEUE EXCHANGE==================-->
<rabbit:queue id="dead-queue" name="dead-queue"/>
<rabbit:topic-exchange name="dead-exchange">
<rabbit:bindings>
<rabbit:binding pattern="b.#" queue="dead-queue"></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>
@Test
public void testDeadQueue() {
for (int i = 0; i < 12; i++) {
rabbitTemplate.convertAndSend("normal-exchange", "a.qiu","qiuqiu" + i);
}
}
19.延遲隊列
程式碼配置方式和上面的一樣,就是把正常隊列設置了一個消息過期時間
20.SpringBoot整合RabbitMQ
生產者:
到入依賴
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="//maven.apache.org/POM/4.0.0"
xmlns:xsi="//www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="//maven.apache.org/POM/4.0.0 //maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.6.4</version>
</parent>
<groupId>com.qbb</groupId>
<artifactId>springboot-mq-producer</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
</dependencies>
</project>
編寫配置類
package com.qbb.mq.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.SpringBootConfiguration;
import org.springframework.context.annotation.Bean;
/**
* @author QiuQiu&LL (個人部落格://www.cnblogs.com/qbbit)
* @version 1.0
* @date 2022-03-30 1:56
* @Description:
*/
@SpringBootConfiguration
public class MQProducerConfig {
public static final String EXCHANGE_NAME = "boot_topic_exchange";
public static final String QUEUE_NAME = "boot_queue";
//1.交換機
@Bean("bootExchange")
public Exchange bootExchange() {
return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
}
//2.Queue 隊列
@Bean("bootQueue")
public Queue bootQueue() {
return QueueBuilder.durable(QUEUE_NAME).build();
}
//3. 隊列和交互機綁定關係 Binding
/* 1. 知道哪個隊列 2. 知道哪個交換機 3. routing key */
@Bean
public Binding bindQueueExchange(@Qualifier("bootQueue") Queue queue, @Qualifier("bootExchange") Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("boot.#").noargs();
}
}
測試一下
package com.qbb.mq;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
/**
* @author QiuQiu&LL (個人部落格://www.cnblogs.com/qbbit)
* @version 1.0
* @date 2022-03-30 1:58
* @Description:
*/
@SpringBootTest
public class ProducerTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void test01() {
rabbitTemplate.convertSendAndReceive("boot_topic_exchange", "boot.qiu", "等我完成目標就來找你...");
}
}
消費者:
配置監聽器類BootMessageListener
@Component
public class BootMessageListener {
@RabbitListener(queues = "boot_queue")
public void consumeMessage(Message message) {
System.out.println("消息為:" + new String(message.getBody()));
}
}
測試結果