kafka集群擴容後的數據均衡

  • 2019 年 12 月 26 日
  • 筆記

生產環境的kafka集群擴容,是一個比較常見的需求和操作。然而kafka在新增節點後並不會像elasticsearch那樣感知到新節點加入後,自動將數據reblance到整個新集群中,因此這個過程需要我們手動分配。

分區重分配方案

擴容後的數據均衡,其本質就是對topic進行分區重分配,數據遷移的過程。在執行分區重分配的過程中,對集群的影響主要有兩點:

  1. 分區重分配主要是對topic數據進行Broker間的遷移,因此會佔用集群的頻寬資源;
  2. 分區重分配會改變分區Leader所在的Broker,因此會影響客戶端。

針對以上兩點,第一點可以在晚間業務低峰時操作,必要時還可以和業務溝通,臨時縮短數據保存時間,加快遷移,減少頻寬影響時間。針對第二點,有兩個方案:

  1. 整個分配方案分成兩個步驟:1)手動生成分配方案,原有分區Leader位置不改變,只對副本進行分區重分配;2)等待數據遷移完成後,再手動更改分區分配方案,目的是均衡Leader
  2. 直接用Kafka官方提供的分區重新分配工具生成分區重分配方案,直接執行分區重分配。

重分配方案分析

方案一

方案一理論對客戶端影響最小,把整個分配方案分成了兩個步驟,也就是將對集群的頻寬資源與客戶端的影響分開了,對過程可控性很高。

但問題是,如果集群中的某些topic,比如有 64 個分區,3 副本,共 192 個副本,就需要在保持原有分區Leader位置不變的情況下,手動均衡其餘副本,這個人工步驟過度繁雜,稍微有一點偏差,就會造成副本不均衡。

方案二

針對方案二我特意去看了分區重分配的源碼,並對其過程進一步分析。發現分區重分配的步驟是,將分區原有的副本與新分配的副本合併成一個新的副本集合,新分配的副本努力追上Leaderoffset,最終加入ISR。待全部副本都加入ISR之後,就會進行分區Leader選舉,選舉完後刪除原有副本。

這裡注意,由於是最後選舉完成才刪除原副本,所以重分配的過程中,日誌存儲量是會大幅增加的。具體細節我後續單獨寫一篇文章敘述。

根據以上分析,意味著在數據進行重分配過程中,Leader並沒有發生變動,所以客戶端不會阻塞,數據遷移完成後進行Leader選舉時發生變更,生產者會及時拉取最新的元數據,並重新進行消息發送,影響並不大。

重分配步驟

其實官方文檔關於集群擴容講解很詳細:Expanding your cluster ,整個過程分為三個步驟:獲取 kafka 給出的建議分配方案、按照給出的分配方案執行分配、查看分配的進度以及狀態。這三個步驟對應了 kafka 腳本提供的三個partition reassigment工具。

--generate: 在此模式下,給定一個 topic 列表和一個 broker 列表,該工具會生成一個候選重新分配,以將指定的 topic 的所有分區移動到新的broker。此選項僅提供了一種便捷的方式,可以根據 tpoc 和目標 broker 列表生成分區重新分配計劃。  --execute: 在此模式下,該工具基於用戶提供的重新分配計劃啟動分區重新分配。(使用--reassignment-json-file選項)。這可以是由管理員製作的自定義重新分配計劃,也可以是使用--generate選項提供的自定義重新分配計劃。  --verify: 在此模式下,該工具將驗證最近用 --execute 模式執行間的所有分區的重新分配狀態。狀態可以是成功完成,失敗或正在進行。
  1. 生成需要執行分區重分配的topic列表 json 文件: > cat topics-to-move.json {"topics": [{"topic": "foo1"}, {"topic": "foo2"}], "version":1 }
  2. 使用kafka-reassign-partitions.sh腳本獲取分配方案: > bin/kafka-reassign-partitions.sh –zookeeper localhost:2181 –topics-to-move-json-file topics-to-move.json –broker-list "5,6" –generate 當前分區副本分配 {"version":1, "partitions":[{"topic":"foo1","partition":2,"replicas":[1,2]}, {"topic":"foo1","partition":0,"replicas":[3,4]}, {"topic":"foo2","partition":2,"replicas":[1,2]}, {"topic":"foo2","partition":0,"replicas":[3,4]}, {"topic":"foo1","partition":1,"replicas":[2,3]}, {"topic":"foo2","partition":1,"replicas":[2,3]}] } 建議的分區重新分配配置 {"version":1, "partitions":[{"topic":"foo1","partition":2,"replicas":[5,6]}, {"topic":"foo1","partition":0,"replicas":[5,6]}, {"topic":"foo2","partition":2,"replicas":[5,6]}, {"topic":"foo2","partition":0,"replicas":[5,6]}, {"topic":"foo1","partition":1,"replicas":[5,6]}, {"topic":"foo2","partition":1,"replicas":[5,6]}] } 保存當前分區副本分配情況,用作回滾操作用。保存建議的分區重新分配配置到expand-cluster-reassignment.json用於執行遷移。
  3. 執行重分配,並驗證。 > bin/kafka-reassign-partitions.sh –zookeeper localhost:2181 –reassignment-json-file expand-cluster-reassignment.json –execute > bin/kafka-reassign-partitions.sh –zookeeper localhost:2181 –reassignment-json-file expand-cluster-reassignment.json –verify Status of partition reassignment: Reassignment of partition [foo1,0] completed successfully Reassignment of partition [foo1,1] is in progress Reassignment of partition [foo1,2] is in progress Reassignment of partition [foo2,0] completed successfully Reassignment of partition [foo2,1] completed successfully Reassignment of partition [foo2,2] completed successfully is still in progress表示還在處理中,全部遷移成功後每個partition都會顯示completed successfully。注意如果topic數據量大,這個過程可能會很長,在此期間不要進行敏感操作,可能會導致數據不一致。

自定義重分配

分區重新分配工具還可以將分區的副本移動到指定的一組broker。只需自定義修改分配配置文件,後續步驟同上。

例如,以下示例將topic foo1的分區 0 移到broker5,6 中和將topic foo2的分區 1 移到broker2,3 中:

> cat custom-reassignment.json  {      "version": 1,      "partitions": [          {              "topic": "foo1",              "partition": 0,              "replicas": [                  5,                  6              ]          },          {              "topic": "foo2",              "partition": 1,              "replicas": [                  2,                  3              ]          }      ]  }