淘先锋技术网

首页 1 2 3 4 5 6 7

内存池是Spark内存的抽象,它记录了总内存大小,已使用内存大小,剩余内存大小,提供给MemoryManager进行分配/回收内存。它包括两个实现类:ExecutionMemoryPool和StorageMemoryPool,分别对应execution memory和storage memory。当需要新的内存时,spark通过memoryPool来判断内存是否充足。需要注意的是memoryPool以及子类方法只是用来标记内存使用情况,而不实际分配/回收内存。

内存池MemoryPool

MemoryPool管理的内存大小是构建在Executor的JVM上面的,由于一个Executor可能有多个Task(取决于分配的内存大小和core数目),所以需要一个锁来进行同步操作,这里的锁一般是MemoryManager对象,后续在分析MemoryManager时候会说到,源码如下所示:

// org.apache.spark.memory.MemoryPool
private[memory] abstract class MemoryPool(lock: Object) {
	...
}

MemoryPool主要用来标记内存的使用情况,所以包含总的内存大小,已经使用的大小,增加或者缩减内存等操作。

/** 内存池的大小(单位为字节)。**/
private[this] var _poolSize: Long = 0

/** 获取已经使用的内存大小(单位为字节)。由子类实现。 **/
def memoryUsed: Long

/** 给内存池扩展delta给定的大小(单位为字节)。delta必须为正整数。 */
final def incrementPoolSize(delta: Long): Unit = lock.synchronized {
  require(delta >= 0)
  _poolSize += delta
}

/** 将内存池缩小delta给定的大小(单位为字节);delta必须为正整数且_poolSize与delta的差要大于等于memoryUsed[已经使用的内存不能从内存池中移除]。 */
final def decrementPoolSize(delta: Long): Unit = lock.synchronized {
  require(delta >= 0)
  require(delta <= _poolSize)
  require(_poolSize - delta >= memoryUsed)
  _poolSize -= delta
}

执行内存池ExecutionMemoryPool

概述

执行内存池属于计算引擎的一部分,它的内存只会分配给Task进行使用,主要用于Task中的Shuffle、Join、Aggregation等操作时候的内存提供。由于执行内存会由多个Task进行共享,所以为了保证Task合理地进行内存使用,避免某些Task过度使用内存导致其它的Task频繁将数据溢写到磁盘,拖垮整体执行速度,执行内存池需要保证在N个Task的情况下,每个Task所能分配到的内存在总内存的 1/2N~1/N 之间,由于Task数量是动态的,因此会跟踪所有激活的Task的数量以便重新计算 1/2N 和 1/N 的值。

源码实现中,ExecutionMemoryPool用一个HashMap来维护了一个TaskAttempt[身份标识为taskAttemptId]与所消费内存的大小之间的映射关系,很明显重写的memoryUsed方法是这个Map中所有value的和,另外给外界提供了可以获取某个taskAttempt目前使用内存的值。

// org.apache.spark.memory.ExecutionMemoryPool
/** 维护了一个TaskAttempt的身份标识(taskAttemptId)与所消费内存的大小之间的映射关系。 */
@GuardedBy("lock")
private val memoryForTask = new mutable.HashMap[Long, Long]()

/**  获取TaskAttempt使用的内存大小,即memoryForTask中taskAttemptId对应的value值。 */
def getMemoryUsageForTask(taskAttemptId: Long): Long = lock.synchronized {
  memoryForTask.getOrElse(taskAttemptId, 0L)
}

/** 重写了memoryUsed方法: 所有TaskAttempt所消费的内存大小之和,即memoryForTask这个Map中所有value的和。**/
override def memoryUsed: Long = lock.synchronized {
  memoryForTask.values.sum
}

分配内存

acquireMemory为每个任务分配内存,返回实际分配的内存大小,如果当任务数量增多,而老任务已经占据大量内存时,新来的任务不能获取到至少1 / 2N的内存时,来保证每个任务都有机会获取到execution总内存的1 / 2N时候,会阻塞申请内存的任务直到其他Task释放内存唤醒当前线程,重新进行计算,尝试获取到足够的空闲内存。

acquireMemory分配内存需要以下步骤:

如果该task之前没有进行过内存申请,则将其加入memoryForTask,内存大小为0,并且唤醒所有等待申请内存的线程。然后需要不断循环以下操作,直至申请到内存:

  1. 获取当前task的数量(numActiveTasks),以及当前待申请内存Task的已有内存(curMem);
  2. 执行内存增长策略maybeGrowPool,如果执行内存不足,在有一些MemoryManager比如UnifiedMemoryManager时候,会向存储内存借用或者回收执行内存挪用给存储内存的内存;
  3. 执行完内存增长策略后,调用computeMaxPollSize,计算释放存储内存后,执行内存池可用的最大大小(maxPoolSize);
  4. 计算每个Task可以申请的最大内存:maxPoolSize / numActiveTasks,表示内存增长策略后当前总内存的1/n;
  5. 计算每个Task可以申请的最小内存:poolSize / (2 * numActiveTasks),表示没有进行增长策略时候执行内存总大小的1/2n;
  6. 计算当前任务可以申请到最大的内存大小(maxToGrant):math.min(numBytes, math.max(0, maxMemoryPerTask - curMem)),表示不超过numBytes,不超过每个task可申请最大内存。保持在0 <= X <= 1 / numActiveTasks之间;
  7. 计算当前任务真正可以申请到的内存大小(toGrant):math.min(maxToGrant, memoryFree);
  8. 如果申请的内存小于待申请内存numBytes,且当前总的Task内存小于Task可以申请的最小内存,说明连Task执行的最基本内存要求都无法满足,则执行lock.wait进行线程等待,等待内存有释放再唤醒;否则,更新memoryForTask的当前Task内存大小为toGrant,并返回toGrant,退出循环。

源码如下所示:

/** 
   * 用于给taskAttemptId对应的TaskAttempt获取指定大小(即numBytes)的内存
   *
   * @param numBytes 						分配的内存大小
   * @param taskAttemptId  		  指定的TaskAttempt的ID
   * @param maybeGrowPool       回调函数,用于处理潜在的内存池增长情况
   * @param computeMaxPoolSize  用于限制本次分配的最大内存的回调函数,默认传入() => poolSize,即可分配所有内存。
   *        传入回调函数的原因在于,不同的内存管理器对执行内存和存储内存的划分方式是不同的,
   *        例如UnifiedMemoryManager可以通过挤压存储内存区域以扩大执行内存区域。
   * @return 分配的内存大小
   */
private[memory] def acquireMemory(
  numBytes: Long,
  taskAttemptId: Long,
  maybeGrowPool: Long => Unit = (additionalSpaceNeeded: Long) => (),
  computeMaxPoolSize: () => Long = () => poolSize): Long = lock.synchronized {
  assert(numBytes > 0, s"invalid number of bytes requested: $numBytes")

  if (!memoryForTask.contains(taskAttemptId)) {  // 如果memoryForTask中还没有记录taskAttemptId
    // 将taskAttemptId放入memoryForTask,初始状态taskAttemptId所消费的内存为0
    memoryForTask(taskAttemptId) = 0L
    // 新增了Task,需要唤醒其他等待获取ExecutionMemoryPool的锁的线程,重新计算Task的最大最小能申请到的内存值
    lock.notifyAll()
  }

  // 由于不一定能申请到需要的内存,可能会由于没有足够多的内存需要wait,直到有Task进行释放内存操作或者新加入Task操作来进行唤醒重新计算,所以需要循环等待直到获取的内存在1/N~1/2N之间
  while (true) { 
    // 获取当前激活的Task的数量
    val numActiveTasks = memoryForTask.keys.size
    // 获取当前TaskAttempt所消费的内存
    val curMem = memoryForTask(taskAttemptId)

    // 执行内存增长策略,有MemoryManager进行实现
    // numBytes - memoryFree计算出不够分配的内存大小,然后尝试从其他内存池[StorageMemoryPool]回收或借用内存
    maybeGrowPool(numBytes - memoryFree)

    // 当前能获取到的最大内存,通过增长策略或者会增加内存
    val maxPoolSize = computeMaxPoolSize()
    /** 1/2N ~ 1/N */
    // 计算每个TaskAttempt最大可以使用的内存大小,即 可用总内存大小 / 激活任务数量
    val maxMemoryPerTask = maxPoolSize / numActiveTasks
    // 计算每个TaskAttempt最小保证使用的内存大小,即 当前内存池大小 / (激活任务数量 * 2)
    val minMemoryPerTask = poolSize / (2 * numActiveTasks)
 		
    // 理论上可以分配给当前任务的最大内存(min(申请内存数,任务可获得的内存数))
    val maxToGrant = math.min(numBytes, math.max(0, maxMemoryPerTask - curMem))
    // 实际可以分配给任务的内存(min(可分配最大内存数,当前剩余内存数))
    val toGrant = math.min(maxToGrant, memoryFree)

    // toGrant < numBytes:表示可分配大小小于本次申请需要的大小;
    // curMem + toGrant < minMemoryPerTask:表示该TaskAttempt申请的大小小于单个TaskAttempt可申请的最小大小
    if (toGrant < numBytes && curMem + toGrant < minMemoryPerTask) {
      logInfo(s"TID $taskAttemptId waiting for at least 1/2N of $poolName pool to be free")
      lock.wait()  // 没有足够多的内存分配,wait等待有Task变化时候进行notifyAll操作唤醒当前线程,唤醒重新计算while循环,还可能拿不到内存
    } else {
      memoryForTask(taskAttemptId) += toGrant // 分配到内存了
      return toGrant
    }
  }
  0L  // Never reached
}

回收内存

回收内存思路比较简单,如果释放内存大于分配给当前Task已经分配的内存,那么需要释放的内存大小是当前Task所申请的内存大小,否则是指定的内存大小。然后更新记录Task占用内存的Map[memoryForTask],对该Task需要释放的内存大小进行收回,如果剩余内存为0,则将该Task移除。最后由于释放了内存,其他正在等待内存分配的Task或许可以申请到需要的内存大小,所以通过notiyAll唤醒等待申请内存的其他线程,进行acquireMemory中的申请尝试,源码如下所示:

/** 用于给taskAttemptId对应的TaskAttempt释放指定大小(即numBytes)的内存。 */
def releaseMemory(numBytes: Long, taskAttemptId: Long): Unit = lock.synchronized {
  val curMem = memoryForTask.getOrElse(taskAttemptId, 0L)
  val memoryToFree = if (curMem < numBytes) { // 释放内存大于分配给它的内存,只释放可释放的内存大小
    logWarning(
      s"Internal error: release called on $numBytes bytes but task only has $curMem bytes " +
      s"of memory from the $poolName pool")
    curMem
  } else { // 可释放内存大小大于指定释放的内存
    numBytes
  }
  if (memoryForTask.contains(taskAttemptId)) {
    memoryForTask(taskAttemptId) -= memoryToFree
    if (memoryForTask(taskAttemptId) <= 0) {
      // 如果taskAttemptId代表的TaskAttempt占用的内存大小小于等于零,
      // 还需要将taskAttemptId与所消费内存的映射关系从memoryForTask中清除。
      memoryForTask.remove(taskAttemptId)
    }
  }
  // 释放了内存,可以唤醒其他等待内存分配的线程Task进行重新计算和申请内存
  lock.notifyAll() 
}

/**  用于释放taskAttemptId对应的TaskAttempt所消费的所有内存。 */
def releaseAllMemoryForTask(taskAttemptId: Long): Long = lock.synchronized {
  val numBytesToFree = getMemoryUsageForTask(taskAttemptId)
  releaseMemory(numBytesToFree, taskAttemptId) // 进行释放
  numBytesToFree  // 返回释放的内存大小
}

存储内存池StorageMemoryPool

概述

存储内存池主要用于RDD的缓存,广播以及备份中。不像执行内存池需要维护每个Task的内存占用情况,存储内存池只提供了一个_memoryUsed的变量来进行当前内存的使用情况,源码如下所示,另外可以看到StorageMemoryPool还维护了一个memoryStore,这个是用来将数据块保存到申请的storage内存中,并提供了从内存/磁盘获取保存的数据的方法,我们会在后续文章中进行分析。

// 已经使用的内存大小(单位为字节)。
@GuardedBy("lock")
private[this] var _memoryUsed: Long = 0L

// 重写内存占用值函数,返回了_memoryUsed属性的值
override def memoryUsed: Long = lock.synchronized {
  _memoryUsed
}

// 当前StorageMemoryPool所关联的MemoryStore,存储空间的实际使用是由MemoryStore来进行控制的
private var _memoryStore: MemoryStore = _
// 返回了_memoryStore属性引用的MemoryStore
def memoryStore: MemoryStore = {
  if (_memoryStore == null) {
    throw new IllegalStateException("memory store not initialized yet")
  }
  _memoryStore
}

/** 设置当前StorageMemoryPool所关联的MemoryStore,实际设置了_memoryStore属性。 */
final def setMemoryStore(store: MemoryStore): Unit = {
  _memoryStore = store
}

分配内存

acquireMemory用于给指定的BlockId对应的Block获取指定大小的内存,如果存储内存池内存不足,那么需要通过memoryStore.evictBlocksToFreeSpace进行内存释放,释放掉其他Block占用的内存;否则不需要释放内存,直接申请即可。然后判断释放内存后的free内存是否比待申请内存大,如果满足条件,则进行内存分配,否则不进行内存分配,返回false,告知失败。

/** 用于给BlockId对应的Block获取numBytes指定大小的内存。 */
def acquireMemory(blockId: BlockId, numBytes: Long): Boolean = lock.synchronized {
  val numBytesToFree = math.max(0, numBytes - memoryFree)
  acquireMemory(blockId, numBytes, numBytesToFree)
}

/**
   * @param blockId  						申请内存的BlockId
   * @param numBytesToAcquire   申请的内存大小
   * @param numBytesToFree      本次申请需要额外空出来的内存大小
   * @return                     所需要的内存是否申请成功了
   */
def acquireMemory(
  blockId: BlockId,
  numBytesToAcquire: Long,
  numBytesToFree: Long): Boolean = lock.synchronized {
  assert(numBytesToAcquire >= 0)
  assert(numBytesToFree >= 0)
  assert(memoryUsed <= poolSize)
  if (numBytesToFree > 0) { // 如果需要腾出额外的内存大小,则腾出numBytesToFree属性指定大小的空间
    memoryStore.evictBlocksToFreeSpace(Some(blockId), numBytesToFree, memoryMode)
  }
  // 判断可用内存是否充足
  val enoughMemory = numBytesToAcquire <= memoryFree
  if (enoughMemory) { // 可用内存充足,增加已经使用的内存大小
    _memoryUsed += numBytesToAcquire
  }
  enoughMemory
}

回收内存

存储内存池的释放非常简单,直接将_memoryUsed=0即可,如果需要释放指定大小内存,那么只要减少一部分即可。

def releaseMemory(size: Long): Unit = lock.synchronized {
  if (size > _memoryUsed) { // 释放的大小大于已使用大小,则释放当前内存池的所有内存,即将_memoryUsed设置为0。
    logWarning(s"Attempted to release $size bytes of storage " +
               s"memory when we only have ${_memoryUsed} bytes")
    _memoryUsed = 0
  } else { // 否则从已使用内存大小中减去释放的大小
    _memoryUsed -= size
  }
}

def releaseAllMemory(): Unit = lock.synchronized {
  _memoryUsed = 0
}

缩减内存池大小

另外存储内存池还提供了一个缩减内存池大小的函数,作为Spark的ExecutionMemoryPool和StorageMemoryPool动态调整大小的辅助函数。首先判断StorageMemoryPool是否有足够的空间可以释放,如果剩余空间不足需要释放的空间,则通过memoryStore的evictBlocksToFreeSpace来释放其他Block占用的空间,最后返回StorageMemoryPool可释放空间大小。

/** 用于释放指定大小的空间,缩小内存池的大小。*/
def freeSpaceToShrinkPool(spaceToFree: Long): Long = lock.synchronized {
  // 计算当前可释放的最大内存大小
  val spaceFreedByReleasingUnusedMemory = math.min(spaceToFree, memoryFree)
  val remainingSpaceToFree = spaceToFree - spaceFreedByReleasingUnusedMemory
  if (remainingSpaceToFree > 0) { // 如果可释放的内存不满足要求释放的大小,需要尝试腾出一些内存
    val spaceFreedByEviction =
    memoryStore.evictBlocksToFreeSpace(None, remainingSpaceToFree, memoryMode)
    // 返回最终释放的大小
    spaceFreedByReleasingUnusedMemory + spaceFreedByEviction
  } else { // 可释放的内存足够,直接返回释放的大小即可
    spaceFreedByReleasingUnusedMemory
  }
}

总结

相信通过上述的分析,我们已经掌握了存储内存池和执行内存池的精髓,最后我们来对比下这两个内存池异同:

  1. 两者都是继承与MemoryPool,通过一个变量poolSize来记录内存池所拥有的的最大内存量;
  2. 执行内存池由于要记录各个Task的内存占用情况,所以它是通过一个Map来进行各个TaskAttempt的内存的记录;而存储内存池只需要知道整个内存池的占用情况,所以只需要一个变量来控制当前已经占用的内存大小;
  3. 分配内存时候执行内存池要保证每个Task获取的内存大小在[1/2N,1/N]之间,所以可能会阻塞当前Task直到有相应的内存分配为止,而且还可能会有一些内存增长策略<从存储内存中占用或者收回内存操作>;存储内存池在分配时候如果目前内存不足,则会通过memoryStore来进行额外的需要内存的腾出,如果腾出后还不满足直接返回无法满足;
  4. 回收内存两个比较类似,都是释放当前已经使用的内存和指定释放内存的最小值。
  5. 存储内存池另外提供了缩减当前内存池大小的功能,方便执行内存池从存储内存池进行占用内存。

参考

  1. https://www.jianshu.com/p/1821f1a0924b
  2. https://www.cnblogs.com/cenglinjinran/p/8476300.html