我的 Kafka 旅程 – Linux下的安裝 & 基礎命令

準備工作

安裝解壓縮工具 tar

# 檢查是否安裝了解壓縮工具 tar
yum list tar
# 如未安裝 tar
yum install tar -y

安裝必備的 java

# 檢查是否安裝了 java-openjdk,這裡選擇 java-1.8.0.openjdk 版
yum list java-1.8.0.openjdk
# 如未安裝 java-openjdk
yum install java-1.8.0.openjdk -y

安裝 kafka

官網下載地址://kafka.apache.org/downloads,這裡下載 kafka_2.13-3.2.3.tgz 版。

# 下載 kafka 安裝包文件
curl -O //downloads.apache.org/kafka/3.2.3/kafka_2.13-3.2.3.tgz
# 解壓安裝包
tar -xzvf kafka_2.13-3.2.3.tgz -C /usr/local

運行

在 2.8 之前,kafka 依賴與 zookeeper;2.8 之後,zookeeper 的替代品 karft。
以下命令均在解壓包(kafka)的根目錄進行。

基於 zookeeper 方式

# 啟動
#
# 切換到解壓後的 kafka 根目錄
cd /usr/local/kafka_2.13-3.2.3
# 啟動 zookeeper
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
# 啟動 kafka
bin/kafka-server-start.sh -daemon config/server.properties
#
# 驗證啟動;zookeeper 默認埠2181,kafka 默認埠為9092
ss -plnts			# 埠列是否列出了 2181、9092,表示啟動正常
#
# 停止 kafka
bin/kafka-server-stop.sh
# 停止 zookeeper
bin/zookeeper-server-stop.sh

基於 kraft 方式

# 啟動
#
# 切換到解壓後的 kafka 根目錄
cd /usr/local/kafka_2.13-3.2.3
# 準備 uuid
bin/kafka-storage.sh random-uuid
# 配置 uuid
bin/kafka-storage.sh format -t {uuid} -c config/kraft/server.properties
# 通過 kraft 配置 後台啟動 kafka
bin/kafka-server-start.sh -daemon config/kraft/server.properties
#
# 驗證啟動;kafka 默認埠為9092
ss -plnts			# 埠列是否列出了 9092,表示啟動正常
#
# 停止 kafka
bin/kafka-server-stop.sh

基本命令

開放埠:確保每個伺服器所應用到的埠(2181 和 9092等)開放(參考firewall)。

Topic

# 查看已有的 Topic
bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
# 創建一個新的 Topic,分區數1個,副本數3份
bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic {name} --create \
--partitions 1 --replication-factor 1
# 查看一個 Topic 詳細
bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic {name} --describe
# 修改一個 Topic 的屬性:分區變更為2個(分區只能加,不能減少,對於消費者分不清數據所屬)
bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic {name} --alter --partitions 2

Consumer

# 消費者訂閱測試;新打開一個終端,連接到伺服器,消費者訂閱一個topic,接收消息測試
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic {name} --from-beginning

Producer

# 生產者發送消息;新開一個終端窗體,連接到伺服器,用來生產者發送消息(回車後輸入要發送的消息內容)
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic {name}

切換到 消費者終端窗口,查看接收到訂閱的 topic 消息。

集群部署

這裡用三台機組成集群模式

  • 把解壓後的 /usr/local/kafka_2.13-3.2.3 文件夾分發到集群中的每個伺服器
  • 配置文件 config/server.properties | config/kraft/server.properties 的配置
  • 啟動每台伺服器的 kafka

安裝包分發到每台伺服器

# 把解壓後的 kafka 根目錄文件夾分發到集群中的每台伺服器
scp -r /usr/local/kafka_2.13-3.2.3 root@{伺服器IP} /usr/local

zookeeper 方式

以 config/server.properties 文件為主的配置及啟動

zookeeper 使用的埠:2181;kafka 使用的埠:9092。

# 新終端窗口登錄到每台伺服器,配置每台伺服器的 config/server.properties
#
#
# 切換到每台伺服器的 kafka 根目錄
cd /usr/local/kafka_2.13-3.2.3
#
# 編輯每台配置文件 config/server.properties
vi config/server.properties
# 以下兩點必須的變更
# 	1、確保每台 config/server.properties 中的 broker.id 唯一(如:0,1,2)
#	2、確保每台 config/server.properties 中的 zookeeper.connect 都相同
#	示例:zookeeper.connect={serverA}:2181,{serverB}:2181,{serverB}:2181
#
#
# 啟動每台伺服器的 zookeeper,kafka
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
bin/kafka-server-start.sh -daemon config/server.properties

kraft 方式

以 config/kraft/server.properties 文件為主的配置及啟動

kraft 使用的埠:9093;kafka 使用的埠:9092。

# 新終端窗口登錄到每台伺服器,配置每台伺服器的 config/kraft/server.properties
#
#
# 切換到每台伺服器的 kafka 根目錄
cd /usr/local/kafka_2.13-3.2.3
#
# 編輯每台配置文件 config/kraft/server.properties
vi config/kraft/server.properties
#以下三點必須的變更
#
# 	1、確保每台 config/kraft/server.properties 中的 node.id 唯一(如:1,2,3)
#	示例:node.id=1
#
#	2、確保每台 config/kraft/server.properties 中的 listeners 配置各自的主機名稱或IP
#	示例:listeners=PLAINTEXT://{localhost}:9092,CONTROLLER://{localhost}:9093
#
#	3、確保每台  config/kraft/server.properties 中的 controller.quorum.voters 都相同
#	格式:controller.quorum.voters={node.id}@{host}:{port}
#	示例:controller.quorum.voters=1@{hostnameA}:9093,2@{hostnameB}:9093,3@{hostnameC}:9093
#
#
# 啟動 kafka
# 先為集群生成一個uuid
bin/kafka-storage.sh random-uuid
# 分別為每台伺服器配置相同的uuid後啟動
bin/kafka-storage.sh format -t {uuid} -c config/kraft/server.properties
bin/kafka-server-start.sh -daemon config/kraft/server.properties

 

Tags: