Flink 程序结构 上篇
- 2019 年 10 月 8 日
- 筆記
欢迎来到 KK 大数据,今天分享的主题是:Flink 程序结构
(这两天公司有发布项目,拖更了两天, 甚是对不住观众老爷 )
我们还是从wordcount程序说起
下面是一段使用 Flink 实现的 WordCount 代码
import org.apache.flink.streaming.api.scala._ object WordCount { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment val text = env.readTextFile("d://1.txt") val counts : DataStream[(String,Int)] = text .flatMap(_.toLowerCase.split(" ")) .filter(_.nonEmpty) .map((_,1)) .keyBy(0) .sum(1) counts.print() env.execute("WordCount") } }
整个 flink 程序一共有五步,分别是:创建 Flink 执行环境、创建或加载数据、对数据集进行转换操作、指定计算结果输出位置、调用execute方法触发执行。
下面依次来讲这五个步骤(分两篇文章讲完)
(1)Execution Environment
运行 Flink 程序第一步就是要获取相应的执行环境,决定程序在什么地方执行(本地或者集群上),同时不同的运行环境决定了应用的类型,批量处理作业(ExecutionEnvironment)还是流式处理作业(StreamExecutionEnvionment)。
自动选择环境,如果是在本地执行,则创建本地执行环境,如果是在集群执行,则创建集群执行环境
StreamExecutionEnvironment.getExecutionEnvironment
指定并行度,并创建本地执行环境
StreamExecutionEnvironment.createLocalEnvironment(5)
批处理也是一样:
自动选择环境
ExecutionEnvironment.getExecutionEnvironment
指定并行度,创建本地环境
ExecutionEnvironment.createLocalEnvironment(5)
(2)初始化数据
Flink 提供了不同的数据接口完成数据的初始化,将数据转换为 DataStream<T> 或 DataSet<T> 数据集。
如下的代码,把本地的文本文件读取为 DataStream
val text:DataStream[String] = env.readTextFile("d://1.txt")
Flink 提供了多种数据连接器,用来读取外部数据源数据,比如 kafka,es,hdfs 等等,后续会重点讲。
(3)执行转换操作
数据转换,就是把从输入数据得到的 DataStream 转换成其他数据。
Flink 内置了很多的算子,比如 map、flatMap、filter、keyBy等,我们只需要定义每个算子的逻辑即可。
比如我们的wordcount的代码:
flatMap算子,输入是数组,输出是元素。就是把数组压扁成一个个元素。这里先把每行变成小写,然后按空格切分,输入是一行数据,输出是多个切分后的 单词
.flatMap(_.toLowerCase.split(" "))
filter 过滤算子,留下满足条件的。这里过滤掉空的单词
.filter (_.nonEmpty)
map 算子,一对一转换,输入是一个单词,输出是一个元组(单词,1)
.map((_,1))
按照指定 key 对数据重分区
.keyBy(0)
执行求和操作
.sum(1)
这里,我们只需要传入相应的 Lambda 表达式,就能完成 Function 的定义。
特殊情况下,用户也可以通过实行 Function 接口来完成定义数据处理逻辑。然后将定义好的 Function 应用在对应的算子中即可
(1)通过创建 Class 实行 Function 接口
val counts : DataStream[(String,Int)] = text.map(new MyMapFunction) class MyMapFunction extends MapFunction[String,String] { override def map(value: String): String = value.toUpperCase }
如下我们自己定义了转换大写的 Function,调用 map 的时候,实例化一个传进去即可
(2)创建匿名类实现 Function 接口
val counts : DataStream[(String,Int)] = text.map( new MapFunction[String,String] { override def map(value: String): String = value.toUpperCase() })
这里我们没有创建类,而是 new 了一个接口,实现了接口实现的方法
(3)通过实现 RichFunction 接口
RichFunction 接口有
open 方法
close方法
getRuntimeContext 方法
getIterationRuntimeContext 方法
setRuntimeContext方法
通过这些方法可以获取缓存、状态等 Flink 内部数据,用来实现更加高级的功能,这些功能后续都会讲到!
好,今天就讲到这,下一次讲 Flink 程序结构的 分区 key 指定,输出结果,程序触发 。
观众老爷们,下次见!