Kafka-消息系統

1、Kafka概念

kafka是一個高吞吐的分散式消息系統,它類似HDFS用來存儲數,但HDFS是持久化的,文件數據會一直保留,而Kafka只存儲一段時間的數據,長時間不消費會自動刪除,同時存儲採用零拷貝技術,可以不需要再記憶體中消費資源

2、kafka架構

1、broker:kafka集群的server,可以負責處理讀寫消息並存儲,多個broker是利用了zookeeper的協同服務,分散式可以做到數據的備份操作

2、topic:topic就相當於消息隊列,本質也是K-V格式的,但其實每一個topic都可以分成多個partition分區的,而每一個分區就相當於一個小文件,一個partition對應一個broker,而一個broker可以管理多個partition

3、partition:每個partition內部的消息都是有序號offset提供的,都是進行排序的,這樣方便在讀寫錯誤不用重頭來。生產者在進行生產數據時,也可以自定義寫到哪一個的partition中去,要實現負載均衡,類似於shuffle階段的基於hashcode分區操作

4、中間消息採用零拷貝技術,數據不需要在記憶體中拷貝消耗資源,大大加快了速度,而且寫入磁碟是有順序的。常見的零拷貝技術:Linux的sendfile()和java NIO中的FileChannel.transferTo()

5、存儲的是數據是根據自己指定的或者默認的策略進行一段時間的刪除,並不是消費完就刪除了

3、Kafka消費模型

1、消費者需要消費kafka集群裡面的數據,那麼每一個consumer都需要維護好自己消費到哪一個offset

2、每個consumer都有自己對應的group,group內便是消息隊列里的消費模型,各個group各自獨立消費,互不影響,同時各個consumer消費不同的partition,因此一個消息在group內只消費一次

4、實現Kafka的生產端

package com.dtc.bigdata.kafka;

import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerRecord;


public class KafkaProducer {

    public static void main(String[] args) {

        Properties properties = new Properties();

        //指定kafka的broker的地址
        properties.setProperty("bootstrap.servers","master2:9092");

        //設置key和value的序列化,沒有會報"key.serializer"錯誤
        properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        //創建生產者
        org.apache.kafka.clients.producer.KafkaProducer<String, String> producer = new org.apache.kafka.clients.producer.KafkaProducer<>(properties);

        //傳入topic的值
        //如果topic值不存在,則會自動創建一個分區為1,副本為1的topic
        ProducerRecord<String, String> producerRecord = new ProducerRecord<>("zhenyun", "zyliyuiyuizylzyl");

        
        producer.send(producerRecord);

        producer.flush();

        producer.close();


    }
}

5、實現Kafka的消費端

package com.dtc.bigdata.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.Properties;

public class KafakaComsumer {
    public static void main(String[] args) {


        Properties properties = new Properties();

        properties.setProperty("bootstrap.servers", "master2:9092");

        //反序列化
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        //增加組
        properties.setProperty("group.id", "aaaa");

        //顯示最開始的數據
        properties.put("auto.offset.reset", "earliest");

        //創建消費者
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        System.out.println("創建消費者成功");

        //訂閱topic
        ArrayList<String> topics = new ArrayList<>();
        topics.add("zhenyun");
        consumer.subscribe(topics);


        while (true) {
            ConsumerRecords<String, String> poll = consumer.poll(1000);
           

            Iterator<ConsumerRecord<String, String>> it = poll.iterator();
            while (it.hasNext()) {

                ConsumerRecord<String, String> consumerRecord = it.next();

                //分別取topic,分區,偏移量,值
                String topic = consumerRecord.topic();
                int partition = consumerRecord.partition();
                long offset = consumerRecord.offset();
                String value = consumerRecord.value();

                System.out.println(topic + "," + partition + "," + offset + "," + value);


            }
        }


    }
}

6、Flume整合Kafka

1、調整flume的配置文件,監控namenode的日誌文件

agent.sources=s1
agent.channels=c1
agent.sinks=k1

agent.sources.s1.type=exec

#監聽文件地址
agent.sources.s1.command=tail -F  /usr/local/soft/hadoop-2.7.6/logs/hadoop-root-namenode-master.log


agent.channels.c1.type=memory
agent.channels.c1.capacity=10000
agent.channels.c1.transactionCapacity=100

#設置Kafka接收器
agent.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSink
#設置Kafka的broker地址和埠號
agent.sinks.k1.brokerList=master:9092,node1:9092,node2:9092

#設置Kafka的Topic   如果topic不存在會自動創建一個topic,默認分區為1,副本為1
agent.sinks.k1.topic=hadoop-namenode-log

#設置序列化方式
agent.sinks.k1.serializer.class=kafka.serializer.StringEncoder

#將三個主件串聯起來
agent.sources.s1.channels=c1
agent.sinks.k1.channel=c1

#flume-ng agent -n agent -f ./FlumeToKafka.properties -Dflume.root.logger=DEBUG,console
#kafka-console-consumer.sh --zookeeper  node1:2181  --from-beginning --topic flume

2、啟動flume

flume-ng agent -n agent -f ./FlumeToKafka.properties -Dflume.root.logger=INFO,console

3、啟動kafka控制台消費者查看數據

kafka-console-consumer.sh --bootstrap-server  master2:9092, --from-beginning --topic flume