Flink 案例分析

Flink程序的执行过程

no-desc 说明 详情
1-env 获取flink的执行环境

批处理:ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

流处理:StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

2-source 加载数据 1) socketTextStream – 读取Socket 数据流
​2) readTextFile() – 逐行读取文本文件获取数据流,每行都返回字符串
3) fromCollection() – 从集合中创建数据流
​4) fromElements() – 从给定的数据对象创建数据流,所有数据类型要一致
​5) addSource() – 添加新的源函数,例如从kafka 中读取数据,参见读取kafka 数据案例
3-transformation 对加载的数据进行转换  
4-sink 对结果进行保存或者打印 1) writeAsText() – 以字符串的形式逐行写入文件,调用每个元素的toString()得到写入的字符串
2) writeAsCsv() – 将元组写出以逗号分隔的csv 文件。注意:只能作用到元组数据上
​3) print() – 控制台直接输出结果,调用对象的toString()方法得到输出结果。
​4) addSink() – 自定义接收函数。例如将结果保存到kafka 中,参见kafka 案例
5-execute 触发flink程序的执行 代码流程必须符合 source ->transformation -> sink transformation 都是执行,需要最后使用env.execute()或者使用 print(),count(),collect() 触发执行

注意

Flink编程不是基于K,V格式的编程,通过某些方式来指定虚拟key

Flink中的tuple最多支持25个元素,每个元素是从0开始

案例1: 词频统计

public class SocketWindowWordCountJava {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<String> text = env.readTextFile("/yourpath/in.txt");

        DataSet<Tuple2<String, Integer>> counts =
                // split up the lines in pairs (2-tuples) containing: (word,1)
                text.flatMap(new Tokenizer())
                        // group by the tuple field "0" and sum up tuple field "1"
                        .groupBy(0)
                        .sum(1);

        String outputPath = "/yourpath/out.txt";
        counts.writeAsCsv(outputPath, "\n", " ");
        env.execute("myflink");
    }

    public static class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            String[] tokens = value.split(" ");
            // emit the pairs
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<String, Integer>(token, 1));
                }
            }
        }
    }
}

 

参考

//blog.csdn.net/qq_40929921/article/details/99603150

Tags: