RabbitMQ的交換器Exchange之Topic交換器(主題,規則匹配)

  • 2019 年 11 月 5 日
  • 筆記

1、Topic交換器(主題,規則匹配),Topic交換器也稱為主題交換器,特點是根據規則進行匹配,可以根據模糊進行匹配(即根據路由key進行模糊匹配),決定將那個資訊放入到指定的隊列裡面去。

項目的結構如下所示:

2、由於使用的是SpringBoot項目結合Maven項目構建的,pom.xml的配置文件,如下所示,生產者和消費者的配置文件一致,這裡只貼一份了。

 1 <?xml version="1.0" encoding="UTF-8"?>   2 <project xmlns="http://maven.apache.org/POM/4.0.0"   3     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"   4     xsi:schemaLocation="http://maven.apache.org/POM/4.0.0   5     https://maven.apache.org/xsd/maven-4.0.0.xsd">   6     <modelVersion>4.0.0</modelVersion>   7     <parent>   8         <groupId>org.springframework.boot</groupId>   9         <artifactId>spring-boot-starter-parent</artifactId>  10         <version>2.1.1.RELEASE</version>  11         <relativePath /> <!-- lookup parent from repository -->  12     </parent>  13     <groupId>com.bie</groupId>  14     <artifactId>rabbitmq-topic-provider</artifactId>  15     <version>0.0.1-SNAPSHOT</version>  16     <name>rabbitmq-topic-provider</name>  17     <description>Demo project for Spring Boot</description>  18  19     <properties>  20         <java.version>1.8</java.version>  21     </properties>  22  23     <dependencies>  24         <dependency>  25             <groupId>org.springframework.boot</groupId>  26             <artifactId>spring-boot-starter</artifactId>  27         </dependency>  28         <dependency>  29             <groupId>org.springframework.boot</groupId>  30             <artifactId>spring-boot-starter-web</artifactId>  31         </dependency>  32         <dependency>  33             <groupId>org.springframework.boot</groupId>  34             <artifactId>spring-boot-starter-test</artifactId>  35             <scope>test</scope>  36         </dependency>  37         <dependency>  38             <groupId>org.springframework.boot</groupId>  39             <artifactId>spring-boot-starter-amqp</artifactId>  40         </dependency>  41     </dependencies>  42  43     <build>  44         <plugins>  45             <plugin>  46                 <groupId>org.springframework.boot</groupId>  47                 <artifactId>spring-boot-maven-plugin</artifactId>  48             </plugin>  49         </plugins>  50     </build>  51  52 </project>

3、配置好pom.xml配置文件,就可以進行開發了,這裡先約束一下配置文件,體現一下SpringBoot的魔力,約定大於配置。

 1 # 給當前項目起名稱.   2 spring.application.name=rabbitmq-topic-provider   3   4 # 配置埠號   5 server.port=8081   6   7 # 配置rabbitmq的參數.   8 # rabbitmq伺服器的ip地址.   9 spring.rabbitmq.host=192.168.110.133  10 # rabbitmq的埠號5672,區別於瀏覽器訪問介面的15672埠號.  11 spring.rabbitmq.port=5672  12 # rabbitmq的帳號.  13 spring.rabbitmq.username=guest  14 # rabbitmq的密碼.  15 spring.rabbitmq.password=guest  16  17 # 設置交換器的名稱,方便修改.  18 # 生產者和消費者的交換器的名稱是一致的,這樣生產者生產的消息發送到交換器,消費者可以從這個交換器中消費.  19 rabbitmq.config.exchange=log.exchange.topic

模擬三個服務,用戶服務、商品服務,訂單服務,產生的各種日誌資訊,包含info、debug、trace、warn、error日誌資訊。不同的日誌級別資訊指定好路由鍵,將發送的消息綁定到交換器上面,發送消息。

 1 package com.example.bie.provider;   2   3 import org.springframework.amqp.core.AmqpTemplate;   4 import org.springframework.beans.factory.annotation.Autowired;   5 import org.springframework.beans.factory.annotation.Value;   6 import org.springframework.stereotype.Component;   7   8 /**   9  *  10  * @author biehl  11  *  12  *         生產者,生產消息同樣需要知道向那個交換器Exchange發送消息的.  13  *  14  *         這裡使用的交換器類型使用的是topic主題模式,根據規則匹配。  15  *  16  */  17 @Component  18 public class RabbitMqUserLogProduce {  19  20     @Autowired  21     private AmqpTemplate rabbitmqAmqpTemplate;  22  23     // 交換器的名稱Exchange  24     @Value(value = "${rabbitmq.config.exchange}")  25     private String exchange;  26  27     // 路由鍵routingkey  28     private String routingKeyInfo = "user.log.info";  29     private String routingKeyDebug = "user.log.debug";  30     private String routingKeyTrace = "user.log.trace";  31     private String routingKeyWarn = "user.log.warn";  32     private String routingKeyError = "user.log.error";  33  34     /**  35      * 發送消息的方法  36      *  37      * @param msg  38      */  39     public void producer(String msg) {  40         // 向消息隊列發送消息  41         // 參數1,交換器的名稱  42         // 參數2,路由鍵  43         // 參數3,消息  44         this.rabbitmqAmqpTemplate.convertAndSend(this.exchange, this.routingKeyInfo, "user.log.info......" + msg);  45         this.rabbitmqAmqpTemplate.convertAndSend(this.exchange, this.routingKeyDebug, "user.log.debug......" + msg);  46         this.rabbitmqAmqpTemplate.convertAndSend(this.exchange, this.routingKeyTrace, "user.log.trace......" + msg);  47         this.rabbitmqAmqpTemplate.convertAndSend(this.exchange, this.routingKeyWarn, "user.log.warn......" + msg);  48         this.rabbitmqAmqpTemplate.convertAndSend(this.exchange, this.routingKeyError, "user.log.error......" + msg);  49     }  50  51 }
 1 package com.example.bie.provider;   2   3 import org.springframework.amqp.core.AmqpTemplate;   4 import org.springframework.beans.factory.annotation.Autowired;   5 import org.springframework.beans.factory.annotation.Value;   6 import org.springframework.stereotype.Component;   7   8 /**   9  *  10  * @author biehl  11  *  12  *         生產者,生產消息同樣需要知道向那個交換器Exchange發送消息的.  13  *  14  *         這裡使用的交換器類型使用的是topic主題模式,根據規則匹配。  15  *  16  */  17 @Component  18 public class RabbitMqProductLogProduce {  19  20     @Autowired  21     private AmqpTemplate rabbitmqAmqpTemplate;  22  23     // 交換器的名稱Exchange  24     @Value(value = "${rabbitmq.config.exchange}")  25     private String exchange;  26  27     // 路由鍵routingkey  28     private String routingKeyInfo = "product.log.info";  29     private String routingKeyDebug = "product.log.debug";  30     private String routingKeyTrace = "product.log.trace";  31     private String routingKeyWarn = "product.log.warn";  32     private String routingKeyError = "product.log.error";  33  34     /**  35      * 發送消息的方法  36      *  37      * @param msg  38      */  39     public void producer(String msg) {  40         // 向消息隊列發送消息  41         // 參數1,交換器的名稱  42         // 參數2,路由鍵  43         // 參數3,消息  44         this.rabbitmqAmqpTemplate.convertAndSend(this.exchange, this.routingKeyInfo, "product.log.info......" + msg);  45         this.rabbitmqAmqpTemplate.convertAndSend(this.exchange, this.routingKeyDebug, "product.log.debug......" + msg);  46         this.rabbitmqAmqpTemplate.convertAndSend(this.exchange, this.routingKeyTrace, "product.log.trace......" + msg);  47         this.rabbitmqAmqpTemplate.convertAndSend(this.exchange, this.routingKeyWarn, "product.log.warn......" + msg);  48         this.rabbitmqAmqpTemplate.convertAndSend(this.exchange, this.routingKeyError, "product.log.error......" + msg);  49     }  50  51 }
 1 package com.example.bie.provider;   2   3 import org.springframework.amqp.core.AmqpTemplate;   4 import org.springframework.beans.factory.annotation.Autowired;   5 import org.springframework.beans.factory.annotation.Value;   6 import org.springframework.stereotype.Component;   7   8 /**   9  *  10  * @author biehl  11  *  12  *         生產者,生產消息同樣需要知道向那個交換器Exchange發送消息的.  13  *  14  *         這裡使用的交換器類型使用的是topic主題模式,根據規則匹配。  15  *  16  */  17 @Component  18 public class RabbitMqOrderLogProduce {  19  20     @Autowired  21     private AmqpTemplate rabbitmqAmqpTemplate;  22  23     // 交換器的名稱Exchange  24     @Value(value = "${rabbitmq.config.exchange}")  25     private String exchange;  26  27     // 路由鍵routingkey  28     private String routingKeyInfo = "order.log.info";  29     private String routingKeyDebug = "order.log.debug";  30     private String routingKeyTrace = "order.log.trace";  31     private String routingKeyWarn = "order.log.warn";  32     private String routingKeyError = "order.log.error";  33  34     /**  35      * 發送消息的方法  36      *  37      * @param msg  38      */  39     public void producer(String msg) {  40         // 向消息隊列發送消息  41         // 參數1,交換器的名稱  42         // 參數2,路由鍵  43         // 參數3,消息  44         this.rabbitmqAmqpTemplate.convertAndSend(this.exchange, this.routingKeyInfo, "order.log.info......" + msg);  45         this.rabbitmqAmqpTemplate.convertAndSend(this.exchange, this.routingKeyDebug, "order.log.debug......" + msg);  46         this.rabbitmqAmqpTemplate.convertAndSend(this.exchange, this.routingKeyTrace, "order.log.trace......" + msg);  47         this.rabbitmqAmqpTemplate.convertAndSend(this.exchange, this.routingKeyWarn, "order.log.warn......" + msg);  48         this.rabbitmqAmqpTemplate.convertAndSend(this.exchange, this.routingKeyError, "order.log.error......" + msg);  49     }  50  51 }

這裡使用web工程,瀏覽器訪問調用,方便測試。你也可以使用單元測試的方法。

 1 package com.example.bie.controller;   2   3 import org.springframework.beans.factory.annotation.Autowired;   4 import org.springframework.stereotype.Controller;   5 import org.springframework.web.bind.annotation.RequestMapping;   6 import org.springframework.web.bind.annotation.ResponseBody;   7   8 import com.example.bie.provider.RabbitMqOrderLogProduce;   9 import com.example.bie.provider.RabbitMqProductLogProduce;  10 import com.example.bie.provider.RabbitMqUserLogProduce;  11  12 /**  13  *  14  * @author biehl  15  *  16  */  17 @Controller  18 public class RabbitmqController {  19  20     @Autowired  21     private RabbitMqUserLogProduce rabbitMqUserLogProduce;  22  23     @Autowired  24     private RabbitMqProductLogProduce rabbitMqProductLogProduce;  25  26     @Autowired  27     private RabbitMqOrderLogProduce rabbitMqOrderLogProduce;  28  29     @RequestMapping(value = "/userLogInfo")  30     @ResponseBody  31     public String rabbitmqSendUserLogInfoMessage() {  32         String msg = "生產者===>生者的UserLogInfo消息message: ";  33         for (int i = 0; i < 50000; i++) {  34             rabbitMqUserLogProduce.producer(msg + i);  35         }  36         return "生產===>  UserLogInfo消息message  ===> success!!!";  37     }  38  39     @RequestMapping(value = "/productLogInfo")  40     @ResponseBody  41     public String rabbitmqSendProductLogErrorMessage() {  42         String msg = "生產者===>生者的ProductLogInfo消息message: ";  43         for (int i = 0; i < 50000; i++) {  44             rabbitMqProductLogProduce.producer(msg + i);  45         }  46         return "生產===>  ProductLogInfo消息message  ===> success!!!";  47     }  48  49     @RequestMapping(value = "/orderLogInfo")  50     @ResponseBody  51     public String rabbitmqSendOrderLogInfoMessage() {  52         String msg = "生產者===>生者的OrderLogInfo消息message: ";  53         for (int i = 0; i < 50000; i++) {  54             rabbitMqOrderLogProduce.producer(msg + i);  55         }  56         return "生產===>  OrderLogInfo消息message  ===> success!!!";  57     }  58  59 }

生產者的啟動類如下所示:

 1 package com.example;   2   3 import org.springframework.boot.SpringApplication;   4 import org.springframework.boot.autoconfigure.SpringBootApplication;   5   6 @SpringBootApplication   7 public class RabbitmqProducerApplication {   8   9     public static void main(String[] args) {  10         SpringApplication.run(RabbitmqProducerApplication.class, args);  11     }  12  13 }

4、生產者開發完畢就可以進行消費者的開發,也是先約束一下配置文件application.properties。

 1 # 給當前項目起名稱.   2 spring.application.name=rabbitmq-topic-consumer   3   4 # 配置埠號   5 server.port=8080   6   7 # 配置rabbitmq的參數.   8 # rabbitmq伺服器的ip地址.   9 spring.rabbitmq.host=192.168.110.133  10 # rabbitmq的埠號5672,區別於瀏覽器訪問介面的15672埠號.  11 spring.rabbitmq.port=5672  12 # rabbitmq的帳號.  13 spring.rabbitmq.username=guest  14 # rabbitmq的密碼.  15 spring.rabbitmq.password=guest  16  17 # 設置交換器的名稱,方便修改.  18 # 路由鍵是將交換器和隊列進行綁定的,隊列通過路由鍵綁定到交換器.  19 rabbitmq.config.exchange=log.exchange.topic  20  21 # info級別的隊列名稱.  22 rabbitmq.config.queue.info=log.info.queue  23  24 # error級別的隊列名稱.  25 rabbitmq.config.queue.error=log.error.queue  26  27 # 全日誌log級別的隊列名稱.  28 rabbitmq.config.queue.log=log.all.queue

約束好配置文件就可以進行消費者的開發了,這裡是將用戶服務、商品服務、訂單服務產生的info、debug、trace、warn、error不同級別的日誌資訊,使用rabbitmq的主題topic模式進行規則配置,即,消費者可以專一消費info級別的消息,error級別的消息,或者全部級別的日誌消息。

 1 package com.example.bie.consumer;   2   3 import org.springframework.amqp.core.ExchangeTypes;   4 import org.springframework.amqp.rabbit.annotation.Exchange;   5 import org.springframework.amqp.rabbit.annotation.Queue;   6 import org.springframework.amqp.rabbit.annotation.QueueBinding;   7 import org.springframework.amqp.rabbit.annotation.RabbitHandler;   8 import org.springframework.amqp.rabbit.annotation.RabbitListener;   9 import org.springframework.stereotype.Component;  10  11 /**  12  *  13  * @author biehl  14  *  15  *         消息接收者  16  *  17  *         1、@RabbitListener bindings:綁定隊列  18  *  19  *         2、@QueueBinding  20  *         value:綁定隊列的名稱、exchange:配置交換器、key:路由鍵routing-key綁定隊列和交換器  21  *  22  *         3、@Queue value:配置隊列名稱、autoDelete:是否是一個可刪除的臨時隊列  23  *  24  *         4、@Exchange value:為交換器起個名稱、type:指定具體的交換器類型  25  *  26  *  27  */  28 @Component  29 @RabbitListener(bindings = @QueueBinding(  30  31         value = @Queue(value = "${rabbitmq.config.queue.info}", autoDelete = "true"),  32  33         exchange = @Exchange(value = "${rabbitmq.config.exchange}", type = ExchangeTypes.TOPIC),  34  35         key = "*.log.info"))  36 public class LogInfoConsumer {  37  38     /**  39      * 接收消息的方法,採用消息隊列監聽機制.  40      *  41      * @RabbitHandler意思是將註解@RabbitListener配置到類上面  42      *  43      * @RabbitHandler是指定這個方法可以進行消息的接收並且消費.  44      *  45      * @param msg  46      */  47     @RabbitHandler  48     public void consumer(String msg) {  49         // 列印消息  50         System.out.println("All消費者===>消費: " + msg);  51     }  52  53 }
 1 package com.example.bie.consumer;   2   3 import org.springframework.amqp.core.ExchangeTypes;   4 import org.springframework.amqp.rabbit.annotation.Exchange;   5 import org.springframework.amqp.rabbit.annotation.Queue;   6 import org.springframework.amqp.rabbit.annotation.QueueBinding;   7 import org.springframework.amqp.rabbit.annotation.RabbitHandler;   8 import org.springframework.amqp.rabbit.annotation.RabbitListener;   9 import org.springframework.stereotype.Component;  10  11 /**  12  *  13  * @author biehl  14  *  15  *         消息接收者  16  *  17  *         1、@RabbitListener bindings:綁定隊列  18  *  19  *         2、@QueueBinding  20  *         value:綁定隊列的名稱、exchange:配置交換器、key:路由鍵routing-key綁定隊列和交換器  21  *  22  *         3、@Queue value:配置隊列名稱、autoDelete:是否是一個可刪除的臨時隊列  23  *  24  *         4、@Exchange value:為交換器起個名稱、type:指定具體的交換器類型  25  *  26  *  27  */  28 @Component  29 @RabbitListener(bindings = @QueueBinding(  30  31         value = @Queue(value = "${rabbitmq.config.queue.error}", autoDelete = "true"),  32  33         exchange = @Exchange(value = "${rabbitmq.config.exchange}", type = ExchangeTypes.TOPIC),  34  35         key = "*.log.error"))  36 public class LogErrorConsumer {  37  38     /**  39      * 接收消息的方法,採用消息隊列監聽機制.  40      *  41      * @RabbitHandler意思是將註解@RabbitListener配置到類上面  42      *  43      * @RabbitHandler是指定這個方法可以進行消息的接收並且消費.  44      *  45      * @param msg  46      */  47     @RabbitHandler  48     public void consumer(String msg) {  49         // 列印消息  50         System.out.println("Error消費者===>消費: " + msg);  51     }  52  53 }
 1 package com.example.bie.consumer;   2   3 import org.springframework.amqp.core.ExchangeTypes;   4 import org.springframework.amqp.rabbit.annotation.Exchange;   5 import org.springframework.amqp.rabbit.annotation.Queue;   6 import org.springframework.amqp.rabbit.annotation.QueueBinding;   7 import org.springframework.amqp.rabbit.annotation.RabbitHandler;   8 import org.springframework.amqp.rabbit.annotation.RabbitListener;   9 import org.springframework.stereotype.Component;  10  11 /**  12  *  13  * @author biehl  14  *  15  *         消息接收者  16  *  17  *         1、@RabbitListener bindings:綁定隊列  18  *  19  *         2、@QueueBinding  20  *         value:綁定隊列的名稱、exchange:配置交換器、key:路由鍵routing-key綁定隊列和交換器  21  *  22  *         3、@Queue value:配置隊列名稱、autoDelete:是否是一個可刪除的臨時隊列  23  *  24  *         4、@Exchange value:為交換器起個名稱、type:指定具體的交換器類型  25  *  26  *  27  */  28 @Component  29 @RabbitListener(bindings = @QueueBinding(  30  31         value = @Queue(value = "${rabbitmq.config.queue.log}", autoDelete = "true"),  32  33         exchange = @Exchange(value = "${rabbitmq.config.exchange}", type = ExchangeTypes.TOPIC),  34  35         key = "*.log.*"))  36 public class LogAllConsumer {  37  38     /**  39      * 接收消息的方法,採用消息隊列監聽機制.  40      *  41      * @RabbitHandler意思是將註解@RabbitListener配置到類上面  42      *  43      * @RabbitHandler是指定這個方法可以進行消息的接收並且消費.  44      *  45      * @param msg  46      */  47     @RabbitHandler  48     public void consumer(String msg) {  49         // 列印消息  50         System.out.println("Info消費者===>消費: " + msg);  51     }  52  53 }

消費者的啟動類,如下所示:

 1 package com.example;   2   3 import org.springframework.boot.SpringApplication;   4 import org.springframework.boot.autoconfigure.SpringBootApplication;   5   6 @SpringBootApplication   7 public class RabbitmqConsumerApplication {   8   9     public static void main(String[] args) {  10         SpringApplication.run(RabbitmqConsumerApplication.class, args);  11     }  12  13 }

5、運行效果如下所示: