RabbitMQ消息隊列基礎概念、原理學習

  • 2019 年 11 月 5 日
  • 筆記

使用RabbitMQ實現解耦合的設計,對添加程式碼是開發的,對修改程式碼是關閉的。

1、什麼是生產者Provider?

  答:消息生產者,就是投遞消息的程式。

2、什麼是消費者Consumer?

  答:消息消費者,就是接受消息的程式。

3、沒有使用消息隊列時消息傳遞方式。

4、使用消息隊列後消息傳遞方式。

5、什麼是隊列?

  答:隊列就像存放了商品的倉庫或者商店,是生產商品的工廠和購買商品的用戶之間的中轉站。

6、隊列里存儲了什麼?

  答:在RabbitMQ 中,資訊流從你的應用程式出發,來到 Rabbitmq 的隊列,所有資訊可以只存儲在一個隊列中。隊列可以存儲很多資訊,因為它基本上是一個無限制的緩衝區,前提是你的機器有足夠的存儲空間。

7、隊列和應用程式的關係?

  答:多個生產者可以將消息發送到同一個隊列中,多個消息者也可以只從同一個隊列接收數據。

8、RabbitMQ的生產者消費者模型、使用SpringBoot搭建。由於創建的SpringBoot項目,使用Maven。依賴包如下所示:

 1 <?xml version="1.0" encoding="UTF-8"?>   2 <project xmlns="http://maven.apache.org/POM/4.0.0"   3     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"   4     xsi:schemaLocation="http://maven.apache.org/POM/4.0.0   5     https://maven.apache.org/xsd/maven-4.0.0.xsd">   6     <modelVersion>4.0.0</modelVersion>   7     <parent>   8         <groupId>org.springframework.boot</groupId>   9         <artifactId>spring-boot-starter-parent</artifactId>  10         <version>2.1.1.RELEASE</version>  11         <relativePath /> <!-- lookup parent from repository -->  12     </parent>  13     <groupId>com.bie</groupId>  14     <artifactId>rabbitmq</artifactId>  15     <version>0.0.1-SNAPSHOT</version>  16     <name>rabbitmq</name>  17     <description>Demo project for Spring Boot</description>  18  19     <properties>  20         <java.version>1.8</java.version>  21     </properties>  22  23     <dependencies>  24         <dependency>  25             <groupId>org.springframework.boot</groupId>  26             <artifactId>spring-boot-starter</artifactId>  27         </dependency>  28         <dependency>  29             <groupId>org.springframework.boot</groupId>  30             <artifactId>spring-boot-starter-web</artifactId>  31         </dependency>  32         <dependency>  33             <groupId>org.springframework.boot</groupId>  34             <artifactId>spring-boot-starter-test</artifactId>  35             <scope>test</scope>  36         </dependency>  37         <dependency>  38             <groupId>org.springframework.boot</groupId>  39             <artifactId>spring-boot-starter-amqp</artifactId>  40         </dependency>  41     </dependencies>  42  43     <build>  44         <plugins>  45             <plugin>  46                 <groupId>org.springframework.boot</groupId>  47                 <artifactId>spring-boot-maven-plugin</artifactId>  48             </plugin>  49         </plugins>  50     </build>  51  52 </project>

配置你的SpringBoot的配置文件application.properties。

 1 # 給當前項目起名稱.   2 spring.application.name=rabbitmq   3   4 # 配置rabbitmq的參數.   5 # rabbitmq伺服器的ip地址.   6 spring.rabbitmq.host=192.168.110.133   7 # rabbitmq的埠號5672,區別於瀏覽器訪問介面的15672埠號.   8 spring.rabbitmq.port=5672   9 # rabbitmq的帳號.  10 spring.rabbitmq.username=guest  11 # rabbitmq的密碼.  12 spring.rabbitmq.password=guest  13  14 # 隊列的名稱  15 rabbitmq.queue=queue001

然後,創建一個隊列,創建的隊列,在項目啟動的時候載入,這樣使用隊列的時候就可以直接使用。

 1 package com.example.bie.config;   2   3 import org.springframework.amqp.core.Queue;   4 import org.springframework.beans.factory.annotation.Value;   5 import org.springframework.context.annotation.Bean;   6 import org.springframework.context.annotation.Configuration;   7   8 /**   9  *  10  * @author biehl  11  *  12  * @Configuration項目啟動載入本類  13  *  14  */  15 @Configuration  16 public class RabbitMqQueueConfig {  17  18     @Value("${rabbitmq.queue}")  19     private String queueName;  20  21     /**  22      * 創建一個隊列  23      *  24      * @return  25      */  26     @Bean  27     public Queue createQueue() {  28         return new Queue(this.queueName);  29     }  30  31 }

然後,創建一個Rabbitmq的生產者,生產消息,這裡使用傳入參數的方法生產消息。可以測試單條消息和多條消息。

 1 package com.example.bie.rabbitmq.producer;   2   3 import org.springframework.amqp.core.AmqpTemplate;   4 import org.springframework.beans.factory.annotation.Autowired;   5 import org.springframework.beans.factory.annotation.Value;   6 import org.springframework.stereotype.Component;   7   8 /**   9  *  10  * @author biehl  11  *  12  *         RabbitmqProducer消息發送者  13  *  14  * @Component加入到容器中.  15  *  16  */  17 @Component  18 public class RabbitmqProducer {  19  20     @Autowired  21     private AmqpTemplate rabbitmqAmqpTemplate;  22  23     @Value("${rabbitmq.queue}")  24     private String queueName;  25  26     /**  27      * 發送消息的方法  28      */  29     public void producer(String msg) {  30         // 向消息隊列中發送消息  31         // 參數1,隊列的名稱  32         // 參數2,發送的消息  33         this.rabbitmqAmqpTemplate.convertAndSend(this.queueName, msg);  34     }  35  36 }

創建消費者消費消息,由於使用的是消費事件監聽器,這裡使用註解方式,一旦發現隊列裡面的消息發生變化,就觸發該事件,從而觸發方法進行消息的消費。

 1 package com.example.bie.rabbitmq.consumer;   2   3 import org.springframework.amqp.rabbit.annotation.RabbitListener;   4 import org.springframework.beans.factory.annotation.Value;   5 import org.springframework.stereotype.Component;   6   7 /**   8  *   9  * @author biehl  10  *  11  *         RabbitmqConsumer消息消費者  12  *  13  *         消費者是根據消息隊列的監聽器,進行消息的接收和消費。  14  *  15  *         消息隊列發生變化,消息事件就會產生,觸發方法進行消息的接收。  16  *  17  */  18 @Component  19 public class RabbitmqConsumer {  20  21     @Value("${rabbitmq.queue}")  22     private String queueName;  23  24     /**  25      * 消費者消費消息,接受消息的方法,採用消息隊列監聽機制.  26      *  27      * @RabbitListener  28      *  29      *                 意思是當隊列發生變化,消息事件產生了或者生產者發送消息了。  30      *  31      *                 馬上就會觸發這個方法,進行消息的消費。  32      */  33     @RabbitListener(queues = "queue001")  34     public void consumer(String msg) {  35         // 列印消息  36         System.out.println("消費者===>消費<===消息message: " + msg);  37     }  38  39 }

創建完畢隊列,生產者,消費者,配置好配置文件,可以使用單元測試,或者使用Controller的方式進行測試。由於使用單元測試,SpringBoot項目停止,就無法進行消費,不方便測試,這裡使用web容器進行測試,創建一個Controller,頁面訪問進行觸發方法的執行。當生產者生產消息多的時候可以在瀏覽器進行觀察生產、消費的效果。

 1 package com.example.bie.controller;   2   3 import org.springframework.beans.factory.annotation.Autowired;   4 import org.springframework.stereotype.Controller;   5 import org.springframework.web.bind.annotation.RequestMapping;   6 import org.springframework.web.bind.annotation.ResponseBody;   7   8 import com.example.bie.rabbitmq.producer.RabbitmqProducer;   9  10 /**  11  *  12  * @author biehl  13  *  14  */  15 @Controller  16 public class RabbitmqController {  17  18     @Autowired  19     private RabbitmqProducer rabbitmqProducer;  20  21     @RequestMapping(value = "/sendMessage")  22     @ResponseBody  23     public void rabbitmqSendMessage() {  24         String msg = "消息產===>生者<===消息message: ";  25         for (int i = 0; i < 100000; i++) {  26             rabbitmqProducer.producer(msg + i);  27         }  28     }  29  30 }

最後就可以啟動你的SpringBoot項目了。

 1 package com.example;   2   3 import org.springframework.boot.SpringApplication;   4 import org.springframework.boot.autoconfigure.SpringBootApplication;   5   6 @SpringBootApplication   7 public class RabbitmqApplication {   8   9     public static void main(String[] args) {  10         SpringApplication.run(RabbitmqApplication.class, args);  11     }  12  13 }

運行效果如下所示:

9、RabbitMQ原理理解,概念的理解。

1)、RabbitMQ的消息Message。

  消息。消息是不具名的,它由消息頭、消息體組成。消息體是不透明的,而消息頭則由一系列可選屬性組成,這些屬性包括:routing-key(路由鍵)、priority(相對於其他消息的優先權)、delivery-mode(指出消息可能持久性存儲)等。

2)、RabbitMQ的生產者Provider或者Publisher。

  消息的生產者。也是一個向交換器發布消息的客戶端應用程式。

3)、RabbitMQ的消費者Consumer。

  消息的消費者。表示一個從消息隊列中取得消息的客戶端應用程式。

4)、RabbitMQ的交換器或者稱為交換機Exchange。

  交換器。用來接收生產者發送的消息並將這些消息路由給伺服器中的隊列。三種常用的交換器類型,包含,direct(發布與訂閱 完全匹配,默認使用)、fanout(廣播)、topic(主題,規則匹配)。

5)、RabbitMQ的綁定Binding。

  綁定。用於消息隊列Queue和交換器Exchange之間的關聯。一個綁定就是基於路由鍵將交換器和消息隊列連接起來的路由規則,所以可以將交換器理解成一個由綁定構成的路由表。

6)、RabbitMQ的消息隊列Queue。

  消息隊列。用來保存消息直到發送給消費者。它是消息的容器,也是消息的終點。一個消息可投入一個或多個隊列。消息一直在隊列裡面,等待消費者鏈接到這個隊列將其取走。

7)、RabbitMQ的路由鍵Routing-key。

  路由鍵。RabbitMQ 決定消息該投遞到哪個隊列的規則。隊列通過路由鍵綁定到交換器。消息發送到 MQ 伺服器時,消息將擁有一個路由鍵,即便是空的,RabbitMQ 也會將其和綁定使用的路由鍵進行匹配。如果相匹配,消息將會投遞到該隊列。如果不匹配,消息將會進入黑洞。

8)、RabbitMQ的鏈接Connection。

  鏈接。指Rabbitmq 伺服器和服務建立的 TCP 鏈接。

9)、RabbitMQ的信道Channel。

  信道。

a、Channel 中文叫做信道,是 TCP 裡面的虛擬鏈接。例如:電纜相當於 TCP,信道是一個獨立光纖束,一條 TCP 連接上創建多條信道是沒有問題的。

b、TCP 一旦打開,就會創建 AMQP 信道。c、無論是發布消息、接收消息、訂閱隊列,這些動作都是通過信道完成的。

10)、RabbitMQ的虛擬主機Virtual Host。

  虛擬主機。表示一批交換器,消息隊列和相關對象。虛擬主機是共享相同的身份認證和加密環境的獨立伺服器域。每個 vhost 本質上就是一個 mini 版的 RabbitMQ 伺服器,擁有自己的隊列、交換器、綁定和許可權機制。vhost 是 AMQP 概念的基礎,必須在鏈接時指定, RabbitMQ 默認的 vhost 是/。

11)、RabbitMQ的消息隊列伺服器實體Borker。

  表示消息隊列伺服器實體。

12)、RabbitMQ交換器和隊列的關係。

  交換器是通過路由鍵和隊列綁定在一起的,如果消息擁有的路由鍵跟隊列和交換器的路由鍵匹配,那麼消息就會被路由到該綁定的隊列中。   也就是說,消息到隊列的過程中,消息首先會經過交換器,接下來交換器在通過路由鍵匹配分發消息到具體的隊列中。   路由鍵可以理解為匹配的規則。

13)、RabbitMQ 為什麼需要信道?為什麼不是 TCP 直接通訊?

  TCP 的創建和銷毀開銷特別大。創建需要 3 次握手,銷毀需要 4 次分手。   如果不用信道,那應用程式就會以 TCP 鏈接 Rabbit,高峰時每秒成千上萬條鏈接會造成資源巨大的浪費,而且作業系統每秒處理 TCP 鏈接數也是有限制的,必定造成性能瓶頸。   信道的原理是一條執行緒一條通道,多條執行緒多條通道同用一條 TCP 鏈接。一條 TCP 鏈接可以容納無限的信道,即使每秒成千上萬的請求也不會成為性能的瓶頸。

10、RabbitMQ的架構設計,如下圖所示。