Kafka 重平衡 全流程解析

  • 2019 年 10 月 30 日
  • 筆記

前言

本文來自 極客時間 Kafka核心技術與實戰 這段時間有看 極客時間的這個課程, 這裡僅以分享的角度來做個筆記。 那麼本文將涉及到以下幾個知識點:

  1. 重平衡是什麼?為什麼要了解他?
  2. 發生重平衡的時機
  3. Kafka的心跳機制 與 Rebalance
  4. 消費者組狀態切換
  5. 重平衡全流程解析

重平衡是什麼?為什麼要了解他?

  • 重平衡是什麼 Rebalance(重平衡 )本質上是一種協議, 規定了一個Consumer Group下的所有 Consumer 如何達成一致, 來分配訂閱Topic的每個分區。 說簡單點就是 給消費組每個消費者分配消費任務的過程。
  • 為什麼要了解他? Rebalance是啟動一個消費者組必經的過程, 當然這不是最主要的,最主要的是,在消費的過程中, 在某些情況下會導致這個過程再次發生, 帶來的後果就是整個集群暫時性的癱瘓, 嚴重影響到Kafka的高可用

發生重平衡的時機

那麼 Rebalance 會在什麼時候發生呢?

  1. 訂閱主題數發生變化 這種情況一般不會發生, 如果發生,那也是因為我們的業務調整才會, 所以這種基本要麼不發生要麼就是不可避免。
  2. 主題分區發生變化 這種情況發生會相對多一點,但是也有限, 在部署Kafka集群前, 我們就需要考慮到該集群的容量, 以便來確定好分區數。 雖然不一定一步到位, 但是調整的次數應該是極其有限的, 一般也可以選擇在半夜低峰的時候進行調整,影響不大。
  3. 消費端的消費者組成員變化 基本上影響最大的就是這個原因了, 為什麼這麼說呢? 我們考慮下什麼時候消費者組的成員會發生變化就能大概了解了。
    1. 消費者處理消息超時, 即如果消費者處理消費的消息的時間超過了 Kafka集群配置的 max.poll.interval.ms 的值, 那麼該消費者將會自動離組
    2. 心跳超時, 如果消費者在指定的session.timeout.ms時間內沒有彙報心跳, 那麼Kafka就會認為該消費已經dead了

可以看出,消費端的消費者組成員變化一般都是由於異常引起的, 所以其產生的 Rebalance 也是最難控制的。

Kafka的心跳機制 與 Rebalance

Kafka的心跳機制 與 Rebalance 有什麼關係呢? 事實上,重平衡過程是靠消費者端的心跳執行緒(Heartbeat Thread)通知到其他消費者實例的 每當消費者向其 coordinator 彙報心跳的時候, 如果這個時候 coordinator 決定開啟 Rebalance , 那麼 coordinator 會將REBALANCE_IN_PROGRESS封裝到心跳的響應中, 當消費者接受到這個REBALANCE_IN_PROGRESS, 他就知道需要開啟新的一輪 Rebalance 了, 所以heartbeat.interval.ms除了是設置心跳的間隔時間, 其實也意味著 Rebalance 感知速度, 心跳越快,那麼 Rebalance 就能更快的被各個消費者感知。

在 Kafka 0.10.1.0 版本之前, 發送心跳請求是在消費者主執行緒完成的, 也就是你寫程式碼調用KafkaConsumer.poll方法的那個執行緒。 這樣做有諸多弊病,最大的問題在於,消息處理邏輯也是在這個執行緒中完成的。 因此,一旦消息處理消耗了過長的時間, 心跳請求將無法及時發到協調者那裡, 導致協調者「錯誤地」認為該消費者已「死」。 自 0.10.1.0 版本開始, 社區引入了一個單獨的心跳執行緒來專門執行心跳請求發送,避免了這個問題。

消費者組狀態切換

為什麼要了解 消費者組狀態 呢? 這裡主要是為了方便講解 Rebalance 流程, 所以你需要大概了解一下消費者組的狀態切換, 如下圖

消費者流轉狀態.jpg

其流轉過程大概如下:

消費者狀態流程過程.jpg

一個消費者組最開始是Empty狀態, 當重平衡過程開啟後, 它會被置於PreparingRebalance狀態 等待成員加入, 成員都加入之後變更到CompletingRebalance狀態等待分配方案, 當coordinator分配完個消費者消費的分區後, 最後就流轉到Stable狀態完成重平衡。 當有新成員加入或已有成員退出時, 消費者組的狀態 從Stable直接跳到PreparingRebalance狀態, 此時,所有現存成員就必須重新申請加入組。 當所有成員都退出組後,消費者組狀態變更為Empty。

Kafka定期自動刪除過期位移的條件就是,組要處於Empty狀態。 因此,如果你的消費者組停掉了很長時間(超過7天), 那麼Kafka很可能就把該組的位移數據刪除了。

消費者端重平衡流程

在消費者端,重平衡分為兩個步驟:

  1. 加入組。 當組內成員加入組時,它會向 coordinator 發送JoinGroup請求。 在該請求中,每個成員都要將自己訂閱的主題上報, 這樣協調者就能收集到所有成員的訂閱資訊。 一旦收集了全部成員的JoinGroup請求後, Coordinator 會從這些成員中選擇一個擔任這個消費者組的領導者。 通常情況下,第一個發送JoinGroup請求的成員自動成為領導者。 領導者消費者的任務是收集所有成員的訂閱資訊, 然後根據這些資訊,制定具體的分區消費分配方案。 特別注意的是:這裡說的是消費者領導者。 選出領導者之後, Coordinator 會把消費者組訂閱資訊封裝進JoinGroup請求的 響應體中, 然後發給領導者,由領導者統一做出分配方案後, 進入到下一步:發送SyncGroup請求。 如下圖就是 JoinGroup 的全過程[圖片上傳中…(25-消費者組重平衡全流程解析.jpg-d67470-1567669412412-0)]

JoinGroup 流程解析.jpg

  1. 領導者消費者(Leader Consumer)分配方案。 領導者向 Coordinator 發送SyncGroup請求, 將剛剛做出的分配方案發給協調者。 值得注意的是,其他成員也會向 Coordinator 發送SyncGroup請求, 只不過請求體中並沒有實際的內容。 這一步的主要目的是讓 Coordinator 接收分配方案, 然後統一以 SyncGroup 響應的方式分發給所有成員, 這樣組內所有成員就都知道自己該消費哪些分區了。 如下圖:

SyncGroup全流程解析.jpg

消費者端重平衡流程 大概就這樣了,下面我們再來看看:Broker端重平衡

Broker端重平衡

要剖析協調者端處理重平衡的全流程, 我們必須要分幾個場景來討論。 這幾個場景分別是

  • 新成員加入組
  • 組成員主動離組
  • 組成員崩潰離組
  • 組成員提交位移。

接下來,我們一個一個來討論。

  • 新成員入組。 新成員入組是指組處於Stable狀態後,有新成員加入。 如果是全新啟動一個消費者組,Kafka是有一些自己的小優化的,流程上會有些許的不同。 我們這裡討論的是,組穩定了之後有新成員加入的情形。 當協調者收到新的JoinGroup請求後, 它會通過心跳請求響應的方式通知組內現有的所有成員, 強制它們開啟新一輪的重平衡。 具體的過程和之前的客戶端重平衡流程是一樣的。 現在,我用一張時序圖來說明協調者一端是如何處理新成員入組的。

新成員入組場景.jpg

  • 組成員主動離組。 何謂主動離組?就是指消費者實例所在執行緒或進程調用close()方法主動通知協調者它要退出。 這個場景就涉及到了第三類請求:LeaveGroup請求。 協調者收到LeaveGroup請求後,依然會以心跳響應的方式通知其他成員, 因此我就不再贅述了,還是直接用一張圖來說明。

組成員主動離組場景.jpg

  • 組成員崩潰離組。 崩潰離組是指消費者實例出現嚴重故障,突然宕機導致的離組。 它和主動離組是有區別的, 因為後者是主動發起的離組,協調者能馬上感知並處理。 但崩潰離組是被動的,協調者通常需要等待一段時間才能感知到, 這段時間一般是由消費者端參數session.timeout.ms控制的。 也就是說,Kafka一般不會超過session.timeout.ms就能感知到這個崩潰。 當然,後面處理崩潰離組的流程與之前是一樣的,我們來看看下面這張圖。

組成員崩潰離組場景.jpg

  • 重平衡時協調者對組內成員提交位移的處理。 正常情況下,每個組內成員都會定期彙報位移給協調者。 當重平衡開啟時,協調者會給予成員一段緩衝時間, 要求每個成員必須在這段時間內快速地上報自己的位移資訊, 然後再開啟正常的JoinGroup/SyncGroup請求發送。 還是老辦法,我們使用一張圖來說明。

組內成員提交位移場景.jpg

總結:

其實不論哪種方式,都是差不多的流程,這裡放開舉例,最主要的還是為了更加清晰,如果發生類似的問題,可以很快的從上面這些可能入手。

基本流程就是 Coordinator 感知到 消費者組的變化, 然後在心跳的過程中發送重平衡訊號通知各個消費者離組, 然後消費者重新以 JoinGroup 方式加入 Coordinator,並選出Consumer Leader。 當所有消費者加入 Coordinator, Consumer Leader會根據 Coordinator給予的分區資訊給出分區方案。 Coordinator 將該方案以 SyncGroup 的方式將該方案執行下去,通知各個消費者, 這樣就完成了一輪 重平衡了。