ActiveMQ入門
- 2020 年 4 月 16 日
- 筆記
Apache ActiveMQ是當前最流行的開源的,支援多協議的,基於Java的消息中間件,官網的原話是:Apache ActiveMQ™ is the most popular open source, multi-protocol, Java-based messaging server.
ActiveMQ是一個完全支援JMS1.1和J2EE規範的JMS Provider實現,儘管JMS規範出台已經是很久的事情了,但是JMS在當今J2EE應用中仍扮演者特殊的地位。
JMS是什麼
JMS全稱Java Message Service,即Java消息服務應用程式介面,是一個Java平台中關於面向消息中間件(MOM)的API,用於在兩個應用程式之間,或分散式系統中發送消息,進行非同步通訊。Java消息服務是一個與具體平台無關的API。
JMS對象模型
JMS消息模型
在JMS標準中,有兩種消息模型PTP(Point to Point)以及Publish/Subscribe(Pub/Sub)。
PTP,點對點消息傳送模型
在點對點消息傳送模型中,發送者將消息發送給一個特殊的消息隊列,該隊列保存了所有發送給它的消息,消費者從這個隊列中獲取消息。
PTP的特點:
- 每個消息只有一個消費者,即一旦被消費,消息就不再在消息隊列中
- 發送者和接收者之間在時間上沒有依賴性,也就是說當發送者發送了消息之後,不管接收者有沒有正在運行,都不會影響到消息被發送到隊列
- 接收者在成功接收消息之後需向隊列發送確認收到通知
Pub/Sub,發布/訂閱消息傳遞模型
在發布/訂閱消息模型中,發布者發布一個消息,該消息通過topic傳遞給所有的客戶端。在這種模型中,發布者和訂閱者彼此不知道對方,是匿名的且可以動態發布和訂閱topic。在發布/訂閱消息模型中,目的地被稱為主題(topic),topic主要用於保存和傳遞消息,且會一直保存消息直到消息被傳遞給客戶端。
Pub/Sub特點:
- 每個消息可以有多個消費者
- 發布者和訂閱者之間有時間上的依賴性。針對某個topic的訂閱者,它必須創建一個或多個訂閱者之後,才能消費發布者的消息,而且為了消費消息,訂閱者必須保持運行的狀態。
- 為了緩和這樣嚴格的時間相關性,JMS允許訂閱者創建一個可持久化的訂閱,這樣就可以在訂閱者沒有運行的時候也能接收到發布者的消息
JMS消息結構
Message主要由三部分組成,分別是消息頭Header,消息屬性Properties,以及消息體Body。
消息頭中主要內容:
消息屬性可以理解為消息的附加消息頭,屬性名可以自定義。消息的屬性值可以是String, boolean , byte,short, double, int ,long或float型,Message介面為讀取和寫入屬性提供了若干個取值函數和賦值函數方法。
消息體的類型:
ActiveMQ的特性
- 支援多種程式語言
- 支援多種傳輸協議
- 有多種持久化方式
ActiveMQ的安裝
安裝環境:JDK1.8,CentOS7
下載地址://activemq.apache.org/components/classic/download/
CentOS在連網的情況下也可以通過wget(如果wget命令不存在可以通過yum install wget進行安裝)命令獲取軟體包,如:wget //archive.apache.org/dist/activemq/5.15.10/apache-activemq-5.15.10-bin.tar.gz
提取文件: tar -zxvf apache-activemq-5.15.10-bin.tar.gz -C /vartar -zxvf apache-activemq-5.15.10-bin.tar.gz -C /var
重命名:mv /var/apache-activemq-5.15.10/ /var/activemq/
ActiveMQ解壓後的目錄結構:
在/etc/profile文件中添加Java環境變數:
export JAVA_HOME=/var/jdk1.8.0
export PATH=$JAVA_HOME/bin:$PATH
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
ActiveMQ解壓後就可以使用,bin目錄下可執行activemq可以進行ActiveMQ的啟動停止。
ActiveMQ服務
前面使用命令運行ActiveMQ,但最好的方式是將ActiveMQ作為服務啟動,使用system服務可以保證ActiveMQ在系統啟動時自動啟動。
創建ActiveMQ服務步驟:
- 創建一個systemd服務文件:
vi /usr/lib/systemd/system/activemq.service
- 在服務文件中添加以下內容
[Unit]
Description=ActiveMQ service
After=network.target
[Service]
Type=forking
ExecStart=/var/activemq/bin/activemq start
ExecStop=/var/activemq/bin/activemq stop
User=root
Group=root
Restart=always
RestartSec=9
StandardOutput=syslog
StandardError=syslog
SyslogIdentifier=activemq
[Install]
WantedBy=multi-user.target
- 產看Java安裝目錄:whereis java
- 設置activemq配置文件/var/activemq/bin/env中的JAVA_HOME
# Location of the java installation
# Specify the location of your java installation using JAVA_HOME, or specify the
# path to the "java" binary using JAVACMD
# (set JAVACMD to "auto" for automatic detection)
JAVA_HOME="/var/jdk1.8.0"
JAVACMD="auto"
- 通過systemctl管理activemq啟停
- 啟動activemq服務:systemctl start activemq
- 查看服務狀態:systemctl status activemq
- 創建軟體鏈接:ln -s /usr/lib/systemd/system/activemq.service /etc/systemd/system/multi-user.target.wants/activemq.service
- 開機自啟:systemctl enable activemq
- 檢測是否開啟成功:systemctl list-unit-files |grep activemq
- 停止activemq服務:systemctl stop activemq
ActiveMQ的Web管理平台
ActiveMQ自帶有Web管理平台,默認使用8161埠,服務啟動後在瀏覽器輸入//服務IP:8161/admin 即可進入,默認配置的賬戶admin,密碼也是admin。
如果服務啟動後頁面無法訪問可能是防火牆內需要添加需要的埠。
查看防火牆狀態:systemctl status firewalld
防火牆添加埠:firewall-cmd —zone=public —add-port=61616/tcp —permanent
重啟防護牆:systemctl restart firewalld.service
或者直接關閉防火牆:systemctl stop firewalld.service
ActiveMQ的Web管理平台是基於jetty的,在ActiveMQ的安裝目錄下conf文件中有jetty.xml配置文件,通過該文件可以對Web管理平台進行配置管理, 如:
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
<!-- the default port number for the web console -->
<property name="host" value="0.0.0.0"/>
<!--此處即為管理平台的埠-->
<property name="port" value="8161"/>
</bean>
<bean id="securityConstraint" class="org.eclipse.jetty.util.security.Constraint">
<property name="name" value="BASIC" />
<property name="roles" value="user,admin" />
<!-- 改為false即可關閉登陸 -->
<property name="authenticate" value="true" />
</bean>
通過jetty-realm.properties配置文件可以對Web管理平台的用戶進行管理:
# 在此即可維護帳號密碼,格式:
# 用戶名:密碼,角色
# Defines users that can access the web (console, demo, etc.)
# username: password [,rolename ...]
admin: admin, admin
user: 1234, user
ActiveMQ的Java示例
Maven管理的Jar包:
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.15.10</version>
</dependency>
Producer程式碼示例:
package com.demo.queue;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class ProducerDemo {
private static final String BORKER_URL = "tcp://ip:61616";
private static final String QUEUE_NAME = "queue-test";
public static void main(String[] args) throws Exception {
// 創建連接工廠
ActiveMQConnectionFactory activeMQConnectionFactory =
new ActiveMQConnectionFactory("admin", "admin", BORKER_URL);
// 創建連接對象
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
// 創建會話
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 創建點對點發送的目標Queue
Queue queue = session.createQueue(QUEUE_NAME);
// 創建消息生產者
MessageProducer producer = session.createProducer(queue);
// Topic topic1 = session.createTopic("topic-test");
// MessageProducer producer1 = session.createProducer(topic1);
// 設置生產者的模式,有兩種可選 持久化 / 不持久化
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
// 文本消息
TextMessage message = session.createTextMessage("Hello ActiveMQ message");
// 發送消息
producer.send(message);
// 關閉連接
producer.close();
session.close();
connection.close();
}
}
運行之後可以在Web控制台Queues tab下看到消息:
Consumer程式碼示例:
package com.demo.queue;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class ConsumerDemo {
private static final String BORKER_URL = "tcp://192.168.0.242:61616";
private static final String QUEUE_NAME = "queue-test";
public static void main(String[] args) throws Exception {
// 創建連接工廠
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(BORKER_URL);
// 創建連接對象
Connection connection = activeMQConnectionFactory.createConnection("admin", "admin");
connection.start();
// 創建會話
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 創建點對點消費的目標Queue
Queue queue = session.createQueue(QUEUE_NAME);
// Topic topic1 = session.createTopic("topic-test");
// MessageConsumer consumer1 = session.createConsumer(topic1);
// 創建消息消費者
MessageConsumer consumer = session.createConsumer(queue);
// 接收消息
Message message = consumer.receive();
if (message instanceof TextMessage) {
System.out.println("收到文本消息:" + ((TextMessage) message).getText());
} else {
System.out.println(message);
}
// 關閉連接
consumer.close();
session.close();
connection.close();
}
}
運行後可以看到消息被消費:
SpringBoot中使用ActiveMQ的程式碼示例
Maven依賴:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
yml配置文件:
spring:
activemq:
broker-url: tcp://ip:61616
user: admin
password: admin
程式碼示例:
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.jms.core.JmsTemplate;
import javax.annotation.PostConstruct;
@SpringBootApplication
public class Producer {
@Autowired private JmsTemplate jmsTemplate;
@PostConstruct
public void init() {
ActiveMQTopic activeMQTopic = new ActiveMQTopic("topic-test");
jmsTemplate.convertAndSend(activeMQTopic, "Hello SpringBoot ActiveMQ!");
}
public static void main(String[] args) {
SpringApplication.run(Producer.class);
}
}
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.config.JmsListenerContainerFactory;
import org.springframework.jms.config.SimpleJmsListenerContainerFactory;
import javax.jms.ConnectionFactory;
@EnableJms
@SpringBootApplication
public class Consumer {
@Bean
public JmsListenerContainerFactory<?> myFactory(ConnectionFactory connectionFactory) {
SimpleJmsListenerContainerFactory factory = new SimpleJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setPubSubDomain(true);
return factory;
}
@JmsListener(destination = "topic-test", containerFactory = "myFactory")
public void receive(String message) {
System.out.println("Received Message: " + message);
}
public static void main(String[] args) {
SpringApplication.run(Consumer.class);
}
}