Spark 3.x Spark Core详解 & 性能优化
Spark Core
1. 概述
Spark 是一种基于内存的快速、通用、可扩展的大数据分析计算引擎
1.1 Hadoop vs Spark
上面流程对应Hadoop的处理流程,下面对应着Spark的处理流程
Hadoop
- Hadoop 是由 java 语言编写的,在分布式服务器集群上存储海量数据并运行分布式 分析应用的开源框架
- 作为 Hadoop 分布式文件系统,HDFS 处于 Hadoop 生态圈的最下层,存储着所有的 数 据 , 支持着 Hadoop的所有服务 。 它的理论基础源于Google 的 The GoogleFile System 这篇论文,它是 GFS 的开源实现。
- MapReduce 是一种编程模型,Hadoop 根据 Google 的 MapReduce 论文将其实现, 作为 Hadoop 的分布式计算模型,是 Hadoop 的核心。基于这个框架,分布式并行 程序的编写变得异常简单。综合了 HDFS 的分布式存储和 MapReduce 的分布式计 算,Hadoop 在处理海量数据时,性能横向扩展变得非常容易。
- HBase 是对 Google 的 Bigtable 的开源实现,但又和 Bigtable 存在许多不同之处。 HBase 是一个基于 HDFS 的分布式数据库,擅长实时地随机读/写超大规模数据集。 它也是 Hadoop 非常重要的组件。
Spark
- Spark 是一种由 Scala 语言开发的快速、通用、可扩展的大数据分析引擎
- Spark Core 中提供了 Spark 最基础与最核心的功能
- Spark SQL 是 Spark 用来操作结构化数据的组件。通过 Spark SQL,用户可以使用 SQL 或者 Apache Hive 版本的 SQL 方言(HQL)来查询数据。
- Spark Streaming 是 Spark 平台上针对实时数据进行流式计算的组件,提供了丰富的处理数据流的 API。
由上面的信息可以获知,Spark 出现的时间相对较晚,并且主要功能主要是用于数据计算, 所以其实 Spark 一直被认为是 Hadoop 框架的升级版。
Spark or Hadoop
- Hadoop MapReduce 由于其设计初衷并不是为了满足循环迭代式数据流处理,因此在多并行运行的数据可复用场景(如:机器学习、图挖掘算法、交互式数据挖掘算法)中存在诸多计算效率等问题。
- 所以 Spark 应运而生,Spark 就是在传统的 MapReduce 计算框架的基础上,利用其计算过程的优化,从而大大加快了数据分析、挖掘的运行和读写速 度,并将计算单元缩小到更适合并行计算和重复使用的 RDD 计算模型。
- Spark 是一个分布式数据快速分析项目。它的核心技术是弹性分布式数据集(Resilient Distributed Datasets),提供了比 MapReduce 丰富的模型,可以快速在内存中对数据集进行多次迭代,来支持复杂的数据挖掘算法和图形计算算法。
- Spark 和Hadoop 的根本差异是多个作业之间的数据通信问题 : Spark 多个作业之间数据 通信是基于内存,而 Hadoop 是基于磁盘。
1.2 Spark 核心模块
Spark Core
- Spark Core 中提供了 Spark 最基础与最核心的功能,Spark 其他的功能如:Spark SQL, Spark Streaming,GraphX, MLlib 都是在 Spark Core 的基础上进行扩展的
Spark SQL
- Spark SQL 是 Spark 用来操作结构化数据的组件。通过 Spark SQL,用户可以使用 SQL 或者 Apache Hive 版本的 SQL 方言(HQL)来查询数据。
Spark Streaming
- Spark Streaming 是 Spark 平台上针对实时数据进行流式计算的组件,提供了丰富的处理数据流的 API。
Spark MLlib
- MLlib 是 Spark 提供的一个机器学习算法库。MLlib 不仅提供了模型评估、数据导入等额外的功能,还提供了一些更底层的机器学习原语。
Spark Graphx
- GraphX 是 Spark 面向图计算提供的框架与算法库。
1.3 Spark应用场景
- 低延时的海量数据计算需求
- 低延时SQL交互查询需求
- 准实时(秒级)海量数据计算需求
2. Spark 运行环境
2.1 Local模式
所谓的 Local 模式,就是不需要其他任何节点资源就可以在本地执行 Spark 代码的环境,一般用于教学,调试,演示等
2.1.1 安装部署
在官网下载安装包,将 spark-XX-bin-hadoopXX.tgz 文件上传到 Linux 并解压缩,放置在指定位置,路径中不要包含中文或空格。
tar -zxvf spark-XXX-bin-hadoop.XX.tgz -C /opt/module
cd /opt/module
mv spark-3.0.0-bin-hadoop3.2 spark-local
2.1.2 启动Local环境
-
进入解压缩后的路径,执行如下指令
bin/spark-shell
可以在命令行中,执行scala命令,也可以调用spark
测试
在解压缩文件夹下的 data 目录中,添加 word.txt 文件。
Hello Scala Hello Spark
在命令行工具中执行如下代码指令
sc.textFile("data/word.txt").flatMap(_.split("")).map((_,1)).reduceByKey(_+_).collect
-
启动成功后,可以输入网址进行 Web UI 监控页面访问
//虚拟机or本机ip地址:4040
-
退出
按键 Ctrl+C 或输入 Scala 指令
:quit
2.1.3 提交应用
/opt/module/spark-local/bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master local[2] \
./examples/jars/spark-examples_XXXXXjar \
10
- –class
- 表示要执行程序的主类,此处可以更换为自己写的应用程序
- –master local[2]
- 部署模式,默认为本地模式,数字表示分配的虚拟 CPU 核数量
- spark-examples_XXX.jar
- 运行的应用类所在的 jar 包(根据实际版本输入),实际使用时,可以设定自己的 jar 包
- 数字 10 表示程序的入口参数,用于设定当前应用的任务数量
2.2 Standlone模式
local 本地模式毕竟只是用来进行练习演示的,真实工作中还是要将应用提交到对应的 集群中去执行,这里我们来看看只使用 Spark 自身节点运行的集群模式,也就是我们所谓的 独立部署(Standalone)模式。Spark 的 Standalone 模式体现了经典的 master-slave 模式。
集群规划:
2.2.1 安装部署
注意: 每个节点上配置相同,可配置一台节点,然后上传到其他节点便可
解压缩文件
将 spark-XXX.tgz 文件上传到 Linux 并解压缩在指定位置
tar -zxvf spark-XXX.tgz -C /opt/module
cd /opt/module
mv spark-XXX spark-standalone
修改配置文件
-
进入解压缩后路径的 conf 目录,修改 workers.template 文件名为 workers
有些老版本是slaves.template
修改 slaves 文件,添加 worker 节点
# 根据自己的主机节点名进行添加 node1 node2 node3
-
修改 spark-env.sh.template 文件名为 spark-env.sh
修改 spark-env.sh 文件,添加 JAVA_HOME 环境变量和集群对应的 master 节点
# 根据实际情况进行修改 export JAVA_HOME=/XXX/jdkXXX SPARK_MASTER_HOST=node1 SPARK_MASTER_PORT=7077
注意:7077 端口,相当于 hadoop3 内部通信的 8020 端口,此处的端口需要确认自己的 Hadoop 配置
最后
将配置好的spark,分别上传到每一个节点上
2.2.2 启动集群
在任意节点上,执行脚本命令
/opt/module/spark-standlone/sbin/start-all.sh
查看 Master 资源监控 Web UI 界面: //node1:8080
关闭集群
/opt/module/spark-standlone/sbin/stop-all.sh
2.2.3 提交应用
/opt/module/spark-standlone/bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://node1:7077 \
./examples/jars/spark-examples_XXX.jar \
10
- –class 表示要执行程序的主类
- –master spark://linux1:7077 独立部署模式,连接到 Spark 集群
- spark-examples_XXX.jar 运行类所在的 jar 包
- 数字 10 表示程序的入口参数,用于设定当前应用的任务数量
执行任务时,会产生多个 Java 进程
2.2.4 提交参数说明
bin/spark-submit \
--class <main-class>
--master <master-url> \
... # other options
<application-jar> \
[application-arguments]
2.2.5 配置历史服务
由于 spark-shell 停止掉后,集群监控 node1:4040 页面就看不到历史任务的运行情况,所以 开发时都配置历史服务器记录任务运行情况
-
修改 spark-defaults.conf.template 文件名为 spark-defaults.conf
修改 spark-defaults.conf 文件,配置日志存储路径
spark.eventLog.enabled true spark.eventLog.dir hdfs://node1:8020/logs
注意:路径自己设置,需要启动 hadoop 集群,HDFS 上的 directory 目录需要提前存在。
-
修改 spark-env.sh 文件, 添加日志配置
前后路径保持一致
export SPARK_HISTORY_OPTS=" -Dspark.history.ui.port=18080 -Dspark.history.fs.logDirectory=hdfs://node1:8020/logs -Dspark.history.retainedApplications=30"
参数 1 含义:WEB UI 访问的端口号为 18080
参数 2 含义:指定历史服务器日志存储路径
参数 3 含义:指定保存 Application 历史记录的个数,如果超过这个值,旧的应用程序 信息将被删除,这个是内存中的应用数,而不是页面上显示的应用数。
注意:每一个节点上配置保持一致
重新启动集群和历史服务
/opt/module/spark-standlone/sbin/start-all.sh
/opt/module/spark-standlone/sbin/start-history-server.sh
重新执行任务
/opt/module/spark-standlone/bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://node1:7077 \
./examples/jars/spark-examples_XXX.jar \
10
查看历史服务://node1:18080
2.3 YARN模式
独立部署(Standalone)模式由 Spark 自身提供计算资源,无需其他框架提供资源。这种方式降低了和其他第三方资源框架的耦合性,独立性非常强。但是,Spark 主要是计算框架,而不是资源调度框架,所以本身提供的资源调度并不是它的强项,所以还是和其他专业的资源调度框架集成会更靠谱一些。
2.3.1 安装部署
注意: 每个节点上配置相同,可配置一台节点,然后上传到其他节点便可
解压缩文件
将 spark-XXX.tgz 文件上传到 Linux 并解压缩在指定位置
tar -zxvf spark-XXX.tgz -C /opt/module
cd /opt/module
mv spark-XXX spark-yarn
修改配置文件
修改 conf/spark-env.sh,添加 JAVA_HOME 和 YARN_CONF_DIR\HADOOP_CONF_DIR 配置
export JAVA_HOME=/XXX/jdk1XX
YARN_CONF_DIR=/opt/module/hadoop/etc/hadoop
HADOOP_CONF_DIR=/opt/module/hadoop/etc/hadoop
2.3.2 启动集群
启动HDFS和YARN集群
2.3.3 提交应用
/opt/module/spark-yarn/bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode cluster \
./examples/jars/spark-examples_XXX.jar \
10
查看显示的链接页面,点击 History,查看历史页面
2.2.4 配置历史服务
-
修改 spark-defaults.conf.template 文件名为 spark-defaults.conf
修改 spark-default.conf 文件,配置日志存储路径
spark.eventLog.enabled true spark.eventLog.dir hdfs://node1:8020/logs
注意:需要启动 hadoop 集群,HDFS 上的目录需要提前存在
-
修改 spark-env.sh 文件, 添加日志配置
export SPARK_HISTORY_OPTS=" -Dspark.history.ui.port=18080 -Dspark.history.fs.logDirectory=hdfs://node1:8020/logs -Dspark.history.retainedApplications=30"
参数 1 含义:WEB UI 访问的端口号为 18080
参数 2 含义:指定历史服务器日志存储路径
参数 3 含义:指定保存 Application 历史记录的个数,如果超过这个值,旧的应用程序 信息将被删除,这个是内存中的应用数,而不是页面上显示的应用数。
-
修改 spark-defaults.conf
spark.yarn.historyServer.address=node1:18080 spark.history.ui.port=18080
注意:每一个节点上配置保持一致
-
启动历史服务
sbin/start-history-server.sh
-
重新提交应用
/opt/module/spark-yarn/bin/spark-submit \ --class org.apache.spark.examples.SparkPi \ --master yarn \ --deploy-mode cluster \ ./examples/jars/spark-examples_XXX.jar \ 10
3. Spark 运行架构
3.1 运行架构
Spark 框架的核心是一个计算引擎,整体来说,它采用了标准 master-slave 的结构。
如下图所示,它展示了一个 Spark 执行时的基本结构。图形中的 Driver 表示 master,负责管理整个集群中的作业任务调度。图形中的 Executor 则是 slave,负责实际执行任务。
3.2 核心组件
3.2.1 Driver & Executor
计算组件
Driver
Spark 驱动器节点,用于执行 Spark 任务中的 main 方法,负责实际代码的执行工作。
Driver 在 Spark 作业执行时主要负责:
- 将用户程序转化为作业(job)
- 在 Executor 之间调度任务(task)
- 跟踪 Executor 的执行情况
- 通过 UI 展示查询运行情况
实际上,我们无法准确地描述 Driver 的定义,因为在整个的编程过程中没有看到任何有关Driver 的字眼。所以简单理解,所谓的 Driver 就是驱使整个应用运行起来的程序,也称之为Driver 类。
Executor
Spark Executor 是集群中工作节点(Worker)中的一个 JVM 进程,负责在 Spark 作业中运行具体任务(Task),任务彼此之间相互独立。Spark 应用启动时,Executor 节点被同时启动,并且始终伴随着整个 Spark 应用的生命周期而存在。如果有 Executor 节点发生了故障或崩溃,Spark 应用也可以继续执行,会将出错节点上的任务调度到其他 Executor 节点上继续运行。
Executor 有两个核心功能:
- 负责运行组成 Spark 应用的任务,并将结果返回给驱动器进程
- 它们通过自身的块管理器(Block Manager)为用户程序中要求缓存的 RDD 提供内存式存储。RDD 是直接缓存在 Executor 进程内的,因此任务可以在运行时充分利用缓存 数据加速运算。
3.2.2 Master & Worker
资源管理组件
Spark 集群的独立部署环境中,不需要依赖其他的资源调度框架,自身就实现了资源调度的功能,所以环境中还有其他两个核心组件:Master 和 Worker,这里的 Master 是一个进 程,主要负责资源的调度和分配,并进行集群的监控等职责,类似于 Yarn 环境中的 RM, 而 Worker 呢,也是进程,一个 Worker 运行在集群中的一台服务器上,由 Master 分配资源对 数据进行并行的处理和计算,类似于 Yarn 环境中 NM。
3.2.3 ApplicationMaster
Hadoop 用户向 YARN 集群提交应用程序时,提交程序中应该包含 ApplicationMaster,用于向资源调度器申请执行任务的资源容器 Container,运行用户自己的程序任务 job,监控整个任务的执行,跟踪整个任务的状态,处理任务失败等异常情况。
说的简单点就是,ResourceManager(资源)和 Driver(计算)之间的解耦合靠的就是 ApplicationMaster。
3.3 核心概念
3.3.1 Executor 与 Core
Spark Executor 是集群中运行在工作节点(Worker)中的一个 JVM 进程,是整个集群中的专门用于计算的节点。在提交应用中,可以提供参数指定计算节点的个数,以及对应的资源。这里的资源一般指的是工作节点 Executor 的内存大小和使用的虚拟 CPU 核(Core)数 量。
应用程序相关启动参数如下:
3.3.2 并行度(Parallelism)
在分布式计算框架中一般都是多个任务同时执行,由于任务分布在不同的计算节点进行计算,所以能够真正地实现多任务并行执行,记住,这里是并行,而不是并发。这里我们将整个集群并行执行任务的数量称之为并行度。
那么一个作业到底并行度是多少呢?这个取决于框架的默认配置。应用程序也可以在运行过程中动态修改。
3.3.3 有向无环图(DAG)
资源之间的依赖关系,不能成环,会形成死锁
大数据计算引擎框架我们根据使用方式的不同一般会分为四类:
- 其中第一类就是 Hadoop 所承载的 MapReduce,它将计算分为两个阶段,分别为 Map 阶段 和 Reduce 阶段
- 对于上层应用来说,就不得不想方设法去拆分算法,甚至于不得不在上层应用实现多个 Job 的串联,以完成一个完整的算法,例如迭代计算。 由于这样的弊端,催生了支持 DAG 框 架的产生。
- 因此,支持 DAG 的框架被划分为第二代计算引擎。如 Tez 以及更上层的 Oozie。
- 接下来就是以 Spark 为代表的第三代的计算引擎。第三代计算引擎的特点主要是 Job 内部的 DAG 支持(不跨越 Job),以及实时计算。
这里所谓的有向无环图,并不是真正意义的图形,而是由 Spark 程序直接映射成的数据流的高级抽象模型。简单理解就是将整个程序计算的执行过程用图形表示出来,这样更直观,更便于理解,可以用于表示程序的拓扑结构。
DAG(Directed Acyclic Graph)有向无环图是由点和线组成的拓扑图形,该图形具有方向,不会闭环。
3.4 提交流程
基于Yarn环境
Spark 应用程序提交到 Yarn 环境中执行的时候,一般会有两种部署执行的方式:Client 和 Cluster。
两种模式主要区别在于:Driver 程序的运行节点位置
3.4.1 Yarn Client 模式
Client 模式将用于监控和调度的 Driver 模块在客户端执行,而不是在 Yarn 中,所以一般用于测试
- Driver 在任务提交的本地机器上运行
- Driver 启动后会和 ResourceManager 通讯申请启动 ApplicationMaster
- ResourceManager 分配 container,在合适的 NodeManager 上启动 ApplicationMaster,负责向 ResourceManager 申请 Executor 内存
- ResourceManager 接到 ApplicationMaster 的资源申请后会分配 container,然后 ApplicationMaster 在资源分配指定的 NodeManager 上启动 Executor 进程
- Executor 进程启动后会向 Driver 反向注册,Executor 全部注册完成后 Driver 开始执行 main 函数
- 之后执行到 Action 算子时,触发一个 Job,并根据宽依赖开始划分 stage,每个 stage 生成对应的 TaskSet,之后将 task 分发到各个 Executor 上执行。
3.4.2 Yarn Cluster 模式
Cluster 模式将用于监控和调度的 Driver 模块启动在 Yarn 集群资源中执行。一般应用于实际生产环境
- 在 YARN Cluster 模式下,任务提交后会和 ResourceManager 通讯申请启动 ApplicationMaster
- 随后 ResourceManager 分配 container,在合适的 NodeManager 上启动 ApplicationMaster,此时的 ApplicationMaster 就是 Driver
- Driver 启动后向 ResourceManager 申请 Executor 内存,ResourceManager 接到 ApplicationMaster 的资源申请后会分配 container,然后在合适的 NodeManager 上启动 Executor 进程
- Executor 进程启动后会向 Driver 反向注册,Executor 全部注册完成后 Driver 开始执行 main 函数
- 之后执行到 Action 算子时,触发一个 Job,并根据宽依赖开始划分 stage,每个 stage 生成对应的 TaskSet,之后将 task 分发到各个 Executor 上执行。
4. Spark 核心编程
Spark 计算框架为了能够进行高并发和高吞吐的数据处理,封装了三大数据结构,用于处理不同的应用场景。
三大数据结构分别是:
- RDD : 弹性分布式数据集
- 累加器:分布式共享只写变量
- 广播变量:分布式共享只读变量
4.1 RDD
4.1.1 什么是 RDD
RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是 Spark 中最基本的数据处理模型。代码中是一个抽象类,它代表一个弹性的、不可变、可分区、里面的元素可并行计算的集合。
-
弹性
-
存储的弹性:内存与磁盘的自动切换
-
容错的弹性:数据丢失可以自动恢复
-
计算的弹性:计算出错重试机制
-
分片的弹性:可根据需要重新分片
-
-
分布式:数据存储在大数据集群不同节点上
-
数据集:RDD 封装了计算逻辑,并不保存数据
-
数据抽象:RDD 是一个抽象类,需要子类具体实现
-
不可变:RDD 封装了计算逻辑,是不可以改变的,想要改变,只能产生新的 RDD,在新的 RDD 里面封装计算逻辑
-
可分区、并行计算
RDD vs IO
RDD的数据处理方式类似于IO流,也有装饰者设计模式
RDD的数据只有在调用collect方法时,才会真正执行业务逻辑操作。之前的封装全部都是功能上的扩展
RDD是不保存数据的,但是IO可以临时保存一部分数据
4.1.2 核心属性
Internally, each RDD is characterized by five main properties:
- A list of partitions
- A function for computing each split
- A list of dependencies on other RDDs
- Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
- Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)
分区列表
-
RDD 数据结构中存在分区列表,用于执行任务时并行计算,是实现分布式计算的重要属性
protected def getPartitions: Array[Partition]
分区计算函数
-
Spark 在计算时,是使用分区函数对每一个分区进行计算
@DeveloperApi def compute(split: Partition, context: TaskContext): Iterator[T]
RDD 之间的依赖关系
-
RDD 是计算模型的封装,当需求中需要将多个计算模型进行组合时,就需要将多个 RDD 建立依赖关系
protected def getDependencies: Seq[Dependency[_]] = deps
分区器(可选)
-
当数据为 KV 类型数据时,可以通过设定分区器自定义数据的分区
@transient val partitioner: Option[Partitioner] = None
首选位置(可选)
-
计算数据时,可以根据计算节点的状态选择不同的节点位置进行计算
protected def getPreferredLocations(split: Partition): Seq[String] = Nil
4.1.3 执行原理
-
数据处理过程中需要计算资源(内存 & CPU)和计算模型(逻辑)。执行时,需要将计算资源和计算模型进行协调和整合
-
Spark 框架在执行时,先申请资源,然后将应用程序的数据处理逻辑分解成一个一个的计算任务。然后将任务发到已经分配资源的计算节点上, 按照指定的计算模型进行数据计算。最后得到计算结果
RDD 是 Spark 框架中用于数据处理的核心模型,接下来我们看看,在 Yarn 环境中,RDD的工作原理
-
启动 Yarn 集群环境
-
Spark 通过申请资源创建调度节点和计算节点
-
Spark 框架根据需求将计算逻辑根据分区划分成不同的任务
-
调度节点将任务根据计算节点状态发送到对应的计算节点进行计算
从以上流程可以看出 RDD 在整个流程中主要用于将逻辑进行封装,并生成 Task 发送给 Executor 节点执行计算
4.1.4 RDD 创建
在 Spark 中创建 RDD 的创建方式可以分为四种
-
从集合(内存)中创建 RDD
从集合中创建 RDD,Spark 主要提供了两个方法:parallelize 和 makeRDD
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("spark") val sparkContext = new SparkContext(sparkConf) val rdd1 = sparkContext.parallelize(List(1,2,3,4)) val rdd2 = sparkContext.makeRDD( List(1,2,3,4)) rdd1.collect().foreach(println) rdd2.collect().foreach(println) sparkContext.stop()
从底层代码实现来讲,makeRDD 方法其实就是 parallelize 方法
def makeRDD[T: ClassTag]( seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = withScope { parallelize(seq, numSlices) } // makeRDD方法可以传递第二个参数,这个参数表示分区的数量 // 第二个参数可以不传递的,那么makeRDD方法会使用默认值 : defaultParallelism(默认并行度) // scheduler.conf.getInt("spark.default.parallelism", totalCores) // spark在默认情况下,从配置对象中获取配置参数:spark.default.parallelism // 如果获取不到,那么使用totalCores属性,这个属性取值为当前运行环境的最大可用核数 // val rdd = sc.makeRDD(List(1,2,3,4),2)
-
从外部存储(文件)创建 RDD
由外部存储系统的数据集创建 RDD 包括:本地的文件系统,所有 Hadoop 支持的数据集, 比如 HDFS、HBase 等
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("spark") val sparkContext = new SparkContext(sparkConf) val fileRDD: RDD[String] = sparkContext.textFile("input") fileRDD.collect().foreach(println) sparkContext.stop()
-
从其他 RDD 创建
主要是通过一个 RDD 运算完后,再产生新的 RDD
-
直接创建 RDD(new)
使用 new 的方式直接构造 RDD,一般由 Spark 框架自身使用
4.1.5 RDD并行度与分区
默认情况下,Spark 可以将一个作业切分多个任务后,发送给 Executor 节点并行计算,而能够并行计算的任务数量我们称之为并行度。这个数量可以在构建 RDD 时指定。记住,这里的并行执行的任务数量,并不是指的切分任务的数量,不要混淆了
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("spark")
val sparkContext = new SparkContext(sparkConf)
val dataRDD: RDD[Int] = sparkContext.makeRDD(List(1,2,3,4), 4)
val fileRDD: RDD[String] = sparkContext.textFile( "input", 2)
fileRDD.collect().foreach(println)
sparkContext.stop()
读取内存数据时,数据可以按照并行度的设定进行数据的分区操作,数据分区规则的 Spark 核心源码如下
// Sequences need to be sliced at the same set of index positions for operations
// like RDD.zip() to behave as expected
def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
(0 until numSlices).iterator.map { i =>
val start = ((i * length) / numSlices).toInt
val end = (((i + 1) * length) / numSlices).toInt
(start, end)
}
}
读取文件数据时,数据是按照 Hadoop 文件读取的规则进行切片分区,而切片规则和数据读取的规则有些差异,具体 Spark 核心源码如下
public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
long totalSize = 0; // compute total size
for (FileStatus file: files) { // check we have valid files
if (file.isDirectory()) {
throw new IOException("Not a file: "+ file.getPath());
}
totalSize += file.getLen();
}
long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.
FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize);
...
for (FileStatus file: files) {
...
if (isSplitable(fs, path)) {
long blockSize = file.getBlockSize();
long splitSize = computeSplitSize(goalSize, minSize, blockSize);
...
}
protected long computeSplitSize(long goalSize, long minSize,long blockSize) {
return Math.max(minSize, Math.min(goalSize, blockSize));
}
4.2 RDD 转换算子
RDD 根据数据处理方式的不同将算子整体上分为 Value 类型、双 Value 类型和 Key-Value 类型
Value | DESC |
---|---|
map | 将处理的数据逐条进行映射转换,这里的转换可以是类型的转换,也可以是值的转换 |
mapPartitions | 将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处理,哪怕是过滤数据 |
mapPartitionsWithIndex | 将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处 理,哪怕是过滤数据,在处理时同时可以获取当前分区索引 |
flatMap | 将处理的数据进行扁平化后再进行映射处理,所以算子也称之为扁平映射 |
glom | 将同一个分区的数据直接转换为相同类型的内存数组进行处理,分区不变 |
groupBy | 将数据根据指定的规则进行分组, 分区默认不变,但是数据会被打乱重新组合,我们将这样的操作称之为 shuffle。极限情况下,数据可能被分在同一个分区中 一个组的数据在一个分区中,但是并不是说一个分区中只有一个组 |
filter | 将数据根据指定的规则进行筛选过滤,符合规则的数据保留,不符合规则的数据丢弃。 当数据进行筛选过滤后,分区不变,但是分区内的数据可能不均衡,生产环境下,可能会出现数据倾斜。 |
sample | 根据指定的规则从数据集中抽取数据 |
distinct | 将数据集中重复的数据去重 |
coalesce | 根据数据量缩减分区,用于大数据集过滤后,提高小数据集的执行效率。当 spark 程序中,存在过多的小任务的时候,可以通过 coalesce 方法,收缩合并分区,减少分区的个数,减小任务调度成本 |
repartition | 该操作内部其实执行的是 coalesce 操作,参数 shuffle 的默认值为 true。无论是将分区数多的 RDD 转换为分区数少的 RDD,还是将分区数少的 RDD 转换为分区数多的 RDD,repartition 操作都可以完成,因为无论如何都会经 shuffle 过程 |
sortBy | 该操作用于排序数据。在排序之前,可以将数据通过 f 函数进行处理,之后按照 f 函数处理的结果进行排序,默认为升序排列。排序后新产生的 RDD 的分区数与原 RDD 的分区数一致。中间存在 shuffle 的过程 |
双Value | DESC |
intersection | 对源 RDD 和参数 RDD 求交集后返回一个新的 RDD |
union | 对源 RDD 和参数 RDD 求并集后返回一个新的 RDD |
subtract | 以一个 RDD 元素为主,去除两个 RDD 中重复元素,将其他元素保留下来 |
zip | 将两个 RDD 中的元素,以键值对的形式进行合并 |
Key – Value | DESC |
partitionBy | 将数据按照指定 Partitioner 重新进行分区。Spark 默认的分区器是 HashPartitioner |
reduceByKey | 可以将数据按照相同的 Key 对 Value 进行聚合 |
groupByKey | 将数据源的数据根据 key 对 value 进行分组 |
aggregateByKey | 将数据根据不同的规则进行分区内计算和分区间计算 |
foldByKey | 当分区内计算规则和分区间计算规则相同时,aggregateByKey 就可以简化为 foldByKey |
combineByKey | 最通用的对 key-value 型 rdd 进行聚集操作的聚集函数(aggregation function),combineByKey()允许用户返回值的类型与输入不一致 |
sortByKey | 在一个(K,V)的 RDD 上调用,K 必须实现 Ordered 接口(特质),返回一个按照 key 进行排序 的 |
join | 在类型为(K,V)和(K,W)的 RDD 上调用,返回一个相同 key 对应的所有元素连接在一起的 (K,(V,W))的 RDD |
leftOuterJoin | 类似于 SQL 语句的左外连接 |
cogroup | (join & group)在类型为(K,V)和(K,W)的 RDD 上调用,返回一个(K,(Iterable,Iterable))类型的 RDD |
4.2.1 value
map
-
函数签名
def map[U: ClassTag](f: T => U): RDD[U]
-
函数说明
将处理的数据逐条进行映射转换,这里的转换可以是类型的转换,也可以是值的转换
val dataRDD: RDD[Int] = sparkContext.makeRDD(List(1,2,3,4)) val dataRDD1: RDD[Int] = dataRDD.map( num => { num * 2 } ) val dataRDD2: RDD[String] = dataRDD1.map( num => { "" + num} )
mapPartitions
-
函数签名
def mapPartitions[U: ClassTag]( f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]
-
函数说明
将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处理,哪怕是过滤数据
val dataRDD1: RDD[Int] = dataRDD.mapPartitions( datas => { datas.filter(_==2) } ) // mapPartitions : 可以以分区为单位进行数据转换操作 // 但是会将整个分区的数据加载到内存进行引用 // 如果处理完的数据是不会被释放掉,存在对象的引用。 // 在内存较小,数据量较大的场合下,容易出现内存溢出。
map 和 mapPartitions 的区别?
- Map 算子是分区内一个数据一个数据的执行,类似于串行操作。而 mapPartitions 算子是以分区为单位进行批处理操作
- Map 算子主要目的将数据源中的数据进行转换和改变。但是不会减少或增多数据。
- MapPartitions 算子需要传递一个迭代器,返回一个迭代器,没有要求的元素的个数保持不变,所以可以增加或减少数据
- Map 算子因为类似于串行操作,所以性能比较低,而是 mapPartitions 算子类似于批处理,所以性能较高。但是 mapPartitions 算子会长时间占用内存,那么这样会导致内存可能不够用,出现内存溢出的错误。所以在内存有限的情况下,不推荐使用。
mapPartitionsWithIndex
-
函数签名
def mapPartitionsWithIndex[U: ClassTag]( f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]
-
函数说明
将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处理,哪怕是过滤数据,在处理时同时可以获取当前分区索引
val dataRDD1 = dataRDD.mapPartitionsWithIndex( (index, datas) => { datas.map(index, _) } )
flatMap
-
函数签名
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]
-
函数说明
将处理的数据进行扁平化后再进行映射处理,所以算子也称之为扁平映射
val dataRDD = sparkContext.makeRDD(List(List(1,2),List(3,4)),1) val dataRDD1 = dataRDD.flatMap(list => list)
glom
-
函数签名
def glom(): RDD[Array[T]]
-
函数说明
将同一个分区的数据直接转换为相同类型的内存数组进行处理,分区不变
val dataRDD = sparkContext.makeRDD(List(1,2,3,4),1) val dataRDD1:RDD[Array[Int]] = dataRDD.glom()
groupBy
-
函数签名
def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])]
-
函数说明
将数据根据指定的规则进行分组, 分区默认不变,但是数据会被打乱重新组合,我们将这样的操作称之为 shuffle。极限情况下,数据可能被分在同一个分区中
一个组的数据在一个分区中,但是并不是说一个分区中只有一个组
val dataRDD = sparkContext.makeRDD(List(1,2,3,4),1) val dataRDD1 = dataRDD.groupBy(_%2)
filter
-
函数签名
def filter(f: T => Boolean): RDD[T]
-
函数说明
将数据根据指定的规则进行筛选过滤,符合规则的数据保留,不符合规则的数据丢弃。 当数据进行筛选过滤后,分区不变,但是分区内的数据可能不均衡,生产环境下,可能会出现数据倾斜
val dataRDD = sparkContext.makeRDD(List(1,2,3,4),1) val dataRDD1 = dataRDD.filter(_%2 == 0)
sample
-
函数签名
def sample( withReplacement: Boolean, fraction: Double, seed: Long = Utils.random.nextLong): RDD[T]
-
函数说明
根据指定的规则从数据集中抽取数据
val dataRDD = sparkContext.makeRDD(List( 1,2,3,4 ),1) // 抽取数据不放回(伯努利算法) // 伯努利算法:又叫 0、1 分布。例如扔硬币,要么正面,要么反面。 // 具体实现:根据种子和随机算法算出一个数和第二个参数设置几率比较,小于第二个参数要,大于不要 // 第一个参数:抽取的数据是否放回,false:不放回 // 第二个参数:抽取的几率,范围在[0,1]之间,0:全不取;1:全取; // 第三个参数:随机数种子,如果不传递第三个参数,那么使用的是当前系统时间 val dataRDD1 = dataRDD.sample(false, 0.5) // 抽取数据放回(泊松算法) // 第一个参数:抽取的数据是否放回,true:放回;false:不放回 // 第二个参数:重复数据的几率,范围大于等于 0.表示每一个元素被期望抽取到的次数 // 第三个参数:随机数种子,如果不传递第三个参数,那么使用的是当前系统时间 val dataRDD2 = dataRDD.sample(true, 2)
distinct
-
函数签名
def distinct()(implicit ord: Ordering[T] = null): RDD[T] def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]
-
函数说明
将数据集中重复的数据去重
val dataRDD = sparkContext.makeRDD(List(1,2,3,4,1,2),1) val dataRDD1 = dataRDD.distinct() val dataRDD2 = dataRDD.distinct(2)
coalesce
-
函数签名
def coalesce(numPartitions: Int, shuffle: Boolean = false, partitionCoalescer: Option[PartitionCoalescer] = Option.empty) (implicit ord: Ordering[T] = null) :RDD[T]
-
函数说明
根据数据量缩减分区,用于大数据集过滤后,提高小数据集的执行效率
当 spark 程序中,存在过多的小任务的时候,可以通过 coalesce 方法,收缩合并分区,减少分区的个数,减小任务调度成本
val dataRDD = sparkContext.makeRDD(List( 1,2,3,4,1,2 ),6) val dataRDD1 = dataRDD.coalesce(2) // coalesce方法默认情况下不会将分区的数据打乱重新组合 // 这种情况下的缩减分区可能会导致数据不均衡,出现数据倾斜 // 如果想要让数据均衡,可以进行shuffle处理
repartition
-
函数签名
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]
-
函数说明
该操作内部其实执行的是 coalesce 操作,参数 shuffle 的默认值为 true。无论是将分区数多的 RDD 转换为分区数少的 RDD,还是将分区数少的 RDD 转换为分区数多的 RDD,repartition 操作都可以完成,因为无论如何都会经 shuffle 过程
val dataRDD = sparkContext.makeRDD(List( 1,2,3,4,1,2 ),2) val dataRDD1 = dataRDD.repartition(4)
两者区别
coalesce算子可以扩大分区的,但是如果不进行shuffle操作,是没有意义,不起作用。
所以如果想要实现扩大分区的效果,需要使用shuffle操作
spark提供了一个简化的操作
- 缩减分区:coalesce,如果想要数据均衡,可以采用shuffle
- 扩大分区:repartition, 底层代码调用的就是coalesce,而且肯定采用shuffle
sortBy
-
函数签名
def sortBy[K]( f: (T) => K, ascending: Boolean = true, numPartitions: Int = this.partitions.length) (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]
-
函数说明
该操作用于排序数据。在排序之前,可以将数据通过 f 函数进行处理,之后按照 f 函数处理的结果进行排序,默认为升序排列。排序后新产生的 RDD 的分区数与原 RDD 的分区数一致。中间存在 shuffle 的过程
val dataRDD = sparkContext.makeRDD(List( 1,2,3,4,1,2 ),2) val dataRDD1 = dataRDD.sortBy(num=>num, false, 4) // sortBy方法可以根据指定的规则对数据源中的数据进行排序,默认为升序,第二个参数可以改变排序的方式 // sortBy默认情况下,不会改变分区。但是中间存在shuffle操作
4.2.2 double value
交集,并集和差集要求两个数据源数据类型保持一致
intersection
对源 RDD 和参数 RDD 求交集后返回一个新的 RDD
def intersection(other: RDD[T]): RDD[T]
val dataRDD1 = sparkContext.makeRDD(List(1,2,3,4))
val dataRDD2 = sparkContext.makeRDD(List(3,4,5,6))
val dataRDD = dataRDD1.intersection(dataRDD2)
union
对源 RDD 和参数 RDD 求并集后返回一个新的 RDD
def subtract(other: RDD[T]): RDD[T]
val dataRDD1 = sparkContext.makeRDD(List(1,2,3,4))
val dataRDD2 = sparkContext.makeRDD(List(3,4,5,6))
val dataRDD = dataRDD1.union(dataRDD2)
subtract
以一个 RDD 元素为主,去除两个 RDD 中重复元素,将其他元素保留下来。求差集
def subtract(other: RDD[T]): RDD[T]
val dataRDD1 = sparkContext.makeRDD(List(1,2,3,4))
val dataRDD2 = sparkContext.makeRDD(List(3,4,5,6))
val dataRDD = dataRDD1.subtract(dataRDD2)
zip
将两个 RDD 中的元素,以键值对的形式进行合并。
-
数据类型可以不一致
-
两个数据源要求分区数量要保持一致
-
两个数据源要求分区中数据数量保持一致
def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)]
val dataRDD1 = sparkContext.makeRDD(List(1,2,3,4))
val dataRDD2 = sparkContext.makeRDD(List(3,4,5,6))
val dataRDD = dataRDD1.zip(dataRDD2)
4.2.3 key-value
partitionBy
-
函数签名
def partitionBy(partitioner: Partitioner): RDD[(K, V)]
-
函数说明 将数据按照指定 Partitioner 重新进行分区。Spark 默认的分区器是 HashPartitioner
import org.apache.spark.HashPartitioner val rdd: RDD[(Int, String)] = sc.makeRDD(Array((1,"aaa"),(2,"bbb"),(3,"ccc")),3) // RDD => PairRDDFunctions // 隐式转换(二次编译) // 重分区的分区器与当前RDD的分区器一样,则不会再次分区 val rdd2: RDD[(Int, String)] = rdd.partitionBy(new HashPartitioner(2))
reduceByKey
-
函数签名
def reduceByKey(func: (V, V) => V): RDD[(K, V)] def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]
-
函数说明
可以将数据按照相同的 Key 对 Value 进行聚合
// reduceByKey : 相同的key的数据进行value数据的聚合操作 // scala语言中一般的聚合操作都是两两聚合,spark基于scala开发的,所以它的聚合也是两两聚合 // 【1,2,3】 // 【3,3】 // 【6】 // reduceByKey中如果key的数据只有一个,是不会参与运算的。 val dataRDD1 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3))) val dataRDD2 = dataRDD1.reduceByKey(_+_) val dataRDD3 = dataRDD1.reduceByKey(_+_, 2)
groupByKey
spark中,shuffle操作必须落盘处理,不能在内存中数据等待,会导致内存溢出
-
函数签名
def groupByKey(): RDD[(K, Iterable[V])] def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])] def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]
-
函数说明
将数据源的数据根据 key 对 value 进行分组
val dataRDD1 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3))) val dataRDD2 = dataRDD1.groupByKey() val dataRDD3 = dataRDD1.groupByKey(2) val dataRDD4 = dataRDD1.groupByKey(new HashPartitioner(2))
两者区别
-
从 shuffle 的角度:reduceByKey 和 groupByKey 都存在 shuffle 的操作,但是 reduceByKey 可以在 shuffle 前对分区内相同 key 的数据进行预聚合(combine)功能,这样会减少落盘的数据量,而 groupByKey 只是进行分组,不存在数据量减少的问题,reduceByKey 性能比较高。
-
从功能的角度:reduceByKey 其实包含分组和聚合的功能。GroupByKey 只能分组,不能聚合,所以在分组聚合的场合下,推荐使用 reduceByKey,如果仅仅是分组而不需要聚合。那么还是只能使用 groupByKey
aggregateByKey
-
函数签名
def aggregateByKey[U: ClassTag](zeroValue: U) (seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]
-
函数说明
将数据根据不同的规则进行分区内计算和分区间计算
val dataRDD1 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3))) val dataRDD2 = dataRDD1.aggregateByKey(0)(_+_,_+_) // TODO : 取出每个分区内相同 key 的最大值然后分区间相加 // aggregateByKey 算子是函数柯里化,存在两个参数列表 // 1. 第一个参数列表中的参数表示初始值 // 2. 第二个参数列表中含有两个参数 // 2.1 第一个参数表示分区内的计算规则 // 2.2 第二个参数表示分区间的计算规则
foldByKey
-
函数签名
def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]
-
函数说明
当分区内计算规则和分区间计算规则相同时,aggregateByKey 就可以简化为 foldByKey
val dataRDD1 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3))) val dataRDD2 = dataRDD1.foldByKey(0)(_+_)
combineByKey
-
函数签名
def combineByKey[C]( createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)]
-
函数说明
最通用的对 key-value 型 rdd 进行聚集操作的聚集函数(aggregation function)。类似于 aggregate(),combineByKey()允许用户返回值的类型与输入不一致
val list: List[(String, Int)] = List(("a", 88), ("b", 95), ("a", 91), ("b", 93), ("a", 95), ("b", 98)) val input: RDD[(String, Int)] = sc.makeRDD(list, 2) val combineRdd: RDD[(String, (Int, Int))] = input.combineByKey( (_, 1), (acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1), (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2) )
reduceByKey、foldByKey、aggregateByKey、combineByKey 的区别?
-
reduceByKey: 相同 key 的第一个数据不进行任何计算,分区内和分区间计算规则相同
-
FoldByKey: 相同 key 的第一个数据和初始值进行分区内计算,分区内和分区间计算规则相 同
-
AggregateByKey: 相同 key 的第一个数据和初始值进行分区内计算,分区内和分区间计算规则可以不相同
-
CombineByKey: 当计算时,发现数据结构不满足要求时,可以让第一个数据转换结构。分区内和分区间计算规则不相同
// reduceByKey:
combineByKeyWithClassTag[V](
(v: V) => v, // 第一个值不会参与计算
func, // 分区内计算规则
func, // 分区间计算规则
)
// aggregateByKey :
combineByKeyWithClassTag[U](
(v: V) => cleanedSeqOp(createZero(), v), // 初始值和第一个key的value值进行的分区内数据操作
cleanedSeqOp, // 分区内计算规则
combOp, // 分区间计算规则
)
// foldByKey:
combineByKeyWithClassTag[V](
(v: V) => cleanedFunc(createZero(), v), // 初始值和第一个key的value值进行的分区内数据操作
cleanedFunc, // 分区内计算规则
cleanedFunc, // 分区间计算规则
)
// combineByKey :
combineByKeyWithClassTag(
createCombiner, // 相同key的第一条数据进行的处理函数
mergeValue, // 表示分区内数据的处理函数
mergeCombiners, // 表示分区间数据的处理函数
)
sortByKey
-
函数签名
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length) : RDD[(K, V)]
-
函数说明
在一个(K,V)的 RDD 上调用,K 必须实现 Ordered 接口(特质),返回一个按照 key 进行排序的
val dataRDD1 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3))) val sortRDD1: RDD[(String, Int)] = dataRDD1.sortByKey(true) val sortRDD1: RDD[(String, Int)] = dataRDD1.sortByKey(false)
join
-
函数签名
def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
-
函数说明
在类型为(K,V)和(K,W)的 RDD 上调用,返回一个相同 key 对应的所有元素连接在一起的 (K,(V,W))的 RDD
val rdd: RDD[(Int, String)] = sc.makeRDD(Array((1, "a"), (2, "b"), (3, "c"))) val rdd1: RDD[(Int, Int)] = sc.makeRDD(Array((1, 4), (2, 5), (3, 6))) rdd.join(rdd1).collect().foreach(println)
leftOuterJoin
-
函数签名
def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]
-
函数说明
类似于 SQL 语句的左外连接
val dataRDD1 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3))) val dataRDD2 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3))) val rdd: RDD[(String, (Int, Option[Int]))] = dataRDD1.leftOuterJoin(dataRDD2)
cogroup
-
函数签名
def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]
-
函数说明
在类型为(K,V)和(K,W)的 RDD 上调用,返回一个(K,(Iterable,Iterable))类型的 RDD
val dataRDD1 = sparkContext.makeRDD(List(("a",1),("a",2),("c",3))) val dataRDD2 = sparkContext.makeRDD(List(("a",1),("c",2),("c",3))) val value: RDD[(String, (Iterable[Int], Iterable[Int]))] = dataRDD1.cogroup(dataRDD2)
4.3 RDD 行动算子
4.3.1 reduce
-
函数签名
def reduce(f: (T, T) => T): T
-
函数说明
聚集 RDD 中的所有元素,先聚合分区内数据,再聚合分区间数据
val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4)) // 聚合数据 val reduceResult: Int = rdd.reduce(_+_)
4.3.2 collect
-
函数签名
def collect(): Array[T]
-
函数说明
在驱动程序中,以数组 Array 的形式返回数据集的所有元素
val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4)) // 收集数据到 Driver rdd.collect().foreach(println)
4.3.3 count
-
函数签名
def count(): Long
-
函数说明
返回 RDD 中元素的个数
val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4)) // 返回 RDD 中元素的个数 val countResult: Long = rdd.count()
4.3.4 first
-
函数签名
def first(): T
-
函数说明
返回 RDD 中的第一个元素
val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4)) // 返回 RDD 中元素的个数 val firstResult: Int = rdd.first() println(firstResult)
4.3.5 take
-
函数签名
def take(num: Int): Array[T]
-
函数说明
返回一个由 RDD 的前 n 个元素组成的数组
val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4)) // 返回 RDD 中元素的个数 val takeResult: Array[Int] = rdd.take(2) println(takeResult.mkString(","))
4.3.6 takeOrdered
-
函数签名
def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]
-
函数说明
返回该 RDD 排序后的前 n 个元素组成的数组
val rdd: RDD[Int] = sc.makeRDD(List(1,3,2,4)) // 返回 RDD 中元素的个数 val result: Array[Int] = rdd.takeOrdered(2)
4.3.7 aggregate
-
函数签名
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
-
函数说明
分区的数据通过初始值和分区内的数据进行聚合,然后再和初始值进行分区间的数据聚合
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 8) // 将该 RDD 所有元素相加得到结果 //val result: Int = rdd.aggregate(0)(_ + _, _ + _) val result: Int = rdd.aggregate(10)(_ + _, _ + _)
4.3.8 fold
-
函数签名
def fold(zeroValue: T)(op: (T, T) => T): T
-
函数说明
折叠操作,aggregate 的简化版操作
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4)) val foldResult: Int = rdd.fold(0)(_+_)
4.3.9 countByKey
-
函数签名
def countByKey(): Map[K, Long]
-
函数说明
折叠操作,aggregate 的简化版操作
val rdd: RDD[(Int, String)] = sc.makeRDD(List((1, "a"), (1, "a"), (1, "a"), (2, "b"), (3, "c"), (3, "c"))) // 统计每种 key 的个数 val result: collection.Map[Int, Long] = rdd.countByKey()
4.3.10 save 相关算子
-
函数签名
def saveAsTextFile(path: String): Unit def saveAsObjectFile(path: String): Unit def saveAsSequenceFile( path: String, codec: Option[Class[_ <: CompressionCodec]] = None): Unit
-
函数说明
将数据保存到不同格式的文件中
// 保存成 Text 文件 rdd.saveAsTextFile("output") // 序列化成对象保存到文件 rdd.saveAsObjectFile("output1") // 保存成 Sequencefile 文件 rdd.map((_,1)).saveAsSequenceFile("output2")
4.3.11 foreach
-
函数签名
def foreach(f: T => Unit): Unit = withScope { val cleanF = sc.clean(f) sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF)) }
-
函数说明
分布式遍历 RDD 中的每一个元素,调用指定函数
val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4)) // 收集后打印 rdd.map(num=>num).collect().foreach(println) println("****************") // 分布式打印 rdd.foreach(println)
4.4 RDD 序列化
4.4.1 闭包检查
从计算的角度, 算子以外的代码都是在 Driver 端执行, 算子里面的代码都是在 Executor 端执行。
那么在 scala 的函数式编程中,就会导致算子内经常会用到算子外的数据,这样就形成了闭包的效果,如果使用的算子外的数据无法序列化,就意味着无法传值给 Executor 端执行,就会发生错误,所以需要在执行任务计算前,检测闭包内的对象是否可以进行序列化,这个操作我们称之为闭包检测。
Scala2.12 版本后闭包编译方式发生了改变
从计算的角度, 算子以外的代码都是在 Driver 端执行, 算子里面的代码都是在 Executor 端执行
object Spark_RDD_Serial {
def main(args: Array[String]): Unit = {
val sparConf = new SparkConf().setMaster("local").setAppName("WordCount")
val sc = new SparkContext(sparConf)
val rdd: RDD[String] = sc.makeRDD(Array("hello world", "hello spark", "hive"))
val search = new Search("h")
// 会报错
//search.getMatch1(rdd).collect().foreach(println)
// 不会报错
search.getMatch2(rdd).collect().foreach(println)
sc.stop()
}
// 查询对象
// 类的构造参数其实是类的属性, 构造参数需要进行闭包检测,其实就等同于类进行闭包检测
class Search(query:String){
def isMatch(s: String): Boolean = {
s.contains(this.query)
}
// 函数序列化案例
def getMatch1 (rdd: RDD[String]): RDD[String] = {
rdd.filter(isMatch)
}
// 属性序列化案例
def getMatch2(rdd: RDD[String]): RDD[String] = {
val s = query
rdd.filter(x => x.contains(s))
}
}
}
4.5 RDD 依赖关系
RDD 血缘关系
RDD 只支持粗粒度转换,即在大量记录上执行的单个操作。将创建 RDD 的一系列 Lineage (血统)记录下来,以便恢复丢失的分区。RDD 的 Lineage 会记录 RDD 的元数据信息和转 换行为,当该 RDD 的部分分区数据丢失时,它可以根据这些信息来重新运算和恢复丢失的 数据分区。
RDD 依赖关系
这里所谓的依赖关系,其实就是两个相邻 RDD 之间的关系
RDD 窄依赖
Narrow Dependency
窄依赖表示每一个父(上游)RDD 的 Partition 最多被子(下游)RDD 的一个 Partition 使用
RDD 宽依赖
Shuffle Dependency
宽依赖表示同一个父(上游)RDD 的 Partition 被多个子(下游)RDD 的 Partition 依赖,会引起 Shuffle
4.5 RDD stage
Spark Job会被划分为多个Stage,每一个Stage是有一组并行的Task组成的。
划分依据:是否产生了Shuffle(宽依赖),一个shuffle会产生两个stage
RDD 阶段划分
DAG(Directed Acyclic Graph)有向无环图是由点和线组成的拓扑图形,该图形具有方向, 不会闭环。例如,DAG 记录了 RDD 的转换过程和任务的阶段。
RDD 阶段划分源码
try {
// New stage creation may throw an exception if, for example, jobs are run on a
// HadoopRDD whose underlying HDFS files have been deleted.
finalStage = createResultStage (finalRDD, func, partitions, jobId, callSite)
} catch {
case e: Exception =>
logWarning ("Creating new stage failed due to exception - job: " + jobId, e)
listener.jobFailed (e)
return
}
……
private def createResultStage (
rdd: RDD[_],
func: (TaskContext, Iterator[_] ) => _,
partitions: Array[Int],
jobId: Int,
callSite: CallSite
): ResultStage = {
val parents = getOrCreateParentStages (rdd, jobId)
val id = nextStageId.getAndIncrement ()
val stage = new ResultStage (id, rdd, func, partitions, parents, jobId, callSite)
stageIdToStage (id) = stage
updateJobIdStageIdMaps (jobId, stage)
stage
}
……
private def getOrCreateParentStages (rdd: RDD[_], firstJobId: Int): List[Stage]
= {
getShuffleDependencies (rdd).map {
shuffleDep =>
getOrCreateShuffleMapStage (shuffleDep, firstJobId)
}.toList
}
……
private[scheduler] def getShuffleDependencies ( rdd: RDD[_] ): HashSet[ShuffleDependency[_, _, _]] = {
val parents = new HashSet[ShuffleDependency[_, _, _]]
val visited = new HashSet[RDD[_]]
val waitingForVisit = new Stack[RDD[_]]
waitingForVisit.push (rdd)
while (waitingForVisit.nonEmpty) {
val toVisit = waitingForVisit.pop ()
if (! visited (toVisit) ) {
visited += toVisit
toVisit.dependencies.foreach {
case shuffleDep: ShuffleDependency[_, _, _] =>
parents += shuffleDep
case dependency =>
waitingForVisit.push (dependency.rdd)
}
}
}
parents
}
RDD 任务划分
RDD 任务切分中间分为:Application、Job、Stage 和 Task
- Application:初始化一个 SparkContext 即生成一个 Application
- Job:一个 Action 算子就会生成一个 Job
- Stage:Stage 等于宽依赖(ShuffleDependency)的个数加 1
- Task:一个 Stage 阶段中,最后一个 RDD 的分区个数就是 Task 的个数
注意:Application->Job->Stage->Task 每一层都是 1 对 n 的关系。
RDD 任务划分源码
val tasks: Seq[Task[_]] = try {
stage match {
case stage: ShuffleMapStage =>
partitionsToCompute.map {
id =>
val locs = taskIdToLocations(id)
val part = stage.rdd.partitions(id)
new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
taskBinary, part, locs, stage.latestInfo.taskMetrics, properties, Option(jobId),
Option(sc.applicationId), sc.applicationAttemptId)
}
case stage: ResultStage =>
partitionsToCompute.map {
id =>
val p: Int = stage.partitions(id)
val part = stage.rdd.partitions(p)
val locs = taskIdToLocations(id)
new ResultTask(stage.id, stage.latestInfo.attemptId,
taskBinary, part, locs, id, properties, stage.latestInfo.taskMetrics,
Option(jobId), Option(sc.applicationId), sc.applicationAttemptId)
}
}
……
val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()
……
override def findMissingPartitions(): Seq[Int] = {
mapOutputTrackerMaster
.findMissingPartitions(shuffleDep.shuffleId)
.getOrElse(0 until numPartitions)
}
4.6 RDD 持久化
4.6.1 Cache 缓存
RDD 通过 Cache 或者 Persist 方法将前面的计算结果缓存,默认情况下会把数据以缓存在 JVM 的堆内存中。但是并不是这两个方法被调用时立即缓存,而是触发后面的 action 算子时,该 RDD 将会被缓存在计算节点的内存中,并供后面重用。
// cache 操作会增加血缘关系,不改变原有的血缘关系
println(wordToOneRdd.toDebugString)
// 数据缓存。
wordToOneRdd.cache()
// 可以更改存储级别
//mapRdd.persist(StorageLevel.MEMORY_AND_DISK_2)
存储级别
object StorageLevel {
val NONE = new StorageLevel(false, false, false, false)
val DISK_ONLY = new StorageLevel(true, false, false, false)
val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
val MEMORY_ONLY = new StorageLevel(false, true, false, true)
val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
val OFF_HEAP = new StorageLevel(true, true, true, false, 1)
缓存有可能丢失,或者存储于内存的数据由于内存不足而被删除,RDD 的缓存容错机制保证了即使缓存丢失也能保证计算的正确执行。通过基于 RDD 的一系列转换,丢失的数据会被重算,由于 RDD 的各个 Partition 是相对独立的,因此只需要计算丢失的部分即可,并不需要重算全部 Partition。
Spark 会自动对一些 Shuffle 操作的中间数据做持久化操作(比如:reduceByKey)。这样做的目的是为了当一个节点 Shuffle 失败了避免重新计算整个输入。但是,在实际使用的时候,如果想重用数据,仍然建议调用 persist 或 cache。
4.6.2 checkPoint 检查点
所谓的检查点其实就是通过将 RDD 中间结果写入磁盘
由于血缘依赖过长会造成容错成本过高,这样就不如在中间阶段做检查点容错,如果检查点之后有节点出现问题,可以从检查点开始重做血缘,减少了开销。
对 RDD 进行 checkpoint 操作并不会马上被执行,必须执行 Action 操作才能触发。
// 设置检查点路径
sc.setCheckpointDir("./checkpoint1")
// 创建一个 RDD,读取指定位置文件:hello atguigu atguigu
val lineRdd: RDD[String] = sc.textFile("input/1.txt")
// 业务逻辑
val wordRdd: RDD[String] = lineRdd.flatMap(line => line.split(" "))
val wordToOneRdd: RDD[(String, Long)] = wordRdd.map {
word => {
(word, System.currentTimeMillis())
}
}
// 增加缓存,避免再重新跑一个 job 做 checkpoint
wordToOneRdd.cache()
// 数据检查点:针对 wordToOneRdd 做检查点计算
wordToOneRdd.checkpoint()
// 触发执行逻辑
wordToOneRdd.collect().foreach(println)
缓存和检查点区别
- Cache 缓存只是将数据保存起来,不切断血缘依赖。Checkpoint 检查点切断血缘依赖。
- Cache 缓存的数据通常存储在磁盘、内存等地方,可靠性低。Checkpoint 的数据通常存储在 HDFS 等容错、高可用的文件系统,可靠性高。
- 建议对 checkpoint()的 RDD 使用 Cache 缓存,这样 checkpoint 的 job 只需从 Cache 缓存中读取数据即可,否则需要再从头计算一次 RDD
4.7 RDD 分区器
Spark 目前支持 Hash 分区和 Range 分区,和用户自定义分区。Hash 分区为当前的默认分区。分区器直接决定了 RDD 中分区的个数、RDD 中每条数据经过 Shuffle 后进入哪个分区,进而决定了 Reduce 的个数。
- 只有 Key-Value 类型的 RDD 才有分区器,非 Key-Value 类型的 RDD 分区的值是 None
- 每个 RDD 的分区 ID 范围:0 ~ (numPartitions – 1),决定这个值是属于那个分区的
Hash 分区:对于给定的 key,计算其 hashCode,并除以分区个数取余
class HashPartitioner(partitions: Int) extends Partitioner {
require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")
def numPartitions: Int = partitions
def getPartition(key: Any): Int = key match {
case null => 0
case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
}
override def equals(other: Any): Boolean = other match {
case h: HashPartitioner =>
h.numPartitions == numPartitions
case _ =>
false
}
override def hashCode: Int = numPartitions
}
Range 分区:将一定范围内的数据映射到一个分区中,尽量保证每个分区数据均匀,而且分区间有序
class RangePartitioner[K : Ordering : ClassTag, V](
partitions: Int,
rdd: RDD[_ <: Product2[K, V]],
private var ascending: Boolean = true)
extends Partitioner {
// We allow partitions = 0, which happens when sorting an empty RDD under the default settings.
require(partitions >= 0, s"Number of partitions cannot be negative but found $partitions.")
private var ordering = implicitly[Ordering[K]]
// An array of upper bounds for the first (partitions - 1) partitions
private var rangeBounds: Array[K] = {
...
}
def numPartitions: Int = rangeBounds.length + 1
private var binarySearch: ((Array[K], K) => Int) = CollectionsUtils.makeBinarySearch[K]
def getPartition (key: Any): Int = {
val k = key.asInstanceOf[K]
var partition = 0
if (rangeBounds.length <= 128) {
// If we have less than 128 partitions naive search
while (partition < rangeBounds.length && ordering.gt (k, rangeBounds (partition) ) ) {
partition += 1
}
} else {
// Determine which binary search method to use only once.
partition = binarySearch (rangeBounds, k)
// binarySearch either returns the match location or -[insertion point]-1
if (partition < 0) {
partition = - partition - 1
}
if (partition > rangeBounds.length) {
partition = rangeBounds.length
}
}
if (ascending) {
partition
} else {
rangeBounds.length - partition
}
}
override def equals (other: Any): Boolean = other match {
...
}
override def hashCode (): Int = {
...
}
@throws(classOf[IOException])
private def writeObject (out: ObjectOutputStream): Unit =
Utils.tryOrIOException {
...
}
@throws(classOf[IOException])
private def readObject (in: ObjectInputStream): Unit = Utils.tryOrIOException {
...
}
}
4.8 RDD 文件读取与保存
Spark 的数据读取及数据保存可以从两个维度来作区分:文件格式以及文件系统。
文件格式分为:text 文件、csv 文件、sequence 文件以及 Object 文件
文件系统分为:本地文件系统、HDFS、HBASE 以及数据库
text 文件
// 读取输入文件
val inputRDD: RDD[String] = sc.textFile("input/1.txt")
// 保存数据
inputRDD.saveAsTextFile("output")
sequence 文件
SequenceFile 文件是 Hadoop 用来存储二进制形式的 key-value 对而设计的一种平面文件(Flat File)。在 SparkContext 中,可以调用 sequenceFile[keyClass, valueClass](path)
// 保存数据为 SequenceFile
dataRDD.saveAsSequenceFile("output")
// 读取 SequenceFile 文件
sc.sequenceFile[Int,Int]("output").collect().foreach(println)
object 对象文件
对象文件是将对象序列化后保存的文件,采用 Java 的序列化机制。可以通过 objectFile[T: ClassTag](path)
函数接收一个路径,读取对象文件,返回对应的 RDD,也可以通过调用 saveAsObjectFile()
实现对对象文件的输出。因为是序列化所以要指定类型
// 保存数据
dataRDD.saveAsObjectFile("output")
// 读取数据
sc.objectFile[Int]("output").collect().foreach(println)
4.9 累加器
累加器用来把 Executor 端变量信息聚合到 Driver 端。在 Driver 程序中定义的变量,在 Executor 端的每个 Task 都会得到这个变量的一份新的副本,每个 task 更新这些副本的值后, 传回 Driver 端进行 merge
4.9.1 系统累加器
val rdd = sc.makeRDD(List(1,2,3,4,5))
// 声明累加器
var sum = sc.longAccumulator("sum");
rdd.foreach(
num => {
// 使用累加器
sum.add(num)
}
)
// 获取累加器的值
println("sum = " + sum.value)
4.9.2 自定义累加器
// 自定义累加器
// 1. 继承 AccumulatorV2,并设定泛型
// 2. 重写累加器的抽象方法
class WordCountAccumulator extends AccumulatorV2[String, mutable.Map[String, Long]]{
var map : mutable.Map[String, Long] = mutable.Map()
// 累加器是否为初始状态
override def isZero: Boolean = {
map.isEmpty
}
// 复制累加器
override def copy(): AccumulatorV2[String, mutable.Map[String, Long]] = {
new WordCountAccumulator
}
// 重置累加器
override def reset(): Unit = {
map.clear()
}
// 向累加器中增加数据 (In)
override def add(word: String): Unit = {
// 查询 map 中是否存在相同的单词
// 如果有相同的单词,那么单词的数量加 1
// 如果没有相同的单词,那么在 map 中增加这个单词
map(word) = map.getOrElse(word, 0L) + 1L
}
// 合并累加器
override def merge(other: AccumulatorV2[String, mutable.Map[String, Long]]):
Unit = {
val map1 = map
val map2 = other.value
// 两个 Map 的合并
map = map1.foldLeft(map2)(
( innerMap, kv ) => {
innerMap(kv._1) = innerMap.getOrElse(kv._1, 0L) + kv._2
innerMap
}
)
}
// 返回累加器的结果 (Out)
override def value: mutable.Map[String, Long] = map
}
4.10 广播变量
广播变量用来高效分发较大的对象。向所有工作节点发送一个较大的只读值,以供一个或多个 Spark 操作使用。比如,如果你的应用需要向所有节点发送一个较大的只读查询表, 广播变量用起来都很顺手。在多个并行操作中使用同一个变量,但是 Spark 会为每个任务分别发送
val rdd1 = sc.makeRDD(List( ("a",1), ("b", 2), ("c", 3), ("d", 4) ),4)
val list = List( ("a",4), ("b", 5), ("c", 6), ("d", 7) )
// 声明广播变量
val broadcast: Broadcast[List[(String, Int)]] = sc.broadcast(list)
val resultRDD: RDD[(String, (Int, Int))] = rdd1.map {
case (key, num) => {
var num2 = 0
// 使用广播变量
for ((k, v) <- broadcast.value) {
if (k == key) {
num2 = v
}
}
(key, (num, num2))
}
}
5. 进阶
5.1 SortByKey原理
有一台服务器:32G内存,如何在内存中对1T数据排序
- 先抽样,看分布,然后分块(至少32块),对每个块进行排序,最后合并
5.2 shuffle
在Spark中,什么情况下,会产生shuffle?
- reduceByKey,groupByKey,sortByKey,countByKey,join等等
Spark shuffle一共经历了这几个过程:
- 未优化的 Hash Based Shuffle
- 优化后的 Hash Based Shuffle
- Sort-Based Shuffle
5.2.1 ShuffleMapStage 与 ResultStage
在划分 stage 时,最后一个 stage 称为 finalStage,它本质上是一个 ResultStage 对象,前面的所有 stage 被称为 ShuffleMapStage。
ShuffleMapStage
- 的结束伴随着 shuffle 文件的写磁盘。
ResultStage
- 基本上对应代码中的 action 算子,即将一个函数应用在 RDD 的各个 partition 的数据集上,意味着一个 job 的运行结束。
5.2.2 HashShuffle 解析
1. 未优化的 HashShuffle
如下图中有 3 个 Reducer,从 Task 开始那边各自把自己进行 Hash 计算(分区器: hash/numreduce 取模),分类出 3 个不同的类别,每个 Task 都分成 3 种类别的数据,想把不同的数据汇聚然后计算出最终的结果,所以 Reducer 会在每个 Task 中把属于自己类别的数 据收集过来,汇聚成一个同类别的大集合,每 1 个 Task 输出 3 份本地文件,这里有 4 个 Mapper Tasks,所以总共输出了 4 个 Tasks x 3 个分类文件 = 12 个本地小文件。
2. 优化后的 HashShuffle
优化的 HashShuffle 过程就是启用合并机制,合并机制就是复用 buffer,开启合并机制 的配置是 spark.shuffle.consolidateFiles。该参数默认值为 false,将其设置为 true 即可开启优化机制。通常来说,如果我们使用 HashShuffleManager,那么都建议开启这个选项。
这里还是有 4 个 Tasks,数据类别还是分成 3 种类型,因为 Hash 算法会根据你的 Key 进行分类,在同一个进程中,无论是有多少过 Task,都会把同样的 Key 放在同一个 Buffer 里,然后把 Buffer 中的数据写入以 Core 数量为单位的本地文件中,(一个 Core 只有一种类 型的 Key 的数据),每 1 个 Task 所在的进程中,分别写入共同进程中的 3 份本地文件,这里 有 4 个 Mapper Tasks,所以总共输出是 2 个 Cores x 3 个分类文件 = 6 个本地小文件。
5.2.3 SortShuffle 解析
1. 普通 SortShuffle
在该模式下,数据会先写入一个数据结构,reduceByKey 写入 Map,一边通过 Map 局部聚合,一边写入内存。Join 算子写入 ArrayList 直接写入内存中。然后需要判断是否达到阈值,如果达到就会将内存数据结构的数据写入到磁盘,清空内存数据结构。
在溢写磁盘前,先根据 key 进行排序,排序过后的数据,会分批写入到磁盘文件中。默认批次为 10000 条,数据会以每批一万条写入到磁盘文件。写入磁盘文件通过缓冲区溢写的方式,每次溢写都会产生一个磁盘文件,也就是说一个 Task 过程会产生多个临时文件。
最后在每个 Task 中,将所有的临时文件合并,这就是 merge 过程,此过程将所有临时文件读取出来,一次写入到最终文件。意味着一个 Task 的所有数据都在这一个文件中。同时单独写一份索引文件,标识下游各个Task的数据在文件中的索引,start offset和end offset。
2. bypass SortShuffle
bypass 运行机制的触发条件如下:
- shuffle reduce task 数量小于等于 spark.shuffle.sort.bypassMergeThreshold 参数的值,默认为 200
- 不是聚合类的 shuffle 算子(比如 reduceByKey)
此时 task 会为每个 reduce 端的 task 都创建一个临时磁盘文件,并将数据按 key 进行 hash 然后根据 key 的 hash 值,将 key 写入对应的磁盘文件之中。当然,写入磁盘文件时也是先写入内存缓冲,缓冲写满之后再溢写到磁盘文件的。最后,同样会将所有临时磁盘文件都合并成一个磁盘文件,并创建一个单独的索引文件。
该过程的磁盘写机制其实跟未经优化的 HashShuffleManager 是一模一样的,因为都要创建数量惊人的磁盘文件,只是在最后会做一个磁盘文件的合并而已。因此少量的最终磁盘文件,也让该机制相对未经优化的 HashShuffleManager 来说,shuffle read 的性能会更好。
而该机制与普通 SortShuffleManager 运行机制的不同在于:不会进行排序。也就是说, 启用该机制的最大好处在于,shuffle write 过程中,不需要进行数据的排序操作,也就节省掉了这部分的性能开销。
6. 性能优化
6.1 高性能序列化库
- Spark倾向于序列化的便捷性,默认使用了Java序列化机制
- Java序列化机制的性能并不高,序列化的速度相对较慢,而且序列化以后的数据,相对来说比较大,比较占用内存空间
- Spark提供了两种序列化机制:Java序列化和Kryo序列化
Kryo序列化
- Kryo序列化比Java序列化更快,而且序列化后的数据更小,通常小十倍
- 如果要使用Kryo序列化机制,首先要用SparkConf和Spark序列化器设置为KryoSerializer
- 使用Kryo时,针对需要序列化的类,需要预先进行注册,这样才能获得最佳性能,如果不注册,Kryo必须时刻保存类型的全类名,反而占用不少内存
- Spark默认对Scala中常用的类型自动在Kryo进行了注册
- 如果在算子中,使用了外部的自定义类型的对象,那么还是需要对其进行注册
- 格式:
conf.registerKryoClasses(...)
- 格式:
- 注意:如果要序列化的自定义的类型,字段特别多,此时就需要对Kryo本身进行优化,因为Kryo内部的换存储可能不够存放那么大的class对象
- 需要调用SparkConf.set()方法,设置spark.kryoserializer.buffer.mb参数的值,将其调大,默认值为2,单位是MB
6.2 持久化&checkpoint
- 针对程序中多次被transformation或者action操作的RDD进行持久化操作,避免对一个RDD反复进行计算,再进一步优化,使用序列化Kryo的持久化级别
- 为了保证RDD持久化数据在可能丢失的情况下还能实现高可靠,则需要对RDD执行CheckPoint操作
6.3 JVM垃圾回收调优
默认情况下,Spark使用每个Executor 60%的内存空间来缓存RDD,那么只有40%的内存空间来存放算子执行期间创建的对象
- 如果垃圾回收频繁发生,就需要对这个比例进行调优,通过参数spark.storage.memoryFraction来修改比例
统一内存管理
统一内存管理机制,与静态内存管理的区别在于存储内存和执行内存共享同一块空间,可以动态占用对方的空闲区域,统一内存管理的堆内内存结构如图所示:
其中最重要的优化在于动态占用机制,其规则如下:
- 设定基本的存储内存和执行内存区域(spark.storage.storageFraction 参数),该设定确定了双方各自拥有的空间的范围
- 双方的空间都不足时,则存储到硬盘;若己方空间不足而对方空余时,可借用对方的空间;(存储空间不足是指不足以放下一个完整的 Block)
- 执行内存的空间被对方占用后,可让对方将占用的部分转存到硬盘,然后”归还”借用的空间
- 存储内存的空间被对方占用后,无法让对方”归还”,因为需要考虑 Shuffle 过程中的很多因素,实现起来较为复杂。
6.4 提高并行度
-
要尽量设置合理的并行度,来充分地利用集群的资源,才能充分提高Spark程序的性能
-
可以手动使用textFile()、parallelize()等方法的第二个参数来设置并行度,也可以使用spark.default.parallelism参数,来设置统一的并行度,Spark官方推荐,给集群的每个cpu core设置2-3个task
6.5 数据本地化
数据本地化级别 | 解释 |
---|---|
PROCESS_LOCAL | 数据和计算它的代码在同一个JVM进程中 |
NODE_LOCAL | 数据和计算它的代码在一个节点上,但是不在一个JVM进程中 |
NO_PREF | 数据从哪里过来,性能都是一样的 |
RACK_LOCAL | 数据和计算它的代码在一个机架上 |
ANY | 数据可能在任意地方,比如其他网络环境内,或者其它机架上 |
- Spark倾向于使用最好的本地化级别调度task,但不现实
- Spark默认会等待指定时间,期望task要处理的数据所在的节点上的Executor空闲出一个cpu,从而将task分配过去,只要超过了时间,那么spark就会将task分配到其他任意一个空闲的Executor上
- 可以设置spark.locality系列参数,来调节spark等待task可以进行数据本地化的时间
- spark.locality.wait.process
- spark.locality.wait.node
- spark.locality.wait.rack
6.6 算子优化
6.6.1 map vs mapPartitions
-
map: 一次处理一条数据
- 因为可以GC回收处理过的数据,所以一般不会导致OOM异常
-
mapPartitions: 一次处理一个分区的数据
- 如果元素过多,可能会导致OOM异常
- 性能更高
建议针对初始化链接之类的操作,使用mapPartitions,放在mapPartitions内部
例如:创建数据库链接,使用mapPartitions可以减少链接创建的次数,提高性能
注意:创建数据库链接的代码建议放在次数,不要放在Driver端或者it.foreach内部
数据库链接放在Driver端会导致链接无法序列化,无法传递到对应的task中执行,所以算子在执行的时候会报错
数据库链接放在it.foreach()内部还是会创建多个链接,和使用map算子的效果是一样的
6.6.2 foreach vs foreachPartition
- foreach:一次处理一条数据
- foreachPartition:一次处理一个分区的数据
6.6.3 repartition
- 对RDD进行重分区
- 可以调整RDD的并行度
- 可以解决RDD中数据倾斜的问题
6.6.4 reduceByKey vs groupByKey
reduceByKey会先进行预聚合,会减少数据量,性能更高