ActiveMQ — 入門篇

  • 2019 年 10 月 6 日
  • 筆記

一、什麼是MQ?

MQ,中文名字叫做消息中間件。既然是中間件,那麼就說明它左邊有東西,右邊也有東西。那麼左邊是什麼?右邊又是什麼呢?MQ在中間能幹嘛呢?看看下面的例子。

1、生活中的case: 老師講完了練習,然後對同學們說有問題的現在就過來問。然後張三李四王五趙六都有問題要問。那麼他們就按順序排隊。張三需要5分鐘,然後是李四8分鐘,再然後才是王五10分鐘,最後是趙六。這就相當於dubbo的RPC遠程調用。也就是說,張三問的時候老師這個系統只能響應張三,後面的人都得等着。這樣就會導致學生和老師耦合度高,而且效率低,如果問問題的學生多,越後面的人等待的時間也越長,老師還會累死。怎麼優化呢?

2、優化方案: 老師會叫同學們把需要問的問題按照約定的格式在紙上寫好,然後交給班長。等老師解答完當前學生的問題,就從班長那裡拿出一份問題。這樣一來,同學們也不用乾等着,交了問題後該幹嘛就幹嘛去,老師也可以選擇適當的時間再解答,不會被累死。

這個案例中的班長就是一個中間件,它不處理真正的邏輯,只是一個中間人。學生不直接問老師,而是通過班長,使得學生和老師解耦了;其次,學生上午交的問題,可能下午才得到老師的解答,整個過程是異步的;即便有一大群學生來問問題,這些請求也會堆積在班長那裡,可以幫老師抵流量衝擊,而不會影響到老師。綜上: MQ的作用:

  • 異步
  • 解耦
  • 削峰

二、activeMQ的安裝

  • 首先從官網下載activeMQ (linux版本);
  • 然後解壓就行了(activeMQ是java編寫的,所以需要安裝JDK)。

進入到bin目錄,然後執行如下命令:

  • 啟動:./activemq start
  • 指定xml配置文件啟動:./activemq start xbean:file:/文件路徑
  • 關閉:./activemq stop
  • 重啟:./activemq restart

activeMQ的後台啟動端口是 61616,要想查看是否啟動成功,有如下幾種方式:

  • ps -ef | grep activemq| grep -v grep
  • netstat -anp | grep 61616
  • lsof -i:61616

activemq還有一個圖形界面,端口是 8161。首先保證你的 Linux 虛擬機和 windows 的 ip 處於同一個網段,然後確保沒有被防火牆給屏蔽,在Linux 和 windows 上互 ping 一下。能 ping 通後,就在 瀏覽器訪問 192.168.x.xx:8161, 默認的用戶名和密碼都是 admin。訪問後可以看到如下界面:

activemq的圖形界面

三、activeMQ怎麼玩?

上面舉了生活中的例子來說明MQ的作用,說白了就是我們先把問題發到MQ中,然後從MQ中取出消息。那麼具體是發送到MQ中的什麼位置呢?這個位置我們管它叫destination,即目的地。 目的地有以下兩種:

  • 隊列queue(點對點);
  • 主題topic(發佈與訂閱);

1、點對點傳輸: 所謂點對點傳輸,可以理解為發私信。你發了一條消息給你女朋友,只有你女朋友能收到。那接下來就看看怎麼發消息和收消息。首先添加依賴:

 <!-- activemq-all -->   <dependency>        <groupId>org.apache.activemq</groupId>        <artifactId>activemq-all</artifactId>        <version>5.15.8</version>   </dependency>   <dependency>        <groupId>org.apache.xbean</groupId>        <artifactId>xbean-spring</artifactId>        <version>4.12</version>   </dependency>
  • 生產消息:
public class Productor {      private static final String URL = "tcp://192.168.0.103:61616";      private static final String QUEUE_NAME = "queue_test";        public static void main(String[] args) throws Exception {          // 1. 創建factory工廠          ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(URL);          // 2. 創建connection連接          Connection connection = factory.createConnection();          connection.start();          // 3. 創建session          Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);          // 4. 創建目的地queue          Queue queue = session.createQueue(QUEUE_NAME);          MessageProducer producer = session.createProducer(queue);          // 5. 生產消息          for (int i = 1; i <= 3; i++) {              TextMessage message = session.createTextMessage("queue" + 1);              // 6. 將消息發送到MQ              producer.send(message);          }          // 7. 關閉資源(順着申請,倒着關閉)          producer.close();          session.close();          connection.close();          System.out.println("發送到MQ完成!");      }  }

運行後,就可以在8161端口看到如下信息了:

生產消息

  • 消費消息:
public class Consumer {      private static final String URL = "tcp://192.168.0.103:61616";      private static final String QUEUE_NAME = "queue_test";        public static void main(String[] args) throws Exception {          // 1. 創建factory工廠          ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(URL);          // 2. 創建connection連接          Connection connection = factory.createConnection();          connection.start();          // 3. 創建session          Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);          // 4. 創建目的地queue          Queue queue = session.createQueue(QUEUE_NAME);          MessageConsumer consumer = session.createConsumer(queue);          // 5. 消費消息          while (true){              // receive裏面的參數表示超時時間              TextMessage message = (TextMessage) consumer.receive(3000);              if (message != null)                  System.out.println(message.getText());              else                  break;          }          // 6. 關閉資源(順着申請,倒着關閉)          consumer.close();          session.close();          connection.close();          System.out.println("3秒還沒消息來,我溜了!");      }  }

運行後,在8161端口就可以看到如下變化:

消費消息

可以看到消息隊列為3,出列的也是3,說明消費完了。

  • 異步監聽的方式消費消息: 異步相對的就是同步,上面那種方式就是同步的。就是調用receive方法來接收消息,在沒接收到消息或超時之前,程序將一直阻塞。在上面那段代碼中,receive方法設置了3秒的超時時間,假如MQ中此刻沒有消息供消費,那麼程序將要在3秒後才能輸出 「3秒還沒消息,我溜了!」 這句話。異步就是不會阻塞,即使沒收到消息,程序還是該幹嘛就幹嘛。異步監聽方式寫法如下:
TextMessage message = (TextMessage) consumer.receive();  consumer.setMessageListener(new MessageListener() {              @Override              public void onMessage(Message message) {                  if (message != null && message instanceof TextMessage){                      TextMessage textMessage = (TextMessage) message;                      try {                          System.out.println("收到消息: " +  textMessage.getText());                      } catch (JMSException e) {                          e.printStackTrace();                      }                  }              }          });          System.in.read();          // 6. 關閉資源(順着申請,倒着關閉)          consumer.close();          session.close();          connection.close();
  • 啟動順序問題: — 先啟動生產者,再依次啟動兩個消費者: ——- 先啟動的消費者可以拿到消息,後啟動的就不能消費了。結論:消息不能被重複消費。 — 先啟動兩個消費者,再啟動生產者生產消息: ——- 結果就是兩個消費者一人消費一半。
  • 小總結:

從上面生產消息和消費消息的demo中可以發現,其步驟其實和JDBC操作數據庫差不多,都是先創建factory,然後通過factory創建connection連接,再創建session,最後執行操作的是session。點對點傳輸還有如下特點:

  • 每條消息只能有一個消費者,也就是上面說的消息不能被重複消費;
  • 消息生產者和消費者沒有時間上的關聯,生產消息時不用管是不是有人消費,消費者也隨時可以提取消息;
  • 消息被消費後將不會再存儲,用過就沒了。

2、發佈與訂閱: 上面說了點對點,就是你跟你女朋友發微信。那麼發佈與訂閱就是你在微信公眾號發推文,凡是關注了你公眾號的人都能收到消息。點對點的目的地是queue,發佈與訂閱的目的地是topic,每條消息可以有多個消費者;生產者和消費者有時間上的關聯,訂閱了某個topic,只能消費你訂閱之後的消息,說簡單就是,關注了你公眾號的人,他不能收到在他關注你之前的消息;假如無人訂閱就去生產,那就是一條廢消息,沒有人關注你的公眾號,那麼你發的推文就沒有意思,就是一條廢消息,所以一般會先啟動消費者,再啟動生產者。

關於發佈與訂閱,相比點對點,只需要把queue改成topic就可以了,這裡就不再貼代碼了。

關於topic和queue的區別,如下表所示:

topic

queue

工作模式

一對多

一對一

狀態

無狀態

queue數據會在mq服務器上以文件形式保存,也可配置成DB存儲

完整性

如果沒有訂閱者,消息將被丟棄

消息不會被丟棄

處理效率

隨着訂閱者的增加效率會降低

由於一條消息只發給一個消費者,所以消費者再多也不會明顯地影響性能

四、關於JMS

1、什麼是JMS? JMS中文名叫Java消息服務,它是一種規範,是javaEE的13種核心規範之一。關於javaEE的13種核心規範,網上一搜一大堆,這裡不再贅述。JMS就是天上飛的理念,而各種MQ就是這種理念的落地實現。比如activeMQ、rocketMQ等,都要遵循JMS這個規範。 2、JMS的結構和特點:

  • JMS結構:
    • JMS Provider:實現了JMS接口和規範的消息中間件,像activeMQ、rocketMQ等
    • JMS Producer:消息生產者
    • JMS consumer:消息消費者
    • JMS message:消息
      • 消息頭
        • JMSDestination:目的地,queue和topic
        • JMSDeliveryMode:分為持久和非持久模式。持久模式意味着消息即使JMS提供者出現故障,該消息並不會丟失,會在服務器恢復後再次發送;反之,非持久模式就是服務器出現故障,該消息將永久丟失。
        • JMSExpiration:消息過期時間,如果為0,表示永不過期。
        • JMSPriority:優先級,0到4是普通消息,5到9是加急消息,默認是4。
        • JMSMessageID:消息的唯一標識,由MQ生成。
      • 消息體
        • 封裝的具體消息數據就是消息體
        • 消息體格式,有5種,常用的 TextMessage(String類型) 和 MapMessage(key、value形式)
        • 發送和接收的消息體類型必須對應一致
      • 消息屬性
        • 是什麼:一個對象的屬性能幹嘛?用來描述這個對象的特點嘛,消息屬性也一樣地理解就好了。
        • 如果需要除消息頭字段以外的值,可以使用消息屬性
        • 消息屬性可以用來做識別/去重/重點標註等操作,設置消息屬性的方法如下:
TextMessage textMessage = new session.createTextMessage("這是一條TextMessage");  // TextMessage 類型設置消息屬性  textMessage.setStringProperty("property", "VIP");

在消費者中取出消息後:

textMessage.getStringProperty("property")

即可取出消息屬性。 注意上面JMS結構的層級關係。

3、如何保證消息的可靠性?(面試重點) 一般要從三個角度去回答(持久性、事務、簽收)。


  • 持久性:持久,是MQ掛了,消息依然存在,非持久,就是MQ掛了,消息就沒了。

隊列生產者的持久性:

// 這個producer是隊列   producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); // 非持久   producer.setDeliveryMode(DeliveryMode.PERSISTENT); // 持久

隊列設置為非持久,如果生產者將消息發送到MQ後,MQ掛了,那麼這些消息就沒了,即使MQ恢復正常也沒了。隊列設置為持久,那麼消息只要還沒消費就還會有。activeMQ的隊列默認設置了持久,可保證消息只被傳送一次和成功使用一次。


主題的持久性: 主題要設置持久,生產者和消費者的編碼方式與之前都有點兒不一樣,代碼如下:

public class Consumer {      private static final String URL = "tcp://192.168.x.xxx:61616";      private static final String TOPIC_NAME = "topic_test";        public static void main(String[] args) throws Exception {          // 1. 創建factory工廠          ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(URL);          // 2. 創建connection連接          Connection connection = factory.createConnection();          connection.setClientID("張三");          System.out.println("張三訂閱");          // 3. 創建session          Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);          // 4. 訂閱topic          Topic topic = session.createTopic(TOPIC_NAME);          TopicSubscriber subscriber = session.createDurableSubscriber(topic,"備註信息");          // 5. 啟動          connection.start();          // 6. 消費topic的消息          Message message = subscriber.receive();          while (null != message){              TextMessage textMessage = (TextMessage) message;              System.out.println("收到消息:" +  textMessage.getText());              message = subscriber.receive(5000L);          }          // 6. 關閉資源(順着申請,倒着關閉)          session.close();          connection.close();          System.out.println("5秒還沒消息來,我溜了!");      }  }
public class Productor {      private static final String URL = "tcp://192.168.0.103:61616";      private static final String TOPIC_NAME = "topic_test";        public static void main(String[] args) throws Exception {          // 1. 創建factory工廠          ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(URL);          // 2. 創建connection連接          Connection connection = factory.createConnection();          // 3. 創建session          Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);          // 4. 創建目的地topic          Topic topic = session.createTopic(TOPIC_NAME);          MessageProducer producer = session.createProducer(topic);          // 設置持久性          //producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); // 非持久          producer.setDeliveryMode(DeliveryMode.PERSISTENT); // 持久            connection.start();          // 5. 生產消息          for (int i = 1; i <= 3; i++) {              TextMessage message = session.createTextMessage("queue" + i);              // 6. 將消息發送到MQ              producer.send(message);          }          // 7. 關閉資源(順着申請,倒着關閉)          producer.close();          session.close();          connection.close();          System.out.println("發送到MQ完成!");      }  }

主題設置了持久的話,一定要先運行一次消費者,等於向MQ註冊,表示我訂閱了這個主題。然後再運行生產者發送信息,此時,不論消費者是否還在線,都會接收到消息,不在線的話,下次連接的時候,會把沒有收過的消息都接收下來。


  • 事務:創建session的時候要傳兩個參數,一個是事務,一個是簽收。

生產者事務:

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

第一個參數就是表示事務,設置為false,表示只要執行了send方法,消息就進入到隊列中了;如果設置為true,需要send後再執行commit,消息才會被提交到隊列中。所以在session提交前,需要調commit方法,如下:

try{    //沒問題就提交事務    session.commit();  }catch(Exception e){    //有問題就回滾    session.rollback();  }finally{    producer.close();    session.close();  }

生產者主事務,不管簽收,因為消費者才需要簽收嘛。生產者設置了事務,簽收機制就無所謂了,只是這個方法需要傳一個簽收機制,其實事務設置為true後,起作用的就是事務了。


消費者事務: 如果消費者開啟了事務,進行消費時而沒有commit的話,MQ會認為你還沒有成功消費消息,就會出現重複消費的情況,所以消費者一般不開啟事務,而是以簽收機製為主。


  • 簽收:簽收機制有四種,用得較多的是自動和手動兩種方式。

消費者非事務的手動簽收:

Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

如果這個時候直接運行消費者,發現又可以重複消費消息,因為MQ不知道你已經簽收消息了。所以在receive到消息後,應該手動簽收,才不會重複消費,如下:

while (null != message){        TextMessage textMessage = (TextMessage) message;        textMessage.acknowledge(); // 手動簽收        System.out.println("收到消息:" +  textMessage.getText());        message = subscriber.receive(5000L);  }

消費者開啟事務的情況下的簽收:

Session session = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE);

開啟了事務,就會自動設置為自動簽收,即使後面那個參數設置了手動簽收,也不起作用了。所以,不需要調用acknowledge()方法進行簽收。如果開啟了事務,設置了手動簽收,調用了acknowledge()方法,但是沒有commit,還是會重複消費。


總之,在事務會話中,當一個事務被成功提交則消息被自動簽收,如果事務回滾,則消息會被再次傳遞。非事務會話中,消息何時被確認取決於創建會話時的簽收模式。

小結:不能容忍丟失消息,就用持久訂閱,可以容忍丟失消息,就用非持久訂閱。

五、activeMQ的broker

1、什麼是broker? broker就是嵌入式的activemq,也就是說,使用broker,只需要引入相關依賴就可以了,而不需要你本地安裝activemq,類似於springboot那樣內嵌tomcat。

2、怎麼用? 除了之前引入的activemq-all,還需要引入如下依賴:

<dependency>      <groupId>com.fasterxml.jackson.core</groupId>      <artifactId>jackson-databind</artifactId>      <version>2.9.9.3</version>  </dependency>

然後編碼:

public static void main(String[] args) throws Exception{          BrokerService service = new BrokerService();          service.setUseJmx(true);          service.addConnector("tcp://localhost:61616");          service.start();  }

運行後,就可以在控制台看到這個嵌入式的activemq已經啟動了。

image.png

未完待續……