我的 Kafka 旅程 – 性能調優

Producer

於 config/producer.properties 配置文件中的項

# 序列化數據壓縮方式 [none/gzip/snappy/lz4/zstd]
compression.type = snappy			# default=none
# 內存隊列緩衝區總大小
buffer.memory = 67108864			# default=32M
# 數據塊/批次 單個大小
batch.size = 32768				# default=16K
# 數據塊/批次 過期毫秒
linger.ms = 5					# default=0
# Broker 分區的應答機制
acks = 1					# default=all
# 發送請求允許最大的積壓數
max.in.flight.requests.per.connection = 5	# default=5
# 發送失敗的重試次數
retries = 2147483647				# default=0
# 發送失敗重試間隔毫秒
retry.backoff.ms = 100				# default=100ms
# 冪等性(生產者編號 + Broker分區編號 + 消息編號)
enable.idempotence = true			# default=true

Broker

於 config/server.properties 配置文件中的項

# 數據寫磁盤線程數(佔總核心數60%)
num.io.threads = 8				# default=8
# 副本主動拉取線程數(佔總核心數10%)
num.replica.fetchers = 1			# default=1
# 數據網絡傳輸線程數(佔總核心數30%)
num.network.threads = 3				# default=3
# 不存在的Topic自動創建
auto.create.topics.enable = true		# default=true
# 副本通信超時
replica.lag.time.max.ms = 30000			# default=30000
# Broker leader partition 分區再平衡
auto.leader.rebalance.enable = true		# default=true
# 再平衡警戒值(%)
leader.imbalance.per.broker.percentage = 1	# default=10
# 再平衡檢測間隔秒數
leader.imbalance.check.interval.seconds = 300	# default=300
# 數據分片單文件大小
log.segment.bytes = 1073741824			# default=1GB
# 數據每索引範圍大小
log.index.interval.bytes = 4096			# default=4KB
# 數據保留時長
log.retention.hours = 168			# default=168 (7天)
# 數據保留分鐘
log.retention.minutes				# default=null
# 數據保留毫秒
log.retention.ms				# default=null
# 數據保留檢測間隔
log.retention.check.interval.ms = 300000	# default=300000
# 數據保留總大小
log.retention.bytes = -1			# default=-1 (無窮大)
# 數據刪除策略 [compact,delete]
log.cleanup.policy = delete			# default=delete

Consumer

於 config/consumer.properties 配置文件中的項

# 自動提交消費偏移量
enable.auto.commit = true		# default=true
# 提交消費偏移量頻率間隔
auto.commit.interval.ms = 5000		# default=5000
# 缺少偏移量的處理 [latest,earliest,none]
auto.offset.reset = latest		# default=latest
# 分區數
offsets.topic.num.partitions = 50	# default=50
# 與Broker間的心跳間隔
heartbeat.interval.ms = 5000		# default=3000
# 與Broker間的超時
session.timeout.ms = 45000		# default=45000
# 消息處理最大時長
max.poll.interval.ms = 300000		# default=300000
# 單次拉取數據大小
fetch.max.bytes = 57671680		# default=50M
# 單次拉取數據最大條數
max.poll.records = 500			# default=500
# 再平衡策略				# default= Range + CooperativeSticky
partition.assignment.strategy = class...RangeAssignor,class...CooperativeStickyAssignor

整體吞吐量

生產者

  • buffer.memory:增加內存緩衝區
  • batch.size:增加單數據塊/批次容量
  • linger.ms:消息發送延遲5毫秒
  • compression.type:開啟壓縮

Broker

  • 增加分區數(按分類分區)並行處理

消費者

  • fetch.max.bytes:每次消費數據最大容量
  • max.poll.recodes:每次消費數據最大條數

數據精確一次

生產者:acks = all,冪等性 + 事務
Broker:分區副本至少大於2,防丟失
消費者:手動提交offset + 事務

 

Tags: