python玩玩kafka

  • 2019 年 10 月 6 日
  • 筆記

kafka是一種高吞吐量的分散式發布訂閱消息系統,它可以處理消費者規模的網站中的所有動作流數據。這種動作(網頁瀏覽,搜索和其他用戶的行動)是在現代網路上的許多社會功能的一個關鍵因素。這些數據通常是由於吞吐量的要求而通過處理日誌和日誌聚合來解決。

kafka裡面的一些概念:

  • producer:生產者。
  • consumer:消費者。
  • topic: 消息以topic為類別記錄,Kafka將消息種子(Feed)分門別類,每一類的消息稱之為一個主題(Topic)。
  • broker:以集群的方式運行,可以由一個或多個服務組成,每個服務叫做一個broker;消費者可以訂閱一個或多個主題(topic),並從Broker拉數據,從而消費這些已發布的消息。

kafka有四個核心API:producer API,consumer API,streams API,connector API

kafka有什麼用?

可它以有效的獲取系統和應用程式之間的數據,對數據流進行轉換或者反應。

關於kafka的下載安裝就不過多介紹了,下面主要介紹的是使用python操作kafka。

首先安裝kafka的模組:

pip install kafka

安裝完我們就可以嘗試著去跑個例子:

首先看看producer是怎麼跑起來的:

from kafka import KafkaProducer    producer = KafkaProducer(bootstrap_servers=['127.0.0.1:9092'])    for i in range(3):      msg = "msg%d" % i      producer.send('test', msg)  producer.close()

調用KafkaProducer指定server地址即可

類似的來看看consumer例子:

from kafka import KafkaConsumer    consumer = KafkaConsumer('test',                           bootstrap_servers=['127.0.0.1:9092'])    for message in consumer:      print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,                                            message.offset, message.key,                                            message.value))

對於consumer group(消費者群組),我們需要給一個群組id(用來區分單個消費者或是群組):

from kafka import KafkaConsumer    consumer = KafkaConsumer('test',                           group_id='my-group',                           bootstrap_servers=['127.0.0.1:9092'])    for message in consumer:      print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,                                            message.offset, message.key,                                            message.value))

使用consumer訂閱多個主題,需要使用subscribe方法,傳入需要訂閱的標題:

from kafka import KafkaConsumer  from kafka.structs import TopicPartition    consumer = KafkaConsumer(bootstrap_servers=['127.0.0.1:9092'])  consumer.subscribe(topics=('topic1','topic2','top3'))  #訂閱要消費的主題  print consumer.topics()  print consumer.position(TopicPartition(topic=u'test', partition=0)) #獲取當前主題的最新偏移量  for message in consumer:      print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,                                            message.offset, message.key,                                            message.value))

如果需要手動拉取資訊,那我們需要加一個循環,在這個循環里監聽,一直獲取伺服器資訊:

from kafka import KafkaConsumer    consumer = KafkaConsumer(bootstrap_servers=['127.0.0.1:9092'])  consumer.subscribe(topics=('topic1','topic2','top3'))  while True:      msg = consumer.poll(timeout_ms=5)   #從kafka獲取消息      print msg

如果想掛起consumer可以調用pause()方法,恢復調用resume()方法:

from kafka import KafkaConsumer  from kafka.structs import TopicPartition  import time    consumer = KafkaConsumer(bootstrap_servers=['127.0.0.1:9092'])  consumer.subscribe(topics=('topic1'))  consumer.topics()  consumer.pause(TopicPartition(topic=u'test', partition=0))  num = 0  while True:      print num      print consumer.paused()   #獲取當前掛起的消費者      msg = consumer.poll(timeout_ms=5)      print msg      time.sleep(2)      num = num + 1      if num == 10:          consumer.resume(TopicPartition(topic=u'test', partition=0))          print "resume......"

關於簡單的操作就介紹到這裡了,想了解更多:

https://pypi.org/project/kafka-python/