RabbitMQ六種工作模式的對比與實踐

  • 2019 年 10 月 3 日
  • 筆記

最近學習RabbitMQ的使用方式,記錄下來,方便以後使用,也方便和大家共享,相互交流。

RabbitMQ的六種工作模式:

1、Work queues
2、Publish/subscribe
3、Routing
4、Topics
5、Header 模式
6、RPC

一、Work queues

多個消費端消費同一個隊列中的消息,隊列採用輪詢的方式將消息是平均發送給消費者;

 

 特點:

1、一條消息只會被一個消費端接收;

2、隊列採用輪詢的方式將消息是平均發送給消費者的;

3、消費者在處理完某條消息後,才會收到下一條消息

生產端:

1、聲明隊列

2、創建連接

3、創建通道

4、通道聲明隊列

5、制定消息

6、發送消息,使用默認交換機

消費端:

1、聲明隊列

2、創建連接

3、創建通道

4、通道聲明隊列

5、重寫消息消費方法

6、執行消息方法

新建兩個maven工程,生產消息的生產端,消費消息的消費端;

pom.xml文件中依賴坐標如下:

<dependencies>      <dependency>              <groupId>org.springframework.boot</groupId>              <artifactId>spring-boot-starter-logging</artifactId>              <version>2.1.0.RELEASE</version>          </dependency>          <dependency>              <groupId>com.rabbitmq</groupId>              <artifactId>amqp-client</artifactId>              <version>5.7.0</version>          </dependency>  </dependencies>

 生產端的代碼如下:

package com.xyfer;    import com.rabbitmq.client.BuiltinExchangeType;  import com.rabbitmq.client.Channel;  import com.rabbitmq.client.Connection;  import com.rabbitmq.client.ConnectionFactory;      import java.io.IOException;  import java.util.concurrent.TimeoutException;  /*  1、聲明隊列  2、創建連接  3、創建通道  4、通道聲明隊列  5、制定消息  6、發送消息,使用默認交換機  */  public class Producer02 {      //聲明隊列      private static final String QUEUE ="queue";      public static void main(String[] args) {          Connection connection = null;          Channel channel = null;          try {              ConnectionFactory connectionFactory = new ConnectionFactory();              connectionFactory.setHost("127.0.0.1");//mq服務ip地址              connectionFactory.setPort(5672);//mq client連接端口              connectionFactory.setUsername("guest");//mq登錄用戶名              connectionFactory.setPassword("guest");//mq登錄密碼              connectionFactory.setVirtualHost("/");//rabbitmq默認虛擬機名稱為“/”,虛擬機相當於一個獨立的mq服務器              //創建與RabbitMQ服務的TCP連接              connection = connectionFactory.newConnection();              //創建與Exchange的通道,每個連接可以創建多個通道,每個通道代表一個會話任務              channel = connection.createChannel();                //通道綁定隊列              /**               * 聲明隊列,如果Rabbit中沒有此隊列將自動創建               * param1:隊列名稱               * param2:是否持久化               * param3:隊列是否獨佔此連接               * param4:隊列不再使用時是否自動刪除此隊列               * param5:隊列參數               * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments               *               */              channel.queueDeclare(QUEUE,true,false,false,null);//通道綁定郵件隊列                for(int i = 0;i<10;i++){                  String message = new String("mq 發送消息。。。");                  /**                    * 消息發佈方法                    * param1:Exchange的名稱,如果沒有指定,則使用Default Exchange                    * param2:routingKey,消息的路由Key,是用於Exchange(交換機)將消息轉發到指定的消息隊列                    * param3:消息包含的屬性                    * param4:消息體                    * 這裡沒有指定交換機,消息將發送給默認交換機,每個隊列也會綁定那個默認的交換機,但是不能顯示綁定或解除綁定                    * 默認的交換機,routingKey等於隊列名稱                   */                  //String exchange, String routingKey, BasicProperties props, byte[] body                  channel.basicPublish("",QUEUE,null,message.getBytes("utf-8"));                  System.out.println("mq消息發送成功!");              }          } catch (Exception e) {              e.printStackTrace();          } finally {              try {                  channel.close();              } catch (IOException e) {                  e.printStackTrace();              } catch (TimeoutException e) {                  e.printStackTrace();              }              try {                  connection.close();              } catch (IOException e) {                  e.printStackTrace();              }          }      }  }

消費端的代碼如下:

package com.xyfer;    import com.rabbitmq.client.*;    import java.io.IOException;  import java.util.concurrent.TimeoutException;  /*  1、聲明隊列  2、創建連接  3、創建通道  4、通道聲明隊列  5、重寫消息消費方法  6、執行消息方法  */  public class Consumer02 {      private static final String QUEUE ="queue";      public static void main(String[] args) {          Connection connection = null;          Channel channel = null;          try {              ConnectionFactory connectionFactory = new ConnectionFactory();              connectionFactory.setHost("127.0.0.1");              connectionFactory.setPort(5672);              connection = connectionFactory.newConnection();              channel = connection.createChannel();              //通道綁定隊列              /**               * 聲明隊列,如果Rabbit中沒有此隊列將自動創建               * param1:隊列名稱               * param2:是否持久化               * param3:隊列是否獨佔此連接               * param4:隊列不再使用時是否自動刪除此隊列               * param5:隊列參數               * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments               *               */              channel.queueDeclare(QUEUE,true,false,false,null);//通道綁定郵件隊列                //String consumerTag, Envelope envelope, BasicProperties properties, byte[] body              DefaultConsumer consumer = new DefaultConsumer(channel) {                  /**                    * 消費者接收消息調用此方法                    * @param consumerTag 消費者的標籤,在channel.basicConsume()去指定                    * @param envelope 消息包的內容,可從中獲取消息id,消息routingkey,交換機,消息和重傳標誌                      (收到消息失敗後是否需要重新發送)                    * @param properties                    * @param body                    * @throws IOException                   * String consumerTag, Envelope envelope, BasicProperties properties, byte[] body                   */                  @Override                  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {                      //交換機                      String exchange = envelope.getExchange();                      //路由key                      String routingKey = envelope.getRoutingKey();                      envelope.getDeliveryTag();                      String msg = new String(body,"utf-8");                      System.out.println("mq收到的消息是:"+msg );                  }                };              System.out.println("消費者啟動成功!");              channel.basicConsume(QUEUE,true,consumer);            } catch (IOException e) {              e.printStackTrace();          } catch (TimeoutException e) {              e.printStackTrace();          }      }  }

生產端啟動後,控制台打印信息如下:

 RabbitMQ中的已有消息:

 queue中的消息正是生產端發送的消息:

 二、Publish/subscribe 模式

這種模式又稱為發佈訂閱模式,相對於Work queues模式,該模式多了一個交換機,生產端先把消息發送到交換機,再由交換機把消息發送到綁定的隊列中,每個綁定的隊列都能收到由生產端發送的消息。

發佈訂閱模式:

1、每個消費者監聽自己的隊列;

2、生產者將消息發給broker,由交換機將消息轉發到綁定此交換機的每個隊列,每個綁定交換機的隊列都將接收
到消息

應用場景:用戶通知,當用戶充值成功或轉賬完成系統通知用戶,通知方式有短訊、郵件多種方法;

生產端:

1、聲明隊列,聲明交換機

2、創建連接

3、創建通道

4、通道聲明交換機

5、通道聲明隊列

6、通過通道使隊列綁定到交換機

7、制定消息

8、發送消息

消費端:

1、聲明隊列,聲明交換機

2、創建連接

3、創建通道

4、通道聲明交換機

5、通道聲明隊列

6、通過通道使隊列綁定到交換機

7、重寫消息消費方法

8、執行消息方法

Publish/subscribe 模式綁定兩個消費端,因此需要有兩個消費端,一個郵件消費端,一個短訊消費端;

生產端的代碼如下:

package com.xyfer;    import com.rabbitmq.client.BuiltinExchangeType;  import com.rabbitmq.client.Channel;  import com.rabbitmq.client.Connection;  import com.rabbitmq.client.ConnectionFactory;      import java.io.IOException;  import java.util.concurrent.TimeoutException;    public class Producer01 {      //聲明兩個隊列和一個交換機      //Publish/subscribe發佈訂閱模式      private static final String QUEUE_EMAIL ="queueEmail";      private static final String QUEUE_SMS ="queueSms";      private static final String EXCHANGE = "messageChange";      public static void main(String[] args) {          Connection connection = null;          Channel channel = null;          try {              ConnectionFactory connectionFactory = new ConnectionFactory();              connectionFactory.setHost("127.0.0.1");//mq服務ip地址              connectionFactory.setPort(5672);//mq client連接端口              connectionFactory.setUsername("guest");//mq登錄用戶名              connectionFactory.setPassword("guest");//mq登錄密碼              connectionFactory.setVirtualHost("/");//rabbitmq默認虛擬機名稱為“/”,虛擬機相當於一個獨立的mq服務器              //創建與RabbitMQ服務的TCP連接              connection = connectionFactory.newConnection();              //創建與Exchange的通道,每個連接可以創建多個通道,每個通道代表一個會話任務              channel = connection.createChannel();              //通道綁定交換機              /**                * 參數明細                * 1、交換機名稱                * 2、交換機類型,fanout、topic、direct、headers                */              //Publish/subscribe發佈訂閱模式              channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.FANOUT);              //通道綁定隊列              /**               * 聲明隊列,如果Rabbit中沒有此隊列將自動創建               * param1:隊列名稱               * param2:是否持久化               * param3:隊列是否獨佔此連接               * param4:隊列不再使用時是否自動刪除此隊列               * param5:隊列參數               * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments               *               */              channel.queueDeclare(QUEUE_EMAIL,true,false,false,null);//通道綁定郵件隊列              channel.queueDeclare(QUEUE_SMS,true,false,false,null);//通道綁定短訊隊列              //交換機和隊列綁定              /**               * 參數明細               * 1、隊列名稱               * 2、交換機名稱               * 3、路由key               */              //Publish/subscribe發佈訂閱模式              channel.queueBind(QUEUE_EMAIL,EXCHANGE,"");              channel.queueBind(QUEUE_SMS,EXCHANGE,"");              for(int i = 0;i<10;i++){                  String message = new String("mq 發送消息。。。");                  /**                    * 消息發佈方法                    * param1:Exchange的名稱,如果沒有指定,則使用Default Exchange                    * param2:routingKey,消息的路由Key,是用於Exchange(交換機)將消息轉發到指定的消息隊列                    * param3:消息包含的屬性                    * param4:消息體                    * 這裡沒有指定交換機,消息將發送給默認交換機,每個隊列也會綁定那個默認的交換機,但是不能顯示綁定或解除綁定                    * 默認的交換機,routingKey等於隊列名稱                   */                  //String exchange, String routingKey, BasicProperties props, byte[] body                  //Publish/subscribe發佈訂閱模式                  channel.basicPublish(EXCHANGE,"",null,message.getBytes());                  System.out.println("mq消息發送成功!");              }          } catch (Exception e) {              e.printStackTrace();          } finally {              try {                  channel.close();              } catch (IOException e) {                  e.printStackTrace();              } catch (TimeoutException e) {                  e.printStackTrace();              }              try {                  connection.close();              } catch (IOException e) {                  e.printStackTrace();              }          }      }  }

郵件消費端的代碼如下:

package com.xyfer;    import com.rabbitmq.client.*;    import java.io.IOException;  import java.util.concurrent.TimeoutException;    public class Consumer01 {      //Publish/subscribe發佈訂閱模式      private static final String QUEUE_EMAIL ="queueEmail";      private static final String EXCHANGE = "messageChange";      public static void main(String[] args) {          Connection connection = null;          Channel channel = null;          try {              ConnectionFactory connectionFactory = new ConnectionFactory();              connectionFactory.setHost("127.0.0.1");              connectionFactory.setPort(5672);              connection = connectionFactory.newConnection();              channel = connection.createChannel();              //通道綁定交換機              /**                * 參數明細                * 1、交換機名稱                * 2、交換機類型,fanout、topic、direct、headers                */              //Publish/subscribe發佈訂閱模式              channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.FANOUT);              //通道綁定隊列              /**               * 聲明隊列,如果Rabbit中沒有此隊列將自動創建               * param1:隊列名稱               * param2:是否持久化               * param3:隊列是否獨佔此連接               * param4:隊列不再使用時是否自動刪除此隊列               * param5:隊列參數               * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments               *               */              channel.queueDeclare(QUEUE_EMAIL,true,false,false,null);//通道綁定郵件隊列              //交換機和隊列綁定              /**               * 參數明細               * 1、隊列名稱               * 2、交換機名稱               * 3、路由key               */              //Publish/subscribe發佈訂閱模式              channel.queueBind(QUEUE_EMAIL,EXCHANGE,"");              //String consumerTag, Envelope envelope, BasicProperties properties, byte[] body              DefaultConsumer consumer = new DefaultConsumer(channel) {              /**                * 消費者接收消息調用此方法                * @param consumerTag 消費者的標籤,在channel.basicConsume()去指定                * @param envelope 消息包的內容,可從中獲取消息id,消息routingkey,交換機,消息和重傳標誌                  (收到消息失敗後是否需要重新發送)                * @param properties                * @param body                * @throws IOException                * String consumerTag, Envelope envelope, BasicProperties properties, byte[] body                */              @Override              public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {                  //交換機                  String exchange = envelope.getExchange();                  //路由key                  String routingKey = envelope.getRoutingKey();                  envelope.getDeliveryTag();                  String msg = new String(body,"utf-8");                  System.out.println("mq收到的消息是:"+msg );              }              };              System.out.println("消費者啟動成功!");              channel.basicConsume(QUEUE_EMAIL,true,consumer);          } catch (IOException e) {              e.printStackTrace();          } catch (TimeoutException e) {              e.printStackTrace();          }      }  }

短訊消費端的代碼如下:

package xyfer;    import com.rabbitmq.client.*;    import java.io.IOException;  import java.util.concurrent.TimeoutException;    public class Consumer01 {      //Publish/subscribe發佈訂閱模式      private static final String QUEUE_SMS ="queueSms";      private static final String EXCHANGE = "messageChange";      public static void main(String[] args) {          Connection connection = null;          Channel channel = null;          try {              ConnectionFactory connectionFactory = new ConnectionFactory();              connectionFactory.setHost("127.0.0.1");              connectionFactory.setPort(5672);              connection = connectionFactory.newConnection();              channel = connection.createChannel();              //通道綁定交換機              /**                * 參數明細                * 1、交換機名稱                * 2、交換機類型,fanout、topic、direct、headers                */              //Publish/subscribe發佈訂閱模式              channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.FANOUT);              //通道綁定隊列              /**               * 聲明隊列,如果Rabbit中沒有此隊列將自動創建               * param1:隊列名稱               * param2:是否持久化               * param3:隊列是否獨佔此連接               * param4:隊列不再使用時是否自動刪除此隊列               * param5:隊列參數               * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments               *               */              channel.queueDeclare(QUEUE_SMS,true,false,false,null);//通道綁定短訊隊列              //交換機和隊列綁定              /**               * 參數明細               * 1、隊列名稱               * 2、交換機名稱               * 3、路由key               */              //Publish/subscribe發佈訂閱模式              channel.queueBind(QUEUE_SMS,EXCHANGE,"");              DefaultConsumer consumer = new DefaultConsumer(channel) {              /**                * 消費者接收消息調用此方法                * @param consumerTag 消費者的標籤,在channel.basicConsume()去指定                * @param envelope 消息包的內容,可從中獲取消息id,消息routingkey,交換機,消息和重傳標誌                  (收到消息失敗後是否需要重新發送)                * @param properties                * @param body                * @throws IOException                * String consumerTag, Envelope envelope, BasicProperties properties, byte[] body                */              @Override              public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {                  //交換機                  String exchange = envelope.getExchange();                  //路由key                  String routingKey = envelope.getRoutingKey();                  envelope.getDeliveryTag();                  String msg = new String(body,"utf-8");                  System.out.println("mq收到的消息是:"+msg );              }                };              System.out.println("消費者啟動成功!");              channel.basicConsume(QUEUE_SMS,true,consumer);            } catch (IOException e) {              e.printStackTrace();          } catch (TimeoutException e) {              e.printStackTrace();          }      }  }

三、Routing 路由模式

Routing 模式又稱路由模式,該種模式除了要綁定交換機外,發消息的時候還要制定routing key,即路由key,隊列通過通道綁定交換機的時候,需要指定自己的routing key,這樣,生產端發送消息的時候也會指定routing key,通過routing key就可以把相應的消息發送到綁定相應routing key的隊列中去。

路由模式:

1、每個消費者監聽自己的隊列,並且設置routingkey;
2、生產者將消息發給交換機,由交換機根據routingkey來轉發消息到指定的隊列;

應用場景:用戶通知,當用戶充值成功或轉賬完成系統通知用戶,通知方式有短訊、郵件多種方法;

生產端:

1、聲明隊列,聲明交換機

2、創建連接

3、創建通道

4、通道聲明交換機

5、通道聲明隊列

6、通過通道使隊列綁定到交換機並指定該隊列的routingkey

7、制定消息

8、發送消息並指定routingkey

消費端:

1、聲明隊列,聲明交換機

2、創建連接

3、創建通道

4、通道聲明交換機

5、通道聲明隊列

6、通過通道使隊列綁定到交換機並指定routingkey

7、重寫消息消費方法

8、執行消息方法

按照假設的應用場景,同樣,Routing 路由模式也是一個生產端,兩個消費端,所不同的是,聲明交換機的類型不同,隊列綁定交換機的時候需要指定Routing key,發送消息的時候也需要指定Routing key,這樣根據Routing key就能把相應的消息發送到相應的隊列中去。

生產端的代碼如下:

package com.xyfer;    import com.rabbitmq.client.BuiltinExchangeType;  import com.rabbitmq.client.Channel;  import com.rabbitmq.client.Connection;  import com.rabbitmq.client.ConnectionFactory;      import java.io.IOException;  import java.util.concurrent.TimeoutException;    public class Producer03 {      //聲明兩個隊列和一個交換機      //Routing 路由模式      private static final String QUEUE_EMAIL ="queueEmail";      private static final String QUEUE_SMS ="queueSms";      private static final String EXCHANGE = "messageChange";      public static void main(String[] args) {          Connection connection = null;          Channel channel = null;          try {              ConnectionFactory connectionFactory = new ConnectionFactory();              connectionFactory.setHost("127.0.0.1");//mq服務ip地址              connectionFactory.setPort(5672);//mq client連接端口              connectionFactory.setUsername("guest");//mq登錄用戶名              connectionFactory.setPassword("guest");//mq登錄密碼              connectionFactory.setVirtualHost("/");//rabbitmq默認虛擬機名稱為“/”,虛擬機相當於一個獨立的mq服務器              //創建與RabbitMQ服務的TCP連接              connection = connectionFactory.newConnection();              //創建與Exchange的通道,每個連接可以創建多個通道,每個通道代表一個會話任務              channel = connection.createChannel();              //通道綁定交換機              /**               * 參數明細               * 1、交換機名稱               * 2、交換機類型,fanout、topic、direct、headers               */              //Routing 路由模式              channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.DIRECT);              //通道綁定隊列              /**               * 聲明隊列,如果Rabbit中沒有此隊列將自動創建               * param1:隊列名稱               * param2:是否持久化               * param3:隊列是否獨佔此連接               * param4:隊列不再使用時是否自動刪除此隊列               * param5:隊列參數               * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments               *               */              channel.queueDeclare(QUEUE_EMAIL,true,false,false,null);//通道綁定郵件隊列              channel.queueDeclare(QUEUE_SMS,true,false,false,null);//通道綁定短訊隊列              //交換機和隊列綁定              /**               * 參數明細               * 1、隊列名稱               * 2、交換機名稱               * 3、路由key               */              //Routing 路由模式              channel.queueBind(QUEUE_EMAIL,EXCHANGE,QUEUE_EMAIL);              channel.queueBind(QUEUE_SMS,EXCHANGE,QUEUE_SMS);              //給email隊列發消息              for(int i = 0;i<10;i++){                  String message = new String("mq 發送email消息。。。");                  /**                    * 消息發佈方法                    * param1:Exchange的名稱,如果沒有指定,則使用Default Exchange                    * param2:routingKey,消息的路由Key,是用於Exchange(交換機)將消息轉發到指定的消息隊列                    * param3:消息包含的屬性                    * param4:消息體                    * 這裡沒有指定交換機,消息將發送給默認交換機,每個隊列也會綁定那個默認的交換機,但是不能顯示綁定或解除綁定                    * 默認的交換機,routingKey等於隊列名稱                   */                  //String exchange, String routingKey, BasicProperties props, byte[] body                  //Routing 路由模式                  channel.basicPublish(EXCHANGE,QUEUE_EMAIL,null,message.getBytes());                  System.out.println("mq消息發送成功!");              }              //給sms隊列發消息              for(int i = 0;i<10;i++){                  String message = new String("mq 發送sms消息。。。");                  /**                    * 消息發佈方法                    * param1:Exchange的名稱,如果沒有指定,則使用Default Exchange                    * param2:routingKey,消息的路由Key,是用於Exchange(交換機)將消息轉發到指定的消息隊列                    * param3:消息包含的屬性                    * param4:消息體                    * 這裡沒有指定交換機,消息將發送給默認交換機,每個隊列也會綁定那個默認的交換機,但是不能顯示綁定或解除綁定                    * 默認的交換機,routingKey等於隊列名稱                   */                  //String exchange, String routingKey, BasicProperties props, byte[] body                  //Routing 路由模式                  channel.basicPublish(EXCHANGE,QUEUE_SMS,null,message.getBytes());                  System.out.println("mq消息發送成功!");              }          } catch (Exception e) {              e.printStackTrace();          } finally {              try {                  channel.close();              } catch (IOException e) {                  e.printStackTrace();              } catch (TimeoutException e) {                  e.printStackTrace();              }              try {                  connection.close();              } catch (IOException e) {                  e.printStackTrace();              }          }      }  }

郵件消費端的代碼如下:

package com.xyfer;    import com.rabbitmq.client.*;    import java.io.IOException;  import java.util.concurrent.TimeoutException;    public class Consumer03 {      //Routing 路由模式      private static final String QUEUE_EMAIL ="queueEmail";      private static final String EXCHANGE = "messageChange";      public static void main(String[] args) {          Connection connection = null;          Channel channel = null;          try {              ConnectionFactory connectionFactory = new ConnectionFactory();              connectionFactory.setHost("127.0.0.1");              connectionFactory.setPort(5672);              connection = connectionFactory.newConnection();              channel = connection.createChannel();              //通道綁定交換機              /**                * 參數明細                * 1、交換機名稱                * 2、交換機類型,fanout、topic、direct、headers                */              //Routing 路由模式              channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.DIRECT);              //通道綁定隊列              /**               * 聲明隊列,如果Rabbit中沒有此隊列將自動創建               * param1:隊列名稱               * param2:是否持久化               * param3:隊列是否獨佔此連接               * param4:隊列不再使用時是否自動刪除此隊列               * param5:隊列參數               * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments               *               */              channel.queueDeclare(QUEUE_EMAIL,true,false,false,null);//通道綁定郵件隊列              //交換機和隊列綁定              /**               * 參數明細               * 1、隊列名稱               * 2、交換機名稱               * 3、路由key               */              //Routing 路由模式              channel.queueBind(QUEUE_EMAIL,EXCHANGE,QUEUE_EMAIL);              //String consumerTag, Envelope envelope, BasicProperties properties, byte[] body              DefaultConsumer consumer = new DefaultConsumer(channel) {                  /**                    * 消費者接收消息調用此方法                    * @param consumerTag 消費者的標籤,在channel.basicConsume()去指定                    * @param envelope 消息包的內容,可從中獲取消息id,消息routingkey,交換機,消息和重傳標誌                      (收到消息失敗後是否需要重新發送)                    * @param properties                    * @param body                    * @throws IOException                   * String consumerTag, Envelope envelope, BasicProperties properties, byte[] body                   */                  @Override                  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {                      //交換機                      String exchange = envelope.getExchange();                      //路由key                      String routingKey = envelope.getRoutingKey();                      envelope.getDeliveryTag();                      String msg = new String(body,"utf-8");                      System.out.println("mq收到的消息是:"+msg );                  }                };              System.out.println("消費者啟動成功!");              channel.basicConsume(QUEUE_EMAIL,true,consumer);          } catch (IOException e) {              e.printStackTrace();          } catch (TimeoutException e) {              e.printStackTrace();          }      }  }

短訊消費端的代碼如下:

package xyfer;    import com.rabbitmq.client.*;    import java.io.IOException;  import java.util.concurrent.TimeoutException;    public class Consumer03 {      //Routing 路由模式      private static final String QUEUE_SMS ="queueSms";      private static final String EXCHANGE = "messageChange";      public static void main(String[] args) {          Connection connection = null;          Channel channel = null;          try {              ConnectionFactory connectionFactory = new ConnectionFactory();              connectionFactory.setHost("127.0.0.1");              connectionFactory.setPort(5672);              connection = connectionFactory.newConnection();              channel = connection.createChannel();              //通道綁定交換機              /**                * 參數明細                * 1、交換機名稱                * 2、交換機類型,fanout、topic、direct、headers                */              //Routing 路由模式              channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.DIRECT);              //通道綁定隊列              /**               * 聲明隊列,如果Rabbit中沒有此隊列將自動創建               * param1:隊列名稱               * param2:是否持久化               * param3:隊列是否獨佔此連接               * param4:隊列不再使用時是否自動刪除此隊列               * param5:隊列參數               * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments               *               */              channel.queueDeclare(QUEUE_SMS,true,false,false,null);//通道綁定短訊隊列              //交換機和隊列綁定              /**               * 參數明細               * 1、隊列名稱               * 2、交換機名稱               * 3、路由key               */              //Routing 路由模式              channel.queueBind(QUEUE_SMS,EXCHANGE,QUEUE_SMS);              DefaultConsumer consumer = new DefaultConsumer(channel) {                  /**                    * 消費者接收消息調用此方法                    * @param consumerTag 消費者的標籤,在channel.basicConsume()去指定                    * @param envelope 消息包的內容,可從中獲取消息id,消息routingkey,交換機,消息和重傳標誌                      (收到消息失敗後是否需要重新發送)                    * @param properties                    * @param body                    * @throws IOException                   * String consumerTag, Envelope envelope, BasicProperties properties, byte[] body                   */                  @Override                  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {                      //交換機                      String exchange = envelope.getExchange();                      //路由key                      String routingKey = envelope.getRoutingKey();                      envelope.getDeliveryTag();                      String msg = new String(body,"utf-8");                      System.out.println("mq收到的消息是:"+msg );                  }              };              System.out.println("消費者啟動成功!");              channel.basicConsume(QUEUE_SMS,true,consumer);          } catch (IOException e) {              e.printStackTrace();          } catch (TimeoutException e) {              e.printStackTrace();          }      }  }

四、Topics 模式

Topics 模式和Routing 路由模式最大的區別就是,Topics 模式發送消息和消費消息的時候是通過通配符去進行匹配的。

路由模式:

1、每個消費者監聽自己的隊列,並且設置帶統配符的routingkey

2、生產者將消息發給broker,由交換機根據routingkey來轉發消息到指定的隊列

應用場景:用戶通知,當用戶充值成功或轉賬完成系統通知用戶,通知方式有短訊、郵件多種方法;

生產端:

1、聲明隊列,聲明交換機

2、創建連接

3、創建通道

4、通道聲明交換機

5、通道聲明隊列

6、通過通道使隊列綁定到交換機並指定該隊列的routingkey(通配符)

7、制定消息

8、發送消息並指定routingkey(通配符)

消費端:

1、聲明隊列,聲明交換機

2、創建連接

3、創建通道

4、通道聲明交換機

5、通道聲明隊列

6、通過通道使隊列綁定到交換機並指定routingkey(通配符)

7、重寫消息消費方法

8、執行消息方法

按照假設的應用場景,Topics 模式也是一個生產端,兩個消費端,生產端隊列綁定交換機的時候,需要指定的routingkey是通配符,發送消息的時候綁定的routingkey也是通配符,消費端隊列綁定交換機的時候routingkey也是通配符,這樣就能根據通配符匹配到消息了。

生產端的代碼如下:

package com.xyfer;    import com.rabbitmq.client.BuiltinExchangeType;  import com.rabbitmq.client.Channel;  import com.rabbitmq.client.Connection;  import com.rabbitmq.client.ConnectionFactory;      import java.io.IOException;  import java.util.concurrent.TimeoutException;    public class Producer04 {      //聲明兩個隊列和一個交換機      //Topics 模式      private static final String QUEUE_EMAIL ="queueEmail";      private static final String QUEUE_SMS ="queueSms";      private static final String EXCHANGE = "messageChange";      public static void main(String[] args) {          Connection connection = null;          Channel channel = null;          try {              ConnectionFactory connectionFactory = new ConnectionFactory();              connectionFactory.setHost("127.0.0.1");//mq服務ip地址              connectionFactory.setPort(5672);//mq client連接端口              connectionFactory.setUsername("guest");//mq登錄用戶名              connectionFactory.setPassword("guest");//mq登錄密碼              connectionFactory.setVirtualHost("/");//rabbitmq默認虛擬機名稱為“/”,虛擬機相當於一個獨立的mq服務器              //創建與RabbitMQ服務的TCP連接              connection = connectionFactory.newConnection();              //創建與Exchange的通道,每個連接可以創建多個通道,每個通道代表一個會話任務              channel = connection.createChannel();              //通道綁定交換機              /**                * 參數明細                * 1、交換機名稱                * 2、交換機類型,fanout、topic、direct、headers                */              //Topics 模式              channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.TOPIC);              //通道綁定隊列              /**               * 聲明隊列,如果Rabbit中沒有此隊列將自動創建               * param1:隊列名稱               * param2:是否持久化               * param3:隊列是否獨佔此連接               * param4:隊列不再使用時是否自動刪除此隊列               * param5:隊列參數               * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments               *               */              channel.queueDeclare(QUEUE_EMAIL,true,false,false,null);//通道綁定郵件隊列              channel.queueDeclare(QUEUE_SMS,true,false,false,null);//通道綁定短訊隊列              //交換機和隊列綁定              /**               * 參數明細               * 1、隊列名稱               * 2、交換機名稱               * 3、路由key               */              channel.queueBind(QUEUE_EMAIL,EXCHANGE,"inform.#.email.#");              channel.queueBind(QUEUE_SMS,EXCHANGE,"inform.#.sms.#");              //給email隊列發消息              for(int i = 0;i<10;i++){                  String message = new String("mq 發送email消息。。。");                  /**                    * 消息發佈方法                    * param1:Exchange的名稱,如果沒有指定,則使用Default Exchange                    * param2:routingKey,消息的路由Key,是用於Exchange(交換機)將消息轉發到指定的消息隊列                    * param3:消息包含的屬性                    * param4:消息體                    * 這裡沒有指定交換機,消息將發送給默認交換機,每個隊列也會綁定那個默認的交換機,但是不能顯示綁定或解除綁定                    * 默認的交換機,routingKey等於隊列名稱                   */                  //String exchange, String routingKey, BasicProperties props, byte[] body                  channel.basicPublish(EXCHANGE,"inform.email",null,message.getBytes());                  System.out.println("mq email 消息發送成功!");              }              //給sms隊列發消息              for(int i = 0;i<10;i++){                  String message = new String("mq 發送sms消息。。。");                  /**                    * 消息發佈方法                    * param1:Exchange的名稱,如果沒有指定,則使用Default Exchange                    * param2:routingKey,消息的路由Key,是用於Exchange(交換機)將消息轉發到指定的消息隊列                    * param3:消息包含的屬性                    * param4:消息體                    * 這裡沒有指定交換機,消息將發送給默認交換機,每個隊列也會綁定那個默認的交換機,但是不能顯示綁定或解除綁定                    * 默認的交換機,routingKey等於隊列名稱                   */                  //String exchange, String routingKey, BasicProperties props, byte[] body                  channel.basicPublish(EXCHANGE,"inform.sms",null,message.getBytes());                  System.out.println("mq sms 消息發送成功!");              }              //給email和sms隊列發消息              for(int i = 0;i<10;i++){                  String message = new String("mq 發送email sms消息。。。");                  /**                    * 消息發佈方法                    * param1:Exchange的名稱,如果沒有指定,則使用Default Exchange                    * param2:routingKey,消息的路由Key,是用於Exchange(交換機)將消息轉發到指定的消息隊列                    * param3:消息包含的屬性                    * param4:消息體                    * 這裡沒有指定交換機,消息將發送給默認交換機,每個隊列也會綁定那個默認的交換機,但是不能顯示綁定或解除綁定                    * 默認的交換機,routingKey等於隊列名稱                   */                  //String exchange, String routingKey, BasicProperties props, byte[] body                  channel.basicPublish(EXCHANGE,"inform.email.sms",null,message.getBytes());                  System.out.println("mq email sms 消息發送成功!");              }          } catch (Exception e) {              e.printStackTrace();          } finally {              try {                  channel.close();              } catch (IOException e) {                  e.printStackTrace();              } catch (TimeoutException e) {                  e.printStackTrace();              }              try {                  connection.close();              } catch (IOException e) {                  e.printStackTrace();              }          }      }  }

郵件消費端的代碼如下:

package com.xyfer;    import com.rabbitmq.client.*;    import java.io.IOException;  import java.util.concurrent.TimeoutException;    public class Consumer04 {      private static final String QUEUE_EMAIL ="queueEmail";      private static final String EXCHANGE = "messageChange";      public static void main(String[] args) {          Connection connection = null;          Channel channel = null;          try {              ConnectionFactory connectionFactory = new ConnectionFactory();              connectionFactory.setHost("127.0.0.1");              connectionFactory.setPort(5672);              connection = connectionFactory.newConnection();              channel = connection.createChannel();              //通道綁定交換機              /**                * 參數明細                * 1、交換機名稱                * 2、交換機類型,fanout、topic、direct、headers                */              channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.TOPIC);              //通道綁定隊列              /**               * 聲明隊列,如果Rabbit中沒有此隊列將自動創建               * param1:隊列名稱               * param2:是否持久化               * param3:隊列是否獨佔此連接               * param4:隊列不再使用時是否自動刪除此隊列               * param5:隊列參數               * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments               *               */              channel.queueDeclare(QUEUE_EMAIL,true,false,false,null);//通道綁定郵件隊列              //交換機和隊列綁定              /**               * 參數明細               * 1、隊列名稱               * 2、交換機名稱               * 3、路由key               */              channel.queueBind(QUEUE_EMAIL,EXCHANGE,"inform.#.email.#");              //String consumerTag, Envelope envelope, BasicProperties properties, byte[] body              DefaultConsumer consumer = new DefaultConsumer(channel) {                  /**                    * 消費者接收消息調用此方法                    * @param consumerTag 消費者的標籤,在channel.basicConsume()去指定                    * @param envelope 消息包的內容,可從中獲取消息id,消息routingkey,交換機,消息和重傳標誌                      (收到消息失敗後是否需要重新發送)                    * @param properties                    * @param body                    * @throws IOException                   * String consumerTag, Envelope envelope, BasicProperties properties, byte[] body                   */                  @Override                  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {                      //交換機                      String exchange = envelope.getExchange();                      //路由key                      String routingKey = envelope.getRoutingKey();                      envelope.getDeliveryTag();                      String msg = new String(body,"utf-8");                      System.out.println("mq收到的消息是:"+msg );                  }                };              System.out.println("消費者啟動成功!");              channel.basicConsume(QUEUE_EMAIL,true,consumer);          } catch (IOException e) {              e.printStackTrace();          } catch (TimeoutException e) {              e.printStackTrace();          }      }  }

短訊消費端的代碼如下:

package xyfer;    import com.rabbitmq.client.*;    import java.io.IOException;  import java.util.concurrent.TimeoutException;    public class Consumer04 {      private static final String QUEUE_SMS ="queueSms";      private static final String EXCHANGE = "messageChange";      public static void main(String[] args) {          Connection connection = null;          Channel channel = null;          try {              ConnectionFactory connectionFactory = new ConnectionFactory();              connectionFactory.setHost("127.0.0.1");              connectionFactory.setPort(5672);              connection = connectionFactory.newConnection();              channel = connection.createChannel();              //通道綁定交換機              /**                * 參數明細                * 1、交換機名稱                * 2、交換機類型,fanout、topic、direct、headers                */              channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.TOPIC);              //通道綁定隊列              /**               * 聲明隊列,如果Rabbit中沒有此隊列將自動創建               * param1:隊列名稱               * param2:是否持久化               * param3:隊列是否獨佔此連接               * param4:隊列不再使用時是否自動刪除此隊列               * param5:隊列參數               * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments               *               */              channel.queueDeclare(QUEUE_SMS,true,false,false,null);//通道綁定郵件隊列              //交換機和隊列綁定              /**               * 參數明細               * 1、隊列名稱               * 2、交換機名稱               * 3、路由key               */              channel.queueBind(QUEUE_SMS,EXCHANGE,"inform.#.sms.#");              //String consumerTag, Envelope envelope, BasicProperties properties, byte[] body              DefaultConsumer consumer = new DefaultConsumer(channel) {                  /**                    * 消費者接收消息調用此方法                    * @param consumerTag 消費者的標籤,在channel.basicConsume()去指定                    * @param envelope 消息包的內容,可從中獲取消息id,消息routingkey,交換機,消息和重傳標誌                      (收到消息失敗後是否需要重新發送)                    * @param properties                    * @param body                    * @throws IOException                   * String consumerTag, Envelope envelope, BasicProperties properties, byte[] body                   */                  @Override                  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {                      //交換機                      String exchange = envelope.getExchange();                      //路由key                      String routingKey = envelope.getRoutingKey();                      envelope.getDeliveryTag();                      String msg = new String(body,"utf-8");                      System.out.println("mq收到的消息是:"+msg );                  }                };              System.out.println("消費者啟動成功!");              channel.basicConsume(QUEUE_SMS,true,consumer);          } catch (IOException e) {              e.printStackTrace();          } catch (TimeoutException e) {              e.printStackTrace();          }      }  }

由於生產端同時發送了email的消息(10條),sms的消息(10條),email和sms同時收到的消息(10條),所以每個消費端都應收到各自的10條消息,加上同時都能收到的10條消息,每個消費端應該收到20條消息;

生產端控制台打印:

 郵件消費端控制台打印:

 短訊消費端的控制台打印:

 生產端執行後,RabbitMQ上的消息隊列情況:

 兩個消費端執行完後,RabbitMQ上的消息隊列情況:

 五、Header 模式

header模式與routing不同的地方在於,header模式取消routingkey,使用header中的 key/value(鍵值對)匹配隊列。

案例:

根據用戶的通知設置去通知用戶,設置接收Email的用戶只接收Email,設置接收sms的用戶只接收sms,設置兩種通知類型都接收的則兩種通知都有效。

根據假設使用場景,需要一個生產端,兩個消費端,不同的是,生產端聲明交換機時,交換機的類型不同,是headers類型,生產端隊列綁定交換機時,不使用routingkey,而是使用header中的 key/value(鍵值對)匹配隊列,發送消息時也是使用header中的 key/value(鍵值對)匹配隊列。

消費端同樣是聲明交換機時,交換機的類型不同,是headers類型,消費端隊列綁定交換機時,不使用routingkey,而是使用header中的 key/value(鍵值對)匹配隊列,消費消息時也是使用header中的 key/value(鍵值對)匹配隊列。

生產端的代碼如下:

package com.xyfer;    import com.rabbitmq.client.*;      import java.io.IOException;  import java.util.Hashtable;  import java.util.Map;  import java.util.concurrent.TimeoutException;    public class Producer05 {      //聲明兩個隊列和一個交換機      //Header 模式      private static final String QUEUE_EMAIL ="queueEmail";      private static final String QUEUE_SMS ="queueSms";      private static final String EXCHANGE = "messageChange";      public static void main(String[] args) {          Connection connection = null;          Channel channel = null;          try {              ConnectionFactory connectionFactory = new ConnectionFactory();              connectionFactory.setHost("127.0.0.1");//mq服務ip地址              connectionFactory.setPort(5672);//mq client連接端口              connectionFactory.setUsername("guest");//mq登錄用戶名              connectionFactory.setPassword("guest");//mq登錄密碼              connectionFactory.setVirtualHost("/");//rabbitmq默認虛擬機名稱為“/”,虛擬機相當於一個獨立的mq服務器              //創建與RabbitMQ服務的TCP連接              connection = connectionFactory.newConnection();              //創建與Exchange的通道,每個連接可以創建多個通道,每個通道代表一個會話任務              channel = connection.createChannel();              //通道綁定交換機              /**                * 參數明細                * 1、交換機名稱                * 2、交換機類型,fanout、topic、direct、headers                */              //Header 模式              channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.HEADERS);              //通道綁定隊列              /**               * 聲明隊列,如果Rabbit中沒有此隊列將自動創建               * param1:隊列名稱               * param2:是否持久化               * param3:隊列是否獨佔此連接               * param4:隊列不再使用時是否自動刪除此隊列               * param5:隊列參數               * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments               *               */              channel.queueDeclare(QUEUE_EMAIL,true,false,false,null);//通道綁定郵件隊列              channel.queueDeclare(QUEUE_SMS,true,false,false,null);//通道綁定短訊隊列              //交換機和隊列綁定              /**               * 參數明細               * 1、隊列名稱               * 2、交換機名稱               * 3、路由key               * 4、               * String queue, String exchange, String routingKey, Map<String, Object> arguments               */              Map<String,Object> headers_email = new Hashtable<String,Object>();              headers_email.put("inform_type","email");              Map<String,Object> headers_sms = new Hashtable<String, Object>();              headers_sms.put("inform_type","sms");              channel.queueBind(QUEUE_EMAIL,EXCHANGE,"",headers_email);              channel.queueBind(QUEUE_SMS,EXCHANGE,"",headers_sms);              //給email隊列發消息              for(int i = 0;i<10;i++){                  String message = new String("mq 發送email消息。。。");                  Map<String,Object> headers = new Hashtable<String,Object>();                  headers.put("inform_type","email");//匹配email通知消費者綁定的header                  /**                    * 消息發佈方法                    * param1:Exchange的名稱,如果沒有指定,則使用Default Exchange                    * param2:routingKey,消息的路由Key,是用於Exchange(交換機)將消息轉發到指定的消息隊列                    * param3:消息包含的屬性                    * param4:消息體                    * 這裡沒有指定交換機,消息將發送給默認交換機,每個隊列也會綁定那個默認的交換機,但是不能顯示綁定或解除綁定                    * 默認的交換機,routingKey等於隊列名稱                   */                  //String exchange, String routingKey, BasicProperties props, byte[] body                  AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties.Builder();                  properties.headers(headers);                  //Email通知                  channel.basicPublish(EXCHANGE,"",properties.build(),message.getBytes());                  System.out.println("mq email 消息發送成功!");              }              //給sms隊列發消息              for(int i = 0;i<10;i++){                  String message = new String("mq 發送sms消息。。。");                  Map<String,Object> headers = new Hashtable<String,Object>();                  headers.put("inform_type","sms");//匹配sms通知消費者綁定的header                  /**                    * 消息發佈方法                    * param1:Exchange的名稱,如果沒有指定,則使用Default Exchange                    * param2:routingKey,消息的路由Key,是用於Exchange(交換機)將消息轉發到指定的消息隊列                    * param3:消息包含的屬性                    * param4:消息體                    * 這裡沒有指定交換機,消息將發送給默認交換機,每個隊列也會綁定那個默認的交換機,但是不能顯示綁定或解除綁定                    * 默認的交換機,routingKey等於隊列名稱                   */                  //String exchange, String routingKey, BasicProperties props, byte[] body                  AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties.Builder();                  properties.headers(headers);                  //sms通知                  channel.basicPublish(EXCHANGE,"",properties.build(),message.getBytes());                  System.out.println("mq sms 消息發送成功!");              }          } catch (Exception e) {              e.printStackTrace();          } finally {              try {                  channel.close();              } catch (IOException e) {                  e.printStackTrace();              } catch (TimeoutException e) {                  e.printStackTrace();              }              try {                  connection.close();              } catch (IOException e) {                  e.printStackTrace();              }          }      }  }

郵件消費端的代碼如下:

package com.xyfer;    import com.rabbitmq.client.*;    import java.io.IOException;  import java.util.Hashtable;  import java.util.Map;  import java.util.concurrent.TimeoutException;    public class Consumer05 {      private static final String QUEUE_EMAIL ="queueEmail";      private static final String EXCHANGE = "messageChange";      public static void main(String[] args) {          Connection connection = null;          Channel channel = null;          try {              ConnectionFactory connectionFactory = new ConnectionFactory();              connectionFactory.setHost("127.0.0.1");              connectionFactory.setPort(5672);              connection = connectionFactory.newConnection();              channel = connection.createChannel();              //通道綁定交換機              /**                            * 參數明細                            * 1、交換機名稱                            * 2、交換機類型,fanout、topic、direct、headers                            */              channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.HEADERS);              //通道綁定隊列              /**               * 聲明隊列,如果Rabbit中沒有此隊列將自動創建               * param1:隊列名稱               * param2:是否持久化               * param3:隊列是否獨佔此連接               * param4:隊列不再使用時是否自動刪除此隊列               * param5:隊列參數               * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments               *               */              channel.queueDeclare(QUEUE_EMAIL,true,false,false,null);//通道綁定郵件隊列              //交換機和隊列綁定              /**               * 參數明細               * 1、隊列名稱               * 2、交換機名稱               * 3、路由key               * 4、               * String queue, String exchange, String routingKey, Map<String, Object> arguments               */              Map<String,Object> headers_email = new Hashtable<String,Object>();              headers_email.put("inform_email","email");              channel.queueBind(QUEUE_EMAIL,EXCHANGE,"",headers_email);              //String consumerTag, Envelope envelope, BasicProperties properties, byte[] body              DefaultConsumer consumer = new DefaultConsumer(channel) {                  /**                    * 消費者接收消息調用此方法                    * @param consumerTag 消費者的標籤,在channel.basicConsume()去指定                    * @param envelope 消息包的內容,可從中獲取消息id,消息routingkey,交換機,消息和重傳標誌                      (收到消息失敗後是否需要重新發送)                    * @param properties                    * @param body                    * @throws IOException                   * String consumerTag, Envelope envelope, BasicProperties properties, byte[] body                   */                  @Override                  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {                      //交換機                      String exchange = envelope.getExchange();                      //路由key                      String routingKey = envelope.getRoutingKey();                      envelope.getDeliveryTag();                      String msg = new String(body,"utf-8");                      System.out.println("mq收到的消息是:"+msg );                  }                };              System.out.println("消費者啟動成功!");              channel.basicConsume(QUEUE_EMAIL,true,consumer);          } catch (IOException e) {              e.printStackTrace();          } catch (TimeoutException e) {              e.printStackTrace();          }      }  }

短訊消費端的代碼如下:

package xyfer;    import com.rabbitmq.client.*;    import java.io.IOException;  import java.util.Hashtable;  import java.util.Map;  import java.util.concurrent.TimeoutException;    public class Consumer05 {      private static final String QUEUE_SMS ="queueSms";      private static final String EXCHANGE = "messageChange";      public static void main(String[] args) {          Connection connection = null;          Channel channel = null;          try {              ConnectionFactory connectionFactory = new ConnectionFactory();              connectionFactory.setHost("127.0.0.1");              connectionFactory.setPort(5672);              connection = connectionFactory.newConnection();              channel = connection.createChannel();              //通道綁定交換機              /**                * 參數明細                * 1、交換機名稱                * 2、交換機類型,fanout、topic、direct、headers                */              channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.HEADERS);              //通道綁定隊列              /**               * 聲明隊列,如果Rabbit中沒有此隊列將自動創建               * param1:隊列名稱               * param2:是否持久化               * param3:隊列是否獨佔此連接               * param4:隊列不再使用時是否自動刪除此隊列               * param5:隊列參數               * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments               *               */              channel.queueDeclare(QUEUE_SMS,true,false,false,null);//通道綁定郵件隊列              //交換機和隊列綁定              /**               * 參數明細               * 1、隊列名稱               * 2、交換機名稱               * 3、路由key               * 4、               * String queue, String exchange, String routingKey, Map<String, Object> arguments               */              Map<String,Object> headers_email = new Hashtable<String,Object>();              headers_email.put("inform_email","sms");              channel.queueBind(QUEUE_SMS,EXCHANGE,"",headers_email);              //String consumerTag, Envelope envelope, BasicProperties properties, byte[] body              DefaultConsumer consumer = new DefaultConsumer(channel) {                  /**                    * 消費者接收消息調用此方法                    * @param consumerTag 消費者的標籤,在channel.basicConsume()去指定                    * @param envelope 消息包的內容,可從中獲取消息id,消息routingkey,交換機,消息和重傳標誌                      (收到消息失敗後是否需要重新發送)                    * @param properties                    * @param body                    * @throws IOException                   * String consumerTag, Envelope envelope, BasicProperties properties, byte[] body                   */                  @Override                  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {                      //交換機                      String exchange = envelope.getExchange();                      //路由key                      String routingKey = envelope.getRoutingKey();                      envelope.getDeliveryTag();                      String msg = new String(body,"utf-8");                      System.out.println("mq收到的消息是:"+msg );                  }                };              System.out.println("消費者啟動成功!");              channel.basicConsume(QUEUE_SMS,true,consumer);          } catch (IOException e) {              e.printStackTrace();          } catch (TimeoutException e) {              e.printStackTrace();          }      }  }

生產端啟動後RabbitMQ上面的消息隊列情況:

六、RPC 模式

 RPC即客戶端遠程調用服務端的方法 ,使用MQ可以實現RPC的異步調用,基於Direct交換機實現,流程如下:

1、客戶端即是生產者也是消費者,向RPC請求隊列發送RPC調用消息,同時監聽RPC響應隊列。

2、服務端監聽RPC請求隊列的消息,收到消息後執行服務端的方法,得到方法返回的結果。

3、服務端將RPC方法 的結果發送到RPC響應隊列。

4、客戶端(RPC調用方)監聽RPC響應隊列,接收到RPC調用結果。

 

至此,RabbitMQ的六種工作模式已經介紹完畢,手動代碼實現,實際體驗六種工作模式的不同。