Canal 實戰 | 第一篇:SpringBoot 整合 Canal + RabbitMQ 實現監聽 MySQL 數據庫同步更新 Redis 緩存
- 2021 年 11 月 8 日
- 筆記
- canal, RabbitMQ, Spring Boot, springboot
一. Canal 簡介
canal [kə’næl],譯意為水道/管道/溝渠,主要用途是基於 MySQL 數據庫增量日誌解析,提供增量數據訂閱和消費
早期阿里巴巴因為杭州和美國雙機房部署,存在跨機房同步的業務需求,實現方式主要是基於業務 trigger 獲取增量變更。從 2010 年開始,業務逐步嘗試數據庫日誌解析獲取增量變更進行同步,由此衍生出了大量的數據庫增量訂閱和消費業務。
基於日誌增量訂閱和消費的業務包括
- 數據庫鏡像
- 數據庫實時備份
- 索引構建和實時維護(拆分異構索引、倒排索引等)
- 業務 cache 刷新
- 帶業務邏輯的增量數據處理
當前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x
二. Canal 工作原理
1. MySQL主備複製原理
- MySQL master 將數據變更寫入二進制日誌( binary log, 其中記錄叫做二進制日誌事件binary log events,可以通過 show binlog events 進行查看)
- MySQL slave 將 master 的 binary log events 拷貝到它的中繼日誌(relay log)
- MySQL slave 重放 relay log 中事件,將數據變更反映它自己的數據
2. Canal 工作原理
-
Canal 模擬 MySQL slave 的交互協議,偽裝自己為 MySQL slave ,向 MySQL master 發送dump 協議
-
MySQL master 收到 dump 請求,開始推送 binary log 給 slave (即 canal )
-
Canal 解析 binary log 對象(原始為 byte 流)
三. Canal 實戰
1. 需求分析
有來項目 youlai-mall 當前進度下使用Redis緩存MySQL數據庫中的OAuth2客戶端信息、角色權限映射關係、菜單路由,現在這樣有兩個很明顯的問題:
- 在後台管理界面修改角色、菜單、權限和OAuth2客戶端任何一方信息都需要讓緩存失效或者更新緩存,代碼耦合性高;
- 數據庫直接修改上面相關信息,緩存無法失效或更新。
第一種情況至少有解決方案,無非就在代碼層面上清緩存或者更新緩存,但是如果是直接修改數據庫呢?實際工作可能經常會遇到直接修改數據庫的場景,本篇通過SpringBoot 整合 Canal + RabbitMQ 實現對數據庫的監聽然後同步讓緩存失效或者更新。當然有來項目引入 Canal 中間件刷新緩存只是個開始,接下來還會使用 Canal 同步商品表至 ElasticSearch。
2. MySQL開啟 binlog 日誌
MySQL 部署://www.cnblogs.com/haoxianrui/p/15488810.html
開啟 biglog 日誌
vim /etc/my.cnf
添加配置
[mysqld]
log-bin=mysql-bin # 開啟binlog
binlog-format=ROW # 選擇ROW模式
server_id=1 # 配置MySQL replaction需要定義,不和Canal的slaveId重複即可
重啟MySQL ,查看配置是否生效
show variables like 'log_bin';
3. RabbitMQ 隊列創建
- 添加交換機 canal.exchange
- 添加隊列 canal.queue
- 隊列綁定交換機
4. Canal 配置和啟動
Canal Server下載
- 官方文檔://github.com/alibaba/canal/wiki
- 項目地址://github.com/alibaba/canal
- 下載地址://github.com/alibaba/canal/releases
進入下載地址,選擇 canal.deployer-1.1.5.tar.gz
將壓縮包解壓,我這裡把最後解壓出來的文件放入 有來項目 的middleware中間件文件,和之前的 nacos 和 sentinel 同一個套路。
Canal Server配置
需要配置的東西就兩項,一個是監聽數據庫配置,另一個是 RabbitMQ 連接配置。
改動的兩個文件分別是 Canal 配置文件 canal.properties
和 實例配置文件 instance.properties
㊙️:一個 Server 可以配置多個實例監聽 ,Canal 功能默認自帶的有個 example 實例,本篇就用 example 實例 。如果增加實例,複製 example 文件夾內容到同級目錄下,然後在 canal.properties
指定添加實例的名稱。
-
canal.properties
配置 Canal 服務方式為 RabbitMQ 和連接配置(🏷️ 只列出需要修改的地方)
# tcp, kafka, rocketMQ, rabbitMQ canal.serverMode = rabbitMQ ################################################## ######### RabbitMQ ############# ################################################## rabbitmq.host = x.youlai.tech rabbitmq.virtual.host =/ rabbitmq.exchange =canal.exchange rabbitmq.username =guest rabbitmq.password =guest rabbitmq.deliveryMode =
-
instance.properties
監聽數據庫配置(🏷️ 只列出需要修改的地方)
# position info canal.instance.master.address=x.youlai.tech:3306 # username/password canal.instance.dbUsername=root canal.instance.dbPassword=root # mq config canal.mq.topic=canal.routing.key
5. SpringBoot 整合 Canal + RabbitMQ
🏠 完整源碼://gitee.com/youlaitech/youlai-mall
引入依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
RabbitMQ連接配置
spring:
rabbitmq:
host: x.youlai.tech
port: 5672
username: guest
password: guest
監聽邏輯處理
/**
* Canal + RabbitMQ 監聽數據庫數據變化
*
* @author <a href="mailto:[email protected]">haoxr</a>
* @date 2021/11/4 23:14
*/
@Component
@Slf4j
@RequiredArgsConstructor
public class CanalListener {
private final ISysPermissionService permissionService;
private final ISysOauthClientService oauthClientService;
private final ISysMenuService menuService;
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue(value = "canal.queue", durable = "true"),
exchange = @Exchange(value = "canal.exchange"),
key = "canal.routing.key"
)
})
public void handleDataChange(String message) {
CanalMessage canalMessage = JSONUtil.toBean(message, CanalMessage.class);
String tableName = canalMessage.getTable();
log.info("Canal 監聽 {} 發生變化;明細:{}", tableName, message);
if ("sys_oauth_client".equals(tableName)) {
log.info("======== 清除客戶端信息緩存 ========");
oauthClientService.cleanCache();
} else if (Arrays.asList("sys_permission", "sys_role", "sys_role_permission").contains(tableName)) {
log.info("======== 刷新角色權限緩存 ========");
permissionService.refreshPermRolesRules();
} else if (Arrays.asList("sys_menu", "sys_role", "sys_role_menu").contains(tableName)) {
log.info("======== 清理菜單路由緩存 ========");
menuService.cleanCache();
}
}
}
6. 實戰測試
🏷️ 如果使用有來項目線上 RabbitMQ 測試,記得需要新建隊列,否者多人消費同一隊列會讓你覺得 Canal 監聽數據有丟失的現象。
接下來模擬測試,當直接在數據庫修改菜單數據,能否讓 Redis 的菜單路由緩存失效。
啟動 Canal
切換到項目的 cd ./middleware/canal/deployer/bin startup
目錄下,輸入 startup
啟動 Canal
啟動 youlai-admin
應用,測試效果如下,可見最後菜單路由緩存在直接在數據庫修改菜單表數據時會失效,達到預期效果。
四. 總結
本篇通過 Canal + RabbitMQ 實現對 MySQL 數據變動監聽,能夠應對實際工作直接修改數據庫數據後讓緩存失效或者刷新的場景。有來項目引入 Canal 本篇只是個開始,因為 Canal 的應用場景太豐富了,接下來有來項目使用 Canal 同步 MySQL 數據庫的商品數據至 ElasticSearch 索引庫,個人感覺以後會越來越火,所以建議有必要深入了解這個 Canal 框架。
五. 聯繫信息
有興趣進交流群的童鞋歡迎加群, 純屬學習交流群,無任何利益,二維碼過期可加我微信備註「有來」即可,我拉你進群,另外如果有興趣加入開源項目 youlai-mall 開發的歡迎私信我。
有來②群二維碼 | 【有來小店】微信小程序體驗碼 | 我的微信 |
---|---|---|
![]() |
![]() |
![]() |