深入理解Kafka核心設計及原理(三):消費者

 轉載請註明出處://www.cnblogs.com/zjdxr-up/p/16114877.html

 深入理解Kafka核心設計及原理(一):初識Kafka

 深入理解Kafka核心設計及原理(二):生產者 

3.1 消費者與消費組

  消費者(Consumer)負責訂閱Kafka 中的主題(Topic), 並且從訂閱的主題上拉取消息。與其他一些消息中間件不同的是: 在Kafka 的消費理念中還有一層消費組(Consumer Group)的概念, 每個消費者都有一個對應的消費組。 當消息發布到主題後, 只會被投遞給訂閱它的每個消費組中的一個消費者。

  每一個分區只能被一個消費組中的一個消費者所消費。

  對於消息中間件而言,一般有兩種消息投遞模式:點對點(P2P, Point-to-Point)模式和發布/訂閱(Pub/ Sub)模式。點對點模式是基於隊列的,消息生產者發送消息到隊列,消息消費者從隊列中接收消息。發布訂閱模式定義了如何向 一個內容節點發布和訂閱消息,這個內容節點稱為主題(Topic) , 主題可以認為是消息傳遞的中介,消息發布者將消息發布到某個主題,而消息訂閱者從主題中訂閱消息。主題使得消息的訂閱者和發布者互相保持獨立,不需要進行接觸即可保證消息的傳遞,發布/訂閱模式在消息的一對多廣播時採用。Kafka 同時支待兩種消息投遞模式,而這正是得益於消費者與消費組模型的契合:

     如果所有的消費者都隸屬於同一個消費組,那麼所有的消息都會被均衡地投遞給每一個消費者,即每條消息只會被一個消費者處理,這就相當於點對點模式的應用。

      如果所有的消費者都隸屬於不同的消費組,那麼所有的消息都會被廣播給所有的消費者,即每條消息會被所有的消費者處理,這就相當於發布/訂閱模式的應用。

  消費組是一個邏輯上的概念,它將旗下的消費者歸為 一類,每一個消費者只隸屬於一個消費組。每一個消費組都會有一個固定的名稱,消費者在進行消費前需要指定其所屬消費組的名稱,這個可以通過消費者客戶端參數group.id來配置,默認值為空字元串。

3.2 消息消費過程及程式碼

  一個正常的消費邏輯需要具備以下幾個步驟:

    (1) 配置消費者客戶端參數及創建相應的消費者實例。

    (2) 訂閱主題。

    (3)拉取消息並消費。

    (4) 提交消費位移。

    (5)關閉消費者實例。

public class KafkaConsumerAnalysis {
    public static final String brokerList = "localhost:9092";
    public static final String topic = "topic-demo";
    public static final String groupid = "group.demo";
    public static final AtomicBoolean isRunning = new AtomicBoolean(true);
    public static Properties initConfig () {
        Properties props= new Properties();
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS—CONFIG,StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
        props.put(ConsumerConfig.GROUP—ID_CONFIG, groupid);
        props. put (ConsumerConfig. CLIENT_ID _ CONFIG, "client. id. demo");
        return props;
    }

    public static void main(String[] args) (
        Properties props= initConfig();
        KafkaConsumer<String, String> consumer= new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(topic));
        try {
        while (isRunning. get()) {
            ConsumerRecords<String, String> records=
                consumer.poll(Duration.ofMillis(lOOO));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("topic="+record.topic()+ ", partition = "+         record.partition()+ ", offset="+ record.offset());
                System.out.println("key ="+record.key()+ ", value="+ record.value());
            //do something to process record.
        } catch(Exception e) {
            log.error("occur exception", e);
        } finally {
            consumer.close();
        }
    }
}

    通過 subscribe()方法訂閱主題具有 消費者自動再均衡的功能,在多個消費者的情況下可以根據分區分配策略來自動分配各個消費者與分區的關係。當消費組內的消費者增加或減少時,分區分配關係會自動調整,以實現消費負載均衡及故障自動轉移。

    如果我們事先並不知道主題中有多少個分區怎麼辦?KafkaConsumer中的partitionsFor ()方法可以用來查詢指定主題的元數據資訊,partitionsFor()方法的具體定義如下:

public List<Partitioninfo> partitionsFor(String topic)

    其中 Partitionlnfo類型即為主題的分區元數據資訊,此類的主要結構如下:

public class Partitioninfo {
    private final String topic;
    private final int partition;
    private final Node leader;
    private final Node[] replicas;
    private final Node[] inSyncReplicas;
    private final Node[] offlineReplicas;
    //這裡省略了構造函數、屬性提取、toString等方法
}

    Partitioninfo類中的屬性topic表示主題名稱,partition代表分區編號,leader代表分區的leader副本所在的位置,replicas代表分區的AR集合,inSyncReplicas代表分區的ISR集合,offlineReplicas代表分區的OSR集合。

3.3 消息消費模式

    Kafka中的消費是基於 拉模式的。消息的消費一般有兩種模式:推模式和 拉模式。推模式是服務端主動將消息推送給消費者, 而 拉模式是消費者主動向服務端發起請求來拉取消息。Kafka中的消息消費是一個不斷輪詢的過程,消費者所要做的就是重複地調用poll()方法,而poll()方法返回的是所訂閱的主題(分區)上的一組消息。 對於poll()方法而言,如果某些分區中沒有可供消費的消息,那麼此分區對應 的消息拉取的結果就為空;如果訂閱的所有分區中都沒有可供消費的消息, 那麼poll()方法返回為空的消息集合

    消費者消費到 的每條消息的類型為ConsumerRecord(注意與ConsumerRecords 的區別,ConsumerRecords為一次獲取到的消息集),這個和生產者發送的消息類型ProducerRecord相對應,不過ConsumerRecord中的內容更加豐富,具體的結構參考如下程式碼:

public class ConsumerRecord<K, V> {
    private final Stringtopic;
    private final int partition;
    private final long offset;
    private final long timestamp;
    private final TimestampType timestampType;
    private final int serializedKeySize;
    private final int serializedValueSize;
    private final Headers headers;
    private final K key;
    private final V value;
    private volatile Long checksum;
//省略若干方法

    timestarnpType 有兩種類型:CreateTime 和LogAppendTime, 分別代表消息創建的時間戳和消息追加到日誌的時間戳。

3.4 位移提交

    對於 Kafka 中的分區而言,它的每條消息都有唯一 的 offset,用來表示消息在分區中對應 的位置 。 對於消費者而言 , 它也有一個 offset 的概念,消費者使用 offset 來表示消費到分區中某個消息所在的位置。

    在每次調用 poll ()方法時,它返回的是還沒有被消費過的消息集(當然這個前提是消息己經存儲在 Kafka 中 了,並且暫不考慮異常情況的發生),要做到這一點,就需要記錄上一 次消費時的消費位移 。 並且這個消費位移必須做持久化保存,而不是單單保存在記憶體中,否則消費者重啟之後就無法知曉之前的消費位移 。

    在舊消費者客戶端中,消費位移是存儲在 ZooKeeper 中的 。 而在新消費者客戶端中,消費位移存儲在 Kafka 內 部的主題consumer offsets 中 。 這裡把將消費位移存儲起來(持久化)的動作稱為「提交』,消費者在消費完消息之後需要執行消費位移的提交。

    在Kafka 中默認的消費位移的提交方式是自動提交,這個由消費者客戶端參數enable. auto. commit 配置,默認值為 true。當然這個默認的自動提交不是每消費一條消息就提交一次,而是定期提交,這個定期的周期時間由客戶端參數 auto. commit. interval. ms配置,默認值為 5 秒,此參數生效的前提是 enable. auto.commit 參數為 true 。

    在默認的方式下,消費者每隔 5 秒會將拉取到的每個分區中最大的消息位移進行提交 。自動位移提交的動作是在 poll()方法的邏輯里完成的,在每次真正向服務端發起拉取請求之前會檢查是否可以進行位移提交,如果可以,那麼就會提交上一次輪詢的位移。

3.5 位移提交過程導致重複消費的現象

    如果在業務邏輯處理完之後,並且在同步位移提交前,程式出現了崩漬 ,那麼待恢復之後又只能從上一次位移提交的地方拉取消息,由此在兩次位移提交的窗口中出現了重複消費的現象。

     KafkaConsumer 中的 seek()方法提供了追前消費或 回溯消費。

public void seek(TopicPartition partition ,long offset)

     seek()方法中的參數 partition 表示分區,而 offset 參數用來指定從分區的哪個位置開始消費。seek()方法只能重置消費者分配到的分區的消費位置,而分區的分配是在poll()方法的調用過程中實現的 。 也就是說,在執行 seek()方法之前需要先執行一次 poll ()方法 ,等到分配到分區之後才可以重置消費位置 。

KafkaConsumer <String ,String> consumer= new KafkaConsumer<> (props);
consumer.subscribe(Arrays.asList(topic));
consumer . poll(Duration.ofMillis(lOOOO));
Set<TopicPartition> assignment = consumer.assignment();
for(TopicPartition tp : assignment) {
    consumer.seek(tp , 10) ;
    while(true) {
    ConsumerRecords<String , String> records = consumer.poll(Duration.ofMillis (1000)) ;
    //consume the record .
}

    timeout參數用來設置等待獲取的超時時間。如果沒有指 定timeout參數的值, 那麼endOffsets() 方 法 的 等 待時 間由客戶端參 數request.timeout.ms來設置,默認值為30000。

    seek()方法為我們提供了從特定位置讀取消息的能力,我們可以通過這個方法來向前跳過若干消息, 也可以通過這個方法來向後回溯若干消息, 這樣為消息的消費提供了很大的靈活性。seek()方法也為我們提供了將消費位移保存在外部存儲介質中的能力,還可以配合再均衡監聽器來提供更加精準的消費能力。

3.6 再均衡

    再均衡是指分區的所屬權從一個消費者轉移到另一消費者的行為, 它為消費組具備高可用性和伸縮性提供保障, 使我們可以既方便又安全地刪除消費組內的消費者或往消費組內添加消費者。 不過在再均衡發生期間, 消費組內的消費者是無法讀取消息的。 也就是說, 在再均衡發生期間的這一小段時間內, 消費組會變得不可用。 另外, 當 一個分區被重新分配給另 一個消費者時, 消費者當前的狀態也會丟失。 比如消費者消費完某個分區中的一部分消息時還沒有來得及提交消費位移就發生了再均衡操作, 之後這個分區又被分配給了消費組內的另 一個消費者,原來被消費完的那部分消息又被重新消費 一遍, 也就是發生了重複消費。 一般情況下, 應盡量避免不必要的再均衡的發生。

    subscribe()方法中有再均衡監聽器ConsumerRebalanceListener, 在subscribe(Collection<String> topics, ConsumerRebalanceListener listener)和subscribe(Pattem pattern, ConsumerRebalanceListener listener)方法中都有它的身影。再均衡監聽器用來設定發生再均衡動作前後的一些準備或收尾的動作。 ConsumerRebalanceListener是 一個介面.

3.7Kafka消費端重要的參數

參數名稱 默認值 參數釋義
bootstrap.servers 「」 指定連接 Kafka 集群所需的 broker 地址清單
key.deserializer   消息key對應的反序列化類,需要實現org.apache.kafka.common.serialization.Deserializer介面
value.deserializer   消息key 所對應的反序列化類,需要實現org.apache.kafka.common.serialization.Deserializer介面
group.id “” 消費者所隸屬的消費組的唯一標識,即消費組的名稱
session. timeout.ms 10000 組管理協議中用來檢測消費者是否失效的超時時間
max.poll.interval.ms 300000 消費組管理消費者時,該配置指定拉取消息執行緒最長空閑時間,若超過這個時 間間 隔還沒有發起 poll 操作,則消費組認為該消費者己離開了消費組 ,將進行再均衡操作
auto.offset.reset latest 有效值為「 earliest 」" latest 」 「 none」
enable.auto.commit true 是否開啟自動提交消費位移的功能,默認開啟
auto.commit.interval.ms 5000 當 enable.auto.commit 參數設置為 true 時才生效 ,表示開啟自動提交消費位移功能 時自 動提交消費位移的時間間 隔
partition.assignment. strategy   消費者的分區分配策略
fetch .min.bytes 1( B ) Consumer 在一次拉取中從 Kafka 中拉取的最小數據量
fetch .max.bytes 50MB Consumer 在一次拉取中從 Kafka 中拉取的最大數據量
max.poll.records 500條 Consumer 在一次拉取請求中拉取的最大消息數
connections.max.idle.ms 9分鐘 用來指定在多久之後關閉限制的連接
isolation.level read_ uncommitted 事務隔離級別。字元串類型,有效值為「 read_uncommitted ,和「 read committed ",表示消費者所消費到的位置,可以消費到 HW (High Watermark )處的位置
  

 

Tags: