kafka数据存储目录间迁移

  • 2019 年 12 月 26 日
  • 筆記

生产环境kafka集群,在数据量大的情况下,经常会出现单机各个磁盘间的占用不均匀情况。

原因探究

这是因为kafka在之前的版本中只保证分区数在各个磁盘上均匀分布,但因无法统计每个分区实际大小,导致大概率出现某些分区数据量巨大导致磁盘利用率不均衡。

在 kafka1.1 版本之前,用户对此基本没有什么优雅的处理方法,即便手动迁移日志文件和 offset 信息,也需要重启生效,风险极高。在 1.1 之前,kafka只支持分区数据在不同broker间的reassigment,而无法做到在同一个broker下的不同磁盘间做重新分配。而在1.1 版本后,kafka正式开始支持副本在不同路径间迁移,具体的实现细节可以看kafka官方wiki KIP-113

目录间迁移步骤

假设我在server.properties文件中配置了多个日志存储路径(表示日志数据存储在多块磁盘),如下所示:

# A comma seperated list of directories under which to store log files    log.dirs=/data1/kafka-logs,/data2/kafka-logs,/data3/kafka-logs

首先创建一个 9 分区的topic,并发送 1000W 条消息。查询这些数据目录,发现Kafka均匀地将 9 个分区分布在这三个路径上:

> ll /data1/kafka-logs/ |grep test-topic  drwxr-xr-x   6 kafka  staff  192 Dec 14 17:21 test-topic-3  drwxr-xr-x   6 kafka  staff  192 Dec 14 17:21 test-topic-4  drwxr-xr-x   6 kafka  staff  192 Dec 14 17:21 test-topic-5    > ll /data2/kafka-logs/ |grep test-topic  drwxr-xr-x   6 kafka  staff  192 Dec 14 17:21 test-topic-0  drwxr-xr-x   6 kafka  staff  192 Dec 14 17:21 test-topic-1  drwxr-xr-x   6 kafka  staff  192 Dec 14 17:21 test-topic-2    > ll /data3/kafka-logs/ |grep test-topic  drwxr-xr-x   6 kafka  staff  192 Dec 14 17:21 test-topic-6  drwxr-xr-x   6 kafka  staff  192 Dec 14 17:21 test-topic-7  drwxr-xr-x   6 kafka  staff  192 Dec 14 17:21 test-topic-8

假设由于还有其他topic数据分布等原因,导致磁盘存储不均衡。需要将test-topic678分区全部迁移到/data2路径下,并且把test-topic1分区迁移到/data1下。若要实现这个需求,我们首先需要写一个JSON文件,migrate-replica.json:

{      "partitions": [          {              "topic": "test-topic",              "partition": 1,              "replicas": [                  0              ],              "log_dirs": [                  "/data1/kafka-logs"              ]          },          {              "topic": "test-topic",              "partition": 6,              "replicas": [                  0              ],              "log_dirs": [                  "/data2/kafka-logs"              ]          },          {              "topic": "test-topic",              "partition": 7,              "replicas": [                  0              ],              "log_dirs": [                  "/data2/kafka-logs"              ]          },          {              "topic": "test-topic",              "partition": 8,              "replicas": [                  0              ],              "log_dirs": [                  "/data2/kafka-logs"              ]          }      ],      "version": 1  }

其中,replicas中的 0 表示broker ID,由于本文只启动了一个broker,且broker.id = 0,故这里只写 0 即可。实际上你可以指定多个broker实现为多个broker同时迁移副本的功能。另外当前的version固定是 1。

保存好这个JSON后,我们执行以下命令执行副本迁移:

> bin/kafka-reassign-partitions.sh  --zookeeper localhost:2181 --bootstrap-server localhost:9092 --reassignment-json-file ../migrate-replica.json --execute    Current partition replica assignment  {"version":1,"partitions":[{"topic":"test-topic","partition":8,"replicas":[0],"log_dirs":["any"]},{"topic":"test-topic","partition":4,"replicas":[0],"log_dirs":["any"]},{"topic":"test-topic","partition":5,"replicas":[0],"log_dirs":["any"]},{"topic":"test-topic","partition":2,"replicas":[0],"log_dirs":["any"]},{"topic":"test-topic","partition":6,"replicas":[0],"log_dirs":["any"]},{"topic":"test-topic","partition":3,"replicas":[0],"log_dirs":["any"]},{"topic":"test-topic","partition":1,"replicas":[0],"log_dirs":["any"]},{"topic":"test-topic","partition":7,"replicas":[0],"log_dirs":["any"]},{"topic":"test-topic","partition":0,"replicas":[0],"log_dirs":["any"]}]}    Save this to use as the --reassignment-json-file option during rollback    Successfully started reassignment of partitions.

迁移结果

执行完成后,我们再次查看存储目录副本分布:

> ll /data1/kafka-logs/ |grep test-topic  drwxr-xr-x   6 kafka  staff  192 Dec 14 17:21 test-topic-1  drwxr-xr-x   6 kafka  staff  192 Dec 14 17:21 test-topic-3  drwxr-xr-x   6 kafka  staff  192 Dec 14 17:21 test-topic-4  drwxr-xr-x   6 kafka  staff  192 Dec 14 17:21 test-topic-5    > ll /data2/kafka-logs/ |grep test-topic  drwxr-xr-x   6 kafka  staff  192 Dec 14 17:21 test-topic-0  drwxr-xr-x   6 kafka  staff  192 Dec 14 17:21 test-topic-1  drwxr-xr-x   6 kafka  staff  192 Dec 14 17:21 test-topic-2  drwxr-xr-x   6 kafka  staff  192 Dec 14 17:21 test-topic-6  drwxr-xr-x   6 kafka  staff  192 Dec 14 17:21 test-topic-7  drwxr-xr-x   6 kafka  staff  192 Dec 14 17:21 test-topic-8    > ll /data3/kafka-logs/ |grep test-topic

可以看到,678已经被成功地迁移到/data2下,而分区1也迁移到了/data1下。值得一提的是,不仅所有的日志段、索引文件被迁移,实际上分区外层的checkpoint文件也会被更新。比如我们检查/data2下的replication-offset-checkpoint文件可以发现,现在该文件已经包含了678分区的位移数据,如下所示:

> cat replication-offset-checkpoint  0  7  test-topic 8 1000000  test-topic 2 1000000  test 0 1285714  test-topic 6 1000000  test-topic 7 1000000  test-topic 0 1000000  test 2 1285714

Exit mobile version