RabbitMQ的交换器Exchange之direct(发布与订阅 完全匹配)

  • 2019 年 11 月 5 日
  • 笔记

1、交换器。用来接收生产者发送的消息并将这些消息路由给服务器中的队列。三种常用的交换器类型,a、direct(发布与订阅 完全匹配)。b、fanout(广播)。c、topic(主题,规则匹配)。

2、direct(发布与订阅 完全匹配)的使用。

由于使用的是SpringBoot项目结合Maven项目构建的。项目工程如下所示:

3、生产者模块和消费者模块分开的,但是pom.xml是一致的,如下所示:

 1 <?xml version="1.0" encoding="UTF-8"?>   2 <project xmlns="http://maven.apache.org/POM/4.0.0"   3     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"   4     xsi:schemaLocation="http://maven.apache.org/POM/4.0.0   5     https://maven.apache.org/xsd/maven-4.0.0.xsd">   6     <modelVersion>4.0.0</modelVersion>   7     <parent>   8         <groupId>org.springframework.boot</groupId>   9         <artifactId>spring-boot-starter-parent</artifactId>  10         <version>2.1.1.RELEASE</version>  11         <relativePath /> <!-- lookup parent from repository -->  12     </parent>  13     <groupId>com.bie</groupId>  14     <artifactId>rabbitmq-direct-provider</artifactId>  15     <version>0.0.1-SNAPSHOT</version>  16     <name>rabbitmq-direct-provider</name>  17     <description>Demo project for Spring Boot</description>  18  19     <properties>  20         <java.version>1.8</java.version>  21     </properties>  22  23     <dependencies>  24         <dependency>  25             <groupId>org.springframework.boot</groupId>  26             <artifactId>spring-boot-starter</artifactId>  27         </dependency>  28         <dependency>  29             <groupId>org.springframework.boot</groupId>  30             <artifactId>spring-boot-starter-web</artifactId>  31         </dependency>  32         <dependency>  33             <groupId>org.springframework.boot</groupId>  34             <artifactId>spring-boot-starter-test</artifactId>  35             <scope>test</scope>  36         </dependency>  37         <dependency>  38             <groupId>org.springframework.boot</groupId>  39             <artifactId>spring-boot-starter-amqp</artifactId>  40         </dependency>  41     </dependencies>  42  43     <build>  44         <plugins>  45             <plugin>  46                 <groupId>org.springframework.boot</groupId>  47                 <artifactId>spring-boot-maven-plugin</artifactId>  48             </plugin>  49         </plugins>  50     </build>  51  52 </project>

配置生产者的配置文件application.properties。配置如下所示:

 1 # 给当前项目起名称.   2 spring.application.name=rabbitmq-direct-provider   3   4 # 配置端口号   5 server.port=8081   6   7   8 # 配置rabbitmq的参数.   9 # rabbitmq服务器的ip地址.  10 spring.rabbitmq.host=192.168.110.133  11 # rabbitmq的端口号5672,区别于浏览器访问界面的15672端口号.  12 spring.rabbitmq.port=5672  13 # rabbitmq的账号.  14 spring.rabbitmq.username=guest  15 # rabbitmq的密码.  16 spring.rabbitmq.password=guest  17  18 # 设置交换器的名称,方便修改.  19 # 生产者和消费者的交换器的名称是一致的,这样生产者生产的消息发送到交换器,消费者可以从这个交换器中消费.  20 rabbitmq.config.exchange=log.exchange.direct  21  22 # 生产者生产消息的时候也要带上路由键,队列通过路由键绑定到交换器,交换器根据路由键将绑定到队列上.  23 # 交换器根据不同的路由键将消息发送到不同队列上.  24 # info的路由键.  25 rabbitmq.config.queue.info.routing.key=log.info.routing.key  26  27 # error的路由键.  28 rabbitmq.config.queue.error.routing.key=log.error.routing.key

配置完毕,配置文件开始写生产者生产消息代码。

本模块练习的是发布订阅模式即Direct,分为两个生产者LogInfo、LogError,生产者生产消息的时候也要带上路由键,队列通过路由键绑定到交换器(即交换器根据路由键将绑定到队列上),交换器根据不同的路由键将消息发送到不同队列上。

本项目指定了info的路由键、error的路由键,然后生产者生产的消息发送到指定的交换器。交换器通过路由到绑定的队列中去,最后消费者进行监听队列发生变化,触发指定的方法进行消息的消费。

 1 package com.example.bie.provider;   2   3 import org.springframework.amqp.core.AmqpTemplate;   4 import org.springframework.beans.factory.annotation.Autowired;   5 import org.springframework.beans.factory.annotation.Value;   6 import org.springframework.stereotype.Component;   7   8 /**   9  *  10  * @author biehl  11  *  12  *         生产者,生产消息同样需要知道向那个交换器Exchange发送消息的.  13  *  14  *         这里使用的交换器类型使用的是direct发布订阅模式,  15  *         根据配置的路由routing-key来决定,将不同的消息路由到不同的队列queue中。  16  *         不同的消息具有相同的路由键,就会进入相同的队列当中去。  17  *  18  *  19  */  20 @Component  21 public class RabbitMqLogInfoProduce {  22  23     @Autowired  24     private AmqpTemplate rabbitmqAmqpTemplate;  25  26     // 交换器的名称Exchange  27     @Value(value = "${rabbitmq.config.exchange}")  28     private String exchange;  29  30     // 路由键routingkey  31     @Value(value = "${rabbitmq.config.queue.info.routing.key}")  32     private String routingKey;  33  34     /**  35      * 发送消息的方法  36      *  37      * @param msg  38      */  39     public void producer(String msg) {  40         // 向消息队列发送消息  41         // 参数1,交换器的名称  42         // 参数2,路由键  43         // 参数3,消息  44         this.rabbitmqAmqpTemplate.convertAndSend(this.exchange, this.routingKey, msg);  45     }  46  47 }
 1 package com.example.bie.provider;   2   3 import org.springframework.amqp.core.AmqpTemplate;   4 import org.springframework.beans.factory.annotation.Autowired;   5 import org.springframework.beans.factory.annotation.Value;   6 import org.springframework.stereotype.Component;   7   8 /**   9  *  10  * @author biehl  11  *  12  *         生产者,生产消息同样需要知道向那个交换器Exchange发送消息的.  13  *  14  *         这里使用的交换器类型使用的是direct发布订阅模式,  15  *         根据配置的路由routing-key来决定,将不同的消息路由到不同的队列queue中。  16  *         不同的消息具有相同的路由键,就会进入相同的队列当中去。  17  *  18  *  19  */  20 @Component  21 public class RabbitMqLogErrorProduce {  22  23     @Autowired  24     private AmqpTemplate rabbitmqAmqpTemplate;  25  26     // 交换器的名称Exchange  27     @Value(value = "${rabbitmq.config.exchange}")  28     private String exchange;  29  30     // 路由键routingkey  31     @Value(value = "${rabbitmq.config.queue.error.routing.key}")  32     private String routingKey;  33  34     /**  35      * 发送消息的方法  36      *  37      * @param msg  38      */  39     public void producer(String msg) {  40         // 向消息队列发送消息  41         // 参数1,交换器的名称  42         // 参数2,路由键  43         // 参数3,消息  44         this.rabbitmqAmqpTemplate.convertAndSend(this.exchange, this.routingKey, msg);  45     }  46 }

这里使用web工程,浏览器访问调用,方便测试。

 1 package com.example.bie.controller;   2   3 import org.springframework.beans.factory.annotation.Autowired;   4 import org.springframework.stereotype.Controller;   5 import org.springframework.web.bind.annotation.RequestMapping;   6 import org.springframework.web.bind.annotation.ResponseBody;   7   8 import com.example.bie.provider.RabbitMqLogErrorProduce;   9 import com.example.bie.provider.RabbitMqLogInfoProduce;  10  11 /**  12  *  13  * @author biehl  14  *  15  */  16 @Controller  17 public class RabbitmqController {  18  19     @Autowired  20     private RabbitMqLogInfoProduce rabbitMqLogInfoProduce;  21  22     @Autowired  23     private RabbitMqLogErrorProduce rabbitMqLogErrorProduce;  24  25     @RequestMapping(value = "/logInfo")  26     @ResponseBody  27     public String rabbitmqSendLogInfoMessage() {  28         String msg = "生产者===>生者的LogInfo消息message: ";  29         for (int i = 0; i < 100000; i++) {  30             rabbitMqLogInfoProduce.producer(msg + i);  31         }  32         return "生产===>  LogInfo消息message  ===> success!!!";  33     }  34  35     @RequestMapping(value = "/logError")  36     @ResponseBody  37     public String rabbitmqSendLogErrorMessage() {  38         String msg = "生产者===>生者的LogError消息message: ";  39         for (int i = 0; i < 100000; i++) {  40             rabbitMqLogErrorProduce.producer(msg + i);  41         }  42         return "生产===>  LogError消息message  ===> success!!!";  43     }  44  45 }

生产者的启动类,如下所示:

 1 package com.example;   2   3 import org.springframework.boot.SpringApplication;   4 import org.springframework.boot.autoconfigure.SpringBootApplication;   5   6 @SpringBootApplication   7 public class RabbitmqProducerApplication {   8   9     public static void main(String[] args) {  10         SpringApplication.run(RabbitmqProducerApplication.class, args);  11     }  12  13 }

4、生产者搞完以后,开始搞消费者。由于pom.xml配置文件一致,这里省略消费者的pom.xml配置文件。

 1 # 给当前项目起名称.   2 spring.application.name=rabbitmq-direct-consumer   3   4 # 配置端口号   5 server.port=8080   6   7 # 配置rabbitmq的参数.   8 # rabbitmq服务器的ip地址.   9 spring.rabbitmq.host=192.168.110.133  10 # rabbitmq的端口号5672,区别于浏览器访问界面的15672端口号.  11 spring.rabbitmq.port=5672  12 # rabbitmq的账号.  13 spring.rabbitmq.username=guest  14 # rabbitmq的密码.  15 spring.rabbitmq.password=guest  16  17 # 设置交换器的名称,方便修改.  18 # 路由键是将交换器和队列进行绑定的,队列通过路由键绑定到交换器.  19 rabbitmq.config.exchange=log.exchange.direct  20  21 # info级别的队列名称.  22 rabbitmq.config.queue.info=log.info.queue  23 # info的路由键.  24 rabbitmq.config.queue.info.routing.key=log.info.routing.key  25  26 # error级别的队列名称.  27 rabbitmq.config.queue.error=log.error.queue  28 # error的路由键.  29 rabbitmq.config.queue.error.routing.key=log.error.routing.key

消费者消费消息的编写,如下所示:

 1 package com.example.bie.consumer;   2   3 import org.springframework.amqp.core.ExchangeTypes;   4 import org.springframework.amqp.rabbit.annotation.Exchange;   5 import org.springframework.amqp.rabbit.annotation.Queue;   6 import org.springframework.amqp.rabbit.annotation.QueueBinding;   7 import org.springframework.amqp.rabbit.annotation.RabbitHandler;   8 import org.springframework.amqp.rabbit.annotation.RabbitListener;   9 import org.springframework.stereotype.Component;  10  11 /**  12  *  13  * @author biehl  14  *  15  *         消息接收者  16  *  17  *         1、@RabbitListener bindings:绑定队列  18  *  19  *         2、@QueueBinding  20  *         value:绑定队列的名称、exchange:配置交换器、key:路由键routing-key绑定队列和交换器  21  *  22  *         3、@Queue value:配置队列名称、autoDelete:是否是一个可删除的临时队列  23  *  24  *         4、@Exchange value:为交换器起个名称、type:指定具体的交换器类型  25  *  26  *  27  */  28 @Component  29 @RabbitListener(bindings = @QueueBinding(  30  31         value = @Queue(value = "${rabbitmq.config.queue.error}", autoDelete = "true"),  32  33         exchange = @Exchange(value = "${rabbitmq.config.exchange}", type = ExchangeTypes.DIRECT),  34  35         key = "${rabbitmq.config.queue.error.routing.key}"))  36 public class LogErrorConsumer {  37  38     /**  39      * 接收消息的方法,采用消息队列监听机制.  40      *  41      * @RabbitHandler意思是将注解@RabbitListener配置到类上面  42      *  43      * @RabbitHandler是指定这个方法可以进行消息的接收并且消费.  44      *  45      * @param msg  46      */  47     @RabbitHandler  48     public void consumer(String msg) {  49         // 打印消息  50         System.out.println("ERROR消费者===>消费<===消息message: " + msg);  51     }  52  53 }
 1 package com.example.bie.consumer;   2   3 import org.springframework.amqp.core.ExchangeTypes;   4 import org.springframework.amqp.rabbit.annotation.Exchange;   5 import org.springframework.amqp.rabbit.annotation.Queue;   6 import org.springframework.amqp.rabbit.annotation.QueueBinding;   7 import org.springframework.amqp.rabbit.annotation.RabbitHandler;   8 import org.springframework.amqp.rabbit.annotation.RabbitListener;   9 import org.springframework.stereotype.Component;  10  11 /**  12  *  13  * @author biehl  14  *  15  *         消息接收者  16  *  17  *         1、@RabbitListener bindings:绑定队列  18  *  19  *         2、@QueueBinding  20  *         value:绑定队列的名称、exchange:配置交换器、key:路由键routing-key绑定队列和交换器  21  *  22  *         3、@Queue value:配置队列名称、autoDelete:是否是一个可删除的临时队列  23  *  24  *         4、@Exchange value:为交换器起个名称、type:指定具体的交换器类型  25  *  26  *  27  */  28 @Component  29 @RabbitListener(bindings = @QueueBinding(  30  31         value = @Queue(value = "${rabbitmq.config.queue.info}", autoDelete = "true"),  32  33         exchange = @Exchange(value = "${rabbitmq.config.exchange}", type = ExchangeTypes.DIRECT),  34  35         key = "${rabbitmq.config.queue.info.routing.key}"))  36 public class LogInfoConsumer {  37  38     /**  39      * 接收消息的方法,采用消息队列监听机制.  40      *  41      * @RabbitHandler意思是将注解@RabbitListener配置到类上面  42      *  43      * @RabbitHandler是指定这个方法可以进行消息的接收并且消费.  44      *  45      * @param msg  46      */  47     @RabbitHandler  48     public void consumer(String msg) {  49         // 打印消息  50         System.out.println("INFO消费者===>消费: " + msg);  51     }  52  53 }

消费者启动主类如下所示:

 1 package com.example;   2   3 import org.springframework.boot.SpringApplication;   4 import org.springframework.boot.autoconfigure.SpringBootApplication;   5   6 @SpringBootApplication   7 public class RabbitmqConsumerApplication {   8   9     public static void main(String[] args) {  10         SpringApplication.run(RabbitmqConsumerApplication.class, args);  11     }  12  13 }

5、发布订阅模式,生产者生产消息,消费者消费消息,运行效果如下所示:

首先启动你消费者消费消息的启动类,再启动你的生产者生产消息的启动类。