Kafka 安裝與啟動

  • 2019 年 11 月 26 日
  • 筆記

1. 下載程式碼

下載 2.3.0 版本並解壓縮:

tar -zxvf kafka_2.12-2.3.0.tgz -C .

創建軟連接便於升級:

ln -s kafka_2.12-2.3.0/ kafka

配置環境變數:

export KAFKA_HOME=/Users/smartsi/opt/kafka  export PATH=${KAFKA_HOME}/bin:$PATH

2. 安裝ZooKeeper

Kafka 依賴 ZooKeeper,如果你還沒有 ZooKeeper 伺服器,你需要先啟動一個 ZooKeeper 伺服器。可以先參考ZooKeeper 安裝與啟動來安裝 ZooKeeper。ZooKeeper 配置如下:

tickTime=2000  initLimit=10  syncLimit=5  dataDir=/Users/smartsi/opt/zookeeper/data  clientPort=2181  server.1=localhost:2888:3888

你也可以通過與 kafka 打包在一起的便捷腳本來快速簡單地創建一個單節點 ZooKeeper 實例:

> bin/zookeeper-server-start.sh config/zookeeper.properties  [2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)  ...

3. 配置Kafka

第一個 broker 配置 server-9092.properties 如下:

broker.id=0  listeners=PLAINTEXT://127.0.0.1:9092  log.dirs=/Users/smartsi/opt/kafka/logs/log1-9092  zookeeper.connect=localhost:2181/kafka-2.3.0  zookeeper.connection.timeout.ms=6000

運行起來至少要配置四項。上面的前四項。

第二個 broker 配置 server-9093.properties 如下:

broker.id=1  listeners=PLAINTEXT://127.0.0.1:9093  log.dirs=/Users/smartsi/opt/kafka/logs/log-9093  zookeeper.connect=localhost:2181/kafka-2.3.0  zookeeper.connection.timeout.ms=6000

第三個 broker 配置 server-9094.properties 如下:

broker.id=2  listeners=PLAINTEXT://127.0.0.1:9094  log.dirs=/Users/smartsi/opt/kafka/logs/log-9094  zookeeper.connect=localhost:2181/kafka-2.3.0  zookeeper.connection.timeout.ms=6000

我們必須重寫埠和日誌目錄,因為我們在同一台機器上運行這些,我們不希望所有都在同一個埠註冊,或者覆蓋彼此的數據。所以用埠號9092、9093、9094分別代表三個 broker。

下面具體解釋一下我們的配置項:

(1) Broker相關:

broker.id=0

broker 的 Id。每一個 broker 在集群中的唯一標示,要求是正數。每個 broker 都不相同。

(2) Socket服務設置:

listeners=PLAINTEXT://127.0.0.1:9092

Socket伺服器監聽的地址,如果沒有設置,則監聽 java.net.InetAddress.getCanonicalHostName() 返回的地址。

(3) ZooKeeper相關:

zookeeper.connect=localhost:2181/kafka-2.3.0  zookeeper.connection.timeout.ms=6000

zookeeper.connect 是一個逗號分隔的 host:port 鍵值對,每個對應一個 zk 伺服器。例如 127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002。你還可以將可選的客戶端命名空間 Chroot 字元串追加到 URL 上以指定所有 kafka 的 Znode 的根目錄。另外這個 kafka-2.3.0 這個節點需要你提前建立。讓 Kafka 把他需要的數據結構都建立在這個節點下,否則會建立在根節點 / 節點下。

(3) 日誌相關:

log.dirs=/Users/smartsi/opt/kafka/logs/log-9092

Kafka存儲Log的目錄。

4. 啟動Kafka伺服器

有兩種方式可以啟動 Kafka 伺服器:

# 第一種方式(推薦)  bin/kafka-server-start.sh -daemon config/server.properties  # 第二種方式  nohup bin/kafka-server-start.sh config/server.properties &

我們以第一種方式啟動 Kafka 伺服器:

bin/kafka-server-start.sh -daemon config/server-9092.properties  bin/kafka-server-start.sh -daemon config/server-9093.properties  bin/kafka-server-start.sh -daemon config/server-9094.properties

查看進程和埠:

smartsi:kafka smartsi$ jps  8914 DataNode  42802 Jps  9252 NodeManager  41253 Kafka  41541 Kafka  42790 Kafka  41670 ZooKeeperMain  16731  9164 ResourceManager  1997

我們現在看一下 Kafka 在 ZooKeeper 上創建的節點:

[zk: 127.0.0.1:2181(CONNECTED) 23] ls /kafka-2.3.0  [cluster, controller_epoch, controller, brokers, admin, isr_change_notification, consumers, log_dir_event_notification, latest_producer_id_block, config]

看一下我們在ZooKeeper上註冊的兩個 broker:

[zk: 127.0.0.1:2181(CONNECTED) 3] ls /kafka-2.3.0/brokers/ids  [0, 1, 2]  [zk: 127.0.0.1:2181(CONNECTED) 4] get /kafka-2.3.0/brokers/ids/0  {"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://127.0.0.1:9092"],"jmx_port":-1,"host":"127.0.0.1","timestamp":"1567390121522","port":9092,"version":4}  cZxid = 0x92  ctime = Mon Sep 02 10:08:41 CST 2019  mZxid = 0x92  mtime = Mon Sep 02 10:08:41 CST 2019  pZxid = 0x92  cversion = 0  dataVersion = 1  aclVersion = 0  ephemeralOwner = 0x100009088560012  dataLength = 188  numChildren = 0  [zk: 127.0.0.1:2181(CONNECTED) 5] get /kafka-2.3.0/brokers/ids/1  {"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://127.0.0.1:9093"],"jmx_port":-1,"host":"127.0.0.1","timestamp":"1567390128813","port":9093,"version":4}  cZxid = 0xa7  ctime = Mon Sep 02 10:08:48 CST 2019  mZxid = 0xa7  mtime = Mon Sep 02 10:08:48 CST 2019  pZxid = 0xa7  cversion = 0  dataVersion = 1  aclVersion = 0  ephemeralOwner = 0x100009088560014  dataLength = 188  numChildren = 0  [zk: 127.0.0.1:2181(CONNECTED) 6] get /kafka-2.3.0/brokers/ids/2  {"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://127.0.0.1:9094"],"jmx_port":-1,"host":"127.0.0.1","timestamp":"1567390749151","port":9094,"version":4}  cZxid = 0xbd  ctime = Mon Sep 02 10:19:09 CST 2019  mZxid = 0xbd  mtime = Mon Sep 02 10:19:09 CST 2019  pZxid = 0xbd  cversion = 0  dataVersion = 1  aclVersion = 0  ephemeralOwner = 0x100009088560018  dataLength = 188  numChildren = 0

5. 測試Kafka

5.1 創建Topic

讓我們創建一個名為 test 的 Topic,它有一個分區和一個副本:

bin/kafka-topics.sh --create --zookeeper localhost:2181/kafka-2.3.0 --replication-factor 1 --partitions 1 --topic test

現在我們可以運行 list 命令來查看這個 Topic:

smartsi:kafka smartsi$ bin/kafka-topics.sh --list --zookeeper localhost:2181/kafka-2.3.0  test

或者,你也可將代理配置為:在發布的topic不存在時,自動創建topic,而不是手動創建。

5.2 啟動生產者

Kafka 自帶一個命令行客戶端,它從文件或標準輸入中獲取輸入,並將其作為消息發送到 Kafka 集群。默認情況下,每行將作為單獨的消息發送。

運行 Producer (生產者),然後在控制台輸入一些消息以發送到伺服器:

smartsi:kafka smartsi$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test  >this is my first message  >this is my second message

5.3 啟動消費者

Kafka 還有一個命令行 Consumer(消費者),將消息轉儲到標準輸出:

smartsi:kafka smartsi$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning  this is my first message  this is my second message

如果你將上述命令在不同的終端中運行,那麼現在就可以將消息輸入到生產者終端中,並將它們在消費終端中顯示出來。

原文:Quickstart