全量同步Elasticsearch方案之Canal
一、前言
Canal
是阿里的一款開源項目,純 Java
開發。基於數據庫增量日誌解析,提供增量數據訂閱&消費,目前主要支持了 MySQL
(也支持 mariaDB
)。
Canal
除了支持 binlog
實時 增量同步 數據庫之外也支持 全量同步 ,本文主要分享使用Canal來實現從MySQL到Elasticsearch的全量同步;
可通過使用 adapter
的 REST
接口手動觸發 ETL
任務,實現全量同步。
在執行全量同步的時候,同一個
destination
的增量同步任務會被 阻塞,待全量同步完成被阻塞的增量同步會被 重新喚醒
PS:關於Canal的部署與 實時同步 請看文章《Canal高可用架構部署》
二、ETL接口
adapter
的 ETL
接口為:/etl/{type}/{task}
- 默認web端口為
8081
- type 為類型(hbase/es7/rdb)
- task 為任務名對應配置文件名,如sys_user.yml
例子:
curl -X POST //127.0.0.1:8081/etl/es7/sys_user.yml
執行成功輸出:
{"succeeded":true,"resultMessage":"導入ES 數據:17 條"}
三、實踐過程中遇到的坑
3.1. 連接池不夠
當同步的數據量比較大時,執行一段時間後會出現下圖的錯誤
3.1.1. 原因分析
查看 canal
源碼得知當同步的數據量大於1w時,會分批進行同步,每批1w條記錄,並使用多線程來並行執行任務,而 adapter
默認的連接池為3,當線程獲取數據庫連接等待超過1分鐘就會拋出該異常。
線程數為當前服務器cpu的可用線程數
3.1.2. 解決方式
修改 adapter
的 conf/application.yml
文件中的 srcDataSources
配置項,增加 maxActive
配置數據庫的最大連接數為當前服務器cpu的可用線程數
cpu線程數可以下命令查看
grep 'processor' /proc/cpuinfo | sort -u | wc -l
3.2. es連接超時
當同步的表字段比較多時,幾率出現以下報錯
3.2.1. 原因分析
由於 adapter
的表映射配置文件中的 commitBatch
提交批大小設置過大導致(6000)
3.2.2. 解決方式
修改 adapter
的 conf/es7/xxx.yml
映射文件中的 commitBatch
配置項為3000
3.3. 同步慢
三千萬的數據量用時3.5小時左右
3.3.1. 原因分析
由於當數據量大於1w時 canal
會對數據進行分批同步,每批1w條通過分頁查詢實現;所以當數據量較大時會出現深分頁的情況導致查詢非常慢。
3.3.2. 解決方式
預先使用ID、時間或者業務字段等進行數據分批後再進行同步,減少每次同步的數據量。
3.3.3. 案例
使用ID進行數據分批,適合增長類型的ID,如自增ID、雪花ID等;
- 查出 最小ID、最大ID 與 總數據量
- 根據每批數據量大小計算每批的 ID區間
計算過程:
- 最小ID = 1333224842416979257
- 最大ID = 1341698897306914816
- 總數據量 = 3kw
- 每次同步量 = 300w
(1) 計算同步的次數
總數據量 / 每次同步量 = 10
(2) 計算每批ID的增量值
(最大ID - 最小ID) / 次數 = 847405488993555.9
(3) 計算每批ID的值
最小ID + 增量值 = ID2
ID2 + 增量值 = ID3
...
ID9 + 增量值 = 最大ID
(4) 使用分批的ID值進行同步
修改sql映射配置,的 etlCondition
參數:
etlCondition: "where id >= {} and id < {}"
調用etl接口,並增加 params
參數,多個參數之間使用 ;
分割
curl -X POST //127.0.0.1:8081/etl/es7/sys_user.yml?params=最小ID;ID2
curl -X POST //127.0.0.1:8081/etl/es7/sys_user.yml?params=ID2;ID3
...