我的 Kafka 旅程 – Linux下的安装 & 基础命令
准备工作
安装解压缩工具 tar
# 检查是否安装了解压缩工具 tar
yum list tar
# 如未安装 tar
yum install tar -y
安装必备的 java
# 检查是否安装了 java-openjdk,这里选择 java-1.8.0.openjdk 版
yum list java-1.8.0.openjdk
# 如未安装 java-openjdk
yum install java-1.8.0.openjdk -y
安装 kafka
官网下载地址://kafka.apache.org/downloads,这里下载 kafka_2.13-3.2.3.tgz 版。
# 下载 kafka 安装包文件
curl -O //downloads.apache.org/kafka/3.2.3/kafka_2.13-3.2.3.tgz
# 解压安装包
tar -xzvf kafka_2.13-3.2.3.tgz -C /usr/local
运行
在 2.8 之前,kafka 依赖与 zookeeper;2.8 之后,zookeeper 的替代品 karft。
以下命令均在解压包(kafka)的根目录进行。
基于 zookeeper 方式
# 启动
#
# 切换到解压后的 kafka 根目录
cd /usr/local/kafka_2.13-3.2.3
# 启动 zookeeper
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
# 启动 kafka
bin/kafka-server-start.sh -daemon config/server.properties
#
# 验证启动;zookeeper 默认端口2181,kafka 默认端口为9092
ss -plnts # 端口列是否列出了 2181、9092,表示启动正常
#
# 停止 kafka
bin/kafka-server-stop.sh
# 停止 zookeeper
bin/zookeeper-server-stop.sh
基于 kraft 方式
# 启动
#
# 切换到解压后的 kafka 根目录
cd /usr/local/kafka_2.13-3.2.3
# 准备 uuid
bin/kafka-storage.sh random-uuid
# 配置 uuid
bin/kafka-storage.sh format -t {uuid} -c config/kraft/server.properties
# 通过 kraft 配置 后台启动 kafka
bin/kafka-server-start.sh -daemon config/kraft/server.properties
#
# 验证启动;kafka 默认端口为9092
ss -plnts # 端口列是否列出了 9092,表示启动正常
#
# 停止 kafka
bin/kafka-server-stop.sh
基本命令
开放端口:确保每个服务器所应用到的端口(2181 和 9092等)开放(参考firewall)。
Topic
# 查看已有的 Topic
bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
# 创建一个新的 Topic,分区数1个,副本数3份
bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic {name} --create \
--partitions 1 --replication-factor 1
# 查看一个 Topic 详细
bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic {name} --describe
# 修改一个 Topic 的属性:分区变更为2个(分区只能加,不能减少,对于消费者分不清数据所属)
bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic {name} --alter --partitions 2
Consumer
# 消费者订阅测试;新打开一个终端,连接到服务器,消费者订阅一个topic,接收消息测试
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic {name} --from-beginning
Producer
# 生产者发送消息;新开一个终端窗体,连接到服务器,用来生产者发送消息(回车后输入要发送的消息内容)
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic {name}
切换到 消费者终端窗口,查看接收到订阅的 topic 消息。
集群部署
这里用三台机组成集群模式
- 把解压后的 /usr/local/kafka_2.13-3.2.3 文件夹分发到集群中的每个服务器
- 配置文件 config/server.properties | config/kraft/server.properties 的配置
- 启动每台服务器的 kafka
安装包分发到每台服务器
# 把解压后的 kafka 根目录文件夹分发到集群中的每台服务器
scp -r /usr/local/kafka_2.13-3.2.3 root@{服务器IP} /usr/local
zookeeper 方式
以 config/server.properties 文件为主的配置及启动
zookeeper 使用的端口:2181;kafka 使用的端口:9092。
# 新终端窗口登录到每台服务器,配置每台服务器的 config/server.properties
#
#
# 切换到每台服务器的 kafka 根目录
cd /usr/local/kafka_2.13-3.2.3
#
# 编辑每台配置文件 config/server.properties
vi config/server.properties
# 以下两点必须的变更
# 1、确保每台 config/server.properties 中的 broker.id 唯一(如:0,1,2)
# 2、确保每台 config/server.properties 中的 zookeeper.connect 都相同
# 示例:zookeeper.connect={serverA}:2181,{serverB}:2181,{serverB}:2181
#
#
# 启动每台服务器的 zookeeper,kafka
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
bin/kafka-server-start.sh -daemon config/server.properties
kraft 方式
以 config/kraft/server.properties 文件为主的配置及启动
kraft 使用的端口:9093;kafka 使用的端口:9092。
# 新终端窗口登录到每台服务器,配置每台服务器的 config/kraft/server.properties
#
#
# 切换到每台服务器的 kafka 根目录
cd /usr/local/kafka_2.13-3.2.3
#
# 编辑每台配置文件 config/kraft/server.properties
vi config/kraft/server.properties
#以下三点必须的变更
#
# 1、确保每台 config/kraft/server.properties 中的 node.id 唯一(如:1,2,3)
# 示例:node.id=1
#
# 2、确保每台 config/kraft/server.properties 中的 listeners 配置各自的主机名称或IP
# 示例:listeners=PLAINTEXT://{localhost}:9092,CONTROLLER://{localhost}:9093
#
# 3、确保每台 config/kraft/server.properties 中的 controller.quorum.voters 都相同
# 格式:controller.quorum.voters={node.id}@{host}:{port}
# 示例:controller.quorum.voters=1@{hostnameA}:9093,2@{hostnameB}:9093,3@{hostnameC}:9093
#
#
# 启动 kafka
# 先为集群生成一个uuid
bin/kafka-storage.sh random-uuid
# 分别为每台服务器配置相同的uuid后启动
bin/kafka-storage.sh format -t {uuid} -c config/kraft/server.properties
bin/kafka-server-start.sh -daemon config/kraft/server.properties