如何系統的了解Kafka
1.概述
在大數據的浪潮下,時時刻刻都會產生大量的數據。比如社交媒體、博客、電子商務等等,這些數據會以不同的類型存儲在不同的平台裏面。為了執行ETL(提取、轉換、加載)操作,需要一個消息中間件系統,該系統應該是異步和低耦合的,即來自各種存儲系統(如HDFS、Cassandra、RDBMS等)的數據可以同時轉存在一個地方,而所有這些數據源都是彼此獨立的。解決這個問題的方法之一是Kafka,它是一個開源的分佈式消息處理平台。
2.內容
2.1 專業術語
- Message:它基本上是一個鍵值對,在值部分包含有用的數據和記錄;
- Topic:對於多租戶,可以創建多個主題,這只是發佈和訂閱消息的名稱;
- Partition:對於多線程任務,可以在一個Topic中,創建多個分區,提升生產者和消費者的性能;
- Offset:消息以類似於提交日誌的順序形式存儲,並且從0開始為每個消息提供順序ID(每個分區偏移量從0開始);
- Broker:Kafka集群由多個服務節點組成,這些服務節點只是集群中託管Zookeeper維護的無狀態服務器的節點,英文這裡沒有主從概念,所以所有的Broker都是同級別的(每個Partition上會有Leader和Follower);
- Consumer:用於消費Topic的應用程序;
- ConsumerGroup:消費不同Topic所使用的相同GroupID;
- Producer:用於生產Topic的應用程序。
2.2 Kafka為何需要Zookeeper
Zookeeper是一個分佈式集群管理系統,它是一個為分佈式應用提供一致性服務的軟件,提供的功能包含:配置維護、域名服務、分佈式服務等。它目標就是封裝好複雜易出錯的關鍵服務,將簡單易用的接口和性能高效、功能穩定的提供提供給用戶。而在Kafka中,它提供了以下功能:
- 控制器選舉:對於特定主題,分區中的所有讀寫操作都是通過複製副本的數據來完成的,每當Leader宕機,Zookeeper就會選舉出新的Leader來提供服務;
- 配置Topic:與某個Topic相關的元數據,即某個特定Topic是否位於Broker中,有多少個Partition等存儲在Zookeeper中,並在生產消息時持續同步;
- ACL:Topic的權限控制均在Zookeeper中進行維護。
2.3 Kafka有哪些特性?
Kafka的一些關鍵特性,使得它更加受到喜愛,針對傳統消息系統的不同:
- 高吞吐量:吞吐量表示每秒可以處理的消息數(消息速率)。由於我們可以將Topic分佈到不同的Broker上,因此我們可以實現每條數以千次的讀寫操作;
- 分佈式:分佈式系統是一個被分割成多台運行的機器的系統,所有這些機器在一個集群中協同工作,在最終用戶看來是一個單一的節點。Kafka是分佈式的,因為它存儲、讀取和寫入多個節點上的數據,這些節點被稱為Broker,它與Zookeeper一起共同創建了一個稱為Kafka集群的生態系統;
- 持久性:消息隊列完全保存在磁盤上,而不是保存在內存中,同一數據的多個副本(ISR)可以跨不同的節點存儲。因此,不存在由於故障轉移場景而導致數據丟失的可能性,並使其具有持久性;
- 可伸縮性:任何系統都可以水平或垂直伸縮,縱向可伸縮性意味着向相同的節點添加更多的資源,如CPU、內存,並且會產生很高的操作成本。水平可伸縮性可以通過簡單的在集群中添加幾個節點來實現,這增加了容量需求。Kafka水平擴展意味着當我們的容量用完時,我們可以在集群中添加一個新的節點。
2.4 Producer如何寫數據?
生產者首先獲取Topic中的元數據,以便知道需要使用消息更新哪個Broker。元數據也存儲在Broker中,並與Zookeeper保持連續同步。因此,若有多個生產者都希望連接到Zookeeper來訪問元數據,會導致性能降低。當生產者獲取了Topic和元數據信息,它就會在Leader所在的Broker節點的日誌中寫入消息,而之後Follower(ISR)會將其複製進行同步。
在寫入操作可以是同步的,即僅當Follower還在其日誌中同步消息時,或者異步,即只有Leader更新信息消息,狀態發生給生產者。磁盤上的消息可以保留特定的持續時間,在此期限後,將自動清除舊消息,並且不再可供使用。默認情況下,設置為7天。可以通過三種策略將消息寫到Topic。
- send(key,value,topic,partition):專門提供需要進行寫操作的分區。不建議使用該方式,因為它可能會導致分區大小不均衡;
- send(key,value,topic):在這裡,默認的HashPartitioner用於確定要寫入消息的分區,方式查找key的Hash並進行取模,該Topic的分區,也可以編寫我們自己定義的分區程序;
- send(null,value,topic):在這種情況下,消息以循環方式存儲在所有分區中。
Java生產者示例代碼如下:
public class JProducer extends Thread { public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "127.0.0.1:9092"); props.put("acks", "1"); props.put("retries", 0); props.put("batch.size", 16384); props.put("linger.ms", 1); props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer<String, String> producer = new KafkaProducer<>(props); long counter = 1L; while (true) { String json = "{\"id\":" + (counter++) + ",\"date\":\"" + new Date().toString() + "\"}"; String k = "key" + counter; producer.send(new ProducerRecord<String, String>("test01", k, json), (recordMetadata, e) -> { if (e == null) { System.out.println(recordMetadata.topic() + "-" + recordMetadata.partition() + "-" + recordMetadata.offset()); } else { e.printStackTrace(); } }); try { sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } // producer.close(); } }
2.5 消費者如何訂閱消息?
由於Kafka的速度非常快並且可以獲取實時消息,因此單個消費者肯定會在Topic中讀取很大一部分消息時出現延遲。為了克服這類問題,可以創建一個消費者組,該消費者組由多個具有相同GroupID的消費者組成。每個使用者都連接有一個唯一的分區,該分區所在所有使用者之間平均分配。將分區分配給特定使用者是消費者組協調器的責任,協調器由Broker被提名擔任該角色。為了管理活躍的消費者,消費者組中的所有成員會將它們的心跳發送到組協調器。
關於消費分區與消費線程的對應關係,理論上消費線程數應該小於等於分區數。之前是有這樣一種觀點,一個消費線程對應一個分區,當消費線程等於分區數是最大化線程的利用率。直接使用KafkaConsumer Client實例,這樣使用確實沒有什麼問題。但是,如果我們有富裕的CPU,其實還可以使用大於分區數的線程,來提升消費能力,這就需要我們對KafkaConsumer Client實例進行改造,實現消費策略預計算,利用額外的CPU開啟更多的線程,來實現消費任務分片。
在0.10.x以後的版本中,Kafka底層架構發生了變化,將消費者的信息由Zookeeper存儲遷移到Topic(__consumer_offsets)中進行存儲。消費的偏移量Key(groupid, topic, partition)以及Value(Offset, …)
Java消費者示例代碼如下:
public class JConsumer extends Thread { private String groupId; public static void main(String[] args) throws InterruptedException { for (int i = 0; i < 1; i++) { JConsumerSsl jc = new JConsumerSsl("jgroup" + i); jc.start(); } } public JConsumerSsl(String groupId) { this.groupId = groupId; } @Override public void run() { consumer(this.groupId); } public static void consumer(String groupId) { Properties props = new Properties(); props.put("bootstrap.servers", "127.0.0.1:9092"); props.put("group.id", groupId); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("test01")); boolean flag = true; while (flag) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<String, String> record : records) System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); try { // sleep(60 * 1000L); } catch (Exception e) { e.printStackTrace(); } } consumer.close(); } }
2.6 Kafka為何如此之快?
由於Kafka遵循了一定的策略,這也是它設計的一部分,以使得它性能更好、更快。
- 沒有隨機磁盤訪問:它使用稱為不可變隊列的順序數據結構,其中讀寫操作始終為恆定時間O(1)。它在末尾附加消息,並從頭開始或者從特定偏移量讀取;
- 順序IO:現代操作系統將其大不部分可用的內存分配給磁盤緩存,並且更快的用於存儲和檢索順序數據;
- 零拷貝:由於根本沒有修改數據,因此將磁盤中的數據不必要的加載到應用程序內存中,因此,它沒有將其加載到應用程序中,而是通過Socket位元組,緩衝區以及網絡從context緩存區發送了相同的數據;
- 消息批處理:為了避免多次網絡交互,將多個消息分組在一起;
- 消息壓縮:在通過網絡傳輸消息之前,使用gzip、snappy等壓縮算法對消息進行壓縮,然後在Consumer中使用API將其解壓。
2.7 數據如何存儲在Broker上?
在打開Kafka服務器之前,Broker中的所有消息都存儲在配置文件中的配置的日誌目錄中,在該目錄內,可以找到包含特定Topic分區的文件夾,其格式topic_name-partition_number,例如topic1-0。另外,__consumer_offsets這個Topic也存儲在同一日誌目錄中。
在特定Topic的分區目錄中,可以找到Kafka的Segment文件xxx.log,索引文件xxx.index和時間索引xxx.timeindex。當達到舊的Segment大小或者時間限制時,會在創建新的Segment文件時將屬於該分區的所有數據寫入活躍的Segment中。索引將每個偏移量映射到其消息在日誌中的位置,由於偏移量時順序的,因此將二進制搜索應用於在特定偏移量的日誌文件中查找數據索引。
2.8 日誌壓縮
- 任何保持在日誌頭部以內的使用者都將看到所寫的每條消息,這些消息將具有順序偏移量。可以使用Topic的min.compaction.lag.ms屬性來保證消息在被壓縮之前必須經過的最短時間。也就是說,它為每個消息在(未壓縮)頭部停留的時間提供了一個下限。可以使用Topic的max.compaction.lag.ms屬性來保證從編寫消息到消息符合壓縮條件之間的最大延時
- 消息始終保持順序,壓縮永遠不會重新排序消息,只是刪除一些而已
- 消息的偏移量永遠不會改變,它是日誌中位置的永久標識符
- 從日誌開始的任何使用者將至少看到所有記錄的最終狀態,按記錄的順序寫入。另外,如果使用者在比Topic的log.cleaner.delete.retention.ms短的時間內到達日誌的頭部,則會看到已刪除記錄的所有delete標記。保留時間默認是24小時。
詳情分析可閱讀《Kafka日誌壓縮剖析》。
3.總結
以上就是筆者給大家簡要的匯總了Kafka的各個知識點,包含常見的術語、Consumer & Producer的使用方式、存儲流程等
另外,筆者開源的一款Kafka監控關係系統Kafka-Eagle,喜歡的同學可以Star一下,進行關注。
Kafka Eagle源代碼地址://github.com/smartloli/kafka-eagle
4.結束語
這篇博客就和大家分享到這裡,如果大家在研究學習的過程當中有什麼問題,可以加群進行討論或發送郵件給我,我會盡我所能為您解答,與君共勉!
另外,博主出書了《Kafka並不難學》和《Hadoop大數據挖掘從入門到進階實戰》,喜歡的朋友或同學, 可以在公告欄那裡點擊購買鏈接購買博主的書進行學習,在此感謝大家的支持。關注下面公眾號,根據提示,可免費獲取書籍的教學視頻。