0748-5.14.4-Kafka的擴容和縮容
- 2020 年 2 月 24 日
- 筆記
文檔編寫目的
在Kafka集群資源使用已超出系統配置的資源時,或者有大量資源閑置造成資源浪費的時候,需要分別通過擴容Kafka和縮容Kafka來進行調整。本篇文章Fayson主要介紹如何進行Kafka的擴容和縮容,以及變更後的Kafka集群如何進行負載均衡的操作。
- 測試環境:
1.Redhat7.2
2.採用root用戶操作
3.CM為5.16.2,CDH為5.14.4
4.Kafka版本為0.10.2
5.集群啟用了Kerberos,Kafka未啟用Kerberos和Sentry
Kafka集群的擴容
2.1 當前Kafka集群狀態
集群中有3個kafka broker

有2個topic,分別為test和test1,情況如下


2.2 擴容前準備
新擴容的機器要先加入集群中,通過CM管理,按照下面的步驟進行操作
1.修改新添加的機器的hostname
[root@hadoop6 ~]# hostnamectl set-hostname cdh04.hadoop.com

2.修改/etc/hosts文件並同步到所有節點,這裡用腳本來實現。
192.168.0.204 cdh01.hadoop.com cdh01 192.168.0.205 cdh02.hadoop.com cdh02 192.168.0.206 cdh03.hadoop.com cdh03 192.168.0.195 cdh04.hadoop.com cdh04


3.新添加的節點關閉防火牆,設置開機自動關閉
[root@cdh04 ~]# systemctl stop firewalld [root@cdh04 ~]# systemctl disable firewalld

4.新添加的節點禁用SELinux,並修改修改/etc/selinux/config
[root@cdh04 ~]# setenforce 0 setenforce: SELinux is disabled [root@cdh04 ~]# vim /etc/selinux/config

5.新添加的節點關閉透明大頁面,並且在/etc/rc.d/rc.local 裡面加入腳本,設置自動開機關閉。
[root@cdh04 ~]# echo never > /sys/kernel/mm/transparent_hugepage/defrag [root@cdh04 ~]# echo never > /sys/kernel/mm/transparent_hugepage/enabled [root@cdh04 ~]# vim /etc/rc.d/rc.local if test -f /sys/kernel/mm/transparent_hugepage/defrag then echo never > /sys/kernel/mm/transparent_hugepage/defrag fi if test -f /sys/kernel/mm/transparent_hugepage/enabled then echo never > /sys/kernel/mm/transparent_hugepage/enabled fi

[root@cdh04 ~]# chmod +x /etc/rc.d/rc.local
6.新添加的節點設置SWAP,並且設置開機自動更改
[root@cdh04 ~]# sysctl vm.swappiness=1 vm.swappiness = 1 [root@cdh04 ~]# echo vm.swappiness = 1 >> /etc/sysctl.conf

7.新添加的節點配置時鐘同步,先在所有伺服器卸載chrony,再安裝ntp,再修改配置把時鐘同步跟其他kafka broker節點保持一致
[root@cdh04 ~]# yum -y remove chrony [root@cdh04 ~]# yum -y install ntp

vim /etc/ntp.conf

啟動ntp服務,並設置開機啟動
[root@cdh04 java]# systemctl start ntpd [root@cdh04 java]# systemctl enable ntpd
8.新添加的節點安裝kerberos客戶端,並進行配置
[root@cdh04 ~]# yum -y install krb5-libs krb5-workstation

從其他節點拷貝/etc/krb5.conf文件到新節點

9.從其他節點拷貝集群使用的JDK到新節點/usr/java目錄下
[root@cdh01 java]# scp -r jdk1.8.0_131/ cdh04:/usr/java/jdk1.8.0_131/

10.在CM編輯主機模板kafka,只添加kafka broker角色

2.3 擴容Kafka
1.添加新的節點到集群,從CM主頁點擊Add Hosts,如下圖所示

2.點擊繼續

3.搜索主機,輸入主機名,點擊搜索並繼續


4.輸入自定義存儲庫地址,並繼續

5.點擊安裝JDK並繼續

6.輸入主機密碼並繼續

7.正在進行安裝,安裝成功後點擊繼續



激活完成,點擊繼續

8.點擊繼續,進入主機檢查,添加主機完成

9.應用主機模板kafka,擴容完成


擴容完成

擴容後平衡
在擴容完成後,可以通過自帶的命令來生成topic的平衡策略和執行平衡的操作。
3.1 生成平衡策略
1.查詢當前創建的topic
[root@cdh01 ~]# kafka-topics --list --zookeeper cdh01.hadoop.com:2181

2.按照查詢到的topic來創建文件topics-to-move.json 格式如下
{"topics": [{"topic": "test"}, {"topic": "test1"} ], "version":1 }

3.在CM查詢kafka broker的ID

4.用命令生成遷移方案,下面畫紅框的就是生成的平衡方案
kafka-reassign-partitions --zookeeper cdh01.hadoop.com:2181 --topics-to-move-json-file topics-to-move.json --broker-list "121,124,125,126" --generate

5.把Proposed partition reassignment configuration下的內容複製保存為json文件newkafka.json,平衡方案生成完成。

3.2 執行平衡策略
1.執行下面的命令來進行平衡
[root@cdh01 ~]# kafka-reassign-partitions --zookeeper cdh01.hadoop.com:2181 --reassignment-json-file newkafka.json --execute

2.用下面命令來進行查看執行的進度
kafka-reassign-partitions --zookeeper cdh01.hadoop.com:2181 --reassignment-json-file newkafka.json --verify

從結果可以看到已經執行完成。,並且在新添加的節點上也可以看到有topic的副本存過來了。

縮容前的準備
1.編輯之前創建的topics-to-move.json文件,添加上系統自動生成的__consumer_offsets
[root@cdh01 ~]# vim topics-to-move.json {"topics": [{"topic": "test"},{"topic": "__consumer_offsets"}, {"topic": "test1"} ], "version":1 }

2.再次使用命令生成遷移計劃,這裡只選取121,124,126這三個broker,然後把生成計劃中的126替換成125進行保存,這樣就把126上的數據全部遷移到了125上。
kafka-reassign-partitions --zookeeper cdh01.hadoop.com:2181 --topics-to-move-json-file topics-to-move.json --broker-list "121,124,126" --generate


3.執行遷移命令,進行遷移
kafka-reassign-partitions --zookeeper cdh01.hadoop.com:2181 --reassignment-json-file newkafka.json --execute

4.進行查詢,遷移完成
kafka-reassign-partitions --zookeeper cdh01.hadoop.com:2181 --reassignment-json-file newkafka.json --verify

5.在要刪除的broker上也可以看到,topic數據已經遷移走

Kafka集群的縮容
在完成上訴縮容前的準備後,現在可以進行kafka集群的縮容。
1.從CM進入Kafka的實例介面

2.勾選要刪除的broker,先停止該broker

3.停止完成後,進行刪除

刪除完成。

總結
1.Kafka集群的擴容和縮容可以通過CM來進行添加broker和刪除broker來進行。
2.在Kafka集群擴容後,已有topic的partition不會自動均衡到新的磁碟上。可以通過kafka-reassign-partitions命令來進行數據平衡,先用命令生成平衡方案,再執行。也可以手動編輯遷移方案來進行執行。
3.新建topic的partition, 會以磁碟為單位,按照partition數量最少的來落盤。
4.在Kafka縮容前,需要把要刪除的broker上的topic數據遷出,也可以通過kafka-reassign-partitions來進行遷移,手動編輯遷移方案,再通過命令執行即可。
5.kafka-reassign-partitions這個命令,只有指定了broker id上的topic才會參與partition的reassign。根據我們的需求,可以手動來編寫和修改遷移計劃。
Fayson的github: https://github.com/fayson/cdhproject