快速學習-Kafka API
- 2020 年 2 月 24 日
- 筆記
第 4 章 Kafka API
4.1 Producer API
4.1.1 消息發送流程
Kafka 的 Producer 發送消息採用的是非同步發送的方式。在消息發送的過程中,涉及到了兩個執行緒——main 執行緒和 Sender 執行緒,以及一個執行緒共享變數——RecordAccumulator。main 執行緒將消息發送給 RecordAccumulator,Sender 執行緒不斷從 RecordAccumulator 中拉取消息發送到 Kafka broker。

相關參數: batch.size:只有數據積累到 batch.size 之後,sender 才會發送數據。 linger.ms:如果數據遲遲未達到 batch.size,sender 等待 linger.time 之後就會發送數據。
4.1.2 非同步發送 API
1)導入依賴
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.11.0.0</version> </dependency>
2)編寫程式碼 需要用到的類: KafkaProducer:需要創建一個生產者對象,用來發送數據 ProducerConfig:獲取所需的一系列配置參數 ProducerRecord:每條數據都要封裝成一個 ProducerRecord 對象
- 不帶回調函數的 API
public class CustomProducer { public static void main(String[] args) throws ExecutionException,InterruptedException { Properties props = new Properties(); //kafka 集群,broker-list props.put("bootstrap.servers", "hadoop102:9092"); props.put("acks", "all"); //重試次數 props.put("retries", 1); //批次大小 props.put("batch.size", 16384); //等待時間 props.put("linger.ms", 1); //RecordAccumulator 緩衝區大小 props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer<String, String> producer = new KafkaProducer<>(props); for (int i = 0; i < 100; i++) { producer.send(new ProducerRecord<String, String>("first", Integer.toString(i), Integer.toString(i))); } producer.close(); } }
- 帶回調函數的 API 回調函數會在 producer 收到 ack 時調用,為非同步調用,該方法有兩個參數,分別是RecordMetadata 和 Exception,如果 Exception 為 null,說明消息發送成功,如果Exception 不為 null,說明消息發送失敗。 注意:消息發送失敗會自動重試,不需要我們在回調函數中手動重試。
public class CustomProducer { public static void main(String[] args) throws ExecutionException, InterruptedException { Properties props = new Properties(); props.put("bootstrap.servers", "hadoop102:9092");//kafka 集群,broker-list props.put("acks", "all"); props.put("retries", 1);//重試次數 props.put("batch.size", 16384);//批次大小 props.put("linger.ms", 1);//等待時間 props.put("buffer.memory", 33554432);//RecordAccumulator 緩 沖區大小 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer<String, String> producer = new KafkaProducer<>(props); for (int i = 0; i < 100; i++) { producer.send(new ProducerRecord<String, String>("first", Integer.toString(i), Integer.toString(i)), new Callback() { //回調函數,該方法會在 Producer 收到 ack 時調用,為非同步調用 @Override public void onCompletion(RecordMetadata metadata, Exception exception) { if (exception == null) { System.out.println("success->" + metadata.offset()); } else { exception.printStackTrace(); } } }); } producer.close(); } }
4.1.3 同步發送 API
同步發送的意思就是,一條消息發送之後,會阻塞當前執行緒,直至返回 ack。由於 send 方法返回的是一個 Future 對象,根據 Futrue 對象的特點,我們也可以實現同步發送的效果,只需在調用 Future 對象的 get 方發即可。
public class CustomProducer { public static void main(String[] args) throws ExecutionException, InterruptedException { Properties props = new Properties(); props.put("bootstrap.servers", "hadoop102:9092");//kafka 集群,broker - list props.put("acks", "all"); props.put("retries", 1);//重試次數 props.put("batch.size", 16384);//批次大小 props.put("linger.ms", 1);//等待時間 props.put("buffer.memory", 33554432);//RecordAccumulator 緩 沖區大小 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer<String, String> producer = new KafkaProducer<>(props); for (int i = 0; i < 100; i++) { producer.send(new ProducerRecord<String, String>("first", Integer.toString(i), Integer.toString(i))).get(); } producer.close(); } }
4.2 Consumer API
Consumer 消費數據時的可靠性是很容易保證的,因為數據在 Kafka 中是持久化的,故不用擔心數據丟失問題。由於 consumer 在消費過程中可能會出現斷電宕機等故障,consumer 恢復後,需要從故障前的位置的繼續消費,所以 consumer 需要實時記錄自己消費到了哪個 offset,以便故障恢復後繼續消費。 所以 offset 的維護是 Consumer 消費數據是必須考慮的問題。
4.2.1 自動提交 offset
1)導入依賴
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.11.0.0</version> </dependency>
2)編寫程式碼 需要用到的類: KafkaConsumer:需要創建一個消費者對象,用來消費數據 ConsumerConfig:獲取所需的一系列配置參數 ConsuemrRecord:每條數據都要封裝成一個 ConsumerRecord 對象為了使我們能夠專註於自己的業務邏輯,Kafka 提供了自動提交 offset 的功能。自動提交 offset 的相關參數: enable.auto.commit:是否開啟自動提交 offset 功能 auto.commit.interval.ms:自動提交 offset 的時間間隔 以下為自動提交 offset 的程式碼:
public class CustomConsumer { public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "hadoop102:9092"); props.put("group.id", "test"); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer <>(props); consumer.subscribe(Arrays.asList("first")); while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) System.out.printf("offset = %d, key = %s, value = % s % n ", record.offset(), record.key(), record.value()); } } }
4.2.2 手動提交 offset
雖然自動提交 offset 十分簡介便利,但由於其是基於時間提交的,開發人員難以把握offset 提交的時機。因此 Kafka 還提供了手動提交 offset 的 API。
手動提交 offset 的方法有兩種:分別是 commitSync(同步提交)和 commitAsync(非同步提交)。兩者的相同點是,都會將本次 poll 的一批數據最高的偏移量提交;不同點是,commitSync 阻塞當前執行緒,一直到提交成功,並且會自動失敗重試(由不可控因素導致,也會出現提交失敗);而 commitAsync 則沒有失敗重試機制,故有可能提交失敗。 1)同步提交 offset 由於同步提交 offset 有失敗重試機制,故更加可靠,以下為同步提交 offset 的示例。
public class CustomComsumer { public static void main(String[] args) { Properties props = new Properties(); //Kafka 集群 props.put("bootstrap.servers", "hadoop102:9092"); //消費者組,只要 group.id 相同,就屬於同一個消費者組 props.put("group.id", "test"); props.put("enable.auto.commit", "false");//關閉自動提交 offset props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("first"));//消費者訂閱主題 while (true) { //消費者拉取數據 ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) { System.out.printf("offset = %d, key = %s, value = % s % n ", record.offset(), record.key(), record.value()); } //同步提交,當前執行緒會阻塞直到 offset 提交成功 consumer.commitSync(); } } }
2)非同步提交 offset 雖然同步提交 offset 更可靠一些,但是由於其會阻塞當前執行緒,直到提交成功。因此吞吐量會收到很大的影響。因此更多的情況下,會選用非同步提交 offset 的方式。 以下為非同步提交 offset 的示例:
public class CustomConsumer { public static void main(String[] args) { Properties props = new Properties(); //Kafka 集群 props.put("bootstrap.servers", "hadoop102:9092"); //消費者組,只要 group.id 相同,就屬於同一個消費者組 props.put("group.id", "test"); //關閉自動提交 offset props.put("enable.auto.commit", "false"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("first"));//消費者訂閱主題 while (true) { ConsumerRecords<String, String> records = consumer.poll(100);//消費者拉取數據 for (ConsumerRecord<String, String> record : records) { System.out.printf("offset = %d, key = %s, value = % s % n ", record.offset(), record.key(), record.value()); } //非同步提交 consumer.commitAsync(new OffsetCommitCallback() { @Override public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) { if (exception != null) { System.err.println("Commit failed for" + offsets); } } }); } } }
3) 數據漏消費和重複消費分析 無論是同步提交還是非同步提交 offset,都有可能會造成數據的漏消費或者重複消費。先提交 offset 後消費,有可能造成數據的漏消費;而先消費後提交 offset,有可能會造成數據的重複消費。
4.2.3 自定義存儲 offset
Kafka 0.9 版本之前,offset 存儲在 zookeeper,0.9 版本及之後,默認將 offset 存儲在 Kafka的一個內置的 topic 中。除此之外,Kafka 還可以選擇自定義存儲 offset。 offset 的維護是相當繁瑣的,因為需要考慮到消費者的 Rebalace。當有新的消費者加入消費者組、已有的消費者推出消費者組或者所訂閱的主題的分區發生變化,就會觸發到分區的重新分配,重新分配的過程叫做 Rebalance。消費者發生 Rebalance 之後,每個消費者消費的分區就會發生變化。因此消費者要首先獲取到自己被重新分配到的分區,並且定位到每個分區最近提交的 offset 位置繼續消費。要實現自定義存儲 offset,需要藉助 ConsumerRebalanceListener
,以下為示例程式碼,其中提交和獲取 offset 的方法,需要根據所選的 offset 存儲系統自行實現。
public class CustomConsumer { private static Map<TopicPartition, Long> currentOffset = new HashMap(); public static void main(String[] args) { //創建配置資訊 Properties props = new Properties(); //Kafka 集群 props.put("bootstrap.servers", "hadoop102:9092"); //消費者組,只要 group.id 相同,就屬於同一個消費者組 props.put("group.id", "test"); //關閉自動提交 offset props.put("enable.auto.commit", "false"); //Key 和 Value 的反序列化類 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); //創建一個消費者 KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); //消費者訂閱主題 consumer.subscribe(Arrays.asList("first"), new ConsumerRebalanceListener() { //該方法會在 Rebalance 之前調用 @Override public void onPartitionsRevoked(Collection<TopicPartition> partitions) { commitOffset(currentOffset); } //該方法會在 Rebalance 之後調用 @Override public void onPartitionsAssigned(Collection<TopicPartition> partitions) { currentOffset.clear(); for (TopicPartition partition : partitions) { consumer.seek(partition, getOffset(partition));// 定位到最近提交的 offset 位置繼續消費 } } }); while (true) { ConsumerRecords<String, String> records = consumer.poll(100);//消費者拉取數據 for (ConsumerRecord<String, String> record : records) { System.out.printf("offset = %d, key = %s, value = % s % n ", record.offset(), record.key(), record.value()); currentOffset.put(new TopicPartition(record.topic(), record.partition()), record.offset()); } commitOffset(currentOffset);//非同步提交 } } //獲取某分區的最新 offset private static long getOffset(TopicPartition partition) { return 0; } //提交該消費者所有分區的 offset private static void commitOffset(Map<TopicPartition, Long> currentOffset) { } }
4.3 自定義 Interceptor
4.3.1 攔截器原理
Producer 攔截器(interceptor)是在 Kafka 0.10 版本被引入的,主要用於實現 clients 端的訂製化控制邏輯。對於 producer 而言,interceptor 使得用戶在消息發送前以及 producer 回調邏輯前有機會對消息做一些訂製化需求,比如修改消息等。同時,producer 允許用戶指定多個 interceptor按序作用於同一條消息從而形成一個攔截鏈(interceptor chain)。Intercetpor 的實現介面是 org.apache.kafka.clients.producer.ProducerInterceptor
,其定義的方法包括: (1)configure(configs) 獲取配置資訊和初始化數據時調用。 (2)onSend(ProducerRecord): 該方法封裝進 KafkaProducer.send 方法中,即它運行在用戶主執行緒中。Producer 確保在消息被序列化以及計算分區前調用該方法。用戶可以在該方法中對消息做任何操作,但最好保證不要修改消息所屬的 topic 和分區,否則會影響目標分區的計算。 (3)onAcknowledgement(RecordMetadata, Exception): 該方法會在消息從 RecordAccumulator 成功發送到 Kafka Broker 之後,或者在發送過程中失敗時調用。並且通常都是在 producer 回調邏輯觸發之前。onAcknowledgement 運行在producer 的 IO 執行緒中,因此不要在該方法中放入很重的邏輯,否則會拖慢 producer 的消息發送效率。 (4)close: 關閉 interceptor,主要用於執行一些資源清理工作如前所述,interceptor 可能被運行在多個執行緒中,因此在具體實現時用戶需要自行確保執行緒安全。另外倘若指定了多個 interceptor,則 producer 將按照指定順序調用它們,並僅僅是捕獲每個 interceptor 可能拋出的異常記錄到錯誤日誌中而非在向上傳遞。這在使用過程中要特別留意。
4.3.2 攔截器案例
1)需求: 實現一個簡單的雙 interceptor 組成的攔截鏈。第一個 interceptor 會在消息發送前將時間戳資訊加到消息 value 的最前部;第二個 interceptor 會在消息發送後更新成功發送消息數或失敗發送消息數。

2)案例實操 (1)增加時間戳攔截器
public class TimeInterceptor implements ProducerInterceptor<String, String> { @Override public void configure(Map<String, ?> configs) { } @Override public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) { // 創建一個新的 record,把時間戳寫入消息體的最前部 return new ProducerRecord(record.topic(), record.partition(), record.timestamp(), record.key(), System.currentTimeMillis() + "," + record.value().toString()); } @Override public void onAcknowledgement(RecordMetadata metadata, Exception exception) { } @Override public void close() { } }
(2)統計發送消息成功和發送失敗消息數,並在 producer 關閉時列印這兩個計數器
public class CounterInterceptor implements ProducerInterceptor<String, String> { private int errorCounter = 0; private int successCounter = 0; @Override public void configure(Map<String, ?> configs) { } @Override public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) { return record; } @Override public void onAcknowledgement(RecordMetadata metadata, Exception exception) { // 統計成功和失敗的次數 if (exception == null) { successCounter++; } else { errorCounter++; } } @Override public void close() { // 保存結果 System.out.println("Successful sent: " + successCounter); System.out.println("Failed sent: " + errorCounter); } }
(3)producer 主程式
public class InterceptorProducer { public static void main(String[] args) throws Exception { // 1 設置配置資訊 Properties props = new Properties(); props.put("bootstrap.servers", "hadoop102:9092"); props.put("acks", "all"); props.put("retries", 3); props.put("batch.size", 16384); props.put("linger.ms", 1); props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 2 構建攔截鏈 List<String> interceptors = new ArrayList<>(); interceptors.add("com.atguigu.kafka.interceptor.TimeInterce ptor"); interceptors.add("com.atguigu.kafka.interceptor.CounterInterceptor"); props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors); String topic = "first"; Producer<String, String> producer = new KafkaProducer<>(props); // 3 發送消息 for (int i = 0; i < 10; i++) { ProducerRecord<String, String> record = new ProducerRecord<>(topic, "message" + i); producer.send(record); } // 4 一定要關閉 producer,這樣才會調用 interceptor 的 close 方法 producer.close(); } }
3)測試 (1)在 kafka 上啟動消費者,然後運行客戶端 java 程式。
[atguigu@hadoop102 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --from-beginning --topic first 1501904047034,message0 1501904047225,message1 1501904047230,message2 1501904047234,message3 1501904047236,message4 1501904047240,message5 1501904047243,message6 1501904047246,message7 1501904047249,message8 1501904047252,message9