Kafka 系列(三)—— Kafka 生產者詳解
- 2019 年 10 月 3 日
- 筆記
一、生產者發送消息的過程
首先介紹一下 Kafka 生產者發送消息的過程:
- Kafka 會將發送消息包裝為 ProducerRecord 對象, ProducerRecord 對象包含了目標主題和要發送的內容,同時還可以指定鍵和分區。在發送 ProducerRecord 對象前,生產者會先把鍵和值對象序列化成位元組數組,這樣它們才能夠在網路上傳輸。
- 接下來,數據被傳給分區器。如果之前已經在 ProducerRecord 對象里指定了分區,那麼分區器就不會再做任何事情。如果沒有指定分區 ,那麼分區器會根據 ProducerRecord 對象的鍵來選擇一個分區,緊接著,這條記錄被添加到一個記錄批次里,這個批次里的所有消息會被發送到相同的主題和分區上。有一個獨立的執行緒負責把這些記錄批次發送到相應的 broker 上。
- 伺服器在收到這些消息時會返回一個響應。如果消息成功寫入 Kafka,就返回一個 RecordMetaData 對象,它包含了主題和分區資訊,以及記錄在分區里的偏移量。如果寫入失敗,則會返回一個錯誤。生產者在收到錯誤之後會嘗試重新發送消息,如果達到指定的重試次數後還沒有成功,則直接拋出異常,不再重試。

二、創建生產者
2.1 項目依賴
本項目採用 Maven 構建,想要調用 Kafka 生產者 API,需要導入 kafka-clients
依賴,如下:
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.2.0</version> </dependency>
2.2 創建生產者
創建 Kafka 生產者時,以下三個屬性是必須指定的:
- bootstrap.servers :指定 broker 的地址清單,清單里不需要包含所有的 broker 地址,生產者會從給定的 broker 里查找 broker 的資訊。不過建議至少要提供兩個 broker 的資訊作為容錯;
- key.serializer :指定鍵的序列化器;
- value.serializer :指定值的序列化器。
創建的示例程式碼如下:
public class SimpleProducer { public static void main(String[] args) { String topicName = "Hello-Kafka"; Properties props = new Properties(); props.put("bootstrap.servers", "hadoop001:9092"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); /*創建生產者*/ Producer<String, String> producer = new KafkaProducer<>(props); for (int i = 0; i < 10; i++) { ProducerRecord<String, String> record = new ProducerRecord<>(topicName, "hello" + i, "world" + i); /* 發送消息*/ producer.send(record); } /*關閉生產者*/ producer.close(); } }
本篇文章的所有示例程式碼可以從 Github 上進行下載:kafka-basis
2.3 測試
1. 啟動Kakfa
Kafka 的運行依賴於 zookeeper,需要預先啟動,可以啟動 Kafka 內置的 zookeeper,也可以啟動自己安裝的:
# zookeeper啟動命令 bin/zkServer.sh start # 內置zookeeper啟動命令 bin/zookeeper-server-start.sh config/zookeeper.properties
啟動單節點 kafka 用於測試:
# bin/kafka-server-start.sh config/server.properties
2. 創建topic
# 創建用於測試主題 bin/kafka-topics.sh --create --bootstrap-server hadoop001:9092 --replication-factor 1 --partitions 1 --topic Hello-Kafka # 查看所有主題 bin/kafka-topics.sh --list --bootstrap-server hadoop001:9092
3. 啟動消費者
啟動一個控制台消費者用於觀察寫入情況,啟動命令如下:
# bin/kafka-console-consumer.sh --bootstrap-server hadoop001:9092 --topic Hello-Kafka --from-beginning
4. 運行項目
此時可以看到消費者控制台,輸出如下,這裡 kafka-console-consumer
只會列印出值資訊,不會列印出鍵資訊。

2.4 可能出現的問題
在這裡可能出現的一個問題是:生產者程式在啟動後,一直處於等待狀態。這通常出現在你使用默認配置啟動 Kafka 的情況下,此時需要對 server.properties
文件中的 listeners
配置進行更改:
# hadoop001 為我啟動kafka服務的主機名,你可以換成自己的主機名或者ip地址 listeners=PLAINTEXT://hadoop001:9092
二、發送消息
上面的示常式序調用了 send
方法發送消息後沒有做任何操作,在這種情況下,我們沒有辦法知道消息發送的結果。想要知道消息發送的結果,可以使用同步發送或者非同步發送來實現。
2.1 同步發送
在調用 send
方法後可以接著調用 get()
方法,send
方法的返回值是一個 Future<RecordMetadata>對象,RecordMetadata 裡面包含了發送消息的主題、分區、偏移量等資訊。改寫後的程式碼如下:
for (int i = 0; i < 10; i++) { try { ProducerRecord<String, String> record = new ProducerRecord<>(topicName, "k" + i, "world" + i); /*同步發送消息*/ RecordMetadata metadata = producer.send(record).get(); System.out.printf("topic=%s, partition=%d, offset=%s n", metadata.topic(), metadata.partition(), metadata.offset()); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } }
此時得到的輸出如下:偏移量和調用次數有關,所有記錄都分配到了 0 分區,這是因為在創建 Hello-Kafka
主題時候,使用 --partitions
指定其分區數為 1,即只有一個分區。
topic=Hello-Kafka, partition=0, offset=40 topic=Hello-Kafka, partition=0, offset=41 topic=Hello-Kafka, partition=0, offset=42 topic=Hello-Kafka, partition=0, offset=43 topic=Hello-Kafka, partition=0, offset=44 topic=Hello-Kafka, partition=0, offset=45 topic=Hello-Kafka, partition=0, offset=46 topic=Hello-Kafka, partition=0, offset=47 topic=Hello-Kafka, partition=0, offset=48 topic=Hello-Kafka, partition=0, offset=49
2.2 非同步發送
通常我們並不關心發送成功的情況,更多關注的是失敗的情況,因此 Kafka 提供了非同步發送和回調函數。 程式碼如下:
for (int i = 0; i < 10; i++) { ProducerRecord<String, String> record = new ProducerRecord<>(topicName, "k" + i, "world" + i); /*非同步發送消息,並監聽回調*/ producer.send(record, new Callback() { @Override public void onCompletion(RecordMetadata metadata, Exception exception) { if (exception != null) { System.out.println("進行異常處理"); } else { System.out.printf("topic=%s, partition=%d, offset=%s n", metadata.topic(), metadata.partition(), metadata.offset()); } } }); }
三、自定義分區器
Kafka 有著默認的分區機制:
- 如果鍵值為 null, 則使用輪詢 (Round Robin) 演算法將消息均衡地分布到各個分區上;
- 如果鍵值不為 null,那麼 Kafka 會使用內置的散列演算法對鍵進行散列,然後分布到各個分區上。
某些情況下,你可能有著自己的分區需求,這時候可以採用自定義分區器實現。這裡給出一個自定義分區器的示例:
3.1 自定義分區器
/** * 自定義分區器 */ public class CustomPartitioner implements Partitioner { private int passLine; @Override public void configure(Map<String, ?> configs) { /*從生產者配置中獲取分數線*/ passLine = (Integer) configs.get("pass.line"); } @Override public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { /*key 值為分數,當分數大於分數線時候,分配到 1 分區,否則分配到 0 分區*/ return (Integer) key >= passLine ? 1 : 0; } @Override public void close() { System.out.println("分區器關閉"); } }
需要在創建生產者時指定分區器,和分區器所需要的配置參數:
public class ProducerWithPartitioner { public static void main(String[] args) { String topicName = "Kafka-Partitioner-Test"; Properties props = new Properties(); props.put("bootstrap.servers", "hadoop001:9092"); props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); /*傳遞自定義分區器*/ props.put("partitioner.class", "com.heibaiying.producers.partitioners.CustomPartitioner"); /*傳遞分區器所需的參數*/ props.put("pass.line", 6); Producer<Integer, String> producer = new KafkaProducer<>(props); for (int i = 0; i <= 10; i++) { String score = "score:" + i; ProducerRecord<Integer, String> record = new ProducerRecord<>(topicName, i, score); /*非同步發送消息*/ producer.send(record, (metadata, exception) -> System.out.printf("%s, partition=%d, n", score, metadata.partition())); } producer.close(); } }
3.2 測試
需要創建一個至少有兩個分區的主題:
bin/kafka-topics.sh --create --bootstrap-server hadoop001:9092 --replication-factor 1 --partitions 2 --topic Kafka-Partitioner-Test
此時輸入如下,可以看到分數大於等於 6 分的都被分到 1 分區,而小於 6 分的都被分到了 0 分區。
score:6, partition=1, score:7, partition=1, score:8, partition=1, score:9, partition=1, score:10, partition=1, score:0, partition=0, score:1, partition=0, score:2, partition=0, score:3, partition=0, score:4, partition=0, score:5, partition=0, 分區器關閉
四、生產者其他屬性
上面生產者的創建都僅指定了服務地址,鍵序列化器、值序列化器,實際上 Kafka 的生產者還有很多可配置屬性,如下:
1. acks
acks 參數指定了必須要有多少個分區副本收到消息,生產者才會認為消息寫入是成功的:
- acks=0 : 消息發送出去就認為已經成功了,不會等待任何來自伺服器的響應;
- acks=1 : 只要集群的首領節點收到消息,生產者就會收到一個來自伺服器成功響應;
- acks=all :只有當所有參與複製的節點全部收到消息時,生產者才會收到一個來自伺服器的成功響應。
2. buffer.memory
設置生產者記憶體緩衝區的大小。
3. compression.type
默認情況下,發送的消息不會被壓縮。如果想要進行壓縮,可以配置此參數,可選值有 snappy,gzip,lz4。
4. retries
發生錯誤後,消息重發的次數。如果達到設定值,生產者就會放棄重試並返回錯誤。
5. batch.size
當有多個消息需要被發送到同一個分區時,生產者會把它們放在同一個批次里。該參數指定了一個批次可以使用的記憶體大小,按照位元組數計算。
6. linger.ms
該參數制定了生產者在發送批次之前等待更多消息加入批次的時間。
7. clent.id
客戶端 id,伺服器用來識別消息的來源。
8. max.in.flight.requests.per.connection
指定了生產者在收到伺服器響應之前可以發送多少個消息。它的值越高,就會佔用越多的記憶體,不過也會提升吞吐量,把它設置為 1 可以保證消息是按照發送的順序寫入伺服器,即使發生了重試。
9. timeout.ms, request.timeout.ms & metadata.fetch.timeout.ms
- timeout.ms 指定了 borker 等待同步副本返回消息的確認時間;
- request.timeout.ms 指定了生產者在發送數據時等待伺服器返迴響應的時間;
- metadata.fetch.timeout.ms 指定了生產者在獲取元數據(比如分區首領是誰)時等待伺服器返迴響應的時間。
10. max.block.ms
指定了在調用 send()
方法或使用 partitionsFor()
方法獲取元數據時生產者的阻塞時間。當生產者的發送緩衝區已滿,或者沒有可用的元數據時,這些方法會阻塞。在阻塞時間達到 max.block.ms 時,生產者會拋出超時異常。
11. max.request.size
該參數用於控制生產者發送的請求大小。它可以指發送的單個消息的最大值,也可以指單個請求里所有消息總的大小。例如,假設這個值為 1000K ,那麼可以發送的單個最大消息為 1000K ,或者生產者可以在單個請求里發送一個批次,該批次包含了 1000 個消息,每個消息大小為 1K。
12. receive.buffer.bytes & send.buffer.byte
這兩個參數分別指定 TCP socket 接收和發送數據包緩衝區的大小,-1 代表使用作業系統的默認值。
參考資料
- Neha Narkhede, Gwen Shapira ,Todd Palino(著) , 薛命燈 (譯) . Kafka 權威指南 . 人民郵電出版社 . 2017-12-26
更多大數據系列文章可以參見 GitHub 開源項目: 大數據入門指南