Kafka 入門(一)–安裝配置和 kafka-python 調用

一、Kafka 簡介

1.基本概念

  Kafka 是一個分散式的基於發布/訂閱消息系統,主要應用於大數據實時處理領域,其官網是://kafka.apache.org/。Kafka 是一個分散式、支援分區的(Partition)、多副本的(Replica),基於 ZooKeeper 協調的發布/訂閱消息系統。

  Kafka 有以下三個基本概念:

  • Kafka 作為一個集群運行在一個或多個伺服器上;
  • Kafka 集群存儲的消息是以 Topic 為類別記錄的;
  • 每個消息是由一個 Key,一個 Value 和時間戳構成。

2.基本架構

  Kafka 的基本架構圖如下:

  

  • Producer:生產者,就是向 Broker 發消息的客戶端;
  • Consumer:消費者,就是從 Broker 取消息的客戶端;
  • Consumer Group:消費者組,由多個消費者組成。組內每個消費者負責消費不同分區的數據,一個分區的數據只能由一個組內的消費者進行消費,組內消費者之間互不影響
  • Broker:一個 Kafka 伺服器就是一個 Broker,一個集群由多個 Broker 組成;
  • Topic:主題,可以理解為隊列,生成者和消費者都是用的同一個隊列;
  • Partition:分區,為實現擴展性,一個大的 Topic 可以分散到多個 Broker 上,一個 Topic 可以分為多個 Partition;
  • Replica:副本,保證集群中某個節點發生故障時,該節點上的數據不丟失。

 

二、Ubuntu 下安裝 Kafka

1.安裝 Java

  更新軟體包

sudo apt-get update

  安裝 openjdk-8-jdk

sudo apt-get install openjdk-8-jdk

  查看 Java 版本,檢查是否安裝成功

   

2.安裝 ZooKeeper 

1)安裝

  下載 ZooKeeper://mirrors.hust.edu.cn/apache/zookeeper/

  下載好之後解壓(注意:3.5.5之後的版本應該下載文件名中帶「bin」的壓縮包),再執行如下命令:

sudo mv apache-zookeeper-3.5.8-bin /usr/local/zookeeper

cd /usr/local/zookeeper

cp conf/zoo_sample.cfg conf/zoo.cfg

  其中有一些配置參數:

  • tickTime:Zookeeper 使用的基本時間單元,默認值2000;
  • initLimit:Zookeeper 中連接同步的最大時間,默認值為10;
  • syncLimit:Zookeeper 中進行心跳檢測的最大時間,默認值為5;
  • dataDir:資料庫更新事物保存的目錄;
  • clientPort:Zookeeper 服務監聽的埠,默認值為2181。

2)配置

  修改 /etc/profile 文件,增加如下內容:

export ZOOKEEPER_HOME=/usr/local/zookeeper/

export PATH=$PATH:$ZOOKEEPER_HOME/bin

  更新環境變數

source /etc/profile

3)測試

  首先進入 bin 目錄,開啟服務:

  

  再啟動 CLI 連接服務:

  

3.安裝 Kafka

1)安裝

  下載 Kafka://kafka.apache.org/downloads

  

  下載好之後解壓,再執行如下命令:

sudo mv kafka_2.13-2.5.0/ /usr/local/kafka

cd /usr/local/kafka

2)測試

  由於前面已經啟動了 Zookeeper 服務,所以這裡只需要執行如下命令來開啟 Kafka 服務:

bin/kafka-server-start.sh config/server.properties

  通過輸出資訊可以看到 Kafka 服務已經成功開啟了,截圖如下:

  

  但這樣開啟之後是阻塞的了,我們可以在中間加一個「-daemon」即開一個守護進程來運行,則命令如下:

bin/kafka-server-start.sh -daemon config/server.properties

  創建一個主題,用一個分區和一個副本創建一個名為「mytopic」的主題:

bin/kafka-topics.sh –create –zookeeper 127.0.0.1:2181 –replication-factor 1 –partitions 1 –topic mytopic

  

  這樣就已經創建成功了,然後可以使用如下命令查看主題:

bin/kafka-topics.sh –list –zookeeper 127.0.0.1:2181

  Kafka 有一個命令行服務端,它將從文件或標準輸入中獲取輸入,並將其作為消息發送到 Kafka 集群。默認情況下,每行將作為單獨的消息發送:

bin/kafka-console-producer.sh –broker-list 127.0.0.1:9092 –topic mytopic

   

  同樣的,Kafka 還有一個命令行客戶端,可以從 Kafka 集群中獲取消息:

bin/kafka-console-consumer.sh –bootstrap-server 127.0.0.1:9092 –topic mytopic –from-beginning

  

 

三、kafka-python 使用

1.安裝 kafka-python

pip3 install kafka-python

2.創建 Consumer

  Consumer 消費者負責從 Kafka 中獲取消息進行處理,需要實例化 KafkaConsumer 這個類。

1 from kafka import KafkaConsumer
2 
3 
4 consumer = KafkaConsumer("test", bootstrap_servers=["localhost:9092"])
5 for msg in consumer:
6     print(msg)

3.創建 Producer

  Producer 生產者負責向 Kafka 生產和發送消息,需要實例化 KafkaProducer 這個類。

1 from kafka import KafkaProducer
2 
3 
4 producer = KafkaProducer(bootstrap_servers="localhost:9092")
5 for i in range(10):
6     producer.send("test", "Hello {}".format(i).encode("utf-8"))
7 producer.close()

4.運行測試

  先運行消費者程式,再運行生產者程式,消費者一直在監聽,等到生產者發送消息,消費者就把消息取出,運行結果如下:

   可以看到其中每個消息都包含了主題、分區、消息內容、時間戳等資訊。

Tags: