Kafka理解
- 2019 年 10 月 3 日
- 筆記
1. 引言
最近使用Kafka做消息隊列時,完成了基本的消息發送與接收,已上線運行。一方面防止出現Bug時自己不能及時定位問題,一方面網上的配置可能還可以更加優化,決定去了解下Kafka。
2. 配置
kafka基本配合zookeeper使用,網上有很多關於liunx上搭建zookeeper+kafka集群的實例,此處不再闡述。貼一個我以前搭建過的實例。https://blog.csdn.net/hudyang/article/details/80419214
3. 理解Kafka
3.1 kafka是什麼
個人理解:kafka是一個消息中間件,用於各個系統中消息的傳遞。例如:A系統是平台主入口,每天產生大量日誌,A系統不可能自己去處理這些冗雜的事務,於是發送給Kafka。B系統很空閑,讓他來接收Kafka數據,於是從kafka取數據,然後記錄日誌;在舉個例子,在微服務中,A系統是用戶模組,B系統是許可權模組。創建用戶時也要創建用戶許可權。於是在A系統在創建用戶時,向kafka中發送一條指定消息,B系統收到這條消息後,於是開始創建用戶許可權。是不是很好理解—–
總結一句話:kafka是一個分布流媒體平台
流媒體平台有以下三個關鍵功能:
- 發布和訂閱記錄流,類似於消息隊列或企業消息傳遞系統。
- 以容錯的持久方式存儲記錄流。
- 記錄發生時處理流。
3.2 相關概念
主題是Kafka最核心的抽象模組,用於連接生產者和消費者。生產者發送消息到指定的主題,消費者訂閱該主題的消息,於是完成基本的消息傳遞流程。下面為kafka建立主題的code:
./kafka-topics.sh --create --zookeeper 10.113.56.68:2181 --replication-factor 1 --partitions 3 --topic testKafkas
// --zookeeper zookeeper集群地址
// --replication-factor 副本
// --parttitions 分區
// --topic 主題名稱
分區的概念:分區是主題的橫向拓展的實現。傳統的消息隊列相當於一個列表,生產者將消息放入列表中,然後消費者從列表中取出消息。吞吐量受到限制。如果使用多執行緒去取消息,就會造成非同步操作,有可能出現數據消費混亂。Kafka中使用分區概念(相當於將一個大的隊列分為成多個隊列)每個分區由指定的消費者去消費,這樣能極大的提高kafka的吞吐量,並且效率提高。
每個分區都是一個有序的,不可變的記錄序列,不斷附加到結構化的提交日誌中。分區中的記錄每個都被分配一個稱為偏移的順序ID號(offSet),它唯一地標識分區中的每個記錄。如上圖,一個topic的消息均勻分布在partition0,partition1,partition2中。消費者寫3條消息到topic上,分別記錄在partition0的offset12節點,partition1的offset9節點,partition2的offset12節點。消費者消費時,會從partition的offset0節點開始,消費完成後,offset+1,下次消費時,從上次的offset開始進行消費。
副本的概念:每個分區擁有多個副本,其中一個副本將被指定為主副本(leader replicas),其餘的為跟隨副本。所有的消息都會寫入到主副本,所有的消息都從主要副本讀取,其他的副本只需要保持於主副本同步即可。
消費組概念:消費組比較難理解。消費組相當於一個指定消費者的聚合。
如圖,KafkaCluster相當於一個topic,topic有4個分區 P0,P1,P2,P3;P0,P3在伺服器1上,P1,P2在伺服器2上(此處可以忽略他們在哪個伺服器);有2個消費組ConsumerGroupA,ConsumerGroupB,裡面有C1-C6消費者,每個消費者都訂閱了topic的消息。上文提過,一個分區的消息只能被同一消費組下面的一個消費者消費。於是,對於ConsumerGroupA來說,C1接收P0,P3的消息,C2接收P1,P2的消息(為什麼這麼分配,下文會進行)。對於ConsumerGroupB來說,C3-C6分別接收P0-P3的消息。此處還有個小點,即消費組中的消費者數量不要大於分區數量。以ConsumerGroupB為例,消費者和分區剛好一一對應,如果此時增加了一個消費者C7,那麼C7接受不到任何分區的消息,因為一個分區只能對應同個消費組下的一個消費者。
3.3 補充
描述了Kafka基本概念後,此處介紹另一個概念Rebalance(即分區分配給消費組中指定消費者的過程)
如上,消費組中原來有2個consumer,consumerA對應主題A的0分區,主題B的1分區,主題C的0分區;consumerB對應主題A的1分區,主題B的0分區,主題C的1分區。此時,消費組新增一個consumerC,那麼對應的分區就要相應的重新分配,這個過程就叫Rebalance。
什麼時候發生Rebalance?
- 消費組中消費者新增,離開,崩潰都會觸發Rebalance;
- 訂閱的主題增加或減少;
- 訂閱的主題分區數增加或減少;
Rebalance引發的問題
- Rebalance過程中,消費者不能進行消費;
- 如果一個消費者消費一段消息後,沒有提交確認就發生了Rebalance,那麼此段消息還會分發給分配後的消費者,造成消息重複消費;
Rebalance流程(簡介)
- 加入組請求(JoinGroup)=>作用在於選擇Group Leader(消費組中的領導,用於制定分區分配方案)。
- 同步組請求(SyncGroup)=>作用在於確定分區分配方案給Coordinator(對消費組進行管理的對象),把方案響應給所有Consumer。
自己理解:consumer申請加入組,向Coordinator發送JoinGroup請求,Coordinator從重多consumer選擇一個Group Leader(領導)。領導用於指定分區方案,即那個分區非配給哪個消費者。制定完成後,把分區方案給Coordinator,Coordinator然後告訴所有消費者:你負責的分區是哪個。
4. 實際案例
自己用Kafka做了一個簡易消息系統,完成基本的消息發送。
// 引入kafka相關jar包 <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>2.1.7.RELEASE</version> </dependency>
import org.apache.kafka.clients.consumer.CommitFailedException; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Arrays; import java.util.Properties; import java.util.concurrent.ExecutionException; import java.util.function.Consumer; /** * @Author xiabing5 * @Create 2019/8/1 19:34 * @Desc kafka工具類 **/ public class KafkaUtil { private static Producer<String, String> producer; private static KafkaConsumer<String, String> consumer; private static KafkaConsumer<String, String> consumer1; static { Properties producerProps = new Properties(); //必需的3個參數 producerProps.put("bootstrap.servers", "10.113.56.68:9093,10.113.56.68:9094,10.113.56.68:9092"); producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); producer = new KafkaProducer<String, String>(producerProps); Properties consumerProps = new Properties(); consumerProps.put("bootstrap.servers", "10.113.56.68:9093,10.113.56.68:9094,10.113.56.68:9092"); consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); consumerProps.put("group.id", "VoucherGroup"); //關閉自動提交offset consumerProps.put("enable.auto.commit", "false"); //此處開啟2個消費者進行消費 consumer = new KafkaConsumer<String, String>(consumerProps); consumer1 = new KafkaConsumer<String, String>(consumerProps); } /** * @Author xiabing5 * @Create 2019/8/1 19:40 * @Desc 同步發送消息 topic 主題 value 消息內容 **/ public static void sendSync(String topic, String value) throws ExecutionException, InterruptedException { producer.send(new ProducerRecord<String, String>(topic, value)).get(); } public static void consume(Consumer<String> c) { // 訂閱主題,可訂閱多個主題 consumer.subscribe(Arrays.asList("testKafkas")); while (true) { ConsumerRecords<String, String> records = consumer.poll(10); for (ConsumerRecord<String, String> record : records) { System.out.println("A"+record); c.accept(record.value()); } try { //同步手動提交offset consumer.commitSync(); } catch (CommitFailedException e) { System.out.println("Kafka消費者提交offset失敗"); } } } public static void consume1(Consumer<String> c) { // 訂閱主題,可訂閱多個主題 consumer1.subscribe(Arrays.asList("testKafkas")); while (true) { ConsumerRecords<String, String> records = consumer1.poll(10); for (ConsumerRecord<String, String> record : records) { System.out.println("A"+record); c.accept(record.value()); } try { //同步手動提交offset consumer1.commitSync(); } catch (CommitFailedException e) { System.out.println("Kafka消費者提交offset失敗"); } } } }
// 發送消息介面 public interface MessageService { // 發送消息 boolean sendMessage(String topic,String content); }
@Service public class MessageServiceImpl implements MessageService { // 創建執行緒池 @PostConstruct public void init() { ThreadFactory threadFactory = new ThreadFactoryBuilder() .setNameFormat("MessageConsumerThread-%d") .setDaemon(true) .build(); //ExecutorService executorService = Executors.newSingleThreadExecutor(threadFactory); ExecutorService executorService = Executors.newFixedThreadPool(2,threadFactory); MqMessageConsumerThread mqMessageConsumerThread= new MqMessageConsumerThread(); MqMessageConsumerThread1 mqMessageConsumerThread1= new MqMessageConsumerThread1(); executorService.execute(mqMessageConsumerThread); executorService.execute(mqMessageConsumerThread1); } // 自定義接收執行緒 private class MqMessageConsumerThread implements Runnable { @Override public void run() { KafkaUtil.consume(consumerRecord -> { System.out.println("A"+consumerRecord); }); } } private class MqMessageConsumerThread1 implements Runnable { @Override public void run() { KafkaUtil.consume1(consumerRecord -> { System.out.println("B"+consumerRecord); }); } } @Override public boolean sendMessage(String topic, String content) { try { KafkaUtil.sendSync(topic,content); // 執行業務邏輯 System.out.println("發送kafka消息成功"); } catch (Exception e) { // 發送失敗 System.out.println("發送kafka消息失敗"); // 失敗重發 try { KafkaUtil.sendSync(topic,content); }catch (Exception e1) { System.out.println("失敗"); } } return true; } }
/** * @Author xiabing5 * @Create 2019/9/19 20:17 * @Desc 容器初始化(模擬消息發送) **/ @Component public class InitApplication implements CommandLineRunner{ @Autowired MessageService messageService; @Override public void run(String... strings) throws Exception { for (int i = 0; i < 10; i++) { messageService.sendMessage("testKafkas",i+""); } } }
啟動容器,會出現如下介面
5. 總結
學習一項新技術,最好知道它實現的基本內涵,希望自己這個小白在每一次記錄中成長;
本文純原創,寫的不好或理解不到位的地方請多多指教;