Spark 3.x Spark Core详解 & 性能优化

Spark Core

1. 概述

Spark 是一种基于内存的快速、通用、可扩展的大数据分析计算引擎

1.1 Hadoop vs Spark

上面流程对应Hadoop的处理流程,下面对应着Spark的处理流程

image-20220601090758280

Hadoop

  • Hadoop 是由 java 语言编写的,在分布式服务器集群上存储海量数据并运行分布式 分析应用的开源框架
  • 作为 Hadoop 分布式文件系统,HDFS 处于 Hadoop 生态圈的最下层,存储着所有的 数 据 , 支持着 Hadoop的所有服务 。 它的理论基础源于Google 的 The GoogleFile System 这篇论文,它是 GFS 的开源实现。
  • MapReduce 是一种编程模型,Hadoop 根据 Google 的 MapReduce 论文将其实现, 作为 Hadoop 的分布式计算模型,是 Hadoop 的核心。基于这个框架,分布式并行 程序的编写变得异常简单。综合了 HDFS 的分布式存储和 MapReduce 的分布式计 算,Hadoop 在处理海量数据时,性能横向扩展变得非常容易。
  • HBase 是对 Google 的 Bigtable 的开源实现,但又和 Bigtable 存在许多不同之处。 HBase 是一个基于 HDFS 的分布式数据库,擅长实时地随机读/写超大规模数据集。 它也是 Hadoop 非常重要的组件。

Spark

  • Spark 是一种由 Scala 语言开发的快速、通用、可扩展的大数据分析引擎
  • Spark Core 中提供了 Spark 最基础与最核心的功能
  • Spark SQL 是 Spark 用来操作结构化数据的组件。通过 Spark SQL,用户可以使用 SQL 或者 Apache Hive 版本的 SQL 方言(HQL)来查询数据。
  • Spark Streaming 是 Spark 平台上针对实时数据进行流式计算的组件,提供了丰富的处理数据流的 API。

由上面的信息可以获知,Spark 出现的时间相对较晚,并且主要功能主要是用于数据计算, 所以其实 Spark 一直被认为是 Hadoop 框架的升级版。

Spark or Hadoop

  • Hadoop MapReduce 由于其设计初衷并不是为了满足循环迭代式数据流处理,因此在多并行运行的数据可复用场景(如:机器学习、图挖掘算法、交互式数据挖掘算法)中存在诸多计算效率等问题。
  • 所以 Spark 应运而生,Spark 就是在传统的 MapReduce 计算框架的基础上,利用其计算过程的优化,从而大大加快了数据分析、挖掘的运行和读写速 度,并将计算单元缩小到更适合并行计算和重复使用的 RDD 计算模型。
  • Spark 是一个分布式数据快速分析项目。它的核心技术是弹性分布式数据集(Resilient Distributed Datasets),提供了比 MapReduce 丰富的模型,可以快速在内存中对数据集进行多次迭代,来支持复杂的数据挖掘算法和图形计算算法。
  • Spark 和Hadoop 的根本差异是多个作业之间的数据通信问题 : Spark 多个作业之间数据 通信是基于内存,而 Hadoop 是基于磁盘。

1.2 Spark 核心模块

image-20220513103513844
image-20220513104747780

Spark Core

  • Spark Core 中提供了 Spark 最基础与最核心的功能,Spark 其他的功能如:Spark SQL, Spark Streaming,GraphX, MLlib 都是在 Spark Core 的基础上进行扩展的

Spark SQL

  • Spark SQL 是 Spark 用来操作结构化数据的组件。通过 Spark SQL,用户可以使用 SQL 或者 Apache Hive 版本的 SQL 方言(HQL)来查询数据。

Spark Streaming

  • Spark Streaming 是 Spark 平台上针对实时数据进行流式计算的组件,提供了丰富的处理数据流的 API。

Spark MLlib

  • MLlib 是 Spark 提供的一个机器学习算法库。MLlib 不仅提供了模型评估、数据导入等额外的功能,还提供了一些更底层的机器学习原语。

Spark Graphx

  • GraphX 是 Spark 面向图计算提供的框架与算法库。

1.3 Spark应用场景

  • 低延时的海量数据计算需求
  • 低延时SQL交互查询需求
  • 准实时(秒级)海量数据计算需求

2. Spark 运行环境

image-20220514193723770

2.1 Local模式

所谓的 Local 模式,就是不需要其他任何节点资源就可以在本地执行 Spark 代码的环境,一般用于教学,调试,演示等

2.1.1 安装部署

官网下载安装包,将 spark-XX-bin-hadoopXX.tgz 文件上传到 Linux 并解压缩,放置在指定位置,路径中不要包含中文或空格。

tar -zxvf spark-XXX-bin-hadoop.XX.tgz -C /opt/module
cd /opt/module
mv spark-3.0.0-bin-hadoop3.2 spark-local

2.1.2 启动Local环境

  1. 进入解压缩后的路径,执行如下指令

    bin/spark-shell
    

    可以在命令行中,执行scala命令,也可以调用spark

    测试

    在解压缩文件夹下的 data 目录中,添加 word.txt 文件。

    Hello Scala
    Hello Spark
    

    在命令行工具中执行如下代码指令

    sc.textFile("data/word.txt").flatMap(_.split("")).map((_,1)).reduceByKey(_+_).collect
    

    image-20220514194509620

  2. 启动成功后,可以输入网址进行 Web UI 监控页面访问

    //虚拟机or本机ip地址:4040
    
  3. 退出

    按键 Ctrl+C 或输入 Scala 指令

    :quit
    

2.1.3 提交应用

/opt/module/spark-local/bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master local[2] \
./examples/jars/spark-examples_XXXXXjar \
10
  1. –class
    • 表示要执行程序的主类,此处可以更换为自己写的应用程序
  2. –master local[2]
    • 部署模式,默认为本地模式,数字表示分配的虚拟 CPU 核数量
  3. spark-examples_XXX.jar
    • 运行的应用类所在的 jar 包(根据实际版本输入),实际使用时,可以设定自己的 jar 包
  4. 数字 10 表示程序的入口参数,用于设定当前应用的任务数量

2.2 Standlone模式

local 本地模式毕竟只是用来进行练习演示的,真实工作中还是要将应用提交到对应的 集群中去执行,这里我们来看看只使用 Spark 自身节点运行的集群模式,也就是我们所谓的 独立部署(Standalone)模式。Spark 的 Standalone 模式体现了经典的 master-slave 模式。

集群规划:

image-20220514194845413

2.2.1 安装部署

注意: 每个节点上配置相同,可配置一台节点,然后上传到其他节点便可

解压缩文件

将 spark-XXX.tgz 文件上传到 Linux 并解压缩在指定位置

tar -zxvf spark-XXX.tgz -C /opt/module 
cd /opt/module
mv spark-XXX spark-standalone

修改配置文件

  1. 进入解压缩后路径的 conf 目录,修改 workers.template 文件名为 workers

    有些老版本是slaves.template

    修改 slaves 文件,添加 worker 节点

    # 根据自己的主机节点名进行添加
    node1
    node2
    node3
    
  2. 修改 spark-env.sh.template 文件名为 spark-env.sh

    修改 spark-env.sh 文件,添加 JAVA_HOME 环境变量和集群对应的 master 节点

    # 根据实际情况进行修改
    export JAVA_HOME=/XXX/jdkXXX
    SPARK_MASTER_HOST=node1
    SPARK_MASTER_PORT=7077
    

    注意:7077 端口,相当于 hadoop3 内部通信的 8020 端口,此处的端口需要确认自己的 Hadoop 配置

最后

将配置好的spark,分别上传到每一个节点上

2.2.2 启动集群

在任意节点上,执行脚本命令

/opt/module/spark-standlone/sbin/start-all.sh

查看 Master 资源监控 Web UI 界面: //node1:8080

关闭集群

/opt/module/spark-standlone/sbin/stop-all.sh

2.2.3 提交应用

/opt/module/spark-standlone/bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://node1:7077 \
./examples/jars/spark-examples_XXX.jar \
10
  1. –class 表示要执行程序的主类
  2. –master spark://linux1:7077 独立部署模式,连接到 Spark 集群
  3. spark-examples_XXX.jar 运行类所在的 jar 包
  4. 数字 10 表示程序的入口参数,用于设定当前应用的任务数量

执行任务时,会产生多个 Java 进程

image-20220514195813903

2.2.4 提交参数说明

bin/spark-submit \
--class <main-class>
--master <master-url> \
... # other options
<application-jar> \
[application-arguments]

image-20220514200055150

image-20220514200029270

2.2.5 配置历史服务

由于 spark-shell 停止掉后,集群监控 node1:4040 页面就看不到历史任务的运行情况,所以 开发时都配置历史服务器记录任务运行情况

  1. 修改 spark-defaults.conf.template 文件名为 spark-defaults.conf

    修改 spark-defaults.conf 文件,配置日志存储路径

    spark.eventLog.enabled true
    spark.eventLog.dir hdfs://node1:8020/logs
    

    注意:路径自己设置,需要启动 hadoop 集群,HDFS 上的 directory 目录需要提前存在。

  2. 修改 spark-env.sh 文件, 添加日志配置

    前后路径保持一致

    export SPARK_HISTORY_OPTS="
    -Dspark.history.ui.port=18080
    -Dspark.history.fs.logDirectory=hdfs://node1:8020/logs
    -Dspark.history.retainedApplications=30"
    

    参数 1 含义:WEB UI 访问的端口号为 18080

    参数 2 含义:指定历史服务器日志存储路径

    参数 3 含义:指定保存 Application 历史记录的个数,如果超过这个值,旧的应用程序 信息将被删除,这个是内存中的应用数,而不是页面上显示的应用数。

注意:每一个节点上配置保持一致

重新启动集群和历史服务

/opt/module/spark-standlone/sbin/start-all.sh
/opt/module/spark-standlone/sbin/start-history-server.sh

重新执行任务

/opt/module/spark-standlone/bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://node1:7077 \
./examples/jars/spark-examples_XXX.jar \
10

查看历史服务//node1:18080

image-20220514200611155

2.3 YARN模式

独立部署(Standalone)模式由 Spark 自身提供计算资源,无需其他框架提供资源。这种方式降低了和其他第三方资源框架的耦合性,独立性非常强。但是,Spark 主要是计算框架,而不是资源调度框架,所以本身提供的资源调度并不是它的强项,所以还是和其他专业的资源调度框架集成会更靠谱一些。

2.3.1 安装部署

注意: 每个节点上配置相同,可配置一台节点,然后上传到其他节点便可

解压缩文件

将 spark-XXX.tgz 文件上传到 Linux 并解压缩在指定位置

tar -zxvf spark-XXX.tgz -C /opt/module 
cd /opt/module
mv spark-XXX spark-yarn

修改配置文件

修改 conf/spark-env.sh,添加 JAVA_HOME 和 YARN_CONF_DIR\HADOOP_CONF_DIR 配置

export JAVA_HOME=/XXX/jdk1XX
YARN_CONF_DIR=/opt/module/hadoop/etc/hadoop
HADOOP_CONF_DIR=/opt/module/hadoop/etc/hadoop

2.3.2 启动集群

启动HDFS和YARN集群

2.3.3 提交应用

/opt/module/spark-yarn/bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode cluster \
./examples/jars/spark-examples_XXX.jar \
10

image-20220514201118184

查看显示的链接页面,点击 History,查看历史页面

image-20220514201205830

2.2.4 配置历史服务

  1. 修改 spark-defaults.conf.template 文件名为 spark-defaults.conf

    修改 spark-default.conf 文件,配置日志存储路径

    spark.eventLog.enabled true
    spark.eventLog.dir hdfs://node1:8020/logs
    

    注意:需要启动 hadoop 集群,HDFS 上的目录需要提前存在

  2. 修改 spark-env.sh 文件, 添加日志配置

    export SPARK_HISTORY_OPTS="
    -Dspark.history.ui.port=18080
    -Dspark.history.fs.logDirectory=hdfs://node1:8020/logs
    -Dspark.history.retainedApplications=30"
    

    参数 1 含义:WEB UI 访问的端口号为 18080

    参数 2 含义:指定历史服务器日志存储路径

    参数 3 含义:指定保存 Application 历史记录的个数,如果超过这个值,旧的应用程序 信息将被删除,这个是内存中的应用数,而不是页面上显示的应用数。

  3. 修改 spark-defaults.conf

    spark.yarn.historyServer.address=node1:18080
    spark.history.ui.port=18080
    

    注意:每一个节点上配置保持一致

  4. 启动历史服务

    sbin/start-history-server.sh
    
  5. 重新提交应用

    /opt/module/spark-yarn/bin/spark-submit \
    --class org.apache.spark.examples.SparkPi \
    --master yarn \
    --deploy-mode cluster \
    ./examples/jars/spark-examples_XXX.jar \
    10
    

3. Spark 运行架构

3.1 运行架构

Spark 框架的核心是一个计算引擎,整体来说,它采用了标准 master-slave 的结构。

如下图所示,它展示了一个 Spark 执行时的基本结构。图形中的 Driver 表示 master,负责管理整个集群中的作业任务调度。图形中的 Executor 则是 slave,负责实际执行任务。

image-20220514201844880

image-20220526101111391

3.2 核心组件

3.2.1 Driver & Executor

计算组件

Driver

Spark 驱动器节点,用于执行 Spark 任务中的 main 方法,负责实际代码的执行工作。

Driver 在 Spark 作业执行时主要负责:

  • 将用户程序转化为作业(job)
  • 在 Executor 之间调度任务(task)
  • 跟踪 Executor 的执行情况
  • 通过 UI 展示查询运行情况

实际上,我们无法准确地描述 Driver 的定义,因为在整个的编程过程中没有看到任何有关Driver 的字眼。所以简单理解,所谓的 Driver 就是驱使整个应用运行起来的程序,也称之为Driver 类。

Executor

Spark Executor 是集群中工作节点(Worker)中的一个 JVM 进程,负责在 Spark 作业中运行具体任务(Task),任务彼此之间相互独立。Spark 应用启动时,Executor 节点被同时启动,并且始终伴随着整个 Spark 应用的生命周期而存在。如果有 Executor 节点发生了故障或崩溃,Spark 应用也可以继续执行,会将出错节点上的任务调度到其他 Executor 节点上继续运行。

Executor 有两个核心功能

  • 负责运行组成 Spark 应用的任务,并将结果返回给驱动器进程
  • 它们通过自身的块管理器(Block Manager)为用户程序中要求缓存的 RDD 提供内存式存储。RDD 是直接缓存在 Executor 进程内的,因此任务可以在运行时充分利用缓存 数据加速运算。

3.2.2 Master & Worker

资源管理组件

Spark 集群的独立部署环境中,不需要依赖其他的资源调度框架,自身就实现了资源调度的功能,所以环境中还有其他两个核心组件:Master 和 Worker,这里的 Master 是一个进 程,主要负责资源的调度和分配,并进行集群的监控等职责,类似于 Yarn 环境中的 RM, 而 Worker 呢,也是进程,一个 Worker 运行在集群中的一台服务器上,由 Master 分配资源对 数据进行并行的处理和计算,类似于 Yarn 环境中 NM

3.2.3 ApplicationMaster

Hadoop 用户向 YARN 集群提交应用程序时,提交程序中应该包含 ApplicationMaster,用于向资源调度器申请执行任务的资源容器 Container,运行用户自己的程序任务 job,监控整个任务的执行,跟踪整个任务的状态,处理任务失败等异常情况

说的简单点就是,ResourceManager(资源)和 Driver(计算)之间的解耦合靠的就是 ApplicationMaster。

3.3 核心概念

3.3.1 Executor 与 Core

Spark Executor 是集群中运行在工作节点(Worker)中的一个 JVM 进程,是整个集群中的专门用于计算的节点。在提交应用中,可以提供参数指定计算节点的个数,以及对应的资源。这里的资源一般指的是工作节点 Executor 的内存大小和使用的虚拟 CPU 核(Core)数 量。

应用程序相关启动参数如下:

image-20220514202733531

3.3.2 并行度(Parallelism)

在分布式计算框架中一般都是多个任务同时执行,由于任务分布在不同的计算节点进行计算,所以能够真正地实现多任务并行执行,记住,这里是并行,而不是并发。这里我们将整个集群并行执行任务的数量称之为并行度

那么一个作业到底并行度是多少呢?这个取决于框架的默认配置。应用程序也可以在运行过程中动态修改。

3.3.3 有向无环图(DAG)

image-20220514202948692

资源之间的依赖关系,不能成环,会形成死锁

大数据计算引擎框架我们根据使用方式的不同一般会分为四类:

  • 其中第一类就是 Hadoop 所承载的 MapReduce,它将计算分为两个阶段,分别为 Map 阶段 和 Reduce 阶段
  • 对于上层应用来说,就不得不想方设法去拆分算法,甚至于不得不在上层应用实现多个 Job 的串联,以完成一个完整的算法,例如迭代计算。 由于这样的弊端,催生了支持 DAG 框 架的产生。
  • 因此,支持 DAG 的框架被划分为第二代计算引擎。如 Tez 以及更上层的 Oozie。
  • 接下来就是以 Spark 为代表的第三代的计算引擎。第三代计算引擎的特点主要是 Job 内部的 DAG 支持(不跨越 Job),以及实时计算。

这里所谓的有向无环图,并不是真正意义的图形,而是由 Spark 程序直接映射成的数据流的高级抽象模型。简单理解就是将整个程序计算的执行过程用图形表示出来,这样更直观,更便于理解,可以用于表示程序的拓扑结构。

DAG(Directed Acyclic Graph)有向无环图是由点和线组成的拓扑图形,该图形具有方向,不会闭环。

3.4 提交流程

基于Yarn环境

image-20220514203344486

Spark 应用程序提交到 Yarn 环境中执行的时候,一般会有两种部署执行的方式:Client 和 Cluster。

两种模式主要区别在于:Driver 程序的运行节点位置

image-20220527215105976

3.4.1 Yarn Client 模式

Client 模式将用于监控和调度的 Driver 模块在客户端执行,而不是在 Yarn 中,所以一般用于测试

  • Driver 在任务提交的本地机器上运行
  • Driver 启动后会和 ResourceManager 通讯申请启动 ApplicationMaster
  • ResourceManager 分配 container,在合适的 NodeManager 上启动 ApplicationMaster,负责向 ResourceManager 申请 Executor 内存
  • ResourceManager 接到 ApplicationMaster 的资源申请后会分配 container,然后 ApplicationMaster 在资源分配指定的 NodeManager 上启动 Executor 进程
  • Executor 进程启动后会向 Driver 反向注册,Executor 全部注册完成后 Driver 开始执行 main 函数
  • 之后执行到 Action 算子时,触发一个 Job,并根据宽依赖开始划分 stage,每个 stage 生成对应的 TaskSet,之后将 task 分发到各个 Executor 上执行。

3.4.2 Yarn Cluster 模式

Cluster 模式将用于监控和调度的 Driver 模块启动在 Yarn 集群资源中执行。一般应用于实际生产环境

  • 在 YARN Cluster 模式下,任务提交后会和 ResourceManager 通讯申请启动 ApplicationMaster
  • 随后 ResourceManager 分配 container,在合适的 NodeManager 上启动 ApplicationMaster,此时的 ApplicationMaster 就是 Driver
  • Driver 启动后向 ResourceManager 申请 Executor 内存,ResourceManager 接到 ApplicationMaster 的资源申请后会分配 container,然后在合适的 NodeManager 上启动 Executor 进程
  • Executor 进程启动后会向 Driver 反向注册,Executor 全部注册完成后 Driver 开始执行 main 函数
  • 之后执行到 Action 算子时,触发一个 Job,并根据宽依赖开始划分 stage,每个 stage 生成对应的 TaskSet,之后将 task 分发到各个 Executor 上执行。

4. Spark 核心编程

Spark 计算框架为了能够进行高并发和高吞吐的数据处理,封装了三大数据结构,用于处理不同的应用场景。

三大数据结构分别是:

  • RDD : 弹性分布式数据集
  • 累加器:分布式共享只写变量
  • 广播变量:分布式共享只读变量

4.1 RDD

4.1.1 什么是 RDD

RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是 Spark 中最基本的数据处理模型。代码中是一个抽象类,它代表一个弹性的、不可变、可分区、里面的元素可并行计算的集合。

  • 弹性

    • 存储的弹性:内存与磁盘的自动切换

    • 容错的弹性:数据丢失可以自动恢复

    • 计算的弹性:计算出错重试机制

    • 分片的弹性:可根据需要重新分片

  • 分布式:数据存储在大数据集群不同节点上

  • 数据集:RDD 封装了计算逻辑,并不保存数据

  • 数据抽象:RDD 是一个抽象类,需要子类具体实现

  • 不可变:RDD 封装了计算逻辑,是不可以改变的,想要改变,只能产生新的 RDD,在新的 RDD 里面封装计算逻辑

  • 可分区、并行计算

RDD vs IO

RDD的数据处理方式类似于IO流,也有装饰者设计模式

RDD的数据只有在调用collect方法时,才会真正执行业务逻辑操作。之前的封装全部都是功能上的扩展

RDD是不保存数据的,但是IO可以临时保存一部分数据

image-20220527212834387

image-20220527212811671

4.1.2 核心属性

Internally, each RDD is characterized by five main properties:

  • A list of partitions
  • A function for computing each split
  • A list of dependencies on other RDDs
  • Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
  • Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)

分区列表

  • RDD 数据结构中存在分区列表,用于执行任务时并行计算,是实现分布式计算的重要属性

    protected def getPartitions: Array[Partition]
    

分区计算函数

  • Spark 在计算时,是使用分区函数对每一个分区进行计算

    @DeveloperApi
    def compute(split: Partition, context: TaskContext): Iterator[T]
    

RDD 之间的依赖关系

  • RDD 是计算模型的封装,当需求中需要将多个计算模型进行组合时,就需要将多个 RDD 建立依赖关系

    protected def getDependencies: Seq[Dependency[_]] = deps
    

分区器(可选)

  • 当数据为 KV 类型数据时,可以通过设定分区器自定义数据的分区

    @transient val partitioner: Option[Partitioner] = None
    

首选位置(可选)

  • 计算数据时,可以根据计算节点的状态选择不同的节点位置进行计算

    protected def getPreferredLocations(split: Partition): Seq[String] = Nil
    

4.1.3 执行原理

  • 数据处理过程中需要计算资源(内存 & CPU)和计算模型(逻辑)。执行时,需要将计算资源和计算模型进行协调和整合

  • Spark 框架在执行时,先申请资源,然后将应用程序的数据处理逻辑分解成一个一个的计算任务。然后将任务发到已经分配资源的计算节点上, 按照指定的计算模型进行数据计算。最后得到计算结果

RDD 是 Spark 框架中用于数据处理的核心模型,接下来我们看看,在 Yarn 环境中,RDD的工作原理

  1. 启动 Yarn 集群环境

    image-20220518103600961

  2. Spark 通过申请资源创建调度节点和计算节点

    image-20220518103609398

  3. Spark 框架根据需求将计算逻辑根据分区划分成不同的任务

    image-20220518104419021

  4. 调度节点将任务根据计算节点状态发送到对应的计算节点进行计算

    image-20220518103658933

从以上流程可以看出 RDD 在整个流程中主要用于将逻辑进行封装,并生成 Task 发送给 Executor 节点执行计算

4.1.4 RDD 创建

在 Spark 中创建 RDD 的创建方式可以分为四种

  1. 集合(内存)中创建 RDD

    从集合中创建 RDD,Spark 主要提供了两个方法:parallelize 和 makeRDD

    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("spark")
    val sparkContext = new SparkContext(sparkConf)
    val rdd1 = sparkContext.parallelize(List(1,2,3,4))
    val rdd2 = sparkContext.makeRDD( List(1,2,3,4))
    rdd1.collect().foreach(println)
    rdd2.collect().foreach(println)
    sparkContext.stop()
    

    从底层代码实现来讲,makeRDD 方法其实就是 parallelize 方法

    def makeRDD[T: ClassTag](
    	seq: Seq[T],
    	numSlices: Int = defaultParallelism): RDD[T] = withScope {
    		parallelize(seq, numSlices)
    	}
    
    // makeRDD方法可以传递第二个参数,这个参数表示分区的数量
    // 第二个参数可以不传递的,那么makeRDD方法会使用默认值 : defaultParallelism(默认并行度)
    //    scheduler.conf.getInt("spark.default.parallelism", totalCores)
    //    spark在默认情况下,从配置对象中获取配置参数:spark.default.parallelism
    //    如果获取不到,那么使用totalCores属性,这个属性取值为当前运行环境的最大可用核数
    // val rdd = sc.makeRDD(List(1,2,3,4),2)
    
  2. 外部存储(文件)创建 RDD

    由外部存储系统的数据集创建 RDD 包括:本地的文件系统,所有 Hadoop 支持的数据集, 比如 HDFS、HBase 等

    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("spark")
    val sparkContext = new SparkContext(sparkConf)
    val fileRDD: RDD[String] = sparkContext.textFile("input")
    fileRDD.collect().foreach(println)
    sparkContext.stop()
    
  3. 从其他 RDD 创建

    主要是通过一个 RDD 运算完后,再产生新的 RDD

  4. 直接创建 RDD(new)

    使用 new 的方式直接构造 RDD,一般由 Spark 框架自身使用

4.1.5 RDD并行度与分区

默认情况下,Spark 可以将一个作业切分多个任务后,发送给 Executor 节点并行计算,而能够并行计算的任务数量我们称之为并行度。这个数量可以在构建 RDD 时指定。记住,这里的并行执行的任务数量,并不是指的切分任务的数量,不要混淆了

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("spark")
val sparkContext = new SparkContext(sparkConf)
val dataRDD: RDD[Int] = sparkContext.makeRDD(List(1,2,3,4), 4)
val fileRDD: RDD[String] = sparkContext.textFile( "input", 2)
fileRDD.collect().foreach(println)
sparkContext.stop()

读取内存数据时,数据可以按照并行度的设定进行数据的分区操作,数据分区规则的 Spark 核心源码如下

// Sequences need to be sliced at the same set of index positions for operations
// like RDD.zip() to behave as expected
def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
	(0 until numSlices).iterator.map { i =>
		val start = ((i * length) / numSlices).toInt
		val end = (((i + 1) * length) / numSlices).toInt
		(start, end)
	}
}

读取文件数据时,数据是按照 Hadoop 文件读取的规则进行切片分区,而切片规则和数据读取的规则有些差异,具体 Spark 核心源码如下

public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
 long totalSize = 0; // compute total size
 for (FileStatus file: files) { // check we have valid files
     if (file.isDirectory()) {
     	throw new IOException("Not a file: "+ file.getPath());
 	}
 	totalSize += file.getLen();
 }
 long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
 long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.
 FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize);

 ...

 for (FileStatus file: files) {

 ...

 if (isSplitable(fs, path)) {
     long blockSize = file.getBlockSize();
     long splitSize = computeSplitSize(goalSize, minSize, blockSize);
     ...
 }
 protected long computeSplitSize(long goalSize, long minSize,long blockSize) {
 	return Math.max(minSize, Math.min(goalSize, blockSize));
 }

4.2 RDD 转换算子

RDD 根据数据处理方式的不同将算子整体上分为 Value 类型、双 Value 类型和 Key-Value 类型

Value DESC
map 将处理的数据逐条进行映射转换,这里的转换可以是类型的转换,也可以是值的转换
mapPartitions 将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处理,哪怕是过滤数据
mapPartitionsWithIndex 将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处 理,哪怕是过滤数据,在处理时同时可以获取当前分区索引
flatMap 将处理的数据进行扁平化后再进行映射处理,所以算子也称之为扁平映射
glom 将同一个分区的数据直接转换为相同类型的内存数组进行处理,分区不变
groupBy 将数据根据指定的规则进行分组, 分区默认不变,但是数据会被打乱重新组合,我们将这样的操作称之为 shuffle。极限情况下,数据可能被分在同一个分区中 一个组的数据在一个分区中,但是并不是说一个分区中只有一个组
filter 将数据根据指定的规则进行筛选过滤,符合规则的数据保留,不符合规则的数据丢弃。 当数据进行筛选过滤后,分区不变,但是分区内的数据可能不均衡,生产环境下,可能会出现数据倾斜。
sample 根据指定的规则从数据集中抽取数据
distinct 将数据集中重复的数据去重
coalesce 根据数据量缩减分区,用于大数据集过滤后,提高小数据集的执行效率。当 spark 程序中,存在过多的小任务的时候,可以通过 coalesce 方法,收缩合并分区,减少分区的个数,减小任务调度成本
repartition 该操作内部其实执行的是 coalesce 操作,参数 shuffle 的默认值为 true。无论是将分区数多的 RDD 转换为分区数少的 RDD,还是将分区数少的 RDD 转换为分区数多的 RDD,repartition 操作都可以完成,因为无论如何都会经 shuffle 过程
sortBy 该操作用于排序数据。在排序之前,可以将数据通过 f 函数进行处理,之后按照 f 函数处理的结果进行排序,默认为升序排列。排序后新产生的 RDD 的分区数与原 RDD 的分区数一致。中间存在 shuffle 的过程
双Value DESC
intersection 对源 RDD 和参数 RDD 求交集后返回一个新的 RDD
union 对源 RDD 和参数 RDD 求并集后返回一个新的 RDD
subtract 以一个 RDD 元素为主,除两个 RDD 中复元素,将其他元素保留下来
zip 将两个 RDD 中的元素,以键值对的形式进行合并
Key – Value DESC
partitionBy 将数据按照指定 Partitioner 重新进行分区。Spark 默认的分区器是 HashPartitioner
reduceByKey 可以将数据按照相同的 Key 对 Value 进行聚合
groupByKey 将数据源的数据根据 key 对 value 进行分组
aggregateByKey 将数据根据不同的规则进行分区内计算和分区间计算
foldByKey 当分区内计算规则和分区间计算规则相同时,aggregateByKey 就可以简化为 foldByKey
combineByKey 通用的对 key-value 型 rdd 进行聚集操作的聚集函数(aggregation function),combineByKey()允许用户返回值的类型与输入不一致
sortByKey 在一个(K,V)的 RDD 上调用,K 必须实现 Ordered 接口(特质),返回一个按照 key 进行排序 的
join 在类型为(K,V)和(K,W)的 RDD 上调用,返回一个相同 key 对应的所有元素连接在一起的 (K,(V,W))的 RDD
leftOuterJoin 类似于 SQL 语句的左外连接
cogroup (join & group)在类型为(K,V)和(K,W)的 RDD 上调用,返回一个(K,(Iterable,Iterable))类型的 RDD

4.2.1 value

map

  • 函数签名

    def map[U: ClassTag](f: T => U): RDD[U]
    
  • 函数说明

    将处理的数据逐条进行映射转换,这里的转换可以是类型的转换,也可以是值的转换

    val dataRDD: RDD[Int] = sparkContext.makeRDD(List(1,2,3,4))
    val dataRDD1: RDD[Int] = dataRDD.map(
     	num => { num * 2 }
    )
    val dataRDD2: RDD[String] = dataRDD1.map(
     	num => { "" + num}
    )
    

mapPartitions

  • 函数签名

    def mapPartitions[U: ClassTag]( 
        f: Iterator[T] => Iterator[U], 
        preservesPartitioning: Boolean = false): RDD[U] 
    
  • 函数说明

    将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处理,哪怕是过滤数据

    val dataRDD1: RDD[Int] = dataRDD.mapPartitions(
     datas => { datas.filter(_==2) }
    )
    
    // mapPartitions : 可以以分区为单位进行数据转换操作
    //                 但是会将整个分区的数据加载到内存进行引用
    //                 如果处理完的数据是不会被释放掉,存在对象的引用。
    //                 在内存较小,数据量较大的场合下,容易出现内存溢出。
    

map 和 mapPartitions 的区别?

  • Map 算子是分区内一个数据一个数据的执行,类似于串行操作。而 mapPartitions 算子是以分区为单位进行批处理操作
  • Map 算子主要目的将数据源中的数据进行转换和改变。但是不会减少或增多数据。
  • MapPartitions 算子需要传递一个迭代器,返回一个迭代器,没有要求的元素的个数保持不变,所以可以增加或减少数据
  • Map 算子因为类似于串行操作,所以性能比较低,而是 mapPartitions 算子类似于批处理,所以性能较高。但是 mapPartitions 算子会长时间占用内存,那么这样会导致内存可能不够用,出现内存溢出的错误。所以在内存有限的情况下,不推荐使用。

mapPartitionsWithIndex

  • 函数签名

    def mapPartitionsWithIndex[U: ClassTag](
    	f: (Int, Iterator[T]) => Iterator[U],
    	preservesPartitioning: Boolean = false): RDD[U]
    
  • 函数说明

    将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处理,哪怕是过滤数据,在处理时同时可以获取当前分区索引

    val dataRDD1 = dataRDD.mapPartitionsWithIndex(
    	(index, datas) => { datas.map(index, _) }
    )
    

flatMap

  • 函数签名

    def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]
    
  • 函数说明

    将处理的数据进行扁平化后再进行映射处理,所以算子也称之为扁平映射

    val dataRDD = sparkContext.makeRDD(List(List(1,2),List(3,4)),1)
    val dataRDD1 = dataRDD.flatMap(list => list)
    

glom

  • 函数签名

    def glom(): RDD[Array[T]]
    
  • 函数说明

    将同一个分区的数据直接转换为相同类型的内存数组进行处理,分区不变

    val dataRDD = sparkContext.makeRDD(List(1,2,3,4),1)
    val dataRDD1:RDD[Array[Int]] = dataRDD.glom()
    

groupBy

  • 函数签名

    def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])]
    
  • 函数说明

    将数据根据指定的规则进行分组, 分区默认不变,但是数据会被打乱重新组合,我们将这样的操作称之为 shuffle。极限情况下,数据可能被分在同一个分区中

    一个组的数据在一个分区中,但是并不是说一个分区中只有一个组

    val dataRDD = sparkContext.makeRDD(List(1,2,3,4),1)
    val dataRDD1 = dataRDD.groupBy(_%2)
    

filter

  • 函数签名

    def filter(f: T => Boolean): RDD[T]
    
  • 函数说明

    将数据根据指定的规则进行筛选过滤,符合规则的数据保留,不符合规则的数据丢弃。 当数据进行筛选过滤后,分区不变,但是分区内的数据可能不均衡,生产环境下,可能会出现数据倾斜

    val dataRDD = sparkContext.makeRDD(List(1,2,3,4),1)
    val dataRDD1 = dataRDD.filter(_%2 == 0)
    

sample

  • 函数签名

    def sample(
        withReplacement: Boolean,
        fraction: Double,
        seed: Long = Utils.random.nextLong): RDD[T]
    
  • 函数说明

    根据指定的规则从数据集中抽取数据

    val dataRDD = sparkContext.makeRDD(List(
     1,2,3,4
    ),1)
    // 抽取数据不放回(伯努利算法)
    // 伯努利算法:又叫 0、1 分布。例如扔硬币,要么正面,要么反面。
    // 具体实现:根据种子和随机算法算出一个数和第二个参数设置几率比较,小于第二个参数要,大于不要
    // 第一个参数:抽取的数据是否放回,false:不放回
    // 第二个参数:抽取的几率,范围在[0,1]之间,0:全不取;1:全取;
    // 第三个参数:随机数种子,如果不传递第三个参数,那么使用的是当前系统时间
    val dataRDD1 = dataRDD.sample(false, 0.5)
    // 抽取数据放回(泊松算法)
    // 第一个参数:抽取的数据是否放回,true:放回;false:不放回
    // 第二个参数:重复数据的几率,范围大于等于 0.表示每一个元素被期望抽取到的次数
    // 第三个参数:随机数种子,如果不传递第三个参数,那么使用的是当前系统时间
    val dataRDD2 = dataRDD.sample(true, 2)
    

distinct

  • 函数签名

    def distinct()(implicit ord: Ordering[T] = null): RDD[T]
    def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]
    
  • 函数说明

    将数据集中重复的数据去重

    val dataRDD = sparkContext.makeRDD(List(1,2,3,4,1,2),1)
    val dataRDD1 = dataRDD.distinct()
    val dataRDD2 = dataRDD.distinct(2)
    

coalesce

  • 函数签名

    def coalesce(numPartitions: Int, shuffle: Boolean = false,
    	partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
    	(implicit ord: Ordering[T] = null)
    	:RDD[T]
    
  • 函数说明

    根据数据量缩减分区,用于大数据集过滤后,提高小数据集的执行效率

    当 spark 程序中,存在过多的小任务的时候,可以通过 coalesce 方法,收缩合并分区,减少分区的个数,减小任务调度成本

    val dataRDD = sparkContext.makeRDD(List(
     1,2,3,4,1,2
    ),6)
    val dataRDD1 = dataRDD.coalesce(2)
    
    // coalesce方法默认情况下不会将分区的数据打乱重新组合
    // 这种情况下的缩减分区可能会导致数据不均衡,出现数据倾斜
    // 如果想要让数据均衡,可以进行shuffle处理
    

repartition

  • 函数签名

    def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]
    
  • 函数说明

    该操作内部其实执行的是 coalesce 操作,参数 shuffle 的默认值为 true。无论是将分区数多的 RDD 转换为分区数少的 RDD,还是将分区数少的 RDD 转换为分区数多的 RDD,repartition 操作都可以完成,因为无论如何都会经 shuffle 过程

    val dataRDD = sparkContext.makeRDD(List(
     1,2,3,4,1,2
    ),2)
    val dataRDD1 = dataRDD.repartition(4)
    

两者区别

coalesce算子可以扩大分区的,但是如果不进行shuffle操作,是没有意义,不起作用。
所以如果想要实现扩大分区的效果,需要使用shuffle操作
spark提供了一个简化的操作

  • 缩减分区:coalesce,如果想要数据均衡,可以采用shuffle
  • 扩大分区:repartition, 底层代码调用的就是coalesce,而且肯定采用shuffle

sortBy

  • 函数签名

    def sortBy[K](
     f: (T) => K,
     ascending: Boolean = true,
     numPartitions: Int = this.partitions.length)
     (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]
    
  • 函数说明

    该操作用于排序数据。在排序之前,可以将数据通过 f 函数进行处理,之后按照 f 函数处理的结果进行排序,默认为升序排列。排序后新产生的 RDD 的分区数与原 RDD 的分区数一致。中间存在 shuffle 的过程

    val dataRDD = sparkContext.makeRDD(List(
     1,2,3,4,1,2
    ),2)
    val dataRDD1 = dataRDD.sortBy(num=>num, false, 4)
    
    // sortBy方法可以根据指定的规则对数据源中的数据进行排序,默认为升序,第二个参数可以改变排序的方式
    // sortBy默认情况下,不会改变分区。但是中间存在shuffle操作
    

4.2.2 double value

交集,并集和差集要求两个数据源数据类型保持一致

intersection

对源 RDD 和参数 RDD 求交集后返回一个新的 RDD

def intersection(other: RDD[T]): RDD[T]

val dataRDD1 = sparkContext.makeRDD(List(1,2,3,4))
val dataRDD2 = sparkContext.makeRDD(List(3,4,5,6))
val dataRDD = dataRDD1.intersection(dataRDD2)

union

对源 RDD 和参数 RDD 求并集后返回一个新的 RDD

def subtract(other: RDD[T]): RDD[T]

val dataRDD1 = sparkContext.makeRDD(List(1,2,3,4))
val dataRDD2 = sparkContext.makeRDD(List(3,4,5,6))
val dataRDD = dataRDD1.union(dataRDD2)

subtract

以一个 RDD 元素为主,去除两个 RDD 中重复元素,将其他元素保留下来。求差集

def subtract(other: RDD[T]): RDD[T]

val dataRDD1 = sparkContext.makeRDD(List(1,2,3,4))
val dataRDD2 = sparkContext.makeRDD(List(3,4,5,6))
val dataRDD = dataRDD1.subtract(dataRDD2)

zip

将两个 RDD 中的元素,以键值对的形式进行合并。

  • 数据类型可以不一致

  • 两个数据源要求分区数量要保持一致

  • 两个数据源要求分区中数据数量保持一致

def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)]

val dataRDD1 = sparkContext.makeRDD(List(1,2,3,4))
val dataRDD2 = sparkContext.makeRDD(List(3,4,5,6))
val dataRDD = dataRDD1.zip(dataRDD2)

4.2.3 key-value

partitionBy

  • 函数签名

    def partitionBy(partitioner: Partitioner): RDD[(K, V)]
    
  • 函数说明 将数据按照指定 Partitioner 重新进行分区。Spark 默认的分区器是 HashPartitioner

    import org.apache.spark.HashPartitioner
    
    val rdd: RDD[(Int, String)] = sc.makeRDD(Array((1,"aaa"),(2,"bbb"),(3,"ccc")),3) 
    // RDD => PairRDDFunctions
    // 隐式转换(二次编译)
    // 重分区的分区器与当前RDD的分区器一样,则不会再次分区
    val rdd2: RDD[(Int, String)] = rdd.partitionBy(new HashPartitioner(2))
    

reduceByKey

image-20220527212701296

  • 函数签名

    def reduceByKey(func: (V, V) => V): RDD[(K, V)]
    def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]
    
  • 函数说明

    可以将数据按照相同的 Key 对 Value 进行聚合

    // reduceByKey : 相同的key的数据进行value数据的聚合操作
    // scala语言中一般的聚合操作都是两两聚合,spark基于scala开发的,所以它的聚合也是两两聚合
    // 【1,2,3】
    // 【3,3】
    // 【6】
    // reduceByKey中如果key的数据只有一个,是不会参与运算的。
    
    val dataRDD1 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3)))
    val dataRDD2 = dataRDD1.reduceByKey(_+_)
    val dataRDD3 = dataRDD1.reduceByKey(_+_, 2)
    

groupByKey

image-20220527212626326

spark中,shuffle操作必须落盘处理,不能在内存中数据等待,会导致内存溢出

  • 函数签名

    def groupByKey(): RDD[(K, Iterable[V])]
    def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]
    def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]
    
  • 函数说明

    将数据源的数据根据 key 对 value 进行分组

    val dataRDD1 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3)))
    val dataRDD2 = dataRDD1.groupByKey()
    val dataRDD3 = dataRDD1.groupByKey(2)
    val dataRDD4 = dataRDD1.groupByKey(new HashPartitioner(2))
    

两者区别

  • 从 shuffle 的角度:reduceByKey 和 groupByKey 都存在 shuffle 的操作,但是 reduceByKey 可以在 shuffle 前对分区内相同 key 的数据进行预聚合(combine)功能,这样会减少落盘的数据量,而 groupByKey 只是进行分组,不存在数据量减少的问题,reduceByKey 性能比较高。

  • 从功能的角度:reduceByKey 其实包含分组和聚合的功能。GroupByKey 只能分组,不能聚合,所以在分组聚合的场合下,推荐使用 reduceByKey,如果仅仅是分组而不需要聚合。那么还是只能使用 groupByKey

aggregateByKey

  • 函数签名

    def aggregateByKey[U: ClassTag](zeroValue: U)
    	(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]
    
  • 函数说明

    将数据根据不同的规则进行分区内计算和分区间计算

    val dataRDD1 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3)))
    val dataRDD2 = dataRDD1.aggregateByKey(0)(_+_,_+_)
    
    // TODO : 取出每个分区内相同 key 的最大值然后分区间相加
    // aggregateByKey 算子是函数柯里化,存在两个参数列表
    // 1. 第一个参数列表中的参数表示初始值
    // 2. 第二个参数列表中含有两个参数
    // 2.1 第一个参数表示分区内的计算规则
    // 2.2 第二个参数表示分区间的计算规则
    

foldByKey

  • 函数签名

    def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]
    
  • 函数说明

    当分区内计算规则和分区间计算规则相同时,aggregateByKey 就可以简化为 foldByKey

    val dataRDD1 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3)))
    val dataRDD2 = dataRDD1.foldByKey(0)(_+_)
    

combineByKey

  • 函数签名

    def combineByKey[C](
    	createCombiner: V => C,
    	mergeValue: (C, V) => C,
    	mergeCombiners: (C, C) => C): RDD[(K, C)]
    
  • 函数说明

    最通用的对 key-value 型 rdd 进行聚集操作的聚集函数(aggregation function)。类似于 aggregate(),combineByKey()允许用户返回值的类型与输入不一致

    val list: List[(String, Int)] = List(("a", 88), ("b", 95), ("a", 91), ("b", 93),
    ("a", 95), ("b", 98))
    val input: RDD[(String, Int)] = sc.makeRDD(list, 2)
    val combineRdd: RDD[(String, (Int, Int))] = input.combineByKey(
    	(_, 1),
    	(acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1),
    	(acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
    )
    

reduceByKey、foldByKey、aggregateByKey、combineByKey 的区别?

  • reduceByKey: 相同 key 的第一个数据不进行任何计算,分区内和分区间计算规则相同

  • FoldByKey: 相同 key 的第一个数据和初始值进行分区内计算,分区内和分区间计算规则相 同

  • AggregateByKey: 相同 key 的第一个数据和初始值进行分区内计算,分区内和分区间计算规则可以不相同

  • CombineByKey: 当计算时,发现数据结构不满足要求时,可以让第一个数据转换结构。分区内和分区间计算规则不相同

// reduceByKey:
combineByKeyWithClassTag[V](
    (v: V) => v, // 第一个值不会参与计算
    func, // 分区内计算规则
    func, // 分区间计算规则
)

// aggregateByKey :
combineByKeyWithClassTag[U](
    (v: V) => cleanedSeqOp(createZero(), v), // 初始值和第一个key的value值进行的分区内数据操作
    cleanedSeqOp, // 分区内计算规则
    combOp,       // 分区间计算规则
)

// foldByKey:
combineByKeyWithClassTag[V](
    (v: V) => cleanedFunc(createZero(), v), // 初始值和第一个key的value值进行的分区内数据操作
    cleanedFunc,  // 分区内计算规则
    cleanedFunc,  // 分区间计算规则
)

// combineByKey :
combineByKeyWithClassTag(
    createCombiner,  // 相同key的第一条数据进行的处理函数
    mergeValue,      // 表示分区内数据的处理函数
    mergeCombiners,  // 表示分区间数据的处理函数
 )

sortByKey

  • 函数签名

    def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length)
     : RDD[(K, V)]
    
  • 函数说明

    在一个(K,V)的 RDD 上调用,K 必须实现 Ordered 接口(特质),返回一个按照 key 进行排序的

    val dataRDD1 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3)))
    val sortRDD1: RDD[(String, Int)] = dataRDD1.sortByKey(true)
    val sortRDD1: RDD[(String, Int)] = dataRDD1.sortByKey(false)
    

join

  • 函数签名

    def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
    
  • 函数说明

    在类型为(K,V)和(K,W)的 RDD 上调用,返回一个相同 key 对应的所有元素连接在一起的 (K,(V,W))的 RDD

    val rdd: RDD[(Int, String)] = sc.makeRDD(Array((1, "a"), (2, "b"), (3, "c")))
    val rdd1: RDD[(Int, Int)] = sc.makeRDD(Array((1, 4), (2, 5), (3, 6)))
    rdd.join(rdd1).collect().foreach(println)
    

leftOuterJoin

  • 函数签名

    def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]
    
  • 函数说明

    类似于 SQL 语句的左外连接

    val dataRDD1 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3)))
    val dataRDD2 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3)))
    val rdd: RDD[(String, (Int, Option[Int]))] = dataRDD1.leftOuterJoin(dataRDD2)
    

cogroup

  • 函数签名

    def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]
    
  • 函数说明

    在类型为(K,V)和(K,W)的 RDD 上调用,返回一个(K,(Iterable,Iterable))类型的 RDD

    val dataRDD1 = sparkContext.makeRDD(List(("a",1),("a",2),("c",3)))
    val dataRDD2 = sparkContext.makeRDD(List(("a",1),("c",2),("c",3)))
    val value: RDD[(String, (Iterable[Int], Iterable[Int]))] = dataRDD1.cogroup(dataRDD2)
    

4.3 RDD 行动算子

4.3.1 reduce

  • 函数签名

    def reduce(f: (T, T) => T): T
    
  • 函数说明

    聚集 RDD 中的所有元素,先聚合分区内数据,再聚合分区间数据

    val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))
    // 聚合数据
    val reduceResult: Int = rdd.reduce(_+_)
    

4.3.2 collect

  • 函数签名

    def collect(): Array[T]
    
  • 函数说明

    在驱动程序中,以数组 Array 的形式返回数据集的所有元素

    val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4)) // 收集数据到 Driver rdd.collect().foreach(println)
    

4.3.3 count

  • 函数签名

    def count(): Long
    
  • 函数说明

    返回 RDD 中元素的个数

    val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))
    // 返回 RDD 中元素的个数
    val countResult: Long = rdd.count()
    

4.3.4 first

  • 函数签名

    def first(): T
    
  • 函数说明

    返回 RDD 中的第一个元素

    val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))
    // 返回 RDD 中元素的个数
    val firstResult: Int = rdd.first()
    println(firstResult)
    

4.3.5 take

  • 函数签名

    def take(num: Int): Array[T]
    
  • 函数说明

    返回一个由 RDD 的前 n 个元素组成的数组

    val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))
    // 返回 RDD 中元素的个数
    val takeResult: Array[Int] = rdd.take(2)
    println(takeResult.mkString(","))
    

4.3.6 takeOrdered

  • 函数签名

    def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]
    
  • 函数说明

    返回该 RDD 排序后的前 n 个元素组成的数组

    val rdd: RDD[Int] = sc.makeRDD(List(1,3,2,4))
    // 返回 RDD 中元素的个数
    val result: Array[Int] = rdd.takeOrdered(2)
    

4.3.7 aggregate

  • 函数签名

    def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
    
  • 函数说明

    分区的数据通过初始值和分区内的数据进行聚合,然后再和初始值进行分区间的数据聚合

    val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 8)
    // 将该 RDD 所有元素相加得到结果
    //val result: Int = rdd.aggregate(0)(_ + _, _ + _)
    val result: Int = rdd.aggregate(10)(_ + _, _ + _)
    

4.3.8 fold

  • 函数签名

    def fold(zeroValue: T)(op: (T, T) => T): T
    
  • 函数说明

    折叠操作,aggregate 的简化版操作

    val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
    val foldResult: Int = rdd.fold(0)(_+_)
    

4.3.9 countByKey

  • 函数签名

    def countByKey(): Map[K, Long]
    
  • 函数说明

    折叠操作,aggregate 的简化版操作

    val rdd: RDD[(Int, String)] = sc.makeRDD(List((1, "a"), (1, "a"), (1, "a"), (2,
    "b"), (3, "c"), (3, "c")))
    // 统计每种 key 的个数
    val result: collection.Map[Int, Long] = rdd.countByKey()
    

4.3.10 save 相关算子

  • 函数签名

    def saveAsTextFile(path: String): Unit
    def saveAsObjectFile(path: String): Unit
    def saveAsSequenceFile(
        path: String,
        codec: Option[Class[_ <: CompressionCodec]] = None): Unit
    
  • 函数说明

    将数据保存到不同格式的文件中

    // 保存成 Text 文件
    rdd.saveAsTextFile("output")
    // 序列化成对象保存到文件
    rdd.saveAsObjectFile("output1")
    // 保存成 Sequencefile 文件
    rdd.map((_,1)).saveAsSequenceFile("output2")
    

4.3.11 foreach

  • 函数签名

    def foreach(f: T => Unit): Unit = withScope {
    	val cleanF = sc.clean(f)
    	sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
    }
    
  • 函数说明

    分布式遍历 RDD 中的每一个元素,调用指定函数

    val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))
    // 收集后打印
    rdd.map(num=>num).collect().foreach(println)
    println("****************")
    // 分布式打印
    rdd.foreach(println)
    

4.4 RDD 序列化

4.4.1 闭包检查

从计算的角度, 算子以外的代码都是在 Driver 端执行, 算子里面的代码都是在 Executor 端执行。

那么在 scala 的函数式编程中,就会导致算子内经常会用到算子外的数据,这样就形成了闭包的效果,如果使用的算子外的数据无法序列化,就意味着无法传值给 Executor 端执行,就会发生错误,所以需要在执行任务计算前,检测闭包内的对象是否可以进行序列化,这个操作我们称之为闭包检测。

Scala2.12 版本后闭包编译方式发生了改变

从计算的角度, 算子以外的代码都是在 Driver 端执行, 算子里面的代码都是在 Executor 端执行

object Spark_RDD_Serial {

    def main(args: Array[String]): Unit = {
        val sparConf = new SparkConf().setMaster("local").setAppName("WordCount")
        val sc = new SparkContext(sparConf)
        val rdd: RDD[String] = sc.makeRDD(Array("hello world", "hello spark", "hive"))
        val search = new Search("h")
        // 会报错
        //search.getMatch1(rdd).collect().foreach(println)
       	// 不会报错
        search.getMatch2(rdd).collect().foreach(println)
        sc.stop()
    }
    // 查询对象
    // 类的构造参数其实是类的属性, 构造参数需要进行闭包检测,其实就等同于类进行闭包检测
    class Search(query:String){

        def isMatch(s: String): Boolean = {
            s.contains(this.query)
        }
        // 函数序列化案例
        def getMatch1 (rdd: RDD[String]): RDD[String] = {
            rdd.filter(isMatch)
        }
        // 属性序列化案例
        def getMatch2(rdd: RDD[String]): RDD[String] = {
            val s = query
            rdd.filter(x => x.contains(s))
        }
    }
}

4.5 RDD 依赖关系

RDD 血缘关系

RDD 只支持粗粒度转换,即在大量记录上执行的单个操作。将创建 RDD 的一系列 Lineage (血统)记录下来,以便恢复丢失的分区。RDD 的 Lineage 会记录 RDD 的元数据信息和转 换行为,当该 RDD 的部分分区数据丢失时,它可以根据这些信息来重新运算和恢复丢失的 数据分区。

image-20220527213036628

RDD 依赖关系

这里所谓的依赖关系,其实就是两个相邻 RDD 之间的关系

RDD 窄依赖

Narrow Dependency

窄依赖表示每一个父(上游)RDD 的 Partition 最多被子(下游)RDD 的一个 Partition 使用

image-20220527213048283

RDD 宽依赖

Shuffle Dependency

宽依赖表示同一个父(上游)RDD 的 Partition 被多个子(下游)RDD 的 Partition 依赖,会引起 Shuffle

image-20220527213109726

4.5 RDD stage

Spark Job会被划分为多个Stage,每一个Stage是有一组并行的Task组成的。

划分依据:是否产生了Shuffle(宽依赖),一个shuffle会产生两个stage

RDD 阶段划分

DAG(Directed Acyclic Graph)有向无环图是由点和线组成的拓扑图形,该图形具有方向, 不会闭环。例如,DAG 记录了 RDD 的转换过程和任务的阶段。

image-20220518152517725

RDD 阶段划分源码

try {
	// New stage creation may throw an exception if, for example, jobs are run on a
	// HadoopRDD whose underlying HDFS files have been deleted.
	finalStage = createResultStage (finalRDD, func, partitions, jobId, callSite)
} catch {
	case e: Exception =>
	logWarning ("Creating new stage failed due to exception - job: " + jobId, e)
	listener.jobFailed (e)
	return
}
	……
private def createResultStage (
	rdd: RDD[_],
	func: (TaskContext, Iterator[_] ) => _,
	partitions: Array[Int],
	jobId: Int,
	callSite: CallSite
): ResultStage = {
	val parents = getOrCreateParentStages (rdd, jobId)
	val id = nextStageId.getAndIncrement ()
	val stage = new ResultStage (id, rdd, func, partitions, parents, jobId, callSite)
	stageIdToStage (id) = stage
	updateJobIdStageIdMaps (jobId, stage)
	stage
}

……
private def getOrCreateParentStages (rdd: RDD[_], firstJobId: Int): List[Stage]
= {
	getShuffleDependencies (rdd).map {
		shuffleDep =>
		getOrCreateShuffleMapStage (shuffleDep, firstJobId)
	}.toList
}
……
private[scheduler] def getShuffleDependencies ( rdd: RDD[_] ): HashSet[ShuffleDependency[_, _, _]] = {
	val parents = new HashSet[ShuffleDependency[_, _, _]]
	val visited = new HashSet[RDD[_]]
	val waitingForVisit = new Stack[RDD[_]]
	waitingForVisit.push (rdd)
	while (waitingForVisit.nonEmpty) {
		val toVisit = waitingForVisit.pop ()
		if (! visited (toVisit) ) {
			visited += toVisit
			toVisit.dependencies.foreach {
				case shuffleDep: ShuffleDependency[_, _, _] =>
					parents += shuffleDep
				case dependency =>
					waitingForVisit.push (dependency.rdd)
			}
		}
	}
	parents
}

image-20220527213348436

RDD 任务划分

RDD 任务切分中间分为:Application、Job、Stage 和 Task

  • Application:初始化一个 SparkContext 即生成一个 Application
  • Job:一个 Action 算子就会生成一个 Job
  • Stage:Stage 等于宽依赖(ShuffleDependency)的个数加 1
  • Task:一个 Stage 阶段中,最后一个 RDD 的分区个数就是 Task 的个数

注意:Application->Job->Stage->Task 每一层都是 1 对 n 的关系。

image-20220520105818930

RDD 任务划分源码

val tasks: Seq[Task[_]] = try {
	stage match {
		case stage: ShuffleMapStage =>
			partitionsToCompute.map { 
				id =>
				val locs = taskIdToLocations(id)
				val part = stage.rdd.partitions(id)
				new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
                    taskBinary, part, locs, stage.latestInfo.taskMetrics, properties, Option(jobId),
                    Option(sc.applicationId), sc.applicationAttemptId)
			}
		case stage: ResultStage =>
			partitionsToCompute.map { 
				id =>
				val p: Int = stage.partitions(id)
				val part = stage.rdd.partitions(p)
				val locs = taskIdToLocations(id)
				new ResultTask(stage.id, stage.latestInfo.attemptId,
					taskBinary, part, locs, id, properties, stage.latestInfo.taskMetrics,
					Option(jobId), Option(sc.applicationId), sc.applicationAttemptId)
			}
	}
	……
val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()
……
override def findMissingPartitions(): Seq[Int] = {
	mapOutputTrackerMaster
		.findMissingPartitions(shuffleDep.shuffleId)
		.getOrElse(0 until numPartitions)
}

4.6 RDD 持久化

image-20220527213500891

4.6.1 Cache 缓存

RDD 通过 Cache 或者 Persist 方法将前面的计算结果缓存,默认情况下会把数据以缓存在 JVM 的堆内存中。但是并不是这两个方法被调用时立即缓存,而是触发后面的 action 算子时,该 RDD 将会被缓存在计算节点的内存中,并供后面重用。

// cache 操作会增加血缘关系,不改变原有的血缘关系
println(wordToOneRdd.toDebugString)
// 数据缓存。
wordToOneRdd.cache()
// 可以更改存储级别
//mapRdd.persist(StorageLevel.MEMORY_AND_DISK_2)

存储级别

object StorageLevel {
    val NONE = new StorageLevel(false, false, false, false)
    val DISK_ONLY = new StorageLevel(true, false, false, false)
    val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
    val MEMORY_ONLY = new StorageLevel(false, true, false, true)
    val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
    val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
    val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
    val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
    val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
    val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
    val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
    val OFF_HEAP = new StorageLevel(true, true, true, false, 1)

image-20220520110754800

缓存有可能丢失,或者存储于内存的数据由于内存不足而被删除,RDD 的缓存容错机制保证了即使缓存丢失也能保证计算的正确执行。通过基于 RDD 的一系列转换,丢失的数据会被重算,由于 RDD 的各个 Partition 是相对独立的,因此只需要计算丢失的部分即可,并不需要重算全部 Partition。

Spark 会自动对一些 Shuffle 操作的中间数据做持久化操作(比如:reduceByKey)。这样做的目的是为了当一个节点 Shuffle 失败了避免重新计算整个输入。但是,在实际使用的时候,如果想重用数据,仍然建议调用 persist 或 cache。

4.6.2 checkPoint 检查点

所谓的检查点其实就是通过将 RDD 中间结果写入磁盘

由于血缘依赖过长会造成容错成本过高,这样就不如在中间阶段做检查点容错,如果检查点之后有节点出现问题,可以从检查点开始重做血缘,减少了开销。

对 RDD 进行 checkpoint 操作并不会马上被执行,必须执行 Action 操作才能触发。

// 设置检查点路径
sc.setCheckpointDir("./checkpoint1")
// 创建一个 RDD,读取指定位置文件:hello atguigu atguigu
val lineRdd: RDD[String] = sc.textFile("input/1.txt")
// 业务逻辑
val wordRdd: RDD[String] = lineRdd.flatMap(line => line.split(" "))
val wordToOneRdd: RDD[(String, Long)] = wordRdd.map {
    word => {
        (word, System.currentTimeMillis())
    }
}
// 增加缓存,避免再重新跑一个 job 做 checkpoint
wordToOneRdd.cache()
// 数据检查点:针对 wordToOneRdd 做检查点计算
wordToOneRdd.checkpoint()
// 触发执行逻辑
wordToOneRdd.collect().foreach(println)

缓存和检查点区别

  1. Cache 缓存只是将数据保存起来,不切断血缘依赖。Checkpoint 检查点切断血缘依赖
  2. Cache 缓存的数据通常存储在磁盘、内存等地方,可靠性低。Checkpoint 的数据通常存储在 HDFS 等容错、高可用的文件系统,可靠性高。
  3. 建议对 checkpoint()的 RDD 使用 Cache 缓存,这样 checkpoint 的 job 只需从 Cache 缓存中读取数据即可,否则需要再从头计算一次 RDD

4.7 RDD 分区器

Spark 目前支持 Hash 分区和 Range 分区,和用户自定义分区。Hash 分区为当前的默认分区。分区器直接决定了 RDD 中分区的个数、RDD 中每条数据经过 Shuffle 后进入哪个分区,进而决定了 Reduce 的个数。

  • 只有 Key-Value 类型的 RDD 才有分区器,非 Key-Value 类型的 RDD 分区的值是 None
  • 每个 RDD 的分区 ID 范围:0 ~ (numPartitions – 1),决定这个值是属于那个分区的

Hash 分区:对于给定的 key,计算其 hashCode,并除以分区个数取余

class HashPartitioner(partitions: Int) extends Partitioner {
    require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")
    def numPartitions: Int = partitions
    def getPartition(key: Any): Int = key match {
        case null => 0
        case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
	}
    override def equals(other: Any): Boolean = other match {
        case h: HashPartitioner =>
            h.numPartitions == numPartitions
        case _ =>
        	false
    }
    override def hashCode: Int = numPartitions
}

Range 分区:将一定范围内的数据映射到一个分区中,尽量保证每个分区数据均匀,而且分区间有序

class RangePartitioner[K : Ordering : ClassTag, V](
        partitions: Int,
        rdd: RDD[_ <: Product2[K, V]],
        private var ascending: Boolean = true)
    extends Partitioner {
    // We allow partitions = 0, which happens when sorting an empty RDD under the default settings.
    require(partitions >= 0, s"Number of partitions cannot be negative but found $partitions.")
	private var ordering = implicitly[Ordering[K]]
    // An array of upper bounds for the first (partitions - 1) partitions
    private var rangeBounds: Array[K] = {
    	...
    }
    def numPartitions: Int = rangeBounds.length + 1
    private var binarySearch: ((Array[K], K) => Int) = CollectionsUtils.makeBinarySearch[K]
        
    def getPartition (key: Any): Int = {
        val k = key.asInstanceOf[K]
        var partition = 0
        if (rangeBounds.length <= 128) {
            // If we have less than 128 partitions naive search
            while (partition < rangeBounds.length && ordering.gt (k, rangeBounds (partition) ) ) {
                partition += 1
            }
        } else {
            // Determine which binary search method to use only once.
            partition = binarySearch (rangeBounds, k)
            // binarySearch either returns the match location or -[insertion point]-1
            if (partition < 0) {
                partition = - partition - 1
            }
            if (partition > rangeBounds.length) {
                partition = rangeBounds.length
            }
        }
        if (ascending) {
            partition
        } else {
            rangeBounds.length - partition
        }
    }
    override def equals (other: Any): Boolean = other match {
        ...
    }
    override def hashCode (): Int = {
        ...
    }
    @throws(classOf[IOException])
    private def writeObject (out: ObjectOutputStream): Unit =
    	Utils.tryOrIOException {
            ...
        }
        @throws(classOf[IOException])
        private def readObject (in: ObjectInputStream): Unit = Utils.tryOrIOException {
            ...
        }
    }     

4.8 RDD 文件读取与保存

Spark 的数据读取及数据保存可以从两个维度来作区分:文件格式以及文件系统。

文件格式分为:text 文件、csv 文件、sequence 文件以及 Object 文件

文件系统分为:本地文件系统、HDFS、HBASE 以及数据库

text 文件

// 读取输入文件
val inputRDD: RDD[String] = sc.textFile("input/1.txt")
// 保存数据
inputRDD.saveAsTextFile("output")

sequence 文件

SequenceFile 文件是 Hadoop 用来存储二进制形式的 key-value 对而设计的一种平面文件(Flat File)。在 SparkContext 中,可以调用 sequenceFile[keyClass, valueClass](path)

// 保存数据为 SequenceFile
dataRDD.saveAsSequenceFile("output")
// 读取 SequenceFile 文件
sc.sequenceFile[Int,Int]("output").collect().foreach(println)

object 对象文件

对象文件是将对象序列化后保存的文件,采用 Java 的序列化机制。可以通过 objectFile[T: ClassTag](path)函数接收一个路径,读取对象文件,返回对应的 RDD,也可以通过调用 saveAsObjectFile()实现对对象文件的输出。因为是序列化所以要指定类型

// 保存数据
dataRDD.saveAsObjectFile("output")
// 读取数据
sc.objectFile[Int]("output").collect().foreach(println)

4.9 累加器

image-20220527213546922

累加器用来把 Executor 端变量信息聚合到 Driver 端。在 Driver 程序中定义的变量,在 Executor 端的每个 Task 都会得到这个变量的一份新的副本,每个 task 更新这些副本的值后, 传回 Driver 端进行 merge

4.9.1 系统累加器

val rdd = sc.makeRDD(List(1,2,3,4,5))
// 声明累加器
var sum = sc.longAccumulator("sum");
rdd.foreach(
    num => {
        // 使用累加器
        sum.add(num)
    }
)
// 获取累加器的值
println("sum = " + sum.value)

4.9.2 自定义累加器

// 自定义累加器
// 1. 继承 AccumulatorV2,并设定泛型
// 2. 重写累加器的抽象方法
class WordCountAccumulator extends AccumulatorV2[String, mutable.Map[String, Long]]{
    var map : mutable.Map[String, Long] = mutable.Map()
    // 累加器是否为初始状态
    override def isZero: Boolean = {
        map.isEmpty
    }
    // 复制累加器
    override def copy(): AccumulatorV2[String, mutable.Map[String, Long]] = {
        new WordCountAccumulator
    }
    // 重置累加器
    override def reset(): Unit = {
        map.clear()
    }
    // 向累加器中增加数据 (In)
    override def add(word: String): Unit = {
        // 查询 map 中是否存在相同的单词
        // 如果有相同的单词,那么单词的数量加 1
        // 如果没有相同的单词,那么在 map 中增加这个单词
        map(word) = map.getOrElse(word, 0L) + 1L
    }

    // 合并累加器
    override def merge(other: AccumulatorV2[String, mutable.Map[String, Long]]):
    Unit = {
        val map1 = map
        val map2 = other.value
        // 两个 Map 的合并
        map = map1.foldLeft(map2)(
            ( innerMap, kv ) => {
                innerMap(kv._1) = innerMap.getOrElse(kv._1, 0L) + kv._2
                innerMap
            }
        )
    }
    // 返回累加器的结果 (Out)
    override def value: mutable.Map[String, Long] = map
}

4.10 广播变量

image-20220527213618736

广播变量用来高效分发较大的对象。向所有工作节点发送一个较大的只读值,以供一个或多个 Spark 操作使用。比如,如果你的应用需要向所有节点发送一个较大的只读查询表, 广播变量用起来都很顺手。在多个并行操作中使用同一个变量,但是 Spark 会为每个任务分别发送

image-20220527155607305

val rdd1 = sc.makeRDD(List( ("a",1), ("b", 2), ("c", 3), ("d", 4) ),4)
val list = List( ("a",4), ("b", 5), ("c", 6), ("d", 7) )
// 声明广播变量
val broadcast: Broadcast[List[(String, Int)]] = sc.broadcast(list)
val resultRDD: RDD[(String, (Int, Int))] = rdd1.map {
    case (key, num) => {
        var num2 = 0
        // 使用广播变量
        for ((k, v) <- broadcast.value) {
            if (k == key) {
                num2 = v
            }
        }
        (key, (num, num2))
    }
}

5. 进阶

5.1 SortByKey原理

image-20220527205719552

有一台服务器:32G内存,如何在内存中对1T数据排序

  • 先抽样,看分布,然后分块(至少32块),对每个块进行排序,最后合并

5.2 shuffle

在Spark中,什么情况下,会产生shuffle?

  • reduceByKey,groupByKey,sortByKey,countByKey,join等等

Spark shuffle一共经历了这几个过程:

  1. 未优化的 Hash Based Shuffle
  2. 优化后的 Hash Based Shuffle
  3. Sort-Based Shuffle

5.2.1 ShuffleMapStage 与 ResultStage

image-20220527220058216

在划分 stage 时,最后一个 stage 称为 finalStage,它本质上是一个 ResultStage 对象,前面的所有 stage 被称为 ShuffleMapStage。

ShuffleMapStage

  • 的结束伴随着 shuffle 文件的写磁盘。

ResultStage

  • 基本上对应代码中的 action 算子,即将一个函数应用在 RDD 的各个 partition 的数据集上,意味着一个 job 的运行结束。

5.2.2 HashShuffle 解析

1. 未优化的 HashShuffle

如下图中有 3 个 Reducer,从 Task 开始那边各自把自己进行 Hash 计算(分区器: hash/numreduce 取模),分类出 3 个不同的类别,每个 Task 都分成 3 种类别的数据,想把不同的数据汇聚然后计算出最终的结果,所以 Reducer 会在每个 Task 中把属于自己类别的数 据收集过来,汇聚成一个同类别的大集合,每 1 个 Task 输出 3 份本地文件,这里有 4 个 Mapper Tasks,所以总共输出了 4 个 Tasks x 3 个分类文件 = 12 个本地小文件。

image-20220527220312047

image-20220527221610178

2. 优化后的 HashShuffle

优化的 HashShuffle 过程就是启用合并机制,合并机制就是复用 buffer,开启合并机制 的配置是 spark.shuffle.consolidateFiles。该参数默认值为 false,将其设置为 true 即可开启优化机制。通常来说,如果我们使用 HashShuffleManager,那么都建议开启这个选项。

这里还是有 4 个 Tasks,数据类别还是分成 3 种类型,因为 Hash 算法会根据你的 Key 进行分类,在同一个进程中,无论是有多少过 Task,都会把同样的 Key 放在同一个 Buffer 里,然后把 Buffer 中的数据写入以 Core 数量为单位的本地文件中,(一个 Core 只有一种类 型的 Key 的数据),每 1 个 Task 所在的进程中,分别写入共同进程中的 3 份本地文件,这里 有 4 个 Mapper Tasks,所以总共输出是 2 个 Cores x 3 个分类文件 = 6 个本地小文件。

image-20220527220445952

image-20220527221827986

5.2.3 SortShuffle 解析

1. 普通 SortShuffle

在该模式下,数据会先写入一个数据结构,reduceByKey 写入 Map,一边通过 Map 局部聚合,一边写入内存。Join 算子写入 ArrayList 直接写入内存中。然后需要判断是否达到阈值,如果达到就会将内存数据结构的数据写入到磁盘,清空内存数据结构。

溢写磁盘前,先根据 key 进行排序,排序过后的数据,会分批写入到磁盘文件中。默认批次为 10000 条,数据会以每批一万条写入到磁盘文件。写入磁盘文件通过缓冲区溢写的方式,每次溢写都会产生一个磁盘文件,也就是说一个 Task 过程会产生多个临时文件

最后在每个 Task 中,将所有的临时文件合并,这就是 merge 过程,此过程将所有临时文件读取出来,一次写入到最终文件。意味着一个 Task 的所有数据都在这一个文件中。同时单独写一份索引文件,标识下游各个Task的数据在文件中的索引,start offset和end offset。

image-20220527220824236

2. bypass SortShuffle

bypass 运行机制的触发条件如下:

  1. shuffle reduce task 数量小于等于 spark.shuffle.sort.bypassMergeThreshold 参数的值,默认为 200
  2. 不是聚合类的 shuffle 算子(比如 reduceByKey)

此时 task 会为每个 reduce 端的 task 都创建一个临时磁盘文件,并将数据按 key 进行 hash 然后根据 key 的 hash 值,将 key 写入对应的磁盘文件之中。当然,写入磁盘文件时也是先写入内存缓冲,缓冲写满之后再溢写到磁盘文件的。最后,同样会将所有临时磁盘文件都合并成一个磁盘文件,并创建一个单独的索引文件

该过程的磁盘写机制其实跟未经优化的 HashShuffleManager 是一模一样的,因为都要创建数量惊人的磁盘文件,只是在最后会做一个磁盘文件的合并而已。因此少量的最终磁盘文件,也让该机制相对未经优化的 HashShuffleManager 来说,shuffle read 的性能会更好。

而该机制与普通 SortShuffleManager 运行机制的不同在于:不会进行排序。也就是说, 启用该机制的最大好处在于,shuffle write 过程中,不需要进行数据的排序操作,也就节省掉了这部分的性能开销。

image-20220527221117117

6. 性能优化

6.1 高性能序列化库

  • Spark倾向于序列化的便捷性,默认使用了Java序列化机制
  • Java序列化机制的性能并不高,序列化的速度相对较慢,而且序列化以后的数据,相对来说比较大,比较占用内存空间
  • Spark提供了两种序列化机制:Java序列化和Kryo序列化

Kryo序列化

  • Kryo序列化比Java序列化更快,而且序列化后的数据更小,通常小十倍
  • 如果要使用Kryo序列化机制,首先要用SparkConf和Spark序列化器设置为KryoSerializer
  • 使用Kryo时,针对需要序列化的类,需要预先进行注册,这样才能获得最佳性能,如果不注册,Kryo必须时刻保存类型的全类名,反而占用不少内存
  • Spark默认对Scala中常用的类型自动在Kryo进行了注册
  • 如果在算子中,使用了外部的自定义类型的对象,那么还是需要对其进行注册
    • 格式:conf.registerKryoClasses(...)
  • 注意:如果要序列化的自定义的类型,字段特别多,此时就需要对Kryo本身进行优化,因为Kryo内部的换存储可能不够存放那么大的class对象
    • 需要调用SparkConf.set()方法,设置spark.kryoserializer.buffer.mb参数的值,将其调大,默认值为2,单位是MB

6.2 持久化&checkpoint

  • 针对程序中多次被transformation或者action操作的RDD进行持久化操作,避免对一个RDD反复进行计算,再进一步优化,使用序列化Kryo的持久化级别
  • 为了保证RDD持久化数据在可能丢失的情况下还能实现高可靠,则需要对RDD执行CheckPoint操作

6.3 JVM垃圾回收调优

默认情况下,Spark使用每个Executor 60%的内存空间来缓存RDD,那么只有40%的内存空间来存放算子执行期间创建的对象

  • 如果垃圾回收频繁发生,就需要对这个比例进行调优,通过参数spark.storage.memoryFraction来修改比例

image-20220530111644485

统一内存管理

统一内存管理机制,与静态内存管理的区别在于存储内存和执行内存共享同一块空间,可以动态占用对方的空闲区域,统一内存管理的堆内内存结构如图所示:

image-20220531112830971

image-20220531112853360

其中最重要的优化在于动态占用机制,其规则如下:

  1. 设定基本的存储内存和执行内存区域(spark.storage.storageFraction 参数),该设定确定了双方各自拥有的空间的范围
  2. 双方的空间都不足时,则存储到硬盘;若己方空间不足而对方空余时,可借用对方的空间;(存储空间不足是指不足以放下一个完整的 Block)
  3. 执行内存的空间被对方占用后,可让对方将占用的部分转存到硬盘,然后”归还”借用的空间
  4. 存储内存的空间被对方占用后,无法让对方”归还”,因为需要考虑 Shuffle 过程中的很多因素,实现起来较为复杂。

6.4 提高并行度

  • 要尽量设置合理的并行度,来充分地利用集群的资源,才能充分提高Spark程序的性能

  • 可以手动使用textFile()、parallelize()等方法的第二个参数来设置并行度,也可以使用spark.default.parallelism参数,来设置统一的并行度,Spark官方推荐,给集群的每个cpu core设置2-3个task

image-20220530144109360

6.5 数据本地化

数据本地化级别 解释
PROCESS_LOCAL 数据和计算它的代码在同一个JVM进程中
NODE_LOCAL 数据和计算它的代码在一个节点上,但是不在一个JVM进程中
NO_PREF 数据从哪里过来,性能都是一样的
RACK_LOCAL 数据和计算它的代码在一个机架上
ANY 数据可能在任意地方,比如其他网络环境内,或者其它机架上
  1. Spark倾向于使用最好的本地化级别调度task,但不现实
  2. Spark默认会等待指定时间,期望task要处理的数据所在的节点上的Executor空闲出一个cpu,从而将task分配过去,只要超过了时间,那么spark就会将task分配到其他任意一个空闲的Executor上
  3. 可以设置spark.locality系列参数,来调节spark等待task可以进行数据本地化的时间
    • spark.locality.wait.process
    • spark.locality.wait.node
    • spark.locality.wait.rack

6.6 算子优化

6.6.1 map vs mapPartitions

  • map: 一次处理一条数据

    • 因为可以GC回收处理过的数据,所以一般不会导致OOM异常
  • mapPartitions: 一次处理一个分区的数据

    • 如果元素过多,可能会导致OOM异常
    • 性能更高

建议针对初始化链接之类的操作,使用mapPartitions,放在mapPartitions内部
例如:创建数据库链接,使用mapPartitions可以减少链接创建的次数,提高性能
注意:创建数据库链接的代码建议放在次数,不要放在Driver端或者it.foreach内部
数据库链接放在Driver端会导致链接无法序列化,无法传递到对应的task中执行,所以算子在执行的时候会报错
数据库链接放在it.foreach()内部还是会创建多个链接,和使用map算子的效果是一样的

6.6.2 foreach vs foreachPartition

  • foreach:一次处理一条数据
  • foreachPartition:一次处理一个分区的数据

6.6.3 repartition

  • 对RDD进行重分区
    • 可以调整RDD的并行度
    • 可以解决RDD中数据倾斜的问题

6.6.4 reduceByKey vs groupByKey

reduceByKey会先进行预聚合,会减少数据量,性能更高

image-20220531101833726

Tags: