RocketMQ同步複製性能優化【實戰筆記】
- 2019 年 10 月 6 日
- 筆記
目錄
一、問題描述 1.壓測日誌 2.客戶端日誌 二、解決發送失敗情況 三、解決發送TPS過低情況 四、原因分析 1.刷盤流程回顧 2.主從複製回顧 3.流程模擬 4.原因總結
一、問題描述
早些時候寫過RocketMQ性能優化【實戰筆記】和 RocketMQ性能測試【實戰筆記】文章,主要基於異步刷盤/異步複製;由於業務需要需要搭建異步刷盤/同步複製集群;同時對性能進行壓測。
壓測結果顯示集群幾乎無法使用,TPS居然是個位數,客戶端也在報錯。
1.壓測日誌

2.客戶端日誌
2019-09-19 19:22:38,038 ERROR RocketmqClient - [BENCHMARK_PRODUCER] Send Exception org.apache.rocketmq.client.exception.MQBrokerException: CODE: 2 DESC: [TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: 209ms, size of queue: 9 For more information, please visit the url, http://rocketmq.apache.org/docs/faq/ at org.apache.rocketmq.client.impl.MQClientAPIImpl.processSendResponse(MQClientAPIImpl.java:671) at org.apache.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:467) at org.apache.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:449) at
二、解決發送失敗情況
經排查,將transientStorePoolEnable關閉(默認為false);壓測顯示最高TPS有1.9萬。
brokerRole=SYNC_MASTER #transientStorePoolEnable=true

三、解決發送TPS過低情況
最高TPS只有1.9萬,依然過低,與預期相差甚遠,我們預期壓測應該可以到7到8萬這樣可以滿足業務發展需要。再次檢查broker端參數配置,沒有發現有參數導致性能如此過低。 回顧性能調優的幾個方面:系統調優、集群調優、JVM調優。
系統調優與集群調優都已經做過了,唯一沒有優化的JVM調優,堆內存設置默認的8G。 JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g"
將JVM堆內存提高4倍後,壓測效果明顯提升,基本可以達到預期7萬多的TPS。
四、原因分析
1.為什麼在異步刷盤/同步複製時開啟堆外內存池transientStorePoolEnable後,集群壓測幾乎無法進行? 2.為什麼在異步刷盤/同步複製時調大JVM堆內存後,性能明顯提升呢?提升了的倍數幾乎是堆內存增大的倍數。
1.刷盤流程回顧
異步刷盤未開啟堆外緩存示意圖

異步刷盤開啟堆外緩存示意圖

小結:異步刷盤未開啟transientStorePoolEnable時,消息追加到mappedByteBuffer中,異步線程刷調用mappedByteBuffer.force落盤;異步刷盤開啟transientStorePoolEnable時,消息寫入wrtieBuffer中,異步線程將消息提交到fileChannel,然後異步線程調用fileChannel.force落盤。
2.主從複製回顧
HAConnection#WriteSocketService負責向Slave發送數據
//查找待拉取偏移量之後所有的可讀消息 SelectMappedBufferResult selectResult = HAConnection.this.haService.getDefaultMessageStore().getCommitLogData(this.nextTransferFromWhere); // ... SelectMappedBufferResult result = mappedFile.selectMappedBuffer(pos); // ... ByteBuffer byteBuffer = this.mappedByteBuffer.slice(); byteBuffer.position(pos); int size = readPosition - pos; //計算距離最大可讀位置的大小 ByteBuffer byteBufferNew = byteBuffer.slice(); byteBufferNew.limit(size); return new SelectMappedBufferResult()
小結:主從複製使用mappedByteBuffer向Slave同步數據。
3.流程模擬
開啟堆外內存池流程
@Test public void test01(){ // 堆外內存池transientStorePoolEnable開啟後,消息追加操作 try { File file = new File("/Users/yongliang/logs/temp.log"); FileChannel fileChannel = new RandomAccessFile(file, "rw").getChannel(); String data = "beautiful girl!"; // mmap 文件映射操作 MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, data.length()); // 堆外內存transientStorePoolEnable開啟 ByteBuffer byteBuffer = ByteBuffer.allocateDirect(data.length()); // ----------------消息追加開始----------------------- // 注意此時使用堆內內存分配 ByteBuffer msgStoreItemMemory = ByteBuffer.allocate(data.length()); msgStoreItemMemory.put(data.getBytes()); // 開啟transientStorePoolEnable消息寫入了ByteBuffer byteBuffer.put(msgStoreItemMemory.array(),0,data.length()); // ----------------消息追加結束----------------------- // ----------------消息提交開始----------------------- byteBuffer.position(0); byteBuffer.limit(data.length()); fileChannel.write(byteBuffer); // ----------------消息提交結束----------------------- // --------主從複製從mappedByteBuffer獲取消息開始---------- mappedByteBuffer.position(0); mappedByteBuffer.limit(data.length()); Charset charset = Charset.forName("UTF-8"); CharsetDecoder decoder = charset.newDecoder(); CharBuffer charBuffer = decoder.decode(mappedByteBuffer.asReadOnlyBuffer()); System.out.println(charBuffer.toString()); // --------主從複製從mappedByteBuffer獲取消息結束---------- } catch (Exception e) { e.printStackTrace(); } }
小結:模擬開啟堆外內存池transientStorePoolEnable的消息追加及主從複製流程。
未開啟堆外內存池流程
@Test public void test02(){ try { File file = new File("/Users/yongliang/logs/temp1.log"); FileChannel fileChannel = new RandomAccessFile(file, "rw").getChannel(); String data = "beautiful girl!"; // mmap 文件映射操作 MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, data.length()); // ----------------消息追加開始----------------------- // 注意消息組裝使用堆內內存分配 ByteBuffer msgStoreItemMemory = ByteBuffer.allocate(data.length()); msgStoreItemMemory.put(data.getBytes()); mappedByteBuffer.put(msgStoreItemMemory.array(),0,data.length()); // ----------------消息追加結束----------------------- // --------主從複製從mappedByteBuffer獲取消息開始---------- mappedByteBuffer.position(0); mappedByteBuffer.limit(data.length()); Charset charset = Charset.forName("UTF-8"); CharsetDecoder decoder = charset.newDecoder(); CharBuffer charBuffer = decoder.decode(mappedByteBuffer.asReadOnlyBuffer()); System.out.println(charBuffer.toString()); // --------主從複製從mappedByteBuffer獲取消息結束---------- } catch (Exception e) { e.printStackTrace(); } }
小結:模擬未開啟堆外內存池transientStorePoolEnable的消息追加及主從複製流程。
4、原因總結
1.為什麼在異步刷盤/同步複製時開啟堆外內存transientStorePoolEnable後,集群壓測幾乎無法進行? 解釋: 1>主從同步複製使用mappedByteBuffer; 2>開啟堆外內存池transientStorePoolEnable後數據先落到WriteBuffer,再通過異步提交線程提交到FileChannel,再通過mmap將數據映射到mappedByteBuffer; 3>未開啟堆外內存池transientStorePoolEnable數據直接寫入到mappedByteBuffe; 由於開啟堆外內存數據映射到mappedByteBuffer比直接寫入mappedByteBuffer多了很多步驟,再加上發送隊列處理事件默認只有200毫秒(waitTimeMillsInSendQueue=200),造成集群不能正常壓測的原因。
2.為什麼在異步刷盤/同步複製時調大JVM堆內存後,性能明顯提升呢?提升了的倍數幾乎是對內存增大的倍數。 解釋: 從模擬流程中可以看出,在組裝消息時使用堆內存,提高堆內存顯著提高寫入Tps的原因所在。ByteBuffer msgStoreItemMemory=ByteBuffer.allocate(data.length());