什麼,kafka能夠從follower副本讀數據了 —kafka新功能介紹

最近看了kafka2.4新版本的一些功能特性,不得不說,在kafka2.0以後,kafka自身就比較少推出一些新的feature了,基本都是一些修修補補的東西。倒是kafka connect和kafka stream相關的開發工作做的比較多。可能kafka的野心也不局限於要當一個中間件,而是要實現一個流處理系統的生態了。

這次要介紹的是我覺得比較有意思的兩個特性,一個是kafka支援從follower副本讀取數據,當然這個功能並不是為了提供讀取性能,後面再詳細介紹。另一個則是新推出的sticky partitioner功能,我猜是從rebalance的StickyAssignor中得到靈感,發現producer的分區策略也可以這樣搞,233,這個feature主要作用是提高性能。

這兩個feature都是kafka2.4.0版本推出的,如果想使用這些新feature,那麼不妨升級下吧~

follower副本讀取數據(consumer fetch from closest replica)

背景

在早先kafka的設計中,為了使consumer讀取數據能夠保持一致,是只允許consumer讀取leader副本的數據的。即follower replica只是單純地備份數據的作用。那推出follower replica fetch功能的背景是什麼呢?

舉個比較常見的場景,kafka存在多個數據中心,不同數據中心存在於不同的機房,當其中一個數據中心需要向另一個數據中心同步數據的時候,由於只能從leader replica消費數據,那麼它不得不進行跨機房獲取數據,而這些流量頻寬通常是比較昂貴的(尤其是雲伺服器)。即無法利用本地性來減少昂貴的跨機房流量。

所以kafka推出這一個功能,就是幫助類似這種場景,節約流量資源。並且這種功能似乎還可以和新推出的mirror maker2相互配合,實現多個數據源的數據同步,不過我自己還沒測試過。

rack功能介紹

要說follower replica fetch,那就不得不先說rack功能,這個是kafka比較早就推出的功能,是Kafka對機架感知提供了的基本支援,可以將其用於控制副本的放置,詳細內容可以參閱這篇Kafka機架感知文章。

使用方式,其實就是一個broker端的參數,broker.rack,這個參數可以說明當前broker在哪個機房。

舉上面文章中的例子,如果一個數據中心的集群分布如下:
kafka-rack功能

那麼可以這樣配置:

  • broker0 -> rack1
  • broker1 -> rack1
  • broker2 -> rack2
  • broker3 -> rack2

這樣其實就是相當於給broker打一個標籤,當新建topic,比如新建一個兩個副本 & 兩個分區的topic,kafka至少會自動給rack1或rack2分配全部分區的一個副本。什麼,你說要是創建兩個分區一個副本的topic該怎麼分。。。抱歉,我給不了答案。等你自己實踐然後評論跟我說下答案 =。=

replica fetch功能測試

OK,上面介紹的rack功能,我們就能發現,這個其實跟跨機房讀數據的這種場景是很搭的。在跨機房多數據中心場景中,如果數據中心A,一個副本放在數據中心B的機房中,只要讓數據中心B的consumer能夠讀數據中心A的那個replica的數據(follower副本)讀數據,那不就萬事大吉。

社區也是這樣想的,所以就推出了這個功能。讓消費者可以指定rack id,然後可以不從消費者讀取數據。要實現這個目的,需要先配置兩個參數:

replica.selector.class

  • broker端配置
  • 配置名:replica.selector.class
  • 解釋:ReplicaSelector實現類的全名,包括路徑 (比如 RackAwareReplicaSelector 即按 rack id 指定消費)
  • 默認:從 Leader 消費的 LeaderSelector

為了支援這個功能,kafka修改了這部分的介面實現,源碼中新增一個ReplicaSelector介面,如果用戶有自定義消費策略的需求,也可以繼承這個介面實現自己的功能。

目前這個介面有兩個實現類,一個是LeaderSelector,即從leader副本讀數據。另一個則是RackAwareReplicaSelector,會去到指定的rack id讀數據。

client.rack

  • consumer端配置
  • 配置名:client.rack
  • 解釋:這個參數需要和broker端指定的broker.rack相同,表示去哪個rack中獲取數據。
  • 默認:null

這個參數只有在上面的replica.selector.class指定為RackAwareReplicaSelector且broekr指定了broker.rack才會生效。

這個功能要測試也挺簡單的,可以直接搭建一個兩個broker的kafka集群,配置broker.rack,然後使用consumer客戶端指定client.rack發送到非leader的節點查數據就行了。另外,可以使用這條命令查看網卡流量資訊:

sar -n DEV 1 300

存在問題

從follower replica讀取數據肯定有問題,最可能的問題就是落後節點的問題,從這樣的節點讀取數據會面臨什麼樣的情況呢?官方給出了幾種場景及解決辦法。先看看這張圖
img

主要有四種可能出現問題的情況,我們分別來看看應該如何解決:

Case 1(uncommitted offset)

這個場景是follower接收到數據但還未committed offset,這個時候,若消費者的offet消費到high watemark到log end offset之間的那段(Case 1黃色那段),會返回空數據,而不是一個錯誤資訊。直到這段內容 committed。

case 2(unavailable offset)

這種場景應該發生於慢節點的情況下,滿節點的broker還未接收到實際數據,但已經跟leader通訊知道有部分數據committed了(case 2黃色部分)。當遇到這種情況,consumer 消費到時候,會返回 OFFSET_NOT_AVAILABLE 錯誤資訊。

case 3(offset too small)

這種情況可能出現在消費者指定了 offset 的情況。那麼在指定不同auto.offset.reset的時候有不同的情況。

  1. If the reset policy is “earliest,” fetch the log start offset of the current replica that raised the out of range error.
  2. If the reset policy is “latest,” fetch the log end offset from the leader.
  3. If the reset policy is “none,” raise an exception.

case 4(offset too large)

遇到這種情況,會返回一個 broker 會返回一個 OFFSET_OUT_OF_RANGE 的錯誤。

但 OFFSET_OUT_OF_RANGE 遇到這種錯誤的時候也有多種可能,官方給出當 consumer 遇到這種問題的解決思路,

Use the OffsetForLeaderEpoch API to verify the current position with the leader.

  1. If the fetch offset is still valid, refresh metadata and continue fetching
  2. If truncation was detected, follow the steps in KIP-320 to either reset the offset or raise the truncation error
  3. Otherwise, follow the same steps above as in case 3.

sticky partitioner功能

背景

kafka producer發送數據並不是一個一個消息發送,而是取決於兩個producer端參數。一個是linger.ms,默認是0ms,當達到這個時間後,kafka producer就會立刻向broker發送數據。另一個參數是batch.size,默認是16kb,當產生的消息數達到這個大小後,就會立即向broker發送數據。

按照這個設計,從直觀上思考,肯定是希望每次都儘可能填滿一個batch再發送到一個分區。但實際決定batch如何形成的一個因素是分區策略(partitioner strategy)。在Kafka2.4版本之前,在producer發送數據默認的分區策略是輪詢策略(沒指定keyd的情況),這在我以前的文章有說到過詳細解析kafka之kafka分區和副本。如果多條消息不是被發送到相同的分區,它們就不能被放入到一個batch中。

所以如果使用默認的輪詢partition策略,可能會造成一個大的batch被輪詢成多個小的batch的情況。鑒於此,kafka2.4的時候推出一種新的分區策略,即Sticky Partitioning Strategy,Sticky Partitioning Strategy會隨機地選擇另一個分區並會儘可能地堅持使用該分區——即所謂的粘住這個分區。

鑒於小batch可能導致延時增加,之前對於無Key消息的分區策略效率很低。社區於2.4版本引入了黏性分區策略(Sticky Partitioning Strategy)。該策略是一種全新的策略,能夠顯著地降低給消息指定分區過程中的延時。

使用Sticky Partitioner有助於改進消息批處理,減少延遲,並減少broker的負載。

功能解析

sticky Partitioner實現的程式碼是在UniformStickyPartitioner裡面。貼下程式碼看看:

public class UniformStickyPartitioner implements Partitioner {

    private final StickyPartitionCache stickyPartitionCache = new StickyPartitionCache();

    public void configure(Map<String, ?> configs) {}

    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        return stickyPartitionCache.partition(topic, cluster);
    }

    public void close() {}

    public void onNewBatch(String topic, Cluster cluster, int prevPartition) {
        stickyPartitionCache.nextPartition(topic, cluster, prevPartition);
    }
}

我們主要關注UniformStickyPartitioner#partition()方法,可以看到,它是直接通過一個cache類獲取相同的分區,這表示新的record會一直發送到同一個分區中,除非生成新的batch,觸發了UniformStickyPartitioner#onNewBatch()方法才會換分區。

可以看看RoundRobinPartitioner#partition()方法(即輪詢分區策略)進行對比,就能發現比較明顯的對比。

這個sticky partitioner最大的好處就是性能較好,按照官方給出的測試結果,使用sticky partitioner測試可以減少50%的延時,吞吐也有相對應的提高。我自己測了下數據基本出入不大。

另外說明下,在kafka2.4以後,默認的partitioner分區策略,已經包含了sticky partitioner了,所以升級到kafka2.4以後,並不需要任何修改就能享受到性能到極大提升。這裡可以看下kafka2.4版本的策略說明:

/**
 * The default partitioning strategy:
 * <ul>
 * <li>If a partition is specified in the record, use it
 * <li>If no partition is specified but a key is present choose a partition based on a hash of the key
 * <li>If no partition or key is present choose the sticky partition that changes when the batch is full.
 * 
 * See KIP-480 for details about sticky partitioning.
 */
public class DefaultPartitioner implements Partitioner {

有一點挺奇怪到,在測試過程中(使用bin/kafka-producer-perf-test.sh測試),發現DefaultPartitioner的性能要比UniformStickyPartitioner的性能要好一些,不確定是什麼原因,知道到小夥伴可以在評論區給出答案:)

參考:
KIP-392: Allow consumers to fetch from closest replica
KIP-480: Sticky Partitioner
以上~

Tags: