淘先锋技术网

首页 1 2 3 4 5 6 7

上一篇我们分析了执行内存池和存储内存池,它们是内存的抽象,负责记录内存的使用情况,不负责直接的内存管理。Spark 为存储内存和执行内存的管理提供了统一的接口MemoryManager,同一个 Executor 内的任务都调用这个接口的方法来申请或释放内存。Spark1.6前,采用的静态内存管理机制,存储内存、执行内存和其他内存的大小在 Spark 应用程序运行期间均为固定的,但用户可以应用程序启动前进行配置,在Spark 3.0中Spark移除了StaticMemoryManager,详见SPARK-26539代码,统一使用统一内存管理作为默认MemoryManager,统一内存管理可以在存储内存和执行内存之间相互借用,使内存尽可能充分利用。

MemoryManager

MemoryManager是管理execution内存和storage内存的抽象类。其中execution内存主要被任务使用,用来进行计算、转换等操作,如shuffle,join,排序,聚合等。storage内存被BlockManager所使用,用来保存block数据。每一个JVM只有一个MemoryManager,MemoryManager提供了获取和释放execution内存和storage内存的方法。

主要变量

我们来看下MemoryManager的实现,包含了堆内/堆外的执行/存储内存四个变量来记录真实的内存情况。

  1. 堆内内存的大小是堆内存储内存大小[onHeapStorageMemory]和堆内执行内存大小[onHeapExecutionMemory]的和,这两个大小都是由具体的MemoryManager子类根据用户设定的executor-memory参数进行划分得到的,具体划分是由子类实现的;
  2. 堆外内存是否开启是由参数spark.memory.offHeap.enabled决定的,如果开启了堆外内存大小等于参数spark.memory.offHeap.size的值,存储内存的大小等于参数spark.memory.storageFraction和总内存大小的积,而执行内存则是总内存与存储内存的差,从源码中可以清楚地看出来。
private[spark] abstract class MemoryManager(
    conf: SparkConf,
    numCores: Int, // 核数
    onHeapStorageMemory: Long, // 堆内存储内存大小
    onHeapExecutionMemory: Long) extends Logging { // 堆内执行内存大小

  /** 管理了on-heap & off-heap的存储内存池和执行内存池 */
  @GuardedBy("this") // MemoryPool中的lock对象就是MemoryManager对象
  protected val onHeapStorageMemoryPool = new StorageMemoryPool(this, MemoryMode.ON_HEAP)
  @GuardedBy("this")
  protected val offHeapStorageMemoryPool = new StorageMemoryPool(this, MemoryMode.OFF_HEAP)
  @GuardedBy("this")
  protected val onHeapExecutionMemoryPool = new ExecutionMemoryPool(this, MemoryMode.ON_HEAP)
  @GuardedBy("this")
  protected val offHeapExecutionMemoryPool = new ExecutionMemoryPool(this, MemoryMode.OFF_HEAP)
	
  // 堆内内存大小分配
  onHeapStorageMemoryPool.incrementPoolSize(onHeapStorageMemory)
  onHeapExecutionMemoryPool.incrementPoolSize(onHeapExecutionMemory)

  // 堆外内存大小分配
  // spark.memory.offHeap.size控制堆外内存大小
  protected[this] val maxOffHeapMemory = conf.get(MEMORY_OFFHEAP_SIZE)
  // 堆外内存中存储内存是由spark.memory.storageFraction控制,默认为0.5
  protected[this] val offHeapStorageMemory =
  (maxOffHeapMemory * conf.get(MEMORY_STORAGE_FRACTION)).toLong
  offHeapExecutionMemoryPool.incrementPoolSize(maxOffHeapMemory - offHeapStorageMemory)
  offHeapStorageMemoryPool.incrementPoolSize(offHeapStorageMemory)
}

另外还包含了Tungsten的一些常量,这三个常量分别定义了tungsten的内存形式、内存页大小和内存分配器。

final val tungstenMemoryMode: MemoryMode = { // MemoryMode
  if (conf.get(MEMORY_OFFHEAP_ENABLED)) {
    require(conf.get(MEMORY_OFFHEAP_SIZE) > 0,
            "spark.memory.offHeap.size must be > 0 when spark.memory.offHeap.enabled == true")
    require(Platform.unaligned(),
            "No support for unaligned Unsafe. Set spark.memory.offHeap.enabled to false.")
    MemoryMode.OFF_HEAP
  } else {
    MemoryMode.ON_HEAP
  }
}

/** Tungsten采用的Page的默认大小[单位为字节], 可通过spark.buffer.pageSize属性进行配置。
	如果未指定spark.buffer.pageSize属性,则使用该方法进行计算。 */
val pageSizeBytes: Long = {
  val minPageSize = 1L * 1024 * 1024   // 1MB
  val maxPageSize = 64L * minPageSize  // 64MB
  // 获取CPU核数,如果指定了numCores就使用numCores,否则使用机器的CPU可用核数
  val cores = if (numCores > 0) numCores else Runtime.getRuntime.availableProcessors()
  // Because of rounding to next power of 2, we may have safetyFactor as 8 in worst case
  val safetyFactor = 16
  val maxTungstenMemory: Long = tungstenMemoryMode match {
    case MemoryMode.ON_HEAP => onHeapExecutionMemoryPool.poolSize
    case MemoryMode.OFF_HEAP => offHeapExecutionMemoryPool.poolSize
  }
  /** 计算页大小: 传入的参数是maxTungstenMemory/cores/safetyFactor,
     * 最终得到的页大小是小于maxTungstenMemory/cores/safetyFactor的最大的2的次方值。*/
  val size = ByteArrayMethods.nextPowerOf2(maxTungstenMemory / cores / safetyFactor)
  // 页的大小需要在 1MB ~ 64MB 之间
  val default = math.min(maxPageSize, math.max(minPageSize, size))
  // 尝试从spark.buffer.pageSize参数获取,如果没有指定就使用上面计算的默认值
  conf.get(BUFFER_PAGESIZE).getOrElse(default)
}

/** Tungsten采用的内存分配器(MemoryAllocator)
如果tungstenMemoryMode为MemoryMode.ON_HEAP,那么tungstenMemoryAllocator为堆内存分配器(HeapMemoryAllocator),
否则为使用sun.misc.Unsafe的API分配操作系统内存的分配器UnsafeMemoryAllocator。*/
private[memory] final val tungstenMemoryAllocator: MemoryAllocator = {
  tungstenMemoryMode match {
    case MemoryMode.ON_HEAP => MemoryAllocator.HEAP
    case MemoryMode.OFF_HEAP => MemoryAllocator.UNSAFE
  }
}

内存申请&释放

内存的申请都是未实现的,需要具体的子类进行实现,内存的释放都是委托给对应的MemoryPool来做了。

/** 为存储BlockId对应的Block,从堆内存或堆外内存获取所需大小的内存。 */
def acquireStorageMemory(blockId: BlockId, numBytes: Long, memoryMode: MemoryMode): Boolean

/**为展开BlockId对应的Block,从堆内存或堆外内存获取所需大小的内存。*/
def acquireUnrollMemory(blockId: BlockId, numBytes: Long, memoryMode: MemoryMode): Boolean

/** 为执行taskAttemptId对应的TaskAttempt,从堆内存或堆外内存获取所需大小(即numBytes)的内存。*/
private[memory]
def acquireExecutionMemory(numBytes: Long, taskAttemptId: Long, memoryMode: MemoryMode): Long

/** 从堆内存或堆外内存释放taskAttemptId对应的TaskAttempt所消费的指定大小(即numBytes)的执行内存。*/
private[memory]
def releaseExecutionMemory(
  numBytes: Long,
  taskAttemptId: Long,
  memoryMode: MemoryMode): Unit = synchronized {
  memoryMode match {
    case MemoryMode.ON_HEAP => onHeapExecutionMemoryPool.releaseMemory(numBytes, taskAttemptId)
    case MemoryMode.OFF_HEAP => offHeapExecutionMemoryPool.releaseMemory(numBytes, taskAttemptId)
  }
}

/**从堆内存及堆外内存释放taskAttemptId代表的TaskAttempt所消费的所有执行内存。*/
private[memory] def releaseAllExecutionMemoryForTask(taskAttemptId: Long): Long = synchronized {
  onHeapExecutionMemoryPool.releaseAllMemoryForTask(taskAttemptId) +
  offHeapExecutionMemoryPool.releaseAllMemoryForTask(taskAttemptId)
}

/**  从堆内存或堆外内存释放指定大小的内存。*/
def releaseStorageMemory(numBytes: Long, memoryMode: MemoryMode): Unit = synchronized {
  memoryMode match {
    case MemoryMode.ON_HEAP => onHeapStorageMemoryPool.releaseMemory(numBytes)
    case MemoryMode.OFF_HEAP => offHeapStorageMemoryPool.releaseMemory(numBytes)
  }
}

/**从堆内存及堆外内存释放所有内存。 */
final def releaseAllStorageMemory(): Unit = synchronized {
  onHeapStorageMemoryPool.releaseAllMemory()
  offHeapStorageMemoryPool.releaseAllMemory()
}

/** 释放指定大小的展开内存。 */
final def releaseUnrollMemory(numBytes: Long, memoryMode: MemoryMode): Unit = synchronized {
  releaseStorageMemory(numBytes, memoryMode)
}

内存使用查看

MemoryManager还提供了查看堆内堆外执行内存,存储内存的内存使用情况,以及单个Task的内存占用情况,都比较简单,可以参考源码:

/**  获取堆上执行内存池与堆外执行内存池已经使用的执行内存之和 */
final def executionMemoryUsed: Long = synchronized {
  onHeapExecutionMemoryPool.memoryUsed + offHeapExecutionMemoryPool.memoryUsed
}

/**  onHeapStorageMemoryPool与offHeapStorageMemoryPool中一共占用的存储内存。  */
final def storageMemoryUsed: Long = synchronized {
  onHeapStorageMemoryPool.memoryUsed + offHeapStorageMemoryPool.memoryUsed
}

/**  onHeapExecutionMemoryPool中一共占用执行内存  */
final def onHeapExecutionMemoryUsed: Long = synchronized {
  onHeapExecutionMemoryPool.memoryUsed
}

/**  offHeapExecutionMemoryPool中一共占用执行内存 */
final def offHeapExecutionMemoryUsed: Long = synchronized {
  offHeapExecutionMemoryPool.memoryUsed
}

/**  onHeapStorageMemoryPool中一共占用执行内存 */
final def onHeapStorageMemoryUsed: Long = synchronized {
  onHeapStorageMemoryPool.memoryUsed
}

/**  offHeapStorageMemoryPool中一共占用执行内存 */
final def offHeapStorageMemoryUsed: Long = synchronized {
  offHeapStorageMemoryPool.memoryUsed
}

/**  获取某个任务占用的内存 */
private[memory] def getExecutionMemoryUsageForTask(taskAttemptId: Long): Long = synchronized {
  onHeapExecutionMemoryPool.getMemoryUsageForTask(taskAttemptId) +
  offHeapExecutionMemoryPool.getMemoryUsageForTask(taskAttemptId)
}

StaticMemoryManager

虽然StaticMemoryManager已经在spark 3.0中移除了,我们还是看下它的思路,这样才知道后面默认使用的统一内存管理的优点,静态内存管理是由参数spark.memory.useLegacyMode控制的,如果开启则使用静态内存管理,执行内存池和存储内存池之间有严格的界限,两个池的大小永不改变。整体内存结构如下图所示:

在这里插入图片描述

从上图可以看出来,整体内存分为存储内存(默认占比0.6,有参数spark.shuffle.memoryFraction控制)、执行内存(默认占比0.2)以及其他内存(默认占比0.2,用来存储用户代码中自定义的数据结构,以及Spark内部的一些元数据)三个部分,最小的内存不能低于32M,下面我们来通过源码看下各部分的内存具体分配。

  1. 获取最大的执行内存,来进行执行内存池的扩充,由于执行内存使用过程中是用估算的方式进行内存判断,所以为了降低OOM的风险,执行内存中有一个最大安全系数[参数spark.shuffle.safetyFraction默认为0.8],用来防止执行内存溢出,所以最大的执行内存值是可用的最大内存 * 0.2 * 0.8

为什么除了实际占比之外,还会有一个安全比例呢?我们已经知道,Spark中的对象可以序列化存储,也可以非序列化存储。对于序列化对象,可以通过其字节流的长度获知其大小。而对于非序列化对象,其占用的内存就只能通过估算得到,与实际情况可能出入较大。另外,MemoryManager申请的内存空间可能还未实际分配,而标记为释放的内存空间也可能并未被JVM实际GC掉,存在滞后性。总之,Spark并不能准确地跟踪堆内内存的占用量,为了防止偏差过大出现OOM,就必须预留一些缓冲空间了。默认会预留10%的存储内存、20%的执行内存作为缓冲。

 private def getMaxExecutionMemory(conf: SparkConf): Long = {
  // 系统可用的最大内存,通过spark.testing.memory配置,未配置的话则取运行时环境的最大内存<这个是JVM启动时候指定的>
  val systemMaxMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)

  // 判断可用最大内存是否小于32MB,如果小于32MB则抛出异常
  if (systemMaxMemory < MIN_MEMORY_BYTES) {
    throw new IllegalArgumentException(s"System memory $systemMaxMemory must " +
                                       s"be at least $MIN_MEMORY_BYTES. Please increase heap size using the --driver-memory " +
                                       s"option or spark.driver.memory in Spark configuration.")
  }

  // 判断是否有spark.executor.memory配置
  if (conf.contains("spark.executor.memory")) {
    // 获取spark.executor.memory配置,即Execution使用的内存大小
    val executorMemory = conf.getSizeAsBytes("spark.executor.memory")
    // 判断Execution使用的内存大小是否小于32MB,如果小于32MB则抛出异常
    if (executorMemory < MIN_MEMORY_BYTES) {
      throw new IllegalArgumentException(s"Executor memory $executorMemory must be at least " +
                                         s"$MIN_MEMORY_BYTES. Please increase executor memory using the " +
                                         s"--executor-memory option or spark.executor.memory in Spark configuration.")
    }
  }

  // 执行内存占比,默认为0.2,即存储内存占内存池总大小的20%
  val memoryFraction = conf.getDouble("spark.shuffle.memoryFraction", 0.2)
  // 执行内存的最大安全系数,默认为0.8,该值用于防止执行内存溢出
  val safetyFraction = conf.getDouble("spark.shuffle.safetyFraction", 0.8)
  // 因此,执行内存的最大大小为 可用的最大内存 * 0.2 * 0.8,即可用最大内存的16%
  (systemMaxMemory * memoryFraction * safetyFraction).toLong
}
  1. 获取最大的存储内存,来进行存储内存池的扩充,同样为了降低OOM的风险,存储内存也有一个安全系数[spark.storage.safetyFraction,默认为0.9],所以得到的最大内存是可用最大内存 * 0.6 * 0.9

    private def getMaxStorageMemory(conf: SparkConf): Long = {
      // 系统可用的最大内存,通过spark.testing.memory配置,未配置的话则取运行时环境的最大内存
      val systemMaxMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)
      // 存储内存占比,默认为0.6,即存储内存占内存池总大小的60%
      val memoryFraction = conf.getDouble("spark.storage.memoryFraction", 0.6)
      // 存储内存的最大安全系数,默认为0.9,该值用于防止存储内存溢出
      val safetyFraction = conf.getDouble("spark.storage.safetyFraction", 0.9)
    
      // 因此,存储内存的最大大小为 可用的最大内存 * 0.6 * 0.9,即可用最大内存的54%
      (systemMaxMemory * memoryFraction * safetyFraction).toLong
    }
    
  2. 填充内存池,通过MemoryManager进行扩充内存池大小;另外不支持堆外存储内存,所以要把原本属于堆外存储池的空间转移到堆外执行池,代码如下所示:

    // The StaticMemoryManager does not support off-heap storage memory:
    offHeapExecutionMemoryPool.incrementPoolSize(offHeapStorageMemoryPool.poolSize)
    offHeapStorageMemoryPool.decrementPoolSize(offHeapStorageMemoryPool.poolSize)
    
  3. 另外还提供了一个maxUnrollMemory,是通过spark.storage.unrollFraction来控制的,默认是0.2

申请内存

StaticMemoryManager重写了三个申请内存的方法,我们分别来分析具体实现。

acquireStorageMemory

由于静态内存管理器不支持堆外存储内存,所以获取存储内存要先验证内存模型,如果申请内存大小过大,则返回错误,不然交给存储内存池进行申请内存即可。

override def acquireStorageMemory(
  blockId: BlockId,
  numBytes: Long,
  memoryMode: MemoryMode): Boolean = synchronized {
  // StaticMemoryManager不支持将堆外内存用于存储,需要检查内存模式
  require(memoryMode != MemoryMode.OFF_HEAP,
          "StaticMemoryManager does not support off-heap storage memory")

  if (numBytes > maxOnHeapStorageMemory) { // 申请的内存大小大于最大的堆内存储内存大小 
    logInfo(s"Will not store $blockId as the required space ($numBytes bytes) exceeds our " +
            s"memory limit ($maxOnHeapStorageMemory bytes)")
    // 直接返回false
    false
  } else { // 交给onHeapStorageMemoryPool内存池处理
    onHeapStorageMemoryPool.acquireMemory(blockId, numBytes)
  }
}

acquireUnrollMemory

由于将RDD展开为块需要占用连续的存储空间,在必要的情况下需要释放其他缓存的空间,以放下这个块。释放空间的上限为“最大展开内存 - 现占用的展开内存 - 空闲存储内存”,之所以要规定这个上限,是为了防止展开一个超大的块导致所有缓存都失效。

override def acquireUnrollMemory(
  blockId: BlockId,
  numBytes: Long,
  memoryMode: MemoryMode): Boolean = synchronized {
  // StaticMemoryManager不支持将堆外内存用于存储,需要检查内存模式
  require(memoryMode != MemoryMode.OFF_HEAP,
          "StaticMemoryManager does not support off-heap unroll memory")
  // 获取当前可用于Unroll的存储内存大小
  val currentUnrollMemory = onHeapStorageMemoryPool.memoryStore.currentUnrollMemory
  // 当前空闲的存储内存大小
  val freeMemory = onHeapStorageMemoryPool.memoryFree
  // 不足的内存大小 = 最大可用于Unroll的存储内存大小 - 当前可用于Unroll的存储内存大小 - 当前空闲的存储内存大小
  val maxNumBytesToFree = math.max(0, maxUnrollMemory - currentUnrollMemory - freeMemory)
  // 计算需要释放的内存大小
  val numBytesToFree = math.max(0, math.min(maxNumBytesToFree, numBytes - freeMemory))

  /** 委托给onHeapStorageMemoryPool进行处理,注意第3个参数 acquireMemory()方法内会尝试腾出额外的空间以满足不足的内存*/
  onHeapStorageMemoryPool.acquireMemory(blockId, numBytes, numBytesToFree)
}

acquireExecutionMemory

获取执行内存,只需要根据内存Mode交给相应的内存池进行处理即可。

private[memory] override def acquireExecutionMemory(
  numBytes: Long,
  taskAttemptId: Long,
  memoryMode: MemoryMode): Long = synchronized {
  memoryMode match {// 委托给具体的内存池处理
    case MemoryMode.ON_HEAP => onHeapExecutionMemoryPool.acquireMemory(numBytes, taskAttemptId)
    case MemoryMode.OFF_HEAP => offHeapExecutionMemoryPool.acquireMemory(numBytes, taskAttemptId)
  }
}

总结

从上面分析,可以看出StaticMemoryManager的内存区域都是由各个比例参数规定好的。这样实现起来简单,但是在复杂业务场景或者参数设定不当时,它容易造成一方内存过剩而另一方内存紧张,需要很强的调参能力才能应对复杂的程序处理,所以spark 1.6以后都默认为统一内存管理器,我们下节再说。

参考

  1. https://www.jianshu.com/p/ed38f19d4bce
  2. http://shiyanjun.cn/archives/1585.html
  3. http://arganzheng.life/spark-executor-memory-management.html
  4. https://www.jianshu.com/p/87a36488993a
  5. https://www.cnblogs.com/johnny666888/p/11197519.html