使用Java客戶端發送消息和消費的應用

  • 2022 年 7 月 15 日
  • 筆記

體驗鏈接://developer.aliyun.com/adc/scenario/fb1b72ee956a4068a95228066c3a40d6

實驗簡介

本教程將Demo演示使用java客戶端發送消息和消費的應用場景

實驗實操

第1節 如何發送和消費並發消息

並發消息,也叫普通消息,是相對順序消息而言的,普通消息的效率最高。本教程將簡單演示如何使用純java client發送和消費消息。

1. 下載java程式碼demo(已下載則忽略操作)

cd /data/demos

git clone //github.com/ApacheRocketMQ/06-all-java-demos.git

2. 打包,執行程式碼demo

再執行命令, 可以看到正常生產和消費輸出

// 進入demo程式碼目錄
cd /data/demos/06-all-java-demos/

// 打包
mvn clean package

// 運行程式碼
mvn exec:java -Dexec.args="127.0.0.1:39876" -Dexec.mainClass="org.apache.rocketmqdemos.ConcurrentMessageDemo" -Dexec.classpathScope=runtime

3. Demo程式碼說明

Demo程式碼可以查看github。並發消息,意思是生產者可以並發的向topic中發送消息, 消費端不區分順序的消息,這種模式效率最好。生產者demo程式碼如下:

最後留一個思考題給大家: 生產者實例和消費者實例, 都是執行緒安全的嗎?

第2節 如何發送和消費順序消息

順序消息分為分區有序和全局有序。生產消費程式碼都是一樣的, 區別在於分區有序的topic中queue個數可以是任意有效值,全局有序的topic要求queue的個數為1。順序消息的實現非常簡單易懂,但犧牲了可用性,單節點故障會直接影響順序消息。

什麼是分區有序消息,什麼場景應該使用呢,又該如何發送分區有序消息?分區有序表示在一個queue中的消息是有序的,發送消息時設置設置了相同key的消息會被發送到同一個queue中。

本教程將簡單演示如何使用純java client發送和消費順序消息。

1. 下載java程式碼demo(已下載則忽略操作)

cd /data/demos

git clone //github.com/ApacheRocketMQ/06-all-java-demos.git

2. 打包,執行程式碼demo

再執行命令, 可以看到正常生產和消費輸出。 消費輸出注意看相同queue id的消息輸出內容中的數字,按照從小到大就是正確的。

// 進入demo程式碼目錄
cd /data/demos/06-all-java-demos/

// 打包
mvn clean package

// 運行程式碼
mvn exec:java -Dexec.args="127.0.0.1:39876" -Dexec.mainClass="org.apache.rocketmqdemos.OrderMessageDemo1" -Dexec.classpathScope=runtime

3. Demo程式碼說明

Demo程式碼可以查看github

  • 生產者說明

生產者會根據設置的keys做hash,相同hash值的消息會發送到相同的queue中。所以相同hash值的消息需要保證在同一個執行緒中順序的發送。

  • 消費者說明

消費者使用相對比較簡單, 消息監聽類實現org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly介面即可。相同queue的消息需要串列處理,這樣救保證消費的順序性

第3節 如何發送和消費延遲消息

延遲消息,對於一些特殊場景比如訂票後30分鐘不支付自動取消等類似場景比較有用。本教程將簡單演示如何使用純java client發送和消費延遲消息。

1. 下載java程式碼demo(已下載則忽略操作)

cd /data/demos

git clone //github.com/ApacheRocketMQ/06-all-java-demos.git

2. 打包,執行程式碼demo

執行命令, 可以看到正常生產和消費輸出。 目前RocketMQ支援多種延遲級別, 不過每種延遲級別都是基於RocketMQ自身,實際延遲時間會加上Broker-Client端的網路情況不同而略有差異。

// 進入demo程式碼目錄
cd /data/demos/06-all-java-demos/

// 打包
mvn clean package

// 運行程式碼
mvn exec:java -Dexec.args="127.0.0.1:39876" -Dexec.mainClass="org.apache.rocketmqdemos.DelayMessageDemo" -Dexec.classpathScope=runtime

3. Demo程式碼說明

Demo程式碼可以查看github

  • 生產者說明

生產者在發送消息的時候需要設置延遲級別,RocketMQ支援多種延遲級別。如果把延遲時間算作一個以空格分割的數組,延遲級別就是延遲時間數組的下標index+1。RocketMQ如何解析延遲級別和延遲時間映射關係。

  • 消費者說明: 消費者按照並發消息消費即可

第4節 如何發送和消費事務消息

事務消息,是RocketMQ解決分散式事務的一種實現,極其簡單好用。一個事物消息大致的生命周期如下圖

概括為如下幾個重要點:

  1. 生產者發送half消息(事物消息)

  2. Broker存儲half消息

  3. 生產者處理本地事物,處理成功後commit事物

  4. 消費者消費到事物消息

本教程將簡單演示如何使用純java client發送和消費事物消息。

1. 下載java程式碼demo(已下載則忽略操作)

cd /data/demos

git clone //github.com/ApacheRocketMQ/06-all-java-demos.git

2. 打包,執行程式碼demo

執行命令, 可以看到事物消息的全部過程。

// 進入demo程式碼目錄
cd /data/demos/06-all-java-demos/

// 打包
mvn clean package

// 運行程式碼
mvn exec:java -Dexec.args="127.0.0.1:39876" -Dexec.mainClass="org.apache.rocketmqdemos.TransactionMessageDemo" -Dexec.classpathScope=runtime

3. Demo程式碼說明

Demo程式碼可以查看github。在事物消息中,消費程式碼和普通消息的消費一樣,主要程式碼在生產者端。

生產者端的主要程式碼包含3個步驟:

  1. 初始化生產者,設置回調執行緒池、設置本地事物處理監聽類。

這裡注意事物消息的生產者類是: org.apache.rocketmq.client.producer.TransactionMQProducer, 而不是普通生產者類。

事物監聽類需要實現2個方法,這裡的邏輯都是mock的,實際使用的時候需要根據實際修改。

  1. 發送事物消息。調用sendMessageInTransaction()方法發送事物消息, 而不是以前的send()方法。

第5節 生產者消費者如何同步發送、消費消息(Request-Reply)

request-reply模式,可以滿足目前類似RPC同步調用的場景,本教程將簡單演示如何使用該模式。

1. 下載java程式碼demo(已下載則忽略操作)

cd /data/demos

git clone //github.com/ApacheRocketMQ/06-all-java-demos.git

2. 打包,執行程式碼demo

執行命令, 可以看到正常生產和消費輸出。

// 進入demo程式碼目錄
cd /data/demos/06-all-java-demos/

// 打包
mvn clean package

// 運行程式碼
mvn exec:java -Dexec.args="127.0.0.1:39876" -Dexec.mainClass="org.apache.rocketmqdemos.RequestReplyMessageDemo" -Dexec.classpathScope=runtime

通過程式碼結果和程式碼比較, 我們得知request-reply類似RPC同步調用的效果。

個人覺得:需要同步調用就用RPC, 不要走MQ,畢竟兩者是完全不同的目標的產品,專業的事情交給專業的產品。

3. Demo程式碼說明

Demo程式碼可以查看github

request-reply模式,在生產者和消費者兩端都和一般的生產消費有區別,下面分別介紹下demo程式碼。

生產者demo主要程式碼, 主要區別在於調用request(),而不是send()方法。

消費者demo主要程式碼: 消費程式碼主要增加了「回復」邏輯。回復是利用消息發送直接向生產者發送一條消息。 有點類似事物消息中broker回查生產者。

一個小問題:事物消息和request-reply消息時,生產者的生產者組名有什麼要求嘛?

第6節 如何有選擇性的消費消息

有時候我們只想消費部分消息, 當然全部消費,在程式碼中過濾。 假如消息海量時, 會有很多資源浪費,比如浪費不必要的頻寬。我們可以通過tag,sql92表達式來選擇性的消費。

  • 進入broker目錄
cd /usr/local/services/5-rocketmq/broker-01
  • 編輯配置文件,修改broker配置項2個
vim conf/broker.conf

配置項值:

// 是否支援重試消息也過濾
filterSupportRetry=true

// 支援屬性過濾
enablePropertyFilter=true

修改後:

  • 重啟broker
./restart.sh

1. 下載java程式碼demo(已下載則忽略操作)

cd /data/demos

git clone //github.com/ApacheRocketMQ/06-all-java-demos.git

2. 打包,執行tag過濾程式碼demo

執行命令, 可以看到正常生產和消費輸出。

// 進入demo程式碼目錄
cd /data/demos/06-all-java-demos/

// 打包
mvn clean package

// 運行程式碼
mvn exec:java -Dexec.args="127.0.0.1:39876 tag" -Dexec.mainClass="org.apache.rocketmqdemos.FliterMessageDemo" -Dexec.classpathScope=runtime

3. 執行sql過濾程式碼demo

執行命令, 可以看到正常生產和消費輸出。

// 進入demo程式碼目錄
cd /data/demos/06-all-java-demos/

// 打包
mvn clean package

// 運行程式碼
mvn exec:java -Dexec.args="127.0.0.1:39876 sql" -Dexec.mainClass="org.apache.rocketmqdemos.FliterMessageDemo" -Dexec.classpathScope=runtime

4. Demo程式碼說明

Demo程式碼可以查看github。以下分別介紹生產者和消費者主要demo程式碼。

  • 生產者

在生產tag消息的時候, 消息中需要加上發送tag;sql92過濾的時候,加上自定義k-v。

  • 消費者

tag過濾消費時,在訂閱topic時, 也添加上tag訂閱

SQL92過濾時,添加上SQL過濾訂閱。至於SQL92除了等號,還是支援什麼,大家可以自行自行查看或者到群里問。

第7節 如何使用ACL客戶端生產消費消息

ACL,全稱是Access Control List,是RocketMQ設計來做訪問和許可權控制的。更多文檔參見github wiki://github.com/apache/rocketmq/wiki/RIP-5-RocketMQ-ACL

0. 啟動一個集群

  • 進入broker目錄
cd /usr/local/services/5-rocketmq/broker-01
  • 編輯配置文件,修改broker配置項1個
vim conf/broker.conf

配置項值:

aclEnable=true

修改後:

  • 重啟broker
./restart.sh

1. 下載java程式碼demo(已下載則忽略操作)

cd /data/demos

git clone //github.com/ApacheRocketMQ/06-all-java-demos.git

2. 打包,執行程式碼demo

執行命令, 可以看到正常生產和消費輸出。 demo程式碼使用的admin許可權發送和消費,實際使用需要對於每個topic,消費者組授權,才能正常生產消費。

// 進入demo程式碼目錄
cd /data/demos/06-all-java-demos/

// 打包
mvn clean package

// 運行程式碼
mvn exec:java -Dexec.args="127.0.0.1:39876" -Dexec.mainClass="org.apache.rocketmqdemos.ACLDemo" -Dexec.classpathScope=runtime

3. Demo程式碼說明

Demo程式碼可以查看github。帶ACL的生產者和消費者在初始化的時候,都必須給一個hook實例,構建方法如下:

static RPCHook getAclRPCHook(String accessKey, String secretKey) {
      return new AclClientRPCHook(new SessionCredentials(accessKey, secretKey));
}

在broker端secret key用來校驗資訊的完整性, access key用來校驗用戶許可權。二者缺一不可。