Kafka動態增加Topic的副本

  • 2020 年 2 月 25 日
  • 筆記

一、kafka的副本機制

由於Producer和Consumer都只會與Leader角色的分區副本相連,所以kafka需要以集群的組織形式提供主題下的消息高可用。kafka支援主備複製,所以消息具備高可用和持久性。

    一個分區可以有多個副本,這些副本保存在不同的broker上。每個分區的副本中都會有一個作為Leader。當一個broker失敗時,Leader在這台broker上的分區都會變得不可用,kafka會自動移除Leader,再其他副本中選一個作為新的Leader。

在通常情況下,增加分區可以提供kafka集群的吞吐量。然而,也應該意識到集群的總分區數或是單台伺服器上的分區數過多,會增加不可用及延遲的風險。

關於副本的更多資訊,請參考鏈接:

https://blog.csdn.net/weixin_38750084/article/details/82942564

二、概述

目前的kakfa集群有3個節點,server.properties 關於topic的配置為:

offsets.topic.replication.factor=1                                                                                                                                                                                             transaction.state.log.replication.factor=1                                                                                                                                                                                     transaction.state.log.min.isr=1

目前的設置為1個副本,這樣不健全。如果有一台伺服器掛掉了,那麼就會造成數據丟失!

因此,需要將副本數改為3,也就是每台伺服器都有一個副本,這樣才是穩妥的!

三、動態擴容

kafka-topics.sh 不能用來增加副本因子replication-factor。實際應該使用kafka bin目錄下面的kafka-reassign-partitions.sh

查看topic詳情

首先查看kafka的所有topic

/kafka/bin/kafka-topics.sh --zookeeper zookeeper-1.default.svc.cluster.local:2181 --list

輸出:

test  ...

查看topic為test的詳細資訊

/kafka/bin/kafka-topics.sh --describe --zookeeper zookeeper-1.default.svc.cluster.local:2181 --topic test

輸出:

Topic:test    PartitionCount:3    ReplicationFactor:1    Configs:      Topic: test    Partition: 0    Leader: 1    Replicas: 1    Isr: 1      Topic: test    Partition: 1    Leader: 2    Replicas: 2    Isr: 2      Topic: test    Partition: 2    Leader: 3    Replicas: 3    Isr: 3

可以看到test的副本數為1

擴容副本

kafka-reassign-partitions.sh 執行時,依賴一個json文件。

創建 test.json

{      "version": 1,      "partitions": [          {              "topic": "test",              "partition": 0,              "replicas": [                  1,                  2,              ]          },          {              "topic": "test",              "partition": 1,              "replicas": [                  1,                  2,              ]          },          {              "topic": "test",              "partition": 2,              "replicas": [                  1,                  2,              ]          }      ]  }

注意:這個json文件和上面查看的test詳情,是有關聯的!否則會導致執行失敗

關係圖

正式執行腳本

/kafka/bin/kafka-reassign-partitions.sh --zookeeper  zookeeper-1.default.svc.cluster.local:2181 --reassignment-json-file test.json --execute

參數解釋:

–reassignment-json-file 帶有分區的JSON文件 –execute 按規定啟動重新分配通過—重新分配JSON文件選擇權。

執行輸出:

Current partition replica assignment    {"version":1,"partitions":[{"topic":"test","partition":2,"replicas":[1],"log_dirs":["any"]},{"topic":"test","partition":1,"replicas":[3],"log_dirs":["any"]},{"topic":"test","partition":0,"replicas":[2],"log_dirs":["any"]}]}

出現 Successfully 表示成功了!

再次查看topic為test的partition詳情

/kafka/bin/kafka-topics.sh --describe --zookeeper zookeeper-1.default.svc.cluster.local:2181 --topic test

輸出:

Topic:test    PartitionCount:3    ReplicationFactor:3    Configs:      Topic: test    Partition: 0    Leader: 2    Replicas: 1,2,3    Isr: 2,3,1      Topic: test    Partition: 1    Leader: 3    Replicas: 1,2,3    Isr: 3,1,2      Topic: test    Partition: 2    Leader: 1    Replicas: 1,2,3    Isr: 1,3,2

可以發現,副本已經改為3了!

默認配置

在java程式碼或者python程式碼中,是直接發送生產者消息。topic的名字是動態生成的(當kafka發現topic不存在時,會自動創建),那麼它的partitions和replication-factor的數量是由服務端決定的

因為kafka集群有3個節點,所有需要改成3個

offsets.topic.replication.factor=3                                                                                                                                                                                             transaction.state.log.replication.factor=3                                                                                                                                                                                     transaction.state.log.min.isr=3  num.partitions=1  default.replication.factor=3

參數解釋:

offsets.topic.replication.factor 用於配置offset記錄的topic的partition的副本個數 transaction.state.log.replication.factor 事務主題的複製因子  transaction.state.log.min.isr 覆蓋事務主題的min.insync.replicas配置

num.partitions 新建Topic時默認的分區數

default.replication.factor 自動創建topic時的默認副本的個數

注意:這些參數,設置得更高以確保高可用性!

其中 default.replication.factor 是真正決定,topi的副本數量的

關於kafka配置文件的更多解釋,請參考鏈接:

https://blog.csdn.net/memoordit/article/details/78850086

那麼默認參數,如何測試呢?

很簡單,由於在應用程式碼,是不會主動創建topic的,由kafka集群自動創建topic。

那麼由程式碼進行一次,生產者和消費者,就可以了!

Python測試

這個腳本是普通版的kafka消息測試,沒有ACL配置!

test.py

#!/usr/bin/env python3  # coding: utf-8  import sys  import io    def setup_io():  # 設置默認螢幕輸出為utf-8編碼      sys.stdout = sys.__stdout__ = io.TextIOWrapper(sys.stdout.detach(), encoding='utf-8', line_buffering=True)      sys.stderr = sys.__stderr__ = io.TextIOWrapper(sys.stderr.detach(), encoding='utf-8', line_buffering=True)  setup_io()    import time  from kafka import KafkaProducer  from kafka import KafkaConsumer    class KafkaClient(object):      def __init__(self, kafka_server, port, topic, content):          self.kafka_server = kafka_server  # kafka伺服器ip地址          self.port = port  # kafka埠          self.topic = topic  # topic名          self.content = content # 內容        def producer(self):          producer = KafkaProducer(bootstrap_servers=['%s:%s' % (kafka_server, port)])          producer.send(topic, content)          producer.flush()  # flush確保所有meg都傳送給broker          producer.close()          return producer        def consumer(self):          consumer = KafkaConsumer(topic, group_id='test_group', bootstrap_servers=['%s:%s' % (kafka_server, port)])          # consumer.close()          return consumer        def main(self):          startime = time.time()  # 開始時間            client = KafkaClient(self.kafka_server, self.port, self.topic, self.content)  # 實例化客戶端            client.producer()  # 執行生產者          print("已執行生產者")          consumer = client.consumer()  # 執行消費者          print("已執行消費者")          print("等待結果輸出...")          flag = False          for msg in consumer:              # recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value)              # 判斷生產的消息和消費的消息是否一致              print(msg.value)              # print(self.content)              if msg.value == self.content:                  flag = True                  break            consumer.close()  # 關閉消費者對象          endtime = time.time()  # 結束時間            if flag:              # %.2f %(xx) 表示保留小數點2位              return "kafka驗證消息成功,花費時間", '%.2f 秒' % (endtime - startime)          else:              return "kafka驗證消息失敗,花費時間", '%.2f 秒' % (endtime - startime)      if __name__ == '__main__':      kafka_server = "kafka-1.default.svc.cluster.local"      port = "9092"      topic = "test_xxx"      content = "hello honey".encode('utf-8')        client = KafkaClient(kafka_server,port,topic,content)  # 實例化客戶端      print(client.main())

這裡指定的topic為 test_xxx

執行Python腳本,然後到伺服器上面,查看topic為test_xxx的詳細資訊

/kafka/bin/kafka-topics.sh --describe --zookeeper zookeeper-1.default.svc.cluster.local:2181 --topic test_xxx

輸出如下:

Topic:test_xxx    PartitionCount:3    ReplicationFactor:3    Configs:      Topic: test_xxx    Partition: 0    Leader: 2    Replicas: 1,2,3    Isr: 2,3,1      Topic: test_xxx    Partition: 1    Leader: 3    Replicas: 1,2,3    Isr: 3,1,2      Topic: test_xxx    Partition: 2    Leader: 1    Replicas: 1,2,3    Isr: 1,3,2

可以發現副本為3,說明默認配置生效了!