kafka的認識、安裝與配置
認識Kafka
花費越少的精力在數據移動上,就能越專註於核心業務 — 《Kafka:The Definitive Guide》
認識 Kafka 之前,先了解一下發布與訂閱消息系統:消息的發送者不會直接把消息發送給接收者、發送者以某種方式對消息進行分類,接收者訂閱它們,以便能接受特定類型的消息。發布與訂閱系統一般會有一個 broker(n. 經紀人、中間商) 也就是發布消息的中心點。
Kafka 是一款基於發布與訂閱的消息系統,一般被稱為「分散式提交日誌」或者「分散式流平台」。 Kafka 的數據單元被稱作消息,可以看作是資料庫中的一行數據,消息是由位元組數組組成,故對 kafka 來說消息沒有特別的意義,消息可以有一個可選的元數據,也就是鍵。鍵也是一個位元組數組,同樣對於 kafka 沒有什麼特殊意義。鍵可以用來將消息以一種可控的方式寫入分區。最簡單的例子就是為鍵生成一個一致性散列值,然後使用散列值對主題分區數進行取模,為消息選擇分區。這樣可以保證具有相同鍵的消息總是被寫在相同的分區上。保證消息在一個主題中順序讀取。
為了提高效率,消息將被分批次寫入 Kafka 。批次就是一組消息,類似於 redis 中的流水線(Pipelined)操作。
主題和分區
kafka 的消息通過主題進行分類,主題就相當於資料庫中的表,主題可以被分成若干個分區,一個分區就是一個提交日誌,消息以追加的形式被寫入分區。然後按照先入先出的順序讀取。一個主題下的分區也可以在不同的伺服器上,以此提供比單個伺服器更加強大的性能
生產者和消費者
Kafka 的客戶端就是 Kafka 系統的用戶,一般情況下有兩種基本類型:生產者和消費者
Producer 生產者創建消息,一般情況下,一個消息會被發布到一個特定的主題上。生產者在默認情況下將消息均分在主題的每個分區上
Consumer 消費者讀取消息,消費者訂閱一個或多個主題,並按照消息的生成順序讀取他們,消費者通過檢查消息的偏移量來區分已經讀過的消息。這個偏移量會被消費者在 zk 或者 kafka 上保存,如果消費者關閉或者重啟,他的讀取狀態不會消失
消費者是消費者群組 Consumer group的一部分,群組可以保證每個分區被一個消費者消費(因此消費者數量不能大於分區數量,會造成消費者伺服器的浪費),如果一個消費者失效,群組裡的其他消費者可以接管失效消費者的工作。
Kafka的優點
- 無縫支援多個生產者
- 支援多個消費者從一條消息流讀取數據、且各個消費者之間的偏移量不影響。也支援多個消費者共享一個消息流,並保證整個消費者群組對每個消息只消費一次
- 可以對每個主題設置保留規則,根據保留規則持久化數據到磁碟
- 高性能,高伸縮性
安裝
Kafka 使用 Zookeeper(後面簡稱zk) 保存集群的元數據資訊和消費者資訊, Kafka 發行版本自帶 zk,可以直接從腳本啟動,不過安裝一個完整版的 zk 也不難
安裝單節點 zk
官方下載地址://archive.apache.org/dist/zookeeper/zookeeper-3.4.6/
如果下載速度不如意,可以使用我的藍奏云://keats.lanzous.com/iMWi8dpi04f 提取碼: keats
安裝目錄: /usr/local/zookeeper
數據目錄: /var/lib/zookeeper
# tar -zxf zookeeper-3.4.6.tar.gz
# mv zookeeper-3.4.6 /usr/local/zookeeper
# mkdir -p /var/lib/zookeeper
# cat > /usr/local/zookeeper/conf/zoo.cfg << EOF
> tickTime=2000
> dataDir=/var/lib/zookeeper
> clientPort=2181
> EOF
# 接著設置一下環境變數中的 JAVA_HOME,可以先使用 export 命令查看是否已經設置
# export JAVA_HOME=/xxx
# 最後切換到 zk 安裝目錄,啟動 zk
# /usr/local/zookeeper/bin/zkServer.sh start
接著通過四字命令 srvr 驗證 zk 是否安裝正確
# telnet localhost 2181
Trying ::1...
Connected to localhost.
Escape character is '^]'.
srvr
Zookeeper version: 3.4.6-1569965, built on 02/20/2014 09:09 GMT
Latency min/avg/max: 0/0/0
Received: 1
Sent: 0
Connections: 1
Outstanding: 0
Zxid: 0x0
Mode: standalone
Node count: 4
Connection closed by foreign host.
[root@linux-keats bin]# pwd
/usr/local/zookeeper/bin
安裝單節點 Kafka
下載: //archive.apache.org/dist/kafka/0.9.0.1/kafka_2.11-0.9.0.1.tgz
藍奏云:下載後將後綴名 zip 改為 tgz://keats.lanzous.com/iaZ9hdpj5bi
# tar -zxf kafka_2.11-0.9.0.1.tgz
# mv kafka_2.11-0.9.0.1 /usr/local/kafka
# mkdir /tmp/kafka-logs
# /usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
其中 -dadmon 表示 kafka 以守護執行緒的形式啟動
配置 kafka
#broker 的全局唯一編號,集群中不能重複。int類型
broker.id=0
#是否允許刪除 topic
delete.topic.enable=true
#處理網路請求的執行緒數量
num.network.threads=3
#處理磁碟 IO 的執行緒數量
num.io.threads=8
#發送套接字的緩衝區大小
socket.send.buffer.bytes=102400
#接收套接字的緩衝區大小
socket.receive.buffer.bytes=102400
#請求套接字的緩衝區大小
socket.request.max.bytes=104857600
#kafka 運行日誌(此日誌非常規意義的日誌)存放的路徑。用上一步創建的目錄。
log.dirs=/tmp/kafka-logs
#topic 創建時默認的分區數
num.partitions=1
#用來恢復和清理 data 下數據的執行緒數量
num.recovery.threads.per.data.dir=1
#segment 文件保留的最長時間,超時將被刪除
log.retention.hours=168
#配置連接 Zookeeper 地址。如果是 zk 集群,使用 , 隔開
zookeeper.connect=localhost:2181
集群
zk 集群的安裝請度娘 zk 集群,kafka 可以按照末尾參考文獻安裝集群。我這裡測試伺服器性能不行還跑了幾個 java 程式,就不裝集群了
測試
主題相關操作
/usr/local/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
Created topic "test".
- –create 創建操作 還有 –list 查詢,–describe 詳情
- –zookeeper localhost:2181 配置 zk 的資訊
- –partitions 1 分區數目 1
- –replication-factor 1 副本數 1。副本數不能大於 kafka broker 節點的數目
- –topic test 指定主題名稱
創建好主題後,logs 文件夾內就會出現 主題名-分區名 的提交日誌
往主題發送消息
# /usr/local/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
Test Message 1
Test Message 2
^D
從測試主題讀取消息
# /usr/local/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
Test Message 1
Test Message 2
^C
Processed a total of 2 messages
參考
《kafka權威指南》 — 美國人著