kafka學習筆記
- 2019 年 10 月 4 日
- 筆記
這是之前學習時候寫的筆記,貼一下便於日常查閱。。。
官方文檔:http://kafka.apache.org/quickstart
https://engineering.linkedin.com/kafka/
搭建參考: https://www.cnblogs.com/luotianshuai/p/5206662.html
理論資料參考 http://www.jasongj.com/tags/Kafka
本文大部分理論知識來自技術世界,原文鏈接:http://www.jasongj.com/2015/03/10/KafkaColumn1
環境:
CentOS7.3
kafka_2.11-1.0.0
zookeeper-3.4.9
3台主機:192.168.5.71、192.168.5.、192.168.5.73
配置hosts解析
編輯3台主機vim /etc/hosts 加上如下的3行:
192.168.5.71 node71
192.168.5.72 node72
192.168.5.73 node73
安裝zk
tar xf zookeeper-3.4.9.tar.gz -C /opt/
cd /opt
ln -s zookeeper-3.4.9 zk
cd zk/conf
vim zoo.cfg 內容如下:
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/opt/zk
dataLogDir=/opt/zk
clientPort=2181
maxClientCnxns=60
minSessionTimeout=4000
maxSessionTimeout=60000
autopurge.purgeInterval=24
autopurge.snapRetainCount=5
quorum.auth.enableSasl=false
quorum.cnxn.threads.size=20
server.1=node71:3181:4181
server.2=node72:3181:4181
server.3=node73:3181:4181
在3個節點分別創建myid,並啟動zkserver:
node71上: echo '1' > /opt/zk/myid
node72上: echo '2' > /opt/zk/myid
node73上: echo '3' > /opt/zk/myid
在3個節點分別啟動zkserver:
/opt/zk/bin/zkServer.sh start /opt/zk/conf/zoo.cfg
在3個節點觀察集群主機的狀態:
/opt/zk/bin/zkServer.sh status /opt/zk/conf/zoo.cfg
安裝kafka
tar xf kafka_2.11-1.0.0.tgz -C /opt/
cd /opt/
ln -s kafka_2.11-1.0.0 kafka
每個主機的配置如下:
[root@node71 /opt/kafka/config ]# egrep -v '^#|^$' /opt/kafka/config/server.properties broker.id=0 #當前機器在集群中的唯一標識,和zookeeper的myid性質一樣 listeners=PLAINTEXT://:9092 #當前kafka對外提供服務的埠默認是9092 num.network.threads=3 # 這個是borker進行網路處理的執行緒數 num.io.threads=8 # 這個是borker進行I/O處理的執行緒數 socket.send.buffer.bytes=102400 #發送緩衝區buffer大小,數據不是一下子就發送的,先回存儲到緩衝區了到達一定的大小後在發送,能提高性能 socket.receive.buffer.bytes=102400 #kafka接收緩衝區大小,當數據到達一定大小後在序列化到磁碟 socket.request.max.bytes=104857600 #這個參數是向kafka請求消息或者向kafka發送消息的請請求的最大數,這個值不能超過java的堆棧大小 log.dirs=/tmp/kafka-logs #消息存放的目錄,這個目錄可以配置為「,」逗號分割的表達式,上面的num.io.threads要大於這個目錄的個數這個目錄,如果配置多個目錄,新創建的topic他把消息持久化的地方是,當前以逗號分割的目錄中,那個分區數最少就放那一個 num.partitions=1 # 默認的分區數,一個topic默認1個分區數 num.recovery.threads.per.data.dir=1 offsets.topic.replication.factor=1 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 log.retention.hours=168 # 默認消息的最大持久化時間,168小時,7天 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 #每隔300000毫秒去檢查上面配置的log失效時間(默認168 ),到目錄查看是否有過期的消息如果有,刪除 message.max.byte=5242880 #消息保存的最大值5M default.replication.factor=2 #kafka保存消息的副本數,如果一個副本失效了,另一個還可以繼續提供服務 replica.fetch.max.bytes=5242880 #取消息的最大直接數 zookeeper.connect=node71:2181,node72:2181,node73:2181 #設置zookeeper的連接埠 zookeeper.connection.timeout.ms=6000 group.initial.rebalance.delay.ms=0 [root@node72 /opt/kafka/config ]# egrep -v '^#|^$' /opt/kafka/config/server.properties broker.id=1 listeners=PLAINTEXT://:9092 num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 log.dirs=/tmp/kafka-logs num.partitions=1 num.recovery.threads.per.data.dir=1 offsets.topic.replication.factor=1 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 log.retention.hours=168 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 message.max.byte=5242880 default.replication.factor=2 replica.fetch.max.bytes=5242880 zookeeper.connect=node71:2181,node72:2181,node73:2181 zookeeper.connection.timeout.ms=6000 group.initial.rebalance.delay.ms=0 [root@node73 /opt/kafka/config ]# egrep -v '^#|^$' /opt/kafka/config/server.properties broker.id=2 listeners=PLAINTEXT://:9092 num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 log.dirs=/tmp/kafka-logs num.partitions=1 num.recovery.threads.per.data.dir=1 offsets.topic.replication.factor=1 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 log.retention.hours=168 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 message.max.byte=5242880 default.replication.factor=2 replica.fetch.max.bytes=5242880 zookeeper.connect=node71:2181,node72:2181,node73:2181 zookeeper.connection.timeout.ms=6000 group.initial.rebalance.delay.ms=0
啟動Kafka集群
在3台機器上都執行啟動Kafka集群:
/opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties
測試kafka的功能
1、創建Topic
/opt/kafka/bin/kafka-topics.sh –create –zookeeper node71:2181 –replication-factor 2 –partitions 3 –topic mysql-order
/opt/kafka/bin/kafka-topics.sh –create –zookeeper node71:2181 –replication-factor 2 –partitions 2 –topic mysql-coupons
#參數含義
–replication-factor 2 #複製兩份
–partitions 2 #創建2個分區
–topic #主題
2、在一台伺服器上創建一個發布者broker
/opt/kafka/bin/kafka-console-producer.sh –broker-list node72:9092 –topic mysql-order
然後,可以輸入一些文字內容
3、在一台伺服器上創建一個訂閱者consumer
/opt/kafka/bin/kafka-console-consumer.sh –zookeeper node72:2181 –from-beginning –topic mysql-order
4、列出全部的topic
/opt/kafka/bin/kafka-topics.sh –list –zookeeper node72:2181
5、查看指定的topic狀態
[root@node72 /opt/kafka/config ]# /opt/kafka/bin/kafka-topics.sh –describe –zookeeper node72:2181 –topic mysql-coupons

查看zk中kafka的狀態:
cd /etc/zk/bin
./zkCli.sh -server node71:2181
ls /brokers
get /brokers/ids/2
get /brokers/topics/mysql-coupons/partitions/0 # 這裡就不截圖了
關閉Kafka集群
官方提到的關閉方式就是kill -9
ps aux|grep kafka|grep -v grep|awk '{print $2}'|xargs kill -9
kafka架構
Broker
Kafka集群包含一個或多個伺服器,這種伺服器被稱為broker
Topic
每條發布到Kafka集群的消息都有一個類別,這個類別被稱為topic。(物理上不同topic的消息分開存儲,邏輯上一個topic的消息雖然保存於一個或多個broker上但用戶只需指定消息的topic即可生產或消費數據而不必關心數據存於何處)
Partition
parition是物理上的概念,每個topic包含一個或多個partition,創建topic時可指定parition數量。每個partition對應於一個文件夾,該文件夾下存儲該partition的數據和索引文件
Producer
負責發布消息到Kafka broker (push消息到broker)
Consumer
消費消息(從broker那裡poll消息)。每個consumer屬於一個特定的consuer group(可為每個consumer指定group name,若不指定group name則屬於默認的group)。使用consumer high level API時,同一topic的一條消息只能被同一個consumer group內的一個consumer消費,但多個consumer group可同時消費這一消息。

如上圖所示,一個典型的kafka集群中包含若干producer(可以是web前端產生的page view,或者是伺服器日誌,系統CPU、memory等),若干broker(Kafka支援水平擴展,一般broker數量越多,集群吞吐率越高),若干consumer group,以及一個Zookeeper集群。Kafka通過Zookeeper管理集群配置,選舉leader,以及在consumer group發生變化時進行rebalance。producer使用push模式將消息發布到broker,consumer使用pull模式從broker訂閱並消費消息。
Push vs. Pull的區別
作為一個messaging system,Kafka遵循了傳統的方式,選擇由producer向broker push消息並由consumer從broker pull消息。一些logging-centric system,比如Facebook的Scribe和Cloudera的Flume,採用非常不同的push模式。事實上,push模式和pull模式各有優劣。
push模式很難適應消費速率不同的消費者,因為消息發送速率是由broker決定的。push模式的目標是儘可能以最快速度傳遞消息,但是這樣很容易造成consumer來不及處理消息,典型的表現就是拒絕服務以及網路擁塞。
而pull模式則可以根據consumer的消費能力以適當的速率消費消息。
kafka消息的消費&留存
本節所有描述都是基於consumer hight level API而非low level API
每一個consumer實例都屬於一個consumer group,每一條消息只會被同一個consumer group里的一個consumer實例消費(這是為了實現傳統message queue消息只被消費一次的語義),但是不同consumer group可以同時消費同一條消息,這一特性可以為消息的多元化處理提供了支援。實際上,Kafka的設計理念之一就是同時提供離線處理和實時處理。根據這一特性,可以使用Storm這種實時流處理系統對消息進行實時在線處理,同時使用Hadoop這種批處理系統進行離線處理,還可以同時將數據實時備份到另一個數據中心,只需要保證這三個操作所使用的consumer在不同的consumer group即可。

Linked的一種kafka部署方案:

對於傳統的message queue而言,一般會刪除已經被消費的消息,而Kafka集群會保留所有的消息,無論其被消費與否。當然,因為磁碟限制,不可能永久保留所有數據(實際上也沒必要),因此Kafka提供兩種策略去刪除舊數據。一是基於時間,二是基於partition文件大小。例如可以通過配置server.properties,讓Kafka刪除一周前的數據,也可通過配置讓Kafka在partition文件超過1GB時刪除舊數據
log.retention.hours=168 # 默認消息的最大持久化時間,168小時,7天
log.segment.bytes=1073741824 # partition大小超過1G時候,清理舊數據
log.retention.check.interval.ms=300000 #每隔300000毫秒去檢查上面配置的log失效時間(默認168 ),到目錄查看是否有過期的消息如果有,刪除
這裡要注意,因為Kafka讀取特定消息的時間複雜度為O(1),即與文件大小無關,所以這裡刪除文件與Kafka性能無關,選擇怎樣的刪除策略只與磁碟以及具體的需求有關。另外,Kafka會為每一個consumer group保留一些metadata資訊—當前消費的消息的position,也即offset。這個offset由consumer控制。正常情況下consumer會在消費完一條消息後線性增加這個offset。當然,consumer也可將offset設成一個較小的值,重新消費一些消息。因為offet由consumer控制,所以Kafka broker是無狀態的,它不需要標記哪些消息被哪些consumer過,不需要通過broker去保證同一個consumer group只有一個consumer能消費某一條消息,因此也就不需要鎖機制,這也為Kafka的高吞吐率提供了有力保障。
kafka customer rebalance
本節所講述內容均基於Kafka consumer high level API
具體參考:http://developer.51cto.com/art/201501/464491.htm
kafka的數據複製
Kafka從0.8開始提供partition級別的replication,replication的數量可在server.properties中配置。
kafka默認default.replication.factor = 2
該Replication與leader election配合提供了自動的failover機制。replication對Kafka的吞吐率是有一定影響的,但極大的增強了可用性。每個partition都有一個唯一的leader,所有的讀寫操作都在leader上完成,consumer批量從leader上pull數據。一般情況下partition的數量大於等於broker的數量,並且所有partition的leader均勻分布在broker上。follower上的日誌和其leader上的完全一樣。
和大部分分散式系統一樣,Kakfa處理失敗需要明確定義一個broker是否alive。
對於Kafka而言,Kafka存活包含兩個條件,一是它必須維護與Zookeeper的session(這個通過Zookeeper的heartbeat機制來實現)。二是follower必須能夠及時將leader的writing複製過來,不能落後太多。
leader會track "in sync"的node list。如果一個follower宕機,或者落後太多,leader將把它從"in sync" list中移除。這裡所描述的"落後太多" 指follower複製的消息落後於leader後的條數超過預定值,該值可在 server.properties中配置。
replica.lag.max.messages=4000
replica.lag.time.max.ms=10000
需要說明的是,Kafka只解決」fail/recover」,不處理「Byzantine」(「拜占庭」)問題。
個人覺得,"in sync" list 機制,類似於MySQL中的semi-sync半同步。
一條消息只有被"in sync" list里的所有follower都從leader複製過去才會被認為已提交。這樣就避免了部分數據被寫進了leader,還沒來得及被任何follower複製就宕機了,而造成數據丟失(consumer無法消費這些數據)。而對於producer而言,它可以選擇是否等待消息commit,這可以通過request.required.acks來設置。這種機制確保了只要"in sync" list有一個或以上的flollower,一條被commit的消息就不會丟失。
這裡的複製機制即不是同步複製,也不是單純的非同步複製。事實上,同步複製要求「活著的」follower都複製完,這條消息才會被認為commit,這種複製方式極大的影響了吞吐率(高吞吐率是Kafka非常重要的一個特性)。而非同步複製方式下,follower非同步的從leader複製數據,數據只要被leader寫入log就被認為已經commit,這種情況下如果follwer都落後於leader,而leader突然宕機,則會丟失數據。而Kafka的這種使用"in sync" list的方式則很好的均衡了確保數據不丟失以及吞吐率。follower可以批量的從leader複製數據,這樣極大的提高複製性能(批量寫磁碟),極大減少了follower與leader的差距(前文有說到,只要follower落後leader不太遠,則被認為在"in sync" list里)。
Kafka的選舉機制
首先Kafka會將接收到的消息分區(partition),每個主題(topic)的消息有不同的分區。這樣一方面消息的存儲就不會受到單一伺服器存儲空間大小的限制,另一方面消息的處理也可以在多個伺服器上並行。
其次為了保證高可用,每個分區都會有一定數量的副本(replica)。這樣如果有部分伺服器不可用,副本所在的伺服器就會接替上來,保證應用的持續性。
但是,為了保證較高的處理效率,消息的讀寫都是在固定的一個副本上完成。這個副本就是所謂的Leader,而其他副本則是Follower。而Follower則會定期地到Leader上同步數據。

Leader選舉
如果某個分區所在的伺服器除了問題,不可用,kafka會從該分區的其他的副本中選擇一個作為新的Leader。之後所有的讀寫就會轉移到這個新的Leader上。現在的問題是應當選擇哪個作為新的Leader。顯然,只有那些跟Leader保持同步的Follower才應該被選作新的Leader。
Kafka會在Zookeeper上針對每個Topic維護一個稱為ISR(in-sync replica,已同步的副本)的集合,該集合中是一些分區的副本。只有當這些副本都跟Leader中的副本同步了之後,kafka才會認為消息已提交,並回饋給消息的生產者。如果這個集合有增減,kafka會更新zookeeper上的記錄。
如果某個分區的Leader不可用,Kafka就會從ISR集合中選擇一個副本作為新的Leader。
顯然通過ISR,kafka需要的冗餘度較低,可以容忍的失敗數比較高。假設某個topic有f+1個副本,kafka可以容忍f個伺服器不可用。
為什麼不用少數服從多數的方法:
少數服從多數是一種比較常見的一致性演算法和Leader選舉法。它的含義是只有超過半數的副本同步了,系統才會認為數據已同步;選擇Leader時也是從超過半數的同步的副本中選擇。這種演算法需要較高的冗餘度。譬如只允許一台機器失敗,需要有三個副本;而如果只容忍兩台機器失敗,則需要五個副本。而kafka的ISR集合方法,分別只需要兩個和三個副本。
如果所有的ISR副本都失敗了怎麼辦:
此時有兩種方法可選,一種是等待ISR集合中的副本復活,一種是選擇任何一個立即可用的副本,而這個副本不一定是在ISR集合中。這兩種方法各有利弊,實際生產中按需選擇。
如果要等待ISR副本復活,雖然可以保證一致性,但可能需要很長時間。而如果選擇立即可用的副本,則很可能該副本並不一致。
例如下圖:

Kafka的監控
1、zabbix監控
2、prometheus監控