Kafka理解

  • 2019 年 10 月 3 日
  • 筆記

1. 引言

  最近使用Kafka做消息隊列時,完成了基本的消息發送與接收,已上線運行。一方面防止出現Bug時自己不能及時定位問題,一方面網上的配置可能還可以更加優化,決定去了解下Kafka。

2. 配置

  kafka基本配合zookeeper使用,網上有很多關於liunx上搭建zookeeper+kafka集群的實例,此處不再闡述。貼一個我以前搭建過的實例。https://blog.csdn.net/hudyang/article/details/80419214

3. 理解Kafka

3.1 kafka是什麼

  個人理解:kafka是一個消息中間件,用於各個系統中消息的傳遞。例如:A系統是平台主入口,每天產生大量日誌,A系統不可能自己去處理這些冗雜的事務,於是發送給Kafka。B系統很空閑,讓他來接收Kafka數據,於是從kafka取數據,然後記錄日誌;在舉個例子,在微服務中,A系統是用戶模組,B系統是許可權模組。創建用戶時也要創建用戶許可權。於是在A系統在創建用戶時,向kafka中發送一條指定消息,B系統收到這條消息後,於是開始創建用戶許可權。是不是很好理解—–

  總結一句話:kafka是一個分布流媒體平台

  流媒體平台有以下三個關鍵功能:

  • 發布和訂閱記錄流,類似於消息隊列或企業消息傳遞系統。
  • 以容錯的持久方式存儲記錄流。
  • 記錄發生時處理流。

3.2 相關概念

  主題是Kafka最核心的抽象模組,用於連接生產者和消費者。生產者發送消息到指定的主題,消費者訂閱該主題的消息,於是完成基本的消息傳遞流程。下面為kafka建立主題的code:

./kafka-topics.sh --create --zookeeper 10.113.56.68:2181 --replication-factor 1 --partitions 3 --topic testKafkas
// --zookeeper zookeeper集群地址
// --replication-factor 副本
// --parttitions 分區
// --topic 主題名稱

  分區的概念:分區是主題的橫向拓展的實現。傳統的消息隊列相當於一個列表,生產者將消息放入列表中,然後消費者從列表中取出消息。吞吐量受到限制。如果使用多執行緒去取消息,就會造成非同步操作,有可能出現數據消費混亂。Kafka中使用分區概念(相當於將一個大的隊列分為成多個隊列)每個分區由指定的消費者去消費,這樣能極大的提高kafka的吞吐量,並且效率提高。

  

 

 

   每個分區都是一個有序的,不可變的記錄序列,不斷附加到結構化的提交日誌中。分區中的記錄每個都被分配一個稱為偏移的順序ID號(offSet),它唯一地標識分區中的每個記錄。如上圖,一個topic的消息均勻分布在partition0,partition1,partition2中。消費者寫3條消息到topic上,分別記錄在partition0的offset12節點,partition1的offset9節點,partition2的offset12節點。消費者消費時,會從partition的offset0節點開始,消費完成後,offset+1,下次消費時,從上次的offset開始進行消費。

  副本的概念:每個分區擁有多個副本,其中一個副本將被指定為主副本(leader replicas),其餘的為跟隨副本。所有的消息都會寫入到主副本,所有的消息都從主要副本讀取,其他的副本只需要保持於主副本同步即可。

  消費組概念:消費組比較難理解。消費組相當於一個指定消費者的聚合。

 

 

   如圖,KafkaCluster相當於一個topic,topic有4個分區 P0,P1,P2,P3;P0,P3在伺服器1上,P1,P2在伺服器2上(此處可以忽略他們在哪個伺服器);有2個消費組ConsumerGroupA,ConsumerGroupB,裡面有C1-C6消費者,每個消費者都訂閱了topic的消息。上文提過,一個分區的消息只能被同一消費組下面的一個消費者消費。於是,對於ConsumerGroupA來說,C1接收P0,P3的消息,C2接收P1,P2的消息(為什麼這麼分配,下文會進行)。對於ConsumerGroupB來說,C3-C6分別接收P0-P3的消息。此處還有個小點,即消費組中的消費者數量不要大於分區數量。以ConsumerGroupB為例,消費者和分區剛好一一對應,如果此時增加了一個消費者C7,那麼C7接受不到任何分區的消息,因為一個分區只能對應同個消費組下的一個消費者。

3.3 補充

  描述了Kafka基本概念後,此處介紹另一個概念Rebalance(即分區分配給消費組中指定消費者的過程)

  

 

 

   如上,消費組中原來有2個consumer,consumerA對應主題A的0分區,主題B的1分區,主題C的0分區;consumerB對應主題A的1分區,主題B的0分區,主題C的1分區。此時,消費組新增一個consumerC,那麼對應的分區就要相應的重新分配,這個過程就叫Rebalance。

  什麼時候發生Rebalance?

  • 消費組中消費者新增,離開,崩潰都會觸發Rebalance;
  • 訂閱的主題增加或減少;
  • 訂閱的主題分區數增加或減少;

  Rebalance引發的問題

  • Rebalance過程中,消費者不能進行消費;
  • 如果一個消費者消費一段消息後,沒有提交確認就發生了Rebalance,那麼此段消息還會分發給分配後的消費者,造成消息重複消費;

  Rebalance流程(簡介)

  • 加入組請求(JoinGroup)=>作用在於選擇Group Leader(消費組中的領導,用於制定分區分配方案)。
  • 同步組請求(SyncGroup)=>作用在於確定分區分配方案給Coordinator(對消費組進行管理的對象),把方案響應給所有Consumer。

  自己理解:consumer申請加入組,向Coordinator發送JoinGroup請求,Coordinator從重多consumer選擇一個Group Leader(領導)。領導用於指定分區方案,即那個分區非配給哪個消費者。制定完成後,把分區方案給Coordinator,Coordinator然後告訴所有消費者:你負責的分區是哪個。

4. 實際案例

  自己用Kafka做了一個簡易消息系統,完成基本的消息發送。

// 引入kafka相關jar包  <dependency>      <groupId>org.springframework.kafka</groupId>      <artifactId>spring-kafka</artifactId>      <version>2.1.7.RELEASE</version>  </dependency>

import org.apache.kafka.clients.consumer.CommitFailedException;  import org.apache.kafka.clients.consumer.ConsumerRecord;  import org.apache.kafka.clients.consumer.ConsumerRecords;  import org.apache.kafka.clients.consumer.KafkaConsumer;  import org.apache.kafka.clients.producer.KafkaProducer;  import org.apache.kafka.clients.producer.Producer;  import org.apache.kafka.clients.producer.ProducerRecord;    import java.util.Arrays;  import java.util.Properties;  import java.util.concurrent.ExecutionException;  import java.util.function.Consumer;    /**   * @Author  xiabing5   * @Create  2019/8/1 19:34   * @Desc    kafka工具類   **/  public class KafkaUtil {        private static Producer<String, String> producer;        private static KafkaConsumer<String, String> consumer;        private static KafkaConsumer<String, String> consumer1;      static {          Properties producerProps = new Properties();          //必需的3個參數          producerProps.put("bootstrap.servers", "10.113.56.68:9093,10.113.56.68:9094,10.113.56.68:9092");          producerProps.put("key.serializer",                  "org.apache.kafka.common.serialization.StringSerializer");          producerProps.put("value.serializer",                  "org.apache.kafka.common.serialization.StringSerializer");          producer = new KafkaProducer<String, String>(producerProps);            Properties consumerProps = new Properties();          consumerProps.put("bootstrap.servers", "10.113.56.68:9093,10.113.56.68:9094,10.113.56.68:9092");          consumerProps.put("key.deserializer",                  "org.apache.kafka.common.serialization.StringDeserializer");          consumerProps.put("value.deserializer",                  "org.apache.kafka.common.serialization.StringDeserializer");          consumerProps.put("group.id", "VoucherGroup");          //關閉自動提交offset          consumerProps.put("enable.auto.commit", "false");      //此處開啟2個消費者進行消費          consumer = new KafkaConsumer<String, String>(consumerProps);          consumer1 = new KafkaConsumer<String, String>(consumerProps);      }        /**       * @Author xiabing5       * @Create 2019/8/1 19:40       * @Desc 同步發送消息 topic 主題 value 消息內容       **/      public static void sendSync(String topic, String value) throws ExecutionException, InterruptedException {          producer.send(new ProducerRecord<String, String>(topic, value)).get();      }        public static void consume(Consumer<String> c) {          // 訂閱主題,可訂閱多個主題          consumer.subscribe(Arrays.asList("testKafkas"));          while (true) {              ConsumerRecords<String, String> records = consumer.poll(10);              for (ConsumerRecord<String, String> record : records) {                  System.out.println("A"+record);                  c.accept(record.value());              }              try {                  //同步手動提交offset                  consumer.commitSync();              } catch (CommitFailedException e) {                  System.out.println("Kafka消費者提交offset失敗");              }          }      }        public static void consume1(Consumer<String> c) {          // 訂閱主題,可訂閱多個主題          consumer1.subscribe(Arrays.asList("testKafkas"));          while (true) {              ConsumerRecords<String, String> records = consumer1.poll(10);              for (ConsumerRecord<String, String> record : records) {                  System.out.println("A"+record);                  c.accept(record.value());              }              try {                  //同步手動提交offset                  consumer1.commitSync();              } catch (CommitFailedException e) {                  System.out.println("Kafka消費者提交offset失敗");              }          }      }    }

// 發送消息介面  public interface MessageService {        // 發送消息      boolean sendMessage(String topic,String content);    }

@Service  public class MessageServiceImpl implements MessageService {        // 創建執行緒池      @PostConstruct      public void init() {          ThreadFactory threadFactory = new ThreadFactoryBuilder()                  .setNameFormat("MessageConsumerThread-%d")                  .setDaemon(true)                  .build();          //ExecutorService executorService = Executors.newSingleThreadExecutor(threadFactory);          ExecutorService executorService = Executors.newFixedThreadPool(2,threadFactory);          MqMessageConsumerThread mqMessageConsumerThread= new MqMessageConsumerThread();          MqMessageConsumerThread1 mqMessageConsumerThread1= new MqMessageConsumerThread1();          executorService.execute(mqMessageConsumerThread);          executorService.execute(mqMessageConsumerThread1);      }        // 自定義接收執行緒      private class MqMessageConsumerThread implements Runnable {          @Override          public void run() {              KafkaUtil.consume(consumerRecord -> {                System.out.println("A"+consumerRecord);              });          }      }      private class MqMessageConsumerThread1 implements Runnable {          @Override          public void run() {              KafkaUtil.consume1(consumerRecord -> {                  System.out.println("B"+consumerRecord);              });          }      }      @Override      public boolean sendMessage(String topic, String content) {          try {              KafkaUtil.sendSync(topic,content);              // 執行業務邏輯              System.out.println("發送kafka消息成功");          }          catch (Exception e) {              // 發送失敗              System.out.println("發送kafka消息失敗");              // 失敗重發              try {                  KafkaUtil.sendSync(topic,content);              }catch (Exception e1) {                  System.out.println("失敗");              }          }          return true;      }  }

/**   * @Author  xiabing5   * @Create  2019/9/19 20:17   * @Desc    容器初始化(模擬消息發送)   **/  @Component  public class InitApplication implements CommandLineRunner{        @Autowired      MessageService messageService;        @Override      public void run(String... strings) throws Exception {          for (int i = 0; i < 10; i++) {              messageService.sendMessage("testKafkas",i+"");          }      }  }

  啟動容器,會出現如下介面

 

 5. 總結

  學習一項新技術,最好知道它實現的基本內涵,希望自己這個小白在每一次記錄中成長;

  本文純原創,寫的不好或理解不到位的地方請多多指教;