kafka 非同步雙活方案 mirror maker2 深度解析
mirror maker2背景
通常情況下,我們都是使用一套kafka集群處理業務。但有些情況需要使用另一套kafka集群來進行數據同步和備份。在kafka早先版本的時候,kafka針對這種場景就有推出一個叫mirror maker的工具(mirror maker1,以下mm1即代表mirror maker1),用來同步兩個kafka集群的數據。
最開始版本的mirror maker本質上就是一個消費者 + 生產者的程式。但它有諸多諸多不足,包括
- 目標集群的Topic使用默認配置創建,但通常需要手動repartition。
- acl和配置修改的時候不會自動同步,給多集群管理帶來一些困難
- 消息會被
DefaultPartitioner
打散到不同分區,即對一個topic ,目標集群的partition與源集群的partition不一致。 - 任何配置修改,都會使得集群變得不穩定。比如比較常見的增加topic到whitelist。
- 無法讓源集群的producer或consumer直接使用目標集群的topic。
- 不保證exactly-once,可能出現重複數據到情況
- mm1支援的數據備份模式較簡單,比如無法支援active <-> active互備
- rebalance會導致延遲
因為存在這些問題,mirror maker難以在生產環境中使用。所以kafka2.4版本,推出一個新的mirror maker2(以下mm2即代表mirror maker2)。mirror maker2基於kafka connect工具,解決了上面說的大部分問題。
今天主要介紹mirror maker2的設計,主要功能和部署。
設計和功能
整體設計
mirror maker2是基於kafka connect框架進行開發的,可以簡單地將mirror maker2視作幾個source connector和sink connector的組合。包括:
- MirrorSourceConnector, MirrorSourceTask:用來進行同步數據的connector
- MirrorCheckpointConnector, MirrorCheckpointTask:用來同步輔助資訊的connector,這裡的輔助資訊主要是consumer的offset
- MirrorHeartbeatConnector, MirrorHeartbeatTask:維持心跳的connector
不過雖然mirror maker2歲基於kafka connect框架,但它卻做了一定的改造,可以單獨部署一個mirror maker2集群,當然也可以部署在kafka connect單機或kafka connect集群環境上。這部分後面介紹部署的時候再介紹。
和mm1一樣,在最簡單的主從備份場景中,mm2建議部署在目標(target)集群,即從遠端消費然後本地寫入。如果部署在源集群端,那麼出錯的時候可能會出現丟數據的情況。
其整體架構如圖:
內部topic設計
mm2會在kafka生成多個內部topic ,來存儲源集群topic相關的狀態和配置資訊,以及維持心跳。主要有三個內部topic:
- hearbeat topic
- checkpoints topic
- offset sync topic
這幾個內部topic都比較好理解,一看名字基本就知道是幹嘛用的,值得一提的是這其中checkpoints和hearbeat功能都可以通過配置關閉。下面我們詳細介紹下這幾個topic的功能和數據格式。
heartbeat topic
在默認的配置中,源集群和目標集群都會有一個用於發送心跳的topic,consumer 客戶端通過這個 topic,一方面可以確認當前的 connector 是否存活,另一方面確認源集群是否處於可用狀態。
heartbeat topic的schema如下:
- target cluster:接收心跳集群
- source cluster:發送心跳的集群
- timestamp:時間戳
checkpoints topic
對應的connector(即MirrorCheckpointConnector)會定期向目標集群發送checkpoint資訊,主要是consumer group提交的offset ,以及相關輔助資訊。
checkpoints topic 的schema如下:
- consumer group id (String)
- topic (String) :包含源集群和目標集群的 topic
- partition (int)
- upstream offset (int): 源集群指定consumer group已提交的offset(latest committed offset in source cluster)
- downstream offset (int): 目標集群已同步的offset(latest committed offset translated to target cluster)
- metadata (String):partition元數據
- timestamp
mm2提供的另一個功能,consumer切換集群消費就是通過這個topic實現的。因為這個topic中存放了源集群consumer group的消費offset,在某些場景(比如源集群故障)下要切換consumer到目標集群,就可以通過這個topic獲取消費offset然後繼續消費。
offset sync
這個topic ,主要是在兩個集群間同步topic partition的offset。這裡的offset並不是consumer的offset,而是日誌的offset。
它的 schema 如下:
- topic (String):topic 名
- partition (int)
- upstream offset (int):源集群的 offset
- downstream offset (int):目標集群的 offset,和源集群的應該保持一致
config sync
mm2會將源集群的數據同步到目標集群,那麼目標集群對應的topic的讀寫許可權上怎樣的呢?mm2約定了,目標集群對應的topic(源集群備份的那個)只有source和sink connector能夠寫入。為了實施此策略,MM2使用以下規則將 ACL 策略傳播到下游主題:
- 若用戶對源集群的topic有read的許可權,那麼對目標集群對應的topic也有read的許可權
- 除了mm2,別的用戶都不能寫入目標集群對應的topic
同時會同步topic相關配置資訊
acl
consumer切換集群
源集群的consumer group offset ,是存儲在目標集群的checkpoint topic中,這點我們上面已經有說到過。要獲取這些offset資訊,可以使用MirrorClient#remoteConsumerOffsets
這個 api,然後就能用 consumer#seek
api 根據給出的offset消費。
這裡順便提供下大致程式碼,首先maven添加依賴:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-mirror</artifactId>
<version>2.4.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-mirror-client</artifactId>
<version>2.4.0</version>
</dependency>
然後獲取offset資訊:
MirrorMakerConfig mmConfig = new MirrorMakerConfig(mm2.getProp());
MirrorClientConfig mmClientConfig = mmConfig.clientConfig("target-cluster");
MirrorClient mmClient = new MirrorClient(mmClientConfig);
Map<TopicPartition, OffsetAndMetadata> offsetMap =
mmClient.remoteConsumerOffsets("my-consumer-group", "source-cluster", Duration.ofMinutes(1));
consumer#seek
的用法就不演示了。
其他功能
最後順便介紹下其他比較基礎的功能。
源集群和目標集群partition保持同步
- 消息的分區和排序,源集群和目標集群都會保持一樣
- 目標集群的分區數與源集群分區保持一樣
- 目標集群只會有一個topic與源集群topic對應
- 目標集群只會有一個分區與源集群的分區對應
- 目標集群的partition i對應源集群partition i
說白了就是源集群和目標集群的partition和消息會盡量保持一致,當然可能會有重複消息的情況,因為目前還不指定exactly-once,據說後續版本會有(2.4版本以後)。
同步topic增加前綴
mm1有一個缺陷,因為mm1備份數據的時候,源集群和目標集群的topic名稱都是一樣的,所以可能出現兩個集群的消息無限遞歸的情況(就是兩個名稱相同的topic,一條消息a傳b,b再傳a,循環往複)。mm2解決這個缺陷,採用了給topic加一個前綴的方式,如果是兩個集群相互備份,那麼有前綴的topic的消息,是不會備份的。
同步配置和acl
mm1的時候,配置資訊和topic acl相關的資訊是不會同步的,這會給集群管理帶來一定的困難,mm2解決了這個問題,即源集群的配置和acl都會自動同步到目標集群中。
說完功能,最後再介紹下部署方式。
部署方式
目前主要支援三種部署方式
- mm2專用集群部署:無需依賴kafka connect,mm2已經提供了一個driver可以單獨部署mm2集群,僅需一條命令就可以啟動:./bin/connect-mirror-maker.sh mm2.properties
- 依賴kafka connect集群部署:需要先啟動kafka connect集群模式,然後手動啟動每個mm2相關的connector,相對比較繁瑣。適合已經有kafka connect集群的場景。
- 依賴kafka connect單機部署:需要在配置文件中配置好各個connector,然後啟動Kafka connect單機服務。不過這種方式便捷性不如mm2專用集群模式,穩定性不如kafka connect 集群模式,適合測試環境下部署。
mm2 相關的配置參照KIP-382,主要配置包括 source 和 target 的 broker 配置,hearbeat ,checkpoint 功能是否啟用,同步時間間隔等。
mm2獨立集群部署
要部署mm2集群相對比較簡單,只需要先在config/mm2.properties寫個配置文件:
# 指定兩個集群,以及對應的host
clusters = us-west, us-east
us-west.bootstrap.servers = host1:9092
us-east.bootstrap.servers = host2:9092
# 指定同步備份的topic & consumer group,支援正則
topics = .*
groups = .*
emit.checkpoints.interval.seconds = 10
# 指定複製鏈條,可以是雙向的
us-west->us-east.enabled = true
# us-east->us-west.enabled = true # 雙向,符合條件的兩個集群的topic會相互備份
# 可以自定義一些配置
us-west.offset.storage.topic = mm2-offsets
# 也可以指定是否啟用哪個鏈條的hearbeat,默認是雙向hearbeat的
us-west->us-east.emit.heartbeats.enabled = false
然後使用一條命令就可以啟動了,./bin/connect-mirror-maker.sh mm2.properties
。啟動後用jps觀察進程,再list下topic,可以發現多了許多個topic,這種時候應該就啟動成功了。
順便說下,如果是使用kafka connect集群,那需要手動啟動每個connector,類似這樣:
PUT /connectors/us-west-source/config HTTP/1.1
{
"name": "us-west-source",
"connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
"source.cluster.alias": "us-west",
"target.cluster.alias": "us-east",
"source.cluster.bootstrap.servers": "us-west-host1:9091",
"topics": ".*"
}
以上~