消息中間件——RabbitMQ(五)快速入門生產者與消費者,SpringBoot整合RabbitMQ!

  • 2019 年 10 月 3 日
  • 筆記

求關注

快速入門生產者與消費者,SpringBoot整合RabbitMQ!

前言

本章我們來一次快速入門RabbitMQ——生產者與消費者。需要構建一個生產端與消費端的模型。什麼意思呢?我們的生產者發送一條消息,投遞到RabbitMQ集群也就是Broker。
我們的消費端進行監聽RabbitMQ,當發現隊列中有消息後,就進行消費。

1. 環境準備

本次整合主要採用SpringBoot框架,需要對SpringBoot的使用有一定了解。

2.大概步驟

我們來看下大概步驟:

  • ConnectionFacorty:獲取連接工廠
  • Connection:一個連接
  • Channel:數據通訊信道,可發送和接收消息
  • Queue:具體的消息存儲隊列
  • Producer & Consumer 生產者和消費者

這個連接工廠需要配置一些相應的資訊,例如: RabbitMQ節點的地址,埠號,VirtualHost等等。
Channel是我們RabbitMQ所有消息進行交互的關鍵。

3. 項目實戰

3.1 連接工廠

  /**   *  * @ClassName: ConnectionUtils  * @Description: 連接工具類  * @author Coder編程  * @date 2019年6月21日 上午22:28:22  *   */  public class ConnectionUtils {      public static Connection getConnection() throws IOException, TimeoutException {          //定義連接工廠          ConnectionFactory factory = new ConnectionFactory();          //設置服務地址          factory.setHost("127.0.0.1");          //埠          factory.setPort(5672);//amqp協議 埠 類似與mysql的3306          //設置帳號資訊,用戶名、密碼、vhost          factory.setVirtualHost("/vhost_cp");          factory.setUsername("user_cp");          factory.setPassword("123456");          // 通過工程獲取連接          Connection connection = factory.newConnection();          return connection;      }  }  

3.2 生產端

  /**   *  * @ClassName: Producer  * @Description: 生產者  * @author Coder編程  * @date 2019年7月30日 上午21:04:43  *   */  public class Producer {          public static void main(String[] args) throws Exception {            System.out.println("Producer start...");            //1 創建ConnectionFactory          Connection connection = ConnectionUtils.getConnection();            //2 通過connection創建一個Channel          Channel channel = connection.createChannel();            //3 通過Channel發送數據          for(int i=0; i < 5; i++){              String msg = "Hello RabbitMQ!";              //1 exchange   2 routingKey              channel.basicPublish("", "test001", null, msg.getBytes());          }            //4 記得要關閉相關的連接          channel.close();          connection.close();      }  }      

3.3 消費端

  /**   *  * @ClassName: Consumer  * @Description: 消費端  * @author Coder編程  * @date 2019年7月30日 上午21:08:12  *   */  public class Consumer {        public static void main(String[] args) throws Exception {            System.out.println("Consumer start...");            //1 創建ConnectionFactory          Connection connection = ConnectionUtils.getConnection();            //2通過connection創建一個Channel          Channel channel = connection.createChannel();            //3聲明(創建)一個隊列          String queueName = "test001";          channel.queueDeclare(queueName, true, false, false, null);            //4創建消費者          QueueingConsumer queueingConsumer = new QueueingConsumer(channel);            //5設置Channel          channel.basicConsume(queueName, true, queueingConsumer);            while(true){              //6 獲取消息              Delivery delivery = queueingConsumer.nextDelivery();              String msg = new String(delivery.getBody());              System.err.println("消費端: " + msg);              //Envelope envelope = delivery.getEnvelope();          }        }  }        

3.4 源碼解析

  channel.queueDeclare(queueName, true, false, false, null);  

底層程式碼

第一個參數:queuename:隊列的名稱
第二個參數:durable 是否持久化。true消息會持久化到本地,保證重啟服務後消息不會丟失
第三個參數:exclusive :表示獨佔方式,設置為true 在某些情景下有必要,例如:順序消費。表示只有一個channel可以去監聽,其他channel都不能夠監聽。目的就是為了保證順序消費。
第四個參數:autoDelete:隊列如果與Exchange未綁定,則自動刪除
第五個參數:arguments:擴展參數

  channel.basicConsume(QUEUE_NAME, true, consumer);  

第二個參數 autoAck:自動簽收消息

3.5 運行程式

(1)啟動消費端
啟動消費端

(2)查看管控台
Overview

可以看到已經有一個連接,一個信道,一個消費者等資訊了。

Connections

Channels

可以看到信道目前的狀態是空閑狀態。

queues

隊列中多了test001隊列。

關於管控台的介紹可以看這篇文章:消息中間件——RabbitMQ(四)命令行與管控台的基本操作!

(3)運行生產端

運行生產端

消費端收到消息
可以看到生產端發送完消息之後停下了,消費端迅速接收到了消息。也可以繼續通過管控台觀察消費的情況。

(4) 問題

注意:

這裡面可能有一個問題:為什麼要先啟動消費端呢?

因為在消費端創建的隊列,我們必須要有隊列,才能夠發送消息。

另一個問題:在生產端程式碼中:

  channel.basicPublish("", "test001", null, msg.getBytes());  

並沒有設置exchange,只設置了隊列名稱,消費端卻依然能夠消費到消息,這是為什麼呢?

答:發消息的一定要指定Exchange,如果不指定Exchange或者Exchange為空的話,它會默認走第一個

默認Exchange
它的路由規則:將相同命名的隊列Queue的消息路由過去,如果路由不過去,將會把消息刪除。

文末

歡迎關注個人微信公眾號:Coder編程
獲取最新原創技術文章和免費學習資料,更有大量精品思維導圖、面試資料、PMP備考資料等你來領,方便你隨時隨地學習技術知識!
新建了一個qq群:315211365,歡迎大家進群交流一起學習。謝謝了!也可以介紹給身邊有需要的朋友。

文章收錄至
Github: https://github.com/CoderMerlin/coder-programming
Gitee: https://gitee.com/573059382/coder-programming
歡迎關注並star~
微信公眾號

參考文章:

https://www.cnblogs.com/myJavaEE/p/6665166.html

《RabbitMQ消息中間件精講》

推薦文章:

消息中間件——RabbitMQ(二)各大主流消息中間件綜合對比介紹!

消息中間件——RabbitMQ(三)理解RabbitMQ核心概念和AMQP協議!

消息中間件——RabbitMQ(四)命令行與管控台的基本操作!