Overview
When Spark runs an application, it launches two kinds of JVM processes: the Driver and the Executors. The Driver process creates the SparkContext, submits the job, and distributes tasks; Executor processes run the tasks, return results to the Driver, and provide the memory needed for RDD persistence. When we talk about Spark memory management, we mean the memory management of the Executor.
- Executor memory management comes in two flavors
- Static memory management (the default before Spark 1.6)
As the name suggests, the sizes of the storage, execution, and other memory regions are fixed for the lifetime of the Spark application, but they can be configured before the application starts.
- Unified (dynamic) memory management (the default since Spark 1.6)
The difference from static memory management is that storage and execution share one memory region and can borrow space from each other. Static memory management can still be enabled by setting the spark.memory.useLegacyMode parameter to true, as sketched below.
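For completeness, a minimal sketch of opting back into the legacy manager when building the SparkConf (the app name is illustrative; the two fraction values are the static-mode defaults shown in the source later in this section):

import org.apache.spark.SparkConf

// Re-enable the legacy (static) memory manager; this must be set before
// the SparkContext is created.
val conf = new SparkConf()
  .setAppName("legacy-memory-demo")           // hypothetical app name
  .set("spark.memory.useLegacyMode", "true")  // default is "false"
  .set("spark.storage.memoryFraction", "0.6") // static-mode default
  .set("spark.shuffle.memoryFraction", "0.2") // static-mode default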
Source (Spark 2.4.4)
SparkEnv.scala
val useLegacyMemoryManager = conf.getBoolean("spark.memory.useLegacyMode", false)
val memoryManager: MemoryManager =
  if (useLegacyMemoryManager) {
    new StaticMemoryManager(conf, numUsableCores)
  } else {
    UnifiedMemoryManager(conf, numUsableCores)
  }
This snippet confirms that unified memory management is now the default, and that static memory management can be enabled via the spark.memory.useLegacyMode parameter.
Static memory management
- Source
StaticMemoryManager.scala
private[spark] object StaticMemoryManager {

  private val MIN_MEMORY_BYTES = 32 * 1024 * 1024

  /**
   * Return the total amount of memory available for the storage region, in bytes.
   */
  private def getMaxStorageMemory(conf: SparkConf): Long = {
    val systemMaxMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)
    val memoryFraction = conf.getDouble("spark.storage.memoryFraction", 0.6)
    val safetyFraction = conf.getDouble("spark.storage.safetyFraction", 0.9)
    (systemMaxMemory * memoryFraction * safetyFraction).toLong
  }

  /**
   * Return the total amount of memory available for the execution region, in bytes.
   */
  private def getMaxExecutionMemory(conf: SparkConf): Long = {
    val systemMaxMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)
    if (systemMaxMemory < MIN_MEMORY_BYTES) {
      throw new IllegalArgumentException(s"System memory $systemMaxMemory must " +
        s"be at least $MIN_MEMORY_BYTES. Please increase heap size using the --driver-memory " +
        s"option or spark.driver.memory in Spark configuration.")
    }
    if (conf.contains("spark.executor.memory")) {
      val executorMemory = conf.getSizeAsBytes("spark.executor.memory")
      if (executorMemory < MIN_MEMORY_BYTES) {
        throw new IllegalArgumentException(s"Executor memory $executorMemory must be at least " +
          s"$MIN_MEMORY_BYTES. Please increase executor memory using the " +
          s"--executor-memory option or spark.executor.memory in Spark configuration.")
      }
    }
    val memoryFraction = conf.getDouble("spark.shuffle.memoryFraction", 0.2)
    val safetyFraction = conf.getDouble("spark.shuffle.safetyFraction", 0.8)
    (systemMaxMemory * memoryFraction * safetyFraction).toLong
  }
}
- Analysis
- getMaxStorageMemory()
Computes the amount of memory allocated to storage. With the default fractions this comes to
systemMaxMemory * 0.6 * 0.9
i.e. 54% of the heap.
- getMaxExecutionMemory()
Computes the amount of memory allocated to execution. With the default fractions this comes to
systemMaxMemory * 0.2 * 0.8
i.e. 16% of the heap.
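To make the arithmetic concrete, here is a minimal standalone sketch (plain Scala, not Spark code; the 4 GB heap is a made-up example) that reproduces the static layout:

object StaticLayoutDemo {
  def main(args: Array[String]): Unit = {
    // Hypothetical heap size, i.e. what Runtime.getRuntime.maxMemory would report.
    val systemMaxMemory = 4L * 1024 * 1024 * 1024 // 4 GB

    // Defaults from StaticMemoryManager above.
    val storageMemory   = (systemMaxMemory * 0.6 * 0.9).toLong // 54% of the heap
    val executionMemory = (systemMaxMemory * 0.2 * 0.8).toLong // 16% of the heap

    println(s"storage:   ${storageMemory / (1024 * 1024)} MB")
    println(s"execution: ${executionMemory / (1024 * 1024)} MB")
    // The remaining ~30% is "other" memory for user code and Spark internals.
  }
}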
Unified memory management
- Source
UnifiedMemoryManager.scala
object UnifiedMemoryManager {

  // Set aside a fixed amount of memory for non-storage, non-execution purposes.
  // This serves a function similar to `spark.memory.fraction`, but guarantees that we reserve
  // sufficient memory for the system even for small heaps. E.g. if we have a 1GB JVM, then
  // the memory used for execution and storage will be (1024 - 300) * 0.6 = 434MB by default.
  private val RESERVED_SYSTEM_MEMORY_BYTES = 300 * 1024 * 1024

  def apply(conf: SparkConf, numCores: Int): UnifiedMemoryManager = {
    val maxMemory = getMaxMemory(conf)
    new UnifiedMemoryManager(
      conf,
      maxHeapMemory = maxMemory,
      onHeapStorageRegionSize =
        (maxMemory * conf.getDouble("spark.memory.storageFraction", 0.5)).toLong,
      numCores = numCores)
  }

  /**
   * Return the total amount of memory shared between execution and storage, in bytes.
   */
  private def getMaxMemory(conf: SparkConf): Long = {
    val systemMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)
    val reservedMemory = conf.getLong("spark.testing.reservedMemory",
      if (conf.contains("spark.testing")) 0 else RESERVED_SYSTEM_MEMORY_BYTES)
    val minSystemMemory = (reservedMemory * 1.5).ceil.toLong
    if (systemMemory < minSystemMemory) {
      throw new IllegalArgumentException(s"System memory $systemMemory must " +
        s"be at least $minSystemMemory. Please increase heap size using the --driver-memory " +
        s"option or spark.driver.memory in Spark configuration.")
    }
    // SPARK-12759 Check executor memory to fail fast if memory is insufficient
    if (conf.contains("spark.executor.memory")) {
      val executorMemory = conf.getSizeAsBytes("spark.executor.memory")
      if (executorMemory < minSystemMemory) {
        throw new IllegalArgumentException(s"Executor memory $executorMemory must be at least " +
          s"$minSystemMemory. Please increase executor memory using the " +
          s"--executor-memory option or spark.executor.memory in Spark configuration.")
      }
    }
    val usableMemory = systemMemory - reservedMemory
    val memoryFraction = conf.getDouble("spark.memory.fraction", 0.6)
    (usableMemory * memoryFraction).toLong
  }
}
- Analysis
- First, the total memory shared by storage and execution is computed: 60% of the heap after subtracting the reserved memory. The remaining 40% of the usable memory is left for metadata, user data structures, and so on.
- By default storage and execution each get 50% of that shared pool, but the boundary is not fixed: at runtime each side can borrow the other's space. Memory occupied by execution cannot be forcibly taken by storage; storage must wait until execution releases it. Memory already used by storage, however, can be evicted when execution needs it, except for a protected portion (spark.memory.storageFraction) that execution will not touch. The rationale for this asymmetry: cached data that storage loses can be recomputed or re-fetched, whereas intermediate data that execution loses would break the computation that follows.
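A minimal sketch of the same computation for the unified manager, reproducing the 1 GB figure from the source comment above (plain Scala, not Spark API):

object UnifiedLayoutDemo {
  def main(args: Array[String]): Unit = {
    val systemMemory    = 1024L * 1024 * 1024 // 1 GB heap, as in the source comment
    val reservedMemory  = 300L * 1024 * 1024  // RESERVED_SYSTEM_MEMORY_BYTES
    val memoryFraction  = 0.6                 // spark.memory.fraction default
    val storageFraction = 0.5                 // spark.memory.storageFraction default

    val usableMemory  = systemMemory - reservedMemory
    val maxMemory     = (usableMemory * memoryFraction).toLong // (1024 - 300) * 0.6
    val storageRegion = (maxMemory * storageFraction).toLong   // soft boundary for storage

    println(s"execution + storage pool: ${maxMemory / (1024 * 1024)} MB")     // 434 MB
    println(s"protected storage region: ${storageRegion / (1024 * 1024)} MB") // 217 MB
  }
}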
Note
The value returned by Runtime.getRuntime.maxMemory is not equal to the value set via --executor-memory; it is smaller, and it can differ across JDK versions.
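A quick way to observe this, e.g. in spark-shell (the 1g setting is just an example):

// Launched with e.g. --driver-memory 1g: the value printed below is what
// Spark later treats as systemMemory, and it is noticeably less than 1024 MB.
println(s"${Runtime.getRuntime.maxMemory / (1024 * 1024)} MB")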