上一篇我们分析了执行内存池和存储内存池,它们是内存的抽象,负责记录内存的使用情况,不负责直接的内存管理。Spark 为存储内存和执行内存的管理提供了统一的接口MemoryManager,同一个 Executor 内的任务都调用这个接口的方法来申请或释放内存。Spark1.6前,采用的静态内存管理机制,存储内存、执行内存和其他内存的大小在 Spark 应用程序运行期间均为固定的,但用户可以应用程序启动前进行配置,在Spark 3.0中Spark移除了StaticMemoryManager,详见SPARK-26539,代码,统一使用统一内存管理作为默认MemoryManager,统一内存管理可以在存储内存和执行内存之间相互借用,使内存尽可能充分利用。
MemoryManager
MemoryManager是管理execution内存和storage内存的抽象类。其中execution内存主要被任务使用,用来进行计算、转换等操作,如shuffle,join,排序,聚合等。storage内存被BlockManager所使用,用来保存block数据。每一个JVM只有一个MemoryManager,MemoryManager提供了获取和释放execution内存和storage内存的方法。
主要变量
我们来看下MemoryManager的实现,包含了堆内/堆外的执行/存储内存四个变量来记录真实的内存情况。
- 堆内内存的大小是堆内存储内存大小[onHeapStorageMemory]和堆内执行内存大小[onHeapExecutionMemory]的和,这两个大小都是由具体的MemoryManager子类根据用户设定的executor-memory参数进行划分得到的,具体划分是由子类实现的;
- 堆外内存是否开启是由参数
spark.memory.offHeap.enabled
决定的,如果开启了堆外内存大小等于参数spark.memory.offHeap.size
的值,存储内存的大小等于参数spark.memory.storageFraction
和总内存大小的积,而执行内存则是总内存与存储内存的差,从源码中可以清楚地看出来。
private[spark] abstract class MemoryManager(
conf: SparkConf,
numCores: Int, // 核数
onHeapStorageMemory: Long, // 堆内存储内存大小
onHeapExecutionMemory: Long) extends Logging { // 堆内执行内存大小
/** 管理了on-heap & off-heap的存储内存池和执行内存池 */
@GuardedBy("this") // MemoryPool中的lock对象就是MemoryManager对象
protected val onHeapStorageMemoryPool = new StorageMemoryPool(this, MemoryMode.ON_HEAP)
@GuardedBy("this")
protected val offHeapStorageMemoryPool = new StorageMemoryPool(this, MemoryMode.OFF_HEAP)
@GuardedBy("this")
protected val onHeapExecutionMemoryPool = new ExecutionMemoryPool(this, MemoryMode.ON_HEAP)
@GuardedBy("this")
protected val offHeapExecutionMemoryPool = new ExecutionMemoryPool(this, MemoryMode.OFF_HEAP)
// 堆内内存大小分配
onHeapStorageMemoryPool.incrementPoolSize(onHeapStorageMemory)
onHeapExecutionMemoryPool.incrementPoolSize(onHeapExecutionMemory)
// 堆外内存大小分配
// spark.memory.offHeap.size控制堆外内存大小
protected[this] val maxOffHeapMemory = conf.get(MEMORY_OFFHEAP_SIZE)
// 堆外内存中存储内存是由spark.memory.storageFraction控制,默认为0.5
protected[this] val offHeapStorageMemory =
(maxOffHeapMemory * conf.get(MEMORY_STORAGE_FRACTION)).toLong
offHeapExecutionMemoryPool.incrementPoolSize(maxOffHeapMemory - offHeapStorageMemory)
offHeapStorageMemoryPool.incrementPoolSize(offHeapStorageMemory)
}
另外还包含了Tungsten的一些常量,这三个常量分别定义了tungsten的内存形式、内存页大小和内存分配器。
final val tungstenMemoryMode: MemoryMode = { // MemoryMode
if (conf.get(MEMORY_OFFHEAP_ENABLED)) {
require(conf.get(MEMORY_OFFHEAP_SIZE) > 0,
"spark.memory.offHeap.size must be > 0 when spark.memory.offHeap.enabled == true")
require(Platform.unaligned(),
"No support for unaligned Unsafe. Set spark.memory.offHeap.enabled to false.")
MemoryMode.OFF_HEAP
} else {
MemoryMode.ON_HEAP
}
}
/** Tungsten采用的Page的默认大小[单位为字节], 可通过spark.buffer.pageSize属性进行配置。
如果未指定spark.buffer.pageSize属性,则使用该方法进行计算。 */
val pageSizeBytes: Long = {
val minPageSize = 1L * 1024 * 1024 // 1MB
val maxPageSize = 64L * minPageSize // 64MB
// 获取CPU核数,如果指定了numCores就使用numCores,否则使用机器的CPU可用核数
val cores = if (numCores > 0) numCores else Runtime.getRuntime.availableProcessors()
// Because of rounding to next power of 2, we may have safetyFactor as 8 in worst case
val safetyFactor = 16
val maxTungstenMemory: Long = tungstenMemoryMode match {
case MemoryMode.ON_HEAP => onHeapExecutionMemoryPool.poolSize
case MemoryMode.OFF_HEAP => offHeapExecutionMemoryPool.poolSize
}
/** 计算页大小: 传入的参数是maxTungstenMemory/cores/safetyFactor,
* 最终得到的页大小是小于maxTungstenMemory/cores/safetyFactor的最大的2的次方值。*/
val size = ByteArrayMethods.nextPowerOf2(maxTungstenMemory / cores / safetyFactor)
// 页的大小需要在 1MB ~ 64MB 之间
val default = math.min(maxPageSize, math.max(minPageSize, size))
// 尝试从spark.buffer.pageSize参数获取,如果没有指定就使用上面计算的默认值
conf.get(BUFFER_PAGESIZE).getOrElse(default)
}
/** Tungsten采用的内存分配器(MemoryAllocator)
如果tungstenMemoryMode为MemoryMode.ON_HEAP,那么tungstenMemoryAllocator为堆内存分配器(HeapMemoryAllocator),
否则为使用sun.misc.Unsafe的API分配操作系统内存的分配器UnsafeMemoryAllocator。*/
private[memory] final val tungstenMemoryAllocator: MemoryAllocator = {
tungstenMemoryMode match {
case MemoryMode.ON_HEAP => MemoryAllocator.HEAP
case MemoryMode.OFF_HEAP => MemoryAllocator.UNSAFE
}
}
内存申请&释放
内存的申请都是未实现的,需要具体的子类进行实现,内存的释放都是委托给对应的MemoryPool来做了。
/** 为存储BlockId对应的Block,从堆内存或堆外内存获取所需大小的内存。 */
def acquireStorageMemory(blockId: BlockId, numBytes: Long, memoryMode: MemoryMode): Boolean
/**为展开BlockId对应的Block,从堆内存或堆外内存获取所需大小的内存。*/
def acquireUnrollMemory(blockId: BlockId, numBytes: Long, memoryMode: MemoryMode): Boolean
/** 为执行taskAttemptId对应的TaskAttempt,从堆内存或堆外内存获取所需大小(即numBytes)的内存。*/
private[memory]
def acquireExecutionMemory(numBytes: Long, taskAttemptId: Long, memoryMode: MemoryMode): Long
/** 从堆内存或堆外内存释放taskAttemptId对应的TaskAttempt所消费的指定大小(即numBytes)的执行内存。*/
private[memory]
def releaseExecutionMemory(
numBytes: Long,
taskAttemptId: Long,
memoryMode: MemoryMode): Unit = synchronized {
memoryMode match {
case MemoryMode.ON_HEAP => onHeapExecutionMemoryPool.releaseMemory(numBytes, taskAttemptId)
case MemoryMode.OFF_HEAP => offHeapExecutionMemoryPool.releaseMemory(numBytes, taskAttemptId)
}
}
/**从堆内存及堆外内存释放taskAttemptId代表的TaskAttempt所消费的所有执行内存。*/
private[memory] def releaseAllExecutionMemoryForTask(taskAttemptId: Long): Long = synchronized {
onHeapExecutionMemoryPool.releaseAllMemoryForTask(taskAttemptId) +
offHeapExecutionMemoryPool.releaseAllMemoryForTask(taskAttemptId)
}
/** 从堆内存或堆外内存释放指定大小的内存。*/
def releaseStorageMemory(numBytes: Long, memoryMode: MemoryMode): Unit = synchronized {
memoryMode match {
case MemoryMode.ON_HEAP => onHeapStorageMemoryPool.releaseMemory(numBytes)
case MemoryMode.OFF_HEAP => offHeapStorageMemoryPool.releaseMemory(numBytes)
}
}
/**从堆内存及堆外内存释放所有内存。 */
final def releaseAllStorageMemory(): Unit = synchronized {
onHeapStorageMemoryPool.releaseAllMemory()
offHeapStorageMemoryPool.releaseAllMemory()
}
/** 释放指定大小的展开内存。 */
final def releaseUnrollMemory(numBytes: Long, memoryMode: MemoryMode): Unit = synchronized {
releaseStorageMemory(numBytes, memoryMode)
}
内存使用查看
MemoryManager还提供了查看堆内堆外执行内存,存储内存的内存使用情况,以及单个Task的内存占用情况,都比较简单,可以参考源码:
/** 获取堆上执行内存池与堆外执行内存池已经使用的执行内存之和 */
final def executionMemoryUsed: Long = synchronized {
onHeapExecutionMemoryPool.memoryUsed + offHeapExecutionMemoryPool.memoryUsed
}
/** onHeapStorageMemoryPool与offHeapStorageMemoryPool中一共占用的存储内存。 */
final def storageMemoryUsed: Long = synchronized {
onHeapStorageMemoryPool.memoryUsed + offHeapStorageMemoryPool.memoryUsed
}
/** onHeapExecutionMemoryPool中一共占用执行内存 */
final def onHeapExecutionMemoryUsed: Long = synchronized {
onHeapExecutionMemoryPool.memoryUsed
}
/** offHeapExecutionMemoryPool中一共占用执行内存 */
final def offHeapExecutionMemoryUsed: Long = synchronized {
offHeapExecutionMemoryPool.memoryUsed
}
/** onHeapStorageMemoryPool中一共占用执行内存 */
final def onHeapStorageMemoryUsed: Long = synchronized {
onHeapStorageMemoryPool.memoryUsed
}
/** offHeapStorageMemoryPool中一共占用执行内存 */
final def offHeapStorageMemoryUsed: Long = synchronized {
offHeapStorageMemoryPool.memoryUsed
}
/** 获取某个任务占用的内存 */
private[memory] def getExecutionMemoryUsageForTask(taskAttemptId: Long): Long = synchronized {
onHeapExecutionMemoryPool.getMemoryUsageForTask(taskAttemptId) +
offHeapExecutionMemoryPool.getMemoryUsageForTask(taskAttemptId)
}
StaticMemoryManager
虽然StaticMemoryManager已经在spark 3.0中移除了,我们还是看下它的思路,这样才知道后面默认使用的统一内存管理的优点,静态内存管理是由参数spark.memory.useLegacyMode
控制的,如果开启则使用静态内存管理,执行内存池和存储内存池之间有严格的界限,两个池的大小永不改变。整体内存结构如下图所示:
从上图可以看出来,整体内存分为存储内存(默认占比0.6,有参数spark.shuffle.memoryFraction
控制)、执行内存(默认占比0.2)以及其他内存(默认占比0.2,用来存储用户代码中自定义的数据结构,以及Spark内部的一些元数据)三个部分,最小的内存不能低于32M,下面我们来通过源码看下各部分的内存具体分配。
- 获取最大的执行内存,来进行执行内存池的扩充,由于执行内存使用过程中是用估算的方式进行内存判断,所以为了降低OOM的风险,执行内存中有一个最大安全系数[参数
spark.shuffle.safetyFraction
默认为0.8],用来防止执行内存溢出,所以最大的执行内存值是可用的最大内存 * 0.2 * 0.8
为什么除了实际占比之外,还会有一个安全比例呢?我们已经知道,Spark中的对象可以序列化存储,也可以非序列化存储。对于序列化对象,可以通过其字节流的长度获知其大小。而对于非序列化对象,其占用的内存就只能通过估算得到,与实际情况可能出入较大。另外,MemoryManager申请的内存空间可能还未实际分配,而标记为释放的内存空间也可能并未被JVM实际GC掉,存在滞后性。总之,Spark并不能准确地跟踪堆内内存的占用量,为了防止偏差过大出现OOM,就必须预留一些缓冲空间了。默认会预留10%的存储内存、20%的执行内存作为缓冲。
private def getMaxExecutionMemory(conf: SparkConf): Long = {
// 系统可用的最大内存,通过spark.testing.memory配置,未配置的话则取运行时环境的最大内存<这个是JVM启动时候指定的>
val systemMaxMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)
// 判断可用最大内存是否小于32MB,如果小于32MB则抛出异常
if (systemMaxMemory < MIN_MEMORY_BYTES) {
throw new IllegalArgumentException(s"System memory $systemMaxMemory must " +
s"be at least $MIN_MEMORY_BYTES. Please increase heap size using the --driver-memory " +
s"option or spark.driver.memory in Spark configuration.")
}
// 判断是否有spark.executor.memory配置
if (conf.contains("spark.executor.memory")) {
// 获取spark.executor.memory配置,即Execution使用的内存大小
val executorMemory = conf.getSizeAsBytes("spark.executor.memory")
// 判断Execution使用的内存大小是否小于32MB,如果小于32MB则抛出异常
if (executorMemory < MIN_MEMORY_BYTES) {
throw new IllegalArgumentException(s"Executor memory $executorMemory must be at least " +
s"$MIN_MEMORY_BYTES. Please increase executor memory using the " +
s"--executor-memory option or spark.executor.memory in Spark configuration.")
}
}
// 执行内存占比,默认为0.2,即存储内存占内存池总大小的20%
val memoryFraction = conf.getDouble("spark.shuffle.memoryFraction", 0.2)
// 执行内存的最大安全系数,默认为0.8,该值用于防止执行内存溢出
val safetyFraction = conf.getDouble("spark.shuffle.safetyFraction", 0.8)
// 因此,执行内存的最大大小为 可用的最大内存 * 0.2 * 0.8,即可用最大内存的16%
(systemMaxMemory * memoryFraction * safetyFraction).toLong
}
-
获取最大的存储内存,来进行存储内存池的扩充,同样为了降低OOM的风险,存储内存也有一个安全系数[
spark.storage.safetyFraction
,默认为0.9],所以得到的最大内存是可用最大内存 * 0.6 * 0.9private def getMaxStorageMemory(conf: SparkConf): Long = { // 系统可用的最大内存,通过spark.testing.memory配置,未配置的话则取运行时环境的最大内存 val systemMaxMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory) // 存储内存占比,默认为0.6,即存储内存占内存池总大小的60% val memoryFraction = conf.getDouble("spark.storage.memoryFraction", 0.6) // 存储内存的最大安全系数,默认为0.9,该值用于防止存储内存溢出 val safetyFraction = conf.getDouble("spark.storage.safetyFraction", 0.9) // 因此,存储内存的最大大小为 可用的最大内存 * 0.6 * 0.9,即可用最大内存的54% (systemMaxMemory * memoryFraction * safetyFraction).toLong }
-
填充内存池,通过MemoryManager进行扩充内存池大小;另外不支持堆外存储内存,所以要把原本属于堆外存储池的空间转移到堆外执行池,代码如下所示:
// The StaticMemoryManager does not support off-heap storage memory: offHeapExecutionMemoryPool.incrementPoolSize(offHeapStorageMemoryPool.poolSize) offHeapStorageMemoryPool.decrementPoolSize(offHeapStorageMemoryPool.poolSize)
-
另外还提供了一个maxUnrollMemory,是通过
spark.storage.unrollFraction
来控制的,默认是0.2
申请内存
StaticMemoryManager重写了三个申请内存的方法,我们分别来分析具体实现。
acquireStorageMemory
由于静态内存管理器不支持堆外存储内存,所以获取存储内存要先验证内存模型,如果申请内存大小过大,则返回错误,不然交给存储内存池进行申请内存即可。
override def acquireStorageMemory(
blockId: BlockId,
numBytes: Long,
memoryMode: MemoryMode): Boolean = synchronized {
// StaticMemoryManager不支持将堆外内存用于存储,需要检查内存模式
require(memoryMode != MemoryMode.OFF_HEAP,
"StaticMemoryManager does not support off-heap storage memory")
if (numBytes > maxOnHeapStorageMemory) { // 申请的内存大小大于最大的堆内存储内存大小
logInfo(s"Will not store $blockId as the required space ($numBytes bytes) exceeds our " +
s"memory limit ($maxOnHeapStorageMemory bytes)")
// 直接返回false
false
} else { // 交给onHeapStorageMemoryPool内存池处理
onHeapStorageMemoryPool.acquireMemory(blockId, numBytes)
}
}
acquireUnrollMemory
由于将RDD展开为块需要占用连续的存储空间,在必要的情况下需要释放其他缓存的空间,以放下这个块。释放空间的上限为“最大展开内存 - 现占用的展开内存 - 空闲存储内存”,之所以要规定这个上限,是为了防止展开一个超大的块导致所有缓存都失效。
override def acquireUnrollMemory(
blockId: BlockId,
numBytes: Long,
memoryMode: MemoryMode): Boolean = synchronized {
// StaticMemoryManager不支持将堆外内存用于存储,需要检查内存模式
require(memoryMode != MemoryMode.OFF_HEAP,
"StaticMemoryManager does not support off-heap unroll memory")
// 获取当前可用于Unroll的存储内存大小
val currentUnrollMemory = onHeapStorageMemoryPool.memoryStore.currentUnrollMemory
// 当前空闲的存储内存大小
val freeMemory = onHeapStorageMemoryPool.memoryFree
// 不足的内存大小 = 最大可用于Unroll的存储内存大小 - 当前可用于Unroll的存储内存大小 - 当前空闲的存储内存大小
val maxNumBytesToFree = math.max(0, maxUnrollMemory - currentUnrollMemory - freeMemory)
// 计算需要释放的内存大小
val numBytesToFree = math.max(0, math.min(maxNumBytesToFree, numBytes - freeMemory))
/** 委托给onHeapStorageMemoryPool进行处理,注意第3个参数 acquireMemory()方法内会尝试腾出额外的空间以满足不足的内存*/
onHeapStorageMemoryPool.acquireMemory(blockId, numBytes, numBytesToFree)
}
acquireExecutionMemory
获取执行内存,只需要根据内存Mode交给相应的内存池进行处理即可。
private[memory] override def acquireExecutionMemory(
numBytes: Long,
taskAttemptId: Long,
memoryMode: MemoryMode): Long = synchronized {
memoryMode match {// 委托给具体的内存池处理
case MemoryMode.ON_HEAP => onHeapExecutionMemoryPool.acquireMemory(numBytes, taskAttemptId)
case MemoryMode.OFF_HEAP => offHeapExecutionMemoryPool.acquireMemory(numBytes, taskAttemptId)
}
}
总结
从上面分析,可以看出StaticMemoryManager的内存区域都是由各个比例参数规定好的。这样实现起来简单,但是在复杂业务场景或者参数设定不当时,它容易造成一方内存过剩而另一方内存紧张,需要很强的调参能力才能应对复杂的程序处理,所以spark 1.6以后都默认为统一内存管理器,我们下节再说。