什么,kafka能够从follower副本读数据了 —kafka新功能介绍

最近看了kafka2.4新版本的一些功能特性,不得不说,在kafka2.0以后,kafka自身就比较少推出一些新的feature了,基本都是一些修修补补的东西。倒是kafka connect和kafka stream相关的开发工作做的比较多。可能kafka的野心也不局限于要当一个中间件,而是要实现一个流处理系统的生态了。

这次要介绍的是我觉得比较有意思的两个特性,一个是kafka支持从follower副本读取数据,当然这个功能并不是为了提供读取性能,后面再详细介绍。另一个则是新推出的sticky partitioner功能,我猜是从rebalance的StickyAssignor中得到灵感,发现producer的分区策略也可以这样搞,233,这个feature主要作用是提高性能。

这两个feature都是kafka2.4.0版本推出的,如果想使用这些新feature,那么不妨升级下吧~

follower副本读取数据(consumer fetch from closest replica)

背景

在早先kafka的设计中,为了使consumer读取数据能够保持一致,是只允许consumer读取leader副本的数据的。即follower replica只是单纯地备份数据的作用。那推出follower replica fetch功能的背景是什么呢?

举个比较常见的场景,kafka存在多个数据中心,不同数据中心存在于不同的机房,当其中一个数据中心需要向另一个数据中心同步数据的时候,由于只能从leader replica消费数据,那么它不得不进行跨机房获取数据,而这些流量带宽通常是比较昂贵的(尤其是云服务器)。即无法利用本地性来减少昂贵的跨机房流量。

所以kafka推出这一个功能,就是帮助类似这种场景,节约流量资源。并且这种功能似乎还可以和新推出的mirror maker2相互配合,实现多个数据源的数据同步,不过我自己还没测试过。

rack功能介绍

要说follower replica fetch,那就不得不先说rack功能,这个是kafka比较早就推出的功能,是Kafka对机架感知提供了的基本支持,可以将其用于控制副本的放置,详细内容可以参阅这篇Kafka机架感知文章。

使用方式,其实就是一个broker端的参数,broker.rack,这个参数可以说明当前broker在哪个机房。

举上面文章中的例子,如果一个数据中心的集群分布如下:
kafka-rack功能

那么可以这样配置:

  • broker0 -> rack1
  • broker1 -> rack1
  • broker2 -> rack2
  • broker3 -> rack2

这样其实就是相当于给broker打一个标签,当新建topic,比如新建一个两个副本 & 两个分区的topic,kafka至少会自动给rack1或rack2分配全部分区的一个副本。什么,你说要是创建两个分区一个副本的topic该怎么分。。。抱歉,我给不了答案。等你自己实践然后评论跟我说下答案 =。=

replica fetch功能测试

OK,上面介绍的rack功能,我们就能发现,这个其实跟跨机房读数据的这种场景是很搭的。在跨机房多数据中心场景中,如果数据中心A,一个副本放在数据中心B的机房中,只要让数据中心B的consumer能够读数据中心A的那个replica的数据(follower副本)读数据,那不就万事大吉。

社区也是这样想的,所以就推出了这个功能。让消费者可以指定rack id,然后可以不从消费者读取数据。要实现这个目的,需要先配置两个参数:

replica.selector.class

  • broker端配置
  • 配置名:replica.selector.class
  • 解释:ReplicaSelector实现类的全名,包括路径 (比如 RackAwareReplicaSelector 即按 rack id 指定消费)
  • 默认:从 Leader 消费的 LeaderSelector

为了支持这个功能,kafka修改了这部分的接口实现,源码中新增一个ReplicaSelector接口,如果用户有自定义消费策略的需求,也可以继承这个接口实现自己的功能。

目前这个接口有两个实现类,一个是LeaderSelector,即从leader副本读数据。另一个则是RackAwareReplicaSelector,会去到指定的rack id读数据。

client.rack

  • consumer端配置
  • 配置名:client.rack
  • 解释:这个参数需要和broker端指定的broker.rack相同,表示去哪个rack中获取数据。
  • 默认:null

这个参数只有在上面的replica.selector.class指定为RackAwareReplicaSelector且broekr指定了broker.rack才会生效。

这个功能要测试也挺简单的,可以直接搭建一个两个broker的kafka集群,配置broker.rack,然后使用consumer客户端指定client.rack发送到非leader的节点查数据就行了。另外,可以使用这条命令查看网卡流量信息:

sar -n DEV 1 300

存在问题

从follower replica读取数据肯定有问题,最可能的问题就是落后节点的问题,从这样的节点读取数据会面临什么样的情况呢?官方给出了几种场景及解决办法。先看看这张图
img

主要有四种可能出现问题的情况,我们分别来看看应该如何解决:

Case 1(uncommitted offset)

这个场景是follower接收到数据但还未committed offset,这个时候,若消费者的offet消费到high watemark到log end offset之间的那段(Case 1黄色那段),会返回空数据,而不是一个错误信息。直到这段内容 committed。

case 2(unavailable offset)

这种场景应该发生于慢节点的情况下,满节点的broker还未接收到实际数据,但已经跟leader通信知道有部分数据committed了(case 2黄色部分)。当遇到这种情况,consumer 消费到时候,会返回 OFFSET_NOT_AVAILABLE 错误信息。

case 3(offset too small)

这种情况可能出现在消费者指定了 offset 的情况。那么在指定不同auto.offset.reset的时候有不同的情况。

  1. If the reset policy is “earliest,” fetch the log start offset of the current replica that raised the out of range error.
  2. If the reset policy is “latest,” fetch the log end offset from the leader.
  3. If the reset policy is “none,” raise an exception.

case 4(offset too large)

遇到这种情况,会返回一个 broker 会返回一个 OFFSET_OUT_OF_RANGE 的错误。

但 OFFSET_OUT_OF_RANGE 遇到这种错误的时候也有多种可能,官方给出当 consumer 遇到这种问题的解决思路,

Use the OffsetForLeaderEpoch API to verify the current position with the leader.

  1. If the fetch offset is still valid, refresh metadata and continue fetching
  2. If truncation was detected, follow the steps in KIP-320 to either reset the offset or raise the truncation error
  3. Otherwise, follow the same steps above as in case 3.

sticky partitioner功能

背景

kafka producer发送数据并不是一个一个消息发送,而是取决于两个producer端参数。一个是linger.ms,默认是0ms,当达到这个时间后,kafka producer就会立刻向broker发送数据。另一个参数是batch.size,默认是16kb,当产生的消息数达到这个大小后,就会立即向broker发送数据。

按照这个设计,从直观上思考,肯定是希望每次都尽可能填满一个batch再发送到一个分区。但实际决定batch如何形成的一个因素是分区策略(partitioner strategy)。在Kafka2.4版本之前,在producer发送数据默认的分区策略是轮询策略(没指定keyd的情况),这在我以前的文章有说到过详细解析kafka之kafka分区和副本。如果多条消息不是被发送到相同的分区,它们就不能被放入到一个batch中。

所以如果使用默认的轮询partition策略,可能会造成一个大的batch被轮询成多个小的batch的情况。鉴于此,kafka2.4的时候推出一种新的分区策略,即Sticky Partitioning Strategy,Sticky Partitioning Strategy会随机地选择另一个分区并会尽可能地坚持使用该分区——即所谓的粘住这个分区。

鉴于小batch可能导致延时增加,之前对于无Key消息的分区策略效率很低。社区于2.4版本引入了黏性分区策略(Sticky Partitioning Strategy)。该策略是一种全新的策略,能够显著地降低给消息指定分区过程中的延时。

使用Sticky Partitioner有助于改进消息批处理,减少延迟,并减少broker的负载。

功能解析

sticky Partitioner实现的代码是在UniformStickyPartitioner里面。贴下代码看看:

public class UniformStickyPartitioner implements Partitioner {

    private final StickyPartitionCache stickyPartitionCache = new StickyPartitionCache();

    public void configure(Map<String, ?> configs) {}

    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        return stickyPartitionCache.partition(topic, cluster);
    }

    public void close() {}

    public void onNewBatch(String topic, Cluster cluster, int prevPartition) {
        stickyPartitionCache.nextPartition(topic, cluster, prevPartition);
    }
}

我们主要关注UniformStickyPartitioner#partition()方法,可以看到,它是直接通过一个cache类获取相同的分区,这表示新的record会一直发送到同一个分区中,除非生成新的batch,触发了UniformStickyPartitioner#onNewBatch()方法才会换分区。

可以看看RoundRobinPartitioner#partition()方法(即轮询分区策略)进行对比,就能发现比较明显的对比。

这个sticky partitioner最大的好处就是性能较好,按照官方给出的测试结果,使用sticky partitioner测试可以减少50%的延时,吞吐也有相对应的提高。我自己测了下数据基本出入不大。

另外说明下,在kafka2.4以后,默认的partitioner分区策略,已经包含了sticky partitioner了,所以升级到kafka2.4以后,并不需要任何修改就能享受到性能到极大提升。这里可以看下kafka2.4版本的策略说明:

/**
 * The default partitioning strategy:
 * <ul>
 * <li>If a partition is specified in the record, use it
 * <li>If no partition is specified but a key is present choose a partition based on a hash of the key
 * <li>If no partition or key is present choose the sticky partition that changes when the batch is full.
 * 
 * See KIP-480 for details about sticky partitioning.
 */
public class DefaultPartitioner implements Partitioner {

有一点挺奇怪到,在测试过程中(使用bin/kafka-producer-perf-test.sh测试),发现DefaultPartitioner的性能要比UniformStickyPartitioner的性能要好一些,不确定是什么原因,知道到小伙伴可以在评论区给出答案:)

参考:
KIP-392: Allow consumers to fetch from closest replica
KIP-480: Sticky Partitioner
以上~

Tags: