消息中間件——RabbitMQ(六)理解Exchange交換機核心概念!
- 2019 年 10 月 3 日
- 筆記
前言
來了解RabbitMQ一個重要的概念:Exchange交換機
1. Exchange概念
- Exchange:接收消息,並根據路由鍵轉發消息所綁定的隊列。
藍色框:客戶端發送消息至交換機,通過路由鍵路由至指定的隊列。
黃色框:交換機和隊列通過路由鍵有一個綁定的關係。
綠色框:消費端通過監聽隊列來接收消息。
2. 交換機屬性
Name
:交換機名稱
Type
:交換機類型——direct、topic、fanout、headers、sharding(此篇不講)
Durability
:是否需要持久化,true為持久化
Auto Delete
:當最後一個綁定到Exchange上的隊列刪除後,自動刪除該Exchange
Internal
:當前Exchange是否用於RabbitMQ內部使用,默認為false
Arguments
:擴展參數,用於擴展AMQP協議自訂製化使用
3. Direct Exchange(直連)
- 所有發送到Direct Exchange的消息被轉發到RouteKey中指定的Queue
注意:Direct模式可以使用RabbitMQ自帶的Exchange:default Exchange,所以不需要將Exchange進行任何綁定(binding)操作,消息傳遞時,RouteKey必須完全匹配才會被隊列接收,否則該消息會被拋棄。
重點:routing key與隊列queues 的key保持一致,即可以路由到對應的queue中。
3.1 程式碼演示
生產端:
/** * * @ClassName: Producer4DirectExchange * @Description: 生產者 * @author Coder編程 * @date2019年7月19日 下午22:15:52 * */ public class Producer4DirectExchange { public static void main(String[] args) throws Exception { //1創建ConnectionFactory Connection connection = ConnectionUtils.getConnection(); //2創建Channel Channel channel = connection.createChannel(); //3 聲明 String exchangeName = "test_direct_exchange"; String routingKey = "test.direct"; //4 發送 String msg = "Coder編程 Hello World RabbitMQ 4 Direct Exchange Message ... "; channel.basicPublish(exchangeName, routingKey , null , msg.getBytes()); } }
消費端:
/** * * @ClassName: Consumer4DirectExchange * @Description: 消費者 * @author Coder編程 * @date2019年7月19日 下午22:18:52 * */ public class Consumer4DirectExchange { public static void main(String[] args) throws Exception { //創建ConnectionFactory Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); //聲明 String exchangeName = "test_direct_exchange"; String exchangeType = "direct"; String queueName = "test_direct_queue"; String routingKey = "test.direct"; //表示聲明了一個交換機 channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null); //表示聲明了一個隊列 channel.queueDeclare(queueName, false, false, false, null); //建立一個綁定關係: channel.queueBind(queueName, exchangeName, routingKey); //durable 是否持久化消息 QueueingConsumer consumer = new QueueingConsumer(channel); //參數:隊列名稱、是否自動ACK、Consumer channel.basicConsume(queueName, true, consumer); //循環獲取消息 while(true){ //獲取消息,如果沒有消息,這一步將會一直阻塞 Delivery delivery = consumer.nextDelivery(); String msg = new String(delivery.getBody()); System.out.println("收到消息:" + msg); } } }
測試結果:
注意需要routingKey保持一致。可以自己嘗試修改routingkey,是否能收到消息。
4. Topic Exchange
- 所有發送到Topic Exchange的消息被轉發到所有管線RouteKey中指定Topic的Queue上
-
Exchange將RouteKey和某Topic進行模糊匹配,此時隊列需要綁定一個Topic
注意:可以使用通配符進行模糊匹配
符號 "#" 匹配一個或多個詞
符號 "" 匹配不多不少一個詞
例如:"log.#" 能夠匹配到 "log.info.oa"
"log." 只會匹配到 "log.error"
在一堆消息中,每個不同的隊列只關心自己需要的消息。
4.1 程式碼演示
生產端:
/** * * @ClassName: Producer4TopicExchange * @Description: 生產者 * @author Coder編程 * @date2019年7月19日 下午22:32:41 * */ public class Producer4TopicExchange { public static void main(String[] args) throws Exception { //1創建ConnectionFactory Connection connection = ConnectionUtils.getConnection(); //2創建Channel Channel channel = connection.createChannel(); //3聲明 String exchangeName = "test_topic_exchange"; String routingKey1 = "user.save"; String routingKey2 = "user.update"; String routingKey3 = "user.delete.abc"; //4發送 String msg = "Coder編程 Hello World RabbitMQ 4 Topic Exchange Message ..."; channel.basicPublish(exchangeName, routingKey1 , null , msg.getBytes()); channel.basicPublish(exchangeName, routingKey2 , null , msg.getBytes()); channel.basicPublish(exchangeName, routingKey3 , null , msg.getBytes()); channel.close(); connection.close(); } }
消費端:
/** * * @ClassName: Consumer4TopicExchange * @Description: 消費者 * @author Coder編程 * @date2019年7月19日 下午22:37:12 * */ public class Consumer4TopicExchange { public static void main(String[] args) throws Exception { //創建ConnectionFactory Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); // 聲明 String exchangeName = "test_topic_exchange"; String exchangeType = "topic"; String queueName = "test_topic_queue"; //String routingKey = "user.*"; String routingKey = "user.*"; // 1 聲明交換機 channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null); // 2 聲明隊列 channel.queueDeclare(queueName, false, false, false, null); // 3 建立交換機和隊列的綁定關係: channel.queueBind(queueName, exchangeName, routingKey); //durable 是否持久化消息 QueueingConsumer consumer = new QueueingConsumer(channel); //參數:隊列名稱、是否自動ACK、Consumer channel.basicConsume(queueName, true, consumer); //循環獲取消息 while(true){ //獲取消息,如果沒有消息,這一步將會一直阻塞 Delivery delivery = consumer.nextDelivery(); String msg = new String(delivery.getBody()); System.out.println("收到消息:" + msg); } } }
測試結果:
注意一個問題:需要進行解綁
5. Fanout Exchange
- 不處理路由鍵,只需要簡單的將隊里綁定到交換機上
- 發送到交換機的消息都會被轉發到與該交換機綁定的所有隊列上
- Fanout交換機轉發消息是最快的
5.1 程式碼演示
生產端:
/** * * @ClassName: Producer4FanoutExchange * @Description: 生產者 * @author Coder編程 * @date2019年7月19日 下午23:01:16 * */ public class Producer4FanoutExchange { public static void main(String[] args) throws Exception { //1創建ConnectionFactory Connection connection = ConnectionUtils.getConnection(); //2 創建Channel Channel channel = connection.createChannel(); //3 聲明 String exchangeName = "test_fanout_exchange"; //4 發送 for(int i = 0; i < 10; i ++) { String msg = "Coder 編程 Hello World RabbitMQ 4 FANOUT Exchange Message ..."; channel.basicPublish(exchangeName, "", null , msg.getBytes()); } channel.close(); connection.close(); } }
消費端:
/** * * @ClassName: Consumer4FanoutExchange * @Description: 消費者 * @author Coder編程 * @date2019年7月19日 下午23:21:18 * */ public class Consumer4FanoutExchange { public static void main(String[] args) throws Exception { //創建ConnectionFactory Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); // 聲明 String exchangeName = "test_fanout_exchange"; String exchangeType = "fanout"; String queueName = "test_fanout_queue"; String routingKey = ""; //不設置路由鍵 channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null); channel.queueDeclare(queueName, false, false, false, null); channel.queueBind(queueName, exchangeName, routingKey); //durable 是否持久化消息 QueueingConsumer consumer = new QueueingConsumer(channel); //參數:隊列名稱、是否自動ACK、Consumer channel.basicConsume(queueName, true, consumer); //循環獲取消息 while(true){ //獲取消息,如果沒有消息,這一步將會一直阻塞 Delivery delivery = consumer.nextDelivery(); String msg = new String(delivery.getBody()); System.out.println("收到消息:" + msg); } } }
測試結果:
6. 其他
6.1 Bingding —— 綁定
- Exchange和Exchange、Queue之間的連接關係
- Bingding可以包含RoutingKey或者參數
6.2 Queue——消息隊列
- 消息隊列,實際存儲消息數據
- Durability:是否持久化,Durable:是 ,Transient:否
- Auto delete:如選yes,代表當最後一個監聽被移除之後,該Queue會自動被刪除。
6.3 Message——消息
- 伺服器與應用程式之間傳送的數據
- 本質上就是一段數據,由Properties和Payload(Body)組成
- 常用屬性:delivery mode、headers(自定義屬性)
6.4 其他屬性
content_type、content_encoding、priority
correlation_id、reply_to、expiration、message_id
timestamp、type、user_id、app_id、cluster_id
6.5 Virtual Host虛擬主機
- 虛擬地址,用於進行邏輯隔離,最上層的消息路由
- 一個Virtual Host裡面可以有若干個Exchange和Queue
- 同一個Virtual Host裡面不能有相同名稱的Exchange或Queue
7. 總結
RabbitMQ的概念、安裝與使用、管控台操作、結合RabbitMQ的特性、Exchange、Queue、Binding
、RoutingKey、Message進行核銷API的講解,通過本章的學習,希望大家對RabbitMQ有一個初步的認識。
文末
歡迎關注個人微信公眾號:Coder編程
獲取最新原創技術文章和免費學習資料,更有大量精品思維導圖、面試資料、PMP備考資料等你來領,方便你隨時隨地學習技術知識!
新建了一個qq群:315211365,歡迎大家進群交流一起學習。謝謝了!也可以介紹給身邊有需要的朋友。
文章收錄至
Github: https://github.com/CoderMerlin/coder-programming
Gitee: https://gitee.com/573059382/coder-programming
歡迎關注並star~
參考文章:
《RabbitMQ消息中間件精講》
推薦文章:
消息中間件——RabbitMQ(三)理解RabbitMQ核心概念和AMQP協議!
消息中間件——RabbitMQ(四)命令行與管控台的基本操作!
消息中間件——RabbitMQ(五)快速入門生產者與消費者,SpringBoot整合RabbitMQ!