Kafka集群搭建與部署

  • 2019 年 12 月 5 日
  • 筆記

此環境依靠zookeeper和jdk詳情請參考上篇文章,這裡直接部署kafka

1、簡介

Kafka是分佈式發佈-訂閱消息系統。它最初由LinkedIn公司開發,之後成為Apache項目的一部分。Kafka是一個分佈式的,可劃分的,冗餘備份的持久性的日誌服務。它主要用於處理活躍的流式數據。

在大數據系統中,常常會碰到一個問題,整個大數據是由各個子系統組成,數據需要在各個子系統中高性能,低延遲的不停流轉。傳統的企業消息系統並不是非常適合大規模的數據處理。為了同時搞定在線應用(消息)和離線應用(數據文件,日誌)Kafka就出現了。Kafka可以起到兩個作用:

1.降低系統組網複雜度。

2.降低編程複雜度,各個子系統不在是相互協商接口,各個子系統類似插口插在插座上,Kafka承擔高速數據總線的作用。

2、Kafka主要特點

1.同時為發佈和訂閱提供高吞吐量。據了解,Kafka每秒可以生產約25萬消息(50 MB),每秒處理55萬消息(110 MB)。

2.可進行持久化操作。將消息持久化到磁盤,因此可用於批量消費,例如ETL,以及實時應用程序。通過將數據持久化到硬盤以及replication防止數據丟失。

3.分佈式系統,易於向外擴展。所有的producer、broker和consumer都會有多個,均為分佈式的。無需停機即可擴展機器。

4.消息被處理的狀態是在consumer端維護,而不是由server端維護。當失敗時能自動平衡。

5.支持online和offline的場景。

3、Kafka的架構

Kafka的整體架構非常簡單,是顯式分佈式架構,producer、broker(kafka)和consumer都可以有多個。Producer,consumer實現Kafka註冊的接口,數據從producer發送到broker,broker承擔一個中間緩存和分發的作用。broker分發註冊到系統中的consumer。broker的作用類似於緩存,即活躍的數據和離線處理系統之間的緩存。客戶端和服務器端的通信,是基於簡單,高性能,且與編程語言無關的TCP協議。幾個基本概念:

1.Topic:特指Kafka處理的消息源(feeds of messages)的不同分類。

2.Partition:Topic物理上的分組,一個topic可以分為多個partition,每個partition是一個有序的隊列。partition中的每條消息都會被分配一個有序的id(offset)。

3.Message:消息,是通信的基本單位,每個producer可以向一個topic(主題)發佈一些消息。

4.Producers:消息和數據生產者,向Kafka的一個topic發佈消息的過程叫做producers。

5.Consumers:消息和數據消費者,訂閱topics並處理其發佈的消息的過程叫做consumers。

6.Broker:緩存代理,Kafka集群中的一台或多台服務器統稱為broker。

4、消息發送的流程

1.Producer即生產者,向Kafka集群發送消息,在發送消息之前,會對消息進行分類,即Topic,上圖展示了兩個producer發送了分類為topic1的消息,另外一個發送了topic2的消息。

2.Topic即主題,通過對消息指定主題可以將消息分類,消費者可以只關注自己需要的Topic中的消息。

3.Consumer即消費者,消費者通過與kafka集群建立長連接的方式,不斷地從集群中拉取消息,然後可以對這些消息進行處理。

從上圖中就可以看出同一個Topic下的消費者和生產者的數量並不是對應的。

5、Kafka的應用場景

1、消息隊列

比起大多數的消息系統來說,Kafka有更好的吞吐量,內置的分區,冗餘及容錯性,這讓Kafka成為了一個很好的大規模消息處理應用的解決方案。消息系統一般吞吐量相對較低,但是需要更小的端到端延時,並常常依賴於Kafka提供的強大的持久性保障。在這個領域,Kafka足以媲美傳統消息系統,如ActiveMR或RabbitMQ。

2、行為跟蹤

Kafka的另一個應用場景是跟蹤用戶瀏覽頁面、搜索及其他行為,以發佈-訂閱的模式實時記錄到對應的topic里。那麼這些結果被訂閱者拿到後,就可以做進一步的實時處理,或實時監控,或放到hadoop/離線數據倉庫里處理。

3、元信息監控

作為操作記錄的監控模塊來使用,即彙集記錄一些操作信息,可以理解為運維性質的數據監控吧。

4、日誌收集

日誌收集方面,其實開源產品有很多,包括Scribe、Apache Flume。很多人使用Kafka代替日誌聚合(log aggregation)。日誌聚合一般來說是從服務器上收集日誌文件,然後放到一個集中的位置(文件服務器或HDFS)進行處理。然而Kafka忽略掉文件的細節,將其更清晰地抽象成一個個日誌或事件的消息流。這就讓Kafka處理過程延遲更低,更容易支持多數據源和分佈式數據處理。比起以日誌為中心的系統比如Scribe或者Flume來說,Kafka提供同樣高效的性能和因為複製導致的更高的耐用性保證,以及更低的端到端延遲。

5、流處理

這個場景可能比較多,也很好理解。保存收集流數據,以提供之後對接的Storm或其他流式計算框架進行處理。很多用戶會將那些從原始topic來的數據進行階段性處理,匯總,擴充或者以其他的方式轉換到新的topic下再繼續後面的處理。例如一個文章推薦的處理流程,可能是先從RSS數據源中抓取文章的內容,然後將其丟入一個叫做「文章」的topic中;後續操作可能是需要對這個內容進行清理,比如回復正常數據或者刪除重複數據,最後再將內容匹配的結果返還給用戶。這就在一個獨立的topic之外,產生了一系列的實時數據處理的流程。Strom和Samza是非常著名的實現這種類型數據轉換的框架。

6、事件源

事件源是一種應用程序設計的方式,該方式的狀態轉移被記錄為按時間順序排序的記錄序列。Kafka可以存儲大量的日誌數據,這使得它成為一個對這種方式的應用來說絕佳的後台。比如動態匯總(News feed)。

7、持久性日誌(commit log)

Kafka可以為一種外部的持久性日誌的分佈式系統提供服務。這種日誌可以在節點間備份數據,並為故障節點數據回復提供一種重新同步的機制。Kafka中日誌壓縮功能為這種用法提供了條件。在這種用法中,Kafka類似於Apache BookKeeper項目。

6、kafka集群的搭建

1、下載

 [root@zookeeper-kafka-01 ~]# wget https://archive.apache.org/dist/kafka/1.1.0/kafka_2.12-1.1.0.tgz   [root@zookeeper-kafka-01 ~]# tar -xzvf kafka_2.12-1.1.0.tgz -C /usr/local/   [root@zookeeper-kafka-01 ~]# ln -s /usr/local/kafka_2.12-1.1.0 /usr/local/kafka

2、修改server.properties文件

 [root@zookeeper-kafka-01 config]# vim server.properties    1# master為0    2broker.id=0    3listeners=PLAINTEXT://ZooKeeper-Kafka-01:9092    4advertised.listeners=PLAINTEXT://ZooKeeper-Kafka-01:9092    5num.network.threads=3    6num.io.threads=8    7socket.send.buffer.bytes=102400    8socket.receive.buffer.bytes=102400    9socket.request.max.bytes=104857600   10log.dirs=/tmp/kafka-logs   11num.partitions=5   12num.recovery.threads.per.data.dir=1   13offsets.topic.replication.factor=1   14transaction.state.log.replication.factor=1   15transaction.state.log.min.isr=1   16log.retention.hours=24   17log.segment.bytes=1073741824   18log.retention.check.interval.ms=300000   19# 連接   20zookeeper.connect=ZooKeeper-Kafka-01:2181,ZooKeeper-Kafka-02:2181,ZooKeeper-Kafka-03:2181   21zookeeper.connection.timeout.ms=6000   22group.initial.rebalance.delay.ms=0   23# 可刪除topic   24delete.topic.enable=true

3、將 kafka_2.12-1.1.0 文件夾複製到另外兩個節點下

 [root@zookeeper-kafka-01 local]# scp -rp kafka_2.12-1.1.0/ [email protected]:/usr/local/   [root@zookeeper-kafka-01 local]# scp -rp kafka_2.12-1.1.0/ [email protected]:/usr/local/

4、修改每個節點對應的 server.properties 文件的 broker.id和listenrs

 [root@zookeeper-kafka-02 config]# vim server.properties    2broker.id=1    3listeners=PLAINTEXT://ZooKeeper-Kafka-02:9092    4advertised.listeners=PLAINTEXT://ZooKeeper-Kafka-02:9092   [root@zookeeper-kafka-03 config]# vim server.properties    2broker.id=2    3listeners=PLAINTEXT://ZooKeeper-Kafka-03:9092    4advertised.listeners=PLAINTEXT://ZooKeeper-Kafka-03:9092

5、啟動服務

 #三台機器分別執行   [root@zookeeper-kafka-01 kafka]# bin/kafka-server-start.sh config/server.properties &   [1] 12695

6、Zookeeper+Kafka集群測試

創建topic:

 [root@zookeeper-kafka-01 ~]# bin/kafka-topics.sh --create --zookeeper ZooKeeper-Kafka-01:2181, ZooKeeper-Kafka-02:2181, ZooKeeper-Kafka-03:2181 --replication-factor 3 --partitions 3 --topic yunweimao

顯示topic:

 [root@zookeeper-kafka-01 kafka]# bin/kafka-topics.sh --describe --zookeeper ZooKeeper-Kafka-01:2181, ZooKeeper-Kafka-02:2181, ZooKeeper-Kafka-03:2181 --topic yunweimao   Topic:yunweimaoPartitionCount:3ReplicationFactor:3Configs:   Topic: yunweimaoPartition: 0Leader: 2Replicas: 2,1,0Isr: 2,1,0   Topic: yunweimaoPartition: 1Leader: 0Replicas: 0,2,1Isr: 0,2,1   Topic: yunweimaoPartition: 2Leader: 1Replicas: 1,0,2Isr: 1,0,2

列出topic:

 [root@zookeeper-kafka-01 kafka]# bin/kafka-topics.sh --list --zookeeper ZooKeeper-Kafka-01:2181, ZooKeeper-Kafka-02:2181, ZooKeeper-Kafka-03:2181   yunweimao

創建 producer(生產者) 在master節點上 測試生產消息

 [root@zookeeper-kafka-01 kafka]# bin/kafka-console-producer.sh --broker-list ZooKeeper-Kafka-01:9092 -topic yunweimao   >hello word   >yunweimao welcome

創建 consumer(消費者)在ZooKeeper-Kafka-02節點上 測試消費

 [root@zookeeper-kafka-02 kafka]# bin/kafka-console-consumer.sh --zookeeper ZooKeeper-Kafka-01:2181, ZooKeeper-Kafka-02:2181, ZooKeeper-Kafka-03:2181 -topic yunweimao --from-beginning   hello word   yunweimao welcome

在ZooKeeper-Kafka-03節點上 測試消費

 [root@zookeeper-kafka-03 kafka]# bin/kafka-console-consumer.sh --zookeeper ZooKeeper-Kafka-01:2181, ZooKeeper-Kafka-02:2181, ZooKeeper-Kafka-03:2181 -topic yunweimao --from-beginning   yunweimao welcome   hello word

註:

然後在 producer 里輸入消息,consumer 中就會顯示出同樣的內容,表示消費成功

刪除 topic

 [root@zookeeper-kafka-01 kafka]# bin/kafka-topics.sh --delete --zookeeper ZooKeeper-Kafka-01:2181, ZooKeeper-Kafka-02:2181, ZooKeeper-Kafka-03:2181 --topic yunweimao

啟動服務

 [root@zookeeper-kafka-01 kafka]# bin/kafka-server-start.sh config/server.properties &

停止服務

 [root@zookeeper-kafka-01 kafka]# bin/kafka-server-stop.sh

7、總結

在0.9.X版本,kafka broker兼容之前老版本的客戶端。從0.10.0.0開始,kafka broker就兼容更新版本的客戶端。如果比較新的客戶端連接到舊版本的broker,那麼只能支持舊版本broker所支持的特性。