0500-使用Python2訪問Kerberos環境下的Kafka
- 2019 年 11 月 27 日
- 筆記
1
文檔編寫目的
Kafka支援多種客戶端語言(C/C++、Go、Java、JMS、.NET、Python)。Fayson在前面多篇文章介紹了Java訪問Kerberos和非Kerberos環境下的Kafka,參考《如何使用Java連接Kerberos的Kafka》。本篇文章Fayson主要介紹使用Python2訪問Kerberos環境下的Kafka。在學習本篇文章內容前你還需要知道《如何通過Cloudera Manager為Kafka啟用Kerberos及使用》。
- 測試環境:
1.作業系統:Redhat7.4
2.CM和CDH版本為5.15.0
3.CDK2.2.0(0.10.2)
4.Python 2.7.15
2
環境準備
在使用Python訪問Kafka前,還需要為Python環境安裝相關的Kafka包,這裡Fayson使用官網推薦使用的confluent-kafka-python依賴包。該依賴包的GitHub地址為:https://github.com/confluentinc/confluent-kafka-python,關於confluent-kafka-python的詳細說明可以參考GitHub。
如下為各個語言對Kafka功能的支援情況
https://docs.confluent.io/current/clients/index.html#feature-support

接下來準備Python訪問Kafka的運行環境。
1.安裝librdkafka依賴包,該依賴包為作業系統的依賴包
[root@cdh4 ~]# yum install -y librdkafka-devel python-devel

注意:安裝的librdkafka依賴包的版本需要>=0.11.5,librdkafka是C語言實現的Apache Kafka高性能客戶端,為生產和使用Kafka提供高效可靠的客戶端。
2. 由於我們訪問的是Kerberos環境下的Kafka,所以需要使用源碼模式安裝confluent-kafka
[root@cdh4 anaconda2]# /opt/cloudera/anaconda2/bin/pip install --no-binary :all: confluent-kafka [root@cdh4 anaconda2]# /opt/cloudera/anaconda2/bin/pip show confluent-kafka

3
Python2示例程式碼
1.如下為Python2訪問Kerberos環境下Kafka示例程式碼
[root@cdh4 python_code]# vim kafka_test.py from confluent_kafka import Producer import sys conf = {'bootstrap.servers': 'cdh2.fayson.com:9092,cdh3.fayson.com:9092,cdh4.fayson.com:9092', 'security.protocol':'sasl_plaintext', 'sasl.kerberos.principal':'fayson@FAYSON.COM', 'sasl.kerberos.keytab':'/data/disk1/python_code/fayson.keytab', 'group.id':'testgroup'} # Create Producer instance p = Producer(**conf) print(p) def acked(err, msg): """Delivery report callback called (from flush()) on successful or failed delivery of the message.""" if err is not None: print("failed to deliver message: {}".format(err.str())) else: print("produced to: {} [{}] @ {}".format(msg.topic(), msg.partition(), msg.offset())) p.produce('test', value='python test value', callback=acked) p.flush()

2.關於Kafka支援的屬性配置可以參考如下地址
https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md

4
訪問驗證
本文提供的示例程式碼為向Kerberos環境Kafka的test Topic中發送消息,在命令行使用Kafka提供的kafka-console-consumer命令消費Python示例生產的消息。
1.準備客戶端消費配置文件
jaas.conf內容如下:
[root@cdh05 consumer]# more jaas.conf KafkaClient { com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true keyTab="/data/disk1/python_code/fayson.keytab" principal="fayson@FAYSON.COM"; };

client.properties內容如下:
[root@cdh05 consumer]# more client.properties security.protocol=SASL_PLAINTEXT sasl.kerberos.service.name=kafka group.id=testgroup

2.在命令行運行如下腳本啟動客戶端消費
export KAFKA_OPTS="-Djava.security.auth.login.config=/data/disk1/python_code/consumer/jaas.conf" kafka-console-consumer --topic test --from-beginning --bootstrap-server cdh2.fayson.com:9092,cdh3.fayson.com:9092,cdh4.fayson.com:9092 --consumer.config /data/disk1/python_code/consumer/client.properties

3.在命令行運行python2的示例程式碼向test Topic發送「python test value」消息
[root@cdh4 python_code]# /opt/cloudera/anaconda2/bin/python kafka_test.py

4.查看Kafka消費程式接收到兩條消息

5
總結
1.confluent-kafka-python依賴包需要Python的環境>= 2.7 or Python 3.x。
2.如果使用confluent-kafka-python訪問Kerberos環境下的Kafka,需要安裝librdkafka及其依賴包,然後使用PyPi命令通過源碼的方式安裝。