RocketMQ同步複製性能優化【實戰筆記】

  • 2019 年 10 月 6 日
  • 筆記

目錄

一、問題描述      1.壓測日誌      2.客戶端日誌  二、解決發送失敗情況  三、解決發送TPS過低情況  四、原因分析      1.刷盤流程回顧      2.主從複製回顧      3.流程模擬      4.原因總結  
一、問題描述

早些時候寫過RocketMQ性能優化【實戰筆記】RocketMQ性能測試【實戰筆記】文章,主要基於異步刷盤/異步複製;由於業務需要需要搭建異步刷盤/同步複製集群;同時對性能進行壓測。

壓測結果顯示集群幾乎無法使用,TPS居然是個位數,客戶端也在報錯。

1.壓測日誌

2.客戶端日誌

2019-09-19 19:22:38,038 ERROR RocketmqClient - [BENCHMARK_PRODUCER] Send Exception  org.apache.rocketmq.client.exception.MQBrokerException: CODE: 2  DESC: [TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: 209ms, size of queue: 9  For more information, please visit the url, http://rocketmq.apache.org/docs/faq/    at org.apache.rocketmq.client.impl.MQClientAPIImpl.processSendResponse(MQClientAPIImpl.java:671)    at org.apache.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:467)    at org.apache.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:449)    at  
二、解決發送失敗情況

經排查,將transientStorePoolEnable關閉(默認為false);壓測顯示最高TPS有1.9萬。

brokerRole=SYNC_MASTER  #transientStorePoolEnable=true  
三、解決發送TPS過低情況

最高TPS只有1.9萬,依然過低,與預期相差甚遠,我們預期壓測應該可以到7到8萬這樣可以滿足業務發展需要。再次檢查broker端參數配置,沒有發現有參數導致性能如此過低。 回顧性能調優的幾個方面:系統調優、集群調優、JVM調優。 系統調優與集群調優都已經做過了,唯一沒有優化的JVM調優,堆內存設置默認的8G。 JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g"

將JVM堆內存提高4倍後,壓測效果明顯提升,基本可以達到預期7萬多的TPS。

四、原因分析

1.為什麼在異步刷盤/同步複製時開啟堆外內存池transientStorePoolEnable後,集群壓測幾乎無法進行? 2.為什麼在異步刷盤/同步複製時調大JVM堆內存後,性能明顯提升呢?提升了的倍數幾乎是堆內存增大的倍數。

1.刷盤流程回顧

RocketMQ存儲–消息追加【源碼筆記】

RocketMQ存儲–同步刷盤和異步刷盤【源碼筆記】

異步刷盤未開啟堆外緩存示意圖

異步刷盤開啟堆外緩存示意圖

小結:異步刷盤未開啟transientStorePoolEnable時,消息追加到mappedByteBuffer中,異步線程刷調用mappedByteBuffer.force落盤;異步刷盤開啟transientStorePoolEnable時,消息寫入wrtieBuffer中,異步線程將消息提交到fileChannel,然後異步線程調用fileChannel.force落盤。

2.主從複製回顧

RocketMQ存儲–主從同步【源碼筆記】

HAConnection#WriteSocketService負責向Slave發送數據

//查找待拉取偏移量之後所有的可讀消息  SelectMappedBufferResult selectResult = HAConnection.this.haService.getDefaultMessageStore().getCommitLogData(this.nextTransferFromWhere);  // ...  SelectMappedBufferResult result = mappedFile.selectMappedBuffer(pos);  // ...  ByteBuffer byteBuffer = this.mappedByteBuffer.slice();  byteBuffer.position(pos);  int size = readPosition - pos; //計算距離最大可讀位置的大小  ByteBuffer byteBufferNew = byteBuffer.slice();  byteBufferNew.limit(size);  return new SelectMappedBufferResult()

小結:主從複製使用mappedByteBuffer向Slave同步數據。

3.流程模擬

開啟堆外內存池流程
@Test  public void test01(){      // 堆外內存池transientStorePoolEnable開啟後,消息追加操作      try {          File file = new File("/Users/yongliang/logs/temp.log");          FileChannel fileChannel = new RandomAccessFile(file, "rw").getChannel();          String data = "beautiful girl!";          // mmap 文件映射操作          MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, data.length());          // 堆外內存transientStorePoolEnable開啟          ByteBuffer byteBuffer = ByteBuffer.allocateDirect(data.length());          // ----------------消息追加開始-----------------------          // 注意此時使用堆內內存分配          ByteBuffer msgStoreItemMemory = ByteBuffer.allocate(data.length());          msgStoreItemMemory.put(data.getBytes());          // 開啟transientStorePoolEnable消息寫入了ByteBuffer          byteBuffer.put(msgStoreItemMemory.array(),0,data.length());          // ----------------消息追加結束-----------------------            // ----------------消息提交開始-----------------------          byteBuffer.position(0);          byteBuffer.limit(data.length());          fileChannel.write(byteBuffer);          // ----------------消息提交結束-----------------------            // --------主從複製從mappedByteBuffer獲取消息開始----------          mappedByteBuffer.position(0);          mappedByteBuffer.limit(data.length());          Charset charset = Charset.forName("UTF-8");          CharsetDecoder decoder = charset.newDecoder();          CharBuffer charBuffer = decoder.decode(mappedByteBuffer.asReadOnlyBuffer());          System.out.println(charBuffer.toString());          // --------主從複製從mappedByteBuffer獲取消息結束----------      } catch (Exception e) {          e.printStackTrace();      }  }

小結:模擬開啟堆外內存池transientStorePoolEnable的消息追加及主從複製流程。

未開啟堆外內存池流程

@Test  public void test02(){      try {          File file = new File("/Users/yongliang/logs/temp1.log");          FileChannel fileChannel = new RandomAccessFile(file, "rw").getChannel();          String data = "beautiful girl!";          // mmap 文件映射操作          MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, data.length());            // ----------------消息追加開始-----------------------          // 注意消息組裝使用堆內內存分配          ByteBuffer msgStoreItemMemory = ByteBuffer.allocate(data.length());          msgStoreItemMemory.put(data.getBytes());          mappedByteBuffer.put(msgStoreItemMemory.array(),0,data.length());          // ----------------消息追加結束-----------------------            // --------主從複製從mappedByteBuffer獲取消息開始----------          mappedByteBuffer.position(0);          mappedByteBuffer.limit(data.length());          Charset charset = Charset.forName("UTF-8");          CharsetDecoder decoder = charset.newDecoder();          CharBuffer charBuffer = decoder.decode(mappedByteBuffer.asReadOnlyBuffer());          System.out.println(charBuffer.toString());          // --------主從複製從mappedByteBuffer獲取消息結束----------      } catch (Exception e) {          e.printStackTrace();      }  }
小結:模擬未開啟堆外內存池transientStorePoolEnable的消息追加及主從複製流程。
4、原因總結

1.為什麼在異步刷盤/同步複製時開啟堆外內存transientStorePoolEnable後,集群壓測幾乎無法進行? 解釋: 1>主從同步複製使用mappedByteBuffer; 2>開啟堆外內存池transientStorePoolEnable後數據先落到WriteBuffer,再通過異步提交線程提交到FileChannel,再通過mmap將數據映射到mappedByteBuffer; 3>未開啟堆外內存池transientStorePoolEnable數據直接寫入到mappedByteBuffe; 由於開啟堆外內存數據映射到mappedByteBuffer比直接寫入mappedByteBuffer多了很多步驟,再加上發送隊列處理事件默認只有200毫秒(waitTimeMillsInSendQueue=200),造成集群不能正常壓測的原因。

2.為什麼在異步刷盤/同步複製時調大JVM堆內存後,性能明顯提升呢?提升了的倍數幾乎是對內存增大的倍數。 解釋: 從模擬流程中可以看出,在組裝消息時使用堆內存,提高堆內存顯著提高寫入Tps的原因所在。ByteBuffer msgStoreItemMemory=ByteBuffer.allocate(data.length());