Kafka 系列(三)—— Kafka 生產者詳解

  • 2019 年 10 月 3 日
  • 筆記

一、生產者發送消息的過程

首先介紹一下 Kafka 生產者發送消息的過程:

  • Kafka 會將發送消息包裝為 ProducerRecord 對象, ProducerRecord 對象包含了目標主題和要發送的內容,同時還可以指定鍵和分區。在發送 ProducerRecord 對象前,生產者會先把鍵和值對象序列化成位元組數組,這樣它們才能夠在網路上傳輸。
  • 接下來,數據被傳給分區器。如果之前已經在 ProducerRecord 對象里指定了分區,那麼分區器就不會再做任何事情。如果沒有指定分區 ,那麼分區器會根據 ProducerRecord 對象的鍵來選擇一個分區,緊接著,這條記錄被添加到一個記錄批次里,這個批次里的所有消息會被發送到相同的主題和分區上。有一個獨立的執行緒負責把這些記錄批次發送到相應的 broker 上。
  • 伺服器在收到這些消息時會返回一個響應。如果消息成功寫入 Kafka,就返回一個 RecordMetaData 對象,它包含了主題和分區資訊,以及記錄在分區里的偏移量。如果寫入失敗,則會返回一個錯誤。生產者在收到錯誤之後會嘗試重新發送消息,如果達到指定的重試次數後還沒有成功,則直接拋出異常,不再重試。

二、創建生產者

2.1 項目依賴

本項目採用 Maven 構建,想要調用 Kafka 生產者 API,需要導入 kafka-clients 依賴,如下:

<dependency>      <groupId>org.apache.kafka</groupId>      <artifactId>kafka-clients</artifactId>      <version>2.2.0</version>  </dependency>

2.2 創建生產者

創建 Kafka 生產者時,以下三個屬性是必須指定的:

  • bootstrap.servers :指定 broker 的地址清單,清單里不需要包含所有的 broker 地址,生產者會從給定的 broker 里查找 broker 的資訊。不過建議至少要提供兩個 broker 的資訊作為容錯;
  • key.serializer :指定鍵的序列化器;
  • value.serializer :指定值的序列化器。

創建的示例程式碼如下:

public class SimpleProducer {        public static void main(String[] args) {            String topicName = "Hello-Kafka";            Properties props = new Properties();          props.put("bootstrap.servers", "hadoop001:9092");          props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");          props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");          /*創建生產者*/          Producer<String, String> producer = new KafkaProducer<>(props);            for (int i = 0; i < 10; i++) {              ProducerRecord<String, String> record = new ProducerRecord<>(topicName, "hello" + i,                                                                           "world" + i);              /* 發送消息*/              producer.send(record);          }          /*關閉生產者*/          producer.close();      }  }

本篇文章的所有示例程式碼可以從 Github 上進行下載:kafka-basis

2.3 測試

1. 啟動Kakfa

Kafka 的運行依賴於 zookeeper,需要預先啟動,可以啟動 Kafka 內置的 zookeeper,也可以啟動自己安裝的:

# zookeeper啟動命令  bin/zkServer.sh start    # 內置zookeeper啟動命令  bin/zookeeper-server-start.sh config/zookeeper.properties

啟動單節點 kafka 用於測試:

# bin/kafka-server-start.sh config/server.properties

2. 創建topic

# 創建用於測試主題  bin/kafka-topics.sh --create                       --bootstrap-server hadoop001:9092                        --replication-factor 1 --partitions 1                        --topic Hello-Kafka    # 查看所有主題   bin/kafka-topics.sh --list --bootstrap-server hadoop001:9092

3. 啟動消費者

啟動一個控制台消費者用於觀察寫入情況,啟動命令如下:

# bin/kafka-console-consumer.sh --bootstrap-server hadoop001:9092 --topic Hello-Kafka --from-beginning

4. 運行項目

此時可以看到消費者控制台,輸出如下,這裡 kafka-console-consumer 只會列印出值資訊,不會列印出鍵資訊。

2.4 可能出現的問題

在這裡可能出現的一個問題是:生產者程式在啟動後,一直處於等待狀態。這通常出現在你使用默認配置啟動 Kafka 的情況下,此時需要對 server.properties 文件中的 listeners 配置進行更改:

# hadoop001 為我啟動kafka服務的主機名,你可以換成自己的主機名或者ip地址  listeners=PLAINTEXT://hadoop001:9092

二、發送消息

上面的示常式序調用了 send 方法發送消息後沒有做任何操作,在這種情況下,我們沒有辦法知道消息發送的結果。想要知道消息發送的結果,可以使用同步發送或者非同步發送來實現。

2.1 同步發送

在調用 send 方法後可以接著調用 get() 方法,send 方法的返回值是一個 Future<RecordMetadata>對象,RecordMetadata 裡面包含了發送消息的主題、分區、偏移量等資訊。改寫後的程式碼如下:

for (int i = 0; i < 10; i++) {      try {          ProducerRecord<String, String> record = new ProducerRecord<>(topicName, "k" + i, "world" + i);          /*同步發送消息*/          RecordMetadata metadata = producer.send(record).get();          System.out.printf("topic=%s, partition=%d, offset=%s n",                  metadata.topic(), metadata.partition(), metadata.offset());      } catch (InterruptedException | ExecutionException e) {          e.printStackTrace();      }  }

此時得到的輸出如下:偏移量和調用次數有關,所有記錄都分配到了 0 分區,這是因為在創建 Hello-Kafka 主題時候,使用 --partitions 指定其分區數為 1,即只有一個分區。

topic=Hello-Kafka, partition=0, offset=40  topic=Hello-Kafka, partition=0, offset=41  topic=Hello-Kafka, partition=0, offset=42  topic=Hello-Kafka, partition=0, offset=43  topic=Hello-Kafka, partition=0, offset=44  topic=Hello-Kafka, partition=0, offset=45  topic=Hello-Kafka, partition=0, offset=46  topic=Hello-Kafka, partition=0, offset=47  topic=Hello-Kafka, partition=0, offset=48  topic=Hello-Kafka, partition=0, offset=49 

2.2 非同步發送

通常我們並不關心發送成功的情況,更多關注的是失敗的情況,因此 Kafka 提供了非同步發送和回調函數。 程式碼如下:

for (int i = 0; i < 10; i++) {      ProducerRecord<String, String> record = new ProducerRecord<>(topicName, "k" + i, "world" + i);      /*非同步發送消息,並監聽回調*/      producer.send(record, new Callback() {          @Override          public void onCompletion(RecordMetadata metadata, Exception exception) {              if (exception != null) {                  System.out.println("進行異常處理");              } else {                  System.out.printf("topic=%s, partition=%d, offset=%s n",                          metadata.topic(), metadata.partition(), metadata.offset());              }          }      });  }

三、自定義分區器

Kafka 有著默認的分區機制:

  • 如果鍵值為 null, 則使用輪詢 (Round Robin) 演算法將消息均衡地分布到各個分區上;
  • 如果鍵值不為 null,那麼 Kafka 會使用內置的散列演算法對鍵進行散列,然後分布到各個分區上。

某些情況下,你可能有著自己的分區需求,這時候可以採用自定義分區器實現。這裡給出一個自定義分區器的示例:

3.1 自定義分區器

/**   * 自定義分區器   */  public class CustomPartitioner implements Partitioner {        private int passLine;        @Override      public void configure(Map<String, ?> configs) {          /*從生產者配置中獲取分數線*/          passLine = (Integer) configs.get("pass.line");      }        @Override      public int partition(String topic, Object key, byte[] keyBytes, Object value,                           byte[] valueBytes, Cluster cluster) {          /*key 值為分數,當分數大於分數線時候,分配到 1 分區,否則分配到 0 分區*/          return (Integer) key >= passLine ? 1 : 0;      }        @Override      public void close() {          System.out.println("分區器關閉");      }  }

需要在創建生產者時指定分區器,和分區器所需要的配置參數:

public class ProducerWithPartitioner {        public static void main(String[] args) {            String topicName = "Kafka-Partitioner-Test";            Properties props = new Properties();          props.put("bootstrap.servers", "hadoop001:9092");          props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");          props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");            /*傳遞自定義分區器*/          props.put("partitioner.class", "com.heibaiying.producers.partitioners.CustomPartitioner");          /*傳遞分區器所需的參數*/          props.put("pass.line", 6);            Producer<Integer, String> producer = new KafkaProducer<>(props);            for (int i = 0; i <= 10; i++) {              String score = "score:" + i;              ProducerRecord<Integer, String> record = new ProducerRecord<>(topicName, i, score);              /*非同步發送消息*/              producer.send(record, (metadata, exception) ->                      System.out.printf("%s, partition=%d, n", score, metadata.partition()));          }            producer.close();      }  }

3.2 測試

需要創建一個至少有兩個分區的主題:

 bin/kafka-topics.sh --create                       --bootstrap-server hadoop001:9092                        --replication-factor 1 --partitions 2                        --topic Kafka-Partitioner-Test

此時輸入如下,可以看到分數大於等於 6 分的都被分到 1 分區,而小於 6 分的都被分到了 0 分區。

score:6, partition=1,  score:7, partition=1,  score:8, partition=1,  score:9, partition=1,  score:10, partition=1,  score:0, partition=0,  score:1, partition=0,  score:2, partition=0,  score:3, partition=0,  score:4, partition=0,  score:5, partition=0,  分區器關閉

四、生產者其他屬性

上面生產者的創建都僅指定了服務地址,鍵序列化器、值序列化器,實際上 Kafka 的生產者還有很多可配置屬性,如下:

1. acks

acks 參數指定了必須要有多少個分區副本收到消息,生產者才會認為消息寫入是成功的:

  • acks=0 : 消息發送出去就認為已經成功了,不會等待任何來自伺服器的響應;
  • acks=1 : 只要集群的首領節點收到消息,生產者就會收到一個來自伺服器成功響應;
  • acks=all :只有當所有參與複製的節點全部收到消息時,生產者才會收到一個來自伺服器的成功響應。

2. buffer.memory

設置生產者記憶體緩衝區的大小。

3. compression.type

默認情況下,發送的消息不會被壓縮。如果想要進行壓縮,可以配置此參數,可選值有 snappy,gzip,lz4。

4. retries

發生錯誤後,消息重發的次數。如果達到設定值,生產者就會放棄重試並返回錯誤。

5. batch.size

當有多個消息需要被發送到同一個分區時,生產者會把它們放在同一個批次里。該參數指定了一個批次可以使用的記憶體大小,按照位元組數計算。

6. linger.ms

該參數制定了生產者在發送批次之前等待更多消息加入批次的時間。

7. clent.id

客戶端 id,伺服器用來識別消息的來源。

8. max.in.flight.requests.per.connection

指定了生產者在收到伺服器響應之前可以發送多少個消息。它的值越高,就會佔用越多的記憶體,不過也會提升吞吐量,把它設置為 1 可以保證消息是按照發送的順序寫入伺服器,即使發生了重試。

9. timeout.ms, request.timeout.ms & metadata.fetch.timeout.ms

  • timeout.ms 指定了 borker 等待同步副本返回消息的確認時間;
  • request.timeout.ms 指定了生產者在發送數據時等待伺服器返迴響應的時間;
  • metadata.fetch.timeout.ms 指定了生產者在獲取元數據(比如分區首領是誰)時等待伺服器返迴響應的時間。

10. max.block.ms

指定了在調用 send() 方法或使用 partitionsFor() 方法獲取元數據時生產者的阻塞時間。當生產者的發送緩衝區已滿,或者沒有可用的元數據時,這些方法會阻塞。在阻塞時間達到 max.block.ms 時,生產者會拋出超時異常。

11. max.request.size

該參數用於控制生產者發送的請求大小。它可以指發送的單個消息的最大值,也可以指單個請求里所有消息總的大小。例如,假設這個值為 1000K ,那麼可以發送的單個最大消息為 1000K ,或者生產者可以在單個請求里發送一個批次,該批次包含了 1000 個消息,每個消息大小為 1K。

12. receive.buffer.bytes & send.buffer.byte

這兩個參數分別指定 TCP socket 接收和發送數據包緩衝區的大小,-1 代表使用作業系統的默認值。

參考資料

  1. Neha Narkhede, Gwen Shapira ,Todd Palino(著) , 薛命燈 (譯) . Kafka 權威指南 . 人民郵電出版社 . 2017-12-26

更多大數據系列文章可以參見 GitHub 開源項目大數據入門指南