FlinkCDC 2.0使用實踐體驗
一、背景說明
所謂CDC:全稱是 Change Data Capture ,在廣義的概念上,只要能捕獲數據變更的技術,我們都可以稱為 CDC 。通常我們說的 CDC 技術主要面向數據庫的變更,是一種用於捕獲數據庫中數據變更的技術。
目前實時鏈路對於數據的處理是大多數使用的方案是通過工具,對業務數據日誌的監控(如canal/maxwell),並連接到kafka,實現對業務數據的實時獲取,在實時數倉架構上,ods層一般也會設計在kafka(數據入湖另外說),參考下面圖1。而通過FlinkCDC則可以在確保數據一致性的前提下,繞過消息中間組件,Flink實現對數據的直接處理,減少數據的流轉鏈路,另外,由於還支持分佈式處理,因此可以獲得比canal等組件更高的效率,流程如下面圖2。
二、代碼部分
關於版本兼容一點說明:使用StreamAPI的話,1.12的版本是支持CDC2.0,如若使用FlinkSQL,需按照官方指定的版本,使用1.13
/**
* @Author: Rango
* @Date: 2021/09/12/下午10:25
* @Description: FlinkCDC監控MySQ,DataStream寫法,demo不寫checkpoint
*/
public class flincdc {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
DebeziumSourceFunction<String> mysqlSource = MySqlSource.<String>builder()
.hostname("localhost")
.port(3306)
.username("root")
.password("123456")
.databaseList("test_cdc")
.tableList("test_cdc.cdc_flink") //必須加庫名
.deserializer(new myDeserializationSchema()) //自定義反序列化
//.deserializer(new StringDebeziumDeserializationSchema()) //原反系列化器
.startupOptions(StartupOptions.initial())
.build();
DataStreamSource<String> mysqlDS = env.addSource(mysqlSource);
mysqlDS.print();
env.execute();
}
}
//自定義反序列化器
class myDeserializationSchema implements DebeziumDeserializationSchema<String> {
/*
期望輸出效果
{
db:數據庫名
tb:表名
op:操作類型
befort:{} 數據修改前,create操作沒有該項
after:{} 數據修改後,delete操作沒有該項
}
*/
public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
JSONObject result = new JSONObject();
String[] split = sourceRecord.topic().split("\\.");
result.put("db",split[1]);
result.put("tb",split[2]);
//獲取操作類型
Envelope.Operation operation = Envelope.operationFor(sourceRecord);
result.put("op",operation.toString().toLowerCase());
Struct value =(Struct)sourceRecord.value();
JSONObject after = getValueBeforeAfter(value, "after");
JSONObject before = getValueBeforeAfter(value, "before");
if (after!=null){result.put("after",after);}
if (before!=null){result.put("before",before);}
collector.collect(result.toJSONString());
}
public JSONObject getValueBeforeAfter(Struct value,String type){
Struct midStr = (Struct)value.get(type);
JSONObject result = new JSONObject();
if(midStr!=null){
List<Field> fields = midStr.schema().fields();
for (Field field : fields) {
result.put(field.name(),midStr.get(field));
}
return result;
}return null;
}
public TypeInformation<String> getProducedType() {
return BasicTypeInfo.STRING_TYPE_INFO;
}
}
效果展示:
#監控的MySQL數據庫對應的表結構及數據如下:
mysql> select * from test_cdc.cdc_flink;
+------+----------+--------+
| id | name | sex |
+------+----------+--------+
| 1001 | zhangsan | female |
| 1002 | lisilsi | male |
+------+----------+--------+
2 rows in set (0.00 sec)
#命令行提交jar包
./bin/flink run -c com.hll.flincdc FlinkCDC-1.0-SNAPSHOT-jar-with-dependencies.jar
默認 initial 模式,也就是任務啟動會把數據庫原有數據全打印出來。其他模式在第三部分介紹
三、2.0版本的主要優化點說明
在1.x的版本有如下提到的幾個問題,而在2.0的版本,則實現了無鎖讀取的方式來實現一致性的保證,並且全量讀取支持checkpoint,失敗無需從頭再開啟任務。
所謂無鎖讀取的方式則是通過全量數據chunk切分後並行讀取,通過高低水位的方式來確保全量數據一致性讀取,而增量部分則是單線程彙報方式,礙於篇幅此處不做源碼解讀,感興趣可以看看源碼BlinlogSplit部分。
四、其他說明
- 關於反序列那塊,由於cdc直連數據庫會有太多冗餘信息,只提取需要內容即可,原生內容為SourceRecord對象,對內容進行對應提取即可,SourceRecord對象完整內容如下:
SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={ts_sec=1631413470, file=mysql-bin.000002, pos=8281, snapshot=true}} ConnectRecord{topic='mysql_binlog_source.test_cdc.cdc_flink', kafkaPartition=null, key=Struct{id=1001}, keySchema=Schema{mysql_binlog_source.test_cdc.cdc_flink.Key:STRUCT}, value=Struct{after=Struct{id=1001,name=zhangsan,sex=female},source=Struct{version=1.5.2.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1631413470946,snapshot=true,db=test_cdc,table=cdc_flink,server_id=0,file=mysql-bin.000002,pos=8281,row=0},op=r,ts_ms=1631413470950}, valueSchema=Schema{mysql_binlog_source.test_cdc.cdc_flink.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
- 關於連接source的模式,可以先看官網介紹:
initial (default): Performs an initial snapshot on the monitored database tables upon first startup, and continue to read the latest binlog.
latest-offset: Never to perform snapshot on the monitored database tables upon first startup, just read from the end of the binlog which means only have the changes since the connector was started.
簡單理解,initial是默認模式,cdc任務啟動後會把數據庫中表原數據全打印出來,可用於歷史全量歷史數據的輸出,而last-offset則是任務啟動後,以後數據庫有數據變更cdc才有數據輸出,用於只關注增量數據的方式。
- FlinkSQL的寫法比較簡單,上面的例子,則寫法如下:
//注意sql寫法必須指定表,每次只能讀一張表,stramapi的方式可以監控整個庫
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
tableEnv.executeSql("CREATE TABLE mysql_binlog (" +
" id INT NOT NULL," +
" name STRING," +
" sex STRING," +
" PRIMARY KEY(id) NOT ENFORCED" +
") WITH (" +
" 'connector' = 'mysql-cdc'," +
" 'scan.startup.mode' = 'latest-offset'," +
" 'hostname' = 'localhost'," +
" 'port' = '3306'," +
" 'username' = 'root'," +
" 'password' = '123456'," +
" 'database-name' = 'test_cdc'," +
" 'table-name' = 'cdc_flink')");
學習交流,有任何問題還請隨時評論指出交流。