ActiveMQ學習總結——實戰操作(上)02

  • 2019 年 10 月 11 日
  • 筆記

相信大家通過上一篇博文已經對ActiveMQ有了一個大致的概念了,

那麼本篇博文將帶領大家一步一步去實戰操作我們的ActiveMQ

本篇主要內容:

  1.ActiveMQ術語及API介紹

  2.ActiveMQ 文本消息處理

  3.ActiveMQ 對象消息處理


既然我們要學習如何實戰操作了,那麼久不可不知它的一些術語和API的用於

即使覺得枯燥,我們也要大致看一眼,過後相信你還會再來看的!因為很有用

 

 

一 ActiveMQ 的術語(這麼碉堡的東西怎麼會沒有寫讓人覺得更碉堡的詞彙呢?)

1 Destination

目的地,JMS Provider(消息中間件)負責維護,用於對 Message 進行管理的對象。
MessageProducer 需要指定 Destination 才能發送消息,MessageReceiver 需要指定 Destination
才能接收消息。

 

2 Producer

消息生成者,負責發送 Message 到目的地。

 

3 Consumer | Receiver

消息消費者,負責從目的地中消費【處理|監聽|訂閱】Message。

 

4 Message

消息,消息封裝一次通信的內容。

 


 

 

二 ActiveMQ API簡介

下述API都是接口類型,由定義在javax.jms包中

都是JMS(Java Message Service)標準接口定義

 

1.ConnectionFactory

  鏈接工廠,用於創建鏈接的工廠類型

2.Connection

  鏈接,用於建立訪問ActiveMQ連接的類型,由鏈接工廠ConnectionFactory創建

3.Session

  會話,一次持久有效有狀態的訪問,由鏈接創建

4.Destination & Queue

  目的地。用於描述本次訪問ActiveMQ的消息訪問目的地,即ActiveMQ服務中的具體隊列,由會話創建

  interface Queue extends Destination

5.MessageProducer

  消息生成者、在一次有效會話中,用於發送消息給ActiveMQ服務的工具,由會話創建

6.MessageConsumer

  消息消費者【消息訂閱者|消息處理着】,在一次有效會話中,用於從ActiveMQ服務中獲取消息的工具,由會話創建

7.Message

  消息。通過消息生成者向ActiveMQ服務發送消息時使用的數據載體對象或消費者從ActiveMQ服務中獲取消息時使用的數據載體對象。是所有消息【文本消息|對象消息等】具體類型的頂級接口,可以通過會話創建或通過會話從ActiveMQ服務中獲取

 

 

 


 

三 ActiveMQ處理文本消息

:*本案例將以producer(消息生產者)和consumer(消息發送者)連個模塊為例

 

準備工作:

  確保你的ActiveMQ已經開啟,並且所依賴的端口已經關閉(建議直接關閉防火牆 service iptables stop)

  Maven環境

 

1 消息生產者

1.1創建模塊

 

 

1,2在POM文件添加ActiveMQ的依賴(根據個人的ActiveMQ版本自行查找)

 <dependencies>          <dependency>              <groupId>org.apache.activemq</groupId>              <artifactId>activemq-all</artifactId>              <version>5.9.0</version>          </dependency>      </dependencies>

 

1.3 編寫消息的生產者

package cn.arebirth.mq;    import org.apache.activemq.ActiveMQConnectionFactory;    import javax.jms.*;    public class ActiveMQTextProducer {        public void sendTextActiveMQ(String msg) {          //定義鏈接工廠          ConnectionFactory connectionFactory = null;            //定義鏈接對象          Connection connection = null;            //定義會話          Session session = null;            //目的地          Destination destination = null;            //定義消息的發送者          MessageProducer producer = null;            //定義消息          Message message = null;              try {              /**               * userName:訪問ActiveMQ服務的用戶名,默認為admin。               * password:訪問ActiveMQ服務的密碼,默認為admin               * 用戶名和用戶密碼都可以通過ActiveMQ安裝目錄的oonf目錄下的jetty-ream.properties文件進行修改               *               * borkerURL:訪問ActiveMQ服務的路徑地址。               *  路徑結構為:協議名://主機地址:端口號               *  在conf/activemq.xml文件中可以找到修改               *  在上一篇文章中都有介紹               */              connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://169.254.18.20:61616");                //創建鏈接對象              connection = connectionFactory.createConnection();                //啟動連接              connection.start();                /**               * 參數一:transacted 是否使用事務 可選值為:true|false               *      ture:使用事務 設置第二個參數變量這為 Session.SESSION_TRANSACTION 交由session管理               *      false:不使用事務,則設置我們的參數即可               *               * acknowledgeMode:               * * Session.AUTO_ACKNOWLEDGE:自動消息確認機制               * * Session.CLIENT_ACKNOWLEDGE:客戶端確認機制               * * Session.DUPS_OK_ACKNOWLEDGE:有副本的客戶端確認消息機制               這裡設置 這兩個參數的含義為:               不使用事務,並由Session自動確認提交               * 這裡對此不作過多講解,初學者跟着敲,其後在慢慢了解原理               *               */              session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);                //創建目的地,目的地名稱即隊列的名稱,也就是保存我們消息地方的名稱。消息的消費者需要通過此名稱訪問對應的隊列              //名稱不固定              destination = session.createQueue("hello-mq");                //創建消息的生產者 需要指定目的地(也就是把Destination傳入參數)              producer = session.createProducer(destination);                //創建消息對象 並傳入我們需要放入隊列的消息              message = session.createTextMessage(msg);                //發送消息              producer.send(message);            } catch (Exception e) {              e.printStackTrace();          } finally {              //回收消息發送者資源              if (producer != null) {                  try {                      producer.close();                  } catch (JMSException e) {                      e.printStackTrace();                  }              }              if (session != null) {                  try {                      session.close();                  } catch (JMSException e) {                      e.printStackTrace();                  }              }              if (connection != null) {                  try {                      connection.close();                  } catch (JMSException e) {                      e.printStackTrace();                  }              }          }          }  }

 

 

2. 消息消費者

2.1創建模塊

 

 

 

2,2在POM文件添加ActiveMQ的依賴(根據個人的ActiveMQ版本自行查找)

 <dependency>              <groupId>org.apache.activemq</groupId>              <artifactId>activemq-all</artifactId>              <version>5.9.0</version>          </dependency>

 

2.3編寫消息的消費者

package cn.arebirth.mq;    import org.apache.activemq.ActiveMQConnectionFactory;    import javax.jms.*;    public class ActiveMQTextConsumer {        public void receiveTextActiveMQ() {          //定義鏈接工廠          ConnectionFactory connectionFactory = null;            //定義鏈接對象          Connection connection = null;            //定義會話          Session session = null;            //目的地          Destination destination = null;            //定義消息的消費者(接收者)          MessageConsumer consumer = null;            //定義消息          Message message = null;              try {              /**               * userName:訪問ActiveMQ服務的用戶名,默認為admin。               * password:訪問ActiveMQ服務的密碼,默認為admin               * 用戶名和用戶密碼都可以通過ActiveMQ安裝目錄的oonf目錄下的jetty-ream.properties文件進行修改               *               * borkerURL:訪問ActiveMQ服務的路徑地址。               *  路徑結構為:協議名://主機地址:端口號               *  在conf/activemq.xml文件中可以找到修改               *  在上一篇文章中都有介紹               */              connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://169.254.18.20:61616");                //創建鏈接對象              connection = connectionFactory.createConnection();                //啟動連接              connection.start();                /**               * 參數一:transacted 是否使用事務 可選值為:true|false               *      ture:使用事務 設置第二個參數變量這為 Session.SESSION_TRANSACTION 交由session管理               *      false:不使用事務,則設置我們的參數即可               *               * acknowledgeMode:               * * Session.AUTO_ACKNOWLEDGE:自動消息確認機制               * * Session.CLIENT_ACKNOWLEDGE:客戶端確認機制               * * Session.DUPS_OK_ACKNOWLEDGE:有副本的客戶端確認消息機制               這裡設置 這兩個參數的含義為:               不使用事務,並由Session自動確認提交               * 這裡對此不作過多講解,初學者跟着敲,其後在慢慢了解原理               *               */              session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);                //創建目的地,目的地名稱即隊列的名稱,也就是保存我們消息地方的名稱。消息的消費者需要通過此名稱訪問對應的隊列              //名稱不固定              destination = session.createQueue("hello-mq");                //創建消息的消費者 需要指定目的地(也就是把Destination傳入參數)              consumer = session.createConsumer(destination);                //創建消息對象 由消費者接收消息              message = consumer.receive();                //處理消息              String msg = ((TextMessage) message).getText();              System.out.println("ActiveMQ say:" + msg);          } catch (Exception e) {              e.printStackTrace();          } finally {              //回收消息接收者資源              if (consumer != null) {                  try {                      consumer.close();                  } catch (JMSException e) {                      e.printStackTrace();                  }              }              if (session != null) {                  try {                      session.close();                  } catch (JMSException e) {                      e.printStackTrace();                  }              }              if (connection != null) {                  try {                      connection.close();                  } catch (JMSException e) {                      e.printStackTrace();                  }              }          }        }  }

 

 

3 測試成果

定義測試類,來測試我們的成果!

 

3.1 定義producer測試類

package cn.arebirth.mq;    public class ProducerTest {      public static void main(String[] args) {          ActiveMQTextProducer producer = new ActiveMQTextProducer();          producer.sendTextActiveMQ("Hello,ActiveMQ!");      }  }

當我們執行完這段代碼後,我們打開我們的ActiveMQ的控制面板(首先你要啟動你的ActiveMQ,上一篇有介紹到!)

 

 

接下來,當我們看到此圖片內容即說明producer成果運行!(hello-mq我們自定義的隊列名稱!)

 

 

 


 

3.2 定義 Consumer測試類

package cn.arebirth.mq;    public class ConsumerTest {      public static void main(String[] args) {          ActiveMQTextConsumer consumer = new ActiveMQTextConsumer();          consumer.receiveTextActiveMQ();      }  }

 

運行結果出現如下內容即成果!

 

 

 

 

 

當我們仔細觀察ActiveMQ控制台的時候會發現,

  producer每產生一條消息對應的Number Of Pending Message和Message Enqueued就+1

  consumer每次啟動消費消息的時候,Number Of pending Message就會-1  Message Dequeued +1 

為什麼呢?哈哈  相信大家已經大概猜出來了或者看英文的意思  

介紹下:

  • Name:消息的名字
  • Number Of Pending Message:為消費的消息數量
  • Number Of Consumers:消費者的數量
  • Message Enqueued:隊列共有過多少消息
  • Message Dequeued:消費者消費了多少條消息

 

鼓搗完文本消息,那麼有的人會說,只能鼓搗文本??錯了,接下來我們搞實用的對象!(不是你和我,是我們和電腦 -. -)


 

 

三 ActiveMQ處理對象消息

有了上面處理文本消息的基礎,我們將很容易掌握處理對象消息的能力!這裏面的步驟,我們將略省一部分,和上面及其相似

 

1 對象創建(處理對象消息,那麼一定是要有個數據載體的對象啦,我們自己創建一個即可     生產者和消費者均需要此對象!)

package cn.arebirth.pojo;    import java.io.Serializable;    /**   * 一定要序列化!!   */  public class User implements Serializable {      private String username;      private String pwd;      private String content;        @Override      public String toString() {          return "User{" +                  "username='" + username + ''' +                  ", pwd='" + pwd + ''' +                  ", content='" + content + ''' +                  '}';      }        public User() {      }        public User(String username, String pwd, String content) {          this.username = username;          this.pwd = pwd;          this.content = content;      }        public String getUsername() {          return username;      }        public void setUsername(String username) {          this.username = username;      }        public String getPwd() {          return pwd;      }        public void setPwd(String pwd) {          this.pwd = pwd;      }        public String getContent() {          return content;      }        public void setContent(String content) {          this.content = content;      }  }

 

2 消息生產者

2.1 還是需要pom的jar包依賴的的,如果還在原來的環境上,則不需要添加啦!

<dependency>              <groupId>org.apache.activemq</groupId>              <artifactId>activemq-all</artifactId>              <version>5.9.0</version>          </dependency>

 

2.2 編寫消息的生產者

package cn.arebirth.mq;    import cn.arebirth.pojo.User;  import org.apache.activemq.ActiveMQConnectionFactory;    import javax.jms.*;    public class ActiveMQObjectProducer {        public void sendObjectActiveMQ(User user){          //定義鏈接工廠          ConnectionFactory connectionFactory = null;            //定義鏈接對象          Connection connection = null;            //定義會話          Session session = null;            //目的地          Destination destination = null;            //定義消息的發送者          MessageProducer producer = null;            //定義消息          Message message = null;              try {              /**               * userName:訪問ActiveMQ服務的用戶名,默認為admin。               * password:訪問ActiveMQ服務的密碼,默認為admin               * 用戶名和用戶密碼都可以通過ActiveMQ安裝目錄的oonf目錄下的jetty-ream.properties文件進行修改               *               * borkerURL:訪問ActiveMQ服務的路徑地址。               *  路徑結構為:協議名://主機地址:端口號               *  在conf/activemq.xml文件中可以找到修改               *  在上一篇文章中都有介紹               */              connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://169.254.18.20:61616");                //創建鏈接對象              connection = connectionFactory.createConnection();                //啟動連接              connection.start();                /**               * 參數一:transacted 是否使用事務 可選值為:true|false               *      ture:使用事務 設置第二個參數變量這為 Session.SESSION_TRANSACTION 交由session管理               *      false:不使用事務,則設置我們的參數即可               *               * acknowledgeMode:               * * Session.AUTO_ACKNOWLEDGE:自動消息確認機制               * * Session.CLIENT_ACKNOWLEDGE:客戶端確認機制               * * Session.DUPS_OK_ACKNOWLEDGE:有副本的客戶端確認消息機制               這裡設置 這兩個參數的含義為:               不使用事務,並由Session自動確認提交               * 這裡對此不作過多講解,初學者跟着敲,其後在慢慢了解原理               *               */              session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);                //創建目的地,目的地名稱即隊列的名稱,也就是保存我們消息地方的名稱。消息的消費者需要通過此名稱訪問對應的隊列              //名稱不固定              destination = session.createQueue("hello-mq-obj");                //創建消息的生產者 需要指定目的地(也就是把Destination傳入參數)              producer = session.createProducer(destination);                //創建消息對象 並傳入我們需要放入隊列的消息              message = session.createObjectMessage(user);                //發送消息              producer.send(message);            } catch (Exception e) {              e.printStackTrace();          } finally {              //回收消息發送者資源              if (producer != null) {                  try {                      producer.close();                  } catch (JMSException e) {                      e.printStackTrace();                  }              }              if (session != null) {                  try {                      session.close();                  } catch (JMSException e) {                      e.printStackTrace();                  }              }              if (connection != null) {                  try {                      connection.close();                  } catch (JMSException e) {                      e.printStackTrace();                  }              }          }      }  }

 

 

3 消息消費者

3.1 還是需要pom的jar包依賴的的,如果還在原來的環境上,則不需要添加啦!

<dependency>              <groupId>org.apache.activemq</groupId>              <artifactId>activemq-all</artifactId>              <version>5.9.0</version>          </dependency>

 

3.2  編寫消息的消費者

package cn.arebirth.mq;    import cn.arebirth.pojo.User;  import org.apache.activemq.ActiveMQConnectionFactory;    import javax.jms.*;    public class ActiveMQObjectConsumer {      public void receiveObjectActiveMQ() {          //定義鏈接工廠          ConnectionFactory connectionFactory = null;            //定義鏈接對象          Connection connection = null;            //定義會話          Session session = null;            //目的地          Destination destination = null;            //定義消息的消費者(接收者)          MessageConsumer consumer = null;            //定義消息          Message message = null;              try {              /**               * userName:訪問ActiveMQ服務的用戶名,默認為admin。               * password:訪問ActiveMQ服務的密碼,默認為admin               * 用戶名和用戶密碼都可以通過ActiveMQ安裝目錄的oonf目錄下的jetty-ream.properties文件進行修改               *               * borkerURL:訪問ActiveMQ服務的路徑地址。               *  路徑結構為:協議名://主機地址:端口號               *  在conf/activemq.xml文件中可以找到修改               *  在上一篇文章中都有介紹               */              connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://169.254.18.20:61616");                //創建鏈接對象              connection = connectionFactory.createConnection();                //啟動連接              connection.start();                /**               * 參數一:transacted 是否使用事務 可選值為:true|false               *      ture:使用事務 設置第二個參數變量這為 Session.SESSION_TRANSACTION 交由session管理               *      false:不使用事務,則設置我們的參數即可               *               * acknowledgeMode:               * * Session.AUTO_ACKNOWLEDGE:自動消息確認機制               * * Session.CLIENT_ACKNOWLEDGE:客戶端確認機制               * * Session.DUPS_OK_ACKNOWLEDGE:有副本的客戶端確認消息機制               這裡設置 這兩個參數的含義為:               不使用事務,並由Session自動確認提交               * 這裡對此不作過多講解,初學者跟着敲,其後在慢慢了解原理               *               */              session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);                //創建目的地,目的地名稱即隊列的名稱,也就是保存我們消息地方的名稱。消息的消費者需要通過此名稱訪問對應的隊列              //名稱不固定              destination = session.createQueue("hello-mq-obj");                //創建消息的消費者 需要指定目的地(也就是把Destination傳入參數)              consumer = session.createConsumer(destination);                //創建消息對象 由消費者接收消息              message = consumer.receive();                //處理消息              ObjectMessage objectMessage = (ObjectMessage) message;              User user = (User) objectMessage.getObject();              System.out.println("ActiveMQ say:" + user);          } catch (Exception e) {              e.printStackTrace();          } finally {              //回收消息接收者資源              if (consumer != null) {                  try {                      consumer.close();                  } catch (JMSException e) {                      e.printStackTrace();                  }              }              if (session != null) {                  try {                      session.close();                  } catch (JMSException e) {                      e.printStackTrace();                  }              }              if (connection != null) {                  try {                      connection.close();                  } catch (JMSException e) {                      e.printStackTrace();                  }              }          }        }  }

 

 

 

4 創建測試類

4.1 Produce生產者測試代碼

package cn.arebirth.mq;    import cn.arebirth.pojo.User;    public class ProducerTest {      public static void main(String[] args) {          ActiveMQObjectProducer producer = new ActiveMQObjectProducer();          producer.sendObjectActiveMQ(new User("Arebirth","123","Hello,ActiveMQ!"));      }  }

 

當我們執行完這段代碼後,我們打開我們的ActiveMQ的控制面板(首先你要啟動你的ActiveMQ,上一篇有介紹到!)

 

 會看到 隊列中增加了我們的消息!

 

4.2 Consumer消費者測試代碼

package cn.arebirth.mq;    public class ConsumerTest {      public static void main(String[] args) {          ActiveMQObjectConsumer consumer = new ActiveMQObjectConsumer();          consumer.receiveObjectActiveMQ();      }  }

當我們執行後,出現下面的內容即可!成功啦!

 

 在看下,控制面板,

 

 

是不是已經和上一個列子一樣啦!

 

 

ps

  本篇內容就到這裡了,希望各位勤加敲代碼,畢竟代碼不是看出來的~