Kafka及周邊深度了解

  • 2019 年 12 月 23 日
  • 筆記

本文屬於原創,轉載註明出處

0 前言

文章有點長,但是寫的都挺直白的,慢慢看下來還是比較容易看懂,從Kafka的大體簡介到Kafka的周邊產品比較,再到Kafka與Zookeeper的關係,進一步理解Kafka的特性,包括Kafka的分區和副本以及消費組的特點及應用場景簡介。

1 簡介

Apache Kafka 是一個分散式流處理平台,注意是平台:

  • 發布 & 訂閱,類似消息系統,並發能力強,通過集群可以實現數據匯流排作用,輕輕鬆鬆實現流式記錄數據分散式讀寫
  • 以高容錯的方式存儲海量流式數據
  • 可以在流式記錄數據產生時就進行處理

從上面的一個Kafka小型應用架構圖可以了解Kafka周邊及它的實際能扮演的角色,圖中Kafka集群連接了六個數據輸入輸出部分,分別是Kafka ProducerKafka Connect SourceKafka Streams/KSQLKafka ConsumerKafka Connect Sink。而這些數據的輸入輸出都可以通過Kafka提供的四個核心API組去解決(除Kafka AdminClient API外):

  • Kafka Producer API 允許一個應用程式發布一串流式的數據到一個或者多個Kafka主題(Topic)
  • Kafka Consumer API 允許一個應用程式訂閱一個或多個主題(Topic) ,並且對接收到的流式數據進行處理
  • Kafka Streams API 允許一個應用程式作為一個串流處理器,消費一個或者多個主題(Topic)產生的輸入流,然後生產一個輸出流到一個或多個主題(Topic)中去,在輸入輸出流中進行有效的轉換
  • Kafka Connector API 允許構建並運行可重用的生產者或者消費者,將Kafka Topics連接到已存在的應用程式或者資料庫系統。比如,連接到一個關係型資料庫,捕捉表(table)的所有變更內容。

我們對Kafka的發布 & 訂閱功能的作用比較清楚,而圖中的KSQL和Kafka Streams是怎麼個回事呢?

首先我們需要清楚什麼是流處理?流處理可以認為是消息的實時處理,比如在一個時間段內,源源不斷地有數據資訊進來,而每時每刻都能夠對這些數據有一個最後的結果處理,那麼這就是流處理,而如果是每隔一個小時或者更久處理一次,那叫大數據分析或者批處理。它的特點更多是實時性的分析,在流式計算模型中,輸入是持續的,可以認為在時間上是無界的,也就意味著,永遠拿不到全量數據去做計算,同時,計算結果是持續輸出的,也即計算結果在時間上也是無界的。類似的比較有:HadoopStorm以及Spark StreamingFlink是常用的分散式計算組件,其中Hadoop是對非實時數據做批量處理的組件;StormSpark StreamingFlink是針對實時數據做流式處理的組件,而Kafka Streams是後起之秀。

關於KSQL呢?

  • KSQL 是 Apache Kafka 的數據流 SQL 引擎,它使用 SQL 語句替代編寫大量程式碼去實現流處理任務,而Kafka Streams是Kafka中專門處理流數據的
  • KSQL 基於 Kafka 的 Stream API 構建,它支援過濾(filter)、轉換(map)、聚合(aggregations)、連接(join)、加窗操作和 Sessionization(即捕獲單一會話期間的所有的流事件)等流處理操作,簡化了直接使用Stream API編寫 Java 或者 Scala 程式碼,只需使用簡單的 SQL 語句就可以開始處理流處理
  • KSQL 語句操作實現上都是分散式的、容錯的、彈性的、可擴展的和實時的
  • KSQL 的用例涉及實現實時報表和儀錶盤、基礎設施和物聯網設備監控、異常檢測和欺騙行為報警等

2 相關概念簡介

  • Broker:Kafka集群包含一個或多個伺服器,這種伺服器被稱為broker
  • Topic:每條發布到Kafka集群的消息都有一個類別,這個類別被稱為Topic
  • Partition:Parition是物理上的概念,每個Topic包含一個或多個Partition
  • Replication:副本,一個partition可以設置一個或者多個副本,副本主要保證系統能夠持續不丟失地對外提供服務。在創建topic的時候可以設置partition的replication數
  • Segment:段文件,kafka中最小數據存儲單位,kafka可以存儲多個topic,各個topic之間隔離沒有影響,一個topic包含一個或者多個partition,每個partition在物理結構上是一個文件夾,文件夾名稱以topic名稱加partition索引的方式命名,一個partition包含多個segment,每個segment以message在partition中的起始偏移量命名以log結尾的文件,producer向topic中發布消息會被順序寫入對應的segment文件中。kafka為了提高寫入和查詢速度,在partition文件夾下每一個segment log文件都有一個同名的索引文件,索引文件以index結尾。
  • Offset:消息在分區中偏移量,用來在分區中唯一地標識這個消息。
  • Producer:消息生產者,負責發布消息到Kafka broker
  • Consumer:消息消費者,向Kafka broker讀取消息的客戶端
  • Consumer Group:每個Consumer屬於一個特定的Consumer Group(可為每個Consumer指定group name,若不指定group name則屬於默認的group)

3 Kafka與ActiveMQ、ZeroMQ、RabbitMQ、RocketMQ、Redis

這個主要是針對消息中間件的選型評估,這裡我們講述一些概念。其他更詳細的這裡有篇文章講了: https://juejin.im/post/5b32044ef265da59654c3027 。

3.1 消息隊列、點對點和PUB/SUB

在開始之前,我們也需要稍微了解下JMS(Java Messaging System),是一個Java平台中關於面向消息中間件(MOM)的API。JMS支援兩種消息模式,一個就是P2P模式,一個就是發布訂閱模式。後面會說到哪些消息件支援JMS。

消息隊列有兩種消息模型:點對點(Point to Point,PTP)和發布/訂閱(PUB/SUB)模式。

消息隊列點對點,顧名思義,是一個隊列,資訊只能一對一,一個消息被一個消費者使用完了,那麼就不會存在隊列中了,就像郵差給別人投遞郵件,不可能這封信還有副本,而且還能保證這封信安全送到指定的人手裡(這是框架賦予的能力)。

PUB/SUB消息訂閱發布就不一樣了,它的特徵就是支援多對一,一對一,一對多,就像期刊報社一樣,出版的期刊或者報紙,需要可以傳遞到不同人手裡,而且還可以拿到以前日期的期刊或者報紙(這是框架賦予的能力)。

  • RabbitMQ是消息代理,持多種消息傳遞協議,如AMQP,MQTT3.1,XMPP, SMTP, STOMP,HTTP, WebSockets協議,由內在高並發的Erlanng語言開發,用在實時的對可靠性要求比較高的消息傳遞上。它既支援消息隊列點對點,也支援PUB/SUB。RabbitMQ對JMS所有特性並不完全支援(https://www.rabbitmq.com/jms-client.html#limitations)
  • Redis以記憶體資料庫而聞名。但是,也可以將其用作消息隊列點對點和PUB/SUB管理工具,不過因為記憶體緩衝區的效率,如果消費者失去了與隊列的連接,那麼很有可能在連接丟失時丟失消息。另外,在實現消息隊列點對點功能上,至少要創建3個隊列:主隊列、工作隊列、被拒絕隊列,實現有點複雜。
  • Apache RocketMQ作為阿里開源的一款高性能、高吞吐量的分散式消息中間件,PUB/SUB就是基本功能了,支援消息優先順序、消息有序保證、消息過濾,保證每個消息至少投遞一次。RocketMQ的集群消費功能大等於PTP模型。因為RocketMQ單個Consumer Group內的消費者類似於PTP,單個Consumer Group裡面的消費者均攤消息,等於實現點對點功能,接收者單位是Group。
  • Apache ActiveMQ支援點對點和PUB/SUB,支援多種跨語言客戶端和協議,具有易於使用的企業集成模式和許多高級功能,同時完全支援JMS 1.1和j2ee1.4
  • ZeroMQ是用C實現的,性能高、輕量級自然是它的特點。ZeroMQ 並非嚴格意義上的 at least once 或者 at most once,以其 Pub/Sub 模式來說,ZeroMQ 構建了消息確認和重傳機制,卻未對消息進行持久化,那麼記憶體耗盡或者進程崩潰都會造成消息丟失,而重傳則可能會造成消息被發送 1 到 n 次。當然,在企業級WEB服務中,尤其是微服務中我們對ZeroMQ的選擇是偏少的。
  • Kafka更多的是作為發布/訂閱系統,結合Kafka Stream,也是一個流處理系統

3.2 關於持久化

  • ZeroMQ支援記憶體、磁碟,不支援資料庫持久化
  • Kafka支援記憶體、磁碟(主),支援資料庫持久化,支援大量數據堆積
  • RabbitMQ支援記憶體、磁碟,支援數據堆積,但是數據堆積影響生產效率
  • ActiveMQ支援記憶體、磁碟,支援資料庫持久化
  • RocketMQ的所有消息都是持久化的,先寫入系統 pagecache(頁高速緩衝存儲器),然後刷盤,可以保證記憶體與磁碟都有一份數據,訪問時,直接從記憶體讀取

3.3 關於吞吐量

  • RabbitMQ在吞吐量方面稍遜於Kafka,他們的出發點不一樣,RabbitMQ支援對消息的可靠的傳遞,支援事務,不支援批量的操作;基於存儲的可靠性的要求存儲可以採用記憶體或者硬碟。
  • Kafka具有高的吞吐量,內部採用消息的批量處理,zero-copy機制,數據的存儲和獲取是本地磁碟順序批量操作,具有O(1)的複雜度,消息處理的效率很高
  • ZeroMQ也具有很高的吞吐量
  • RocketMQ相比RabbitMQ的吞吐量要大,但是沒有Kafka的大
  • ActiveMQ相對RabbitMQ而言要弱

3.4 關於集群

  • Kafka:天然的'Leader-Slave'無狀態集群,每台伺服器既是Master也是Slave
  • ZeroMQ:去中心化,不支援集群
  • RabbitMQ:支援簡單集群
  • RocketMQ:支援集群,常用多對'Master-Slave' 模式
  • ActiveMQ:支援簡單集群模式,比如'主-備',對高級集群模式支援不好。

3.5 關於負載均衡

  • Kafka:支援負載均衡,結合內置Zookeeper,有效的實現Kafka集群的Load Balancer
  • ZeroMQ:去中心化,不支援負載均衡,本身只是一個多執行緒網路庫
  • RocketMQ:支援負載均衡
  • RabbitMQ:對負載均衡的支援不好
  • ActiveMQ:支援負載均衡,可以基於Zookeeper實現負載均衡

3.6 關於單機隊列數

單機隊列數越大,單機可以創建更多主題,因為每個主題都是由一批隊列組成,消費者的集群規模和隊列數成正比,隊列越多,消費類集群可以越大。

  • Kafka單機超過64個隊列/分區,Load會發生明顯的飆高現象,隊列越多,load越高,發送消息響應時間變長。Kafka分區數無法過多的問題
  • RocketMQ單機支援最高5萬個隊列,負載不會發生明顯變化

4.1 流處理框架特點和處理方式

上面我們說過了流處理就是對數據集進行連續不斷的處理,聚合,分析的過程,它的延遲要求儘可能的低(毫秒級或秒級),從流處理的幾個重要方面來講述,分散式流處理框架需要具有如下特點:

  • 消息傳輸正確性保證,保證區分有:
    • 消息At Most Once,即消息可以丟失或者傳遞一次
    • 消息At Least Once,即消息至少一次,存在重複傳遞的情況
    • 消息Exactly Once,即消息不會丟失也不會重複傳遞
  • 高容錯性:在發生諸如節點故障、網路故障等故障時,框架應該能夠恢復,並且應該從它離開的地方重新開始處理。這是通過不時地檢查流到某個持久性存儲的狀態來實現的。
  • 狀態管理:絕大部分分散式系統都需要保持狀態處理的邏輯。流處理平台應該提供存儲,訪問和更新狀態資訊的能力
  • 高性能:這包括低延遲(記錄處理的時間)、高吞吐量(throughput,記錄處理/秒)和可伸縮性。延遲應儘可能短,吞吐量應儘可能多,不過這很難同時兼顧到這兩者,需要做一個平衡
  • 高級特性Event Time Processing(事件時間處理)、水印、支援窗口,如果流處理需求很複雜,則需要這些特性。例如,基於在源程式碼處生成記錄的時間來處理記錄(事件時間處理)
  • 成熟度:如果框架已經被大公司證明並在大規模上進行了測試,這就很好。更有可能在論壇或者其他地方獲得良好的社區支援和幫助

流處理的方式有兩種:

  • Native Streaming
    • 每一個傳入的記錄一到達就被處理,而不必等待其他記錄。有一些持續運行的進程(我們稱之為operators/tasks/bolts,命名取決於框架)會永遠運行,並且每個記錄都會經過這些進程來進行處理,示例:Storm、Flink、Kafka Streams。
  • Micro-batching
    • 快速批處理,這意味著每隔幾秒鐘傳入的記錄都會被批處理在一起,然後以幾秒的延遲在一個小批中處理,例如: Spark Streaming

這兩種方法都有一些優點和缺點。當每個記錄一到達就被處理時,處理結果就感覺很自然,允許框架實現儘可能最小的延遲。但這也意味著在不影響吞吐量的情況下很難實現容錯,因為對於每個記錄,我們需要在處理後跟蹤和檢查點。此外,狀態管理也很容易,因為有長時間運行的進程可以輕鬆地維護所需的狀態;而小批處理方式,則完全相反,容錯是附帶就有了,因為它本質上是一個批處理,吞吐量也很高,因為處理和檢查點將一次性完成記錄組。但它會以一定的延遲為代價,讓人感覺不像是自然的流處理。同時,高效的狀態管理也將是一個挑戰。

4.2 主流流處理框架比對

流處理框架

特點

缺點

Strom是流處理界的hadoop。它是最古老的開源流處理框架,也是最成熟、最可靠的流處理框架之一

非常低的延遲,真正的流處理,成熟和高吞吐量;非常適合不是很複雜流式處理場景;

消息至少一次保證機制;沒有高級功能,如事件時間處理、聚合、窗口、會話、水印;

Spark Streaming

支援Lambda架構,免費提供Spark;高吞吐量,適用於許多不需要子延遲的場景;簡單易用的高級api;社區支援好;此外,結構化流媒體更為抽象,在2.3.0版本中可以選擇在微批處理和連續流媒體模式之間切換;保證消息恰好傳遞一次;

不是真正的流媒體,不適合低延遲要求;參數太多,很難調參;在許多高級功能上落後於Flink;

Flink

支援Lambda架構;開源流媒體領域的創新領導者;第一個真正的流式處理框架,具有所有高級功能,如事件時間處理、水印等;低延遲,高吞吐量,可根據需要配置;自動調整,沒有太多參數需要調整;保證消息恰好傳遞一次;在像Uber、阿里巴巴這樣的規模大公司接受。

進入流處理界晚,還沒被廣泛接受;社區支援相對較少,不過在蓬勃發展;

Kafka Streams

非常輕量級的庫,適用於微服務和物聯網應用;不需要專用群集;繼承了卡夫卡所有的優良品質;支援流連接,內部使用rocksDb來維護狀態。保證消息恰好傳遞一次;

與卡夫卡緊密結合,否則無法使用;剛剛起步,還未有大公司選擇使用;不合適重量級的流處理;

總的來說,Flink作為專門流處理是一個很好的選擇,但是對於輕量級並且和Kafka一起使用時,Kafka Streaming是個不錯的選擇。

5 Zookeeper & Kafka?

Zookeeper在Kafka集群中主要用於協調管理,主要作用:

  • Kafka將元數據資訊保存在Zookeeper中
  • 通過Zookeeper的協調管理來實現整個kafka集群的動態擴展
  • 實現整個集群的負載均衡
  • Producer通過 Zookeeper 感知 partition 的Leader
  • 保存Consumer消費的狀態資訊。
  • 通過 ZK 管理集群配置,選舉 Kafka Leader,以及在 Consumer Group 發生變化時進行 Rebalance

Zookeeper是由java編寫的,所以需要先安裝JDK。

5.1 Zookeeper是必須要有的嗎?

是的,在Kafka中,儘管你只想使用一個代理、一個主題和一個分區,其中有一個生產者和多個消費者,不希望使用Zookeeper,浪費開銷,但是這情況也需要Zookeeper,協調分散式系統中的任務、狀態管理、配置等,而且使用單節點的場景顯然沒有利用到Kafka的優點。

另外,Apacke Kafka維護團隊開始討論去除Zookeeper了(2019年11月6日),目前,Kafka使用ZooKeeper來存儲分區和代理的元數據,並選擇一個Broker作為Kafka控制器,而希望通過刪除對ZooKeeper的依賴,將使Kafka能夠以一種更具伸縮性和健壯性的方式管理元數據,啟用對更多分區的支援,它還將簡化Kafka的部署和配置,因為ZooKeeper是一個單獨的系統,具有自己的配置文件語法,管理工具和部署模式。另外Kafka和ZooKeeper配置是分開的,所以很容易出錯。例如,管理員可能在Kafka上設置了SASL,並且錯誤地認為他們已經保護了通過網路傳輸的所有數據。實際上,這樣做還必須在單獨的外部ZooKeeper系統中配置安全性。統一兩個系統將提供統一的安全配置模型。將來Kafka可能希望支援單節點Kafka模式,這對於想要快速測試Kafka而無需啟動多個守護程式的人很有用,刪除掉ZooKeeper的依賴關係使之成為可能。

5.2 Zookeeper在Kafka中是自帶的,可以使用自定義安裝的ZK嗎?

這個當然是可以的,你可以不啟動Kafka自帶的ZK。

6 理解Kafka數據模型: Topics、Partitions及Replication

Kafka的分區機制實現了Topic的水平擴展和順序性保證。這一節我們深度了解下是怎麼回事?

Topic在邏輯上可以被認為是一個隊列。每條消費都必須指定它的topic,可以簡單理解為必須指明把這條消息放進哪個隊列里。為了使得Kafka的吞吐率可以水平擴展,物理上把topic分成一個或多個partition,每個partition在物理上對應一個文件夾,該文件夾下存儲這個partition的所有消息和索引文件,比如我們創建了一個主題叫xiaobiao,然後Kafka有三個Brokers,結合《Kafka,ZK集群開發或部署環境搭建及實驗》這一篇文章中的實驗環節,我們創建主題的時候需要指定:

# 利用Kafka提供的命令行腳本,創建兩分區兩副本的主題xiaobiao  ./bin/kafka-topics.sh --create --zookeeper localhost:2181,localhost:2182,localhost:2183 --replication-factor 2 --partitions 2 --topic xiaobiao

兩分區,兩副本,如何理解呢?我們指定了三個服務,我們將xiaobiao主題分為兩個子部分,可以認為是兩個子隊列,對應的在物理上,我們可以在log.dir參數設定的目錄下看到兩個文件夾xiaobiao-0xiaobiao-1,不過根據Kafka的分區策略,對於多個Kafka Brokers,分區(多個文件夾)一般會分散在不同的Broker上的log.dir設定的目錄下,當只有一個Broker時,所有的分區就只分配到該Broker上,消息會通過負載均衡發布到不同的分區上,消費者會監測偏移量來獲取哪個分區有新數據,從而從該分區上拉取消息數據。這是分區的表現。不過分區數越多,在一定程度上會提升消息處理的吞吐量,因為Kafka是基於文件進行讀寫,因此也需要打開更多的文件句柄,也會增加一定的性能開銷,但是Kafka社區已經在制定解決方案,實現更多的分區,而性能不會受太多影響。

如果分區過多,那麼日誌分段也會很多,寫的時候由於是批量寫,其實就會變成隨機寫了,隨機 I/O 這個時候對性能影響很大。所以一般來說 Kafka 不能有太多的 Partition。

那副本呢?顧名思義,即主題的副本個數,即我們上面有兩個主題分區,即物理上兩個文件夾,那麼指定副本為2後,則會複製一份,則會有兩個xiaobai-0兩個xiaobai-1,副本位於集群中不同的broker上,也就是說副本的數量不能超過broker的數量,否則創建主題時就會失敗。那麼副本有什麼用呢?當Kafka某個代理(Broker)出現故障且無法為請求(Consumer)提供服務時,為了達到可用性的唯一目的而設置有多個數據副本,這樣就不擔心集群中某個Broker掛掉了,這裡也進一步可以知道,達到這個作用,那麼一個主題的分區副本是需要在不同的Broker上的,而且對應副本分區是保持數據同步的。不可避免地,副本越多,那麼對Kafka的吞吐量是會造成影響的。下圖就是Replication Factor等於2時數據同步示意圖:

分區Leader: 對於每個分區,都有一個副本被指定為Leader。Leader負責發送和接收該分區的數據,所有其他副本都稱為分區的同步副本(或跟隨者)。

In sync replicas是分區的所有副本的子集,該分區與主分區具有相同的消息。

比如當Broker2 掛掉後,由於broker 2是分區1的負責人(Leader),因此現在無法訪問分區1。發生這個情況的時候Kafka會自動選擇一個同步副本(在上圖中只有一個副本)並使它成為領導者(Leader)。現在,當broker 2重新上線時,broker 2中分區1可以再次嘗試成為Leader。

當然,上面所說副本和分區沒有具體深入到內部機制是怎麼實現的,怎麼保證的,這裡就先不展開了。

7 Kafka的Consumer Group

Consumer Group:每一個消費者實例都屬於一個消費Group,每一條消息只會被同一個消費Group里的一個消費者實例消費(不同消費Group可以同時消費同一條消息)。不同於一般的隊列,Kafka實現了消息被消費完後也不會將消息刪除的功能,即我們能夠藉助Kafka實現離線處理和實時處理,跟Hadoop和Flink這兩者特性可以對應起來,因此可以分配兩個不同消費組分別將數據送入不同處理任務中。

8 總結

這一篇文章讓我們對Kafka有了個基本的認識,可以做消息訂閱/發布系統,可以做實時流處理,對Kafka的分區和副本有了一定的認識,對Kafka的消費組的特性也有了個基本了解,接下來就進入實踐,實踐過後,我們再深入探討Kafka的內部原理和實現機制。

9 參考資料

  • http://kafka.apache.org/intro.html
  • http://kafka.apachecn.org/intro.html
  • https://stackoverflow.com/questions/23751708/is-zookeeper-a-must-for-kafka
  • https://stackoverflow.com/questions/38024514/understanding-kafka-topics-and-partitions
  • https://www.bigendiandata.com/2016-11-15-Data-Types-Compared/
  • https://stackoverflow.com/questions/44014975/kafka-consumer-api-vs-streams-api
  • https://kafka.apache.org/documentation/streams/
  • https://medium.com/@stephane.maarek/the-kafka-api-battle-producer-vs-consumer-vs-kafka-connect-vs-kafka-streams-vs-ksql-ef584274c1e
  • https://cwiki.apache.org/confluence/display/KAFKA/KIP-500%3A+Replace+ZooKeeper+with+a+Self-Managed+Metadata+Quorum
  • https://juejin.im/post/5b32044ef265da59654c3027
  • https://www.infoq.cn/article/democratizing-stream-processing-apache-kafka-ksql
  • https://cloud.tencent.com/developer/article/1031210
  • http://www.54tianzhisheng.cn/2018/01/05/SpringBoot-Kafka/#
  • http://www.54tianzhisheng.cn/2018/01/04/Kafka/
  • https://www.infoq.cn/article/kafka-analysis-part-7
  • https://juejin.im/post/5b32044ef265da59654c3027
  • http://kafka.apachecn.org/documentation.html
  • https://www.linkedin.com/pulse/message-que-pub-sub-rabbitmq-apache-kafka-pubnub-krishnakantha
  • https://www.rabbitmq.com/
  • https://medium.com/@anvannguyen/redis-message-queue-rpoplpush-vs-pub-sub-e8a19a3c071b
  • http://jm.taobao.org/2017/01/12/rocketmq-quick-start-in-10-minutes/
  • https://activemq.apache.org/components/artemis/documentation/2.0.0/address-model.html
  • https://www.journaldev.com/9731/jms-tutorial
  • https://medium.com/@chandanbaranwal/spark-streaming-vs-flink-vs-storm-vs-kafka-streams-vs-samza-choose-your-stream-processing-91ea3f04675b
  • https://medium.com/@_amanarora/replication-in-kafka-58b39e91b64e
  • http://www.jasongj.com/2015/01/02/Kafka%E6%B7%B1%E5%BA%A6%E8%A7%A3%E6%9E%90/
  • https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Client-side+Assignment+Proposal