RabbitMQ消息隊列基礎概念、原理學習
- 2019 年 11 月 5 日
- 筆記
使用RabbitMQ實現解耦合的設計,對添加程式碼是開發的,對修改程式碼是關閉的。
1、什麼是生產者Provider?
答:消息生產者,就是投遞消息的程式。
2、什麼是消費者Consumer?
答:消息消費者,就是接受消息的程式。
3、沒有使用消息隊列時消息傳遞方式。
4、使用消息隊列後消息傳遞方式。
5、什麼是隊列?
答:隊列就像存放了商品的倉庫或者商店,是生產商品的工廠和購買商品的用戶之間的中轉站。
6、隊列里存儲了什麼?
答:在RabbitMQ 中,資訊流從你的應用程式出發,來到 Rabbitmq 的隊列,所有資訊可以只存儲在一個隊列中。隊列可以存儲很多資訊,因為它基本上是一個無限制的緩衝區,前提是你的機器有足夠的存儲空間。
7、隊列和應用程式的關係?
答:多個生產者可以將消息發送到同一個隊列中,多個消息者也可以只從同一個隊列接收數據。
8、RabbitMQ的生產者消費者模型、使用SpringBoot搭建。由於創建的SpringBoot項目,使用Maven。依賴包如下所示:
1 <?xml version="1.0" encoding="UTF-8"?> 2 <project xmlns="http://maven.apache.org/POM/4.0.0" 3 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 4 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 5 https://maven.apache.org/xsd/maven-4.0.0.xsd"> 6 <modelVersion>4.0.0</modelVersion> 7 <parent> 8 <groupId>org.springframework.boot</groupId> 9 <artifactId>spring-boot-starter-parent</artifactId> 10 <version>2.1.1.RELEASE</version> 11 <relativePath /> <!-- lookup parent from repository --> 12 </parent> 13 <groupId>com.bie</groupId> 14 <artifactId>rabbitmq</artifactId> 15 <version>0.0.1-SNAPSHOT</version> 16 <name>rabbitmq</name> 17 <description>Demo project for Spring Boot</description> 18 19 <properties> 20 <java.version>1.8</java.version> 21 </properties> 22 23 <dependencies> 24 <dependency> 25 <groupId>org.springframework.boot</groupId> 26 <artifactId>spring-boot-starter</artifactId> 27 </dependency> 28 <dependency> 29 <groupId>org.springframework.boot</groupId> 30 <artifactId>spring-boot-starter-web</artifactId> 31 </dependency> 32 <dependency> 33 <groupId>org.springframework.boot</groupId> 34 <artifactId>spring-boot-starter-test</artifactId> 35 <scope>test</scope> 36 </dependency> 37 <dependency> 38 <groupId>org.springframework.boot</groupId> 39 <artifactId>spring-boot-starter-amqp</artifactId> 40 </dependency> 41 </dependencies> 42 43 <build> 44 <plugins> 45 <plugin> 46 <groupId>org.springframework.boot</groupId> 47 <artifactId>spring-boot-maven-plugin</artifactId> 48 </plugin> 49 </plugins> 50 </build> 51 52 </project>
配置你的SpringBoot的配置文件application.properties。
1 # 給當前項目起名稱. 2 spring.application.name=rabbitmq 3 4 # 配置rabbitmq的參數. 5 # rabbitmq伺服器的ip地址. 6 spring.rabbitmq.host=192.168.110.133 7 # rabbitmq的埠號5672,區別於瀏覽器訪問介面的15672埠號. 8 spring.rabbitmq.port=5672 9 # rabbitmq的帳號. 10 spring.rabbitmq.username=guest 11 # rabbitmq的密碼. 12 spring.rabbitmq.password=guest 13 14 # 隊列的名稱 15 rabbitmq.queue=queue001
然後,創建一個隊列,創建的隊列,在項目啟動的時候載入,這樣使用隊列的時候就可以直接使用。
1 package com.example.bie.config; 2 3 import org.springframework.amqp.core.Queue; 4 import org.springframework.beans.factory.annotation.Value; 5 import org.springframework.context.annotation.Bean; 6 import org.springframework.context.annotation.Configuration; 7 8 /** 9 * 10 * @author biehl 11 * 12 * @Configuration項目啟動載入本類 13 * 14 */ 15 @Configuration 16 public class RabbitMqQueueConfig { 17 18 @Value("${rabbitmq.queue}") 19 private String queueName; 20 21 /** 22 * 創建一個隊列 23 * 24 * @return 25 */ 26 @Bean 27 public Queue createQueue() { 28 return new Queue(this.queueName); 29 } 30 31 }
然後,創建一個Rabbitmq的生產者,生產消息,這裡使用傳入參數的方法生產消息。可以測試單條消息和多條消息。
1 package com.example.bie.rabbitmq.producer; 2 3 import org.springframework.amqp.core.AmqpTemplate; 4 import org.springframework.beans.factory.annotation.Autowired; 5 import org.springframework.beans.factory.annotation.Value; 6 import org.springframework.stereotype.Component; 7 8 /** 9 * 10 * @author biehl 11 * 12 * RabbitmqProducer消息發送者 13 * 14 * @Component加入到容器中. 15 * 16 */ 17 @Component 18 public class RabbitmqProducer { 19 20 @Autowired 21 private AmqpTemplate rabbitmqAmqpTemplate; 22 23 @Value("${rabbitmq.queue}") 24 private String queueName; 25 26 /** 27 * 發送消息的方法 28 */ 29 public void producer(String msg) { 30 // 向消息隊列中發送消息 31 // 參數1,隊列的名稱 32 // 參數2,發送的消息 33 this.rabbitmqAmqpTemplate.convertAndSend(this.queueName, msg); 34 } 35 36 }
創建消費者消費消息,由於使用的是消費事件監聽器,這裡使用註解方式,一旦發現隊列裡面的消息發生變化,就觸發該事件,從而觸發方法進行消息的消費。
1 package com.example.bie.rabbitmq.consumer; 2 3 import org.springframework.amqp.rabbit.annotation.RabbitListener; 4 import org.springframework.beans.factory.annotation.Value; 5 import org.springframework.stereotype.Component; 6 7 /** 8 * 9 * @author biehl 10 * 11 * RabbitmqConsumer消息消費者 12 * 13 * 消費者是根據消息隊列的監聽器,進行消息的接收和消費。 14 * 15 * 消息隊列發生變化,消息事件就會產生,觸發方法進行消息的接收。 16 * 17 */ 18 @Component 19 public class RabbitmqConsumer { 20 21 @Value("${rabbitmq.queue}") 22 private String queueName; 23 24 /** 25 * 消費者消費消息,接受消息的方法,採用消息隊列監聽機制. 26 * 27 * @RabbitListener 28 * 29 * 意思是當隊列發生變化,消息事件產生了或者生產者發送消息了。 30 * 31 * 馬上就會觸發這個方法,進行消息的消費。 32 */ 33 @RabbitListener(queues = "queue001") 34 public void consumer(String msg) { 35 // 列印消息 36 System.out.println("消費者===>消費<===消息message: " + msg); 37 } 38 39 }
創建完畢隊列,生產者,消費者,配置好配置文件,可以使用單元測試,或者使用Controller的方式進行測試。由於使用單元測試,SpringBoot項目停止,就無法進行消費,不方便測試,這裡使用web容器進行測試,創建一個Controller,頁面訪問進行觸發方法的執行。當生產者生產消息多的時候可以在瀏覽器進行觀察生產、消費的效果。
1 package com.example.bie.controller; 2 3 import org.springframework.beans.factory.annotation.Autowired; 4 import org.springframework.stereotype.Controller; 5 import org.springframework.web.bind.annotation.RequestMapping; 6 import org.springframework.web.bind.annotation.ResponseBody; 7 8 import com.example.bie.rabbitmq.producer.RabbitmqProducer; 9 10 /** 11 * 12 * @author biehl 13 * 14 */ 15 @Controller 16 public class RabbitmqController { 17 18 @Autowired 19 private RabbitmqProducer rabbitmqProducer; 20 21 @RequestMapping(value = "/sendMessage") 22 @ResponseBody 23 public void rabbitmqSendMessage() { 24 String msg = "消息產===>生者<===消息message: "; 25 for (int i = 0; i < 100000; i++) { 26 rabbitmqProducer.producer(msg + i); 27 } 28 } 29 30 }
最後就可以啟動你的SpringBoot項目了。
1 package com.example; 2 3 import org.springframework.boot.SpringApplication; 4 import org.springframework.boot.autoconfigure.SpringBootApplication; 5 6 @SpringBootApplication 7 public class RabbitmqApplication { 8 9 public static void main(String[] args) { 10 SpringApplication.run(RabbitmqApplication.class, args); 11 } 12 13 }
運行效果如下所示:
9、RabbitMQ原理理解,概念的理解。
1)、RabbitMQ的消息Message。
消息。消息是不具名的,它由消息頭、消息體組成。消息體是不透明的,而消息頭則由一系列可選屬性組成,這些屬性包括:routing-key(路由鍵)、priority(相對於其他消息的優先權)、delivery-mode(指出消息可能持久性存儲)等。
2)、RabbitMQ的生產者Provider或者Publisher。
消息的生產者。也是一個向交換器發布消息的客戶端應用程式。
3)、RabbitMQ的消費者Consumer。
消息的消費者。表示一個從消息隊列中取得消息的客戶端應用程式。
4)、RabbitMQ的交換器或者稱為交換機Exchange。
交換器。用來接收生產者發送的消息並將這些消息路由給伺服器中的隊列。三種常用的交換器類型,包含,direct(發布與訂閱 完全匹配,默認使用)、fanout(廣播)、topic(主題,規則匹配)。
5)、RabbitMQ的綁定Binding。
綁定。用於消息隊列Queue和交換器Exchange之間的關聯。一個綁定就是基於路由鍵將交換器和消息隊列連接起來的路由規則,所以可以將交換器理解成一個由綁定構成的路由表。
6)、RabbitMQ的消息隊列Queue。
消息隊列。用來保存消息直到發送給消費者。它是消息的容器,也是消息的終點。一個消息可投入一個或多個隊列。消息一直在隊列裡面,等待消費者鏈接到這個隊列將其取走。
7)、RabbitMQ的路由鍵Routing-key。
路由鍵。RabbitMQ 決定消息該投遞到哪個隊列的規則。隊列通過路由鍵綁定到交換器。消息發送到 MQ 伺服器時,消息將擁有一個路由鍵,即便是空的,RabbitMQ 也會將其和綁定使用的路由鍵進行匹配。如果相匹配,消息將會投遞到該隊列。如果不匹配,消息將會進入黑洞。
8)、RabbitMQ的鏈接Connection。
鏈接。指Rabbitmq 伺服器和服務建立的 TCP 鏈接。
9)、RabbitMQ的信道Channel。
信道。
a、Channel 中文叫做信道,是 TCP 裡面的虛擬鏈接。例如:電纜相當於 TCP,信道是一個獨立光纖束,一條 TCP 連接上創建多條信道是沒有問題的。
b、TCP 一旦打開,就會創建 AMQP 信道。c、無論是發布消息、接收消息、訂閱隊列,這些動作都是通過信道完成的。
10)、RabbitMQ的虛擬主機Virtual Host。
虛擬主機。表示一批交換器,消息隊列和相關對象。虛擬主機是共享相同的身份認證和加密環境的獨立伺服器域。每個 vhost 本質上就是一個 mini 版的 RabbitMQ 伺服器,擁有自己的隊列、交換器、綁定和許可權機制。vhost 是 AMQP 概念的基礎,必須在鏈接時指定, RabbitMQ 默認的 vhost 是/。
11)、RabbitMQ的消息隊列伺服器實體Borker。
表示消息隊列伺服器實體。
12)、RabbitMQ交換器和隊列的關係。
交換器是通過路由鍵和隊列綁定在一起的,如果消息擁有的路由鍵跟隊列和交換器的路由鍵匹配,那麼消息就會被路由到該綁定的隊列中。 也就是說,消息到隊列的過程中,消息首先會經過交換器,接下來交換器在通過路由鍵匹配分發消息到具體的隊列中。 路由鍵可以理解為匹配的規則。
13)、RabbitMQ 為什麼需要信道?為什麼不是 TCP 直接通訊?
TCP 的創建和銷毀開銷特別大。創建需要 3 次握手,銷毀需要 4 次分手。 如果不用信道,那應用程式就會以 TCP 鏈接 Rabbit,高峰時每秒成千上萬條鏈接會造成資源巨大的浪費,而且作業系統每秒處理 TCP 鏈接數也是有限制的,必定造成性能瓶頸。 信道的原理是一條執行緒一條通道,多條執行緒多條通道同用一條 TCP 鏈接。一條 TCP 鏈接可以容納無限的信道,即使每秒成千上萬的請求也不會成為性能的瓶頸。
10、RabbitMQ的架構設計,如下圖所示。