.Net Core 集成 Kafka

最近維護的一個系統並發有點高,所以想引入一個消息隊列來進行削峰。考察了一些產品,最終決定使用kafka來當做消息隊列。以下是關於kafka的一些知識的整理筆記。

kafka

kafka 是分佈式流式平台。它由linkedin開發,後貢獻給了Apache開源組織並成為頂級開源項目。它可以應用在高並發場景下的日誌系統,也可以當作消息隊列來使用,也可以當作消息服務對系統進行解耦。

流處理平台有以下三種特性:

  1. 可以讓你發佈和訂閱流式的記錄。這一方面與消息隊列或者企業消息系統類似。
  2. 可以儲存流式的記錄,並且有較好的容錯性。
  3. 可以在流式記錄產生時就進行處理。

一般它可以應用於兩個場景:

  1. 構造實時流數據管道,它可以在系統或應用之間可靠地獲取數據。 (相當於message queue)
  2. 構建實時流式應用程序,對這些流數據進行轉換或者影響。 (就是流處理,通過kafka stream topic和topic之間內部進行變化)

broker

kafka中的每個節點即每個服務器就是一個broker 。

topic

kafka中的topic是一個分類的概念,表示一類消息。生產者在生產消息的時候需要指定topic,消費者在消費消息的時候也需要指定topic。

partition

partition是分區的概念。kafka的一個topic可以有多個partition。每個partition會分散到不同的broker上,起到負載均衡的作用。生產者的消息會通過算法均勻的分散在各個partition上。

consumer group

kafka的消費者有個組的概念。一個partition可以被多consumer group訂閱。每個消息會廣播到每一個group中。但是每個消息只會被group中的一個consumer消費。相當於每個group,一個partition只能有一個consumer訂閱,所以group中的consumer數量不可以超過topic中partition的數量。並且消息的消費的順序在每個partition中是保證有序的,但是在多個partition之間是不保證的,因為consumer的消費速度是有快慢的。
所以如果要用kafka實現嚴格的消息隊列點對點模式那麼我們可以設置一個partition並且設置一個consumer。如果對消息消費的順序不是那麼敏感,那麼可以設置多個partition來並行消費消息,提高吞吐量。

安裝kafka

為了能體驗下kafka,我們還是要實際安裝一下kafka,畢竟空想是沒有用的。現在有了docker,安裝起來也是相當滴簡單。我們只需要定義好docker-compose的yml就行了。

version: '3'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 192.168.0.117
      KAFKA_CREATE_TOPICS: "test:3:1"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181

我們在yml里定義2個service:

  1. zookeeper,kafka的分佈式依賴zookeeper,所以我需要先定義它。
  2. kafka ,kafka的定義有幾個地方要注意的。
  • depends_on:zookeeper 指定kafka依賴zookeeper這個service,當啟動kafka的時候自動會啟動zookeeper。
  • KAFKA_ADVERTISED_HOST_NAME 這裡要指定宿主機的ip
  • KAFKA_CREATE_TOPICS 這個變量只是的默認創建的topic。”test:3:1″代表創建一個名為test的topic並且創建3個分區1個複製。

定義好這些之後我們只需要使用docker-compose命令運行它:

sudo docker-compose up -d

.net 操作 kafka

安裝好kafka的docker環境之後,下面演示下如何使用.net操作kafka,進行消息的生產與消費。

生產者

        static async Task Main(string[] args)
        {
            Console.WriteLine("Hello World Producer!");

            var config = new ProducerConfig
            {
                BootstrapServers = "192.168.0.117:9092",
                ClientId = Dns.GetHostName(),
            };


            using (var producer = new ProducerBuilder<Null, string>(config).Build())
            {
                string topic = "test";
                for (int i = 0; i < 100; i++)
                {
                    var msg = "message " + i;
                    Console.WriteLine($"Send message:   value {msg}");
                    var result = await producer.ProduceAsync(topic, new Message<Null, string> { Value = msg });
                    Console.WriteLine($"Result: key {result.Key} value {result.Value} partition:{result.TopicPartition}");
                    Thread.Sleep(500);
                }
            }

            Console.ReadLine();

        }

新建一個控制台項目,從nuget安裝kafka的官方client。

Install-Package Confluent.Kafka

代碼非常簡單,使用ProducerBuilder構造一個producer,然後調用ProduceAsync方法發送消息。
其中需要注意的是如果你的場景並發非常之高,官方文檔推薦的方法是Produce而不是ProduceAsync。這是一個比較迷的地方。按常理使用ProduceAsync應該比使用同步方法Produce能獲得更高的並發才對。但是文檔確確實實說高並發場景請使用Produce。可能是為了避免ProduceAsync結果返回的時候異步線程上下文切換造成的性能開銷。
原文:

There are a couple of additional benefits of using the Produce method. First, notification of message delivery (or failure) is strictly in the order of broker acknowledgement. With ProduceAsync, this is not the case because Tasks may complete on any thread pool thread. Second, Produce is more performant because there is unavoidable overhead in the higher level Task based API.

消費者

        static void Main(string[] args)
        {
            Console.WriteLine("Hello World kafka consumer !");

            var config = new ConsumerConfig
            {
                BootstrapServers = "192.168.0.117:9092",
                GroupId = "foo",
                AutoOffsetReset = AutoOffsetReset.Earliest
            };

            var cancel = false;

            using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
            {
                var topic = "test";
                consumer.Subscribe(topic);

                while (!cancel)
                {
                    var consumeResult = consumer.Consume(CancellationToken.None);

                    Console.WriteLine($"Consumer message: { consumeResult.Message.Value} topic: {consumeResult.Topic} Partition: {consumeResult.Partition}");
                }

                consumer.Close();
            }
        }

消費者的演示代碼同樣很簡單。我們需要指定groupId,然後訂閱topic。使用ConsumerBuilder構造一個consumer,然後調用Consume方法進行消費就可以。
注意:
這裡默認是自動commit消費。你也可以根據情況手動提交commit。

運行一下


我們運行一個生產者進程,按照500ms的速度生產消息。運行三個consumer進行消費,可以看到消息被均勻的推送到三個consumer上去。

總結

以上簡單的介紹了kafka的背景、安裝方法、使用場景。還簡單演示了如何使用.net來操作kafka。它可以當作流式計算平台來使用,也可以當作傳統的消息隊列使用。它當前非常流行,網上的資料也多如牛毛。官方也提供了簡單易用的.net sdk ,為.net 平台集成kafka提供了便利。

關注我的公眾號一起玩轉技術