如何系統的了解Kafka

1.概述

在大數據的浪潮下,時時刻刻都會產生大量的數據。比如社交媒體、博客、電子商務等等,這些數據會以不同的類型存儲在不同的平台裏面。為了執行ETL(提取、轉換、加載)操作,需要一個消息中間件系統,該系統應該是異步和低耦合的,即來自各種存儲系統(如HDFS、Cassandra、RDBMS等)的數據可以同時轉存在一個地方,而所有這些數據源都是彼此獨立的。解決這個問題的方法之一是Kafka,它是一個開源的分佈式消息處理平台。

2.內容

2.1 專業術語

  • Message:它基本上是一個鍵值對,在值部分包含有用的數據和記錄;
  • Topic:對於多租戶,可以創建多個主題,這只是發佈和訂閱消息的名稱;
  • Partition:對於多線程任務,可以在一個Topic中,創建多個分區,提升生產者和消費者的性能;
  • Offset:消息以類似於提交日誌的順序形式存儲,並且從0開始為每個消息提供順序ID(每個分區偏移量從0開始);
  • Broker:Kafka集群由多個服務節點組成,這些服務節點只是集群中託管Zookeeper維護的無狀態服務器的節點,英文這裡沒有主從概念,所以所有的Broker都是同級別的(每個Partition上會有Leader和Follower);
  • Consumer:用於消費Topic的應用程序;
  • ConsumerGroup:消費不同Topic所使用的相同GroupID;
  • Producer:用於生產Topic的應用程序。

2.2 Kafka為何需要Zookeeper

Zookeeper是一個分佈式集群管理系統,它是一個為分佈式應用提供一致性服務的軟件,提供的功能包含:配置維護、域名服務、分佈式服務等。它目標就是封裝好複雜易出錯的關鍵服務,將簡單易用的接口和性能高效、功能穩定的提供提供給用戶。而在Kafka中,它提供了以下功能:

  • 控制器選舉:對於特定主題,分區中的所有讀寫操作都是通過複製副本的數據來完成的,每當Leader宕機,Zookeeper就會選舉出新的Leader來提供服務;
  • 配置Topic:與某個Topic相關的元數據,即某個特定Topic是否位於Broker中,有多少個Partition等存儲在Zookeeper中,並在生產消息時持續同步;
  • ACL:Topic的權限控制均在Zookeeper中進行維護。

2.3 Kafka有哪些特性?

Kafka的一些關鍵特性,使得它更加受到喜愛,針對傳統消息系統的不同:

  • 高吞吐量:吞吐量表示每秒可以處理的消息數(消息速率)。由於我們可以將Topic分佈到不同的Broker上,因此我們可以實現每條數以千次的讀寫操作;
  • 分佈式:分佈式系統是一個被分割成多台運行的機器的系統,所有這些機器在一個集群中協同工作,在最終用戶看來是一個單一的節點。Kafka是分佈式的,因為它存儲、讀取和寫入多個節點上的數據,這些節點被稱為Broker,它與Zookeeper一起共同創建了一個稱為Kafka集群的生態系統;
  • 持久性:消息隊列完全保存在磁盤上,而不是保存在內存中,同一數據的多個副本(ISR)可以跨不同的節點存儲。因此,不存在由於故障轉移場景而導致數據丟失的可能性,並使其具有持久性;
  • 可伸縮性:任何系統都可以水平或垂直伸縮,縱向可伸縮性意味着向相同的節點添加更多的資源,如CPU、內存,並且會產生很高的操作成本。水平可伸縮性可以通過簡單的在集群中添加幾個節點來實現,這增加了容量需求。Kafka水平擴展意味着當我們的容量用完時,我們可以在集群中添加一個新的節點。

2.4 Producer如何寫數據?

生產者首先獲取Topic中的元數據,以便知道需要使用消息更新哪個Broker。元數據也存儲在Broker中,並與Zookeeper保持連續同步。因此,若有多個生產者都希望連接到Zookeeper來訪問元數據,會導致性能降低。當生產者獲取了Topic和元數據信息,它就會在Leader所在的Broker節點的日誌中寫入消息,而之後Follower(ISR)會將其複製進行同步。

 

 

 在寫入操作可以是同步的,即僅當Follower還在其日誌中同步消息時,或者異步,即只有Leader更新信息消息,狀態發生給生產者。磁盤上的消息可以保留特定的持續時間,在此期限後,將自動清除舊消息,並且不再可供使用。默認情況下,設置為7天。可以通過三種策略將消息寫到Topic。

  1. send(key,value,topic,partition):專門提供需要進行寫操作的分區。不建議使用該方式,因為它可能會導致分區大小不均衡;
  2. send(key,value,topic):在這裡,默認的HashPartitioner用於確定要寫入消息的分區,方式查找key的Hash並進行取模,該Topic的分區,也可以編寫我們自己定義的分區程序;
  3. send(null,value,topic):在這種情況下,消息以循環方式存儲在所有分區中。

Java生產者示例代碼如下:

public class JProducer extends Thread {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "127.0.0.1:9092");
        props.put("acks", "1");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        long counter = 1L;
        while (true) {
            String json = "{\"id\":" + (counter++) + ",\"date\":\"" + new Date().toString() + "\"}";
            String k = "key" + counter;
            producer.send(new ProducerRecord<String, String>("test01", k, json), (recordMetadata, e) -> {
                if (e == null) {
                    System.out.println(recordMetadata.topic() + "-" + recordMetadata.partition() + "-" + recordMetadata.offset());
                } else {
                    e.printStackTrace();
                }
            });
            try {
                sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        // producer.close();
    }

}

2.5 消費者如何訂閱消息?

由於Kafka的速度非常快並且可以獲取實時消息,因此單個消費者肯定會在Topic中讀取很大一部分消息時出現延遲。為了克服這類問題,可以創建一個消費者組,該消費者組由多個具有相同GroupID的消費者組成。每個使用者都連接有一個唯一的分區,該分區所在所有使用者之間平均分配。將分區分配給特定使用者是消費者組協調器的責任,協調器由Broker被提名擔任該角色。為了管理活躍的消費者,消費者組中的所有成員會將它們的心跳發送到組協調器。

關於消費分區與消費線程的對應關係,理論上消費線程數應該小於等於分區數。之前是有這樣一種觀點,一個消費線程對應一個分區,當消費線程等於分區數是最大化線程的利用率。直接使用KafkaConsumer Client實例,這樣使用確實沒有什麼問題。但是,如果我們有富裕的CPU,其實還可以使用大於分區數的線程,來提升消費能力,這就需要我們對KafkaConsumer Client實例進行改造,實現消費策略預計算,利用額外的CPU開啟更多的線程,來實現消費任務分片。

在0.10.x以後的版本中,Kafka底層架構發生了變化,將消費者的信息由Zookeeper存儲遷移到Topic(__consumer_offsets)中進行存儲。消費的偏移量Key(groupid, topic, partition)以及Value(Offset, …)

 

 

 Java消費者示例代碼如下:

public class JConsumer extends Thread {

    private String groupId;

    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 1; i++) {
            JConsumerSsl jc = new JConsumerSsl("jgroup" + i);
            jc.start();
        }
    }

    public JConsumerSsl(String groupId) {
        this.groupId = groupId;
    }

    @Override
    public void run() {
        consumer(this.groupId);
    }

    public static void consumer(String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "127.0.0.1:9092");
        props.put("group.id", groupId);
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("test01"));
        boolean flag = true;
        while (flag) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            try {
                // sleep(60 * 1000L);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        consumer.close();
    }

}

2.6 Kafka為何如此之快?

由於Kafka遵循了一定的策略,這也是它設計的一部分,以使得它性能更好、更快。

  • 沒有隨機磁盤訪問:它使用稱為不可變隊列的順序數據結構,其中讀寫操作始終為恆定時間O(1)。它在末尾附加消息,並從頭開始或者從特定偏移量讀取;
  • 順序IO:現代操作系統將其大不部分可用的內存分配給磁盤緩存,並且更快的用於存儲和檢索順序數據;
  • 零拷貝:由於根本沒有修改數據,因此將磁盤中的數據不必要的加載到應用程序內存中,因此,它沒有將其加載到應用程序中,而是通過Socket位元組,緩衝區以及網絡從context緩存區發送了相同的數據;
  • 消息批處理:為了避免多次網絡交互,將多個消息分組在一起;
  • 消息壓縮:在通過網絡傳輸消息之前,使用gzip、snappy等壓縮算法對消息進行壓縮,然後在Consumer中使用API將其解壓。

2.7 數據如何存儲在Broker上?

在打開Kafka服務器之前,Broker中的所有消息都存儲在配置文件中的配置的日誌目錄中,在該目錄內,可以找到包含特定Topic分區的文件夾,其格式topic_name-partition_number,例如topic1-0。另外,__consumer_offsets這個Topic也存儲在同一日誌目錄中。

 

 

在特定Topic的分區目錄中,可以找到Kafka的Segment文件xxx.log,索引文件xxx.index和時間索引xxx.timeindex。當達到舊的Segment大小或者時間限制時,會在創建新的Segment文件時將屬於該分區的所有數據寫入活躍的Segment中。索引將每個偏移量映射到其消息在日誌中的位置,由於偏移量時順序的,因此將二進制搜索應用於在特定偏移量的日誌文件中查找數據索引。

2.8 日誌壓縮

  • 任何保持在日誌頭部以內的使用者都將看到所寫的每條消息,這些消息將具有順序偏移量。可以使用Topic的min.compaction.lag.ms屬性來保證消息在被壓縮之前必須經過的最短時間。也就是說,它為每個消息在(未壓縮)頭部停留的時間提供了一個下限。可以使用Topic的max.compaction.lag.ms屬性來保證從編寫消息到消息符合壓縮條件之間的最大延時
  • 消息始終保持順序,壓縮永遠不會重新排序消息,只是刪除一些而已
  • 消息的偏移量永遠不會改變,它是日誌中位置的永久標識符
  • 從日誌開始的任何使用者將至少看到所有記錄的最終狀態,按記錄的順序寫入。另外,如果使用者在比Topic的log.cleaner.delete.retention.ms短的時間內到達日誌的頭部,則會看到已刪除記錄的所有delete標記。保留時間默認是24小時。

詳情分析可閱讀《Kafka日誌壓縮剖析》。

3.總結

以上就是筆者給大家簡要的匯總了Kafka的各個知識點,包含常見的術語、Consumer & Producer的使用方式、存儲流程等

另外,筆者開源的一款Kafka監控關係系統Kafka-Eagle,喜歡的同學可以Star一下,進行關注。

Kafka Eagle源代碼地址://github.com/smartloli/kafka-eagle

4.結束語

這篇博客就和大家分享到這裡,如果大家在研究學習的過程當中有什麼問題,可以加群進行討論或發送郵件給我,我會盡我所能為您解答,與君共勉!

另外,博主出書了《Kafka並不難學》和《Hadoop大數據挖掘從入門到進階實戰》,喜歡的朋友或同學, 可以在公告欄那裡點擊購買鏈接購買博主的書進行學習,在此感謝大家的支持。關注下面公眾號,根據提示,可免費獲取書籍的教學視頻。