0749-5.14.4-如何实现Kafka Broker节点磁盘数据Balance
- 2020 年 2 月 24 日
- 筆記
文档编写目的
Kafka的数据目录可以配置一个或多个,通常这些目录是分布在不同的磁盘上用于提高K集群的读写性能,同时也可以提升消息的存储空间。
实际的生产环境中随着消息量的增加,Kafka存储的消息量过大,导致磁盘空间爆满,此时在不扩容Broker的情况下,我们通过对已有节点挂载磁盘的方式扩容Kafka的存储。在扩容存储后会发现已有Topic的Partition并不会自动均衡到新的磁盘上,依然没有缓解磁盘爆满的情况。而新建Topic时以磁盘为单位,在Partition数量最少的磁盘上创建Partition目录。上述的情况均会导致Kafka数据在磁盘之间分布不均的问题。
本篇文章Fayson主要介绍如何实现磁盘之间Parttion的迁移,从而实现Kafka Broker节点磁盘数据Balance,阅读本文前可以先查看如下视频:
《如何在一个Kafka Broker的log.dir中移动partition数据》
- 测试环境:
1.Redhat7.2
2.采用root用户操作
3.CM为5.16.2,CDH为5.14.4
4.Kafka版本为2.2.0
5.Kafka未启用Kerberos和Sentry
Kafka磁盘扩容
1.扩容前Kafka的数据目录。
通过CM的Kafka配置搜索log.dirs可以看到当前Kafka的数据目录只有/var/local/kafka/data这一个。

2.扩容后数据目录说明
通过CM,进入Kafka,点击配置,输入log.dirs进行搜索,然后添加一个目录/data0/kafka/data,点击保存确认,然后根据提示重启。这样进行磁盘目录的扩容。

3.扩容后创建一个新的Topic进行说明
使用如下命令创建一个新的Topic
[root@cdh01 data]# kafka-topics --create --zookeeper cdh01.hadoop.com:2181 --replication-factor 3 --partitions 9 --topic test

查看Topic分区分布情况

可以看到,由于/var/local/kafka/data里面有系统创建的分区30个,所以新建的Topic按照规则全部存储在分区目录少的数据目录/data0/kafka/data中。
这里我们就来把test的分区目录 test-0和test-1迁移到/var/local/kafka/data中。
生产消息后,从头开始消费可以看到当前的数据
[root@cdh01 ~]# kafka-console-consumer --bootstrap-server cdh01.hadoop.com:9092 --topic test --from-beginning

Partition迁移
本章节主要讲述Broker节点磁盘之间Partition迁移过程,详细过程如下
1.在迁移操作进行前,需要在CM中停止Kafka服务


2.这里将Topic的一半Parttion迁移至/var/local/kafka/data数据目录下
[root@cdh01 data]# mv test-0/ test-1/ /var/local/kafka/data

3.修改/data0/kafka/data/recovery-point-offset-checkpoint文件内容
修改前

修改后,把第二行的9改为7,因为迁移走了两个分区,现在分区总数为7。然后把下面的test 1和test 2这两行删除

4.修改/var/local/kafka/data/recovery-point-offset-checkpoint文件内容
修改前

修改后

这里加了2个分区,所以把之前的30改为了32,然后最下面添加了test 1和test 2这两行,后面的数字跟/data0/kafka/data/recovery-point-offset-checkpoint修改前的保持一致。
5.修改/data0/kafka/data/replication-offset-checkpoint文件内容
修改前

修改后

同样的把第二行的9改为7,因为迁移走了两个分区,现在分区总数为7。然后把下面的test 1和test 2这两行删除
6.修改/var/local/kafka/data/replication-offset-checkpoint文件内容
修改前

修改后

同样的,这里加了2个分区,所以把之前的30改为了32,然后最下面添加了test 1和test 2这两行。
7.修改完之后,通过CM启动Kafka服务

启动完成

8.进行消费测试
[root@cdh01 data]# kafka-console-consumer --bootstrap-server cdh01.hadoop.com:9092 --topic test --from-beginning

与前面数据一致,这里迁移完成。
总结
1.在操作期间需要停止Kafka的服务才可以进行。
2.上面迁移步骤是做了一个broker上的迁移。如果其他broker也需要迁移,可以同样步骤进行操作,需要注意的是,每个broker上的partition不一定是固定的,需要根据实际情况进行操作。
3.partition的迁移这里实际上做了物理的目录迁移之后,修改了迁移前和迁移后的磁盘目录下的recovery-point-offset-checkpoint和replication-offset-checkpoint这两个文件。相应的把文件中记录的总partition数和具体的每个partition的记录进行修改就可以了。
4.在只扩容Kafka存储空间的情况下,建议优先对Kafka Broker的磁盘做Balance,而后再创建新的Topic,否则新建Topic的Partition会根据分配规则都创建到新挂载的磁盘上,导致数据不均衡及Topic吞吐性能问题等。
Fayson的github: https://github.com/fayson/cdhproject