Kafka異地雙活深度講解 – Mirrormaker V2

  • 2019 年 10 月 31 日
  • 筆記

總結:Apache Kafka Mirrormaker V1的解決方案在提供企業管理的災難恢復方面存在局限性。MM V2(KIP-382)針對MM V1 進行了擴展,並修復了MM V1的局限性,使其能夠動態修改配置,並且能夠將Topic在群集之間保持同步,同時儘可能地降低觸發Rebalance的情況以提高性能。此外,Active-Active群集和Disaster Recover在MM V2中已經屬於開箱即用(Out-of-the-box)功能。

MirrorMaker2架構

01

MM V2的核心架構基於Kafka Connect框架,可以抽象的理解為是一個Kafka Connect 里的Source Connector和Sink Connector的組合體。MM V2建議採用與MM V1 中一致的Remote consume和Local produce的部署模式。因此在最簡單的,也就是Source – Target 複製場景下,MM V2服務是部署在Target數據中心的。同時MM V2的 Connect框架所需的Primary集群與Target 的Kafka集群是共用的。

(點擊查看大圖)

為什麼不使用Kafka MM V1 來實現Kafka的跨集群複製?

02

MM V1 中Topic的命名問題

基於MM V1的Kafka集群複製通常需要將Source的Topic名和Target的Topic名保持相同。這樣Topic命名過程會導致在Active – Active雙活時造成無限的消息循環。

在MM V2中,會通過增加預先配置的前綴(例如,群集別名是DCX或DCY)到Target的Topic的命名來解決這個問題。

例如,在Active – Active 場景下複製兩個數據中心DCX,DCY的兩個Kafka群集,MM V2會過濾掉前綴中帶有目標群集名稱的任何Topic。而Consumer可以訂閱模糊匹配的多個Topic,例如,"* TopicA" 從源群集中消費,並在故障轉移後自動繼續從目標群集消費。

(點擊查看大圖)

主備Consumer Offset 管理

在MM V1中,Source集群的Topic Partition Offset和目標群集上Topic Partition Offset 幾乎不可能相同。因此,Consumer提交到Source集群的Committed Offset 在Target集群中是不可用的。如果Consumer切換到目標群集,也就不能簡單地使用原有的Committed Offset來繼續消費。一種處理辦法是依賴Kafka對消息時間戳的支援,但是這個解決辦法不夠完美,因為涉及到了猜測時間和重複消費的問題。

(點擊查看大圖)

MM V2的實現則完全不同,它採用了2個內部Topic來跟蹤源和目標的offset mapping。同時也自動的完成了Source 的consumer_offsets與target 的 consumer_offsets之間的轉換。技術實現上是通過Target集群中的 "offset_sync " topic對Source的Consumer offset與Target的Consumer Offset 進行了映射處理。對依賴 "__consumer_offsets" Topic來跟蹤進度的Consumer Groups,MM V2會將Consumer offset映射到Target群集中的 "__checkpoint" Topic中,這是一個Log Compacted Topic。同時定期向源集群查詢來自所有Consumer Group已提交的offset,並向Target集群的"__checkpoint" Topic發送消息來完成Offset管理的動作。

(點擊查看大圖)

因此,在MM V2 中,通過使用 "__checkpoint" Topic,Consumer在故障遷移時時可以直接確定(使用MM V2 API)需要開始消費的目標集群offset。

減少MirrorMaker集群數量

傳統上,MM V1 群集與目標群集共用。因此,在使用Remote Consume和Local Produce模式之後,每個目標集群都會有一個鏡像集群。

(點擊查看大圖)

於大型的數據中心,這樣會明顯的增加運營成本。而理想的情況是,每一對兒數據中心原則上應該只有一個MM集群。

在MM V2的實現下,Kafka Connect框架會假定Source Connetor從外部源讀取並寫入Kafka,而Sink Connector從Kafka讀取並寫入外部接收器。因此每個Target數據中心只需要一個Connect集群,在該對數據中心上複製的所有Kafka集群都可以由一個MM V2集群處理。

(點擊查看大圖)

更加靈活的白/黑名單控制

MM V1會將白名單和黑名單與正則表達式或Topic列表結合使用。但這些都是靜態配置的。也就是說,當創建一個與白名單匹配的新Topic時,會在Target集群上創建新Topic,並自動進行複製。但是,白名單本身更新時,它需要重啟。每次列表更改時重新啟動MM V1都會在造成數據堆積,從而導致重啟後的複製吞吐風暴。在MM V2中,可以使用REST API動態更改Topic列表和正則表達式的配置,不需要重啟服務。

為什麼不直接用Kafka Connect來實現Kafka的跨集群複製?

03

Kafka Connect框架的Kafka重依賴問題

Kafka Connect框架需要有一個Kafka集群來存儲狀態,在Connect中叫「Primary」集群。在典型的Kafka Connect配置中,Source Connector將數據從外部源寫入Kafka集群,而Sink Connector從Kafka集群讀取數據並寫入外部存儲庫。在Source – Target 複製場景下,Connect的Primary集群是我們的Target Kafka集群。

如果我們只是採用Kafka Source和Connect連接器並將它們串聯起來實現kafka的災備,那麼數據先寫入Primary Kafka 集群然後再讀取出來。MM V2 則是從Source直接傳遞給Sink 從而避免了這種不必要的數據複製。

同時,在Active – Active場景下,沒有必要為每個Kafka集群建一個Primary群集。

Rebalance的頻繁觸發

MirrorMaker2中使用的Kafka Connect框架原生採用了Kafka的High Level Consumer從Kafka讀取數據。High Level Consumer會自動地使Consumer Group中消費的分區在整個組中自動平衡。每次Topic的元數據發生更改時,例如改分區計數,或更改Connect Worker的數量等等,會觸發Connect rebalance。頻繁的重新平衡會導致阻塞,並且嚴重的影響Target集群的吞吐。

在MM V2中,我們使用了Low Level Consumer 去Consume給定的分區列表,因此可以避免由於Topic的分區數更改而觸發的Rebalance動作。因此,對Topic和分區數的任何更改都不會導致完全的重新平衡。但是,需要注意的是,由Connect集群本身(例如添加更多Worker Node等)的更改觸發的重新平衡是無法避免的。在大多數情況下,這些變化比Topic的變化次數要少的多。

MM V2目前的一些局限性及未來改進

04

跨集群有且只有一次的消息複製

Kafka提供有且只有一次(EOS)的消息處理,但該特性僅是針對某一個具體的Kafka集群,而在跨集群的場景下並不適用。因此跨群集複製無法直接利用這個特性。也就是說,當前的MM2在源和目標集群之間複製數據時只能提供至少一次語義,下游可能存在重複記錄。

來看一下跨集群複製上在哪個環節會出現數據重複。如果我們將複製的動作看做一個從Source集群Consume並Produce到Target集群的流,那麼消費Source端的Consumer會寫Source端的"__consumer_offsets" Topic來跟蹤這個Consumer的狀態。同時會把數據寫入下游的對應Topic中

(點擊查看大圖)

這兩個「Write」操作不能做成原子事務,因為它們跨越兩個不同的集群,總是有可能在其中一個失敗時導致數據重複。

如何才能做到跨集群的有且只有一次的消息處理?其實和其他流數據處理系統一樣,在MM V2中,我們有一個"__checkpoint" Topic是在Target集群上的,它是用來來跟蹤Source的Consumer狀態。因此,MM V2可以通過這個內部Topic與目標Topic處於同一事務中來提供EOS語義。這個功能即將在MM V2的下一次迭代中推出。

高吞吐的對等複製

在複製Kafka集群的場景中,如果源和目標在相同Topic,相同Partition數,相同的分區方案,相同壓縮,相同serde的情況下,我們稱之為對等鏡像。在這種情況下,理想情況是將一批記錄作為位元組流讀取並將其寫入而不進行任何處理。這樣我們可以繞過Consume,解析,序列化,Produce這個流程。對等複製可以比傳統方法提供更高的吞吐。這是MM V2即將推出的另一項功能。

參考文章:

【A look inside Kafka Mirrormaker 2】:

https://blog.cloudera.com/a-look-inside-kafka-mirrormaker-2/

【Kafka – KIP 382 】:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0

【KIP 382 Repo】:

https://github.com/apache/kafka/pull/6295