RabbitMQ六種工作模式有哪些?怎樣用SpringBoot整合RabbitMQ

  • 2021 年 1 月 23 日
  • 筆記

目錄

  • 一、RabbitMQ入門程序
  • 二、Work queues 工作模式
  • 三、Publish / Subscribe 發佈/訂閱模式
  • 四、Routing 路由模式
  • 五、Topics
  • 六、Header
  • 七、RPC
  • 八、Spring Data Elasticsearch

一、RabbitMQ入門程序

<dependencies>
      <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-amqp</artifactId>
      </dependency>
      <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-test</artifactId>
      </dependency>

      <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-logging</artifactId>
      </dependency>
</dependencies>

application.yml

server:
  port: 44000
spring:
  application:
    name: test-rabbitmq-producer
  rabbitmq:
    username: guest
    password: guest
    host: 127.0.0.1
    port: 5672
    virtual-host: /

消息發送者

/**
 * Description: rabbitmq入門程序
 *
 * @author zygui
 * @date Created on 2020/5/13 15:34
 */
public class Producer01 {

    // 聲明一個消息隊列名稱
    private static final String QUEUE_NAME = "helloworld";

    public static void main(String[] args) {
        // 通過連接工廠創建新的連接與mq建立連接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        //設置虛擬機,一個mq服務可以設置多個虛擬機,每個虛擬機就相當於一個獨立的mq
        connectionFactory.setVirtualHost("/"); // 默認為 / 即可

        // 建立連接
        Connection connection = null;
        // 建立信道(目的是為了復用連接)
        Channel channel = null;
        try {

            //建立新連接
            connection = connectionFactory.newConnection();
            //創建會話通道,生產者和mq服務所有通信都在channel通道中完成
            channel = connection.createChannel();

            //聲明隊列,如果隊列在mq 中沒有則要創建
            //參數:String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
            /**
             * 參數明細
             * 1、queue 隊列名稱
             * 2、durable 是否持久化,如果持久化,mq重啟後隊列還在
             * 3、exclusive 是否獨佔連接,隊列只允許在該連接中訪問,如果connection連接關閉隊列則自動刪除,如果將此參數設置true可用於臨時隊列的創建
             * 4、autoDelete 自動刪除,隊列不再使用時是否自動刪除此隊列,如果將此參數和exclusive參數設置為true就可以實現臨時隊列(隊列不用了就自動刪除)
             * 5、arguments 參數,可以設置一個隊列的擴展參數,比如:可設置存活時間
             */
            channel.queueDeclare(QUEUE_NAME,true,false,false,null);

            // 發送消息
            //參數:String exchange, String routingKey, BasicProperties props, byte[] body
            /**
             * 參數明細:
             * 1、exchange,交換機,如果不指定將使用mq的默認交換機(設置為"")
             * 2、routingKey,路由key,交換機根據路由key來將消息轉發到指定的隊列,如果使用默認交換機,routingKey設置為隊列的名稱
             * 3、props,消息的屬性
             * 4、body,消息內容
             */
            //消息內容
            String message = "hello world 桂朝陽";
            channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
            System.out.println("send to mq "+message);

        } catch (Exception e) {
            e.printStackTrace();
        } finally {

            try {
                // 關閉信道
                channel.close();
                // 關閉連接
                connection.close();

            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

消息接收者

/**
 * Description: rabbitmq入門程序
 *
 * @author zygui
 * @date Created on 2020/5/13 15:45
 */
public class Consumer01 {

    private static final String QUEUE_NAME = "helloworld";

    public static void main(String[] args) throws IOException, TimeoutException {

        //通過連接工廠創建新的連接和mq建立連接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);//端口
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        //設置虛擬機,一個mq服務可以設置多個虛擬機,每個虛擬機就相當於一個獨立的mq
        connectionFactory.setVirtualHost("/");

        //建立新連接
        Connection connection = connectionFactory.newConnection();
        //創建會話通道,生產者和mq服務所有通信都在channel通道中完成
        Channel channel = connection.createChannel();

        //監聽隊列
        //聲明隊列,如果隊列在mq 中沒有則要創建
        //參數:String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
        /**
         * 參數明細
         * 1、queue 隊列名稱
         * 2、durable 是否持久化,如果持久化,mq重啟後隊列還在
         * 3、exclusive 是否獨佔連接,隊列只允許在該連接中訪問,如果connection連接關閉隊列則自動刪除,如果將此參數設置true可用於臨時隊列的創建
         * 4、autoDelete 自動刪除,隊列不再使用時是否自動刪除此隊列,如果將此參數和exclusive參數設置為true就可以實現臨時隊列(隊列不用了就自動刪除)
         * 5、arguments 參數,可以設置一個隊列的擴展參數,比如:可設置存活時間
         */
        channel.queueDeclare(QUEUE_NAME,true,false,false,null);

        // 實現消費方法
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {

            /**
             * 當接收到消息後此方法將被調用
             * @param consumerTag  消費者標籤,用來標識消費者的,在監聽隊列時設置channel.basicConsume
             * @param envelope 信封,通過envelope
             * @param properties 消息屬性
             * @param body 消息內容
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //交換機
                String exchange = envelope.getExchange();
                //消息id,mq在channel中用來標識消息的id,可用於確認消息已接收
                long deliveryTag = envelope.getDeliveryTag();
                //消息內容
                String message= new String(body,"utf-8");
                System.out.println("receive message:"+message);
            }
        };

        //監聽隊列
        //參數:String queue, boolean autoAck, Consumer callback
        /**
         * 參數明細:
         * 1、queue 隊列名稱
         * 2、autoAck 自動回復,當消費者接收到消息後要告訴mq消息已接收,如果將此參數設置為tru表示會自動回復mq,如果設置為false要通過編程實現回復
         * 3、callback,消費方法,當消費者接收到消息要執行的方法
         */
        channel.basicConsume(QUEUE_NAME,true,defaultConsumer);
    }
}

入門程序步驟
在這裡插入圖片描述

二、Work queues 工作模式

在這裡插入圖片描述
在這裡插入圖片描述

三、Publish / Subscribe 發佈/訂閱模式

在這裡插入圖片描述
消息生產者

public class Producer02_publish {
    //隊列名稱
    private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
    private static final String QUEUE_INFORM_SMS = "queue_inform_sms";
    // 交換機名稱
    private static final String EXCHANGE_FANOUT_INFORM="exchange_fanout_inform";

    public static void main(String[] args) {
        //通過連接工廠創建新的連接和mq建立連接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);//端口
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        //設置虛擬機,一個mq服務可以設置多個虛擬機,每個虛擬機就相當於一個獨立的mq
        connectionFactory.setVirtualHost("/");

        Connection connection = null;
        Channel channel = null;
        try {
            //建立新連接
            connection = connectionFactory.newConnection();
            //創建會話通道,生產者和mq服務所有通信都在channel通道中完成
            channel = connection.createChannel();
            //聲明隊列,如果隊列在mq 中沒有則要創建
            //參數:String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
            /**
             * 參數明細
             * 1、queue 隊列名稱
             * 2、durable 是否持久化,如果持久化,mq重啟後隊列還在
             * 3、exclusive 是否獨佔連接,隊列只允許在該連接中訪問,如果connection連接關閉隊列則自動刪除,如果將此參數設置true可用於臨時隊列的創建
             * 4、autoDelete 自動刪除,隊列不再使用時是否自動刪除此隊列,如果將此參數和exclusive參數設置為true就可以實現臨時隊列(隊列不用了就自動刪除)
             * 5、arguments 參數,可以設置一個隊列的擴展參數,比如:可設置存活時間
             */
            channel.queueDeclare(QUEUE_INFORM_EMAIL,true,false,false,null);
            channel.queueDeclare(QUEUE_INFORM_SMS,true,false,false,null);
            //聲明一個交換機
            //參數:String exchange, String type
            /**
             * 參數明細:
             * 1、交換機的名稱
             * 2、交換機的類型
             * fanout:對應的rabbitmq的工作模式是 publish/subscribe
             * direct:對應的Routing	工作模式
             * topic:對應的Topics工作模式
             * headers: 對應的headers工作模式
             */
            channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM, BuiltinExchangeType.FANOUT);

            //進行交換機和隊列綁定
            //參數:String queue, String exchange, String routingKey
            /**
             * 參數明細:
             * 1、queue 隊列名稱
             * 2、exchange 交換機名稱
             * 3、routingKey 路由key,作用是交換機根據路由key的值將消息轉發到指定的隊列中,在發佈訂閱模式中調協為空字符串
             */
            channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_FANOUT_INFORM,"");
            channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_FANOUT_INFORM,"");
            //發送消息
            //參數:String exchange, String routingKey, BasicProperties props, byte[] body
            /**
             * 參數明細:
             * 1、exchange,交換機,如果不指定將使用mq的默認交換機(設置為"")
             * 2、routingKey,路由key,交換機根據路由key來將消息轉發到指定的隊列,如果使用默認交換機,routingKey設置為隊列的名稱
             * 3、props,消息的屬性
             * 4、body,消息內容
             */
            for(int i=0;i<5;i++){
                //消息內容
                String message = "send inform message to user";
                channel.basicPublish(EXCHANGE_FANOUT_INFORM,"",null,message.getBytes());
                System.out.println("send to mq "+message);
            }

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            //關閉連接
            //先關閉通道
            try {
                channel.close();
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
            try {
                connection.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }


    }
}

消息接收者1

public class Consumer02_subscribe_email {
    //隊列名稱
    private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
    private static final String EXCHANGE_FANOUT_INFORM="exchange_fanout_inform";


    public static void main(String[] args) throws IOException, TimeoutException {
        //通過連接工廠創建新的連接和mq建立連接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);//端口
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        //設置虛擬機,一個mq服務可以設置多個虛擬機,每個虛擬機就相當於一個獨立的mq
        connectionFactory.setVirtualHost("/");

        //建立新連接
        Connection connection = connectionFactory.newConnection();
        //創建會話通道,生產者和mq服務所有通信都在channel通道中完成
        Channel channel = connection.createChannel();

        /**
         * 參數明細
         * 1、queue 隊列名稱
         * 2、durable 是否持久化,如果持久化,mq重啟後隊列還在
         * 3、exclusive 是否獨佔連接,隊列只允許在該連接中訪問,如果connection連接關閉隊列則自動刪除,如果將此參數設置true可用於臨時隊列的創建
         * 4、autoDelete 自動刪除,隊列不再使用時是否自動刪除此隊列,如果將此參數和exclusive參數設置為true就可以實現臨時隊列(隊列不用了就自動刪除)
         * 5、arguments 參數,可以設置一個隊列的擴展參數,比如:可設置存活時間
         */
        channel.queueDeclare(QUEUE_INFORM_EMAIL,true,false,false,null);
        //聲明一個交換機
        //參數:String exchange, String type
        /**
         * 參數明細:
         * 1、交換機的名稱
         * 2、交換機的類型
         * fanout:對應的rabbitmq的工作模式是 publish/subscribe
         * direct:對應的Routing	工作模式
         * topic:對應的Topics工作模式
         * headers: 對應的headers工作模式
         */
        channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM, BuiltinExchangeType.FANOUT);
        //進行交換機和隊列綁定
        //參數:String queue, String exchange, String routingKey
        /**
         * 參數明細:
         * 1、queue 隊列名稱
         * 2、exchange 交換機名稱
         * 3、routingKey 路由key,作用是交換機根據路由key的值將消息轉發到指定的隊列中,在發佈訂閱模式中調協為空字符串
         */
        channel.queueBind(QUEUE_INFORM_EMAIL, EXCHANGE_FANOUT_INFORM, "");

        //實現消費方法
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){

            /**
             * 當接收到消息後此方法將被調用
             * @param consumerTag  消費者標籤,用來標識消費者的,在監聽隊列時設置channel.basicConsume
             * @param envelope 信封,通過envelope
             * @param properties 消息屬性
             * @param body 消息內容
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //交換機
                String exchange = envelope.getExchange();
                //消息id,mq在channel中用來標識消息的id,可用於確認消息已接收
                long deliveryTag = envelope.getDeliveryTag();
                //消息內容
                String message= new String(body,"utf-8");
                System.out.println("receive message:"+message);
            }
        };

        //監聽隊列
        //參數:String queue, boolean autoAck, Consumer callback
        /**
         * 參數明細:
         * 1、queue 隊列名稱
         * 2、autoAck 自動回復,當消費者接收到消息後要告訴mq消息已接收,如果將此參數設置為tru表示會自動回復mq,如果設置為false要通過編程實現回復
         * 3、callback,消費方法,當消費者接收到消息要執行的方法
         */
        channel.basicConsume(QUEUE_INFORM_EMAIL,true,defaultConsumer);

    }
}

消息接收者2

public class Consumer02_subscribe_sms {
    //隊列名稱
    private static final String QUEUE_INFORM_SMS = "queue_inform_sms";
    private static final String EXCHANGE_FANOUT_INFORM="exchange_fanout_inform";


    public static void main(String[] args) throws IOException, TimeoutException {
        //通過連接工廠創建新的連接和mq建立連接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);//端口
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        //設置虛擬機,一個mq服務可以設置多個虛擬機,每個虛擬機就相當於一個獨立的mq
        connectionFactory.setVirtualHost("/");

        //建立新連接
        Connection connection = connectionFactory.newConnection();
        //創建會話通道,生產者和mq服務所有通信都在channel通道中完成
        Channel channel = connection.createChannel();

        /**
         * 參數明細
         * 1、queue 隊列名稱
         * 2、durable 是否持久化,如果持久化,mq重啟後隊列還在
         * 3、exclusive 是否獨佔連接,隊列只允許在該連接中訪問,如果connection連接關閉隊列則自動刪除,如果將此參數設置true可用於臨時隊列的創建
         * 4、autoDelete 自動刪除,隊列不再使用時是否自動刪除此隊列,如果將此參數和exclusive參數設置為true就可以實現臨時隊列(隊列不用了就自動刪除)
         * 5、arguments 參數,可以設置一個隊列的擴展參數,比如:可設置存活時間
         */
        channel.queueDeclare(QUEUE_INFORM_SMS,true,false,false,null);
        //聲明一個交換機
        //參數:String exchange, String type
        /**
         * 參數明細:
         * 1、交換機的名稱
         * 2、交換機的類型
         * fanout:對應的rabbitmq的工作模式是 publish/subscribe
         * direct:對應的Routing	工作模式
         * topic:對應的Topics工作模式
         * headers: 對應的headers工作模式
         */
        channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM, BuiltinExchangeType.FANOUT);
        //進行交換機和隊列綁定
        //參數:String queue, String exchange, String routingKey
        /**
         * 參數明細:
         * 1、queue 隊列名稱
         * 2、exchange 交換機名稱
         * 3、routingKey 路由key,作用是交換機根據路由key的值將消息轉發到指定的隊列中,在發佈訂閱模式中調協為空字符串
         */
        channel.queueBind(QUEUE_INFORM_SMS, EXCHANGE_FANOUT_INFORM, "");

        //實現消費方法
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){

            /**
             * 當接收到消息後此方法將被調用
             * @param consumerTag  消費者標籤,用來標識消費者的,在監聽隊列時設置channel.basicConsume
             * @param envelope 信封,通過envelope
             * @param properties 消息屬性
             * @param body 消息內容
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //交換機
                String exchange = envelope.getExchange();
                //消息id,mq在channel中用來標識消息的id,可用於確認消息已接收
                long deliveryTag = envelope.getDeliveryTag();
                //消息內容
                String message= new String(body,"utf-8");
                System.out.println("receive message:"+message);
            }
        };

        //監聽隊列
        //參數:String queue, boolean autoAck, Consumer callback
        /**
         * 參數明細:
         * 1、queue 隊列名稱
         * 2、autoAck 自動回復,當消費者接收到消息後要告訴mq消息已接收,如果將此參數設置為tru表示會自動回復mq,如果設置為false要通過編程實現回復
         * 3、callback,消費方法,當消費者接收到消息要執行的方法
         */
        channel.basicConsume(QUEUE_INFORM_SMS,true,defaultConsumer);

    }
}

在這裡插入圖片描述

四、Routing 路由模式

在這裡插入圖片描述
消息生產者

public class Producer03_routing {
    //隊列名稱
    private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
    private static final String QUEUE_INFORM_SMS = "queue_inform_sms";
    // 交換機名稱
    private static final String EXCHANGE_ROUTING_INFORM="exchange_routing_inform";
    // 路由鍵名稱
    private static final String ROUTINGKEY_EMAIL="inform_email";
    private static final String ROUTINGKEY_SMS="inform_sms";

    public static void main(String[] args) {
        //通過連接工廠創建新的連接和mq建立連接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);//端口
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        //設置虛擬機,一個mq服務可以設置多個虛擬機,每個虛擬機就相當於一個獨立的mq
        connectionFactory.setVirtualHost("/");

        Connection connection = null;
        Channel channel = null;
        try {
            //建立新連接
            connection = connectionFactory.newConnection();
            //創建會話通道,生產者和mq服務所有通信都在channel通道中完成
            channel = connection.createChannel();

            //聲明隊列,如果隊列在mq 中沒有則要創建
            //參數:String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
            /**
             * 參數明細
             * 1、queue 隊列名稱
             * 2、durable 是否持久化,如果持久化,mq重啟後隊列還在
             * 3、exclusive 是否獨佔連接,隊列只允許在該連接中訪問,如果connection連接關閉隊列則自動刪除,如果將此參數設置true可用於臨時隊列的創建
             * 4、autoDelete 自動刪除,隊列不再使用時是否自動刪除此隊列,如果將此參數和exclusive參數設置為true就可以實現臨時隊列(隊列不用了就自動刪除)
             * 5、arguments 參數,可以設置一個隊列的擴展參數,比如:可設置存活時間
             */
            channel.queueDeclare(QUEUE_INFORM_EMAIL,true,false,false,null);
            channel.queueDeclare(QUEUE_INFORM_SMS,true,false,false,null);

            //聲明一個交換機
            //參數:String exchange, String type
            /**
             * 參數明細:
             * 1、交換機的名稱
             * 2、交換機的類型
             * fanout:對應的rabbitmq的工作模式是 publish/subscribe
             * direct:對應的Routing	工作模式
             * topic:對應的Topics工作模式
             * headers: 對應的headers工作模式
             */
            channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT);

            //進行交換機和隊列綁定
            //參數:String queue, String exchange, String routingKey
            /**
             * 參數明細:
             * 1、queue 隊列名稱
             * 2、exchange 交換機名稱
             * 3、routingKey 路由key,作用是交換機根據路由key的值將消息轉發到指定的隊列中,在發佈訂閱模式中調協為空字符串
             */
            channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_ROUTING_INFORM,ROUTINGKEY_EMAIL);
            //channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_ROUTING_INFORM,"inform");
            channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_ROUTING_INFORM,ROUTINGKEY_SMS);
            //channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_ROUTING_INFORM,"inform");

            //發送消息
            //參數:String exchange, String routingKey, BasicProperties props, byte[] body
            /**
             * 參數明細:
             * 1、exchange,交換機,如果不指定將使用mq的默認交換機(設置為"")
             * 2、routingKey,路由key,交換機根據路由key來將消息轉發到指定的隊列,如果使用默認交換機,routingKey設置為隊列的名稱
             * 3、props,消息的屬性
             * 4、body,消息內容
             */
           /* for(int i=0;i<5;i++){
                //發送消息的時候指定routingKey
                String message = "send email inform message to user";
                channel.basicPublish(EXCHANGE_ROUTING_INFORM,ROUTINGKEY_EMAIL,null,message.getBytes());
                System.out.println("send to mq "+message);
            }*/
            for(int i=0;i<5;i++){
                //發送消息的時候指定routingKey
                String message = "send sms inform message to user";
                channel.basicPublish(EXCHANGE_ROUTING_INFORM,ROUTINGKEY_SMS,null,message.getBytes());
                System.out.println("send to mq "+message);
            }

           // 此時指定的路由鍵是 inform, 所以兩個消費者都可以消費
            /*for(int i=0;i<5;i++){
                //發送消息的時候指定routingKey
                String message = "send inform message to user";
                channel.basicPublish(EXCHANGE_ROUTING_INFORM,"inform",null,message.getBytes());
                System.out.println("send to mq "+message);
            }*/

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            //關閉連接
            //先關閉通道
            try {
                channel.close();
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
            try {
                connection.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

消息接收者1

public class Consumer03_routing_email {
    //隊列名稱
    private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
    // 交換機名稱
    private static final String EXCHANGE_ROUTING_INFORM="exchange_routing_inform";
    // 路由鍵名稱
    private static final String ROUTINGKEY_EMAIL="inform_email";

    public static void main(String[] args) throws IOException, TimeoutException {
        //通過連接工廠創建新的連接和mq建立連接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);//端口
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        //設置虛擬機,一個mq服務可以設置多個虛擬機,每個虛擬機就相當於一個獨立的mq
        connectionFactory.setVirtualHost("/");

        //建立新連接
        Connection connection = connectionFactory.newConnection();
        //創建會話通道,生產者和mq服務所有通信都在channel通道中完成
        Channel channel = connection.createChannel();

        /**
         * 參數明細
         * 1、queue 隊列名稱
         * 2、durable 是否持久化,如果持久化,mq重啟後隊列還在
         * 3、exclusive 是否獨佔連接,隊列只允許在該連接中訪問,如果connection連接關閉隊列則自動刪除,如果將此參數設置true可用於臨時隊列的創建
         * 4、autoDelete 自動刪除,隊列不再使用時是否自動刪除此隊列,如果將此參數和exclusive參數設置為true就可以實現臨時隊列(隊列不用了就自動刪除)
         * 5、arguments 參數,可以設置一個隊列的擴展參數,比如:可設置存活時間
         */
        channel.queueDeclare(QUEUE_INFORM_EMAIL,true,false,false,null);
        //聲明一個交換機
        //參數:String exchange, String type
        /**
         * 參數明細:
         * 1、交換機的名稱
         * 2、交換機的類型
         * fanout:對應的rabbitmq的工作模式是 publish/subscribe
         * direct:對應的Routing	工作模式
         * topic:對應的Topics工作模式
         * headers: 對應的headers工作模式
         */
        channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT);
        //進行交換機和隊列綁定
        //參數:String queue, String exchange, String routingKey
        /**
         * 參數明細:
         * 1、queue 隊列名稱
         * 2、exchange 交換機名稱
         * 3、routingKey 路由key,作用是交換機根據路由key的值將消息轉發到指定的隊列中,在發佈訂閱模式中調協為空字符串
         */
        channel.queueBind(QUEUE_INFORM_EMAIL, EXCHANGE_ROUTING_INFORM,ROUTINGKEY_EMAIL);

        //實現消費方法
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){

            /**
             * 當接收到消息後此方法將被調用
             * @param consumerTag  消費者標籤,用來標識消費者的,在監聽隊列時設置channel.basicConsume
             * @param envelope 信封,通過envelope
             * @param properties 消息屬性
             * @param body 消息內容
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //交換機
                String exchange = envelope.getExchange();
                //消息id,mq在channel中用來標識消息的id,可用於確認消息已接收
                long deliveryTag = envelope.getDeliveryTag();
                //消息內容
                String message= new String(body,"utf-8");
                System.out.println("receive message:"+message);
            }
        };

        //監聽隊列
        //參數:String queue, boolean autoAck, Consumer callback
        /**
         * 參數明細:
         * 1、queue 隊列名稱
         * 2、autoAck 自動回復,當消費者接收到消息後要告訴mq消息已接收,如果將此參數設置為tru表示會自動回復mq,如果設置為false要通過編程實現回復
         * 3、callback,消費方法,當消費者接收到消息要執行的方法
         */
        channel.basicConsume(QUEUE_INFORM_EMAIL,true,defaultConsumer);

    }
}
//www.dtmao.cc/news_show_631033.shtml

消息接收者2

public class Consumer03_routing_sms {
    //隊列名稱
    private static final String QUEUE_INFORM_SMS = "queue_inform_sms";
    private static final String EXCHANGE_ROUTING_INFORM="exchange_routing_inform";
    private static final String ROUTINGKEY_SMS="inform_sms";

    public static void main(String[] args) throws IOException, TimeoutException {
        //通過連接工廠創建新的連接和mq建立連接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);//端口
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        //設置虛擬機,一個mq服務可以設置多個虛擬機,每個虛擬機就相當於一個獨立的mq
        connectionFactory.setVirtualHost("/");

        //建立新連接
        Connection connection = connectionFactory.newConnection();
        //創建會話通道,生產者和mq服務所有通信都在channel通道中完成
        Channel channel = connection.createChannel();

        /**
         * 參數明細
         * 1、queue 隊列名稱
         * 2、durable 是否持久化,如果持久化,mq重啟後隊列還在
         * 3、exclusive 是否獨佔連接,隊列只允許在該連接中訪問,如果connection連接關閉隊列則自動刪除,如果將此參數設置true可用於臨時隊列的創建
         * 4、autoDelete 自動刪除,隊列不再使用時是否自動刪除此隊列,如果將此參數和exclusive參數設置為true就可以實現臨時隊列(隊列不用了就自動刪除)
         * 5、arguments 參數,可以設置一個隊列的擴展參數,比如:可設置存活時間
         */
        channel.queueDeclare(QUEUE_INFORM_SMS,true,false,false,null);
        //聲明一個交換機
        //參數:String exchange, String type
        /**
         * 參數明細:
         * 1、交換機的名稱
         * 2、交換機的類型
         * fanout:對應的rabbitmq的工作模式是 publish/subscribe
         * direct:對應的Routing	工作模式
         * topic:對應的Topics工作模式
         * headers: 對應的headers工作模式
         */
        channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT);
        //進行交換機和隊列綁定
        //參數:String queue, String exchange, String routingKey
        /**
         * 參數明細:
         * 1、queue 隊列名稱
         * 2、exchange 交換機名稱
         * 3、routingKey 路由key,作用是交換機根據路由key的值將消息轉發到指定的隊列中,在發佈訂閱模式中調協為空字符串
         */
        channel.queueBind(QUEUE_INFORM_SMS, EXCHANGE_ROUTING_INFORM,ROUTINGKEY_SMS);

        //實現消費方法
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){

            /**
             * 當接收到消息後此方法將被調用
             * @param consumerTag  消費者標籤,用來標識消費者的,在監聽隊列時設置channel.basicConsume
             * @param envelope 信封,通過envelope
             * @param properties 消息屬性
             * @param body 消息內容
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //交換機
                String exchange = envelope.getExchange();
                //消息id,mq在channel中用來標識消息的id,可用於確認消息已接收
                long deliveryTag = envelope.getDeliveryTag();
                //消息內容
                String message= new String(body,"utf-8");
                System.out.println("receive message:"+message);
            }
        };

        //監聽隊列
        //參數:String queue, boolean autoAck, Consumer callback
        /**
         * 參數明細:
         * 1、queue 隊列名稱
         * 2、autoAck 自動回復,當消費者接收到消息後要告訴mq消息已接收,如果將此參數設置為tru表示會自動回復mq,如果設置為false要通過編程實現回復
         * 3、callback,消費方法,當消費者接收到消息要執行的方法
         */
        channel.basicConsume(QUEUE_INFORM_SMS,true,defaultConsumer);

    }
}

在這裡插入圖片描述

五、Topics

在這裡插入圖片描述

public class Producer04_topics {
    // 隊列名稱
    private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
    private static final String QUEUE_INFORM_SMS = "queue_inform_sms";
    // 聲明交換機
    private static final String EXCHANGE_TOPICS_INFORM="exchange_topics_inform";
    // 使用通配符的方式來,設置路由鍵
    private static final String ROUTINGKEY_EMAIL="inform.#.email.#";
    private static final String ROUTINGKEY_SMS="inform.#.sms.#";
    public static void main(String[] args) {
        //通過連接工廠創建新的連接和mq建立連接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);//端口
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        //設置虛擬機,一個mq服務可以設置多個虛擬機,每個虛擬機就相當於一個獨立的mq
        connectionFactory.setVirtualHost("/");

        Connection connection = null;
        Channel channel = null;
        try {
            //建立新連接
            connection = connectionFactory.newConnection();
            //創建會話通道,生產者和mq服務所有通信都在channel通道中完成
            channel = connection.createChannel();
            //聲明隊列,如果隊列在mq 中沒有則要創建
            //參數:String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
            /**
             * 參數明細
             * 1、queue 隊列名稱
             * 2、durable 是否持久化,如果持久化,mq重啟後隊列還在
             * 3、exclusive 是否獨佔連接,隊列只允許在該連接中訪問,如果connection連接關閉隊列則自動刪除,如果將此參數設置true可用於臨時隊列的創建
             * 4、autoDelete 自動刪除,隊列不再使用時是否自動刪除此隊列,如果將此參數和exclusive參數設置為true就可以實現臨時隊列(隊列不用了就自動刪除)
             * 5、arguments 參數,可以設置一個隊列的擴展參數,比如:可設置存活時間
             */
            channel.queueDeclare(QUEUE_INFORM_EMAIL,true,false,false,null);
            channel.queueDeclare(QUEUE_INFORM_SMS,true,false,false,null);
            //聲明一個交換機
            //參數:String exchange, String type
            /**
             * 參數明細:
             * 1、交換機的名稱
             * 2、交換機的類型
             * fanout:對應的rabbitmq的工作模式是 publish/subscribe
             * direct:對應的Routing	工作模式
             * topic:對應的Topics工作模式
             * headers: 對應的headers工作模式
             */
            channel.exchangeDeclare(EXCHANGE_TOPICS_INFORM, BuiltinExchangeType.TOPIC);
            //進行交換機和隊列綁定
            //參數:String queue, String exchange, String routingKey
            /**
             * 參數明細:
             * 1、queue 隊列名稱
             * 2、exchange 交換機名稱
             * 3、routingKey 路由key,作用是交換機根據路由key的值將消息轉發到指定的隊列中,在發佈訂閱模式中調協為空字符串
             */
            channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_TOPICS_INFORM,ROUTINGKEY_EMAIL);
            channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_TOPICS_INFORM,ROUTINGKEY_SMS);
            //發送消息
            //參數:String exchange, String routingKey, BasicProperties props, byte[] body
            /**
             * 參數明細:
             * 1、exchange,交換機,如果不指定將使用mq的默認交換機(設置為"")
             * 2、routingKey,路由key,交換機根據路由key來將消息轉發到指定的隊列,如果使用默認交換機,routingKey設置為隊列的名稱
             * 3、props,消息的屬性
             * 4、body,消息內容
             */
            for(int i=0;i<5;i++){
                //發送消息的時候指定routingKey
                String message = "send email inform message to user";
                channel.basicPublish(EXCHANGE_TOPICS_INFORM,"inform.email",null,message.getBytes());
                System.out.println("send to mq "+message);
            }
            for(int i=0;i<5;i++){
                //發送消息的時候指定routingKey
                String message = "send sms inform message to user";
                channel.basicPublish(EXCHANGE_TOPICS_INFORM,"inform.sms",null,message.getBytes());
                System.out.println("send to mq "+message);
            }
            for(int i=0;i<5;i++){
                //發送消息的時候指定routingKey
                String message = "send sms and email inform message to user";
                channel.basicPublish(EXCHANGE_TOPICS_INFORM,"inform.sms.email",null,message.getBytes());
                System.out.println("send to mq "+message);
            }


        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            //關閉連接
            //先關閉通道
            try {
                channel.close();
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
            try {
                connection.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

消息接收者1

public class Consumer04_topics_email {
    //隊列名稱
    private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
    private static final String EXCHANGE_TOPICS_INFORM="exchange_topics_inform";
    private static final String ROUTINGKEY_EMAIL="inform.#.email.#";

    public static void main(String[] args) throws IOException, TimeoutException {
        //通過連接工廠創建新的連接和mq建立連接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);//端口
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        //設置虛擬機,一個mq服務可以設置多個虛擬機,每個虛擬機就相當於一個獨立的mq
        connectionFactory.setVirtualHost("/");

        //建立新連接
        Connection connection = connectionFactory.newConnection();
        //創建會話通道,生產者和mq服務所有通信都在channel通道中完成
        Channel channel = connection.createChannel();

        /**
         * 參數明細
         * 1、queue 隊列名稱
         * 2、durable 是否持久化,如果持久化,mq重啟後隊列還在
         * 3、exclusive 是否獨佔連接,隊列只允許在該連接中訪問,如果connection連接關閉隊列則自動刪除,如果將此參數設置true可用於臨時隊列的創建
         * 4、autoDelete 自動刪除,隊列不再使用時是否自動刪除此隊列,如果將此參數和exclusive參數設置為true就可以實現臨時隊列(隊列不用了就自動刪除)
         * 5、arguments 參數,可以設置一個隊列的擴展參數,比如:可設置存活時間
         */
        channel.queueDeclare(QUEUE_INFORM_EMAIL,true,false,false,null);
        //聲明一個交換機
        //參數:String exchange, String type
        /**
         * 參數明細:
         * 1、交換機的名稱
         * 2、交換機的類型
         * fanout:對應的rabbitmq的工作模式是 publish/subscribe
         * direct:對應的Routing	工作模式
         * topic:對應的Topics工作模式
         * headers: 對應的headers工作模式
         */
        channel.exchangeDeclare(EXCHANGE_TOPICS_INFORM, BuiltinExchangeType.TOPIC);
        //進行交換機和隊列綁定
        //參數:String queue, String exchange, String routingKey
        /**
         * 參數明細:
         * 1、queue 隊列名稱
         * 2、exchange 交換機名稱
         * 3、routingKey 路由key,作用是交換機根據路由key的值將消息轉發到指定的隊列中,在發佈訂閱模式中調協為空字符串
         */
        channel.queueBind(QUEUE_INFORM_EMAIL, EXCHANGE_TOPICS_INFORM,ROUTINGKEY_EMAIL);

        //實現消費方法
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){

            /**
             * 當接收到消息後此方法將被調用
             * @param consumerTag  消費者標籤,用來標識消費者的,在監聽隊列時設置channel.basicConsume
             * @param envelope 信封,通過envelope
             * @param properties 消息屬性
             * @param body 消息內容
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //交換機
                String exchange = envelope.getExchange();
                //消息id,mq在channel中用來標識消息的id,可用於確認消息已接收
                long deliveryTag = envelope.getDeliveryTag();
                //消息內容
                String message= new String(body,"utf-8");
                System.out.println("receive message:"+message);
            }
        };

        //監聽隊列
        //參數:String queue, boolean autoAck, Consumer callback
        /**
         * 參數明細:
         * 1、queue 隊列名稱
         * 2、autoAck 自動回復,當消費者接收到消息後要告訴mq消息已接收,如果將此參數設置為tru表示會自動回復mq,如果設置為false要通過編程實現回復
         * 3、callback,消費方法,當消費者接收到消息要執行的方法
         */
        channel.basicConsume(QUEUE_INFORM_EMAIL,true,defaultConsumer);

    }
}

小寫接收者2

public class Consumer04_topics_sms {
    //隊列名稱
    private static final String QUEUE_INFORM_SMS = "queue_inform_sms";
    private static final String EXCHANGE_TOPICS_INFORM="exchange_topics_inform";
    private static final String ROUTINGKEY_SMS="inform.#.sms.#";

    public static void main(String[] args) throws IOException, TimeoutException {
        //通過連接工廠創建新的連接和mq建立連接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);//端口
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        //設置虛擬機,一個mq服務可以設置多個虛擬機,每個虛擬機就相當於一個獨立的mq
        connectionFactory.setVirtualHost("/");

        //建立新連接
        Connection connection = connectionFactory.newConnection();
        //創建會話通道,生產者和mq服務所有通信都在channel通道中完成
        Channel channel = connection.createChannel();

        /**
         * 參數明細
         * 1、queue 隊列名稱
         * 2、durable 是否持久化,如果持久化,mq重啟後隊列還在
         * 3、exclusive 是否獨佔連接,隊列只允許在該連接中訪問,如果connection連接關閉隊列則自動刪除,如果將此參數設置true可用於臨時隊列的創建
         * 4、autoDelete 自動刪除,隊列不再使用時是否自動刪除此隊列,如果將此參數和exclusive參數設置為true就可以實現臨時隊列(隊列不用了就自動刪除)
         * 5、arguments 參數,可以設置一個隊列的擴展參數,比如:可設置存活時間
         */
        channel.queueDeclare(QUEUE_INFORM_SMS,true,false,false,null);
        //聲明一個交換機
        //參數:String exchange, String type
        /**
         * 參數明細:
         * 1、交換機的名稱
         * 2、交換機的類型
         * fanout:對應的rabbitmq的工作模式是 publish/subscribe
         * direct:對應的Routing	工作模式
         * topic:對應的Topics工作模式
         * headers: 對應的headers工作模式
         */
        channel.exchangeDeclare(EXCHANGE_TOPICS_INFORM, BuiltinExchangeType.TOPIC);
        //進行交換機和隊列綁定
        //參數:String queue, String exchange, String routingKey
        /**
         * 參數明細:
         * 1、queue 隊列名稱
         * 2、exchange 交換機名稱
         * 3、routingKey 路由key,作用是交換機根據路由key的值將消息轉發到指定的隊列中,在發佈訂閱模式中調協為空字符串
         */
        channel.queueBind(QUEUE_INFORM_SMS, EXCHANGE_TOPICS_INFORM,ROUTINGKEY_SMS);

        //實現消費方法
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){

            /**
             * 當接收到消息後此方法將被調用
             * @param consumerTag  消費者標籤,用來標識消費者的,在監聽隊列時設置channel.basicConsume
             * @param envelope 信封,通過envelope
             * @param properties 消息屬性
             * @param body 消息內容
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //交換機
                String exchange = envelope.getExchange();
                //消息id,mq在channel中用來標識消息的id,可用於確認消息已接收
                long deliveryTag = envelope.getDeliveryTag();
                //消息內容
                String message= new String(body,"utf-8");
                System.out.println("receive message:"+message);
            }
        };

        //監聽隊列
        //參數:String queue, boolean autoAck, Consumer callback
        /**
         * 參數明細:
         * 1、queue 隊列名稱
         * 2、autoAck 自動回復,當消費者接收到消息後要告訴mq消息已接收,如果將此參數設置為tru表示會自動回復mq,如果設置為false要通過編程實現回復
         * 3、callback,消費方法,當消費者接收到消息要執行的方法
         */
        channel.basicConsume(QUEUE_INFORM_SMS,true,defaultConsumer);

    }
}

六、Header

在這裡插入圖片描述

七、RPC

在這裡插入圖片描述

八、Spring Data Elasticsearch

rabbitmq-producer 消息發送者

@Configuration
public class RabbitMQConfig {
    // 聲明兩個隊列常量
    public static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
    public static final String QUEUE_INFORM_SMS = "queue_inform_sms";
    // 聲明交換機常量
    public static final String EXCHANGE_TOPICS_INFORM="exchange_topics_inform";
    // 聲明兩個路由鍵常量
    public static final String ROUTINGKEY_EMAIL="inform.#.email.#";
    public static final String ROUTINGKEY_SMS="inform.#.sms.#";

    //聲明交換機
    @Bean(EXCHANGE_TOPICS_INFORM)
    public Exchange EXCHANGE_TOPICS_INFORM(){
        //durable(true) 持久化,mq重啟之後交換機還在
        return ExchangeBuilder.topicExchange(EXCHANGE_TOPICS_INFORM).durable(true).build();
    }

    // 聲明隊列

    //聲明QUEUE_INFORM_EMAIL隊列
    @Bean(QUEUE_INFORM_EMAIL)
    public Queue QUEUE_INFORM_EMAIL(){
        return new Queue(QUEUE_INFORM_EMAIL);
    }
    //聲明QUEUE_INFORM_SMS隊列
    @Bean(QUEUE_INFORM_SMS)
    public Queue QUEUE_INFORM_SMS(){
        return new Queue(QUEUE_INFORM_SMS);
    }

    // 綁定交換機和隊列
    //ROUTINGKEY_EMAIL隊列綁定交換機,指定routingKey
    @Bean
    public Binding BINDING_QUEUE_INFORM_EMAIL(@Qualifier(QUEUE_INFORM_EMAIL) Queue queue,
                                              @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_EMAIL).noargs();
    }
    //ROUTINGKEY_SMS隊列綁定交換機,指定routingKey
    @Bean
    public Binding BINDING_ROUTINGKEY_SMS(@Qualifier(QUEUE_INFORM_SMS) Queue queue,
                                          @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_SMS).noargs();
    }
}

rabbitmq-consumer 消息接收者

@Configuration
public class RabbitMQConfig {
    public static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
    public static final String QUEUE_INFORM_SMS = "queue_inform_sms";
    public static final String EXCHANGE_TOPICS_INFORM="exchange_topics_inform";
    public static final String ROUTINGKEY_EMAIL="inform.#.email.#";
    public static final String ROUTINGKEY_SMS="inform.#.sms.#";

    //聲明交換機
    @Bean(EXCHANGE_TOPICS_INFORM)
    public Exchange EXCHANGE_TOPICS_INFORM(){
        //durable(true) 持久化,mq重啟之後交換機還在
        return ExchangeBuilder.topicExchange(EXCHANGE_TOPICS_INFORM).durable(true).build();
    }

    // 聲明隊列

    //聲明QUEUE_INFORM_EMAIL隊列
    @Bean(QUEUE_INFORM_EMAIL)
    public Queue QUEUE_INFORM_EMAIL(){
        return new Queue(QUEUE_INFORM_EMAIL);
    }
    //聲明QUEUE_INFORM_SMS隊列
    @Bean(QUEUE_INFORM_SMS)
    public Queue QUEUE_INFORM_SMS(){
        return new Queue(QUEUE_INFORM_SMS);
    }

    // 綁定交換機和隊列
    //ROUTINGKEY_EMAIL隊列綁定交換機,指定routingKey
    @Bean
    public Binding BINDING_QUEUE_INFORM_EMAIL(@Qualifier(QUEUE_INFORM_EMAIL) Queue queue,
                                              @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_EMAIL).noargs();
    }
    //ROUTINGKEY_SMS隊列綁定交換機,指定routingKey
    @Bean
    public Binding BINDING_ROUTINGKEY_SMS(@Qualifier(QUEUE_INFORM_SMS) Queue queue,
                                          @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_SMS).noargs();
    }
}
@SpringBootApplication
@EnableRabbit
public class TestRabbitMQApplication {
    public static void main(String[] args) {
        SpringApplication.run(TestRabbitMQApplication.class, args);
    }
}

監聽消息隊列

@Component
public class ReceiveHandler {

    @RabbitListener(queues = {RabbitMQConfig.QUEUE_INFORM_EMAIL})
    public void receiveMsg(String msg) {
        System.out.println("接收到的消息是 = " + msg);
    }
}

在rabbitmq-provider中測試

@SpringBootTest
@RunWith(SpringRunner.class)
public class Producer05_topics_springboot {

    // 使用rabbitTemplate發送消息
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testSendEmail() {
        String message = "send email message to user";
        /**
         * arg1: 交換機名稱
         * arg2: 路由鍵
         * arg3: 消息內容
         */
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_TOPICS_INFORM, "inform.email", message);
    }

}