Spark性能优化
Spark配置介绍
-
Spark中的配置选项在四个地方可以进行配置,其中优先级如下:
SparkConf(代码) > spark-submit 或 spark-shell 命令行参数 > spark-defaults.conf > spark-env.sh > 默认值
-
在代码中配置的为静态配置,在spark-submit提交和spark-default.conf设置的参数为动态配置
-
spark的属性大致分为两种类型:
- 部署相关 如spark.driver.memory,spark.executor.instances,在sparkConfig中可能不起作用,用配置文件和命令行设置
- Spark运行时控制相关 如spark.task.maxFailures,任何方式都可配置
-
各种配置项的配置建议:
- 硬件资源类的(资源供给类:CPU、内存、磁盘、网络),spark-defaults.conf统一设置;
- 全局服务类的(比如动态扩容、External shuffle services,安全,压缩,Spark UI,等等),spark-defaults.conf统一设置;
- 任务粒度的配置(如Task重试、locality wait、是否推断speculative task,等等),用命令行,或是SparkConf,推荐用SparkConf,命令行不好管理
个人实践:在spark-env.sh中配置集群需要的配置;在spark-default.conf中配置job需要的配置,提交job时指定配置文件或适用默认配置文件spark-default.conf
一、CPU配置项
cpu的利用率由两个方面决定,要充分利用cpu要两方面资源相匹配
- 系统资源方面:集群、executor中的cpu核数
- 数据资源方面:数据分片的个数
cpu并发度(每个executor能并发执行几个task)
# 集群内满配cpu核数
spark.cores.max
# 单个Executor内cpu核数,standalone模式默认会使用全部核
spark.executor.cores
# 单个task计算任务消耗cpu核数,默认为1且不能小于1,大于1时是task任务为多线程的(大部分时候不必设置)
spark.task.cpus
数据并行度参数如下(将数据划分为多少块):
# 未指定分区数时RDD默认并行度
spark.default.parallelism
# SparkSQL框架下,数据关联、聚合操作时Reducer 在shuffle reduce阶段默认的并行度
spark.sql.shuffle.partitions
并发度=任务数=(spark.executor.cores)/(spark.task.cpus)
并发度基本由 spark.executor.cores 参数敲定,因为spark.task.cpus通常为1,且不能小于1,可以大于1(为了应对需要多个线程才能执行的任务)
Executor数=(spark.cores.max)/(spark.executor.cores)
以上是都是在standalone模式下的配置项,在yarn集群中可直接指定executor个数:
# 在yarn集群中指定executor的个数
spark.executor.instances
例子:在kafka中的分区数就是spark拉取数据的并行度度,在拉取clickhouse数据时指定的numPartitions就是数据的并行度。
建议:将数据并行度设置为cpu核数的2-3倍,以充分利用cpu,否则可能会导致task倾斜的问题(一些executor十分繁忙另外一些executor却没有在执行)。将分区数(数据并行度)调高也会将数据的分片大小减小,使每个分片执行的更快,但太高的并行度会导致调度花费较多时间。
二、内存配置项
Spark 会区分堆内内存(On-heap Memory)和堆外内存(Off-heap Memory)
堆内内存的申请与释放统一由 JVM 管理,堆外内存是 Spark 通过调用 Unsafe 的 allocateMemory 和 freeMemory 方法直接在操作系统内存中申请、释放内存空间
慎用堆外内存,官方推荐只用堆内内存。堆内外空间互相隔离,堆内、堆外是以Job为粒度划分的,也就是说,同一个Job,要么全用堆外,要么全用堆内。堆外、堆内的内存空间,是不能在同一个Job之内共享的。
堆外内存相关设置:
spark.memory.offHeap.enabled 默认false
spark.memory.offHeap.size 默认为0
堆外内存仅供了解,只使用堆内内存即可
堆内内存相关设置:
# 每个executor的内存绝对值大小,默认1g
spark.executor.memory
# 除用户内存外的计算和存储内存所占比例,默认0.6
spark.memory.fraction
# 计算和和存储内存中存储内存所占比例,默认0.5
spark.memory.storageFraction
-
Reserved Memory:固定为300MB,不受开发者控制,它是 Spark预留的、用来存储各种Spark内部对象的内存区域;
-
User Memory:用于存储开发者自定义的数据结构,例如RDD算子中引用的数组、列表、映射等等
-
Execution Memory:用来执行分布式任务。分布式任务的计算,主要包括数据的转换、过滤、映射、排序、聚并归并等环节,而计算环节的内存消耗,统统来自Executor Memory
-
Storage Memery:用于缓存分布式数据集,比如RDD Cache、广播变量等等。RDD Cache指的是RDD物化到内存中的副本。在果同一个RDD被引用多次,那么把这个RDD缓存到内大幅提升作业的执行性能。
举例:
内存 | 大小 |
---|---|
spark.executor.memory | 20GB |
spark.memory.offHeap.size | 10GB |
spark.memory.fraction | 0.8 |
spark.memory.storageFraction | 0.6 |
堆内内存:
Reserved Memory大小:300M
User Memory大小:20 x(1-0.8)= 4GB
Storage Memeory大小:20 x 0.8 x 0.6 = 9.6 GB
Execution Memory大小:20 x 0.8 x (1-0.6)= 6.4GB
堆外内存:
Storage Memeory大小:10 x 0.6 = 6GB
Execution Memory大小:10 x 0.4 = 4GB
在spark1.6版本之后,内存静态划分转换为动态管理内存,即 Execution Memory和Storage Memery可相互抢占,抢占规则如下(Execution Memeory更重要原则):
- 如果对方的内存空间有空闲,双方可以互相抢占
- 对于Storage Memory抢占的 Execution Memory部分,当分布式任务有计算需要时Storage Memory必须立即归还抢占的内存,涉及的存数据要么落盘、要么清除
- 对于 Execution Memory抢占的 Storage Memory部分,即便 Storage Memory有收回内存的需要,也必须要等到分布式任务执行完毕才能释放。
调优建议:
-
spark.memory.fraction可以尽可能调大,spark中用户内存用不了太多,主要使用计算和存储内存
-
ETL(Extract、Transform、Load)作业,业务抽取、转换、加载,数据只处理一次,不需要缓存,存储内存的比率适当降低;机器学习、图计算反复使用数据,计算内存比率适当增大
-
数据分片的大小与executor中每个核分得的内存大小基本相同
三、磁盘配置项
在Spark运行过程中会产生日志和在shuffle过程中会产生中间文件,将这些文件存放在固态硬盘上会使Spark拥有更好的性能
配置在spark-env.sh中,服务级别的配置
# spark暂存空间目录,存放map输出文件和RDDs,支持","分隔的多个目录。shuffle输出的文件
SPARK_LOCAL_DIRS=
# spark的worker工作目录,暂存空间存放全部日志,默认SPARK_HOME/work
SPARK_WORKER_DIR=
配置在spark-defaults.conf或sparkConf中,任务级别的配置,会被SPARK_LOCAL_DIRS设置的目录覆盖
# spark暂存空间目录,用来改善Shuffle中间文件存储,以及RDD Cache磁盘存储
spark.local.dir 目录
四、cache
在Spark计算过程中善用cache会极大提高性能,对重复使用的数据建议添加cache,而对只使用一两次的数据不建议添加cache,否则不仅浪费内存空间而且会降低Spark运行效率
使用的建议:
- 如果 RDD/DataFrame/Dataset 在应用中的引用次数为 1,就坚决不使用 Cache
- 如果引用次数大于 1,且运行成本占比超过 30%,应当考虑启用 Cache
Cache table
# 创建临时视图再cache
df.createTempView("table_name")
spark.sql("cache tabel table_name")
CACHE [ LAZY ] TABLE table_identifier
[ OPTIONS ( 'storageLevel' [ = ] value ) ] [ [ AS ] query ]
-
LAZY可选,加了之后不立刻缓存,当第一次使用的时候缓存。不加默认立刻缓存
-
OPTIONS存储级别,默认MEMORY_AND_DISK
SER 字样的表示以序列化方式存储,不带 SER 则表示采用对象值,序列化存储(二进制存储)会节省存储空间,但是消耗计算资源最常用MEMORY_ONLY 和 MEMORY_AND_DISK,它们分别是 RDD 缓存和 DataFrame 缓存的默认存储级别。这两种存储级别都是先尝试把数据缓存到内存, MEMORY_AND_DISK在内存不足时将数据缓存到磁盘
-
query 将查询结果缓存,如将testData表中查到的结果缓存为testCache表
CACHE TABLE testCache OPTIONS ('storageLevel' 'DISK_ONLY') SELECT * FROM testData;
Cache算子
df.cache.count
cache操作时惰性操作,只有action算子时才触发计算
只有 count 才会触发缓存的完全物化,而 first、take 和 show 这 3 个算子只会把涉及的数据物化,例如first只缓存1条数据,show只缓存20条数据
在此只介绍了Spark硬件方面的部分调优方法,此外还有SparkSQL、Shuffle、广播变量等方面的调优方法
本文参考极客时间中《零基础入门Spark》《Spark性能调优实战》