ActiveMQ 持久訂閱者,執行結果與初衷相違背,驗證離線訂閱者無效,問題解決
導讀
最新在接觸ActiveMQ,裡面有個持久訂閱者模組,功能是怎麼樣也演示不出來效果。配置參數比較簡單(配置沒幾個參數),消費者第一次運行時,需要指定ClientID(此時Broker已經記錄離線訂閱者資訊),在啟動提供者,此時消息隊列存在一條記錄,然後在啟動消費者,但是怎麼樣也獲取不到消息,阿西吧~~~什麼鬼,百度上一大堆,都是這樣步驟,消費者端,指定以下ClientID就好了,可,想要的效果死活不出來。。。。。。
采坑之路
廢話不多說,先上程式碼,後面再分析
消費者端程式碼
public void testTopicConsumer2() throws Exception { //第一步:創建ConnectionFactory String brokerURL = "tcp://192.168.31.215:61616"; ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURL); //第二步:通過工廠,創建Connection Connection connection = connectionFactory.createConnection(); //設置持久訂閱的客戶端ID String clientId = "10086"; connection.setClientID(clientId); //第三步:打開鏈接 connection.start(); //第四步:通過Connection創建session Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //第五步:通過session創建Consumer Topic topic = session.createTopic("cyb-topic"); //創建持久訂閱的消費者客戶端 //第一個參數是指定Topic //第二個參數是自定義的ClientId MessageConsumer consumer = session.createDurableSubscriber(topic, clientId); consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { //第七步:處理資訊 if (message instanceof TextMessage){ TextMessage tm=(TextMessage)message; try{ System.out.println(tm.getText()); } catch (Exception e){ e.printStackTrace(); } } } }); //session.commit(); //第八步:關閉資源 consumer.close(); session.close(); connection.close(); }
只需要制定ClientID和創建持久客戶端即可
提供者端程式碼
public void testTopicProducer() throws Exception { Connection connection = null; MessageProducer producer = null; Session session = null; try { //第一步:創建ConnectionFactory,用於連接broker String brokerURL = "tcp://192.168.31.215:61616"; ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURL); //設置 //((ActiveMQConnectionFactory) connectionFactory).setProducerWindowSize(1000); //第二步:通過工廠,創建Connection connection = connectionFactory.createConnection(); //第三步:連接啟動 connection.start(); //第四步:通過連接獲取session會話 //第一個參數:是否啟用ActiveMQ事務,如果為true,第二個參數無用 //第二個參數:應答模式,AUTO_ACKNOWLEDGE為自動應答 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //第五步:通過session創建destination,兩種目的地:Queue、Topic //參數:消息隊列的名稱,在後台管理系統中可以看到 Topic topic = session.createTopic("cyb-topic"); //第六步:通過session創建MessageProducer producer = session.createProducer(topic); //producer.setDeliveryMode(DeliveryMode.PERSISTENT); //第七步:創建Message //方式一 //TextMessage message=new ActiveMQTextMessage(); //message.setText("queue test"); //方式二 TextMessage message1 = session.createTextMessage("topic->部落格園地址://www.cnblogs.com/chenyanbin/"); //第八步:通過producer發送消息 producer.send(message1); //session.commit(); } catch (Exception e) { e.printStackTrace(); } finally { //第九步:關閉資源 producer.close(); session.close(); connection.close(); } }
驗證離線訂閱者功能
失敗的驗證
正確的驗證方式
首先明確一點,上面的程式碼是沒有一點問題的。為了節省時間,驗證步驟和上面的差不多,不啟動前兩步了,直接啟動第三步,也就是:
- 先啟動消費者(記錄持久訂閱者ClientID);
- 在啟動提供者;
- 啟動消費者(在下面加個死循環)
問題剖析
第一次運行消費者時,此時Broker已經記錄訂閱者ClientID,然後程式一閃而過,進入到藍色框中的,離線訂閱者中,然後在執行提供者,此時,Topic中,已經入隊一次,再次運行消費者時,運行是非同步獲取的,運行一閃而過(鄙人猜測,可能是ActiveMQ機制問題,內部邏輯大概是,先遍歷非持久訂閱者,然後在查看持久訂閱者,問題出在,程式執行太快,還沒到查看持久訂閱者時,程式就執行完了,所以第二次執行消費者時,加了個死循環,不停監聽隊列消息,具體ActiveMQ底層程式碼沒看過,有興趣的可以研究下,底層程式碼找到相應位置後,記得告訴我哦~~~)
這個小問題,搗鼓一下午,百度上也說,就這2步驟配置即可,運行結果與初衷相違背,大半夜的都打算洗洗睡了,頭腦風暴想出來這個方法,在下面寫個死循環,不停監聽隊列消息,這才有了這篇部落格,好啦…好啦,時間不早了,馬上都快凌晨1點鐘了,明個還得上班,洗洗睡了zZZZZZZZZZ