­

【Kafka系列】副本機制和請求過程

  • 2019 年 12 月 16 日
  • 筆記

副本機制

複製功能是 Kafka 架構的核心功能,在 Kafka 文檔裏面 Kafka 把自己描述為 一個分佈式的、可分區的、可複製的提交日誌服務。複製之所以這麼關鍵,是因為消息的持久存儲非常重要,這能夠保證在主節點宕機後依舊能夠保證 Kafka 高可用。副本機制也可以稱為備份機制(Replication),通常指分佈式系統在多台網絡交互的機器上保存有相同的數據備份/拷貝。

Kafka 使用主題來組織數據,每個主題又被分為若干個分區,分區會部署在一到多個 broker 上,每個分區都會有多個副本,所以副本也會被保存在 broker 上,每個 broker 可能會保存成千上萬個副本。下圖是一個副本複製示意圖

如上圖所示,為了簡單我只畫出了兩個 broker ,每個 broker 指保存了一個 Topic 的消息,在 broker1 中分區0 是Leader,它負責進行分區的複製工作,把 broker1 中的分區0複製一個副本到 broker2 的主題 A 的分區0。同理,主題 A 的分區1也是一樣的道理。

副本類型分為兩種:一種是 Leader(領導者) 副本,一種是Follower(跟隨者)副本。

Leader 副本

Kafka 在創建分區的時候都要選舉一個副本,這個選舉出來的副本就是 Leader 領導者副本。

Follower 副本

除了 Leader 副本以外的副本統稱為 Follower 副本,Follower 不對外提供服務。下面是 Leader 副本的工作方式

這幅圖需要注意以下幾點

  • Kafka 中,Follower 副本也就是追隨者副本是不對外提供服務的。這就是說,任何一個追隨者副本都不能響應消費者和生產者的請求。所有的請求都是由領導者副本來處理。或者說,所有的請求都必須發送到 Leader 副本所在的 broker 中,Follower 副本只是用作數據拉取,採用異步拉取的方式,並寫入到自己的提交日誌中,從而實現與 Leader 的同步
  • 當 Leader 副本所在的 broker 宕機後,Kafka 依託於 ZooKeeper 提供的監控功能能夠實時感知到,並開啟新一輪的選舉,從追隨者副本中選一個作為 Leader。如果宕機的 broker 重啟完成後,該分區的副本會作為 Follower 重新加入。

首領的另一個任務是搞清楚哪個跟隨者的狀態與自己是一致的。跟隨者為了保證與領導者的狀態一致,在有新消息到達之前先嘗試從領導者那裡複製消息。為了與領導者保持一致,跟隨者向領導者發起獲取數據的請求,這種請求與消費者為了讀取消息而發送的信息是一樣的。

跟隨者向領導者發送消息的過程是這樣的,先請求消息1,然後再接收到消息1,在時候到請求1之後,發送請求2,在收到領導者給發送給跟隨者之前,跟隨者是不會繼續發送消息的。這個過程如下

跟隨者副本在收到響應消息前,是不會繼續發送消息,這一點很重要。通過查看每個跟隨者請求的最新偏移量,首領就會知道每個跟隨者複製的進度。如果跟隨者在10s 內沒有請求任何消息,或者雖然跟隨者已經發送請求,但是在10s 內沒有收到消息,就會被認為是不同步的。如果一個副本沒有與領導者同步,那麼在領導者掉線後,這個副本將不會稱為領導者,因為這個副本的消息不是全部的。

與之相反的,如果跟隨者同步的消息和領導者副本的消息一致,那麼這個跟隨者副本又被稱為同步的副本。也就是說,如果領導者掉線,那麼只有同步的副本能夠稱為領導者。

關於副本機制我們說了這麼多,那麼副本機制的好處是什麼呢?

  • 能夠立刻看到寫入的消息,就是你使用生產者 API 成功向分區寫入消息後,馬上使用消費者就能讀取剛才寫入的消息
  • 能夠實現消息的冪等性,啥意思呢?就是對於生產者產生的消息,在消費者進行消費的時候,它每次都會看到消息存在,並不會存在消息不存在的情況

同步複製和異步複製

我在學習副本機制的時候,有個疑問,既然領導者副本和跟隨者副本是發送 - 等待機制的,這是一種同步的複製方式,那麼為什麼說跟隨者副本同步領導者副本的時候是一種異步操作呢?

我認為是這樣的,跟隨者副本在同步領導者副本後會把消息保存在本地 log 中,這個時候跟隨者會給領導者副本一個響應消息,告訴領導者自己已經保存成功了,同步複製的領導者會等待所有的跟隨者副本都寫入成功後,再返回給 producer 寫入成功的消息。而異步複製是領導者副本不需要關心跟隨者副本是否寫入成功,只要領導者副本自己把消息保存到本地 log ,就會返回給 producer 寫入成功的消息。下面是同步複製和異步複製的過程

同步複製

  • producer 通知 ZooKeeper 識別領導者
  • producer 向領導者寫入消息
  • 領導者收到消息後會把消息寫入到本地 log
  • 跟隨者會從領導者那裡拉取消息
  • 跟隨者向本地寫入 log
  • 跟隨者向領導者發送寫入成功的消息
  • 領導者會收到所有的跟隨者發送的消息
  • 領導者向 producer 發送寫入成功的消息

異步複製

和同步複製的區別在於,領導者在寫入本地log之後,直接向客戶端發送寫入成功消息,不需要等待所有跟隨者複製完成。

ISR

Kafka動態維護了一個同步狀態的副本的集合(a set of In-Sync Replicas),簡稱ISR,ISR 也是一個很重要的概念,我們之前說過,追隨者副本不提供服務,只是定期的異步拉取領導者副本的數據而已,拉取這個操作就相當於是複製,ctrl-c + ctrl-v大家肯定用的熟。那麼是不是說 ISR 集合中的副本消息的數量都會與領導者副本消息數量一樣呢?那也不一定,判斷的依據是 broker 中參數 replica.lag.time.max.ms 的值,這個參數的含義就是跟隨者副本能夠落後領導者副本最長的時間間隔。

replica.lag.time.max.ms 參數默認的時間是 10秒,如果跟隨者副本落後領導者副本的時間不超過 10秒,那麼 Kafka 就認為領導者和跟隨者是同步的。即使此時跟隨者副本中存儲的消息要小於領導者副本。如果跟隨者副本要落後於領導者副本 10秒以上的話,跟隨者副本就會從 ISR 被剔除。倘若該副本後面慢慢地追上了領導者的進度,那麼它是能夠重新被加回 ISR 的。這也表明,ISR 是一個動態調整的集合,而非靜態不變的。

Unclean 領導者選舉

既然 ISR 是可以動態調整的,那麼必然會出現 ISR 集合中為空的情況,由於領導者副本是一定出現在 ISR 集合中的,那麼 ISR 集合為空必然說明領導者副本也掛了,所以此時 Kafka 需要重新選舉一個新的領導者,那麼該如何選舉呢?現在你需要轉變一下思路,我們上面說 ISR 集合中一定是與領導者同步的副本,那麼不再 ISR 集合中的副本一定是不與領導者同步的副本了,也就是不再 ISR 列表中的跟隨者副本會丟失一些消息。如果你開啟 broker 端參數 unclean.leader.election.enable的話,下一個領導者就會在這些非同步的副本中選舉。這種選舉也叫做Unclean 領導者選舉

如果你接觸過分佈式項目的話你一定知道 CAP 理論,那麼這種 Unclean 領導者選舉其實是犧牲了數據一致性,保證了 Kafka 的高可用性。

你可以根據你的實際業務場景決定是否開啟 Unclean 領導者選舉,一般不建議開啟這個參數,因為數據的一致性要比可用性重要的多。

Kafka 請求處理流程

broker 的大部分工作是處理客戶端、分區副本和控制器發送給分區領導者的請求。這種請求一般都是請求/響應式的,我猜測你接觸最早的請求/響應的方式應該就是 HTTP 請求了。事實上,HTTP 請求可以是同步可以是異步的。一般正常的 HTTP 請求都是同步的,同步方式最大的一個特點是提交請求->等待服務器處理->處理完畢返回 這個期間客戶端瀏覽器不能做任何事。而異步方式最大的特點是 請求通過事件觸發->服務器處理(這時瀏覽器仍然可以做其他事情)-> 處理完畢

那麼我也可以說同步請求就是順序處理的,而異步請求的執行方式則不確定,因為異步需要創建多個執行線程,而每個線程的執行順序不同。

這裡需要注意一點,我們只是使用 HTTP 請求來舉例子,而 Kafka 採用的是 TCP 基於 Socket 的方式進行通訊

那麼這兩種方式有什麼缺點呢?

我相信聰明的你應該能馬上想到,同步的方式最大的缺點就是吞吐量太差,資源利用率極低,由於只能順序處理請求,因此,每個請求都必須等待前一個請求處理完畢才能得到處理。這種方式只適用於請求發送非常不頻繁的系統

異步的方式的缺點就是為每個請求都創建線程的做法開銷極大,在某些場景下甚至會壓垮整個服務。

響應式模型

說了這麼半天,Kafka 採用同步還是異步的呢?都不是,Kafka 採用的是一種 響應式(Reactor)模型,那麼什麼是響應式模型呢?簡單的說,Reactor 模式是事件驅動架構的一種實現方式,特別適合應用於處理多個客戶端並發向服務器端發送請求的場景,如下圖所示

Kafka 的 broker 端有個 SocketServer組件,類似於處理器,SocketServer 是基於 TCP 的 Socket 連接的,它用於接受客戶端請求,所有的請求消息都包含一個消息頭,消息頭中都包含如下信息

  • Request type (也就是 API Key)
  • Request version(broker 可以處理不同版本的客戶端請求,並根據客戶版本做出不同的響應)
  • Correlation ID — 一個具有唯一性的數字,用於標示請求消息,同時也會出現在響應消息和錯誤日誌中(用於診斷問題)
  • Client ID — 用於標識發送請求的客戶端

broker 會在它所監聽的每一個端口上運行一個 Acceptor 線程,這個線程會創建一個連接,並把它交給 Processor(網絡線程池), Processor 的數量可以使用 num.network.threads 進行配置,其默認值是3,表示每台 broker 啟動時會創建3個線程,專門處理客戶端發送的請求。

Acceptor 線程會採用輪詢的方式將入棧請求公平的發送至網絡線程池中,因此,在實際使用過程中,這些線程通常具有相同的機率被分配到待處理請求隊列中,然後從響應隊列獲取響應消息,把它們發送給客戶端。Processor 網絡線程池中的請求 – 響應的處理還是比較複雜的,下面是網絡線程池中的處理流程圖

Processor 網絡線程池接收到客戶和其他 broker 發送來的消息後,網絡線程池會把消息放到請求隊列中,注意這個是共享請求隊列,因為網絡線程池是多線程機制的,所以請求隊列的消息是多線程共享的區域,然後由 IO 線程池進行處理,根據消息的種類判斷做何處理,比如 PRODUCE 請求,就會將消息寫入到 log 日誌中,如果是FETCH請求,則從磁盤或者頁緩存中讀取消息。也就是說,IO線程池是真正做判斷,處理請求的一個組件。在IO 線程池處理完畢後,就會判斷是放入響應隊列中還是 Purgatory 中,Purgatory 是什麼我們下面再說,現在先說一下響應隊列,響應隊列是每個線程所獨有的,因為響應式模型中不會關心請求發往何處,因此把響應回傳的事情就交給每個線程了,所以也就不必共享了。

注意:IO 線程池可以通過 broker 端參數 num.io.threads 來配置,默認的線程數是8,表示每台 broker 啟動後自動創建 8 個IO 處理線程。

請求類型

下面是幾種常見的請求類型

生產請求

我在 真的,關於 Kafka 入門看這一篇就夠了 文章中提到過 acks 這個配置項的含義

簡單來講就是不同的配置對寫入成功的界定是不同的,如果 acks = 1,那麼只要領導者收到消息就表示寫入成功,如果acks = 0,表示只要領導者發送消息就表示寫入成功,根本不用考慮返回值的影響。如果 acks = all,就表示領導者需要收到所有副本的消息後才表示寫入成功。

在消息被寫入分區的首領後,如果 acks 配置的值是 all,那麼這些請求會被保存在 煉獄(Purgatory)的緩衝區中,直到領導者副本發現跟隨者副本都複製了消息,響應才會發送給客戶端。

獲取請求

broker 獲取請求的方式與處理生產請求的方式類似,客戶端發送請求,向 broker 請求主題分區中特定偏移量的消息,如果偏移量存在,Kafka 會採用 零複製 技術向客戶端發送消息,Kafka 會直接把消息從文件中發送到網絡通道中,而不需要經過任何的緩衝區,從而獲得更好的性能。

客戶端可以設置獲取請求數據的上限和下限,上限指的是客戶端為接受足夠消息分配的內存空間,這個限制比較重要,如果上限太大的話,很有可能直接耗盡客戶端內存。下限可以理解為攢足了數據包再發送的意思,這就相當於項目經理給程序員分配了 10 個bug,程序員每次改一個 bug 就會向項目經理彙報一下,有的時候改好了有的時候可能還沒改好,這樣就增加了溝通成本和時間成本,所以下限值得就是程序員你改完10個 bug 再向我彙報!!!如下圖所示

如圖你可以看到,在拉取消息 —> 消息 之間是有一個等待消息積累這麼一個過程的,這個消息積累你可以把它想像成超時時間,不過超時會跑出異常,消息積累超時後會響應回執。延遲時間可以通過 replica.lag.time.max.ms 來配置,它指定了副本在複製消息時可被允許的最大延遲時間。

元數據請求

生產請求和響應請求都必須發送給領導者副本,如果 broker 收到一個針對某個特定分區的請求,而該請求的首領在另外一個 broker 中,那麼發送請求的客戶端會收到非分區首領的錯誤響應;如果針對某個分區的請求被發送到不含有領導者的 broker 上,也會出現同樣的錯誤。Kafka 客戶端需要把請求和響應發送到正確的 broker 上。這不是廢話么?我怎麼知道要往哪發送?

事實上,客戶端會使用一種 元數據請求 ,這種請求會包含客戶端感興趣的主題列表,服務端的響應消息指明了主題的分區,領導者副本和跟隨者副本。元數據請求可以發送給任意一個 broker,因為所有的 broker 都會緩存這些信息。

一般情況下,客戶端會把這些信息緩存,並直接向目標 broker 發送生產請求和相應請求,這些緩存需要隔一段時間就進行刷新,使用metadata.max.age.ms 參數來配置,從而知道元數據是否發生了變更。比如,新的 broker 加入後,會觸發重平衡,部分副本會移動到新的 broker 上。這時候,如果客戶端收到 不是首領的錯誤,客戶端在發送請求之前刷新元數據緩存。

你點的每個好看,我都認真當成了喜歡