­

Kafka分區分配策略分析——重點:StickyAssignor

  • 2019 年 12 月 13 日
  • 筆記

為什麼Kafka在RangeAssigor、RoundRobinAssignor的基礎上,又新增了PartitionAssignor,它解決了什麼問題?

背景

用過Kafka的同學應該都知道Kafka的分區和消費組的概念。在Kafka中,每個Topic會包含多個分區,默認情況下一個分區只能被一個消費組下面的一個消費者消費,這裡就產生了分區分配的問題。Kafka中提供了多重分區分配演算法(PartitionAssignor)的實現:RangeAssigor、RoundRobinAssignor、StickyAssignor。本文主要介紹StickyAssignor,順帶會介紹RangeAssigor、RoundRobinAssignor作為分區分配的背景知識。

RangeAssignor

PartitionAssignor介面用於用戶定義實現分區分配演算法,以實現Consumer之間的分區分配。消費組的成員訂閱它們感興趣的Topic並將這種訂閱關係傳遞給作為訂閱組協調者的Broker。協調者選擇其中的一個消費者來執行這個消費組的分區分配並將分配結果轉發給消費組內所有的消費者。Kafka默認採用RangeAssignor的分配演算法。

RangeAssignor對每個Topic進行獨立的分區分配。對於每一個Topic,首先對分區按照分區ID進行排序,然後訂閱這個Topic的消費組的消費者再進行排序,之後盡量均衡的將分區分配給消費者。這裡只能是盡量均衡,因為分區數可能無法被消費者數量整除,那麼有一些消費者就會多分配到一些分區。

分配示意圖如下:

大致演算法如下:

assign(topic, consumers) {    // 對分區和Consumer進行排序    List<Partition> partitions = topic.getPartitions();    sort(partitions);    sort(consumers);    // 計算每個Consumer分配的分區數    int numPartitionsPerConsumer = partition.size() / consumers.size();    // 額外有一些Consumer會多分配到分區    int consumersWithExtraPartition = partition.size() % consumers.size();    // 計算分配結果    for (int i = 0, n = consumers.size(); i < n; i++) {      // 第i個Consumer分配到的分區的index          int start = numPartitionsPerConsumer * i + Math.min(i, consumersWithExtraPartition);          // 第i個Consumer分配到的分區數          int length = numPartitionsPerConsumer + (i + 1 > consumersWithExtraPartition ? 0 : 1);          // 分裝分配結果          assignment.get(consumersForTopic.get(i)).addAll(partitions.subList(start, start + length));      }  }

RangeAssignor策略的原理是按照消費者總數和分區總數進行整除運算來獲得一個跨度,然後將分區按照跨度進行平均分配,以保證分區儘可能均勻地分配給所有的消費者。對於每一個Topic,RangeAssignor策略會將消費組內所有訂閱這個Topic的消費者按照名稱的字典序排序,然後為每個消費者劃分固定的分區範圍,如果不夠平均分配,那麼字典序靠前的消費者會被多分配一個分區。

這種分配方式明顯的一個問題是隨著消費者訂閱的Topic的數量的增加,不均衡的問題會越來越嚴重,比如上圖中4個分區3個消費者的場景,C0會多分配一個分區。如果此時再訂閱一個分區數為4的Topic,那麼C0又會比C1、C2多分配一個分區,這樣C0總共就比C1、C2多分配兩個分區了,而且隨著Topic的增加,這個情況會越來越嚴重。

分配結果:

訂閱2個Topic,每個Topic4個分區,共3個Consumer

C0:[T0P0,T0P1,T1P0,T1P1]

C1:[T0P2,T1P2]

C2:[T0P3,T1P3]

RoundRobinAssignor

RoundRobinAssignor的分配策略是將消費組內訂閱的所有Topic的分區及所有消費者進行排序後盡量均衡的分配(RangeAssignor是針對單個Topic的分區進行排序分配的)。如果消費組內,消費者訂閱的Topic列表是相同的(每個消費者都訂閱了相同的Topic),那麼分配結果是盡量均衡的(消費者之間分配到的分區數的差值不會超過1)。如果訂閱的Topic列表是不同的,那麼分配結果是不保證「盡量均衡」的,因為某些消費者不參與一些Topic的分配。

分配示意圖如下:

相對於RangeAssignor,在訂閱多個Topic的情況下,RoundRobinAssignor的方式能消費者之間盡量均衡的分配到分區(分配到的分區數的差值不會超過1——RangeAssignor的分配策略可能隨著訂閱的Topic越來越多,差值越來越大)。

對於訂閱組內消費者訂閱Topic不一致的情況:假設有三個消費者分別為C0、C1、C2,有3個Topic T0、T1、T2,分別擁有1、2、3個分區,並且C0訂閱T0,C1訂閱T0和T1,C2訂閱T0、T1、T0,那麼RoundRobinAssignor的分配結果如下:

看上去分配已經盡量的保證均衡了,不過可以發現C2承擔了4個分區的消費而C1訂閱了T1,是不是把T1P1交給C1消費能更加的均衡呢?

StickyAssignor

動機

儘管RoundRobinAssignor已經在RangeAssignor上做了一些優化來更均衡的分配分區,但是在一些情況下依舊會產生嚴重的分配偏差,比如消費組中訂閱的Topic列表不相同的情況下(這個情況可能更多的發生在發布階段,但是這真的是一個問題嗎?——可以參照Kafka官方的說明:KIP-49 Fair Partition Assignment Strategy)。更核心的問題是無論是RangeAssignor,還是RoundRobinAssignor,當前的分區分配演算法都沒有考慮上一次的分配結果。顯然,在執行一次新的分配之前,如果能考慮到上一次分配的結果,盡量少的調整分區分配的變動,顯然是能節省很多開銷的。

目標

從字面意義上看,Sticky是「粘性的」,可以理解為分配結果是帶「粘性的」——每一次分配變更相對上一次分配做最少的變動(上一次的結果是有粘性的),其目標有兩點:

1. 分區的分配盡量的均衡

2. 每一次重分配的結果盡量與上一次分配結果保持一致

當這兩個目標發生衝突時,優先保證第一個目標。第一個目標是每個分配演算法都盡量嘗試去完成的,而第二個目標才真正體現出StickyAssignor特性的。

我們先來看預期分配的結構,後續再具體分析StickyAssignor的演算法實現。

例如:

  • 有3個Consumer:C0、C1、C2
  • 有4個Topic:T0、T1、T2、T3,每個Topic有2個分區
  • 所有Consumer都訂閱了這4個分區

StickyAssignor的分配結果如下圖所示(增加RoundRobinAssignor分配作為對比):

上面的例子中,Sticky模式原來分配給C0、C2的分區都沒有發生變動,且最終C0、C1達到的均衡的目的。

再舉一個例子:

  • 有3個Consumer:C0、C1、C2
  • 3個Topic:T0、T1、T2,它們分別有1、2、3個分區
  • C0訂閱T0;C1訂閱T0、T1;C2訂閱T0、T1、T2

分配結果如下圖所示:

從以上兩個例子的分配結果可以看出,StickyAssignor是比RangeAssignor和RoundRobinAssignor更好的分配方式,不過它的實現也更加的複雜。

實現

StickyAssignor的實現程式碼是RangeAssignor和RoundRobinAssignor的十倍,複雜度則遠遠在十倍以上。目前基本沒有看到對這塊源碼實現的分析。

StickyAssignor分配演算法的核心邏輯如下:

  1. 先構建出當前的分配狀態:currentAssignment
    1. 如果currentAssignment為空,則是全新的分配
  2. 構建出partition2AllPotentialConsumers和consumer2AllPotentialPartitions兩個輔助後續分配的數據結構
    1. partition2AllPotentialConsumers是一個Map<TopicPartition, List<String>>,記錄著每個Partition可以分配給哪些Consumer
    2. consumer2AllPotentialPartitions是一個Map<String, List<TopicPartition>>,記錄著每個Consumer可以分配的Partition列表
  3. 補全currentAssignment,將不屬於currentAssignment的Consumer添加進去(如果新增了一個Consumer,這個Consumer上一次是沒參與分配的,新添加進去分配的Partition列表為空)
  4. 構建出currentPartitionConsumer來用於輔助的分配,currentPartitionConsumer記錄了當前每個Partition分配給了哪個Consumer——就是把currentAssignment從Consumer作為Key轉換到Partition作為Key用於輔助分配
  5. 對所有分區進行排序(排序結果為sortedPartitions),排序有兩種規則:
    1. 如果不是初次分配,並且每個Consumer訂閱是相同的:
      1. 對Consumer按照它所分配的Partition數進行排序
      2. 按照上一步的排序結果,將每個Consumer分配的分區插入到List中(List就是排序後的分區)
      3. 將不屬於任何Consumer的分區加入List中
    2. 否則:分區之間按照可以被分配的Consumer的數量進行排序
  6. 構造unassignedPartitions記錄所有要被分配的分區(初始為上一步排序過的所有分區,後續進行調整:將已分配的,不需要移除了Partition從unassignedPartitions中移除)
  7. 進行分區調整,來達到分區分配均衡的目的;分區的Rebalance包含多個步驟
    1. 將上一步未分配的分區(unassignedPartitions)分配出去。分配的策略是:按照當前的分配結果,每一次分配時將分區分配給訂閱了對應Topic的Consumer列表中擁有的分區最少的那一個Consumer
    2. 校驗每一個分區是否需要調整,如果分區不需要調整,則從sortedPartitions中移除。分區是否可以被調整的規則是:如果這個分區是否在partition2AllPotentialConsumers中屬於兩個或超過兩個Consumer。
    3. 校驗每個Consumer是否需要調整被分配的分區,如果不能調整,則將這個Consumer從sortedCurrentSubscriptions中移除,不參與後續的重分配。判斷是否調整的規則是:如果當前Consumer分配的分區數少於它可以被分配的最大分區數,或者它的分區滿足上一條規則。
    4. 將以上步驟中獲取的可以進行重分配的分區,進行重新的分配。每次分配時都進行校驗,如果當前已經達到了均衡的狀態,則終止調整。均衡狀態的判斷依據是Consumer之間分配的分區數量的差值不超過1;或者所有Consumer已經拿到了它可以被分配的分區之後仍無法達到均衡的上一個條件(比如c1訂閱t1,c2訂閱t2,t1 t2分區數相差超過1,此時沒法重新調整)。如果不滿足上面兩個條件,且一個Consumer所分配的分區數少於同一個Topic的其他訂閱者分配到的所有分區的情況,那麼還可以繼續調整,屬於不滿足均衡的情況——比如上文中RoundRobinAssignor的最後一個例子。
  8. 後續流程和普通分配一致,就不分析了(Sticky模式會保存分配結果)

StickyAssignor的分配演算法確實非常負責,筆者也是一步步Debug程式碼來分析整個過程的,希望上述分析的步驟對讀者能有一些幫助(建議對照著上面的步驟去Debug Kafka源碼中的單元測試來梳理這塊內容)。

總結

本文主要介紹了Kafka的幾種分區分配策略:RangeAssignor、RoundRobinAssignor、StickyAssignor。其中重點分析了StickyAssignor的實現,StickyAssignor的模式能比RangeAssignor和RoundRobinAssignor提供更加均衡的分配結果,在發生Consumer或者Partition變更的情況下,也能減少不必要的分區調整。總體而言StickyAssignor是一種更好的分配演算法,只是實現上稍微有一些複雜。