Java進階專題(十九) 消息中間件架構體系(1)– ActiveMQ研究
前言
MQ全稱為Message Queue,即消息隊列,它是一種應用程式之間的通訊方法,消息隊列在分散式系統開
發中應用非常廣泛。開發中消息隊列通常有如下應用場景:1、任務非同步處理。將不需要同步處理的並且耗時長的操作由消息隊列通知消息接收方進行非同步處理。提高了應用程式的響應時間。2、應用程式解耦合MQ相當於一個中介,生產方通過MQ與消費方交互,它將應用程式進行解耦合。市場上還有哪些消息隊列?ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ、Redis。我們主要介紹主流的消息中間件,了解每個MQ的優缺點,能知曉什麼樣的場景下選用合適的MQ。
ActiveMQ
介紹
ActiveMQ 是完全基於 JMS 規範實現的一個消息中間件產品。 是 Apache 開源基金會研發的消息中間件。ActiveMQ主要應用在分散式系統架構中,幫助構建高可用、 高性能、可伸縮的企業級面向消息服務的系統。
什麼是JMS
Java 消息服務(Java Message Service)是 java 平台中關於面向消息中間件的 API,用於在兩個應用程式之間,或者分散式系統中發送消息,進行非同步通訊。JMS 是一個與具體平台無關的 API ,絕大多數 MOM(Message Oriented Middleware)(面向消息中間件)提供商都對 JMS 提供了支援。例如ActiveMQ就是其中一個實現。
什麼是MOM
MOM 是面向消息的中間件,使用消息傳送提供者來協調消息傳送操作。MOM 需要提供 API 和管理工具。客戶端使用 api 調用,把消息發送到由提供者管理的目的地。在發送消息之後,客戶端會繼續執行其他工作,並且在接收方收到這個消息確認之前,提供者一直保留該消息。
JMS規範
我們已經知道了 JMS 規範的目的是為了使得 Java 應用程式能夠訪問現有 MOM (消息中間件)系統,形成一套統一的標準規範,解決不同消息中間件之間的協作問題。在創建 JMS 規範時,設計者希望能夠結合現有的消息傳送的精髓,比如說
- 不同的消息傳送模式或域,例如點對點消息傳送和發布訂閱消息傳送
- 提供於接收同步和非同步消息的工具
- 對可靠消息傳送的支援
- 常見消息格式,例如流、文本和位元組
JMS對象模型
1)連接工廠。連接工廠(ConnectionFactory)是由管理員創建,並綁定到JNDI樹中。客戶端使用JNDI查找連接工廠,然後利用連接工廠創建一個JMS連接。
2)JMS連接。JMS連接(Connection)表示JMS客戶端和伺服器端之間的一個活動的連接,是由客戶端通過調用連接工廠的方法建立的。
3)JMS會話。JMS會話(Session)表示JMS客戶與JMS伺服器之間的會話狀態。JMS會話建立在JMS連接上,表示客戶與伺服器之間的一個會話執行緒。
4)JMS目的。JMS目的(Destination),又稱為消息隊列,是實際的消息源。
5)JMS生產者和消費者。生產者(Message Producer)和消費者(Message Consumer)對象由Session對象創建,用於發送和接收消息。
6)JMS消息通常有兩種類型:
① 點對點(Point-to-Point)。在點對點的消息系統中,消息分發給一個單獨的使用者。點對點消息往往與隊列(javax.jms.Queue)相關聯。
② 發布/訂閱(Publish/Subscribe)。發布/訂閱消息系統支援一個事件驅動模型,消息生產者和消費者都參與消息的傳遞。生產者發布事件,而使用者訂閱感興趣的事件,並使用事件。該類型消息一般與特定的主題(javax.jms.Topic)關聯。
安裝ActiveMQ
windows安裝
下載地址://activemq.apache.org/activemq-5150-release.html
下載完成後解壓進入bin目錄 運行 activemq.bat。
如果你遇到如下問題,5672埠被佔用
可以去修改activemq的conf目錄下的activemq.xml,把amqp的埠改為其他的,這裡改成了5673
再次啟動:
訪問地址://127.0.0.1:8161/admin/進入後台頁面 初始帳號密碼 admin admin
Docker安裝ActiveMQ
docker run -d –name activemq -p 61616:61616 -p 8161:8161 webcenter/activemq
ActiveMQ快速入門
Springboot集成ActiveMQ
導入依賴
<dependencies>
<!--Springboot-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>2.3.0.RELEASE</version>
</dependency>
<!--ActiveMq-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
<version>1.5.0.RELEASE</version>
</dependency>
<!--消息隊列連接池-->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
<version>5.15.0</version>
</dependency>
</dependencies>
配置MQ
server:
port: 8080
spring:
activemq:
broker-url: tcp://127.0.0.1:61616
user: admin
password: admin
close-timeout: 15s # 在考慮結束之前等待的時間
in-memory: true # 默認代理URL是否應該在記憶體中。如果指定了顯式代理,則忽略此值。
non-blocking-redelivery: false # 是否在回滾回滾消息之前停止消息傳遞。這意味著當啟用此命令時,消息順序不會被保留。
send-timeout: 0 # 等待消息發送響應的時間。設置為0等待永遠。
queue-name: active.queue
topic-name: active.topic.name.model
# packages:
# trust-all: true #不配置此項,會報錯
pool:
enabled: true
max-connections: 10 #連接池最大連接數
idle-timeout: 30000 #空閑的連接過期時間,默認為30秒
# jms:
# pub-sub-domain: true #默認情況下activemq提供的是queue模式,若要使用topic模式需要配置下面配置
# 是否信任所有包
#spring.activemq.packages.trust-all=
# 要信任的特定包的逗號分隔列表(當不信任所有包時)
#spring.activemq.packages.trusted=
# 當連接請求和池滿時是否阻塞。設置false會拋「JMSException異常」。
#spring.activemq.pool.block-if-full=true
# 如果池仍然滿,則在拋出異常前阻塞時間。
#spring.activemq.pool.block-if-full-timeout=-1ms
# 是否在啟動時創建連接。可以在啟動時用於加熱池。
#spring.activemq.pool.create-connection-on-startup=true
# 是否用Pooledconnectionfactory代替普通的ConnectionFactory。
#spring.activemq.pool.enabled=false
# 連接過期超時。
#spring.activemq.pool.expiry-timeout=0ms
# 連接空閑超時
#spring.activemq.pool.idle-timeout=30s
# 連接池最大連接數
#spring.activemq.pool.max-connections=1
# 每個連接的有效會話的最大數目。
#spring.activemq.pool.maximum-active-session-per-connection=500
# 當有"JMSException"時嘗試重新連接
#spring.activemq.pool.reconnect-on-exception=true
# 在空閑連接清除執行緒之間運行的時間。當為負數時,沒有空閑連接驅逐執行緒運行。
#spring.activemq.pool.time-between-expiration-check=-1ms
# 是否只使用一個MessageProducer
#spring.activemq.pool.use-anonymous-producers=true
編寫配置類
/**
* @author 原
* @date 2020/12/16
* @since 1.0
**/
@Configuration
public class BeanConfig {
@Value("${spring.activemq.broker-url}")
private String brokerUrl;
@Value("${spring.activemq.user}")
private String username;
@Value("${spring.activemq.topic-name}")
private String password;
@Value("${spring.activemq.queue-name}")
private String queueName;
@Value("${spring.activemq.topic-name}")
private String topicName;
@Bean(name = "queue")
public Queue queue() {
return new ActiveMQQueue(queueName);
}
@Bean(name = "topic")
public Topic topic() {
return new ActiveMQTopic(topicName);
}
@Bean
public ConnectionFactory connectionFactory(){
return new ActiveMQConnectionFactory(username, password, brokerUrl);
}
@Bean
public JmsMessagingTemplate jmsMessageTemplate(){
return new JmsMessagingTemplate(connectionFactory());
}
/**
* 在Queue模式中,對消息的監聽需要對containerFactory進行配置
* @param connectionFactory
* @return
*/
@Bean("queueListener")
public JmsListenerContainerFactory<?> queueJmsListenerContainerFactory(ConnectionFactory connectionFactory){
SimpleJmsListenerContainerFactory factory = new SimpleJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setPubSubDomain(false);
return factory;
}
/**
* 在Topic模式中,對消息的監聽需要對containerFactory進行配置
* @param connectionFactory
* @return
*/
@Bean("topicListener")
public JmsListenerContainerFactory<?> topicJmsListenerContainerFactory(ConnectionFactory connectionFactory){
SimpleJmsListenerContainerFactory factory = new SimpleJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setPubSubDomain(true);
return factory;
}
}
編寫啟動類
/**
* @author 原
* @date 2020/12/8
* @since 1.0
**/
@SpringBootApplication
@EnableJms //開啟JMS支援
public class DemoApplication {
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
@Autowired
private Queue queue;
@Autowired
private Topic topic;
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
/**
* 應用啟動後,會執行該方法
* 會分別向queue和topic發送一條消息
*/
@PostConstruct
public void sendMsg(){
jmsMessagingTemplate.convertAndSend(queue,"queue-test");
jmsMessagingTemplate.convertAndSend(topic,"topic-test");
}
}
查看activemq後台
active.queue 為隊列的名稱
Number Of Pending Messages 等待消費的消息數量 3是因為我自己發了3次
Messages Enqueued 已經進入隊列的消息數量
因為沒有消費者,消息一直沒有被消費。下面我們編寫消費者程式碼。
/**
* @author 原
* @date 2020/12/16
* @since 1.0
**/
@Component
public class QueueConsumerListener {
@JmsListener(destination = "${spring.activemq.queue-name}",containerFactory = "queueListener")
public void getQueue(String message){
System.out.println("接受queue:"+message);
}
@JmsListener(destination = "${spring.activemq.topic-name}",containerFactory = "topicListener")
public void getTopic(String message){
System.out.println("接受topic:"+message);
}
}
在後台發送一條消息
控制台列印
發送topic消息
控制台列印:
但是發現一個問題是,之前在沒有消費的時候,有3條queue和一條topic,但是當我啟動消費者時,queue的3條消息被消費了,topic確沒有。這是因為:
topic模式有普通訂閱和持久化訂閱
普通訂閱:在消費者啟動之前發送過來的消息,消費者啟動之後不會去消費;
持久化訂閱: 在消費者啟動之前發送過來的消息,消費者啟動之後會去消費;
ActiveMQ原理分析
消息同步發送與非同步發送
ActiveMQ支援同步、非同步兩種發送模式將消息發送到broker上。
同步發送過程中,發送者發送一條消息會阻塞直到broker回饋一個確認消息,表示消息已經被broker處理。這個機
制提供了消息的安全性保障,但是由於是阻塞的操作,會影響到客戶端消息發送的性能
非同步發送的過程中,發送者不需要等待broker提供回饋,所以性能相對較高。但是可能會出現消息丟失的情況。所
以使用非同步發送的前提是在某些情況下允許出現數據丟失的情況。
默認情況下,非持久化消息是非同步發送的,持久化消息並且是在非事務模式下是同步發送的。
但是在開啟事務的情況下,消息都是非同步發送。由於非同步發送的效率會比同步發送性能更高。所以在發送持久化消
息的時候,盡量去開啟事務會話。
消息發送原理
ProducerWindowSize的含義
producer每發送一個消息,統計一下發送的位元組數,當位元組數達到ProducerWindowSize值時,需要等待broker的確認,才能繼續發送。
程式碼在:ActiveMQSession的1957行
主要用來約束在非同步發送時producer端允許積壓的(尚未ACK)的消息的大小,且只對非同步發送有意義。每次發送消息之後,都將會導致memoryUsage大小增加(+message.size),當broker返回producerAck時,memoryUsage尺寸減少(producerAck.size,此size表示先前發送消息的大小)。
可以通過如下2種方式設置:
Ø 在brokerUrl中設置: “tcp://localhost:61616?jms.producerWindowSize=1048576”,這種設置將會對所有的
producer生效。
Ø 在destinationUri中設置: “test-queue?producer.windowSize=1048576”,此參數只會對使用此Destination實例
的producer失效,將會覆蓋brokerUrl中的producerWindowSize值。
注意:此值越大,意味著消耗Client端的記憶體就越大。
源碼分析
ActiveMQMessageProducer.send(…)方法
public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive, AsyncCallback onComplete) throws JMSException {
checkClosed();//檢查session連接,若已關閉直接拋出異常
if (destination == null) {//校驗發送消息的目的地是否為空,也就是必須制定queue或者topic資訊
if (info.getDestination() == null) {
throw new UnsupportedOperationException("A destination must be specified.");
}
throw new InvalidDestinationException("Don't understand null destinations");
}
//這裡做的是封裝Destination
ActiveMQDestination dest;
if (destination.equals(info.getDestination())) {
dest = (ActiveMQDestination)destination;
} else if (info.getDestination() == null) {
dest = ActiveMQDestination.transform(destination);
} else {
throw new UnsupportedOperationException("This producer can only send messages to: " + this.info.getDestination().getPhysicalName());
}
if (dest == null) {
throw new JMSException("No destination specified");
}
//封裝Message
if (transformer != null) {
Message transformedMessage = transformer.producerTransform(session, this, message);
if (transformedMessage != null) {
message = transformedMessage;
}
}
//如果設置了producerWindow,則需要校驗producerWindow大小
if (producerWindow != null) {
try {
producerWindow.waitForSpace();
} catch (InterruptedException e) {
throw new JMSException("Send aborted due to thread interrupt.");
}
}
//發送消息
this.session.send(this, dest, message, deliveryMode, priority, timeToLive, producerWindow, sendTimeout, onComplete);
//做統計的
stats.onMessage();
}
ActiveMQSession的send方法
protected void send(ActiveMQMessageProducer producer, ActiveMQDestination destination, Message message, int deliveryMode, int priority, long timeToLive,
MemoryUsage producerWindow, int sendTimeout, AsyncCallback onComplete) throws JMSException {
//校驗連接
checkClosed();
//校驗發送目標
if (destination.isTemporary() && connection.isDeleted(destination)) {
throw new InvalidDestinationException("Cannot publish to a deleted Destination: " + destination);
}
//互斥鎖,如果一個session的多個producer發送消息到這裡,會保證消息發送的有序性
synchronized (sendMutex) {
// tell the Broker we are about to start a new transaction
doStartTransaction();
TransactionId txid = transactionContext.getTransactionId();
long sequenceNumber = producer.getMessageSequence();
//Set the "JMS" header fields on the original message, see 1.1 spec section 3.4.11
message.setJMSDeliveryMode(deliveryMode);//設置是否持久化
long expiration = 0L;
if (!producer.getDisableMessageTimestamp()) {
long timeStamp = System.currentTimeMillis();
message.setJMSTimestamp(timeStamp);
if (timeToLive > 0) {
expiration = timeToLive + timeStamp;
}
}
message.setJMSExpiration(expiration);//消息過期時間
message.setJMSPriority(priority);//消息優先順序
message.setJMSRedelivered(false);//是否重複發送
// transform to our own message format here 統一封裝
ActiveMQMessage msg = ActiveMQMessageTransformation.transformMessage(message, connection);
msg.setDestination(destination);
//設置消息ID
msg.setMessageId(new MessageId(producer.getProducerInfo().getProducerId(), sequenceNumber));
// Set the message id.
if (msg != message) {//如果消息是經過轉化的,則更新原來的消息id和目的地
message.setJMSMessageID(msg.getMessageId().toString());
// Make sure the JMS destination is set on the foreign messages too.
message.setJMSDestination(destination);
}
//clear the brokerPath in case we are re-sending this message
msg.setBrokerPath(null);
msg.setTransactionId(txid);
if (connection.isCopyMessageOnSend()) {
msg = (ActiveMQMessage)msg.copy();
}
msg.setConnection(connection);
msg.onSend();//把消息屬性和消息體都設置為只讀,防止被修改
msg.setProducerId(msg.getMessageId().getProducerId());
if (LOG.isTraceEnabled()) {
LOG.trace(getSessionId() + " sending message: " + msg);
}
//如果onComplete沒有設置,且發送超時時間小於0,且消息不需要回饋,且連接器不是同步發送模式,且消息非持久化或者連接器是非同步發送模式
//或者存在事務id的情況下,走非同步發送,否則走同步發送
if (onComplete==null && sendTimeout <= 0 && !msg.isResponseRequired() && !connection.isAlwaysSyncSend() && (!msg.isPersistent() || connection.isUseAsyncSend() || txid != null)) {
this.connection.asyncSendPacket(msg);
if (producerWindow != null) {
// Since we defer lots of the marshaling till we hit the
// wire, this might not
// provide and accurate size. We may change over to doing
// more aggressive marshaling,
// to get more accurate sizes.. this is more important once
// users start using producer window
// flow control.
int size = msg.getSize();//非同步發送的情況下,需要設置producerWindow的大小
producerWindow.increaseUsage(size);
}
} else {
if (sendTimeout > 0 && onComplete==null) {
this.connection.syncSendPacket(msg,sendTimeout);//帶超時時間的同步發送//帶回調的同步發送
}else {
this.connection.syncSendPacket(msg, onComplete);//帶回調的同步發送
}
}
}
}
看下非同步發送的程式碼ActiveMQConnection. asyncSendPacket()
/**
* send a Packet through the Connection - for internal use only
*
* @param command
* @throws JMSException
*/
public void asyncSendPacket(Command command) throws JMSException {
if (isClosed()) {
throw new ConnectionClosedException();
} else {
doAsyncSendPacket(command);
}
}
private void doAsyncSendPacket(Command command) throws JMSException {
try {
this.transport.oneway(command);
} catch (IOException e) {
throw JMSExceptionSupport.create(e);
}
}
再看看transport是個什麼東西?在哪裡實例化的?按照以前看源碼的慣例來看,它肯定不是一個單純的對象。按照以往我看源碼的經驗來看,一定是在創建連接的過程中初始化的。所以我們定位到程式碼
//從connection=connectionFactory.createConnection();這行程式碼作為入口,一直跟蹤ActiveMQConnectionFactory. createActiveMQConnection這個方法中。程式碼如下
protected ActiveMQConnection createActiveMQConnection(String userName, String password) throws
JMSException {
if (brokerURL == null) {
throw new ConfigurationException("brokerURL not set.");
}
ActiveMQConnection connection = null;
try {
Transport transport = createTransport();//程式碼往下看
connection = createActiveMQConnection(transport, factoryStats);
connection.setUserName(userName);
connection.setPassword(password);
//省略後面的程式碼
}
//這個方法就是實例化Transport的 1.構建Broker的URL 2.根據這個URL去創建一個鏈接TransportFactory.connect 默認使用的TCP連接
protected Transport createTransport() throws JMSException {
try {
URI connectBrokerUL = brokerURL;
String scheme = brokerURL.getScheme();
if (scheme == null) {
throw new IOException("Transport not scheme specified: [" + brokerURL + "]");
}
if (scheme.equals("auto")) {
connectBrokerUL = new URI(brokerURL.toString().replace("auto", "tcp"));
} else if (scheme.equals("auto+ssl")) {
connectBrokerUL = new URI(brokerURL.toString().replace("auto+ssl", "ssl"));
} else if (scheme.equals("auto+nio")) {
connectBrokerUL = new URI(brokerURL.toString().replace("auto+nio", "nio"));
} else if (scheme.equals("auto+nio+ssl")) {
connectBrokerUL = new URI(brokerURL.toString().replace("auto+nio+ssl", "nio+ssl"));
}
return TransportFactory.connect(connectBrokerUL);//裡面的程式碼繼續往下看
} catch (Exception e) {
throw JMSExceptionSupport.create("Could not create Transport. Reason: " + e, e);
}
}
TransportFactory. findTransportFactory
- 從TRANSPORT_FACTORYS這個Map集合中,根據scheme去獲得一個TransportFactory指定的實例對象
- 如果Map集合中不存在,則通過TRANSPORT_FACTORY_FINDER去找一個並且構建實例
Ø 這個地方又有點類似於我們之前所學過的SPI的思想吧?他會從METAINF/services/org/apache/activemq/transport/ 這個路徑下,根據URI組裝的scheme去找到匹配class對象並且
實例化,所以根據tcp為key去對應的路徑下可以找到T cpT ransportFactory
//TransportFactory.connect(connectBrokerUL)
public static Transport connect(URI location) throws Exception {
TransportFactory tf = findTransportFactory(location);
return tf.doConnect(location);
}
//findTransportFactory(location)
public static TransportFactory findTransportFactory(URI location) throws IOException {
String scheme = location.getScheme();
if (scheme == null) {
throw new IOException("Transport not scheme specified: [" + location + "]");
}
TransportFactory tf = TRANSPORT_FACTORYS.get(scheme);
if (tf == null) {
// Try to load if from a META-INF property.
try {
tf = (TransportFactory)TRANSPORT_FACTORY_FINDER.newInstance(scheme);
TRANSPORT_FACTORYS.put(scheme, tf);
} catch (Throwable e) {
throw IOExceptionSupport.create("Transport scheme NOT recognized: [" + scheme + "]", e);
}
}
return tf;
}
調用TransportFactory.doConnect去構建一個連接
public Transport doConnect(URI location) throws Exception {
try {
Map<String, String> options = new HashMap<String, String>(URISupport.parseParameters(location));
if( !options.containsKey("wireFormat.host") ) {
options.put("wireFormat.host", location.getHost());
}
WireFormat wf = createWireFormat(options);
Transport transport = createTransport(location, wf);
Transport rc = configure(transport, wf, options);
//remove auto
IntrospectionSupport.extractProperties(options, "auto.");
if (!options.isEmpty()) {
throw new IllegalArgumentException("Invalid connect parameters: " + options);
}
return rc;
} catch (URISyntaxException e) {
throw IOExceptionSupport.create(e);
}
}
configure
public Transport configure(Transport transport, WireFormat wf, Map options) throws Exception {
//組裝一個複合的transport,這裡會包裝兩層,一個是IactivityMonitor.另一個是WireFormatNegotiator
transport = compositeConfigure(transport, wf, options);
transport = new MutexTransport(transport);//再做一層包裝,MutexTransport
transport = new ResponseCorrelator(transport);//包裝ResponseCorrelator
return transport;
}
到目前為止,這個transport實際上就是一個調用鏈了,他的鏈結構為
ResponseCorrelator(MutexT ransport(WireFormatNegotiator(IactivityMonitor(T cpT ransport()))
每一層包裝表示什麼意思呢?
ResponseCorrelator 用於實現非同步請求。
MutexT ransport 實現寫鎖,表示同一時間只允許發送一個請求
WireFormatNegotiator 實現了客戶端連接broker的時候先發送數據解析相關的協議資訊,比如解析版本號,是否
使用快取等
InactivityMonitor 用於實現連接成功成功後的心跳檢查機制,客戶端每10s發送一次心跳資訊。服務端每30s讀取
一次心跳資訊。
同步發送和非同步發送的區別
public Object request(Object command, int timeout) throws IOException {
FutureResponse response = asyncRequest(command, null);
return response.getResult(timeout); // 從future方法阻塞等待返回
}
持久化消息和非持久化消息的存儲原理
正常情況下,非持久化消息是存儲在記憶體中的,持久化消息是存儲在文件中的。能夠存儲的最大消息數據在
${ActiveMQ_HOME}/conf/activemq.xml文件中的systemUsage節點
SystemUsage配置設置了一些系統記憶體和硬碟容量
<systemUsage>
<systemUsage>
<memoryUsage>
//該子標記設置整個ActiveMQ節點的「可用記憶體限制」。這個值不能超過ActiveMQ本身設置的最大記憶體大小。其中的
percentOfJvmHeap屬性表示百分比。佔用70%的堆記憶體
<memoryUsage percentOfJvmHeap="70" />
</memoryUsage>
<storeUsage>
//該標記設置整個ActiveMQ節點,用於存儲「持久化消息」的「可用磁碟空間」。該子標記的limit屬性必須要進行設置
<storeUsage limit="100 gb"/>
</storeUsage>
<tempUsage>
//一旦ActiveMQ服務節點存儲的消息達到了memoryUsage的限制,非持久化消息就會被轉儲到 temp store區域,雖然
我們說過非持久化消息不進行持久化存儲,但是ActiveMQ為了防止「數據洪峰」出現時非持久化消息大量堆積致使記憶體耗
盡的情況出現,還是會將非持久化消息寫入到磁碟的臨時區域——temp store。這個子標記就是為了設置這個temp
store區域的「可用磁碟空間限制」
<tempUsage limit="50 gb"/>
</tempUsage>
</systemUsage>
</systemUsage>
從上面的配置我們需要get到一個結論,當非持久化消息堆積到一定程度的時候,也就是記憶體超過指定的設置閥值時,ActiveMQ會將記憶體中的非持久化消息寫入到臨時文件,以便騰出記憶體。但是它和持久化消息的區別是,重啟之後,持久化消息會從文件中恢復,非持久化的臨時文件會直接刪除
消息的持久化策略分析
消息持久性對於可靠消息傳遞來說是一種比較好的方法,即時發送者和接受者不是同時在線或者消息中心在發送者發送消息後宕機了,在消息中心重啟後仍然可以將消息發送出去。消息持久性的原理很簡單,就是在發送消息出去後,消息中心首先將消息存儲在本地文件、記憶體或者遠程資料庫,然後把消息發送給接受者,發送成功後再把消息從存儲中刪除,失敗則繼續嘗試。接下來我們來了解一下消息在broker上的持久化存儲實現方式
持久化存儲支援類型
ActiveMQ支援多種不同的持久化方式,主要有以下幾種,不過,無論使用哪種持久化方式,消息的存儲邏輯都是一致的。
Ø KahaDB存儲(默認存儲方式)
Ø JDBC存儲
Ø Memory存儲
Ø LevelDB存儲
Ø JDBC With ActiveMQ Journal
KahaDB存儲
KahaDB是目前默認的存儲方式,可用於任何場景,提高了性能和恢復能力。消息存儲使用一個事務日誌和僅僅用一個索引文件來存儲它所有的地址。
KahaDB是一個專門針對消息持久化的解決方案,它對典型的消息使用模式進行了優化。在Kaha中,數據被追加到data logs中。當不再需要log文件中的數據的時候,log文件會被丟棄。
配置方式
<persistenceAdapter>
<kahaDB directory="${activemq.data}/kahadb"/>
</persistenceAdapter>
KahaDB的存儲原理
在data/kahadb這個目錄下,會生成四個文件
Ø db.data 它是消息的索引文件,本質上是B-Tree(B樹),使用B-Tree作為索引指向db-.log裡面存儲的消息
Ø db.redo 用來進行消息恢復
Ø db-.log 存儲消息內容。新的數據以APPEND的方式追加到日誌文件末尾。屬於順序寫入,因此消息存儲是比較
快的。默認是32M,達到閥值會自動遞增
Ø lock文件 鎖,表示當前獲得kahadb讀寫許可權的broker
JDBC存儲
使用JDBC持久化方式,資料庫會創建3個表:activemq_msgs,activemq_acks和activemq_lock。
ACTIVEMQ_MSGS 消息表,queue和topic都存在這個表中
ACTIVEMQ_ACKS 存儲持久訂閱的資訊和最後一個持久訂閱接收的消息ID
ACTIVEMQ_LOCKS 鎖表,用來確保某一時刻,只能有一個ActiveMQ broker實例來訪問資料庫
JDBC存儲配置
<persistenceAdapter>
<jdbcPersistenceAdapter dataSource="# MySQL-DS " createTablesOnStartup="true" />
</persistenceAdapter>
dataSource指定持久化資料庫的bean,createT ablesOnStartup是否在啟動的時候創建數據表,默認值是true,這
樣每次啟動都會去創建數據表了,一般是第一次啟動的時候設置為true,之後改成false
Mysql持久化Bean配置
<bean id="Mysql-DS" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
<property name="driverClassName" value="com.mysql.jdbc.Driver"/>
<property name="url" value="jdbc:mysql://192.168.11.156:3306/activemq?
relaxAutoCommit=true"/>
<property name="username" value="root"/>
<property name="password" value="root"/>
</bean>
LevelDB存儲
LevelDB持久化性能高於KahaDB,雖然目前默認的持久化方式仍然是KahaDB。並且,在ActiveMQ 5.9版本提供
了基於LevelDB和Zookeeper的數據複製方式,用於Master-slave方式的首選數據複製方案。
不過,據ActiveMQ官網對LevelDB的表述:LevelDB官方建議使用以及不再支援,推薦使用的是KahaDB
<persistenceAdapter>
<levelDBdirectory="activemq-data"/>
</persistenceAdapter>
Memory 消息存儲
基於記憶體的消息存儲,記憶體消息存儲主要是存儲所有的持久化的消息在記憶體中。persistent=」false」,表示不設置持
久化存儲,直接存儲到記憶體中
<beans>
<broker brokerName="test-broker" persistent="false"
xmlns="//activemq.apache.org/schema/core">
<transportConnectors>
<transportConnector uri="tcp://localhost:61635"/>
</transportConnectors> </broker>
</beans>
JDBC Message store with ActiveMQ Journal
這種方式克服了JDBC Store的不足,JDBC每次消息過來,都需要去寫庫和讀庫。
ActiveMQ Journal,使用高速快取寫入技術,大大提高了性能。
當消費者的消費速度能夠及時跟上生產者消息的生產速度時,journal文件能夠大大減少需要寫入到DB中的消息。
舉個例子,生產者生產了1000條消息,這1000條消息會保存到journal文件,如果消費者的消費速度很快的情況
下,在journal文件還沒有同步到DB之前,消費者已經消費了90%的以上的消息,那麼這個時候只需要同步剩餘的
10%的消息到DB。
如果消費者的消費速度很慢,這個時候journal文件可以使消息以批量方式寫到DB。
Ø 將原來的標籤注釋掉
Ø 添加如下標籤
<persistenceFactory>
<journalPersistenceAdapterFactory dataSource="#Mysql-DS" dataDirectory="activemqdata"/>
</persistenceFactory>
Ø 在服務端循環發送消息。可以看到數據是延遲同步到資料庫的
消費端消費消息的原理
我們知道有兩種方法可以接收消息,一種是使用同步阻塞的MessageConsumer#receive方法。另一種是使用消息監聽器MessageListener。這裡需要注意的是,在同一個session下,這兩者不能同時工作,也就是說不能針對不同消息採用不同的接收方式。否則會拋出異常。
至於為什麼這麼做,最大的原因還是在事務性會話中,兩種消費模式的事務不好管控
消費流程圖
ActiveMQMessageConsumer.receive消費端同步接收消息的源碼入口
public Message receive() throws JMSException {
checkClosed();
checkMessageListener(); //檢查receive和MessageListener是否同時配置在當前的會話中,同步消費不需要設置MessageListener 否則會報錯
sendPullCommand(0); //如果PrefetchSizeSize為0並且unconsumerMessage為空,則發起pull命令
MessageDispatch md = dequeue(-1); //從unconsumerMessage出隊列獲取消息
if (md == null) {
return null;
}
beforeMessageIsConsumed(md);
afterMessageIsConsumed(md, false); //發送ack給到broker
return createActiveMQMessage(md);//獲取消息並返回
}
sendPullCommand
發送pull命令從broker上獲取消息,前提是prefetchSize=0並且unconsumedMessages為空。
unconsumedMessage表示未消費的消息,這裡面預讀取的消息大小為prefetchSize的值
protected void sendPullCommand(long timeout) throws JMSException {
clearDeliveredList();
if (info.getCurrentPrefetchSize() == 0 && unconsumedMessages.isEmpty()) {
MessagePull messagePull = new MessagePull();
messagePull.configure(info);
messagePull.setTimeout(timeout);
session.asyncSendPacket(messagePull); //向服務端非同步發送messagePull指令
}
}
clearDeliveredList
在上面的sendPullCommand方法中,會先調用clearDeliveredList方法,主要用來清理已經分發的消息鏈表
deliveredMessages
deliveredMessages,存儲分發給消費者但還未應答的消息鏈表
Ø 如果session是事務的,則會遍歷deliveredMessage中的消息放入到previouslyDeliveredMessage中來做重發
Ø 如果session是非事務的,根據ACK的模式來選擇不同的應答操作
// async (on next call) clear or track delivered as they may be flagged as duplicates if they arrive again
private void clearDeliveredList() {
if (clearDeliveredList) {
synchronized (deliveredMessages) {
if (clearDeliveredList) {
if (!deliveredMessages.isEmpty()) {
if (session.isTransacted()) {
if (previouslyDeliveredMessages == null) {
previouslyDeliveredMessages = new PreviouslyDeliveredMap<MessageId, Boolean>(session.getTransactionContext().getTransactionId());
}
for (MessageDispatch delivered : deliveredMessages) {
previouslyDeliveredMessages.put(delivered.getMessage().getMessageId(), false);
}
LOG.debug("{} tracking existing transacted {} delivered list ({}) on transport interrupt",
getConsumerId(), previouslyDeliveredMessages.transactionId, deliveredMessages.size());
} else {
if (session.isClientAcknowledge()) {
LOG.debug("{} rolling back delivered list ({}) on transport interrupt", getConsumerId(), deliveredMessages.size());
// allow redelivery
if (!this.info.isBrowser()) {
for (MessageDispatch md: deliveredMessages) {
this.session.connection.rollbackDuplicate(this, md.getMessage());
}
}
}
LOG.debug("{} clearing delivered list ({}) on transport interrupt", getConsumerId(), deliveredMessages.size());
deliveredMessages.clear();
pendingAck = null;
}
}
clearDeliveredList = false;
}
}
}
}
dequeue
從unconsumedMessage中取出一個消息,在創建一個消費者時,就會為這個消費者創建一個未消費的消息道,
這個通道分為兩種,一種是簡單優先順序隊列分發通道SimplePriorityMessageDispatchChannel ;另一種是先進先
出的分發通道FifoMessageDispatchChannel.
至於為什麼要存在這樣一個消息分發通道,大家可以想像一下,如果消費者每次去消費完一個消息以後再broker拿一個消息,效率是比較低的。所以通過這樣的設計可以允許session能夠一次性將多條消息分發給一個消費者。
默認情況下對於queue來說,prefetchSize的值是1000
beforeMessageIsConsumed
這裡面主要是做消息消費之前的一些準備工作,如果ACK類型不是DUPS_OK_ACKNOWLEDGE或者隊列模式(簡單來說就是除了T opic和DupAck這兩種情況),所有的消息先放到deliveredMessages鏈表的開頭。並且如果當前是事務類型的會話,則判斷transactedIndividualAck,如果為true,表示單條消息直接返回ack。
否則,調用ackLater,批量應答, client端在消費消息後暫且不發送ACK,而是把它快取下來(pendingACK),等到這些消息的條數達到一定閥值時,只需要通過一個ACK指令把它們全部確認;這比對每條消息都逐個確認,在性能上要提高很多
private void beforeMessageIsConsumed(MessageDispatch md) throws JMSException {
md.setDeliverySequenceId(session.getNextDeliveryId());
lastDeliveredSequenceId = md.getMessage().getMessageId().getBrokerSequenceId();
if (!isAutoAcknowledgeBatch()) {
synchronized(deliveredMessages) {
deliveredMessages.addFirst(md);
}
if (session.getTransacted()) {
if (transactedIndividualAck) {
immediateIndividualTransactedAck(md);
} else {
ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
}
}
}
}
afterMessageIsConsumed
這個方法的主要作用是執行應答操作,這裡面做以下幾個操作
Ø 如果消息過期,則返回消息過期的ack
Ø 如果是事務類型的會話,則不做任何處理
Ø 如果是AUTOACK或者(DUPS_OK_ACK且是隊列),並且是優化ack操作,則走批量確認ack
Ø 如果是DUPS_OK_ACK,則走ackLater邏輯
Ø 如果是CLIENT_ACK,則執行ackLater
private void afterMessageIsConsumed(MessageDispatch md, boolean messageExpired) throws JMSException {
if (unconsumedMessages.isClosed()) {
return;
}
if (messageExpired) {
acknowledge(md, MessageAck.EXPIRED_ACK_TYPE);
stats.getExpiredMessageCount().increment();
} else {
stats.onMessage();
if (session.getTransacted()) {
// Do nothing.
} else if (isAutoAcknowledgeEach()) {
if (deliveryingAcknowledgements.compareAndSet(false, true)) {
synchronized (deliveredMessages) {
if (!deliveredMessages.isEmpty()) {
if (optimizeAcknowledge) {
ackCounter++;
// AMQ-3956 evaluate both expired and normal msgs as
// otherwise consumer may get stalled
if (ackCounter + deliveredCounter >= (info.getPrefetchSize() * .65) || (optimizeAcknowledgeTimeOut > 0 && System.currentTimeMillis() >= (optimizeAckTimestamp + optimizeAcknowledgeTimeOut))) {
MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
if (ack != null) {
deliveredMessages.clear();
ackCounter = 0;
session.sendAck(ack);
optimizeAckTimestamp = System.currentTimeMillis();
}
// AMQ-3956 - as further optimization send
// ack for expired msgs when there are any.
// This resets the deliveredCounter to 0 so that
// we won't sent standard acks with every msg just
// because the deliveredCounter just below
// 0.5 * prefetch as used in ackLater()
if (pendingAck != null && deliveredCounter > 0) {
session.sendAck(pendingAck);
pendingAck = null;
deliveredCounter = 0;
}
}
} else {
MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
if (ack!=null) {
deliveredMessages.clear();
session.sendAck(ack);
}
}
}
}
deliveryingAcknowledgements.set(false);
}
} else if (isAutoAcknowledgeBatch()) {
ackLater(md, MessageAck.STANDARD_ACK_TYPE);
} else if (session.isClientAcknowledge()||session.isIndividualAcknowledge()) {
boolean messageUnackedByConsumer = false;
synchronized (deliveredMessages) {
messageUnackedByConsumer = deliveredMessages.contains(md);
}
if (messageUnackedByConsumer) {
ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
}
}
else {
throw new IllegalStateException("Invalid session state.");
}
}
}
ActiveMQ的優缺點
ActiveMQ 採用消息推送方式,所以最適合的場景是默認消息都可在短時間內被消費。數據量越大,查找和消費消息就越慢,消息積壓程度與消息速度成反比。
缺點
1.吞吐量低。由於 ActiveMQ 需要建立索引,導致吞吐量下降。這是無法克服的缺點,只要使用完全符合 JMS 規範的消息中間件,就要接受這個級別的TPS。
2.無分片功能。這是一個功能缺失,JMS 並沒有規定消息中間件的集群、分片機制。而由於 ActiveMQ 是偉企業級開發設計的消息中間件,初衷並不是為了處理海量消息和高並發請求。如果一台伺服器不能承受更多消息,則需要橫向拆分。ActiveMQ 官方不提供分片機制,需要自己實現。
適用場景
對 TPS 要求比較低的系統,可以使用 ActiveMQ 來實現,一方面比較簡單,能夠快速上手開發,另一方面可控性也比較好,還有比較好的監控機制和介面
不適用的場景
消息量巨大的場景。ActiveMQ 不支援消息自動分片機制,如果消息量巨大,導致一台伺服器不能處理全部消息,就需要自己開發消息分片功能。