kafka對接Rancher日誌

kafka對接Rancher日誌

概述

Rancher應用商店自帶的kafka可以方便的對接Rancher日誌,但是不支援sasl認證。

如果有認證需求,可以使用bitnami倉庫的kafka,這個kafka帶有相關的認證參數,可以很方便的開啟sasl相關的認證。

參考://github.com/bitnami/charts/tree/master/bitnami/kafka

環境準備

軟體 版本
kubernetes v1.19.3
Rancher v2.4.8 | v2.5.2
kafka 2.6.0
kafka-chart 11.8.9
helm 3.4.0

正常對接kafka集群

如果是需要開啟SASL認證,可以直接跳到後面開啟SASL認證方式的內容

1、helm添加bitnami庫

helm repo add bitnami //charts.bitnami.com/bitnami
helm repo update

2、下載 kafka 對應的chart壓縮文件

bitnami庫中國可能訪問不太友好,需要翻一下牆。。。

helm pull bitnami/kafka --version 11.8.9

上述命令會下載一個kafka-11.8.9-tgz的壓縮文件,解壓該文件

tar -zxvf kafka-11.8.9-tgz

查看解壓目錄內容

# cd kafka/
# ls
charts  Chart.yaml  files  README.md  requirements.lock  requirements.yaml  templates  values-production.yaml  values.yaml

可以看到有兩個values文件,分別是values.yamlvalues-production.yaml

其中values.yaml會啟動一個最簡單的kafka集群,

values-production.yaml中的配置會包含更多面向生產的配置,例如啟用持久化存儲,啟用sasl相關認證等。

3、啟動kafka集群

最簡化啟動kafka集群,可以添加這兩個參數,取消持久化存儲

kubectl create ns kafka
## 進入到kafka目錄下再執行下面的命令
helm install kafka -n kafka -f values.yaml --set persistence.enabled=false --set zookeeper.persistence.enabled=false .

或者如果k8s有持久化存儲,可以設置storage Class,將xxx替換為對應的storage Class

helm install kafka -n kafka -f values.yaml --set global.storageClass=xxx .

執行完helm install後會輸出以下內容

NAME: kafka
LAST DEPLOYED: Wed Nov 25 17:04:50 2020
NAMESPACE: kafka
STATUS: deployed
REVISION: 1
TEST SUITE: None
NOTES:
** Please be patient while the chart is being deployed **

Kafka can be accessed by consumers via port 9092 on the following DNS name from within your cluster:

    kafka.kafka.svc.cluster.local

Each Kafka broker can be accessed by producers via port 9092 on the following DNS name(s) from within your cluster:

    kafka-0.kafka-headless.kafka.svc.cluster.local:9092

To create a pod that you can use as a Kafka client run the following commands:

    kubectl run kafka-client --restart='Never' --image docker.io/bitnami/kafka:2.6.0-debian-10-r57 --namespace kafka --command -- sleep infinity
    kubectl exec --tty -i kafka-client --namespace kafka -- bash

    PRODUCER:
        kafka-console-producer.sh \
            
            --broker-list kafka-0.kafka-headless.kafka.svc.cluster.local:9092 \
            --topic test

    CONSUMER:
        kafka-console-consumer.sh \
            
            --bootstrap-server kafka.kafka.svc.cluster.local:9092 \
            --topic test \
            --from-beginning

查看pod、svc狀態

# kubectl get pod,svc -n kafka
NAME                    READY   STATUS    RESTARTS   AGE
pod/kafka-0             0/1     Pending   0          34s
pod/kafka-client        1/1     Running   0          3d21h
pod/kafka-zookeeper-0   1/1     Running   0          34s

NAME                               TYPE        CLUSTER-IP      EXTERNAL-IP   PORT(S)                      AGE
service/kafka                      ClusterIP   10.43.97.41     <none>        9092/TCP                     34s
service/kafka-headless             ClusterIP   None            <none>        9092/TCP,9093/TCP            34s
service/kafka-zookeeper            ClusterIP   10.43.127.105   <none>        2181/TCP,2888/TCP,3888/TCP   34s
service/kafka-zookeeper-headless   ClusterIP   None            <none>        2181/TCP,2888/TCP,3888/TCP   34s

4、操作kafka集群

根據helm創建成功的提示,我們可以創建客戶端去執行kafka相關的操作

創建kafka-client的pod

kubectl run kafka-client --restart='Never' --image docker.io/bitnami/kafka:2.6.0-debian-10-r57 --namespace kafka --command -- sleep infinity

進入到容器

kubectl -n kafka exec -it kafka-client bash

生產者

kafka有生產者和消費者的概念,生產者會生產數據,消費者去消費生產者產生的數據,具體可以先了解一下相關的概念

首先生產者先生產數據,執行如下命令,可以進入到生產者的input端

kafka-console-producer.sh --broker-list kafka-0.kafka-headless.kafka.svc.cluster.local:9092 --topic test

例如我們輸入rancher run everywhere

> rancher run everywhere

消費者

然後消費者消費數據

kafka-console-consumer.sh --bootstrap-server kafka.kafka.svc.cluster.local:9092 --topic test --from-beginning

輸入上述命令,就可以查看到剛剛在生產者生產的數據

rancher run everything

5、對接Rancher logging

首先在Rancher上創建一個項目,並部署一個nginx應用

D0Ss2D.png

接著在跳轉到工具 -> 日誌,選擇Kafka

有兩種方式連接到kafka集群,分別是zookeeper和broker,這裡以zookeeper訪問端點類型為例

D0ScKH.png

參數解釋:

訪問地址//kafka-zookeeper.kafka:2181,這裡填寫對應的訪問地址,由於選擇的是zookeeper,所以填寫zookeeper相關的service訪問地址,也可以寫clusterIP,默認埠是2181

主題:rancher,日誌將會發送到這個主題上

其他參數:

刷新時間間隔:默認60s,如果是測試環境,可以設置為10s更快的查看效果


點擊測試按鈕,等待結果返回驗證通過後,點擊保存按鈕,Rancher會在System項目下創建相應的fluentd工作負載,到這裡日誌對接基本沒問題了

6、驗證效果

訪問nginx應用,然後等待對應的刷新時間間隔後,在kafka-client中查看是否能消費到數據

D0Sr8O.png

開啟SASL認證方式

values-production.yaml中的配置會包含更多面向生產的配置,例如啟用持久化存儲,啟用sasl相關認證等,所以可以直接使用這個配置文件進行創建kafka集群,會自動sasl相關認證的功能

1、helm 安裝kafka

這裡為了方便都關閉了,取消了持久化存儲,關閉了相關metric,設置生產環境按需開啟

其中autoCreateTopicsEnable設置為true,作用是開啟自動創建topic功能,如果關閉這個,則需要手動創建topic才能對接rancher,生產環境也建議關閉掉

helm install kafka -n kafka -f values-production.yaml --set persistence.enabled=false --set zookeeper.persistence.enabled=false --set metrics.kafka.enabled=false --set metrics.jmx.enabled=false --set zookeeper.metrics.enabled=false --set autoCreateTopicsEnable=true .

執行完helm install後會輸出以下內容

NAME: kafka
LAST DEPLOYED: Wed Nov 25 18:49:35 2020
NAMESPACE: kafka
STATUS: deployed
REVISION: 1
TEST SUITE: None
NOTES:
** Please be patient while the chart is being deployed **

Kafka can be accessed by consumers via port 9092 on the following DNS name from within your cluster:

    kafka.kafka.svc.cluster.local

Each Kafka broker can be accessed by producers via port 9092 on the following DNS name(s) from within your cluster:

    kafka-0.kafka-headless.kafka.svc.cluster.local:9092
    kafka-1.kafka-headless.kafka.svc.cluster.local:9092
    kafka-2.kafka-headless.kafka.svc.cluster.local:9092

You need to configure your Kafka client to access using SASL authentication. To do so, you need to create the 'kafka_jaas.conf' and 'client.properties' configuration files by executing these commands:

    - kafka_jaas.conf:

cat > kafka_jaas.conf <<EOF
KafkaClient {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="user"
password="$(kubectl get secret kafka-jaas -n kafka -o jsonpath='{.data.client-passwords}' | base64 --decode | cut -d , -f 1)";
};
EOF

    - client.properties:

cat > client.properties <<EOF
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
EOF

To create a pod that you can use as a Kafka client run the following commands:

    kubectl run kafka-client --restart='Never' --image docker.io/bitnami/kafka:2.6.0-debian-10-r57 --namespace kafka --command -- sleep infinity
    kubectl cp --namespace kafka /path/to/client.properties kafka-client:/tmp/client.properties
    kubectl cp --namespace kafka /path/to/kafka_jaas.conf kafka-client:/tmp/kafka_jaas.conf
    kubectl exec --tty -i kafka-client --namespace kafka -- bash
    export KAFKA_OPTS="-Djava.security.auth.login.config=/tmp/kafka_jaas.conf"

    PRODUCER:
        kafka-console-producer.sh \
            --producer.config /tmp/client.properties \
            --broker-list kafka-0.kafka-headless.kafka.svc.cluster.local:9092,kafka-1.kafka-headless.kafka.svc.cluster.local:9092,kafka-2.kafka-headless.kafka.svc.cluster.local:9092 \
            --topic test

    CONSUMER:
        kafka-console-consumer.sh \
            --consumer.config /tmp/client.properties \
            --bootstrap-server kafka.kafka.svc.cluster.local:9092 \
            --topic test \
            --from-beginning

可以看到,已經在k8s集群中創建了3個節點的kafka集群

2、創建kafka client端

由於設置了sasl認證,所以需要創建client.propertieskafka_jaas.conf兩個文件,並拷貝到client端,client端使用這兩個配置文件才能對kafka集群進行相關操作

創建kafka_jaas.conf

cat > kafka_jaas.conf <<EOF
KafkaClient {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="user"
password="$(kubectl get secret kafka-jaas -n kafka -o jsonpath='{.data.client-passwords}' | base64 --decode | cut -d , -f 1)";
};
EOF

創建kafka_jaas.conf

cat > kafka_jaas.conf <<EOF
KafkaClient {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="user"
password="$(kubectl get secret kafka-jaas -n kafka -o jsonpath='{.data.client-passwords}' | base64 --decode | cut -d , -f 1)";
};
EOF

拷貝到client端內

kubectl run kafka-client --restart='Never' --image docker.io/bitnami/kafka:2.6.0-debian-10-r57 --namespace kafka --command -- sleep infinity
kubectl cp --namespace kafka client.properties kafka-client:/tmp/client.properties
kubectl cp --namespace kafka kafka_jaas.conf kafka-client:/tmp/kafka_jaas.conf

3、Rancher對接Kafka

SASL認證,只支援通過broker連接入口,所以需要配置broker相關參數

D0Syxe.png

kafka配置

訪問端點類型:選擇Broker

訪問地址:填寫kafka serviceIP地址,這裡填寫//kafka.kafka:9092

主題:日誌發送的主題

SSL配置

如果SASL類型為Scram,則不用配置SSL,反之當SASL類型為Plain時,需要配置SSL

SASL配置

用戶名:默認是user,可以在values.yaml中,通過配置auth.jaas.clientUsers修改

密碼:默認是隨機值,可以通過kubectl get secret kafka-jaas -n kafka -o jsonpath='{.data.client-passwords}' | base64 --decode | cut -d , -f 1這個命令查看,另外也可以通過配置auth.jaas.clientPasswords來修改

類型:Scram

安全機制:sha256


點擊測試按鈕,等待結果返回驗證通過後,點擊保存按鈕,Rancher會在System項目下創建相應的fluentd工作負載,到這裡日誌對接基本沒問題了

4、驗證效果

通過kafka-client查看日誌是否對接成功

進入kafka-client pod bash環境

kubectl exec -it kafka-client -n kafka -- bash

由於開啟了認證,需要export相關環境變數

最後通過kafka-console-consumer.sh命令查看日誌是否發送過來了

export KAFKA_OPTS="-Djava.security.auth.login.config=/tmp/kafka_jaas.conf"

kafka-console-consumer.sh  --consumer.config /tmp/client.properties  --bootstrap-server kafka.kafka.svc.cluster.local:9092  --topic rancher  --from-beginning

效果如下:

D0Sgrd.png

kafka-client相關命令

查看topic 列表

kafka-topics.sh --zookeeper kafka-zookeeper:2181 --list

創建topic

kafka-topics.sh --zookeeper kafka-zookeeper:2181 --topic rancher --create --partitions 1 --replication-factor 1

刪除topic

kafka-topics.sh --delete --zookeeper kafka-zookeeper:2181 --topic test

如果delete.topic.enable=true,則會直接刪除topic,如果delete.topic.enable=false,則只是把這個 topic 標記為刪除(marked for deletion),重啟 Kafka Server 後刪除

生產者生產數據

kafka-console-producer.sh  --producer.config /tmp/client.properties  --broker-list kafka-0.kafka-headless.kafka.svc.cluster.local:9092,kafka-1.kafka-headless.kafka.svc.cluster.local:9092,kafka-2.kafka-headless.kafka.svc.cluster.local:9092  --topic rancher

消費者消費生產者的數據

kafka-console-consumer.sh --consumer.config /tmp/client.properties --bootstrap-server kafka.kafka.svc.cluster.local:9092 --topic rancher --from-beginning