Kafka 安装与启动
- 2019 年 11 月 26 日
- 筆記
1. 下载代码
下载 2.3.0 版本并解压缩:
tar -zxvf kafka_2.12-2.3.0.tgz -C .
创建软连接便于升级:
ln -s kafka_2.12-2.3.0/ kafka
配置环境变量:
export KAFKA_HOME=/Users/smartsi/opt/kafka export PATH=${KAFKA_HOME}/bin:$PATH
2. 安装ZooKeeper
Kafka 依赖 ZooKeeper,如果你还没有 ZooKeeper 服务器,你需要先启动一个 ZooKeeper 服务器。可以先参考ZooKeeper 安装与启动来安装 ZooKeeper。ZooKeeper 配置如下:
tickTime=2000 initLimit=10 syncLimit=5 dataDir=/Users/smartsi/opt/zookeeper/data clientPort=2181 server.1=localhost:2888:3888
你也可以通过与 kafka 打包在一起的便捷脚本来快速简单地创建一个单节点 ZooKeeper 实例:
> bin/zookeeper-server-start.sh config/zookeeper.properties [2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig) ...
3. 配置Kafka
第一个 broker 配置 server-9092.properties
如下:
broker.id=0 listeners=PLAINTEXT://127.0.0.1:9092 log.dirs=/Users/smartsi/opt/kafka/logs/log1-9092 zookeeper.connect=localhost:2181/kafka-2.3.0 zookeeper.connection.timeout.ms=6000
运行起来至少要配置四项。上面的前四项。
第二个 broker 配置 server-9093.properties
如下:
broker.id=1 listeners=PLAINTEXT://127.0.0.1:9093 log.dirs=/Users/smartsi/opt/kafka/logs/log-9093 zookeeper.connect=localhost:2181/kafka-2.3.0 zookeeper.connection.timeout.ms=6000
第三个 broker 配置 server-9094.properties
如下:
broker.id=2 listeners=PLAINTEXT://127.0.0.1:9094 log.dirs=/Users/smartsi/opt/kafka/logs/log-9094 zookeeper.connect=localhost:2181/kafka-2.3.0 zookeeper.connection.timeout.ms=6000
我们必须重写端口和日志目录,因为我们在同一台机器上运行这些,我们不希望所有都在同一个端口注册,或者覆盖彼此的数据。所以用端口号9092、9093、9094分别代表三个 broker。
下面具体解释一下我们的配置项:
(1) Broker相关:
broker.id=0
broker 的 Id。每一个 broker 在集群中的唯一标示,要求是正数。每个 broker 都不相同。
(2) Socket服务设置:
listeners=PLAINTEXT://127.0.0.1:9092
Socket服务器监听的地址,如果没有设置,则监听 java.net.InetAddress.getCanonicalHostName()
返回的地址。
(3) ZooKeeper相关:
zookeeper.connect=localhost:2181/kafka-2.3.0 zookeeper.connection.timeout.ms=6000
zookeeper.connect
是一个逗号分隔的 host:port
键值对,每个对应一个 zk 服务器。例如 127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002
。你还可以将可选的客户端命名空间 Chroot 字符串追加到 URL 上以指定所有 kafka 的 Znode 的根目录。另外这个 kafka-2.3.0
这个节点需要你提前建立。让 Kafka 把他需要的数据结构都建立在这个节点下,否则会建立在根节点 /
节点下。
(3) 日志相关:
log.dirs=/Users/smartsi/opt/kafka/logs/log-9092
Kafka存储Log的目录。
4. 启动Kafka服务器
有两种方式可以启动 Kafka 服务器:
# 第一种方式(推荐) bin/kafka-server-start.sh -daemon config/server.properties # 第二种方式 nohup bin/kafka-server-start.sh config/server.properties &
我们以第一种方式启动 Kafka 服务器:
bin/kafka-server-start.sh -daemon config/server-9092.properties bin/kafka-server-start.sh -daemon config/server-9093.properties bin/kafka-server-start.sh -daemon config/server-9094.properties
查看进程和端口:
smartsi:kafka smartsi$ jps 8914 DataNode 42802 Jps 9252 NodeManager 41253 Kafka 41541 Kafka 42790 Kafka 41670 ZooKeeperMain 16731 9164 ResourceManager 1997
我们现在看一下 Kafka 在 ZooKeeper 上创建的节点:
[zk: 127.0.0.1:2181(CONNECTED) 23] ls /kafka-2.3.0 [cluster, controller_epoch, controller, brokers, admin, isr_change_notification, consumers, log_dir_event_notification, latest_producer_id_block, config]
看一下我们在ZooKeeper上注册的两个 broker:
[zk: 127.0.0.1:2181(CONNECTED) 3] ls /kafka-2.3.0/brokers/ids [0, 1, 2] [zk: 127.0.0.1:2181(CONNECTED) 4] get /kafka-2.3.0/brokers/ids/0 {"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://127.0.0.1:9092"],"jmx_port":-1,"host":"127.0.0.1","timestamp":"1567390121522","port":9092,"version":4} cZxid = 0x92 ctime = Mon Sep 02 10:08:41 CST 2019 mZxid = 0x92 mtime = Mon Sep 02 10:08:41 CST 2019 pZxid = 0x92 cversion = 0 dataVersion = 1 aclVersion = 0 ephemeralOwner = 0x100009088560012 dataLength = 188 numChildren = 0 [zk: 127.0.0.1:2181(CONNECTED) 5] get /kafka-2.3.0/brokers/ids/1 {"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://127.0.0.1:9093"],"jmx_port":-1,"host":"127.0.0.1","timestamp":"1567390128813","port":9093,"version":4} cZxid = 0xa7 ctime = Mon Sep 02 10:08:48 CST 2019 mZxid = 0xa7 mtime = Mon Sep 02 10:08:48 CST 2019 pZxid = 0xa7 cversion = 0 dataVersion = 1 aclVersion = 0 ephemeralOwner = 0x100009088560014 dataLength = 188 numChildren = 0 [zk: 127.0.0.1:2181(CONNECTED) 6] get /kafka-2.3.0/brokers/ids/2 {"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://127.0.0.1:9094"],"jmx_port":-1,"host":"127.0.0.1","timestamp":"1567390749151","port":9094,"version":4} cZxid = 0xbd ctime = Mon Sep 02 10:19:09 CST 2019 mZxid = 0xbd mtime = Mon Sep 02 10:19:09 CST 2019 pZxid = 0xbd cversion = 0 dataVersion = 1 aclVersion = 0 ephemeralOwner = 0x100009088560018 dataLength = 188 numChildren = 0
5. 测试Kafka
5.1 创建Topic
让我们创建一个名为 test
的 Topic,它有一个分区和一个副本:
bin/kafka-topics.sh --create --zookeeper localhost:2181/kafka-2.3.0 --replication-factor 1 --partitions 1 --topic test
现在我们可以运行 list
命令来查看这个 Topic:
smartsi:kafka smartsi$ bin/kafka-topics.sh --list --zookeeper localhost:2181/kafka-2.3.0 test
或者,你也可将代理配置为:在发布的topic不存在时,自动创建topic,而不是手动创建。
5.2 启动生产者
Kafka 自带一个命令行客户端,它从文件或标准输入中获取输入,并将其作为消息发送到 Kafka 集群。默认情况下,每行将作为单独的消息发送。
运行 Producer (生产者),然后在控制台输入一些消息以发送到服务器:
smartsi:kafka smartsi$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test >this is my first message >this is my second message
5.3 启动消费者
Kafka 还有一个命令行 Consumer(消费者),将消息转储到标准输出:
smartsi:kafka smartsi$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning this is my first message this is my second message
如果你将上述命令在不同的终端中运行,那么现在就可以将消息输入到生产者终端中,并将它们在消费终端中显示出来。
原文:Quickstart