kafka刪除topic數據

  • 2020 年 2 月 25 日
  • 筆記

一、概述

生產環境中,有一個topic的數據量非常大。這些數據不是非常重要,需要定期清理。

要求:默認保持24小時,某些topic 需要保留2小時或者6小時

二、清除方式

主要有3個:

1. 基於時間

2. 基於日誌大小

3. 基於日誌起始偏移量

詳情,請參考鏈接:

https://blog.csdn.net/u013256816/article/details/80418297

接下來,主要介紹基於時間的清除!

kafka版本為:  2.11-1.1.0

zk版本為:  3.4.13

三、kafka配置

# 啟用刪除主題  delete.topic.enable=true  # 檢查日誌段文件的間隔時間,以確定是否文件屬性是否到達刪除要求。  log.retention.check.interval.ms=1000

注意:這2行配置必須存在,否則清除策略失效!

log.retention.check.interval.ms 參數的單位是毫秒,這裡表示間隔1秒鐘

四、清除策略

全局topic

在 server.properties 文件中配置的是全局策略,針對每一個topic

比如:

log.retention.hours=3

表示保留3個小時

單個topic

針對單個topic策略,需要使用腳本kafka-configs.sh

此腳本不需要重啟kafka就會生效!

首先來查看一下,當前的topic策略,比如test

bin/kafka-configs.sh --zookeeper zookeeper-1.default.svc.cluster.local:2181 --describe --entity-type topics --entity-name test

參數解釋:

–describe  詳細信息

–entity-type 實體類型

–entity-name 指定topic名

輸出:

Configs for topic 'test' are

這個表示為策略為空

刪除topic數據

如果需要刪除topic所有數據,使用命令

bin/kafka-topics.sh --delete --topic test --zookeeper zookeeper-1.default.svc.cluster.local:2181

這個命令,請謹慎執行!!!

如果想保留主題,只刪除主題現有數據(log)。可以通過修改數據保留時間實現

bin/kafka-configs.sh --zookeeper zookeeper-1.default.svc.cluster.local:2181 --entity-type topics --entity-name test --alter --add-config retention.ms=10000

執行輸出:

Completed Updating config for entity: topic 'test'.

注意:修改保留時間為10秒鐘,並不是10秒鐘就馬上刪掉。kafka是採用輪詢的方式,輪詢到這個topic時,刪除10秒鐘前的數據。

時間由server.properties裏面的log.retention.check.interval.ms選項為主

假設說 log.retention.check.interval.ms 值為1分鐘,那麼等待70秒,這個topic的數據就會自動被刪除!

再次查看topic策略

bin/kafka-configs.sh --zookeeper zookeeper-1.default.svc.cluster.local:2181 --describe --entity-type topics --entity-name test

輸出:

Configs for topic 'test' are retention.ms=10000

發現目前的刪除策略為 retention.ms=10000

刪除策略

如果需要刪除上面的10秒策略,使用以下命令:

bin/kafka-configs.sh --zookeeper zookeeper-1.default.svc.cluster.local:2181 --entity-type topics --entity-name test --alter --delete-config retention.ms

輸出:

Completed Updating config for entity: topic 'test'.

再次查看topic策略

bin/kafka-configs.sh --zookeeper zookeeper-1.default.svc.cluster.local:2181 --describe --entity-type topics --entity-name test

輸出:

Configs for topic 'test' are

發現策略為空,說明刪除成功了!

五、測試清除策略

測試思路

說明:

第一步,設置清除策略為保留10秒

第二步,進入生產者模式,輸入消息 a

第三步,等待5秒,再次進入生產者模式,輸入消息 b

第四部,進入消費者模式,看輸出的消息是a還是b

判斷標準:

在進行第三步時,a這條消息,應該已經被刪除了。所以在第15秒進入消費者模式時,應該輸出 b,這樣的話,策略才是成功的!

設置策略

topic 為test的數據保留10秒

bin/kafka-configs.sh --zookeeper zookeeper-1.default.svc.cluster.local:2181 --entity-type topics --entity-name test --alter --add-config retention.ms=10000

生產模式

進入生產模式,輸入a

bin/kafka-console-producer.sh --broker-list kafka-1.default.svc.cluster.local:9092 --topic test  > a

等待5秒後,再次進入生產模式,輸入b

bin/kafka-console-producer.sh --broker-list kafka-1.default.svc.cluster.local:9092 --topic test  > b

消費者模式

等待5秒後,進入 消費者模式

bin/kafka-console-consumer.sh --bootstrap-server kafka-1.default.svc.cluster.local:9092 --topic test --from-beginning    b

如果消費者輸出為b,表示策略成功!

備註:

如果生產環境中,正在不斷的進行生產和消費,執行kafka-configs.sh 腳本,是否會有影響呢?

答案是不會的,它是動態策略!

本文參考鏈接:

https://blog.csdn.net/forrest_ou/article/details/78999983