數據的異構實戰(一) 基於canal進行日誌的訂閱和轉換
- 2019 年 10 月 15 日
- 筆記
什麼是數據的異構處理。簡單說就是為了滿足我們業務的擴展性,將數據從某種特定的格式轉換到新的數據格式中來。
為什麼會有這種需求出現呢?
傳統的企業中,主要都是將數據存儲在了關係型資料庫中,例如說MySQL這種資料庫,但是為了滿足需求的擴展,查詢的維度會不斷地增加,那麼這個時候我們就需要做數據的異構處理了。
常見的數據異構有哪些?
例如MySQL數據轉儲到Redis,MySQL數據轉儲到es等等,也是因為這種數據異構的場景開始出現,陸陸續續有了很多中間件在市場中冒出,例如說rocketMq,kafka,canal這種組件。
下邊有一張通俗易懂的數據異構過程圖:
在這裡插入圖片描述
canal進行數據同步
首先,我們需要正確地打開canal伺服器去訂閱binlog日誌。
關於binlog日誌查看常用的幾條命令如下:
#是否啟用了日誌 mysql>show variables like 'log_bin'; #怎樣知道當前的日誌 mysql> show master status; #查看mysql binlog模式 show variables like 'binlog_format'; #獲取binlog文件列表 show binary logs; #查看當前正在寫入的binlog文件 show master statusG #查看指定binlog文件的內容 show binlog events in 'mysql-bin.000002';
注意binlog日誌格式要求為row格式:
ROW格式日誌的特點
記錄sql語句和每個欄位變動的前後情況,能夠清楚每行數據的變化歷史,佔用較多的空間,不會記錄對數據沒有影響的sql,例如說select語句就不會記錄。可以使mysqlbinlog工具去查看內部資訊。
STATEMENT模式的日誌內容
STATEMENT格式的日誌就和它本身的命名有點類似,只是單獨地記錄了sql的內容,但是沒有記錄上下文資訊,在數據會UI福的時候可能會導致數據丟失。
MIX模式模式的日誌內容
這種模式的日誌內容比較靈活,當遇到了表結構變更的時候,就會記錄為statement模式,如果遇到了數據修改的話就會變為row模式。
如何配置canal的相關資訊?
比較簡單,首先通過下載好canal的安裝包,然後我們需要在canal的配置文件上邊做一些手腳:
canal的example文件夾下邊的properties文件 canal.instance.master.address=**.***.***.**:3306 # 日誌的文件名稱 canal.instance.master.journal.name=master-96-bin.000009 canal.instance.dbUsername=**** canal.instance.dbPassword=****
啟動我們的canal程式,然後查看日誌,如果顯示下邊這些內容就表示啟動成功了:
2019-10-13 16:00:30.072 [main] ERROR com.alibaba.druid.pool.DruidDataSource - testWhileIdle is true, validationQuery not set 2019-10-13 16:00:30.734 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example 2019-10-13 16:00:30.783 [main] INFO c.a.otter.canal.instance.core.AbstractCanalInstance - start successful....
ps:關於canal入門安裝的教程網上有很多,這裡我就不做過多的闡述了。
canal伺服器搭建起來之後,我們便進入了java端的程式編碼部分:
接著再來查看我們的客戶端程式碼,客戶端中我們需要通過java程式獲取canal伺服器的連接,然後進入監聽binlog日誌的狀態。
可以參考下邊的程式程式碼:
package com.sise.client.simple; import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.client.CanalConnectors; import com.alibaba.otter.canal.protocol.CanalEntry; import com.alibaba.otter.canal.protocol.Message; import com.google.protobuf.InvalidProtocolBufferException; import com.sise.common.dto.TypeDTO; import com.sise.common.handle.CanalDataHandler; import java.net.InetSocketAddress; import java.util.List; import java.util.stream.Collectors; /** * 簡單版本的canal監聽客戶端 * * @author idea * @date 2019/10/12 */ public class SImpleCanalClient { private static String SERVER_ADDRESS = "127.0.0.1"; private static Integer PORT = 11111; private static String DESTINATION = "example"; private static String USERNAME = ""; private static String PASSWORD = ""; public static void main(String[] args) throws InterruptedException { CanalConnector canalConnector = CanalConnectors.newSingleConnector( new InetSocketAddress(SERVER_ADDRESS, PORT), DESTINATION, USERNAME, PASSWORD); canalConnector.connect(); canalConnector.subscribe(".*..*"); canalConnector.rollback(); for (; ; ) { Message message = canalConnector.getWithoutAck(100); long batchId = message.getId(); if(batchId!=-1){ // System.out.println(message.getEntries()); System.out.println(batchId); printEntity(message.getEntries()); } } } public static void printEntity(List<CanalEntry.Entry> entries){ for (CanalEntry.Entry entry : entries) { if (entry.getEntryType()!=CanalEntry.EntryType.ROWDATA){ continue; } try { CanalEntry.RowChange rowChange=CanalEntry.RowChange.parseFrom(entry.getStoreValue()); for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) { System.out.println(rowChange.getEventType()); switch (rowChange.getEventType()){ //如果希望監聽多種事件,可以手動增加case case INSERT: String tableName = entry.getHeader().getTableName(); //測試選用t_type這張表進行映射處理 if ("t_type".equals(tableName)) { TypeDTO typeDTO = CanalDataHandler.convertToBean(rowData.getAfterColumnsList(), TypeDTO.class); System.out.println(typeDTO); } System.out.println("this is INSERT"); break; default: break; } } } catch (InvalidProtocolBufferException e) { e.printStackTrace(); } } } /** * 列印內容 * * @param columns */ private static void printColums(List<CanalEntry.Column> columns){ String line=columns.stream().map(column -> column.getName()+"="+column.getValue()) .collect(Collectors.joining(",")); System.out.println(line); } }
本地監聽到了canal的example文件夾中配置的監聽的日誌資訊之後,就會自動將該日誌裡面記錄的數據進行列印讀取。
那麼這個時候我們還需要做多一步處理,那就是將堅聽到的數據轉換為可識別的對象,然後進行對象轉移處理。
其實光是鏈接獲取到canal的binlog日誌並不困難,接著我們還需要將binlog日誌進行統一的封裝處理,需要編寫一個特定的處理器將日誌的內容轉換為我們常用的DTO類:
下邊這個工具類可以借鑒一下:
package com.sise.common.handle; import com.alibaba.otter.canal.protocol.CanalEntry; import com.sise.common.dto.CourseDetailDTO; import lombok.extern.slf4j.Slf4j; import java.lang.reflect.Field; import java.util.HashMap; import java.util.List; import java.util.Map; /** * 基於canal的數據處理器 * * @author idea * @data 2019/10/13 */ @Slf4j public class CanalDataHandler extends TypeConvertHandler { /** * 將binlog的記錄解析為一個bean對象 * * @param columnList * @param clazz * @param <T> * @return */ public static <T> T convertToBean(List<CanalEntry.Column> columnList, Class<T> clazz) { T bean = null; try { bean = clazz.newInstance(); Field[] fields = clazz.getDeclaredFields(); Field.setAccessible(fields, true); Map<String, Field> fieldMap = new HashMap<>(fields.length); for (Field field : fields) { fieldMap.put(field.getName().toLowerCase(), field); } if (fieldMap.containsKey("serialVersionUID")) { fieldMap.remove("serialVersionUID".toLowerCase()); } System.out.println(fieldMap.toString()); for (CanalEntry.Column column : columnList) { String columnName = column.getName(); String columnValue = column.getValue(); System.out.println(columnName); if (fieldMap.containsKey(columnName)) { //基礎類型轉換不了 Field field = fieldMap.get(columnName); Class<?> type = field.getType(); if(BEAN_FIELD_TYPE.containsKey(type)){ switch (BEAN_FIELD_TYPE.get(type)) { case "Integer": field.set(bean, parseToInteger(columnValue)); break; case "Long": field.set(bean, parseToLong(columnValue)); break; case "Double": field.set(bean, parseToDouble(columnValue)); break; case "String": field.set(bean, columnValue); break; case "java.handle.Date": field.set(bean, parseToDate(columnValue)); break; case "java.sql.Date": field.set(bean, parseToSqlDate(columnValue)); break; case "java.sql.Timestamp": field.set(bean, parseToTimestamp(columnValue)); break; case "java.sql.Time": field.set(bean, parseToSqlTime(columnValue)); break; } }else{ field.set(bean, parseObj(columnValue)); } } } } catch (InstantiationException | IllegalAccessException e) { log.error("[CanalDataHandler]convertToBean,初始化對象出現異常,對象無法被實例化,異常為{}", e); } return bean; } public static void main(String[] args) throws IllegalAccessException { CourseDetailDTO courseDetailDTO = new CourseDetailDTO(); Class clazz = courseDetailDTO.getClass(); Field[] fields = clazz.getDeclaredFields(); Field.setAccessible(fields, true); System.out.println(courseDetailDTO); for (Field field : fields) { if ("java.lang.String".equals(field.getType().getName())) { field.set(courseDetailDTO, "name"); } } System.out.println(courseDetailDTO); } /** * 其他類型自定義處理 * * @param source * @return */ public static Object parseObj(String source){ return null; } }
接著是canal的核心處理器,主要的目的是將binlog轉換為我們所希望的實體類對象,該類目前主要考慮兼容的數據類型為目前8種,比較有限,如果讀者後續在實際開發中還遇到某些特殊的數據類型可以手動添加到map中。
package com.sise.common.handle; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.Date; import java.util.HashMap; import java.util.Map; /** * 類型轉換器 * * @author idea * @data 2019/10/13 */ public class TypeConvertHandler { public static final Map<Class, String> BEAN_FIELD_TYPE; static { BEAN_FIELD_TYPE = new HashMap<>(8); BEAN_FIELD_TYPE.put(Integer.class, "Integer"); BEAN_FIELD_TYPE.put(Long.class, "Long"); BEAN_FIELD_TYPE.put(Double.class, "Double"); BEAN_FIELD_TYPE.put(String.class, "String"); BEAN_FIELD_TYPE.put(Date.class, "java.handle.Date"); BEAN_FIELD_TYPE.put(java.sql.Date.class, "java.sql.Date"); BEAN_FIELD_TYPE.put(java.sql.Timestamp.class, "java.sql.Timestamp"); BEAN_FIELD_TYPE.put(java.sql.Time.class, "java.sql.Time"); } protected static final Integer parseToInteger(String source) { if (isSourceNull(source)) { return null; } return Integer.valueOf(source); } protected static final Long parseToLong(String source) { if (isSourceNull(source)) { return null; } return Long.valueOf(source); } protected static final Double parseToDouble(String source) { if (isSourceNull(source)) { return null; } return Double.valueOf(source); } protected static final Date parseToDate(String source) { if (isSourceNull(source)) { return null; } if (source.length() == 10) { source = source + " 00:00:00"; } SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); Date date; try { date = sdf.parse(source); } catch (ParseException e) { return null; } return date; } protected static final java.sql.Date parseToSqlDate(String source) { if (isSourceNull(source)) { return null; } SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd"); java.sql.Date sqlDate; Date utilDate; try { utilDate = sdf.parse(source); } catch (ParseException e) { return null; } sqlDate = new java.sql.Date(utilDate.getTime()); return sqlDate; } protected static final java.sql.Timestamp parseToTimestamp(String source) { if (isSourceNull(source)) { return null; } SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); Date date; java.sql.Timestamp timestamp; try { date = sdf.parse(source); } catch (ParseException e) { return null; } timestamp = new java.sql.Timestamp(date.getTime()); return timestamp; } protected static final java.sql.Time parseToSqlTime(String source) { if (isSourceNull(source)) { return null; } SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss"); Date date; java.sql.Time time; try { date = sdf.parse(source); } catch (ParseException e) { return null; } time = new java.sql.Time(date.getTime()); return time; } private static boolean isSourceNull(String source) { if (source == "" || source == null) { return true; } return false; } }
ps: t_type表是一張我們用於做測試時候使用的表,這裡我們可以根據自己實際的業務需要訂製不同的實體類對象
現在我們已經可以通過binlog轉換為實體類了,那麼接下來就是如何將實體類做額外的傳輸和處理了。數據的傳輸我們通常會藉助mq這類型的中間件來進行操作,關於這部分的內容我會在後續的文章中做詳細的輸出。
推薦閱讀(點擊即可跳轉閱讀)
2.面試題內容聚合
3.設計模式內容聚合
5.多執行緒內容聚合