Kafka常用命令合集

  • 2019 年 10 月 6 日
  • 筆記

在上一篇文章《Linux安裝Kafka》中,已經介紹了如何在Linux安裝Kafka,以及Kafka的啟動/關閉和創建發話題併產生消息和消費消息。這篇文章就介紹介紹Kafka的那些常用的命令。

關於Kafka的啟停/創建話題/消息的產生和消費等命令在上一篇文章《Linux安裝Kafka》中已經指出,這裡就不說了。就說說其他常用命令。

♛ 1 查看消費者狀態和消費詳情

有時候我們需要關心消費者應用的狀態,一般消費者應用會自己通過日誌獲知當前消費到了哪個topic的哪個partition的哪個offset,但當消費者出問題之後,或者出於監控的原因,我們需要知道消費者的狀態和詳情,那麼需要藉助kafka提供的相關命令。

命令格式:

bin/kafka-consumer-groups.sh --bootstrap-server BORKER_HOST1:PORT1,BORKER_HSOT2:PORT2 --list

bin/kafka-consumer-groups.sh --bootstrap-server BORKER_HOST1:PORT1,BORKER_HSOT2:PORT2 --group GROUP_NAME --describe

Tips:

  • BROKER_HOST是kafka server的ip地址,PORT是server的監聽埠。多個host port之間用逗號隔開
  • 第一條命令是獲取group列表,一般而言,應用是知道消費者group的,通常在應用的配置里,如果已知,該步驟可以省略
  • 第二條命令是查看具體的消費者group的詳情資訊,需要給出group的名稱

示例:

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group console-consumer-68682 --describe

效果圖:

Kafka常用命令合集

  • TOPIC:該group里消費的topic名稱
  • PARTITION:分區編號
  • CURRENT-OFFSET:該分區當前消費到的offset
  • LOG-END-OFFSET:該分區當前latest offset
  • LAG:消費滯後區間,為LOG-END-OFFSET-CURRENT-OFFSET,具體大小需要看應用消費速度和生產者速度,一般過大則可能出現消費跟不上,需要引起應用注意
  • CONSUMER-ID:server端給該分區分配的consumer編號
  • HOST:消費者所在主機
  • CLIENT-ID:消費者id,一般由應用指定
♛ 2 查詢topic的offset的範圍

用下面命令可以查詢到topic:demo broker:localhost:9092的offset的最小值:

bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 -topic demo --time -2

查詢offset的最大值:

bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 -topic demo --time -1

效果圖:

Kafka常用命令合集

♛ 3 重置消費者offset

有些場景可能希望修改消費者消費到的offset位置,以達到重新消費,或者跳過一部分消息的目的,這時候重置offset的工具就非常實用。

命令格式:

bin/kafka-consumer-groups.sh --bootstrap-server BORKER_HOST1:PORT1,BORKER_HSOT2:PORT2 --group GROUP_NAME --reset-offsets --execute --to-offset NEW_OFFSET --topic TOPIC_NAME

bin/kafka-consumer-groups.sh --bootstrap-server BORKER_HOST1:PORT1,BORKER_HSOT2:PORT2 --group GROUP_NAME --reset-offsets --execute --to-earliest/--to-latest --topic TOPIC_NAME

Tips:

  • BROKER_HOST是kafka server的ip地址,PORT是server的監聽埠。多個host port之間用逗號隔開
  • 第一條命令是將指定GROUP_NAME和topic的offset修改到NEW_OFFSET的位置,重啟消費者後,消費中將從指定的offset處消費。注意這裡只能NEW_OFFSET只能設置一個值,也就是說,所有的分區都將使用這個值,如果分區消息負載不均衡,需要考慮是否適用。
  • 第二條命令是將指定GROUP_NAME和topic的offset修改到earliest或者latest位置,使得消費者從頭或者從尾部消費。

示例:

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group console-consumer-68682 --reset-offsets --execute --to-offset 3 --topic demo

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group console-consumer-68682 --reset-offsets --execute --to-latest --topic demo

效果圖:

Kafka常用命令合集

可以通過直接更換消費者group id的方式,配合消費者默認的消費策略,可以達到類似的效果,反而更加簡單、高效和安全。

♛ 4 查看topic的狀態和分區負載詳情

當broker出現宕機,恢復之後,我們可以看下topic的leader是否負載均衡。因為kafka的所有讀寫消息的請求,都是發送到partition leader上的,因此在生產環境,負載均衡顯得尤其重要。

命令格式:

bin/kafka-topics.sh --zookeeper ZOOKEEPER_HOST1:PORT1,ZOOKEEPER_HOST2:PORT2 --describe --topic TOPIC_NAME

Tips:

  • ZOOKEEPER_HOST是kafka所使用的zookeeper的ip地址,PORT是zookeeper監聽的埠。多個host port之間用逗號隔開
  • 類似的,zookeeper集群不需要全部列上,給出一個可用的zk地址和埠即可

示例:

bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic demo

效果圖:

Kafka常用命令合集

如果發現以下現象說明kafka異常:

  • 某個topic的每個分區,同步副本數量和設定的副本數量不一致
  • 某個topic的每個分區,leader的id數值是-1或者none
♛ 5 消費消息

從頭開始消費:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic demo --from-beginning

效果圖:

Kafka常用命令合集

從尾部開始:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic demo --offset latest --partition 0

效果圖:

Kafka常用命令合集

指定分區:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic demo --offset latest --partition 0

取指定個數:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic demo --offset latest --partition 0 --max-messages 2

如示例部分,取2個消息,取完自動結束回話。

♛ 自帶壓測工具

測試使用Kafka自帶的測試腳本,通過命令對Kafka發起寫入MQ消息和Kafka消費MQ消息的請求。模擬不同數量級的MQ消息寫入和MQ消息消費場景,根據Kafka的處理結果,評估Kafka是否滿足處理億級以上的消息的能力。

命令格式:

bin/kafka-producer-perf-test.sh --topic demo --num-records 100 --record-size 1 --throughput 100 --producer-props bootstrap.servers=localhost:9092

示例:

測試項 壓測消息數(單位:W)  測試命令
寫入MQ消息 10 

bin/kafka-producer-perf-test.sh --topic demo --num-records 100000 --record-size 1000  --throughput 2000 --producer-props bootstrap.servers=localhost:9092

  100 

bin/kafka-producer-perf-test.sh --topic demo --num-records 1000000 --record-size 2000  --throughput 5000 --producer-props bootstrap.servers=localhost:9092

  1000

bin/kafka-producer-perf-test.sh --topic demo --num-records 10000000 --record-size 2000  --throughput 5000 --producer-props bootstrap.servers=localhost:9092

消費MQ消息 10 

bin/kafka-consumer-perf-test.sh --broker-list localhost:9092 --topic demo --fetch-size 1048576 --messages 100000 --threads 1

  100 

bin/kafka-consumer-perf-test.sh --broker-list localhost:9092 --topic demo --fetch-size 1048576 --messages 1000000 --threads 1

  1000 

bin/kafka-consumer-perf-test.sh --broker-list localhost:9092 --topic demo --fetch-size 1048576 --messages 10000000 --threads 1

本文中部分內容翻譯或借鑒於以下學習資料,特別鳴謝:

作  者:請叫我頭頭哥
出  處:http://www.cnblogs.com/toutou/
關於作者:專註於基礎平台的項目開發。如有問題或建議,請多多賜教!
版權聲明:本文版權歸作者和部落格園共有,歡迎轉載,但未經作者同意必須保留此段聲明,且在文章頁面明顯位置給出原文鏈接。
特此聲明:所有評論和私信都會在第一時間回復。也歡迎園子的大大們指正錯誤,共同進步。或者直接私信
聲援部落客:如果您覺得文章對您有幫助,可以點擊文章右下角推薦一下。您的鼓勵是作者堅持原創和持續寫作的最大動力!