Apache Kafka内核深度剖析

  • 2020 年 2 月 26 日
  • 筆記

目前来说市面上可以选择的消息队列非常多,像activemq,rabbitmq,zeromq已经被大多数人耳熟能详,特别像activemq早期应用在企业中的总线通信,基本作为企业级IT设施解决方案中不可或缺的一部分。目前来说Kafka已经非常稳定,并且逐步应用更加广泛,已经算不得新生事物,但是不可否认Kafka一枝独秀如同雨后春笋,非常耀眼,今天我们仔细分解一下Kafka,了解一下它的内幕。以下的内容版本基于当前最新的Kafka稳定版本2.4.0。文章主要包含以下内容:

  • Kafka为什么快
  • Kafka为什么稳
  • Kafka该怎么用

该文章为开篇引导之做,后续会有对应的HBase,Spark,Kylin,Pulsar等相关组件的剖析。

Kafka为什么快

快是一个相对概念,没有对比就没有伤害,因此通常我们说Kafka是相对于我们常见的activemq,rabbitmq这类会发生IO,并且主要依托于IO来做信息传递的消息队列,像zeromq这种基本纯粹依靠内存做信息流传递的消息队列,当然会更快,但是此类消息队列只有特殊场景下会使用,不在对比之列。

因此当我们说Kakfa快的时候,通常是基于以下场景:

  • 吞吐量:当我们需要每秒处理几十万上百万message的时候,相对其他MQ,Kafka处理的更快。
  • 高并发:当具有百万以及千万的consumer的时候,同等配置的机器下,Kafka所拥有的Producer和Consumer会更多。
  • 磁盘锁:相对其他MQ,Kafka在进行IO操作的时候,其同步锁住IO的场景更少,发生等待的时间更短。

那么基于以上几点,我们来仔细探讨一下,为什么Kafka就快了。

消息队列的推拉模型

首先,如果我们单纯站在Consumer的角度来看“Kafka快”,是一个伪命题,因为相比其他MQ,Kafka从Producer产生一条Message到Consumer消费这条Message来看它的时间一定是大于等于其他MQ的,背后的原因涉及到消息队列设计的两种模型:推模型和拉模型,如下图所示:

对于拉模型来说,Producer产生Message后,会主动发送给MQ Server,为了提升性能和减少开支,部分Client还会设计成批量发送,但是无论是单条还是批量,Producer都会主动推送消息到MQ Server,当MQ Server接收到消息后,对于拉模型,MQ Server不会主动发送消息到Consumer,同时也不会维持和记录消息的offset,Consumer会自动设置定时器到服务端去询问是否有新的消息产生,通常时间是不超过100ms询问一次,一旦产生新的消息则会同步到本地,并且修改和记录offset,服务端可以辅助存储offset,但是不会主动记录和校验offset的合理性,同时Consumer可以完全自主的维护offset以便实现自定义的信息读取。

对于推模型来说,服务端收到Message后,首先会记录消息的信息,并且从自己的元信息数据库中查询对应的消息的Consumer有谁,由于服务器和Consumer在链接的时候建立了长链接,因此可以直接发送消息到Consumer。

Kafka是基于拉模型的消息队列,因此从Consumer获取消息的角度来说,延迟会小于等于轮询的周期,所以会比推模型的消息队列具有更高的消息获取延迟,但是推模型同样又其问题。首先,由于服务器需要记录对应的Consumer的元信息,包括消息该发给谁,offset是多少,同时需要向Consumer推送消息,必然会带来系列的问题:假如这一刻网络不好,Consumer没有收到,消息没有发成功怎么办?假设消息发出去了,我怎么知道它有没有收到?因此服务器和Consumer之间需要首先多层确认口令,以达到至少消费一次,仅且消费一次等特性。

Kafka此类的拉模型将这一块功能都交由Consumer自动维护,因此服务器减少了更多的不必要的开支,因此从同等资源的角度来讲,Kafka具备链接的Producer和Consumer将会更多,极大的降低了消息堵塞的情况,因此看起来更快了。

OS Page Cache和Buffer Cache

太阳底下无新鲜事,对于一个框架来说,要想运行的更快,通常能用的手段也就那么几招,Kafka在将这一招用到了极致,其中之一就是极大化的使用了OS的Cache,主要是Page Cache和Buffer Cache。对于这两个Cache,使用Linux的同学通常不会陌生,例如我们在Linux下执行free命令的时候会看到如下的输出:

(图片来自网络)

会有两列名为buffers和cached,也有一行名为“-/+ buffers/cache”。这两个信息的具体解释如下:

pagecache:文件系统层级的缓存,从磁盘里读取的内容是存储到这里,这样程序读取磁盘内容就会非常快,比如使用Linux的grep和find等命令查找内容和文件时,第一次会慢很多,再次执行就快好多倍,几乎是瞬间。另外page cache的数据被修改过后,也即脏数据,等到写入磁盘时机到来时,会转移到buffer cache 而不是直接写入到磁盘。我们看到的cached这列的数值表示的是当前的页缓存(page cache)的占用量,page cache文件的页数据,页是逻辑上的概念,因此page cache是与文件系统同级的。

buffer cache:磁盘等块设备的缓冲,内存的这一部分是要写入到磁盘里的 。buffers列表示当前的块缓存(buffer cache)占用量,buffer cache用于缓存块设备(如磁盘)的块数据。块是物理上的概念,因此buffer cache是与块设备驱动程序同级的。

两者都是用来加速数据IO,将写入的页标记为dirty,然后向外部存储flush,读数据时首先读取缓存,如果未命中,再去外部存储读取,并且将读取来的数据也加入缓存。操作系统总是积极地将所有空闲内存都用作page cache和buffer cache,当os的内存不够用时也会用LRU等算法淘汰缓存页。

有了以上概念后,我们再看来Kafka是怎么利用这个特性的。首先,对于一次数据IO来说,通常会发生以下的流程:

  • 操作系统将数据从磁盘拷贝到内核区的pagecache
  • 用户程序将内核区的pagecache拷贝到用户区缓存
  • 用户程序将用户区的缓存拷贝到socket缓存中
  • 操作系统将socket缓存中的数据拷贝到网卡的buffer上,发送数据

可以发现一次IO请求操作进行了2次上下文切换和4次系统调用,而同一份数据在缓存中多次拷贝,实际上对于拷贝来说完全可以直接在内核态中进行,也就是省去第二和第三步骤,变成这样:

正因为可以如此的修改数据的流程,于是Kafka在设计之初就参考此流程,尽可能大的利用os的page cache来对数据进行拷贝,尽量减少对磁盘的操作。如果kafka生产消费配合的好,那么数据完全走内存,这对集群的吞吐量提升是很大的。早期的操作系统中的page cache和buffer cache是分开的两块cache,后来发现同样的数据可能会被cache两次,于是大部分情况下两者都是合二为一的。

Kafka虽然使用JVM语言编写,在运行的时候脱离不了JVM和JVM的GC,但是Kafka并未自己去管理缓存,而是直接使用了OS的page cache作为缓存,这样做带来了以下好处:

  • JVM中的一切皆对象,所以无论对象的大小,总会有些额外的JVM的对象元数据浪费空间。
  • JVM自己的GC不受程序手动控制,所以如果使用JVM作为缓存,在遇到大对象或者频繁GC的时候会降低整个系统的吞吐量。
  • 程序异常退出或者重启,所有的缓存都将失效,在容灾架构下会影响快速恢复。而page cache因为是os的cache,即便程序退出,缓存依旧存在。

所以Kafka优化IO流程,充分利用page cache,其消耗的时间更短,吞吐量更高,相比其他MQ就更快了,用一张图来简述三者之间的关系如下:

当Producer和Consumer速率相差不大的情况下,Kafka几乎可以完全实现不落盘就完成信息的传输。

追加顺序写入

除了前面的重要特性之外,Kafka还有一个设计,就是对数据的持久化存储采用的顺序的追加写入,Kafka在将消息落到各个topic的partition文件时,只是顺序追加,充分的利用了磁盘顺序访问快的特性。

(图片来自网络)

Kafka的文件存储按照topic下的partition来进行存储,每一个partition有各自的序列文件,各个partition的序列不共享,主要的划分按照消息的key进行hash决定落在哪个分区之上,我们先来详细解释一下Kafka的各个名词,以便充分理解其特点:

  • broker:Kafka中用来处理消息的服务器,也是Kafka集群的一个节点,多个节点形成一个Kafka集群。
  • topic:一个消息主题,每一个业务系统或者Consumer需要订阅一个或者多个主题来获取消息,Producer需要明确发生消息对于的topic,等于信息传递的口令名称。
  • partition:一个topic会拆分成多个partition落地到磁盘,在kafka配置的存储目录下按照对应的分区ID创建的文件夹进行文件的存储,磁盘可以见的最大的存储单元。
  • segment:一个partition会有多个segment文件来实际存储内容。
  • offset:每一个partition有自己的独立的序列编号,作用域仅在当前的partition之下,用来对对应的文件内容进行读取操作。
  • leader:每一个topic需要有一个leader来负责该topic的信息的写入,数据一致性的维护。
  • controller:每一个kafka集群会选择出一个broker来充当controller,负责决策每一个topic的leader是谁,监听集群broker信息的变化,维持集群状态的健康。

可以看到最终落地到磁盘都是Segment文件,每一个partion(目录)相当于一个巨型文件被平均分配到多个大小相等segment(段)数据文件中。但每个段segment file消息数量不一定相等,这种特性方便老的 segment file快速被删除。因为Kafka处理消息的力度是到partition,因此只需要保持好partition对应的顺序处理,segment可以单独维护其状态。

segment的文件由index file和data file组成,落地在磁盘的后缀为.index和.log,文件按照序列编号生成,如下所示:

(图片来自网络)

其中index维持着数据的物理地址,而data存储着数据的偏移地址,相互关联,这里看起来似乎和磁盘的顺序写入关系不大,想想HDFS的块存储,每次申请固定大小的块和这里的segment?是不是挺相似的?另外因为有index文的本身命名是以offset作为文件名的,在进行查找的时候可以快速根据需要查找的offset定位到对应的文件,再根据文件进行内容的检索。因此Kafka的查找流程为先根据要查找的offset对文件名称进行二分查找,找到对应的文件,再根据index的元数据的物理地址和log文件的偏移位置结合顺序读区到对应offset的位置的内容即可。

segment index file采取稀疏索引存储方式,它减少索引文件大小,通过mmap可以直接内存操作,稀疏索引为数据文件的每个对应message设置一个元数据指针,它比稠密索引节省了更多的存储空间,但查找起来需要消耗更多的时间,特别是在随机读取的场景下,Kafka非常不合适。所以因为Kafka特殊的存储设计,也让Kafka感觉起来,更快。


Kafka为什么稳

前面提到Kafka为什么快,除了快的特性之外,Kafka还有其他特点,那就是:稳。Kafka的稳体现在几个维度:

  • 数据安全,几乎不会丢数据。
  • 集群安全,发生故障几乎可以Consumer无感知切换。
  • 可用性强,即便部分partition不可用,剩余的partition的数据依旧不影响读取。
  • 流控限制,避免大量Consumer拖垮服务器的带宽。

限流机制

对于Kafka的稳,通常是由其整体架构设计决定,很多优秀的特性结合在一起,就更加的优秀,像Kafka的Qutota就是其中一个,既然是限流,那就意味着需要控制Consumer或者Producer的流量带宽,通常限制流量这件事需要在网卡上作处理,像常见的N路交换机或者高端路由器,所以对于Kafka来说,想要操控OS的网卡去控制流量显然具有非常高的难度,因此Kafka采用了另外一个特别的思路,即:没有办法控制网卡通过的流量大小,就控制返回数据的时间。对于JVM程序来说,就是一个wait或者seelp的事情。

所以对于Kafka来说,有一套特殊的时延计算规则,Kafka按照一个窗口来统计单位时间传输的流量,当流量大小超过设置的阈值的时候,触发流量控制,将当前请求丢入Kafka的Qutota Manager,等到延迟时间到达后,再次返回数据。我们通过Kafka的ClientQutotaManager类中的方法来看:

这几行代码代表了Kafka的限流计算逻辑,大概的思路为:假设我们设定当前流量上限不超过T,根据窗口计算出当前的速率为O,如果O超过了T,那么会进行限速,限速的公示为:

X = (O - T)/ T * W

X为需要延迟的时间,让我举一个形象的例子,假设我们限定流量不超过10MB/s,过去5秒(公示中的W,窗口区间)内通过的流量为100MB,则延迟的时间为:(100-5*10)/ 10=5秒。这样就能够保障在下一个窗口运行完成后,整个流量的大小是不会超过限制的。通过KafkaApis里面对Producer和Consumer的call back代码可以看到对限流的延迟返回:

对于kafka的限流来讲,默认是按照client id或者user来进行限流的,从实际使用的角度来说,意义不是很大,基于topic或者partition分区级别的限流,相对使用场景更大,ThoughtWroks曾经帮助某客户修改Kafka核心源码,实现了基于topic的流量控制。

竞选机制

Kafka背后的元信息重度依赖Zookeeper,再次我们不解释Zookeeper本身,而是关注Kafka到底是如何使用zk的,首先一张图解释Kafka对zk的重度依赖:

(图片来源于网络)

利用zk除了本身信息的存储之外,最重要的就是Kafka利用zk实现选举机制,其中以controller为主要的介绍,首先controller作为Kafka的心脏,主要负责着包括不限于以下重要事项:

也就是说Controller是Kafka的核心角色,对于Controller来说,采用公平竞争,任何一个Broker都有可能成为Controller,保障了集群的健壮性,对于Controller来说,其选举流程如下:

  • 先获取 zk 的 /cotroller 节点的信息,获取 controller 的 broker id,如果该节点不存在(比如集群刚创建时),* 那么获取的 controller id 为-1。
  • 如果 controller id 不为-1,即 controller 已经存在,直接结束流程。
  • 如果 controller id 为-1,证明 controller 还不存在,这时候当前 broker 开始在 zk 注册 controller;。
  • 如果注册成功,那么当前 broker 就成为了 controller,这时候开始调用 onBecomingLeader() 方法,正式初始化 controller(注意:controller 节点是临时节点,如果当前 controller 与 zk 的 session 断开,那么 controller 的临时节点会消失,会触发 controller 的重新选举)。
  • 如果注册失败(刚好 controller 被其他 broker 创建了、抛出异常等),那么直接返回。

其代码直接通过KafkaController可以看到:

一旦Controller选举出来之后,则其他Broker会监听zk的变化,来响应集群中Controller挂掉的情况:

从而触发新的Controller选举动作。对于Kafka来说,整个设计非常紧凑,代码质量相当高,很多设计也非常具有借鉴意义,类似的功能在Kafka中有非常多的特性体现,这些特性结合一起,形成了Kafka整个稳定的局面。


Kafka该怎么用

虽然Kafka整体看起来非常优秀,但是Kafka也不是全能的银弹,必然有其对应的短板,那么对于Kafka如何,或者如何能用的更好,则需要经过实际的实践才能得感悟的出。经过归纳和总结,能够发现以下不同的使用场景和特点。

  • Kafka 并不合适高频交易系统:Kafka虽然具有非常高的吞吐量和性能,但是不可否认,Kafka在单条消息的低延迟方面依旧不如传统MQ,毕竟依托推模型的MQ能够在实时消息发送的场景下取得先天的优势。
  • Kafka并不具备完善的事务机制:0.11之后Kafka新增了事务机制,可以保障Producer的批量提交,为了保障不会读取到脏数据,Consumer可以通过对消息状态的过滤过滤掉不合适的数据,但是依旧保留了读取所有数据的操作,即便如此Kafka的事务机制依旧不完备,背后主要的原因是Kafka对client并不感冒,所以不会统一所有的通用协议,因此在类似仅且被消费一次等场景下,效果非常依赖于客户端的实现。
  • Kafka的异地容灾方案非常复杂:对于Kafka来说,如果要实现跨机房的无感知切换,就需要支持跨集群的代理,因为Kafka特殊的append log的设计机制,导致同样的offset在不同的broker和不同的内容上无法复用,也就是文件一旦被拷贝到另外一台服务器上,将不可读取,相比类似基于数据库的MQ,很难实现数据的跨集群同步,同时对于offset的复现也非常难,曾经帮助客户实现了一套跨机房的Kafka 集群Proxy,投入了非常大的成本。
  • Kafka Controller架构无法充分利用集群资源:Kafka Controller类似于Es的去中心化思想,按照竞选规则从集群中选择一台服务器作为Controller,意味着改服务器即承担着Controller的职责,同时又承担着Broker的职责,导致在海量消息的压迫下,该服务器的资源很容易成为集群的瓶颈,导致集群资源无法最大化。Controller虽然支持HA但是并不支持分布式,也就意味着如果要想Kafka的性能最优,每一台服务器至少都需要达到最高配置。
  • Kafka不具备非常智能的分区均衡能力:通常在设计落地存储的时候,对于热点或者要求性能足够高的场景下,会是SSD和HD的结合,同时如果集群存在磁盘容量大小不均等的情况,对于Kafka来说会有非常严重的问题,Kafka的分区产生是按照paratition的个数进行统计,将新的分区创建在个数最少的磁盘上,见下图:

曾经我帮助某企业修改了分区创建规则,考虑了容量的情况,也就是按照磁盘容量进行分区的选择,紧接着带来第二个问题:容量大的磁盘具备更多的分区,则会导致大量的IO都压向该盘,最后问题又落回IO,会影响该磁盘的其他topic的性能。所以在考虑MQ系统的时候,需要合理的手动设置Kafka的分区规则。


结尾

Kafka并不是唯一的解决方案,像几年前新生势头挺厉害的pulsar,以取代Kafka的口号冲入市场,也许会成为下一个解决Kafka部分痛点的框架,下文再讲述pulsar。


– 相关阅读 –