flink sql使用kafka作為source和sink
- 2019 年 10 月 4 日
- 筆記
大家都知道sql有著簡單,直接,容易上手等優勢,所以現在大有用sql去掉api的趨勢。那麼我們少說廢話,下面先上個sql的列子
val env = StreamExecutionEnvironment.getExecutionEnvironment env.enableCheckpointing(10000) env.setParallelism(1) //注入數據源 var tableEnv: StreamTableEnvironment = TableEnvironment.getTableEnvironment(env) tableEnv.registerExternalCatalog("kafka", new UDMExternalCatalog()) tableEnv.sqlUpdate( s"""INSERT INTO `kafka.kafka-k8s.pb_sink_test` |select |fstDeptSet, |filedName1, |filedName2, |userId, |brandNames |from kafka.`kafka-k8s`.`pb_internal_test` | """.stripMargin) env.execute("Flink SQL Skeleton")
上面是一個查詢,插入語句,在flink中會被轉為一個任務進行提交
下面我們大概講一下flink內部kafka的實例化過程

有圖可知,主要分為4大步驟,先通過calcite分析sql,轉為相應的relnode,在根據用戶配置的schema和Java spi,過濾出需要的kafka produce和kafka consumer版本。
kafka consumer對應於select部分
kafka produce對應於insert部分