阿里Canal框架數據庫同步-實戰教程
- 2020 年 4 月 20 日
- 筆記
- 【學習 MySql】
一、Canal簡介:
canal是阿里巴巴旗下的一款開源項目,純Java開發。基於數據庫增量日誌解析,提供增量數據訂閱&消費,目前主要支持了MySQL(也支持mariaDB)。
二、背景介紹:
早期,阿里巴巴B2B公司因為存在杭州和美國雙機房部署,存在跨機房同步的業務需求。不過早期的數據庫同步業務,主要是基於trigger的方式獲取增量變更,不過從2010年開始,阿里系公司開始逐步的嘗試基於數據庫的日誌解析,獲取增量變更進行同步,由此衍生出了增量訂閱&消費的業務,從此開啟了一段新紀元。
三、適用場景:
在一些複雜的業務邏輯中,可能插入或者查詢數據都比較頻繁,如果一直在數據庫插入查詢會造成速度非常慢,可以把數據庫表分成兩個庫,一個庫用來做查詢,一個庫作為插入數據,讀寫分離,怎麼解決呢?就可以用canal框架來監聽數據是否發生改變,來同步數據。
比如大部分人都做搜索引擎ES,咱們不可能每次數據庫更新了數據手動去同步索引庫,咱們就可以用Canal來監聽數據庫增刪改時去重新導入索引庫,保持數據一致性。
四、Canal的工作機制
複製過程分成三步:
(1) Master主庫將改變記錄,寫到二進制日誌(binary log)中(這些記錄叫做二進制日誌事件,binary log events,可以通過show binlog events進行查看);
(2) Slave從庫向mysql master發送dump協議,將master主庫的binary log events拷貝到它的中繼日誌(relay log);
(3) Slave從庫讀取並重做中繼日誌中的事件,將改變的數據同步到自己的數據庫。
四、Canal中間件功能
基於純java語言開發,可以用於做增量數據訂閱和消費功能。
相比於傳統的數據同步,我們通常需要進行先搭建主從架構,然後使用binlog日誌進行讀取,然後指定需要同步的數據庫,數據庫表等信息。但是隨着我們業務的不斷複雜,這種傳統的數據同步方式以及開始變得較為繁瑣,不夠靈活。
canal模擬mysql slave的交互協議,偽裝自己為mysql slave,向mysql master發送dump協議mysql master收到dump請求,開始推送binary log給slave(也就是canal),canal解析binary log對象(原始為byte流),通過對binlog數據進行解析即可獲取需要同步的數據,在進行同步數據的過程中還可以加入開發人員的一些額外邏輯處理,比較開放。
Binlog的三種基本類型分別為:
STATEMENT模式只記錄了sql語句,但是沒有記錄上下文信息,在進行數據恢復的時候可能會導致數據的丟失情況
ROW模式除了記錄sql語句之外,還會記錄每個字段的變化情況,能夠清楚的記錄每行數據的變化歷史,但是會佔用較多的空間,需要使用mysqlbinlog工具進行查看。
MIX模式比較靈活的記錄,例如說當遇到了表結構變更的時候,就會記錄為statement模式。當遇到了數據更新或者刪除情況下就會變為row模式
五、安裝Canal
1.準備工作:win10系統、jdk1.8、mysql5.7、canal1.1.1
2.連接自己的數據,檢查binlog功能是否有開啟,檢查命令:show variables like ‘log_bin’;
3.如果顯示狀態為OFF表示該功能未開啟,就需要找到自己安裝的Mysql位置找到my.ini文件,在此文件的最下面一行加上如下(注意:保存文件後重啟下自己的Mysql數據庫):
1 server-id=1 #不能與canal的slaveId重複即可
2 log-bin=mysql-bin
3 binlog_format = ROW #設置ROW模式
4.再次查看binlog功能是否有開啟,檢查命令:show variables like ‘log_bin’;
5.我們需要創建一個用戶操作數據庫的寫入操作,我們需要給用戶權限,執行如下sql語句:
1 CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
2 GRANT SHOW VIEW, SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
3 FLUSH PRIVILEGES;
6.下載我canal客戶端,官方地址進行相應版本的安裝包進行下載(注意:如果下載翻到本文最下面聯繫我): //github.com/alibaba/canal/releases
7.下載成功後,解壓壓縮包後進入conf下面的example目錄下面的instance.properties文件打開編輯如下地方:
8.返回bin目錄點擊startup.bat啟動canal服務端,如下圖表示啟動成功:
六、java代碼實現
1.新建一個maven項目,導入maven jar包如下:
1 <dependency> 2 <groupId>com.alibaba.otter</groupId> 3 <artifactId>canal.client</artifactId> 4 <version>1.1.0</version> 5 </dependency>
2.編寫測試代碼
1 package com.fuzongle.canal.conf;
2
3 import com.alibaba.otter.canal.client.CanalConnector;
4 import com.alibaba.otter.canal.client.CanalConnectors;
5 import com.alibaba.otter.canal.protocol.CanalEntry;
6 import com.alibaba.otter.canal.protocol.CanalEntry.Column;
7 import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
8 import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
9 import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
10 import com.alibaba.otter.canal.protocol.Message;
11 import com.google.protobuf.InvalidProtocolBufferException;
12
13 import java.net.InetSocketAddress;
14 import java.util.List;
15 import java.util.Queue;
16 import java.util.concurrent.ConcurrentLinkedQueue;
17 /**
18 * @Auther: fzl
19 * @Date: 2020/4/20 01:21
20 * @Description:
21 */
22 public class TestCanal {
23
24 private static Queue<String> SQL_QUEUE = new ConcurrentLinkedQueue<>();
25
26 public static void main(String[] args) {
27 //獲取canalServer連接:本機地址,端口號
28 CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1",
29 11111), "example", "", "");
30 int batchSize = 1000;
31 try {
32 //連接canalServer
33 connector.connect();
34 //訂閱Desctinstion
35 connector.subscribe();
36 connector.rollback();
37 try {
38 while (true) {
39 //嘗試從master那邊拉去數據batchSize條記錄,有多少取多少
40 //輪詢拉取數據 上面的where
41 Message message = connector.getWithoutAck(batchSize);
42 long batchId = message.getId();
43 int size = message.getEntries().size();
44 if (batchId == -1 || size == 0) {
45 //睡眠
46 Thread.sleep(1000);
47 } else {
48 dataHandle(message.getEntries());
49 }
50 connector.ack(batchId);
51 System.out.println("aa"+size);
52 //當隊列裏面堆積的sql大於一定數值的時候就模擬執行
53 if (SQL_QUEUE.size() >= 10) {
54 executeQueueSql();
55 }
56 }
57 } catch (InterruptedException e) {
58 e.printStackTrace();
59 } catch (InvalidProtocolBufferException e) {
60 e.printStackTrace();
61 }
62 } finally {
63 connector.disconnect();
64 }
65
66
67 }
68
69
70
71
72 /**
73 * 模擬執行隊列裏面的sql語句
74 */
75 public static void executeQueueSql() {
76 int size = SQL_QUEUE.size();
77 for (int i = 0; i < size; i++) {
78 String sql = SQL_QUEUE.poll();
79 System.out.println("[sql]----> " + sql);
80 }
81 }
82
83 /**
84 * 數據處理
85 *
86 * @param entrys
87 */
88 private static void dataHandle(List<CanalEntry.Entry> entrys) throws InvalidProtocolBufferException {
89 for (CanalEntry.Entry entry : entrys) {
90 if (EntryType.ROWDATA == entry.getEntryType()) {
91 RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
92 CanalEntry.EventType eventType = rowChange.getEventType();
93 if (eventType == EventType.DELETE) {
94 saveDeleteSql(entry);
95 } else if (eventType == EventType.UPDATE) {
96 saveUpdateSql(entry);
97 } else if (eventType == CanalEntry.EventType.INSERT) {
98 saveInsertSql(entry);
99 }
100 }
101 }
102 }
103
104 /**
105 * 保存更新語句
106 *
107 * @param entry
108 */
109 private static void saveUpdateSql(CanalEntry.Entry entry) {
110 try {
111 RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
112 List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
113 for (CanalEntry.RowData rowData : rowDatasList) {
114 List<Column> newColumnList = rowData.getAfterColumnsList();
115 StringBuffer sql = new StringBuffer("update " + entry.getHeader().getSchemaName() + "." + entry.getHeader().getTableName() + " set ");
116 for (int i = 0; i < newColumnList.size(); i++) {
117 sql.append(" " + newColumnList.get(i).getName()
118 + " = '" + newColumnList.get(i).getValue() + "'");
119 if (i != newColumnList.size() - 1) {
120 sql.append(",");
121 }
122 }
123 sql.append(" where ");
124 List<Column> oldColumnList = rowData.getBeforeColumnsList();
125 for (Column column : oldColumnList) {
126 if (column.getIsKey()) {
127 //暫時只支持單一主鍵
128 sql.append(column.getName() + "=" + column.getValue());
129 break;
130 }
131 }
132 SQL_QUEUE.add(sql.toString());
133 }
134 } catch (InvalidProtocolBufferException e) {
135 e.printStackTrace();
136 }
137 }
138
139 /**
140 * 保存刪除語句
141 *
142 * @param entry
143 */
144 private static void saveDeleteSql(CanalEntry.Entry entry) {
145 try {
146 RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
147 List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
148 for (CanalEntry.RowData rowData : rowDatasList) {
149 List<Column> columnList = rowData.getBeforeColumnsList();
150 StringBuffer sql = new StringBuffer("delete from " + entry.getHeader().getSchemaName() + "." + entry.getHeader().getTableName() + " where ");
151 for (Column column : columnList) {
152 if (column.getIsKey()) {
153 //暫時只支持單一主鍵
154 sql.append(column.getName() + "=" + column.getValue());
155 break;
156 }
157 }
158 SQL_QUEUE.add(sql.toString());
159 }
160 } catch (InvalidProtocolBufferException e) {
161 e.printStackTrace();
162 }
163 }
164
165 /**
166 * 保存插入語句
167 *
168 * @param entry
169 */
170 private static void saveInsertSql(CanalEntry.Entry entry) {
171 try {
172 RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
173 List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
174 for (CanalEntry.RowData rowData : rowDatasList) {
175 List<Column> columnList = rowData.getAfterColumnsList();
176 StringBuffer sql = new StringBuffer("insert into " + entry.getHeader().getSchemaName() + "." + entry.getHeader().getTableName() + " (");
177 for (int i = 0; i < columnList.size(); i++) {
178 sql.append(columnList.get(i).getName());
179 if (i != columnList.size() - 1) {
180 sql.append(",");
181 }
182 }
183 sql.append(") VALUES (");
184 for (int i = 0; i < columnList.size(); i++) {
185 sql.append("'" + columnList.get(i).getValue() + "'");
186 if (i != columnList.size() - 1) {
187 sql.append(",");
188 }
189 }
190 sql.append(")");
191 SQL_QUEUE.add(sql.toString());
192 }
193 } catch (InvalidProtocolBufferException e) {
194 e.printStackTrace();
195 }
196 }
197 }
3.如果數據庫值發生改變之後會觸發增刪改,咱們可以拿到這個數據插入到其他數據庫中。