Kafka動態增加Topic的副本
- 2020 年 2 月 25 日
- 筆記
一、kafka的副本機制
由於Producer和Consumer都只會與Leader角色的分區副本相連,所以kafka需要以集群的組織形式提供主題下的消息高可用。kafka支援主備複製,所以消息具備高可用和持久性。
一個分區可以有多個副本,這些副本保存在不同的broker上。每個分區的副本中都會有一個作為Leader。當一個broker失敗時,Leader在這台broker上的分區都會變得不可用,kafka會自動移除Leader,再其他副本中選一個作為新的Leader。
在通常情況下,增加分區可以提供kafka集群的吞吐量。然而,也應該意識到集群的總分區數或是單台伺服器上的分區數過多,會增加不可用及延遲的風險。

關於副本的更多資訊,請參考鏈接:
https://blog.csdn.net/weixin_38750084/article/details/82942564
二、概述
目前的kakfa集群有3個節點,server.properties 關於topic的配置為:
offsets.topic.replication.factor=1 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1
目前的設置為1個副本,這樣不健全。如果有一台伺服器掛掉了,那麼就會造成數據丟失!
因此,需要將副本數改為3,也就是每台伺服器都有一個副本,這樣才是穩妥的!
三、動態擴容
kafka-topics.sh 不能用來增加副本因子replication-factor。實際應該使用kafka bin目錄下面的kafka-reassign-partitions.sh
查看topic詳情
首先查看kafka的所有topic
/kafka/bin/kafka-topics.sh --zookeeper zookeeper-1.default.svc.cluster.local:2181 --list
輸出:
test ...
查看topic為test的詳細資訊
/kafka/bin/kafka-topics.sh --describe --zookeeper zookeeper-1.default.svc.cluster.local:2181 --topic test
輸出:
Topic:test PartitionCount:3 ReplicationFactor:1 Configs: Topic: test Partition: 0 Leader: 1 Replicas: 1 Isr: 1 Topic: test Partition: 1 Leader: 2 Replicas: 2 Isr: 2 Topic: test Partition: 2 Leader: 3 Replicas: 3 Isr: 3
可以看到test的副本數為1
擴容副本
kafka-reassign-partitions.sh 執行時,依賴一個json文件。
創建 test.json
{ "version": 1, "partitions": [ { "topic": "test", "partition": 0, "replicas": [ 1, 2, ] }, { "topic": "test", "partition": 1, "replicas": [ 1, 2, ] }, { "topic": "test", "partition": 2, "replicas": [ 1, 2, ] } ] }
注意:這個json文件和上面查看的test詳情,是有關聯的!否則會導致執行失敗
關係圖

正式執行腳本
/kafka/bin/kafka-reassign-partitions.sh --zookeeper zookeeper-1.default.svc.cluster.local:2181 --reassignment-json-file test.json --execute
參數解釋:
–reassignment-json-file 帶有分區的JSON文件 –execute 按規定啟動重新分配通過—重新分配JSON文件選擇權。
執行輸出:
Current partition replica assignment {"version":1,"partitions":[{"topic":"test","partition":2,"replicas":[1],"log_dirs":["any"]},{"topic":"test","partition":1,"replicas":[3],"log_dirs":["any"]},{"topic":"test","partition":0,"replicas":[2],"log_dirs":["any"]}]}
出現 Successfully 表示成功了!
再次查看topic為test的partition詳情
/kafka/bin/kafka-topics.sh --describe --zookeeper zookeeper-1.default.svc.cluster.local:2181 --topic test
輸出:
Topic:test PartitionCount:3 ReplicationFactor:3 Configs: Topic: test Partition: 0 Leader: 2 Replicas: 1,2,3 Isr: 2,3,1 Topic: test Partition: 1 Leader: 3 Replicas: 1,2,3 Isr: 3,1,2 Topic: test Partition: 2 Leader: 1 Replicas: 1,2,3 Isr: 1,3,2
可以發現,副本已經改為3了!
默認配置
在java程式碼或者python程式碼中,是直接發送生產者消息。topic的名字是動態生成的(當kafka發現topic不存在時,會自動創建),那麼它的partitions和replication-factor的數量是由服務端決定的
因為kafka集群有3個節點,所有需要改成3個
offsets.topic.replication.factor=3 transaction.state.log.replication.factor=3 transaction.state.log.min.isr=3 num.partitions=1 default.replication.factor=3
參數解釋:
offsets.topic.replication.factor 用於配置offset記錄的topic的partition的副本個數 transaction.state.log.replication.factor 事務主題的複製因子 transaction.state.log.min.isr 覆蓋事務主題的min.insync.replicas配置
num.partitions 新建Topic時默認的分區數
default.replication.factor 自動創建topic時的默認副本的個數
注意:這些參數,設置得更高以確保高可用性!
其中 default.replication.factor 是真正決定,topi的副本數量的
關於kafka配置文件的更多解釋,請參考鏈接:
https://blog.csdn.net/memoordit/article/details/78850086
那麼默認參數,如何測試呢?
很簡單,由於在應用程式碼,是不會主動創建topic的,由kafka集群自動創建topic。
那麼由程式碼進行一次,生產者和消費者,就可以了!
Python測試
這個腳本是普通版的kafka消息測試,沒有ACL配置!
test.py
#!/usr/bin/env python3 # coding: utf-8 import sys import io def setup_io(): # 設置默認螢幕輸出為utf-8編碼 sys.stdout = sys.__stdout__ = io.TextIOWrapper(sys.stdout.detach(), encoding='utf-8', line_buffering=True) sys.stderr = sys.__stderr__ = io.TextIOWrapper(sys.stderr.detach(), encoding='utf-8', line_buffering=True) setup_io() import time from kafka import KafkaProducer from kafka import KafkaConsumer class KafkaClient(object): def __init__(self, kafka_server, port, topic, content): self.kafka_server = kafka_server # kafka伺服器ip地址 self.port = port # kafka埠 self.topic = topic # topic名 self.content = content # 內容 def producer(self): producer = KafkaProducer(bootstrap_servers=['%s:%s' % (kafka_server, port)]) producer.send(topic, content) producer.flush() # flush確保所有meg都傳送給broker producer.close() return producer def consumer(self): consumer = KafkaConsumer(topic, group_id='test_group', bootstrap_servers=['%s:%s' % (kafka_server, port)]) # consumer.close() return consumer def main(self): startime = time.time() # 開始時間 client = KafkaClient(self.kafka_server, self.port, self.topic, self.content) # 實例化客戶端 client.producer() # 執行生產者 print("已執行生產者") consumer = client.consumer() # 執行消費者 print("已執行消費者") print("等待結果輸出...") flag = False for msg in consumer: # recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value) # 判斷生產的消息和消費的消息是否一致 print(msg.value) # print(self.content) if msg.value == self.content: flag = True break consumer.close() # 關閉消費者對象 endtime = time.time() # 結束時間 if flag: # %.2f %(xx) 表示保留小數點2位 return "kafka驗證消息成功,花費時間", '%.2f 秒' % (endtime - startime) else: return "kafka驗證消息失敗,花費時間", '%.2f 秒' % (endtime - startime) if __name__ == '__main__': kafka_server = "kafka-1.default.svc.cluster.local" port = "9092" topic = "test_xxx" content = "hello honey".encode('utf-8') client = KafkaClient(kafka_server,port,topic,content) # 實例化客戶端 print(client.main())
這裡指定的topic為 test_xxx
執行Python腳本,然後到伺服器上面,查看topic為test_xxx的詳細資訊
/kafka/bin/kafka-topics.sh --describe --zookeeper zookeeper-1.default.svc.cluster.local:2181 --topic test_xxx
輸出如下:
Topic:test_xxx PartitionCount:3 ReplicationFactor:3 Configs: Topic: test_xxx Partition: 0 Leader: 2 Replicas: 1,2,3 Isr: 2,3,1 Topic: test_xxx Partition: 1 Leader: 3 Replicas: 1,2,3 Isr: 3,1,2 Topic: test_xxx Partition: 2 Leader: 1 Replicas: 1,2,3 Isr: 1,3,2
可以發現副本為3,說明默認配置生效了!