使用Java客戶端發送消息和消費的應用
- 2022 年 7 月 15 日
- 筆記
體驗鏈接://developer.aliyun.com/adc/scenario/fb1b72ee956a4068a95228066c3a40d6
實驗簡介
本教程將Demo演示使用java客戶端發送消息和消費的應用場景
實驗實操
第1節 如何發送和消費並發消息
並發消息,也叫普通消息,是相對順序消息而言的,普通消息的效率最高。本教程將簡單演示如何使用純java client發送和消費消息。
1. 下載java程式碼demo(已下載則忽略操作)
cd /data/demos
git clone //github.com/ApacheRocketMQ/06-all-java-demos.git
2. 打包,執行程式碼demo
再執行命令, 可以看到正常生產和消費輸出
// 進入demo程式碼目錄
cd /data/demos/06-all-java-demos/
// 打包
mvn clean package
// 運行程式碼
mvn exec:java -Dexec.args="127.0.0.1:39876" -Dexec.mainClass="org.apache.rocketmqdemos.ConcurrentMessageDemo" -Dexec.classpathScope=runtime
3. Demo程式碼說明
Demo程式碼可以查看github。並發消息,意思是生產者可以並發的向topic中發送消息, 消費端不區分順序的消息,這種模式效率最好。生產者demo程式碼如下:
最後留一個思考題給大家: 生產者實例和消費者實例, 都是執行緒安全的嗎?
第2節 如何發送和消費順序消息
順序消息分為分區有序和全局有序。生產消費程式碼都是一樣的, 區別在於分區有序的topic中queue個數可以是任意有效值,全局有序的topic要求queue的個數為1。順序消息的實現非常簡單易懂,但犧牲了可用性,單節點故障會直接影響順序消息。
什麼是分區有序消息,什麼場景應該使用呢,又該如何發送分區有序消息?分區有序表示在一個queue中的消息是有序的,發送消息時設置設置了相同key的消息會被發送到同一個queue中。
本教程將簡單演示如何使用純java client發送和消費順序消息。
1. 下載java程式碼demo(已下載則忽略操作)
cd /data/demos
git clone //github.com/ApacheRocketMQ/06-all-java-demos.git
2. 打包,執行程式碼demo
再執行命令, 可以看到正常生產和消費輸出。 消費輸出注意看相同queue id的消息輸出內容中的數字,按照從小到大就是正確的。
// 進入demo程式碼目錄
cd /data/demos/06-all-java-demos/
// 打包
mvn clean package
// 運行程式碼
mvn exec:java -Dexec.args="127.0.0.1:39876" -Dexec.mainClass="org.apache.rocketmqdemos.OrderMessageDemo1" -Dexec.classpathScope=runtime
3. Demo程式碼說明
Demo程式碼可以查看github。
- 生產者說明
生產者會根據設置的keys做hash,相同hash值的消息會發送到相同的queue中。所以相同hash值的消息需要保證在同一個執行緒中順序的發送。
- 消費者說明
消費者使用相對比較簡單, 消息監聽類實現org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly介面即可。相同queue的消息需要串列處理,這樣救保證消費的順序性
第3節 如何發送和消費延遲消息
延遲消息,對於一些特殊場景比如訂票後30分鐘不支付自動取消等類似場景比較有用。本教程將簡單演示如何使用純java client發送和消費延遲消息。
1. 下載java程式碼demo(已下載則忽略操作)
cd /data/demos
git clone //github.com/ApacheRocketMQ/06-all-java-demos.git
2. 打包,執行程式碼demo
執行命令, 可以看到正常生產和消費輸出。 目前RocketMQ支援多種延遲級別, 不過每種延遲級別都是基於RocketMQ自身,實際延遲時間會加上Broker-Client端的網路情況不同而略有差異。
// 進入demo程式碼目錄
cd /data/demos/06-all-java-demos/
// 打包
mvn clean package
// 運行程式碼
mvn exec:java -Dexec.args="127.0.0.1:39876" -Dexec.mainClass="org.apache.rocketmqdemos.DelayMessageDemo" -Dexec.classpathScope=runtime
3. Demo程式碼說明
Demo程式碼可以查看github。
- 生產者說明
生產者在發送消息的時候需要設置延遲級別,RocketMQ支援多種延遲級別。如果把延遲時間算作一個以空格分割的數組,延遲級別就是延遲時間數組的下標index+1。RocketMQ如何解析延遲級別和延遲時間映射關係。
- 消費者說明: 消費者按照並發消息消費即可
第4節 如何發送和消費事務消息
事務消息,是RocketMQ解決分散式事務的一種實現,極其簡單好用。一個事物消息大致的生命周期如下圖
概括為如下幾個重要點:
-
生產者發送half消息(事物消息)
-
Broker存儲half消息
-
生產者處理本地事物,處理成功後commit事物
-
消費者消費到事物消息
本教程將簡單演示如何使用純java client發送和消費事物消息。
1. 下載java程式碼demo(已下載則忽略操作)
cd /data/demos
git clone //github.com/ApacheRocketMQ/06-all-java-demos.git
2. 打包,執行程式碼demo
執行命令, 可以看到事物消息的全部過程。
// 進入demo程式碼目錄
cd /data/demos/06-all-java-demos/
// 打包
mvn clean package
// 運行程式碼
mvn exec:java -Dexec.args="127.0.0.1:39876" -Dexec.mainClass="org.apache.rocketmqdemos.TransactionMessageDemo" -Dexec.classpathScope=runtime
3. Demo程式碼說明
Demo程式碼可以查看github。在事物消息中,消費程式碼和普通消息的消費一樣,主要程式碼在生產者端。
生產者端的主要程式碼包含3個步驟:
- 初始化生產者,設置回調執行緒池、設置本地事物處理監聽類。
這裡注意事物消息的生產者類是: org.apache.rocketmq.client.producer.TransactionMQProducer, 而不是普通生產者類。
事物監聽類需要實現2個方法,這裡的邏輯都是mock的,實際使用的時候需要根據實際修改。
- 發送事物消息。調用sendMessageInTransaction()方法發送事物消息, 而不是以前的send()方法。
第5節 生產者消費者如何同步發送、消費消息(Request-Reply)
request-reply模式,可以滿足目前類似RPC同步調用的場景,本教程將簡單演示如何使用該模式。
1. 下載java程式碼demo(已下載則忽略操作)
cd /data/demos
git clone //github.com/ApacheRocketMQ/06-all-java-demos.git
2. 打包,執行程式碼demo
執行命令, 可以看到正常生產和消費輸出。
// 進入demo程式碼目錄
cd /data/demos/06-all-java-demos/
// 打包
mvn clean package
// 運行程式碼
mvn exec:java -Dexec.args="127.0.0.1:39876" -Dexec.mainClass="org.apache.rocketmqdemos.RequestReplyMessageDemo" -Dexec.classpathScope=runtime
通過程式碼結果和程式碼比較, 我們得知request-reply類似RPC同步調用的效果。
個人覺得:需要同步調用就用RPC, 不要走MQ,畢竟兩者是完全不同的目標的產品,專業的事情交給專業的產品。
3. Demo程式碼說明
Demo程式碼可以查看github。
request-reply模式,在生產者和消費者兩端都和一般的生產消費有區別,下面分別介紹下demo程式碼。
生產者demo主要程式碼, 主要區別在於調用request(),而不是send()方法。
消費者demo主要程式碼: 消費程式碼主要增加了「回復」邏輯。回復是利用消息發送直接向生產者發送一條消息。 有點類似事物消息中broker回查生產者。
一個小問題:事物消息和request-reply消息時,生產者的生產者組名有什麼要求嘛?
第6節 如何有選擇性的消費消息
有時候我們只想消費部分消息, 當然全部消費,在程式碼中過濾。 假如消息海量時, 會有很多資源浪費,比如浪費不必要的頻寬。我們可以通過tag,sql92表達式來選擇性的消費。
- 進入broker目錄
cd /usr/local/services/5-rocketmq/broker-01
- 編輯配置文件,修改broker配置項2個
vim conf/broker.conf
配置項值:
// 是否支援重試消息也過濾
filterSupportRetry=true
// 支援屬性過濾
enablePropertyFilter=true
修改後:
- 重啟broker
./restart.sh
1. 下載java程式碼demo(已下載則忽略操作)
cd /data/demos
git clone //github.com/ApacheRocketMQ/06-all-java-demos.git
2. 打包,執行tag過濾程式碼demo
執行命令, 可以看到正常生產和消費輸出。
// 進入demo程式碼目錄
cd /data/demos/06-all-java-demos/
// 打包
mvn clean package
// 運行程式碼
mvn exec:java -Dexec.args="127.0.0.1:39876 tag" -Dexec.mainClass="org.apache.rocketmqdemos.FliterMessageDemo" -Dexec.classpathScope=runtime
3. 執行sql過濾程式碼demo
執行命令, 可以看到正常生產和消費輸出。
// 進入demo程式碼目錄
cd /data/demos/06-all-java-demos/
// 打包
mvn clean package
// 運行程式碼
mvn exec:java -Dexec.args="127.0.0.1:39876 sql" -Dexec.mainClass="org.apache.rocketmqdemos.FliterMessageDemo" -Dexec.classpathScope=runtime
4. Demo程式碼說明
Demo程式碼可以查看github。以下分別介紹生產者和消費者主要demo程式碼。
- 生產者
在生產tag消息的時候, 消息中需要加上發送tag;sql92過濾的時候,加上自定義k-v。
- 消費者
tag過濾消費時,在訂閱topic時, 也添加上tag訂閱
SQL92過濾時,添加上SQL過濾訂閱。至於SQL92除了等號,還是支援什麼,大家可以自行自行查看或者到群里問。
第7節 如何使用ACL客戶端生產消費消息
ACL,全稱是Access Control List,是RocketMQ設計來做訪問和許可權控制的。更多文檔參見github wiki://github.com/apache/rocketmq/wiki/RIP-5-RocketMQ-ACL
0. 啟動一個集群
- 進入broker目錄
cd /usr/local/services/5-rocketmq/broker-01
- 編輯配置文件,修改broker配置項1個
vim conf/broker.conf
配置項值:
aclEnable=true
修改後:
- 重啟broker
./restart.sh
1. 下載java程式碼demo(已下載則忽略操作)
cd /data/demos
git clone //github.com/ApacheRocketMQ/06-all-java-demos.git
2. 打包,執行程式碼demo
執行命令, 可以看到正常生產和消費輸出。 demo程式碼使用的admin許可權發送和消費,實際使用需要對於每個topic,消費者組授權,才能正常生產消費。
// 進入demo程式碼目錄
cd /data/demos/06-all-java-demos/
// 打包
mvn clean package
// 運行程式碼
mvn exec:java -Dexec.args="127.0.0.1:39876" -Dexec.mainClass="org.apache.rocketmqdemos.ACLDemo" -Dexec.classpathScope=runtime
3. Demo程式碼說明
Demo程式碼可以查看github。帶ACL的生產者和消費者在初始化的時候,都必須給一個hook實例,構建方法如下:
static RPCHook getAclRPCHook(String accessKey, String secretKey) {
return new AclClientRPCHook(new SessionCredentials(accessKey, secretKey));
}
在broker端secret key用來校驗資訊的完整性, access key用來校驗用戶許可權。二者缺一不可。