我的 Kafka 旅程 – Linux下的安裝 & 基礎命令
準備工作
安裝解壓縮工具 tar
# 檢查是否安裝了解壓縮工具 tar
yum list tar
# 如未安裝 tar
yum install tar -y
安裝必備的 java
# 檢查是否安裝了 java-openjdk,這裡選擇 java-1.8.0.openjdk 版
yum list java-1.8.0.openjdk
# 如未安裝 java-openjdk
yum install java-1.8.0.openjdk -y
安裝 kafka
官網下載地址://kafka.apache.org/downloads,這裡下載 kafka_2.13-3.2.3.tgz 版。
# 下載 kafka 安裝包文件
curl -O //downloads.apache.org/kafka/3.2.3/kafka_2.13-3.2.3.tgz
# 解壓安裝包
tar -xzvf kafka_2.13-3.2.3.tgz -C /usr/local
運行
在 2.8 之前,kafka 依賴與 zookeeper;2.8 之後,zookeeper 的替代品 karft。
以下命令均在解壓包(kafka)的根目錄進行。
基於 zookeeper 方式
# 啟動
#
# 切換到解壓後的 kafka 根目錄
cd /usr/local/kafka_2.13-3.2.3
# 啟動 zookeeper
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
# 啟動 kafka
bin/kafka-server-start.sh -daemon config/server.properties
#
# 驗證啟動;zookeeper 默認埠2181,kafka 默認埠為9092
ss -plnts # 埠列是否列出了 2181、9092,表示啟動正常
#
# 停止 kafka
bin/kafka-server-stop.sh
# 停止 zookeeper
bin/zookeeper-server-stop.sh
基於 kraft 方式
# 啟動
#
# 切換到解壓後的 kafka 根目錄
cd /usr/local/kafka_2.13-3.2.3
# 準備 uuid
bin/kafka-storage.sh random-uuid
# 配置 uuid
bin/kafka-storage.sh format -t {uuid} -c config/kraft/server.properties
# 通過 kraft 配置 後台啟動 kafka
bin/kafka-server-start.sh -daemon config/kraft/server.properties
#
# 驗證啟動;kafka 默認埠為9092
ss -plnts # 埠列是否列出了 9092,表示啟動正常
#
# 停止 kafka
bin/kafka-server-stop.sh
基本命令
開放埠:確保每個伺服器所應用到的埠(2181 和 9092等)開放(參考firewall)。
Topic
# 查看已有的 Topic
bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
# 創建一個新的 Topic,分區數1個,副本數3份
bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic {name} --create \
--partitions 1 --replication-factor 1
# 查看一個 Topic 詳細
bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic {name} --describe
# 修改一個 Topic 的屬性:分區變更為2個(分區只能加,不能減少,對於消費者分不清數據所屬)
bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic {name} --alter --partitions 2
Consumer
# 消費者訂閱測試;新打開一個終端,連接到伺服器,消費者訂閱一個topic,接收消息測試
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic {name} --from-beginning
Producer
# 生產者發送消息;新開一個終端窗體,連接到伺服器,用來生產者發送消息(回車後輸入要發送的消息內容)
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic {name}
切換到 消費者終端窗口,查看接收到訂閱的 topic 消息。
集群部署
這裡用三台機組成集群模式
- 把解壓後的 /usr/local/kafka_2.13-3.2.3 文件夾分發到集群中的每個伺服器
- 配置文件 config/server.properties | config/kraft/server.properties 的配置
- 啟動每台伺服器的 kafka
安裝包分發到每台伺服器
# 把解壓後的 kafka 根目錄文件夾分發到集群中的每台伺服器
scp -r /usr/local/kafka_2.13-3.2.3 root@{伺服器IP} /usr/local
zookeeper 方式
以 config/server.properties 文件為主的配置及啟動
zookeeper 使用的埠:2181;kafka 使用的埠:9092。
# 新終端窗口登錄到每台伺服器,配置每台伺服器的 config/server.properties
#
#
# 切換到每台伺服器的 kafka 根目錄
cd /usr/local/kafka_2.13-3.2.3
#
# 編輯每台配置文件 config/server.properties
vi config/server.properties
# 以下兩點必須的變更
# 1、確保每台 config/server.properties 中的 broker.id 唯一(如:0,1,2)
# 2、確保每台 config/server.properties 中的 zookeeper.connect 都相同
# 示例:zookeeper.connect={serverA}:2181,{serverB}:2181,{serverB}:2181
#
#
# 啟動每台伺服器的 zookeeper,kafka
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
bin/kafka-server-start.sh -daemon config/server.properties
kraft 方式
以 config/kraft/server.properties 文件為主的配置及啟動
kraft 使用的埠:9093;kafka 使用的埠:9092。
# 新終端窗口登錄到每台伺服器,配置每台伺服器的 config/kraft/server.properties
#
#
# 切換到每台伺服器的 kafka 根目錄
cd /usr/local/kafka_2.13-3.2.3
#
# 編輯每台配置文件 config/kraft/server.properties
vi config/kraft/server.properties
#以下三點必須的變更
#
# 1、確保每台 config/kraft/server.properties 中的 node.id 唯一(如:1,2,3)
# 示例:node.id=1
#
# 2、確保每台 config/kraft/server.properties 中的 listeners 配置各自的主機名稱或IP
# 示例:listeners=PLAINTEXT://{localhost}:9092,CONTROLLER://{localhost}:9093
#
# 3、確保每台 config/kraft/server.properties 中的 controller.quorum.voters 都相同
# 格式:controller.quorum.voters={node.id}@{host}:{port}
# 示例:controller.quorum.voters=1@{hostnameA}:9093,2@{hostnameB}:9093,3@{hostnameC}:9093
#
#
# 啟動 kafka
# 先為集群生成一個uuid
bin/kafka-storage.sh random-uuid
# 分別為每台伺服器配置相同的uuid後啟動
bin/kafka-storage.sh format -t {uuid} -c config/kraft/server.properties
bin/kafka-server-start.sh -daemon config/kraft/server.properties