ActiveMq 之JMS 看這一篇就夠了

什麼是JMS MQ

全稱:Java MessageService 中文:Java 消息服務。

JMS 是 Java 的一套 API 標準,最初的目的是為了使應用程式能夠訪問現有的 MOM 系 統(MOM 是 MessageOriented Middleware 的英文縮寫,指的是利用高效可靠的消息傳遞機 制進行平台無關的數據交流,並基於數據通訊來進行分散式系統的集成。) ;

後來被許多現有 的 MOM 供應商採用,並實現為 MOM 系統。【常見 MOM 系統包括 Apache 的 ActiveMQ、 阿里巴巴的 RocketMQ、IBM 的 MQSeries、Microsoft 的 MSMQ、BEA 的 RabbitMQ 等。 (並 非全部的 MOM 系統都遵循 JMS 規範)】

基於 JMS 實現的 MOM,又被稱為 JMSProvider。

「消息」是在兩台電腦間傳送的數據單位。消息可以非常簡單,例如只包含文本字元串; 也可以更複雜,可能包含嵌入對象。 消息被發送到隊列中。

「消息隊列」是在消息的傳輸過程中保存消息的容器。消息隊列管 理器在將消息從它的源中繼到它的目標時充當中間人。

隊列的主要目的是提供路由並保證消 息的傳遞;如果發送消息時接收者不可用,消息隊列會保留消息,直到可以成功地傳遞它。

消息隊列的主要特點是非同步處理,主要目的是減少請求響應時間和解耦。所以主要的使 用場景就是將比較耗時而且不需要即時(同步)返回結果的操作作為消息放入消息隊列。同 時由於使用了消息隊列,只要保證消息格式不變,消息的發送方和接收方並不需要彼此聯繫, 也不需要受對方的影響,即解耦和。如: 跨系統的非同步通訊,所有需要非同步交互的地方都可以使用消息隊列。就像我們除了打電 話(同步)以外,還需要發簡訊,發電子郵件(非同步)的通訊方式。 多個應用之間的耦合,由於消息是平台無關和語言無關的,而且語義上也不再是函數調 用,因此更適合作為多個應用之間的松耦合的介面。基於消息隊列的耦合,不需要發送方和 接收方同時在線。 在企業應用集成(EAI)中,文件傳輸,共享資料庫,消息隊列,遠程過程調用都可以 作為集成的方法。 應用內的同步變非同步,比如訂單處理,就可以由前端應用將訂單資訊放到隊列,後端應 用從隊列里依次獲得消息處理,高峰時的大量訂單可以積壓在隊列里慢慢處理掉。由於同步 通常意味著阻塞,而大量執行緒的阻塞會降低電腦的性能。 消息驅動的架構(EDA),系統分解為消息隊列,和消息製造者和消息消費者,一個處 理流程可以根據需要拆成多個階段(Stage) ,階段之間用隊列連接起來,前一個階段處理的 結果放入隊列,後一個階段從隊列中獲取消息繼續處理。 應用需要更靈活的耦合方式,如發布訂閱,比如可以指定路由規則。 跨區域網,甚至跨城市的通訊,比如北京機房與廣州機房的應用程式的通訊。

消息中間件應用場景

非同步通訊

有些業務不想也不需要立即處理消息。消息隊列提供了非同步處理機制,允許用戶把一個消息放入隊列,但並不立即處理它。想向隊列中放入多少消息就放多少,然後在需要的時候再去處理它們。

緩衝

在任何重要的系統中,都會有需要不同的處理時間的元素。消息隊列通過一個緩衝層來幫助任務最高效率的執行,該緩衝有助於控制和優化數據流經過系統的速度。以調節系統響應時間。

解耦

降低工程間的強依賴程度,針對異構系統進行適配。在項目啟動之初來預測將來項目會碰到什麼需求,是極其困難的。通過消息系統在處理過程中間插入了一個隱含的、基於數據的介面層,兩邊的處理過程都要實現這一介面,當應用發生變化時,可以獨立的擴展或修改兩邊的處理過程,只要確保它們遵守同樣的介面約束。

冗餘

有些情況下,處理數據的過程會失敗。除非數據被持久化,否則將造成丟失。消息隊列把數據進行持久化直到它們已經被完全處理,通過這一方式規避了數據丟失風險。許多消息隊列所採用的」插入-獲取-刪除」範式中,在把一個消息從隊列中刪除之前,需要你的處理系統明確的指出該消息已經被處理完畢,從而確保你的數據被安全的保存直到你使用完畢。

擴展性

因為消息隊列解耦了你的處理過程,所以增大消息入隊和處理的頻率是很容易的,只要另外增加處理過程即可。不需要改變程式碼、不需要調節參數。便於分散式擴容。

可恢復性

系統的一部分組件失效時,不會影響到整個系統。消息隊列降低了進程間的耦合度,所以即使一個處理消息的進程掛掉,加入隊列中的消息仍然可以在系統恢復後被處理。

順序保證

在大多使用場景下,數據處理的順序都很重要。大部分消息隊列本來就是排序的,並且能保證數據會按照特定的順序來處理。

過載保護

在訪問量劇增的情況下,應用仍然需要繼續發揮作用,但是這樣的突發流量無法提取預知;如果以為了能處理這類瞬間峰值訪問為標準來投入資源隨時待命無疑是巨大的浪費。使用消息隊列能夠使關鍵組件頂住突發的訪問壓力,而不會因為突發的超負荷的請求而完全崩潰。

數據流處理

分散式系統產生的海量數據流,如:業務日誌、監控數據、用戶行為等,針對這些數據流進行實時或批量採集匯總,然後進行大數據分析是當前互聯網的必備技術,通過消息隊列完成此類數據收集是最好的選擇。

常用消息隊列(ActiveMQ、RabbitMQ、RocketMQ、Kafka)比較

特性MQ ActiveMQ RabbitMQ RocketMQ Kafka
生產者消費者模式 支援 支援 支援 支援
發布訂閱模式 支援 支援 支援 支援
請求回應模式 支援 支援 不支援 不支援
Api完備性
多語言支援 支援 支援 java 支援
單機吞吐量 萬級 萬級 萬級 十萬級
消息延遲 微秒級 毫秒級 毫秒級
可用性 高(主從) 高(主從) 非常高(分散式) 非常高(分散式)
消息丟失 理論上不會丟失 理論上不會丟失
文檔的完備性
提供快速入門
社區活躍度
商業支援 商業雲 商業雲

JMS中的一些角色

Broker

消息伺服器,作為server提供消息核心服務

provider

生產者

消息生產者是由會話創建的一個對象,用於把消息發送到一個目的地。

Consumer

消費者

消息消費者是由會話創建的一個對象,它用於接收發送到目的地的消息。消息的消費可以採用以下兩種方法之一:

  • 同步消費。通過調用消費者的receive方法從目的地中顯式提取消息。receive方法可以一直阻塞到消息到達。
  • 非同步消費。客戶可以為消費者註冊一個消息監聽器,以定義在消息到達時所採取的動作。

p2p

基於點對點的消息模型

消息生產者生產消息發送到 queue 中,然後消息消費者從 queue 中取出並且消費消息。 消息被消費以後,queue 中不再有存儲,所以消息消費者不可能消費到已經被消費的消
息。
Queue 支援存在多個消費者,但是對一個消息而言,只會有一個消費者可以消費、其它 的則不能消費此消息了。 當消費者不存在時,消息會一直保存,直到有消費消費

pub/sub

基於訂閱/發布的消息模型

消息生產者(發布)將消息發布到 topic 中,同時有多個消息消費者(訂閱)消費該消
息。
和點對點方式不同,發布到 topic 的消息會被所有訂閱者消費。 當生產者發布消息,不管是否有消費者。都不會保存消息 一定要先有消息的消費者,後有消息的生產者。

PTP 和 PUB/SUB 簡單對

1 Topic Queue
Publish Subscribe messaging 發布 訂閱消息 Point-to-Point 點對點
有無狀態 topic 數據默認不落地,是無狀態的。 Queue 數據默認會在 mq 服 務器上以文件形式保存,比如 Active MQ 一 般 保 存 在 $AMQ_HOME\data\kahadb 下 面。也可以配置成 DB 存儲。
完整性保障 並不保證 publisher 發布的每條數 據,Subscriber 都能接受到。 Queue 保證每條數據都能 被 receiver 接收。消息不超時。
消息是否會丟失 一般來說 publisher 發布消息到某 一個 topic 時,只有正在監聽該 topic 地址的 sub 能夠接收到消息;如果沒 有 sub 在監聽,該 topic 就丟失了。 Sender 發 送 消 息 到 目 標 Queue, receiver 可以非同步接收這 個 Queue 上的消息。Queue 上的 消息如果暫時沒有 receiver 來 取,也不會丟失。前提是消息不 超時。
消息發布接 收策略 一對多的消息發布接收策略,監 聽同一個topic地址的多個sub都能收 到 publisher 發送的消息。Sub 接收完 通知 mq 伺服器 一對一的消息發布接收策 略,一個 sender 發送的消息,只 能有一個 receiver 接收。 receiver 接收完後,通知 mq 伺服器已接 收,mq 伺服器對 queue 里的消 息採取刪除或其他操作。

Queue

隊列存儲,常用與點對點消息模型

默認只能由唯一的一個消費者處理。一旦處理消息刪除。

Topic

主題存儲,用於訂閱/發布消息模型

主題中的消息,會發送給所有的消費者同時處理。只有在消息可以重複處 理的業務場景中可使用。

Queue/Topic都是 Destination 的子介面

ConnectionFactory

連接工廠,jms中用它創建連接

連接工廠是客戶用來創建連接的對象,例如ActiveMQ提供的ActiveMQConnectionFactory。

Connection

JMS Connection封裝了客戶與JMS提供者之間的一個虛擬的連接。

Destination

消息的目的地

目的地是客戶用來指定它生產的消息的目標和它消費的消息的來源的對象。JMS1.0.2規範中定義了兩種消息傳遞域:點對點(PTP)消息傳遞域和發布/訂閱消息傳遞域。 點對點消息傳遞域的特點如下:

  • 每個消息只能有一個消費者。
  • 消息的生產者和消費者之間沒有時間上的相關性。無論消費者在生產者發送消息的時候是否處於運行狀態,它都可以提取消息。

發布/訂閱消息傳遞域的特點如下:

  • 每個消息可以有多個消費者。
  • 生產者和消費者之間有時間上的相關性。
  • 訂閱一個主題的消費者只能消費自它訂閱之後發布的消息。JMS規範允許客戶創建持久訂閱,這在一定程度上放鬆了時間上的相關性要求 。持久訂閱允許消費者消費它在未處於激活狀態時發送的消息。
    在點對點消息傳遞域中,目的地被成為隊列(queue);在發布/訂閱消息傳遞域中,目的地被成為主題(topic)。

Session

JMS Session是生產和消費消息的一個單執行緒上下文。會話用於創建消息生產者(producer)、消息消費者(consumer)和消息(message)等。會話提供了一個事務性的上下文,在這個上下文中,一組發送和接收被組合到了一個原子操作中。

JMS的消息格式

JMS消息由以下三部分組成的:

  • 消息頭。

    每個消息頭欄位都有相應的getter和setter方法。

  • 消息屬性。

    如果需要除消息頭欄位以外的值,那麼可以使用消息屬性。

  • 消息體。

    JMS定義的消息類型有TextMessage、MapMessage、BytesMessage、StreamMessage和ObjectMessage。

TextMessage

文本消息

MapMessage

k/v

BytesMessage

位元組流

StreamMessage

java原始的數據流

ObjectMessage

序列化的java對象

消息可靠性機制

確認 JMS消息

只有在被確認之後,才認為已經被成功地消費了。

消息的成功消費通常包含三個階段:客戶接收消息、客戶處理消息和消息被確認。

在事務性會話中,當一個事務被提交的時候,確認自動發生。

在非事務性會話中,消息何時被確認取決於創建會話時的應答模式(acknowledgement mode)。該參數有以下三個可選值:

  • Session.AUTO_ACKNOWLEDGE。當客戶成功的從receive方法返回的時候,或者從MessageListener.onMessage方法成功返回的時候,會話自動確認客戶收到的消息。
  • Session.CLIENT_ACKNOWLEDGE。客戶通過消息的acknowledge方法確認消息。需要注意的是,在這種模式中,確認是在會話層上進行:確認一個被消費的消息將自動確認所有已被會話消費的消息。例如,如果一個消息消費者消費了10個消息,然後確認第5個消息,那麼所有10個消息都被確認。
  • Session.DUPS_ACKNOWLEDGE。該選擇只是會話遲鈍的確認消息的提交。如果JMS Provider失敗,那麼可能會導致一些重複的消息。如果是重複的消息,那麼JMS Provider必須把消息頭的JMSRedelivered欄位設置為true。

持久性

JMS 支援以下兩種消息提交模式:

  • PERSISTENT。指示JMS Provider持久保存消息,以保證消息不會因為JMS Provider的失敗而丟失。
  • NON_PERSISTENT。不要求JMS Provider持久保存消息。

優先順序

可以使用消息優先順序來指示JMS Provider首先提交緊急的消息。優先順序分10個級別,從0(最低)到9(最高)。如果不指定優先順序,默認級別是4。需要注意的是,JMS Provider並不一定保證按照優先順序的順序提交消息。

消息過期

可以設置消息在一定時間後過期,默認是永不過期。

臨時目的地

可以通過會話上的createTemporaryQueue方法和createTemporaryTopic方法來創建臨時目的地。它們的存在時間只限於創建它們的連接所保持的時間。只有創建該臨時目的地的連接上的消息消費者才能夠從臨時目的地中提取消息。

持久訂閱

首先消息生產者必須使用PERSISTENT提交消息。客戶可以通過會話上的createDurableSubscriber方法來創建一個持久訂閱,該方法的第一個參數必須是一個topic,第二個參數是訂閱的名稱。 JMS Provider會存儲發布到持久訂閱對應的topic上的消息。如果最初創建持久訂閱的客戶或者任何其它客戶使用相同的連接工廠和連接的客戶ID、相同的主題和相同的訂閱名再次調用會話上的createDurableSubscriber方法,那麼該持久訂閱就會被激活。JMS Provider會象客戶發送客戶處於非激活狀態時所發布的消息。 持久訂閱在某個時刻只能有一個激活的訂閱者。持久訂閱在創建之後會一直保留,直到應用程式調用會話上的unsubscribe方法。

本地事務

在一個JMS客戶端,可以使用本地事務來組合消息的發送和接收。JMS Session介面提供了commit和rollback方法。事務提交意味著生產的所有消息被發送,消費的所有消息被確認;事務回滾意味著生產的所有消息被銷毀,消費的所有消息被恢復並重新提交,除非它們已經過期。 事務性的會話總是牽涉到事務處理中,commit或rollback方法一旦被調用,一個事務就結束了,而另一個事務被開始。關閉事務性會話將回滾其中的事務。 需要注意的是,如果使用請求/回復機制,即發送一個消息,同時希望在同一個事務中等待接收該消息的回復,那麼程式將被掛起,因為知道事務提交,發送操作才會真正執行。 需要注意的還有一個,消息的生產和消費不能包含在同一個事務中。

ActiveMQ

官方網站

//activemq.apache.org/

Broker

ActiveMQ 5.0 的二進位發布包中bin目錄中包含一個名為activemq的腳本,直接運行這個腳本就可以啟動一個broker。 此外也可以通過Broker Configuration URI或Broker XBean URI對broker進行配置,以下是一些命令行參數的例子:

Example Description
activemq Runs a broker using the default ‘xbean:activemq.xml’ as the broker configuration file.
activemq xbean:myconfig.xml Runs a broker using the file myconfig.xml as the broker configuration file that is located in the classpath.
activemq xbean:file:./conf/broker1.xml Runs a broker using the file broker1.xml as the broker configuration file that is located in the relative file path ./conf/broker1.xml
activemq xbean:file:C:/ActiveMQ/conf/broker2.xml Runs a broker using the file broker2.xml as the broker configuration file that is located in the absolute file path C:/ActiveMQ/conf/broker2.xml
activemq broker:(tcp://localhost:61616, tcp://localhost:5000)?useJmx=true Runs a broker with two transport connectors and JMX enabled.
activemq broker:(tcp://localhost:61616, network:tcp://localhost:5000)?persistent=false Runs a broker with 1 transport connector and 1 network connector with persistence disabled.

存儲

KahaDB存儲

KahaDB是默認的持久化策略,所有消息順序添加到一個日誌文件中,同時另外有一個索引文件記錄指向這些日誌的存儲地址,還有一個事務日誌用於消息回復操作。是一個專門針對消息持久化的解決方案,它對典型的消息使用模式進行了優化。

在data/kahadb這個目錄下,會生成四個文件,來完成消息持久化
1.db.data 它是消息的索引文件,本質上是B-Tree(B樹),使用B-Tree作為索引指向db-*.log裡面存儲的消息
2.db.redo 用來進行消息恢復 *

3.db-.log 存儲消息內容。新的數據以APPEND的方式追加到日誌文件末尾。屬於順序寫入,因此消息存儲是比較 快的。默認是32M,達到閥值會自動遞增
4.lock文件 鎖,寫入當前獲得kahadb讀寫許可權的broker ,用於在集群環境下的競爭處理

<persistenceAdapter> <!--directory:保存數據的目錄;journalMaxFileLength:保存消息的文件大小 --> <kahaDBdirectory="${activemq.data}/kahadb"journalMaxFileLength="16mb"/> </persistenceAdapter>

特性:

1、日誌形式存儲消息;

2、消息索引以 B-Tree 結構存儲,可以快速更新;

3、 完全支援 JMS 事務;

4、支援多種恢復機制kahadb 可以限制每個數據文件的大小。不代表總計數據容量。

AMQ 方式

只適用於 5.3 版本之前。 AMQ 也是一個文件型資料庫,消息資訊最終是存儲在文件中。記憶體中也會有快取數據。

<persistenceAdapter> <!--directory:保存數據的目錄 ;maxFileLength:保存消息的文件大小 --> <amqPersistenceAdapterdirectory="${activemq.data}/amq"maxFileLength="32mb"/> </persistenceAdapter>

性能高於 JDBC,寫入消息時,會將消息寫入日誌文件,由於是順序追加寫,性能很高。

為了提升性能,創建消息主鍵索引,並且提供快取機制,進一步提升性能。

每個日誌文件的 大小都是有限制的(默認 32m,可自行配置) 。

當超過這個大小,系統會重新建立一個文件。

當所有的消息都消費完成,系統會刪除這 個文件或者歸檔。

主要的缺點是 AMQ Message 會為每一個 Destination 創建一個索引,如果使用了大量的 Queue,索引文件的大小會佔用很多磁碟空間。

而且由於索引巨大,一旦 Broker(ActiveMQ 應用實例)崩潰,重建索引的速度會非常 慢。

雖然 AMQ 性能略高於 Kaha DB 方式,但是由於其重建索引時間過長,而且索引文件 佔用磁碟空間過大,所以已經不推薦使用。

JDBC存儲

使用JDBC持久化方式,資料庫默認會創建3個表,每個表的作用如下:
activemq_msgs:queue和topic的消息都存在這個表中
activemq_acks:存儲持久訂閱的資訊和最後一個持久訂閱接收的消息ID
activemq_lock:跟kahadb的lock文件類似,確保資料庫在某一時刻只有一個broker在訪問

ActiveMQ 將數據持久化到資料庫中。

不指定具體的資料庫。 可以使用任意的資料庫 中。

本環節中使用 MySQL 資料庫。 下述文件為 activemq.xml 配置文件部分內容。

首先定義一個 mysql-ds 的 MySQL 數據源,然後在 persistenceAdapter 節點中配置 jdbcPersistenceAdapter 並且引用剛才定義的數據源。

dataSource 指定持久化資料庫的 bean,createTablesOnStartup 是否在啟動的時候創建數 據表,默認值是 true,這樣每次啟動都會去創建數據表了,一般是第一次啟動的時候設置為 true,之後改成 false。

Beans中添加

<bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close"> 

<property name="driverClassName" value="com.mysql.jdbc.Driver"/> 
<property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/> 
<property name="username" value="activemq"/>
<property name="password" value="activemq"/>
<property name="maxActive" value="200"/>
<property name="poolPreparedStatements" value="true"/> 

</bean>

修改persistenceAdapter

        <persistenceAdapter>
           <!-- <kahaDB directory="${activemq.data}/kahadb"/> -->

		<jdbcPersistenceAdapter dataSource="#mysql-ds" createTablesOnStartup="true" /> 


        </persistenceAdapter>

依賴jar包

commons-dbcp commons-pool mysql-connector-java

表欄位解釋

activemq_acks:用於存儲訂閱關係。如果是持久化Topic,訂閱者和伺服器的訂閱關係在這個表保存。
主要的資料庫欄位如下:

container:消息的destination 
sub_dest:如果是使用static集群,這個欄位會有集群其他系統的資訊 
client_id:每個訂閱者都必須有一個唯一的客戶端id用以區分 
sub_name:訂閱者名稱 
selector:選擇器,可以選擇只消費滿足條件的消息。條件可以用自定義屬性實現,可支援多屬性and和or操作 
last_acked_id:記錄消費過的消息的id。

2:activemq_lock:在集群環境中才有用,只有一個Broker可以獲得消息,稱為Master Broker,其他的只能作為備份等待Master Broker不可用,才可能成為下一個Master Broker。這個表用於記錄哪個Broker是當前的Master Broker。

3:activemq_msgs:用於存儲消息,Queue和Topic都存儲在這個表中。
主要的資料庫欄位如下:

id:自增的資料庫主鍵 
container:消息的destination 
msgid_prod:消息發送者客戶端的主鍵 
msg_seq:是發送消息的順序,msgid_prod+msg_seq可以組成jms的messageid 
expiration:消息的過期時間,存儲的是從1970-01-01到現在的毫秒數 
msg:消息本體的java序列化對象的二進位數據 
priority:優先順序,從0-9,數值越大優先順序越高 
xid:用於存儲訂閱關係。如果是持久化topic,訂閱者和伺服器的訂閱關係在這個表保存。

LevelDB存儲

LevelDB持久化性能高於KahaDB,雖然目前默認的持久化方式仍然是KahaDB。並且,在ActiveMQ 5.9版本提供 了基於LevelDB和Zookeeper的數據複製方式,用於Master-slave方式的首選數據複製方案。 但是在ActiveMQ官網對LevelDB的表述:LevelDB官方建議使用以及不再支援,推薦使用的是KahaDB

Memory 消息存儲

顧名思義,基於記憶體的消息存儲,就是消息存儲在記憶體中。persistent=」false」,表示不設置持 久化存儲,直接存儲到記憶體中
在broker標籤處設置。

JDBC Message store with ActiveMQ Journal

這種方式克服了JDBC Store的不足,JDBC存儲每次消息過來,都需要去寫庫和讀庫。 ActiveMQ Journal,使用延遲存儲數據到資料庫,當消息來到時先快取到文件中,延遲後才寫到資料庫中。

當消費者的消費速度能夠及時跟上生產者消息的生產速度時,journal文件能夠大大減少需要寫入到DB中的消息。 舉個例子,生產者生產了1000條消息,這1000條消息會保存到journal文件,如果消費者的消費速度很快的情況 下,在journal文件還沒有同步到DB之前,消費者已經消費了90%的以上的消息,那麼這個時候只需要同步剩餘的 10%的消息到DB。 如果消費者的消費速度很慢,這個時候journal文件可以使消息以批量方式寫到DB。

協議

完整支援的協議

//activemq.apache.org/configuring-version-5-transports.html

ActiveMQ支援的client-broker通訊協議有:TCP、NIO、UDP、SSL、Http(s)、VM。

Transmission Control Protocol (TCP)

1:這是默認的Broker配置,TCP的Client監聽埠是61616。
2:在網路傳輸數據前,必須要序列化數據,消息是通過一個叫wire protocol的來序列化成位元組流。默認情況下,ActiveMQ把wire protocol叫做OpenWire,它的目的是促使網路上的效率和數據快速交互。
3:TCP連接的URI形式:tcp://hostname:port?key=value&key=value,加粗部分是必須的
4:TCP傳輸的優點:
(1)TCP協議傳輸可靠性高,穩定性強
(2)高效性:位元組流方式傳遞,效率很高
(3)有效性、可用性:應用廣泛,支援任何平台

<transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>

New I/O API Protocol(NIO)

1:NIO協議和TCP協議類似,但NIO更側重於底層的訪問操作。它允許開發人員對同一資源可有更多的client調用和服務端有更多的負載。
2:適合使用NIO協議的場景:
(1)可能有大量的Client去鏈接到Broker上一般情況下,大量的Client去鏈接Broker是被作業系統的執行緒數所限制的。因此,NIO的實現比TCP需要更少的執行緒去運行,所以建議使用NIO協議
(2)可能對於Broker有一個很遲鈍的網路傳輸NIO比TCP提供更好的性能
3:NIO連接的URI形式:nio://hostname:port?key=value
4:Transport Connector配置示例:

<transportConnectors>
  <transportConnector
    name="tcp"
    uri="tcp://localhost:61616?trace=true" />
  <transportConnector
    name="nio"
    uri="nio://localhost:61618?trace=true" />
</transportConnectors>

上面的配置,示範了一個TCP協議監聽61616埠,一個NIO協議監聽61618埠

User Datagram Protocol(UDP)

1:UDP和TCP的區別
(1)TCP是一個原始流的傳遞協議,意味著數據包是有保證的,換句話說,數據包是不會被複制和丟失的。UDP,另一方面,它是不會保證數據包的傳遞的
(2)TCP也是一個穩定可靠的數據包傳遞協議,意味著數據在傳遞的過程中不會被丟失。這樣確保了在發送和接收之間能夠可靠的傳遞。相反,UDP僅僅是一個鏈接協議,所以它沒有可靠性之說
2:從上面可以得出:TCP是被用在穩定可靠的場景中使用的;UDP通常用在快速數據傳遞和不怕數據丟失的場景中,還有ActiveMQ通過防火牆時,只能用UDP
3:UDP連接的URI形式:udp://hostname:port?key=value
4:Transport Connector配置示例:

<transportConnectors>
    <transportConnector
        name="udp"
        uri="udp://localhost:61618?trace=true" />
</transportConnectors>

Secure Sockets Layer Protocol (SSL)

1:連接的URI形式:ssl://hostname:port?key=value
2:Transport Connector配置示例:

<transportConnectors>
    <transportConnector name="ssl" uri="ssl://localhost:61617?trace=true"/>
</transportConnectors>

Hypertext Transfer Protocol (HTTP/HTTPS)

1:像web和email等服務需要通過防火牆來訪問的,Http可以使用這種場合
2:連接的URI形式://hostname:port?key=value或者//hostname:port?key=value
3:Transport Connector配置示例:

<transportConnectors>
    <transportConnector name="http" uri="//localhost:8080?trace=true" />
</transportConnectors>

VM Protocol(VM)

1、VM transport允許在VM內部通訊,從而避免了網路傳輸的開銷。這時候採用的連 接不是socket連接,而是直接的方法調用。

2、第一個創建VM連接的客戶會啟動一個embed VM broker,接下來所有使用相同的 broker name的VM連接都會使用這個broker。當這個broker上所有的連接都關閉 的時候,這個broker也會自動關閉。

3、連接的URI形式:vm://brokerName?key=value

4、Java中嵌入的方式: vm:broker:(tcp://localhost:6000)?brokerName=embeddedbroker&persistent=fal se , 定義了一個嵌入的broker名稱為embededbroker以及配置了一個 tcptransprotconnector在監聽埠6000上

5、使用一個載入一個配置文件來啟動broker vm://localhost?brokerConfig=xbean:activemq.xml

HelloWorld

下載

//activemq.apache.org/

安裝啟動

解壓後直接執行

bin/win64/activemq.bat

web控制台

//localhost:8161/

通過8161埠訪問

修改訪問埠

修改 ActiveMQ 配置文件:/usr/local/activemq/conf/jetty.xml

jettyport節點

配置文件修改完畢,保存並重新啟動 ActiveMQ 服務。

開發

maven坐標

<!-- //mvnrepository.com/artifact/org.apache.activemq/activemq-all -->
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-all</artifactId>
    <version>5.15.11</version>
</dependency>

Sender

package com.mashibing.activemq01;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class Sender {

	public static void main(String[] args) throws Exception {
		// 1. 建立工廠對象,
		ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(
				ActiveMQConnectionFactory.DEFAULT_USER,
				ActiveMQConnectionFactory.DEFAULT_PASSWORD,
				"tcp://localhost:61616"
				);
		//2 從工廠里拿一個連接
		Connection connection = activeMQConnectionFactory.createConnection();
		connection.start();
		//3 從連接中獲取Session(會話)
		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		// 從會話中獲取目的地(Destination)消費者會從這個目的地取消息
		Queue queue = session.createQueue("f");
		
		
		//從會話中創建消息提供者
		
		MessageProducer producer = session.createProducer(queue);
		//從會話中創建文本消息(也可以創建其它類型的消息體)
		
		for (int i = 0; i < 100; i++) {
			TextMessage message = session.createTextMessage("msg: " + i);
			// 通過消息提供者發送消息到ActiveMQ
			Thread.sleep(1000);
			producer.send(message);
		}
		
		// 關閉連接
		connection.close();
		System.out.println("exit");
	}
}

Receiver

package com.mashibing.activemq01;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class Receiver {

	public static void main(String[] args) throws Exception {
		// 1. 建立工廠對象,
		ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(
				ActiveMQConnectionFactory.DEFAULT_USER,
				ActiveMQConnectionFactory.DEFAULT_PASSWORD,
				"tcp://localhost:61616"
				);
		//2 從工廠里拿一個連接
		Connection connection = activeMQConnectionFactory.createConnection();
		connection.start();
		//3 從連接中獲取Session(會話)
		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		// 從會話中獲取目的地(Destination)消費者會從這個目的地取消息
		Queue queue = session.createQueue("f");
		
		
		//從會話中創建消息提供者
		
		MessageConsumer consumer = session.createConsumer(queue);
		//從會話中創建文本消息(也可以創建其它類型的消息體)
		


		while (true) {
			TextMessage receive = (TextMessage)consumer.receive();
			System.out.println("TextMessage:" + receive.getText());
			
		}
	}
}

Active MQ的安全機制

web控制台安全

# username: password [,rolename ...]
admin: admin, admin
user: user, user
yiming: 123, user

用戶名:密碼,角色

注意: 配置需重啟ActiveMQ才會生效。

消息安全機制

修改 activemq.xml

在123行 節點中添加

	<plugins>
      <simpleAuthenticationPlugin>
          <users>
              <authenticationUser username="admin" password="admin" groups="admins,publishers,consumers"/>
              <authenticationUser username="publisher" password="publisher"  groups="publishers,consumers"/>
              <authenticationUser username="consumer" password="consumer" groups="consumers"/>
              <authenticationUser username="guest" password="guest"  groups="guests"/>
          </users>
      </simpleAuthenticationPlugin>
 </plugins>