flink sql使用中的一个问题
- 2019 年 12 月 15 日
- 筆記
最近有人问了浪尖一个flink共享datastream或者临时表会否重复计算的问题。

对于 flink 的datastream ,比如上图,source 经过datastream计算之后的结果想共享给compute1和compute2计算,这样可以避免之前的逻辑重复计算,而且数据也只需拉去一次。
而对于flink的sql呢?假如compute1和compute2之前是经过复杂计算的临时表,直接给下游sql计算使用会出现什么问题呢?
先告诉大家答案 ,临时表注册完了之后,实际上并没有完成物化功能,这时候后续有多个sqlupdate操作依赖这个临时表的话,会导致临时表多次计算的。
这个其实也不难理解,因为每次sqlupdate都是完成sql 语法树的解析,实际上也是类似于spark的血缘关系,但是flink sql不能像spark rdd血缘关系那样使用cache或者Checkpoint来避免重复计算,因为它并不能支持公共节点识别和公共节点数据的多次分发。
sql代码如下,供大家测试参考
package org.table.kafka; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.api.java.StreamTableEnvironment; import org.apache.flink.table.descriptors.Json; import org.apache.flink.table.descriptors.Kafka; import org.apache.flink.table.descriptors.Rowtime; import org.apache.flink.table.descriptors.Schema; public class kafka2kafka { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); env.setParallelism(1); StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env); tEnv.connect( new Kafka() .version("0.10") // "0.8", "0.9", "0.10", "0.11", and "universal" .topic("jsontest") .property("bootstrap.servers", "localhost:9093") .property("group.id","test") .startFromLatest() ) .withFormat( new Json() .failOnMissingField(false) .deriveSchema() ) .withSchema( new Schema() .field("rowtime",Types.SQL_TIMESTAMP) .rowtime(new Rowtime() .timestampsFromField("eventtime") .watermarksPeriodicBounded(2000) ) .field("fruit", Types.STRING) .field("number", Types.INT) ) .inAppendMode() .registerTableSource("source"); tEnv.connect( new Kafka() .version("0.10") // "0.8", "0.9", "0.10", "0.11", and "universal" .topic("test") .property("acks", "all") .property("retries", "0") .property("batch.size", "16384") .property("linger.ms", "10") .property("bootstrap.servers", "localhost:9093") .sinkPartitionerFixed() ).inAppendMode() .withFormat( new Json().deriveSchema() ) .withSchema( new Schema() .field("fruit", Types.STRING) .field("total", Types.INT) .field("time", Types.SQL_TIMESTAMP) ) .registerTableSink("sink"); tEnv.connect( new Kafka() .version("0.10") // "0.8", "0.9", "0.10", "0.11", and "universal" .topic("test") .property("acks", "all") .property("retries", "0") .property("batch.size", "16384") .property("linger.ms", "10") .property("bootstrap.servers", "localhost:9093") .sinkPartitionerFixed() ).inAppendMode() .withFormat( new Json().deriveSchema() ) .withSchema( new Schema() .field("fruit", Types.STRING) .field("total", Types.INT) .field("time", Types.SQL_TIMESTAMP) ) .registerTableSink("sink1"); Table table = tEnv.sqlQuery("select * from source"); tEnv.registerTable("view",table); tEnv.sqlUpdate("insert into sink select fruit,sum(number),TUMBLE_END(rowtime, INTERVAL '5' SECOND) from view group by fruit,TUMBLE(rowtime, INTERVAL '5' SECOND)"); tEnv.sqlUpdate("insert into sink1 select fruit,sum(number),TUMBLE_END(rowtime, INTERVAL '5' SECOND) from view group by fruit,TUMBLE(rowtime, INTERVAL '5' SECOND)"); System.out.println(env.getExecutionPlan()); // env.execute(); } }
可视化页面链接:
https://flink.apache.org/visualizer/
使用的过程中避免重要的账号密码被泄露。