Kafka作為分佈式消息系統的系統解析

Kafka概述

Apache Kafka由Scala和Java編寫,基於生產者和消費者模型作為開源的分佈式發佈訂閱消息系統。它提供了類似於JMS的特性,但設計上又有很大區別,它不是JMS規範的實現,如Kafka允許多個消費者主動拉取數據,而在JMS中只有點對點模式消費者才會主動拉取數據。

Kafka對消息保存時根據topic進行歸類,發送消息者稱為producer,消息接收者稱為consumer。Kafka集群由多個Kafka實例組成,每個實例稱為broker。並且Kafka集群基於zookeeper保存一些meta信息,來保證系統的高可用性。

生產者可以直接把數據傳遞給broker,broker通過zookeeper進行leader和follower的選舉管理;消費者可以通過zookeeper保存讀取的位置offset以及讀取的topic的分區信息。這樣做有以下幾個好處:

1. 生產者和消費者的負載解耦

2. 消費者可以按照自己的「能力」拉取數據

3. 消費者可以自定義消費數量

Kafka與傳統消息系統相比,有以下不同:

1. Kafka是分佈式的,易於水平擴展

2. 同時為發佈和訂閱提供高吞吐量

3. 支持多訂閱者,當失敗時能自動對消費者進行rebalance

4. 將消息持久化到磁盤,因此可用於批量消費,例如ETL以及實時應用程序

 

Kafka中的重要概念

名稱

解釋

broker

Kafka集群中的實例進程,負責數據存儲。在Kafka集群中每個broker都有一個唯一的brokerId。通過broker來接受producer和consumer的請求,並把消息持久化到磁盤。每個Kafka集群中會選舉出一個broker來擔任Controller,負責處理分區的leader選舉,協調分區遷移等工作

topic

Kafka根據topic對消息進行歸類(邏輯劃分),發佈到Kafka集群的每條消息都需要指定一個topic。落到磁盤上對應的是partition目錄,partition目錄中有多個segement組合(後綴為index、log的文件)。一個topic對應一個或多個partition,一個partition對應多個segment組合

producer

向broker發送消息的生產者。負責數據生產和數據分發。生產者代碼可以集成到任務系統中。

數據分發策略默認為defaultPartition  Utils.abs(key.hashCode)%numPartitions

consumer

從broker讀取消息的消費者【實際上consumer是通過與zookeeper通信獲取broker地址進行消息消費】

ConsumerGroup

數據消費者組,ConsumerGroup(以下簡稱CG)可以有多個。可以把多個consumer線程劃分為一個組,組裏面所有成員共同消費一個topic的數據(一個topic的多個partition),組員之間不能重複消費

partition

分區,一個topic可以分為多個partition(分佈在多個broker上,實現擴展性),每個partition可以設置多個副本,會從多個副本中選取出一個leader負責讀寫操作。每個partition是一個有序的隊列(partition中的每條消息都會被分配一個有序的id(offset)),Kafka只保證按每個partition內部有序並且被順序消費,不保證一個topic的整體(多個partition間)的順序

offset

每條消息在文件中的偏移量。Kafka的存儲文件都是按照offset.index來命名,方便查找

zookeeper

保存meta信息,管理集群配置,以及在CG發生變化時進行rebalance

Replication

Kafka支持以partition為單位對消息進行冗餘備份,每個partition都可以配置至少1個Replication(副本數包括本身)

leader partition

每個Replication集合中的partition都會選出一個唯一的leader,所有的讀寫請求都由leader處理。其他Replicas從leader處把數據更新同步到本地,過程類似MySQL中的Binlog同步。

ISR(In-Sync Replica)

Replicas的一個子集,表示目前”活着的”且與leader能夠保持”聯繫”的Replicas集合。由於讀寫都是首先落到leader上,所以一般來說通過同步機制從leader上拉取數據的Replica都會和leader有一些延遲(包括了延遲時間和延遲條數兩個維度),任意一個超過閾值都會把該Replica踢出ISR。每個partition都有它自己獨立的ISR。

Kafka中的廣播和單播

每個consumer屬於一個特定的CG,一條消息可以發送到多個不同的CG,但是一個CG中只能有一個consumer能夠消費該消息。目的:實現一個topic消息的廣播(發給所有的consumer)和單播(發給任意一個consumer)。一個topic可以有多個CG。topic的消息會複製(邏輯概念)到所有的CG,但每個partition只會把消息發給該CG中的一個consumer。


【實現廣播】每個consumer有一個獨立的CG

【實現單播】所有的consumer在同一個CG


用CG還可以將consumer進行自由的分組而不需要多次發送消息到不同的topic。

 


Kafka消息的分發

1. producer客戶端負責消息的分發

1)Kafka集群中的任何一個broker都可以向producer提供metadata信息,這些metadata中包含集群中存活的servers列表、partitions leader列表等信息

2)當producer獲取到metadata信息之後,producer將會和topic下所有partition leader保持socket連接

3)消息由producer直接通過socket發送到broker,中間不會經過任何”路由層”,消息被路由到哪個partition上由producer通過一些策略如隨機、輪巡等決定

如果一個topic中有多個partition,那麼在producer端實現”消息均衡分發”是非常必要的。在producer端的配置文件中,開發者可以指定partition路由的方式,具體流程:

1)連接broker-list中任意一台broker服務器

2)發送數據時,需要知道topic對應的partition個數及leader partition所在節點。這些信息由broker提供,每一個broker都能提供一份元數據信息(如哪些broker是存活的,哪個topic有多少分區,哪個分區是leader)

3)數據生產,數據發送到哪個partition的leader由producer代碼決定


4)數據通過socket連接,直接發送到partition所在的broker

2. producer消息發送的應答機制

設置發送數據是否需要服務端的反饋,由參數request.required.acks的值決定:

0: producer不會等待broker發送ack。最低延遲,持久化保證弱,當server掛掉時會丟失數據

1: 當leader接收到消息之後發送ack。當前leader接收到數據後,producer會得到一個ack,更好的持久性,因為在server確認請求成功後,client才會返回。如果數據剛寫到leader還沒來得及複製leader就掛了,消息可能會丟失

-1: 當所有的follower都同步消息成功後發送ack。最好的持久性,只要有一個replica存活,數據就不會丟失。但相對延遲高


3. 分發策略

默認分發策略:def partition(key: T, numPartitions: Int): Int = {


    Utils.abs(key.hashCode) % numPartitions}。

其他策略:輪詢、隨機等。

 


consumer與topic關係

通常情況下,一個消費者組有多個consumer,並且一個consumer只會屬於一個消費者組。這樣不僅可以提高topic中消息的並發消費能力,還能提高”故障容錯”。如消費組中的某個consumer掛掉,那麼它消費的partition將會由同組內其他的consumer自動接管。

一個CG中所有的consumer將會交錯的消費整個topic,每個消費組中consumer消息消費互相獨立,可以認為一個消費者組就是一個”訂閱”者。

注意:對於topic中的一條特定的消息,只會被訂閱此topic的每個消費者組中的其中一個consumer消費。同時,Kafka的設計原理決定,對於一個topic,同一個消費者組中如果有多於partition個數的consumer,則意味着某些consumer將無法消費消息。

 

consumer負載均衡

最好是一個partition對應一個consumer。如果consumer數量過多,必然有空閑的consumer。

當一個消費者組中,有consumer加入或者離開時,會觸發partition消費的rebalance。均衡的最終目的為了提升topic的並發消費能力,步驟如下:

比如一個topic有4個分區:P0、P1、P2、P3,一個CG中有C1、C2兩個consumer。

首先根據partition索引號對partition排序:P0、P1、P2、P3,再根據consumer的id排序:C0、C1

計算倍數: M = [P0,P1,P2,P3].size / [C0,C1].size,本例值M=2(向上取整)


然後依次分配partition: C0 = [P0,P1],C1=[P2,P3],即Ci = [P(i * M),P((i + 1) * M -1)]

 

Kafka文件存儲機制

1. 文件存儲基本結構

在Kafka文件存儲中,同一個topic下有一個或多個不同partition,每個partition為一個目錄,partition命名規則為topic名稱+有序序號,第一個partition序號從0開始,序號最大值為partition數量減1。

每個partition相當於一個巨型文件被平均分配到多個大小相等segment段數據文件中。但每個段segment file消息數量不一定相等,這種特性方便老的segment file快速被刪除即方便已被消費的消息的清理,提高磁盤利用率。

segment文件生命周期由服務端配置參數(log.segment.bytes:當segment文件達到多大時滾動生成一個新的segment文件,log.roll.{ms,hours}:滾動生成新的segment的時間即使沒有達到設置的segment文件最大值等若干參數)決定。 

segment的意義:當Kafka producer不斷發送消息,必然會引起partition文件的無限擴張,這樣對於消息文件的維護以及已經被消費的消息的清理帶來嚴重影響。通過參數設置segment可以指定保留多長時間的數據,及時清理已經被消費的消息,提高磁盤利用率,目前默認保存7天數據。


2. partition segment

1)segment file組成

由2大部分組成,分別為index file和data file,兩個文件一一對應,成對出現,後綴”.index”和「.log」分別表示為segment索引文件、數據文件。數值大小為64位,20位數字字符長度,沒有數字用0填充,如下:

2)segment 文件命名規則


partition全局的第一個segment從0開始,後續每個segment文件名為上一個segment文件最後一條消息的offset值。數值最大為64位long大小,19位數字字符長度,沒有數字用0填充

3)索引文件存儲大量元數據,數據文件存儲大量消息,索引文件中元數據指向對應數據文件中message的物理偏移地址

4)segment data file由許多message組成,物理結構如下:

關鍵字

解釋說明

8 byte offset

在partition內每條消息都有一個有序的id號:offset,它可以唯一確定每條消息在partition內的位置。即offset表示partition的第多少個message

4 byte message size

message大小

4 byte CRC32

用crc32校驗message

1 byte “magic”

表示本次發佈Kafka服務程序協議版本號

1 byte “attributes”

表示為獨立版本、或標識壓縮類型、或編碼類型

4 byte key length

表示key的長度,當key為-1時,K byte key字段不填

value bytes payload

表示實際消息數據

   


Kafka查找message

已知offset查找相應的message,需要通過下面2個步驟查找:

1. 查找segment file

00000000000000000000.index表示最開始的文件,起始偏移量為0

00000000000000000099.index的消息量起始偏移量為100=99+1

00000000000000000999.index的起始偏移量為1000=999+1


其他後續文件依次類推。以起始偏移量命名並排序這些文件,只要根據offset按照”二分查找”文件列表,就可以快速定位到具體文件。

2. 通過segment file查找message

根據offset,依次定位index元數據物理位置和log的物理偏移位置,然後再在log中順序查找直至找到對應offset位置即可。


 

關注微信公眾號:大數據學習與分享,獲取更對技術乾貨