Canal實時解析mysql binlog數據實戰
一、說明
通過canal實時監聽mysql binlog日誌文件的變化,並將數據解析出來
二、環境準備
1、創建maven項目並修改pom.xml配置文件
<dependencies>
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.4</version>
</dependency>
</dependencies>
2、嗦程式碼
特別說明:在解析數據時,相當於程式是客戶端,客戶端在連接canal服務端時是不需要用戶名和密碼
import com.alibaba.fastjson.JSONObject; import com.alibaba.otter.canal.client.CanalConnectors; import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.protocol.CanalEntry; import com.alibaba.otter.canal.protocol.Message; import com.google.protobuf.ByteString; import com.google.protobuf.InvalidProtocolBufferException; import java.net.InetSocketAddress; import java.util.List; public class CanalClient { public static void main(String[] args) throws InterruptedException, InvalidProtocolBufferException { // 獲取連接 CanalConnector canalConnector=CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.140.131",11111), "example","",""); while(true) { // 連接 canalConnector.connect(); // 訂閱資料庫 canalConnector.subscribe("CanalDb.*"); // 獲取數據 Message message = canalConnector.get(100); // 獲取Entry集合 List<CanalEntry.Entry> entries=message.getEntries(); // 判斷集合是否為空,如果為空,則執行緒等待2秒再拉取數據 if (entries.size()<=0) { System.out.println("當次抓取沒有數據,休息一會兒。。。"); Thread.sleep(2000); } else { // 遍歷entries,單條解析 for (CanalEntry.Entry entry:entries) { // 1,獲取表名 String tableName=entry.getHeader().getTableName(); // 2,獲取類型 CanalEntry.EntryType entryType=entry.getEntryType(); // 3,獲取序列化後的數據 ByteString storeValue=entry.getStoreValue(); // 4.判斷當前entryType類型是否為ROWDATA if (CanalEntry.EntryType.ROWDATA.equals(entryType)) { //5.反序列化數據 CanalEntry.RowChange rowChange=CanalEntry.RowChange.parseFrom(storeValue); //6.獲取當前事件的操作類型 CanalEntry.EventType eventType=rowChange.getEventType(); //7.獲取數據集 List<CanalEntry.RowData> rowDataList=rowChange.getRowDatasList(); //8.遍歷rowDataList並列印數據集 for(CanalEntry.RowData rowData:rowDataList) { JSONObject beforData=new JSONObject(); List<CanalEntry.Column> beforClountList=rowData.getBeforeColumnsList(); for (CanalEntry.Column column:beforClountList) { beforData.put(column.getName(),column.getValue()); } JSONObject afterData=new JSONObject(); List<CanalEntry.Column> afterClountList=rowData.getAfterColumnsList(); for (CanalEntry.Column column:afterClountList) { afterData.put(column.getName(),column.getValue()); } // 列印數據 System.out.println(""+tableName+ ",EventType:"+eventType+ ",Before:"+beforData+ ",After:"+afterData); } } else { System.out.println("當前操作類型為"+entryType); } } } } } }
三、項目效果