kafka3.x原理詳解看這篇就夠了
一、概述
(一)、kafka的定義
1、定義
1)kafka傳統的定義:kafka是一個分散式的基於發布/訂閱模式的消息隊列,主要用於大數據實時處理領域
2)kafka最新的定義:kafka是一個開源的分散式事件流平台(event stream platform),主要用高性能數據管道,流分析,數據集成和關鍵任務等領域
2、消息隊列
目前市面上大部分公司採用的消息隊列主要有kafka,activeMQ,rabbitMQ,rocketMQ等。kafka作為消息隊列,主要應用於大數據場景下,而在Javaee開發中更多採用的是activeMQ,rabbitMQ,rockectMQ等。
3、消息隊列的應用場景
傳統的消息隊列的主要應用場景包括:緩衝/削峰,解耦和非同步通訊
緩衝/削峰:在實際的應用系統中,如果數據生產端(比如其前端)的數據產生的速率與數據處理端(服務端)的數據處理速率相當或小於時,整合系統運行就不會有很大的壓力。但是當系統上了個秒殺活動或者雙11活動到來,前端用戶猛增,數據率隨之也會增加數倍,甚至是數十倍。但是服務端需要對數據進行處理,持久化等操作,處理速率必然跟不上數據產生的速度,久而久之系統就會產生數據積壓,最終就有可能導致系統的崩潰。在數據生產端和處理端之間使用消息隊列就可以解決這種問題。此時,消息隊列就發揮了不同系統之間數據的緩衝和削峰的作用。數據生產端將數據發送到消息隊列,然後隨即返迴響應,這個過程相對來說是非常快的。數據處理端則根據自己的處理速度從消息隊列中拉取數據。示意圖如下:
不使用消息隊列的情況
使用消息隊列的情況
解耦:允許獨立的拓展和修改兩邊的處理過程,但兩邊需要確保使用相同的介面約束。
非同步通訊:將處理的用戶數據寫入到消息隊列中,並立即返回處理結果,隊列中數據由另一個執行緒拉取出來做響應的處理。下面是用戶註冊,並把註冊成功的消息發送到用戶手機上的同步處理和非同步處理的流程。
(二)、kafka基礎架構
1、消息隊列的兩種模式
1)點對點模式
-
-
-
- 消費者主動拉取數據,數據消費完後就會在隊列中刪除
-
-
2)發布/訂閱模式
-
-
-
- 可以有有多個主題(topic)
- 消費者拉取數據消費完後,不刪除數據
- 每個消費者相互獨立,都可以消費到數據
-
-
2、基礎架構
1)producer:消息生產者,就是向broker發送消息的客戶端
2)consumer:消息消費者,就是從broker拉取數據的客戶端
3)consumer group:消費者組,由多個消費者consumer組成。消費者組內每個消費者負責消費不同的分區,一個分區只能由同一個消費者組內的一個消費者消費;消費者組之間相互獨立,互不影響。所有的消費者都屬於某個消費者組,即消費者組是一個邏輯上的訂閱者。
4)broker:一台伺服器就是一個broker,一個集群由多個broker組成,一個broker可以有多個topic。
5)topic:可以理解為一個隊列,所有的生產者和消費者都是面向topic的。
6)partition:分區,kafka中的topic為了提高拓展性和實現高可用而將它分布到不同的broker中,一個topic可以分為多個partition,每個partition都是有序的,即消息發送到隊列的順序跟消費時拉取到的順序是一致的。
7)replication:副本。一個topic對應的分區partition可以有多個副本,多個副本中只有一個為leader,其餘的為follower。為了保證數據的高可用性,leader和follower會盡量均勻的分布在各個broker中,避免了leader所在的伺服器宕機而導致topic不可用的問題。
8)leader:多個副本的主副本,生產者發送的數據和消費者消費的數據都是通過leader進行處理的。
9)follower:多個副本中除了leader副本,其餘的均為follower副本,也即從副本。從副本不會和生產者和消費者提供服務,而是實時同步主副本的數據。當主副本宕機後,通過一定演算法選舉出新的從副本成為主副本,繼續為生產者和消費者提供服務。
(三)、kafka常用命令行操作
1、主題相關
參數 | 描述 |
–bootstrap-server <String: server toconnect to> | 連接的 Kafka Broker 主機名稱和埠號。 |
–topic <String: topic> | 操作的 topic 名稱。 |
–create | 創建主題。 |
–delete | 刪除主題。 |
–alter | 修改主題。 |
–list | 查看所有主題。 |
–describe | 查看主題詳細描述。 |
–partitions <Integer: # of partitions> | 設置分區數。 |
–replication-factor<Integer: replication factor> | 設置分區副本。 |
–config <String: name=value> | 更新系統默認的配置。 |
2、生產者相關
參數 | 描述 |
–bootstrap-server <String: server toconnect to> | 連接的 Kafka Broker 主機名稱和埠號。 |
–topic <String: topic> | 操作的 topic 名稱。 |
3、消費者相關
參數 | 描述 |
–bootstrap-server <String: server toconnect to> | 連接的 Kafka Broker 主機名稱和埠號。 |
–topic <String: topic> | 操作的 topic 名稱。 |
–from-beginning | 從頭開始消費。 |
–group <String: consumer group id> | 指定消費者組名稱。 |
二、生產者
(一)、重要參數列表
參數名稱 |
描述 |
bootstrap.servers |
生產者連接集群所需的 broker 地 址 清 單 。可以設置 1 個或者多個,中間用逗號隔開。注意這裡並非需要所有的 broker 地址,因為生產者從給定的 broker里查找到其他 broker 資訊。 |
key.serializer 和 value.serializer |
指定發送消息的 key 和 value 的序列化類型。一定要寫全類名。 |
buffer.memory |
RecordAccumulator 緩衝區總大小,默認 32m。 |
batch.size |
緩衝區一批數據最大值,默認 16k。適當增加該值,可以提高吞吐量,但是如果該值設置太大,會導致數據傳輸延遲增加。 |
linger.ms |
如果數據遲遲未達到 batch.size,sender 等待 linger.time之後就會發送數據。單位 ms,默認值是 0ms,表示沒有延遲。生產環境建議該值大小為 5-100ms 之間。 |
acks |
|
max.in.flight.requests.per.connection |
允許最多沒有返回 ack 的次數,默認為 5,開啟冪等性要保證該值是 1-5 的數字。 |
retries |
當消息發送出現錯誤的時候,系統會重發消息。retries表示重試次數。默認是 int 最大值,2147483647。如果設置了重試,還想保證消息的有序性,需要設置MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1否則在重試此失敗消息的時候,其他的消息可能發送成功了。 |
retry.backoff.ms |
兩次重試之間的時間間隔,默認是 100ms。 |
enable.idempotence |
是否開啟冪等性,默認 true,開啟冪等性。 |
compression.type |
生產者發送的所有數據的壓縮方式。默認是 none,也就是不壓縮。支援壓縮類型:none、gzip、snappy、lz4 和 zstd。 |
(二)、發送流程以及發送API
1、發送流程
(三)、分區
1、分區的好處
-
-
- 便於合理使用存儲資源。每一個partition存儲在不同的broker上面,可以把海量的數據切割成更小數據存儲在不同的broker上。合理是分配分區的任務,可以實現負載均衡效果,避免單機數據量太大而導致的壓力。
- 提高並行度。生產者可以以分區為單位發送數據;消費者可以以分區為單位消費數據。
-
2、分區策略
1)、默認分區策略:DefaultPartitioner
-
-
- 若生產者發送數據時指明partition,那麼就會將該數據發送指定的分區,比如partition=0,則數據發送到第零個分區上
- 若沒有指明partition,但是有key的情況下,用key的hashcode值與該topic對應的partition的數量進行取余得到partition,然後將數據發送到該分區上。比如:key的hashcode值為5,topic對應的分區數為3,那麼 5 % 3 = 2,所以該數據應該發送到第二個分區。
- 既沒有指定partition和key,kafka採用粘性分區器,會隨機選擇一個分區,然後儘可能使用該分區,指導該分區batch-size滿了或已完成,然後再隨機尋找另一個分區(與當前的分區不一樣)。比如:這次隨機到分區0,等當前批次滿了或者linger.ms時間已到,kafka再隨機一個分區,如果仍然隨機到分區0則繼續隨機。
-
2)自定義分區器
在實際的企業應用中,可能會有不用的場景,默認的分區器無法滿足需求,那麼就需要自定也分區器來滿足需求,比如某個分區的伺服器性能比較好,另一個是比較久的伺服器,性能相對差一點,那麼就需要通過自定義分區器讓更多的數據向性能更好的分區傾斜。
自定義分區器的實現:第一,定義類實現 Partitioner 介面。第二,重寫partition()方法。第三,在生產者配置中指定自定義的分區器。
(四)、提高生產者的吞吐量
要想更好的提高生產者的吞吐量,則必須先了解生產者發送數據的流程,具體流程可以看靠(一)的介紹。以下是提高生產者的吞吐量的建議。
-
- 修改batch.size的大小。batch.size默認大小為16k,提高批次大小可以一定程度提高吞吐量。這就好比用小卡車拉貨物和用大卡車拉貨物,小卡車單位時間內不能一次拉完貨物,那麼來回就需要消耗額外時間,而大卡車一次性把貨物拉走,那麼就節省了來回的時間。
- 修改linger.ms的時間,默認為0,即直接發送數據,一般設置為5-100ms,通過設置延遲時間一次性可以發送更多的數據。
-
修改compression.type的數據壓縮類型,默認snappy。根據不同的業務場景選擇不同的數據壓縮方法,提高數據壓縮率。kafka提供的壓縮類型有:gzip,snappy,lz4,zstd。
-
修改RecordAccumulator緩衝區的大小。默認32m,增加緩衝區大小可以那麼每個batch.size的值就更大,每次發送的數據更多。
(五)、數據可靠性
數據的可靠性是指producer發送數據到kafka收到應答後,該數據都能成功落盤,那麼這次發送是可靠的。但是,為了適應不用的應用場景以及實現高可用,kafka會將數據備份到不同的副本當中,在數據同步的過程中如果出現的故障,那麼就有可能出現數據丟失,重複情況。想要弄清楚kafka數據可靠性,就必須先要了解kafka中ACK的應答原理。
1、ACK應答原理
ACK應答是指在leader分區接收到生產者的數據後,何時對生產者做出應答的策略。ACK可選的值有0,1,-1三個,可以在生產者的配置項 acks 中設置,ACK設置不同,對生產者做出應答的時機也不同。
ACK=0:可靠性級別最低。leader收到生產者數據後不需要等數據落盤,立即對生產者做出應答。生產者收到應答後認為leader已成功接收數據,因此不需要再發當前數據了。但是,如果leader在將記憶體中的數據落盤時突然出現故障,那麼這條數據因為沒有保存到磁碟中而導致數據的丟失。
ACK=1:可靠性級別較高。leader收到生產者的數據並將數據落盤後,對生產者做出應答。生產者收到應答後繼續發送其他數據。如果leader做出應答並且follower未同步到該數據時,leader出現故障。kafka會重新在follower中選出新的leader,而新的leader心有同步到數據,生產者也不會再發該數據,因此導致該數據的丟失。
ACK=-1(all):可靠性級別最高,kafka的acks默認值。leader收到數據並落盤,並且確認所有follower收到數據後再給生產者應答。此時,所有分區副本都有該數據了,即使任意分區出現故障數據仍然是完整的。
(六)、數據去重
1、數據重複原因
在使用kafka時為了保證數據的高可靠性,我們一般都會將應答級別設置為-1(all),即leader的ISR列表的follower均收到數據後再應答。非常不幸的是,leader在收到所有的follower的確認後發生故障,所有的分區均已保存到磁碟中,但是生產者沒有收到應答,認為leader沒有收到生產者發送請求,於是嘗試重新發送請求。由於leader發生故障,kafka重新選舉leader,生產者將數據再一次發送到新的leader上,所以造成的數據重複。
2、數據去重
kafka 0.11之後引入了冪等性和事務兩大特性。利用這兩個特性可解決數據重複的問題。
1)數據傳遞語義
-
-
-
-
- 至少一次(At Least Once):ACK級別為-1 + 分區副本大於等於2 + ISR中應答的最小副本數量大於等於2;
- 至多一次(At Most Once):ACK應答級別為 0;
-
-
-
At Least Once可以保證數據不丟失,但不能保證數據重複。
At Most Once 可以保證數據不重複,但不能保證數據不丟失。
2)冪等性的原理
冪等性:是指無論producer發送多少條重複的數據,broker端都只會持久化一條數據,保證了數據不重複。
數據重複的判斷依據:具有<PID, Partition, SeqNumber>相同主鍵的消息提交時,Broker只會持久化一條。其中PID是Kafka每次重啟都會分配一個新的;Partition 表示分區號;Sequence Number是單調自增的。
從數據重複判斷依據來看,冪等性只能保證單分區會話內數據不重複。
開啟冪等性:enable.idempotence = true,默認為true。
3)事務
kafka的事務需要跟冪等性配合起來使用。開啟事務就必須開始冪等性。
(七)、數據亂序和有序
1、數據亂序
kafka的producer客戶端在向broker發送數據時並不是直接將數據發送出去,而是將數據先快取到本地的雙端快取隊列中,sender執行緒會不斷地檢測快取隊列中地數據,若隊列中地數據達到了設置的(batch.size)值地容量或者達到一定的時間(linger.ms),sender就會創建一個InFlightRequests執行緒,該執行緒負責將數據發送到broker上。kafka 默認的InFlightRequests是5個,InFlightRequests執行緒發送數據後不需要broker的應答就可以發下一個數據,因此最多可以一起發5個請求。比如,需要發送0,1,2,3,4這五個數據依次被發送出去,但是2這個數據沒有接收成功,客戶端則重新2的數據,此時broker接收到的數據的順序就變為了01342,而非01234。
2、數據有序
1)kafka在1.x之前保證數據單分區有序,需要將InFlightRequests執行緒數設置為1個(max.in.flight.requests.per.connection=1)。當執行緒數為1時就能保證broker收到數據確認後再發下一條數據。
2)kafka在1.x以後在未開啟冪等性的情況下,處理流程跟1.x以前的版本一樣。
3)1.x以後在開啟冪等性的情況下,可以將max.in.flight.requests.per.connection設置為小於等於5。原因是,Kafka服務端會快取最近發過來的元數據,等快取滿了5個後就會對這些元數據進行排序,這樣就可以保證數據有序了。
三、broker
(一)、工作流程
1、重要參數
參數名稱 | 描述 |
replica.lag.time.max.ms | ISR 中,如果 Follower 長時間未向 Leader 發送通訊請求或同步數據,則該 Follower 將被踢出 ISR。該時間閾值,默認 30s。 |
auto.leader.rebalance.enable | 默認是 true。 自動 Leader Partition 平衡。 |
leader.imbalance.per.broker.percentage | 默認是 10%。每個 broker 允許的不平衡的 leader的比率。如果每個 broker 超過了這個值,控制器會觸發 leader 的平衡。 |
leader.imbalance.check.interval.seconds | 默認值 300 秒。檢查 leader 負載是否平衡的間隔時間。 |
log.segment.bytes | Kafka 中 log 日誌是分成一塊塊存儲的,此配置是指 log 日誌劃分 成塊的大小,默認值 1G。 |
log.index.interval.bytes | 默認 4kb,kafka 裡面每當寫入了 4kb 大小的日誌(.log),然後就往 index 文件裡面記錄一個索引。 |
2、zookeeper存儲的kafka資訊
3、工作的總流程
① broker啟動,向zookeeper註冊broker節點資訊
② 向zookeeper註冊controller,若註冊成功則該controller負責選舉leader,監聽broker節點等工作
③ 註冊成功的controller開始監聽brokers節點變化
④ 註冊成功的controller舉行leader選舉。按照AR列表優先選取排在前面的節點,若該節點在ISR中存活則成為leader,否則繼續輪詢。例如:AR[0,1,2],ISR[1,2],controller會從broker 0 開始輪詢,發現ISR中並沒有0,那麼繼續輪詢到1,發現ISR有1則1成功leader。
⑤ 註冊成功的controller將選舉結果寫入到zookeeper中,同時其他controller也監聽該資訊,等leader故障後方便立即上位成為新的leader。
⑥ 加入leader掛掉了,controller監聽到節點發送變化,開始拉取ISR資訊,然後開始重新選擇leader。
(二)、副本
1、基本資訊
-
-
- 作用:提高數據可用性
- Kafka默認的副本數為1,生產環境可設置為2,保證數據可靠性。不建議設置太多的副本,一者增加磁碟存儲空間,二者增加副本同步時的網路開銷,影響效率。
- 副本分為 leader 和follower。leader只有一個,其餘的為follower。生產者的數據發給的是leader,消費者消費的消息也是來自leader的。follower同步leader的消息。
- AR:分區中所有副本的總和。
- ISR:分區中能夠和leader保持同步的副本,包括leader本身。如果follower長時間未向leader發送通訊資訊或同步請求,那麼該follower就會從ISR中移除。leader掛了後ISR中的follower重新選舉leader。
- OSR:分區中跟leader同步資訊差距較大的副本或者是掛了的副本。
- AR = ISR + OSR
-
2、leader選舉流程
Kafka集群中有一個會被選舉為controller leader,選舉方式搶佔式,誰先搶佔到zookeeper的節點,誰就能成為leader。該controller負責Kafka集群broker的上下線,topic分區副本的分配和leader選舉等工作。
3、leader和follower故障處理
LEO(log end offset):每個副本的最後一個offset,即最新offset + 1。
HW(high watermark):所有副本中最小的LEO。
1)leader發生故障
leader發生故障後,kafka會從ISR中重新選舉出新的leader
為保證多副本之前的數據一致性,其餘的follower會先將自己高於HW的數據截掉,然後同步新leader的數據。
2) follower發生故障
follower發生故障後會被臨時踢出ISR。
此時leader和follower繼續接收數據。
等follower恢復後讀取保存在磁碟的HW,並將log文件中高於HW部分的數據截取丟棄,從HW位置開始同步leader的數據。
follower同步到LEO大於等於分區的HW時,kafka就將該副本重新加入到ISR中。
4、副本分配
1)盡量將所有副本平均地分配到所有地broker上。
2)每個broker分配到地leader儘可能一樣多。
3)leader和follower儘可能地分配到不同地broker上。
5、leader分區自平衡
正常情況下,Kafka本身會儘可能地將leader分區均勻地分布到各個機器上,這樣使得每個機器的讀寫吞吐量比較均勻。但是,如果當leader分區故障後,另一個follower就會迅速上位並且承擔之前的leader的工作,讀寫請求的壓力就會上升,造成集群負載不平衡。kafka允許的每個broker不平衡的比率默認為10%,如果超過了這個值就會觸發leader分區平衡。
相關參數:
參數名稱 | 描述 |
auto.leader.rebalance.enable | 默認是 true。 自動 Leader Partition 平衡。生產環境中,leader 重選舉的代價比較大,可能會帶來性能影響,建議設置為 false 關閉。 |
leader.imbalance.per.broker.percentage | 默認是 10%。每個 broker 允許的不平衡的 leader的比率。如果每個 broker 超過了這個值,控制器會觸發 leader 的平衡。 |
leader.imbalance.check.interval.seconds | 默認值 300 秒。檢查 leader 負載是否平衡的間隔時間。 |
(三)、數據存儲
1、存儲機制
Kafka中topic是邏輯概念,但是分區partition是物理概念,生產者發送的數據都是存儲在partition中的,每個partition中都有一個log文件,數據實際都是存儲在log文件中,生產者生產的數據都會最佳到該log文件的末端。為防止log文件過大導致數據檢索過慢,kafka將log文件進行切片,每個片成為segment,每個segment中包括「.index」文件,「.log」文件和「.timestamp」等文件。
log文件:保存實際數據的文件。
index文件:偏移量索引文件,文件為segment第一個數據offset值。index文件是一個稀疏索引,每往log文件中寫入4kb的數據才向index文件中寫入一條索引。
timestamp文件:時間戳索引文件,文件名為segment第一條數據的offset值。
2、數據文件清楚策略
Kafka提供的文件刪除策略有delete和compact兩種。
delete策略:將過期數據刪除
log.cleanup.plocy = delete 所有數據啟用刪除策略
-
-
-
- 基於時間刪除,默認打開。以segment中所有記錄中最後記錄的時間戳作為該文件的時間戳。
- 基於大小刪除,默認關閉。超過設置的所有日誌總大小,刪除最早的segment。
-
-
compact日誌壓縮:對於相同key的不同value值,只保留最後一個value值。壓縮後的offset可能是不連續的。這種策略只適合某些特殊的場景,比如key保存的是用戶id,value保存的用戶資訊,通過壓縮策略就能將舊的用戶數據刪除,只保留新的用戶資訊。
(四)、高效讀寫數據
1、Kafka是基於分散式集群,採用分區技術,數據讀寫並行度高。
2、讀數據採用稀疏索引,消費者能夠快速定位消費的數據。
3、順序寫磁碟,producer生產數據只是在log文件的末端追加數據,因此寫磁碟的速度很快。
4、Kafka以來系統底層的pagecache技術和零拷貝技術,能夠提高磁碟數據讀寫速度。
四、消費者
(一)、重要參數
參數名稱 | 描述 |
bootstrap.servers | 向 Kafka 集群建立初始連接用到的 host/port 列表。 |
key.deserializer 和 value.deserializer | 指定接收消息的 key 和 value 的反序列化類型。一定要寫全 類名。 |
group.id | 標記消費者所屬的消費者組。 |
enable.auto.commit | 默認值為 true,消費者會自動周期性地向伺服器提交偏移量。 |
auto.commit.interval.ms | 如果設置了 enable.auto.commit 的值為 true, 則該值定義了消費者偏移量向 Kafka 提交的頻率,默認 5s。 |
auto.offset.reset |
當 Kafka 中沒有初始偏移量或當前偏移量在伺服器中不存在如,數據被刪除了),該如何處理? earliest:自動重置偏移量到最早的偏移量。 latest:默認,自動重置偏移量為最新的偏移量。 none:如果消費組原來的(previous)偏移量存在,則向消費者拋異常。 anything:向消費者拋異常。 |
offsets.topic.num.partitions | __consumer_offsets 的分區數,默認是 50 個分區。 |
heartbeat.interval.ms | Kafka 消費者和 coordinator 之間的心跳時間,默認 3s。該條目的值必須小於 session.timeout.ms ,也不應該高於session.timeout.ms 的 1/3。 |
session.timeout.ms | Kafka 消費者和 coordinator 之間連接超時時間,默認 45s。超過該值,該消費者被移除,消費者組執行再平衡。 |
(二)、消費方式
1、pull(拉)模式
consumer主動拉取服務端的數據。Kafka採用這種方式。
優點:consumer根據自己的處理能力拉取服務端的數據。
缺點:當服務端沒有數據時,consumer仍然不斷從服務端拉取數據,會消耗consumer一定的資源。
2、push(推)模式
服務端主動推消息給consumer。
優點:服務端只要有數據是才會推數據給consumer。
缺點:服務端發送消息的速率很難適應所有consumer處理消息的速率。
(三)、消費者消費流程
1、總體消費流程
1)消費者通過offset拉取broker中指定位置的消息。offset則保存在系統主題中,系統主題保存在磁碟中,所以即使服務端出現故障或重啟等能夠按上次消費的位置開始消費。
2)一個消費者可以消費多個分區的數據。
3)每個分區的數據只能由消費者組的一個消費者消費。
2、消費者組
1)消費者在消費消息時必須指定消費者組id(group.id),具有相同的group.id組成一個消費者組。
2)消費者組的消費者消費不同分區的數據。一個分區只能由一個消費者消費。
3)消費者組之間相互獨立,互不影響。
4)消費者組的消費者數量應小於等於分區數量,若消費者數量大於分區數,多餘的消費者則不會消費消息。
5)消費者消費消息後發送offset給服務端。
3、消費者初始化流程
(四)、分區的分配策略以及再平衡
1、Range分區策略
首先對同一個topic的分區對分區序號進行排序,然後消費者按照字母排序,利用分區數除以消費者數量得出每個消費者平均消費分區數,若剩餘多的則按順序消費。
注意:當topic數量比較多時,使用Range分區策略會導致排在前面的消費總是分配更多的消費者,造成數據傾斜。
再平衡策略:當某個消費者掛了,Kafka會將該消費者的任務全部交給正常消費的消費者。
2、RoundRobin分區策略
先進行排序,然後消費依次輪詢消費分區數據。
再平衡:觸發再平衡後,將故障的消費者的任務輪詢交給正常消費的消費者。
3、Sticky分區策略
粘性分區是 Kafka 從 0.11.x 版本開始引入這種分配策略,首先會盡量均衡的放置分區到消費者上面,在出現同一消費者組內消費者出現問題的時候,會盡量保持原有分配的分區不變化。
再平衡:儘可能均衡的隨機分成 0 和 1 號分區數據,分別由 1 號消費者或者 2 號消費者消費。
(五)、offset位置
1、offset的維護
Kafka0.9版本之前,consumer默認將offset保存在Zookeeper中。
從0.9版本開始,consumer默認將offset保存在Kafka一個內置的topic中,該topic為__consumer_offsets。
2、offset提交
分為自動提交和手動提交,默認為自動提交
1)自動提交
enable.auto.commit:設置為true
auto.commit.interval.ms:自動提交offset的間隔時間,默認為5s
2)手動提交
enable.auto.commit :設置為false。用戶可以根據具體場景進行提交,更加靈活。手動提交可分為同步提交和非同步提交兩張方式。
3、指定offset消費
當消費者第一次消費或者offset資訊丟失,這是消費者該如何進行消費呢?Kafka提供了三種方式:earliest,latest和none
earliest:自動將offset設置為最開始的位置。
latest:自動將offset設置為最新的offset位置。
none:若沒有找到之前的offset資訊,則會拋出異常。
4、指定時間消費
消費者根據指定時間(比如1天前)獲取數據,通過broker的時間戳索引文件查到該時間對應的offset,然後用該offset獲取對應的數據。
(六)、漏消費和重複消費
1、漏消費
先提交offset後處理數據,可能會造成漏消費消息。一般用戶設置為手動非同步提交offset會引起這種問題。
場景:消費者offset提交方式設置為手動非同步提交,消費者把offset提交出去,但是數據還未落盤,此時消費者掛掉,分區已經將最新的offset保存起來了。消費者重啟後獲取的offset則是最新提交的offset了,造成數據遺漏處理。
2、重複消費
已經消費了消息,但是沒提交offset。一般消費者設置為自動提交會有這種問題。
場景:消費者設置offset自動提交時間為5s,在第2s時消費者掛掉,在掛掉前已經處理部分數據,但offset沒有提交到broker。消費者重啟後拿到的offset則為舊的offset,之前處理過的數據又被拉取一遍,造成重複消費數據。
(七)、消費者事務
若要保證消費者消費數據時不漏消費,不重複消費,則需要將消費者端在消費消息和提交offset的操作看作是依次原子操作。
(八)、數據積壓
所謂數據積壓是指broker中保存大量未消費的消息。若長時間未消費且觸發了刪除策略,那麼這部分數據就會丟失。可能造成數據積壓的原因有消費者的處理能力不足,分區數較少,可適當增加分區數量,使分數量等於消費者數量。也可以把消費者每次拉取數據的大小往上調。
五、總結
Kafka憑藉自身優秀的性能和海量數據的處理能力以及數據的可靠性保證,組件在大數據領域佔據主流地位。