Spring Boot Kafka概覽、配置及優雅地實現發布訂閱
- 2019 年 12 月 24 日
- 筆記
本文屬於翻譯,轉載註明出處,歡迎關注微信小程式小白AI部落格
微信公眾號小白AI
或者網站 https://xiaobaiai.net
1 前言
本篇文章內容很全,很長,很細!不要心急,慢慢看!我都寫完了,相信你看完肯定可以的,有任何問題可以隨時交流!
本篇文章內容很全,很長,很細!不要心急,慢慢看!我都寫完了,相信你看完肯定可以的,有任何問題可以隨時交流!
本篇文章內容很全,很長,很細!不要心急,慢慢看!我都寫完了,相信你看完肯定可以的,有任何問題可以隨時交流!
本篇文章主要介紹Spring Kafka的常用配置、主題自動創建、發布消息到集群、訂閱消息(群組)、流處理配置以及嵌入式Kafka做測試配置相關內容,最後通過兩種方式去實現消息的發布和訂閱功能,其中一種是基於Spring Integration
方式。本文內容基於Spring Kafka2.3.3文檔及Spring Boot Kafka相關文檔,Spring創建了一個名為Spring kafka
的項目,它封裝了Apache的kafka客戶端部分(生產者/消費者/流處理等),以便在Spring項目中快速集成kafka,Spring-Kafka項目提供了Apache Kafka自動化配置,通過Spring Boot的簡化配置(以spring.kafka.*
作為前綴的配置參數),在Spring Boot中使用Kafka特別簡單。並且Spring Boot還提供了一個嵌入式Kafka代理方便做測試。
spring.kafka.bootstrap-servers=localhost:9092 spring.kafka.consumer.group-id=myGroup
實現下面的所涉及到的功能實現,需要有如下環境:
- Java運行或開發環境(JRE/JDK)
- Kafka安裝成功
更多的配置可以參考《Kafka,ZK集群開發或部署環境搭建及實驗》
這一篇文章。
本文盡量做到闡述邏輯清晰,主要路線就是全局介紹Spring Kafka的主要功能及重點配置,而Spring Boot對Spring Kafka進一步簡化配置,通過Spring Boot中的Kafka幾大註解實現發布訂閱功能,同時通過Spring Integration + 自定義Kafka配置方式實現一個較為複雜的Kafka發布訂閱功能,本文通過自己實驗和整理了較久的時間,涵蓋了Spring Kafka大部分內容,希望大家耐心讀下來,有什麼問題隨時回饋,一起學習。
2 Spring Kafka功能概覽
Spring Kafka、Spring Integration和Kafka客戶端版本聯繫或者兼容性如下(截至2019年12月9日):
Spring for Apache Kafka |
Spring Integration for Apache Kafka Version |
kafka-clients |
---|---|---|
2.3.x |
3.2.x |
2.3.1 |
2.2.x |
3.1.x |
2.0.1, 2.1.x, 2.2.x |
2.1.x |
3.0.x |
1.0.x, 1.1.x, 2.0.0 |
1.3.x |
2.3.x |
0.11.0.x, 1.0.x |
具體更多版本特點可以看官網,spring kafka當前最新為2.3.4版本。
Spring Kafka相關的註解有如下幾個:
啟用由AbstractListenerContainerFactory 在封面(covers)下創建的Kafka監聽器註解端點,用於配置類;
如使用@EnableKafka
可以監聽AbstractListenerContainerFactory
子類目標端點,如ConcurrentKafkaListenerContainerFactory
是AbstractKafkaListenerContainerFactory
的子類。
public class ConcurrentKafkaListenerContainerFactory<K,V> extends AbstractKafkaListenerContainerFactory<ConcurrentMessageListenerContainer<K,V>,K,V>
@Configuration @EnableKafka public class AppConfig { @Bean public ConcurrentKafkaListenerContainerFactory myKafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory(); factory.setConsumerFactory(consumerFactory()); factory.setConcurrency(4); return factory; } // other @Bean definitions }
@EnableKafka
並不是在Spring Boot中啟用Kafka必須的,Spring Boot附帶了Spring Kafka的自動配置,因此不需要使用顯式的@EnableKafka
。如果想要自己實現Kafka配置類,則需要加上@EnableKafka
,如果你不想要Kafka自動配置,比如測試中,需要做的只是移除KafkaAutoConfiguration
:
@SpringBootTest("spring.autoconfigure.exclude=org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration")
2.1 自動創建主題
? 要在應用啟動時就創建主題,可以添加NewTopic
類型的Bean。如果該主題已經存在,則忽略Bean。
2.2 發送消息
Spring的KafkaTemplate
是自動配置的,你可以直接在自己的Bean中自動連接它,如下例所示:
@Component public class MyBean { private final KafkaTemplate kafkaTemplate; @Autowired public MyBean(KafkaTemplate kafkaTemplate) { this.kafkaTemplate = kafkaTemplate; } // ... }
KafkaTemplate
包裝了一個生產者,並提供了向kafka主題發送數據的方便方法。提供非同步和同步(發送阻塞)方法,非同步(發送非阻塞)方法返回ListenableFuture
,以此監聽非同步發送狀態,成功還是失敗,KafkaTemplate提供如下介面:
ListenableFuture<SendResult<K, V>> sendDefault(V data); ListenableFuture<SendResult<K, V>> sendDefault(K key, V data); ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data); ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data); ListenableFuture<SendResult<K, V>> send(String topic, V data); ListenableFuture<SendResult<K, V>> send(String topic, K key, V data); ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data); ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data); ListenableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record); ListenableFuture<SendResult<K, V>> send(Message<?> message); Map<MetricName, ? extends Metric> metrics(); List<PartitionInfo> partitionsFor(String topic); <T> T execute(ProducerCallback<K, V, T> callback); // Flush the producer. void flush(); interface ProducerCallback<K, V, T> { T doInKafka(Producer<K, V> producer); }
sendDefault
API 要求已向模板提供默認主題。部分API接受一個時間戳作為參數,並將該時間戳存儲在記錄中,如何存儲用戶提供的時間戳取決於Kafka主題上配置的時間戳類型,如果主題配置為使用CREATE_TIME
,則記錄用戶指定的時間戳(如果未指定則生成)。如果將主題配置為使用LOG_APPEND_TIME
,則忽略用戶指定的時間戳,並且代理將添加本地代理時間。metrics
和 partitionsFor
方法委託給底層Producer上的相同方法。execute方法提供對底層生產者的直接訪問
要使用模板,可以配置一個生產者工廠並在模板的構造函數中提供它。下面的示例演示了如何執行此操作:
@Bean public ProducerFactory<Integer, String> producerFactory() { return new DefaultKafkaProducerFactory<>(producerConfigs()); } @Bean public Map<String, Object> producerConfigs() { Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); // See https://kafka.apache.org/documentation/#producerconfigs for more properties return props; } @Bean public KafkaTemplate<Integer, String> kafkaTemplate() { // KafkaTemplate構造函數中輸入生產者工廠配置 return new KafkaTemplate<Integer, String>(producerFactory()); }
然後,要使用模板,可以調用其方法之一發送消息。
當你使用包含Message<?>
參數的方法時,主題、分區和鍵資訊在消息頭中提供,有如下子項:
KafkaHeaders.TOPIC KafkaHeaders.PARTITION_ID KafkaHeaders.MESSAGE_KEY KafkaHeaders.TIMESTAMP
如訪問頭部資訊中某一項資訊:
public void handleMessage(Message<?> message) throws MessagingException { LOGGER.debug("===Received Msg Topic: {}", message.getHeaders().get(KafkaHeaders.TOPIC)); }
可選的功能是,可以使用ProducerListener
配置KafkaTemplate
,以獲得帶有發送結果(成功或失敗)的非同步回調,而不是等待將來完成。以下列表顯示了ProducerListener
介面的定義:
public interface ProducerListener<K, V> { void onSuccess(String topic, Integer partition, K key, V value, RecordMetadata recordMetadata); void onError(String topic, Integer partition, K key, V value, Exception exception); boolean isInterestedInSuccess(); }
默認情況下,模板配置有LoggingProducerListener
,它只記錄錯誤,在發送成功時不執行任何操作。只有當isInterestedInSuccess
返回true時才調用onSuccess
。為了方便起見,如果你只想實現其中一個方法,那麼將提供抽象ProducerListenerAdapter
。對於isInterestedInSuccess
,它返回false。下面演示了非同步結果回調:
public void sendMessage(String msg) { LOGGER.info("===Producing message[{}]: {}", mTopic, msg); ListenableFuture<SendResult<String, String>> future = mKafkaTemplate.send(mTopic, msg); future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() { @Override public void onSuccess(SendResult<String, String> result) { LOGGER.info("===Producing message success"); } @Override public void onFailure(Throwable ex) { LOGGER.info("===Producing message failed"); } }); }
如果希望阻止式發送執行緒等待結果,可以調用future
的get()
方法。你可能希望在等待之前調用flush()
,或者為了方便起見,模板有一個帶有autoFlush
參數的構造函數,該構造函數在每次發送時都會導致模板flush()
。不過,請注意,刷新可能會顯著降低性能:
public void sendToKafka(final MyOutputData data) { final ProducerRecord<String, String> record = createRecord(data); try { template.send(record).get(10, TimeUnit.SECONDS); handleSuccess(data); } catch (ExecutionException e) { handleFailure(data, record, e.getCause()); } catch (TimeoutException | InterruptedException e) { handleFailure(data, record, e); } }
使用DefaultKafkaProducerFactory:
如上面使用KafkaTemplate
中所示,ProducerFactory
用於創建生產者。默認情況下,當不使用事務時,DefaultKafkaProducerFactory
會創建一個供所有客戶機使用的單例生產者,如KafkaProducer
javadocs中所建議的那樣。但是,如果對模板調用flush(),這可能會導致使用同一個生產者的其他執行緒延遲。從2.3版開始,DefaultKafkaProducerFactory
有一個新屬性producerPerThread
。當設置為true
時,工廠將為每個執行緒創建(和快取)一個單獨的生產者,以避免此問題。
當
producerPerThread
為true時,當不再需要生產者時,用戶程式碼必須在工廠上調用closeThreadBoundProducer()
。這將實際關閉生產者並將其從ThreadLocal
中移除。調用reset()或destroy()不會清理這些生產者。
創建DefaultKafkaProducerFactory
時,可以通過調用只接受屬性映射的構造函數(請參閱使用KafkaTemplate中的示例)從配置中獲取鍵和/或值序列化器類,或者序列化程式實例可以傳遞給DefaultKafkaProducerFactory
構造函數(在這種情況下,所有生產者共享相同的實例)。或者,可以提供Supplier<Serializer> s
(從版本2.3開始),用於為每個生產者獲取單獨的Serializer
實例:
@Bean public ProducerFactory<Integer, CustomValue> producerFactory() { return new DefaultKafkaProducerFactory<>(producerConfigs(), null, () -> new CustomValueSerializer()); } @Bean public KafkaTemplate<Integer, CustomValue> kafkaTemplate() { return new KafkaTemplate<Integer, CustomValue>(producerFactory()); }
使用ReplyingKafkaTemplate:
版本2.1.3
引入了KafkaTemplate
的一個子類來提供請求/應答語義。這個類名為ReplyingKafkaTemplate
,並且有一個方法(除了超類中的那些方法之外)。下面的列表顯示了方法簽名:
RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record); RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record, Duration replyTimeout);
結果是一個ListenableFuture
,它被結果非同步填充(或者超時時出現異常)。結果還有一個sendFuture
屬性,這是調用KafkaTemplate.send()
的結果。你可以使用此Future確定發送操作的結果。這裡就不展開了。
2.3 接收消息
可以通過配置MessageListenerContainer
並提供消息監聽器或使用@KafkaListener
註解來接收消息。
2.3.1 消息監聽器
使用消息監聽器容器(message listener container)時,必須提供監聽器才能接收數據。目前有八個消息監聽器支援的介面。下面的列表顯示了這些介面:
// 使用自動提交或容器管理的提交方法之一時,使用此介面處理從Kafka 消費者 poll() 作接收的單個ConsumerRecord實例 public interface MessageListener<K, V> { void onMessage(ConsumerRecord<K, V> data); } // 使用手動提交方法之一時,使用此介面處理從Kafka 消費者 poll() 操作接收的單個ConsumerRecord實例 public interface AcknowledgingMessageListener<K, V> { void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment); } // 使用自動提交或容器管理的提交方法之一時,使用此介面處理從Kafka 消費者 poll() 操作接收的單個ConsumerRecord實例。提供對消費者對象的訪問。 public interface ConsumerAwareMessageListener<K, V> extends MessageListener<K, V> { void onMessage(ConsumerRecord<K, V> data, Consumer<?, ?> consumer); } // 使用手動提交方法之一時,使用此介面處理從Kafka 消費者 poll() 操作接收的單個ConsumerRecord實例。提供對消費者對象的訪問。 public interface AcknowledgingConsumerAwareMessageListener<K, V> extends MessageListener<K, V> { void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer); } // 使用自動提交或容器管理的提交方法之一時,使用此介面處理從Kafka 消費者 poll() 操作接收的所有ConsumerRecord實例。使用此介面時不支援AckMode.RECORD,因為監聽器已獲得完整的批處理。 public interface BatchMessageListener<K, V> { void onMessage(List<ConsumerRecord<K, V>> data); } // 使用手動提交方法之一時,使用此介面處理從Kafka 消費者 poll() 操作接收的所有ConsumerRecord實例。 public interface BatchAcknowledgingMessageListener<K, V> { void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment); } // 使用自動提交或容器管理的提交方法之一時,使用此介面處理從Kafka 消費者 poll() 操作接收的所有ConsumerRecord實例。使用此介面時不支援AckMode.RECORD,因為監聽器已獲得完整的批處理。提供對使用者對象的訪問。 public interface BatchConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> { void onMessage(List<ConsumerRecord<K, V>> data, Consumer<?, ?> consumer); } // 使用手動提交方法之一時,使用此介面處理從Kafka 消費者 poll() 操作接收的所有ConsumerRecord實例。提供對使用者對象的訪問。 public interface BatchAcknowledgingConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> { void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer); }
上述消費者對象不是執行緒安全的。只能在調用偵聽器的執行緒上調用其方法。
2.3.1.1 消息監聽器容器
提供了兩個MessageListenerContainer
的實現:
- KafkaMessageListenerContainer
- ConcurrentMessageListenerContainer
KafkaMessageListenerContainer
從單個執行緒上的所有主題或分區接收所有消息(即一個分區只能分配到一個消費者,一個消費者可以被分配多個分區)。ConcurrentMessageListenerContainer
委託給一個或多個KafkaMessageListenerContainer
實例,以提供多執行緒使用,從多執行緒上去處理主題或分區的所有消息。
從Spring Kafka2.2.7版開始,你可以將RecordInterceptor
添加到偵聽器容器中;在調用偵聽器以允許檢查或修改記錄之前,將調用它。如果攔截器返回null,則不調用偵聽器。偵聽器是批處理偵聽器時不調用偵聽器。從2.3版開始,CompositeRecordInterceptor
可用於調用多個攔截器。
默認情況下,使用事務時,偵聽器在事務啟動後調用。從2.3.4版開始,你可以設置偵聽器容器的interceptBeforeTx
屬性,以便在事務啟動之前調用偵聽器。沒有為批處理偵聽器提供偵聽器,因為Kafka已經提供了ConsumerInterceptor
。
2.3.1.2 使用KafkaMessageListenerContainer
有如下構造函數可用:
public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory, ContainerProperties containerProperties) public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory, ContainerProperties containerProperties, TopicPartitionOffset... topicPartitions)
每個都獲取一個ConsumerFactory
以及有關主題和分區的資訊,以及ContainerProperties
對象中的其他配置。ConcurrentMessageListenerContainer
(稍後介紹)使用第二個構造函數跨使用者實例分發TopicPartitionOffset
。ContainerProperties
具有以下構造函數:
public ContainerProperties(TopicPartitionOffset... topicPartitions) public ContainerProperties(String... topics) public ContainerProperties(Pattern topicPattern)
第一個構造函數接受一個TopicPartitionOffset
參數數組來顯式地指示容器要使用哪些分區(使用消費者的 assign()方法)和可選的初始偏移量。默認情況下,正值是絕對偏移量。默認情況下,負值是相對於分區內的當前最後偏移量。提供了TopicPartitionOffset
的構造函數,該構造函數接受一個附加的布爾參數。如果是true,則初始偏移(正偏移或負偏移)相對於該消耗器的當前位置。容器啟動時應用偏移量。第二個是主題數組,Kafka基於group.id
屬性:在組中分布分區來分配分區。第三個使用regex表達式來選擇主題。
要將MessageListener
分配給容器,可以在創建容器時使用ContainerProps.setMessageListener
方法。下面的示例演示了如何執行此操作:
ContainerProperties containerProps = new ContainerProperties("topic1", "topic2"); containerProps.setMessageListener(new MessageListener<Integer, String>() { ... }); DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps()); KafkaMessageListenerContainer<Integer, String> container = new KafkaMessageListenerContainer<>(cf, containerProps); return container;
注意當創建一個Defaultkafkafkaconsumerfactory
時,使用構造器,該構造器僅以其特性為基礎,就意味著從配置中獲取了key/value的Deserializer類別。或者,反序列化程式實例可以傳遞給key/value的DefaultKafkaConsumerFactory
構造函數,在這種情況下,所有消費者共享相同的實例。另一個選項是提供Supplier<Deserializer>s
(從版本2.3開始),用於為每個使用者獲取單獨的反序列化程式實例:
DefaultKafkaConsumerFactory<Integer, CustomValue> cf = new DefaultKafkaConsumerFactory<>(consumerProps(), null, () -> new CustomValueDeserializer()); KafkaMessageListenerContainer<Integer, String> container = new KafkaMessageListenerContainer<>(cf, containerProps); return container;
有關可以設置的各種屬性的更多資訊,請參閱Javadoc 中ContainerProperties
。
從版本Spring Kafka 2.1.1開始,一個名為logContainerConfig
的新屬性就可用了。當啟用true和INFO日誌記錄時,每個偵聽器容器都會寫入一條日誌消息,總結其配置屬性。
例如,要將日誌級別更改為INFO,可以使用containerProperties.setCommitLogLevel(LogIfLevelEnabled.level.INFO)
。
從版本Spring Kafka 2.2開始,添加了名為missingtopicsfailal
的新容器屬性(默認值:true)。如果代理上不存在任何客戶端發布或訂閱涉及到的主題,這將阻止容器啟動。如果容器配置為偵聽主題模式(regex),則不適用。以前,容器執行緒在consumer.poll()
方法中循環,等待在記錄許多消息時出現主題。除了日誌,沒有跡象表明有問題。要恢復以前的行為,可以將屬性設置為false,這個時候,Broker設置項allow.auto.create.topics=true,且這個容器屬性為false,則會自動創建不存在的topic。
2.3.1.3 使用 ConcurrentMessageListenerContainer
單個構造函數類似於第一個KafkaListenerContainer
構造函數。下面的列表顯示了構造函數的簽名:
public ConcurrentMessageListenerContainer(ConsumerFactory<K, V> consumerFactory, ContainerProperties containerProperties)
它還有一個並發屬性。例如,container.setConcurrency(3)
即表示創建三個KafkaMessageListenerContainer
實例。對於第一個構造函數,Kafka使用它的組管理功能將分區分布到消費者之間。
當監聽多個主題時,默認的分區分布可能不是你期望的那樣。例如,如果你有三個主題,每個主題有五個分區,並且希望使用
concurrency=15
,那麼你只看到五個活動的消費者,每個消費者從每個主題中分配一個分區,其他十個消費者處於空閑狀態。這是因為默認的KafkaPartitionAssignor
是RangeAssignor
(參見其Javadoc)。對於這種情況,你可能需要考慮改用RoundRobinAssignor
,它將分區分布到所有使用者。然後,為每個使用者分配一個主題或分區。若要更改PartitionAssignor
,你可以在提供給DefaultKafkaConsumerFactory
的屬性中設置partition.assignment.strategy
消費者配置參數(ConsumerConfigs.PARTITION_ASSIGNMENT_STRATEGY_CONFIG
)。使用Spring Boot時,可以按如下方式分配設置策略:
spring.kafka.consumer.properties.partition.assignment.strategy= org.apache.kafka.clients.consumer.RoundRobinAssignor
對於第二個構造函數,ConcurrentMessageListenerContainer
將TopicPartition
實例分布在委託KafkaMessageListenerContainer
實例上。
例如,如果提供了六個TopicPartition
實例,並發性為3;每個容器得到兩個分區。對於五個TopicPartition
實例,兩個容器得到兩個分區,第三個容器得到一個分區。如果並發性大於TopicPartitions
的數量,則會向下調整並發性,以便每個容器獲得一個分區。調整分區的方式可以使用命令行工具kafka-topics.sh
查詢和調整主題上的分區數。還可以添加一個NewTopic
Bean,如果NewTopic設定的數目大於當前數目,spring boot的自動配置的KafkaAdmin
將向上調整分區。
client.id屬性(如果已設置)將附加
-n
,其中n是對應於並發的消費者實例。當啟用JMX時,這是為MBeans提供唯一名稱所必需的。
從版本Spring Kafka 1.3開始,MessageListenerContainer
提供了對底層KafkaConsumer
的度量的訪問。對於ConcurrentMessageListenerContainer
,metrics()
方法返回所有目標KafkaMessageListenerContainer
實例的度量(metrics)。根據為底層KafkaConsumer
提供的client-id
度量被分組到Map<MetricName, ?extends Metric>
。
從2.3版開始,ContainerProperties
提供了一個idleBetweenPolls
選項,允許偵聽器容器中的主循環在KafkaConsumer.poll()
調用之間睡眠。從提供的選項中選擇實際睡眠間隔作為最小值,並且選擇max.poll.interval.ms
消費者配置和當前記錄批處理時間之間的差異。
2.3.1.4 提交偏移量
提供了幾個提交偏移量的選項。如果enable.auto.commit
使用者屬性為true
,則Kafka將根據其配置自動提交偏移量。如果為false
,則容器支援多個AckMode
設置(在下一個列表中描述)。默認的確認模式是批處理。從2.3版開始,框架將enable.auto.commit
設置為false
,除非在配置中顯式設置。以前,如果未設置屬性,則使用Kafka默認值(true)。消費者 poll()
方法返回一個或多個ConsumerRecords
。為每個記錄調用MessageListener
。以下列表描述了容器對每個AckMode
採取的操作:
- RECORD: 當偵聽器在處理記錄後返回時提交偏移量。
- BATCH: 處理完
poll()
返回的所有記錄後提交偏移量。 - TIME: 在處理完
poll()
返回的所有記錄後提交偏移量,只要超過上次提交後的ackTime
- COUNT: 在處理完
poll()
返回的所有記錄後提交偏移量,只要上次提交後收到ackCount
記錄。 - COUNT_TIME: 類似於
TIME
和COUNT
,但如果兩個條件都為true,則執行提交。 - MANUAL: 消息偵聽器負責
acknowledge()
和Acknowledgment
。之後,應用與BATCH相同的語義。 - MANUAL_IMMEDIATE: 偵聽器調用
Acknowledgement.acknowledge()
方法時立即提交偏移量。
MANUAL和MANUAL_IMMEDIATE 要求偵聽器是
AcknowledgingMessageListener
或BatchAcknowledgingMessageListener
。請參見消息偵聽器。
根據syncCommits
容器屬性,使用消費者上的commitSync()
或commitAsync()
方法。默認情況下,syncCommits
為true;另請參閱setSyncCommitTimeout
。請參閱setCommitCallback
以獲取非同步提交的結果;默認回調是LoggingCommitCallback
,它記錄錯誤(以及調試級別的成功)。
因為偵聽器容器有自己的提交偏移的機制,所以它希望Kafka ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG
為false。從2.3版開始,除非在使用者工廠或容器的使用者屬性重寫中特別設置,否則它將無條件地將其設置為false。
Acknowledgment
有以下方法:
public interface Acknowledgment { void acknowledge(); }
此方法使偵聽器可以控制何時提交偏移。
從版本2.3開始,確認介面有兩個附加方法nack(long sleep)
和nack(int index, long sleep)
。第一個用於記錄偵聽器,第二個用於批處理偵聽器。為偵聽器類型調用錯誤的方法將引發IllegalStateException
。
nack()只能在調用偵聽器的消費者執行緒上調用。
使用批處理偵聽器時,可以在發生故障的批內指定索引。調用nack()
時,將在對失敗和丟棄的記錄的分區執行索引和查找之前提交記錄的偏移量,以便在下次poll()
時重新傳遞這些偏移量。這是對SeekToCurrentBatchErrorHandler
的改進,SeekToCurrentBatchErrorHandler
只能查找整個批次以便重新交付。
注意:通過組管理使用分區分配時,確保sleep參數(加上處理上一次輪詢記錄所花費的時間)小於
consumer max.poll.interval.ms
屬性非常重要。
2.3.1.5 偵聽器容器自動啟動和手動啟動
偵聽器容器實現了SmartLifecycle
(通過SmartLifecycle
在Spring載入和初始化所有bean後,接著執行一些任務或者啟動需要的非同步服務),默認情況下autoStartup
為true
。容器在後期啟動(Integer.MAX-VALUE - 100
)。實現SmartLifecycle
以處理來自偵聽器的數據的其他組件應該在較早的階段啟動。-100
為以後的階段留出了空間,使組件能夠在容器之後自動啟動。比如我們通過@Bean
將監聽器容器交給Spring管理,這個時候通過SmartLifecycle
自動執行了初始化的任務,但是當我們手動通過new監聽器容器實例,則後初始化則不會執行,比如KafkaMessageListenerContainer
實例需要手動執行start()
。
autoStartup
在手動執行start中設置true與false沒有作用,可以參見@KafkaListener
聲明周期管理這一小節。
2.3.2 @KafkaListener註解
2.3.2.1 Record Listeners
@KafkaListener
註解用於將bean方法指定為偵聽器容器的偵聽器。bean包裝在一個MessagingMessageListenerAdapter
中,該適配器配置有各種功能,如轉換器,用於轉換數據(如有必要)以匹配方法參數。通過使用屬性佔位符(${…}
),或者可以使用SpEL(#{…}
)配置注釋上的大多數屬性。有關更多資訊,請參閱Javadoc。
@KafkaListener
:
id
:listener唯一id,當GroupId沒有被配置的時候,默認id為自動產生,此值指定後會覆蓋group id。containerFactory
:上面提到了@KafkaListener區分單數據還是多數據消費只需要配置一下注解的containerFactory屬性就可以了,這裡面配置的是監聽容器工廠,也就是ConcurrentKafkaListenerContainerFactory
,配置Bean名稱topics
:需要監聽的Topic,可監聽多個,可以是表達式或者佔位符關鍵字或者直接是主題名稱,如多個主題監聽:{"topic1" , "topic2"}
topicPattern
: 此偵聽器的主題模式。條目可以是「主題模式」、「屬性佔位符鍵」或「表達式」。框架將創建一個容器,該容器訂閱與指定模式匹配的所有主題,以獲取動態分配的分區。模式匹配將針對檢查時存在的主題周期性地執行。表達式必須解析為主題模式(支援字元串或模式結果類型)。這使用組管理,Kafka將為組成員分配分區。topicPartitions
:用於使用手動主題/分區分配時errorHandler
:監聽異常處理器,配置Bean名稱,默認為空groupId
:消費組IDidIsGroup
:id是否為GroupIdclientIdPrefix
:消費者Id前綴beanRef
:真實監聽容器的Bean名稱,需要在 Bean名稱前加 "__"
@KafkaListener
註解為簡單的POJO偵聽器提供了一種機制。下面的示例演示如何使用它:
public class Listener { @KafkaListener(id = "foo", topics = "myTopic", clientIdPrefix = "myClientId") public void listen(String data) { ... } }
此機制生效需要@Configuration
類之一上的@EnableKafka
註解和用於配置基礎ConcurrentMessageListenerContainer
的偵聽器容器工廠。默認情況下,需要名為kafkaListenerContainerFactory
的bean。以下示例演示如何使用ConcurrentMessageListenerContain
:
@Configuration @EnableKafka public class KafkaConfig { @Bean KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setConcurrency(3); factory.getContainerProperties().setPollTimeout(3000); return factory; } @Bean public ConsumerFactory<Integer, String> consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); } @Bean public Map<String, Object> consumerConfigs() { Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString()); ... return props; } }
注意,要設置容器屬性,必須在工廠上使用getContainerProperties()
方法。它用作注入容器的實際屬性的模板。
從版本2.1.1開始,現在可以為註解創建的消費者設置client.id
屬性。clientdprefix
的後綴是-n
,其中n是一個整數,表示使用並發時的容器號。
從2.2版開始,現在可以通過使用批註本身的屬性來重寫容器工廠的並發性和自動啟動屬性。屬性可以是簡單值、屬性佔位符或SpEL表達式。下面的示例演示了如何執行此操作:
@KafkaListener(id = "myListener", topics = "myTopic", autoStartup = "${listen.auto.start:true}", concurrency = "${listen.concurrency:3}") public void listen(String data) { ... }
你還可以使用顯式主題和分區(以及可選的初始偏移量)配置POJO偵聽器。下面的示例演示了如何執行此操作:
@KafkaListener(id = "thing2", topicPartitions = { @TopicPartition(topic = "topic1", partitions = { "0", "1" }), @TopicPartition(topic = "topic2", partitions = "0", partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100")) }) public void listen(ConsumerRecord<?, ?> record) { ... }
你可以在partitions
或partitionOffsets
屬性中指定每個分區,但不能同時指定兩者。
使用手動AckMode
時,還可以向偵聽器提供Acknowledgment
。下面的示例還演示了如何使用不同的容器工廠:
@KafkaListener(id = "cat", topics = "myTopic", containerFactory = "kafkaManualAckListenerContainerFactory") public void listen(String data, Acknowledgment ack) { ... ack.acknowledge(); }
最後,可以從消息頭獲得有關消息的元數據。你可以使用以下頭名稱來檢索消息頭內容:
KafkaHeaders.OFFSET KafkaHeaders.RECEIVED_MESSAGE_KEY KafkaHeaders.RECEIVED_TOPIC KafkaHeaders.RECEIVED_PARTITION_ID KafkaHeaders.RECEIVED_TIMESTAMP KafkaHeaders.TIMESTAMP_TYPE
示例:
@KafkaListener(id = "qux", topicPattern = "myTopic1") public void listen(@Payload String foo, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) Integer key, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic, @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts ) { ... }
2.3.2.2 批處理偵聽器
從版本1.1開始,可以配置@KafkaListener
方法來接收從消費者接收的整批消費者記錄。要將偵聽器容器工廠配置為創建批處理偵聽器,可以設置batchListener
屬性。下面的示例演示了如何執行此操作:
@Bean public KafkaListenerContainerFactory<?, ?> batchFactory() { ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setBatchListener(true); return factory; }
以下示例顯示如何接收有效載荷列表:
@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory") public void listen(List<String> list) { ... }
主題、分區、偏移量等在與有效負載並行的頭中可用。下面的示例演示如何使用標題:
@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory") public void listen(List<String> list, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) List<Integer> keys, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions, @Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics, @Header(KafkaHeaders.OFFSET) List<Long> offsets) { ... }
或者,您可以接收消息列表Message<?>
對象,其中包含每個偏移量和每個消息中的其他詳細資訊,但它必須是唯一的參數(除了使用手動提交時的Acknowledgment和/
或Consumer<?, ?>
參數)。下面的示例演示如何執行此操作:
@KafkaListener(id = "listMsg", topics = "myTopic", containerFactory = "batchFactory") public void listen14(List<Message<?>> list) { ... } @KafkaListener(id = "listMsgAck", topics = "myTopic", containerFactory = "batchFactory") public void listen15(List<Message<?>> list, Acknowledgment ack) { ... } @KafkaListener(id = "listMsgAckConsumer", topics = "myTopic", containerFactory = "batchFactory") public void listen16(List<Message<?>> list, Acknowledgment ack, Consumer<?, ?> consumer) { ... }
在這種情況下,不會對有效載荷執行轉換。如果BatchMessagingMessageConverter
配置了RecordMessageConverter
,則還可以向消息參數添加泛型類型,並轉換有效負載。有關詳細資訊,請參閱使用批處理偵聽器的負載轉換。
你還可以收到一個ConsumerRecord<?, ?>
對象,但它必須是唯一的參數(當使用手動提交或Consumer<?, ?>
參數時,除了可選的Acknowledgment)。下面的示例演示了如何執行此操作:
@KafkaListener(id = "listCRs", topics = "myTopic", containerFactory = "batchFactory") public void listen(List<ConsumerRecord<Integer, String>> list) { ... } @KafkaListener(id = "listCRsAck", topics = "myTopic", containerFactory = "batchFactory") public void listen(List<ConsumerRecord<Integer, String>> list, Acknowledgment ack) { ... }
從版本2.2開始,偵聽器可以接收poll()
方法返回的完整的ConsumerRecords<?, ?>
對象,允許偵聽器訪問其他方法,例如partitions()
(返回列表中的TopicPartition
實例)和records
(TopicPartition)(獲取選擇性記錄)。同樣,這必須是唯一的參數(當使用手動提交或Consumer<?, ?>
參數時,除了可選的Acknowledgment)。下面的示例演示了如何執行此操作:
@KafkaListener(id = "pollResults", topics = "myTopic", containerFactory = "batchFactory") public void pollResults(ConsumerRecords<?, ?> records) { ... }
2.3.3 @KafkaListener@Payload驗證
從2.2版開始,現在更容易添加驗證程式來驗證@KafkaListener``@Payload
參數。以前,你必須配置一個自定義的DefaultMessageHandlerMethodFactory
並將其添加到註冊器中。現在,你可以將驗證器添加到註冊器本身。以下程式碼說明了如何執行此操作:
@Configuration @EnableKafka public class Config implements KafkaListenerConfigurer { ... @Override public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) { registrar.setValidator(new MyValidator()); } }
當你在Spring Boot使用validation starter
,會自動配置LocalValidatorFactoryBean
,如下例所示:
@Configuration @EnableKafka public class Config implements KafkaListenerConfigurer { @Autowired private LocalValidatorFactoryBean validator; ... @Override public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) { registrar.setValidator(this.validator); } }
以下示例演示如何驗證:
public static class ValidatedClass { @Max(10) private int bar; public int getBar() { return this.bar; } public void setBar(int bar) { this.bar = bar; } }
@KafkaListener(id="validated", topics = "annotated35", errorHandler = "validationErrorHandler", containerFactory = "kafkaJsonListenerContainerFactory") public void validatedListener(@Payload @Valid ValidatedClass val) { ... } @Bean public KafkaListenerErrorHandler validationErrorHandler() { return (m, e) -> { ... }; }
2.3.4 重新平衡監聽者
ContainerProperties
有一個名為consumerRebalanceListener
的屬性,該屬性接受Kafka客戶端的consumerRebalanceListene
r介面的實現。如果未提供此屬性,則容器將配置日誌偵聽器,該偵聽器將在資訊級別記錄重新平衡事件。該框架還添加了一個子介面ConsumerRawareRebalanceListener
。以下列表顯示了ConsumerRawareRebalanceListener
介面定義:
public interface ConsumerAwareRebalanceListener extends ConsumerRebalanceListener { void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions); void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions); void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions); }
2.3.5 轉發監聽者消息
從2.0版開始,如果還使用@SendTo
註解注釋@KafkaListener
,並且方法調用返回結果,則結果將轉發到@SendTo
指定的主題。如:
@KafkaListener(topics = "annotated21") @SendTo("!{request.value()}") // runtime SpEL public String replyingListener(String in) { ... } @KafkaListener(topics = "${some.property:annotated22}") @SendTo("#{myBean.replyTopic}") // config time SpEL public Collection<String> replyingBatchListener(List<String> in) { ... } @KafkaListener(topics = "annotated23", errorHandler = "replyErrorHandler") @SendTo("annotated23reply") // static reply topic definition public String replyingListenerWithErrorHandler(String in) { ... } ... @KafkaListener(topics = "annotated25") @SendTo("annotated25reply1") public class MultiListenerSendTo { @KafkaHandler public String foo(String in) { ... } @KafkaHandler @SendTo("!{'annotated25reply2'}") public String bar(@Payload(required = false) KafkaNull nul, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) int key) { ... } }
2.3.6 @KafkaListener生命周期管理
為@KafkaListener
註解創建的偵聽器容器不是應用程式上下文中的bean。相反,它們是用KafkaListenerEndpointRegistry
類型的基礎設施bean註冊的。這個bean由框架自動聲明並管理容器的生命周期;它將自動啟動任何autoStartup
設置為true
的容器。所有容器工廠創建的所有容器必須處於同一phase
。有關詳細資訊,請參閱偵聽器容器自動啟動。你可以使用註冊表以編程方式管理生命周期。啟動或停止註冊表將啟動或停止所有已註冊的容器。或者,可以通過使用單個容器的id屬性來獲取對該容器的引用。可以在批註上設置autoStartup
,這將覆蓋容器工廠中配置的默認設置(setAutoStartup(true)
)。你可以從應用程式上下文中獲取對bean的引用,例如自動連接,以管理其註冊的容器。以下示例說明了如何執行此操作:
@KafkaListener(id = "myContainer", topics = "myTopic", autoStartup = "false") public void listen(...) { ... }
@Autowired private KafkaListenerEndpointRegistry registry; ... this.registry.getListenerContainer("myContainer").start(); ...
註冊表只維護其管理的容器的生命周期;聲明為bean的容器不受註冊表管理,可以從應用程式上下文中獲取。可以通過調用註冊表的getListenerContainers()
方法來獲取託管容器的集合。Spring Kafka版本2.2.5添加了一個方便方法getAllListenerContainers()
,它返回所有容器的集合,包括由註冊表管理的容器和聲明為bean的容器。返回的集合將包括任何已初始化的原型bean,但它不會初始化任何延遲bean聲明。
2.4 流處理
Spring for Apache Kafka
提供了一個工廠bean來創建StreamsBuilder
對象並管理其流的生命周期。只要kafka流在classpath上並且kafka流通過@EnableKafkaStreams
註解開啟,Spring Boot就會自動配置所需的KafkaStreamsConfiguration
bean。
啟用Kafka流意味著必須設置應用程式id和引導伺服器(bootstrap servers)。前者可以使用spring.kafka.streams.application-id
配置,如果未設置,則默認為spring.application.name
。後者可以全局設置,也可以專門為流覆寫。
使用專用屬性可以使用其他幾個屬性;可以使用spring.Kafka.streams.properties
命名空間設置其他任意Kafka屬性。有關詳細資訊,Additional Kafka Properties 。
默認情況下,由它創建的StreamBuilder
對象管理的流將自動啟動。可以使用spring.kafka.streams.auto-startup
屬性自定義此行為。
要使用工廠bean,只需將StreamsBuilder
連接到@bean
,如下例所示:
@Configuration(proxyBeanMethods = false) @EnableKafkaStreams public static class KafkaStreamsExampleConfiguration { @Bean public KStream<Integer, String> kStream(StreamsBuilder streamsBuilder) { KStream<Integer, String> stream = streamsBuilder.stream("ks1In"); stream.map((k, v) -> new KeyValue<>(k, v.toUpperCase())).to("ks1Out", Produced.with(Serdes.Integer(), new JsonSerde<>())); return stream; } }
默認情況下,由它創建的StreamBuilder
對象管理的流將自動啟動。可以使用spring.kafka.streams.auto-startup
屬性自定義此行為。
2.5 附加配置
自動配置支援的屬性顯示在公用應用程式屬性中。注意,在大多數情況下,這些屬性(連字元或駝峰樣式)直接映射到Apache Kafka點式屬性。有關詳細資訊,請參閱Apache Kafka
文檔。
前面提到的幾個屬性應用於所有組件(生產者、消費者、管理員和流),但如果希望使用不同的值,則可以在組件級別指定。Apache Kafka指定重要性為HIGH
、MEDIUM
或LOW
的屬性。Spring Boot自動配置支援所有高重要性屬性、某些選定的中、低屬性以及任何沒有默認值的屬性。
只有Kafka支援的屬性的一個子集可以通過KafkaProperties
類直接使用,如果要使用不直接支援的其他屬性配置生產者或消費者,請使用以下屬性:
spring.kafka.properties.prop.one=first spring.kafka.admin.properties.prop.two=second spring.kafka.consumer.properties.prop.three=third spring.kafka.producer.properties.prop.four=fourth spring.kafka.streams.properties.prop.five=fifth
上面的參數設置示例將公共prop.one
Kafka屬性設置為first
(適用於生產者、消費者和管理員),prop.two
admin屬性設置為second
,prop.three
consumer屬性設置為third
,prop.four
producer屬性設置為fourth
,prop.five
streams屬性設置為fifth
。
你還可以配置Spring Kafka JsonDeserializer
,如下所示:
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer spring.kafka.consumer.properties.spring.json.value.default.type=com.example.Invoice spring.kafka.consumer.properties.spring.json.trusted.packages=com.example,org.acme
類似地,可以禁用JsonSerializer
在頭中發送類型資訊的默認行為:
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer spring.kafka.producer.properties.spring.json.add.type.headers=false
注意:以這種方式設置的屬性將覆蓋Spring Boot顯式支援的任何配置項。
2.6 使用Embdded Kafka做測試
Spring for Apache Kafka提供了一種使用嵌入式Apache Kafka代理測試項目的便捷方法。要使用此功能,請使用Spring Kafka測試模組中的@EmbeddedKafka
註解測試類。有關更多資訊,請參閱Spring For Apache Kafka參考手冊。
要使Spring Boot自動配置與前面提到的嵌入式Apache Kafka代理一起工作,需要將嵌入式代理地址(由EmbeddedKafkaBroker
填充)的系統屬性重新映射到Apache Kafka的Spring Boot配置屬性中。有幾種方法可以做到這一點:
- 提供系統屬性以將嵌入的代理地址映射到測試類中的
spring.kafka.bootstrap-servers
:
static { System.setProperty(EmbeddedKafkaBroker.BROKER_LIST_PROPERTY, "spring.kafka.bootstrap-servers"); }
- 在
@EmbeddedKafka
註解上配置屬性名:
@EmbeddedKafka(topics = "someTopic", bootstrapServersProperty = "spring.kafka.bootstrap-servers")
- 在配置屬性中使用佔位符:
spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}
2.7 Spring Integration支援
Spring Integration也有Kafka的適配器,因此我們可以很方便的採用Spring Integration去實現發布訂閱,當然你也可以不使用Spring Integration。
Spring Integration是什麼,具體有什麼作用,可以參考另一篇文章《Spring Integration最詳解》。
3 Spring Kafka配置參數
這裡對所有配置做個說明的是,spring kafka配置分全局配置和子模組配置,子模組配置會複寫全局配置,比如SSL認證可以全局配置,但是也可以在每個子模組,如消費者、生產者、流式處理中都可以單獨配置SSL(可能是微服務部署,消費者和生產者不在同一個應用中)。這裡重點介紹生產者和消費者配置吧,其他就不展開了,用到的時候再去查找和補充。
3.1 全局配置
# 用逗號分隔的主機:埠對列表,用於建立到Kafka群集的初始連接。覆蓋全局連接設置屬性 spring.kafka.bootstrap-servers # 在發出請求時傳遞給伺服器的ID。用於伺服器端日誌記錄 spring.kafka.client-id,默認無 # 用於配置客戶端的其他屬性,生產者和消費者共有的屬性 spring.kafka.properties.* # 消息發送的默認主題,默認無 spring.kafka.template.default-topic
3.2 生產者
Spring Boot中,Kafka 生產者
相關配置(所有配置前綴為spring.kafka.producer.
):
# 生產者要求Leader在考慮請求完成之前收到的確認數 spring.kafka.producer.acks # 默認批量大小。較小的批處理大小將使批處理不太常見,並可能降低吞吐量(批處理大小為零將完全禁用批處理) spring.kafka.producer.batch-size spring.kafka.producer.bootstrap-servers # 生產者可用於緩衝等待發送到伺服器的記錄的總記憶體大小。 spring.kafka.producer.buffer-memory # 在發出請求時傳遞給伺服器的ID。用於伺服器端日誌記錄。 spring.kafka.producer.client-id # 生產者生成的所有數據的壓縮類型 spring.kafka.producer.compression-type # 鍵的序列化程式類 spring.kafka.producer.key-serializer spring.kafka.producer.properties.* # 大於零時,啟用失敗發送的重試次數 spring.kafka.producer.retries spring.kafka.producer.ssl.key-password spring.kafka.producer.ssl.key-store-location spring.kafka.producer.ssl.key-store-password spring.kafka.producer.ssl.key-store-type spring.kafka.producer.ssl.protocol spring.kafka.producer.ssl.trust-store-location spring.kafka.producer.ssl.trust-store-password spring.kafka.producer.ssl.trust-store-type # 非空時,啟用對生產者的事務支援 spring.kafka.producer.transaction-id-prefix spring.kafka.producer.value-serializer
3.3 消費者
Spring Boot中,Kafka 消費者相關配置(所有配置前綴為spring.kafka.consumer.
):
# 如果「enable.auto.commit」設置為true,設置消費者偏移自動提交到Kafka的頻率,默認值無,單位毫秒(ms) spring.kafka.consumer.auto-commit-interval # 當Kafka中沒有初始偏移或伺服器上不再存在當前偏移時策略設置,默認值無,latest/earliest/none三個值設置 # earliest 當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,從頭開始消費 # latest 當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,消費新產生的該分區下的數據 # none topic各分區都存在已提交的offset時,從offset後開始消費;只要有一個分區不存在已提交的offset,則拋出異常 spring.kafka.consumer.auto-offset-reset # 用逗號分隔的主機:埠對列表,用於建立到Kafka群集的初始連接。覆蓋全局連接設置屬性 spring.kafka.consumer.bootstrap-servers # 在發出請求時傳遞給伺服器的ID,用於伺服器端日誌記錄 spring.kafka.consumer.client-id # 消費者的偏移量是否在後台定期提交 spring.kafka.consumer.enable-auto-commit # 如果沒有足夠的數據來立即滿足「fetch-min-size」的要求,則伺服器在取回請求之前阻塞的最大時間量 spring.kafka.consumer.fetch-max-wait # 伺服器應為獲取請求返回的最小數據量。 spring.kafka.consumer.fetch-min-size # 標識此消費者所屬的默認消費者組的唯一字元串 spring.kafka.consumer.group-id # 消費者協調員的預期心跳間隔時間。 spring.kafka.consumer.heartbeat-interval # 用於讀取以事務方式寫入的消息的隔離級別。 spring.kafka.consumer.isolation-level # 密鑰的反序列化程式類 spring.kafka.consumer.key-deserializer # 在對poll()的單個調用中返回的最大記錄數。 spring.kafka.consumer.max-poll-records # 用於配置客戶端的其他特定於消費者的屬性。 spring.kafka.consumer.properties.* # 密鑰存儲文件中私鑰的密碼。 spring.kafka.consumer.ssl.key-password # 密鑰存儲文件的位置。 spring.kafka.consumer.ssl.key-store-location # 密鑰存儲文件的存儲密碼。 spring.kafka.consumer.ssl.key-store-password # 密鑰存儲的類型,如JKS spring.kafka.consumer.ssl.key-store-type # 要使用的SSL協議,如TLSv1.2, TLSv1.1, TLSv1 spring.kafka.consumer.ssl.protocol # 信任存儲文件的位置。 spring.kafka.consumer.ssl.trust-store-location # 信任存儲文件的存儲密碼。 spring.kafka.consumer.ssl.trust-store-password # 信任存儲區的類型。 spring.kafka.consumer.ssl.trust-store-type # 值的反序列化程式類。 spring.kafka.consumer.value-deserializer
3.4 監聽器
Spring Boot中,Kafka Listener相關配置(所有配置前綴為spring.kafka.listener.
):
# ackMode為「COUNT」或「COUNT_TIME」時偏移提交之間的記錄數 spring.kafka.listener.ack-count= spring.kafka.listener.ack-mode spring.kafka.listener.ack-time spring.kafka.listener.client-id spring.kafka.listener.concurrency spring.kafka.listener.idle-event-interval spring.kafka.listener.log-container-config # 如果Broker上不存在至少一個配置的主題(topic),則容器是否無法啟動, # 該設置項結合Broker設置項allow.auto.create.topics=true,如果為false,則會自動創建不存在的topic spring.kafka.listener.missing-topics-fatal=true # 非響應消費者的檢查間隔時間。如果未指定持續時間後綴,則將使用秒作為單位 spring.kafka.listener.monitor-interval spring.kafka.listener.no-poll-threshold spring.kafka.listener.poll-timeout spring.kafka.listener.type
3.5 管理
spring.kafka.admin.client-id # 如果啟動時代理不可用,是否快速失敗 spring.kafka.admin.fail-fast=false spring.kafka.admin.properties.* spring.kafka.admin.ssl.key-password spring.kafka.admin.ssl.key-store-location spring.kafka.admin.ssl.key-store-password spring.kafka.admin.ssl.key-store-type spring.kafka.admin.ssl.protocol spring.kafka.admin.ssl.trust-store-location spring.kafka.admin.ssl.trust-store-password spring.kafka.admin.ssl.trust-store-type
3.6 授權服務(JAAS)
spring.kafka.jaas.control-flag=required spring.kafka.jaas.enabled=false spring.kafka.jaas.login-module=com.sun.security.auth.module.Krb5LoginModule spring.kafka.jaas.options.*
3.7 SSL認證
spring.kafka.ssl.key-password spring.kafka.ssl.key-store-location spring.kafka.ssl.key-store-password spring.kafka.ssl.key-store-type spring.kafka.ssl.protocol spring.kafka.ssl.trust-store-location spring.kafka.ssl.trust-store-password spring.kafka.ssl.trust-store-type
3.8 Stream流處理
spring.kafka.streams.application-id spring.kafka.streams.auto-startup spring.kafka.streams.bootstrap-servers spring.kafka.streams.cache-max-size-buffering spring.kafka.streams.client-id spring.kafka.streams.properties.* spring.kafka.streams.replication-factor spring.kafka.streams.ssl.key-password spring.kafka.streams.ssl.key-store-location spring.kafka.streams.ssl.key-store-password spring.kafka.streams.ssl.key-store-type spring.kafka.streams.ssl.protocol spring.kafka.streams.ssl.trust-store-location spring.kafka.streams.ssl.trust-store-password spring.kafka.streams.ssl.trust-store-type spring.kafka.streams.state-dir
4 Kafka訂閱發布基本特性回顧
- 同一消費組下所有消費者協同消費訂閱主題的所有分區
- 同消費組,多消費者訂閱單主題單分區,則分區只會分配給其中一個消費者,除非這個消費者掛掉,才會分配給其他一個消費者消費消息,意思就是其他消費者在旁邊看著吃東西
- 同消費組,N個消費者訂閱單主題N個分區,則默認每個消費者都會被分配一個分區
- 同消費組,N個消費者訂閱單主題M個分區,當M > N時,則會有消費者多分配多於一個分區的情況;當M < N時,則會有空閑消費者,類似第一條
- 所有上面所說的消費者實例可以是執行緒方式或者是進程方式存在,所說的分區分配機制叫做重平衡(rebalance)
- 當消費者內成員個數發生變化會觸發重平衡;訂閱的主題個數發生變化會觸發重平衡;訂閱的主題分區個數發生變化會觸發重平衡;
- 總之就是一個分區只能分配到一個消費者,一個消費者可以被分配多個分區
- 消費者offset管理機制
- 每個主題分區中的消息都有一個唯一偏移值,具有先後順序,與消費者具有對應關係,消費者每消費一條消息,偏移量加1,並記錄在消費者本地,並定期的將記錄同步到服務端(Broker),這裡的同步機制是可以設置的
- 消息是被持久化的,當組內所有消費者重新訂閱主題時,可以設置是否從頭開始消費消息或者是從最後記錄的偏移值位置開始消費
- 分區和消費者個數如何設置
- 我們知道主題分區是分布在不同的Broker上的,每個分區對應一個消費者,從而具有消息處理具有很高的吞吐量
- 分區是調優Kafka並行度的最小單元,多執行緒消費者連接多分區消費消息,在實現上,通過socket連接,因此也會佔用文件句柄個數
- 創建分區都是會佔用一定記憶體的,並不是分區越多越好,當然現在kafka社區在優化這一部分,讓分區數達到更大,性能也不會有所影響
具體怎麼調優副本、分區、消費者等這裡就不展開了,後面專門來研究這個問題。
5 發布訂閱示例
實現下面的示例需要的環境:
- Kafka + Zookeeper單點伺服器或集群已配置好(如果環境搭建不熟悉,可以去翻看前面寫的關於Kafka的環境搭建和測試那一篇),或者是使用
Spring-kafka-test
embedded Kafka Server
- Spring Boot開發環境(2.2.1)
- JDK(1.8或以上)
- STS(4.4.RELEASE)
- MARVEN構建方式
5.1 使用Embedded Kafka Server
我們知道Kafka是Scala+Zookeeper
構建的,可以從官方網站下載部署包並在本地部署。不過,Spring Kafka Test已經封裝了Kafka測試的帶註解的一鍵式功能,以打開Kafka伺服器,從而簡化了驗證Kafka相關功能的開發過程,使用起來也非常簡單。
添加依賴:
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka-test</artifactId> <scope>test</scope> </dependency>
啟動服務,下面使用Junit測試用例直接啟動Kafka伺服器服務,包括四個代理節點,Run as JUnit Test
。:
@RunWith(SpringRunner.class) @SpringBootTest(classes = ApplicationTests.class) @EmbeddedKafka(count = 4,ports = {9092,9093,9094,9095}) public class ApplicationTests { @Test public void contextLoads()throws IOException { System.in.read(); } }
@EmbeddedKafka
中可以設置相關參數:
- value: 設置創建代理的個數
- count: 同value
- ports: 代理埠號列表
- brokerPropertiesLocation:指定配置文件,如 "classpath:application.properties"
注意:EmbeddedKafka這樣默認是沒有創建主題的。會提示
Topic(s) [test] is/are not present and missingTopicsFatal is true
錯誤。@EmbeddedKafka默認情況是創建一個代理,該代理具有一個不帶任何參數的隨機埠,它將在啟動日誌中輸出特定埠和默認配置項。
5.2 簡單的發布訂閱實現(無自定義配置)
下面實現一個簡單發布訂閱功能,通過前端WEB調用一個API,然後在該API控制器中得到請求後生產者開始發送消息,消費者後台監聽消息,如果收到消費者消息,則列印出來。
5.2.1 添加依賴及配置Kafka
添加Kafka依賴:
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>
配置Kafka,這裡消費者和生產者在同一應用中,我們只需要配置Kafka Brokers的服務地址+埠:
server: port: 9000 spring: kafka: bootstrap-servers: 10.151.113.57:9092,10.151.113.57:9093,10.151.113.57:9094 listener: # 設置不監聽主題錯誤,false時,如果broker設置了llow.auto.create.topics = true,生產者發送到未創建主題時,會默認自動創建主題 # 且默認創建的主題是單副本單分區的 missing-topics-fatal: false consumer: # 配置消費者消息offset是否自動重置(消費者重連會能夠接收最開始的消息) auto-offset-reset: earliest
5.2.2 添加生產者
@Service public class Producer { private static final Logger LOGGER = LogManager.getLogger(Producer.class); private static final String TOPIC = "users"; @Autowired private KafkaTemplate<String, String> kafkaTemplate; public void sendMessage(String message) { LOGGER.info(String.format("===Producing message: {}", message)); this.kafkaTemplate.send(TOPIC, message); } }
5.2.3 添加消費者
@Service public class Consumer { private static final Logger LOGGER = LogManager.getLogger(Consumer.class); @KafkaListener(topics = "test", groupId = "group_test") public void consume(String message) throws IOException { LOGGER.info(String.format("#### -> Consumed message -> %s", message)); } }
5.2.4 添加WEB控制器
@RestController @RequestMapping(value = "/kafka") public class KafkaController { private final Producer producer; @Autowired KafkaController(Producer producer) { this.producer = producer; } @GetMapping(value = "/publish") public void sendMessageToKafkaTopic(@RequestParam("message") String message) { this.producer.sendMessage(message); } }
5.2.5 測試
添加Spring Boot Application:
@SpringBootApplication public class TestKafkaApplication { public static void main(String[] args) { SpringApplication.run(TestKafkaApplication.class, args); } }
啟動Kafka Brokers後,需要手動創建主題(如果想自動創建,則需要藉助KafkaAdmin,或者是Kafka Broker設置了allow.auto.create.topics=true
且應用設置了listener.missing-topics-fatal=false
):
# 如果對kafka-topics.sh這裡不熟悉,可以去翻看前面寫的關於Kafka的相關文章(環境搭建和測試那一篇) # 創建test主題 $ ./bin/kafka-topics.sh --create --bootstrap-server localhost:9092,localhost:9093,localhost:9094 --replication-factor 1 --partitions 2 --topic test
打開瀏覽器測試:
http://localhost:9000/kafka/publish?message=hello
則應用控制台會列印hello
。整個發布訂閱的實現只使用了跟Kafka相關的@KafkaListener
註解接收消息和KafkaTemplate
模板發送消息,很是簡單。
5.3 基於自定義配置發布訂閱實現
上面是簡單的通過Spring Boot依賴的Spring Kafka配置即可快速實現發布訂閱功能,這個時候我們是無法在程式中操作這些配置的,因此這一小節就是利用我們之前《Spring Boot從零入門7_最新配置文件配置及優先順序詳細介紹》文章中講述的自定義配置文件方式去實現發布訂閱功能。
實現內容有:
- 自定義Kafka配置參數文件(非application.properties/yml)
- 可實現多生產者(每個生產者為單服務單執行緒),多消費者(非
@KafkaListener
實現消息監聽) - 支援SSL安全配置
- 監聽生產者
源碼不會直接貼,只給出主體部分。
配置文件:
@Configuration @ConfigurationProperties(prefix = "m2kc") @PropertySource("classpath:kafka.properties") @Validated public class M2KCKafkaConfig { @Value("${m2kc.kafka.bootstrap.servers}") private String kafkaBootStrapServers; @Value("${m2kc.kafka.key.serializer.class}") private String kafkaKeySerializerClass; ...... ...... }
生產者:
@Service @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) public class KafkaProducer { private static final Logger LOGGER = LogManager.getLogger(KafkaProducer.class); private String mTopic = "test"; private M2KCKafkaConfig mM2KCKafkaConfig; private KafkaTemplate<String, String> mKafkaTemplate; @Autowired public KafkaProducer(M2KCKafkaConfig kafkaConfig) { mTopic = kafkaConfig.getKafkaSourceTopic(); mM2KCKafkaConfig = kafkaConfig; mKafkaTemplate = getKafkaTemplate(); } public KafkaTemplate<String, String> getKafkaTemplate() { KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<String, String>(producerFactory()); return kafkaTemplate; } public ProducerFactory<String, String> producerFactory() { Map<String, Object> properties = new HashMap<String, Object>(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, mM2KCKafkaConfig.getKafkaBootStrapServers()); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, mM2KCKafkaConfig.getKafkaKeySerializerClass()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, mM2KCKafkaConfig.getKafkaValueSerializerClass()); if (mM2KCKafkaConfig.isKafkaSslEnable()) { // TODO : to test properties.put("security.protocol", mM2KCKafkaConfig.getKafkaSslProtocol()); properties.put("ssl.truststore.location", mM2KCKafkaConfig.getKafkaSslStoreLocation()); properties.put("ssl.truststore.password", mM2KCKafkaConfig.getKafkaSslTrustStorePassword()); properties.put("ssl.keystore.location", mM2KCKafkaConfig.getKafkaSslStoreLocation()); properties.put("ssl.keystore.password", mM2KCKafkaConfig.getKafkaSslKeyStorePassword()); properties.put("ssl.key.password", mM2KCKafkaConfig.getKafkaSslKeyPassword()); } return new DefaultKafkaProducerFactory<String, String>(properties); } public void sendMessage(String msg) { LOGGER.info("===Producing message[{}]: {}", mTopic, msg); ListenableFuture<SendResult<String, String>> future = mKafkaTemplate.send(mTopic, msg); future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() { @Override public void onSuccess(SendResult<String, String> result) { LOGGER.info("===Producing message success"); } @Override public void onFailure(Throwable ex) { LOGGER.info("===Producing message failed"); } }); } }
消費者:
@Service @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) public class KafkaConsumer implements InitializingBean { private static final Logger LOGGER = LogManager.getLogger(KafkaConsumer.class); private String mTopic; private M2KCKafkaConfig mM2KCKafkaConfig; private KafkaMessageListenerContainer<String, String> mKafkaMessageListenerContainer; @Autowired public KafkaConsumer(M2KCKafkaConfig kafkaConfig) { LOGGER.info("===KafkaConsumer construct"); mTopic = kafkaConfig.getKafkaSourceTopic(); mM2KCKafkaConfig = kafkaConfig; } @PostConstruct public void start(){ LOGGER.info("===KafkaConsumer start"); } @Override public void afterPropertiesSet() throws Exception { LOGGER.info("===afterPropertiesSet is called"); createContainer(); } private void createContainer() { mKafkaMessageListenerContainer = createKafkaMessageListenerContainer(); mKafkaMessageListenerContainer.setAutoStartup(false);; mKafkaMessageListenerContainer.start(); LOGGER.info("===", mKafkaMessageListenerContainer); } private KafkaMessageListenerContainer<String, String> createKafkaMessageListenerContainer() { KafkaMessageListenerContainer<String, String> container = new KafkaMessageListenerContainer<>(consumerFactory(), createContainerProperties()); LOGGER.info("===createKafkaMessageListenerContainer"); return container; } private ContainerProperties createContainerProperties() { ContainerProperties containerProps = new ContainerProperties(mTopic); containerProps.setMessageListener(createMessageListener()); return containerProps; } private ConsumerFactory<String, String> consumerFactory() { Map<String, Object> properties = new HashMap<String, Object>(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, mM2KCKafkaConfig.getKafkaBootStrapServers()); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, mM2KCKafkaConfig.getKafkaKeyDeserializerClass()); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, mM2KCKafkaConfig.getKafkaValueDeserializerClass()); properties.put(ConsumerConfig.GROUP_ID_CONFIG, mM2KCKafkaConfig.getKafkaConsumerGroupID()); if (mM2KCKafkaConfig.isKafkaSslEnable()) { // TODO : to test properties.put("security.protocol", mM2KCKafkaConfig.getKafkaSslProtocol()); properties.put("ssl.truststore.location", mM2KCKafkaConfig.getKafkaSslStoreLocation()); properties.put("ssl.truststore.password", mM2KCKafkaConfig.getKafkaSslTrustStorePassword()); properties.put("ssl.keystore.location", mM2KCKafkaConfig.getKafkaSslStoreLocation()); properties.put("ssl.keystore.password", mM2KCKafkaConfig.getKafkaSslKeyStorePassword()); properties.put("ssl.key.password", mM2KCKafkaConfig.getKafkaSslKeyPassword()); } return new DefaultKafkaConsumerFactory<String, String>(properties); } private MessageListener<String, String> createMessageListener() { return new MessageListener<String, String>() { @Override public void onMessage(ConsumerRecord<String, String> data) { // TODO Auto-generated method stub LOGGER.info("===Consuming msg: {}", data.value()); } }; } }
繼承InitializingBean
只是為了初始化,也可以去掉,將初始化寫入了構造函數中。這裡的消費者和生產者都使用@Scope
,所以需要手動獲取實例,通過context去調用getBean()。另外配置文件沒有寫全,這裡需要注意。
5.3 基於Spring Integration發布訂閱實現
Spring Integration也有對Kafka支援的適配器,採用Spring Integration,我們也能夠快速的實現發布訂閱功能,且實現群組多消費者批量消費功能:
- 實現Kafka自定義配置類
- 採用Spring Integration
- 發布訂閱
- 群組多消費者批量消費
- 採用DSL特定領域語法去編寫
- 生產者發布成功與失敗異常處理

我們可以先看看整體的Kafka消息傳遞通道:
- 出站通道中KafkaProducerMessageHandler用於將消息發送到主題
- KafkaMessageDrivenChannelAdapter用於設置入站通道和消息處理
具體的Demo可以參考Github中的一個sample :
- https://github.com/spring-projects/spring-integration-samples
6 總結
本篇文章詳細介紹了Spring Kafka的發送消息和接收消息功能,其他包括Spring Kafka Stream的簡單介紹,以及在Spring Boot中如何通過三種方式去實現Kafka的發布訂閱功能,涉及了Kafka的多消費者多訂閱者,SSL安全傳輸,Spring Integration Kafka等。文章很長,把握總體,結合實際,差不多基本內容都有所涉及了。
7 知識擴展
Spring Expression Language(簡稱SpEL),在Spring中,不同於屬性佔位符${...}
,而SpEL
表達式則要放到#{...}
中(除程式碼塊中用Expression外)。如配置文件中有topics參數spring.kafka.topics
,則可以將配置文件中參數傳入註解@KafkaListener(id = "foo", topics = "#{'${topicOne:annotated1,foo}'.split(',')}")
。
SpEL
表達式常用示例:
// 字面量 #{3.1415926} // 浮點數 #{9.87E4} // 科學計數法表示98700 #{'Hello'} // String 類型 #{false} // Boolean 類型 // 引用Bean、屬性和方法 #{sgtPeppers} // 使用這個bean #{sgtPeppers.artist} // 引用bean中的屬性 #{sgtPeppers.selectArtist()} // 引用bean中的方法 #{sgtPeppers.selectArtist().toUpperCase()} // 方法返回值的操作 #{sgtPeppers.selectArtist()?.toUpperCase()} // 防止selectArtist()方法返回null,?表示非null則執行toUpperCase() // 訪問類作用域的方法和常量的話,使用T()這個關鍵的運算符 #{T(java.lang.Math)} #{T(java.lang.Math).PI} // 引用PI的值 #{T(java.lang.Math).random()} // 獲取0-1的隨機數 #{T(System).currentTimeMillis()} // 獲取時間到當前的毫秒數 // 替代屬性佔位符獲取配置文件屬性值 @Value("#{表達式}" private String variable;
8 參考資料
- https://docs.spring.io/spring-kafka/docs/2.3.4.RELEASE/reference/html/
- https://docs.spring.io/spring-boot/docs/current/reference/html/appendix-application-properties.html
- https://blog.csdn.net/lishuangzhe7047/article/details/74530417
- https://docs.spring.io/spring-boot/docs/2.2.0.RELEASE/reference/htmlsingle/#boot-features-kafka
- https://docs.spring.io/spring-boot/docs/2.2.0.RELEASE/reference/htmlsingle/#common-application-properties
- https://docs.spring.io/spring-kafka/api/org/springframework/kafka/annotation/EnableKafka.html
- https://docs.spring.io/spring-kafka/api/org/springframework/kafka/annotation/KafkaListener.html
- https://www.javatt.com/p/16904
- https://github.com/cwenao/springboot_cwenao
- https://docs.spring.io/spring-kafka/api/org/springframework/kafka/annotation/package-summary.html
- https://docs.spring.io/spring-kafka/reference/html/#spring-integration
- https://docs.spring.io/spring-batch/docs/current/reference/html/spring-batch-integration.html
- https://www.intertech.com/Blog/spring-integration-enterprise-integration-veg-o-matic/
- https://joshlong.com/jl/blogPost/spring_integration_adapters_gateways_and_channels.html
- https://examples.javacodegeeks.com/enterprise-java/spring/integration/spring-integration-kafka-tutorial/
- https://www.orchome.com/553
- https://docs.spring.io/spring-integration/docs/5.1.0.RELEASE/reference/html/java-dsl.html
- https://programming.vip/docs/spring-boot-integration-kafka-spring-kafka-in-depth-exploration.html (事務型消息)
- https://docs.confluent.io/current/kafka/authentication_ssl.html
- https://github.com/spring-projects/spring-kafka/issues/361
- https://www.jianshu.com/p/27fd3754bb9c
- https://www.jianshu.com/p/cec449a7e73a
- https://memorynotfound.com/spring-kafka-batch-listener-example/