flink sql使用kafka作为source和sink

  • 2019 年 10 月 4 日
  • 筆記

大家都知道sql有着简单,直接,容易上手等优势,所以现在大有用sql去掉api的趋势。那么我们少说废话,下面先上个sql的列子

val env = StreamExecutionEnvironment.getExecutionEnvironment      env.enableCheckpointing(10000)      env.setParallelism(1)      //注入数据源      var tableEnv: StreamTableEnvironment  = TableEnvironment.getTableEnvironment(env)      tableEnv.registerExternalCatalog("kafka", new UDMExternalCatalog())      tableEnv.sqlUpdate(        s"""INSERT INTO `kafka.kafka-k8s.pb_sink_test`           |select           |fstDeptSet,           |filedName1,           |filedName2,           |userId,           |brandNames           |from kafka.`kafka-k8s`.`pb_internal_test`           | """.stripMargin)      env.execute("Flink SQL Skeleton")

上面是一个查询,插入语句,在flink中会被转为一个任务进行提交

下面我们大概讲一下flink内部kafka的实例化过程

有图可知,主要分为4大步骤,先通过calcite分析sql,转为相应的relnode,在根据用户配置的schema和Java spi,过滤出需要的kafka produce和kafka consumer版本。

kafka consumer对应于select部分

kafka produce对应于insert部分

Exit mobile version