JMS監聽Oracle AQ
- 2021 年 1 月 20 日
- 筆記
- JAVA, Oracle
- 該文檔中,oracle版本為11g,jdk版本1.8,java項目為maven構建的springboot項目,springboot的版本為2.1.6,並使用了定時任務來做AQ監聽的重連功能,解決由於外部原因導致連接斷裂之後,需要手動重啟項目才能恢復連接的問題
一、創建隊列
1.1.管理員登錄執行
- 管理員登錄,執行授權操作,oracle使用隊列需要單獨的授權,默認未開啟,須手動開啟,授權命令如下,username使用自己的用戶名即可
GRANT EXECUTE ON SYS.DBMS_AQ to 'username';
GRANT EXECUTE ON SYS.DBMS_AQADM to 'username';
GRANT EXECUTE ON SYS.DBMS_AQ_BQVIEW to 'username';
GRANT EXECUTE ON SYS.DBMS_AQIN to 'username';
GRANT EXECUTE ON SYS.DBMS_JOB to 'username';
1.2.用戶登錄執行執行
1.2.1. 創建消息負荷payload
- 創建的此type用來封裝隊列所帶的,根據實際需求進行創建
CREATE OR REPLACE TYPE TYPE_QUEUE_INFO AS OBJECT
(
param_1 VARCHAR2(100),
param_2 VARCHAR2(100)
)
1.2.2. 創建隊列表
- 創建對列表,並指定隊列數據的類型,隊列表名自定義即可,數據類型使用上面剛創建的type
begin
sys.dbms_aqadm.create_queue_table(
queue_table => 'QUEUE_TABLE',
queue_payload_type => 'TYPE_QUEUE_INFO',
sort_list => 'ENQ_TIME',
compatible => '10.0.0',
primary_instance => 0,
secondary_instance => 0);
end;
1.2.3. 創建隊列並啟動
- 創建名稱為QUEUE_TEST的隊列,並指定對列表名【同一個oracle用戶下,可以有多個對列表,同一個對列表中,可以有多個隊列】
begin
sys.dbms_aqadm.create_queue(
queue_name => 'QUEUE_TEST',
queue_table => 'QUEUE_TABLE',
queue_type => sys.dbms_aqadm.normal_queue,
max_retries => 5,
retry_delay => 0,
retention_time => 0);
end;
- 剛創建的隊列的狀態默認是未開啟的,需要手動開啟一下,同理,存在刪除、停止等操作
begin
-- 啟動隊列
sys.dbms_aqadm.start_queue(
queue_name => 'QUEUE_TEST'
);
-- 暫停隊列
--sys.dbms_aqadm.STOP_QUEUE(
-- queue_name => 'QUEUE_TEST'
--);
-- 刪除隊列
--sys.dbms_aqadm.DROP_QUEUE(
-- queue_name => 'QUEUE_TEST'
--);
-- 刪除對列表
--sys.dbms_aqadm.DROP_QUEUE_TABLE(
-- queue_table => 'QUEUE_TABLE'
--);
end;
1.2.4. 創建存儲過程
- 儲存過程的作用為把數據載入到隊列中,生成的新的隊列會自動添加進綁定的對列表中,等待消費者進行消費
CREATE OR REPLACE PROCEDURE pro_queue(param_1 VARCHAR2, param_2 VARCHAR2) as
r_enqueue_options DBMS_AQ.ENQUEUE_OPTIONS_T;
r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
v_message_handle RAW(16);
o_payload TYPE_QUEUE_INFO;
begin
-- 封裝最終消息
o_payload := TYPE_QUEUE_INFO(param_1, param_2);
-- 入隊操作,指定隊列
dbms_aq.enqueue(queue_name => 'QUEUE_TEST',
enqueue_options => r_enqueue_options,
message_properties => r_message_properties,
payload => o_payload,
msgid => v_message_handle);
-- 出隊操作
--dbms_aq.enqueue(queue_name => 'QUEUE_TEST',
-- dequeue_options => r_dequeue_options,
-- message_properties => r_message_properties,
-- payload => o_payload,
-- msgid => v_message_handle);
end pro_queue;
二、Java中JMS的使用
2.1. 項目配置
2.1.1. maven
<dependency>
<groupId>com.oracle</groupId>
<artifactId>jmscommon</artifactId>
<version>1.2</version>
</dependency>
<dependency>
<groupId>com.oracle</groupId>
<artifactId>orai18n</artifactId>
<version>1.2</version>
</dependency>
<dependency>
<groupId>com.oracle</groupId>
<artifactId>jta</artifactId>
<version>1.2</version>
</dependency>
<dependency>
<groupId>com.oracle</groupId>
<artifactId>aqapi_g</artifactId>
<version>1.2</version>
</dependency>
2.1.2. yml
datasource:
url: jdbc:oracle:thin:@ip:port/sid
username: **
password: **
queue:
aq:
# 該隊列是否可用,用來控制隊列的載入和重連,不可省略
enable: true
# 隊列名稱,不可省略
name: QUEUE_TEST
# 隊列重連的定時任務對應的時間表達式,不可省略
cron: 0 */1 * * * ?
2.2. AQ初始化
- 在項目啟動結束後立即運行此類,會根據所配置的隊列名稱監聽對應的隊列
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
/**
* @Title: MessageAQInit.java
* @Description: AQ 初始化
* @author wangqq
* @date 2020年6月28日 下午3:45:23
* @version 1.0
*/
@Component
public class MessageAQInit implements CommandLineRunner {
@Autowired
private MessageAQConfig aqConfig;
@Autowired
private MessageAQListener listener;
@Override
public void run(String... args) throws RuntimeException {
// 檢查消息隊列是否啟用
if (aqConfig.enable) {
// 設置AQ的消息監聽器
MessageAQConnection.setListener(listener);
// 設置oracle配置
if (!MessageAQConnection.initFactory(aqConfig)) {
throw new RuntimeException("Message Oracle AQ initialization failed!");
}
// 建立連接
if (!MessageAQConnection.establishConnection(aqConfig)) {
throw new RuntimeException("Message Oracle AQ connection failed!");
}
}
}
}
2.3. 配置資訊類
- 配置類,將yml的配置文件轉為java對象【時間表達式在程式碼中不會以對象屬性的方式被使用,因此在該類中沒有設置】
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
/**
* @Title: MessageAQConfig.java
* @Description: ORACLE 消息隊列配置
* @author wangqq
* @date 2020年6月28日 下午3:36:08
* @version 1.0
*/
@Component
public class MessageAQConfig {
/** 是否開啟MessageAq功能 */
@Value("${queue.aq.enable}")
public Boolean enable;
/** 資料庫用戶名 */
@Value("${datasource.username}")
public String userName;
/** 資料庫密碼 */
@Value("${datasource.password}")
public String password;
/** 資料庫地址url */
@Value("${datasource.url}")
public String url;
/** 隊列名稱 */
@Value("${queue.aq.name}")
public String queue;
}
2.4. AQ 連接工廠類
- AQ 鏈接的核心類,根據配置對象以及注入的監聽對象,動態監聽AQ隊列
import javax.jms.Queue;
import javax.jms.Session;
import lombok.extern.slf4j.Slf4j;
import oracle.jms.AQjmsConnection;
import oracle.jms.AQjmsConnectionFactory;
import oracle.jms.AQjmsConsumer;
import oracle.jms.AQjmsSession;
/**
* @Title: MessageAQConnection.java
* @Description: AQ 連接
* @author wangqq
* @date 2020年6月28日 下午3:50:32
* @version 1.0
*/
@Slf4j
public class MessageAQConnection {
private static AQjmsConnectionFactory aQjmsConnectionFactory;
private static AQjmsConsumer aQjmsConsumer;
private static AQjmsSession aQjmsSession;
private static AQjmsConnection aQjmsConnection;
private static MessageAQListener listener;
/**
* 設置JMS監聽器
*
* @param messageAqJmsListener
* @author wangqq
* @date 2020年7月6日 上午8:33:57
*/
public static void setListener(MessageAQListener messageAqJmsListener) {
listener = messageAqJmsListener;
}
/**
* 初始化 AQ 連接 Factory
*
* @param aqConfig 消息隊列配置
* @return 是否成功
*/
public static boolean initFactory(MessageAQConfig aqConfig) {
try {
aQjmsConnectionFactory = new AQjmsConnectionFactory();
aQjmsConnectionFactory.setJdbcURL(aqConfig.url);
aQjmsConnectionFactory.setUsername(aqConfig.userName);
aQjmsConnectionFactory.setPassword(aqConfig.password);
return true;
} catch (Exception e) {
log.error(e.getMessage(), e);
return false;
}
}
/**
* 連接消息隊列
*
* @param aqConfig 消息隊列配置
* @return 是否成功
*/
public static boolean establishConnection(MessageAQConfig aqConfig) {
try {
aQjmsConnection = (AQjmsConnection) aQjmsConnectionFactory.createConnection();
aQjmsSession = (AQjmsSession) aQjmsConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
aQjmsConnection.start();
Queue queue = aQjmsSession.getQueue(aqConfig.userName, aqConfig.queue);
aQjmsConsumer = (AQjmsConsumer) aQjmsSession.createConsumer(queue, null, MessageORAData.getFactory(), null,
false);
aQjmsConsumer.setMessageListener(listener);
return true;
} catch (Exception e) {
log.error(e.getMessage(), e);
return false;
}
}
/**
* 關閉消息隊列連接
*
* @return 是否成功
*/
public static boolean closeConnection() {
try {
aQjmsConsumer.close();
aQjmsSession.close();
aQjmsConnection.close();
return true;
} catch (Exception e) {
log.error(e.getMessage(), e);
return false;
}
}
}
2.5. 創建AQ 數據承載類
- 用來接收oracle隊列中所帶的參數,基本保證與資料庫中的type格式相同即可
import lombok.Data;
/**
* @Title: Test.java
* @Description: AQ 數據承載類
* @author wangqq
* @date 2021-01-20 16:19:16
* @version 1.0
*/
@Data
public class Test {
private String param_1;
private String param_2;
}
2.6. 數據類型轉換
- 將oracleAq所承載的數據,轉化為我們自己需要的實例對象,及上述中的Test對象
package com.synjones.message.oracleaq;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Struct;
import com.synjones.message.vo.MessageInfoVo;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import oracle.sql.Datum;
import oracle.sql.ORAData;
import oracle.sql.ORADataFactory;
/**
* @Title: MessageORAData.java
* @Description: 數據類型轉換類
* @author synjones
* @date 2018年12月3日 上午11:29:50
* @version 1.0
*/
@Slf4j
@NoArgsConstructor
public class MessageORAData implements ORAData, ORADataFactory {
@SuppressWarnings("unused")
private Object[] rawData = new Object[8];
private static final MessageORAData MESSAGE_FACTORY = new MessageORAData();
public static ORADataFactory getFactory() {
return MESSAGE_FACTORY;
}
@Override
public ORAData create(Datum datum, int sqlType) throws SQLException {
if (datum == null) {
return null;
} else {
try {
MessageORAData payOraData = new MessageORAData();
Struct aStruct = (Struct) datum;
payOraData.rawData = aStruct.getAttributes();
return payOraData;
} catch (Exception e) {
log.error(e.getMessage(), e);
return null;
}
}
}
@Override
public Datum toDatum(Connection arg0) throws SQLException {
return null;
}
/**
* 消息內容解析並封裝
*
* @return
* @author wangqq
* @date 2020年7月6日 上午8:38:01
*/
public Test getContent() {
try {
return Test.builder()
.param_1(rawData[0] == null ? null : rawData[0].toString())
.param_2(rawData[0] == null ? null : rawData[0].toString())
.build();
} catch (Exception e) {
log.error(e.getMessage(), e);
return null;
}
}
}
2.7. AQ 監聽
import javax.jms.Message;
import javax.jms.MessageListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import lombok.extern.slf4j.Slf4j;
import oracle.jms.AQjmsAdtMessage;
/**
* @Title: JMSListener.java
* @Description: JMS監聽ORACLEAQ的隊列消息
* @author wangqq
* @date 2020年6月28日 上午11:23:42
* @version 1.0
*/
@Slf4j
@Component
public class MessageAQListener implements MessageListener {
@Override
public void onMessage(Message message1) {
AQjmsAdtMessage adtMessage = (AQjmsAdtMessage)message1;
try {
MessageORAData payload = (MessageORAData)adtMessage.getAdtPayload();
// 獲取消息內容
Test test = payload.getContent();
// 個人業務程式碼
} catch (Exception e) {
log.error(e.getMessage(), e);
}
}
}
2.8. AQ 監控任務, 在AQ斷開後重連
- 通過定時任務,定時查詢是否有入隊時間在5分鐘之內的隊列未被消費【隊列入隊後,會在對列表中產生一條數據,消費之後該數據會被清除掉】,若存在,則說明監聽異常,需要重新創建連接監聽隊列
- 資料庫對列表中的入隊時間在本次測試中為0時區的時間,故而在程式碼中轉換了一下時區,否則無法根據入隊時間查詢數據
import com.synjones.core.tool.utils.DateUtil;
import com.synjones.core.tool.utils.StringUtil;
import com.synjones.message.mapper.MessageAqMapper;
import lombok.extern.slf4j.Slf4j;
/**
* @Title: MessageAQMonitor.java
* @Description: AQ 監控任務, 在AQ斷開後重連
* @author wangqq
* @date 2020年6月28日 下午4:35:31
* @version 1.0
*/
@Slf4j
@Component
public class MessageAQMonitor {
@Autowired
private MessageAQConfig aqConfig;
@Autowired
private MessageAqMapper aqMapper;
@Scheduled(cron = "${message.queue.aq.cron}")
private void monitorJob() {
// 檢查消息隊列是否啟用
if (!aqConfig.enable) {
return;
}
// 獲取當前時間,並向前推5分鐘
String formatDateTime = DateUtil.formatDateTime(new Date(System.currentTimeMillis() - 300000));
// 將該時間轉為0時區的時間【資料庫中存儲的隊列時間為0時區的時間】
String zeroZoneTime = DateUtil.timeConvert(formatDateTime, "+08:00", "+00:00", "yyyy-MM-dd HH:mm:ss");
// 查詢是否存在5分鐘以前的隊列未被消費
int selectCount = aqMapper.selectCount(aqConfig.queue, zeroZoneTime);
if (selectCount != 0) {
// 若存在,則重新啟動監聽
if (MessageAQConnection.closeConnection()) {
log.info("--> AQ connection has been closed.");
if (MessageAQConnection.establishConnection(aqConfig)) {
log.info("--> AQ connection has been re-established.");
}
}
}
}
}
2.9. 隊列表中隊列數量的查詢
- 根據隊列名稱和入隊時間,查詢在入隊時間之後入對的隊列數量
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Select;
/**
* @Title: MessageAqMapper.java
* @Description: oracleAQ的查詢
* @author wangqq
* @date 2020年6月28日 下午4:04:50
* @version 1.0
*/
@Mapper
public interface MessageAqMapper {
/**
*
* 查詢資料庫中的隊列表中符合條件的隊列的條數
*
* @param qName 隊列名稱
* @param minDatetime 隊列入隊的最小時間
* @return
* @author wangqq
* @date 2020-07-10 15:44:43
*/
@Select("select count(msgid) from T_QUEUE_TABLE t where t.q_name = #{qName,jdbcType=VARCHAR} "
+ "and to_char(cast(t.enq_time AS DATE), 'yyyy-MM-dd HH24:mi:ss') < #{minDatetime,jdbcType=VARCHAR}")
int selectCount(String qName, String minDatetime);
}