Kafka原理分析之基礎篇
原創文章,轉載請標註。//www.cnblogs.com/boycelee/p/14728638.html
一、Kafka二、解決問題異步處理應用解耦流量削峰三、特性讀寫效率網絡傳輸並發能力持久化能力可靠性水平擴展四、基本概念消息&批次消息批次主題&分區日誌Log基本概念Log保存與壓縮日誌保存日誌壓縮Broker副本生產者消費者消費者組消息傳遞模式Kafka架構概圖五、核心特性詳解消費者單消費者組多消費者組心跳機制再平衡機制再平衡觸發條件避免再平衡消費者判「死」條件消費者沒有定期地向Coordinator發送心跳請求規定時間內沒有消費完poll方法返回的消息避免消費者被判「死」避免被「條件1」判死避免被「條件2」判死位移管理位移主題引入原因消息格式位移提交自動提交手動提交分區副本機制副本機制的優點副本定義副本角色同分區多副本,如何保證副本消息一致性?追隨者副本不對外提供服務的原因同步副本(ISR)與非同步副本(OSR)同步副本的標準HWLEOLeader選舉少部分副本宕機全部副本宕機為什麼不少數服從多數?物理存儲存儲概述基本概念文件類別日誌存儲索引消息壓縮偏移量索引六、參考七、總結
一、Kafka
Kafka是一個分佈式的消息系統。
二、解決問題
消息系統通常被應用於異步處理、應用解耦、流量削峰、消息通信等場景。
異步處理

生產者將消息寫入消息隊列中,消費者異步拉取消息隊列消息,從而提升消息處理能力。
應用解耦

Kafka作為消息傳遞的媒介,各子系統只需要做系統責任內的事情。生產者-消費者模式,Kafka就是消息隊列。
流量削峰

正常情況下,上游服務(如報價、營銷等)常年流量較大,面對大流量時能夠較為從容地應對,但下游應用(如:交易、訂單等)由於常年流量較小,面對大流量時會因為準備不足,而導致系統被打垮,引發雪崩。
為了應對這一問題,可以利用消息隊列作為臨時數據存儲節點,消費者根據自身消費能力,通過拉取的方式控制消費速度,達到流量削峰的目的。
三、特性
讀寫效率
Kafka在面對大流量數據時,能夠高效地處理消息的存儲與查詢。通過軟件設計避免硬件讀取磁盤的性能瓶頸。
網絡傳輸
批量讀取消息,對消息進行批量壓縮,從而提升網絡利用率。
並發能力
Kafka支持消息分區,每個分區內保證消息的順序性,多分區之間能夠支持並發操作,提升Kafka並發操作。
持久化能力
Kafka將消息持久化至硬盤。網絡傳輸不可靠,所以需要將數據進行持久化。其中利用了零拷貝、順序讀、順序寫、頁緩存等技術使Kafa具備高吞吐特性。
可靠性
支持分區多副本,Leader副本負責讀寫,Follow副本只負責同步Leader副本數據,實現消息冗餘備份,提升Kafka容災能力。
水平擴展
多Producer、Broker、Consumer,均為分佈式,多Consumer可以加入同一Consumer Group,每個分區只能分配一個Consumer,當Kafka服務端增加分區數量進行水平擴展時,可以向Consumer Group添加Consumer,提升消費能力。當Consumer Group中有Consumer出現故障下線時,能通過再平衡(Rebalance)對分區進行再分配。
四、基本概念
消息&批次
消息
(1)消息是Kafka的基本單位;
(2)消息由key和value的byte數組構成;
(3)key能夠根據策略將消息發送到指定分區。
批次
(1)為了提升效率,消息被分批寫入kafka,同一組消息必須屬於同一主題的同一分區;
(2)分批發送能夠降低網絡開銷,提升傳輸速度。
主題&分區
主題(Topic)是用於存儲消息分類關係的邏輯單元,可以看做存儲消息的集合。分區(partition)是Kafka數據存儲的基本單元,可以看做存儲消息的集合的子集。Kafka消息通過主題進行分類,同一Topic的不同分區(partition)會分配在不用的Broker上,分區機制提供橫向擴展的基礎,可以通過增加並在其上分配partition來提升Kafka的消息並行處理能力。

日誌
Log基本概念
(1)分區邏輯上對應一個Log,生產者將消息寫入分區實際是寫入分區對應的Log;
(2)Log可以對應磁盤上的文件夾,其由多個Segment組成,每個Segment對應一個日誌文件和索引文件;
(3)當Segment大小超出限制時,就會創建新的Segment;
(4)Kafka採用順序I/O,所以只會向最新的Segment追加數據;
(5)索引採用稀疏索引,運行時將其映射至內存中,提升索引速度。

Log保存與壓縮
日誌保存
(1)時間限制
根據保留時間,當消息在kafka中保存的時間超過指定時間,就會被刪除。
(2)大小限制
根據Topic存儲大小,當Topic所佔日誌的大小大於一個閾值,則可以開始刪除最舊的消息。Kafka會啟動一個新的線程,定期檢查是否存在可以刪除的消息。
日誌壓縮
很多場景中,Kafka消息的key與value值會不斷變化,就像數據庫中的數據會不斷被修改,消費者只會關心最新的key對應的value。如果開啟日誌壓縮功能,Kafka會開啟線程,定時對相同key的消息進行合併,並保留最新的value值。
Broker
獨立的Kafka服務就是一個broker,broker主要的工作就是接受生產者發送來的消息,分配offset並保存到磁盤中。Broker除了接受生產者發送的消息,還處理消費者、其他Broker的請求,根據請求類型進行相應處理行和響應返回。正常情況下一台機器對應一個broker。
副本
所謂副本就是對消息進程冗餘備份,分佈式系統在不同機器上相互保存對方數據。在Kafka中,每個分區(partition)可以有多個副本,每個副本中的消息是一樣的(在同一時刻,多台機器之間的消息並不完全一致)。
生產者
生產者(Producer)的主要工作是生成消息。將消息發佈根據規則推送到Topic的對應分區中。例如:(1)對key進行hash;(2)輪詢;(3)自定義。
消費者
消費者(Consumer)的主要工作消費消息。從對應分區中拉取Topic的消息進行消費。消費者需要通過offset記錄自己的消費位置。
消費者組
多個消費者(Consumer)構成消費者組(Consumer Group)。消費者組(Consumer Group)訂閱的主題(Topic)的每個分區只能被分配給,在同一個消費者組中的一個消費者處理。但一個消費者可以消費同一主題(Topic)的多個分區。

消息傳遞模式

kafka沒有消息推送,只有消息拉取。但消費者可以通過輪詢拉取的方式實現消息推送功能。
Kafka架構概圖

五、核心特性詳解
消費者
(1)消費者從訂閱的主題消費消息的偏移量保存至名字為”__consumer_offsets”的主題中;
(2)推薦使用Kafka來存儲消費者偏移量,zookeeper不適合高並發。
單消費者組
多個消費同一主題的消費者只要將group_id設置相同,就可以組成消費者組。
情況一:一個消費者組中,只有一個消費者。

情況二:消費者組中有多個消費者。

情況三:分區數與消費者組數相同。

情況四:消費者組中消費者數量大於分區數。閑置的消費者不會接收消息。

多消費者組
一個主題對應多個消費者組,每個消費者組都能夠消費該主題的所有消息。

心跳機制
Kafka的心跳機制保證Consumer和Broker之間的健康,當Broker Coordinator正常時,Consumer才會發送心跳。
再平衡機制
再平衡是規定消費者組下消費者與主題的分區之間發生變化時如何分配的協議。
再平衡觸發條件
(1)消費組內消費者發生變化。(消費組數量變化,例如消費組宕機退出消費組)
(2)主題對應分區數發生變化。(kafka只支持增加分區)
(3)訂閱主題發生變化。(消費組使用正則表達式訂閱主題,此時恰好新建了對應主題)
情況一:正常情況,每個分區只能分配給一個消費者。

情況二:消費者機器宕機,消費者退出消費組,觸發再平衡,重新給消費者組中的消費者分配分區。

情況三:Broker機器宕機,導致分區3無法提供服務。如果分區有副本則觸發再平衡,如果沒有副本則消費者3閑置。

情況四:使用正則表達式訂閱主題,當新增主題時,主題對應的分區會分配給當前消費者,會觸發再平衡。

避免再平衡
訂閱主題數和主題分區數發生變化,一般情況下是運維主動觸發,正常情況下不需要避免再平衡。所以我們可以重點關注由消費者組消費者數量變化而引發的重平衡。
在再平衡完成後,每個消費者實例會定時向Coodinator發送心跳請求。
消費者判「死」條件
消費者沒有定期地向Coordinator發送心跳請求
(1)session.timeout.ms參數標識判定消費者死亡的時間閾值。參數默認值為10秒,即如果10秒內沒有收到Group下的某Consumer實例的心跳請求,則被判定該Consumer實例「死亡」,移出Group。
(2)heartbeat.interval.ms參數標識心跳請求發送的頻率。值越小,Consumer實例發送心跳請求的頻率就越高。
規定時間內沒有消費完poll方法返回的消息
(1)max.poll.interval.ms參數標識Consumer實例調用poll方法的最大時間間隔。默認值是5分鐘,表示Comsumer如果在5分鐘內無法消費完poll方法返回的消息,則會被移出Group。
避免消費者被判「死」
避免被「條件1」判死
session.timeout.ms >= 3 * heartbeat.interval.ms。保證Consumer被判死前至少經過3輪心跳請求。
例如:設置 session.timeout.ms = 6s;設置 heartbeat.interval.ms = 2s。
避免被「條件2」判死
儘可能將max.poll.interval.ms時間設置大一些。可以將消費者實例中的最長耗時作為依據,再此基礎之上擴大1-1.5倍。為業務處理留下充足的處理時間,避免由於消息消費時間過長而導致再平衡。
位移管理
位移主題
Kafka中消費者根據消息的位移順序消費消息,消費者的位移由消費者管理,可以存儲在zookeeper中,可以存儲於Kafka主題__consumer_offse中hjmgbknjk.n,jvgnvmnn/.vt。sconsumer_offsets就是位移主題。
引入原因
(1)老版本的位移管理依託Zookeeper,會自動或手動的方式將位移數據提交至Zookeeper進行保存。當Consumer重啟後,它就能自動從Zookeeper中讀取位移數據,從上次截止消費的地方繼續消費。這種設計是的Kafka Broker不需要保存位移數據。
(2)但Zookeeper不適合高頻寫操作,所以在0.8.2.x版本後新版本的Consumer推出了全新的位移管理機制。將Consumer的位移數據作為一條普通的Kafka消息,提交到__consumer_offsets。
(3)正情況下不需要修改它,也不可以隨意地向該主題寫消息,因為這會導致Kafka無法正常解析。
消息格式
(1)Key中包含GroupID、主題名、分區號;
(2)Value中包含位移值。
位移提交
(1)Consumer需要向Kafka記錄自己的位移數據,這個彙報過程稱為 提交位移(Committing Offsets)
(2)Consumer 需要為分配給它的每個分區提交各自的位移數據
(3)位移提交的由Consumer端負責的,Kafka只負責保管。__consumer_offsets
(4)位移提交分為自動提交和手動提交
(5)位移提交分為同步提交和異步提交
自動提交
(1)設置enable.auto.commit值為true;
(2)通過auto.commit.interval.ms,可以設置自動提交的時間間隔,默認值為5秒;
(3)Kafka會保證在開始調用poll方法時,提交上次poll返回的所有消息的位移信息。poll方法的邏輯是先提交上一批消息的位移,再處理下一批消息,因此它能保證不出現消費丟失的情況;
(4)自動提交會出現消息重複消費。例:Consumer每 5s提交offset,當提交位移信息後3秒發生了再平衡,所有Consumer都會從上次提交的offset開始消費,但此時獲取的offset已經是3秒前的offset了,所以我們又會重新消費再平衡前3秒的所有數據。我們只能夠縮小提交offset的時間窗口,但無法避免重複消費。
手動提交
1、同步提交
(1)使用 KafkaConsumer#commitSync():會提交 KafkaConsumer#poll() 返回的最新offset;
(2)該方法為同步操作,等待直到 offset 被成功提交才返回;
1while (true) {
2 ConsumerRecords<String, String> records =
3 consumer.poll(Duration.ofSeconds(1));
4 process(records); // 處理消息
5 try {
6 consumer.commitSync();
7 } catch (CommitFailedException e) {
8 handle(e); // 處理提交失敗異常
9 }
(3)同步提交會使Consumer處於阻塞狀態;
(4)同步提交在出現異常時會自動重試。
2、異步提交
(1)使用異步提交規避Consumer阻塞;
(2)異常(GC、網絡抖動)時使用同步提交進行重試。
1try {
2 while(true) {
3 ConsumerRecords<String, String> records =
4 consumer.poll(Duration.ofSeconds(1));
5 process(records); // 處理消息
6 commitAysnc(); // 使用異步提交規避阻塞
7 }
8} catch(Exception e) {
9 handle(e); // 處理異常
10} finally {
11 try {
12 consumer.commitSync(); // 最後一次提交使用同步阻塞式提交
13 } finally {
14 consumer.close();
15 }
分區
分區(Partition)是Kafka數據的基本單元。同一個主題(topic)數據會被分散存儲到多個partion中,這些分區可以被分配到同一台機器或不同機器上。優點是有利於水平擴展,避免單台機器在磁盤空間和性能上的限制,同時可以通過複製來增加數據冗餘,從而提升容災能力。為了做到均勻分佈,一般partition的數量一般是Broker Server數量的整數倍。
副本機制
副本機制的優點
分區擁有多個副本,提供冗餘數據,有利於確保kafka的高可用性。
副本定義
(1)每個主題可以分為多個分區,每個分區下配置多個副本;
(2)副本的本質是一個只能追加寫消息的提交日誌;
(3)同一分區下的所有副本保存相同的消息序列;
(4)分區寫的不同副本分散保存在不同Broker上,應對Broker宕機時分區數據不可用的情況。

副本角色
同分區多副本,如何保證副本消息一致性?
最常見的解決方案是基於領導者(Leader-based)的副本機制。

1、副本分為兩類
(1)領導者副本;
(2)追隨者副本。
2、在Kafka的副本機制與其他分佈式系統不同
(1)在kafka中,追隨者副本不對外提供服務。所有請求都必須由領導者副本來處理;
(2)追隨者副本的唯一任務就是從領導者副本異步拉取消息,並寫入自己的提交日誌中,從而實現與領導者副本的同步。
3、領導者副本所處Broker宕機
(1)Kafka依託Zookeeper提供的監控功能能夠感知Broker宕機,並開啟一輪新的選舉;
(2)老Leader副本重啟後,只能作為追隨者副本加入集群中。
追隨者副本不對外提供服務的原因
1、方便實現「Read-your-writes」
(1)生產者使用API想Kafka寫入消息成功後,能夠立馬使用消費者API查看到剛才生產的信息。
(2)如果允許追隨者副本對外提供服務,由於追隨者副本是異步的,因此就可能出現追隨者副本沒有從領導者副本拉取到最新消息的情況,就會出現無法立刻讀到最新寫入的消息。
2、方便實現單調讀(Monotonic Reads)
(1)什麼是單調讀?對於消息消費者而言,消息不會時有時無。
(2)如果允許追隨者副本對外提供服務,由於追隨者副本是異步的,多個副本從領導者副本拉取的消息不一定同步,就會出現多次請求讀取不同的追隨者副本的情況,數據讀取時有時無。如果讀取全由領導者副本來處理,那麼Kafka就很實現單調讀一致性。
同步副本(ISR)與非同步副本(OSR)
由於追隨者副本需要異步去拉取領導者副本,那麼我們就需要確定再怎麼樣才算與領導者副本同步。
Kafka引入了In-Sync Replicas,也就是ISR(同步)副本集合,該副本在Zookeeper上維護。如果存在於ISR中則意味着與領導者副本同步,相反則為非同步副本(OSR)
同步副本的標準

(1)replica.lag.time.max.ms參數值標識Follower副本能夠慢於Leader副本的最長時間間隔,默認值為10秒。
(2)若Follower副本落後於Leader副本的最長連續時間間隔不超過該replica.lag.time.max.ms參數值設定的大小,則認定該Follower副本與Leader副本是同步的,否則認定為非同步,會將副本從ISR副本集合中移出(Follower副本的拉取速度慢於Leader副本寫入消息的速度,且時間間隔超過設定閾值)。
(3)ISR是動態調整集合,非靜態不變的。當Follower副本追上進度時,就會重新被添加會ISR集合。
HW
高水位(HW是High Watermark的縮寫),表示一個特定消息的偏移量,消費者只能拉取到這個offset之前的數據。
LEO
LEO是Log End Offset的縮寫,表示當前日誌文件下一條待寫入消息的offset

Leader選舉
少部分副本宕機
(1)當Leader副本對應的broker宕機後,就會從Follower副本中選擇一個副本作為Leader;
(2)當宕機的broker恢復後就會重新從leader中pull數據。
全部副本宕機
unclean.leader.election.enable 控制是否允許 Unclean 領導者選舉。
(1)不開啟Unclean。等待ISR中的一個恢復,並選擇其當leader;(等待時間較長,可用性降低)
(2)開啟Unclean。選擇第一個恢復的副本作為新的leader,無論是否是ISR副本。(開啟會造成數據丟失)
(3)正常情況下建議不開啟,雖然犧牲了高可用性,但維護了數據一致性,避免消息丟失。
為什麼不少數服從多數?
選擇Leader副本時如果需要超過半數的同步副本同意,算法所需的冗餘同步副本較多。(一台機器失敗,就需要3個同步副本)
物理存儲
存儲概述
基本概念
(1)Kafka使用日誌文件保存生產者發送的消息;
(2)每條消息都有一個offset值表示它在分區中的偏移量;
(3)offset值是邏輯值並不是真實存在的物理地址。其類似於數據庫中的主鍵,唯一標識了數據庫表中的一條數據。而offset在Kafka中的某個分區唯一標識一條消息。
(4)Log與分區一一對應,Log並不是一個文件而是一個文件夾;
(5)文件夾以topicName_pratiitonID命名,分區消息全部都存儲在次文件夾下的日誌文件中;
(6)Kafka通過分段的方式將Log分為多個LogSegment,LogSegment是邏輯概念,對應磁盤上的Log目錄下的一個日誌文件和索引文件;
(7)日誌文件的命名規則是[baseOffset].log,baseOffset是日誌文件中第一條消息的offset;
(8)Kafka日誌是順序追加的;
(9)每個日誌文件都對應一個索引文件,索引文件使用稀疏索引的方式為文件日誌中部分消息建立索引。
(10)日誌文件結構圖

(11)Log示例

創建了一個tp_demo_01的主題,其中存在6個partition,對應的每個partition下存在一個Topic-partition命名的消息日誌。
(12)LogSegment示例

文件類別

日誌存儲
索引
為了提升消息查找的速度,Kafka從0.8版本開始,為每個日誌文件添加對應的索引文件。IndexFile與MassageSet File共同組成了LogSegment。
偏移量索引文件用於記錄消息偏移量與物理地址之間的映射關係。時間戳索引文件則根據時間戳查找對應的偏移量。
索引文件中的索引項的格式:每個索引項為8位元組,分為兩部分,第一部分是相對offset(4位元組),即相對於baseOffset的偏移量(baseOffset就是基準偏移量,日誌文件的命名以基準偏移量來命名)。第二部分是物理地址(4位元組),即其索引消息在日誌文件中對應的position位置。通過這兩部分就能實現offset與物理地址之間的映射。
消息壓縮

偏移量索引

舉例
假設需要查找startOffset為1067。需要將offset=1067轉換成對應的物理地址,其過程是怎樣的?
(1)將絕對offset轉化為相對offset,絕對offset減去baseOffset,得到相對offset=67;
(2)通過相對offset查找索引文件,得到(58,1632)索引項(通過跳錶的方式定位到某一個index文件,再通過二分法找到不大於相對offset的最大索引項);
(3)從position為1632處開始順序查找,找到絕對offset=1067的消息;
(4)最終會得到offset為1070的位置消息。(因為消息被壓縮,offset=1067這條消息被壓縮後一起構成offset=1070這條外層消息)。
六、參考
《Apache Kafka源碼剖析》
《極客時間-Kafka核心技術與實戰》
《拉鉤Java-Kafka》
七、總結
本文從場景、特性、基本概念、核心特性等多個維度較為詳細地闡述了Kafka的相關知識。關於kafka穩定性與具體源碼實現會在進階篇中闡述。
懂得不多,做得太少。歡迎批評與指正!
原創文章,轉載請標註。//www.cnblogs.com/boycelee/p/14728638.html