­

Kafka 進階必備之控制器

  • 2019 年 12 月 16 日
  • 筆記

如果你只追一個妹子並且對這個妹子特別用心的話,知道的人一定會說你是個好男人;如果你只是淺嘗輒止並且追了大部分妹子的話,知道的人一定會罵你渣男。

做技術也是一樣的,如果你對一門技術鑽研的特別深的話,那你一定是這個領域不可或缺的人才;如果你每個技術都想學並且遇到一些困難就退縮,那麼你就離被替代不遠了。

中國現在的社會就像是一劑催化劑,催生的都是節奏的人。周末我聽到了這麼一個事情:在我的大學裏,有這樣一個人,他是班長。家裡有些錢,可能接觸社會比較早,為人處事比較成熟,和導員關係非常好,可以說好到經常性的不來上課,導員竟然還幫忙寫假條;好到同學評價班長的時候,打分60分導員竟然私自幫忙改到80分。我說了一句話:這是社會催化的產物,校園也是一個小社會,很多事情從這個小社會就開始變質了。

我們從來不思考自己得到的是不是合規合法的。我記得我的一個人生導師和我說過這樣一句話:好好做人,別總是想尋求捷徑。這句話像茶一樣,慢慢品才香。

下面回到技術。

如果只是為了開發 Kafka 應用程序,或者只是在生產環境使用 Kafka,那麼了解 Kafka 的內部工作原理不是必須的。不過,了解 Kafka 的內部工作原理有助於理解 Kafka 的行為,也利用快速診斷問題。下面我們來探討一下這三個問題

  • Kafka 是如何進行複製的
  • Kafka 是如何處理來自生產者和消費者的請求的
  • Kafka 的存儲細節是怎樣的

如果感興趣的話,就請花費你一些時間,耐心看完這篇文章。

集群成員間的關係

我們知道,Kafka 是運行在 ZooKeeper 之上的,因為 ZooKeeper 是以集群形式出現的,所以 Kafka 也可以以集群形式出現。這也就涉及到多個生產者和多個消費者如何協調的問題,這個維護集群間的關係也是由 ZooKeeper 來完成的。如果你看過我之前的文章(真的,關於 Kafka 入門看這一篇就夠了),你應該會知道,Kafka 集群間會有多個 主機(broker),每個 broker 都會有一個 broker.id,每個 broker.id 都有一個唯一的標識符用來區分,這個標識符可以在配置文件裏手動指定,也可以自動生成。

Kafka 可以通過 broker.id.generation.enable 和 reserved.broker.max.id 來配合生成新的 broker.id。 broker.id.generation.enable參數是用來配置是否開啟自動生成 broker.id 的功能,默認情況下為true,即開啟此功能。自動生成的broker.id有一個默認值,默認值為1000,也就是說默認情況下自動生成的 broker.id 從1001開始。

Kafka 在啟動時會在 ZooKeeper 中 /brokers/ids 路徑下註冊一個與當前 broker 的 id 相同的臨時節點。Kafka 的健康狀態檢查就依賴於此節點。當有 broker 加入集群或者退出集群時,這些組件就會獲得通知。

  • 如果你要啟動另外一個具有相同 ID 的 broker,那麼就會得到一個錯誤 —— 新的 broker 會試着進行註冊,但不會成功,因為 ZooKeeper 裏面已經有一個相同 ID 的 broker。
  • 在 broker 停機、出現分區或者長時間垃圾回收停頓時,broker 會從 ZooKeeper 上斷開連接,此時 broker 在啟動時創建的臨時節點會從 ZooKeeper 中移除。監聽 broker 列表的 Kafka 組件會被告知該 broker 已移除。
  • 在關閉 broker 時,它對應的節點也會消失,不過它的 ID 會繼續存在其他數據結構中,例如主題的副本列表中,副本列表複製我們下面再說。在完全關閉一個 broker 之後,如果使用相同的 ID 啟動另一個全新的 broker,它會立刻加入集群,並擁有一個與舊 broker 相同的分區和主題。

Broker Controller

我們之前在講 Kafka Rebalance 重平衡的時候,提過一個群組協調器,負責協調群組間的關係,那麼 broker 之間也有一個控制器組件(Controller),它是 Kafka 的核心組件。它的主要作用是在 ZooKeeper 的幫助下管理和協調整個 Kafka 集群,集群中的每個 broker 都可以稱為 controller,但是在 Kafka 集群啟動後,只有一個 broker 會成為 Controller 。既然 Kafka 集群是依賴於 ZooKeeper 集群的,所以有必要先介紹一下 ZooKeeper 是什麼,可以參考作者的這一篇文章(ZooKeeper不僅僅是註冊中心,你還知道有哪些?)詳細了解,在這裡就簡單提一下 znode 節點的問題。

ZooKeeper 的數據是保存在節點上的,每個節點也被稱為znode,znode 節點是一種樹形的文件結構,它很像 Linux 操作系統的文件路徑,ZooKeeper 的根節點是 /

znode 根據數據的持久化方式可分為臨時節點和持久性節點。持久性節點不會因為 ZooKeeper 狀態的變化而消失,但是臨時節點會隨着 ZooKeeper 的重啟而自動消失。

znode 節點有一個 Watcher 機制:當數據發生變化的時候, ZooKeeper 會產生一個 Watcher 事件,並且會發送到客戶端。Watcher 監聽機制是 Zookeeper 中非常重要的特性,我們基於 Zookeeper 上創建的節點,可以對這些節點綁定監聽事件,比如可以監聽節點數據變更、節點刪除、子節點狀態變更等事件,通過這個事件機制,可以基於 ZooKeeper 實現分佈式鎖、集群管理等功能。

控制器的選舉

Kafka 當前選舉控制器的規則是:Kafka 集群中第一個啟動的 broker 通過在 ZooKeeper 里創建一個臨時節點 /controller 讓自己成為 controller 控制器。其他 broker 在啟動時也會嘗試創建這個節點,但是由於這個節點已存在,所以後面想要創建 /controller 節點時就會收到一個 節點已存在 的異常。然後其他 broker 會在這個控制器上註冊一個 ZooKeeper 的 watch 對象,/controller 節點發生變化時,其他 broker 就會收到節點變更通知。這種方式可以確保只有一個控制器存在。那麼只有單獨的節點一定是有個問題的,那就是單點問題

如果控制器關閉或者與 ZooKeeper 斷開鏈接,ZooKeeper 上的臨時節點就會消失。集群中的其他節點收到 watch 對象發送控制器下線的消息後,其他 broker 節點都會嘗試讓自己去成為新的控制器。其他節點的創建規則和第一個節點的創建原則一致,都是第一個在 ZooKeeper 里成功創建控制器節點的 broker 會成為新的控制器,那麼其他節點就會收到節點已存在的異常,然後在新的控制器節點上再次創建 watch 對象進行監聽。

控制器的作用

那麼說了這麼多,控制是什麼呢?控制器的作用是什麼呢?或者說控制器的這麼一個組件被設計用來幹什麼?別著急,接下來我們就要說一說。

Kafka 被設計為一種模擬狀態機的多線程控制器,它可以作用有下面這幾點

  • 控制器相當於部門(集群)中的部門經理(broker controller),用於管理部門中的部門成員(broker)
  • 控制器是所有 broker 的一個監視器,用於監控 broker 的上線和下線
  • 在 broker 宕機後,控制器能夠選舉新的分區 Leader
  • 控制器能夠和 broker 新選取的 Leader 發送消息

再細分一下可以具體分為如下 5 點

  • 主題管理 : Kafka Controller 可以幫助我們完成對 Kafka 主題創建、刪除和增加分區的操作,簡而言之就是對分區擁有最高行使權。

換句話說,當我們執行kafka-topics 腳本時,大部分的後台工作都是控制器來完成的。

  • 分區重分配: 分區重分配主要是指,kafka-reassign-partitions 腳本提供的對已有主題分區進行細粒度的分配功能。這部分功能也是控制器實現的。
  • Prefered 領導者選舉 : Preferred 領導者選舉主要是 Kafka 為了避免部分 Broker 負載過重而提供的一種換 Leader 的方案。
  • 集群成員管理: 主要管理 新增 broker、broker 關閉、broker 宕機
  • 數據服務: 控制器的最後一大類工作,就是向其他 broker 提供數據服務。控制器上保存了最全的集群元數據信息,其他所有 broker 會定期接收控制器發來的元數據更新請求,從而更新其內存中的緩存數據。這些數據我們會在下面討論

當控制器發現一個 broker 離開集群(通過觀察相關 ZooKeeper 路徑),控制器會收到消息:這個 broker 所管理的那些分區需要一個新的 Leader。控制器會依次遍歷每個分區,確定誰能夠作為新的 Leader,然後向所有包含新 Leader 或現有 Follower 的分區發送消息,該請求消息包含誰是新的 Leader 以及誰是 Follower 的信息。隨後,新的 Leader 開始處理來自生產者和消費者的請求,Follower 用於從新的 Leader 那裡進行複製。

這就很像外包公司的一個部門,這個部門就是專門出差的,每個人在不同的地方辦公,但是中央總部有一個部門經理,現在部門經理突然離職了。公司不打算外聘人員,決定從部門內部選一個能力強的人當領導,然後當上領導的人需要向自己的組員發送消息,這條消息就是任命消息和明確他管理了哪些人,大家都知道了,然後再各自給部門幹活。

當控制器發現一個 broker 加入集群時,它會使用 broker ID 來檢查新加入的 broker 是否包含現有分區的副本。如果有控制器就會把消息發送給新加入的 broker 和 現有的 broker。

上面這塊關於分區複製的內容我們接下來會說到。

broker controller 數據存儲

上面我們介紹到 broker controller 會提供數據服務,用於保存大量的 Kafka 集群數據。如下圖

可以對上面保存信息歸類,主要分為三類

  • broker 上的所有信息,包括 broker 中的所有分區,broker 所有分區副本,當前都有哪些運行中的 broker,哪些正在關閉中的 broker 。
  • 所有主題信息,包括具體的分區信息,比如領導者副本是誰,ISR 集合中有哪些副本等。
  • 所有涉及運維任務的分區。包括當前正在進行 Preferred 領導者選舉以及分區重分配的分區列表。

Kafka 是離不開 ZooKeeper的,所以這些數據信息在 ZooKeeper 中也保存了一份。每當控制器初始化時,它都會從 ZooKeeper 上讀取對應的元數據並填充到自己的緩存中。

broker controller 故障轉移

我們在前面說過,第一個在 ZooKeeper 中的 /brokers/ids下創建節點的 broker 作為 broker controller,也就是說 broker controller 只有一個,那麼必然會存在單點失效問題。kafka 為考慮到這種情況提供了故障轉移功能,也就是 Fail Over。如下圖

最一開始,broker1 會搶先註冊成功成為 controller,然後由於網絡抖動或者其他原因致使 broker1 掉線,ZooKeeper 通過 Watch 機制覺察到 broker1 的掉線,之後所有存活的 brokers 開始競爭成為 controller,這時 broker3 搶先註冊成功,此時 ZooKeeper 存儲的 controller 信息由 broker1 -> broker3,之後,broker3 會從 ZooKeeper 中讀取元數據信息,並初始化到自己的緩存中。

注意:ZooKeeper 中存儲的不是緩存信息,broker 中存儲的才是緩存信息。

broker controller 存在的問題

在 Kafka 0.11 版本之前,控制器的設計是相當繁瑣的。我們上面提到過一句話:Kafka controller 被設計為一種模擬狀態機的多線程控制器,這種設計其實是存在一些問題的

  • controller 狀態的更改由不同的監聽器並發執行,因此需要進行很複雜的同步,並且容易出錯而且難以調試。
  • 狀態傳播不同步,broker 可能在時間不確定的情況下出現多種狀態,這會導致不必要的額外的數據丟失
  • controller 控制器還會為主題刪除創建額外的 I/O 線程,導致性能損耗
  • controller 的多線程設計還會訪問共享數據,我們知道,多線程訪問共享數據是線程同步最麻煩的地方,為了保護數據安全性,控制器不得不在代碼中大量使用ReentrantLock 同步機制,這就進一步拖慢了整個控制器的處理速度。

broker controller 內部設計原理

在 Kafka 0.11 之後,Kafka controller 採用了新的設計,把多線程的方案改成了單線程加事件隊列的方案。如下圖所示

主要所做的改變有下面這幾點

第一個改進是增加了一個 Event Executor Thread,事件執行線程,從圖中可以看出,不管是 Event Queue 事件隊列還是 Controller context 控制器上下文都會交給事件執行線程進行處理。將原來執行的操作全部建模成一個個獨立的事件,發送到專屬的事件隊列中,供此線程消費。

第二個改進是將之前同步的 ZooKeeper 全部改為異步操作。ZooKeeper API 提供了兩種讀寫的方式:同步和異步。之前控制器操作 ZooKeeper 都是採用的同步方式,這次把同步方式改為異步,據測試,效率提升了10倍。

第三個改進是根據優先級處理請求,之前的設計是 broker 會公平性的處理所有 controller 發送的請求。什麼意思呢?公平性難道還不好嗎?在某些情況下是的,比如 broker 在排隊處理 produce 請求,這時候 controller 發出了一個 StopReplica 的請求,你會怎麼辦?還在繼續處理 produce 請求嗎?這個 produce 請求還有用嗎?此時最合理的處理順序應該是,賦予 StopReplica 請求更高的優先級,使它能夠得到搶佔式的處理。

《Kafka 權威指南》

https://blog.csdn.net/u013256816/article/details/80546337

https://learning.oreilly.com/library/view/kafka-the-definitive/9781491936153/ch05.html#kafka_internals

https://www.cnblogs.com/kevingrace/p/9021508.html

https://www.cnblogs.com/huxi2b/p/6980045.html

《極客時間-Kafka核心技術與實戰》

https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Controller+Redesign

https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Controller+Internals