Reactive Spring實戰 — 響應式Kafka交互
- 2021 年 6 月 22 日
- 筆記
- Reactive Spring實戰
本文分享如何使用KRaft部署Kafka集群,以及Spring中如何實現Kafka響應式交互。
KRaft
我們知道,Kafka使用Zookeeper負責為kafka存儲broker,Consumer Group等元數據,並使用Zookeeper完成broker選主等操作。
雖然使用Zookeeper簡化了Kafka的工作,但這也使Kafka的部署和運維更複雜。
Kafka 2.8.0開始移除了Zookeeper,並使用Kafka內部的仲裁(Quorum)控制器來取代ZooKeeper,官方稱這個控制器為 “Kafka Raft metadata mode”,即KRaft mode。從此用戶可以在不需要Zookeeper的情況下部署Kafka集群,這使Fafka更加簡單,輕量級。
使用KRaft模式後,用戶只需要專註於維護Kafka集群即可。
注意:由於該功能改動較大,目前Kafka2.8版本提供的KRaft模式是一個測試版本,不推薦在生產環境使用。相信Kafka後續版本很快會提供生產可用的kraft版本。
下面介紹一下如果使用Kafka部署kafka集群。
這裡使用3台機器部署3個Kafka節點,使用的Kafka版本為2.8.0。
- 生成ClusterId以及配置文件。
(1)使用kafka-storage.sh生成ClusterId。
$ ./bin/kafka-storage.sh random-uuid
dPqzXBF9R62RFACGSg5c-Q
(2)使用ClusterId生成配置文件
$ ./bin/kafka-storage.sh format -t <uuid> -c ./config/kraft/server.properties
Formatting /tmp/kraft-combined-logs
注意:只需要在生成一個ClusterId,並使用該ClusterId在所有機器上生成配置文件,即集群中所有節點使用的ClusterId需相同。
- 修改配置文件
腳本生成的配置文件只能用於單個Kafka節點,如果在部署Kafka集群,需要對配置文件進行一下修改。
(1)修改config/kraft/server.properties(稍後使用該配置啟動kafka)
process.roles=broker,controller
node.id=1
listeners=PLAINTEXT://172.17.0.2:9092,CONTROLLER://172.17.0.2:9093
advertised.listeners=PLAINTEXT://172.17.0.2:9092
[email protected]:9093,[email protected]:9093,[email protected]:9093
process.roles指定了該節點角色,有以下取值
- broker: 這台機器將僅僅當作一個broker
- controller: 作為Raft quorum的控制器節點
- broker,controller: 包含以上兩者的功能
一個集群中不同節點的node.id需要不同。
controller.quorum.voters需要配置集群中所有的controller節點,配置格式為
(2)
kafka-storage.sh腳本生成的配置,默認將kafka數據存放在/tmp/kraft-combined-logs/,
我們還需要/tmp/kraft-combined-logs/meta.properties配置中的node.id,使其與server.properties配置中保持一起。
node.id=1
- 啟動kafka
使用kafka-server-start.sh腳本啟動Kafka節點
$ ./bin/kafka-server-start.sh ./config/kraft/server.properties
下面測試一下該kafka集群
- 創建主題
$ ./bin/kafka-topics.sh --create --partitions 3 --replication-factor 3 --bootstrap-server 172.17.0.2:9092,172.17.0.3:9092,172.17.0.4:9092 --topic topic1
- 生產消息
$ ./bin/kafka-console-producer.sh --broker-list 172.17.0.2:9092,172.17.0.3:9092,172.17.0.4:9092 --topic topic1
- 消費消息
$ ./bin/kafka-console-consumer.sh --bootstrap-server 172.17.0.2:9092,172.17.0.3:9092,172.17.0.4:9092 --topic topic1 --from-beginning
這部分命令的使用與低版本的Kafka保持一致。
Kafka的功能暫時還不完善,這是展示一個簡單的部署示例。
Kafka文檔://github.com/apache/kafka/blob/trunk/config/kraft/README.md
Spring中可以使用Spring-Kafka、Spring-Cloud-Stream兩個框架實現kafka響應式交互。
下面分別看一下這兩個框架的使用。
Spring-Kafka
- 添加引用
添加spring-kafka引用
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.5.8.RELEASE</version>
</dependency>
- 準備配置文件,內容如下
spring.kafka.producer.bootstrap-servers=172.17.0.2:9092,172.17.0.3:9092,172.17.0.4:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.LongSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.consumer.bootstrap-servers=172.17.0.2:9092,172.17.0.3:9092,172.17.0.4:9092
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.LongDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.group-id=warehouse-consumers
spring.kafka.consumer.properties.spring.json.trusted.packages=*
分別是生產者和消費者對應的配置,很簡單。
- 發送消息
Spring-Kakfa中可以使用ReactiveKafkaProducerTemplate發送消息。
首先,我們需要創建一個ReactiveKafkaProducerTemplate實例。(目前SpringBoot會自動創建KafkaTemplate實例,但不會創建ReactiveKafkaProducerTemplate實例)。
@Configuration
public class KafkaConfig {
@Autowired
private KafkaProperties properties;
@Bean
public ReactiveKafkaProducerTemplate reactiveKafkaProducerTemplate() {
SenderOptions options = SenderOptions.create(properties.getProducer().buildProperties());
ReactiveKafkaProducerTemplate template = new ReactiveKafkaProducerTemplate(options);
return template;
}
}
KafkaProperties實例由SpringBoot自動創建,讀取上面配置文件中對應的配置。
接下來,就可以使用ReactiveKafkaProducerTemplate發送消息了
@Autowired
private ReactiveKafkaProducerTemplate template;
public static final String WAREHOUSE_TOPIC = "warehouse";
public Mono<Boolean> add(Warehouse warehouse) {
Mono<SenderResult<Void>> resultMono = template.send(WAREHOUSE_TOPIC, warehouse.getId(), warehouse);
return resultMono.flatMap(rs -> {
if(rs.exception() != null) {
logger.error("send kafka error", rs.exception());
return Mono.just(false);
}
return Mono.just(true);
});
}
ReactiveKafkaProducerTemplate#send方法返回一個Mono(這是Spring Reactor中的核心對象),Mono中攜帶了SenderResult,SenderResult中的RecordMetadata、exception存儲該記錄的元數據(包括offset、timestamp等資訊)以及發送操作的異常。
- 消費消息
Spring-Kafka使用ReactiveKafkaConsumerTemplate消費消息。
@Service
public class WarehouseConsumer {
@Autowired
private KafkaProperties properties;
@PostConstruct
public void consumer() {
ReceiverOptions<Long, Warehouse> options = ReceiverOptions.create(properties.getConsumer().buildProperties());
options = options.subscription(Collections.singleton(WarehouseService.WAREHOUSE_TOPIC));
new ReactiveKafkaConsumerTemplate(options)
.receiveAutoAck()
.subscribe(record -> {
logger.info("Warehouse Record:" + record);
});
}
}
這裡與之前使用@KafkaListener註解實現的消息監聽者不同,不過也非常簡單,分為兩個步驟:
(1)ReceiverOptions#subscription方法將ReceiverOptions關聯到kafka主題
(2)創建ReactiveKafkaConsumerTemplate,並註冊subscribe的回調函數消費消息。
提示:receiveAutoAck方法會自動提交消費組offset。
Spring-Cloud-Stream
Spring-Cloud-Stream是Spring提供的用於構建消息驅動微服務的框架。
它為不同的消息中間件產品提供一種靈活的,統一的編程模型,可以屏蔽底層不同消息組件的差異,目前支援RabbitMQ、Kafka、RocketMQ等消息組件。
這裡簡單展示Spring-Cloud-Stream中實現Kafka響應式交互的示例,不深入介紹Spring-Cloud-Stream的應用。
- 引入spring-cloud-starter-stream-kafka的引用
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
- 添加配置
spring.cloud.stream.kafka.binder.brokers=172.17.0.2:9092,172.17.0.3:9092,172.17.0.4:9092
spring.cloud.stream.bindings.warehouse2-out-0.contentType=application/json
spring.cloud.stream.bindings.warehouse2-out-0.destination=warehouse2
# 消息格式
spring.cloud.stream.bindings.warehouse3-in-0.contentType=application/json
# 消息目的地,可以理解為Kafka主題
spring.cloud.stream.bindings.warehouse3-in-0.destination=warehouse2
# 定義消費者消費組,可以理解為Kafka消費組
spring.cloud.stream.bindings.warehouse3-in-0.group=warehouse2-consumers
# 映射方法名
spring.cloud.function.definition=warehouse2;warehouse3
Spring-Cloud-Stream 3.1版本之後,@EnableBinding、@Output等StreamApi註解都標記為廢棄,並提供了一種更簡潔的函數式編程模型。
該版本後,用戶不需要使用註解,只要在配置文件中指定需要綁定的方法,Spring-Cloud-Stream會為用戶將這些方法與底層消息組件綁定,用戶可以直接調用這些方法發送消息,或者接收到消息時Spring-Cloud-Stream會調用這些方法消費消息。
通過以下格式定義輸入、輸出函數的相關屬性:
輸出(發送消息):<functionName> + -out- + <index>
輸入(消費消息):<functionName> + -in- + <index>
對於典型的單個輸入/輸出函數,index始終為0,因此它僅與具有多個輸入和輸出參數的函數相關。
Spring-Cloud-Stream支援具有多個輸入(函數參數)/輸出(函數返回值)的函數。
spring.cloud.function.definition配置指定需要綁定的方法名,不添加該配置,Spring-Cloud-Stream會自動嘗試綁定返回類型為Supplier/Function/Consumer的方法,但是使用該配置可以避免Spring-Cloud-Stream綁定混淆。
- 發送消息
用戶可以編寫一個返回類型為Supplier的方法,並定時發送消息
@PollableBean
public Supplier<Flux<Warehouse>> warehouse2() {
Warehouse warehouse = new Warehouse();
warehouse.setId(333L);
warehouse.setName("天下第一倉");
warehouse.setLabel("一級倉");
logger.info("Supplier Add : {}", warehouse);
return () -> Flux.just(warehouse);
}
定義該方法後,Spring-Cloud-Stream每秒調用一次該方法,生成Warehouse實例,並發送到Kafka。
(這裡方法名warehouse3已經配置在spring.cloud.function.definition中。)
通常場景下,應用並不需要定時發送消息,而是由業務場景觸發發送消息操作, 如Rest介面,
這時可以使用StreamBridge介面
@Autowired
private StreamBridge streamBridge;
public boolean add2(Warehouse warehouse) {
return streamBridge.send("warehouse2-out-0", warehouse);
}
暫時未發現StreamBridge如何實現響應式交互。
- 消費消息
應用要消費消息,只需要定義一個返回類型為Function/Consumer的方法即可。如下
@Bean
public Function<Flux<Warehouse>, Mono<Void>> warehouse3() {
Logger logger = LoggerFactory.getLogger("WarehouseFunction");
return flux -> flux.doOnNext(data -> {
logger.info("Warehouse Data: {}", data);
}).then();
}
注意:方法名與<functionName> + -out- + <index>
/<functionName> + -in- + <index>
、
spring.cloud.function.definition中的配置需要保持一致,以免出錯。
SpringCloudStream文檔://docs.spring.io/spring-cloud-stream/docs/3.1.0/reference/html/spring-cloud-stream.html
文章完整程式碼://gitee.com/binecy/bin-springreactive/tree/master/warehouse-service
如果您覺得本文不錯,歡迎關注我的微信公眾號,系列文章持續更新中。您的關注是我堅持的動力!