CKafka系列学习文章 – 搭建单机模式zookeeper+kafka(十四)
- 2019 年 10 月 4 日
- 筆記
导语:搭建单机模式的zookeeper+kafka,用来做开发测试环境,管理主题、分区、生产消费及主题数据的导入导出。
一、安装JDK1.8
下载地址:https://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html
mv jdk-8u221-linux-x64.tar.gz /usr/local/jdk/

二、单机模式的zookeeper
下载地址:http://archive.apache.org/dist/zookeeper/zookeeper-3.4.6/zookeeper-3.4.6.tar.gz
解压:

Zookeeper配置文件:
[root@VM_1_250_centos zookeeper]# cat conf/zoo.cfg # The number of milliseconds of each tick tickTime=2000 # The number of ticks that the initial # synchronization phase can take initLimit=10 # The number of ticks that can pass between # sending a request and getting an acknowledgement syncLimit=5 # the directory where the snapshot is stored. # do not use /tmp for storage, /tmp here is just # example sakes. dataDir=/opt/zookeeperData/zookeeper dataLogDir=/opt/zookeeperData/logs # the port at which the clients will connect clientPort=2181 # the maximum number of client connections. # increase this if you need to handle more clients maxClientCnxns=60 # # Be sure to read the maintenance section of the # administrator guide before turning on autopurge. # # http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance # # The number of snapshots to retain in dataDir autopurge.snapRetainCount=3 # Purge task interval in hours # Set to "0" to disable auto purge feature autopurge.purgeInterval=1 minSessionTimeout=4000 maxSessionTimeout=10000 server.1=10.1.1.250:2888:3888
###加入环境变量
[root@VM_1_250_centos zookeeper]# cat ~/.bash_profile # .bash_profile # Get the aliases and functions if [ -f ~/.bashrc ]; then . ~/.bashrc fi # User specific environment and startup programs export ZK_HOME=/opt/zookeeper export PATH=$PATH:$ZK_HOME/bin PATH=$PATH:$HOME/bin export PATH
加载环境变量:source ~/.bash_profile
###启动zookeeper


三、单机模式的kafka
下载地址:https://archive.apache.org/dist/kafka/0.10.2.0/kafka_2.10-0.10.2.0.tgz
解压:

###加入环境变量
[root@VM_1_250_centos zookeeper]# cat ~/.bash_profile # .bash_profile # Get the aliases and functions if [ -f ~/.bashrc ]; then . ~/.bashrc fi # User specific environment and startup programs export ZK_HOME=/opt/zookeeper export KAFKA_HOME=/opt/kafka export PATH=$PATH:$KAFKA_HOME/bin:$ZK_HOME/bin PATH=$PATH:$HOME/bin export PATH
加载环境变量:source ~/.bash_profile
启动kafka:
[root@VM_1_250_centos jdk]# kafka-server-start.sh /opt/kafka/config/server.properties &


四、管理主题:
1、创建主题
kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 6 --topic ip_login


查看在zookeeper中的元数据:
[root@VM_1_250_centos kafka-logs]# zkCli.sh -server localhost:2181
Connecting to localhost:2181

2、查看主题
查看ip_login主题的详细信息
kafka-topics.sh --describe --zookeeper localhost:2181 --topic ip_login

查看所有的主题:
kafka-topics.sh --describe --zookeeper localhost:2181

查看所有主题名:
kafka-topics.sh --list --zookeeper localhost:2181

查看正在同步的主题:
kafka-topics.sh --describe --zookeeper localhost:2181 --under-replicated-partitions
查看主题中不可用的分区:
kafka-topics.sh --describe --zookeeper localhost:2181 --unavailable-partitions
查看主题重写的配置:
kafka-topics.sh --describe --zookeeper localhost:2181 --topics-with-overrides

3、修改主题
kafka-topics.sh --alter --zookeeper localhost:2181 --topic user_order1 --config max.message.bytes=204800


4、删除主题
kafka-topics.sh --zookeeper localhost:2181 --delete --topic user_order1

五、管理分区和副本
1,修改分区
kafka-topics.sh --partitions 8 --alter --zookeeper localhost:2181 --topic ip_login

2、修改副本数(单机模式只能有1个副本)
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partition 3 --topic user_order3
cat user_order3_replicas.json { “partitions”: [ { “topic”:”user_order3”, “partition”:0, “replicas”:[2,0,1] }, { “topic”:”user_order3”, “partition”:1, “replicas”:[0,1,2] }, { “topic”:”user_order3”, “partition:2, “replicas”:[1,2,0] }, { “topic”:”user_order3”, “partition”:3, “replicas”:[2,1,0] }, { “topic”:”user_order3”, “partition”:4, “replicas”:[0,2,1] }, { “topic”:”user_order3”, “partition”:5, “replicas”:[1,0,2] }, ], “version”:1 }
###执行副本修改操作
kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file user_order3_replicas.json –execute
###执行验证操作
kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file user_order3_replicas.json –verify
六、生产消息
kafka-console-producer.sh --broker-list localhost:9092 --topic ip_login

七、消费消息
###用新接口启动消费者程序
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic ip_login

###指定消费者组
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic ip_login --consumer-property group.id=console-consumer-54466

###查看消费者组
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list --new-consumer

###用旧接口启动消费者程序(和新的接口不同:新的代理IP和端口,旧的用的是zookeeper的IP和端口)
kafka-console-consumer.sh --zookeeper localhost:9092 --topic ip_login --consumer-property group.id=console-consumer-54466 --from-beginning --delete-consumer-offsets

上述消费者命令中,各个参数所代表含义如下:
–zookeeper:Zookeeper连接地址,用来获取Kafka元数据信息;
–topic: Kafka集群中的主题名
–consumer-property:配置消费者级别参数,比如自定义设置消费者组名
–from-beginning: 从消息记录最开始的位置开始“消费”
–delete-consumer-offsets: 删除Zookeeper中已消费的偏移量
八、将数据导入到Kafka主题中
###导入导出都要用客户端
[root@VM_1_250_centos kafka]# cat config/connect-file-source.properties # Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. name=local-file-source connector.class=FileStreamSource tasks.max=1 file=/tmp/test.txt topic=ip_login
###创建导入文件

##启动一个单机模式的连接器
./connect-standalone.sh ../config/connect-standalone.properties ../config/connect-file-source.properties


九、将Kafka主题中的数据导出到文件

./connect-standalone.sh ../config/connect-standalone.properties ../config/connect-file-sink.properties

