0748-5.14.4-Kafka的擴容和縮容

  • 2020 年 2 月 24 日
  • 筆記

文檔編寫目的

在Kafka集群資源使用已超出系統配置的資源時,或者有大量資源閑置造成資源浪費的時候,需要分別通過擴容Kafka和縮容Kafka來進行調整。本篇文章Fayson主要介紹如何進行Kafka的擴容和縮容,以及變更後的Kafka集群如何進行負載均衡的操作。

  • 測試環境:

1.Redhat7.2

2.採用root用戶操作

3.CM為5.16.2,CDH為5.14.4

4.Kafka版本為0.10.2

5.集群啟用了Kerberos,Kafka未啟用Kerberos和Sentry

Kafka集群的擴容

2.1 當前Kafka集群狀態

集群中有3個kafka broker

有2個topic,分別為test和test1,情況如下

2.2 擴容前準備

新擴容的機器要先加入集群中,通過CM管理,按照下面的步驟進行操作

1.修改新添加的機器的hostname

[root@hadoop6 ~]# hostnamectl set-hostname cdh04.hadoop.com  

2.修改/etc/hosts文件並同步到所有節點,這裡用腳本來實現。

192.168.0.204 cdh01.hadoop.com cdh01  192.168.0.205 cdh02.hadoop.com cdh02  192.168.0.206 cdh03.hadoop.com cdh03  192.168.0.195 cdh04.hadoop.com cdh04  

3.新添加的節點關閉防火牆,設置開機自動關閉

[root@cdh04 ~]# systemctl stop firewalld  [root@cdh04 ~]# systemctl disable firewalld  

4.新添加的節點禁用SELinux,並修改修改/etc/selinux/config

[root@cdh04 ~]# setenforce 0  setenforce: SELinux is disabled  [root@cdh04 ~]# vim /etc/selinux/config  

5.新添加的節點關閉透明大頁面,並且在/etc/rc.d/rc.local 裡面加入腳本,設置自動開機關閉。

[root@cdh04 ~]# echo never > /sys/kernel/mm/transparent_hugepage/defrag  [root@cdh04 ~]# echo never > /sys/kernel/mm/transparent_hugepage/enabled  [root@cdh04 ~]# vim /etc/rc.d/rc.local  if test -f /sys/kernel/mm/transparent_hugepage/defrag  then echo never > /sys/kernel/mm/transparent_hugepage/defrag  fi  if test -f /sys/kernel/mm/transparent_hugepage/enabled  then echo never > /sys/kernel/mm/transparent_hugepage/enabled  fi  
[root@cdh04 ~]# chmod +x /etc/rc.d/rc.local  

6.新添加的節點設置SWAP,並且設置開機自動更改

[root@cdh04 ~]# sysctl vm.swappiness=1  vm.swappiness = 1  [root@cdh04 ~]# echo vm.swappiness = 1  >> /etc/sysctl.conf  

7.新添加的節點配置時鐘同步,先在所有伺服器卸載chrony,再安裝ntp,再修改配置把時鐘同步跟其他kafka broker節點保持一致

[root@cdh04 ~]# yum -y remove chrony  [root@cdh04 ~]# yum -y install ntp  
vim /etc/ntp.conf  

啟動ntp服務,並設置開機啟動

[root@cdh04 java]# systemctl start ntpd  [root@cdh04 java]# systemctl enable ntpd

8.新添加的節點安裝kerberos客戶端,並進行配置

[root@cdh04 ~]# yum -y install krb5-libs krb5-workstation  

從其他節點拷貝/etc/krb5.conf文件到新節點

9.從其他節點拷貝集群使用的JDK到新節點/usr/java目錄下

[root@cdh01 java]# scp -r jdk1.8.0_131/ cdh04:/usr/java/jdk1.8.0_131/  

10.在CM編輯主機模板kafka,只添加kafka broker角色

2.3 擴容Kafka

1.添加新的節點到集群,從CM主頁點擊Add Hosts,如下圖所示

2.點擊繼續

3.搜索主機,輸入主機名,點擊搜索並繼續

4.輸入自定義存儲庫地址,並繼續

5.點擊安裝JDK並繼續

6.輸入主機密碼並繼續

7.正在進行安裝,安裝成功後點擊繼續

激活完成,點擊繼續

8.點擊繼續,進入主機檢查,添加主機完成

9.應用主機模板kafka,擴容完成

擴容完成

擴容後平衡

在擴容完成後,可以通過自帶的命令來生成topic的平衡策略和執行平衡的操作。

3.1 生成平衡策略

1.查詢當前創建的topic

[root@cdh01 ~]# kafka-topics --list --zookeeper cdh01.hadoop.com:2181  

2.按照查詢到的topic來創建文件topics-to-move.json 格式如下

{"topics": [{"topic": "test"},  {"topic": "test1"}              ],  "version":1  }  

3.在CM查詢kafka broker的ID

4.用命令生成遷移方案,下面畫紅框的就是生成的平衡方案

kafka-reassign-partitions --zookeeper cdh01.hadoop.com:2181 --topics-to-move-json-file topics-to-move.json  --broker-list "121,124,125,126" --generate  

5.把Proposed partition reassignment configuration下的內容複製保存為json文件newkafka.json,平衡方案生成完成。

3.2 執行平衡策略

1.執行下面的命令來進行平衡

[root@cdh01 ~]# kafka-reassign-partitions --zookeeper cdh01.hadoop.com:2181 --reassignment-json-file newkafka.json --execute  

2.用下面命令來進行查看執行的進度

kafka-reassign-partitions --zookeeper cdh01.hadoop.com:2181 --reassignment-json-file newkafka.json --verify  

從結果可以看到已經執行完成。,並且在新添加的節點上也可以看到有topic的副本存過來了。

縮容前的準備

1.編輯之前創建的topics-to-move.json文件,添加上系統自動生成的__consumer_offsets

[root@cdh01 ~]# vim topics-to-move.json  {"topics": [{"topic": "test"},{"topic": "__consumer_offsets"},  {"topic": "test1"}              ],  "version":1  }  

2.再次使用命令生成遷移計劃,這裡只選取121,124,126這三個broker,然後把生成計劃中的126替換成125進行保存,這樣就把126上的數據全部遷移到了125上。

kafka-reassign-partitions --zookeeper cdh01.hadoop.com:2181 --topics-to-move-json-file topics-to-move.json  --broker-list "121,124,126" --generate  

3.執行遷移命令,進行遷移

kafka-reassign-partitions --zookeeper cdh01.hadoop.com:2181 --reassignment-json-file newkafka.json --execute  

4.進行查詢,遷移完成

kafka-reassign-partitions --zookeeper cdh01.hadoop.com:2181 --reassignment-json-file newkafka.json --verify  

5.在要刪除的broker上也可以看到,topic數據已經遷移走

Kafka集群的縮容

在完成上訴縮容前的準備後,現在可以進行kafka集群的縮容。

1.從CM進入Kafka的實例介面

2.勾選要刪除的broker,先停止該broker

3.停止完成後,進行刪除

刪除完成。

總結

1.Kafka集群的擴容和縮容可以通過CM來進行添加broker和刪除broker來進行。

2.在Kafka集群擴容後,已有topic的partition不會自動均衡到新的磁碟上。可以通過kafka-reassign-partitions命令來進行數據平衡,先用命令生成平衡方案,再執行。也可以手動編輯遷移方案來進行執行。

3.新建topic的partition, 會以磁碟為單位,按照partition數量最少的來落盤。

4.在Kafka縮容前,需要把要刪除的broker上的topic數據遷出,也可以通過kafka-reassign-partitions來進行遷移,手動編輯遷移方案,再通過命令執行即可。

5.kafka-reassign-partitions這個命令,只有指定了broker id上的topic才會參與partition的reassign。根據我們的需求,可以手動來編寫和修改遷移計劃。

Fayson的github: https://github.com/fayson/cdhproject