一、任务占用资源计算
executor占用CPU = executor_instances * executor_cores * 10 * 0.8(0.1核)
executor占用内存 = executor_instances * (executor.memory + max(executor.memoryOverhead, OffHeap.size) + executor.pyspark.memory)(GB)
其中,若参数未手动设置,会分配默认值。
也就是说,使用默认参数,每个executor就会分配4g + max(5g, 3.7g) + 6g = 15g的内存,对于一般任务已经足够使用。
-- driver
spark.driver.cores 1
spark.driver.memory 4g
-- executor
spark.executor.cores 2
spark.executor.memory 4g
spark.executor.memoryOverhead 5g
spark.executor.pyspark.memory 6g
-- Bytes,约为3.7G
spark.memory.offHeap.size 4000000000
二、用户需要关注的参数
如上,在使用默认配置时,每个executor就会分配15g内存,已经足够一般任务使用。
所以用户一般只需配置spark.executor.instances,spark.sql.shuffle.partitions,spark.default.parallelism即可。
如果配置后发现还是报OOM错误,可适当提高内存参数,重要参数含义见下方。
推荐配置:
spark.executor.instances 50
spark.sql.shuffle.partitions 300
spark.default.parallelism 300
三、重要参数含义
1、driver相关参数
driver实际申请内存大小计算公式:driver.memory + driver.memoryOverhead
-
spark.driver.memory
-
driver进程(JVM使用)的内存数,一般(memory/cores >= 2g)
-
通用配置:4g
-
df.collect()会返回所有数据的list,但是这个方法会将所有数据pull到driver,所以在遇到driver爆内存时,可以注意这一点。参数driver.memory调高。
-
参数调优建议:Driver的内存通常来说不设置,或者设置1G左右应该就够了。
-
-
spark.driver.cores
-
默认1,driver程序使用的CPU内核数,若无过多driver单机处理操作,一般不需要配置
-
通用配置:2
-
-
spark.driver.memoryOverhead
-
driver JVM堆外内存的大小,默认为max(384, 0.1 * spark.driver.memory)
-
此配置存在默认单位MB,因此直接配置数字或带具体单位,最少1g
-
通用配置:1g
-
2、executor相关参数
-
spark.executor.instances
-
设置spark作业executor的个数executor.instances * executor.cores为当前application内并行运行task数,需要根据spark.sql.shuffle.partitions判断,一般保证executor.instances * executor.cores <= partitions / 2
-
通用配置:10
-
参数调优建议:每个Spark作业的运行一般设置50~100个左右的Executor进程比较合适,设置太少或太多的Executor进程都不好。设置的太少,无法充分利用集群资源;设置的太多的话,大部分队列可能无法给予充分的资源。
-
-
spark.executor.memory
-
每个executor进程(JVM使用)的内存大小
-
默认配置:4g
-
参数调优建议:每个Executor进程的内存设置4G~8G较为合适。但是这只是一个参考值,具体的设置还是得根据不同部门的资源队列来定。可以看看自己团队的资源队列的最大内存限制是多少,num-executors乘以executor-memory,是不能超过队列的最大内存量的。此外,如果你是跟团队里其他人共享这个资源队列,那么申请的内存量最好不要超过资源队列最大总内存的1/4~1/3,避免你自己的Spark作业占用了队列所有的资源,导致别的同学的作业无法运行
-
-
spark.executor.cores
-
每个executor的core数目。每个core同一时间只能执行一个Task线程,cores的数目也就意味着每个executor并行task的数目。
-
每个task分配的内存大小是executor-memory/executor-cores,可以按照这个分析每个task所占用的内存大小,一般(memory/cores >= 2g)。
-
每个executor为1个进程,分配一个JVM,考虑到JVM加载task信息的数量,cores个数不要超过5,超出后会容易出现大量加载任务信息导致OOM的情况。
-
默认配置:2
-
-
spark.executor.memoryOverhead
-
executor JVM堆外内存大小,一般运行非JVM的逻辑
-
此部分内存主要用于JVM自身,字符串, NIO Buffer(Driect Buffer)等开销。此部分为用户代码及Spark 不可操作的内存,不足时可通过调整参数解决。
-
此配置存在默认单位MB,因此直接配置数字或带具体单位,最少1g
-
默认配置:5g
-
-
spark.executor.pyspark.memory
-
python的worker内存,仅在使用pyspark时生效
-
默认配置:6g
-
-
spark.shuffle.spill.numElementsForceSpillThreshold
-
默认256000000,即256M。
-
shuffle超过该数据会强行落盘此配置存在单位B,因此直接配置数字即可
-
通用配置:256000000
-
-
spark.sql.shuffle.partitions
-
对Spark SQL专用的设置
-
默认200,用于设置shuffle时partition的数目,只作用于SQL、DataSet的join/aggregations,无法对纯map操作生效。该参数代表了shuffle read task的并行度。
-
在用户shuffle OOM时,可考虑增大数目
-
通用配置:200
-
-
spark.default.parallelism
-
在处理RDD时才会起作用,对Spark SQL的无效
-
默认200,与上面作用相同,只作用于RDD的join/reduceByKey等,无法对纯map操作生效。
-
通用配置:200
-
通常来说,Spark默认设置的数量是偏少的(比如就几十个task),如果task数量偏少的话,就会导致你前面设置好的Executor的参数都前功尽弃。试想一下,无论你的Executor进程有多少个,内存和CPU有多大,但是task只有1个或者10个,那么90%的Executor进程可能根本就没有task执行,也就是白白浪费了资源!
-
Spark官网建议的设置原则是,设置该参数为num-executors * executor-cores的2~3倍较为合适,比如Executor的总CPU core数量为300个,那么设置1000个task是可以的,此时可以充分地利用Spark集群的资源。
-
-
spark.memory.fraction
-
默认0.75,用于存放缓存数据和运行数据,剩余0.25为User Memory,存放用户定义的数据结构和Spark元数据信息。
-
在用户persist大量数据或者shuffle聚合数据量比较大时可以考虑增加该值
-
缓存持久化(persist) + 运行(shuffle+执行编写的代码) = memory.fraction,默认为0.75,persist用memory.storageFraction参数指定,默认0.5
-
-
spark.memory.storageFraction(spark.memory.useLegacyMode(代表启用spark1.6前的版本)时,spark.storage.memoryFraction)
-
默认0.5, storage内存大小,用于存储缓存数据,剩余空间用于execute。
-
在Unified Memory Manage模式下,内存会自动调整,分配storage和execute使用,但是在storage内存不足时,会要回所有分配的内存。
-
在用户shuffle处理数据比较大时可减小该参数
-
-
spark.memory.offHeap.size
-
设置JVM堆外内存大小,可以运行executor JVM相关计算,默认为5000000000,即5G。
-
一般任务中只有部分shuffle需要大量操作,内存可能OOM时启用,在spark.memory.offHeap.enabled设为true时启用。
-
此配置存在单位B,因此直接配置数字即可
-
默认配置:4000000000
-