KafkaProducer 簡析
- 2021 年 1 月 1 日
- 筆記
使用方式
KafkaProducer 發送消息主要有以下 3 種方式:
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
ProducerRecord<String, String> record = new ProducerRecord<>("topic", "key", "value");
// 發送並忘記(fire-and-forget)
producer.send(record);
// 同步發送
Future<RecordMetadata> future = producer.send(record);
RecordMetadata metadata = future.get();
// 異步發送
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
}
});
producer.close();
具體的發送流程可以參考 KafkaProducer發送流程簡析。
KafkaProducer 是線程安全的,多個線程可以共享同一個 KafkaProducer 對象。
配置解析
client.id
該參數可以是任意的字符串,broker 會用它來識別消息的來源,會在日誌和監控指標里展示。
bootstrap.servers
該屬性指定 broker 的地址列表。
清單里不需要包含所有的 broker 地址,生產者會從給定的 broker 里查找到其他 broker 的信息。
不過建議至少要提供兩個 broker 的信息,一旦其中一個宕機,生產者仍然能夠連接到集群上。
key.serializer & value.serializer
這兩個屬性必須被設置為一個實現了org.apache.kafka.common.serialization.Serializer
接口的類。
生產者會使用這個類把鍵值對象序列化成位元組數組。
receive.buffer.bytes & send.buffer.bytes
設置 socket 讀寫數據時用到的 TCP 緩衝區大小。如果它們被設為 -1,就使用操作系統的默認值。
當生產者或消費者與 broker 處於不同的機房時,可以適當增大這些值。
buffer.memory
設置生產者內存緩衝區的大小,生產者用它緩衝要發送到服務器的消息。
如果應用程序發送消息的速度超過發送到服務器的速度,會導致生產者空間不足。
此時KafkaProducer.send()
會阻塞等待內存釋放,等待時間超過 max.block.ms 後會拋出超時異常。
compression.type
該參數指定了消息被發送給 broker 之前,使用哪一種壓縮算法(snappy
,gzip
或lz4
)進行壓縮。
使用壓縮可以降低網絡傳輸開銷和存儲開銷,而這往往是向 Kafka 發送消息的瓶頸所在。
batch.size
該參數指定了一個批次可以使用的內存位元組數(而不是消息個數)。
消息批次ProducerBatch
包含了一組將要發送至同個分區的消息,當批次被填滿,批次里的所有消息會被立即發送出去。
不過生產者並不一定都會等到批次被填滿才發送,半滿甚至只包含一個消息的批次也可能被發送。
所以就算把批次大小設置得很大,也不會造成延遲,只是會佔用更多的內存而已。
但如果設置得太小,生產者會頻繁地發送消息,會增加一些額外的網絡開銷。
linger.ms
該參數指定了生產者在發送批次之前等待的時間。
生產者會在批次填滿或等待時間達到 linger.ms 時把批次發送出去。
設置linger.ms>0
會增加延遲,但也會提升吞吐量(一次性發送更多的消息,每個消息的開銷就變小了)。
acks
參數指定了必須要有多少個分區副本收到消息,生產者才會認為消息寫入是成功的。
這個參數決定令消息丟失的可能性:
-
acks=0
生產者發出消息後不等待來自服務器的響應
如果當中出現了問題,導致服務器沒有收到消息,那麼生產者就無從得知,消息也就丟失了。
不過,因為生產者不需要等待服務器的響應,所以它可以以網絡能夠支持的最大速度發送消息,從而達到很高的吞吐量。 -
acks=1
只要集群的 leader 節點收到消息,生產者就會收到一個來自服務器的成功響應
如果消息無法到達 leader 節點(比如:leader節點崩潰,新的 leader 還沒有被選舉出來),生產者會收到一個錯誤響應。
為了避免數據丟失,生產者會重發消息。不過,如果一個沒有收到消息的節點成為新 leader,消息還是會丟失。這個時候的吞吐量取決於使用的是同步發送還是異步發送:
- 發送端阻塞等待服務器的響應(通過調用
Future.get()
方法),顯然會增加延遲(在網絡上傳輸一個來回的延遲) - 發送端使用回調可以緩解延遲問題,不過吞吐量仍受在途消息數量的限制(比如:生產者在收到服務器響應之前可以發送多少個消息)
- 發送端阻塞等待服務器的響應(通過調用
-
acks=all
只有當所有參與複製的節點全部收到消息時,生產者才會收到一個來自服務器的成功響應
這種模式是最安全的,就算有服務器發生崩潰,數據也不會丟失。
不過,它的延遲比acks=1
時更高,因為我們要等待不只一個服務器節點接收消息。
retries
該參數決定了生產者可以重發消息的次數(每次重試之間等待 retry.backoff.ms
)。
服務器返回臨時性的錯誤(比如:分區找不到 leader)時,生產者會自動重試,沒必要在代碼邏輯里處理可重試的錯誤。
作為開發者,只需要處理那些不可重試的錯誤(比如:消息位元組數超過單個發送批次上限)或重試次數超出上限的情況即可。
max.in.flight.requests.per.connection
該參數指定生產者,最多可以發送未響應在途消息批次數量。
在途消息批次越多,會佔用更多的內存,不過也會提升吞吐量。
當retries > 0
且max.in.flight.requests.per.connection > 1
時,可能出現消息亂序。
如果第一個批次消息寫入失敗,而第二個批次寫入成功,broker 會重試寫入第一個批次。
如果此時第一個批次也寫入成功,那麼兩個批次的順序就反過來了。
一般不建議設置retries=0
,而是令max.in.flight.requests.per.connection = 1
來保證消息順序。
在生產者嘗試發送第一批消息時,就不會有其他的消息發送給 broker,即使發生重試消息也不會亂序。
不過這樣會嚴重影響生產者的吞吐量,所以只有在對消息的順序有嚴格要求的情況下才能這麼做。
高級特性
冪等
當 broker 失效時生產者可能會自動重試,導致一條消息被重複寫入多次。
為了避免這種情況,Kafka 在生產者端提供來冪等保證:同一條消息被生產者發送多次,但在 broker端這條消息只會被寫入日誌一次。
在發送端設置 enable.idempotence = true
可以開啟冪等性,此時配置同時滿足以下條件:
max.in.flight.requests.per.connection ≤ 5
retries > 0
acks = all
其工作機制如下:
- producer 在初始化時必須分配一個 PID
producer id
該過程對用戶來說是完全透明的) - 發送到 broker 端的每批消息都會被賦予一個單調遞增的 SN
sequence number
用於消息去重(每個分區都有獨立的序列號) - 接收到消息的 broker 會將批次的
(PID, SN)
信息一同持久化到對應的分區日誌中(保證 leader 切換後去重仍然生效)
若重試導致 broker 接收到小於或等於已知最大序列號的消息,broker 會拒絕寫入這些消息,從而保證每條消息也只會被保存在日誌中一次。
由於每個 producer 實例都會被分配不同的 PID,該機制只能保證單個 producer 實例的冪等性,無法實現協同多個 producer 實現冪等。
事務
Kafka 事務可以實現 producer 對多個主題和分區的原子寫入,並且保證 consumer 不會讀取到未提交的數據。
Kafka 要求應用程序必須提供一個全局唯一的 TIDtransactional id
:
如果某個 producer 實例失效,該機制能夠保證下一個擁有相同 TID 的實例首先完成之前未完成的事務。
此外,broker 還會為自動每個 producer 分配一個epoch
用於隔離fencing out
失效但仍存活的 producer:
如果存在,則認為當前 producer 是一個殭屍實例zombie instance
並拒絕為其提供服務,防止其破壞事務的完整性。
下面是兩個常見的應用場景:
實現跨主題原子寫入
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.setProperty("enable.idempotence", "true"); // 開啟冪等
properties.setProperty("transactional.id", "my-transaction-id"); // 設置事務ID
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
ProducerRecord<String, String> record1 = new ProducerRecord<>("topic1", "key1", "value1");
ProducerRecord<String, String> record2 = new ProducerRecord<>("topic2", "key2", "value2");
ProducerRecord<String, String> record3 = new ProducerRecord<>("topic3", "key3", "value3");
producer.initTransactions(); // 初始化事務(只需執行一次)
try {
producer.beginTransaction(); // 開始事務
// 向多個不同的 topic 寫入消息
producer.send(record1);
producer.send(record2);
producer.send(record3);
producer.commitTransaction(); // 提交事務
} catch (ProducerFencedException e) {
producer.close(); // 事務ID 已被佔用
} catch (KafkaException e) {
producer.abortTransaction();
}
實現 read-process-write 模式
final String groupID = "my-group-id";
Properties producerProps = new Properties();
producerProps.setProperty("bootstrap.servers", "localhost:9092");
producerProps.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producerProps.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producerProps.setProperty("enable.idempotence", "true"); // 開啟冪等
producerProps.setProperty("transactional.id", "my-transaction-id"); // 設置事務ID
Properties consumerProps = new Properties();
consumerProps.setProperty("bootstrap.servers", "localhost:9092");
consumerProps.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumerProps.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumerProps.setProperty("isolation.level","read_committed"); // 設置隔離級別
consumerProps.setProperty("group.id", groupID); // 設置消費者組群ID
KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
producer.initTransactions();
consumer.subscribe(Collections.singletonList("ping"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE); // 讀取消息
try {
producer.beginTransaction(); // 開啟事務
// 處理消息(可以是任意業務場景)
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
for(ConsumerRecord<String, String> record : records){
offsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset())); // 記錄消費偏移量
producer.send(new ProducerRecord<>("pong", record.value())); // 發送消息
}
producer.sendOffsetsToTransaction(offsets, groupID); // 提交消費偏移量
producer.commitTransaction(); // 事務提交
} catch (ProducerFencedException e) {
producer.close(); // 事務ID 已被佔用
} catch (Exception e){
producer.abortTransaction(); // 回滾事務
}
}