ActiceMQ詳解
1. MQ理解
1.1 MQ的產品種類和對比
MQ即消息中間件。MQ是一種理念,ActiveMQ是MQ的落地產品。
消息中間件產品
各類MQ對比
- Kafka
- 程式語言:Scala
- 大數據領域的主流MQ
- RabbitMQ
- 程式語言:Erlang
- 基於erlang語言,不好修改底層,不要查找問題的原因,不建議選用。
- RocketMQ
- 程式語言:Java
- 適用於大型項目,適用於集群
- ActiveMQ
- 程式語言:Java
- 適用於中小型項目
1.2 MQ產生背景
系統之間直接調用存在的問題?
微服務架構後,鏈式調用是我們在寫程式時候的一般流程,為了完成一個整體功能會將其拆分成多個函數(或子模組),比如模組A調用模組B,模組B調用模組C,模組C調用模組D。但在大型分散式應用中,系統間的RPC交互繁雜,一個功能背後要調用上百個介面並非不可能,從單機架構過渡到分散式微服務架構的通例。這些架構會有哪些問題?
-
系統之間介面耦合比較嚴重
每新增一個下游功能,都要對上游的相關介面進行改造。舉個例子:如果系統A要發送數據給系統B和系統C,發送給每個系統的數據可能有差異,因此系統A對要發送給每個系統的數據進行了組裝,然後逐一發送。當程式碼上線後又新增了一個需求:把數據也發送給D,新上了一個D系統也要接受A系統的數據,此時就需要修改A系統,讓他感知到D系統的存在,同時把數據處理好再給D。在這個過程你會看到每接入一個下游系統都要對系統A進行程式碼改造,開發聯調的效率很低。其整體架構如下圖:
-
面對大流量並發時容易被衝垮
每個介面模組的吞吐能力是有限的,這個上限能力如果是堤壩,當大流量(洪水)來臨時容易被衝垮。舉例秒殺業務:上游系統發起下單購買操作就是下單一個操作很快就完成。然而下游系統要完成秒殺業務後面的所有邏輯(讀取訂單,庫存檢查,庫存凍結,餘額檢查,餘額凍結,訂單生產,餘額扣減,庫存減少,生成流水,餘額解凍,庫存解凍)。
-
等待同步存在性能問題
RPC介面上基本都是同步調用,整體的服務性能遵循「木桶理論」,即整體系統的耗時取決於鏈路中最慢的那個介面。比如A調用B/C/D都是50ms,但此時B又調用了B1,花費2000ms,那麼直接就拖累了整個服務性能。
根據上述的幾個問題,在設計系統時可以明確要達到的目標:
-
要做到系統解耦,當新的模組接進來時可以做到程式碼改動最小,能夠解耦
-
設置流量緩衝池,可以讓後端系統按照自身吞吐能力進行消費不被衝垮,能削峰
-
強弱依賴梳理能將非關鍵調用鏈路的操作非同步化並提升整體系統的吞吐能力,能夠非同步
1.3 MQ主要作用
非同步
調用者無需等待解耦
解決了系統之間耦合調用的問題消峰
抵禦洪峰流量,保護了主業務
1.4 MQ的定義
面向消息的中間件(message-oriented middleware
)MOM能夠很好的解決以上問題。是指利用高效可靠的消息傳遞機制與平台無關的數據交流,並基於數據通訊來進行分散式系統的集成。通過提供消息傳遞和消息排隊模型在分散式環境下提供應用解耦,彈性伸縮,冗餘存儲、流量削峰,非同步通訊,數據同步等功能。
大致的過程是這樣的:發送者把消息發送給消息伺服器,消息伺服器將消息存放在若干隊列/主題topic中,在合適的時候消息伺服器會將消息轉發給接受者。在這個過程中發送和接收是非同步的,也就是發送無需等待,而且發送者和接受者的生命周期也沒有必然的關係。尤其在發布pub/訂閱sub模式下,也可以完成一對多的通訊即讓一個消息有多個接受者。
1.5 MQ特點
採用非同步處理模式
消息發送者可以發送一個消息而無須等待響應。消息發送者將消息發送到一條虛擬的通道(主題或者隊列)上。消息接收者則訂閱或者監聽該通道。一條消息可能最終轉發給一個或者多個消息接收者,這些消息接收者都無需對消息發送者做出同步回應。整個過程都是非同步的。
案例:也就是說一個系統跟另一個系統之間進行通訊的時候,假如系統A希望發送一個消息給系統B讓他去處理。但是系統A不關注系統B到底怎麼處理或者有沒有處理好,所以系統A把消息發送給MQ然後就不管這條消息的「死活了」,接著系統B從MQ裡面消費出來處理即可。至於怎麼處理,是否處理完畢,什麼時候處理都是系統B的事兒與系統A無關。
應用系統之間解耦合
發送者和接受者不必了解對方,只需要確認消息。發送者和接受者不必同時在線。
整體架構
MQ缺點
兩個系統之間不能同步調用,不能實時回復,不能響應某個調用的回復。
1.6 CentOS7安裝ActiveMQ
cd /root
mkdir active_mq
tar -xzvf apache-activemq-5.14.0-bin.tar.gz
# /etc/init.d/目錄增加增加activemq文件
cd /etc/init.d/
vim activemq
#!/bin/sh
#
# /etc/init.d/activemq
# chkconfig: 345 63 37
# description: activemq servlet container.
# processname: activemq 5.14.0
# Source function library.
#. /etc/init.d/functions
# source networking configuration.
#. /etc/sysconfig/network
export JAVA_HOME=/root/java/jdk1.8.0_221
export CATALINA_HOME=/root/active_mq/apache-activemq-5.14.0
case $1 in
start)
sh $CATALINA_HOME/bin/activemq start
;;
stop)
sh $CATALINA_HOME/bin/activemq stop
;;
restart)
sh $CATALINA_HOME/bin/activemq stop
sleep 1
sh $CATALINA_HOME/bin/activemq start
;;
esac
exit 0
# 對activemq文件授予許可權
chmod 777 activemq
# 設置開機啟動並啟動activemq
chkconfig activemq on
service activemq start
# 啟動時指定日誌輸出文件,activemq日誌默認的位置是在:%activemq安裝目錄%/data/activemq.log
service activemq start > /root/active_mq/activemq.log
# 訪問地址://IP地址:8161/
# 默認賬戶:admin/admin
# 61616 埠提供JMS服務
# 8161 埠提供管理控制台服務
# 查看activemq狀態
service activemq status
# 關閉activemq服務
service activemq stop
2. Java程式生成消息基本案例
2.1 JMS簡介
JMS 總體編碼規範
JMS開發基本步驟
Destination
Destination 即目的地。下面拿 jvm 和 mq 做個對比,目的地可以理解為是數據存儲的地方。
兩種Destination
2.2 Idea新建Maven工程
<!-- activemq 所需要的jar包-->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.15.9</version>
</dependency>
<!-- activemq 和 spring 整合的基礎包 -->
<dependency>
<groupId>org.apache.xbean</groupId>
<artifactId>xbean-spring</artifactId>
<version>3.16</version>
</dependency>
<!-- junit/log4j等基礎配置 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
</dependency>
2.3 隊列消息(Queue)
隊列消息特點
點對點消息傳遞域的特點如下
- 每個消息只能有一個消費者,類似 1對1 的關係。
- 消息的生產者和消費者之間 沒有時間上的相關性。無論消費者在生產者發送消息時是否處於運行狀態,消費者都可以提取消息。如我們發送簡訊,發送者發送後接受者不一定會及收及看。
- 消息被消費後隊列 不會再存儲,所以消費者 不會消費到已經被消費過的消息。
隊列消息生產者
public class JmsProduce {
public static final String ACTIVEMQ_URL = "tcp://mpolaris.top:61616";
public static final String QUEUE_NAME = "queue_01";
public static void main(String[] args) throws JMSException {
//1.創建連接工廠,按照給定的url地址,採用默認用戶名和密碼
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
//2.通過連接工廠獲得連接connection並啟動訪問
Connection conn = factory.createConnection();
conn.start();
//3.創建會話session
// 兩個參數:事務,簽收
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4.創建目的地(具體是隊列queue還是主題topic)
// Destination -> Queue/Topic
Queue queue = session.createQueue(QUEUE_NAME);
//5.創建消息的生產者
MessageProducer producer = session.createProducer(queue);
//6.通過使用消息生產者發送三條消息到MQ隊列中
for (int i = 0; i < 3; i++) {
//創建消息
TextMessage textMessage = session.createTextMessage("msg -> " + i);
//通過消息生產者發送給MQ
producer.send(textMessage);
}
//7.關閉資源
producer.close();
session.close();
conn.close();
System.out.println("====> 消息發布到MQ完成");
}
}
隊列消息消費者 – 同步阻塞式 receive
public class JmsConsumer {
public static final String ACTIVEMQ_URL = "tcp://mpolaris.top:61616";
public static final String QUEUE_NAME = "queue_01";
public static void main(String[] args) throws JMSException {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
Connection conn = factory.createConnection();
conn.start();
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(QUEUE_NAME);
// 創建消息的消費者
MessageConsumer consumer = session.createConsumer(queue);
while (true) {
// reveive():一直等待接收消息,在能夠接收到消息之前將一直阻塞。 是同步阻塞方式,和socket的accept方法類似的。
// reveive(Long time):等待n毫秒之後還沒有收到消息就結束阻塞。
// 因為消息發送者是 TextMessage,所以消息接受者也要是TextMessage
TextMessage message = (TextMessage) consumer.receive(4000L);
if (null != message) {
System.out.println("====> 消費者的消息:" + message.getText());
} else {
break;
}
}
consumer.close();
session.close();
conn.close();
}
}
隊列消息消費者 – 非同步非阻塞監聽式 MessageListener
public class JmsConsumer {
public static final String ACTIVEMQ_URL = "tcp://mpolaris.top:61616";
public static final String QUEUE_NAME = "queue_01";
public static void main(String[] args) throws Exception {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
Connection conn = factory.createConnection();
conn.start();
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(QUEUE_NAME);
MessageConsumer consumer = session.createConsumer(queue);
// 監聽器
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
if(null != message && message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("====> 消費者接受到消息:" + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
});
System.in.read(); //保證控制台不停
consumer.close();
session.close();
conn.close();
}
}
消費者三種情況
- 先生產,只啟動一個消費者 ① => ①消費者會消費掉全部消息
- 先生產,然後先啟動消費者①,再啟動消費者② => ①消費者會消費掉全部消息,②消費者不能消費消息
- 先啟動消費者①和②,再生產 => ①和②輪詢消費,各自消費一半消息
2.4 主題消息(Topic)
主題消息特點
在發布訂閱消息傳遞域中,目的地被稱為主題(topic)。
發布/訂閱消息傳遞域的特點如下:
-
生產者將消息發布到topic中,每個消息可以有多個消費者,屬於1:N的關係。
-
生產者和消費者之間有時間上的相關性。訂閱某一個主題的消費者只能消費 自它訂閱之後發布的消息。
-
生產者生產時,topic 不保存消息,它是 無狀態的 不落地的,假如無人訂閱就去生產那就是一條廢消息,所以一般先啟動消費者再啟動生產者。
默認情況下如上所述,但是JMS規範允許客戶創建持久訂閱,這在一定程度上放鬆了時間上的相關性要求。持久訂閱允許消費者消費它在未處於激活狀態時發送的消息。一句話,好比我們的微信公眾號訂閱。
主題消息生產者
public class JmsProduce {
public static final String ACTIVEMQ_URL = "tcp://mpolaris.top:61616";
public static final String TOPIC_NAME = "topic_01";
public static void main(String[] args) throws JMSException {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
Connection conn = factory.createConnection();
conn.start();
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
//只有這一步和Queue有區別
Topic topic = session.createTopic(TOPIC_NAME);
MessageProducer producer = session.createProducer(topic);
for (int i = 0; i < 3; i++) {
TextMessage textMessage = session.createTextMessage("msg -> " + i);
producer.send(textMessage);
}
producer.close();
session.close();
conn.close();
System.out.println("====> 消息發布到MQ完成");
}
}
主題消息消費者
存在多個消費者,每個消費者都能收到自從自己啟動後所有生產的消息。
public class JmsConsumer {
public static final String ACTIVEMQ_URL = "tcp://mpolaris.top:61616";
public static final String TOPIC_NAME = "topic_01";
public static void main(String[] args) throws Exception {
System.out.println("=====> 1號消費者");//多加幾個消費者做實驗
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
Connection conn = factory.createConnection();
conn.start();
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
//只有這一步和Queue有區別
Topic topic = session.createTopic(TOPIC_NAME);
MessageConsumer consumer = session.createConsumer(topic);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
if(null != message && message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("====> 消費者接受到消息:" + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
});
System.in.read();
consumer.close();
session.close();
conn.close();
}
}
2.5 Topic和Queue對比
3. JMS (Java消息服務) 詳解
3.1 Java消息服務是什麼
Java消息服務指的是兩個應用程式之間進行非同步通訊的API,它為標準協議和消息服務提供了一組通用介面,包括創建、發送、讀取消息等,用於支援Java應用程式開發。在JavaEE中當兩個應用程式使用JMS進行通訊時,它們之間不是直接相連的,而是通過一個共同的消息收發服務組件關聯起來以達到解耦/非同步削峰的效果。
3.2 JMS四大組成元素
Message – 消息頭
JMS的消息頭有哪些屬性:
-
JMSDestination
:消息目的地。主要是指Queue和Topic。 -
JMSDeliveryMode
:消息持久化模式。分為持久模式和非持久模式,一條持久性的消息應該被傳送「一次僅僅一次」,這就意味著如果JMS提供者出現故障,該消息並不會丟失,它會在伺服器恢復之後再次傳遞。一條非持久的消息最多會傳遞一次,這意味著伺服器出現故障,該消息將會永遠丟失。 -
JMSExpiration
:消息過期時間。可以設置消息在一定時間後過期,默認是永不過期消息過期時間,等於Destination的send方法中的timeToLive值加上發送時刻的GMT時間值。如果timeToLive值等於0,則JMSExpiration被設為0,表示該消息永不過期。如果發送後在消息過期時間之後還沒有被發送到目的地,則該消息被清除。 -
JMSPriority
:消息的優先順序。消息優先順序從0-9十個級別,0-4是普通消息,5-9是加急消息。 JMS不要求MQ嚴格按照這十個優先順序發送消息但必須保證加急消息要先於普通消息到達。默認是4級。 -
JMSMessageID
:消息的唯一標識符。唯一標識每個消息的標識由MQ產生,也可以自己指定但是每個消息的標識要求唯一。
說明:消息的生產者可以set這些屬性,消息的消費者可以get這些屬性。這些屬性在send方法裡面也可以設置。
public class JmsProduce {
public static final String ACTIVEMQ_URL = "tcp://mpolaris.top:61616";
public static final String TOPIC_NAME = "topic_01";
public static void main(String[] args) throws JMSException {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
Connection conn = factory.createConnection();
conn.start();
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(TOPIC_NAME);
MessageProducer producer = session.createProducer(topic);
for (int i = 0; i < 3; i++) {
TextMessage textMessage = session.createTextMessage("msg -> " + i);
//這裡可以指定每個消息的目的地
textMessage.setJMSDestination(topic);
//消息的模式,持久模式/非持久模式
textMessage.setJMSDeliveryMode(0);
//消息的過期時間
textMessage.setJMSExpiration(1000);
//消息的優先順序
textMessage.setJMSPriority(10);
//指定每個消息的標識。MQ會給我們默認生成一個,我們也可以自己指定。
textMessage.setJMSMessageID("ABCD");
//上面的屬性也可以通過send重載方法進行設置
producer.send(textMessage);
}
producer.close();
session.close();
conn.close();
System.out.println("====> 消息發布到MQ完成");
}
}
Message – 消息體
理解:封裝具體的消息數據
五種消息格式
注意:發送和接收的消息體類型必須一致對應
消息生產者
for (int i = 0; i < 3; i++) {
// 發送TextMessage消息體
TextMessage textMessage = session.createTextMessage("topic " + i);
producer.send(textMessage);
// 發送MapMessage 消息體。set方法添加,get方式獲取
MapMessage mapMessage = session.createMapMessage();
mapMessage.setString("name","rose" + i);
mapMessage.setInt("age", 18 + i);
producer.send(mapMessage);
}
消息消費者
public void onMessage(Message message) {
if(null != message && message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("====> 消費者接受到text消息:" + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
if(null != message && message instanceof MapMessage) {
MapMessage mapMessage = (MapMessage) message;
try {
System.out.println("====> 消費者接受到map消息:" + mapMessage.getString("name"));
System.out.println("====> 消費者接受到map消息:" + mapMessage.getString("age"));
} catch (JMSException e) {
e.printStackTrace();
}
}
}
Message – 消息屬性
如果需要除消息頭欄位之外的值那麼可以使用消息屬性。它是 識別 / 去重 / 重點標註 等操作非常有用的方法。
它們是以屬性名和屬性值對的形式制定的。可以將屬性是為消息頭得擴展,屬性指定一些消息頭沒有包括的附加資訊,比如可以在屬性里指定消息選擇器。消息的屬性就像可以分配給一條消息的附加消息頭一樣。它們允許開發者添加有關消息的不透明附加資訊。它們還用於暴露消息選擇器在消息過濾時使用的數據。
下圖是設置消息屬性的API:
生產者
for (int i = 0; i < 3; i++) {
TextMessage textMessage = session.createTextMessage("topic " + i);
// 調用Message的set*Property()方法就能設置消息屬性
// 根據value的數據類型的不同,有相應的API
textMessage.setStringProperty("From","[email protected]");
textMessage.setByteProperty("Spec", (byte) 1);
textMessage.setBooleanProperty("Invalide",true);
producer.send(textMessage);
}
消費者
public void onMessage(Message message) {
if(null != message && message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("消息體:" + textMessage.getText());
System.out.println("消息屬性:" + textMessage.getStringProperty("From"));
System.out.println("消息屬性:" + textMessage.getByteProperty("Spec"));
System.out.println("消息屬性:" + textMessage.getBooleanProperty("Invalide"));
} catch (JMSException e) {
e.printStackTrace();
}
}
}
3.5 JMS的可靠性
RERSISTENT – 持久性
什麼是持久化消息 => 保證消息只被傳送一次和成功使用一次。在持久性消息傳送至目標時,消息服務將其放入持久性數據存儲。如果消息服務由於某種原因導致失敗,它可以恢復此消息並將此消息傳送至相應的消費者,雖然這樣增加了消息傳送的開銷但卻增加了可靠性。
我的理解:在消息生產者將消息成功發送給MQ消息中間件之後。無論是出現任何問題如:MQ伺服器宕機、消費者掉線等。都保證(topic要之前註冊過,queue不用)消息消費者能夠成功消費消息。如果消息生產者發送消息就失敗了,那麼消費者也不會消費到該消息。
- Queue消息非持久和持久
- Queue非持久,當伺服器宕機消息不存在(消息丟失了)。
注意:只要伺服器沒有宕機,即便是非持久,消費者不在線的話消息也不會丟失,等待消費者在線還是能夠收到消息的。
//非持久化的消費者和之前的程式碼一樣。下面演示非持久化的生產者。
// 非持久化
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
- Queue持久化,當伺服器宕機消息依然存在。Queue消息默認是持久化的。
持久化消息,保證這些消息只被傳送一次和成功使用一次。對於這些消息可靠性是優先考慮的因素。可靠性的另一個重要方面是確保持久性消息傳送至目標後,消息服務在向消費者傳送它們之前不會丟失這些消息。
//持久化的消費者和之前的程式碼一樣。下面演示持久化的生產者。
//持久化
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
- Topic消息非持久和持久
-
Topic非持久,Topic默認就是非持久化的,因為生產者生產消息時消費者也要在線,這樣消費者才能消費到消息。
-
Topic消息持久化,只要消費者向MQ伺服器註冊過,所有生產者發布成功的消息該消費者都能收到,不管是MQ伺服器宕機還是消費者不在線。
//持久化topic生產者程式碼
// 設置持久化topic
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
// 設置持久化topic之後再啟動連接
conn.start();
//持久化topic消費者程式碼
public static void main(String[] args) throws Exception{
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
Connection conn = activeMQConnectionFactory.createConnection();
// 設置客戶端ID,向MQ伺服器註冊自己的名稱
conn.setClientID("marrry");
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(TOPIC_NAME);
// 創建一個topic訂閱者對象。一參是topic,二參是訂閱者名稱
TopicSubscriber topicSubscriber = session.createDurableSubscriber(topic,"remark...");
// 之後再開啟連接
connection.start();
//之前是消息的消費者,這裡就改為主題的訂閱者
Message message = topicSubscriber.receive();
while (null != message){
TextMessage textMessage = (TextMessage)message;
System.out.println(" 收到的持久化 topic:" + textMessage.getText());
message = topicSubscriber.receive(2000L);//繼續監聽2s,從激活到離線
//經測試:離線再激活後仍然能收到之前的消息
}
session.close();
conn.close();
}
注意:
一定要先運行一次消費者,等於向MQ註冊,類似我訂閱了這個主題。
然後再運行生產者發送消息。
之後無論消費者是否在線都會收到消息。如果不在線的話,下次連接的時候會把沒有收過的消息都接收過來。
Transaction – 事務
生產者開啟事務後,執行commit方法這批消息才真正的被提交。不執行commit方法這批消息不會提交。執行rollback方法之前的消息會回滾掉。生產者的事務機制要高於簽收機制,當生產者開啟事務後簽收機制不再重要。
消費者開啟事務後,執行commit方法這批消息才算真正的被消費。不執行commit方法這些消息不會標記已消費,下次還會被消費。執行rollback方法不能回滾之前執行過的業務邏輯,但是能夠回滾之前的消息,回滾後的消息下次還會被消費。消費者利用commit和rollback方法,甚至能夠違反一個消費者只能消費一次消息的原理。
注意:消費者和生產者需要同時操作事務才行嗎? => 消費者和生產者的事務完全沒有關聯,各自是各自的事務。
- 生產者
public class JmsProduce {
public static final String ACTIVEMQ_URL = "tcp://mpolaris.top:61616";
public static final String TOPIC_NAME = "topic_01";
public static void main(String[] args) throws JMSException {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
Connection conn = factory.createConnection();
conn.start();
//1.創建會話session,兩個參數transacted=事務,acknowledgeMode=確認模式(簽收)
//設置為開啟事務
Session session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(TOPIC_NAME);
MessageProducer producer = session.createProducer(topic);
try {
for (int i = 0; i < 3; i++) {
TextMessage textMessage = session.createTextMessage("topic " + i);
producer.send(textMessage);
// if(i == 2) {
// throw new RuntimeException("=====> GG");
// }
}
// 2. 開啟事務後,使用commit提交事務,這樣這批消息才能真正的被提交。
session.commit();
System.out.println("====> 消息發布到MQ完成");
} catch (JMSException e) {
System.out.println("出現異常,消息回滾");
// 3. 工作中一般當程式碼出錯我們在catch程式碼塊中回滾。這樣這批發送的消息就能回滾。
session.rollback();
} finally {
producer.close();
session.close();
conn.close();
}
}
}
//如果有一條拋出異常,則回滾
//Exception in thread "main" java.lang.RuntimeException: =====> GG
- 消費者
public class JmsConsumer {
public static final String ACTIVEMQ_URL = "tcp://mpolaris.top:61616";
public static final String TOPIC_NAME = "topic_01";
public static void main(String[] args) throws Exception {
System.out.println("=====> 1號消費者");
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
Connection conn = factory.createConnection();
conn.start();
// 創建會話session,兩個參數transacted=事務,acknowledgeMode=確認模式(簽收)
// 消費者開啟了事務就必須手動提交,不然會重複消費消息
final Session session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(TOPIC_NAME);
MessageConsumer consumer = session.createConsumer(topic);
consumer.setMessageListener(new MessageListener() {
int a = 0;
@Override
public void onMessage(Message message) {
if(null != message && message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("消息體:" + textMessage.getText());
if(a == 0){
System.out.println("commit");
session.commit();
}
if (a == 2) {
System.out.println("rollback");
session.rollback();
}
a++;
} catch (JMSException e) {
System.out.println("出現異常,消費失敗,放棄消費");
try {
session.rollback();
} catch (JMSException ex) {
ex.printStackTrace();
}
}
}
}
});
System.in.read();
consumer.close();
session.close();
conn.close();
}
}
// 不執行commit方法的1和2消息不會標記已消費,下次還會被消費
// 執行rollback方法不能回滾之前執行過的業務邏輯,但是能夠回滾之前的消息,回滾後的消息下次還會被消費
// =====> 1號消費者
// 消息體:topic 0
// commit
// 消息體:topic 1
// 消息體:topic 2
// rollback
// 消息體:topic 1
// 消息體:topic 2
Acknowledge – 簽收
簽收的幾種方式
-
自動簽收(Session.AUTO_ACKNOWLEDGE):該方式是默認的,該種方式無需我們程式做任何操作,框架會幫我們自動簽收收到的消息。
-
手動簽收(Session.CLIENT_ACKNOWLEDGE):手動簽收,該種方式需要我們手動調用Message.acknowledge()來簽收消息。如果不簽收消息該消息會被我們反覆消費直到被簽收。
-
允許重複消息(Session.DUPS_OK_ACKNOWLEDGE):多執行緒或多個消費者同時消費到一個消息,因為執行緒不安全可能會重複消費。該種方式很少使用到。
-
事務下的簽收(Session.SESSION_TRANSACTED):開啟事務的情況下可以使用該方式,該種方式很少使用到。
事務和簽收的關係
-
在事務性會話中,當一個事務被成功提交則消息被自動簽收。如果事務回滾則消息會被再次傳送。事務優先於簽收,開始事務後簽收機制不再起任何作用。
-
非事務性會話中,消息何時被確認取決於創建會話時的應答模式。
-
生產者事務開啟,只有commit後才能將全部消息變為已消費。
-
事務偏向生產者,簽收偏向消費者。也就是說生產者使用事務更好點,消費者使用簽收機制更好點。
非事務下的消費者如何使用手動簽收的方式
- 非事務下的生產者跟之前的程式碼一樣
public class JmsProduce {
public static final String ACTIVEMQ_URL = "tcp://mpolaris.top:61616";
public static final String QUEUE_NAME = "queue_01";
public static void main(String[] args) throws JMSException {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
Connection conn = factory.createConnection();
conn.start();
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(QUEUE_NAME);
MessageProducer producer = session.createProducer(queue);
for (int i = 0; i < 3; i++) {
TextMessage textMessage = session.createTextMessage("msg -> " + i);
producer.send(textMessage);
}
producer.close();
session.close();
conn.close();
System.out.println("====> 消息發布到MQ完成");
}
}
- 非事務下的消費者如何手動簽收
public class JmsConsumer {
public static final String ACTIVEMQ_URL = "tcp://mpolaris.top:61616";
public static final String QUEUE_NAME = "queue_01";
public static void main(String[] args) throws JMSException {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
Connection conn = factory.createConnection();
conn.start();
//這裡改為Session.CLIENT_ACKNOWLEDGE
Session session = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Queue queue = session.createQueue(QUEUE_NAME);
MessageConsumer consumer = session.createConsumer(queue);
while (true) {
TextMessage message = (TextMessage) consumer.receive(4000L);
if (null != message) {
System.out.println("====> 消費者的消息:" + message.getText());
//設置為Session.CLIENT_ACKNOWLEDGE後,要調用該方法,標誌著該消息已被簽收(消費)。
//如果不調用該方法,該消息的標誌還是未消費,下次啟動消費者或其他消費者還會收到改消息。
message.acknowledge();
} else {
break;
}
}
consumer.close();
session.close();
conn.close();
}
}
注意:JMS保證可靠有四種方式,除了上面講到的持久性,事務,簽收,還可以通過多節點集群的方式來保證可靠性。
3.6 JMS的點對點總結
點對點模型是基於隊列的,生產者發消息到隊列,消費者從隊列接收消息,隊列的存在使得消息的非同步傳輸成為可能,和我們平時給朋友發送簡訊類似。
如果在Session關閉時有部分消息己被收到但還沒有被簽收(acknowledged),那當消費者下次連接到相同的隊列時,這些消息還會被再次接收。
隊列可以長久地保存消息直到消費者收到消息,消費者不需要因為擔心消息會丟失而時刻和隊列保持激活的連接狀態,充分體現了非同步傳輸模式的優勢。
3.7 JMS的發布訂閱總結
JMS的發布訂閱總結
JMS Pub/Sub 模型定義了如何向一個內容節點發布和訂閱消息,這些節點被稱作Topic。
主題可以被認為是消息的傳輸中介,發布者(publisher)發布消息到主題,訂閱者(subscribe)從主題訂閱消息。
主題使得消息訂閱者和消息發布者保持互相獨立,不需要解除即可保證消息的傳送。
非持久訂閱
非持久訂閱只有當客戶端處於激活狀態,也就是和MQ保持連接狀態才能收發到某個主題的消息。
如果消費者處於離線狀態,生產者發送的主題消息將會丟失作廢,消費者永遠不會收到。一句話:先訂閱註冊才能接受到發布,只給訂閱者發布消息。
持久訂閱
客戶端首先向MQ註冊一個自己的身份ID識別號,當這個客戶端處於離線時,生產者會為這個ID保存所有發送到主題的消息,當客戶再次連接到MQ的時候,會根據消費者的ID得到所有當自己處於離線時發送到主題的消息。
當非持久訂閱狀態下,不能恢復或重新派送一個未簽收的消息。持久訂閱才能恢復或重新派送一個未簽收的消息。
非持久和持久化訂閱如何選擇
當所有的消息必須被接收則用持久化訂閱,當消息丟失能夠被容忍則用非持久訂閱。
4. ActiveMQ的Broker
4.1 broker是什麼
相當於 一個ActiveMQ伺服器實例。說白了Broker其實就是實現了用程式碼的形式啟動ActiveMQ將MQ嵌入到Java程式碼中,以便隨時用隨時啟動,在用的時候再去啟動這樣能節省了資源,也保證了可用性。這種方式,我們實際開發中很少採用,因為他缺少太多了東西,如:日誌,數據存儲等等。
4.2 啟動broker時指定配置文件
啟動broker時指定配置文件,可以幫助我們在一台伺服器上啟動多個broker。實際工作中一般一台伺服器只啟動一個broker。
4.3 嵌入式的broker啟動
用ActiveMQ Broker作為獨立的消息伺服器來構建Java應用。
ActiveMQ也支援在vm中通訊基於嵌入的broker,能夠無縫的集成其他java應用。
下面演示如何啟動嵌入式的broker
pom.xml添加一個依賴
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.10.1</version>
</dependency>
嵌入式broker的啟動類
import org.apache.activemq.broker.BrokerService;
public class EmbedBroker {
public static void main(String[] args) throws Exception {
//ActiveMQ也支援在vm中通訊基於嵌入的broker
BrokerService brokerService = new BrokerService();
brokerService.setPopulateJMSXUserID(true);
brokerService.addConnector("tcp://127.0.0.1:61616");
brokerService.start();
}
}
public class JmsProduce {
public static final String ACTIVEMQ_URL = "tcp://localhost:61616";
public static final String QUEUE_NAME = "queue_01";
...
public class JmsConsumer {
public static final String ACTIVEMQ_URL = "tcp://localhost:61616";
public static final String QUEUE_NAME = "queue_01";
...
5. Spring整合ActiveMQ
理解
我們之前介紹的內容也很重要,它更靈活,支援各種自定義功能,可以滿足我們工作中複雜的需求。
很多activemq的功能要看官方文檔或者部落格,這些功能大多是在上面程式碼的基礎上修改完善的。如果非要把這些功能強行整合到spring,就有些緣木求魚了。而另一種方式整合spring更好,就是將上面的類注入到Spring中,其他不變。這樣既能保持原生的程式碼,又能集成到spring。
下面我們講的Spring和SpringBoot整合ActiveMQ也重要,它給我們提供了一個模板,簡化了程式碼,減少我們工作中遇到坑,能夠滿足開發中90%以上的功能。
**pom.xml添加依賴 **
<dependencies>
<!-- ActiveMQ 所需要的jar包-->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.15.9</version>
</dependency>
<!-- ActiveMQ 和 Spring 整合的基礎包 -->
<dependency>
<groupId>org.apache.xbean</groupId>
<artifactId>xbean-spring</artifactId>
<version>3.16</version>
</dependency>
<!-- 嵌入式ActiveMQ -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.10.1</version>
</dependency>
<!-- Spring對JMS的支援,整合Spring和ActiveMQ -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>5.2.1.RELEASE</version>
</dependency>
<!-- ActiveMQ連接池 -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
<version>5.15.10</version>
</dependency>
<!-- Spring核心依賴 -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
<version>4.3.23.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>4.3.23.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-aop</artifactId>
<version>4.3.23.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-orm</artifactId>
<version>4.3.23.RELEASE</version>
</dependency>
<!-- junit/log4j等基礎配置 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
</dependencies>
Spring的ActiveMQ配置文件
src/main/resources/spring-activemq.cml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="//www.springframework.org/schema/beans"
xmlns:xsi="//www.w3.org/2001/XMLSchema-instance"
xmlns:context="//www.springframework.org/schema/context"
xsi:schemaLocation="//www.springframework.org/schema/beans
//www.springframework.org/schema/beans/spring-beans.xsd
//www.springframework.org/schema/context
//www.springframework.org/schema/context/spring-context.xsd">
<!-- 開啟包的自動掃描 -->
<context:component-scan base-package="com.polaris"/>
<!-- 配置生產者 -->
<bean id="connectionFactory"
class="org.apache.activemq.pool.PooledConnectionFactory"
destroy-method="stop">
<property name="connectionFactory">
<!-- 真正可以生產Connection的ConnectionFactory,由對應的JMS服務商提供 -->
<bean class="org.apache.activemq.spring.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://mpolaris.top:61616"/>
</bean>
</property>
<property name="maxConnections" value="100"/>
</bean>
<!-- 這個是隊列目的地,點對點的Queue -->
<bean id="destinationQueue" class="org.apache.activemq.command.ActiveMQQueue">
<!-- 通過構造注入Queue名 -->
<constructor-arg index="0" value="spring-active-queue"/>
</bean>
<!-- 這個是主題目的地, 發布訂閱的主題Topic-->
<bean id="destinationTopic" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg index="0" value="spring-active-topic"/>
</bean>
<!-- Spring提供的JMS工具類,他可以進行消息發送,接收等 -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<!-- 傳入連接工廠 -->
<property name="connectionFactory" ref="connectionFactory"/>
<!-- 傳入目的地 -->
<property name="defaultDestination" ref="destinationQueue"/>
<!-- 消息自動轉換器 -->
<property name="messageConverter">
<bean class="org.springframework.jms.support.converter.SimpleMessageConverter"/>
</property>
</bean>
</beans>
隊列生產者
@Service
public class JmsProduce {
@Autowired
private JmsTemplate jmsTemplate;
public static void main(String[] args) {
ApplicationContext ioc = new ClassPathXmlApplicationContext("spring-activemq.xml");
JmsProduce produce = (JmsProduce) ioc.getBean("jmsProduce");
produce.jmsTemplate.send(new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
TextMessage message = session.createTextMessage("====> Spring和ActiveMQ的整合情況");
return message;
}
});
System.out.println("Send task over!");
}
}
隊列消費者
@Service
public class JmsConsumer {
@Autowired
private JmsTemplate jmsTemplate;
public static void main(String[] args) {
ApplicationContext ioc = new ClassPathXmlApplicationContext("spring-activemq.xml");
JmsConsumer consumer = (JmsConsumer) ioc.getBean("jmsConsumer");
String value = (String) consumer.jmsTemplate.receiveAndConvert();
System.out.println(value);
}
}
主題生產者和消費者
只需要修改配置文件目的地即可
<!-- Spring提供的JMS工具類,他可以進行消息發送,接收等 -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<!-- 傳入連接工廠 -->
<property name="connectionFactory" ref="connectionFactory"/>
<!-- 傳入目的地 -->
<property name="defaultDestination" ref="destinationTopic"/>
<!-- 消息自動轉換器 -->
<property name="messageConverter">
<bean class="org.springframework.jms.support.converter.SimpleMessageConverter"/>
</property>
</bean>
配置消費者的監聽類
寫了一個類來實現消息監聽後,只需要啟動生產者,消費者不需要啟動就自動會監聽記錄!
<!-- 配置監聽程式 -->
<bean id="jmsContainer"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="destinationTopic"/>
</bean>
<bean id="myMessageListener" class="com.polaris.queue.MyMessageListener">
</bean>
@Component
public class MyMessageListener implements MessageListener {
@Override
public void onMessage(Message message) {
if(null != message && message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println(textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
6. SpringBoot整合ActiveMQ
個人不太贊成使用這種方式SpringBoot整合ActiveMQ,因為這樣做會失去原生程式碼的部分功能和靈活性。但是工作中這種方式做能夠滿足我們常見的需求,也方便和簡化我們的程式碼,也為了適應工作中大家的習慣。
6.1 隊列案例 – 生產者點擊投遞
pom.xml文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="//maven.apache.org/POM/4.0.0" xmlns:xsi="//www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="//maven.apache.org/POM/4.0.0 //maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.4.2</version>
<relativePath/>
</parent>
<groupId>com.polaris</groupId>
<artifactId>springboot-activemq</artifactId>
<version>0.0.1-SNAPSHOT</version>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
<version>2.1.5.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
application.yml
# web佔用的埠
server:
port: 8085
spring:
activemq:
# activemq的broker的url
broker-url: tcp://mpolaris.top:61616
# 連接activemq的broker所需的帳號和密碼
user: admin
password: admin
jms:
# 目的地是queue還是topic, false(默認)=queue true=topic
pub-sub-domain: false
# 自定義隊列名稱,這只是個常量
myqueue: boot-activemq-queue
ActiveMQ配置類
@Configuration
@EnableJms //開啟Jms適配的註解
public class ConfigBean {
@Value("${myqueue}")
private String myQueue;
//注入目的地
@Bean
public Queue queue() {
return new ActiveMQQueue(myQueue);
}
}
隊列消息生產者
@Component
public class QueueProduce {
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
@Autowired
private Queue queue;
public void produceMsg() {
jmsMessagingTemplate.convertAndSend(queue,"===> SpringBoot + ActiveMQ消息");
}
}
測試類
@SpringBootTest(classes = Application.class)
@RunWith(SpringJUnit4ClassRunner.class)
@WebAppConfiguration
public class TestActiveMQ {
@Resource //這個是java 的註解,而Autowried是spring 的
private QueueProduce produce;
@Test
public void testSend() {
produce.produceMsg();
}
}
6.2 隊列案例 – 生產者間隔定投
QueueProduce新增定時投遞方法
/**
* 間隔3秒定時投送
*/
@Scheduled(fixedDelay = 3000)
public void produceMsgScheduled() {
jmsMessagingTemplate.convertAndSend(queue,"定時投送 => "
+ UUID.randomUUID().toString().substring(0,6));
}
主啟動類添加一個註解
@SpringBootApplication
@EnableScheduling //允許開啟定時投送功能
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class,args);
}
}
直接開啟主啟動類,間隔投遞消息
6.3 隊列案例 – 消費者監聽
@Component
public class QueueCustomer {
@JmsListener(destination = "${myqueue}")
public void receive(TextMessage message) throws JMSException {
System.out.println("消費者收到消息 => " + message.getText());
}
}
6.4 主題基本案例
application.yml配置文件
server:
port: 6666
spring:
activemq:
broker-url: tcp://mpolaris.top:61616
user: admin
password: admin
jms:
# 目的地是queue還是topic, false(默認)=queue true=topic
pub-sub-domain: true
mytopic: boot-activemq-topic
ActiveMQ配置文件
@Configuration
@EnableJms //開啟Jms適配的註解
public class ConfigBean {
@Value("${mytopic}")
private String myTopic;
@Bean
public Topic topic() {
return new ActiveMQTopic(myTopic);
}
}
主題生產者
@Component
public class TopicProduce {
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
@Autowired
private Topic topic;
public void produceMsg() {
jmsMessagingTemplate.convertAndSend(topic,"===> SpringBoot + ActiveMQ消息");
}
@Scheduled(fixedDelay = 3000)
public void produceMsgScheduled() {
jmsMessagingTemplate.convertAndSend(topic,"定時投送 => "
+ UUID.randomUUID().toString().substring(0,6));
System.out.println("定時投送");
}
}
主題消費者
@Component
public class QueueCustomer {
@JmsListener(destination = "${mytopic}")
public void receive(TextMessage message) throws JMSException {
System.out.println("消費者收到消息 => " + message.getText());
}
}
7. ActiveMQ傳輸協議
7.1 簡介
ActiveMQ支援的client-broker通訊協議有:TCP、NIO、UDP、SSL、Http(s)、VM等。其中配置Transport Connector的文件在ActiveMQ安裝目錄的conf/activemq.xml
中的
activemq傳輸協議的官方文檔://activemq.apache.org/configuring-version-5-transports.html
除了tcp和nio協議其他的了解就行。各種協議有各自擅長該協議的中間件,工作中一般不會使用activemq去實現這些協議。如: mqtt是物聯網專用協議,採用的中間件一般是mosquito。ws是websocket的協議,是和前端對接常用的,一般在java程式碼中內嵌一個基地台(中間件)。stomp好像是郵箱使用的協議的,各大郵箱公司都有基地台(中間件)。
注意:協議不同,我們的程式碼都會不同。
7.2 各協議理解
TCP協議
Transmission Control Protocol(TCP)是默認的,TCP的Client監聽埠61616
在網路傳輸數據前必須要先序列化數據,消息是通過一個叫wire protocol
的來序列化成位元組流。默認情況下ActiveMQ把wrie protocol
叫做 OpenWire,它的目的就是促使網路上的效率更高和數據快速交換。
TCP連接的URI形式如:tcp://HostName:port?key=value&key=value
,後面的參數是可選的。
TCP傳輸的的優點:
-
TCP協議傳輸可靠性高,穩定性強
-
高效率:位元組流方式傳遞,效率很高
-
有效性、可用性:應用廣泛,支援任何平台
關於Transport協議的可選配置參數可以參考官網//activemq.apache.org/tcp-transport-reference
NIO協議
New I/O API Protocol(NIO)。NIO協議和TCP協議類似,但NIO更側重於底層的訪問操作。它允許開發人員對同一資源可有更多的client調用和伺服器端有更多的負載。
適合使用NIO協議的場景:
-
可能有大量的Client去連接到Broker上,一般情況下大量的Client去連接Broker是被作業系統的執行緒所限制的。因此NIO的實現比TCP需要更少的執行緒去運行,所以建議使用NIO協議。
-
可能對於Broker有一個很遲鈍的網路傳輸,NIO比TCP提供更好的性能。
NIO連接的URI形式:nio://hostname:port?key=value&key=value
關於Transport協議的可選配置參數可以參考官網//activemq.apache.org/configuring-version-5-transports.html
AMQP協議
Advanced Message Queuing Protocol,一個提供統一消息服務的應用層標準高級消息隊列協議,是應用層協議的一個開放標準,為面向消息的中間件設計。基於此協議的客戶端與消息中間件可傳遞消息,並不受客戶端,中間件,不同產品,不同開發語言等條件限制。
STOMP協議
STOP,Streaming Text Orientation Message Protocol,是流文本定向消息協議,是一種為MOM(Message Oriented Middleware,面向消息中間件)設計的簡單文本協議。
MQTT協議
MQTT(Message Queuing Telemetry Transport,消息隊列遙測傳輸)是IBM開發的一個即時通訊協議,有可能成為物聯網的重要組成部分。該協議支援所有平台,幾乎可以把所有聯網物品和外部連接起來,被用來當作感測器和致動器(比如通過Twitter讓房屋聯網)的通訊協議。
GitLub查看MQTT示例程式碼://github.com/fusesource/mqtt-client
7.3 NIO協議案例
ActiveMQ這些協議傳輸的底層默認都是使用BIO網路的IO模型。只有當我們指定使用nio才使用NIO的IO模型。
NIO網路IO模型簡單配置
修改配置文件activemq.xml
如果你 不特別指定ActiveMQ的網路監聽埠,那麼這些埠都將使用BIO網路IO模型,所以為了首先提高單節點的網路吞吐性能,我們需要明確指定ActiveMQ網路IO模型。如下所示:URI格式頭以「nio」開頭,表示這個埠使用以TCP協議為基礎的NIO網路IO模型。
<transportConnectors>
<!-- 新增NIO協議 -->
<transportConnector name="nio" uri="nio://0.0.0.0:61618?trace=true" /></transportConnectors>
SpringBoot修改埠即可
server:
port: 6666
spring:
activemq:
broker-url: nio://mpolaris.top:61618
user: admin
password: admin
jms:
pub-sub-domain: true
mytopic: boot-activemq-topic
NIO增強
修改activemq.xml配置文件(其實只要auto+nio一條都行了)
auto: 針對所有的協議,他會識別我們是什麼協議。
nio:使用NIO網路IO模型
<transportConnectors>
<transportConnector name="openwire" uri="tcp://0.0.0.0:61626?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5682?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61623?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1893?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61624?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="nio" uri="nio://0.0.0.0:61618?trace=true" />
<transportConnector name="auto+nio" uri="auto+nio://0.0.0.0:61608?maximumConnections=1000&wireFormat.maxFrameSize=104857600&org.apache.activemq.transport.nio.SelectorManager.corePoolSize=20&org.apache.activemq.transport.nio.Se1ectorManager.maximumPoo1Size=50"/>
</transportConnectors>
修改埠號為61608即可
server:
port: 6666
spring:
activemq:
# broker-url: tcp://mpolaris.top:61608 適配多種協議(注意有些協議程式碼不一樣)
broker-url: nio://mpolaris.top:61608
user: admin
password: admin
jms:
pub-sub-domain: true
mytopic: boot-activemq-topic
8. ActiveMQ的消息存儲和持久化
8.1 理解
此處持久化和之前持久性的區別
MQ高可用:事務、持久性、簽收,是屬於MQ自身特性,自帶的。這裡的持久化是外力,是外部插件。之前講的持久性是MQ的外在表現,現在講的的持久是是底層實現。
8.2 持久化是什麼
官網文檔://activemq.apache.org/persistence
持久化是什麼?一句話就是:ActiveMQ宕機了消息不會丟失的機制。
說明:為了避免意外宕機以後丟失資訊,需要做到重啟後可以恢復消息隊列,消息系統一般都會採用持久化機制。ActiveMQ的消息持久化機制有JDBC,AMQ,KahaDB和LevelDB,無論使用哪種持久化方式,消息的存儲邏輯都是一致的。就是在發送者將消息發送出去後,消息中心首先將消息存儲到本地數據文件、記憶體資料庫或者遠程資料庫等。再試圖將消息發給接收者,成功則將消息從存儲中刪除,失敗則繼續嘗試發送。消息中心啟動以後,要先檢查指定的存儲位置是否有未成功發送的消息,如果有則會先把存儲位置中的消息發出去。
8.3 MQ持久化機制有哪些
AMQ Message Store
基於文件的存儲機制,是以前的默認機制,現在不再使用。AMQ是一種文件存儲形式,它具有寫入速度快和容易恢復的特點。消息存儲在一個個文件中,文件的默認大小為32M,當一個文件中的消息已經全部被消費,那麼這個文件將被標識為可刪除,在下一個清除階段這個文件會被刪除。AMQ適用於ActiveMQ5.3之前的版本。
KahaDB
基於日誌文件,從ActiveMQ5.4(含)開始默認的持久化,下面我們詳細介紹。
LevelDB消息存儲
新興的技術,現在有些不確定。 官方文檔://activemq.apache.org/leveldb-store。這種文件系統是從ActiveMQ5.8之後引進的,它和KahaDB非常相似,也是基於文件的本地資料庫存儲形式,但是它提供比KahaDB更快的持久性。但它不使用自定義B-Tree實現來索引獨寫日誌,而是使用基於LevelDB的索引,默認配置如下:
<persistenceAdapter>
<levelDB directory="activemq-data"/>
</persistenceAdapter>
JDBC消息存儲
下面我們再詳細介紹
JDBC Message Store with ActiveMQ Journal
下面我們再詳細介紹
8.4 KahaDB消息存儲
理解
KahaDB是目前默認的存儲方式,可用於任何場景,提高了性能和恢復能力。消息存儲使用一個 事務日誌 和僅僅用一個 索引文件 來存儲它所有的地址。KahaDB是一個專門針對消息持久化的解決方案,它對典型的消息使用模型進行了優化。數據被追加到data logs中。當不再需要log文件中的數據的時候,log文件會被丟棄。
官網文檔://activemq.aache.org/kahadb,官網上還有一些其他配置參數。
activemq.xml配置文件
<persistenceAdapter>
<kahaDB directory="${activemq.data}/kahadb"/>
</persistenceAdapter>
KahaDB存儲原理
KahaDB在消息保存的目錄中有4類文件和一個lock,跟ActiveMQ的其他幾種文件存儲引擎相比,這就非常簡潔了。
db-number.log
KahaDB存儲消息到預定大小的數據紀錄文件中,文件名為db-number.log。當數據文件已滿時,一個新的文件會隨之創建,number數值也會隨之遞增,它隨著消息數量的增多,如每32M一個文件,文件名按照數字進行編號,如db-1.log,db-2.log······。當不再有引用到數據文件中的任何消息時,文件會被刪除或者歸檔。
db.data
該文件包含了持久化的BTree索引,索引了消息數據記錄中的消息,它是消息的索引文件,本質上是B-Tree(B樹),使用B-Tree作為索引指向db-number。log裡面存儲消息。db.free
記錄當前db.data文件裡面哪些頁面是空閑的,文件具體內容是所有空閑頁的IDdb.redo
用來進行消息恢復,如果KahaDB消息存儲再強制退出後啟動,用於恢復BTree索引。lock
文件鎖,表示當前kahadb獨寫許可權的broker。
8.4 JDBC消息存儲
原理圖
配置
添加mysql資料庫的驅動包到ActiveMQ的lib文件夾下
在activemq.xml配置文件指定JDBC消息存儲
<!--
<persistenceAdapter>
<kahaDB directory="${activemq.data}/kahadb"/>
</persistenceAdapter>
-->
<persistenceAdapter>
<!-- dataSource指定將要引用的持久化資料庫的bean名稱
createTablesOnStartup指定是否在啟動的時候創建數據表,默認為true
注意:一般是第一次啟動時設置為true,之後改為false -->
<jdbcPersistenceAdapter dataSource="#mysql-ds" createTablesOnStartup="true"/>
</persistenceAdapter>
在activemq.xml配置文件的標籤和
注意:
① 我們需要準備一個mysql資料庫,並創建一個名為activemq的資料庫
② 默認是的dbcp資料庫連接池,如果要換成其他資料庫連接池,需要將該連接池jar包,也放到lib目錄下。
...
</broker>
<bean id="mysql-ds" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close">
<property name="driverClassName" value="com.mysql.jdbc.Driver"/>
<property name="url" value="jdbc:mysql://mpolaris.top:3306/activemq?relaxAutoCommit=true"/>
<property name="username" value="root"/>
<property name="password" value="123456"/>
<property name="maxTotal" value="200" />
<property name="poolPreparedStatements" value="true"/>
</bean>
<import resource="jetty.xml"/>
...
重啟activemq會自動生成如下3張表。如果沒有自動生成需要我們手動執行SQL。我個人建議要自動生成,我在操作過程中查看日誌文件發現了不少問題,最終解決了這些問題後是能夠自動生成的。如果不能自動生成說明你的操作有問題。表欄位說明如下
- ACTIVEMQ_MSGS 消息數據表
- ACTIVEMQ_ACKS數據表
- ACTIVEMQ_LOCK數據表:表ACTIVEMQ_LOCK在集群環境下才有用,只有一個Broker可以獲取消息,稱為Master Broker,其他的只能作為備份等待Master Broker不可用,才可能成為下一個Master Broker。這個表用於記錄哪個Broker是當前的Master Broker 。
Queue驗證和數據表變化
在點對點類型中,當DeliveryMode設置為NON_PERSISTENCE時,消息被保存在記憶體中。當DeliveryMode設置為PERSISTENCE時,消息保存在broker的相應的文件或者資料庫中。而且點對點類型中消息一旦被Consumer消費,就從數據中刪除,消費前的消息會被存放到資料庫 上面的消息被消費後被MQ自動刪除。
- Queue非持久化模式:不會將消息持久化到資料庫
- Queue持久化模式:會將消息持久化到資料庫,但是消息被消費者消費後會自動刪除持久化數據。
我們使用queue持久化模式發布3條消息後,發現ACTIVEMQ_MSGS數據表多了3條數據。
啟動消費者消費了所有的消息後,發現數據表的數據消失了。
Topic驗證和說明
設置了持久訂閱資料庫裡面會保存訂閱者的資訊
ACTIVEMQ_ACKS表中的LAST_ACKED_ID記錄了CLIENT_ID最後簽收的一條消息,而LAST_ACKED_ID和ACTIVEMQ_MSGS的ID欄位是外鍵關聯關係,這樣就可以實現Topic的消息保存到ACTIVEMQ_MSGS表內的同時還能根據ACTIVEMQ_ACKS表中的持久訂閱者查到該訂閱者上次收到的最後一條消息是什麼。值得注意的是Topic內的消息是不會被刪除的,而Queue的消息在被刪除後會在資料庫中被刪除,如果需要保存Queue,應該使用其他方案解決。
我們啟動主題持久化,生產者發布3個數據,ACTIVEMQ_MSGS數據表新增3條數據,消費者消費所有的數據後,ACTIVEMQ_MSGS數據表的數據並沒有消失。持久化topic的消息不管是否被消費,是否有消費者,產生的數據永遠都存在,且只存儲一條。這個是要注意的,持久化的topic大量數據後可能導致性能下降。這裡就像公總號一樣,消費者消費完後,消息還會保留。
總結
如果是Queue,在沒有消費者消費的情況下會將消息保存到activemq_msgs表中,只要有任意一個消費者消費了,就會刪除消費過的消息。
如果是Topic,一般是先啟動消費訂閱者然後再生產的情況下會將持久訂閱者永久保存到qctivemq_acks,而消息則永久保存在activemq_msgs,在acks表中的訂閱者有一個last_ack_id對應了activemq_msgs中的id欄位,這樣就知道訂閱者最後收到的消息是哪一條。
常見坑
在配置關係型資料庫作為ActiveMQ的持久化存儲方案時,有許多坑。
-
資料庫jar包:注意對應版本的資料庫jar或者你自己使用的非自帶的資料庫連接池jar包
-
createTablesOnStartup屬性:該屬性默認為true,每次啟動activemq都會自動創建表,在第一次啟動後應改為false避免不必要的損失。
-
下劃線:報錯”java.lang.IllegalStateException: LifecycleProcessor not initialized”。確認電腦主機名名稱沒有下劃線
8.5 JDBC Message Store with ActiveMQ Journal
理解
這種方式克服了JDBC Store的不足,JDBC每次消息過來都需要去寫庫讀庫。ActiveMQ Journal,使用高速快取寫入技術大大提高了性能。當消費者的速度能夠及時跟上生產者消息的生產速度時,journal文件能夠大大減少需要寫入到DB中的消息。
舉個例子:生產者生產了1000條消息,這1000條消息會保存到journal文件,如果消費者的消費速度很快的情況下,在journal文件還沒有同步到DB之前,消費者已經消費了90%的以上消息,那麼這個時候只需要同步剩餘的10%的消息到DB。如果消費者的速度很慢,這個時候journal文件可以使消息以批量方式寫到DB。
為了高性能,這種方式使用日誌文件存儲+資料庫存儲。先將消息持久到日誌文件,等待一段時間再將未消費的消息持久到資料庫。該方式要比JDBC性能要高
配置(基於JDBC配置稍作修改)
activemq.xml修改
# 修改配置前
<persistenceAdapter>
<jdbcPersistenceAdapter dataSource="#mysql-ds" />
</persistenceAdapter>
# 修改配置後(注釋掉之前的jdbc配置使用下面的)
<persistenceFactory>
<journalPersistenceAdapterFactory journalLogFiles="5"
journalLogFileSize="32768"
useJournal="true"
useQuickJournal="true"
dataSource="#mysql-ds"
dataDirectory="../activemq-data" />
</persistenceFactory>
8.6 總結
-
Jdbc效率低,KahaDB效率高,Jdbc+Journal效率較高。
-
持久化消息主要指的是:MQ所在伺服器宕機了消息不會丟試的機制。
-
持久化機制演變的過程:從最初的AMQ Message Store方案到ActiveMQ V4版本退出的High Performance Journal(高性能事務支援)附件,並且同步推出了關於關係型資料庫的存儲方案。ActiveMQ5.3版本又推出了對KahaDB的支援(5.4版本後被作為默認的持久化方案),後來ActiveMQ 5.8版本開始支援LevelDB,到現在5.9提供了標準的Zookeeper+LevelDB集群化方案。
-
ActiveMQ消息持久化機制有:
持久化機制 | 特點 |
---|---|
AMQ | 基於日誌文件 |
KahaDB | 基於日誌文件,從ActiveMQ5.4開始默認使用 |
JDBC | 基於第三方資料庫 |
Replicated LevelDB Store | 從5.9開始提供了LevelDB和Zookeeper的數據複製方法,用於Master-slave方式的首選數據複製方案。 |
9. ActiveMQ多節點集群
9.1 理解
基於zookeeper和LevelDB搭建ActiveMQ集群。集群僅提供主備方式的高可用集群功能,避免單點故障。
9.2 三種集群方式
-
基於shareFileSystem共享文件系統(KahaDB)
-
基於JDBC
-
基於可複製的LevelDB
9.3 ZK + Replicated LevelDB Store 案例
Replicated LevelDB Store
使用Zookeeper集群註冊所有的ActiveMQ Broker但只有其中一個Broker可以提供服務,它將被視為Master,其他的Broker處於待機狀態被視為Slave。如果Master因故障而不能提供服務,Zookeeper會從Slave中選舉出一個Broker充當Master。Slave連接Master並同步他們的存儲狀態,Slave不接受客戶端連接。所有的存儲操作都將被複制到連接至Maste的Slaves。如果Master宕機得到了最新更新的Slave會變成Master。故障節點在恢復後會重新加入到集群中並連接Master進入Slave模式。所有需要同步的消息操作都將等待存儲狀態被複制到其他法定節點的操作完成才能完成。所以,如給你配置了replicas=3,name法定大小是(3/2)+1 = 2。Master將會存儲更新然後等待(2-1)=1個Slave存儲和更新完成,才彙報success,至於為什麼是2-1,陽哥的zookeeper講解過自行複習。有一個ode要作為觀察者存在。當一個新的Master被選中,你需要至少保障一個法定mode在線以能夠找到擁有最新狀態的ode,這個ode才可以成為新的Master。因此,推薦運行至少3個replica nodes以防止一個node失敗後服務中斷。
部署規劃和步驟
- 環境和版本
- 關閉防火牆並保證各個伺服器能夠ping通
- 具備zk集群並可以成功啟動
- 集群部署規劃列表
- 創建3台集群目錄(就是一台電腦複製三份ActiveMQ)
- 修改管理控制台埠(就是ActiveMQ後台管理頁面的訪問埠)
- hostname名字映射(如果不映射只需要吧mq配置文件的hostname改成當前主機ip)
- ActiveMQ集群配置
- 配置文件裡面的BrokerName要全部一致
- 持久化配置(必須)
- 修改各個節點的消息埠(真實的三台機器不用管)
- 按順序啟動3個ActiveMQ節點,到這步前提是zk集群已經成功啟動運行(先啟動Zk 在啟動ActiveMQ)
- zk集群節點狀態說明
- 3台Zk連接任意一台驗證三台ActiveMQ是否註冊上了Zookeeper
- 查看Master
集群可用性測試
10 ActiveMQ高級特性
10.1 引入消息中間件後如何保證其高可用
zookeeper+Replicated LevelDB
10.2 非同步投遞Async Sends
非同步投遞
對於一個Slow Consumer,使用同步發送消息可能出現Producer堵塞的情況,慢消費者適合使用非同步發送。
是什麼
ActiveMQ支援同步,非同步兩種發送的模式將消息發送到broker,模式的選擇對發送延時有巨大的影響。producer能達到怎麼樣的產出率(產出率=發送數據總量/時間)主要受發送延時的影響,使用非同步發送可以顯著提高發送的性能。
ActiveMQ默認使用非同步發送的模式:除非明確指定使用同步發送的方式或者在未使用事務的前提下發送持久化的消息,這兩種情況都是同步發送的。
如果你 沒有使用事務且發送的是持久化的消息,每一次發送都是同步發送的且會阻塞producer知道broker返回一個確認,表示消息已經被安全的持久化到磁碟。確認機制提供了消息安全的保障,但同時會阻塞客戶端帶來了很大的延時。很多高性能的應用,允許在失敗的情況下有少量的數據丟失。如果你的應用滿足這個特點,你可以使用非同步發送來提高生產率,即使發送的是持久化的消息。
非同步發送:它可以最大化producer端的發送效率。我們通常在發送消息量比較密集的情況下使用非同步發送,它可以很大的提升Producer性能,不過這也帶來了額外的問題:就是需要消耗更多的Client端記憶體同時也會導致broker端性能消耗增加;此外它不能有效的確保消息的發送成功。在userAsyncSend=true的情況下客戶端需要容忍消息丟失的可能。
自我理解:此處的非同步是指生產者和broker之間發送消息的非同步。不是指生產者和消費者之間非同步。
說明:對於一個Slow Consumer,使用同步發送消息可能出成Producer堵塞等情況,慢消費者適合使用非同步發送。(這句話我認為有誤)
總結:① 非同步發送可以讓生產者發的更快。② 如果非同步投遞不需要保證消息是否發送成功,發送者的效率會有所提高。如果非同步投遞還需要保證消息是否成功發送,並採用了回調的方式,發送者的效率提高不多,這種就有些雞肋。
參考官網程式碼實現
非同步消息如何確定發送成功?
非同步發送丟失消息的場景是:生產者設置userAsyncSend=true
,使用producer.send(msg)
持續發送消息。如果消息不阻塞,生產者會認為所有send
的消息均被成功發送至MQ
。
如果MQ突然宕機,此時生產者端記憶體中尚未被發送至MQ的消息都會丟失。
所以正確的非同步發送方法是需要接收回調的。
同步發送和非同步發送的區別就在此,同步發送等send
不阻塞了就表示一定發送成功了,非同步發送需要客戶端回執並由客戶端再判斷一次是否發送成功。
10.3 延遲投遞和定時投遞
官網說明://activemq.apache.org/delay-and-schedule-message-delivery.html
四大屬性
案例
要在activemq.xml中配置schedulerSupport屬性為true
Java程式碼裡面封裝的輔助消息類型:ScheduledMessage
10.4 分發策略
10.5 ActiveMQ消息重試機制
是什麼
消費者收到消息,之後出現異常了,沒有告訴broker確認收到該消息,broker會嘗試再將該消息發送給消費者。嘗試n次,如果消費者還是沒有確認收到該消息,那麼該消息將被放到死信隊列重,之後broker不會再將該消息發送給消費者。
具體哪些情況會引發消息重發
-
Client用了transactions且再session中調用了rollback
-
Client用了transactions且再調用commit之前關閉或者沒有commit
-
Client再CLIENT_ACKNOWLEDGE的傳遞模式下,session中調用了recover
請說說消息重發時間間隔和重發次數
-
間隔:1
-
次數:6
-
每秒發6次
有毒消息Poison ACK
一個消息被redelivedred超過默認的最大重發次數(默認6次)時,消費的回個MQ發一個「poison ack」表示這個消息有毒,告訴broker不要再發了。這個時候broker會把這個消息放到DLQ(私信隊列)。
屬性說明
10.6 死信隊列
是什麼
異常消息規避處理的集合,主要處理失敗的消息。
使用:處理失敗的消息
- 一般生產環境中在使用MQ時設計兩個隊列:一個核心業務隊列,一個死信隊列
- 核心業務隊列:比如下圖專門用來讓訂單系統發送訂單消息的,然後另一個死信隊列就是用來處理異常情況的。
- 假如第三方物流系統故障了,此時無法請求,那麼倉儲系統每次消費到一條訂單消息,嘗試通知發貨和配送都會遇到對方的介面報錯。此時倉儲系統就可以把這條消息拒絕訪問或者標誌位處理失敗。一旦標誌這條消息處理失敗了之後,MQ就會把這條消息轉入提前設置好的一個死信隊列中。
- 然後你會看到的就是,在第三方物流系統故障期間,所有訂單消息全部處理失敗,全部會轉入死信隊列。然後你的倉儲系統得專門有一個後台執行緒,監控第三方物流系統是否正常,是否請求,不停的監視。一旦發現對方恢復正常,這個後台執行緒就從死信隊列消費出來處理失敗的訂單,重新執行發貨和配送的通知邏輯。
死信隊列的配置(一般採用默認)
- sharedDeadLetterStrategy
- 不管是queue還是topic,失敗的消息都放到這個隊列中。下面修改activemq.xml的配置,可以達到修改隊列的名字。
- 將所有的DeadLetter保存在一個共享的隊列中,這是ActiveMQ broker端默認的策略。共享隊列默認為「ActiveMQ.QLQ」,可以通過”deaLetterQueue”屬性來設定
<deadLetterStrategy>
<sharedDeadLetterStrategy deaLetterQueue="DLQ-QUEUE"/>
</deadLetterStrategy>
-
individualDeadLetterStrategy
可以為queue和topic單獨指定兩個死信隊列。還可以為某個話題,單獨指定一個死信隊列。
屬性”useQueueForTopicMessages”,此值表示是否將Topic的DeaLetter保存在Queue中,默認為true
- 自動刪除過期消息
過期消息是值生產者指定的過期時間,超過這個時間的消息
- 存放非持久消息到死信隊列中
10.7 消息不被重複消費,冪等性問題
之後回來完善
activemq的API文檔://activemq.apache.org/maven/apidocs/index.html