圖解:Kafka 水印備份機制

  • 2019 年 11 月 11 日
  • 筆記

高可用是很多分散式系統中必備的特徵之一,Kafka 日誌的高可用是通過基於 leader-follower 的多副本同步實現的,每個分區下有多個副本,其中只有一個是 leader 副本,提供發送和消費消息,其餘都是 follower 副本,不斷地發送 fetch 請求給 leader 副本以同步消息,如果 leader 在整個集群運行過程中不發生故障,follower 副本不會起到任何作用,問題就在於任何系統都不能保證其穩定運行,當 leader 副本所在的 broker 崩潰之後,其中一個 follower 副本就會成為該分區下新的 leader 副本,那麼問題來了,在選為新的 leader 副本時,會導致消息丟失或者離散嗎?Kafka 是如何解決 leader 副本變更時消息不會出錯?以及 leader 與 follower 副本之間的數據同步是如何進行的?帶著這幾個問題,我們接著往下看,一起揭開 Kafka 水印備份的神秘面紗。

水印相關概念

在講解水印備份之前,我們必須要先搞清楚幾個關鍵的術語以及它們的含義,下面我用一張圖來示意 Kafka 分區副本的位移資訊:

如上圖所示,綠色部分表示已完全備份的消息,對消費者可見,紫色部分表示未完全備份的消息,對消費者不可見。

  • LEO(last end offset):日誌末端位移,記錄了該副本對象底層日誌文件中下一條消息的位移值,副本寫入消息的時候,會自動更新 LEO 值。
  • HW(high watermark):從名字可以知道,該值叫高水印值,HW 一定不會大於 LEO 值,小於 HW 值的消息被認為是「已提交」或「已備份」的消息,並對消費者可見。

leader 會保存兩個類型的 LEO 值,一個是自己的 LEO,另一個是 remote LEO 值,remote LEO 值就是 follower 副本的 LEO 值,意味著 follower 副本的 LEO 值會保存兩份,一份保存到 leader 副本中,一份保存到自己這裡。

remote LEO 值有什麼用呢?

它是決定 HW 值大小的關鍵,當 HW 要更新時,就會對比 LEO 值(也包括 leader LEO),取最小的那個做最新的 HW 值。

以下介紹 LEO 和 HW 值的更新機制:

LEO 更新:

  1. leader 副本自身的 LEO 值更新:在 Producer 消息發送過來時,即 leader 副本當前最新存儲的消息位移位置 +1;
  2. follower 副本自身的 LEO 值更新:從 leader 副本中 fetch 到消息並寫到本地日誌文件時,即 follower 副本當前同步 leader 副本最新的消息位移位置 +1;
  3. leader 副本中的 remote LEO 值更新:每次 follower 副本發送 fetch 請求都會包含 follower 當前 LEO 值,leader 拿到該值就會嘗試更新 remote LEO 值。

leader HW 更新:

  1. 故障時更新:
    1. 副本被選為 leader 副本時:當某個 follower 副本被選為分區的 leader 副本時,kafka 就會嘗試更新 HW 值;
    2. 副本被踢出 ISR 時:如果某個副本追不上 leader 副本進度,或者所在 broker 崩潰了,導致被踢出 ISR,leader 也會檢查 HW 值是否需要更新,畢竟 HW 值更新只跟處於 ISR 的副本 LEO 有關係。
  2. 正常時更新:
    1. producer 向 leader 副本寫入消息時:在消息寫入時會更新 leader LEO 值,因此需要再檢查是否需要更新 HW 值;
    2. leader 處理 follower FETCH 請求時:follower 的 fetch 請求會攜帶 LEO 值,leader 會根據這個值更新對應的 remote LEO 值,同時也需要檢查是否需要更新 HW 值。

follower HW 更新:

  1. follower 更新 HW 發生在其更新 LEO 之後,每次 follower Fetch 響應體都會包含 leader 的 HW 值,然後比較當前 LEO 值,取最小的作為新的 HW 值。

圖解水印備份過程

在了解了 Kafka 水印備份機制的相關概念之後,下面我用圖來幫大家更好地理解 Kafka 的水印備份過程,假設某個分區有兩個副本,min.insync.replica=1:

Step 1:leader 和 follower 副本處於初始化值,follower 副本發送 fetch 請求,由於 leader 副本沒有數據,因此不會進行同步操作;

Step 2:生產者發送了消息 m1 到分區 leader 副本,寫入該條消息後 leader 更新 LEO = 1;

Step 3:follower 發送 fetch 請求,攜帶當前最新的 offset = 0,leader 處理 fetch 請求時,更新 remote LEO = 0,對比 LEO 值最小為 0,所以 HW = 0,leader 副本響應消息數據及 leader HW = 0 給 follower,follower 寫入消息後,更新 LEO 值,同時對比 leader HW 值,取最小的作為新的 HW 值,此時 follower HW = 0,這也意味著,follower HW 是不會超過 leader HW 值的。

Step 4:follower 發送第二輪 fetch 請求,攜帶當前最新的 offset = 1,leader 處理 fetch 請求時,更新 remote LEO = 1,對比 LEO 值最小為 1,所以 HW = 1,此時 leader 沒有新的消息數據,所以直接返回 leader HW = 1 給 follower,follower 對比當前最新的 LEO 值 與 leader HW 值,取最小的作為新的 HW 值,此時 follower HW = 1。

基於水印備份機制的一些缺陷

從以上步驟可看出,leader 中保存的 remote LEO 值的更新總是需要額外一輪 fetch RPC 請求才能完成,這意味著在 leader 切換過程中,會存在數據丟失以及數據不一致的問題,下面我用圖來說明存在的問題:

  • 數據丟失

前面也說過,leader 中的 HW 值是在 follower 下一輪 fetch RPC 請求中完成更新的,如上圖所示,有副本 A 和 B,其中 B 為 leader 副本,A 為 follower 副本,在 A 進行第二段 fetch 請求,並接收到響應之後,此時 B 已經將 HW 更新為 2,如果這是 A 還沒處理完響應就崩潰了,即 follower 沒有及時更新 HW 值,A 重啟時,會自動將 LEO 值調整到之前的 HW 值,即會進行日誌截斷,接著會向 B 發送 fetch 請求,但很不幸的是此時 B 也發生宕機了,Kafka 會將 A 選舉為新的分區 Leader。當 B 重啟後,會從 向 A 發送 fetch 請求,收到 fetch 響應後,拿到 HW 值,並更新本地 HW 值,此時 HW 被調整為 1(之前是 2),這時 B 會做日誌截斷,因此,offsets = 1 的消息被永久地刪除了。

可能你會問,follower 副本為什麼要進行日誌截斷?

這是由於消息會先記錄到 leader,follower 再從 leader 中拉取消息進行同步,這就導致 leader LEO 會比 follower 的要大(follower 之間的 offset 也不盡相同,雖然最終會一致,但過程中會有差異),假設此時出現 leader 切換,有可能選舉了一個 LEO 較小的 follower 成為新的 leader,這時該副本的 LEO 就會成為新的標準,這就會導致 follower LEO 值有可能會比 leader LEO 值要大的情況,因此 follower 在進行同步之前,需要從 leader 獲取 LastOffset 的值(該值後面會有解釋),如果 LastOffset 小於 當前 LEO,則需要進行日誌截斷,然後再從 leader 拉取數據實現同步。

可能你還會問,日誌截斷會不會造成數據丟失?

前面也說過,HW 值以上的消息是沒有「已提交」或「已備份」的,因此消息也是對消費者不可見,即這些消息不對用戶作承諾,也即是說從 HW 值截斷日誌,並不會導致數據丟失(承諾用戶範圍內)。

  • 數據不一致/離散

以上情況,需要滿足以下其中一個條件才會發生:

  1. 宕機之前,B 已不在 ISR 列表中,unclean.leader.election.enable=true,即允許非 ISR 中副本成為 leader;
  2. B 消息寫入到 pagecache,但尚未 flush 到磁碟。

分區有兩個副本,其中 A 為 Leader 副本,B 為 follower 副本,A 已經寫入兩條消息,且 HW 更新到 2,B 只寫了 1 條消息,HW 為 1,此時 A 和 B 同時宕機,B 先重啟,B 成為了 leader 副本,這時生產者發送了一條消息,保存到 B 中,由於此時分區只有 B,B 在寫入消息時把 HW 更新到 2,就在這時候 A 重新啟動,發現 leader HW 為 2,跟自己的 HW 一樣,因此沒有執行日誌截斷,這就造成了 A 的 offset=1 的日誌與 B 的 offset=1 的日誌不一樣的現象。

leader epoch

為了解決 HW 更新時機是非同步延遲的,而 HW 又是決定日誌是否備份成功的標誌,從而造成數據丟失和數據不一致的現象,Kafka 引入了 leader epoch 機制,在每個副本日誌目錄下都創建一個 leader-epoch-checkpoint 文件,用於保存 leader 的 epoch 資訊,如下,leader epoch 長這樣:

它的格式為 (epoch offset),epoch 指的是 leader 版本,它是一個單調遞增的一個正整數值,每次 leader 變更,epoch 版本都會 +1,offset 是每一代 leader 寫入的第一條消息的位移值,比如:

(0, 0)  (1, 300)

以上第二個版本是從位移 300 開始寫入消息,意味著第一個版本寫入了 0-299 的消息。

leader epoch 具體的工作機制如下:

  • 當副本成為 leader 時:

這時,如果此時生產者有新消息發送過來,會首先新的 leader epoch 以及 LEO 添加到 leader-epoch-checkpoint 文件中。

  • 當副本變成 follower 時:
    1. 發送 LeaderEpochRequest 請求給 leader 副本,該請求包括了 follower 中最新的 epoch 版本;
    2. leader 返回給 follower 的相應中包含了一個 LastOffset,如果 follower last epoch = leader last epoch,則 LastOffset = leader LEO,否則取大於 follower last epoch 中最小的 leader epoch 的 start offset 值,舉個例子:假設 follower last epoch = 1,此時 leader 有 (1, 20) (2, 80) (3, 120),則 LastOffset = 80;
    3. follower 拿到 LastOffset 之後,會對比當前 LEO 值是否大於 LastOffset,如果當前 LEO 大於 LastOffset,則從 LastOffset 截斷日誌;
    4. follower 開始發送 fetch 請求給 leader 保持消息同步。

基於 leader epoch 的工作機制,我們接下來看看它是如何解決水印備份缺陷的:

(1)解決數據丟失:

如上圖所示,A 重啟之後,發送 LeaderEpochRequest 請求給 B,由於 B 還沒追加消息,此時 epoch = request epoch = 0,因此返回 LastOffset = leader LEO = 2 給 A,A 拿到 LastOffset 之後,發現等於當前 LEO 值,故不用進行日誌截斷。就在這時 B 宕機了,A 成為 leader,在 B 啟動回來後,會重複 A 的動作,同樣不需要進行日誌截斷,數據沒有丟失。

(2)解決數據不一致/離散:

如上圖所示,A 和 B 同時宕機後,B 先重啟回來成為分區 leader,這時候生產者發送了一條消息過來,leader epoch 更新到 1,此時 A 啟動回來後,發送 LeaderEpochRequest(follower epoch = 0) 給 B,B 判斷 follower epoch 不等於 最新的 epoch,於是找到大於 follower epoch 最小的 epoch = 1,即 LastOffset = epoch start offset = 1,A 拿到 LastOffset 後,判斷小於當前 LEO 值,於是從 LastOffset 位置進行日誌截斷,接著開始發送 fetch 請求給 B 開始同步消息,避免了消息不一致/離散的問題。