CKafka Series Study Notes – Setting Up Single-Node Zookeeper + Kafka (Part 14)

  • October 4, 2019
  • Notes

Introduction: set up a single-node zookeeper + kafka as a development and testing environment, covering topic management, partitions, producing and consuming messages, and importing/exporting topic data.

1. Install JDK 1.8

Download: https://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html

mv jdk-8u221-linux-x64.tar.gz /usr/local/jdk/

2. Single-node zookeeper

Download: http://archive.apache.org/dist/zookeeper/zookeeper-3.4.6/zookeeper-3.4.6.tar.gz

Extract (the destination matches the ZK_HOME set below):

tar xzf zookeeper-3.4.6.tar.gz -C /opt/
mv /opt/zookeeper-3.4.6 /opt/zookeeper

Zookeeper configuration file:

[root@VM_1_250_centos zookeeper]# cat conf/zoo.cfg
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/opt/zookeeperData/zookeeper
dataLogDir=/opt/zookeeperData/logs
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
maxClientCnxns=60
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
autopurge.purgeInterval=1
minSessionTimeout=4000
maxSessionTimeout=10000
server.1=10.1.1.250:2888:3888
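The timing settings above are expressed in ticks, so the effective timeouts derive from tickTime; a quick sketch of the arithmetic using the values in this zoo.cfg:

```python
# Effective timeouts implied by the zoo.cfg above (all in milliseconds).
tick_time = 2000   # tickTime: the base time unit
init_limit = 10    # initLimit: ticks a follower may take for initial sync
sync_limit = 5     # syncLimit: ticks allowed between a request and its ack

init_timeout_ms = tick_time * init_limit   # followers get 20 s to sync
sync_timeout_ms = tick_time * sync_limit   # 10 s request/ack window

print(init_timeout_ms, sync_timeout_ms)
```

Note that minSessionTimeout=4000 and maxSessionTimeout=10000 in this config override Zookeeper's defaults of 2×tickTime and 20×tickTime respectively.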

### Add environment variables

[root@VM_1_250_centos zookeeper]# cat ~/.bash_profile
# .bash_profile

# Get the aliases and functions
if [ -f ~/.bashrc ]; then
	. ~/.bashrc
fi

# User specific environment and startup programs
export ZK_HOME=/opt/zookeeper
export PATH=$PATH:$ZK_HOME/bin

PATH=$PATH:$HOME/bin
export PATH

Load the environment variables: source ~/.bash_profile

### Start zookeeper

zkServer.sh start

3. Single-node kafka

Download: https://archive.apache.org/dist/kafka/0.10.2.0/kafka_2.10-0.10.2.0.tgz

Extract (the destination matches the KAFKA_HOME set below):

tar xzf kafka_2.10-0.10.2.0.tgz -C /opt/
mv /opt/kafka_2.10-0.10.2.0 /opt/kafka

### Add environment variables

[root@VM_1_250_centos zookeeper]# cat ~/.bash_profile
# .bash_profile

# Get the aliases and functions
if [ -f ~/.bashrc ]; then
	. ~/.bashrc
fi

# User specific environment and startup programs
export ZK_HOME=/opt/zookeeper
export KAFKA_HOME=/opt/kafka
export PATH=$PATH:$KAFKA_HOME/bin:$ZK_HOME/bin

PATH=$PATH:$HOME/bin
export PATH

Load the environment variables: source ~/.bash_profile

Start kafka:

[root@VM_1_250_centos jdk]# kafka-server-start.sh /opt/kafka/config/server.properties &
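For the broker to come up against the local zookeeper, config/server.properties needs a few key settings; a minimal sketch for this single-node setup (the listener address and log directory are illustrative assumptions, not values from the original article):

```properties
# Minimal single-node broker settings (illustrative values)
broker.id=0
listeners=PLAINTEXT://localhost:9092
log.dirs=/tmp/kafka-logs
zookeeper.connect=localhost:2181
```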

4. Managing topics

1. Create a topic

kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 6 --topic ip_login

View the topic's metadata in zookeeper:

[root@VM_1_250_centos kafka-logs]# zkCli.sh -server localhost:2181

Connecting to localhost:2181

2. View topics

View details of the ip_login topic:

kafka-topics.sh --describe --zookeeper localhost:2181 --topic ip_login

Describe all topics:

kafka-topics.sh --describe --zookeeper localhost:2181

List all topic names:

kafka-topics.sh --list --zookeeper localhost:2181

View partitions that are still syncing (under-replicated):

kafka-topics.sh --describe --zookeeper localhost:2181 --under-replicated-partitions

View unavailable partitions:

kafka-topics.sh --describe --zookeeper localhost:2181 --unavailable-partitions

View topics with configuration overrides:

kafka-topics.sh --describe --zookeeper localhost:2181 --topics-with-overrides

3. Modify a topic

kafka-topics.sh --alter --zookeeper localhost:2181 --topic user_order1 --config max.message.bytes=204800

4. Delete a topic (deletion only takes effect if the broker runs with delete.topic.enable=true; otherwise the topic is merely marked for deletion)

kafka-topics.sh --zookeeper localhost:2181 --delete --topic user_order1

5. Managing partitions and replicas

1. Modify partitions (the partition count can only be increased, never decreased)

kafka-topics.sh --partitions 8 --alter --zookeeper localhost:2181 --topic ip_login
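Increasing the partition count also changes which partition a given key hashes to, which matters if consumers rely on per-key ordering. A small illustration of the remapping, assuming a simple hash-mod scheme (Kafka's real default partitioner uses murmur2, so the exact partition numbers differ; the keys here are made up for the example):

```python
# Illustrative only: shows why key-to-partition mappings can shift when
# the partition count grows from 6 to 8. zlib.crc32 is used purely as a
# stable, deterministic hash for the demo.
import zlib

def pick_partition(key: bytes, num_partitions: int) -> int:
    # Hash the key and map it onto one of the partitions.
    return zlib.crc32(key) % num_partitions

key = b"user-42"                     # hypothetical message key
before = pick_partition(key, 6)      # with the original 6 partitions
after = pick_partition(key, 8)       # after altering the topic to 8
print(before, after)
```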

2. Modify replica assignments (single-node mode can only have 1 replica; the three-broker replica lists below illustrate how this looks on a cluster)

./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 6 --topic user_order3

cat user_order3_replicas.json
{
  "version": 1,
  "partitions": [
    {"topic": "user_order3", "partition": 0, "replicas": [2,0,1]},
    {"topic": "user_order3", "partition": 1, "replicas": [0,1,2]},
    {"topic": "user_order3", "partition": 2, "replicas": [1,2,0]},
    {"topic": "user_order3", "partition": 3, "replicas": [2,1,0]},
    {"topic": "user_order3", "partition": 4, "replicas": [0,2,1]},
    {"topic": "user_order3", "partition": 5, "replicas": [1,0,2]}
  ]
}
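Round-robin replica lists like the ones above can be generated instead of hand-written; a sketch using a simple rotation scheme (it produces a valid plan of the same shape for kafka-reassign-partitions.sh, though not byte-identical to the hand-written lists):

```python
import json

def build_reassignment(topic, num_partitions, brokers, replication_factor):
    """Build a reassignment plan in the kafka-reassign-partitions.sh JSON
    format: each partition's replica list starts one broker further along,
    wrapping around the broker list (simple round-robin)."""
    partitions = []
    n = len(brokers)
    for p in range(num_partitions):
        replicas = [brokers[(p + i) % n] for i in range(replication_factor)]
        partitions.append({"topic": topic, "partition": p, "replicas": replicas})
    return {"version": 1, "partitions": partitions}

# 6 partitions spread over brokers 0, 1, 2 with 3 replicas each,
# matching the shape of user_order3_replicas.json above.
plan = build_reassignment("user_order3", 6, [0, 1, 2], 3)
print(json.dumps(plan, indent=2))
```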

### Execute the reassignment

kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file user_order3_replicas.json --execute

### Verify the reassignment

kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file user_order3_replicas.json --verify

6. Produce messages

kafka-console-producer.sh --broker-list localhost:9092 --topic ip_login

7. Consume messages

### Start a consumer with the new API

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic ip_login

### Specify a consumer group

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic ip_login --consumer-property group.id=console-consumer-54466

### List consumer groups

kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list --new-consumer

### Start a consumer with the old API (unlike the new API, which connects to a broker's IP and port, the old one uses zookeeper's IP and port)

kafka-console-consumer.sh --zookeeper localhost:2181 --topic ip_login --consumer-property group.id=console-consumer-54466 --from-beginning --delete-consumer-offsets

The parameters in the consumer command above mean the following:

--zookeeper: the Zookeeper connection address, used to fetch Kafka metadata;

--topic: the topic name in the Kafka cluster;

--consumer-property: sets consumer-level parameters, e.g. a custom consumer group name;

--from-beginning: consume from the earliest offset in the log;

--delete-consumer-offsets: delete the consumed offsets stored in Zookeeper.

8. Importing data into a Kafka topic

### Both import and export use the Connect client

[root@VM_1_250_centos kafka]# cat config/connect-file-source.properties
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=/tmp/test.txt
topic=ip_login
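Conceptually, FileStreamSource just reads the configured file line by line and turns each line into a record destined for the topic; a rough plain-Python sketch of that behavior (not the actual Connect API):

```python
# Rough sketch of FileStreamSource behavior: read a file line by line and
# emit each line as a (topic, value) record. Not the real Connect API.
import os
import tempfile

def file_source_records(path, topic):
    # One record per line, newline stripped, tagged with the target topic.
    records = []
    with open(path) as f:
        for line in f:
            records.append((topic, line.rstrip("\n")))
    return records

# Demo with a throwaway file standing in for /tmp/test.txt.
with tempfile.NamedTemporaryFile("w", delete=False, suffix=".txt") as tmp:
    tmp.write("hello\nkafka\n")
    path = tmp.name

records = file_source_records(path, "ip_login")
os.unlink(path)
print(records)
```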

### Create the file to import (the path must match the file= setting above)

echo "test" > /tmp/test.txt

### Start a standalone connector

./connect-standalone.sh ../config/connect-standalone.properties ../config/connect-file-source.properties

Import successful!

9. Exporting data from a Kafka topic to a file

./connect-standalone.sh ../config/connect-standalone.properties ../config/connect-file-sink.properties
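The sink side needs a matching config/connect-file-sink.properties following the same shape as the source config, along these lines (the file path and topic are assumptions adapted to this article, not the shipped defaults):

```properties
# Sink counterpart of the source config above (illustrative values)
name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=/tmp/test.sink.txt
topics=ip_login
```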

Export successful!