KafkaProducer 簡析
- 2021 年 1 月 1 日
- 筆記
使用方式
KafkaProducer 發送消息主要有以下 3 種方式:
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
ProducerRecord<String, String> record = new ProducerRecord<>("topic", "key", "value");
// 發送並忘記(fire-and-forget)
producer.send(record);
// 同步發送
Future<RecordMetadata> future = producer.send(record);
RecordMetadata metadata = future.get();
// 非同步發送
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
}
});
producer.close();
具體的發送流程可以參考 KafkaProducer發送流程簡析。
KafkaProducer 是執行緒安全的,多個執行緒可以共享同一個 KafkaProducer 對象。
配置解析
client.id
該參數可以是任意的字元串,broker 會用它來識別消息的來源,會在日誌和監控指標里展示。
bootstrap.servers
該屬性指定 broker 的地址列表。
清單里不需要包含所有的 broker 地址,生產者會從給定的 broker 里查找到其他 broker 的資訊。
不過建議至少要提供兩個 broker 的資訊,一旦其中一個宕機,生產者仍然能夠連接到集群上。
key.serializer & value.serializer
這兩個屬性必須被設置為一個實現了org.apache.kafka.common.serialization.Serializer介面的類。
生產者會使用這個類把鍵值對象序列化成位元組數組。
receive.buffer.bytes & send.buffer.bytes
設置 socket 讀寫數據時用到的 TCP 緩衝區大小。如果它們被設為 -1,就使用作業系統的默認值。
當生產者或消費者與 broker 處於不同的機房時,可以適當增大這些值。
buffer.memory
設置生產者記憶體緩衝區的大小,生產者用它緩衝要發送到伺服器的消息。
如果應用程式發送消息的速度超過發送到伺服器的速度,會導致生產者空間不足。
此時KafkaProducer.send()會阻塞等待記憶體釋放,等待時間超過 max.block.ms 後會拋出超時異常。
compression.type
該參數指定了消息被發送給 broker 之前,使用哪一種壓縮演算法(snappy,gzip或lz4)進行壓縮。
使用壓縮可以降低網路傳輸開銷和存儲開銷,而這往往是向 Kafka 發送消息的瓶頸所在。
batch.size
該參數指定了一個批次可以使用的記憶體位元組數(而不是消息個數)。
消息批次ProducerBatch包含了一組將要發送至同個分區的消息,當批次被填滿,批次里的所有消息會被立即發送出去。
不過生產者並不一定都會等到批次被填滿才發送,半滿甚至只包含一個消息的批次也可能被發送。
所以就算把批次大小設置得很大,也不會造成延遲,只是會佔用更多的記憶體而已。
但如果設置得太小,生產者會頻繁地發送消息,會增加一些額外的網路開銷。
linger.ms
該參數指定了生產者在發送批次之前等待的時間。
生產者會在批次填滿或等待時間達到 linger.ms 時把批次發送出去。
設置linger.ms>0會增加延遲,但也會提升吞吐量(一次性發送更多的消息,每個消息的開銷就變小了)。
acks
參數指定了必須要有多少個分區副本收到消息,生產者才會認為消息寫入是成功的。
這個參數決定令消息丟失的可能性:
-
acks=0生產者發出消息後不等待來自伺服器的響應
如果當中出現了問題,導致伺服器沒有收到消息,那麼生產者就無從得知,消息也就丟失了。
不過,因為生產者不需要等待伺服器的響應,所以它可以以網路能夠支援的最大速度發送消息,從而達到很高的吞吐量。 -
acks=1只要集群的 leader 節點收到消息,生產者就會收到一個來自伺服器的成功響應
如果消息無法到達 leader 節點(比如:leader節點崩潰,新的 leader 還沒有被選舉出來),生產者會收到一個錯誤響應。
為了避免數據丟失,生產者會重發消息。不過,如果一個沒有收到消息的節點成為新 leader,消息還是會丟失。這個時候的吞吐量取決於使用的是同步發送還是非同步發送:
- 發送端阻塞等待伺服器的響應(通過調用
Future.get()方法),顯然會增加延遲(在網路上傳輸一個來回的延遲) - 發送端使用回調可以緩解延遲問題,不過吞吐量仍受在途消息數量的限制(比如:生產者在收到伺服器響應之前可以發送多少個消息)
- 發送端阻塞等待伺服器的響應(通過調用
-
acks=all只有當所有參與複製的節點全部收到消息時,生產者才會收到一個來自伺服器的成功響應
這種模式是最安全的,就算有伺服器發生崩潰,數據也不會丟失。
不過,它的延遲比acks=1時更高,因為我們要等待不只一個伺服器節點接收消息。
retries
該參數決定了生產者可以重發消息的次數(每次重試之間等待 retry.backoff.ms)。
伺服器返回臨時性的錯誤(比如:分區找不到 leader)時,生產者會自動重試,沒必要在程式碼邏輯里處理可重試的錯誤。
作為開發者,只需要處理那些不可重試的錯誤(比如:消息位元組數超過單個發送批次上限)或重試次數超出上限的情況即可。
max.in.flight.requests.per.connection
該參數指定生產者,最多可以發送未響應在途消息批次數量。
在途消息批次越多,會佔用更多的記憶體,不過也會提升吞吐量。
當retries > 0且max.in.flight.requests.per.connection > 1時,可能出現消息亂序。
如果第一個批次消息寫入失敗,而第二個批次寫入成功,broker 會重試寫入第一個批次。
如果此時第一個批次也寫入成功,那麼兩個批次的順序就反過來了。
一般不建議設置retries=0,而是令max.in.flight.requests.per.connection = 1來保證消息順序。
在生產者嘗試發送第一批消息時,就不會有其他的消息發送給 broker,即使發生重試消息也不會亂序。
不過這樣會嚴重影響生產者的吞吐量,所以只有在對消息的順序有嚴格要求的情況下才能這麼做。
高級特性
冪等
當 broker 失效時生產者可能會自動重試,導致一條消息被重複寫入多次。
為了避免這種情況,Kafka 在生產者端提供來冪等保證:同一條消息被生產者發送多次,但在 broker端這條消息只會被寫入日誌一次。
在發送端設置 enable.idempotence = true 可以開啟冪等性,此時配置同時滿足以下條件:
max.in.flight.requests.per.connection ≤ 5retries > 0acks = all
其工作機制如下:
- producer 在初始化時必須分配一個 PID
producer id該過程對用戶來說是完全透明的) - 發送到 broker 端的每批消息都會被賦予一個單調遞增的 SN
sequence number用於消息去重(每個分區都有獨立的序列號) - 接收到消息的 broker 會將批次的
(PID, SN)資訊一同持久化到對應的分區日誌中(保證 leader 切換後去重仍然生效)
若重試導致 broker 接收到小於或等於已知最大序列號的消息,broker 會拒絕寫入這些消息,從而保證每條消息也只會被保存在日誌中一次。
由於每個 producer 實例都會被分配不同的 PID,該機制只能保證單個 producer 實例的冪等性,無法實現協同多個 producer 實現冪等。
事務
Kafka 事務可以實現 producer 對多個主題和分區的原子寫入,並且保證 consumer 不會讀取到未提交的數據。
Kafka 要求應用程式必須提供一個全局唯一的 TIDtransactional id:
如果某個 producer 實例失效,該機制能夠保證下一個擁有相同 TID 的實例首先完成之前未完成的事務。
此外,broker 還會為自動每個 producer 分配一個epoch用於隔離fencing out失效但仍存活的 producer:
如果存在,則認為當前 producer 是一個殭屍實例zombie instance並拒絕為其提供服務,防止其破壞事務的完整性。
下面是兩個常見的應用場景:
實現跨主題原子寫入
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.setProperty("enable.idempotence", "true"); // 開啟冪等
properties.setProperty("transactional.id", "my-transaction-id"); // 設置事務ID
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
ProducerRecord<String, String> record1 = new ProducerRecord<>("topic1", "key1", "value1");
ProducerRecord<String, String> record2 = new ProducerRecord<>("topic2", "key2", "value2");
ProducerRecord<String, String> record3 = new ProducerRecord<>("topic3", "key3", "value3");
producer.initTransactions(); // 初始化事務(只需執行一次)
try {
producer.beginTransaction(); // 開始事務
// 向多個不同的 topic 寫入消息
producer.send(record1);
producer.send(record2);
producer.send(record3);
producer.commitTransaction(); // 提交事務
} catch (ProducerFencedException e) {
producer.close(); // 事務ID 已被佔用
} catch (KafkaException e) {
producer.abortTransaction();
}
實現 read-process-write 模式
final String groupID = "my-group-id";
Properties producerProps = new Properties();
producerProps.setProperty("bootstrap.servers", "localhost:9092");
producerProps.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producerProps.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producerProps.setProperty("enable.idempotence", "true"); // 開啟冪等
producerProps.setProperty("transactional.id", "my-transaction-id"); // 設置事務ID
Properties consumerProps = new Properties();
consumerProps.setProperty("bootstrap.servers", "localhost:9092");
consumerProps.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumerProps.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumerProps.setProperty("isolation.level","read_committed"); // 設置隔離級別
consumerProps.setProperty("group.id", groupID); // 設置消費者組群ID
KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
producer.initTransactions();
consumer.subscribe(Collections.singletonList("ping"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE); // 讀取消息
try {
producer.beginTransaction(); // 開啟事務
// 處理消息(可以是任意業務場景)
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
for(ConsumerRecord<String, String> record : records){
offsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset())); // 記錄消費偏移量
producer.send(new ProducerRecord<>("pong", record.value())); // 發送消息
}
producer.sendOffsetsToTransaction(offsets, groupID); // 提交消費偏移量
producer.commitTransaction(); // 事務提交
} catch (ProducerFencedException e) {
producer.close(); // 事務ID 已被佔用
} catch (Exception e){
producer.abortTransaction(); // 回滾事務
}
}


