淘先锋技术网

首页 1 2 3 4 5 6 7

一、任务占用资源计算

executor占用CPU = executor_instances * executor_cores * 10 * 0.8(0.1核)

executor占用内存 = executor_instances * (executor.memory + max(executor.memoryOverhead, OffHeap.size) + executor.pyspark.memory)(GB)

其中,若参数未手动设置,会分配默认值。

也就是说,使用默认参数,每个executor就会分配4g + max(5g, 3.7g) + 6g = 15g的内存,对于一般任务已经足够使用

-- driver
spark.driver.cores 1
spark.driver.memory 4g
-- executor
spark.executor.cores 2
spark.executor.memory 4g
spark.executor.memoryOverhead 5g
spark.executor.pyspark.memory 6g
-- Bytes,约为3.7G
spark.memory.offHeap.size 4000000000 

二、用户需要关注的参数

如上,在使用默认配置时,每个executor就会分配15g内存,已经足够一般任务使用。

所以用户一般只需配置spark.executor.instances,spark.sql.shuffle.partitions,spark.default.parallelism即可。

如果配置后发现还是报OOM错误,可适当提高内存参数,重要参数含义见下方。

推荐配置:

spark.executor.instances 50
spark.sql.shuffle.partitions 300
spark.default.parallelism 300

三、重要参数含义

1、driver相关参数

driver实际申请内存大小计算公式:driver.memory + driver.memoryOverhead

  • spark.driver.memory

    • driver进程(JVM使用)的内存数,一般(memory/cores >= 2g)

    • 通用配置:4g

    • df.collect()会返回所有数据的list,但是这个方法会将所有数据pull到driver,所以在遇到driver爆内存时,可以注意这一点。参数driver.memory调高。

    • 参数调优建议:Driver的内存通常来说不设置,或者设置1G左右应该就够了。

  • spark.driver.cores

    • 默认1,driver程序使用的CPU内核数,若无过多driver单机处理操作,一般不需要配置

    • 通用配置:2

  • spark.driver.memoryOverhead

    • driver JVM堆外内存的大小,默认为max(384, 0.1 * spark.driver.memory)

    • 此配置存在默认单位MB,因此直接配置数字或带具体单位,最少1g

    • 通用配置:1g

2、executor相关参数

  • spark.executor.instances

    • 设置spark作业executor的个数executor.instances * executor.cores为当前application内并行运行task数,需要根据spark.sql.shuffle.partitions判断,一般保证executor.instances * executor.cores <= partitions / 2

    • 通用配置:10

    • 参数调优建议:每个Spark作业的运行一般设置50~100个左右的Executor进程比较合适,设置太少或太多的Executor进程都不好。设置的太少,无法充分利用集群资源;设置的太多的话,大部分队列可能无法给予充分的资源。

  • spark.executor.memory

    • 每个executor进程(JVM使用)的内存大小

    • 默认配置:4g

    • 参数调优建议:每个Executor进程的内存设置4G~8G较为合适。但是这只是一个参考值,具体的设置还是得根据不同部门的资源队列来定。可以看看自己团队的资源队列的最大内存限制是多少,num-executors乘以executor-memory,是不能超过队列的最大内存量的。此外,如果你是跟团队里其他人共享这个资源队列,那么申请的内存量最好不要超过资源队列最大总内存的1/4~1/3,避免你自己的Spark作业占用了队列所有的资源,导致别的同学的作业无法运行

  • spark.executor.cores

    • 每个executor的core数目。每个core同一时间只能执行一个Task线程,cores的数目也就意味着每个executor并行task的数目。

    • 每个task分配的内存大小是executor-memory/executor-cores,可以按照这个分析每个task所占用的内存大小,一般(memory/cores >= 2g)。

    • 每个executor为1个进程,分配一个JVM,考虑到JVM加载task信息的数量,cores个数不要超过5,超出后会容易出现大量加载任务信息导致OOM的情况。

    • 默认配置:2

  • spark.executor.memoryOverhead

    • executor JVM堆外内存大小,一般运行非JVM的逻辑

    • 此部分内存主要用于JVM自身,字符串, NIO Buffer(Driect Buffer)等开销。此部分为用户代码及Spark 不可操作的内存,不足时可通过调整参数解决。

    • 此配置存在默认单位MB,因此直接配置数字或带具体单位,最少1g

    • 默认配置:5g

  • spark.executor.pyspark.memory

    • python的worker内存,仅在使用pyspark时生效

    • 默认配置:6g

  • spark.shuffle.spill.numElementsForceSpillThreshold

    • 默认256000000,即256M。

    • shuffle超过该数据会强行落盘此配置存在单位B,因此直接配置数字即可

    • 通用配置:256000000

  • spark.sql.shuffle.partitions

    • 对Spark SQL专用的设置

    • 默认200,用于设置shuffle时partition的数目,只作用于SQL、DataSet的join/aggregations,无法对纯map操作生效。该参数代表了shuffle read task的并行度。

    • 在用户shuffle OOM时,可考虑增大数目

    • 通用配置:200

  • spark.default.parallelism

    • 在处理RDD时才会起作用,对Spark SQL的无效

    • 默认200,与上面作用相同,只作用于RDD的join/reduceByKey等,无法对纯map操作生效。

    • 通用配置:200

    • 通常来说,Spark默认设置的数量是偏少的(比如就几十个task),如果task数量偏少的话,就会导致你前面设置好的Executor的参数都前功尽弃。试想一下,无论你的Executor进程有多少个,内存和CPU有多大,但是task只有1个或者10个,那么90%的Executor进程可能根本就没有task执行,也就是白白浪费了资源!

    • Spark官网建议的设置原则是,设置该参数为num-executors * executor-cores的2~3倍较为合适,比如Executor的总CPU core数量为300个,那么设置1000个task是可以的,此时可以充分地利用Spark集群的资源。

  • spark.memory.fraction

    • 默认0.75,用于存放缓存数据和运行数据,剩余0.25为User Memory,存放用户定义的数据结构和Spark元数据信息。

    • 在用户persist大量数据或者shuffle聚合数据量比较大时可以考虑增加该值

    • 缓存持久化(persist) + 运行(shuffle+执行编写的代码) = memory.fraction,默认为0.75,persist用memory.storageFraction参数指定,默认0.5

  • spark.memory.storageFraction(spark.memory.useLegacyMode(代表启用spark1.6前的版本)时,spark.storage.memoryFraction)

    • 默认0.5, storage内存大小,用于存储缓存数据,剩余空间用于execute。

    • 在Unified Memory Manage模式下,内存会自动调整,分配storage和execute使用,但是在storage内存不足时,会要回所有分配的内存。

    • 在用户shuffle处理数据比较大时可减小该参数

  • spark.memory.offHeap.size

    • 设置JVM堆外内存大小,可以运行executor JVM相关计算,默认为5000000000,即5G。

    • 一般任务中只有部分shuffle需要大量操作,内存可能OOM时启用,在spark.memory.offHeap.enabled设为true时启用。

    • 此配置存在单位B,因此直接配置数字即可

    • 默认配置:4000000000