helm安裝kafka集群並測試其高可用性
- 2019 年 10 月 3 日
- 筆記
介紹
Kafka是由Apache軟件基金會開發的一個開源流處理平台,由Scala和Java編寫。Kafka是一種高吞吐量的分佈式發佈訂閱消息系統,它可以處理消費者在網站中的所有動作流數據。 這種動作(網頁瀏覽,搜索和其他用戶的行動)是在現代網絡上的許多社會功能的一個關鍵因素。 這些數據通常是由於吞吐量的要求而通過處理日誌和日誌聚合來解決。 對於像Hadoop一樣的日誌數據和離線分析系統,但又要求實時處理的限制,這是一個可行的解決方案。Kafka的目的是通過Hadoop的並行加載機制來統一線上和離線的消息處理,也是為了通過集群來提供實時的消息。
一、KAFKA
體系結構圖:
- Producer: 生產者,也就是發送消息的一方。生產者負責創建消息,通過zookeeper找到broker,然後將其投遞到 Kafka 中。
- Consumer: 消費者,也就是接收消息的一方。通過zookeeper找對應的broker 進行消費,進而進行相應的業務邏輯處理。
- Broker: 服務代理節點。對於 Kafka 而言,Broker 可以簡單地看作一個獨立的 Kafka 服務節點或 Kafka 服務實例。大多數情況下也可以將 Broker 看作一台 Kafka 服務器,前提是這台服務器上只部署了一個 Kafka 實例。一個或多個 Broker 組成了一個 Kafka 集群。一般而言,我們更習慣使用首字母小寫的 broker 來表示服務代理節點
Send消息流程圖:
Kafka多副本(Replica)機制:
如上圖所示,Kafka 集群中有4個 broker,某個主題中有3個分區,且副本因子(即副本個數)也為3,如此每個分區便有1個 leader 副本和2個 follower 副本。生產者和消費者只與 leader 副本進行交互,而 follower 副本只負責消息的同步,很多時候 follower 副本中的消息相對 leader 副本而言會有一定的滯後。
二、Zookeeper
ZooKeeper是一個分佈式的,開放源碼的分佈式應用程序協調服務,是Google的Chubby一個開源的實現,是Hadoop和Hbase的重要組件。它是一個為分佈式應用提供一致性服務的軟件,提供的功能包括:配置維護、域名服務、分佈式同步、組服務等。
原理:
高可以用架構圖:
圖中每一個Server代表一個安裝Zookeeper服務的服務器。組成 ZooKeeper 服務的服務器都會在內存中維護當前的服務器狀態,並且每台服務器之間都互相保持着通信。集群間通過 Zab 協議(Zookeeper Atomic Broadcast)來保持數據的一致性。
三、部署kafka&zookeeper集群
我們選擇的是官方的chart地址:https://github.com/helm/charts/tree/master/incubator/kafka
1)編寫自己的values.yaml文件(注意我的storageClass是已經做好了的nfs存儲)
imageTag: "5.2.2" resources: limits: cpu: 2 memory: 4Gi requests: cpu: 1 memory: 2Gi kafkaHeapOptions: "-Xmx2G -Xms2G" persistence: enabled: true storageClass: "managed-nfs-storage" size: "40Gi" zookeeper: resources: limits: cpu: 1 memory: 2Gi requests: cpu: 100m memory: 536Mi persistence: enabled: true storageClass: "managed-nfs-storage" size: "10Gi"
2)安裝kafka
添加chart倉庫:
$ helm repo add incubator http://storage.googleapis.com/kubernetes-charts-incubator
部署
$ helm install --name kafka -f values.yaml incubator/kafka
最後我們能看到:
四、測試kafka高可用性
1)根據提示創建一個測試客戶端
apiVersion: v1 kind: Pod metadata: name: testclient namespace: sscp-test spec: containers: - name: kafka image: solsson/kafka:0.11.0.0 command: - sh - -c - "exec tail -f /dev/null"
Once you have the testclient pod above running, you can list all kafka
topics with:
kubectl -n sscp-test exec testclient -- kafka-topics --zookeeper kafka-test-zookeeper:2181 --list
To create a new topic:
kubectl -n sscp-test exec testclient -- kafka-topics --zookeeper kafka-test-zookeeper:2181 --topic test1 --create --partitions 1 --replication-factor 1
To listen for messages on a topic:
kubectl -n sscp-test exec -ti testclient -- for x in {1..1000}; do echo $x; sleep 2; done | kafka-console-producer --broker-list kafka-test-headless:9092 --topic test1
To stop the listener session above press: Ctrl+C
To start an interactive message producer session:
kubectl -n sscp-test exec -ti testclient -- kafka-console-producer --broker-list kafka-test-headless:9092 --topic test1
To create a message in the above session, simply type the message and press “enter”
To end the producer session try: Ctrl+C
注意:有三個kafka節點,消息要發三個副本才能保持其高可用!!!
五、測試Zookeeper高可用性
1.Create a node by command below: “kubectl exec -it testclient bash -n sscp-test” “zookeeper-shell kafka-test-zookeeper-headless:2181 create /foo bar” 2. Check zookeeper status Watch existing members: $ kubectl run --attach bbox --image=busybox --restart=Never -- sh -c 'while true; do for i in 0 1 2; do echo zk-${i} $(echo stats | nc kafka-zookeeper-${i}.kafka-zookeeper-headless:2181 | grep Mode); sleep 1; done; done' zk-2 Mode: follower zk-0 Mode: follower zk-1 Mode: leader zk-2 Mode: follower 3.kill the leader by command below: “Kubectl delete pod kafka-test-zookeeper-1” 4.Check the previously inserted key by command below: ““kubectl exec -it testclient bash -n sscp-test” “zookeeper-shell kafka-test-zookeeper-headless:2181 get /foo”