Kafka 重平衡 全流程解析
- 2019 年 10 月 30 日
- 筆記
前言
本文來自 極客時間 Kafka核心技術與實戰 這段時間有看 極客時間的這個課程, 這裡僅以分享的角度來做個筆記。 那麼本文將涉及到以下幾個知識點:
- 重平衡是什麼?為什麼要了解他?
- 發生重平衡的時機
- Kafka的心跳機制 與 Rebalance
- 消費者組狀態切換
- 重平衡全流程解析
重平衡是什麼?為什麼要了解他?
- 重平衡是什麼 Rebalance(重平衡 )本質上是一種協議, 規定了一個Consumer Group下的所有 Consumer 如何達成一致, 來分配訂閱Topic的每個分區。 說簡單點就是 給消費組每個消費者分配消費任務的過程。
- 為什麼要了解他? Rebalance是啟動一個消費者組必經的過程, 當然這不是最主要的,最主要的是,在消費的過程中, 在某些情況下會導致這個過程再次發生, 帶來的後果就是整個集群暫時性的癱瘓, 嚴重影響到Kafka的高可用
發生重平衡的時機
那麼 Rebalance 會在什麼時候發生呢?
- 訂閱主題數發生變化 這種情況一般不會發生, 如果發生,那也是因為我們的業務調整才會, 所以這種基本要麼不發生要麼就是不可避免。
- 主題分區發生變化 這種情況發生會相對多一點,但是也有限, 在部署Kafka集群前, 我們就需要考慮到該集群的容量, 以便來確定好分區數。 雖然不一定一步到位, 但是調整的次數應該是極其有限的, 一般也可以選擇在半夜低峰的時候進行調整,影響不大。
- 消費端的消費者組成員變化 基本上影響最大的就是這個原因了, 為什麼這麼說呢? 我們考慮下什麼時候消費者組的成員會發生變化就能大概了解了。
- 消費者處理消息超時, 即如果消費者處理消費的消息的時間超過了 Kafka集群配置的
max.poll.interval.ms
的值, 那麼該消費者將會自動離組 - 心跳超時, 如果消費者在指定的
session.timeout.ms
時間內沒有彙報心跳, 那麼Kafka就會認為該消費已經dead了
- 消費者處理消息超時, 即如果消費者處理消費的消息的時間超過了 Kafka集群配置的
可以看出,消費端的消費者組成員變化一般都是由於異常引起的, 所以其產生的 Rebalance 也是最難控制的。
Kafka的心跳機制 與 Rebalance
Kafka的心跳機制 與 Rebalance 有什麼關係呢? 事實上,重平衡過程是靠消費者端的心跳執行緒(Heartbeat Thread)通知到其他消費者實例的 每當消費者向其 coordinator 彙報心跳的時候, 如果這個時候 coordinator 決定開啟 Rebalance , 那麼 coordinator 會將REBALANCE_IN_PROGRESS
封裝到心跳的響應中, 當消費者接受到這個REBALANCE_IN_PROGRESS
, 他就知道需要開啟新的一輪 Rebalance 了, 所以heartbeat.interval.ms
除了是設置心跳的間隔時間, 其實也意味著 Rebalance 感知速度, 心跳越快,那麼 Rebalance 就能更快的被各個消費者感知。
在 Kafka 0.10.1.0 版本之前, 發送心跳請求是在消費者主執行緒完成的, 也就是你寫程式碼調用KafkaConsumer.poll
方法的那個執行緒。 這樣做有諸多弊病,最大的問題在於,消息處理邏輯也是在這個執行緒中完成的。 因此,一旦消息處理消耗了過長的時間, 心跳請求將無法及時發到協調者那裡, 導致協調者「錯誤地」認為該消費者已「死」。 自 0.10.1.0 版本開始, 社區引入了一個單獨的心跳執行緒來專門執行心跳請求發送,避免了這個問題。
消費者組狀態切換
為什麼要了解 消費者組狀態 呢? 這裡主要是為了方便講解 Rebalance 流程, 所以你需要大概了解一下消費者組的狀態切換, 如下圖

消費者流轉狀態.jpg
其流轉過程大概如下:

消費者狀態流程過程.jpg
一個消費者組最開始是Empty狀態, 當重平衡過程開啟後, 它會被置於PreparingRebalance狀態 等待成員加入, 成員都加入之後變更到CompletingRebalance狀態等待分配方案, 當coordinator分配完個消費者消費的分區後, 最後就流轉到Stable狀態完成重平衡。 當有新成員加入或已有成員退出時, 消費者組的狀態 從Stable直接跳到PreparingRebalance狀態, 此時,所有現存成員就必須重新申請加入組。 當所有成員都退出組後,消費者組狀態變更為Empty。
Kafka定期自動刪除過期位移的條件就是,組要處於Empty狀態。 因此,如果你的消費者組停掉了很長時間(超過7天), 那麼Kafka很可能就把該組的位移數據刪除了。
消費者端重平衡流程
在消費者端,重平衡分為兩個步驟:
- 加入組。 當組內成員加入組時,它會向 coordinator 發送JoinGroup請求。 在該請求中,每個成員都要將自己訂閱的主題上報, 這樣協調者就能收集到所有成員的訂閱資訊。 一旦收集了全部成員的JoinGroup請求後, Coordinator 會從這些成員中選擇一個擔任這個消費者組的領導者。 通常情況下,第一個發送JoinGroup請求的成員自動成為領導者。 領導者消費者的任務是收集所有成員的訂閱資訊, 然後根據這些資訊,制定具體的分區消費分配方案。 特別注意的是:這裡說的是消費者領導者。 選出領導者之後, Coordinator 會把消費者組訂閱資訊封裝進JoinGroup請求的 響應體中, 然後發給領導者,由領導者統一做出分配方案後, 進入到下一步:發送SyncGroup請求。 如下圖就是 JoinGroup 的全過程[圖片上傳中…(25-消費者組重平衡全流程解析.jpg-d67470-1567669412412-0)]

JoinGroup 流程解析.jpg
- 領導者消費者(Leader Consumer)分配方案。 領導者向 Coordinator 發送SyncGroup請求, 將剛剛做出的分配方案發給協調者。 值得注意的是,其他成員也會向 Coordinator 發送SyncGroup請求, 只不過請求體中並沒有實際的內容。 這一步的主要目的是讓 Coordinator 接收分配方案, 然後統一以 SyncGroup 響應的方式分發給所有成員, 這樣組內所有成員就都知道自己該消費哪些分區了。 如下圖:

SyncGroup全流程解析.jpg
消費者端重平衡流程 大概就這樣了,下面我們再來看看:Broker端重平衡
Broker端重平衡
要剖析協調者端處理重平衡的全流程, 我們必須要分幾個場景來討論。 這幾個場景分別是
- 新成員加入組
- 組成員主動離組
- 組成員崩潰離組
- 組成員提交位移。
接下來,我們一個一個來討論。
- 新成員入組。 新成員入組是指組處於Stable狀態後,有新成員加入。 如果是全新啟動一個消費者組,Kafka是有一些自己的小優化的,流程上會有些許的不同。 我們這裡討論的是,組穩定了之後有新成員加入的情形。 當協調者收到新的JoinGroup請求後, 它會通過心跳請求響應的方式通知組內現有的所有成員, 強制它們開啟新一輪的重平衡。 具體的過程和之前的客戶端重平衡流程是一樣的。 現在,我用一張時序圖來說明協調者一端是如何處理新成員入組的。

新成員入組場景.jpg
- 組成員主動離組。 何謂主動離組?就是指消費者實例所在執行緒或進程調用close()方法主動通知協調者它要退出。 這個場景就涉及到了第三類請求:LeaveGroup請求。 協調者收到LeaveGroup請求後,依然會以心跳響應的方式通知其他成員, 因此我就不再贅述了,還是直接用一張圖來說明。

組成員主動離組場景.jpg
- 組成員崩潰離組。 崩潰離組是指消費者實例出現嚴重故障,突然宕機導致的離組。 它和主動離組是有區別的, 因為後者是主動發起的離組,協調者能馬上感知並處理。 但崩潰離組是被動的,協調者通常需要等待一段時間才能感知到, 這段時間一般是由消費者端參數session.timeout.ms控制的。 也就是說,Kafka一般不會超過session.timeout.ms就能感知到這個崩潰。 當然,後面處理崩潰離組的流程與之前是一樣的,我們來看看下面這張圖。

組成員崩潰離組場景.jpg
- 重平衡時協調者對組內成員提交位移的處理。 正常情況下,每個組內成員都會定期彙報位移給協調者。 當重平衡開啟時,協調者會給予成員一段緩衝時間, 要求每個成員必須在這段時間內快速地上報自己的位移資訊, 然後再開啟正常的JoinGroup/SyncGroup請求發送。 還是老辦法,我們使用一張圖來說明。

組內成員提交位移場景.jpg
總結:
其實不論哪種方式,都是差不多的流程,這裡放開舉例,最主要的還是為了更加清晰,如果發生類似的問題,可以很快的從上面這些可能入手。
基本流程就是 Coordinator 感知到 消費者組的變化, 然後在心跳的過程中發送重平衡訊號通知各個消費者離組, 然後消費者重新以 JoinGroup 方式加入 Coordinator,並選出Consumer Leader。 當所有消費者加入 Coordinator, Consumer Leader會根據 Coordinator給予的分區資訊給出分區方案。 Coordinator 將該方案以 SyncGroup 的方式將該方案執行下去,通知各個消費者, 這樣就完成了一輪 重平衡了。