我的 Kafka 旅程 – 性能調優
Producer
於 config/producer.properties 配置文件中的項
# 序列化數據壓縮方式 [none/gzip/snappy/lz4/zstd]
compression.type = snappy # default=none
# 記憶體隊列緩衝區總大小
buffer.memory = 67108864 # default=32M
# 數據塊/批次 單個大小
batch.size = 32768 # default=16K
# 數據塊/批次 過期毫秒
linger.ms = 5 # default=0
# Broker 分區的應答機制
acks = 1 # default=all
# 發送請求允許最大的積壓數
max.in.flight.requests.per.connection = 5 # default=5
# 發送失敗的重試次數
retries = 2147483647 # default=0
# 發送失敗重試間隔毫秒
retry.backoff.ms = 100 # default=100ms
# 冪等性(生產者編號 + Broker分區編號 + 消息編號)
enable.idempotence = true # default=true
Broker
於 config/server.properties 配置文件中的項
# 數據寫磁碟執行緒數(佔總核心數60%)
num.io.threads = 8 # default=8
# 副本主動拉取執行緒數(佔總核心數10%)
num.replica.fetchers = 1 # default=1
# 數據網路傳輸執行緒數(佔總核心數30%)
num.network.threads = 3 # default=3
# 不存在的Topic自動創建
auto.create.topics.enable = true # default=true
# 副本通訊超時
replica.lag.time.max.ms = 30000 # default=30000
# Broker leader partition 分區再平衡
auto.leader.rebalance.enable = true # default=true
# 再平衡警戒值(%)
leader.imbalance.per.broker.percentage = 1 # default=10
# 再平衡檢測間隔秒數
leader.imbalance.check.interval.seconds = 300 # default=300
# 數據分片單文件大小
log.segment.bytes = 1073741824 # default=1GB
# 數據每索引範圍大小
log.index.interval.bytes = 4096 # default=4KB
# 數據保留時長
log.retention.hours = 168 # default=168 (7天)
# 數據保留分鐘
log.retention.minutes # default=null
# 數據保留毫秒
log.retention.ms # default=null
# 數據保留檢測間隔
log.retention.check.interval.ms = 300000 # default=300000
# 數據保留總大小
log.retention.bytes = -1 # default=-1 (無窮大)
# 數據刪除策略 [compact,delete]
log.cleanup.policy = delete # default=delete
Consumer
於 config/consumer.properties 配置文件中的項
# 自動提交消費偏移量
enable.auto.commit = true # default=true
# 提交消費偏移量頻率間隔
auto.commit.interval.ms = 5000 # default=5000
# 缺少偏移量的處理 [latest,earliest,none]
auto.offset.reset = latest # default=latest
# 分區數
offsets.topic.num.partitions = 50 # default=50
# 與Broker間的心跳間隔
heartbeat.interval.ms = 5000 # default=3000
# 與Broker間的超時
session.timeout.ms = 45000 # default=45000
# 消息處理最大時長
max.poll.interval.ms = 300000 # default=300000
# 單次拉取數據大小
fetch.max.bytes = 57671680 # default=50M
# 單次拉取數據最大條數
max.poll.records = 500 # default=500
# 再平衡策略 # default= Range + CooperativeSticky
partition.assignment.strategy = class...RangeAssignor,class...CooperativeStickyAssignor
整體吞吐量
生產者
- buffer.memory:增加記憶體緩衝區
- batch.size:增加單數據塊/批次容量
- linger.ms:消息發送延遲5毫秒
- compression.type:開啟壓縮
Broker
- 增加分區數(按分類分區)並行處理
消費者
- fetch.max.bytes:每次消費數據最大容量
- max.poll.recodes:每次消費數據最大條數
數據精確一次
生產者:acks = all,冪等性 + 事務
Broker:分區副本至少大於2,防丟失
消費者:手動提交offset + 事務