快速學習-Kafka API

  • 2020 年 2 月 24 日
  • 筆記

第 4 章 Kafka API

4.1 Producer API

4.1.1 消息發送流程

Kafka 的 Producer 發送消息採用的是非同步發送的方式。在消息發送的過程中,涉及到了兩個執行緒——main 執行緒和 Sender 執行緒,以及一個執行緒共享變數——RecordAccumulator。main 執行緒將消息發送給 RecordAccumulator,Sender 執行緒不斷從 RecordAccumulator 中拉取消息發送到 Kafka broker。

相關參數: batch.size:只有數據積累到 batch.size 之後,sender 才會發送數據。 linger.ms:如果數據遲遲未達到 batch.size,sender 等待 linger.time 之後就會發送數據。

4.1.2 非同步發送 API

1)導入依賴

<dependency>  	<groupId>org.apache.kafka</groupId>  	<artifactId>kafka-clients</artifactId>  	<version>0.11.0.0</version>  </dependency>

2)編寫程式碼 需要用到的類: KafkaProducer:需要創建一個生產者對象,用來發送數據 ProducerConfig:獲取所需的一系列配置參數 ProducerRecord:每條數據都要封裝成一個 ProducerRecord 對象

  1. 不帶回調函數的 API
public class CustomProducer {  	 public static void main(String[] args) throws ExecutionException,InterruptedException {  		 Properties props = new Properties();  		 //kafka 集群,broker-list  		 props.put("bootstrap.servers", "hadoop102:9092");  		 props.put("acks", "all");  		 //重試次數  		 props.put("retries", 1);  		 //批次大小  		 props.put("batch.size", 16384);  		 //等待時間  		 props.put("linger.ms", 1);  		 //RecordAccumulator 緩衝區大小  		 props.put("buffer.memory", 33554432);  		 props.put("key.serializer",  		"org.apache.kafka.common.serialization.StringSerializer");  		 props.put("value.serializer",  		"org.apache.kafka.common.serialization.StringSerializer");  		 Producer<String, String> producer = new  		KafkaProducer<>(props);  		 for (int i = 0; i < 100; i++) {  			 producer.send(new ProducerRecord<String, String>("first",  			Integer.toString(i), Integer.toString(i)));  		 }  		 producer.close();  	 }   }
  1. 帶回調函數的 API 回調函數會在 producer 收到 ack 時調用,為非同步調用,該方法有兩個參數,分別是RecordMetadata 和 Exception,如果 Exception 為 null,說明消息發送成功,如果Exception 不為 null,說明消息發送失敗。 注意:消息發送失敗會自動重試,不需要我們在回調函數中手動重試。
public class CustomProducer {      public static void main(String[] args) throws ExecutionException, InterruptedException {          Properties props = new Properties();          props.put("bootstrap.servers", "hadoop102:9092");//kafka 集群,broker-list          props.put("acks", "all");          props.put("retries", 1);//重試次數          props.put("batch.size", 16384);//批次大小          props.put("linger.ms", 1);//等待時間          props.put("buffer.memory", 33554432);//RecordAccumulator 緩          沖區大小          props.put("key.serializer",                  "org.apache.kafka.common.serialization.StringSerializer");          props.put("value.serializer",                  "org.apache.kafka.common.serialization.StringSerializer");          Producer<String, String> producer = new                  KafkaProducer<>(props);          for (int i = 0; i < 100; i++) {              producer.send(new ProducerRecord<String, String>("first",                      Integer.toString(i), Integer.toString(i)), new Callback() {                  //回調函數,該方法會在 Producer 收到 ack 時調用,為非同步調用                  @Override                  public void onCompletion(RecordMetadata metadata, Exception exception) {                      if (exception == null) {                          System.out.println("success->" +                                  metadata.offset());                      } else {                          exception.printStackTrace();                      }                  }              });          }          producer.close();      }  }

4.1.3 同步發送 API

同步發送的意思就是,一條消息發送之後,會阻塞當前執行緒,直至返回 ack。由於 send 方法返回的是一個 Future 對象,根據 Futrue 對象的特點,我們也可以實現同步發送的效果,只需在調用 Future 對象的 get 方發即可。

public class CustomProducer {      public static void main(String[] args) throws ExecutionException,              InterruptedException {          Properties props = new Properties();          props.put("bootstrap.servers", "hadoop102:9092");//kafka 集群,broker - list          props.put("acks", "all");          props.put("retries", 1);//重試次數          props.put("batch.size", 16384);//批次大小          props.put("linger.ms", 1);//等待時間          props.put("buffer.memory", 33554432);//RecordAccumulator 緩          沖區大小          props.put("key.serializer",                  "org.apache.kafka.common.serialization.StringSerializer");          props.put("value.serializer",                  "org.apache.kafka.common.serialization.StringSerializer");          Producer<String, String> producer = new                  KafkaProducer<>(props);          for (int i = 0; i < 100; i++) {              producer.send(new ProducerRecord<String, String>("first",                      Integer.toString(i), Integer.toString(i))).get();          }          producer.close();      }  }

4.2 Consumer API

Consumer 消費數據時的可靠性是很容易保證的,因為數據在 Kafka 中是持久化的,故不用擔心數據丟失問題。由於 consumer 在消費過程中可能會出現斷電宕機等故障,consumer 恢復後,需要從故障前的位置的繼續消費,所以 consumer 需要實時記錄自己消費到了哪個 offset,以便故障恢復後繼續消費。 所以 offset 的維護是 Consumer 消費數據是必須考慮的問題。

4.2.1 自動提交 offset

1)導入依賴

<dependency>  	<groupId>org.apache.kafka</groupId>  	<artifactId>kafka-clients</artifactId>  	<version>0.11.0.0</version>  </dependency>

2)編寫程式碼 需要用到的類: KafkaConsumer:需要創建一個消費者對象,用來消費數據 ConsumerConfig:獲取所需的一系列配置參數 ConsuemrRecord:每條數據都要封裝成一個 ConsumerRecord 對象為了使我們能夠專註於自己的業務邏輯,Kafka 提供了自動提交 offset 的功能。自動提交 offset 的相關參數: enable.auto.commit:是否開啟自動提交 offset 功能 auto.commit.interval.ms:自動提交 offset 的時間間隔 以下為自動提交 offset 的程式碼:

public class CustomConsumer {      public static void main(String[] args) {          Properties props = new Properties();          props.put("bootstrap.servers", "hadoop102:9092");          props.put("group.id", "test");          props.put("enable.auto.commit", "true");          props.put("auto.commit.interval.ms", "1000");          props.put("key.deserializer",                  "org.apache.kafka.common.serialization.StringDeserializer");          props.put("value.deserializer",                  "org.apache.kafka.common.serialization.StringDeserializer");          KafkaConsumer<String, String> consumer = new KafkaConsumer <>(props);          consumer.subscribe(Arrays.asList("first"));          while (true) {              ConsumerRecords<String, String> records =                      consumer.poll(100);              for (ConsumerRecord<String, String> record : records)                  System.out.printf("offset = %d, key = %s, value = % s % n ", record.offset(), record.key(), record.value());          }      }  }

4.2.2 手動提交 offset

雖然自動提交 offset 十分簡介便利,但由於其是基於時間提交的,開發人員難以把握offset 提交的時機。因此 Kafka 還提供了手動提交 offset 的 API。

手動提交 offset 的方法有兩種:分別是 commitSync(同步提交)和 commitAsync(非同步提交)。兩者的相同點是,都會將本次 poll 的一批數據最高的偏移量提交;不同點是,commitSync 阻塞當前執行緒,一直到提交成功,並且會自動失敗重試(由不可控因素導致,也會出現提交失敗);而 commitAsync 則沒有失敗重試機制,故有可能提交失敗。 1)同步提交 offset 由於同步提交 offset 有失敗重試機制,故更加可靠,以下為同步提交 offset 的示例。

public class CustomComsumer {      public static void main(String[] args) {          Properties props = new Properties();          //Kafka 集群          props.put("bootstrap.servers", "hadoop102:9092");          //消費者組,只要 group.id 相同,就屬於同一個消費者組          props.put("group.id", "test");          props.put("enable.auto.commit", "false");//關閉自動提交 offset          props.put("key.deserializer",                  "org.apache.kafka.common.serialization.StringDeserializer");          props.put("value.deserializer",                  "org.apache.kafka.common.serialization.StringDeserializer");          KafkaConsumer<String, String> consumer = new                  KafkaConsumer<>(props);          consumer.subscribe(Arrays.asList("first"));//消費者訂閱主題          while (true) {              //消費者拉取數據              ConsumerRecords<String, String> records =                      consumer.poll(100);              for (ConsumerRecord<String, String> record : records) {                  System.out.printf("offset = %d, key = %s, value = % s % n ", record.offset(), record.key(), record.value());              }              //同步提交,當前執行緒會阻塞直到 offset 提交成功              consumer.commitSync();          }      }  }

2)非同步提交 offset 雖然同步提交 offset 更可靠一些,但是由於其會阻塞當前執行緒,直到提交成功。因此吞吐量會收到很大的影響。因此更多的情況下,會選用非同步提交 offset 的方式。 以下為非同步提交 offset 的示例:

public class CustomConsumer {      public static void main(String[] args) {          Properties props = new Properties();          //Kafka 集群          props.put("bootstrap.servers", "hadoop102:9092");          //消費者組,只要 group.id 相同,就屬於同一個消費者組          props.put("group.id", "test");          //關閉自動提交 offset          props.put("enable.auto.commit", "false");          props.put("key.deserializer",                  "org.apache.kafka.common.serialization.StringDeserializer");          props.put("value.deserializer",                  "org.apache.kafka.common.serialization.StringDeserializer");          KafkaConsumer<String, String> consumer = new                  KafkaConsumer<>(props);          consumer.subscribe(Arrays.asList("first"));//消費者訂閱主題          while (true) {              ConsumerRecords<String, String> records =                      consumer.poll(100);//消費者拉取數據              for (ConsumerRecord<String, String> record : records) {                  System.out.printf("offset = %d, key = %s, value = % s % n ", record.offset(), record.key(), record.value());              }              //非同步提交              consumer.commitAsync(new OffsetCommitCallback() {                  @Override                  public void onComplete(Map<TopicPartition,                          OffsetAndMetadata> offsets, Exception exception) {                      if (exception != null) {                          System.err.println("Commit failed for" +                                  offsets);                      }                  }              });          }      }  }

3) 數據漏消費和重複消費分析 無論是同步提交還是非同步提交 offset,都有可能會造成數據的漏消費或者重複消費。先提交 offset 後消費,有可能造成數據的漏消費;而先消費後提交 offset,有可能會造成數據的重複消費。

4.2.3 自定義存儲 offset

Kafka 0.9 版本之前,offset 存儲在 zookeeper,0.9 版本及之後,默認將 offset 存儲在 Kafka的一個內置的 topic 中。除此之外,Kafka 還可以選擇自定義存儲 offset。 offset 的維護是相當繁瑣的,因為需要考慮到消費者的 Rebalace。當有新的消費者加入消費者組、已有的消費者推出消費者組或者所訂閱的主題的分區發生變化,就會觸發到分區的重新分配,重新分配的過程叫做 Rebalance。消費者發生 Rebalance 之後,每個消費者消費的分區就會發生變化。因此消費者要首先獲取到自己被重新分配到的分區,並且定位到每個分區最近提交的 offset 位置繼續消費。要實現自定義存儲 offset,需要藉助 ConsumerRebalanceListener,以下為示例程式碼,其中提交和獲取 offset 的方法,需要根據所選的 offset 存儲系統自行實現。

public class CustomConsumer {      private static Map<TopicPartition, Long> currentOffset = new              HashMap();        public static void main(String[] args) {          //創建配置資訊          Properties props = new Properties();          //Kafka 集群          props.put("bootstrap.servers", "hadoop102:9092");          //消費者組,只要 group.id 相同,就屬於同一個消費者組          props.put("group.id", "test");          //關閉自動提交 offset          props.put("enable.auto.commit", "false");          //Key 和 Value 的反序列化類          props.put("key.deserializer",                  "org.apache.kafka.common.serialization.StringDeserializer");          props.put("value.deserializer",                  "org.apache.kafka.common.serialization.StringDeserializer");          //創建一個消費者          KafkaConsumer<String, String> consumer = new                  KafkaConsumer<>(props);          //消費者訂閱主題          consumer.subscribe(Arrays.asList("first"), new                  ConsumerRebalanceListener() {                        //該方法會在 Rebalance 之前調用                      @Override                      public void                      onPartitionsRevoked(Collection<TopicPartition> partitions) {                          commitOffset(currentOffset);                      }                        //該方法會在 Rebalance 之後調用                      @Override                      public void                      onPartitionsAssigned(Collection<TopicPartition> partitions) {                          currentOffset.clear();                          for (TopicPartition partition : partitions) {                              consumer.seek(partition, getOffset(partition));//                              定位到最近提交的 offset 位置繼續消費                          }                      }                  });          while (true) {              ConsumerRecords<String, String> records =                      consumer.poll(100);//消費者拉取數據              for (ConsumerRecord<String, String> record : records) {                  System.out.printf("offset = %d, key = %s, value = % s % n ", record.offset(), record.key(), record.value());                  currentOffset.put(new TopicPartition(record.topic(),                          record.partition()), record.offset());              }              commitOffset(currentOffset);//非同步提交          }      }        //獲取某分區的最新 offset      private static long getOffset(TopicPartition partition) {          return 0;      }        //提交該消費者所有分區的 offset      private static void commitOffset(Map<TopicPartition, Long> currentOffset) {      }  }

4.3 自定義 Interceptor

4.3.1 攔截器原理

Producer 攔截器(interceptor)是在 Kafka 0.10 版本被引入的,主要用於實現 clients 端的訂製化控制邏輯。對於 producer 而言,interceptor 使得用戶在消息發送前以及 producer 回調邏輯前有機會對消息做一些訂製化需求,比如修改消息等。同時,producer 允許用戶指定多個 interceptor按序作用於同一條消息從而形成一個攔截鏈(interceptor chain)。Intercetpor 的實現介面是 org.apache.kafka.clients.producer.ProducerInterceptor,其定義的方法包括: (1)configure(configs) 獲取配置資訊和初始化數據時調用。 (2)onSend(ProducerRecord): 該方法封裝進 KafkaProducer.send 方法中,即它運行在用戶主執行緒中。Producer 確保在消息被序列化以及計算分區前調用該方法。用戶可以在該方法中對消息做任何操作,但最好保證不要修改消息所屬的 topic 和分區,否則會影響目標分區的計算。 (3)onAcknowledgement(RecordMetadata, Exception): 該方法會在消息從 RecordAccumulator 成功發送到 Kafka Broker 之後,或者在發送過程中失敗時調用。並且通常都是在 producer 回調邏輯觸發之前。onAcknowledgement 運行在producer 的 IO 執行緒中,因此不要在該方法中放入很重的邏輯,否則會拖慢 producer 的消息發送效率。 (4)close: 關閉 interceptor,主要用於執行一些資源清理工作如前所述,interceptor 可能被運行在多個執行緒中,因此在具體實現時用戶需要自行確保執行緒安全。另外倘若指定了多個 interceptor,則 producer 將按照指定順序調用它們,並僅僅是捕獲每個 interceptor 可能拋出的異常記錄到錯誤日誌中而非在向上傳遞。這在使用過程中要特別留意。

4.3.2 攔截器案例

1)需求: 實現一個簡單的雙 interceptor 組成的攔截鏈。第一個 interceptor 會在消息發送前將時間戳資訊加到消息 value 的最前部;第二個 interceptor 會在消息發送後更新成功發送消息數或失敗發送消息數。

2)案例實操 (1)增加時間戳攔截器

public class TimeInterceptor implements ProducerInterceptor<String, String> {      @Override      public void configure(Map<String, ?> configs) {      }        @Override      public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {          // 創建一個新的 record,把時間戳寫入消息體的最前部          return new ProducerRecord(record.topic(),                  record.partition(), record.timestamp(), record.key(),                  System.currentTimeMillis() + "," +                          record.value().toString());      }        @Override      public void onAcknowledgement(RecordMetadata metadata,                                    Exception exception) {      }        @Override      public void close() {      }  }

(2)統計發送消息成功和發送失敗消息數,並在 producer 關閉時列印這兩個計數器

public class CounterInterceptor implements ProducerInterceptor<String, String> {      private int errorCounter = 0;      private int successCounter = 0;        @Override      public void configure(Map<String, ?> configs) {      }        @Override      public ProducerRecord<String, String>      onSend(ProducerRecord<String, String> record) {          return record;      }        @Override      public void onAcknowledgement(RecordMetadata metadata,                                    Exception exception) {          // 統計成功和失敗的次數          if (exception == null) {              successCounter++;          } else {              errorCounter++;          }      }        @Override      public void close() {          // 保存結果          System.out.println("Successful sent: " + successCounter);          System.out.println("Failed sent: " + errorCounter);      }  }

(3)producer 主程式

public class InterceptorProducer {      public static void main(String[] args) throws Exception {          // 1 設置配置資訊          Properties props = new Properties();          props.put("bootstrap.servers", "hadoop102:9092");          props.put("acks", "all");          props.put("retries", 3);          props.put("batch.size", 16384);          props.put("linger.ms", 1);          props.put("buffer.memory", 33554432);          props.put("key.serializer",                  "org.apache.kafka.common.serialization.StringSerializer");          props.put("value.serializer",                  "org.apache.kafka.common.serialization.StringSerializer");          // 2 構建攔截鏈          List<String> interceptors = new ArrayList<>();          interceptors.add("com.atguigu.kafka.interceptor.TimeInterce ptor");          interceptors.add("com.atguigu.kafka.interceptor.CounterInterceptor");          props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);          String topic = "first";          Producer<String, String> producer = new                  KafkaProducer<>(props);          // 3 發送消息          for (int i = 0; i < 10; i++) {              ProducerRecord<String, String> record = new                      ProducerRecord<>(topic, "message" + i);              producer.send(record);          }          // 4 一定要關閉 producer,這樣才會調用 interceptor 的 close 方法          producer.close();      }  }

3)測試 (1)在 kafka 上啟動消費者,然後運行客戶端 java 程式。

[atguigu@hadoop102 kafka]$ bin/kafka-console-consumer.sh   --bootstrap-server hadoop102:9092 --from-beginning --topic  first  1501904047034,message0  1501904047225,message1  1501904047230,message2  1501904047234,message3  1501904047236,message4  1501904047240,message5  1501904047243,message6  1501904047246,message7  1501904047249,message8  1501904047252,message9