kafka學習筆記

  • 2019 年 10 月 4 日
  • 筆記

這是之前學習時候寫的筆記,貼一下便於日常查閱。。。

官方文檔:http://kafka.apache.org/quickstart

https://engineering.linkedin.com/kafka/

搭建參考: https://www.cnblogs.com/luotianshuai/p/5206662.html

理論資料參考 http://www.jasongj.com/tags/Kafka

本文大部分理論知識來自技術世界原文鏈接http://www.jasongj.com/2015/03/10/KafkaColumn1

環境:

CentOS7.3

kafka_2.11-1.0.0

zookeeper-3.4.9

3台主機:192.168.5.71、192.168.5.、192.168.5.73

配置hosts解析

編輯3台主機vim /etc/hosts 加上如下的3行:

192.168.5.71  node71

192.168.5.72  node72

192.168.5.73  node73

安裝zk

tar xf zookeeper-3.4.9.tar.gz -C /opt/

cd /opt

ln -s zookeeper-3.4.9 zk

cd zk/conf

vim zoo.cfg  內容如下:

tickTime=2000

initLimit=10

syncLimit=5

dataDir=/opt/zk

dataLogDir=/opt/zk

clientPort=2181

maxClientCnxns=60

minSessionTimeout=4000

maxSessionTimeout=60000

autopurge.purgeInterval=24

autopurge.snapRetainCount=5

quorum.auth.enableSasl=false

quorum.cnxn.threads.size=20

server.1=node71:3181:4181

server.2=node72:3181:4181

server.3=node73:3181:4181

在3個節點分別創建myid,並啟動zkserver:

node71上: echo '1' > /opt/zk/myid

node72上: echo '2' > /opt/zk/myid

node73上: echo '3' > /opt/zk/myid

在3個節點分別啟動zkserver:

/opt/zk/bin/zkServer.sh start /opt/zk/conf/zoo.cfg

在3個節點觀察集群主機的狀態:

/opt/zk/bin/zkServer.sh status /opt/zk/conf/zoo.cfg

安裝kafka

tar xf kafka_2.11-1.0.0.tgz -C /opt/

cd /opt/

ln -s kafka_2.11-1.0.0 kafka

每個主機的配置如下:

[root@node71 /opt/kafka/config ]# egrep -v '^#|^$' /opt/kafka/config/server.properties broker.id=0      #當前機器在集群中的唯一標識,和zookeeper的myid性質一樣 listeners=PLAINTEXT://:9092        #當前kafka對外提供服務的埠默認是9092 num.network.threads=3        # 這個是borker進行網路處理的執行緒數 num.io.threads=8              # 這個是borker進行I/O處理的執行緒數 socket.send.buffer.bytes=102400    #發送緩衝區buffer大小,數據不是一下子就發送的,先回存儲到緩衝區了到達一定的大小後在發送,能提高性能 socket.receive.buffer.bytes=102400    #kafka接收緩衝區大小,當數據到達一定大小後在序列化到磁碟 socket.request.max.bytes=104857600   #這個參數是向kafka請求消息或者向kafka發送消息的請請求的最大數,這個值不能超過java的堆棧大小 log.dirs=/tmp/kafka-logs #消息存放的目錄,這個目錄可以配置為「,」逗號分割的表達式,上面的num.io.threads要大於這個目錄的個數這個目錄,如果配置多個目錄,新創建的topic他把消息持久化的地方是,當前以逗號分割的目錄中,那個分區數最少就放那一個 num.partitions=1      # 默認的分區數,一個topic默認1個分區數 num.recovery.threads.per.data.dir=1 offsets.topic.replication.factor=1 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 log.retention.hours=168        # 默認消息的最大持久化時間,168小時,7天 log.segment.bytes=1073741824   log.retention.check.interval.ms=300000  #每隔300000毫秒去檢查上面配置的log失效時間(默認168 ),到目錄查看是否有過期的消息如果有,刪除 message.max.byte=5242880        #消息保存的最大值5M default.replication.factor=2    #kafka保存消息的副本數,如果一個副本失效了,另一個還可以繼續提供服務 replica.fetch.max.bytes=5242880  #取消息的最大直接數 zookeeper.connect=node71:2181,node72:2181,node73:2181  #設置zookeeper的連接埠 zookeeper.connection.timeout.ms=6000 group.initial.rebalance.delay.ms=0 [root@node72 /opt/kafka/config ]# egrep -v '^#|^$' /opt/kafka/config/server.properties broker.id=1 listeners=PLAINTEXT://:9092 num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 log.dirs=/tmp/kafka-logs num.partitions=1 num.recovery.threads.per.data.dir=1 offsets.topic.replication.factor=1 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 log.retention.hours=168 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 message.max.byte=5242880 default.replication.factor=2 replica.fetch.max.bytes=5242880 zookeeper.connect=node71:2181,node72:2181,node73:2181 zookeeper.connection.timeout.ms=6000 group.initial.rebalance.delay.ms=0 [root@node73 /opt/kafka/config ]# egrep -v '^#|^$' /opt/kafka/config/server.properties broker.id=2 listeners=PLAINTEXT://:9092 num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 log.dirs=/tmp/kafka-logs num.partitions=1 num.recovery.threads.per.data.dir=1 offsets.topic.replication.factor=1 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 log.retention.hours=168 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 message.max.byte=5242880 default.replication.factor=2 replica.fetch.max.bytes=5242880 zookeeper.connect=node71:2181,node72:2181,node73:2181 zookeeper.connection.timeout.ms=6000 group.initial.rebalance.delay.ms=0

啟動Kafka集群

在3台機器上都執行啟動Kafka集群:

/opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties

測試kafka的功能

1、創建Topic

/opt/kafka/bin/kafka-topics.sh –create –zookeeper node71:2181 –replication-factor 2 –partitions 3 –topic mysql-order

/opt/kafka/bin/kafka-topics.sh –create –zookeeper node71:2181 –replication-factor 2 –partitions 2 –topic mysql-coupons

#參數含義

–replication-factor 2   #複製兩份

–partitions 2    #創建2個分區

–topic #主題

2、在一台伺服器上創建一個發布者broker

/opt/kafka/bin/kafka-console-producer.sh –broker-list node72:9092 –topic mysql-order

然後,可以輸入一些文字內容

3、在一台伺服器上創建一個訂閱者consumer

/opt/kafka/bin/kafka-console-consumer.sh –zookeeper node72:2181 –from-beginning –topic mysql-order

4、列出全部的topic

/opt/kafka/bin/kafka-topics.sh –list –zookeeper node72:2181

5、查看指定的topic狀態

[root@node72 /opt/kafka/config ]# /opt/kafka/bin/kafka-topics.sh –describe –zookeeper node72:2181 –topic mysql-coupons

查看zk中kafka的狀態:

cd /etc/zk/bin

./zkCli.sh -server node71:2181

ls /brokers

get /brokers/ids/2

get /brokers/topics/mysql-coupons/partitions/0    # 這裡就不截圖了

關閉Kafka集群

官方提到的關閉方式就是kill -9

ps aux|grep kafka|grep -v grep|awk '{print $2}'|xargs kill -9

kafka架構

Broker

Kafka集群包含一個或多個伺服器,這種伺服器被稱為broker

Topic

每條發布到Kafka集群的消息都有一個類別,這個類別被稱為topic。(物理上不同topic的消息分開存儲,邏輯上一個topic的消息雖然保存於一個或多個broker上但用戶只需指定消息的topic即可生產或消費數據而不必關心數據存於何處)

Partition

parition是物理上的概念,每個topic包含一個或多個partition,創建topic時可指定parition數量。每個partition對應於一個文件夾,該文件夾下存儲該partition的數據和索引文件

Producer

負責發布消息到Kafka broker (push消息到broker)

Consumer

消費消息(從broker那裡poll消息)。每個consumer屬於一個特定的consuer group(可為每個consumer指定group name,若不指定group name則屬於默認的group)。使用consumer high level API時,同一topic的一條消息只能被同一個consumer group內的一個consumer消費,但多個consumer group可同時消費這一消息。

如上圖所示,一個典型的kafka集群中包含若干producer(可以是web前端產生的page view,或者是伺服器日誌,系統CPU、memory等),若干broker(Kafka支援水平擴展,一般broker數量越多,集群吞吐率越高),若干consumer group,以及一個Zookeeper集群。Kafka通過Zookeeper管理集群配置,選舉leader,以及在consumer group發生變化時進行rebalance。producer使用push模式將消息發布到broker,consumer使用pull模式從broker訂閱並消費消息。

Push vs. Pull的區別

作為一個messaging system,Kafka遵循了傳統的方式,選擇由producer向broker push消息並由consumer從broker pull消息。一些logging-centric system,比如Facebook的Scribe和Cloudera的Flume,採用非常不同的push模式。事實上,push模式和pull模式各有優劣。

push模式很難適應消費速率不同的消費者,因為消息發送速率是由broker決定的。push模式的目標是儘可能以最快速度傳遞消息,但是這樣很容易造成consumer來不及處理消息,典型的表現就是拒絕服務以及網路擁塞。

而pull模式則可以根據consumer的消費能力以適當的速率消費消息。

kafka消息的消費&留存

本節所有描述都是基於consumer hight level API而非low level API

每一個consumer實例都屬於一個consumer group,每一條消息只會被同一個consumer group里的一個consumer實例消費(這是為了實現傳統message queue消息只被消費一次的語義),但是不同consumer group可以同時消費同一條消息,這一特性可以為消息的多元化處理提供了支援。實際上,Kafka的設計理念之一就是同時提供離線處理和實時處理。根據這一特性,可以使用Storm這種實時流處理系統對消息進行實時在線處理,同時使用Hadoop這種批處理系統進行離線處理,還可以同時將數據實時備份到另一個數據中心,只需要保證這三個操作所使用的consumer在不同的consumer group即可。

Linked的一種kafka部署方案:

對於傳統的message queue而言,一般會刪除已經被消費的消息,而Kafka集群會保留所有的消息,無論其被消費與否。當然,因為磁碟限制,不可能永久保留所有數據(實際上也沒必要),因此Kafka提供兩種策略去刪除舊數據。一是基於時間,二是基於partition文件大小。例如可以通過配置server.properties,讓Kafka刪除一周前的數據,也可通過配置讓Kafka在partition文件超過1GB時刪除舊數據

log.retention.hours=168        # 默認消息的最大持久化時間,168小時,7天

log.segment.bytes=1073741824     # partition大小超過1G時候,清理舊數據

log.retention.check.interval.ms=300000  #每隔300000毫秒去檢查上面配置的log失效時間(默認168 ),到目錄查看是否有過期的消息如果有,刪除

這裡要注意,因為Kafka讀取特定消息的時間複雜度為O(1),即與文件大小無關,所以這裡刪除文件與Kafka性能無關,選擇怎樣的刪除策略只與磁碟以及具體的需求有關。另外,Kafka會為每一個consumer group保留一些metadata資訊—當前消費的消息的position,也即offset。這個offset由consumer控制。正常情況下consumer會在消費完一條消息後線性增加這個offset。當然,consumer也可將offset設成一個較小的值,重新消費一些消息。因為offet由consumer控制,所以Kafka broker是無狀態的,它不需要標記哪些消息被哪些consumer過,不需要通過broker去保證同一個consumer group只有一個consumer能消費某一條消息,因此也就不需要鎖機制,這也為Kafka的高吞吐率提供了有力保障。

kafka customer rebalance

本節所講述內容均基於Kafka consumer high level API

具體參考:http://developer.51cto.com/art/201501/464491.htm

kafka的數據複製

Kafka從0.8開始提供partition級別的replication,replication的數量可在server.properties中配置。

kafka默認default.replication.factor = 2

該Replication與leader election配合提供了自動的failover機制。replication對Kafka的吞吐率是有一定影響的,但極大的增強了可用性。每個partition都有一個唯一的leader,所有的讀寫操作都在leader上完成,consumer批量從leader上pull數據。一般情況下partition的數量大於等於broker的數量,並且所有partition的leader均勻分布在broker上。follower上的日誌和其leader上的完全一樣。

和大部分分散式系統一樣,Kakfa處理失敗需要明確定義一個broker是否alive。

對於Kafka而言,Kafka存活包含兩個條件,一是它必須維護與Zookeeper的session(這個通過Zookeeper的heartbeat機制來實現)。二是follower必須能夠及時將leader的writing複製過來,不能落後太多。

leader會track "in sync"的node list。如果一個follower宕機,或者落後太多,leader將把它從"in sync" list中移除。這裡所描述的"落後太多" 指follower複製的消息落後於leader後的條數超過預定值,該值可在 server.properties中配置。

replica.lag.max.messages=4000

replica.lag.time.max.ms=10000

需要說明的是,Kafka只解決」fail/recover」,不處理「Byzantine」(「拜占庭」)問題。

個人覺得,"in sync" list 機制,類似於MySQL中的semi-sync半同步。

一條消息只有被"in sync" list里的所有follower都從leader複製過去才會被認為已提交。這樣就避免了部分數據被寫進了leader,還沒來得及被任何follower複製就宕機了,而造成數據丟失(consumer無法消費這些數據)。而對於producer而言,它可以選擇是否等待消息commit,這可以通過request.required.acks來設置。這種機制確保了只要"in sync" list有一個或以上的flollower,一條被commit的消息就不會丟失。

這裡的複製機制即不是同步複製,也不是單純的非同步複製。事實上,同步複製要求「活著的」follower都複製完,這條消息才會被認為commit,這種複製方式極大的影響了吞吐率(高吞吐率是Kafka非常重要的一個特性)。而非同步複製方式下,follower非同步的從leader複製數據,數據只要被leader寫入log就被認為已經commit,這種情況下如果follwer都落後於leader,而leader突然宕機,則會丟失數據。而Kafka的這種使用"in sync" list的方式則很好的均衡了確保數據不丟失以及吞吐率。follower可以批量的從leader複製數據,這樣極大的提高複製性能(批量寫磁碟),極大減少了follower與leader的差距(前文有說到,只要follower落後leader不太遠,則被認為在"in sync" list里)。

Kafka的選舉機制

首先Kafka會將接收到的消息分區(partition),每個主題(topic)的消息有不同的分區。這樣一方面消息的存儲就不會受到單一伺服器存儲空間大小的限制,另一方面消息的處理也可以在多個伺服器上並行。

其次為了保證高可用,每個分區都會有一定數量的副本(replica)。這樣如果有部分伺服器不可用,副本所在的伺服器就會接替上來,保證應用的持續性。 

但是,為了保證較高的處理效率,消息的讀寫都是在固定的一個副本上完成。這個副本就是所謂的Leader,而其他副本則是Follower。而Follower則會定期地到Leader上同步數據。

Leader選舉

如果某個分區所在的伺服器除了問題,不可用,kafka會從該分區的其他的副本中選擇一個作為新的Leader。之後所有的讀寫就會轉移到這個新的Leader上。現在的問題是應當選擇哪個作為新的Leader。顯然,只有那些跟Leader保持同步的Follower才應該被選作新的Leader。

Kafka會在Zookeeper上針對每個Topic維護一個稱為ISR(in-sync replica,已同步的副本)的集合,該集合中是一些分區的副本。只有當這些副本都跟Leader中的副本同步了之後,kafka才會認為消息已提交,並回饋給消息的生產者。如果這個集合有增減,kafka會更新zookeeper上的記錄。

如果某個分區的Leader不可用,Kafka就會從ISR集合中選擇一個副本作為新的Leader。

顯然通過ISR,kafka需要的冗餘度較低,可以容忍的失敗數比較高。假設某個topic有f+1個副本,kafka可以容忍f個伺服器不可用。

為什麼不用少數服從多數的方法:

少數服從多數是一種比較常見的一致性演算法和Leader選舉法。它的含義是只有超過半數的副本同步了,系統才會認為數據已同步;選擇Leader時也是從超過半數的同步的副本中選擇。這種演算法需要較高的冗餘度。譬如只允許一台機器失敗,需要有三個副本;而如果只容忍兩台機器失敗,則需要五個副本。而kafka的ISR集合方法,分別只需要兩個和三個副本。

如果所有的ISR副本都失敗了怎麼辦:

  此時有兩種方法可選,一種是等待ISR集合中的副本復活,一種是選擇任何一個立即可用的副本,而這個副本不一定是在ISR集合中。這兩種方法各有利弊,實際生產中按需選擇。

  如果要等待ISR副本復活,雖然可以保證一致性,但可能需要很長時間。而如果選擇立即可用的副本,則很可能該副本並不一致。

例如下圖:

Kafka的監控

1、zabbix監控

2、prometheus監控