Kafka 安裝與啟動
- 2019 年 11 月 26 日
- 筆記
1. 下載程式碼
下載 2.3.0 版本並解壓縮:
tar -zxvf kafka_2.12-2.3.0.tgz -C .
創建軟連接便於升級:
ln -s kafka_2.12-2.3.0/ kafka
配置環境變數:
export KAFKA_HOME=/Users/smartsi/opt/kafka export PATH=${KAFKA_HOME}/bin:$PATH
2. 安裝ZooKeeper
Kafka 依賴 ZooKeeper,如果你還沒有 ZooKeeper 伺服器,你需要先啟動一個 ZooKeeper 伺服器。可以先參考ZooKeeper 安裝與啟動來安裝 ZooKeeper。ZooKeeper 配置如下:
tickTime=2000 initLimit=10 syncLimit=5 dataDir=/Users/smartsi/opt/zookeeper/data clientPort=2181 server.1=localhost:2888:3888
你也可以通過與 kafka 打包在一起的便捷腳本來快速簡單地創建一個單節點 ZooKeeper 實例:
> bin/zookeeper-server-start.sh config/zookeeper.properties [2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig) ...
3. 配置Kafka
第一個 broker 配置 server-9092.properties
如下:
broker.id=0 listeners=PLAINTEXT://127.0.0.1:9092 log.dirs=/Users/smartsi/opt/kafka/logs/log1-9092 zookeeper.connect=localhost:2181/kafka-2.3.0 zookeeper.connection.timeout.ms=6000
運行起來至少要配置四項。上面的前四項。
第二個 broker 配置 server-9093.properties
如下:
broker.id=1 listeners=PLAINTEXT://127.0.0.1:9093 log.dirs=/Users/smartsi/opt/kafka/logs/log-9093 zookeeper.connect=localhost:2181/kafka-2.3.0 zookeeper.connection.timeout.ms=6000
第三個 broker 配置 server-9094.properties
如下:
broker.id=2 listeners=PLAINTEXT://127.0.0.1:9094 log.dirs=/Users/smartsi/opt/kafka/logs/log-9094 zookeeper.connect=localhost:2181/kafka-2.3.0 zookeeper.connection.timeout.ms=6000
我們必須重寫埠和日誌目錄,因為我們在同一台機器上運行這些,我們不希望所有都在同一個埠註冊,或者覆蓋彼此的數據。所以用埠號9092、9093、9094分別代表三個 broker。
下面具體解釋一下我們的配置項:
(1) Broker相關:
broker.id=0
broker 的 Id。每一個 broker 在集群中的唯一標示,要求是正數。每個 broker 都不相同。
(2) Socket服務設置:
listeners=PLAINTEXT://127.0.0.1:9092
Socket伺服器監聽的地址,如果沒有設置,則監聽 java.net.InetAddress.getCanonicalHostName()
返回的地址。
(3) ZooKeeper相關:
zookeeper.connect=localhost:2181/kafka-2.3.0 zookeeper.connection.timeout.ms=6000
zookeeper.connect
是一個逗號分隔的 host:port
鍵值對,每個對應一個 zk 伺服器。例如 127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002
。你還可以將可選的客戶端命名空間 Chroot 字元串追加到 URL 上以指定所有 kafka 的 Znode 的根目錄。另外這個 kafka-2.3.0
這個節點需要你提前建立。讓 Kafka 把他需要的數據結構都建立在這個節點下,否則會建立在根節點 /
節點下。
(3) 日誌相關:
log.dirs=/Users/smartsi/opt/kafka/logs/log-9092
Kafka存儲Log的目錄。
4. 啟動Kafka伺服器
有兩種方式可以啟動 Kafka 伺服器:
# 第一種方式(推薦) bin/kafka-server-start.sh -daemon config/server.properties # 第二種方式 nohup bin/kafka-server-start.sh config/server.properties &
我們以第一種方式啟動 Kafka 伺服器:
bin/kafka-server-start.sh -daemon config/server-9092.properties bin/kafka-server-start.sh -daemon config/server-9093.properties bin/kafka-server-start.sh -daemon config/server-9094.properties
查看進程和埠:
smartsi:kafka smartsi$ jps 8914 DataNode 42802 Jps 9252 NodeManager 41253 Kafka 41541 Kafka 42790 Kafka 41670 ZooKeeperMain 16731 9164 ResourceManager 1997
我們現在看一下 Kafka 在 ZooKeeper 上創建的節點:
[zk: 127.0.0.1:2181(CONNECTED) 23] ls /kafka-2.3.0 [cluster, controller_epoch, controller, brokers, admin, isr_change_notification, consumers, log_dir_event_notification, latest_producer_id_block, config]
看一下我們在ZooKeeper上註冊的兩個 broker:
[zk: 127.0.0.1:2181(CONNECTED) 3] ls /kafka-2.3.0/brokers/ids [0, 1, 2] [zk: 127.0.0.1:2181(CONNECTED) 4] get /kafka-2.3.0/brokers/ids/0 {"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://127.0.0.1:9092"],"jmx_port":-1,"host":"127.0.0.1","timestamp":"1567390121522","port":9092,"version":4} cZxid = 0x92 ctime = Mon Sep 02 10:08:41 CST 2019 mZxid = 0x92 mtime = Mon Sep 02 10:08:41 CST 2019 pZxid = 0x92 cversion = 0 dataVersion = 1 aclVersion = 0 ephemeralOwner = 0x100009088560012 dataLength = 188 numChildren = 0 [zk: 127.0.0.1:2181(CONNECTED) 5] get /kafka-2.3.0/brokers/ids/1 {"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://127.0.0.1:9093"],"jmx_port":-1,"host":"127.0.0.1","timestamp":"1567390128813","port":9093,"version":4} cZxid = 0xa7 ctime = Mon Sep 02 10:08:48 CST 2019 mZxid = 0xa7 mtime = Mon Sep 02 10:08:48 CST 2019 pZxid = 0xa7 cversion = 0 dataVersion = 1 aclVersion = 0 ephemeralOwner = 0x100009088560014 dataLength = 188 numChildren = 0 [zk: 127.0.0.1:2181(CONNECTED) 6] get /kafka-2.3.0/brokers/ids/2 {"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://127.0.0.1:9094"],"jmx_port":-1,"host":"127.0.0.1","timestamp":"1567390749151","port":9094,"version":4} cZxid = 0xbd ctime = Mon Sep 02 10:19:09 CST 2019 mZxid = 0xbd mtime = Mon Sep 02 10:19:09 CST 2019 pZxid = 0xbd cversion = 0 dataVersion = 1 aclVersion = 0 ephemeralOwner = 0x100009088560018 dataLength = 188 numChildren = 0
5. 測試Kafka
5.1 創建Topic
讓我們創建一個名為 test
的 Topic,它有一個分區和一個副本:
bin/kafka-topics.sh --create --zookeeper localhost:2181/kafka-2.3.0 --replication-factor 1 --partitions 1 --topic test
現在我們可以運行 list
命令來查看這個 Topic:
smartsi:kafka smartsi$ bin/kafka-topics.sh --list --zookeeper localhost:2181/kafka-2.3.0 test
或者,你也可將代理配置為:在發布的topic不存在時,自動創建topic,而不是手動創建。
5.2 啟動生產者
Kafka 自帶一個命令行客戶端,它從文件或標準輸入中獲取輸入,並將其作為消息發送到 Kafka 集群。默認情況下,每行將作為單獨的消息發送。
運行 Producer (生產者),然後在控制台輸入一些消息以發送到伺服器:
smartsi:kafka smartsi$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test >this is my first message >this is my second message
5.3 啟動消費者
Kafka 還有一個命令行 Consumer(消費者),將消息轉儲到標準輸出:
smartsi:kafka smartsi$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning this is my first message this is my second message
如果你將上述命令在不同的終端中運行,那麼現在就可以將消息輸入到生產者終端中,並將它們在消費終端中顯示出來。
原文:Quickstart