我的 Kafka 旅程 – Linux下的安装 & 基础命令

准备工作

安装解压缩工具 tar

# 检查是否安装了解压缩工具 tar
yum list tar
# 如未安装 tar
yum install tar -y

安装必备的 java

# 检查是否安装了 java-openjdk,这里选择 java-1.8.0.openjdk 版
yum list java-1.8.0.openjdk
# 如未安装 java-openjdk
yum install java-1.8.0.openjdk -y

安装 kafka

官网下载地址://kafka.apache.org/downloads,这里下载 kafka_2.13-3.2.3.tgz 版。

# 下载 kafka 安装包文件
curl -O //downloads.apache.org/kafka/3.2.3/kafka_2.13-3.2.3.tgz
# 解压安装包
tar -xzvf kafka_2.13-3.2.3.tgz -C /usr/local

运行

在 2.8 之前,kafka 依赖与 zookeeper;2.8 之后,zookeeper 的替代品 karft。
以下命令均在解压包(kafka)的根目录进行。

基于 zookeeper 方式

# 启动
#
# 切换到解压后的 kafka 根目录
cd /usr/local/kafka_2.13-3.2.3
# 启动 zookeeper
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
# 启动 kafka
bin/kafka-server-start.sh -daemon config/server.properties
#
# 验证启动;zookeeper 默认端口2181,kafka 默认端口为9092
ss -plnts			# 端口列是否列出了 2181、9092,表示启动正常
#
# 停止 kafka
bin/kafka-server-stop.sh
# 停止 zookeeper
bin/zookeeper-server-stop.sh

基于 kraft 方式

# 启动
#
# 切换到解压后的 kafka 根目录
cd /usr/local/kafka_2.13-3.2.3
# 准备 uuid
bin/kafka-storage.sh random-uuid
# 配置 uuid
bin/kafka-storage.sh format -t {uuid} -c config/kraft/server.properties
# 通过 kraft 配置 后台启动 kafka
bin/kafka-server-start.sh -daemon config/kraft/server.properties
#
# 验证启动;kafka 默认端口为9092
ss -plnts			# 端口列是否列出了 9092,表示启动正常
#
# 停止 kafka
bin/kafka-server-stop.sh

基本命令

开放端口:确保每个服务器所应用到的端口(2181 和 9092等)开放(参考firewall)。

Topic

# 查看已有的 Topic
bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
# 创建一个新的 Topic,分区数1个,副本数3份
bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic {name} --create \
--partitions 1 --replication-factor 1
# 查看一个 Topic 详细
bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic {name} --describe
# 修改一个 Topic 的属性:分区变更为2个(分区只能加,不能减少,对于消费者分不清数据所属)
bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic {name} --alter --partitions 2

Consumer

# 消费者订阅测试;新打开一个终端,连接到服务器,消费者订阅一个topic,接收消息测试
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic {name} --from-beginning

Producer

# 生产者发送消息;新开一个终端窗体,连接到服务器,用来生产者发送消息(回车后输入要发送的消息内容)
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic {name}

切换到 消费者终端窗口,查看接收到订阅的 topic 消息。

集群部署

这里用三台机组成集群模式

  • 把解压后的 /usr/local/kafka_2.13-3.2.3 文件夹分发到集群中的每个服务器
  • 配置文件 config/server.properties | config/kraft/server.properties 的配置
  • 启动每台服务器的 kafka

安装包分发到每台服务器

# 把解压后的 kafka 根目录文件夹分发到集群中的每台服务器
scp -r /usr/local/kafka_2.13-3.2.3 root@{服务器IP} /usr/local

zookeeper 方式

以 config/server.properties 文件为主的配置及启动

zookeeper 使用的端口:2181;kafka 使用的端口:9092。

# 新终端窗口登录到每台服务器,配置每台服务器的 config/server.properties
#
#
# 切换到每台服务器的 kafka 根目录
cd /usr/local/kafka_2.13-3.2.3
#
# 编辑每台配置文件 config/server.properties
vi config/server.properties
# 以下两点必须的变更
# 	1、确保每台 config/server.properties 中的 broker.id 唯一(如:0,1,2)
#	2、确保每台 config/server.properties 中的 zookeeper.connect 都相同
#	示例:zookeeper.connect={serverA}:2181,{serverB}:2181,{serverB}:2181
#
#
# 启动每台服务器的 zookeeper,kafka
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
bin/kafka-server-start.sh -daemon config/server.properties

kraft 方式

以 config/kraft/server.properties 文件为主的配置及启动

kraft 使用的端口:9093;kafka 使用的端口:9092。

# 新终端窗口登录到每台服务器,配置每台服务器的 config/kraft/server.properties
#
#
# 切换到每台服务器的 kafka 根目录
cd /usr/local/kafka_2.13-3.2.3
#
# 编辑每台配置文件 config/kraft/server.properties
vi config/kraft/server.properties
#以下三点必须的变更
#
# 	1、确保每台 config/kraft/server.properties 中的 node.id 唯一(如:1,2,3)
#	示例:node.id=1
#
#	2、确保每台 config/kraft/server.properties 中的 listeners 配置各自的主机名称或IP
#	示例:listeners=PLAINTEXT://{localhost}:9092,CONTROLLER://{localhost}:9093
#
#	3、确保每台  config/kraft/server.properties 中的 controller.quorum.voters 都相同
#	格式:controller.quorum.voters={node.id}@{host}:{port}
#	示例:controller.quorum.voters=1@{hostnameA}:9093,2@{hostnameB}:9093,3@{hostnameC}:9093
#
#
# 启动 kafka
# 先为集群生成一个uuid
bin/kafka-storage.sh random-uuid
# 分别为每台服务器配置相同的uuid后启动
bin/kafka-storage.sh format -t {uuid} -c config/kraft/server.properties
bin/kafka-server-start.sh -daemon config/kraft/server.properties

 

Tags: