「從零單排canal 06」 instance模組源碼解析
基於1.1.5-alpha版本,具體源碼筆記可以參考我的github://github.com/saigu/JavaKnowledgeGraph/tree/master/code_reading/canal
instance模組比較簡單,我們重點了解以下幾個問題
- instance配置模式有哪幾種,如何根據配置創建instance?
- 遠端配置如何覆蓋本地配置的?
- instance實例內部有哪些組件?
1.基本結構
instance模組下面也分為三個子模組,core、manager、spring。
其中,core是instance的核心邏輯 。
而manager和spring只是兩種不同的instance配置讀取方式,manager通過http請求讀取admin的配置,spring通過配置文件的方式讀取。
主要控制邏輯我們在deployer模組源碼分析中提到過,就是在CanalController類 的instanceGenerator,配置參數是canal.instance.global.mode
- 根據destination創建config
- 如果canal.instance.global.mode = manager,就使用PlainCanalInstanceGenerator
- 如果canal.instance.global.mode = spring,就使用SpringCanalInstanceGenerator
源碼如下
2.core子模組
程式碼不多,就兩個介面,兩個類。
2.1 CanalInstanceGenerator介面
這個介面只有一個方法
具體實現就是開頭提到的兩種,PlainCanalInstanceGenerator和SpringCanalInstanceGenerator,分別在manager子模組和spring子模組中實現。
具體選擇就是開頭的那個canalController裡面根據canal.instance.global.mode來選擇。
2.2 CanalInstance介面
先看一張官方文檔的圖,這個前面的文章已經分析過了。
server代表一個canal-server運行實例,對應於一個jvm。server內部可以有多個instance。
Instance內部有4個主要組件:
- eventParser :數據源接入,模擬slave協議和master進行交互,協議解析
- eventSink :Parser和Store連接器,進行數據過濾,加工,分發的工作
- eventStore :數據存儲
- metaManager:增量訂閱&消費資訊管理器
在這個介面中,就定義了獲取4個組件的方法,以及新版本增加的mqProducer的配置資訊(mqProducer在server模組解析中介紹過了,可以回頭去看看)
我們簡單看下4個組件介面的各個實現類。
CanalEventParser介面實現類(paser模組):
- MysqlEventParser:偽裝成單個mysql實例的slave解析binglog日誌
- GroupEventParser:偽裝成多個mysql實例的slave解析binglog日誌。內部維護了多個CanalEventParser,組合多個EventParser進行合併處理,group只是作為一個delegate處理。主要應用場景是分庫分表:比如一個大表拆分了4個庫,位於不同的mysql實例上,正常情況下,我們需要配置四個CanalInstance。對應的,業務上要消費數據時,需要啟動4個客戶端,分別鏈接4個instance實例。為了方便業務使用,此時我們可以讓CanalInstance引用一個GroupEventParser,由GroupEventParser內部維護4個MysqlEventParser去4個不同的mysql實例去拉取binlog,最終合併到一起。此時業務只需要啟動1個客戶端,鏈接這個CanalInstance即可
- LocalBinlogEventParser:解析本地的mysql binlog。例如將mysql的binlog文件拷貝到canal的機器上進行解析。
- RdsLocalBinlogEventParser:基於阿里雲rds的binlog備份文件複製,下載到本地後進行本地的binlog解析。
CanalEventSink介面實現類(sink模組):
- EntryEventSink:普通的單個parser的sink操作,進行數據過濾,加工,分發
- GroupEventSink:用於分庫分表的場景,對應GroupEventParser的數據解析,然後實現基於歸併排序的sink處理
CanalEventStore介面實現類(store模組):
- MemoryEventStoreWithBuffer:基於記憶體實現存儲store
CanalMetaManager(meta模組):
- ZooKeeperMetaManager:將元數據存存儲到zk中
- MemoryMetaManager:將元數據存儲到記憶體中
- MixedMetaManager:組合memory + zookeeper的使用模式
- PeriodMixedMetaManager:基於定時刷新的策略的mixed實現
- FileMixedMetaManager:先寫記憶體,然後定時刷新數據到File
關於這些實現的具體細節,我們在相應模組的源碼分析時,進行講解。目前只需要知道,一些組件有多種實現,因此組合工作方式有多種。
2.3 AbstractCanalInstance類
AbstractCanalInstance是canalInstance的抽象類,維護了相關組件的引用
這個抽象類有兩個實現,CanalInstanceWithManager 和 CanalInstanceWithSpring。
AbstractCanalInstance的初始化過程都是在實現類中完成的。
如果選擇admin控制模式,那就是在CanalInstanceWithManager中完成,如果是spring模式,就在CanalInstanceWithSpring中完成。
但是它們的初始化過程並不是在這裡完成的,如果選擇admin控制模式,那就是在CanalInstanceWithManager中完成,如果是spring模式,就在CanalInstanceWithSpring中完成。
這裡有個小發現:
仔細看下實際程式碼調用我們發現,CanalInstanceWithManager是給ManagerCanalInstanceGenerator使用的,而這個generator實際上沒有被使用到。如果使用admin模式,本文開頭我們就看到了,使用了PlainCanalInstanceGenerator。PlainCanalInstanceGenerator裡面的generate方法實現,其實跟SpringCanalInstanceGenerator差不多。就是從遠端admin拉到配置,然後替換系統變數,然後再從spring的beanfactory中構建具體的實例。
2.3.1 subscribeChange() 方法
AbstractCanalInstance類實現了CanalInstance介面的subscribeChange方法。
我們看到,如果訂閱關係發生變化,就做一些操作,這裡看的話,主要就是更新了一下filter。
filter規定了需要訂閱哪些庫,哪些表。
2.3.2 start() 方法
啟動沒什麼特別的邏輯,就是按照順序依次啟動各個組件。
順序為 metaManager -> alarmHandler -> eventStore -> eventSink -> eventParser。
啟動順序主要跟依賴關係有關,元資訊相關的管理跟所有都有關,所以metaManager最先啟動,其他的按照彼此之間的關係一一啟動。
這裡我們發現,在啟動eventParser的時候做了特殊處理,分別是beforeStartEventParser 和 afterStartEventParser。後文我們專門講一下。
2.3.3 stop()方法
stop也沒什麼特殊的,就是依次關閉各個組件。
關閉的順序就是start的逆過程。
這裡就不貼程式碼了。
2.3.4 eventParserr的特殊處理
在start和stop方法中的eventParser前後都有特殊的處理,start的beforeStartEventParser 和 afterStartEventParser,Stop的beforeStopEventParser 和 afterStopEventParser。
這個其實跟eventParser的設計有關。
EventParser 設計
- 每個EventParser都會關聯兩個內部組件
- CanalLogPositionManager : 記錄binlog 最後一次解析成功位置資訊,主要是描述下一次canal啟動的位點 CanalHAController: 控制 EventParser 的鏈接主機管理,判斷當前該鏈接哪個mysql資料庫
所以這兩個beforexxx、afterxxxx方法做的主要是CanalLogPositionManager和CanalHAController的啟停工作。
2.3.5 AbstractCanalInstance類 總結
可以看到AbstractCanalInstance除了負責啟動和停止其內部組件,就沒有其他工作了。
eventParser在AbstractCanalInstance中啟動後,就會自行開啟多執行緒任務dump數據,通過eventSink投遞給eventStore。
而對eventStore的操作邏輯,實際上都是在CanalServerWithEmbedded中完成的,我們可以回顧一下CanalServerWithEmbedded中 getWithoutAck( ) 的相關邏輯。
包括:
- 根據clientIdentity的destination獲取對應的instance
- 獲取到流式數據中的最後一批獲取的位置positionRanges(跟batchId有關聯,就是上面那個map裡面的)
- 從cananlEventStore裡面獲取binlog,轉化為event。一般是從最後的一個batchId位置開始,如果之前沒有batchId,那麼就從cursor記錄的消費位點開始;如果cursor為空,那隻能從eventStore的第一條消息開始。(這裡幾個位置關係再想一想,跟ack有關,畫個圖)
- event轉化為entry,並生成新的batchId,組合成message返回給客戶端
所以,其實這裡只是簡單的啟動和停止,組件的交互邏輯是在CanalServerWithEmbedded中get出instance的各個組件來進行實現的。
3.spring模組
前面提到了,PlainCanalInstanceGenerator裡面的generate方法實現,其實跟SpringCanalInstanceGenerator差不多。就是從遠端admin拉到配置,然後替換系統變數,然後再從spring的beanfactory中構建具體的實例。
所以我們重點關注spring子模組的配置方式即可。
就下面四個類
3.1 CanalInstanceWithSpring類
基於spring容器啟動canal實例,方便獨立於manager啟動。
繼承了AbstractCanalInstance,其實就是一系列組件的setter方法,就不貼源碼了。
具體配置是基於spring的xml來做的.
當我們配置載入方式為spring時,創建的CanalInstance實例類型都是CanalInstanceWithSpring。canal將會尋找本地的spring配置文件來創建instance實例。canal默認提供了以下幾種spring配置文件:
- spring/memory-instance.xml
- spring/file-instance.xml
- spring/default-instance.xml
- spring/group-instance.xml
四個配置文件中,對CanalInstanceWithSpring都採用了同樣的配置方式:
當然,具體每個組件的ref在不同配置文件中有所不同。
最主要的就是metaManager 和eventParser 這兩個配置有所不同,可能在記憶體、文件或zk進行存儲。
eventStore 、和eventSink 定義都是相同的,eventStore目前的開源版本中eventStore只有一種基於記憶體的實現,eventSink其作用是eventParser和eventStore的連接器,進行數據過濾,加工,分發的工作。不涉及存儲,也就沒有必要針對記憶體、file、或者zk進行區分。
3.2 SpringCanalInstanceGenerator類
這個是具體創建instance的邏輯。
順便看下PlainCanalInstanceGenerator裡面的實現,就是多了從遠端拉取配置,然後用PropertyPlaceholderConfigurer進行了變數替換,然後還是用beanFactory來獲取實例。
com.alibaba.otter.canal.instance.spring.support.PropertyPlaceholderConfigurer繼承了org.springframework.beans.factory.config.PropertyPlaceholderConfigurer,設置動態properties,替換掉本地properties。
4.總結
其實這個模組的東西比較少,沒有什麼特別複雜的邏輯。
我們來回顧下開頭的幾個問題
- instance配置模式有哪幾種,如何根據配置創建instance?
主要有基於spring和基於遠端配置兩種方式,前者的實現在,後者的實現在PlainCanalInstanceGenerator
- 遠端配置如何覆蓋本地配置的?
PlainCanalInstanceGenerator中使用了spring的PropertyPlaceholderConfigurer來覆蓋配置
- instance實例內部有哪些組件?
包括了parser、sink、store、metamanager等組件,但是只負責了啟動和停止邏輯,具體交互邏輯還是在CanalServerWithEmbedded中實現的。
都看到最後了,原創不易,點個關注,點個贊吧~
文章持續更新,可以微信搜索「阿丸筆記 」第一時間閱讀,回復關鍵字【學習】有我準備的一線大廠面試資料。
知識碎片重新梳理,構建Java知識圖譜:github.com/saigu/JavaK…(歷史文章查閱非常方便)