如何快速全面掌握Kafka?5000字吐血整理

  • 2020 年 3 月 13 日
  • 筆記

Kafka 是目前主流的分散式消息引擎及流處理平台,經常用做企業的消息匯流排、實時數據管道,本文挑選了 Kafka 的幾個核心話題,幫助大家快速掌握 Kafka,包括:

  • Kafka 體系架構
  • Kafka 消息發送機制
  • Kafka 副本機制
  • Kafka 控制器
  • Kafka Rebalance 機制

因為涉及內容較多,本文盡量做到深入淺出,全面的介紹 Kafka 原理及核心組件,不怕你不懂 Kafka。

1. Kafka 快速入門

Kafka 是一個分散式消息引擎與流處理平台,經常用做企業的消息匯流排、實時數據管道,有的還把它當做存儲系統來使用。早期 Kafka 的定位是一個高吞吐的分散式消息系統,目前則演變成了一個成熟的分散式消息引擎,以及流處理平台。

1.1 Kafka 體系架構

Kafka 的設計遵循生產者消費者模式,生產者發送消息到 broker 中某一個 topic 的具體分區里,消費者從一個或多個分區中拉取數據進行消費。拓撲圖如下:

目前,Kafka 依靠 Zookeeper 做分散式協調服務,負責存儲和管理 Kafka 集群中的元數據資訊,包括集群中的 broker 資訊、topic 資訊、topic 的分區與副本資訊等。

1.2 Kafka 術語

這裡整理了 Kafka 的一些關鍵術語:

  • Producer:生產者,消息產生和發送端。
  • Broker:Kafka 實例,多個 broker 組成一個 Kafka 集群,通常一台機器部署一個 Kafka 實例,一個實例掛了不影響其他實例。
  • Consumer:消費者,拉取消息進行消費。 一個 topic 可以讓若干個消費者進行消費,若干個消費者組成一個 Consumer Group 即消費組,一條消息只能被消費組中一個 Consumer 消費。
  • Topic:主題,服務端消息的邏輯存儲單元。一個 topic 通常包含若干個 Partition 分區。
  • Partition:topic 的分區,分散式存儲在各個 broker 中, 實現發布與訂閱的負載均衡。若干個分區可以被若干個 Consumer 同時消費,達到消費者高吞吐量。一個分區擁有多個副本(Replica),這是Kafka在可靠性和可用性方面的設計,後面會重點介紹。
  • message:消息,或稱日誌消息,是 Kafka 服務端實際存儲的數據,每一條消息都由一個 key、一個 value 以及消息時間戳 timestamp 組成。
  • offset:偏移量,分區中的消息位置,由 Kafka 自身維護,Consumer 消費時也要保存一份 offset 以維護消費過的消息位置。

1.3 Kafka 作用與特點

Kafka 主要起到削峰填谷(緩衝)、系統解構以及冗餘的作用,主要特點有:

  • 高吞吐、低延時:這是 Kafka 顯著的特點,Kafka 能夠達到百萬級的消息吞吐量,延遲可達毫秒級;
  • 持久化存儲:Kafka 的消息最終持久化保存在磁碟之上,提供了順序讀寫以保證性能,並且通過 Kafka 的副本機制提高了數據可靠性。
  • 分散式可擴展:Kafka 的數據是分散式存儲在不同 broker 節點的,以 topic 組織數據並且按 partition 進行分散式存儲,整體的擴展性都非常好。
  • 高容錯性:集群中任意一個 broker 節點宕機,Kafka 仍能對外提供服務。

2. Kafka 消息發送機制

Kafka 生產端發送消息的機制非常重要,這也是 Kafka 高吞吐的基礎,生產端的基本流程如下圖所示:

主要有以下方面的設計:

2.1 非同步發送

Kafka 自從 0.8.2 版本就引入了新版本 Producer API,新版 Producer 完全是採用非同步方式發送消息。生產端構建的 ProducerRecord 先是經過 keySerializer、valueSerializer 序列化後,再是經過 Partition 分區器處理,決定消息落到 topic 具體某個分區中,最後把消息發送到客戶端的消息緩衝池 accumulator 中,交由一個叫作 Sender 的執行緒發送到 broker 端。

這裡緩衝池 accumulator 的最大大小由參數 buffer.memory 控制,默認是 32M,當生產消息的速度過快導致 buffer 滿了的時候,將阻塞 max.block.ms 時間,超時拋異常,所以 buffer 的大小可以根據實際的業務情況進行適當調整。

2.2 批量發送

發送到緩衝 buffer 中消息將會被分為一個一個的 batch,分批次的發送到 broker 端,批次大小由參數 batch.size 控制,默認16KB。這就意味著正常情況下消息會攢夠 16KB 時才會批量發送到 broker 端,所以一般減小 batch 大小有利於降低消息延時,增加 batch 大小有利於提升吞吐量。

那麼生成端消息是不是必須要達到一個 batch 大小時,才會批量發送到服務端呢?答案是否定的,Kafka 生產端提供了另一個重要參數 linger.ms,該參數控制了 batch 最大的空閑時間,超過該時間的 batch 也會被發送到 broker 端。

2.3 消息重試

此外,Kafka 生產端支援重試機制,對於某些原因導致消息發送失敗的,比如網路抖動,開啟重試後 Producer 會嘗試再次發送消息。該功能由參數 retries 控制,參數含義代表重試次數,默認值為 0 表示不重試,建議設置大於 0 比如 3。

3. Kafka 副本機制

前面提及了 Kafka 分區副本(Replica)的概念,副本機制也稱 Replication 機制是 Kafka 實現高可靠、高可用的基礎。Kafka 中有 leader 和 follower 兩類副本。

3.1 Kafka 副本作用

Kafka 默認只會給分區設置一個副本,由 broker 端參數 default.replication.factor 控制,默認值為 1,通常我們會修改該默認值,或者命令行創建 topic 時指定 replication-factor 參數,生產建議設置 3 副本。副本作用主要有兩方面:

  • 消息冗餘存儲,提高 Kafka 數據的可靠性;
  • 提高 Kafka 服務的可用性,follower 副本能夠在 leader 副本掛掉或者 broker 宕機的時候參與 leader 選舉,繼續對外提供讀寫服務。

3.2 關於讀寫分離

這裡要說明的是 Kafka 並不支援讀寫分區,生產消費端所有的讀寫請求都是由 leader 副本處理的,follower 副本的主要工作就是從 leader 副本處非同步拉取消息,進行消息數據的同步,並不對外提供讀寫服務。

Kafka 之所以這樣設計,主要是為了保證讀寫一致性,因為副本同步是一個非同步的過程,如果當 follower 副本還沒完全和 leader 同步時,從 follower 副本讀取數據可能會讀不到最新的消息。

3.3 ISR 副本集合

Kafka 為了維護分區副本的同步,引入 ISR(In-Sync Replicas)副本集合的概念,ISR 是分區中正在與 leader 副本進行同步的 replica 列表,且必定包含 leader 副本。

ISR 列表是持久化在 Zookeeper 中的,任何在 ISR 列表中的副本都有資格參與 leader 選舉。

ISR 列表是動態變化的,並不是所有的分區副本都在 ISR 列表中,哪些副本會被包含在 ISR 列表中呢?副本被包含在 ISR 列表中的條件是由參數 replica.lag.time.max.ms 控制的,參數含義是副本同步落後於 leader 的最大時間間隔,默認10s,意思就是說如果某一 follower 副本中的消息比 leader 延時超過10s,就會被從 ISR 中排除。Kafka 之所以這樣設計,主要是為了減少消息丟失,只有與 leader 副本進行實時同步的 follower 副本才有資格參與 leader 選舉,這裡指相對實時。

3.4 Unclean leader 選舉

既然 ISR 是動態變化的,所以 ISR 列表就有為空的時候,ISR 為空說明 leader 副本也「掛掉」了,此時 Kafka 就要重新選舉出新的 leader。但 ISR 為空,怎麼進行 leader 選舉呢?

Kafka 把不在 ISR 列表中的存活副本稱為「非同步副本」,這些副本中的消息遠遠落後於 leader,如果選舉這種副本作為 leader 的話就可能造成數據丟失。Kafka broker 端提供了一個參數 unclean.leader.election.enable,用於控制是否允許非同步副本參與 leader 選舉;如果開啟,則當 ISR 為空時就會從這些副本中選舉新的 leader,這個過程稱為 Unclean leader 選舉。

前面也提及了,如果開啟 Unclean leader 選舉,可能會造成數據丟失,但保證了始終有一個 leader 副本對外提供服務;如果禁用 Unclean leader 選舉,就會避免數據丟失,但這時分區就會不可用。這就是典型的 CAP 理論,即一個系統不可能同時滿足一致性(Consistency)、可用性(Availability)和分區容錯性(Partition Tolerance)中的兩個。所以在這個問題上,Kafka 賦予了我們選擇 C 或 A 的權利。

我們可以根據實際的業務場景選擇是否開啟 Unclean leader選舉,這裡建議關閉 Unclean leader 選舉,因為通常數據的一致性要比可用性重要的多。

4. Kafka 控制器

控制器(Controller)是 Kafka 的核心組件,它的主要作用是在 Zookeeper 的幫助下管理和協調整個 Kafka 集群。集群中任意一個 broker 都能充當控制器的角色,但在運行過程中,只能有一個 broker 成為控制器。

這裡先介紹下 Zookeeper,因為控制器的產生依賴於 Zookeeper 的 ZNode 模型和 Watcher 機制。Zookeeper 的數據模型是類似 Unix 作業系統的 ZNode Tree 即 ZNode 樹,ZNode 是 Zookeeper 中的數據節點,是 Zookeeper 存儲數據的最小單元,每個 ZNode 可以保存數據,也可以掛載子節點,根節點是 /。基本的拓撲圖如下:

Zookeeper 有兩類 ZNode 節點,分別是持久性節點和臨時節點。持久性節點是指客戶端與 Zookeeper 斷開會話後,該節點依舊存在,直到執行刪除操作才會清除節點。臨時節點的生命周期是和客戶端的會話綁定在一起,客戶端與 Zookeeper 斷開會話後,臨時節點就會被自動刪除。

Watcher 機制是 Zookeeper 非常重要的特性,它可以在 ZNode 節點上綁定監聽事件,比如可以監聽節點數據變更、節點刪除、子節點狀態變更等事件,通過這個事件機制,可以基於 ZooKeeper 實現分散式鎖、集群管理等功能。

4.1 控制器選舉

當集群中的任意 broker 啟動時,都會嘗試去 Zookeeper 中創建 /controller 節點,第一個成功創建 /controller 節點的 broker 則會被指定為控制器,其他 broker 則會監聽該節點的變化。當運行中的控制器突然宕機或意外終止時,其他 broker 能夠快速地感知到,然後再次嘗試創建 /controller 節點,創建成功的 broker 會成為新的控制器。

4.2 控制器功能

前面我們也說了,控制器主要作用是管理和協調 Kafka 集群,那麼 Kafka 控制器都做了哪些事情呢,具體如下:

  • 主題管理:創建、刪除 topic,以及增加 topic 分區等操作都是由控制器執行。
  • 分區重分配:執行 Kafka 的 reassign 腳本對 topic 分區重分配的操作,也是由控制器實現。
  • Preferred leader 選舉:這裡有一個概念叫 Preferred replica 即優先副本,表示的是分配副本中的第一個副本。Preferred leader 選舉就是指 Kafka 在某些情況下出現 leader 負載不均衡時,會選擇 preferred 副本作為新 leader 的一種方案。這也是控制器的職責範圍。
  • 集群成員管理:控制器能夠監控新 broker 的增加,broker 的主動關閉與被動宕機,進而做其他工作。這裡也是利用前面所說的 Zookeeper 的 ZNode 模型和 Watcher 機制,控制器會監聽 Zookeeper 中 /brokers/ids 下臨時節點的變化。
  • 數據服務:控制器上保存了最全的集群元數據資訊,其他所有 broker 會定期接收控制器發來的元數據更新請求,從而更新其記憶體中的快取數據。

從上面內容我們大概知道,控制器可以說是 Kafka 的心臟,管理和協調著整個 Kafka 集群,因此控制器自身的性能和穩定性就變得至關重要。

社區在這方面做了大量工作,特別是在 0.11 版本中對控制器進行了重構,其中最大的改進把控制器內部多執行緒的設計改成了單執行緒加事件隊列的方案,消除了多執行緒的資源消耗和執行緒安全問題,另外一個改進是把之前同步操作 Zookeeper 改為了非同步操作,消除了 Zookeeper 端的性能瓶頸,大大提升了控制器的穩定性。

5. Kafka 消費端 Rebalance 機制

前面介紹消費者術語時,提到了消費組的概念,一個 topic 可以讓若干個消費者進行消費,若干個消費者組成一個 Consumer Group 即消費組 ,一條消息只能被消費組中的一個消費者進行消費。我們用下圖表示Kafka的消費模型。

5.1 Rebalance 概念

就 Kafka 消費端而言,有一個難以避免的問題就是消費者的重平衡即 Rebalance。Rebalance 是讓一個消費組的所有消費者就如何消費訂閱 topic 的所有分區達成共識的過程,在 Rebalance 過程中,所有 Consumer 實例都會停止消費,等待 Rebalance 的完成。因為要停止消費等待重平衡完成,因此 Rebalance 會嚴重影響消費端的 TPS,是應當盡量避免的。

5.2 Rebalance 發生條件

關於何時會發生 Rebalance,總結起來有三種情況:

  • 消費組的消費者成員數量發生變化
  • 消費主題的數量發生變化
  • 消費主題的分區數量發生變化

其中後兩種情況一般是計劃內的,比如為了提高消息吞吐量增加 topic 分區數,這些情況一般是不可避免的,後面我們會重點討論如何避免因為組內消費者成員數發生變化導致的 Rebalance。

5.3 Kafka 協調器

在介紹如何避免 Rebalance 問題之前,先來認識下 Kafka 的協調器 Coordinator,和之前 Kafka 控制器類似,Coordinator 也是 Kafka 的核心組件。

主要有兩類 Kafka 協調器:

  • 組協調器(Group Coordinator)
  • 消費者協調器(Consumer Coordinator)

Kafka 為了更好的實現消費組成員管理、位移管理,以及 Rebalance 等,broker 服務端引入了組協調器(Group Coordinator),消費端引入了消費者協調器(Consumer Coordinator)。每個 broker 啟動的時候,都會創建一個 GroupCoordinator 實例,負責消費組註冊、消費者成員記錄、offset 等元數據操作,這裡也可以看出每個 broker 都有自己的 Coordinator 組件。另外,每個 Consumer 實例化時,同時會創建一個 ConsumerCoordinator 實例,負責消費組下各個消費者和服務端組協調器之前的通訊。可以用下圖表示協調器原理:

客戶端的消費者協調器 Consumer Coordinator 和服務端的組協調器 Group Coordinator 會通過心跳不斷保持通訊。

5.4 如何避免消費組 Rebalance

接下來我們討論下如何避免組內消費者成員發生變化導致的 Rebalance。組內成員發生變化無非就兩種情況,一種是有新的消費者加入,通常是我們為了提高消費速度增加了消費者數量,比如增加了消費執行緒或者多部署了一份消費程式,這種情況可以認為是正常的;另一種是有消費者退出,這種情況多是和我們消費端程式碼有關,是我們要重點避免的。

正常情況下,每個消費者都會定期向組協調器 Group Coordinator 發送心跳,表明自己還在存活,如果消費者不能及時的發送心跳,組協調器會認為該消費者已經「死」了,就會導致消費者離組引發 Rebalance 問題。這裡涉及兩個消費端參數:session.timeout.ms 和 heartbeat.interval.ms,含義分別是組協調器認為消費組存活的期限,和消費者發送心跳的時間間隔,其中 heartbeat.interval.ms 默認值是3s,session.timeout.ms 在 0.10.1 版本之前默認 30s,之後默認 10s。另外,0.10.1 版本還有兩個值得注意的地方:

  • 從該版本開始,Kafka 維護了單獨的心跳執行緒,之前版本中 Kafka 是使用業務主執行緒發送的心跳。
  • 增加了一個重要的參數 max.poll.interval.ms,表示 Consumer 兩次調用 poll 方法拉取數據的最大時間間隔,默認值 5min,對於那些忙於業務邏輯處理導致超過 max.poll.interval.ms 時間的消費者將會離開消費組,此時將發生一次 Rebalance。

此外,如果 Consumer 端頻繁 FullGC 也可能會導致消費端長時間停頓,從而引發 Rebalance。因此,我們總結如何避免消費組 Rebalance 問題,主要從以下幾方面入手:

  • 合理配置 session.timeout.ms 和 heartbeat.interval.ms,建議 0.10.1 之前適當調大 session 超時時間盡量規避 Rebalance。
  • 根據實際業務調整 max.poll.interval.ms,通常建議調大避免 Rebalance,但注意 0.10.1 版本之前沒有該參數。
  • 監控消費端的 GC 情況,避免由於頻繁 FullGC 導致執行緒長時間停頓引發 Rebalance。

合理調整以上參數,可以減少生產環境中 Rebalance 發生的幾率,提升 Consumer 端的 TPS 和穩定性。

6. 總結

本文總結了 Kafka 體系架構、Kafka 消息發送機制、副本機制,Kafka 控制器、消費端 Rebalance 機制等各方面核心原理,通過本文的介紹,相信你已經對 Kafka 的內核知識有了一定的掌握,更多的 Kafka 原理實踐後面有時間再介紹。

往期文章精選:

1、乾貨 | Elasticsearch 內核解析之寫入流程

2、Apache Kafka 版本演進及特性介紹

3、京東JDHBase異地多活實踐

4、美團點評基於 Flink 的實時數倉平台實踐

如果您喜歡這篇文章,點贊和轉發都是一種鼓勵,期待得到您的認可 ❥(^_-)