Apache Kafka內核深度剖析

  • 2020 年 2 月 26 日
  • 筆記

目前來說市面上可以選擇的消息隊列非常多,像activemq,rabbitmq,zeromq已經被大多數人耳熟能詳,特別像activemq早期應用在企業中的匯流排通訊,基本作為企業級IT設施解決方案中不可或缺的一部分。目前來說Kafka已經非常穩定,並且逐步應用更加廣泛,已經算不得新生事物,但是不可否認Kafka一枝獨秀如同雨後春筍,非常耀眼,今天我們仔細分解一下Kafka,了解一下它的內幕。以下的內容版本基於當前最新的Kafka穩定版本2.4.0。文章主要包含以下內容:

  • Kafka為什麼快
  • Kafka為什麼穩
  • Kafka該怎麼用

該文章為開篇引導之做,後續會有對應的HBase,Spark,Kylin,Pulsar等相關組件的剖析。

Kafka為什麼快

快是一個相對概念,沒有對比就沒有傷害,因此通常我們說Kafka是相對於我們常見的activemq,rabbitmq這類會發生IO,並且主要依託於IO來做資訊傳遞的消息隊列,像zeromq這種基本純粹依靠記憶體做資訊流傳遞的消息隊列,當然會更快,但是此類消息隊列只有特殊場景下會使用,不在對比之列。

因此當我們說Kakfa快的時候,通常是基於以下場景:

  • 吞吐量:當我們需要每秒處理幾十萬上百萬message的時候,相對其他MQ,Kafka處理的更快。
  • 高並發:當具有百萬以及千萬的consumer的時候,同等配置的機器下,Kafka所擁有的Producer和Consumer會更多。
  • 磁碟鎖:相對其他MQ,Kafka在進行IO操作的時候,其同步鎖住IO的場景更少,發生等待的時間更短。

那麼基於以上幾點,我們來仔細探討一下,為什麼Kafka就快了。

消息隊列的推拉模型

首先,如果我們單純站在Consumer的角度來看「Kafka快」,是一個偽命題,因為相比其他MQ,Kafka從Producer產生一條Message到Consumer消費這條Message來看它的時間一定是大於等於其他MQ的,背後的原因涉及到消息隊列設計的兩種模型:推模型和拉模型,如下圖所示:

對於拉模型來說,Producer產生Message後,會主動發送給MQ Server,為了提升性能和減少開支,部分Client還會設計成批量發送,但是無論是單條還是批量,Producer都會主動推送消息到MQ Server,當MQ Server接收到消息後,對於拉模型,MQ Server不會主動發送消息到Consumer,同時也不會維持和記錄消息的offset,Consumer會自動設置定時器到服務端去詢問是否有新的消息產生,通常時間是不超過100ms詢問一次,一旦產生新的消息則會同步到本地,並且修改和記錄offset,服務端可以輔助存儲offset,但是不會主動記錄和校驗offset的合理性,同時Consumer可以完全自主的維護offset以便實現自定義的資訊讀取。

對於推模型來說,服務端收到Message後,首先會記錄消息的資訊,並且從自己的元資訊資料庫中查詢對應的消息的Consumer有誰,由於伺服器和Consumer在鏈接的時候建立了長鏈接,因此可以直接發送消息到Consumer。

Kafka是基於拉模型的消息隊列,因此從Consumer獲取消息的角度來說,延遲會小於等於輪詢的周期,所以會比推模型的消息隊列具有更高的消息獲取延遲,但是推模型同樣又其問題。首先,由於伺服器需要記錄對應的Consumer的元資訊,包括消息該發給誰,offset是多少,同時需要向Consumer推送消息,必然會帶來系列的問題:假如這一刻網路不好,Consumer沒有收到,消息沒有發成功怎麼辦?假設消息發出去了,我怎麼知道它有沒有收到?因此伺服器和Consumer之間需要首先多層確認口令,以達到至少消費一次,僅且消費一次等特性。

Kafka此類的拉模型將這一塊功能都交由Consumer自動維護,因此伺服器減少了更多的不必要的開支,因此從同等資源的角度來講,Kafka具備鏈接的Producer和Consumer將會更多,極大的降低了消息堵塞的情況,因此看起來更快了。

OS Page Cache和Buffer Cache

太陽底下無新鮮事,對於一個框架來說,要想運行的更快,通常能用的手段也就那麼幾招,Kafka在將這一招用到了極致,其中之一就是極大化的使用了OS的Cache,主要是Page Cache和Buffer Cache。對於這兩個Cache,使用Linux的同學通常不會陌生,例如我們在Linux下執行free命令的時候會看到如下的輸出:

(圖片來自網路)

會有兩列名為buffers和cached,也有一行名為「-/+ buffers/cache」。這兩個資訊的具體解釋如下:

pagecache:文件系統層級的快取,從磁碟里讀取的內容是存儲到這裡,這樣程式讀取磁碟內容就會非常快,比如使用Linux的grep和find等命令查找內容和文件時,第一次會慢很多,再次執行就快好多倍,幾乎是瞬間。另外page cache的數據被修改過後,也即臟數據,等到寫入磁碟時機到來時,會轉移到buffer cache 而不是直接寫入到磁碟。我們看到的cached這列的數值表示的是當前的頁快取(page cache)的佔用量,page cache文件的頁數據,頁是邏輯上的概念,因此page cache是與文件系統同級的。

buffer cache:磁碟等塊設備的緩衝,記憶體的這一部分是要寫入到磁碟里的 。buffers列表示當前的塊快取(buffer cache)佔用量,buffer cache用於快取塊設備(如磁碟)的塊數據。塊是物理上的概念,因此buffer cache是與塊設備驅動程式同級的。

兩者都是用來加速數據IO,將寫入的頁標記為dirty,然後向外部存儲flush,讀數據時首先讀取快取,如果未命中,再去外部存儲讀取,並且將讀取來的數據也加入快取。作業系統總是積極地將所有空閑記憶體都用作page cache和buffer cache,當os的記憶體不夠用時也會用LRU等演算法淘汰快取頁。

有了以上概念後,我們再看來Kafka是怎麼利用這個特性的。首先,對於一次數據IO來說,通常會發生以下的流程:

  • 作業系統將數據從磁碟拷貝到內核區的pagecache
  • 用戶程式將內核區的pagecache拷貝到用戶區快取
  • 用戶程式將用戶區的快取拷貝到socket快取中
  • 作業系統將socket快取中的數據拷貝到網卡的buffer上,發送數據

可以發現一次IO請求操作進行了2次上下文切換和4次系統調用,而同一份數據在快取中多次拷貝,實際上對於拷貝來說完全可以直接在內核態中進行,也就是省去第二和第三步驟,變成這樣:

正因為可以如此的修改數據的流程,於是Kafka在設計之初就參考此流程,儘可能大的利用os的page cache來對數據進行拷貝,盡量減少對磁碟的操作。如果kafka生產消費配合的好,那麼數據完全走記憶體,這對集群的吞吐量提升是很大的。早期的作業系統中的page cache和buffer cache是分開的兩塊cache,後來發現同樣的數據可能會被cache兩次,於是大部分情況下兩者都是合二為一的。

Kafka雖然使用JVM語言編寫,在運行的時候脫離不了JVM和JVM的GC,但是Kafka並未自己去管理快取,而是直接使用了OS的page cache作為快取,這樣做帶來了以下好處:

  • JVM中的一切皆對象,所以無論對象的大小,總會有些額外的JVM的對象元數據浪費空間。
  • JVM自己的GC不受程式手動控制,所以如果使用JVM作為快取,在遇到大對象或者頻繁GC的時候會降低整個系統的吞吐量。
  • 程式異常退出或者重啟,所有的快取都將失效,在容災架構下會影響快速恢復。而page cache因為是os的cache,即便程式退出,快取依舊存在。

所以Kafka優化IO流程,充分利用page cache,其消耗的時間更短,吞吐量更高,相比其他MQ就更快了,用一張圖來簡述三者之間的關係如下:

當Producer和Consumer速率相差不大的情況下,Kafka幾乎可以完全實現不落盤就完成資訊的傳輸。

追加順序寫入

除了前面的重要特性之外,Kafka還有一個設計,就是對數據的持久化存儲採用的順序的追加寫入,Kafka在將消息落到各個topic的partition文件時,只是順序追加,充分的利用了磁碟順序訪問快的特性。

(圖片來自網路)

Kafka的文件存儲按照topic下的partition來進行存儲,每一個partition有各自的序列文件,各個partition的序列不共享,主要的劃分按照消息的key進行hash決定落在哪個分區之上,我們先來詳細解釋一下Kafka的各個名詞,以便充分理解其特點:

  • broker:Kafka中用來處理消息的伺服器,也是Kafka集群的一個節點,多個節點形成一個Kafka集群。
  • topic:一個消息主題,每一個業務系統或者Consumer需要訂閱一個或者多個主題來獲取消息,Producer需要明確發生消息對於的topic,等於資訊傳遞的口令名稱。
  • partition:一個topic會拆分成多個partition落地到磁碟,在kafka配置的存儲目錄下按照對應的分區ID創建的文件夾進行文件的存儲,磁碟可以見的最大的存儲單元。
  • segment:一個partition會有多個segment文件來實際存儲內容。
  • offset:每一個partition有自己的獨立的序列編號,作用域僅在當前的partition之下,用來對對應的文件內容進行讀取操作。
  • leader:每一個topic需要有一個leader來負責該topic的資訊的寫入,數據一致性的維護。
  • controller:每一個kafka集群會選擇出一個broker來充當controller,負責決策每一個topic的leader是誰,監聽集群broker資訊的變化,維持集群狀態的健康。

可以看到最終落地到磁碟都是Segment文件,每一個partion(目錄)相當於一個巨型文件被平均分配到多個大小相等segment(段)數據文件中。但每個段segment file消息數量不一定相等,這種特性方便老的 segment file快速被刪除。因為Kafka處理消息的力度是到partition,因此只需要保持好partition對應的順序處理,segment可以單獨維護其狀態。

segment的文件由index file和data file組成,落地在磁碟的後綴為.index和.log,文件按照序列編號生成,如下所示:

(圖片來自網路)

其中index維持著數據的物理地址,而data存儲著數據的偏移地址,相互關聯,這裡看起來似乎和磁碟的順序寫入關係不大,想想HDFS的塊存儲,每次申請固定大小的塊和這裡的segment?是不是挺相似的?另外因為有index文的本身命名是以offset作為文件名的,在進行查找的時候可以快速根據需要查找的offset定位到對應的文件,再根據文件進行內容的檢索。因此Kafka的查找流程為先根據要查找的offset對文件名稱進行二分查找,找到對應的文件,再根據index的元數據的物理地址和log文件的偏移位置結合順序讀區到對應offset的位置的內容即可。

segment index file採取稀疏索引存儲方式,它減少索引文件大小,通過mmap可以直接記憶體操作,稀疏索引為數據文件的每個對應message設置一個元數據指針,它比稠密索引節省了更多的存儲空間,但查找起來需要消耗更多的時間,特別是在隨機讀取的場景下,Kafka非常不合適。所以因為Kafka特殊的存儲設計,也讓Kafka感覺起來,更快。


Kafka為什麼穩

前面提到Kafka為什麼快,除了快的特性之外,Kafka還有其他特點,那就是:穩。Kafka的穩體現在幾個維度:

  • 數據安全,幾乎不會丟數據。
  • 集群安全,發生故障幾乎可以Consumer無感知切換。
  • 可用性強,即便部分partition不可用,剩餘的partition的數據依舊不影響讀取。
  • 流控限制,避免大量Consumer拖垮伺服器的頻寬。

限流機制

對於Kafka的穩,通常是由其整體架構設計決定,很多優秀的特性結合在一起,就更加的優秀,像Kafka的Qutota就是其中一個,既然是限流,那就意味著需要控制Consumer或者Producer的流量頻寬,通常限制流量這件事需要在網卡上作處理,像常見的N路交換機或者高端路由器,所以對於Kafka來說,想要操控OS的網卡去控制流量顯然具有非常高的難度,因此Kafka採用了另外一個特別的思路,即:沒有辦法控制網卡通過的流量大小,就控制返回數據的時間。對於JVM程式來說,就是一個wait或者seelp的事情。

所以對於Kafka來說,有一套特殊的時延計算規則,Kafka按照一個窗口來統計單位時間傳輸的流量,當流量大小超過設置的閾值的時候,觸發流量控制,將當前請求丟入Kafka的Qutota Manager,等到延遲時間到達後,再次返回數據。我們通過Kafka的ClientQutotaManager類中的方法來看:

這幾行程式碼代表了Kafka的限流計算邏輯,大概的思路為:假設我們設定當前流量上限不超過T,根據窗口計算出當前的速率為O,如果O超過了T,那麼會進行限速,限速的公示為:

X = (O - T)/ T * W

X為需要延遲的時間,讓我舉一個形象的例子,假設我們限定流量不超過10MB/s,過去5秒(公示中的W,窗口區間)內通過的流量為100MB,則延遲的時間為:(100-5*10)/ 10=5秒。這樣就能夠保障在下一個窗口運行完成後,整個流量的大小是不會超過限制的。通過KafkaApis裡面對Producer和Consumer的call back程式碼可以看到對限流的延遲返回:

對於kafka的限流來講,默認是按照client id或者user來進行限流的,從實際使用的角度來說,意義不是很大,基於topic或者partition分區級別的限流,相對使用場景更大,ThoughtWroks曾經幫助某客戶修改Kafka核心源碼,實現了基於topic的流量控制。

競選機制

Kafka背後的元資訊重度依賴Zookeeper,再次我們不解釋Zookeeper本身,而是關注Kafka到底是如何使用zk的,首先一張圖解釋Kafka對zk的重度依賴:

(圖片來源於網路)

利用zk除了本身資訊的存儲之外,最重要的就是Kafka利用zk實現選舉機制,其中以controller為主要的介紹,首先controller作為Kafka的心臟,主要負責著包括不限於以下重要事項:

也就是說Controller是Kafka的核心角色,對於Controller來說,採用公平競爭,任何一個Broker都有可能成為Controller,保障了集群的健壯性,對於Controller來說,其選舉流程如下:

  • 先獲取 zk 的 /cotroller 節點的資訊,獲取 controller 的 broker id,如果該節點不存在(比如集群剛創建時),* 那麼獲取的 controller id 為-1。
  • 如果 controller id 不為-1,即 controller 已經存在,直接結束流程。
  • 如果 controller id 為-1,證明 controller 還不存在,這時候當前 broker 開始在 zk 註冊 controller;。
  • 如果註冊成功,那麼當前 broker 就成為了 controller,這時候開始調用 onBecomingLeader() 方法,正式初始化 controller(注意:controller 節點是臨時節點,如果當前 controller 與 zk 的 session 斷開,那麼 controller 的臨時節點會消失,會觸發 controller 的重新選舉)。
  • 如果註冊失敗(剛好 controller 被其他 broker 創建了、拋出異常等),那麼直接返回。

其程式碼直接通過KafkaController可以看到:

一旦Controller選舉出來之後,則其他Broker會監聽zk的變化,來響應集群中Controller掛掉的情況:

從而觸發新的Controller選舉動作。對於Kafka來說,整個設計非常緊湊,程式碼品質相當高,很多設計也非常具有借鑒意義,類似的功能在Kafka中有非常多的特性體現,這些特性結合一起,形成了Kafka整個穩定的局面。


Kafka該怎麼用

雖然Kafka整體看起來非常優秀,但是Kafka也不是全能的銀彈,必然有其對應的短板,那麼對於Kafka如何,或者如何能用的更好,則需要經過實際的實踐才能得感悟的出。經過歸納和總結,能夠發現以下不同的使用場景和特點。

  • Kafka 並不合適高頻交易系統:Kafka雖然具有非常高的吞吐量和性能,但是不可否認,Kafka在單條消息的低延遲方面依舊不如傳統MQ,畢竟依託推模型的MQ能夠在實時消息發送的場景下取得先天的優勢。
  • Kafka並不具備完善的事務機制:0.11之後Kafka新增了事務機制,可以保障Producer的批量提交,為了保障不會讀取到臟數據,Consumer可以通過對消息狀態的過濾過濾掉不合適的數據,但是依舊保留了讀取所有數據的操作,即便如此Kafka的事務機制依舊不完備,背後主要的原因是Kafka對client並不感冒,所以不會統一所有的通用協議,因此在類似僅且被消費一次等場景下,效果非常依賴於客戶端的實現。
  • Kafka的異地容災方案非常複雜:對於Kafka來說,如果要實現跨機房的無感知切換,就需要支援跨集群的代理,因為Kafka特殊的append log的設計機制,導致同樣的offset在不同的broker和不同的內容上無法復用,也就是文件一旦被拷貝到另外一台伺服器上,將不可讀取,相比類似基於資料庫的MQ,很難實現數據的跨集群同步,同時對於offset的復現也非常難,曾經幫助客戶實現了一套跨機房的Kafka 集群Proxy,投入了非常大的成本。
  • Kafka Controller架構無法充分利用集群資源:Kafka Controller類似於Es的去中心化思想,按照競選規則從集群中選擇一台伺服器作為Controller,意味著改伺服器即承擔著Controller的職責,同時又承擔著Broker的職責,導致在海量消息的壓迫下,該伺服器的資源很容易成為集群的瓶頸,導致集群資源無法最大化。Controller雖然支援HA但是並不支援分散式,也就意味著如果要想Kafka的性能最優,每一台伺服器至少都需要達到最高配置。
  • Kafka不具備非常智慧的分區均衡能力:通常在設計落地存儲的時候,對於熱點或者要求性能足夠高的場景下,會是SSD和HD的結合,同時如果集群存在磁碟容量大小不均等的情況,對於Kafka來說會有非常嚴重的問題,Kafka的分區產生是按照paratition的個數進行統計,將新的分區創建在個數最少的磁碟上,見下圖:

曾經我幫助某企業修改了分區創建規則,考慮了容量的情況,也就是按照磁碟容量進行分區的選擇,緊接著帶來第二個問題:容量大的磁碟具備更多的分區,則會導致大量的IO都壓向該盤,最後問題又落回IO,會影響該磁碟的其他topic的性能。所以在考慮MQ系統的時候,需要合理的手動設置Kafka的分區規則。


結尾

Kafka並不是唯一的解決方案,像幾年前新生勢頭挺厲害的pulsar,以取代Kafka的口號沖入市場,也許會成為下一個解決Kafka部分痛點的框架,下文再講述pulsar。


– 相關閱讀 –