Spring框架之jms源碼完全解析
- 2020 年 12 月 15 日
- 筆記
Spring框架之jms源碼完全解析
我們在前兩篇文章中介紹了Spring兩大核心IOC(Inversion of Control控制反轉)和AOP(Aspect Oriented Programming面向切面編程)技術:Spring框架之beans源碼完全解析和Spring框架之AOP源碼完全解析,下面對Spring的jms源碼進行分析,先對jms進行簡單的介紹,其次對Spring中jms模塊源碼文件清單進行梳理,然後對jms的單獨使用和Spring整合jms使用進行演示,最後對Spring中jms模塊有兩個核心JmsTemplate和消息監聽器源碼進行分析。
一、jms簡介
分佈式系統消息通信技術主要包括:(1) RPC(Remote Procedure Call Protocol),一般是C/S方式,同步的,跨語言跨平台,面向過程。(2)CORBA(Common Object Request Broker Architecture),CORBA從概念上擴展了RPC,面向對象的,企業級的(面向對象中間件還有DCOM)。(3) RMI(Remote Method Invocation),面向對象方式的 Java RPC。(4)WebService基於Web,C/S或B/S,跨系統跨平台跨網絡。多為同步調用,實時性要求較高。(5)MOM(Message oriented Middleware) 面向消息中間件。
面向消息中間件,主要適用於消息通道、消息總線、消息路由和發佈/訂閱的場景。目前主流標準有JMS(Java Message Service)、AMQP(Advanced Message Queuing Protocol)和STOMP(Streaming Text Oriented Messaging Protocol)。AMQP是一個面向協議的,跟語言平台無關的消息傳遞應用層協議規範。STOMP是流文本定向消息協議,是一種為MOM設計的簡單文本協議。AMQP和STOMP都是跟http處於同一層的協議。JMS是Java平台上的面向接口的消息規範,是一套API標準,並沒有考慮異構系統。
JMS即Java消息服務應用程序接口,是一個Java平台關於面向消息中間件的API,用於在兩個應用程序之間,或分佈式系統中發送消息,進行異步通信。Java消息服務是一個與具體平台無關的API,絕大多數面向消息中間件提供商都對JMS提供支持,JMS類似於JDBC(Java Database Connectivity),JDBC 是可以用來訪問許多不同關係數據庫的API,而JMS則提供同樣與廠商無關的訪問方法,用來訪問消息收發服務。所以兩個應用程序之間要進行通信,我們使用了一個JMS服務,進行中間的轉發,通過使用JMS,可以解除兩個程序之間的耦合,提高消息靈活性,支持異步性。
JMS編程模型包含的幾個要素:
(1)連接工廠。 連接工廠(ConnectionFactory)是由管理員創建,並綁定到JNDI(Java命名和目錄接口)樹中。針對兩種不同的jms消息模型(點對點和發佈/訂閱),分別有QueueConnectionFactory和TopicConnectionFactory兩種。客戶端使用JNDI查找連接工廠,然後利用連接工廠創建一個JMS連接。
(2)JMS連接。JMS連接(Connection)表示JMS客戶端和服務器端之間的一個活動的連接,是由客戶端通過調用連接工廠的方法建立的。Connection表示在客戶端和JMS系統之間建立的鏈接(對TCP/IP socket的包裝)。Connection可以產生一個或多個Session。跟ConnectionFactory一樣,Connection也有兩種類型:QueueConnection和TopicConnection。
(3)JMS會話。JMS會話(Session)表示JMS客戶與JMS服務器之間的會話狀態。JMS會話建立在JMS連接上,表示客戶與服務器之間的一個會話線程。Session是我們操作消息的接口。可以通過session創建生產者、消費者、消息等。Session提供了事務的功能。當我們需要使用session發送/接收多個消息時,可以將這些發送/接收動作放到一個事務中。同樣,也分QueueSession和TopicSession。
通俗的講,Connection(連接)是一個物理概念,是指一個通過網絡建立的客戶端和專有服務器或調度器之間的一個網絡連接。Session(會話)是一個邏輯概念,它存在於實例中,一個Connection可以擁有多個Session,也可以沒有Session,同一個Connection上的多個Session之間不會相互影響。connection相當於修路,而session相當於通過這條道路的一次運輸。
(4)JMS目的。JMS目的(Destination),又稱為消息隊列,是實際的消息源。Destination的意思是消息生產者的消息發送目標或者說消息消費者的消息來源。對於消息生產者來說,它的Destination是某個隊列(Queue)或某個主題(Topic);對於消息消費者來說,它的Destination也是某個隊列或主題(即消息來源)。所以Destination實際上就是兩種類型的對象:Queue、Topic。
(5)JMS消息。消息由兩部分組成:報頭和消息主體。報頭由路由信息以及有關該消息的元數據組成。消息主體則攜帶着應用程序的數據或有效負載。根據有效負載的類型來劃分,可以將消息分為幾種類型:簡單文本(TextMessage)、可序列化的對象 (ObjectMessage)、屬性集合 (MapMessage)、位元組流 (BytesMessage)、原始值流 (StreamMessage),還有無有效負載的消息 (Message)。
通常有兩種類型:① 點對點(Point-to-Point)。在點對點的消息系統中,消息分發給一個單獨的使用者。點對點消息往往與隊列(javax.jms.Queue)相關聯。② 發佈/訂閱(Publish/Subscribe)。發佈/訂閱消息系統支持一個事件驅動模型,消息生產者和消費者都參與消息的傳遞。生產者發佈事件,而使用者訂閱感興趣的事件,並使用事件。該類型消息一般與特定的主題(javax.jms.Topic)關聯。
(6)消息的生產者。生產者(Message Producer)對象由Session對象創建,用於發送消息,將消息發送到Destination。同樣,消息生產者分兩種類型:QueueSender和TopicPublisher。可以調用消息生產者的方法(send或publish方法)發送消息。
(7)消息消費者。 消費者(Message Consumer)對象由Session對象創建,用於接收消息,接收被發送到Destination的消息。兩種類型:QueueReceiver和TopicSubscriber。可分別通過session的createReceiver(Queue)或createSubscriber(Topic)來創建。當然,也可以session的creatDurableSubscriber方法來創建持久化的訂閱者。
(8)消息監聽器。消息監聽器MessageListener,類似於鉤子函數,hook到消息相關的事件中,換句話說,當消息被創建、開始傳輸、轉發、傳輸中止、刪除時,會調用相應的鉤子函數。如註冊了消息監聽器,一旦消息到達,將自動調用監聽器的onMessage方法。
Spring中集成JMS:
JMS是一個 Java 標準,定義了使用消息代理的通用API 。Spring 通過基於模板的抽象為 JMS 功能提供了支持,這個模板就是 JmsTemplate 。使用 JmsTemplate能夠非常容易地在消息生產方發送隊列和主題消息,在消費消息的一方也能夠非常容易地接收這些消息。 (模板方法模式是一種設計模式。通俗的講就是完成一件事情,有固定的數個步驟,但是每個步驟根據對象的不同,而實現細節不同。這樣可以在父類中定義一個完成該事情的總方法,按照完成事件需要的步驟去調用其每個步驟的實現方法。每個步驟的具體實現,由子類完成)。後面我們的代碼分析也以JmsTemplate為核心進行分析。對於類似Java EE的消息驅動Bean形式的異步接收,Spring提供了大量用於創建消息驅動POJOs的消息監聽器。Spring還提供了一種創建消息監聽器的聲明式方法。
根據《Spring 5 官方文檔》:
(1)org.springframework.jms:定義了各種不同的JmsException異常類。在org.springframework.jms.support.JmsUtils的convertJmsAccessException方法中將javax.jms.JMSException異常類轉成成等價的org.springframework.jms.JmsException。
(2)org.springframework.jms.annotation包提供了支持註解驅動監聽端點的必要基礎架構,通過使用@JmsListener實現。
(3)org.springframework.jms.config包提供了 JMS 命名空間的解析實現,以及配置監聽容器和創建監聽端點的 java 配置支持。
(4)org.springframework.jms.connection包提供了適用於獨立應用程序的ConnectionFactory實現。 它還包含 Spring 對 JMS 的PlatformTransactionManager實現(即JmsTransactionManager)。這將允許 JMS 作為事務性資源無縫集成到 Spring 的事務管理機制中。
(5)org.springframework.jms.core包提供了使用 JMS 的核心功能。它包含了JMS 模板類,用來處理資源的創建與釋放,從而簡化 JMS 的使用,就像JdbcTemplate對 JDBC 做的一樣。
(6)org.springframework.jms.listener:提供了消息監聽器及相關支持類。
(7)org.springframework.jms.remoting:提供基於JMS的RPC方案。
(8)org.springframework.jms.support包:提供了一些支持的功能函數。converter子包提供了 MessageConverter 抽象,進行 Java 對象和 JMS 消息的互相轉換。destination子包提供了管理 JMS 目的地的不同策略,比如針對 JNDI 中保存的目標的服務定位器。
二、jms模塊源碼文件清單
1 jms/
1.1 JmsException:繼承自NestedRuntimeException,NestedRuntimeException又繼承自RuntimeException。
Java中所有異常的父類是Throwable類,在Throwable類下有兩大子類:一個是Error類,指系統錯誤異常,例如:VirtualMachineError 虛擬機錯誤,ThreadDeath 線程死鎖。一般如果是Error類的異常的話,就是程序的硬傷,就好比是工廠里斷水斷電,機器損壞了。另一個是Exception類,指編碼、環境、用戶操作輸入等異常,這個是比較常見的異常類,Exception類下面又有兩個子類,非檢查異常(又稱運行時異常RuntimeException)和檢查異常。
在RuntimeException異常中有幾個常見的子類,例如:InputMismatchException 輸入不匹配異常;ArithmeticException 算術運算異常;NullPointerException 空指針異常;ArrayIndexOutOfBoundsException 數組下標越界異常;ClassCastException 類型轉換異常。
檢查異常中的子類有:IOException 文件異常;SQLException SQL數據庫錯誤異常。
1.2 IllegalStateException
1.3 InvalidClientIDException
1.4 InvalidDestinationException
1.5 InvalidSelectorException
1.6 JmsSecurityException
1.7 MessageEOFException
1.8 MessageFormatException
1.9 MessageNotReadableException
1.10 MessageNotWriteableException
1.11 ResourceAllocationException
1.12 TransactionInProgressException
1.13 TransactionRolledBackException
1.14 UncategorizedJmsException
1.2-1.14的異常處理類都繼承自JmsException,適用場景如類名所示,其中UncategorizedJmsException表示當其他JmsException都匹配不到時拋出該異常。org.springframework.jms.support.JmsUtils的convertJmsAccessException方法負責將javax.jms.JMSException異常類轉成成等價的org.springframework.jms.JmsException。
2 jms/annotation
2.1 JmsListener:該類是一個註解接口。java用@interface Annotation{ } 定義一個註解 @Annotation,一個註解是一個類。註解相當於一種標記,在程序中加上了註解就等於為程序加上了某種標記,以後javac編譯器,開發工具和其他程序可以用反射來了解你的類以及各種元素上有無任何標記,看你有什麼標記,就去干相應的事。
@JmsListener註解用來聲明這是個監聽器方法,也就是標記這個方法被JMS消息監聽器監聽。該類中屬性destination表示監聽的隊列名字,containerFactory表示用來創建JMS監聽器容器。處理@JmsListener註解主要靠JmsListenerAnnotationBeanPostProcessor。註冊JmsListenerAnnotationBeanPostProcessor可以手動進行,更便捷的是通過Spring的config文件<jms:annotation-driven/>配置,或者使用@EnableJms註解兩種方式將註解的監聽器類自動放到監聽器容器中。
2.2 EnableJms:用@JmsListener這個註解的時候。需要在配置類(@Configuration類)上加上@EnableJms註解,並且要配置一個DefaultJmsListenerContainerFactory監聽容器工廠的Bean實例。
Spring根據註解@EnableJms自動掃描帶有@JmsListener的方法,並為其創建一個MessageListener把它包裝起來。而JmsListenerContainerFactory的Bean的作用就是為每個MessageListener創建MessageConsumer並啟動消息接收循環。
Spring接收消息的步驟:通過JmsListenerContainerFactory配合@EnableJms掃描所有@JmsListener方法,自動創建MessageConsumer、MessageListener以及線程池,啟動消息循環接收處理消息,最終由我們自己編寫的@JmsListener方法處理消息,可能會由多線程同時並發處理。
2.3 JmsListenerAnnotationBeanPostProcessor:該後置處理器用來實現@JmsListener註解,將帶有@JmsListener方法註冊到指定的JMS消息監聽器容器中。該類中afterSingletonsInstantiated方法的最關鍵的一句 registrar.afterPropertiesSet()即可完成所有監聽的註冊。這個後置處理器可以通過 <jms:annotation-driven> XML配置或者@EnableJms註解兩種方式自動註冊。
2.4 JmsBootstrapConfiguration:配置類,註冊一個用於處理@JmsListener註解的JmsListenerAnnotationBeanPostProcessor後置處理器。同時也註冊一個默認的JmsListenerEndpointRegistry。當使用@EnableJms註解時,這個配置類會被自動載入。
2.5 JmsListenerConfigurer:Spring管理bean實現的可選接口,這些管理bean用來自定義JMS監聽器端點的配置方式。
2.6 JmsListeners:註解容器,多個@JmsListener註解的組成的集合。
3 jms/config
3.1 JmsListenerContainerFactory:消息監聽容器工廠接口,基於JmsListenerEndpoint。
3.2 AbstractJmsListenerContainerFactory:消息監聽容器工廠的抽象基類。
3.3 DefaultJmsListenerContainerFactory:JmsListenerContainerFactory接口的默認實現,該工廠用來創建DefaultMessageListenerContainer。
3.4 DefaultJcaListenerContainerFactory:JmsListenerContainerFactory接口的一個實現,用來創建一個基於 JCA的MessageListener容器JmsMessageEndpointManager。
JCA (J2EE 連接器架構,Java Connector Architecture)是對J2EE標準集的重要補充。因為它注重的是將Java程序連接到非Java程序和軟件包中間件的開發。
3.5 SimpleJmsListenerContainerFactory:JmsListenerContainerFactory接口的一個簡單實現,用來創建一個標準的SimpleMessageListenerContainer。
3.6 JmsNamespaceHandler:JMS命名空間處理器。註冊了三種標籤元素對應的處理函數:”listener-container”、”jca-listener-container”、”annotation-driven”。
3.7 AbstractListenerContainerParser:用來解析JMS監聽器容器元素。
3.8 JmsListenerContainerParser:解析JMS的<listener-container>元素。
3.9 JcaListenerContainerParser:解析JMS的<jca-listener-container>元素。
3.10 AnnotationDrivenJmsBeanDefinitionParser:解析jms名字空間中的 ‘annotation-driven’ 元素。
3.11 AbstractJmsListenerEndpoint:Jms監聽端點的基礎模型。
3.12 JmsListenerEndpoint:JMS listener endpoint的模型。藉助JmsListenerConfigurer可以用來註冊端點。
3.13 MethodJmsListenerEndpoint:JmsListenerEndpoint的一個實現,提供了一些方法用來為該endpoint處理到來的消息。包括get/set 所屬bean、所屬方法、jms相關參數、spring上下文、消息處理工廠等。
3.14 SimpleJmsListenerEndpoint:JmsListenerEndpoint接口的一個實現,提供了MessageListener,用來為給endpoint處理到來的消息。
3.15 JmsListenerEndpointRegistrar:將JmsListenerEndpoint對象註冊到JmsListenerEndpointRegistry對象中。
3.16 JmsListenerEndpointRegistry:創建MessageListenerContainer實例,用來保存註冊過的JmsListenerEndpoint,同時對這些消息監聽容器的生命周期進行管理。不同於手動創建的MessageListenerContainer,通過註冊生成的監聽容器不屬於ApplicationContext管理的bean,不會被自動裝配。
如果需要管理註冊的消息監聽容器則調用getListenerContainers()函數。如果要使用一個指定的消息監聽容器,使用函數getListenerContainer(String),參數就是endpoint的id值。
3.17 JmsListenerConfigUtils:配置常量值,用於子包間的內部共享。
4 jms/connection
4.1 SingleConnectionFactory:connectionFactory是Spring用於創建到JMS服務器鏈接的。Spring提供了多種connectionFactory,主要有SingleConnectionFactory和CachingConnectionFactory。
SingleConnectionFactory:對於建立JMS服務器鏈接的請求會一直返回同一個鏈接,並且會忽略Connection的close方法調用。
4.2 CachingConnectionFactory:繼承自SingleConnectionFactory,所以它擁有SingleConnectionFactory的所有功能,同時它還提供緩存JMS資源功能,包括緩存Session、MessageProducer和MessageConsumer。
Spring中發送消息的核心是JmsTemplate,然而Jmstemplate的問題是在每次調用時都要打開/關閉session和producter,效率很低,所以引申出了PooledConnectionFactory連接池,用於緩存session和producter。然而這還不是最好的。從spring2.5.3版本後,Spring又提供了CachingConnectionFactory,這才是首選的方案。默認情況下, CachingConnectionFactory只緩存一個session。
4.3 SmartConnectionFactory:繼承自ConnectionFactory接口,指示從該connectionFactory得到的Connection怎樣釋放掉。
4.4 DelegatingConnectionFactory:ConnectionFactory接口的實現類,對所有調用給定的目標ConnectionFactory進行代理。
4.5 ConnectionFactoryUtils:ConnectionFactory類的功能函數,特別是用於從一個指定的ConnectionFactory獲得transactional JMS resources。主要在框架內部使用,比如JmsTemplate、DefaultMessageListenerContainer會使用到該類。
4.6 CachedMessageConsumer:MessageConsumer的裝飾器,使一個共享的MessageConsumer實例能適應所有的調用。
4.7 CachedMessageProducer:MessageProducer裝飾器,使得一個共享的MessageProducer實例能適應多數調用。
4.8 ChainedExceptionListener:ExceptionListener接口的實現類,支持異常鏈。在java代碼中常常會再捕獲一個異常後拋出另外一個異常,並且希望把異常原始信息保存下來,這被稱為異常鏈。
4.9 JmsResourceHolder:JmsResourceHolder繼承了ResourceHolderSupport,作為Jms資源句柄,封裝了JMS的connection、session等資源。
4.10 JmsTransactionManager:JmsTransactionManager用於對JMS ConnectionFactory做事務管理。這將允許JMS應用利用Spring的事務管理特性。JmsTransactionManager在執行本地資源事務管理時將從指定的ConnectionFactory綁定一個Connection/Session這樣的配對到線程中。JmsTemplate會自動檢測這樣的事務資源,並對它們進行相應操作。
4.11 SessionProxy:繼承自Session的接口,被Session代理實現,用來獲得該代理的目標Session。
4.12 SynchedLocalTransactionFailedException:同步本地事務未完成時拋出的異常。
4.13 TransactionAwareConnectionFactoryProxy:ConnectionFactory的代理,添加了Spring的事務功能,同事務JNDI ConnectionFactory類似。
4.14 UserCredentialsConnectionFactoryAdapter:ConnectionFactory的一個適配器,授予用戶對於每個標準的 createConnection()方法調用的權限。
5 jms/core
5.1 JmsMessageOperations:繼承了MessageSendingOperations、MessageReceivingOperations、MessageRequestReplyOperations幾個接口,包含關於JMS消息的操作方法,包括send、receive、convertAndSend、receiveAndConvert等。
5.2 JmsMessagingTemplate:JmsMessageOperations接口的一個實現。
5.3 JmsOperations:詳細列出JMS一系列操作,該接口會被JmsTemplate實現。
5.4 JmsTemplate:核心類。在JDBC中,Spring提供了一個JdbcTemplate來簡化JDBC代碼開發,同樣,Spring也提供了JmsTemplate來簡化JMS消息處理的開發。
JmsTemplate其實是Spring對JMS更高一層的抽象,它封裝了大部分創建連接、獲取session及發送接收消息相關的代碼,使得我們可以把精力集中在消息的發送和接收上。
5.5 MessageCreator:利用給定的Session創建一個JMS消息。
5.6 MessagePostProcessor:和JmsTemplate的send方法一起用,將一個對象轉換成message。在一個消息被轉換器處理後可以進行進一步修改。在設置JMS頭部和屬性的時候有用。
5.7 BrowserCallback:瀏覽JMS queue中的信息的回調函數。在JmsTemplate類中的browse、browseSelected函數中BrowserCallback作為一個參數傳入。(有些函數要求應用先傳給它一個函數,好在合適的時候調用,以完成目標任務。這個被傳入的、後又被調用的函數就稱為回調函數callback function)。
5.8 ProducerCallback:send一個消息到JMS destination的回調函數。作為JmsTemplate類的execute函數一個參數。
5.9 SessionCallback:在一個給定的Session執行一系列操作的回調函數。作為JmsTemplate的execute函數的參數使用。
jms/core/support
5.10 JmsGatewaySupport:方便應用類訪問JMS。使用該類時需要設置一個ConnectionFactory 或者JmsTemplate實例。如果存在ConnectionFactory ,那麼它通過createJmsTemplate方法會創建自己的JmsTemplate。
6 jms/listener
6.1 AbstractJmsListeningContainer:繼承自JmsDestinationAccessor,作為所有Message Listener Container的公共基類。它主要提供了JMS connection的生命周期管理的功能,但是沒有對消息接收的方式(主動接收方式或者異步接收方式)等做任何假定。
6.2 MessageListenerContainer:框架內部使用的一個抽象類,用來表示一個消息監聽器容器,不會被用來支持JMS和JCA模式的外部容器實現。
6.3 AbstractMessageListenerContainer: Spring消息監聽器容器(如SimpleMessageListenerContainer、SimpleMessageListenerContainer)的父類。
6.4 AbstractPollingMessageListenerContainer:繼承自AbstractMessageListenerContainer,它提供了對於主動接收消息(polling)的支持,以及支持外部的事務管理。
6.5 SimpleMessageListenerContainer:最簡單的消息監聽器容器,用來從jms 消息隊列中接收消息,然後推送註冊到它內部的消息監聽器(MessageListener)中,只能處理固定數量的JMS會話,且不支持事務。
在Spring框架中使用JMS傳遞消息有兩種方式:JMS template和message listener container,前者用於同步收發消息,後者主要用於異步收消息。
6.6 DefaultMessageListenerContainer:用於異步消息監聽的消息監聽器容器。跟SimpleMessageListenerContainer一樣,DefaultMessageListenerContainer也支持創建多個Session和MessageConsumer來接收消息。跟SimpleMessageListenerContainer不同的是,DefaultMessageListenerContainer創建了concurrentConsumers所指定個數的AsyncMessageListenerInvoker(實現了SchedulingAwareRunnable接口),並交給taskExecutor運行。
6.7 LocallyExposedJmsResourceHolder:JMS資源句柄JmsResourceHolder的子類,指示本地的資源。
6.8 SessionAwareMessageListener:SessionAwareMessageListener是Spring為我們提供的,它不是標準的JMS MessageListener。MessageListener的設計只是純粹用來接收消息的,假如我們在使用MessageListener處理接收到的消息時我們需要發送一個消息通知對方我們已經收到這個消息了,那麼這個時候我們就需要在代碼裏面去重新獲取一個Connection或Session。SessionAwareMessageListener的設計就是為了方便我們在接收到消息後發送一個回復的消息,它同樣為我們提供了一個處理接收到的消息的onMessage方法。
6.9 SubscriptionNameProvider:消息監聽器會實現該接口,表示一個持久的訂閱,否則消息監聽器被用作一個默認的訂閱。
jms/listener/adapter
6.10 AbstractAdaptableMessageListener:JMS消息監聽器適配器的抽象類,提供系列方法用來提取JMS消息的有效信息。
6.11 JmsResponse:在運行狀態時,destination需要計算時使用該類,返回JMS監聽器的方法用來指示destination。如果在運行中不需要計算destination,推薦使用org.springframework.messaging.handler.annotation.SendTo @SendTo。
6.12 MessageListenerAdapter:MessageListenerAdapter類實現了MessageListener接口和SessionAwareMessageListener接口,它的主要作用是將接收到的消息進行類型轉換,然後通過反射的形式把它委託給目標監聽器進行處理。MessageListenerAdapter會把接收到的消息做如下轉換:
TextMessage轉換為String對象;
BytesMessage轉換為byte數組;
MapMessage轉換為Map對象;
ObjectMessage轉換為對應的Serializable對象。
6.13 MessagingMessageListenerAdapter:MessageListener適配器,援引一個可配置的InvocableHandlerMethod(用於在某個請求被控制器方法處理時,包裝處理所需的各種參數和執行處理邏輯)。
6.14 ListenerExecutionFailedException:監聽器方法執行失敗時拋出的異常。
6.15 ReplyFailureException:需要回復的消息發送失敗拋出的異常。
jms/listener/endpoint
6.16 JmsMessageEndpointFactory:JCA 1.7 MessageEndpointFactory工廠類的一個實現,為JMS監聽器提供了事務管理能力。
6.17 JmsMessageEndpointManager:GenericMessageEndpointManager的一個拓展,ActivationSpec配置中加入了對JMS的支持。
6.18 JmsActivationSpecFactory:基於JmsActivationSpecConfig用來創建JCA 1.5 ActivationSpec對象的工廠。
6.19 StandardJmsActivationSpecFactory:JmsActivationSpecFactory接口的標準實現,支持JMS 1.5中定義的標準JMS屬性,忽視Spring的”maxConcurrency” 、 “prefetchSize” 設置。
6.20 DefaultJmsActivationSpecFactory: JmsActivationSpecFactory接口的默認實現。支持JCA 1.5中所定義的標準額JMS屬性,也包括Spring拓展的一些設置,如”maxConcurrency” 、 “prefetchSize” 。
6.21 JmsActivationSpecConfig:激活JMS message endpoint的一些通用的配置對象。
7 jms/remoting
7.1 JmsInvokerClientInterceptor:方法攔截器,序列化遠程觸發對象和反序列化遠程觸髮結果對象,使用Java序列化方法,例如RMI。
7.2 JmsInvokerProxyFactoryBean:jms觸發代理的工廠bean,暴露bean引用的代理服務,使用特定的服務接口。
7.3 JmsInvokerServiceExporter:為了支持基於消息的RPC,Spring提供了JmsInvokerServiceExporter,它可以把bean導出為基於消息的服務;同時為客戶端提供了JmsInvokerProxyFactoryBean來使用這些服務。
8 jms/support
8.1 JmsAccessor:定義了幾個用於訪問JMS服務的共通屬性,提供了創建Connection和Session的方法。是JmsTemplate、SimpleMessageListenerContainer和DefaultMessageListenerContainer的父類。
8.2 JmsHeaderMapper:將消息頭整合到要向外發送的JMS消息中的接口,或者從接收到的JMS消息提取出消息頭的信息。
8.3 SimpleJmsHeaderMapper :JmsHeaderMapper接口的簡單實現。
8.4 JmsHeaders:將JMS屬性設置到通用消息頭部或者從其提取出JMS屬性用到的預定義的名字或者前綴。
8.5 JmsMessageHeaderAccessor:MessageHeaderAccessor接口的一個實現,能夠訪問JMS規範的頭header。
8.6 JmsUtils:JMS工具包,主要是框架內部使用。
8.7 QosSettings:收集Quality-of-Service設置,在發送消息時使用。
jms/support/converter
8.8 MessageConverter:在收發消息時,將Java objects和JMS messages相互轉換。
8.9 SimpleMessageConverter: 實現String與TextMessage之間的相互轉換,位元組數組與BytesMessage之間的相互轉換,Map與MapMessage之間的相互轉換以及Serializable對象與ObjectMessage之間的相互轉換。
8.10 MarshallingMessageConverter:使用JAXB庫實現消息與XML格式之間的相互轉換。
8.11 MessagingMessageConverter:利用MessageConverter將messaging abstraction的Message和javax.jms.Message相互轉換。
8.12 SmartMessageConverter:MessageConverter的拓展,增加了轉換提示功能。
8.13 MappingJackson2MessageConverter:使用Jackson 2 JSON庫實現消息與JSON格式之間相互轉換。
8.14 MessageType:定義幾個常量表示要轉換成得目標消息的類型,有text、bytes、map、object。
8.15 MessageConversionException:MessageConverter出錯時拋出的異常。
jms/support/destination
8.16 DestinationResolver:將指定的目的地名解析為目的地實例。 參數pubSubDomain用於指定是使用「發佈/訂閱」模式(解析後的目的地是Topic),還是使用「點對點」模式(解析後的目的地是Queue)。
8.17 BeanFactoryDestinationResolver:實現了DestinationResolver接口和BeanFactoryAware接口。它會根據指定的目的地名從BeanFactory中查找目的地實例。
8.18 DynamicDestinationResolver:實現了DestinationResolver接口。根據指定的目的地名動態創建目的地實例。
8.19 CachingDestinationResolver:繼承了DestinationResolver,增加了緩存的功能,在目的地失效的時候,removeFromCache方法會被調用;在JMS provider失效的時候,clearCache方法會被調用。
8.20 JndiDestinationResolver:繼承自JndiLocatorSupport, 同時實現了CachingDestinationResolver接口。如果在JMS provider中配置了靜態目的地,那麼JndiDestinationResolver通過JNDI查找的方式獲得目的地實例。
8.21 JmsDestinationAccessor:提供了用於解析目的地的方法。destinationResolver屬性的默認值是DynamicDestinationResolver的實例,也就是說默認採用動態目的地解析的方式;pubSubDomain屬性用於指定是使用「發佈/訂閱」模式還是使用「點對點」模式,默認值是false(點對點模式)。
8.22 DestinationResolutionException:將指定的目的地名解析為目的地實例出錯拋出的異常。
三、jms的使用演示
(一)jms的單獨使用
為了更好的理解Spring整合jms,先來看下單獨使用Java消息服務的方式。以Java消息服務的開源實現產品ActiveMQ為例。使用消息服務,需要做三件事:1、開啟消息服務器。2、創建消息生產者。3、創建消息消費者。
1、開啟消息服務器
如果是Windows系統下可以直接雙擊ActiveMQ安裝目錄下的bin目錄下的activemq.bat文件來啟動消息服務器。如果是Linux系統,進入activeMq安裝包下的bin目錄,使用./activemq start命令就可以啟動activemq服務。
2、創建消息生產者
消息的生產者主要用來將包含業務邏輯的消息發送到消息服務器。以下為發送消息測試,嘗試發送三條消息到消息服務器,消息內容為「大家好,這是個測試」。
1 public class Sender{ 2 public static void main(String[] args) throws Exception{ 3 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(); 4 Connection connection = connectionFactory.createConnection(); 5 connection.start() 6 7 Session session = connection.creatSession(Boolean.True, Session.AUTO_ACKNOWLEDGE); 8 Destination destination = session.createQueue("my-queue"); 9 10 MessageProducer producer = session.createProducer(destionation); 11 for (int i=0; i<3; i++){ 12 TextMessage message = session.createTextMessage("大家好,這是個測試"); 13 Tread.sleep(1000); 14 //通過消息生產者發出消息 15 producer.send(message); 16 } 17 session.commit(); 18 session.close(); 19 connection.close(); 20 } 21 }
3、創建消息消費者
消息的消費者用於連接消息服務器將服務器中的消息提取出來進行相應的處理。
1 public class Receiver{ 2 public static void main(String[] args) throws Exception { 3 ConnectionFactory connectionFactory = new ActiveMQConnectionFactroy(); 4 Connection connection = connectionFactory.createConnection(); 5 connection.start(); 6 7 final Session session = connection.createSession(Boolean.TRUE, Session.AUTOACKNOWLEDGE); 8 Destination destination = session.createQueue("my-queue"); 9 MessageConsumer consumer = session.createConsumer(destination); 10 11 int i = 0; 12 while(i<3){ 13 i++; 14 TextMessage message = (TextMessage) consumer.receive(); 15 session.commit(); 16 //TODO something... 17 System.out.println("收到消息:" + message.getText()); 18 } 19 20 session.close(); 21 connection.close(); 22 } 23 }
運行時,先開啟消息的生產者,向服務器發送消息,然後開啟消息的消費者。上述代碼可以看出,和數據庫實現很相似,一系列的冗餘但是必不可少的代碼用於創建connectionFactory、connection、session,利用session來 createQueue、createProducer、createConsumer,真正用於發送和接收消息的代碼並不多。
Spring 通過基於模板方法的設計模式來解決這個問題,這個模板就是 JmsTemplate 。JmsTemplate封裝了大部分創建連接、獲取session及發送接收消息相關的代碼,使得我們可以把精力集中在消息的發送和接收上。所以使用 JmsTemplate能夠非常容易地在消息生產方發送隊列和主題消息,在消費消息的一方也能夠非常容易地接收這些消息。
(二)Spring整合jms
在Spring中使用jms同樣需要做三件事。1、配置文件的配置。2、發送消息。3、接收消息。
1、配置文件的配置
上面我們提到了Spring將Connection的創建和關閉,Session的創建和關閉等操作都封裝到了JmsTemplate中,所以在Spring的核心配置文件中首先要註冊JmsTemplate類型的bean。ActiveMQConnectionFactory用於連接消息服務器,ActiveMQQueue消息隊列是實際的消息源,也要註冊。
1 <beans> 2 <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> 3 <Property name="brokerURL"> 4 <value>tcp://localhost:61616</value> 5 </Property> 6 </bean> 7 8 <bean id="jmsTemplate" class="org.Springframework.jms.core.JmsTemplate"> 9 <Property name="connectionFactory"> 10 <ref bean="connectionFactory" /> 11 </Property> 12 </bean> 13 14 <bean id="destination" class="org.apache.activemq.command.ActiveMQQueue"> 15 <constructor-arg index="0"> 16 <vaule>HelloWordQueue</vaule> 17 </constructor-arg> 18 </bean> 19 20 </beans>
2、發送消息
Spring中使用jmsTemplate發送消息到消息服務器中去,省去了冗餘的Connection以及Session等的創建和銷毀過程。
1 public class HelloWorldSender{ 2 public static void main(String[] args) throws Exception{ 3 ApplicationContext context = new ClassPathXmlApplicationContext(new string[] {"test/activeMQ/Spring/applicationContext.xml"}); 4 5 JmsTemplate jmsTemplate = (JmsTemplate) context.getBean("jmsTemplate"); 6 Destination destination = (Destination) context.getBean("destination"); 7 8 jmsTemplate.send(destination, new MessageCreator(){ 9 public Message createMessage(Session session) throws JMSException{ 10 return session.createTextMessage("大家好,這是個測試"); 11 } 12 }); 13 } 14 }
3、接收消息。
Spring中連接服務器接收消息示例如下:
1 public class HelloWorldReceiver{ 2 public static void main(string[] args) throws Exception{ 3 ApplicationContext context = new ClassPathXMLApplicationContext(new String[] {"test/activeMQ/Spring/applicationContext.xml"}); 4 JmsTemplate jmsTemplate = (JmsTemplate) context.getBean("jmsTemplate"); 5 Destination destination = (Destination) context.getBean("destination"); 6 7 TextMessage msg = (TextMessage) jmsTemplate.receive(destination); 8 System.out.println("received msg is:" + msg.getText()); 9 } 10 }
經過上面3步就完成了Spring消息的發送和接收。在HelloWorldSender發送消息類中使用jmsTemplate.send方法來發送消息,沒有問題。在HelloWorldReceiver接收消息類中,使用jmsTemplate.receive方法來接收消息會存在一個問題,該方法只能接收一次消息,如果未收到消息則一直等待。利用消息監聽器來解決這個問題,消息監聽器可以循環監聽消息服務器上的消息。消息監聽器並非Spring獨有, Spring整合JMS的應用提供三種類型的消息監聽器,分別是MessageListener、SessionAwareMessageListener和MessageListenerAdapter。
MessageListener是最原始的消息監聽器,它是JMS規範中定義的一個接口。其中定義了一個用於處理接收到的消息的onMessage方法,該方法只接收一個Message參數。
SessionAwareMessageListener是Spring為我們提供的,它不是標準的JMS MessageListener。MessageListener的設計只是純粹用來接收消息的,假如我們在使用MessageListener處理接收到的消息時我們需要發送一個消息通知對方我們已經收到這個消息了,那麼這個時候我們就需要在代碼裏面去重新獲取一個Connection或Session。SessionAwareMessageListener的設計就是為了方便我們在接收到消息後發送一個回復的消息,它同樣為我們提供了一個處理接收到的消息的onMessage方法,但是這個方法可以同時接收兩個參數,一個是表示當前接收到的消息Message,另一個就是可以用來發送消息的Session對象。
MessageListenerAdapter類實現了SessionAwareMessageListener接口和MessageListener接口,它的主要作用是將接收到的消息進行類型轉換,然後通過反射的形式把它交給一個普通的Java類進行處理。TextMessage轉換為String對象;BytesMessage轉換為byte數組;MapMessage轉換為Map對象;ObjectMessage轉換為對應的Serializable對象。
4、利用消息監聽器接收消息(第3步改進版)。
我們需要做兩步,第一創建一個消息監聽器,第二為了使用消息監聽器,修改配置文件。
第一, 我們先來創建一個消息監聽器:
1 public class MyMessageListener implements MessageListener{ 2 3 @Override 4 public void onMessage(Message arg0){ 5 TextMessage msg = (TextMessage) arg0; 6 try{ 7 System.out.println(msg.getText()); 8 } catch (JMSException e){ 9 e.printStackTrace(); 10 } 11 } 12 }
一旦有新消息Spring會將消息引導至消息監聽器以方便用戶進行相應的邏輯處理。
第二,修改配置文件。
為了使用消息監聽器,需要在配置文件中註冊消息監聽器容器,並將消息監聽器注入到消息監聽器容器中。
1 <beans> 2 <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> 3 <Property name="brokerURL"> 4 <value>tcp://localhost:61616</value> 5 </Property> 6 </bean> 7 8 <bean id="jmsTemplate" class="org.Springframework.jms.core.JmsTemplate"> 9 <Property name="connectionFactory"> 10 <ref bean="connectionFactory" /> 11 </Property> 12 </bean> 13 14 <bean id="destination" class="org.apache.activemq.command.ActiveMQQueue"> 15 <constructor-arg index="0"> 16 <vaule>HelloWordQueue</vaule> 17 </constructor-arg> 18 </bean> 19 20 <bean id="myTextListener" class="test.activeMQ.Spring.MyMessageListener" /> 21 22 <bean id="javaConsumer" class="org.Springframework.jms.listener.DefaultMessageListenerContainer"> 23 <property name="ConnectionFactory" ref="connectionFactory" /> 24 <Property name="destination" ref="destination" /> 25 <Property name="messageListener" ref="myTextListener" /> 26 </bean> 27 28 </beans>
通過以上的修改配置就可以進行消息的監聽功能了,一旦有消息傳入消息服務器,則會被消息監聽器監聽到,並由Spring將消息內容引導至消息監聽器的處理函數中等待用戶的進一步邏輯處理。
四、Spring中jms模塊核心源碼分析
從第三節可以看出,Spring中使用JmsTemplate模板類來進行發送消息和接收消息操作,接收消息可以使用消息監聽器的方法來替代模板方法。所以Spring中jms模塊核心主要有兩個:JmsTemplate和消息監聽器。
(一)JmsTemplate發送消息
我們先來看JmsTemplate,在上面使用示例中,使用JmsTemplate發送消息的函數為:
1 jmsTemplate.send(destination, new MessageCreator(){ 2 public Message createMessage(Session session) throws JMSException{ 3 return session.createTextMessage("大家好,這是個測試"); 4 } 5 });
進入JmsTemplate類的send函數:
1 public void send(final Destination destination, final MessageCreator messageCreator) throws JmsException { 2 execute(session -> { 3 doSend(session, destination, messageCreator); 4 return null; 5 }, false); 6 }
(1)通用代碼的抽取
調用了該類中的execute函數,繼續進入execute函數查看源碼邏輯:
1 public <T> T execute(SessionCallback<T> action, boolean startConnection) throws JmsException { 2 Assert.notNull(action, "Callback object must not be null"); 3 Connection conToClose = null; 4 Session sessionToClose = null; 5 try { 6 Session sessionToUse = ConnectionFactoryUtils.doGetTransactionalSession( 7 obtainConnectionFactory(), this.transactionalResourceFactory, startConnection); 8 if (sessionToUse == null) { 9 //創建connection 10 conToClose = createConnection(); 11 //根據connection創建session 12 sessionToClose = createSession(conToClose); 13 //是否開啟向服務推送連接信息,只有接收信息時需要,發送時不需要 14 if (startConnection) { 15 conToClose.start(); 16 } 17 sessionToUse = sessionToClose; 18 } 19 if (logger.isDebugEnabled()) { 20 logger.debug("Executing callback on JMS Session: " + sessionToUse); 21 } 22 //調用回調函數 23 return action.doInJms(sessionToUse); 24 } 25 catch (JMSException ex) { 26 throw convertJmsAccessException(ex); 27 } 28 finally { 29 //關閉session 30 JmsUtils.closeSession(sessionToClose); 31 //釋放連接 32 ConnectionFactoryUtils.releaseConnection(conToClose, getConnectionFactory(), startConnection); 33 } 34 }
execute函數封裝創建Connection、創建Session、關閉Session和關閉Connection等操作,這些代碼都是發送消息都要做的工作,沒有差異,execute方法幫助我們抽離了這些冗餘代碼是我們更加專註業務邏輯的實現。做完這些通用的操作後,通過調用回調函數將程序引入用戶自定義實現的個性化處理。Spring使用execute方法封裝了冗餘代碼,而將個性化的代碼實現放在了回調函數action.doInJms(sessionToUse)中。
(2)發送消息的實現
我們繼續看回調函數action.doInJms(sessionToUse)。在發送消息的功能中回調函數通過局部類實現。
1 new SessionCallback<Object>(){ 2 public Object doInJms(Session session) throws JMSException { 3 doSend(session, destination, messageCreator); 4 return null; 5 } 6 }
此時的發送邏輯轉向了doSend方法,我們只需要關注該方法:
1 protected void doSend(Session session, Destination destination, MessageCreator messageCreator) 2 throws JMSException { 3 4 Assert.notNull(messageCreator, "MessageCreator must not be null"); 5 MessageProducer producer = createProducer(session, destination); 6 try { 7 Message message = messageCreator.createMessage(session); 8 if (logger.isDebugEnabled()) { 9 logger.debug("Sending created message: " + message); 10 } 11 doSend(producer, message); 12 // Check commit - avoid commit call within a JTA transaction. 13 if (session.getTransacted() && isSessionLocallyTransacted(session)) { 14 // Transacted session created by this template -> commit. 15 JmsUtils.commitIfNecessary(session); 16 } 17 } 18 finally { 19 JmsUtils.closeMessageProducer(producer); 20 } 21 }
1 protected void doSend(MessageProducer producer, Message message) throws JMSException { 2 if (this.deliveryDelay >= 0) { 3 producer.setDeliveryDelay(this.deliveryDelay); 4 } 5 if (isExplicitQosEnabled()) { 6 producer.send(message, getDeliveryMode(), getPriority(), getTimeToLive()); 7 } 8 else { 9 producer.send(message); 10 } 11 }
最終的目標還是通過MessageProducer的send來發送消息。
(二)JmsTemplate接收消息
在上面使用示例中,使用JmsTemplate接收消息的函數為:TextMessage msg = (TextMessage) jmsTemplate.receive(destination);我們進入jmsTemplate類的receive函數:
1 public Message receive(Destination destination) throws JmsException { 2 return receiveSelected(destination, null); 3 }
繼續進入JmsTemplate類的receiveSelected函數:
1 public Message receiveSelected(final Destination destination, @Nullable final String messageSelector) throws JmsException { 2 return execute(session -> doReceive(session, destination, messageSelector), true); 3 }
繼續進入JmsTemplate類的doReceive函數:
1 protected Message doReceive(Session session, Destination destination, @Nullable String messageSelector) 2 throws JMSException { 3 4 return doReceive(session, createConsumer(session, destination, messageSelector)); 5 }
1 protected Message doReceive(Session session, MessageConsumer consumer) throws JMSException { 2 try { 3 // Use transaction timeout (if available). 4 long timeout = getReceiveTimeout(); 5 ConnectionFactory connectionFactory = getConnectionFactory(); 6 JmsResourceHolder resourceHolder = null; 7 if (connectionFactory != null) { 8 resourceHolder = (JmsResourceHolder) TransactionSynchronizationManager.getResource(connectionFactory); 9 } 10 if (resourceHolder != null && resourceHolder.hasTimeout()) { 11 timeout = Math.min(timeout, resourceHolder.getTimeToLiveInMillis()); 12 } 13 Message message = receiveFromConsumer(consumer, timeout); 14 if (session.getTransacted()) { 15 // Commit necessary - but avoid commit call within a JTA transaction. 16 if (isSessionLocallyTransacted(session)) { 17 // Transacted session created by this template -> commit. 18 JmsUtils.commitIfNecessary(session); 19 } 20 } 21 else if (isClientAcknowledge(session)) { 22 // Manually acknowledge message, if any. 23 if (message != null) { 24 message.acknowledge(); 25 } 26 } 27 return message; 28 } 29 finally { 30 JmsUtils.closeMessageConsumer(consumer); 31 } 32 }
其中代碼Message message = receiveFromConsumer(consumer, timeout);中的receiveFromConsumer函數在JmsDestinationAccessor類(package org.springframework.jms.support.destination,JmsTemplate的父類)中定義,我們進入查看源碼:
1 protected Message receiveFromConsumer(MessageConsumer consumer, long timeout) throws JMSException { 2 if (timeout > 0) { 3 return consumer.receive(timeout); 4 } 5 else if (timeout < 0) { 6 return consumer.receiveNoWait(); 7 } 8 else { 9 return consumer.receive(); 10 } 11 }
實現的方式和發送相似,使用execute函數來封裝冗餘的公共操作,包括創建MessageConsumer,而最終的目標還是通過MessageConsumer(javax.jms.MessageConsumer包中)的receive來接收消息。
(三)消息監聽器
消息監聽器容器是一個特殊的bean,一旦有消息到達就可以獲取消息,並通過調用onMessage()方法將消息傳遞給一個消息監聽器MessageListener。Spring提供了兩種消息監聽器容器:
SimpleMessageListenerContainer(package org.springframework.jms.listener):最簡單的消息監聽器容器,只能處理固定數量的JMS會話,且不支持事務。
DefaultMessageListenerContainer(package org.springframework.jms.listener):這個消息監聽器容器建立在SimpleMessageListenerContainer容器之上,添加了對事務的支持。
下面以DefaultMessageListenerContainer為例進行分析。在上面消息監聽器的使用示例中,需要在配置文件中註冊消息監聽器容器,並將消息監聽器注入到消息監聽器容器中。我們只有把自定義的消息監聽器注入到消息監聽器容器中,容器才會把消息轉給消息監聽器進行處理。
DefaultMessageListenerContainer類的繼承關係如下:
DefaultMessageListenerContainer
— AbstractPollingMessageListenerContainer
— AbstractMessageListenerContainer
— AbstractJmsListeningContainer
— JmsDestinationAccessor
— JmsAccessor
— InitializingBean
— BeanNameAware
— DisposableBean
—
SmartLifecycle
— MessageListenerContainer
— SmartLifecycle
我們看到DefaultMessageListenerContainer類實現了InitializingBean接口,InitializingBean接口為bean提供了初始化方法的方式,它只包括afterPropertiesSet方法,凡是繼承該接口的類,在初始化bean的時候都會執行該方法。也就是說spring初始化bean的時候,如果該bean實現了InitializingBean接口,會自動調用afterPropertiesSet方法。DefaultMessageListenerContainer在其父類AbstractJmsListeningContainer中實現了該方法:
1 public void afterPropertiesSet() { 2 //驗證connectionFactory 3 super.afterPropertiesSet(); 4 //驗證配置文件 5 validateConfiguration(); 6 初始化 7 initialize(); 8 }
DefaultMessageListenerContainer監聽器容器的初始化中包含了三句代碼,前兩句用於屬性驗證,比如connectionFactory或者destination等屬性是否為空等,而真正用於初始化的操作委託在initialize函數中執行:
1 public void initialize() throws JmsException { 2 try { 3 //lifecycleMonitor用於控制生命周期的同步處理 4 synchronized (this.lifecycleMonitor) { 5 this.active = true; 6 this.lifecycleMonitor.notifyAll(); 7 } 8 doInitialize(); 9 } 10 catch (JMSException ex) { 11 synchronized (this.sharedConnectionMonitor) { 12 ConnectionFactoryUtils.releaseConnection(this.sharedConnection, getConnectionFactory(), this.autoStartup); 13 this.sharedConnection = null; 14 } 15 throw convertJmsAccessException(ex); 16 } 17 }
函數中調用了該類的抽象方法doInitialize,該函數實際在其子類DefaultMessageListenerContainer中實現(父類調用抽象方法,該抽象方法由子類實現):
1 protected void doInitialize() throws JMSException { 2 synchronized (this.lifecycleMonitor) { 3 for (int i = 0; i < this.concurrentConsumers; i++) { 4 scheduleNewInvoker(); 5 } 6 } 7 }
concurrentConsumers設置的是對每個listener在初始化的時候設置的並發消費者的個數,因為在spring中messageListener實例是單例的,spring-jms不能自作主張的創建多個messageListener實例來並發消費。所以spring在內部,創建了多個MessageConsumer實例,並使用consumer.receive()方法以阻塞的方式來獲取消息,當獲取消息後,再執行messageListener.onMessage()方法。concurrentConsumers屬性就是為了指定spring內部可以創建MessageConsumer的最大個數,當messageConsumer實例被創建後,將會封裝在一個Runner接口並交給taskExecutor來調度;如果consumer在一直沒有收到消息,則會被置為「idle」並從consumer列表中移除;如果所有的consumer都處於active狀態,則會創建新的consumer實例直到達到maxConcurrentConsumers個數上限。通常taskExecutor的線程池容量稍大於concurrentConsumer。
我們繼續上述源碼,doInitialize函數中調用了本類DefaultMessageListenerContainer中的scheduleNewInvoker方法:
1 private void scheduleNewInvoker() { 2 AsyncMessageListenerInvoker invoker = new AsyncMessageListenerInvoker(); 3 if (rescheduleTaskIfNecessary(invoker)) { 4 // This should always be true, since we're only calling this when active. 5 this.scheduledInvokers.add(invoker); 6 } 7 }
其中調用了父類AbstractJmsListeningContainer(package org.springframework.jms.listener)的rescheduleTaskIfNecessary方法:
1 protected final boolean rescheduleTaskIfNecessary(Object task) { 2 if (this.running) { 3 try { 4 doRescheduleTask(task); 5 } 6 catch (RuntimeException ex) { 7 logRejectedTask(task, ex); 8 this.pausedTasks.add(task); 9 } 10 return true; 11 } 12 else if (this.active) { 13 this.pausedTasks.add(task); 14 return true; 15 } 16 else { 17 return false; 18 } 19 }
這裡需要注意的是,子類DefaultMessageListenerContainer調用了父類AbstractJmsListeningContainer的rescheduleTaskIfNecessary方法,rescheduleTaskIfNecessary方法又調用回子類DefaultMessageListenerContainer的方法doRescheduleTask,即鉤子方法。所以doRescheduleTask方法是在DefaultMessageListenerContainer中定義的。
1 protected void doRescheduleTask(Object task) { 2 Assert.state(this.taskExecutor != null, "No TaskExecutor available"); 3 this.taskExecutor.execute((Runnable) task); 4 }
doRescheduleTask函數其實是在開啟一個線程執行Runnable。Spring根據concurrentConsumer數量建立了對應數量的線程,而每一個線程都作為一個獨立的接收者在循環接收消息。
現在回到DefaultMessageListenerContainer的scheduleNewInvoker方法。我們上面介紹過DefaultMessageListenerContainer創建了concurrentConsumers所指定個數的AsyncMessageListenerInvoker(實現了SchedulingAwareRunnable接口),並交給taskExecutor運行。我們重點關注AsyncMessageListenerInvoker類(該類是DefaultMessageListenerContainer的一個內部類)。它是作為一個Runnable去執行,我們看下其run方法:
1 public void run() { 2 //並發控制 3 synchronized (lifecycleMonitor) { 4 activeInvokerCount++; 5 lifecycleMonitor.notifyAll(); 6 } 7 boolean messageReceived = false; 8 try { 9 //根據每個任務設置的最大處理消息數量而做不同的處理 10 //小於0默認為無限制,一直能接収消息 11 if (maxMessagesPerTask < 0) { 12 messageReceived = executeOngoingLoop(); 13 } 14 else { 15 int messageCount = 0; 16 //消息數量控制,一旦超出數量則停止循環 17 while (isRunning() && messageCount < maxMessagesPerTask) { 18 messageReceived = (invokeListener() || messageReceived); 19 messageCount++; 20 } 21 } 22 } 23 catch (Throwable ex) { 24 //清理操作,包括關閉session等 25 clearResources(); 26 if (!this.lastMessageSucceeded) { 27 // We failed more than once in a row or on startup - 28 // wait before first recovery attempt. 29 waitBeforeRecoveryAttempt(); 30 } 31 this.lastMessageSucceeded = false; 32 boolean alreadyRecovered = false; 33 synchronized (recoveryMonitor) { 34 if (this.lastRecoveryMarker == currentRecoveryMarker) { 35 handleListenerSetupFailure(ex, false); 36 recoverAfterListenerSetupFailure(); 37 currentRecoveryMarker = new Object(); 38 } 39 else { 40 alreadyRecovered = true; 41 } 42 } 43 if (alreadyRecovered) { 44 handleListenerSetupFailure(ex, true); 45 } 46 } 47 finally { 48 synchronized (lifecycleMonitor) { 49 decreaseActiveInvokerCount(); 50 lifecycleMonitor.notifyAll(); 51 } 52 if (!messageReceived) { 53 this.idleTaskExecutionCount++; 54 } 55 else { 56 this.idleTaskExecutionCount = 0; 57 } 58 synchronized (lifecycleMonitor) { 59 if (!shouldRescheduleInvoker(this.idleTaskExecutionCount) || !rescheduleTaskIfNecessary(this)) { 60 // We're shutting down completely. 61 scheduledInvokers.remove(this); 62 if (logger.isDebugEnabled()) { 63 logger.debug("Lowered scheduled invoker count: " + scheduledInvokers.size()); 64 } 65 lifecycleMonitor.notifyAll(); 66 clearResources(); 67 } 68 else if (isRunning()) { 69 int nonPausedConsumers = getScheduledConsumerCount() - getPausedTaskCount(); 70 if (nonPausedConsumers < 1) { 71 logger.error("All scheduled consumers have been paused, probably due to tasks having been rejected. " + 72 "Check your thread pool configuration! Manual recovery necessary through a start() call."); 73 } 74 else if (nonPausedConsumers < getConcurrentConsumers()) { 75 logger.warn("Number of scheduled consumers has dropped below concurrentConsumers limit, probably " + 76 "due to tasks having been rejected. Check your thread pool configuration! Automatic recovery " + 77 "to be triggered by remaining consumers."); 78 } 79 } 80 } 81 } 82 }
上面函數根據maxMessagesPerTask(每個任務設置的最大處理消息數量)值的不同,分開進行了處理。如果是無限值,執行函數executeOngoingLoop;如果不是,控制接收消息數量,一旦超出數量則停止循環,同時可以通過設置標誌位running來控制消息接收的暫停與恢復,核心代碼就是invokeListener()。我們也看一下executeOngoingLoop代碼:
1 private boolean executeOngoingLoop() throws JMSException { 2 boolean messageReceived = false; 3 boolean active = true; 4 while (active) { 5 synchronized (lifecycleMonitor) { 6 boolean interrupted = false; 7 boolean wasWaiting = false; 8 //如果當前任務已經處於激活狀態但是卻給了暫時終止的命令 9 while ((active = isActive()) && !isRunning()) { 10 if (interrupted) { 11 throw new IllegalStateException("Thread was interrupted while waiting for " + 12 "a restart of the listener container, but container is still stopped"); 13 } 14 if (!wasWaiting) { 15 //如果並非處於等待狀態則說明是第一次執行,需要將激活任務數量減少 16 decreaseActiveInvokerCount(); 17 } 18 //開始進入等待狀態,等待任務的恢復命令 19 wasWaiting = true; 20 try { 21 //通過wait等待,也就是等待notify或者notifyAll 22 lifecycleMonitor.wait(); 23 } 24 catch (InterruptedException ex) { 25 // Re-interrupt current thread, to allow other threads to react. 26 Thread.currentThread().interrupt(); 27 interrupted = true; 28 } 29 } 30 if (wasWaiting) { 31 activeInvokerCount++; 32 } 33 if (scheduledInvokers.size() > maxConcurrentConsumers) { 34 active = false; 35 } 36 } 37 //正常處理流程 38 if (active) { 39 messageReceived = (invokeListener() || messageReceived); 40 } 41 } 42 return messageReceived; 43 }
上面函數中線程等待不是單純採用while循環來控制,因為如果單純採用while循環會浪費CPU的始終周期,給資源造成巨大的浪費。這裡採用的使用全局控制變量lifecycleMonitor的wait()方法來暫停線程。所以,如果終止線程需要再次恢復的話,除了更改this.running標誌位外,還需要調用lifecycleMonitor.notify或者lifecycleMonitor.notifyAll來使線程恢復。
從上述代碼中可以看出其核心執行流程也是invokeListener()。所以內部類AsyncMessageListenerInvoker的run方法中核心的處理就是調用invokeListener來接收消息並激活消息監聽器。
1 private boolean invokeListener() throws JMSException { 2 this.currentReceiveThread = Thread.currentThread(); 3 try { 4 //初始化資源包括首次創建的時候創建session和consumer 5 initResourcesIfNecessary(); 6 boolean messageReceived = receiveAndExecute(this, this.session, this.consumer); 7 //改變標誌位,信息成功處理 8 this.lastMessageSucceeded = true; 9 return messageReceived; 10 } 11 finally { 12 this.currentReceiveThread = null; 13 } 14 }
上述函數調用了receiveAndExecute函數,該函數在DefaultMessageListenerContainer的父類AbstractPollingMessageListenerContainer(package org.springframework.jms.listener)給出了:
1 protected boolean receiveAndExecute( 2 Object invoker, @Nullable Session session, @Nullable MessageConsumer consumer) 3 throws JMSException { 4 5 if (this.transactionManager != null) { 6 // Execute receive within transaction. 7 TransactionStatus status = this.transactionManager.getTransaction(this.transactionDefinition); 8 boolean messageReceived; 9 try { 10 messageReceived = doReceiveAndExecute(invoker, session, consumer, status); 11 } 12 catch (JMSException | RuntimeException | Error ex) { 13 rollbackOnException(this.transactionManager, status, ex); 14 throw ex; 15 } 16 this.transactionManager.commit(status); 17 return messageReceived; 18 } 19 20 else { 21 // Execute receive outside of transaction. 22 return doReceiveAndExecute(invoker, session, consumer, null); 23 } 24 }
在介紹消息監聽器容器的分類時,已經介紹DefaultMessageListenerContainer消息監聽器容器建立在SimpleMessageListenerContainer容器之上,添加了對事務的支持。如果用戶配置了this.transactionManage也就是配置了事務,那麼,消息的接收會被控制在事務之內,一旦出現任何異常都會被回滾,而回滾操作也會交由事務管理器同一處理。
上面函數調用了doReceiveAndExecute(),doReceiveAndExecute包含了整個消息的接收處理過程,我們看下其代碼:
1 protected boolean doReceiveAndExecute(Object invoker, @Nullable Session session, 2 @Nullable MessageConsumer consumer, @Nullable TransactionStatus status) throws JMSException { 3 4 Connection conToClose = null; 5 Session sessionToClose = null; 6 MessageConsumer consumerToClose = null; 7 try { 8 Session sessionToUse = session; 9 boolean transactional = false; 10 if (sessionToUse == null) { 11 sessionToUse = ConnectionFactoryUtils.doGetTransactionalSession( 12 obtainConnectionFactory(), this.transactionalResourceFactory, true); 13 transactional = (sessionToUse != null); 14 } 15 if (sessionToUse == null) { 16 Connection conToUse; 17 if (sharedConnectionEnabled()) { 18 conToUse = getSharedConnection(); 19 } 20 else { 21 conToUse = createConnection(); 22 conToClose = conToUse; 23 conToUse.start(); 24 } 25 sessionToUse = createSession(conToUse); 26 sessionToClose = sessionToUse; 27 } 28 MessageConsumer consumerToUse = consumer; 29 if (consumerToUse == null) { 30 consumerToUse = createListenerConsumer(sessionToUse); 31 consumerToClose = consumerToUse; 32 } 33 //接收消息 34 Message message = receiveMessage(consumerToUse); 35 if (message != null) { 36 if (logger.isDebugEnabled()) { 37 logger.debug("Received message of type [" + message.getClass() + "] from consumer [" + 38 consumerToUse + "] of " + (transactional ? "transactional " : "") + "session [" + 39 sessionToUse + "]"); 40 } 41 //模板方法,當消息接收且在未處理前給子類機會做相應的處理, 42 messageReceived(invoker, sessionToUse); 43 boolean exposeResource = (!transactional && isExposeListenerSession() && 44 !TransactionSynchronizationManager.hasResource(obtainConnectionFactory())); 45 if (exposeResource) { 46 TransactionSynchronizationManager.bindResource( 47 obtainConnectionFactory(), new LocallyExposedJmsResourceHolder(sessionToUse)); 48 } 49 try { 50 //激活監聽器 51 doExecuteListener(sessionToUse, message); 52 } 53 catch (Throwable ex) { 54 if (status != null) { 55 if (logger.isDebugEnabled()) { 56 logger.debug("Rolling back transaction because of listener exception thrown: " + ex); 57 } 58 status.setRollbackOnly(); 59 } 60 handleListenerException(ex); 61 // Rethrow JMSException to indicate an infrastructure problem 62 // that may have to trigger recovery... 63 if (ex instanceof JMSException) { 64 throw (JMSException) ex; 65 } 66 } 67 finally { 68 if (exposeResource) { 69 TransactionSynchronizationManager.unbindResource(obtainConnectionFactory()); 70 } 71 } 72 // Indicate that a message has been received. 73 return true; 74 } 75 else { 76 if (logger.isTraceEnabled()) { 77 logger.trace("Consumer [" + consumerToUse + "] of " + (transactional ? "transactional " : "") + 78 "session [" + sessionToUse + "] did not receive a message"); 79 } 80 //接收到空消息的處理 81 noMessageReceived(invoker, sessionToUse); 82 // Nevertheless call commit, in order to reset the transaction timeout (if any). 83 if (shouldCommitAfterNoMessageReceived(sessionToUse)) { 84 commitIfNecessary(sessionToUse, null); 85 } 86 // Indicate that no message has been received. 87 return false; 88 } 89 } 90 finally { 91 JmsUtils.closeMessageConsumer(consumerToClose); 92 JmsUtils.closeSession(sessionToClose); 93 ConnectionFactoryUtils.releaseConnection(conToClose, getConnectionFactory(), true); 94 } 95 }
上述代碼中我們重點關注下激活監聽器 doExecuteListener(sessionToUse, message)方法,doExecuteListener方法在AbstractPollingMessageListenerContainer類的父類AbstractMessageListenerContainer(package org.springframework.jms.listener)給出。
1 protected void doExecuteListener(Session session, Message message) throws JMSException { 2 if (!isAcceptMessagesWhileStopping() && !isRunning()) { 3 if (logger.isWarnEnabled()) { 4 logger.warn("Rejecting received message because of the listener container " + 5 "having been stopped in the meantime: " + message); 6 } 7 rollbackIfNecessary(session); 8 throw new MessageRejectedWhileStoppingException(); 9 } 10 11 try { 12 invokeListener(session, message); 13 } 14 catch (JMSException | RuntimeException | Error ex) { 15 rollbackOnExceptionIfNecessary(session, ex); 16 throw ex; 17 } 18 commitIfNecessary(session, message); 19 }
該函數又調用了該類中的invokeListener函數和commitIfNecessary函數:
(1)invokeListener函數
1 protected void invokeListener(Session session, Message message) throws JMSException { 2 Object listener = getMessageListener(); 3 4 if (listener instanceof SessionAwareMessageListener) { 5 doInvokeListener((SessionAwareMessageListener) listener, session, message); 6 } 7 else if (listener instanceof MessageListener) { 8 doInvokeListener((MessageListener) listener, message); 9 } 10 else if (listener != null) { 11 throw new IllegalArgumentException( 12 "Only MessageListener and SessionAwareMessageListener supported: " + listener); 13 } 14 else { 15 throw new IllegalStateException("No message listener specified - see property 'messageListener'"); 16 } 17 }
上述方法又調用了該類中的doInvokeListener方法,繼續查看其代碼:
1 protected void doInvokeListener(SessionAwareMessageListener listener, Session session, Message message) 2 throws JMSException { 3 4 Connection conToClose = null; 5 Session sessionToClose = null; 6 try { 7 Session sessionToUse = session; 8 if (!isExposeListenerSession()) { 9 // We need to expose a separate Session. 10 conToClose = createConnection(); 11 sessionToClose = createSession(conToClose); 12 sessionToUse = sessionToClose; 13 } 14 // Actually invoke the message listener... 15 listener.onMessage(message, sessionToUse); 16 // Clean up specially exposed Session, if any. 17 if (sessionToUse != session) { 18 if (sessionToUse.getTransacted() && isSessionLocallyTransacted(sessionToUse)) { 19 // Transacted session created by this container -> commit. 20 JmsUtils.commitIfNecessary(sessionToUse); 21 } 22 } 23 } 24 finally { 25 JmsUtils.closeSession(sessionToClose); 26 JmsUtils.closeConnection(conToClose); 27 } 28 }
1 protected void doInvokeListener(MessageListener listener, Message message) throws JMSException { 2 listener.onMessage(message); 3 }
通過層層調用,最終提取監聽器並使用listener.onMessage(message);激活了監聽器,也就是激活了用戶自定義的監聽器邏輯。doExecuteListener函數中還有一句重要的代碼commitIfNecessary。
(2)commitIfNecessary函數
AbstractMessageListenerContainer類中的doExecuteListener方法中調用的commitIfNecessary函數。
1 protected void commitIfNecessary(Session session, @Nullable Message message) throws JMSException { 2 // Commit session or acknowledge message. 3 if (session.getTransacted()) { 4 // Commit necessary - but avoid commit call within a JTA transaction. 5 if (isSessionLocallyTransacted(session)) { 6 // Transacted session created by this container -> commit. 7 JmsUtils.commitIfNecessary(session); 8 } 9 } 10 else if (message != null && isClientAcknowledge(session)) { 11 message.acknowledge(); 12 } 13 }
其中又調用了JmsUtils類(package org.springframework.jms.support)的commitIfNecessary(session)函數,我們進入該函數:
1 public static void commitIfNecessary(Session session) throws JMSException { 2 Assert.notNull(session, "Session must not be null"); 3 try { 4 session.commit(); 5 } 6 catch (javax.jms.TransactionInProgressException | javax.jms.IllegalStateException ex) { 7 // Ignore -> can only happen in case of a JTA transaction. 8 } 9 }
DefaultMessageListenerContainer增加了事務的支持,session.commit()在此完成消息事務的事務提交。告訴消息服務器本地已經正常接收消息,消息服務器接收到本地的事務提交後便可以將此消息刪除。否則,當前消息會被其他接收者重新接收。
Spring中使用JmsTemplate模板類來進行發送消息和接收消息操作,接收消息可以使用消息監聽器的方法來替代模板方法。至此我們完成了Spring中jms模塊兩個核心JmsTemplate和消息監聽器的分析。
本文參考了 郝佳《Spring源碼深度解析》、《Spring 5 官方文檔》及博客園、CSDN部分文獻。
博眾家之所長,集群英之薈萃。遴選各IT領域精品雄文!
歡迎關注「IT架構精選」