.net core kafka 入門實例 一篇看懂

  • 2020 年 5 月 20 日
  • 筆記
 
kafka 相信都有聽說過,不管有沒有用過,在江湖上可以說是大名鼎鼎,就像天龍八部里的喬峰。國際慣例,先介紹生平事迹
 
簡介

Kafka 是由 Apache軟體基金會 開發的一個開源流處理平台,由 Scala Java 編寫。Kafka是一種高吞吐量的 分散式 ,支援分區(partition),多副本(replica)的 發布訂閱消息系統 。與其他MQ最大不同是Topic 具有分區(Partition)的概念,消息出隊的速度也比其他MQ快。

 

特性及適用場景
  • 高吞吐量、低延遲
  • 可擴展性:集群支援熱擴展
  • 持久性、可靠性
  • 容錯性:允許集群中節點失敗(若副本數量為n,則允許n-1個節點失敗)
  • 高並發:支援數千個客戶端同時讀寫
常用場景
  • 日誌收集
  • 消息系統:生產者和消費者、快取消息等。
  • 用戶活動跟蹤:流網頁、搜索、點擊等活動
  • 運營指標
  • 工作流處理
  • 對實時性要求不高的數據處理

 

 

 

 

Kafka基礎概念

 

Topic

Kafka 中可將消息分類,每一類的消息稱為一個 Topic(主題),消費者可以對不同的 Topic 進行不同的處理。Topic相當於傳統消息系統MQ中的一個隊列queue,producer端發送的message必須指定是發送到哪個topic,但是不需要指定topic下的哪個partition,因為kafka會把收到的message進行load balance,均勻的分布在這個topic下的不同的partition上

Broker

每個 Broker(代理) 即一個 Kafka 服務實例,多個 Broker 構成一個 Kafka 集群,生產者發布的消息將保存在 Broker 中,消費者將從 Broker 中拉取消息進行消費。

 

producer

生產者

consumer

消費者

Partition

分區,Kafka 中比較特色的部分,一個 Topic 可以分為多個 Partition,每個 Partition 是一個有序的隊列,Partition 中的每條消息都存在一個有序的偏移量(Offest) ,同一個 Consumer Group 中,只有一個 Consumer 實例可消費某個 Partition 的消息。

 

持久化

Kafka會把消息持久化到本地文件系統中每個 Topic 將消息分成多 Partition,每個 Partition 在存儲層面是 append log 文件。任何發布到此 Partition 的消息都會被直接追加到 log 文件的尾部,每條消息在文件中的位置稱為 Offest(偏移量),Partition 是以文件的形式存儲在文件系統中,log 文件根據 Broker 中的配置保留一定時間後刪除來釋放磁碟空間。

由於message的寫入持久化是順序寫入的,因此message在被消費的時候也是按順序被消費的,保證partition的message是順序消費的。

 

 

看到上面的一堆特性,巴拉巴拉,一頓吹,道理我都懂,怎麼操作,還是沒看到效果。

別急,接下來就上程式碼,這個是不能少的。保證你們拿去就能用

 

 

上程式碼,demo測試
先創建兩個介面,寫好基礎類庫,後面直接應用就行了,我這裡就直接放一起了
 /// <summary>
    /// 消費者
    /// </summary>
    public interface IKafkaConsumer : IDisposable
    {
        /// <summary>
        /// 消費數據
        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <returns></returns>
        T Consume<T>() where T : class;
    }

  public interface IKafkaProducer : IDisposable
    {
        /// <summary>
        /// 發布消息
        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <param name="key"></param>
        /// <param name="data"></param>
        /// <param name="operateType"></param>
        /// <returns></returns>
        bool Produce<T>(string key, T data, int operateType) where T : class;
    }

  實現方法

using Confluent.Kafka;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.Text;

namespace Kafka
{
    public class KafkaConsumer : IKafkaConsumer
    {
        private bool disposeHasBeenCalled = false;
        private readonly object disposeHasBeenCalledLockObj = new object();

        private readonly IConsumer<string, string> _consumer;

        /// <summary>
        /// 構造函數,初始化配置
        /// </summary>
        /// <param name="config">配置參數</param>
        /// <param name="topic">主題名稱</param>
        public KafkaConsumer(ConsumerConfig config, string topic)
        {
            _consumer = new ConsumerBuilder<string, string>(config).Build();

            _consumer.Subscribe(topic);
        }

        /// <summary>
        /// 消費
        /// </summary>
        /// <returns></returns>
        public T Consume<T>() where T : class
        {
            try
            {
                var result = _consumer.Consume(TimeSpan.FromSeconds(1));
                if (result != null)
                {
                    if (typeof(T) == typeof(string))
                        return (T)Convert.ChangeType(result.Value, typeof(T));

                    return JsonConvert.DeserializeObject<T>(result.Value);
                }
            }
            catch (ConsumeException e)
            {
                Console.WriteLine($"consume error: {e.Error.Reason}");
            }
            catch (Exception e)
            {
                Console.WriteLine($"consume error: {e.Message}");
            }

            return default;
        }

        /// <summary>
        /// 釋放
        /// </summary>
        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }

        /// <summary>
        /// Dispose
        /// </summary>
        /// <param name="disposing"></param>
        protected virtual void Dispose(bool disposing)
        {
            lock (disposeHasBeenCalledLockObj)
            {
                if (disposeHasBeenCalled) { return; }
                disposeHasBeenCalled = true;
            }

            if (disposing)
            {
                _consumer?.Close();
            }
        }
    }

}

  

 public class KafkaProducer : IKafkaProducer
    {
        private bool disposeHasBeenCalled = false;
        private readonly object disposeHasBeenCalledLockObj = new object();

        private readonly IProducer<string, string> _producer;
        private readonly string _topic;

        /// <summary>
        /// 構造函數,初始化配置
        /// </summary>
        /// <param name="config">配置參數</param>
        /// <param name="topic">主題名稱</param>
        public KafkaProducer(ProducerConfig config, string topic)
        {
            _producer = new ProducerBuilder<string, string>(config).Build();
            _topic = topic;
        }

        /// <summary>
        /// 發布消息
        /// </summary>
        /// <typeparam name="T">數據實體</typeparam>
        /// <param name="key">數據key,partition分區會根據key</param>
        /// <param name="data">數據</param>
        /// <param name="operateType">操作類型[增、刪、改等不同類型]</param>
        /// <returns></returns>
        public bool Produce<T>(string key, T data, int operateType) where T : class
        {
            var obj = JsonConvert.SerializeObject(new
            {
                Type = operateType,
                Data = data
            });

            try
            {
                var result = _producer.ProduceAsync(_topic, new Message<string, string>
                {
                    Key = key,
                    Value = obj
                }).ConfigureAwait(false).GetAwaiter().GetResult();

#if DEBUG

                Console.WriteLine($"Topic: {result.Topic} Partition: {result.Partition} Offset: {result.Offset}");
#endif
                return true;

            }
            catch (ProduceException<string, string> e)
            {
                Console.WriteLine($"Delivery failed: {e.Error.Reason}");
            }
            catch (Exception e)
            {
                Console.WriteLine($"Delivery failed: {e.Message}");
            }

            return false;
        }

        /// <summary>
        /// 釋放
        /// </summary>
        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }

        /// <summary>
        /// Dispose
        /// </summary>
        /// <param name="disposing"></param>
        protected virtual void Dispose(bool disposing)
        {
            lock (disposeHasBeenCalledLockObj)
            {
                if (disposeHasBeenCalled) { return; }
                disposeHasBeenCalled = true;
            }

            if (disposing)
            {
                _producer?.Dispose();
            }
        }
    }

  

再寫兩個測試方法,一個發送消息,一個接收消息,控制台就好
注意  kafka 通過 topic 來接收消息 new KafkaProducer(config, “topic-c”))  發送方和接收方的topic要一致
 static void Main(string[] args)
        {
            var config = new ProducerConfig
            {
                BootstrapServers = "localhost:9092",
                Acks = Acks.All
            };
             //發送消息
        
            using (var kafkaProducer = new KafkaProducer(config, "topic-d"))
            {
                var result = kafkaProducer.Produce<object>("a", new { name = "豬八戒3" }, 1);

            }
            Console.WriteLine("消息發送成功");
        }        








static void Main(string[] args)
        {
            var config = new ConsumerConfig
            {
                BootstrapServers = "localhost:9092",
                GroupId = "test",
                AutoOffsetReset = AutoOffsetReset.Earliest
            };

            string text;
            Console.WriteLine("接受中......");
            while ((text = Console.ReadLine()) != "q")
            {
               //接受消息
                using (var kafkaProducer = new KafkaConsumer(config, "topic-d"))
                {
                    var result = kafkaProducer.Consume<object>();
                    if (result != null)
                    {
                        Console.WriteLine(result.ToString());
                    }

                }
            }

        }

 上結果、

 

 

可以看到,消息已經收到了。這個demo里,消費端要一直處於正常狀態才行,才能消費生產者得資訊

    

本文版權歸作者和部落格園共有,來源網址:歡迎各位轉載,但是未經作者本人同意,轉載文章之後必須在文章頁面明顯位置給出作者和原文連接,否則保留追究法律責任的權利。