Spark性能优化

Spark配置介绍

  • Spark中的配置选项在四个地方可以进行配置,其中优先级如下:

    SparkConf(代码) > spark-submit 或 spark-shell 命令行参数 > spark-defaults.conf > spark-env.sh > 默认值

  • 在代码中配置的为静态配置,在spark-submit提交和spark-default.conf设置的参数为动态配置

  • spark的属性大致分为两种类型:

    1. 部署相关 如spark.driver.memory,spark.executor.instances,在sparkConfig中可能不起作用,用配置文件和命令行设置
    2. Spark运行时控制相关 如spark.task.maxFailures,任何方式都可配置
  • 各种配置项的配置建议:

    1. 硬件资源类的(资源供给类:CPU、内存、磁盘、网络),spark-defaults.conf统一设置;
    2. 全局服务类的(比如动态扩容、External shuffle services,安全,压缩,Spark UI,等等),spark-defaults.conf统一设置;
    3. 任务粒度的配置(如Task重试、locality wait、是否推断speculative task,等等),用命令行,或是SparkConf,推荐用SparkConf,命令行不好管理

个人实践:在spark-env.sh中配置集群需要的配置;在spark-default.conf中配置job需要的配置,提交job时指定配置文件或适用默认配置文件spark-default.conf

一、CPU配置项

cpu的利用率由两个方面决定,要充分利用cpu要两方面资源相匹配

  • 系统资源方面:集群、executor中的cpu核数
  • 数据资源方面:数据分片的个数

cpu并发度(每个executor能并发执行几个task)

# 集群内满配cpu核数
spark.cores.max  
# 单个Executor内cpu核数,standalone模式默认会使用全部核
spark.executor.cores   
# 单个task计算任务消耗cpu核数,默认为1且不能小于1,大于1时是task任务为多线程的(大部分时候不必设置)
spark.task.cpus        

数据并行度参数如下(将数据划分为多少块):

# 未指定分区数时RDD默认并行度
spark.default.parallelism    
# SparkSQL框架下,数据关联、聚合操作时Reducer 在shuffle reduce阶段默认的并行度
spark.sql.shuffle.partitions  

并发度=任务数=(spark.executor.cores)/(spark.task.cpus)

并发度基本由 spark.executor.cores 参数敲定,因为spark.task.cpus通常为1,且不能小于1,可以大于1(为了应对需要多个线程才能执行的任务)

Executor数=(spark.cores.max)/(spark.executor.cores)

以上是都是在standalone模式下的配置项,在yarn集群中可直接指定executor个数:

# 在yarn集群中指定executor的个数
spark.executor.instances 

例子:在kafka中的分区数就是spark拉取数据的并行度度,在拉取clickhouse数据时指定的numPartitions就是数据的并行度。

建议:将数据并行度设置为cpu核数的2-3倍,以充分利用cpu,否则可能会导致task倾斜的问题(一些executor十分繁忙另外一些executor却没有在执行)。将分区数(数据并行度)调高也会将数据的分片大小减小,使每个分片执行的更快,但太高的并行度会导致调度花费较多时间。

二、内存配置项

Spark 会区分堆内内存(On-heap Memory)和堆外内存(Off-heap Memory)

堆内内存的申请与释放统一由 JVM 管理,堆外内存是 Spark 通过调用 Unsafe 的 allocateMemory 和 freeMemory 方法直接在操作系统内存中申请、释放内存空间

慎用堆外内存,官方推荐只用堆内内存。堆内外空间互相隔离,堆内、堆外是以Job为粒度划分的,也就是说,同一个Job,要么全用堆外,要么全用堆内。堆外、堆内的内存空间,是不能在同一个Job之内共享的。

堆外内存相关设置:

spark.memory.offHeap.enabled 默认false
spark.memory.offHeap.size    默认为0

堆外内存仅供了解,只使用堆内内存即可

堆内内存相关设置:

# 每个executor的内存绝对值大小,默认1g
spark.executor.memory 
# 除用户内存外的计算和存储内存所占比例,默认0.6
spark.memory.fraction
# 计算和和存储内存中存储内存所占比例,默认0.5
spark.memory.storageFraction

  • Reserved Memory:固定为300MB,不受开发者控制,它是 Spark预留的、用来存储各种Spark内部对象的内存区域;

  • User Memory:用于存储开发者自定义的数据结构,例如RDD算子中引用的数组、列表、映射等等

  • Execution Memory:用来执行分布式任务。分布式任务的计算,主要包括数据的转换、过滤、映射、排序、聚并归并等环节,而计算环节的内存消耗,统统来自Executor Memory

  • Storage Memery:用于缓存分布式数据集,比如RDD Cache、广播变量等等。RDD Cache指的是RDD物化到内存中的副本。在果同一个RDD被引用多次,那么把这个RDD缓存到内大幅提升作业的执行性能。

举例:

内存 大小
spark.executor.memory 20GB
spark.memory.offHeap.size 10GB
spark.memory.fraction 0.8
spark.memory.storageFraction 0.6

堆内内存:

Reserved Memory大小:300M

User Memory大小:20 x(1-0.8)= 4GB

Storage Memeory大小:20 x 0.8 x 0.6 = 9.6 GB

Execution Memory大小:20 x 0.8 x (1-0.6)= 6.4GB

堆外内存:

Storage Memeory大小:10 x 0.6 = 6GB

Execution Memory大小:10 x 0.4 = 4GB

在spark1.6版本之后,内存静态划分转换为动态管理内存,即 Execution Memory和Storage Memery可相互抢占,抢占规则如下(Execution Memeory更重要原则):

  • 如果对方的内存空间有空闲,双方可以互相抢占
  • 对于Storage Memory抢占的 Execution Memory部分,当分布式任务有计算需要时Storage Memory必须立即归还抢占的内存,涉及的存数据要么落盘、要么清除
  • 对于 Execution Memory抢占的 Storage Memory部分,即便 Storage Memory有收回内存的需要,也必须要等到分布式任务执行完毕才能释放。

调优建议:

  • spark.memory.fraction可以尽可能调大,spark中用户内存用不了太多,主要使用计算和存储内存

  • ETL(Extract、Transform、Load)作业,业务抽取、转换、加载,数据只处理一次,不需要缓存,存储内存的比率适当降低;机器学习、图计算反复使用数据,计算内存比率适当增大

  • 数据分片的大小与executor中每个核分得的内存大小基本相同

三、磁盘配置项

在Spark运行过程中会产生日志和在shuffle过程中会产生中间文件,将这些文件存放在固态硬盘上会使Spark拥有更好的性能

配置在spark-env.sh中,服务级别的配置

# spark暂存空间目录,存放map输出文件和RDDs,支持","分隔的多个目录。shuffle输出的文件
SPARK_LOCAL_DIRS=
# spark的worker工作目录,暂存空间存放全部日志,默认SPARK_HOME/work
SPARK_WORKER_DIR=

配置在spark-defaults.conf或sparkConf中,任务级别的配置,会被SPARK_LOCAL_DIRS设置的目录覆盖

# spark暂存空间目录,用来改善Shuffle中间文件存储,以及RDD Cache磁盘存储
spark.local.dir 目录

四、cache

在Spark计算过程中善用cache会极大提高性能,对重复使用的数据建议添加cache,而对只使用一两次的数据不建议添加cache,否则不仅浪费内存空间而且会降低Spark运行效率

使用的建议:

  • 如果 RDD/DataFrame/Dataset 在应用中的引用次数为 1,就坚决不使用 Cache
  • 如果引用次数大于 1,且运行成本占比超过 30%,应当考虑启用 Cache

Cache table

# 创建临时视图再cache
df.createTempView("table_name")
spark.sql("cache tabel table_name")
CACHE [ LAZY ] TABLE table_identifier
    [ OPTIONS ( 'storageLevel' [ = ] value ) ] [ [ AS ] query ]
  • LAZY可选,加了之后不立刻缓存,当第一次使用的时候缓存。不加默认立刻缓存

  • OPTIONS存储级别,默认MEMORY_AND_DISK
    SER 字样的表示以序列化方式存储,不带 SER 则表示采用对象值,序列化存储(二进制存储)会节省存储空间,但是消耗计算资源

    最常用MEMORY_ONLY 和 MEMORY_AND_DISK,它们分别是 RDD 缓存和 DataFrame 缓存的默认存储级别。这两种存储级别都是先尝试把数据缓存到内存, MEMORY_AND_DISK在内存不足时将数据缓存到磁盘

  • query 将查询结果缓存,如将testData表中查到的结果缓存为testCache表

    CACHE TABLE testCache OPTIONS ('storageLevel' 'DISK_ONLY') SELECT * FROM testData;
    

Cache算子

df.cache.count

cache操作时惰性操作,只有action算子时才触发计算

只有 count 才会触发缓存的完全物化,而 first、take 和 show 这 3 个算子只会把涉及的数据物化,例如first只缓存1条数据,show只缓存20条数据

在此只介绍了Spark硬件方面的部分调优方法,此外还有SparkSQL、Shuffle、广播变量等方面的调优方法

本文参考极客时间中《零基础入门Spark》《Spark性能调优实战》