kafka刪除topic數據
- 2020 年 2 月 25 日
- 筆記
一、概述
生產環境中,有一個topic的數據量非常大。這些數據不是非常重要,需要定期清理。
要求:默認保持24小時,某些topic 需要保留2小時或者6小時

二、清除方式
主要有3個:
1. 基於時間
2. 基於日誌大小
3. 基於日誌起始偏移量
詳情,請參考鏈接:
https://blog.csdn.net/u013256816/article/details/80418297
接下來,主要介紹基於時間的清除!
kafka版本為: 2.11-1.1.0
zk版本為: 3.4.13
三、kafka配置
# 啟用刪除主題 delete.topic.enable=true # 檢查日誌段文件的間隔時間,以確定是否文件屬性是否到達刪除要求。 log.retention.check.interval.ms=1000
注意:這2行配置必須存在,否則清除策略失效!
log.retention.check.interval.ms 參數的單位是毫秒,這裡表示間隔1秒鐘
四、清除策略
全局topic
在 server.properties 文件中配置的是全局策略,針對每一個topic
比如:
log.retention.hours=3
表示保留3個小時
單個topic
針對單個topic策略,需要使用腳本kafka-configs.sh
此腳本不需要重啟kafka就會生效!
首先來查看一下,當前的topic策略,比如test
bin/kafka-configs.sh --zookeeper zookeeper-1.default.svc.cluster.local:2181 --describe --entity-type topics --entity-name test
參數解釋:
–describe 詳細信息
–entity-type 實體類型
–entity-name 指定topic名
輸出:
Configs for topic 'test' are
這個表示為策略為空
刪除topic數據
如果需要刪除topic所有數據,使用命令
bin/kafka-topics.sh --delete --topic test --zookeeper zookeeper-1.default.svc.cluster.local:2181
這個命令,請謹慎執行!!!
如果想保留主題,只刪除主題現有數據(log)。可以通過修改數據保留時間實現
bin/kafka-configs.sh --zookeeper zookeeper-1.default.svc.cluster.local:2181 --entity-type topics --entity-name test --alter --add-config retention.ms=10000
執行輸出:
Completed Updating config for entity: topic 'test'.
注意:修改保留時間為10秒鐘,並不是10秒鐘就馬上刪掉。kafka是採用輪詢的方式,輪詢到這個topic時,刪除10秒鐘前的數據。
時間由server.properties裏面的log.retention.check.interval.ms選項為主
假設說 log.retention.check.interval.ms 值為1分鐘,那麼等待70秒,這個topic的數據就會自動被刪除!
再次查看topic策略
bin/kafka-configs.sh --zookeeper zookeeper-1.default.svc.cluster.local:2181 --describe --entity-type topics --entity-name test
輸出:
Configs for topic 'test' are retention.ms=10000
發現目前的刪除策略為 retention.ms=10000
刪除策略
如果需要刪除上面的10秒策略,使用以下命令:
bin/kafka-configs.sh --zookeeper zookeeper-1.default.svc.cluster.local:2181 --entity-type topics --entity-name test --alter --delete-config retention.ms
輸出:
Completed Updating config for entity: topic 'test'.
再次查看topic策略
bin/kafka-configs.sh --zookeeper zookeeper-1.default.svc.cluster.local:2181 --describe --entity-type topics --entity-name test
輸出:
Configs for topic 'test' are
發現策略為空,說明刪除成功了!
五、測試清除策略
測試思路

說明:
第一步,設置清除策略為保留10秒
第二步,進入生產者模式,輸入消息 a
第三步,等待5秒,再次進入生產者模式,輸入消息 b
第四部,進入消費者模式,看輸出的消息是a還是b
判斷標準:
在進行第三步時,a這條消息,應該已經被刪除了。所以在第15秒進入消費者模式時,應該輸出 b,這樣的話,策略才是成功的!
設置策略
topic 為test的數據保留10秒
bin/kafka-configs.sh --zookeeper zookeeper-1.default.svc.cluster.local:2181 --entity-type topics --entity-name test --alter --add-config retention.ms=10000
生產模式
進入生產模式,輸入a
bin/kafka-console-producer.sh --broker-list kafka-1.default.svc.cluster.local:9092 --topic test > a
等待5秒後,再次進入生產模式,輸入b
bin/kafka-console-producer.sh --broker-list kafka-1.default.svc.cluster.local:9092 --topic test > b
消費者模式
等待5秒後,進入 消費者模式
bin/kafka-console-consumer.sh --bootstrap-server kafka-1.default.svc.cluster.local:9092 --topic test --from-beginning b
如果消費者輸出為b,表示策略成功!
備註:
如果生產環境中,正在不斷的進行生產和消費,執行kafka-configs.sh 腳本,是否會有影響呢?
答案是不會的,它是動態策略!
本文參考鏈接:
https://blog.csdn.net/forrest_ou/article/details/78999983