RabbitMQ學習
RabbitMQ
什麼是MQ?
Message Queue:消息隊列(消息中間件),典型的生產者—消費者模型。生產者不斷地往消息隊列中生產消息,消費者不斷從隊列中獲取消息。消息的生產和消費是非同步的,分別只關心消息的發送和接收,沒有業務邏輯的入侵,可以輕鬆實現系統間解耦。
對比其他MQ
-
RocketMQ —— 阿里開源的消息中間件,基於Java語言開發,具有高吞吐、高可用的特性,適合大規模分散式系統應用的特點。
-
Kafka —— 分散式發布-訂閱消息系統,基於Pull的模式處理消息消費,追求
高吞吐量、低延遲
,最初開發用於日誌收集和傳輸,適合產生大量數據的互聯網服務的數據收集業務。 -
RabbitMQ —— 基於Erlang語言開發的開源消息隊列系統,基於AMQP協議實現,追求
高可靠性
。
功能 | 消息隊列 RocketMQ | 消息隊列 Kafka | RabbitMQ |
---|---|---|---|
安全防護 | 支援 | 支援 | 支援 |
主子帳號支援 | 支援 | 支援 | 不支援 |
可靠性 | – 同步刷盤 – 同步雙寫 – 超3份數據副本 – 99.99999999% | – 同步刷盤 – 同步雙寫 – 超3份數據副本 – 99.99999999% | 同步刷盤 |
可用性 | – 非常好,99.95% – Always Writable | – 非常好,99.95% – Always Writable | 好 |
橫向擴展能力 | – 支援平滑擴展 – 支援百萬級 QPS | – 支援平滑擴展 – 支援百萬級 QPS | – 集群擴容依賴前端 – LVS 負載均衡調度 |
Low Latency | 支援 | 支援 | 不支援 |
消費模型 | Push / Pull | Push / Pull | Push / Pull |
定時消息 | 支援(可精確到秒級) | 暫不支援 | 支援 |
事務消息 | 支援 | 不支援 | 不支援 |
順序消息 | 支援 | 暫不支援 | 不支援 |
全鏈路消息軌跡 | 支援 | 暫不支援 | 不支援 |
消息堆積能力 | 百億級別 不影響性能 | 百億級別 不影響性能 | 影響性能 |
消息堆積查詢 | 支援 | 支援 | 不支援 |
消息回溯 | 支援 | 支援 | 不支援 |
消息重試 | 支援 | 暫不支援 | 支援 |
死信隊列 | 支援 | 不支援 | 支援 |
性能(常規) | 非常好 百萬級 QPS | 非常好 百萬級 QPS | 一般 萬級 QPS |
性能(萬級 Topic 場景) | 非常好 百萬級 QPS | 非常好 百萬級 QPS | 低 |
性能(海量消息堆積場景) | 非常好 百萬級 QPS | 非常好 百萬級 QPS | 低 |
AMQP協議
AMQP(Advanced Message Queuing Protocol,高級消息隊列協議)是一個進程間傳遞非同步消息
的網路協議
。AMQP協議模型如下圖所示:
- 消息發布者(publisher)發布消息,經由交換機(Exchange)
- 交換機根據不同的路由規則將收到的消息分發給與該交換機綁定(Binding)的隊列(Queue)
- AMQP代理將消息投遞給訂閱(Subscribe)了該隊列的消費者(Consumer),或者消費者按照需求自行獲取
深入理解
(1)發布者、交換機、隊列、消費者都可以有多個
。同時因為 AMQP 是一個網路協議
,所以這個過程中的發布者、消費者、消息代理可以分別存在於不同的設備上。
(2)發布者發布消息時可以給消息指定各種消息屬性(Message Meta-data)。有些屬性有可能會被消息代理(Brokers)使用,然而其他的屬性則是完全不透明的,它們只能被接收消息的應用所使用。
(3)從安全形度考慮
,網路是不可靠的,又或是消費者在處理消息的過程中意外掛掉,這樣沒有處理成功的消息就會丟失。基於此原因,AMQP 模組包含了一個消息確認
(Message Acknowledgements)機制:當一個消息從隊列中投遞給消費者後,不會立即從隊列中刪除,直到它收到來自消費者的確認回執
(ACK)後,才將其完全從隊列中刪除。
(4)在某些情況下,例如當一個消息無法被成功路由時(無法從交換機分發到隊列),消息或許會被返回給發布者並被丟棄。或者,如果消息代理執行了延期操作,消息會被放入一個所謂的死信隊列
中。此時,消息發布者可以選擇某些參數來處理這些特殊情況。
RabbitMQ安裝
通過docker部署
docker run -dit --name myRabbitMQ -e RABBITMQ_DEFAULT_USER=root -e RABBITMQ_DEFAULT_PASS=root -p 15672:15672 -p 5672:5672 rabbitmq:management
熟悉RabbitMQ管理介面
三種常見的交換機類型
交換機類型 | Direct | Topic | Fanout |
---|---|---|---|
路由方式 | 根據綁定的路由鍵進行精準匹配 | 將路由鍵和綁定的模式進行通配符模糊匹配 | 群發 |
特點 | 一對一 | 一對多 | 一對多 |
RabiitMQ常用工作模式
RabbitmqUtils.java —— 連接管理工具類
package com.youzikeji.rabbitmq.utils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* RabbitMQ連接開啟、關閉工具類
*/
public class RabbitmqUtils {
private static ConnectionFactory cf;
// 類一載入就執行,且只執行一次,避免每次獲取連接都去創建連接工廠
static {
cf = new ConnectionFactory();
cf.setHost("127.0.0.1");
cf.setPort(5672);
cf.setUsername("root");
cf.setPassword("root");
cf.setVirtualHost("/test01");
}
/**
* 定義提供RabbitMQ連接的工具方法
* @return RabbitMQ連接
*/
public static Connection getConnection() {
try {
return cf.newConnection();
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
return null;
}
/**
* 關閉通道和連接
* @param channel 通道
* @param connection 連接
*/
public static void closeChannelAndConnection(Channel channel, Connection connection) {
try {
if (channel != null && channel.isOpen()){
channel.close();
}
if (connection != null && connection.isOpen()) {
connection.close();
}
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
}
簡單模式
一個生產者,一個消費者,不涉及交換機
Producer.java
package com.youzikeji.rabbitmq.simple;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
import com.youzikeji.rabbitmq.utils.RabbitmqUtils;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
public class Producer {
public static void main(String[] args) throws IOException {
// 1. 創建connection
Connection conn = RabbitmqUtils.getConnection();
// 2. 通過connection獲取channel
assert conn != null;
Channel channel = conn.createChannel();
//3. 聲明消息隊列
String queueName = "test01-simple-durable";
/**
* @param1: 消息隊列名
* @param2: 是否將隊列持久化
* @param3: 隊列是否獨佔,能否綁定其他channel或連接
* @param4: 是否自動刪除隊列,為true則表示消費完自動刪除隊列
* @param5: 其他屬性
*/
channel.queueDeclare(queueName, true, false, true, null);
// 4. 準備消息內容
String msg = "hello, rabbitmq!!!";
/**
* @param1: 交換機
* @param2: 路由鍵,也即綁定的消息隊列名
* @param3: 其他參數,例如將隊列中的消息進行持久化(MessageProperties.PERSISTENT_TEXT_PLAIN)
* @param4: 消息實體
*/
// 5. 發送消息給隊列
channel.basicPublish("", queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes(StandardCharsets.UTF_8));
System.out.println("消息發送成功");
// 6. 依次關閉通道和連接
RabbitmqUtils.closeChannelAndConnection(channel, conn);
}
}
Consumer.java
package com.youzikeji.rabbitmq.simple;
import com.rabbitmq.client.*;
import com.youzikeji.rabbitmq.utils.RabbitmqUtils;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
public class Consumer {
public static void main(String[] args) throws IOException {
// 1. 創建connection
Connection conn = RabbitmqUtils.getConnection();
// 2. 通過connection獲取channel
assert conn != null;
Channel channel = conn.createChannel();
// 3. 綁定消息隊列
channel.queueDeclare("test01-simple-durable", true, false, true, null);
// 3. 從消息隊列中拿消息
channel.basicConsume("test01-simple-durable", true, new DeliverCallback() {
public void handle(String s, Delivery delivery) throws IOException {
System.out.println("收到消息是" + new String(delivery.getBody(), StandardCharsets.UTF_8));
}
}, new CancelCallback() {
public void handle(String s) throws IOException {
System.out.println("消息接收失敗");
}
});
// 阻塞監聽消息隊列
System.out.println("開始接收消息");
System.in.read();
// 4. 依次關閉通道和連接
RabbitmqUtils.closeChannelAndConnection(channel, conn);
}
}
工作隊列模式
多個消費者消費一個生產者,一條消息只能被消費一次,也不涉及交換機
Producer.java
package com.youzikeji.rabbitmq.work;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.MessageProperties;
import com.youzikeji.rabbitmq.utils.RabbitmqUtils;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
/**
* work queue工作模式下,消息隊列中的消息默認平均分配給訂閱的消費者
*/
public class Producer {
public static void main(String[] args) throws IOException {
Connection connection = RabbitmqUtils.getConnection();
assert connection != null;
Channel channel = connection.createChannel();
String queueName = "work-durable";
channel.queueDeclare(queueName, true, false, false, null);
// 發布100條消息
for (int i = 0; i < 100; i++) {
String msg = "hello, work queue msg(" + i + ")";
channel.basicPublish("", queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes(StandardCharsets.UTF_8));
}
RabbitmqUtils.closeChannelAndConnection(channel, connection);
}
}
Consumer1.java
package com.youzikeji.rabbitmq.work;
import com.rabbitmq.client.*;
import com.youzikeji.rabbitmq.utils.RabbitmqUtils;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
public class Consumer1 {
public static void main(String[] args) throws IOException {
Connection connection = RabbitmqUtils.getConnection();
assert connection != null;
Channel channel = connection.createChannel();
String queueName = "work-durable";
channel.queueDeclare(queueName, true, false, false, null);
channel.basicConsume(queueName, true, new DeliverCallback() {
public void handle(String s, Delivery delivery) throws IOException {
System.out.println("收到消息是" + new String(delivery.getBody(), StandardCharsets.UTF_8));
}
}, new CancelCallback() {
public void handle(String s) throws IOException {
System.out.println("消息接收失敗");
}
});
System.out.println("開始接收消息");
System.in.read();
RabbitmqUtils.closeChannelAndConnection(channel, connection);
}
}
其他消費者類同。
平均分配結果
—— 消費者1消費了一半消息,消費者2消費了另一半
發布訂閱模式(fanout)
又稱廣播模式,可以有多個消費者,每個消費者都有自己訂閱的臨時消息隊列。其消息發送流程如下:
- 消息發布者發布消息到fanout交換機
- 交換機把消息發送給綁定在交換機上的所有臨時隊列
- 臨時消息隊列的訂閱者都能獲取到消息
Publisher.java
package com.youzikeji.rabbitmq.fanout;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.youzikeji.rabbitmq.utils.RabbitmqUtils;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
public class Publisher {
public static void main(String[] args) throws IOException {
Connection connection = RabbitmqUtils.getConnection();
assert connection != null;
Channel channel = connection.createChannel();
// 為channel聲明交換機(名稱 + 類型)
channel.exchangeDeclare("logs", "fanout");
// 發送消息
channel.basicPublish("logs", "", null, "fanout type msg".getBytes(StandardCharsets.UTF_8));
// 關閉通道及連接
RabbitmqUtils.closeChannelAndConnection(channel, connection);
}
}
Subscriber1.java
package com.youzikeji.rabbitmq.fanout;
import com.rabbitmq.client.*;
import com.youzikeji.rabbitmq.utils.RabbitmqUtils;
import java.io.IOException;
public class Subscriber1 {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws IOException {
Connection connection = RabbitmqUtils.getConnection();
assert connection != null;
Channel channel = connection.createChannel();
String tempQueue = channel.queueDeclare().getQueue();
// 將臨時隊列綁定到fanout交換機(路由key為空)
channel.queueBind(tempQueue, EXCHANGE_NAME, "");
// 接收消息
channel.basicConsume(tempQueue, true, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("訂閱者1: " + new String(body));
}
});
}
}
其他消息訂閱者類同。
路由模式(direct)
直連模式,基於路由Key的工作模式。在發布-訂閱模式中,發布的消息會被所有訂閱了綁定在交換機上的臨時消息隊列的訂閱者接收到。但是在某些場景下,希望不同的消息被不同的隊列消費,這時就需要路由模式。
- 隊列和交換機不再任意綁定,而是需要指定一個
RoutingKey
- 消息發送方在向交換機
發送消息時,必須指定消息的RoutingKey
- 交換機不再把消息交給每個與其綁定的隊列,而是根據消息的RoutingKey進行
定向的消息發送
Producer.java
package com.youzikeji.rabbitmq.direct;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.youzikeji.rabbitmq.utils.RabbitmqUtils;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
public class Producer {
public static void main(String[] args) throws IOException {
Connection connection = RabbitmqUtils.getConnection();
assert connection != null;
Channel channel = connection.createChannel();
channel.exchangeDeclare("logs_direct", "direct");
// 指定發送消息的RoutingKey
String routingKey = "error";
channel.basicPublish("logs_direct", routingKey, null, ("基於" + routingKey + "發送的消息").getBytes(StandardCharsets.UTF_8));
RabbitmqUtils.closeChannelAndConnection(channel, connection);
}
}
Consumer1.java
—— 只綁定了一個RoutingKey(info)
package com.youzikeji.rabbitmq.direct;
import com.rabbitmq.client.*;
import com.youzikeji.rabbitmq.utils.RabbitmqUtils;
import java.io.IOException;
public class Consumer1 {
private static final String EXCHANGE_NAME = "logs_direct";
public static void main(String[] args) throws IOException {
Connection connection = RabbitmqUtils.getConnection();
assert connection != null;
Channel channel = connection.createChannel();
// 聲明交換機名稱和類型
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
// 獲取臨時消息隊列
String tempQueue = channel.queueDeclare().getQueue();
// 綁定交換機和消息隊列的同時指定routingKey
channel.queueBind(tempQueue, EXCHANGE_NAME, "info");
// 接收消息
channel.basicConsume(tempQueue, true, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消費者1: " + new String(body));
}
});
}
}
Consumer2.java
—— 綁定了多個RoutingKey(info、warning、error)
package com.youzikeji.rabbitmq.direct;
import com.rabbitmq.client.*;
import com.youzikeji.rabbitmq.utils.RabbitmqUtils;
import java.io.IOException;
public class Consumer2 {
private static final String EXCHANGE_NAME = "logs_direct";
public static void main(String[] args) throws IOException {
Connection connection = RabbitmqUtils.getConnection();
assert connection != null;
Channel channel = connection.createChannel();
// 聲明交換機名稱和類型
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
// 獲取臨時消息隊列
String tempQueue = channel.queueDeclare().getQueue();
// 綁定交換機和消息隊列的同時指定routingKey,可以同時指定多個
channel.queueBind(tempQueue, EXCHANGE_NAME, "error");
channel.queueBind(tempQueue, EXCHANGE_NAME, "info");
channel.queueBind(tempQueue, EXCHANGE_NAME, "warning");
// 接收消息
channel.basicConsume(tempQueue, true, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消費者2: " + new String(body));
}
});
}
}
上述生產者生產的資訊帶有RoutingKey(error),只能由消費者2接受並消費。
主題模式
動態路由(direct),更加靈活地匹配RoutingKey(通過通配符的形式)。
通配符
*
—— 恰好匹配一個詞#
—— 匹配一個或多個詞- 例如
lazy.#
可以匹配lazy.irs
或者lazy.irs.corporate
,而lazy.*
之能匹配lazy.irs
Producer.java
package com.youzikeji.rabbitmq.topic;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.youzikeji.rabbitmq.utils.RabbitmqUtils;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
public class Producer {
private static final String EXCHANGE_NAME = "topics";
public static void main(String[] args) throws IOException {
Connection connection = RabbitmqUtils.getConnection();
assert connection != null;
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
// 指定RoutingKey
String routingKey = "user.save.test";
channel.basicPublish(EXCHANGE_NAME, routingKey, null, ("基於" + routingKey +"發送的消息").getBytes(StandardCharsets.UTF_8));
RabbitmqUtils.closeChannelAndConnection(channel, connection);
}
}
Consumer1.java
—— 使用”*”進行路由匹配
package com.youzikeji.rabbitmq.topic;
import com.rabbitmq.client.*;
import com.youzikeji.rabbitmq.utils.RabbitmqUtils;
import java.io.IOException;
public class Consumer1 {
private static final String EXCHANGE_NAME = "topics";
public static void main(String[] args) throws IOException {
Connection connection = RabbitmqUtils.getConnection();
assert connection != null;
Channel channel = connection.createChannel();
// 聲明交換機名稱和類型
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
// 獲取臨時消息隊列
String tempQueue = channel.queueDeclare().getQueue();
// 將交換機和臨時消息隊列進行綁定,使用通配符"*"進行路由匹配
channel.queueBind(tempQueue, EXCHANGE_NAME, "user.*");
// 接收消息
channel.basicConsume(tempQueue, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消費者1: " + new String(body));
}
});
}
}
Consumer2.java
package com.youzikeji.rabbitmq.topic;
import com.rabbitmq.client.*;
import com.youzikeji.rabbitmq.utils.RabbitmqUtils;
import java.io.IOException;
public class Consumer2 {
private static final String EXCHANGE_NAME = "topics";
public static void main(String[] args) throws IOException {
Connection connection = RabbitmqUtils.getConnection();
assert connection != null;
Channel channel = connection.createChannel();
// 聲明交換機名稱和類型
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
// 獲取臨時消息隊列
String tempQueue = channel.queueDeclare().getQueue();
// 將交換機和臨時消息隊列進行綁定,使用通配符"#"進行路由匹配
channel.queueBind(tempQueue, EXCHANGE_NAME, "user.#");
// 接收消息
channel.basicConsume(tempQueue, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消費者2: " + new String(body));
}
});
}
}
上述帶RoutingKey(user.save.test)的消息,只能被第二個消費者接收並消費。
SpringBoot整合RabbitMQ
導入依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
Springboot配置
spring:
application:
name: rabbitmq-springboot
rabbitmq:
host: localhost
port: 5672
username: root
password: root
virtual-host: /test01
五種常用工作模式測試
TestRabbitMQ.java
模板的使用
- 注入RabbitTemplate
- 重載rabbitTemplate.convertAndSend(),簡單模式和工作隊列模式只有
隊列名參數和消息體參數
,後三種工作模式的三個參數分別為交換機名稱、路由鍵、消息體
。
package com.youzikeji;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
public class TestRabbitMQ {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSimple() {
rabbitTemplate.convertAndSend("hello", "hello world");
}
@Test
public void testWork() {
for (int i = 0; i < 10; i++) {
rabbitTemplate.convertAndSend("work", "work queue");
}
}
@Test
public void testFanout() {
rabbitTemplate.convertAndSend("springboot_logs", "", "Fanout message");
}
@Test
public void testDirect() {
rabbitTemplate.convertAndSend("springboot_logs_direct", "error", "direct message");
}
@Test
public void testTopic() {
rabbitTemplate.convertAndSend("springboot_logs_topic", "user.save", "topic message");
}
}
註解的使用
-
@RabbitListener —— 修飾目標包括類、方法、註解,多個監聽對應多個消費者
@Target({ElementType.TYPE, ElementType.METHOD, ElementType.ANNOTATION_TYPE}) @Retention(RetentionPolicy.RUNTIME) @MessageMapping @Documented @Repeatable(RabbitListeners.class) public @interface RabbitListener {
-
@QueueBinding(value = @Queue, exchange=@Exchang(…)) —— 指示綁定在一起的交換機和消息隊列
-
@Queue(value = “xxx”, durable = “true”) —— 指示要監聽的持久化隊列,不帶參數說明是臨時隊列
-
@Exchange(value=”xxx”, type=”xxx”) —— 指示綁定的交換機的名稱和類型
-
@RabbitHandler —— 指示消息的接收處理方法
SimpleConsumer.java
—— 簡單模式
package com.youzikeji.simple;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(queuesToDeclare = @Queue(value = "hello", durable = "true")) // 消費者監聽hello隊列
public class SimpleConsumer {
@RabbitHandler
public void receiveMsg(String message) {
System.out.println("message = " + message);
}
}
WorkConsumer.java
—— 工作隊列模式
package com.youzikeji.work;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class WorkConsumer {
@RabbitListener(queuesToDeclare = @Queue("work"))
public void receive1(String message) {
System.out.println("message1 = " + message);
}
@RabbitListener(queuesToDeclare = @Queue("work"))
public void receive2(String message) {
System.out.println("message2 = " + message);
}
}
FanoutConsumer.java
—— 發布訂閱模式(廣播模式)
package com.youzikeji.fanout;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class FanoutConsumer {
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue, // 創建臨時隊列
exchange = @Exchange(value = "springboot_logs", type = "fanout")
)
})
public void receive1(String message) {
System.out.println("message1 = " + message);
}
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue, // 創建臨時隊列
exchange = @Exchange(value = "springboot_logs", type = "fanout")
)
})
public void receive2(String message) {
System.out.println("message2 = " + message);
}
}
DirectConsumer.java
—— Direct路由模式
package com.youzikeji.direct;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class DirectConsumer {
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue,
exchange = @Exchange(value = "springboot_logs_direct", type = "direct"),
key = {"info", "warning", "error"}
)
})
public void receive1(String message) {
System.out.println("message1 = " + message);
}
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue,
exchange = @Exchange(value = "springboot_logs_direct", type = "direct"),
key = {"info", "warning"}
)
})
public void receive2(String message) {
System.out.println("message2 = " + message);
}
}
TopicConsumer.java
—— Topic模式
package com.youzikeji.topic;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class TopicConsumer {
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue,
exchange = @Exchange(value = "springboot_logs_topic", type = "topic"),
key = {"user.save", "user.*"}
)
})
public void receive1(String message) {
System.out.println("message1 = " + message);
}
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue,
exchange = @Exchange(value = "springboot_logs_topic", type = "topic"),
key = {"*.user.*"}
)
})
public void receive2(String message) {
System.out.println("message2 = " + message);
}
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue,
exchange = @Exchange(value = "springboot_logs_topic", type = "topic"),
key = {"user.#"}
)
})
public void receive3(String message) {
System.out.println("message3 = " + message);
}
}
MQ使用場景
非同步處理
例如網站註冊,沒必要等待註冊成功的通知郵件發送完畢或者註冊成功的通知簡訊發送完畢才顯示註冊成功,郵件和簡訊的發送可以非同步處理。
應用解耦
將訂單系統和庫存系統解耦,訂單系統只用關注下訂單,庫存系統只用從消息隊列中讀取訂單資訊,再做庫存的扣除。
流量削峰
秒殺系統中,流量過大會導致系統宕機,將大量的用戶請求先寫入消息隊列,消息隊列達到最大長度是拋棄其餘的用戶請求或跳轉到錯誤頁面,秒殺業務根據消息隊列中的請求資訊,再做後續處理。