🏆【Alibaba中間件技術系列】「RocketMQ技術專題」Broker配置介紹及發送流程、異常(XX Busy)問題分析
- 2021 年 12 月 10 日
- 筆記
- 【技術專區-Alibaba】
參考資料
-
Rocketmq官網://rocketmq.apache.org/
-
Rocketmq的其它項目://github.com/apache/rocketmq-externals
-
Rocketmq-console安裝://blog.csdn.net/zzzgd_666/article/details/81387237
RocketMQ的參數指南
NameServer配置屬性
#broker名字,注意此處不同的配置文件填寫的不一樣
brokerClusterName=rocketmqcluster
brokerName=broker-a
#0 表示 Master, >0 表示 Slave
brokerId=0
#nameServer地址,分號分割
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
#這個配置可解決雙網卡,發送消息走外網的問題,這裡配上內網ip就可以了
brokerIP1=10.30.51.149
#在發送消息時,自動創建伺服器不存在的topic,默認創建的隊列數
defaultTopicQueueNums=8
#是否允許 Broker 自動創建Topic,建議線下開啟,線上關閉
autoCreateTopicEnable=false
#是否允許 Broker 自動創建訂閱組,建議線下開啟,線上關閉
autoCreateSubscriptionGroup=true
#Broker 對外服務的監聽埠
listenPort=10911
#刪除文件時間點,默認凌晨 0點
deleteWhen=03
#文件保留時間,默認 48 小時
fileReservedTime=48
#commitLog每個文件的大小默認1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每個文件默認存30W條,根據業務情況調整
mapedFileSizeConsumeQueue=1000000
destroyMapedFileIntervalForcibly=120000
redeleteHangedFileInterval=120000
#檢測物理文件磁碟空間
diskMaxUsedSpaceRatio=88
#存儲路徑
storePathRootDir=/app/data/rocketmq/data
#commitLog 存儲路徑
storePathCommitLog=/app/data/rocketmq/data/commitlog
#消費隊列存儲路徑存儲路徑
storePathConsumeQueue=/app/data/rocketmq/data/consumerqueue
#消息索引存儲路徑
storePathIndex=/app/data/rocketmq/data/index
#checkpoint 文件存儲路徑
storeCheckpoint=/app/data/rocketmq/data/checkpoint
#abort 文件存儲路徑
abortFile=/app/data/rocketmq/data/abort
#限制的消息大小 修改為16M
maxMessageSize=16777216
#發送隊列等待時間
waitTimeMillsInSendQueue=3000
osPageCacheBusyTimeOutMills=5000
flushCommitLogLeastPages=12
flushConsumeQueueLeastPages=6
flushCommitLogThoroughInterval=30000
flushConsumeQueueThoroughInterval=180000
#Broker 的角色
#- ASYNC_MASTER 非同步複製Master
#- SYNC_MASTER 同步雙寫Master
#- SLAVE
brokerRole=ASYNC_MASTER
#刷盤方式
#- ASYNC_FLUSH 非同步刷盤
#- SYNC_FLUSH 同步刷盤
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#發消息執行緒池數量
sendMessageThreadPoolNums=80
#拉消息執行緒池數量
pullMessageThreadPoolNums=128
useReentrantLockWhenPutMessage=true
Rocketmq 發送控制流程
針對前4種 broker busy ,主要是由於 Broker 在追加消息時持有的鎖時間超過了設置的1s,Broker 為了自我保護,會拋出錯誤,客戶端會選擇其他 broker 伺服器進行重試。
如果對不是金融級服務,建議將 transientStorePoolEnable = true,可以有效避免前面 4 種 broker ,因為開啟這個參數,消息首先會存儲在堆外記憶體中,並且 RocketMQ 提供了記憶體鎖定的功能,其追加性能能得到一定的保障,這樣可以做到在記憶體使用層面的讀寫分離,即寫消息是直接寫入堆外記憶體,消費消息直接從 pagecache中讀,然後定時將堆外記憶體的消息寫入 pagecache。
但這種方案隨之帶來的就是可能存在消息丟失,如果對消息非常嚴謹的話,建議擴容集群,或遷移topic到新的集群。
可以看出來,拋出這種錯誤,在 broker 還沒有發送「嚴重」的 pagecache 繁忙,即消息追加到記憶體中的最大時延沒有超過 1s,通常追加是很快的,絕大部分都會低於1ms,但可能會由於出現一個超過200ms的追加時間,導致排隊中的任務等待時間超過了200ms,則此時會觸發broker 端的快速失敗,讓請求快速失敗,便於客戶端快速重試。但是這種請求並不是實時的,而是每隔10s 檢查一遍。
值得注意的是,一旦出現 TIMEOUT_CLEAN_QUEUE,可能在一個點會有多個這樣的錯誤資訊,具體多少與當前積壓在待發送隊列中的個數有關。
Rocketmq 發送時異常
system busy 和 broker busy 解決方案
- [REJECTREQUEST]system busy too many requests and system thread pool busy
- [PC_SYNCHRONIZED]broker busy
- [PCBUSY_CLEAN_QUEUE]broker busy
- [TIMEOUT_CLEAN_QUEUE]broker busy
之前寫的解決方案,都是基於測試環境測試的.到生產環境之後,正常使用沒有問題,生產環境壓測時,又出現了system busy異常(簡直崩潰)
com.alibaba.rocketmq.client.exception.MQBrokerException: CODE: 2 DESC: [TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: 208ms, size of queue: 8
For more information, please visit the url, //docs.aliyun.com/cn#/pub/ons/faq/exceptions&unexpected_exception
at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.processSendResponse(MQClientAPIImpl.java:455)
at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:272)
at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:253)
at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:215)
at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:671)
at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:440)
at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1030)
at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:989)
at com.alibaba.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:90)
at
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
報錯定位
-
cleanExpiredRequestInQueue會處理髮送消息、拉取消息、心跳、事務消息隊列中的數據,此次遇到的問題是發送Topic消息報出來的錯誤,所以接下來針對發送消息流程進行分析。
-
報出此錯誤的源碼位置為broker快速失敗機制BrokerFastFailure.java類(該類在Broker啟動時會啟動一個定時任務,每10毫秒執行一次),報錯位置程式碼如下:
void cleanExpiredRequestInQueue(final BlockingQueue<Runnable> blockingQueue, final long maxWaitTimeMillsInQueue) {
while (true) {
try {
if (!blockingQueue.isEmpty()) {
// 獲取隊列頭元素
final Runnable runnable = blockingQueue.peek();
if (null == runnable) {
break;
}
final RequestTask rt = castRunnable(runnable);
if (rt == null || rt.isStopRun()) {
break;
}
final long behind = System.currentTimeMillis() - rt.getCreateTimestamp();
// 如果頭元素對應的任務處理時間超過設置的最大等待時間,則處理請求返回該錯誤,並移除掉該任務
if (behind >= maxWaitTimeMillsInQueue) {
if (blockingQueue.remove(runnable)) {
rt.setStopRun(true);
rt.returnResponse(RemotingSysResponseCode.SYSTEM_BUSY, String.format("[TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d", behind, blockingQueue.size()));
}
} else {
break;
}
} else {
break;
}
} catch (Throwable ignored) {
}
}
}
這段程式碼是Broker快速失敗機制的核心程式碼,如果一個等待隊列的頭元素(也就是第一個要處理或者正在處理的元素)等待時間超過該隊列設置的最大等待時間,則丟棄該元素對象的任務,並對這個請求返回[TIMEOUT_CLEAN_QUEUE]broker busy異常資訊。
發送Topic消息報該錯誤
sendThreadPoolQueue取出頭元素,轉換成對應的任務,判斷任務在隊列存活時間是否超過了隊列設置的最大等待時間,如果超過了則組裝處理返回對象response,response的code為RemotingSysResponseCode.SYSTEM_BUSY,內容為:
[TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: [當前任務在隊列存活時間], size of queue: [當前隊列的長度]
MQClientAPIImpl.processSendResponse處理返回response,根據response.getCode()的處理分支,最終返回MQBrokerException異常,response分支處理程式碼如下:
// 只有ResponseCode.SUCCESS的情況下返回結果,其他情況拋出MQBrokerException異常
private SendResult processSendResponse(
final String brokerName,
final Message msg,
final RemotingCommand response
) throws MQBrokerException, RemotingCommandException {
switch (response.getCode()) {
case ResponseCode.FLUSH_DISK_TIMEOUT:
case ResponseCode.FLUSH_SLAVE_TIMEOUT:
case ResponseCode.SLAVE_NOT_AVAILABLE: {
}
case ResponseCode.SUCCESS: {
// 省略部分程式碼
return sendResult;
}
default:
break;
}
throw new MQBrokerException(response.getCode(), response.getRemark());
}
消息發送客戶端接收到MQBrokerException異常資訊,捕獲異常處理中不符合消息重試邏輯,直接拋出該異常,也就是用戶看到的;
// timesTotal為消息生產者設置的發送失敗重試次數
for (; times < timesTotal; times++) {
String lastBrokerName = null == mq ? null : mq.getBrokerName();
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
if (mqSelected != null) {
mq = mqSelected;
brokersSent[times] = mq.getBrokerName();
try {
// 省略部分程式碼
} catch (RemotingException e) {
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
log.warn(msg.toString());
exception = e;
continue;
} catch (MQClientException e) {
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
log.warn(msg.toString());
exception = e;
continue;
} catch (MQBrokerException e) {
// 此處為MQBrokerException異常處理邏輯,RemotingSysResponseCode.SYSTEM_BUSY不符合分支條件,最終throw e拋出異常
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
log.warn(msg.toString());
exception = e;
switch (e.getResponseCode()) {
case ResponseCode.TOPIC_NOT_EXIST:
case ResponseCode.SERVICE_NOT_AVAILABLE:
case ResponseCode.SYSTEM_ERROR:
case ResponseCode.NO_PERMISSION:
case ResponseCode.NO_BUYER_ID:
case ResponseCode.NOT_IN_CURRENT_UNIT:
continue;
default:
if (sendResult != null) {
return sendResult;
}
throw e;
}
} catch (InterruptedException e) {
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
log.warn(msg.toString());
log.warn("sendKernelImpl exception", e);
log.warn(msg.toString());
throw e;
}
} else {
break;
}
}
生產環境各種參數:
-
broker busy異常: 可通過增大 waitTimeMillsInSendQueue 解決
-
system busy異常:可通過增大 osPageCacheBusyTimeOutMills 解決
#發送隊列等待時間
waitTimeMillsInSendQueue=3000
#系統頁面快取繁忙超時時間(翻譯),默認值 1000
osPageCacheBusyTimeOutMills=5000
出現問題分析
出現異常的原因是因為我們同一台伺服器部署的多個應用造成的。我們一台伺服器上部署了 三個ES、八個redis、一個rocketmq ,壓力測試時這些都在使用,雖然cpu、記憶體都還有很大剩餘,但是磁碟io和記憶體頻率畢竟只有那麼多可能已經佔滿,或者還有其他都會有影響。
之前測試環境測試其他東西時,發現mq和redis同時大量使用時,redis速度會降低三到四倍,由此可見應用分伺服器部署的重要性。以前知道會有影響,沒想到影響這麼大。
最終結解決方案:應該給rocketmq單獨部署性能較高的伺服器.
記一次 rocketmq 使用時的異常。
問題分析總結
- system busy , start flow control for a while
該異常會造成 消息丟失。
- broker busy , start flow control for a while
該異常不會造成消息丟失。
問題解決過程
1、最開始時候 ,測試發現在性能好的伺服器上只會出現system busy,也就是說出現異常就會消息丟失。
所以:業務程式碼進行處理,出現異常就會重發到當前topic的bak隊列,當時想的是既然這個topic busy了,就換到另外的topic去發,總不能都 busy吧。也算是臨時解決了。
2、發現有消息重複的現象。不用想肯定是報broker busy異常,重發到topic的 bak隊列了。又因為broker busy可能不會造成消息丟失,所以消息重複就出現了。
解決方案:
修改rocketmq配置文件:
-
方案一:sendMessageThreadPoolNums 改成 1 ,沒有的話新增一行。sendMessageThreadPoolNums=1
-
方案二:useReentrantLockWhenPutMessage改成true,沒有的話新增一行。
sendMessageThreadPoolNums=32
useReentrantLockWhenPutMessage=true
sendMessageThreadPoolNums這個屬性是發送執行緒池大小, rocketmq4.1版本之後默認為 1,之前版本默認什麼不知道但是肯定大於1。這個屬性改成1的話,就不用管useReentrantLockWhenPutMessage這個屬性了;
如果改成大於1,就需要將useReentrantLockWhenPutMessage這個屬性設置為 true;
目前測試 未發現這兩個方案有什麼區別,sendMessageThreadPoolNums=1 時也支援多執行緒發送,發送速度感覺和 sendMessageThreadPoolNums大於1沒有區別,都能跑滿100M的網卡。
感覺如果useReentrantLockWhenPutMessage=true的時候,就是打開鎖,然後關鍵程式碼其實還是單執行緒處理;
解決方案
- 業務邏輯處理中進行異常捕獲,如果捕獲到異常為MQBrokerException並且responseCode為2則重發消息;
- 修改broker的默認發送消息任務隊列等待時長waitTimeMillsInSendQueue(單位: 毫秒);
除此之外,還可以觀察報錯時磁碟的IO情況,出現這種錯誤很有可能是當時的磁碟IO很高,導致消息落盤時間變長。