昨晚12點,女朋友突然問我:你會RabbitMQ嗎?我竟然愣住了。
01為什麼要用消息隊列?
1.1 同步調用和非同步調用
在說起消息隊列之前,必須要先說一下同步調用和非同步調用。
同步調用:A服務去調用B服務,需要一直等著B服務,直到B服務執行完畢並把執行結果返回給A之後,A才能繼續往下執行。
舉個例子:過年回到家,老媽對你說:「你也不小了,該談女朋友了,隔壁王阿姨給你……。」「媽!我談的有!”
老媽嘴角微微上揚:「那她現在有空嗎?讓媽給你把把關。」
你被逼之下跟女朋友開影片說:「那個我媽在我旁邊,她想跟你說說話。」
你女朋友一下子慌了,立馬拿起眉筆、口紅、遮瑕對你說:「你先別掛,等我2分鐘,我稍微化一下妝。」
你就一直等著她,等她化好妝之後你把手機給了你老媽。所以同步調用的核心就是:等待。
非同步調用:A服務去調用B服務,不用一直等待B服務的執行結果,也就是說在B服務執行的同時A服務可以接著執行下面的程式。
舉個例子:上午10點鐘,辦公室里,正在上班的你給你女朋友發微信說:「親愛的,等你不忙了給我發一張你的照片吧,我想你了。」然後你接著工作了。
等到下午2點你女朋友給你發了一張她的美顏照,你點開看了看,迷的顛三倒四。所以非同步調用的核心就是:只用通知對方一下,不用等待,通知完我這邊該幹嘛幹嘛!
上面所說的非同步調用就是用消息隊列去實現。
1.2 為什麼要用消息隊列?
場景一:用戶註冊
現在很多網站都需要給註冊的用戶發送註冊簡訊或者激活郵箱,如果使用同步調用的話用戶只有註冊成功後才能給用戶發送簡訊和郵箱鏈接,這樣花費的時間就會很長。
有了消息隊列之後我們只需要將用戶註冊的資訊寫入到消息隊列裡面,接來下該幹嘛幹嘛。
發送郵箱和發送簡訊的服務隨時從消息隊列裡面取出該用戶的資訊,然後再去發送簡訊和郵箱鏈接。這樣花費的時間就會大大減少。
場景二:修改商品
在微服務項目中,有時候數據量太多的話就需要分庫分表,例如下圖中商品表分別存儲在A資料庫和B資料庫中。
有一天我們去調用修改商品的服務去修改A資料庫中的商品資訊,由於我們還需要調用搜索商品的服務查詢商品資訊,所以修改完A庫中的商品資訊後必須保證B庫中的商品資訊和A庫一樣。
如果採用同步調用的方式,在修改完A庫的商品資訊之後需要等待B庫的商品資訊修改完,這樣耗時過長。
有了消息隊列之後我們修改完A庫的商品資訊之後只需要將要修改的商品資訊寫入消息隊列中,接下來該幹什麼幹什麼。
搜索商品的服務從消息隊列中讀取要修改的商品資訊,然後同步B庫中的商品資訊,這樣就大大地縮短響應時間。
02 RabbitMQ介紹
2.1 什麼是MQ
MQ(Message Quene) : 江湖人稱消息隊列,小名又叫消息中間件。消息隊列基於生產者和消費者模型,生產者不斷向消息隊列中發送消息,消費者不斷從隊列中獲取消息。
因為消息的生產和消費都是非同步的,而且沒有業務邏輯的侵入,所以可以輕鬆的實現系統間解耦。
2.2 MQ有哪些
當今市面上有很多消息中間件,ActiveMQ、RabbitMQ、Kafka以及阿里巴巴自研的消息中間件RocketMQ等。
2.3 不同MQ特點
-
RabbitMQ 穩定可靠,支援多協議,有消息確認,基於erlang語言。
-
Kafka高吞吐,高性能,快速持久化,無消息確認,無消息遺漏,可能會有有重複消息,依賴於zookeeper,成本高。
-
ActiveMQ不夠靈活輕巧,對隊列較多情況支援不好。
-
RocketMQ性能好,高吞吐,高可用性,支援大規模分散式,協議支援單一。
2.4 RabbitMQ
基於AMQP協議,erlang語言開發,是部署最廣泛的開源消息中間件,是最受歡迎的開源消息中間件之一。
AMQP:即Advanced Message Queuing Protocol, 一個提供統一消息服務的應用層標準高級消息隊列協議,是應用層協議的一個開放標準,為面向消息的中間件設計。
RabbitMQ主要特性:
-
保證可靠性:使用一些機制來保證可靠性,如持久化、傳輸確認、發布確認
-
可伸縮性:支援消息集群,多台RabbitMQ伺服器可以組成一個集群
-
高可用性:RabbitMQ集群中的某個節點出現問題時隊列任然可用
-
支援多種協議
-
支援多語言客戶端
-
提供良好的管理介面
-
提供跟蹤機制:如果消息出現異常,可以通過跟蹤機制分析異常原因
-
提供插件機制:可通過插件進行多方面擴展
03 RabbitMQ安裝及配置
3.1 docker安裝RabbitMQ
3.1.1 獲取RabbitMQ鏡像
指定版本,該版本包含了RabbitMQ的後台圖形化頁面
docker pull rabbitmq:management
3.1.2 運行RabbitMQ鏡像
方式一:默認guest 用戶,密碼也是 guest
docker run -d --hostname my-rabbit --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq:management
方式二:設置用戶名和密碼
docker run -d --hostname my-rabbit --name rabbit -e RABBITMQ_DEFAULT_USER=user -e RABBITMQ_DEFAULT_PASS=password -p 15672:15672 -p 5672:5672 rabbitmq:management
3.2 本地安裝RabbitMQ
3.2.1 因為RabbitMQ是用erlang語言開發的,所以安裝之前先刪除erlang包
yum remove erlang*
3.2.2 將RabbitMQ安裝包上傳到linux伺服器上
erlang-23.2.1-1.el7.x86_64.rpm
rabbitmq-server-3.8.9-1.el7.noarch.rpm
3.2.3 安裝Erlang依賴包
rpm -ivh erlang-23.2.1-1.el7.x86_64.rpm
3.2.4 安裝RabbitMQ安裝包(需要聯網)
yum install -y rabbitmq-server-3.8.9-1.el7.noarch.rpm
注意:安裝完成後配置文件在:/usr/share/doc/rabbitmq-server-3.8.9/rabbitmq.config.example目錄中,需要 將配置文件複製到/etc/rabbitmq/目錄中,並修改名稱為rabbitmq.config
3.2.5 複製配置文件
cp /usr/share/doc/rabbitmq-server-3.7.18/rabbitmq.config.example /etc/rabbitmq/rabbitmq.config
3.2.6 查看配置文件
ls /etc/rabbitmq/rabbitmq.config
3.2.7 修改配置文件
vim /etc/rabbitmq/rabbitmq.config
將上圖中框著的部分修改為下圖:
3.2.8 啟動rabbitmq中的插件管理
rabbitmq-plugins enable rabbitmq_management
3.2.9 查看服務狀態
systemctl status rabbitmq-server
rabbitmq常用命令
systemctl start rabbitmq-server
systemctl restart rabbitmq-server
systemctl stop rabbitmq-server
3.2.10 如果是買的伺服器,記得安全組開放15672和5672埠
3.2.11 訪問RabbitMQ的後台圖形化管理介面
- 瀏覽器地址欄輸入://ip:15672
- 登錄管理介面
username:guest
password:guest
3.3 Admin用戶和虛擬主機管理
3.3.1 添加用戶
上面的Tags選項,其實是指定用戶的角色。超級管理員(administrator):可登陸管理控制台,可查看所有的資訊,並且可以對用戶,策略(policy)進行操作。
3.3.2 創建虛擬主機
虛擬主機:為了讓各個用戶可以互不干擾的工作,RabbitMQ添加了虛擬主機(Virtual Hosts)的概念。
其實就是一個獨立的訪問路徑,不同用戶使用不同路徑,各自有自己的隊列、交換機,互相不會影響。
3.3.3 綁定虛擬主機和用戶
創建好虛擬主機,我們還要給用戶添加訪問許可權。點擊添加好的虛擬主機,進入虛擬機設置介面。
04 RabbitMQ的4種消息模式
4.1 簡單模式
說白了就是一個生產者發送消息,一個消費者接受消息,一對一的關係。
在上圖的模型中,有以下概念:
producer:生產者,消息發送者
consumer:消費者:消息的接受者
queue:消息隊列,圖中紅色部分。類似一個倉庫,可以快取消息;生產者向其中投遞消息,消費者從其中取出消息。
4.2 工作模式
說白了就是一個生產者發送消息,多個消費者接受消息。只要其中的一個消費者搶先接收到了消息,其他的就接收不到了。一對多的關係。
4.3 廣播模式
這裡引入了交換機(Exchange)的概念,交換機綁定所有的隊列。也就是說消息生產者會先把消息發送給交換機,然後交換機把消息發送到與它綁定的所有隊列裡面,消費者從它所綁定的隊列裡面獲取消息。
在廣播模式下,消息發送流程是這樣的:
-
可以有多個消費者
-
每個消費者有自己的queue(隊列)
-
每個隊列都要綁定到Exchange(交換機)
-
生產者發送的消息,只能發送到交換機,交換機來決定要發給哪個隊列,生產者無法決定
-
交換機把消息發送給綁定過的所有隊列
-
隊列的消費者都能拿到消息。實現一條消息被多個消費者消費
4.4 路由模式
4.4.1 Routing之訂閱模型-Direct(直連)
舉個例子:消息生產者發送消息時給了交換機一個紅桃A,消息生產者對交換機說:」這條消息只能給有紅桃A的隊列「。交換機發現隊列一手裡是黑桃K,隊列二手裡是紅桃A,所以它將這條消息給了隊列二。
在路由-直連模式中,一條消息,會被所有訂閱的隊列都消費。但是在某些場景下,我們希望不同的消息被不同的隊列消費。這時就要用到Direct類型的Exchange。
在Direct模型下:
-
隊列與交換機的綁定,不能是任意綁定了,而是要指定一個RoutingKey(路由key)
-
消息的發送方在向Exchange發送消息時,也必須指定消息的 RoutingKey。
-
Exchange不再把消息交給每一個綁定的隊列,而是根據消息的Routing Key進行判斷,只有隊列的Routingkey與消息的 Routing key完全一致,才會接收到消息
4.4.2 Routing 之訂閱模型-Topic
舉個例子:消息生產者發送消息時給了交換機一個暗號:hello.mq,消息生產者對交換機說:」這條消息只能給暗號以hello開頭的隊列「。交換機發現它與隊列一的暗號是hello.java,與隊列二的暗號是news.today,所以它將這條消息給了隊列一。
Topic類型的交換機與Direct相比,都是可以根據RoutingKey把消息路由到不同的隊列。只不過Topic類型Exchange可以讓隊列在綁定Routing key 的時候使用通配符!這種模型Routingkey 一般都是由一個或多個單片語成,多個單詞之間以」.」分割,例如:b.hello
05 Maven 應用整合 RabbitMQ
5.1 創建 SpringBoot 項目,引入依賴
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.7.2</version>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>1.7.6.RELEASE</version>
</dependency>
</dependencies>
5.2 創建 RabbitMQ 的連接參數工具類
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class ConnectionUtil {
public static Connection getConnection() throws Exception {
//定義連接工廠
ConnectionFactory factory = new ConnectionFactory();
//ip地址
factory.setHost("##.##.##.##");
//埠
factory.setPort(5672);
//虛擬主機
factory.setVirtualHost("myhost");
//賬戶
factory.setUsername("root");
//密碼
factory.setPassword("########");
Connection connection = factory.newConnection();
return connection;
}
}
5.3 第一種:簡單模式
消息生產者
public class Producer {
public static void main(String[] args) throws Exception {
// 獲取RabbitMQ的連接
Connection connection = ConnectionUtil.getConnection();
// 從連接中創建通道
Channel channel = connection.createChannel();
// 創建隊列,如果存在就不創建,不存在就創建
// 參數1 隊列名, 參數2 durable:數據是否持久化 ,參數3 exclusive:是否排外的,記住false就行
// 參數4 autoDelete:是否自動刪除,消費者消費完消息之後是否刪除這個隊列
// 參數5 arguments: 其他參數
channel.queueDeclare("queue", false, false, false, null);
// 寫到隊列中的消息內容
String message = "你好啊,mq!";
// 參數1 交換機,此處沒有
// 參數2 發送到哪個隊列
// 參數3 屬性
// 參數4 內容
channel.basicPublish("", "queue", null, message.getBytes());
//關閉通道和連接
channel.close();
connection.close();
}
}
消息消費者
public class Consumer {
public static void main(String[] args) throws Exception {
//獲取RabbitMq的連接
Connection connection = ConnectionUtil.getConnection();
//創建一個通道
Channel channel = connection.createChannel();
//第一個參數:要從哪個隊列獲取消息
channel.basicConsume("queue",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("簡單模式獲取消息:"+new String(body));
}
});
}
}
測試結果:
5.4 第二種:工作模式
消息生產者
public class Producer {
public static void main(String[] args) throws Exception {
// 獲取RabbitMQ的連接
Connection connection = ConnectionUtil.getConnection();
// 從連接中創建通道
Channel channel = connection.createChannel();
// 創建隊列,如果存在就不創建,不存在就創建
// 參數1 隊列名, 參數2 durable:數據是否持久化 ,參數3 exclusive:是否排外的,記住false就行
// 參數4 autoDelete:是否自動刪除,消費者消費完消息之後是否刪除這個隊列
// 參數5 arguments: 其他參數
channel.queueDeclare("queue", false, false, false, null);
// 寫到隊列中的消息內容
String message = "你好啊,mq";
// 參數1 交換機,此處無
// 參數2 發送到哪個隊列
// 參數3 屬性
// 參數4 內容
for (int i = 0; i < 10; i++) {
channel.basicPublish("", "queue", null, (message+i).getBytes());
}
//關閉通道和連接
channel.close();
connection.close();
}
}
消費者01
public class ConsumerOne {
public static void main(String[] args) throws Exception {
//創建一個RabbitMq的連接
Connection connection = ConnectionUtil.getConnection();
//創建一個通道
Channel channel = connection.createChannel();
channel.basicConsume("queue",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消費者01:"+new String(body));
}
});
}
}
消費者02
public class ConsumerTwo {
public static void main(String[] args) throws Exception {
//創建一個RabbitMq的連接
Connection connection = ConnectionUtil.getConnection();
//創建一個通道
Channel channel = connection.createChannel();
channel.basicConsume("queue",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消費者02:"+new String(body));
}
});
}
}
測試結果:
消費者01
消費者02
5.5 第三種:廣播模式
消息生產者
public class Producer {
public static void main(String[] args) throws Exception {
// 獲取RabbitMQ的連接
Connection connection = ConnectionUtil.getConnection();
// 從連接中創建通道
Channel channel = connection.createChannel();
// 創建隊列,如果存在就不創建,不存在就創建
// 參數1 隊列名, 參數2 durable:數據是否持久化 ,參數3 exclusive:是否排外的,記住false就行
// 參數4 autoDelete:是否自動刪除,消費者消費完消息之後是否刪除這個隊列
// 參數5 arguments: 其他參數
channel.queueDeclare("queue01", false, false, false, null);
channel.queueDeclare("queue02", false, false, false, null);
//創建交換機,如果存在就不創建。並指定交換機的類型是FANOUT即廣播模式
channel.exchangeDeclare("fanout-exchange", BuiltinExchangeType.FANOUT);
//綁定交換機與隊列,第一個參數是隊列,第二個參數是交換機,第三個參數是路由key,這裡不指定key
channel.queueBind("queue01", "fanout-exchange", "");
channel.queueBind("queue02", "fanout-exchange", "");
// 消息內容
String message = "這是一條廣播消息";
// 參數1 交換機
// 參數2 發送到哪個隊列,因為指定了交換機,所以這裡隊列名為空
// 參數3 屬性
// 參數4 內容
channel.basicPublish("fanout-exchange", "", null, message.getBytes());
//關閉通道和連接
channel.close();
connection.close();
}
}
消費者01
public class ConsumerOne {
public static void main(String[] args) throws Exception {
//創建一個新的RabbitMq連接
Connection connection = ConnectionUtil.getConnection();
//創建一個通道
Channel channel = connection.createChannel();
//第一個參數:要從哪個隊列獲取消息
channel.basicConsume("queue01",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消費者01:"+new String(body));
}
});
}
}
消費者02
public class ConsumerTwo {
public static void main(String[] args) throws Exception {
//創建一個新的RabbitMq連接
Connection connection = ConnectionUtil.getConnection();
//創建一個通道
Channel channel = connection.createChannel();
//第一個參數:要從哪個隊列獲取消息
channel.basicConsume("queue02",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消費者02:"+new String(body));
}
});
}
}
測試結果
5.6 第四種 路由模式
1)路由模式之Direct(直連)
消息生產者
public class Producer {
public static void main(String[] args) throws Exception {
// 獲取RabbitMQ的連接
Connection connection = ConnectionUtil.getConnection();
// 從連接中創建通道
Channel channel = connection.createChannel();
// 創建隊列,如果存在就不創建,不存在就創建
// 參數1 隊列名, 參數2 durable:數據是否持久化 ,參數3 exclusive:是否排外的,記住false就行
// 參數4 autoDelete:是否自動刪除,消費者消費完消息之後是否刪除這個隊列
// 參數5 arguments: 其他參數
channel.queueDeclare("queue03", false, false, false, null);
channel.queueDeclare("queue04", false, false, false, null);
//創建交換機,如果存在就不創建。並指定交換機的類型是DIRECT模式
channel.exchangeDeclare("direct-exchange", BuiltinExchangeType.DIRECT);
//綁定交換機與隊列,第一個參數是隊列,第二個參數是交換機,第三個參數是路由key,這裡指定路由key是a
channel.queueBind("queue03", "direct-exchange", "a");
//綁定交換機與隊列,第一個參數是隊列,第二個參數是交換機,第三個參數是路由key,這裡指定路由key是b
channel.queueBind("queue04", "direct-exchange", "b");
//消息
String message = "這是一條key為a的消息";
// 參數1 交換機
// 參數2 路由key
// 參數3 屬性
// 參數4 內容
channel.basicPublish("direct-exchange", "a", null, message.getBytes());
//關閉通道和連接
channel.close();
connection.close();
}
}
消費者03
public class ConsumerThree {
public static void main(String[] args) throws Exception {
//創建一個新的RabbitMQ連接
Connection connection = ConnectionUtil.getConnection();
//創建一個通道
Channel channel = connection.createChannel();
//第一個參數:要從哪個隊列獲取消息
channel.basicConsume("queue03",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消費者03:"+new String(body));
}
});
}
}
消費者04
public class ConsumerFour {
public static void main(String[] args) throws Exception {
//創建一個新的RabbitMQ連接
Connection connection = ConnectionUtil.getConnection();
//創建一個通道
Channel channel = connection.createChannel();
//第一個參數:要從哪個隊列獲取消息
channel.basicConsume("queue04",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消費者04:"+new String(body));
}
});
}
}
測試結果
只有消費者03收到了消息
2)路由模式之-Topic
消息生產者
public class Producer {
public static void main(String[] args) throws Exception {
// 獲取RabbitMQ的連接
Connection connection = ConnectionUtil.getConnection();
// 從連接中創建通道
Channel channel = connection.createChannel();
// 創建隊列,如果存在就不創建,不存在就創建
// 參數1 隊列名, 參數2 durable:數據是否持久化 ,參數3 exclusive:是否排外的,記住false就行
// 參數4 autoDelete:是否自動刪除,消費者消費完消息之後是否刪除這個隊列
// 參數5 arguments: 其他參數
channel.queueDeclare("queue05", false, false, false, null);
channel.queueDeclare("queue06", false, false, false, null);
//創建交換機,如果存在就不創建。並指定交換機的類型是TOPIC模式
channel.exchangeDeclare("topic-exchange", BuiltinExchangeType.TOPIC);
//綁定交換機與隊列,第一個參數是隊列,第二個參數是交換機,第三個參數是路由key,這裡指定路由key是a.*
//*是通配符,意思只要key滿足a開頭,.後面是什麼都可以
channel.queueBind("queue05", "topic-exchange", "a.*");
//綁定交換機與隊列,第一個參數是隊列,第二個參數是交換機,第三個參數是路由key,這裡指定路由key是b.*
//*是通配符,意思只要key滿足b開頭,.後面是什麼都可以
channel.queueBind("queue06", "topic-exchange", "b.*");
// channel.queueDeclare("queue", false, false, false, null);
// 消息內容
String message = "這是一條key為a.hello的消息";
// 參數1 交換機,此處無
// 參數2 路由key
// 參數3 屬性
// 參數4 內容
channel.basicPublish("topic-exchange", "a.hello", null, message.getBytes());
//關閉通道和連接
channel.close();
connection.close();
}
}
消息消費者05
public class ConsumerFive {
public static void main(String[] args) throws Exception {
//創建一個新的RabbitMQ連接
Connection connection = ConnectionUtil.getConnection();
//創建一個通道
Channel channel = connection.createChannel();
//第一個參數:要從哪個隊列獲取消息
channel.basicConsume("queue05",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消費者05:"+new String(body));
}
});
}
}
消息消費者06
public class ConsumerSix {
public static void main(String[] args) throws Exception {
//創建一個新的RabbitMQ連接
Connection connection = ConnectionUtil.getConnection();
//創建一個通道
Channel channel = connection.createChannel();
//第一個參數:要從哪個隊列獲取消息
channel.basicConsume("queue06",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消費者06:"+new String(body));
}
});
}
}
測試結果
06 SpringBoot 整合 RabbitMQ
6.1 創建 SpringBoot 項目,引入依賴
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
6.2 配置配置文件
spring:
application:
name: mq-springboot
rabbitmq:
host: ##.##.##.##
port: 5672
username: root
password: #####
virtual-host: myhost
6.3 第一種:簡單模式
消息生產者:
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void sendMsg(){
rabbitTemplate.convertAndSend("quenue","你好mq");
}
消息消費者
@Component
public class SingleCunstomer {
//監聽的隊列
@RabbitListener(queues = "queue")
public void receive(String message){
System.out.println("消息:" + message);
}
}
6.4 第二種:工作模式
消息生產者
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void sendMsg(){
for (int i = 0; i < 10; i++) {
rabbitTemplate.convertAndSend("quenue","你好mq!");
}
}
消息消費者
@Component
public class WorkCunstomer {
@RabbitListener(queues = "queue")
public void customerOne(String message){
System.out.println("消費者一:" + message);
}
@RabbitListener(queues = "queue")
public void customerTwo(String message){
System.out.println("消費者二:" + message);
}
}
6.5 第三種:廣播模式
消息生產者
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void sendMsg() {
//參數1 交換機 參數2 路由key 參數三 消息
rabbitTemplate.convertAndSend("fanout-exchange","","這是一條廣播消息");
}
消息消費者
@Component
public class FanoutCunstomer {
@RabbitListener(queues = "queue01")
public void customerOne(String message){
System.out.println("消費者一:" + message);
}
@RabbitListener(queues = "queue02")
public void customerTwo(String message){
System.out.println("消費者二:" + message);
}
}
6.6 第4種:路由模式
1)Direct(直連)模式
消息生產者
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void sendMsg() {
//參數1 交換機 參數2 路由key 參數三 消息
rabbitTemplate.convertAndSend("direct-exchange","a","這是一條廣播消息");
}
消息消費者
@Component
public class DirectCunstomer {
//監聽的隊列 queue03
@RabbitListener(queues = "queue03")
//監聽的隊列 queue04
public void customerOne(String message){
System.out.println("消費者一:" + message);
}
@RabbitListener(queues = "queue04")
public void customerTwo(String message){
System.out.println("消費者二:" + message);
}
}
2)Topic模式
消息生產者
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void sendMsg() {
//參數1 交換機 參數2 路由key 參數三 消息
rabbitTemplate.convertAndSend("topic-exchange","a.hello","這是一條廣播消息");
}
消息消費者
@Component
public class TopicCunstomer {
//監聽的隊列 queue05
@RabbitListener(queues = "queue05")
public void customerOne(String message){
System.out.println("消費者一:" + message);
}
//監聽的隊列 queue06
@RabbitListener(queues = "queue06")
public void customerTwo(String message){
System.out.println("消費者二:" + message);
}
}
6.7 SpringBoot 應用中通過配置完成隊列的創建
@Configuration
public class RabbitMQConfiguration {
//創建隊列
@Bean
public Queue queue1(){
Queue queue9 = new Queue("queue1");
return queue9;
}
@Bean
public Queue queue2(){
Queue queue2 = new Queue("queue2");
//設置隊列屬性
return queue2;
}
//創建廣播模式交換機
@Bean
public FanoutExchange ex1(){
return new FanoutExchange("ex1");
}
//創建路由模式-direct交換機
@Bean
public DirectExchange ex2(){
return new DirectExchange("ex2");
}
//綁定隊列
@Bean
public Binding bindingQueue1(Queue queue1, DirectExchange ex2){
return BindingBuilder.bind(queue1).to(ex2).with("a1");
}
@Bean
public Binding bindingQueue2(Queue queue2, DirectExchange ex2){
return BindingBuilder.bind(queue2).to(ex2).with("a2");
}
}
6.8 使用RabbitMQ發送-接收對象
消息生產者:
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
void sendMsg() {
User user = new User();
user.setId(1).setAge(16).setUsername("張飛");
rabbitTemplate.convertAndSend("queue",user);
}
消息消費者
public class SingleCunstomer {
//監聽的隊列
@RabbitListener(queues = "queue")
public void receive(User user){
System.out.println("對象:" + user);
}
}
07 RabbitMQ 消息確認機制
所謂消息確認機制就是消息生產者有沒有將消息發出去?生產者有沒有將消息發給交換機,交換機有沒有將消息發到隊列裡面?消息消費者是否成功的從隊列裡面獲取到了消息?
就像你在網上買東西,商家有沒有將快遞發到你家小區樓下的快遞驛站?你有沒有成功的從快遞驛站拿到你的快遞?
所以RabbitMQ的消息確認機制包括消息發送端的確認機制和消息消費端的確認機制。
消息發送端:
- confirm機制:消息生產者是否成功的將消息發送到交換機。
- return機制:交換機是否成功的將消息發送到隊列。
消息消費端:消息消費者是否成功的從隊列獲取到了消息。
7.1 SpringBoot配置消息確認
消息發送端消息確認配置
# 消息發送到交換器確認
spring.rabbitmq.publisher-confirm-type=correlated
# 消息發送到隊列確認
spring.rabbitmq.publisher-returns=true
7.2 消息發送到交換機監聽類
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
@Slf4j
@Component
//消息發送到交換機監聽類
public class SendConfirmCallback implements RabbitTemplate.ConfirmCallback {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
log.info("消息成功發送到交換機! correlationData:{}", correlationData);
} else {
log.info("消息發送到交換機失敗! correlationData:{}", correlationData);
}
}
}
7.3 消息未路由到隊列監聽類
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
//消息未路由到隊列監聽類
@Slf4j
@Component
public class SendReturnCallback implements RabbitTemplate.ReturnCallback {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.error("Fail... message:{},從交換機exchange:{},以路由鍵routingKey:{}," + "未找到匹配隊列,replyCode:{},replyText:{}",
message, exchange, routingKey, replyCode, replyText);
}
}
7.4 重新注入RabbitTemplate,並設置兩個監聽類
@Configuration
public class RabbitMQConfig {
@Bean
RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMandatory(true);
rabbitTemplate.setConfirmCallback( new SendConfirmCallback());
rabbitTemplate.setReturnCallback(new SendReturnCallback());
return rabbitTemplate;
}
}
7.5 消費端確認
添加配置
# 消費者消息確認--手動 ACK
spring.rabbitmq.listener.direct.acknowledge-mode=manual
spring.rabbitmq.listener.simple.acknowledge-mode=manual
消費者程式碼
@Component
@RabbitListener(queues = RabbitMQConfig.TASK_QUEUE_NAME)
public class Receiver {
@RabbitHandler
public void process(String content, Channel channel, Message message) {
try {
// 業務處理成功後調用,消息會被確認消費
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
// 業務處理失敗後調用
//channel.basicNack(message.getMessageProperties().getDeliveryTag(),false, true);
} catch (IOException e) {
e.printStackTrace();
}
}
}
08 RabbitMQ 死信隊列實現消息延遲
8.1 什麼是延遲隊列
延遲隊列存儲的對象肯定是對應的延時消息,所謂」延時消息」是指當消息被發送以後,並不想讓消費者立即拿到消息,而是等待指定時間後,消費者才拿到這個消息進行消費。
8.2 RabbitMQ如何實現延遲隊列?
AMQP協議和RabbitMQ隊列本身沒有直接支援延遲隊列功能,但是可以通過TTL(Time To Live)特性模擬出延遲隊列的功能。
8.3 消息的TTL(Time To Live)
消息的TTL就是消息的存活時間。RabbitMQ可以對隊列和消息分別設置TTL。對隊列設置就是隊列沒有消費者連著的保留時間,也可以對每一個單獨的消息做單獨的設置。超過了這個時間,我們認為這個消息就死了,稱之為死信。可以通過設置消息的expiration欄位或者x-message-ttl屬性來設置時間.
8.4 實現延遲隊列
延遲任務通過消息的TTL來實現。我們需要建立2個隊列,一個用於發送消息,一個用於消息過期後的轉發目標隊列。
場景:使用延遲隊列實現訂單支付監控
8.5 程式碼實現
RabbitMQConfig
@Configuration
public class RabbitMQConfig {
//交換機
public static final String EXCHANGE = "delay.exchange";
//死信隊列
public static final String DELAY_QUEUE = "delay.queue";
//死信隊列與交換機綁定的路由key
public static final String DELAY_ROUTING_KEY = "delay.key";
//業務隊列
public static final String TASK_QUEUE_NAME = "task.queue";
//業務隊列與交換機綁定的路由key
public static final String TASK_ROUTING_KEY = "task.key";
// 聲明交換機
@Bean
public DirectExchange exchange() {
return new DirectExchange(EXCHANGE);
}
// 聲明死信隊列
@Bean
public Queue delayQueue() {
Map<String, Object> args = new HashMap<>(2);
//死信隊列消息過期之後要轉發的交換機
args.put("x-dead-letter-exchange", EXCHANGE);
//消息過期轉發的交換機對應的key
args.put("x-dead-letter-routing-key", TASK_ROUTING_KEY);
return new Queue(DELAY_QUEUE, true, false, false, args);
}
// 聲明死信隊列綁定關係
@Bean
public Binding deadLetterBinding() {
return BindingBuilder.bind(delayQueue()).to(exchange()).with(DELAY_ROUTING_KEY);
}
// 聲明業務隊列
@Bean
public Queue taskQueue() {
return new Queue(TASK_QUEUE_NAME, true);
}
//聲明業務隊列綁定關係
@Bean
public Binding taskBinding() {
return BindingBuilder.bind(taskQueue()).to(exchange()).with(TASK_ROUTING_KEY);
}
}
消息生產者
@Component
public class Producer {
@Autowired
private RabbitTemplate rabbitTemplate;
//orderId 是訂單id interval是自定義過期時間 單位:秒
public void orderDelay(String orderId,Long interval) {
MessageProperties messageProperties = new MessageProperties();
//設置消息過期時間
messageProperties.setExpiration(String.valueOf(interval));
Message message = new Message(orderId.getBytes(), messageProperties);
//生產者將消息發給死信隊列,並設置消息過期時間
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE, null, message);
}
}
消息消費者
@Component
public class Consumer {
@Autowired
private OrderService orderService;
//監聽業務隊列
@RabbitListener(queues = RabbitMQConfig.TASK_QUEUE_NAME)
public void receiveTask(Message message){
String orderId = new String(message.getBody());
log.info("過期的任務Id:{}", orderId);
Order order = orderService.getById(orderId);
//如果訂單支付狀態仍為未支付
if(order.getPayState()==0){
//設置該訂單狀態為已關閉
order.setPayState(2);
orderService.updateById(order);
}
}
}
09 RabbitMQ 的應用場景
9.1 解耦
場景說明:用戶下單之後,訂單系統要通知庫存系統修改商品數量
9.2 非同步
場景說明:用戶註冊成功之後,需要發送註冊郵件及註冊簡訊提醒
9.3 消息通訊
場景說明:應用系統之間的通訊,例如聊天室
9.4 流量削峰
場景說明:秒殺業務。大量的請求不會主動請求秒殺業務,而是存放在消息隊列。
微信公眾號:eclipse編程。專註於編程技術分享,堅持終身學習。