Reactive Spring實戰 — 響應式Kafka交互

本文分享如何使用KRaft部署Kafka集群,以及Spring中如何實現Kafka響應式交互。

KRaft

我們知道,Kafka使用Zookeeper負責為kafka存儲broker,Consumer Group等元數據,並使用Zookeeper完成broker選主等操作。
雖然使用Zookeeper簡化了Kafka的工作,但這也使Kafka的部署和運維更複雜。

Kafka 2.8.0開始移除了Zookeeper,並使用Kafka內部的仲裁(Quorum)控制器來取代ZooKeeper,官方稱這個控制器為 “Kafka Raft metadata mode”,即KRaft mode。從此用戶可以在不需要Zookeeper的情況下部署Kafka集群,這使Fafka更加簡單,輕量級。
使用KRaft模式後,用戶只需要專註於維護Kafka集群即可。

注意:由於該功能改動較大,目前Kafka2.8版本提供的KRaft模式是一個測試版本,不推薦在生產環境使用。相信Kafka後續版本很快會提供生產可用的kraft版本。

下面介紹一下如果使用Kafka部署kafka集群。
這裡使用3台機器部署3個Kafka節點,使用的Kafka版本為2.8.0。

  1. 生成ClusterId以及配置文件。
    (1)使用kafka-storage.sh生成ClusterId。
$ ./bin/kafka-storage.sh random-uuid
dPqzXBF9R62RFACGSg5c-Q

(2)使用ClusterId生成配置文件

$ ./bin/kafka-storage.sh format -t <uuid> -c ./config/kraft/server.properties
Formatting /tmp/kraft-combined-logs

注意:只需要在生成一個ClusterId,並使用該ClusterId在所有機器上生成配置文件,即集群中所有節點使用的ClusterId需相同。

  1. 修改配置文件
    腳本生成的配置文件只能用於單個Kafka節點,如果在部署Kafka集群,需要對配置文件進行一下修改。

(1)修改config/kraft/server.properties(稍後使用該配置啟動kafka)

process.roles=broker,controller 
node.id=1
listeners=PLAINTEXT://172.17.0.2:9092,CONTROLLER://172.17.0.2:9093
advertised.listeners=PLAINTEXT://172.17.0.2:9092
[email protected]:9093,[email protected]:9093,[email protected]:9093

process.roles指定了該節點角色,有以下取值

  • broker: 這台機器將僅僅當作一個broker
  • controller: 作為Raft quorum的控制器節點
  • broker,controller: 包含以上兩者的功能

一個集群中不同節點的node.id需要不同。
controller.quorum.voters需要配置集群中所有的controller節點,配置格式為@:

(2)
kafka-storage.sh腳本生成的配置,默認將kafka數據存放在/tmp/kraft-combined-logs/,
我們還需要/tmp/kraft-combined-logs/meta.properties配置中的node.id,使其與server.properties配置中保持一起。

node.id=1
  1. 啟動kafka
    使用kafka-server-start.sh腳本啟動Kafka節點
$ ./bin/kafka-server-start.sh ./config/kraft/server.properties

下面測試一下該kafka集群

  1. 創建主題
$ ./bin/kafka-topics.sh --create --partitions 3 --replication-factor 3 --bootstrap-server 172.17.0.2:9092,172.17.0.3:9092,172.17.0.4:9092 --topic topic1 
  1. 生產消息
$ ./bin/kafka-console-producer.sh --broker-list 172.17.0.2:9092,172.17.0.3:9092,172.17.0.4:9092 --topic topic1
  1. 消費消息
$ ./bin/kafka-console-consumer.sh --bootstrap-server 172.17.0.2:9092,172.17.0.3:9092,172.17.0.4:9092 --topic topic1 --from-beginning

這部分命令的使用與低版本的Kafka保持一致。

Kafka的功能暫時還不完善,這是展示一個簡單的部署示例。
Kafka文檔://github.com/apache/kafka/blob/trunk/config/kraft/README.md

Spring中可以使用Spring-Kafka、Spring-Cloud-Stream兩個框架實現kafka響應式交互。
下面分別看一下這兩個框架的使用。

Spring-Kafka

  1. 添加引用
    添加spring-kafka引用
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.5.8.RELEASE</version>
</dependency>
  1. 準備配置文件,內容如下
spring.kafka.producer.bootstrap-servers=172.17.0.2:9092,172.17.0.3:9092,172.17.0.4:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.LongSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer

spring.kafka.consumer.bootstrap-servers=172.17.0.2:9092,172.17.0.3:9092,172.17.0.4:9092
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.LongDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.group-id=warehouse-consumers
spring.kafka.consumer.properties.spring.json.trusted.packages=*

分別是生產者和消費者對應的配置,很簡單。

  1. 發送消息
    Spring-Kakfa中可以使用ReactiveKafkaProducerTemplate發送消息。
    首先,我們需要創建一個ReactiveKafkaProducerTemplate實例。(目前SpringBoot會自動創建KafkaTemplate實例,但不會創建ReactiveKafkaProducerTemplate實例)。
@Configuration
public class KafkaConfig {
    @Autowired
    private KafkaProperties properties;

    @Bean
    public ReactiveKafkaProducerTemplate reactiveKafkaProducerTemplate() {
        SenderOptions options = SenderOptions.create(properties.getProducer().buildProperties());
        ReactiveKafkaProducerTemplate template = new ReactiveKafkaProducerTemplate(options);
        return template;
    }
}

KafkaProperties實例由SpringBoot自動創建,讀取上面配置文件中對應的配置。

接下來,就可以使用ReactiveKafkaProducerTemplate發送消息了

    @Autowired
    private ReactiveKafkaProducerTemplate template;

    public static final String WAREHOUSE_TOPIC = "warehouse";
    public Mono<Boolean> add(Warehouse warehouse) {
        Mono<SenderResult<Void>> resultMono = template.send(WAREHOUSE_TOPIC, warehouse.getId(), warehouse);
        return resultMono.flatMap(rs -> {
            if(rs.exception() != null) {
                logger.error("send kafka error", rs.exception());
                return Mono.just(false);
            }
            return Mono.just(true);
        });
    }

ReactiveKafkaProducerTemplate#send方法返回一個Mono(這是Spring Reactor中的核心對象),Mono中攜帶了SenderResult,SenderResult中的RecordMetadata、exception存儲該記錄的元數據(包括offset、timestamp等資訊)以及發送操作的異常。

  1. 消費消息
    Spring-Kafka使用ReactiveKafkaConsumerTemplate消費消息。
@Service
public class WarehouseConsumer {
    @Autowired
    private KafkaProperties properties;

    @PostConstruct
    public void consumer() {
        ReceiverOptions<Long, Warehouse> options = ReceiverOptions.create(properties.getConsumer().buildProperties());
        options = options.subscription(Collections.singleton(WarehouseService.WAREHOUSE_TOPIC));
        new ReactiveKafkaConsumerTemplate(options)
                .receiveAutoAck()
                .subscribe(record -> {
                    logger.info("Warehouse Record:" + record);
                });
    }
}

這裡與之前使用@KafkaListener註解實現的消息監聽者不同,不過也非常簡單,分為兩個步驟:
(1)ReceiverOptions#subscription方法將ReceiverOptions關聯到kafka主題
(2)創建ReactiveKafkaConsumerTemplate,並註冊subscribe的回調函數消費消息。
提示:receiveAutoAck方法會自動提交消費組offset。

Spring-Cloud-Stream

Spring-Cloud-Stream是Spring提供的用於構建消息驅動微服務的框架。
它為不同的消息中間件產品提供一種靈活的,統一的編程模型,可以屏蔽底層不同消息組件的差異,目前支援RabbitMQ、Kafka、RocketMQ等消息組件。

這裡簡單展示Spring-Cloud-Stream中實現Kafka響應式交互的示例,不深入介紹Spring-Cloud-Stream的應用。

  1. 引入spring-cloud-starter-stream-kafka的引用
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-starter-stream-kafka</artifactId>
    </dependency>
  1. 添加配置
spring.cloud.stream.kafka.binder.brokers=172.17.0.2:9092,172.17.0.3:9092,172.17.0.4:9092
spring.cloud.stream.bindings.warehouse2-out-0.contentType=application/json
spring.cloud.stream.bindings.warehouse2-out-0.destination=warehouse2
# 消息格式
spring.cloud.stream.bindings.warehouse3-in-0.contentType=application/json
# 消息目的地,可以理解為Kafka主題
spring.cloud.stream.bindings.warehouse3-in-0.destination=warehouse2
# 定義消費者消費組,可以理解為Kafka消費組
spring.cloud.stream.bindings.warehouse3-in-0.group=warehouse2-consumers
# 映射方法名
spring.cloud.function.definition=warehouse2;warehouse3

Spring-Cloud-Stream 3.1版本之後,@EnableBinding、@Output等StreamApi註解都標記為廢棄,並提供了一種更簡潔的函數式編程模型。
該版本後,用戶不需要使用註解,只要在配置文件中指定需要綁定的方法,Spring-Cloud-Stream會為用戶將這些方法與底層消息組件綁定,用戶可以直接調用這些方法發送消息,或者接收到消息時Spring-Cloud-Stream會調用這些方法消費消息。

通過以下格式定義輸入、輸出函數的相關屬性:
輸出(發送消息):<functionName> + -out- + <index>
輸入(消費消息):<functionName> + -in- + <index>
對於典型的單個輸入/輸出函數,index始終為0,因此它僅與具有多個輸入和輸出參數的函數相關。
Spring-Cloud-Stream支援具有多個輸入(函數參數)/輸出(函數返回值)的函數。

spring.cloud.function.definition配置指定需要綁定的方法名,不添加該配置,Spring-Cloud-Stream會自動嘗試綁定返回類型為Supplier/Function/Consumer的方法,但是使用該配置可以避免Spring-Cloud-Stream綁定混淆。

  1. 發送消息
    用戶可以編寫一個返回類型為Supplier的方法,並定時發送消息
    @PollableBean
    public Supplier<Flux<Warehouse>> warehouse2() {
        Warehouse warehouse = new Warehouse();
        warehouse.setId(333L);
        warehouse.setName("天下第一倉");
        warehouse.setLabel("一級倉");

        logger.info("Supplier Add : {}", warehouse);
        return () -> Flux.just(warehouse);
    }

定義該方法後,Spring-Cloud-Stream每秒調用一次該方法,生成Warehouse實例,並發送到Kafka。
(這裡方法名warehouse3已經配置在spring.cloud.function.definition中。)

通常場景下,應用並不需要定時發送消息,而是由業務場景觸發發送消息操作, 如Rest介面,
這時可以使用StreamBridge介面

    @Autowired
    private StreamBridge streamBridge;

    public boolean add2(Warehouse warehouse) {
        return streamBridge.send("warehouse2-out-0", warehouse);
    }

暫時未發現StreamBridge如何實現響應式交互。

  1. 消費消息
    應用要消費消息,只需要定義一個返回類型為Function/Consumer的方法即可。如下
    @Bean
    public Function<Flux<Warehouse>, Mono<Void>> warehouse3() {
        Logger logger = LoggerFactory.getLogger("WarehouseFunction");
        return flux -> flux.doOnNext(data -> {
            logger.info("Warehouse Data: {}", data);
        }).then();
    }

注意:方法名與<functionName> + -out- + <index>/<functionName> + -in- + <index>
spring.cloud.function.definition中的配置需要保持一致,以免出錯。

SpringCloudStream文檔://docs.spring.io/spring-cloud-stream/docs/3.1.0/reference/html/spring-cloud-stream.html

文章完整程式碼://gitee.com/binecy/bin-springreactive/tree/master/warehouse-service

如果您覺得本文不錯,歡迎關注我的微信公眾號,系列文章持續更新中。您的關注是我堅持的動力!