ActiveMQ學習總結——原生實戰操作(下)03

  • 2019 年 10 月 15 日
  • 筆記

本篇將繼續延續上一篇的內容,作為知識補充篇,為接下來我們學習spring整合ActiveMQ打好基礎

本篇主要學習內容:

  1.ActiveMQ 隊列服務監聽

  2.ActiveMQ Topic模型


 

回顧下上一篇ActiveMQ學習總結我們學習到了:

  1.ActiveMQ術語及API介紹

  2.ActiveMQ 文本消息處理

  3.ActiveMQ 對象消息處理

相信大現在對ActiveMQ的一些簡單操作已經很輕鬆掌握了

上一篇文章地址:https://www.cnblogs.com/arebirth/p/activemq02.html


 

 

一 ActiveMQ實現隊列服務監聽

在我們上一篇的練習中,所有的消費者都是接收一次消息即斷開連接,這樣是不是很不方便。

試想一下,如果我們的provider在consumer接收完第一條消息後又繼續發送了一條消息,那麼consumer已經斷開連接了,是不是就不能連接不間斷的實時獲取消息?

解決方案:

  很容易,用我們的隊列服務監聽即可

 

*:根據上一章的學習,大家對環境搭建使用配置,肯定都已經相當清楚了,這裡就不過多闡述,直接進行代碼實戰

 

1 消息生產者

相比之下,我么你的生產者照之前是沒有任何變化的,主要的變化還是在cosumer身上

package cn.arebirth.mq;    import org.apache.activemq.ActiveMQConnectionFactory;    import javax.jms.*;    public class ActiveMQQueueListenerProducer {      public static void sendTextActiveMq(String txt) {          //定義鏈接工廠          ConnectionFactory connectionFactory = null;            //定義鏈接對象          Connection connection = null;            //定義會話          Session session = null;            //目的地          Destination destination = null;            //定義消息的發送者          MessageProducer producer = null;            //定義消息          Message message = null;            try {              //創建鏈接工廠              connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://169.254.18.20:61616");                //創建鏈接誒對象              connection = connectionFactory.createConnection();                //啟動鏈接              connection.start();                //創建會話              session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);                //創建目的地              destination = session.createQueue("queue-listener");                //創建消息生產者              producer = session.createProducer(destination);                //創建消息對象              message = session.createTextMessage(txt);                //發送消息              producer.send(message);          } catch (Exception ex) {              ex.printStackTrace();          } finally {              //回收資源              if (producer != null) {                  try {                      producer.close();                  } catch (JMSException e) {                      e.printStackTrace();                  }              }              if (session != null) {                  try {                      session.close();                  } catch (JMSException e) {                      e.printStackTrace();                  }              }              if (connection != null) {                  try {                      connection.close();                  } catch (JMSException e) {                      e.printStackTrace();                  }              }          }      }  }

 

2 消息消費者

package cn.arebirth.mq;    import org.apache.activemq.ActiveMQConnectionFactory;    import javax.jms.*;    public class ActiveMQQueueListenerConsumer {      public static void receiveTextActiveMq() {          // 定義鏈接工廠          ConnectionFactory connectionFactory = null;          // 定義鏈接對象          Connection connection = null;          // 定義會話          Session session = null;          // 目的地          Destination destination = null;          // 定義消息的發送者          MessageConsumer consumer = null;          // 定義消息          Message message = null;            try {              //創建鏈接工廠              connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://169.254.18.20:61616");                //創建鏈接對象              connection = connectionFactory.createConnection();                //啟動鏈接              connection.start();                //創建會話              session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);                //創建目的地              destination = session.createQueue("queue-listener");                //創建消息消費者              consumer = session.createConsumer(destination);                //隊列服務監聽              consumer.setMessageListener(new MessageListener() {                  //ActiveMQ回調方法。通過該方法將消息傳遞到consumer                  @Override                  public void onMessage(Message message) {                      //處理消息                      String msg = null;                      try {                          msg = ((TextMessage) message).getText();                      } catch (JMSException e) {                          e.printStackTrace();                      }                      System.out.println("Producer say:" + msg);                  }              });          } catch (Exception ex) {              ex.printStackTrace();          }      }  }

 

3 測試

3.1 provider測試

package cn.arebirth.mq;    public class ProducerTest {      public static void main(String[] args) {          ActiveMQQueueListenerProducer.sendTextActiveMq("Hello,consumer!");      }  }

觀察我們的控制台可以發現已經成功發佈到隊列

 

 

 

3.2 consumer測試

package cn.arebirth.mq;    public class ConsumerTest {      public static void main(String[] args) {          ActiveMQQueueListenerConsumer.receiveTextActiveMq();      }  }

我們運行後可以發現,它接收到了消息,但是它的進程並沒有關閉,

 

 

我們用provider繼續發佈一條消息,看看consumer能不能接收到

 

 

可以看到,consumer持續在後台監聽我們發佈的消息,

 

 

 

 

 

 

通過上面代碼,不難發現,provider沒有任何改動,只是consumer修改了一部分

通過調用匿名內部類的方法來實現持續監聽

 consumer.setMessageListener(new MessageListener() {      @Override                  public void onMessage(Message message) {            }    }

注意:因為涉及到隊列持續監聽,所以我們不能在finally處給資源回收,否則還在監聽狀態,資源都回收沒了,也就無從監聽啦。

 


 

 

二 Topic模型

在本系列文章第一篇也有介紹過一些Topic模型的概念,那麼這裡我們將以原理+實戰的方式來帶領大家掌握

 

1 Publish/Subscribe處理模式(Topic)

消息生產者(發佈)消息到topic中,同時有多個消息消費者(訂閱)消費該消息。

和點對點方式不同,發佈到Topic的消息會被所有的訂閱者消費,而點對點的只能是指定的消費者去消費

當生產者發佈消息,不管是否有消費者,都不會保存消息,也就是說它是發完就啥也不管了那種,

所以要注意:一定要有消費者,然後在有生產者,否則生產者不發完消息什麼也不管了,你消費者在生產者之後才有,那麼你是接收不到消息的。

 

接下來我們就以實戰的方式鼓搗下。

 

2 創建生產者

package cn.arebirth.mq;    import org.apache.activemq.ActiveMQConnectionFactory;    import javax.jms.*;    public class ActiveMQTopicProducer {      public static void sendTextActiveMQ(String txt){          //定義鏈接工廠          ConnectionFactory connectionFactory = null;            //定義鏈接對象          Connection connection = null;            //定義會話          Session session = null;            //目的地          Destination destination = null;            //定義消息的發送者          MessageProducer producer = null;            //定義消息          Message message = null;            try {              //創建鏈接工廠              connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://169.254.18.20:61616");                //創建鏈接誒對象              connection = connectionFactory.createConnection();                //啟動鏈接              connection.start();                //創建會話              session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);                //創建目的地              destination = session.createTopic("topic-test");                //創建消息生產者              producer = session.createProducer(destination);                //創建消息對象              message = session.createTextMessage(txt);                //發送消息              producer.send(message);          } catch (Exception ex) {              ex.printStackTrace();          } finally {              //回收資源              if (producer != null) {                  try {                      producer.close();                  } catch (JMSException e) {                      e.printStackTrace();                  }              }              if (session != null) {                  try {                      session.close();                  } catch (JMSException e) {                      e.printStackTrace();                  }              }              if (connection != null) {                  try {                      connection.close();                  } catch (JMSException e) {                      e.printStackTrace();                  }              }          }      }  }

我們可以發現,在創建目的地destination的時候代碼有了變動

destination = session.createTopic("topic-test");

變成了createTopic,對這就是topic模式了。

 

3 創建消費者

package cn.arebirth.mq;    import org.apache.activemq.ActiveMQConnectionFactory;    import javax.jms.*;    public class ActiveMQTopicConsumer implements Runnable {          public static void receiveTextActiveMQ(String threadName) {          // 定義鏈接工廠          ConnectionFactory connectionFactory = null;          // 定義鏈接對象          Connection connection = null;          // 定義會話          Session session = null;          // 目的地          Destination destination = null;          // 定義消息的發送者          MessageConsumer consumer = null;          // 定義消息          Message message = null;            try {              //創建鏈接工廠              connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://169.254.18.20:61616");                //創建鏈接對象              connection = connectionFactory.createConnection();                //啟動鏈接              connection.start();                //創建會話              session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);                //創建目的地              destination = session.createTopic("topic-test");                //創建消息的消費者              consumer = session.createConsumer(destination);                //服務監聽              consumer.setMessageListener(new MessageListener() {                  //ActiveMQ回調方法。通過該方法將消息傳遞到consumer                  @Override                  public void onMessage(Message message) {                      //處理消息                      String msg = null;                      try {                          msg = ((TextMessage) message).getText();                      } catch (JMSException e) {                          e.printStackTrace();                      }                      System.out.println(threadName + "--Producer say:" + msg);                  }              });          } catch (Exception ex) {              ex.printStackTrace();          }      }        @Override      public void run() {          receiveTextActiveMQ(Thread.currentThread().getName());      }  }

 

我們可以發現,在創建目的地destination的時候代碼有了變動

destination = session.createTopic("topic-test");

還有實現了Runnable這個是為了一會測試的時候,多線程啟動,看效果,是否多個都會接受到,(如果看着糊塗的話,你也可以去掉線程的部分,單獨複製多個對象,並啟動,效果也是一樣的)

 

4 測試(要先啟動消費者,否則消費者是接收不到消息的!當然,你自己可以試一下

4.1 測試消費者

package cn.arebirth.mq;    public class ConsumerTest {      public static void main(String[] args) {          ActiveMQTopicConsumer a1 = new ActiveMQTopicConsumer();          Thread t1 = new Thread(a1,"a1");            ActiveMQTopicConsumer a2 = new ActiveMQTopicConsumer();          Thread t2 = new Thread(a2,"a2");            ActiveMQTopicConsumer a3 = new ActiveMQTopicConsumer();          Thread t3 = new Thread(a3,"a3");            t1.start();          t2.start();          t3.start();      }  }

 

可以看到,我們的消費者已經啟動了,三個線程。並以監聽服務的方式啟動

 

 

4.2 測試生產者

package cn.arebirth.mq;    public class ProducerTest {      public static void main(String[] args) {          ActiveMQTopicProducer.sendTextActiveMQ("hello,topic");      }  }

 

可以看到,在topics下面,我們發佈的內容已經有記錄了

 

 

 

然後我們在看下,我們的consumer

 

 

 

可以發現,三個consumer都已經接收到了

 

ps:

  如果你對ActiveMQ原理性的東西感到困惑,可以看下我們前面的文章:https://www.cnblogs.com/arebirth/p/activemq01.html