flink sql使用kafka作為source和sink

  • 2019 年 10 月 4 日
  • 筆記

大家都知道sql有著簡單,直接,容易上手等優勢,所以現在大有用sql去掉api的趨勢。那麼我們少說廢話,下面先上個sql的列子

val env = StreamExecutionEnvironment.getExecutionEnvironment      env.enableCheckpointing(10000)      env.setParallelism(1)      //注入數據源      var tableEnv: StreamTableEnvironment  = TableEnvironment.getTableEnvironment(env)      tableEnv.registerExternalCatalog("kafka", new UDMExternalCatalog())      tableEnv.sqlUpdate(        s"""INSERT INTO `kafka.kafka-k8s.pb_sink_test`           |select           |fstDeptSet,           |filedName1,           |filedName2,           |userId,           |brandNames           |from kafka.`kafka-k8s`.`pb_internal_test`           | """.stripMargin)      env.execute("Flink SQL Skeleton")

上面是一個查詢,插入語句,在flink中會被轉為一個任務進行提交

下面我們大概講一下flink內部kafka的實例化過程

有圖可知,主要分為4大步驟,先通過calcite分析sql,轉為相應的relnode,在根據用戶配置的schema和Java spi,過濾出需要的kafka produce和kafka consumer版本。

kafka consumer對應於select部分

kafka produce對應於insert部分