目录
3.2、Spark1.6.X及之后-Spark Unified Memory
1、Heap
由于Spark中的RDD实际上是Java对象,所以它被存放在JVM中的堆中,因为堆中存放的是对象与数组实例,所以垃圾回收主要在堆中进行(也有可能在方法区中)
堆内存可以说是分为3部分(方法区其实是堆外空间):
Permanent Space(永生区):即方法区,是一块独立于java堆的内存空间,有时也叫non-heap(并不是堆空间内的部分)。一个常住内存区域,用于存放JDK自身所携带的Class,Interface的元数据,即存储的是运行环境必须的类信息,被装载到此区域的数据是不会被垃圾回收掉的,直至关闭JVM才会释放
New Generation(新生代):程序中新建的对象都会分配到新生代中,新生代又由Eden Space 和两块Survivor Space构成,可通过-Xmn参数来指定其大小,Eden 和两块Survivor Space的大小比例默认是8:1:1,这个比例可通过-XX:SurvivorRatio来指定
Old Generation(老年代):用于存放程序中经过几次垃圾回收还存活的对象,例如缓存的对象,老年代所占的内存大小即为-Xmx大小减去-Xmn大小
2、Minor GC & Full GC
2.1、垃圾回收流程
1)所有新生成的对象首先都是放在新生代中,新生代的目标就是尽可能快速的收集掉那些生命周期短的对象。大部分对象在Eden区生成,当Eden区满时会触发minor GC(频率较高,不一定满了才触发)
2)还存活的对象将被复制到Survivor区中(两中的一个),当这个Survivor区满的时候,此区存放的对象会被复制到另一个Survivor区中。
3)当另一个Survivor也满的时候,从第一个Survivor复制过来的还存活的对象将被复制到老年代中。Survivor的两个区是相同的,没有先后关系,所以同一个Survivor取中可能存在从Eden复制过来的对象和从另一个Survivor复制过来的对象,而且Survivor总有一个是空的,而且可以配置多于两个。
4)老年代则是存放那些在程序中经历了好几次MinorGC仍然还活着的或者特别大的对象,当老年代满了则会发生FullGC
2.2、Spark中JVM优化原因
Spark task执行算子函数时会生成大量对象,这些对象会被放入年轻代中。年轻代内存比较小时,会导致年轻代中Eden区和Survivor区频繁内存溢满,导致频繁的minor GC。而频繁的minorGC或导致一些存活的短声明周期(其实就是在后面用不到的对象)对象直接放入老年代中,而当老年代内存溢满是则会导致Full GC
无论是Full GC还是Minor GC,都会导致JVM的工作线程停止工作。也就是说,GC的时候,spark就停止工作了,等着垃圾回收结束。
说到底还是堆中内存分配不当的原因,所以很有必要了解一下Spark中JVM的内存管理
3、Spark内存管理
Spark基于内存的观点其实有一些不当,准确来说Spark只是优先充分利用内存而已
在数据规模、硬件条件已经确定的情况下,有多少 Executor 以及每个 Executor 可分配多少内存,我们需要知道內存最多能够缓存多少数据;在 Shuffle 的过程中又使用了多少比例的缓存,这样对于算法的编写以及业务实现是至关重要。
在Spark1.5版本及之前,Spark采用的是静态内存管理模型。在1.6版本推出后,Spark曹勇了同一(unified)内存管理模型
3.1、Spark 1.5.X及之前-静态内存管理
在 Spark 最初采用的静态内存管理机制下,存储内存、执行内存和其他内存的大小在 Spark 应用程序运行期间均为固定的,但用户可以应用程序启动前进行配置,堆内内存的分配如图
- Storage区域(60% of heap)
Storage:Storage区域的90%,存储的是经broadcast,cache,persist数据的地方。
Unroll:Storage的20%,比如通过persist的数据会进过序列化再持久化到内存或磁盘中,当读出来的时候就需要反序列化存储在这里
预留部分:Storage区域的10%,用来防止OMM。
- Execution区域(20% of heap),也成Shuffle区域
Execution:Execution区域的80%,执行内存,像join、aggregate等这些shuffle操作包括map也会先缓存在这里,若满了才会写入磁盘
- other区域(20% of heap):程序执行时的预留空间,向task的执行和task执行时产生的对象会保存在这里
参数总结
spark.executor.memory //JVM Heap,默认512MB
spark.storage.memoryFraction //Storage区域(包括预留),默认60%
spark.storage.safetyFraction //Storage,默认90%
spark.storage.unrollFraction //Unroll,默认20%
//Storage区域的预留,10%
spark.shuffle.memoryFraction //Execution区域(包括预留),默认20%
spark.shuffle.safteyFraction //Execution,80%
//Execution区域的预留,20%
//heap的预留,20%
3.2、Spark1.6.X及之后-Spark Unified Memory
联合内存 (Spark Unified Memeory),数据缓存(Storage)与数据执行(Execution)之间的内存可以相互移动,这是一种更弹性的方式。
在2.1.X之后对有些参数作出了调整
注意:其中的75%表示heap-保留区域的75%
- Reserved Memory
默认都是300MB,这个数字一般都是固定不变的,在系统运行的时候 Java Heap 的大小至少为 Heap Reserved Memory x 1.5,即300MB x 1.5 = 450MB的JVM配置。
- Spark Memory(1.6:60%;2.1:75%)
系统框架运行时需要使用的空间,这是从两部份构成的,分别是 Storage Memeory 和 Execution Memory。现在 Storage(与1.5.X中Storage相同) 和 Execution (相当于1.5.X中的Execution) 采用了 Unified 的方式共同使用了Spark Memory。默认情况下 Storage 和 Execution 各占该空间的 50%。Storgae 和 Execution 的存储空间可以往上和往下移动。
1)Storage Memory:相当于旧版本的 Storage 空间,负责存储 Persist、Unroll 以及 Broadcast 的数据
2)Execution Memory:相当于旧版本的 Execution 空间,负责存储 ShuffleMapTask 的数据,比如从上一个 Stage 抓取数据和一些聚合的操作等
Unified 意为 Storgae 和 Execution 在适当时候可以借用彼此的 Memory,需要注意的是当 Execution 空间不足而且 Storage 空间也不足的情况下,Storage 空间如果曾经使用了超过 Unified 默认的 50% 空间的话则超过部份会被强制GC掉一部份数据来解决 Execution 空间不足的问题 (注意:drop 后数据会不会丢失主要是看你在程序设置的 storage_level 来决定你是 Drop 到那里,可能 Drop 到磁盘上),这是因为执行(Execution) 比缓存 (Storage) 是更重要的事情。
- User Memory(1.6:40%;2.1:25%)
用户操作空间,Spark 程序中产生的临时数据或者是自己维护的一些数据结构也需要给予它一部份的存储空间,这样设计可以让用户操作时所需要的空间与系统框架运行时所需要的空间分离开。
假设 Executor 有 4G 的大小,那么在默认情况下 User Memory 大小是:(4G - 300MB) x 25% = 949MB,也就是说一个 Stage 内部展开后 Task 的算子在运行时最大的大小不能够超过 949MB。例如工程师使用 mapPartition 等,一个 Task 内部所有算子使用的数据空间的大小如果大于 949MB 的话,那么就会出现 OOM。
4、Spark(2.1.X)的JVM调优
在 Yarn 上启动 Spark Application 的时候可以通过以下参数来调优:
- -num-executor 或者 spark.executor.instances 来指定运行时所需要的 Executor 的个数;
- -executor-memory 或者 spark.executor.memory 来指定每个 Executor 在运行时所需要的内存空间;
- -executor-cores 或者是 spark.executor.cores 来指定每个 Executor 在运行时所需要的 Cores 的个数;
- -driver-memory 或者是 spark.driver.memory 来指定 Driver 内存的大小;
- spark.task.cpus 来指定每个 Task 运行时所需要的 Cores 的个数;
1)减小Storage Memory(增大Execution Memory)
默认情况下,给RDD的cache操作的内存占比是0.6,即60%内存用来给RDD做缓存用。但其实RDD并不需要这么大的内存,我们可以通过查看每个stage中每个task运行的时间,GC时间等来判断是否发生了频繁的minorGC和fullGC,从而来调低spark.storage.memoryFractio的比例
2)增大预留内存
当Spark处理超大数据量时,executor的预留内存可能会不够用,可能出现shuffle file can’t find、task lost、OOM等情况。默认情况下,这个预留内存为300M。当运行超大数据量时可以调节到1G、2G、4G等大小 。调节方法必须在spark-submit提交脚本中设置而不能在程序中设置
--conf spark.yarn.executor.memoryOverhead=2048
3)GC引起的连接等待时长
Spark在处理超大数据量时,task可能会创建很大很多的对象,频繁的让JVM内存溢满,从而导致频繁GC。executor优先的从本地关联的blockmanager获取数据。如果没有的话,会通过transferService去远程连接其他executor的blockmanager,如果正好碰到那个executor正在垃圾回收,那么程序就会卡住。spark默认网络连接时长是60s,当超过60s没有获取到数据,则直接宣告任务失败。也有可能DAGscheduler反复提交几次stage,TaskScheduler反复提交task会大大影响spark运行速度,所以可以考虑适当调节等待时长。 必须在提交spark程序的脚本中设置
//增大网络连接超时时间,等待GC完成,尽量少让TaskScheduler反复提交task
--conf spark.core.connection.ack.wait.timeout=300
4)JVM的GC导致拉取文件失败
还是上面的情景
spark.shuffle.io.maxRetries 3
spark.shuffle.io.retryWait 5s
第一个参数的意思是shuffle文件拉取的时候,如果没有拉取到或拉取失败,最多会重试几次,第二个参数的意思是每一次重新拉取文件的时间间隔是5s
所以说,当上一个stage的executor正在发生漫长的FullGC,导致第二个stage的executor尝试去拉取文件,而拉取不到,默认情况下,会反复重试3次,每次间隔是5秒钟,最多只会等待3*5=15秒,如果15秒没有拉取到数据,就会报出shuffle file not found... 错误
所以当程序频繁报该错误时,需要调节大这两个参数
推荐一篇博客:【总结】Spark任务的core,executor,memory资源配置方法
参考资料