ActiveMQ的安裝與使用。

  • 2019 年 10 月 8 日
  • 筆記

1、什麼是ActiveMQ

 1 ActiveMQ 是Apache出品,最流行的,能力強勁的開源消息匯流排。ActiveMQ 是一個完全支援JMS1.1和J2EE 1.4規範的 JMS Provider實現,儘管JMS規範出台已經是很久的事情了,但是JMS在當今的J2EE應用中間仍然扮演著特殊的地位。   2 主要特點:   3   1). 多種語言和協議編寫客戶端。語言: Java, C, C++, C#, Ruby, Perl, Python, PHP。應用協議: OpenWire,Stomp REST,WS Notification,XMPP,AMQP   4   2). 完全支援JMS1.1和J2EE 1.4規範 (持久化,XA消息,事務)。   5   3.) 對Spring的支援,ActiveMQ可以很容易內嵌到使用Spring的系統裡面去,而且也支援Spring2.0的特性。   6   4.) 通過了常見J2EE伺服器(如 Geronimo,JBoss 4, GlassFish,WebLogic)的測試,其中通過JCA 1.5 resource adaptors的配置,可以讓ActiveMQ可以自動的部署到任何兼容J2EE 1.4 商業伺服器上。   7   5). 支援多種傳送協議:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA。   8   6). 支援通過JDBC和journal提供高速的消息持久化。   9   7). 從設計上保證了高性能的集群,客戶端-伺服器,點對點。  10   8). 支援Ajax。  11   9). 支援與Axis的整合。  12   10). 可以很容易得調用內嵌JMS provider,進行測試。

2、JMS介紹:

1 1)、JMS的全稱是Java Message Service,即Java消息服務。用於在兩個應用程式之間,或分散式系統中發送消息,進行非同步通訊。  2  3 2)、它主要用於在生產者和消費者之間進行消息傳遞,生產者負責產生消息,而消費者負責接收消息。把它應用到實際的業務需求中的話我們可以在特定的時候利用生產者生成一消息,並進行發送,對應的消費者在接收到對應的消息後去完成對應的業務邏輯。

3、ActiveMQ的兩種消息形式。

 1   1)、對於消息的傳遞有兩種類型。   2     a)、一種是點對點的,即一個生產者和一個消費者一一對應。   3     b)、另一種是發布/訂閱模式,即一個生產者產生消息並進行發送後,可以由多個消費者進行接收。   4   5   2)、JMS定義了五種不同的消息正文格式,以及調用的消息類型,允許你發送並接收以一些不同形式的數據,提供現有消息格式的一些級別的兼容性。   6     a)、 StreamMessage -- Java原始值的數據流。   7     b)、 MapMessage--一套名稱-值對。   8     c)、 TextMessage--一個字元串對象。   9     d)、 ObjectMessage--一個序列化的 Java對象。  10     e)、 BytesMessage--一個位元組的數據流。

4、ActiveMQ的安裝。官方網址:http://activemq.apache.org/

由於ActiveMQ是java開發的,所以需要先安裝jdk(注意:安裝jdk,需要jdk1.7以上版本)的哦。這裡使用的是apache-activemq-5.12.0-bin.tar.gz版本的。

開始進行解壓縮操作。

1 [root@localhost package]# ls  2 apache-activemq-5.12.0-bin.tar.gz  apache-activemq-5.12.0-bin.zip  apache-tomcat-7.0.47.tar.gz  IK Analyzer 2012FF_hf1  IK Analyzer 2012FF_hf1.rar  jdk-7u55-linux-i586.tar.gz  solr-4.10.3.tgz.tgz  zookeeper-3.4.6.tar.gz  3 [root@localhost package]# tar -zxvf apache-activemq-5.12.0-bin.tar.gz -C /home/hadoop/soft/

解壓縮完以後進入bin目錄。開始進行啟動操作。

啟動:[root@localhost bin]# ./activemq start

停止:[root@localhost bin]# ./activemq stop

查看狀態:[root@localhost bin]# ./activemq status

 1 [root@localhost soft]# cd apache-activemq-5.12.0/   2 [root@localhost apache-activemq-5.12.0]# ls   3 activemq-all-5.12.0.jar  bin  conf  data  docs  examples  lib  LICENSE  NOTICE  README.txt  webapps  webapps-demo   4 [root@localhost apache-activemq-5.12.0]# ll   5 total 9384   6 -rwxr-xr-x. 1 root root 9524668 Aug 10  2015 activemq-all-5.12.0.jar   7 drwxr-xr-x. 5 root root    4096 Sep 15 00:39 bin   8 drwxr-xr-x. 2 root root    4096 Sep 15 00:39 conf   9 drwxr-xr-x. 2 root root    4096 Sep 15 00:39 data  10 drwxr-xr-x. 2 root root    4096 Sep 15 00:39 docs  11 drwxr-xr-x. 8 root root    4096 Sep 15 00:39 examples  12 drwxr-xr-x. 6 root root    4096 Sep 15 00:39 lib  13 -rw-r--r--. 1 root root   40580 Aug 10  2015 LICENSE  14 -rw-r--r--. 1 root root    3334 Aug 10  2015 NOTICE  15 -rw-r--r--. 1 root root    2610 Aug 10  2015 README.txt  16 drwxr-xr-x. 7 root root    4096 Sep 15 00:39 webapps  17 drwxr-xr-x. 3 root root    4096 Sep 15 00:39 webapps-demo  18 [root@localhost apache-activemq-5.12.0]# cd bin/  19 [root@localhost bin]# ls  20 activemq  activemq-diag  activemq.jar  env  linux-x86-32  linux-x86-64  macosx  wrapper.jar  21 [root@localhost bin]# ./activemq start  22 INFO: Loading '/home/hadoop/soft/apache-activemq-5.12.0//bin/env'  23 INFO: Using java '/home/hadoop/soft/jdk1.7.0_55/bin/java'  24 INFO: Starting - inspect logfiles specified in logging.properties and log4j.properties to get details  25 INFO: pidfile created : '/home/hadoop/soft/apache-activemq-5.12.0//data/activemq.pid' (pid '9318')  26 [root@localhost bin]# ./activemq status  27 INFO: Loading '/home/hadoop/soft/apache-activemq-5.12.0//bin/env'  28 INFO: Using java '/home/hadoop/soft/jdk1.7.0_55/bin/java'  29 ActiveMQ is running (pid '9318')  30 [root@localhost bin]# 

然後你可以訪問後台管理介面,帳號和密碼默認都是admin的。訪問地址:http://192.168.110.142:8161/admin

Home是當前的歡迎頁,Queues是點到點形式,Topics是發布訂閱模式,Subscribers話題消息的發布與訂閱,Connections客戶端鏈接,Network當前網路的鏈接狀態,Scheduled計劃任務,Send可以測試發送消息。

5、ActiveMQ的使用方法,JMS消息發送模式。

注意: 1)、在點對點或隊列模型下,一個生產者向一個特定的隊列發布消息,一個消費者從該隊列中讀取消息。這裡,生產者知道消費者的隊列,並直接將消息發送到消費者的隊列。這種模式被概括為:只有一個消費者將獲得消息。生產者不需要在接收者消費該消息期間處於運行狀態,接收者也同樣不需要在消息發送時處於運行狀態。每一個成功處理的消息都由接收者簽收。 2)、發布者/訂閱者模型支援向一個特定的消息主題發布消息。0或多個訂閱者可能對接收來自特定消息主題的消息感興趣。在這種模型下,發布者和訂閱者彼此不知道對方。這種模式好比是匿名公告板。這種模式被概括為:多個消費者可以獲得消息,在發布者和訂閱者之間存在時間依賴性。發布者需要建立一個訂閱(subscription),以便客戶能夠購訂閱。訂閱者必須保持持續的活動狀態以接收消息,除非訂閱者建立了持久的訂閱。在那種情況下,在訂閱者未連接時發布的消息將在訂閱者重新連接時重新發布。

6、JMS應用程式介面。

 1 1)、ConnectionFactory 介面(連接工廠)   2     用戶用來創建到JMS提供者的連接的被管對象。JMS客戶通過可移植的介面訪問連接,這樣當下層的實現改變時,程式碼不需要進行修改。 管理員在JNDI名字空間中配置連接工廠,這樣,JMS客戶才能夠查找到它們。根據消息類型的不同,用戶將使用隊列連接工廠,或者主題連接工廠。   3 2)、Connection 介面(連接)   4     連接代表了應用程式和消息伺服器之間的通訊鏈路。在獲得了連接工廠後,就可以創建一個與JMS提供者的連接。根據不同的連接類型,連接允許用戶創建會話,以發送和接收隊列和主題到目標。   5 3)、Destination 介面(目標)   6     目標是一個包裝了消息目標標識符的被管對象,消息目標是指消息發布和接收的地點,或者是隊列,或者是主題。JMS管理員創建這些對象,然後用戶通過JNDI發現它們。和連接工廠一樣,管理員可以創建兩種類型的目標,點對點模型的隊列,以及發布者/訂閱者模型的主題。   7 4)、MessageConsumer 介面(消息消費者)   8     由會話創建的對象,用於接收發送到目標的消息。消費者可以同步地(阻塞模式),或非同步(非阻塞)接收隊列和主題類型的消息。   9 5)、MessageProducer 介面(消息生產者)  10     由會話創建的對象,用於發送消息到目標。用戶可以創建某個目標的發送者,也可以創建一個通用的發送者,在發送消息時指定目標。  11 6)、Message 介面(消息)  12     是在消費者和生產者之間傳送的對象,也就是說從一個應用程式創送到另一個應用程式。一個消息有三個主要部分:  13         消息頭(必須):包含用於識別和為消息尋找路由的操作設置。  14         一組消息屬性(可選):包含額外的屬性,支援其他提供者和用戶的兼容。可以創建訂製的欄位和過濾器(消息選擇器)。  15         一個消息體(可選):允許用戶創建五種類型的消息(文本消息,映射消息,位元組消息,流消息和對象消息)。  16         消息介面非常靈活,並提供了許多方式來訂製消息的內容。  17 7)、Session 介面(會話)  18     表示一個單執行緒的上下文,用於發送和接收消息。由於會話是單執行緒的,所以消息是連續的,就是說消息是按照發送的順序一個一個接收的。會話的好處是它支援事務。如果用戶選擇了事務支援,會話上下文將保存一組消息,直到事務被提交才發送這些消息。在提交事務之前,用戶可以使用回滾操作取消這些消息。一個會話允許用戶創建消息生產者來發送消息,創建消息消費者來接收消息。

7、如何使用java操作activeMQ呢,把ActiveMQ依賴的jar包添加到工程中。

使用maven工程,則添加jar包的依賴:

1 <dependency>  2     <groupId>org.apache.activemq</groupId>  3     <artifactId>activemq-all</artifactId>  4     <version>5.11.2</version>  5 </dependency>

然後你就可以愉快得開發了。是不是很開森呢。

8、ActiveMQ點對點模式(point-to-point)。

ActiveMq的點對點生產者。

 1 package com.taotao.activemq;   2   3 import javax.jms.Connection;   4 import javax.jms.ConnectionFactory;   5 import javax.jms.JMSException;   6 import javax.jms.MessageProducer;   7 import javax.jms.Queue;   8 import javax.jms.Session;   9 import javax.jms.TextMessage;  10  11 import org.apache.activemq.ActiveMQConnectionFactory;  12 import org.apache.activemq.command.ActiveMQTextMessage;  13 import org.junit.Test;  14  15 /**  16  *  17  * @ClassName: ActiveMqMain.java  18  * @author: biehl  19  * @since: 2019年9月15日 下午4:44:57  20  * @Copyright: ©2019 biehl 版權所有  21  * @version: 0.0.1  22  * @Description:  23  */  24 public class ActiveMqMain {  25  26     // activeMq得點對點生產者  27     @Test  28     public void queueProducer() throws JMSException {  29         // 1、創建一個連接工廠對象ConnectionFactory對象。需要指定mq服務得ip以及埠號61616。  30         String brokerURL = "tcp://192.168.110.142:61616";  31         ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURL);  32         // 2、使用ConnectionFactory創建一個連接Connection對象。  33         Connection connection = connectionFactory.createConnection();  34         // 3、開啟連接。調用Connection對象得start方法。  35         connection.start();  36         // 4、使用Connection對象創建一個Session對象。  37         // 參數一是否開啟事務,一般不開啟事務,保證數據得最終一致性,可以使用消息隊列實現數據最終一致性。如果第一個參數為true,第二個參數自動忽略  38         // 參數二是消息得應答模式。兩種模式,自動應答和手動應答。一般使用自動應答。  39         boolean transacted = false;// 不開啟事務  40         int acknowledgeMode = Session.AUTO_ACKNOWLEDGE;// 1  41         Session session = connection.createSession(transacted, acknowledgeMode);  42         // 5、使用Session對象創建一個Destination對象。兩種形式queue、topic。現在應該使用queue。  43         String queueName = "queue1";// 當前消息隊列得名稱  44         Queue queue = session.createQueue(queueName);  45         // 6、使用Session對象創建一個Producer對象。  46         // interface Queue extends Destination。destination是一個介面。  47         MessageProducer producer = session.createProducer(queue);  48         // 7、創建一個TextMessage對象。  49         // 創建TextMessage方式一  50         // TextMessage textMessage = new ActiveMQTextMessage();  51         // textMessage.setText("hello activeMq......");  52         // 方式二  53         TextMessage textMessage = session.createTextMessage("hello activeMq......");  54         // 8、發送消息。  55         producer.send(textMessage);  56         // 9、關閉資源。  57         producer.close();// 關閉producer  58         session.close();// 關閉session  59         connection.close();// 關閉connection  60     }  61  62 }

ActiveMQ的點對點消息生產成功以後,可以在ActiveMQ提供的web介面可以看到一些資訊。

activeMq的點對點消費者。

  1 package com.taotao.activemq;    2    3 import java.io.IOException;    4    5 import javax.jms.Connection;    6 import javax.jms.ConnectionFactory;    7 import javax.jms.JMSException;    8 import javax.jms.Message;    9 import javax.jms.MessageConsumer;   10 import javax.jms.MessageListener;   11 import javax.jms.MessageProducer;   12 import javax.jms.Queue;   13 import javax.jms.Session;   14 import javax.jms.TextMessage;   15   16 import org.apache.activemq.ActiveMQConnectionFactory;   17 import org.apache.activemq.command.ActiveMQTextMessage;   18 import org.junit.Test;   19   20 /**   21  *   22  * @ClassName: ActiveMqMain.java   23  * @author: biehl   24  * @since: 2019年9月15日 下午4:44:57   25  * @Copyright: ©2019 biehl 版權所有   26  * @version: 0.0.1   27  * @Description:   28  */   29 public class ActiveMqMain {   30   31     // activeMq的點對點生產者   32     @Test   33     public void queueProducer() throws JMSException {   34         // 1、創建一個連接工廠對象ConnectionFactory對象。需要指定mq服務得ip以及埠號61616。   35         String brokerURL = "tcp://192.168.110.142:61616";   36         ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURL);   37         // 2、使用ConnectionFactory創建一個連接Connection對象。   38         Connection connection = connectionFactory.createConnection();   39         // 3、開啟連接。調用Connection對象得start方法。   40         connection.start();   41         // 4、使用Connection對象創建一個Session對象。   42         // 參數一是否開啟事務,一般不開啟事務,保證數據得最終一致性,可以使用消息隊列實現數據最終一致性。如果第一個參數為true,第二個參數自動忽略   43         // 參數二是消息得應答模式。兩種模式,自動應答和手動應答。一般使用自動應答。   44         boolean transacted = false;// 不開啟事務   45         int acknowledgeMode = Session.AUTO_ACKNOWLEDGE;// 1   46         Session session = connection.createSession(transacted, acknowledgeMode);   47         // 5、使用Session對象創建一個Destination對象。兩種形式queue、topic。現在應該使用queue。   48         String queueName = "queue1";// 當前消息隊列得名稱   49         Queue queue = session.createQueue(queueName);   50         // 6、使用Session對象創建一個Producer對象。   51         // interface Queue extends Destination。destination是一個介面。   52         MessageProducer producer = session.createProducer(queue);   53         // 7、創建一個TextMessage對象。   54         // 創建TextMessage方式一   55         // TextMessage textMessage = new ActiveMQTextMessage();   56         // textMessage.setText("hello activeMq......");   57         // 方式二   58         TextMessage textMessage = session.createTextMessage("hello activeMq......");   59         // 8、發送消息。   60         producer.send(textMessage);   61         // 9、關閉資源。   62         producer.close();// 關閉producer   63         session.close();// 關閉session   64         connection.close();// 關閉connection   65     }   66   67     // activeMq的點對點消費者   68     @Test   69     public void queueConsumer() throws JMSException {   70         // 1、創建一個連接工廠ConnectionFactory 對象   71         String brokerURL = "tcp://192.168.110.142:61616";   72         ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURL);   73         // 2、使用連接工廠對象創建一個連接   74         Connection connection = connectionFactory.createConnection();   75         // 3、開啟連接   76         connection.start();   77         // 4、使用連接對象創建一個Session對象   78         boolean transacted = false;// 關閉事務   79         int acknowledgeMode = Session.AUTO_ACKNOWLEDGE;// 自動響應   80         Session session = connection.createSession(transacted, acknowledgeMode);   81         // 5、使用Session創建一個Destination,Destination應該和消息的發送端一致的。   82         String queueName = "queue1";   83         Queue queue = session.createQueue(queueName);   84         // 6、使用Session創建一個Consumer對象。   85         MessageConsumer consumer = session.createConsumer(queue);   86         // 7、向Consumer對象中設置一個MessageListener對象,用來接受消息。   87         // 匿名內部類,new 介面,後面加上{},相當於實現了這個介面的實現類。然後創建這個實現類的對象listener。   88         MessageListener listener = new MessageListener() {   89   90             @Override   91             public void onMessage(Message message) {   92                 // 接受事件的。當消息到達就可以在這裡接受到消息了的。   93                 // 8、取出消息的內容。   94                 if (message instanceof TextMessage) {   95                     TextMessage textMessage = (TextMessage) message;   96                     // 9、列印消息內容。   97                     try {   98                         String text = textMessage.getText();   99                         System.out.println(text);  100                     } catch (JMSException e) {  101                         e.printStackTrace();  102                     }  103                 }  104             }  105         };  106         consumer.setMessageListener(listener);  107  108         // 關閉資源以前,系統等待,等待接受消息。  109         /*while (true) {  110             try {  111                 Thread.sleep(100);  112             } catch (InterruptedException e) {  113                 e.printStackTrace();  114             }  115         }*/  116  117         // 等待鍵盤輸入。才回接著向下執行的。  118         try {  119             System.in.read();  120         } catch (IOException e) {  121             e.printStackTrace();  122         }  123  124  125         // 10、關閉資源。  126         consumer.close();// 關閉consumer  127         session.close();// 關閉session  128         connection.close();// 關閉connection  129     }  130  131 }

執行了activeMq的點對點消費者。可以在介面看到變化。可以看到有一個消費者,然後生產了7條消息,7條消息進隊和7條消息出隊。

9、ActiveMQ發布訂閱模式(publish/subscribe)。

消費者有兩種消費方法(這裡使用非同步消費):   a、同步消費。通過調用消費者的receive方法從目的地中顯式提取消息。receive方法可以一直阻塞到消息到達。

  b、非同步消費。客戶可以為消費者註冊一個消息監聽器,以定義在消息到達時所採取的動作。

    實現MessageListener介面,在MessageListener()方法中實現消息的處理邏輯。

  1 package com.taotao.activemq;    2    3 import java.io.IOException;    4    5 import javax.jms.Connection;    6 import javax.jms.ConnectionFactory;    7 import javax.jms.JMSException;    8 import javax.jms.Message;    9 import javax.jms.MessageConsumer;   10 import javax.jms.MessageListener;   11 import javax.jms.MessageProducer;   12 import javax.jms.Session;   13 import javax.jms.TextMessage;   14 import javax.jms.Topic;   15   16 import org.apache.activemq.ActiveMQConnectionFactory;   17 import org.junit.Test;   18   19 /**   20  * Active的發布訂閱模式   21  *   22  * @ClassName: ActiveMqTopics.java   23  * @author: biehl   24  * @since: 2019年9月19日 上午10:51:14   25  * @Copyright: ©2019 biehl 版權所有   26  * @version: 0.0.1   27  * @Description:   28  */   29 public class ActiveMqTopics {   30   31     // 發布訂閱模式,生產者。topic生產者生產消息默認不持久化客戶端的。   32     @Test   33     public void topicProducer() {   34         try {   35             // 1、創建一個連接工廠對象。需要指定mq服務的ip地址以及埠號61616   36             String brikerURL = "tcp://192.168.110.142:61616";   37             // 創建ConnectionFactory介面對象,實現類ActiveMQConnectionFactory   38             ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brikerURL);   39   40             // 2、創建Connection連接   41             Connection connection = connectionFactory.createConnection();   42   43             // 3、開啟連接,調用Connection的start方法。   44             connection.start();   45   46             // 4、創建Session,使用Connection對象創建一個session   47             // 參數一是否開啟事務,一般不開啟事務,保證數據得最終一致性,可以使用消息隊列實現數據最終一致性。如果第一個參數為true,第二個參數自動忽略   48             boolean transacted = false;   49             // 參數二是消息得應答模式。兩種模式,自動應答和手動應答。一般使用自動應答。   50             int acknowledgeMode = Session.AUTO_ACKNOWLEDGE;   51             Session session = connection.createSession(transacted, acknowledgeMode);   52   53             // 5、創建Destination,應該使用topic,區別於點對點的queue   54             String topicName = "topic01";   55             Topic topic = session.createTopic(topicName);   56   57             // 6、創建一個Producer對象   58             // interface Topic extends Destination.   59             // Destination是一個介面,Topic介面繼承Destination這個介面。   60             MessageProducer producer = session.createProducer(topic);   61   62             // 7、創建一個TextMessage對象   63             String message = null;   64             TextMessage textMessage = null;   65             for (int i = 0; i < 100; i++) {   66                 message = i + " ActiveMQ topics......";   67                 textMessage = session.createTextMessage(message);   68   69                 // 8、發送消息   70                 producer.send(textMessage);   71             }   72   73             // 9、關閉資源   74             producer.close();// 關閉producer   75             session.close();// 關閉session   76             connection.close();// 關閉connection   77         } catch (JMSException e) {   78             e.printStackTrace();   79         }   80     }   81   82     // 發布訂閱模式,消費者必須一直等待生產者生產的消息,因為發布訂閱模式不持久化。   83     @Test   84     public void topicConsumer() {   85         try {   86             // 1、創建一個連接工廠對象   87             String brokerURL = "tcp://192.168.110.142:61616";   88             ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURL);   89   90             // 2、使用連接工廠對象創建一個連接   91             Connection connection = connectionFactory.createConnection();   92   93             // 3、開啟連接   94             connection.start();   95   96             // 4、使用連接對象創建一個Session對象   97             // 參數一是否開啟事務,一般不開啟事務,保證數據得最終一致性,可以使用消息隊列實現數據最終一致性。如果第一個參數為true,第二個參數自動忽略   98             boolean transacted = false;   99             // 參數二是消息得應答模式。兩種模式,自動應答和手動應答。一般使用自動應答。  100             int acknowledgeMode = Session.AUTO_ACKNOWLEDGE;  101             Session session = connection.createSession(transacted, acknowledgeMode);  102  103             // 5、使用session創建destination,注意,destination應該和消息的發送端一致的。  104             String topicName = "topic01";  105             Topic topic = session.createTopic(topicName);  106  107             // 6、使用session創建一個consumer對象  108             MessageConsumer consumer = session.createConsumer(topic);  109  110             // 7、向Consumer對象中設置一個MessageListener對象,用來接受消息。  111             // 匿名內部類,new 介面,後面加上{},相當於實現了這個介面的實現類。然後創建這個實現類的對象listener。  112             MessageListener listener = new MessageListener() {  113                 // 接受事件的。當消息到達就可以在這裡接受到消息了的。  114                 // 8、取出消息的內容。  115                 @Override  116                 public void onMessage(Message message) {  117                     if (message instanceof TextMessage) {  118                         TextMessage textMessage = (TextMessage) message;  119                         // 9、列印消息內容。  120                         try {  121                             String text = textMessage.getText();  122                             System.out.println(text);  123                         } catch (JMSException e) {  124                             e.printStackTrace();  125                         }  126                     }  127                 }  128             };  129             consumer.setMessageListener(listener);  130  131             // 啟動三次,模擬是三個消費者  132             System.out.println("消費者1.......");  133             // System.out.println("消費者2.......");  134             // System.out.println("消費者3.......");  135  136             // 等待鍵盤輸入。才回接著向下執行的。  137             try {  138                 System.in.read();  139             } catch (IOException e) {  140                 e.printStackTrace();  141             }  142  143             // 9、關閉資源  144             consumer.close();// 關閉producer  145             session.close();// 關閉session  146             connection.close();// 關閉connection  147         } catch (JMSException e) {  148             e.printStackTrace();  149         }  150  151     }  152  153 }

執行了activeMq的發布訂閱模式。可以在介面看到變化。可以看到有三個消費者,然後生產了201條消息,201條消息進隊和603條消息出隊。

10、ActiveMQ與Spring整合如下所示:

在pom.xml配置文件中引入自己的依賴的jar包。

1 <dependency>  2     <groupId>org.springframework</groupId>  3     <artifactId>spring-jms</artifactId>  4 </dependency>  5 <dependency>  6     <groupId>org.springframework</groupId>  7     <artifactId>spring-context-support</artifactId>  8 </dependency>

在配置文件applicationContext-activemq.xml裡面配置ConnectionFactory。如下所示:

 1 <?xml version="1.0" encoding="UTF-8"?>   2 <beans xmlns="http://www.springframework.org/schema/beans"   3     xmlns:context="http://www.springframework.org/schema/context"   4     xmlns:p="http://www.springframework.org/schema/p"   5     xmlns:aop="http://www.springframework.org/schema/aop"   6     xmlns:tx="http://www.springframework.org/schema/tx"   7     xmlns:jms="http://www.springframework.org/schema/jms"   8     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"   9     xsi:schemaLocation="http://www.springframework.org/schema/beans  10     http://www.springframework.org/schema/beans/spring-beans-4.0.xsd  11     http://www.springframework.org/schema/context  12     http://www.springframework.org/schema/context/spring-context-4.0.xsd  13     http://www.springframework.org/schema/aop  14     http://www.springframework.org/schema/aop/spring-aop-4.0.xsd  15     http://www.springframework.org/schema/tx  16     http://www.springframework.org/schema/tx/spring-tx-4.0.xsd  17     http://www.springframework.org/schema/jms  18     http://www.springframework.org/schema/jms/spring-jms-4.0.xsd  19     http://www.springframework.org/schema/util  20     http://www.springframework.org/schema/util/spring-util-4.0.xsd">  21  22  23     <!-- 真正可以產生Connection的ConnectionFactory,由對應的 JMS服務廠商提供 -->  24     <bean id="targetConnectionFactory"  25         class="org.apache.activemq.ActiveMQConnectionFactory">  26         <property name="brokerURL"  27             value="tcp://192.168.110.142:61616" />  28     </bean>  29  30     <!-- Spring用於管理真正的ConnectionFactory的ConnectionFactory -->  31     <bean id="connectionFactory"  32         class="org.springframework.jms.connection.SingleConnectionFactory">  33         <!-- 目標ConnectionFactory對應真實的可以產生JMS Connection的ConnectionFactory -->  34         <property name="targetConnectionFactory"  35             ref="targetConnectionFactory" />  36     </bean>  37 </beans>

開始配置生產者的spring配置。

 1 <?xml version="1.0" encoding="UTF-8"?>   2 <beans xmlns="http://www.springframework.org/schema/beans"   3     xmlns:context="http://www.springframework.org/schema/context"   4     xmlns:p="http://www.springframework.org/schema/p"   5     xmlns:aop="http://www.springframework.org/schema/aop"   6     xmlns:tx="http://www.springframework.org/schema/tx"   7     xmlns:jms="http://www.springframework.org/schema/jms"   8     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"   9     xsi:schemaLocation="http://www.springframework.org/schema/beans  10     http://www.springframework.org/schema/beans/spring-beans-4.0.xsd  11     http://www.springframework.org/schema/context  12     http://www.springframework.org/schema/context/spring-context-4.0.xsd  13     http://www.springframework.org/schema/aop  14     http://www.springframework.org/schema/aop/spring-aop-4.0.xsd  15     http://www.springframework.org/schema/tx  16     http://www.springframework.org/schema/tx/spring-tx-4.0.xsd  17     http://www.springframework.org/schema/jms  18     http://www.springframework.org/schema/jms/spring-jms-4.0.xsd  19     http://www.springframework.org/schema/util  20     http://www.springframework.org/schema/util/spring-util-4.0.xsd">  21  22  23     <!-- 1、真正可以產生Connection的ConnectionFactory,由對應的 JMS服務廠商提供 -->  24     <bean id="targetConnectionFactory"  25         class="org.apache.activemq.ActiveMQConnectionFactory">  26         <property name="brokerURL"  27             value="tcp://192.168.110.142:61616" />  28     </bean>  29  30     <!-- 2、Spring用於管理真正的ConnectionFactory的ConnectionFactory -->  31     <bean id="connectionFactory"  32         class="org.springframework.jms.connection.SingleConnectionFactory">  33         <!-- 目標ConnectionFactory對應真實的可以產生JMS Connection的ConnectionFactory -->  34         <!-- 給屬性targetConnectionFactory傳值 -->  35         <property name="targetConnectionFactory"  36             ref="targetConnectionFactory" />  37     </bean>  38  39     <!-- 3、開始配置生產者配置 -->  40     <!-- 配置生產者 -->  41     <!-- Spring提供的JMS工具類,它可以進行消息發送、接收等 -->  42     <bean id="jmsTemplate"  43         class="org.springframework.jms.core.JmsTemplate">  44         <!-- 這個connectionFactory對應的是我們定義的Spring提供的那個ConnectionFactory對象 -->  45         <!-- 給屬性connectionFactory傳值 -->  46         <property name="connectionFactory" ref="connectionFactory" />  47     </bean>  48  49     <!-- 4、配置消息的Destination對象 -->  50     <!-- 點對點模式 -->  51     <!-- 這個是隊列目的地,點對點的。 -->  52     <bean id="queueDestination"  53         class="org.apache.activemq.command.ActiveMQQueue">  54         <constructor-arg>  55             <!-- 給ActiveMQQueue構造參數傳遞一個值為queue -->  56             <value>queue</value>  57         </constructor-arg>  58     </bean>  59  60     <!-- 發布訂閱模式 -->  61     <!-- 這個是主題目的地,一對多的。 -->  62     <bean id="topicDestination"  63         class="org.apache.activemq.command.ActiveMQTopic">  64         <!-- 給ActiveMQTopic構造參數傳遞一個值為topic -->  65         <constructor-arg value="topic" />  66     </bean>  67  68 </beans>

生產者測試程式碼如下所示:

可以根據之前的消費者測試一下,消息的消費。

 1 package com.taotao.activemq;   2   3 import javax.jms.Destination;   4 import javax.jms.JMSException;   5 import javax.jms.Message;   6 import javax.jms.Session;   7 import javax.jms.TextMessage;   8   9 import org.junit.Test;  10 import org.springframework.context.ApplicationContext;  11 import org.springframework.context.support.ClassPathXmlApplicationContext;  12 import org.springframework.jms.core.JmsTemplate;  13 import org.springframework.jms.core.MessageCreator;  14  15 /**  16  *  17  * @ClassName: SpringActiveMQ.java  18  * @author: biehl  19  * @since: 2019年9月19日 下午7:01:43  20  * @Copyright: ©2019 biehl 版權所有  21  * @version: 0.0.1  22  * @Description:  23  */  24 public class SpringActiveMQ {  25  26     // 使用spring與activemq整合,是喲個jmsTemplate發送消息  27     @Test  28     public void jmsTemplateProducer() {  29         // 1、初始化spring容器  30         ApplicationContext applicationContext = new ClassPathXmlApplicationContext(  31                 "classpath:/spring/applicationContext-activemq.xml");  32         // 2、從容器中獲得jmsTemplate對象。根據類型獲取到bean的對象  33         JmsTemplate jmsTemplate = applicationContext.getBean(JmsTemplate.class);  34         // 3、從容器中獲得Destination對象。根據名稱獲取到bean的對象  35         Destination destination = (Destination) applicationContext.getBean("queueDestination");  36  37         // 4、發送消息  38         jmsTemplate.send(destination, new MessageCreator() {  39  40             @Override  41             public Message createMessage(Session session) throws JMSException {  42                 // 定義一個消息  43                 String message = "hello activeMq......";  44                 // 發送消息  45                 TextMessage textMessage = session.createTextMessage(message);  46                 return textMessage;  47             }  48         });  49     }  50  51 }

效果如下所示:

開始配置消費者的spring配置。

  1)、注意:那麼消費者是通過Spring為我們封裝的消息監聽容器MessageListenerContainer實現的,它負責接收資訊,並把接收到的資訊分發給真正的MessageListener進行處理。每個消費者對應每個目的地都需要有對應的MessageListenerContainer。   2)、對於消息監聽容器而言,除了要知道監聽哪個目的地之外,還需要知道到哪裡去監聽,也就是說它還需要知道去監聽哪個JMS伺服器,這是通過在配置MessageConnectionFactory的時候往裡面注入一個ConnectionFactory來實現的。   3)、所以在配置一個MessageListenerContainer的時候有三個屬性必須指定:     a、一個是表示從哪裡監聽的ConnectionFactory     b、一個是表示監聽什麼的Destination;     c、一個是接收到消息以後進行消息處理的MessageListener。   4)、常用的MessageListenerContainer實現類是DefaultMessageListenerContainer。

 1 <?xml version="1.0" encoding="UTF-8"?>   2 <beans xmlns="http://www.springframework.org/schema/beans"   3     xmlns:context="http://www.springframework.org/schema/context"   4     xmlns:p="http://www.springframework.org/schema/p"   5     xmlns:aop="http://www.springframework.org/schema/aop"   6     xmlns:tx="http://www.springframework.org/schema/tx"   7     xmlns:jms="http://www.springframework.org/schema/jms"   8     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"   9     xsi:schemaLocation="http://www.springframework.org/schema/beans  10     http://www.springframework.org/schema/beans/spring-beans-4.0.xsd  11     http://www.springframework.org/schema/context  12     http://www.springframework.org/schema/context/spring-context-4.0.xsd  13     http://www.springframework.org/schema/aop  14     http://www.springframework.org/schema/aop/spring-aop-4.0.xsd  15     http://www.springframework.org/schema/tx  16     http://www.springframework.org/schema/tx/spring-tx-4.0.xsd  17     http://www.springframework.org/schema/jms  18     http://www.springframework.org/schema/jms/spring-jms-4.0.xsd  19     http://www.springframework.org/schema/util  20     http://www.springframework.org/schema/util/spring-util-4.0.xsd">  21  22  23     <!-- 1、真正可以產生Connection的ConnectionFactory,由對應的 JMS服務廠商提供 -->  24     <bean id="targetConnectionFactory"  25         class="org.apache.activemq.ActiveMQConnectionFactory">  26         <property name="brokerURL"  27             value="tcp://192.168.110.142:61616" />  28     </bean>  29  30     <!-- 2、Spring用於管理真正的ConnectionFactory的ConnectionFactory -->  31     <bean id="connectionFactory"  32         class="org.springframework.jms.connection.SingleConnectionFactory">  33         <!-- 目標ConnectionFactory對應真實的可以產生JMS Connection的ConnectionFactory -->  34         <!-- 給屬性targetConnectionFactory傳值 -->  35         <property name="targetConnectionFactory"  36             ref="targetConnectionFactory" />  37     </bean>  38  39     <!-- 3、配置消息的Destination對象。接受消息的目的地。 -->  40     <!-- 點對點模式 -->  41     <!-- 這個是隊列目的地,點對點的。 -->  42     <bean id="queueDestination"  43         class="org.apache.activemq.command.ActiveMQQueue">  44         <constructor-arg>  45             <!-- 給ActiveMQQueue構造參數傳遞一個值為queue -->  46             <value>queue</value>  47         </constructor-arg>  48     </bean>  49  50     <!-- 發布訂閱模式 -->  51     <!-- 這個是主題目的地,一對多的。 -->  52     <bean id="topicDestination"  53         class="org.apache.activemq.command.ActiveMQTopic">  54         <!-- 給ActiveMQTopic構造參數傳遞一個值為topic -->  55         <constructor-arg value="topic" />  56     </bean>  57  58     <!-- 4、配置消息接收者 -->  59     <!-- 配置一個監聽器 -->  60     <bean id="activeMqMessageListener"  61         class="com.taotao.search.listener.ActiveMqMessageListener" />  62  63     <!-- 配置監聽容器 -->  64     <bean  65         class="org.springframework.jms.listener.DefaultMessageListenerContainer">  66         <!-- 屬性設置 -->  67         <!-- 一個是表示從哪裡監聽的ConnectionFactory -->  68         <property name="connectionFactory" ref="connectionFactory" />  69         <!-- 一個是表示監聽什麼的Destination -->  70         <property name="destination" ref="queueDestination" />  71         <!-- 一個是接收到消息以後進行消息處理的MessageListener -->  72         <property name="messageListener" ref="activeMqMessageListener" />  73     </bean>  74  75  76 </beans>

然後可以寫消息監聽器,用來監聽生產者生產的消息,以便實現自己的業務邏輯。

 1 package com.taotao.search.listener;   2   3 import java.text.SimpleDateFormat;   4 import java.util.Date;   5   6 import javax.jms.JMSException;   7 import javax.jms.Message;   8 import javax.jms.MessageListener;   9 import javax.jms.TextMessage;  10  11 /**  12  * 接受ActiveMQ發送的消息.  13  *  14  * @ClassName: ActiveMqMessageListener.java  15  * @author: biehl  16  * @since: 2019年9月19日 下午7:55:24  17  * @Copyright: ©2019 biehl 版權所有  18  * @version: 0.0.1  19  * @Description:  20  */  21 public class ActiveMqMessageListener implements MessageListener {  22  23     @Override  24     public void onMessage(Message message) {  25         SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");  26         System.out.println("監聽生產者生產的消息,消費者進行消息消費.......");  27         // 消息到了onMessage就接受到了消息  28         if (message instanceof TextMessage) {  29             TextMessage textMessage = (TextMessage) message;  30             try {  31                 String text = textMessage.getText();  32                 System.out.println(sdf.format(new Date()) + " : " + text);  33             } catch (JMSException e) {  34                 e.printStackTrace();  35             }  36         }  37     }  38  39 }

由於這裡只是簡單的測試,如果是正式項目的話,直接載入這個配置文件,然後就可以進行消息的監聽消費,我這裡只是載入一下這個配置文件即可。

 1 package com.taotao.search.service;   2   3 import java.io.IOException;   4   5 import org.springframework.context.ApplicationContext;   6 import org.springframework.context.support.ClassPathXmlApplicationContext;   7   8 /**   9  *  10  * @ClassName: ActiveMqConsumer.java  11  * @author: biehl  12  * @since: 2019年9月19日 下午8:10:55  13  * @Copyright: ©2019 biehl 版權所有  14  * @version: 0.0.1  15  * @Description:  16  */  17 public class ActiveMqConsumer {  18  19     // 啟動spring容器。就可以實現監聽生產者發送消息,消費者消費小的目的地。  20     public static void main(String[] args) {  21         // 初始化spring容器  22         ApplicationContext applicationContext = new ClassPathXmlApplicationContext(  23                 "classpath:/spring/applicationContext-activemq.xml");  24         System.out.println("spring容器載入完畢,開始監聽生產者生產的消息.......");  25         try {  26             System.in.read();  27         } catch (IOException e) {  28             e.printStackTrace();  29         }  30     }  31 }

實現效果如下所示:

控制台列印如下所示,只要你生產消息,這裡就可以進行消息的消費。

待續……