
Overview

When Spark runs an application, it launches two kinds of JVM processes: the Driver and the Executors. The Driver process creates the SparkContext, submits the job, and distributes tasks; the Executor processes run the tasks, return results to the Driver, and provide the memory needed for RDD persistence. What we call Spark memory management refers to the memory management of the Executor.

  1. Executor memory management comes in two kinds
  • Static memory management, the default before Spark 1.6
    As the name suggests, under this scheme the sizes of the storage, execution, and other memory regions stay fixed while the application runs, but they can be configured before the application starts.
  • Unified (dynamic) memory management, the default since Spark 1.6
    The difference from static memory management is that storage and execution share a single pool of memory and can borrow space from each other. Static memory management can still be enabled by setting spark.memory.useLegacyMode to true, as shown in the sketch below.
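
For example, a minimal sketch of switching back to the legacy (static) manager when building the configuration; the application name here is just a placeholder:

import org.apache.spark.SparkConf

// Opt back into StaticMemoryManager (Spark 1.6+ defaults to unified).
val conf = new SparkConf()
  .setAppName("memory-demo")                   // placeholder app name
  .set("spark.memory.useLegacyMode", "true")   // enable static memory management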

Source code (2.4.4)

SparkEnv.scala


val useLegacyMemoryManager = conf.getBoolean("spark.memory.useLegacyMode", false)
val memoryManager: MemoryManager =
  if (useLegacyMemoryManager) {
    new StaticMemoryManager(conf, numUsableCores)
  } else {
    UnifiedMemoryManager(conf, numUsableCores)
  }

As this snippet shows, unified memory management is now the default, while static memory management can still be enabled through the spark.memory.useLegacyMode parameter.

Static memory management

  1. Source code
StaticMemoryManager.scala

private[spark] object StaticMemoryManager {

  private val MIN_MEMORY_BYTES = 32 * 1024 * 1024

  /**
   * Return the total amount of memory available for the storage region, in bytes.
   */
  private def getMaxStorageMemory(conf: SparkConf): Long = {
    val systemMaxMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)
    val memoryFraction = conf.getDouble("spark.storage.memoryFraction", 0.6)
    val safetyFraction = conf.getDouble("spark.storage.safetyFraction", 0.9)
    (systemMaxMemory * memoryFraction * safetyFraction).toLong
  }

  /**
   * Return the total amount of memory available for the execution region, in bytes.
   */
  private def getMaxExecutionMemory(conf: SparkConf): Long = {
    val systemMaxMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)

    if (systemMaxMemory < MIN_MEMORY_BYTES) {
      throw new IllegalArgumentException(s"System memory $systemMaxMemory must " +
        s"be at least $MIN_MEMORY_BYTES. Please increase heap size using the --driver-memory " +
        s"option or spark.driver.memory in Spark configuration.")
    }
    if (conf.contains("spark.executor.memory")) {
      val executorMemory = conf.getSizeAsBytes("spark.executor.memory")
      if (executorMemory < MIN_MEMORY_BYTES) {
        throw new IllegalArgumentException(s"Executor memory $executorMemory must be at least " +
          s"$MIN_MEMORY_BYTES. Please increase executor memory using the " +
          s"--executor-memory option or spark.executor.memory in Spark configuration.")
      }
    }
    val memoryFraction = conf.getDouble("spark.shuffle.memoryFraction", 0.2)
    val safetyFraction = conf.getDouble("spark.shuffle.safetyFraction", 0.8)
    (systemMaxMemory * memoryFraction * safetyFraction).toLong
  }

}
  2. Analysis
  • getMaxStorageMemory()

This method computes the amount of memory allocated to storage. With the default fractions, the storage region is

systemMaxMemory * 0.6 * 0.9

that is, 54% of the heap.

  • getMaxExecutionMemory()
    This method computes the amount of memory allocated to execution. With the default fractions, the execution region is

systemMaxMemory * 0.2 * 0.8

that is, 16% of the heap (both fractions are checked numerically below).
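
A quick sanity check of these defaults, assuming a hypothetical 4 GB heap (the heap size is chosen only for illustration):

// Reproduces the default static-memory arithmetic for an assumed 4 GB heap.
val systemMaxMemory = 4096L * 1024 * 1024                 // assumed heap: 4 GB

val maxStorage   = (systemMaxMemory * 0.6 * 0.9).toLong   // 54% of the heap
val maxExecution = (systemMaxMemory * 0.2 * 0.8).toLong   // 16% of the heap

println(s"storage:   ${maxStorage / (1024 * 1024)} MB")   // 2211 MB
println(s"execution: ${maxExecution / (1024 * 1024)} MB") // 655 MB

The remaining roughly 30% is the "other" memory from the overview: user data structures, Spark's internal metadata, and a safety margin.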

Unified memory management

  1. Source code
UnifiedMemoryManager.scala
object UnifiedMemoryManager {

  // Set aside a fixed amount of memory for non-storage, non-execution purposes.
  // This serves a function similar to `spark.memory.fraction`, but guarantees that we reserve
  // sufficient memory for the system even for small heaps. E.g. if we have a 1GB JVM, then
  // the memory used for execution and storage will be (1024 - 300) * 0.6 = 434MB by default.
  private val RESERVED_SYSTEM_MEMORY_BYTES = 300 * 1024 * 1024

  def apply(conf: SparkConf, numCores: Int): UnifiedMemoryManager = {
    val maxMemory = getMaxMemory(conf)
    new UnifiedMemoryManager(
      conf,
      maxHeapMemory = maxMemory,
      onHeapStorageRegionSize =
        (maxMemory * conf.getDouble("spark.memory.storageFraction", 0.5)).toLong,
      numCores = numCores)
  }

  /**
   * Return the total amount of memory shared between execution and storage, in bytes.
   */
  private def getMaxMemory(conf: SparkConf): Long = {
    val systemMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)
    val reservedMemory = conf.getLong("spark.testing.reservedMemory",
      if (conf.contains("spark.testing")) 0 else RESERVED_SYSTEM_MEMORY_BYTES)
    val minSystemMemory = (reservedMemory * 1.5).ceil.toLong
    if (systemMemory < minSystemMemory) {
      throw new IllegalArgumentException(s"System memory $systemMemory must " +
        s"be at least $minSystemMemory. Please increase heap size using the --driver-memory " +
        s"option or spark.driver.memory in Spark configuration.")
    }
    // SPARK-12759 Check executor memory to fail fast if memory is insufficient
    if (conf.contains("spark.executor.memory")) {
      val executorMemory = conf.getSizeAsBytes("spark.executor.memory")
      if (executorMemory < minSystemMemory) {
        throw new IllegalArgumentException(s"Executor memory $executorMemory must be at least " +
          s"$minSystemMemory. Please increase executor memory using the " +
          s"--executor-memory option or spark.executor.memory in Spark configuration.")
      }
    }
    val usableMemory = systemMemory - reservedMemory
    val memoryFraction = conf.getDouble("spark.memory.fraction", 0.6)
    (usableMemory * memoryFraction).toLong
  }
}
  2. Analysis
  • First, the total memory shared by storage and execution is computed: 60% of (total memory minus reserved memory). The remaining 40% of the usable memory is left for metadata, user data structures, and so on.
  • By default, storage and execution each get 50% of that shared pool, but the split is not fixed: at runtime they can borrow from each other.
    Memory occupied by execution cannot be forcibly reclaimed by storage; storage has to wait until execution releases it. Memory already used by storage, however, can be evicted when execution needs it, although execution will not take all of it: a portion (spark.memory.storageFraction) is kept for storage. The reasoning behind this design is that data held by storage can be recovered (recomputed or re-read) if it is lost, whereas losing intermediate results produced by execution would break the computation that follows. A worked example follows this list.
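
To make the arithmetic concrete, here is a sketch that mirrors getMaxMemory and apply for the 1 GB JVM mentioned in the source comment (the heap size is assumed for illustration):

// Mirrors the default unified-memory arithmetic for an assumed 1 GB heap.
val systemMemory   = 1024L * 1024 * 1024        // assumed heap: 1 GB
val reservedMemory = 300L * 1024 * 1024         // RESERVED_SYSTEM_MEMORY_BYTES

val usableMemory = systemMemory - reservedMemory          // 724 MB
val maxMemory    = (usableMemory * 0.6).toLong            // spark.memory.fraction = 0.6

// spark.memory.storageFraction = 0.5: half the pool starts as the storage region.
val onHeapStorageRegionSize = (maxMemory * 0.5).toLong

println(s"shared pool:    ${maxMemory / (1024 * 1024)} MB")                // 434 MB
println(s"storage region: ${onHeapStorageRegionSize / (1024 * 1024)} MB")  // 217 MB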

Note

The value returned by Runtime.getRuntime.maxMemory is not equal to the value set with --executor-memory; it is somewhat smaller, and different JDK versions may report different values.
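
This is easy to verify on any JVM. A trivial check: launch the following with, say, -Xmx1g and compare the printed value; the gap is commonly attributed to the JVM excluding one survivor space from the reported maximum, depending on the garbage collector:

// Prints the JVM's view of the maximum heap, which is usually a bit below -Xmx.
object MaxMemoryCheck {
  def main(args: Array[String]): Unit = {
    val maxMemory = Runtime.getRuntime.maxMemory
    println(s"Runtime.maxMemory = ${maxMemory / (1024 * 1024)} MB")
  }
}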