什么,kafka能够从follower副本读数据了 —kafka新功能介绍
最近看了kafka2.4新版本的一些功能特性,不得不说,在kafka2.0以后,kafka自身就比较少推出一些新的feature了,基本都是一些修修补补的东西。倒是kafka connect和kafka stream相关的开发工作做的比较多。可能kafka的野心也不局限于要当一个中间件,而是要实现一个流处理系统的生态了。
这次要介绍的是我觉得比较有意思的两个特性,一个是kafka支持从follower副本读取数据,当然这个功能并不是为了提供读取性能,后面再详细介绍。另一个则是新推出的sticky partitioner功能,我猜是从rebalance的StickyAssignor中得到灵感,发现producer的分区策略也可以这样搞,233,这个feature主要作用是提高性能。
这两个feature都是kafka2.4.0版本推出的,如果想使用这些新feature,那么不妨升级下吧~
follower副本读取数据(consumer fetch from closest replica)
背景
在早先kafka的设计中,为了使consumer读取数据能够保持一致,是只允许consumer读取leader副本的数据的。即follower replica只是单纯地备份数据的作用。那推出follower replica fetch功能的背景是什么呢?
举个比较常见的场景,kafka存在多个数据中心,不同数据中心存在于不同的机房,当其中一个数据中心需要向另一个数据中心同步数据的时候,由于只能从leader replica消费数据,那么它不得不进行跨机房获取数据,而这些流量带宽通常是比较昂贵的(尤其是云服务器)。即无法利用本地性来减少昂贵的跨机房流量。
所以kafka推出这一个功能,就是帮助类似这种场景,节约流量资源。并且这种功能似乎还可以和新推出的mirror maker2相互配合,实现多个数据源的数据同步,不过我自己还没测试过。
rack功能介绍
要说follower replica fetch,那就不得不先说rack功能,这个是kafka比较早就推出的功能,是Kafka对机架感知提供了的基本支持,可以将其用于控制副本的放置,详细内容可以参阅这篇Kafka机架感知文章。
使用方式,其实就是一个broker端的参数,broker.rack,这个参数可以说明当前broker在哪个机房。
举上面文章中的例子,如果一个数据中心的集群分布如下:
那么可以这样配置:
- broker0 -> rack1
- broker1 -> rack1
- broker2 -> rack2
- broker3 -> rack2
这样其实就是相当于给broker打一个标签,当新建topic,比如新建一个两个副本 & 两个分区的topic,kafka至少会自动给rack1或rack2分配全部分区的一个副本。什么,你说要是创建两个分区一个副本的topic该怎么分。。。抱歉,我给不了答案。等你自己实践然后评论跟我说下答案 =。=
replica fetch功能测试
OK,上面介绍的rack功能,我们就能发现,这个其实跟跨机房读数据的这种场景是很搭的。在跨机房多数据中心场景中,如果数据中心A,一个副本放在数据中心B的机房中,只要让数据中心B的consumer能够读数据中心A的那个replica的数据(follower副本)读数据,那不就万事大吉。
社区也是这样想的,所以就推出了这个功能。让消费者可以指定rack id,然后可以不从消费者读取数据。要实现这个目的,需要先配置两个参数:
replica.selector.class
- broker端配置
- 配置名:replica.selector.class
- 解释:ReplicaSelector实现类的全名,包括路径 (比如 RackAwareReplicaSelector 即按 rack id 指定消费)
- 默认:从 Leader 消费的 LeaderSelector
为了支持这个功能,kafka修改了这部分的接口实现,源码中新增一个ReplicaSelector
接口,如果用户有自定义消费策略的需求,也可以继承这个接口实现自己的功能。
目前这个接口有两个实现类,一个是LeaderSelector
,即从leader副本读数据。另一个则是RackAwareReplicaSelector
,会去到指定的rack id读数据。
client.rack
- consumer端配置
- 配置名:client.rack
- 解释:这个参数需要和broker端指定的
broker.rack
相同,表示去哪个rack中获取数据。 - 默认:null
这个参数只有在上面的replica.selector.class
指定为RackAwareReplicaSelector
且broekr指定了broker.rack
才会生效。
这个功能要测试也挺简单的,可以直接搭建一个两个broker的kafka集群,配置broker.rack,然后使用consumer客户端指定client.rack发送到非leader的节点查数据就行了。另外,可以使用这条命令查看网卡流量信息:
sar -n DEV 1 300
存在问题
从follower replica读取数据肯定有问题,最可能的问题就是落后节点的问题,从这样的节点读取数据会面临什么样的情况呢?官方给出了几种场景及解决办法。先看看这张图
主要有四种可能出现问题的情况,我们分别来看看应该如何解决:
Case 1(uncommitted offset)
这个场景是follower接收到数据但还未committed offset,这个时候,若消费者的offet消费到high watemark到log end offset之间的那段(Case 1黄色那段),会返回空数据,而不是一个错误信息。直到这段内容 committed。
case 2(unavailable offset)
这种场景应该发生于慢节点的情况下,满节点的broker还未接收到实际数据,但已经跟leader通信知道有部分数据committed了(case 2黄色部分)。当遇到这种情况,consumer 消费到时候,会返回 OFFSET_NOT_AVAILABLE 错误信息。
case 3(offset too small)
这种情况可能出现在消费者指定了 offset 的情况。那么在指定不同auto.offset.reset
的时候有不同的情况。
- If the reset policy is “earliest,” fetch the log start offset of the current replica that raised the out of range error.
- If the reset policy is “latest,” fetch the log end offset from the leader.
- If the reset policy is “none,” raise an exception.
case 4(offset too large)
遇到这种情况,会返回一个 broker 会返回一个 OFFSET_OUT_OF_RANGE 的错误。
但 OFFSET_OUT_OF_RANGE 遇到这种错误的时候也有多种可能,官方给出当 consumer 遇到这种问题的解决思路,
Use the OffsetForLeaderEpoch API to verify the current position with the leader.
- If the fetch offset is still valid, refresh metadata and continue fetching
- If truncation was detected, follow the steps in KIP-320 to either reset the offset or raise the truncation error
- Otherwise, follow the same steps above as in case 3.
sticky partitioner功能
背景
kafka producer发送数据并不是一个一个消息发送,而是取决于两个producer端参数。一个是linger.ms
,默认是0ms,当达到这个时间后,kafka producer就会立刻向broker发送数据。另一个参数是batch.size
,默认是16kb,当产生的消息数达到这个大小后,就会立即向broker发送数据。
按照这个设计,从直观上思考,肯定是希望每次都尽可能填满一个batch再发送到一个分区。但实际决定batch如何形成的一个因素是分区策略(partitioner strategy)。在Kafka2.4版本之前,在producer发送数据默认的分区策略是轮询策略(没指定keyd的情况),这在我以前的文章有说到过详细解析kafka之kafka分区和副本。如果多条消息不是被发送到相同的分区,它们就不能被放入到一个batch中。
所以如果使用默认的轮询partition策略,可能会造成一个大的batch被轮询成多个小的batch的情况。鉴于此,kafka2.4的时候推出一种新的分区策略,即Sticky Partitioning Strategy,Sticky Partitioning Strategy会随机地选择另一个分区并会尽可能地坚持使用该分区——即所谓的粘住这个分区。
鉴于小batch可能导致延时增加,之前对于无Key消息的分区策略效率很低。社区于2.4版本引入了黏性分区策略(Sticky Partitioning Strategy)。该策略是一种全新的策略,能够显著地降低给消息指定分区过程中的延时。
使用Sticky Partitioner有助于改进消息批处理,减少延迟,并减少broker的负载。
功能解析
sticky Partitioner实现的代码是在UniformStickyPartitioner
里面。贴下代码看看:
public class UniformStickyPartitioner implements Partitioner {
private final StickyPartitionCache stickyPartitionCache = new StickyPartitionCache();
public void configure(Map<String, ?> configs) {}
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
return stickyPartitionCache.partition(topic, cluster);
}
public void close() {}
public void onNewBatch(String topic, Cluster cluster, int prevPartition) {
stickyPartitionCache.nextPartition(topic, cluster, prevPartition);
}
}
我们主要关注UniformStickyPartitioner#partition()
方法,可以看到,它是直接通过一个cache类获取相同的分区,这表示新的record会一直发送到同一个分区中,除非生成新的batch,触发了UniformStickyPartitioner#onNewBatch()
方法才会换分区。
可以看看RoundRobinPartitioner#partition()
方法(即轮询分区策略)进行对比,就能发现比较明显的对比。
这个sticky partitioner最大的好处就是性能较好,按照官方给出的测试结果,使用sticky partitioner测试可以减少50%的延时,吞吐也有相对应的提高。我自己测了下数据基本出入不大。
另外说明下,在kafka2.4以后,默认的partitioner分区策略,已经包含了sticky partitioner了,所以升级到kafka2.4以后,并不需要任何修改就能享受到性能到极大提升。这里可以看下kafka2.4版本的策略说明:
/**
* The default partitioning strategy:
* <ul>
* <li>If a partition is specified in the record, use it
* <li>If no partition is specified but a key is present choose a partition based on a hash of the key
* <li>If no partition or key is present choose the sticky partition that changes when the batch is full.
*
* See KIP-480 for details about sticky partitioning.
*/
public class DefaultPartitioner implements Partitioner {
有一点挺奇怪到,在测试过程中(使用bin/kafka-producer-perf-test.sh测试),发现DefaultPartitioner
的性能要比UniformStickyPartitioner
的性能要好一些,不确定是什么原因,知道到小伙伴可以在评论区给出答案:)
参考:
KIP-392: Allow consumers to fetch from closest replica
KIP-480: Sticky Partitioner
以上~