Big Data
Stream Processing
Source Code Analysis
Spark

Spark Source Code Analysis 12 - Storage System 05: Storage Layer (2) The Memory Manager

Overview: MemoryManager is the memory manager responsible for allocating and reclaiming memory on a node; every node that participates in the storage system has one. Spark ships three MemoryManager implementations; apart from TestMemoryManager, which exists only for testing, the other two are the static memory manager StaticMemoryManager and the unified memory manager UnifiedMemoryManager.

1. Introduction to the Memory Manager

MemoryManager is the memory manager responsible for allocating and reclaiming memory on a node; every node that participates in the storage system has one. Spark ships three MemoryManager implementations; apart from TestMemoryManager, which exists only for testing, the other two are the static memory manager StaticMemoryManager and the unified memory manager UnifiedMemoryManager. Each of them further distinguishes between on-heap and off-heap memory.

StaticMemoryManager is a holdover from early Spark versions. "Static" means that it splits memory into an execution region and a storage region at fixed ratios. Note that StaticMemoryManager does not support using off-heap memory for storage.

UnifiedMemoryManager is the default memory manager introduced in Spark 1.6.0. The "unified" in its name means that there is no fixed boundary between the execution and storage regions it manages: when execution memory runs short, it can dynamically squeeze the storage region, avoiding shuffle failures caused by OOM. Note, however, that when storage memory runs short it may only borrow free execution memory; it can never evict memory already occupied by execution, and otherwise must spill data to disk.

2. MemoryManager

Before diving into StaticMemoryManager and UnifiedMemoryManager, let's first look at their parent class MemoryManager. MemoryManager lives in the org.apache.spark.memory package of the Spark Core module. It is an abstract class that defines the basic fields and contract methods of a memory manager. We start with its fields:

org.apache.spark.memory.MemoryManager
  • /**
  • * An abstract memory manager that enforces how memory is shared between execution and storage.
  • *
  • * In this context, execution memory refers to that used for computation in shuffles, joins,
  • * sorts and aggregations, while storage memory refers to that used for caching and propagating
  • * internal data across the cluster. There exists one MemoryManager per JVM.
  • *
  • * Memory manager, responsible for allocating and reclaiming memory on a single node. Two implementations:
  • * - StaticMemoryManager: the static memory manager.
  • * - UnifiedMemoryManager: the unified memory manager.
  • * @param numCores number of CPU cores; affects the computed memory page size
  • * @param onHeapStorageMemory size of on-heap memory used for storage.
  • * @param onHeapExecutionMemory size of on-heap memory used for execution.
  • */
  • private[spark] abstract class MemoryManager(
  • conf: SparkConf,
  • numCores: Int,
  • onHeapStorageMemory: Long,
  • onHeapExecutionMemory: Long) extends Logging {
  • // -- Methods related to memory allocation policies and bookkeeping ------------------------------
  • // On-heap storage memory pool (StorageMemoryPool); its size is given by onHeapStorageMemory.
  • @GuardedBy("this")
  • protected val onHeapStorageMemoryPool = new StorageMemoryPool(this, MemoryMode.ON_HEAP)
  • // Off-heap storage memory pool (StorageMemoryPool).
  • @GuardedBy("this")
  • protected val offHeapStorageMemoryPool = new StorageMemoryPool(this, MemoryMode.OFF_HEAP)
  • // On-heap execution memory pool (ExecutionMemoryPool); its size is given by onHeapExecutionMemory.
  • @GuardedBy("this")
  • protected val onHeapExecutionMemoryPool = new ExecutionMemoryPool(this, MemoryMode.ON_HEAP)
  • // Off-heap execution memory pool (ExecutionMemoryPool).
  • @GuardedBy("this")
  • protected val offHeapExecutionMemoryPool = new ExecutionMemoryPool(this, MemoryMode.OFF_HEAP)
  • // Initialize the capacities of the on-heap execution and storage pools
  • onHeapStorageMemoryPool.incrementPoolSize(onHeapStorageMemory)
  • onHeapExecutionMemoryPool.incrementPoolSize(onHeapExecutionMemory)
  • // Maximum off-heap memory; configured via spark.memory.offHeap.size, 0 by default.
  • protected[this] val maxOffHeapMemory = conf.getSizeAsBytes("spark.memory.offHeap.size", 0)
  • /**
  • * Initial size of off-heap memory used for storage.
  • * spark.memory.storageFraction (default 0.5) first determines the storage share of off-heap memory;
  • * with the default, offHeapStorageMemoryPool and offHeapExecutionMemoryPool each get 50% of the total.
  • * Multiplying this fraction by the total off-heap size yields the initial off-heap storage size.
  • * Note: since StaticMemoryManager cannot use off-heap memory for storage, this value has no effect for it.
  • */
  • protected[this] val offHeapStorageMemory =
  • (maxOffHeapMemory * conf.getDouble("spark.memory.storageFraction", 0.5)).toLong
  • // Initialize the capacities of the off-heap execution and storage pools
  • offHeapExecutionMemoryPool.incrementPoolSize(maxOffHeapMemory - offHeapStorageMemory)
  • // Note: since StaticMemoryManager cannot use off-heap memory for storage, this value has no effect for it.
  • offHeapStorageMemoryPool.incrementPoolSize(offHeapStorageMemory)
  • ...
  • }

Among MemoryManager's constructor parameters, onHeapStorageMemory and onHeapExecutionMemory specify the initial sizes of the on-heap storage memory pool and execution memory pool. During initialization, MemoryManager creates the on-heap storage and execution pools onHeapStorageMemoryPool and onHeapExecutionMemoryPool, whose initial sizes are set precisely from these two parameters.

For off-heap memory, the total size is read from the spark.memory.offHeap.size parameter, and spark.memory.storageFraction gives the storage share of that total; the initial sizes of the off-heap storage and execution pools are computed from these two values.
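
The arithmetic just described is simple; here is a minimal sketch in plain Scala, with stand-in values for the two configuration reads:

  • // Sketch of the off-heap pool sizing, assuming spark.memory.offHeap.size = 1g
  • // and the default spark.memory.storageFraction = 0.5.
  • val maxOffHeapMemory = 1024L * 1024 * 1024
  • val storageFraction = 0.5
  • val offHeapStorageMemory = (maxOffHeapMemory * storageFraction).toLong  // 512MB storage pool
  • val offHeapExecutionMemory = maxOffHeapMemory - offHeapStorageMemory    // 512MB execution pool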

MemoryManager exposes a large number of methods, mainly split into operations on storage memory and operations on execution memory. Some must be implemented by subclasses, while others are implemented via the methods of the corresponding memory pools; they are introduced below.

2.1. Operations on Storage Memory

The main storage memory operations provided by MemoryManager are the following:

  1. Get the maximum memory available for storage, separately for on-heap and off-heap storage; these must be implemented by subclasses:
org.apache.spark.memory.MemoryManager
  • /**
  • * Total available on heap memory for storage, in bytes. This amount can vary over time,
  • * depending on the MemoryManager implementation.
  • * In this model, this is equivalent to the amount of memory not occupied by execution.
  • *
  • * Returns the maximum on-heap memory for storage; implemented by subclasses.
  • */
  • def maxOnHeapStorageMemory: Long
  • /**
  • * Total available off heap memory for storage, in bytes. This amount can vary over time,
  • * depending on the MemoryManager implementation.
  • *
  • * Returns the maximum off-heap memory for storage; implemented by subclasses.
  • */
  • def maxOffHeapStorageMemory: Long
  2. Acquire storage memory of a given size and memory mode for a given block; implemented by subclasses:
org.apache.spark.memory.MemoryManager
  • /**
  • * Acquire N bytes of memory to cache the given block, evicting existing ones if necessary.
  • *
  • * Acquires the required amount of on-heap or off-heap memory for storing the block identified by blockId.
  • *
  • * @return whether all N bytes were successfully granted.
  • */
  • def acquireStorageMemory(blockId: BlockId, numBytes: Long, memoryMode: MemoryMode): Boolean
  3. Acquire storage memory of a given size and memory mode for unrolling a given block; implemented by subclasses:
org.apache.spark.memory.MemoryManager
  • /**
  • * Acquire N bytes of memory to unroll the given block, evicting existing ones if necessary.
  • *
  • * This extra method allows subclasses to differentiate behavior between acquiring storage
  • * memory and acquiring unroll memory. For instance, the memory management model in Spark
  • * 1.5 and before places a limit on the amount of space that can be freed from unrolling.
  • *
  • * Acquires the required amount of on-heap or off-heap memory for unrolling the block identified by blockId.
  • *
  • * @return whether all N bytes were successfully granted.
  • */
  • def acquireUnrollMemory(blockId: BlockId, numBytes: Long, memoryMode: MemoryMode): Boolean
  4. Release storage memory of a given size and memory mode; depending on the memory mode, this delegates to the releaseMemory(...) method of the on-heap or off-heap storage pool:
org.apache.spark.memory.MemoryManager#releaseStorageMemory
  • /**
  • * Release N bytes of storage memory.
  • * Releases the given amount of on-heap or off-heap storage memory.
  • */
  • def releaseStorageMemory(numBytes: Long, memoryMode: MemoryMode): Unit = synchronized {
  • memoryMode match { // use the pool matching where the memory lives
  • case MemoryMode.ON_HEAP => onHeapStorageMemoryPool.releaseMemory(numBytes)
  • case MemoryMode.OFF_HEAP => offHeapStorageMemoryPool.releaseMemory(numBytes)
  • }
  • }
  5. Release unroll memory of a given size and memory mode, implemented via the releaseStorageMemory(...) method above:
  • /**
  • * Release N bytes of unroll memory.
  • * Releases the given amount of unroll memory.
  • */
  • final def releaseUnrollMemory(numBytes: Long, memoryMode: MemoryMode): Unit = synchronized {
  • releaseStorageMemory(numBytes, memoryMode)
  • }
  6. Release all storage memory, implemented by calling releaseAllMemory() on both the on-heap and off-heap storage pools:
org.apache.spark.memory.MemoryManager#releaseAllStorageMemory
  • /**
  • * Release all storage memory acquired.
  • * Releases all on-heap and off-heap storage memory.
  • */
  • final def releaseAllStorageMemory(): Unit = synchronized {
  • // Implemented via the releaseAllMemory() methods of the on-heap and off-heap storage pools
  • onHeapStorageMemoryPool.releaseAllMemory()
  • offHeapStorageMemoryPool.releaseAllMemory()
  • }
  7. Get the total of on-heap and off-heap storage memory in use; the implementation is straightforward (a short usage sketch follows the code):
org.apache.spark.memory.MemoryManager#storageMemoryUsed
  • /**
  • * Storage memory currently in use, in bytes.
  • * Total storage memory occupied across onHeapStorageMemoryPool and offHeapStorageMemoryPool.
  • */
  • final def storageMemoryUsed: Long = synchronized {
  • onHeapStorageMemoryPool.memoryUsed + offHeapStorageMemoryPool.memoryUsed
  • }

2.2. Operations on Execution Memory

  1. Acquire execution memory of a given size and memory mode for a given task attempt; implemented by subclasses:
org.apache.spark.memory.MemoryManager
  • /**
  • * Try to acquire up to `numBytes` of execution memory for the current task and return the
  • * number of bytes obtained, or 0 if none can be allocated.
  • *
  • * This call may block until there is enough free memory in some situations, to make sure each
  • * task has a chance to ramp up to at least 1 / 2N of the total memory pool (where N is the # of
  • * active tasks) before it is forced to spill. This can happen if the number of tasks increase
  • * but an older task had a lot of memory already.
  • *
  • * Acquires the required amount (numBytes) of on-heap or off-heap memory for the TaskAttempt identified by taskAttemptId.
  • */
  • private[memory]
  • def acquireExecutionMemory(
  • numBytes: Long,
  • taskAttemptId: Long,
  • memoryMode: MemoryMode): Long

Most of MemoryManager's remaining methods are implemented in terms of these contract methods or the corresponding pools' methods:

  2. Release execution memory of a given size and memory mode held by a given task attempt; depending on the memory mode, this delegates to the releaseMemory(...) method of the on-heap or off-heap execution pool:
org.apache.spark.memory.MemoryManager
  • /**
  • * Release numBytes of execution memory belonging to the given task.
  • *
  • * Releases the given amount (numBytes) of on-heap or off-heap execution memory consumed by the TaskAttempt identified by taskAttemptId.
  • */
  • private[memory]
  • def releaseExecutionMemory(
  • numBytes: Long,
  • taskAttemptId: Long,
  • memoryMode: MemoryMode): Unit = synchronized {
  • memoryMode match {
  • case MemoryMode.ON_HEAP => onHeapExecutionMemoryPool.releaseMemory(numBytes, taskAttemptId)
  • case MemoryMode.OFF_HEAP => offHeapExecutionMemoryPool.releaseMemory(numBytes, taskAttemptId)
  • }
  • }
  3. Release all execution memory held by a given task attempt, across both the on-heap and off-heap execution pools, via their releaseAllMemoryForTask(...) methods:
org.apache.spark.memory.MemoryManager#releaseAllExecutionMemoryForTask
  • /**
  • * Release all memory for the given task and mark it as inactive (e.g. when a task ends).
  • *
  • * Releases all on-heap and off-heap execution memory consumed by the TaskAttempt identified by taskAttemptId.
  • *
  • * @return the number of bytes freed.
  • */
  • private[memory] def releaseAllExecutionMemoryForTask(taskAttemptId: Long): Long = synchronized {
  • // Release the on-heap execution memory held, then the off-heap execution memory
  • onHeapExecutionMemoryPool.releaseAllMemoryForTask(taskAttemptId) +
  • offHeapExecutionMemoryPool.releaseAllMemoryForTask(taskAttemptId)
  • }
  4. Get the total of on-heap and off-heap execution memory in use; the implementation is straightforward:
org.apache.spark.memory.MemoryManager#executionMemoryUsed
  • /**
  • * Execution memory currently in use, in bytes.
  • *
  • * Sum of the execution memory used by the on-heap and off-heap execution pools.
  • */
  • final def executionMemoryUsed: Long = synchronized {
  • onHeapExecutionMemoryPool.memoryUsed + offHeapExecutionMemoryPool.memoryUsed
  • }
  5. Get the total execution memory used by a given task attempt; the implementation is straightforward (a usage sketch follows the code):
org.apache.spark.memory.MemoryManager#getExecutionMemoryUsageForTask
  • /**
  • * Returns the execution memory consumption, in bytes, for the given task.
  • *
  • * Sum of the execution memory the TaskAttempt identified by taskAttemptId consumes in the on-heap and off-heap execution pools.
  • */
  • private[memory] def getExecutionMemoryUsageForTask(taskAttemptId: Long): Long = synchronized {
  • onHeapExecutionMemoryPool.getMemoryUsageForTask(taskAttemptId) +
  • offHeapExecutionMemoryPool.getMemoryUsageForTask(taskAttemptId)
  • }

2.3. Tungsten Storage

Project Tungsten is the Spark initiative dedicated to improving how efficiently Spark programs use memory and CPU, pushing performance closer to the limits of the hardware. It consists of three main optimizations:

  1. Memory Management and Binary Processing: off-heap memory management that lowers the cost of creating and destroying objects and eliminates pauses caused by JVM GC.
  2. Cache-aware Computation: cache-friendly computation and data layout that raise CPU L1/L2/L3 cache hit rates.
  3. Code Generation: code generation for Spark SQL that improves CPU utilization.

MemoryManager also initializes the configuration that backs Tungsten storage, which we cover here; how Tungsten is actually used will be explained in a later article.

MemoryManager's tungstenMemoryMode field determines Tungsten's memory mode:

org.apache.spark.memory.MemoryManager#tungstenMemoryMode
  • /**
  • * Tracks whether Tungsten memory will be allocated on the JVM heap or off-heap using
  • * sun.misc.Unsafe.
  • *
  • * Tungsten's memory mode, expressed with the same MemoryMode enum used to distinguish on-heap from off-heap memory.
  • * In on-heap mode, data is stored on the JVM heap, and Tungsten uses onHeapExecutionMemoryPool as its pool.
  • * In off-heap mode, data is stored in off-heap memory, and Tungsten uses offHeapExecutionMemoryPool as its pool.
  • * Whether Tungsten's off-heap memory is enabled is configured via spark.memory.offHeap.enabled (false by default).
  • */
  • final val tungstenMemoryMode: MemoryMode = {
  • if (conf.getBoolean("spark.memory.offHeap.enabled", false)) {
  • require(conf.getSizeAsBytes("spark.memory.offHeap.size", 0) > 0,
  • "spark.memory.offHeap.size must be > 0 when spark.memory.offHeap.enabled == true")
  • require(Platform.unaligned(),
  • "No support for unaligned Unsafe. Set spark.memory.offHeap.enabled to false.")
  • MemoryMode.OFF_HEAP
  • } else {
  • MemoryMode.ON_HEAP
  • }
  • }

As we can see, the parameters spark.memory.offHeap.enabled (whether off-heap memory is enabled) and spark.memory.offHeap.size (the off-heap memory size), together with a platform check for unaligned memory access, jointly determine whether Tungsten storage may use off-heap memory.
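
A minimal configuration sketch for switching Tungsten into off-heap mode (both keys are real Spark properties; the size value is just an example):

  • import org.apache.spark.SparkConf
  • val conf = new SparkConf()
  •   .set("spark.memory.offHeap.enabled", "true")  // opt in to off-heap Tungsten memory
  •   .set("spark.memory.offHeap.size", "1g")       // must be > 0 when enabled, or MemoryManager fails fast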

MemoryManager's pageSizeBytes field computes the memory page size from the Tungsten memory mode and the number of CPU cores given when the MemoryManager was created; its source is as follows:

  • /**
  • * The default page size, in bytes.
  • *
  • * If user didn't explicitly set "spark.buffer.pageSize", we figure out the default value
  • * by looking at the number of cores available to the process, and the total amount of memory,
  • * and then divide it by a factor of safety.
  • *
  • * The default size, in bytes, of a Tungsten page.
  • * It can be configured via the spark.buffer.pageSize property;
  • * if that property is not set, the value computed below is used instead.
  • */
  • val pageSizeBytes: Long = {
  • val minPageSize = 1L * 1024 * 1024 // 1MB
  • val maxPageSize = 64L * minPageSize // 64MB
  • // Number of cores: use numCores if specified, otherwise the number of processors available to the JVM
  • val cores = if (numCores > 0) numCores else Runtime.getRuntime.availableProcessors()
  • // Because of rounding to next power of 2, we may have safetyFactor as 8 in worst case
  • // Safety factor
  • val safetyFactor = 16
  • // Maximum Tungsten memory available under the current memory mode
  • val maxTungstenMemory: Long = tungstenMemoryMode match {
  • case MemoryMode.ON_HEAP => onHeapExecutionMemoryPool.poolSize
  • case MemoryMode.OFF_HEAP => offHeapExecutionMemoryPool.poolSize
  • }
  • /**
  • * Compute the page size: the argument is maxTungstenMemory / cores / safetyFactor, and the
  • * result is the smallest power of 2 greater than or equal to that value (nextPowerOf2 rounds up).
  • */
  • val size = ByteArrayMethods.nextPowerOf2(maxTungstenMemory / cores / safetyFactor)
  • // The page size must lie between 1MB and 64MB
  • val default = math.min(maxPageSize, math.max(minPageSize, size))
  • // Prefer spark.buffer.pageSize if configured; otherwise use the default computed above
  • conf.getSizeAsBytes("spark.buffer.pageSize", default)
  • }

As we can see, when Tungsten runs in on-heap mode the data lives on the JVM heap and Tungsten uses onHeapExecutionMemoryPool as its pool; in off-heap mode the data lives in off-heap memory and Tungsten uses offHeapExecutionMemoryPool. The page size can be set explicitly with the spark.buffer.pageSize parameter; otherwise a default is computed from the CPU core count given to the MemoryManager and clamped to the range 1MB ~ 64MB.
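
As a worked example under assumed numbers: with 8 cores and a 2GB Tungsten pool, maxTungstenMemory / cores / safetyFactor = 2048MB / 8 / 16 = 16MB, already a power of 2 and inside the 1MB ~ 64MB range, so the default page size is 16MB. The sketch below reproduces the computation (nextPowerOf2 is reimplemented here with the same round-up contract as ByteArrayMethods.nextPowerOf2; pool size and core count are assumptions):

  • // Worked reproduction of the default page-size arithmetic.
  • def nextPowerOf2(n: Long): Long = {
  •   val highBit = java.lang.Long.highestOneBit(n)
  •   if (highBit == n) n else highBit << 1
  • }
  • val maxTungstenMemory = 2048L * 1024 * 1024  // assumed 2GB execution pool
  • val cores = 8
  • val safetyFactor = 16
  • val size = nextPowerOf2(maxTungstenMemory / cores / safetyFactor)
  • val default = math.min(64L * 1024 * 1024, math.max(1L * 1024 * 1024, size))  // 16MB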

The tungstenMemoryAllocator field selects the memory allocator according to Tungsten's memory mode:

org.apache.spark.memory.MemoryManager#tungstenMemoryAllocator
  • /**
  • * Allocates memory for use by Unsafe/Tungsten code.
  • *
  • * The memory allocator (MemoryAllocator) used by Tungsten.
  • * If tungstenMemoryMode is MemoryMode.ON_HEAP, tungstenMemoryAllocator is the
  • * on-heap allocator HeapMemoryAllocator; otherwise it is UnsafeMemoryAllocator,
  • * which allocates operating-system memory through the sun.misc.Unsafe API.
  • */
  • private[memory] final val tungstenMemoryAllocator: MemoryAllocator = {
  • tungstenMemoryMode match {
  • case MemoryMode.ON_HEAP => MemoryAllocator.HEAP
  • case MemoryMode.OFF_HEAP => MemoryAllocator.UNSAFE
  • }
  • }

Tungsten currently implements two memory allocators, HeapMemoryAllocator and UnsafeMemoryAllocator, for on-heap and off-heap allocation respectively.

2.3.1. MemoryLocation and MemoryBlock

Before examining the Tungsten allocators, let's look at the MemoryLocation class, which tracks a memory location: under off-heap allocation it tracks a raw memory address, and under on-heap allocation it tracks an offset within a JVM object. Its definition is as follows:

org.apache.spark.unsafe.memory.MemoryLocation
  • /**
  • * A memory location. Tracked either by a memory address (with off-heap allocation),
  • * or by an offset from a JVM object (in-heap allocation).
  • */
  • public class MemoryLocation {
  • /**
  • * In on-heap mode, data is stored as an object on the JVM heap, and obj is non-null.
  • * In off-heap mode, data lives in off-heap (operating-system) memory, so there is no JVM object and obj is null.
  • */
  • @Nullable
  • Object obj;
  • /**
  • * The offset property locates the data.
  • * In on-heap mode, the object is first found on the heap, then offset locates the data within it.
  • * In off-heap mode, offset is used directly as an address into off-heap memory.
  • */
  • long offset;
  • // Constructors
  • public MemoryLocation(@Nullable Object obj, long offset) {
  • this.obj = obj;
  • this.offset = offset;
  • }
  • public MemoryLocation() {
  • this(null, 0);
  • }
  • // Set a new obj and offset
  • public void setObjAndOffset(Object newObj, long newOffset) {
  • this.obj = newObj;
  • this.offset = newOffset;
  • }
  • // Get obj
  • public final Object getBaseObject() {
  • return obj;
  • }
  • // Get offset
  • public final long getBaseOffset() {
  • return offset;
  • }
  • }

MemoryLocation is straightforward: it only maintains the obj and offset properties and their accessors. MemoryBlock extends MemoryLocation, adding a length field for the length of a contiguous block of memory; MemoryBlock is the carrier Tungsten uses to store data:

org.apache.spark.unsafe.memory.MemoryBlock
  • /**
  • * A consecutive block of memory, starting at a {@link MemoryLocation} with a fixed size.
  • *
  • * Extends MemoryLocation; represents a contiguous block of memory of fixed length (the MemoryBlock's length property), starting at the position located by obj and offset.
  • */
  • public class MemoryBlock extends MemoryLocation {
  • // Length of this MemoryBlock's contiguous memory region
  • private final long length;
  • /**
  • * Optional page number; used when this MemoryBlock represents a page allocated by a
  • * TaskMemoryManager. This field is public so that it can be modified by the TaskMemoryManager,
  • * which lives in a different package.
  • *
  • * Page number of this MemoryBlock; used when TaskMemoryManager allocates a page represented by a MemoryBlock.
  • */
  • public int pageNumber = -1;
  • public MemoryBlock(@Nullable Object obj, long offset, long length) {
  • super(obj, offset);
  • this.length = length;
  • }
  • /**
  • * Returns the size of the memory block.
  • *
  • * The size of this MemoryBlock, i.e. length
  • */
  • public long size() {
  • return length;
  • }
  • /**
  • * Creates a memory block pointing to the memory used by the long array.
  • *
  • * Creates a MemoryBlock pointing at the memory used by a long array
  • */
  • public static MemoryBlock fromLongArray(final long[] array) {
  • return new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, array.length * 8L);
  • }
  • /**
  • * Fills the memory block with the specified byte value.
  • *
  • * Fills the whole MemoryBlock with the given byte value, i.e. overwrites the memory of
  • * length `length` starting at `offset` within `obj`.
  • * Platform wraps the sun.misc.Unsafe API; Platform's setMemory method
  • * actually calls sun.misc.Unsafe's setMemory.
  • */
  • public void fill(byte value) {
  • Platform.setMemory(obj, offset, length, value);
  • }
  • }

As we can see, MemoryBlock provides the fill(...) method, which calls Platform's setMemory(...) to fill the block with a given byte: the byte value is written across the memory of obj starting at offset and spanning length bytes.
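
A minimal sketch of creating and filling a MemoryBlock over a long array (these are the real org.apache.spark.unsafe.memory classes; the sizes are illustrative):

  • import org.apache.spark.unsafe.memory.MemoryBlock
  • val block = MemoryBlock.fromLongArray(new Array[Long](8))  // 8 longs = a 64-byte block
  • assert(block.size() == 64)
  • block.fill(0.toByte)  // delegates to Platform.setMemory, i.e. sun.misc.Unsafe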

2.3.2. HeapMemoryAllocator

Now let's look at Tungsten's HeapMemoryAllocator. HeapMemoryAllocator implements the MemoryAllocator interface, which defines a few contract methods plus some debug-only fields that need not concern us. HeapMemoryAllocator defines a buffer-pool field bufferPoolsBySize of type Map<Long, LinkedList<WeakReference<MemoryBlock>>>:

org.apache.spark.unsafe.memory.HeapMemoryAllocator#bufferPoolsBySize
  • // Buffer pool of weak references to MemoryBlocks, used when allocating pages (i.e. MemoryBlocks).
  • @GuardedBy("this")
  • private final Map<Long, LinkedList<WeakReference<MemoryBlock>>> bufferPoolsBySize =
  • new HashMap<>();

I usually call bufferPoolsBySize the "pooling chain" map; its structure is sketched in the following diagram:

1.MemoryBlock存储池化链.png

HeapMemoryAllocator allocates and reclaims memory in units of MemoryBlock, storing the data in the MemoryBlock's underlying long array (held by the obj field), as briefly mentioned earlier. To avoid the performance cost of constantly creating MemoryBlocks, it caches MemoryBlocks in the bufferPoolsBySize map, keyed by allocation size. By default, HeapMemoryAllocator only applies the pooling chain to allocations of at least 1MB; that decision is made by the following code (a usage sketch follows it):

  • // Pooling threshold: only MemoryBlocks at least this large are pooled
  • private static final int POOLING_THRESHOLD_BYTES = 1024 * 1024;
  • /**
  • * Returns true if allocations of the given size should go through the pooling mechanism and
  • * false otherwise.
  • *
  • * Decides whether a MemoryBlock of the given size should go through the pooling
  • * mechanism (i.e. be fetched from and returned to the bufferPoolsBySize buffer pool).
  • */
  • private boolean shouldPool(long size) {
  • // Very small allocations are less likely to benefit from pooling.
  • // Allocations of at least 1MB fetch their MemoryBlock from bufferPoolsBySize.
  • return size >= POOLING_THRESHOLD_BYTES; // 1MB
  • }

MemoryBlock allocation is performed by the allocate(...) method; its source is as follows:

org.apache.spark.unsafe.memory.HeapMemoryAllocator#allocate
  • // Allocates a MemoryBlock of the given size
  • @Override
  • public MemoryBlock allocate(long size) throws OutOfMemoryError {
  • if (shouldPool(size)) { // a MemoryBlock of this size goes through the pooling mechanism
  • synchronized (this) {
  • // Look up the linked list of weak MemoryBlock references for this size in bufferPoolsBySize
  • final LinkedList<WeakReference<MemoryBlock>> pool = bufferPoolsBySize.get(size);
  • // The pool chain exists
  • if (pool != null) {
  • // While the chain still holds MemoryBlocks
  • while (!pool.isEmpty()) {
  • // Pop the MemoryBlock at the head of the chain
  • final WeakReference<MemoryBlock> blockReference = pool.pop();
  • final MemoryBlock memory = blockReference.get();
  • if (memory != null) { // the MemoryBlock obtained is non-null (not yet garbage-collected)
  • // A pooled MemoryBlock must match the requested size exactly
  • assert (memory.size() == size);
  • // Return the MemoryBlock obtained from the cache
  • return memory;
  • }
  • }
  • // No MemoryBlock of this size is left; remove the empty cache entry
  • bufferPoolsBySize.remove(size);
  • }
  • }
  • }
  • /**
  • * Reaching this point means one of the following:
  • * 1. a MemoryBlock of this size is not subject to pooling, or
  • * 2. bufferPoolsBySize holds no MemoryBlock of this size.
  • *
  • * A MemoryBlock carries its data in a long array, so the requested byte count must be
  • * converted: 7 is added before dividing by 8 (a long is 8 bytes) so that the
  • * allocation rounds up instead of down and never comes up short.
  • *
  • * For example, a request for 50 bytes ideally gets 56 bytes, i.e. 7 longs.
  • * Dividing 50 by 8 directly gives 6 longs, i.e. only 48 bytes;
  • * but (50 + 7) / 8 = 7 longs, which satisfies the request.
  • */
  • long[] array = new long[(int) ((size + 7) / 8)];
  • // Create the MemoryBlock and return it
  • MemoryBlock memory = new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, size);
  • // Debug-related
  • if (MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED) {
  • memory.fill(MemoryAllocator.MEMORY_DEBUG_FILL_CLEAN_VALUE);
  • }
  • // Return the newly created MemoryBlock
  • return memory;
  • }

As we can see, allocate(...) first checks whether the requested size should be optimized via the pooling chain; if so, it tries to fetch a block from the bufferPoolsBySize map, and if that yields nothing (or pooling does not apply) it simply creates and returns a new MemoryBlock.

Releasing memory is handled by the free(...) method, the exact mirror of allocate(...); its source is as follows:

org.apache.spark.unsafe.memory.HeapMemoryAllocator#free
  • // Releases a MemoryBlock
  • @Override
  • public void free(MemoryBlock memory) {
  • // Size of the MemoryBlock being released
  • final long size = memory.size();
  • if (MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED) {
  • memory.fill(MemoryAllocator.MEMORY_DEBUG_FILL_FREED_VALUE);
  • }
  • if (shouldPool(size)) { // a MemoryBlock of this size is subject to pooling
  • // Put a weak reference to the MemoryBlock into bufferPoolsBySize
  • synchronized (this) {
  • LinkedList<WeakReference<MemoryBlock>> pool = bufferPoolsBySize.get(size);
  • if (pool == null) {
  • pool = new LinkedList<>();
  • bufferPoolsBySize.put(size, pool);
  • }
  • // Put a weak reference to the MemoryBlock into bufferPoolsBySize
  • pool.add(new WeakReference<>(memory));
  • }
  • } else {
  • // Do nothing
  • }
  • }

As we can see, when memory is released, the returned MemoryBlock is stored into the pooling-chain map bufferPoolsBySize if its size qualifies; this is the only place where the pooling-chain map is ever populated.

2.3.3. UnsafeMemoryAllocator

UnsafeMemoryAllocator is comparatively simple. Since it allocates off-heap memory, it only provides the two methods for allocating and freeing memory, delegating the concrete work to the Platform class; the source is as follows:

org.apache.spark.unsafe.memory.UnsafeMemoryAllocator
  • /**
  • * A simple {@link MemoryAllocator} that uses {@code Unsafe} to allocate off-heap memory.
  • *
  • * The memory allocator Tungsten uses in off-heap mode, paired with offHeapExecutionMemoryPool.
  • */
  • public class UnsafeMemoryAllocator implements MemoryAllocator {
  • // Allocates a MemoryBlock of the given size.
  • @Override
  • public MemoryBlock allocate(long size) throws OutOfMemoryError {
  • /**
  • * Allocate the given amount of off-heap memory.
  • * Platform's allocateMemory() method actually delegates to sun.misc.Unsafe's allocateMemory(),
  • * which returns the address of the allocated memory.
  • */
  • long address = Platform.allocateMemory(size);
  • // Create the MemoryBlock; note that the obj property is left null
  • MemoryBlock memory = new MemoryBlock(null, address, size);
  • if (MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED) {
  • memory.fill(MemoryAllocator.MEMORY_DEBUG_FILL_CLEAN_VALUE);
  • }
  • // Return the MemoryBlock
  • return memory;
  • }
  • // Releases a MemoryBlock.
  • @Override
  • public void free(MemoryBlock memory) {
  • assert (memory.obj == null) :
  • "baseObject not null; are you trying to use the off-heap allocator to free on-heap memory?";
  • if (MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED) {
  • memory.fill(MemoryAllocator.MEMORY_DEBUG_FILL_FREED_VALUE);
  • }
  • // Calls Platform's freeMemory() method, which in turn delegates to sun.misc.Unsafe's freeMemory.
  • Platform.freeMemory(memory.offset);
  • }
  • }

As for the Platform class, nearly all of its methods are implemented with sun.misc.Unsafe; interested readers can explore it on their own.

3. StaticMemoryManager

StaticMemoryManager was briefly introduced earlier: it is the static memory manager that partitions execution and storage memory at fixed ratios. StaticMemoryManager extends MemoryManager, and its primary constructor mirrors MemoryManager's:

org.apache.spark.memory.StaticMemoryManager
  • /**
  • * A [[MemoryManager]] that statically partitions the heap space into disjoint regions.
  • *
  • * The sizes of the execution and storage regions are determined through
  • * `spark.shuffle.memoryFraction` and `spark.storage.memoryFraction` respectively. The two
  • * regions are cleanly separated such that neither usage can borrow memory from the other.
  • */
  • private[spark] class StaticMemoryManager(
  • conf: SparkConf,
  • maxOnHeapExecutionMemory: Long,
  • override val maxOnHeapStorageMemory: Long,
  • numCores: Int)
  • extends MemoryManager(
  • conf,
  • numCores,
  • maxOnHeapStorageMemory,
  • maxOnHeapExecutionMemory) {
  • ...
  • }

3.1. Memory Partitioning

StaticMemoryManager provides an auxiliary constructor, and it is there that the initial partitioning of memory happens:

org.apache.spark.memory.StaticMemoryManager#this
  • /**
  • * Auxiliary constructor taking only SparkConf and numCores;
  • * maxOnHeapStorageMemory and maxOnHeapExecutionMemory are obtained from the
  • * getMaxStorageMemory() and getMaxExecutionMemory() methods.
  • */
  • def this(conf: SparkConf, numCores: Int) {
  • this(
  • conf,
  • StaticMemoryManager.getMaxExecutionMemory(conf),
  • StaticMemoryManager.getMaxStorageMemory(conf),
  • numCores)
  • }

As we can see, this constructor supplies the maxOnHeapStorageMemory and maxOnHeapExecutionMemory parameters itself, obtaining them from the getMaxStorageMemory(...) and getMaxExecutionMemory(...) methods of the StaticMemoryManager companion object:

org.apache.spark.memory.StaticMemoryManager
  • private val MIN_MEMORY_BYTES = 32 * 1024 * 1024
  • /**
  • * Return the total amount of memory available for the storage region, in bytes.
  • * Compute the maximum storage memory size
  • */
  • private def getMaxStorageMemory(conf: SparkConf): Long = {
  • // Maximum available system memory: read from spark.testing.memory, falling back to the JVM's maximum memory
  • val systemMaxMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)
  • // Storage memory fraction, 0.6 by default, i.e. storage gets 60% of the total memory
  • val memoryFraction = conf.getDouble("spark.storage.memoryFraction", 0.6)
  • // Storage memory safety fraction, 0.9 by default, guarding against storage memory overflow
  • val safetyFraction = conf.getDouble("spark.storage.safetyFraction", 0.9)
  • // So the maximum storage memory is: maximum available memory * 0.6 * 0.9, i.e. 54% of the available maximum
  • (systemMaxMemory * memoryFraction * safetyFraction).toLong
  • }
  • /**
  • * Return the total amount of memory available for the execution region, in bytes.
  • */
  • private def getMaxExecutionMemory(conf: SparkConf): Long = {
  • // Maximum available system memory: read from spark.testing.memory, falling back to the JVM's maximum memory
  • val systemMaxMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)
  • // If the maximum available memory is below 32MB, throw an exception
  • if (systemMaxMemory < MIN_MEMORY_BYTES) {
  • throw new IllegalArgumentException(s"System memory $systemMaxMemory must " +
  • s"be at least $MIN_MEMORY_BYTES. Please increase heap size using the --driver-memory " +
  • s"option or spark.driver.memory in Spark configuration.")
  • }
  • // Check whether spark.executor.memory is configured
  • if (conf.contains("spark.executor.memory")) {
  • // Read spark.executor.memory, the amount of memory the executor uses
  • val executorMemory = conf.getSizeAsBytes("spark.executor.memory")
  • // If the executor memory is below 32MB, throw an exception
  • if (executorMemory < MIN_MEMORY_BYTES) {
  • throw new IllegalArgumentException(s"Executor memory $executorMemory must be at least " +
  • s"$MIN_MEMORY_BYTES. Please increase executor memory using the " +
  • s"--executor-memory option or spark.executor.memory in Spark configuration.")
  • }
  • }
  • // Execution (shuffle) memory fraction, 0.2 by default, i.e. execution gets 20% of the total memory
  • val memoryFraction = conf.getDouble("spark.shuffle.memoryFraction", 0.2)
  • // Execution memory safety fraction, 0.8 by default, guarding against execution memory overflow
  • val safetyFraction = conf.getDouble("spark.shuffle.safetyFraction", 0.8)
  • // So the maximum execution memory is: maximum available memory * 0.2 * 0.8, i.e. 16% of the available maximum
  • (systemMaxMemory * memoryFraction * safetyFraction).toLong
  • }

The comments in the source above already explain fully how these two methods compute the default values of maxOnHeapStorageMemory and maxOnHeapExecutionMemory.
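
As a worked example under an assumed 1GB maximum heap: storage may use at most 1024MB * 0.6 * 0.9 ≈ 552MB and execution at most 1024MB * 0.2 * 0.8 ≈ 163MB; the rest is left as a safety margin. In sketch form:

  • // Worked example of the static partitioning, assuming a 1GB heap and default fractions.
  • val systemMaxMemory = 1024L * 1024 * 1024
  • val maxStorage = (systemMaxMemory * 0.6 * 0.9).toLong    // ~552MB, i.e. 54%
  • val maxExecution = (systemMaxMemory * 0.2 * 0.8).toLong  // ~163MB, i.e. 16%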

In addition, when StaticMemoryManager is initialized, it reads from spark.storage.unrollFraction the maximum fraction of the on-heap storage region that may be used for unroll operations, 0.2 by default:

org.apache.spark.memory.StaticMemoryManager#maxUnrollMemory
  • // Max number of bytes worth of blocks to evict when unrolling
  • private val maxUnrollMemory: Long = {
  • (maxOnHeapStorageMemory * conf.getDouble("spark.storage.unrollFraction", 0.2)).toLong
  • }

From the configuration items above we can draw the following diagram of how StaticMemoryManager splits on-heap memory between storage and execution:

2.StaticMemoryManager堆内存划分.png

Readers can verify the diagram against the configuration values described above.

As mentioned earlier, StaticMemoryManager cannot use off-heap memory for storage, which the following code makes explicit:

org.apache.spark.memory.StaticMemoryManager
  • // The StaticMemoryManager does not support off-heap storage memory:
  • offHeapStorageMemoryPool.decrementPoolSize(offHeapStorageMemoryPool.poolSize)

As we can see, StaticMemoryManager takes back the off-heap memory that had been assigned for storage.

StaticMemoryManager implements the abstract methods required by MemoryManager. Among them, maxOffHeapStorageMemory returns the maximum off-heap memory usable for storage; since StaticMemoryManager cannot use off-heap memory for storage, it simply returns 0. The remaining implementations are covered below.

3.2. Acquiring Storage Memory

The acquireStorageMemory(...) method acquires storage memory of the requested size for a given block. Although it accepts a memoryMode parameter specifying the memory mode, it throws an exception when it detects an attempt to acquire off-heap memory for storage. The implementation is ultimately delegated to the onHeapStorageMemoryPool:

org.apache.spark.memory.StaticMemoryManager#acquireStorageMemory
  • override def acquireStorageMemory(
  • blockId: BlockId,
  • numBytes: Long,
  • memoryMode: MemoryMode): Boolean = synchronized {
  • // StaticMemoryManager does not support off-heap storage memory, so check the memory mode
  • require(memoryMode != MemoryMode.OFF_HEAP,
  • "StaticMemoryManager does not support off-heap storage memory")
  • if (numBytes > maxOnHeapStorageMemory) { // the request exceeds the maximum on-heap storage memory
  • // Fail fast if the block simply won't fit
  • logInfo(s"Will not store $blockId as the required space ($numBytes bytes) exceeds our " +
  • s"memory limit ($maxOnHeapStorageMemory bytes)")
  • // Return false immediately
  • false
  • } else {
  • // Hand over to the onHeapStorageMemoryPool
  • onHeapStorageMemoryPool.acquireMemory(blockId, numBytes)
  • }
  • }

acquireStorageMemory(...) only bounds the request with maxOnHeapStorageMemory; all remaining checks are left to the onHeapStorageMemoryPool.

3.3. Acquiring Unroll Memory

The acquireUnrollMemory(...) method acquires storage memory of the requested size for unrolling a given block; the source is as follows:

org.apache.spark.memory.StaticMemoryManager#acquireUnrollMemory
  • override def acquireUnrollMemory(
  • blockId: BlockId,
  • numBytes: Long,
  • memoryMode: MemoryMode): Boolean = synchronized {
  • // StaticMemoryManager does not support off-heap storage memory, so check the memory mode
  • require(memoryMode != MemoryMode.OFF_HEAP,
  • "StaticMemoryManager does not support off-heap unroll memory")
  • // Storage memory currently used for unroll operations
  • val currentUnrollMemory = onHeapStorageMemoryPool.memoryStore.currentUnrollMemory
  • // Storage memory currently free
  • val freeMemory = onHeapStorageMemoryPool.memoryFree
  • // When unrolling, we will use all of the existing free memory, and, if necessary,
  • // some extra space freed from evicting cached blocks. We must place a cap on the
  • // amount of memory to be evicted by unrolling, however, otherwise unrolling one
  • // big block can blow away the entire cache.
  • // Cap on bytes that may be evicted = max unroll memory - unroll memory already in use - currently free memory
  • val maxNumBytesToFree = math.max(0, maxUnrollMemory - currentUnrollMemory - freeMemory)
  • // Keep it within the range 0 <= X <= maxNumBytesToFree
  • // Compute the number of bytes that actually need to be freed
  • val numBytesToFree = math.max(0, math.min(maxNumBytesToFree, numBytes - freeMemory))
  • /**
  • * Delegate to onHeapStorageMemoryPool; note the third parameter:
  • * acquireMemory() will try to evict that much extra space to cover the shortfall
  • */
  • onHeapStorageMemoryPool.acquireMemory(blockId, numBytes, numBytesToFree)
  • }

This method computes how much memory may fall short and asks the onHeapStorageMemoryPool to try to free it up.
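
As a worked example under assumed numbers: with maxUnrollMemory = 100MB, currentUnrollMemory = 30MB, freeMemory = 10MB, and a 40MB request, maxNumBytesToFree = 100 - 30 - 10 = 60MB and numBytesToFree = min(60, 40 - 10) = 30MB, so the pool is asked to evict at most 30MB of cached blocks. In sketch form:

  • // Worked example of the unroll arithmetic; all sizes in MB and assumed.
  • val maxUnrollMemory = 100L; val currentUnrollMemory = 30L
  • val freeMemory = 10L; val numBytes = 40L
  • val maxNumBytesToFree = math.max(0L, maxUnrollMemory - currentUnrollMemory - freeMemory)  // 60
  • val numBytesToFree = math.max(0L, math.min(maxNumBytesToFree, numBytes - freeMemory))     // 30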

3.4. Acquiring Execution Memory

Acquiring execution memory is much simpler: the request is delegated directly to the pool matching the memory mode:

org.apache.spark.memory.StaticMemoryManager#acquireExecutionMemory
  • private[memory] override def acquireExecutionMemory(
  • numBytes: Long,
  • taskAttemptId: Long,
  • memoryMode: MemoryMode): Long = synchronized {
  • memoryMode match { // delegate to the matching pool
  • case MemoryMode.ON_HEAP => onHeapExecutionMemoryPool.acquireMemory(numBytes, taskAttemptId)
  • case MemoryMode.OFF_HEAP => offHeapExecutionMemoryPool.acquireMemory(numBytes, taskAttemptId)
  • }
  • }

4. UnifiedMemoryManager

The UnifiedMemoryManager memory manager implements dynamic partitioning of the memory regions; let's first look at its definition:

org.apache.spark.memory.UnifiedMemoryManager
  • /**
  • * A [[MemoryManager]] that enforces a soft boundary between execution and storage such that
  • * either side can borrow memory from the other.
  • *
  • * The region shared between execution and storage is a fraction of (the total heap space - 300MB)
  • * configurable through `spark.memory.fraction` (default 0.6). The position of the boundary
  • * within this space is further determined by `spark.memory.storageFraction` (default 0.5).
  • * This means the size of the storage region is 0.6 * 0.5 = 0.3 of the heap space by default.
  • *
  • * Storage can borrow as much execution memory as is free until execution reclaims its space.
  • * When this happens, cached blocks will be evicted from memory until sufficient borrowed
  • * memory is released to satisfy the execution memory request.
  • *
  • * Similarly, execution can borrow as much storage memory as is free. However, execution
  • * memory is *never* evicted by storage due to the complexities involved in implementing this.
  • * The implication is that attempts to cache blocks may fail if execution has already eaten
  • * up most of the storage space, in which case the new blocks will be evicted immediately
  • * according to their respective storage levels.
  • *
  • * On top of MemoryManager's memory model, UnifiedMemoryManager turns the boundary between
  • * execution and storage memory into a "soft" boundary: either side may borrow the other's free memory.
  • *
  • * @param maxHeapMemory maximum heap memory: the product of the usable system memory and the spark.memory.fraction property (0.6 by default).
  • * @param onHeapStorageRegionSize Size of the storage region, in bytes.
  • * This region is not statically reserved; execution can borrow from
  • * it if necessary. Cached blocks can be evicted only if actual
  • * storage memory usage exceeds this region.
  • * @param numCores number of CPU cores; affects the computed memory page size
  • */
  • private[spark] class UnifiedMemoryManager private[memory] (
  • conf: SparkConf,
  • val maxHeapMemory: Long,
  • onHeapStorageRegionSize: Long,
  • numCores: Int)
  • extends MemoryManager(
  • conf,
  • numCores,
  • onHeapStorageRegionSize,
  • maxHeapMemory - onHeapStorageRegionSize) {
  • ...
  • }

UnifiedMemoryManager extends MemoryManager and keeps the same overall shape, adding only the maxHeapMemory and onHeapStorageRegionSize parameters so that the total heap memory and the storage-region size can be specified conveniently when creating a UnifiedMemoryManager instance.

4.1. Memory Partitioning

The UnifiedMemoryManager companion object defines a convenient apply(...) method for creating UnifiedMemoryManager instances:

org.apache.spark.memory.UnifiedMemoryManager#apply
  • def apply(conf: SparkConf, numCores: Int): UnifiedMemoryManager = {
  • // Compute the maximum usable heap memory
  • val maxMemory = getMaxMemory(conf)
  • new UnifiedMemoryManager(
  • conf,
  • maxHeapMemory = maxMemory,
  • // On-heap storage region size; by default, maximum usable heap memory * 0.5
  • onHeapStorageRegionSize =
  • (maxMemory * conf.getDouble("spark.memory.storageFraction", 0.5)).toLong,
  • numCores = numCores)
  • }

The source shows that the maximum usable heap memory maxHeapMemory is determined by getMaxMemory(...), while the maximum storage-region heap memory is maxHeapMemory multiplied by the storage fraction, configured via spark.memory.storageFraction (0.5 by default). Let's look at the getMaxMemory(...) source:

org.apache.spark.memory.UnifiedMemoryManager#getMaxMemory
  • /**
  • * Return the total amount of memory shared between execution and storage, in bytes.
  • * Compute the maximum memory available to the unified memory manager
  • */
  • private def getMaxMemory(conf: SparkConf): Long = {
  • // Maximum available system memory: read from spark.testing.memory, falling back to the JVM's maximum memory
  • val systemMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)
  • /**
  • * Reserved system memory, read from spark.testing.reservedMemory; if absent, it is 0 when
  • * spark.testing is set, and otherwise defaults to 300 * 1024 * 1024, i.e. 300MB
  • */
  • val reservedMemory = conf.getLong("spark.testing.reservedMemory",
  • if (conf.contains("spark.testing")) 0 else RESERVED_SYSTEM_MEMORY_BYTES)
  • // Minimum system memory threshold: 1.5x the reserved memory, 450MB by default
  • val minSystemMemory = (reservedMemory * 1.5).ceil.toLong
  • // If the maximum available system memory is below the minimum threshold, throw an exception
  • if (systemMemory < minSystemMemory) {
  • throw new IllegalArgumentException(s"System memory $systemMemory must " +
  • s"be at least $minSystemMemory. Please increase heap size using the --driver-memory " +
  • s"option or spark.driver.memory in Spark configuration.")
  • }
  • // SPARK-12759 Check executor memory to fail fast if memory is insufficient
  • // Check whether spark.executor.memory is configured
  • if (conf.contains("spark.executor.memory")) {
  • // Read spark.executor.memory, the amount of memory the executor uses
  • val executorMemory = conf.getSizeAsBytes("spark.executor.memory")
  • // If the executor memory is below the minimum threshold, throw an exception
  • if (executorMemory < minSystemMemory) {
  • throw new IllegalArgumentException(s"Executor memory $executorMemory must be at least " +
  • s"$minSystemMemory. Please increase executor memory using the " +
  • s"--executor-memory option or spark.executor.memory in Spark configuration.")
  • }
  • }
  • // Usable memory = maximum available system memory - reserved system memory
  • val usableMemory = systemMemory - reservedMemory
  • // Fraction of the usable memory given to the manager
  • val memoryFraction = conf.getDouble("spark.memory.fraction", 0.6)
  • // Memory ultimately available to the unified memory manager = usable memory * fraction
  • (usableMemory * memoryFraction).toLong
  • }

getMaxMemory(...) combines these parameters to compute the memory ultimately available to the unified memory manager. The partitioning of on-heap memory in UnifiedMemoryManager is illustrated by the following diagram:

3.UnifiedMemoryManager堆内存划分.png
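
As a worked example under an assumed 1GB JVM heap with default settings: usableMemory = 1024MB - 300MB = 724MB, maxHeapMemory = 724MB * 0.6 ≈ 434MB, and the initial storage region is half of that, ≈ 217MB. In sketch form:

  • // Worked example of the unified on-heap sizing, assuming a 1GB heap and defaults (sizes in MB).
  • val systemMemory = 1024L
  • val reservedMemory = 300L
  • val usableMemory = systemMemory - reservedMemory            // 724MB
  • val maxHeapMemory = (usableMemory * 0.6).toLong             // ~434MB shared by both regions
  • val onHeapStorageRegionSize = (maxHeapMemory * 0.5).toLong  // ~217MB initial storage region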

For off-heap memory, the total size is determined directly by the maxOffHeapMemory field, which is defined in UnifiedMemoryManager's parent class MemoryManager and read from the spark.memory.offHeap.size parameter (0 by default):

org.apache.spark.memory.MemoryManager#maxOffHeapMemory
  • // Maximum off-heap memory; configured via spark.memory.offHeap.size, 0 by default.
  • protected[this] val maxOffHeapMemory = conf.getSizeAsBytes("spark.memory.offHeap.size", 0)

MemoryManager also fixes the storage share of off-heap memory and initializes the capacities of both the off-heap execution and storage pools; recall the source:

org.apache.spark.memory.MemoryManager
  • /**
  • * Initial size of off-heap memory used for storage.
  • * spark.memory.storageFraction (default 0.5) first determines the storage share of off-heap memory;
  • * by default, the off-heap storage and execution pools each get 50% of the total off-heap memory.
  • * Multiplying this fraction by the total off-heap size yields the initial off-heap storage size.
  • */
  • protected[this] val offHeapStorageMemory =
  • (maxOffHeapMemory * conf.getDouble("spark.memory.storageFraction", 0.5)).toLong
  • // Initialize the capacities of the off-heap execution and storage pools
  • offHeapExecutionMemoryPool.incrementPoolSize(maxOffHeapMemory - offHeapStorageMemory)
  • offHeapStorageMemoryPool.incrementPoolSize(offHeapStorageMemory)

In UnifiedMemoryManager, the maximum off-heap memory usable for storage is returned by the maxOffHeapStorageMemory method:

org.apache.spark.memory.UnifiedMemoryManager#maxOffHeapStorageMemory
  • // Returns the maximum off-heap memory usable for storage.
  • override def maxOffHeapStorageMemory: Long = synchronized {
  • // Total off-heap memory - off-heap memory used for execution
  • maxOffHeapMemory - offHeapExecutionMemoryPool.memoryUsed
  • }

From the description above we can draw the diagram of how UnifiedMemoryManager partitions off-heap memory:

4.UnifiedMemoryManager堆外内存划分.png

4.2. Dynamic Adjustment

Although the ratio between the execution and storage regions managed by UnifiedMemoryManager is fixed at creation time (50% each by default), the two regions change dynamically while jobs run. When execution memory proves insufficient, execution may borrow part of the storage region; likewise, when storage memory is insufficient and execution memory has free space, storage may borrow part of the execution region. The borrowing, however, follows certain rules (see the sketch after this list):

  1. When neither region has enough space, the data must be stored to disk.
  2. When one side lacks space and the other side has free space, it may borrow the other side's space.
  3. After the execution region has been borrowed by storage, execution may demand the space back, forcing storage to spill the data in the occupied portion to disk.
  4. After the storage region has been borrowed by execution, storage cannot force the space to be returned; this guarantees that computation does not fail with OOM for lack of memory.

These rules show that execution memory takes priority over storage memory. UnifiedMemoryManager's dynamic adjustment mechanism is illustrated by the following diagram:

5.UnifiedMemoryManager内存区域动态调整.png
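
A minimal sketch of the bookkeeping these rules imply, using plain variables with assumed sizes rather than Spark's pool classes:

  • // Sketch of the borrow bookkeeping between the two regions (sizes in MB, assumed).
  • var executionPoolSize = 217L
  • var storagePoolSize = 217L
  • def growExecutionFromStorage(bytes: Long): Unit = {  // rule 3: execution reclaims from storage
  •   storagePoolSize -= bytes; executionPoolSize += bytes
  • }
  • growExecutionFromStorage(50L)
  • assert(executionPoolSize + storagePoolSize == 434L)  // the invariant checked by assertInvariants below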

Additionally, to guarantee that the totals of on-heap and off-heap memory stay constant during dynamic adjustment, UnifiedMemoryManager provides the assertInvariants() method to check this invariant; if a total ever changes during an adjustment, an assertion error is thrown:

org.apache.spark.memory.UnifiedMemoryManager#assertInvariants
  • // Check the invariants
  • private def assertInvariants(): Unit = {
  • // The total of on-heap memory used for execution and storage must never change
  • assert(onHeapExecutionMemoryPool.poolSize + onHeapStorageMemoryPool.poolSize == maxHeapMemory)
  • // The total of off-heap memory used for execution and storage must never change
  • assert(
  • offHeapExecutionMemoryPool.poolSize + offHeapStorageMemoryPool.poolSize == maxOffHeapMemory)
  • }

4.3. Acquiring Storage Memory

With UnifiedMemoryManager's partitioning and dynamic adjustment mechanisms understood, let's look at the method for acquiring storage memory, acquireStorageMemory(...); the source is as follows:

org.apache.spark.memory.UnifiedMemoryManager#acquireStorageMemory
  • // Acquires the required amount of on-heap or off-heap memory for storing the block identified by BlockId.
  • override def acquireStorageMemory(
  • blockId: BlockId,
  • numBytes: Long,
  • memoryMode: MemoryMode): Boolean = synchronized { // acquire the lock
  • // Check the invariants
  • assertInvariants()
  • // Check the requested size
  • assert(numBytes >= 0)
  • // Select the execution pool, storage pool, and maximum storage capacity matching the memory mode
  • val (executionPool, storagePool, maxMemory) = memoryMode match {
  • case MemoryMode.ON_HEAP => ( // on-heap memory
  • onHeapExecutionMemoryPool,
  • onHeapStorageMemoryPool,
  • maxOnHeapStorageMemory)
  • case MemoryMode.OFF_HEAP => ( // off-heap memory
  • offHeapExecutionMemoryPool,
  • offHeapStorageMemoryPool,
  • maxOffHeapMemory)
  • }
  • // If the requested storage memory exceeds the (post-adjustment) maximum storage capacity, return false: the acquisition fails
  • if (numBytes > maxMemory) {
  • // Fail fast if the block simply won't fit
  • logInfo(s"Will not store $blockId as the required space ($numBytes bytes) exceeds our " +
  • s"memory limit ($maxMemory bytes)")
  • return false
  • }
  • // If the request exceeds the storage pool's free space, try to borrow free memory from the execution pool
  • if (numBytes > storagePool.memoryFree) {
  • // There is not enough free memory in the storage pool, so try to borrow free memory from
  • // the execution pool.
  • // Amount of memory to borrow from the execution pool
  • val memoryBorrowedFromExecution = Math.min(executionPool.memoryFree, numBytes)
  • // Shrink the execution pool by the borrowed amount
  • executionPool.decrementPoolSize(memoryBorrowedFromExecution)
  • // Grow the storage pool by the borrowed amount
  • storagePool.incrementPoolSize(memoryBorrowedFromExecution)
  • }
  • // Let the storage pool allocate the space
  • storagePool.acquireMemory(blockId, numBytes)
  • }

As the source shows, when the requested storage memory exceeds the storage pool's current free space, the manager tries to borrow from the execution pool if it has free space. The final allocation is then handed to the storage pool's acquireMemory(...) method.
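
As a worked example under assumed numbers: if the storage pool has 20MB free, the execution pool has 100MB free, and a block needs 50MB, then memoryBorrowedFromExecution = min(100, 50) = 50MB; note that the full 50MB is borrowed, not just the 30MB shortfall. In sketch form:

  • // Worked example of the storage borrow arithmetic (sizes in MB, assumed).
  • val storagePoolFree = 20L; val executionPoolFree = 100L; val numBytes = 50L
  • if (numBytes > storagePoolFree) {
  •   val borrowed = math.min(executionPoolFree, numBytes)  // 50, not just the 30 shortfall
  •   // executionPool.decrementPoolSize(borrowed); storagePool.incrementPoolSize(borrowed)
  • }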

4.4. Acquiring Unroll Memory

Acquiring unroll memory simply reuses the acquireStorageMemory(...) method:

org.apache.spark.memory.UnifiedMemoryManager#acquireUnrollMemory
  • // Acquires the required amount of on-heap or off-heap memory for unrolling the block identified by BlockId.
  • override def acquireUnrollMemory(
  • blockId: BlockId,
  • numBytes: Long,
  • memoryMode: MemoryMode): Boolean = synchronized {
  • acquireStorageMemory(blockId, numBytes, memoryMode)
  • }

4.5. Acquiring Execution Memory

Acquiring execution memory is somewhat more involved and is handled by the acquireExecutionMemory(...) method:

org.apache.spark.memory.UnifiedMemoryManager#acquireExecutionMemory
  • /**
  • * Try to acquire up to `numBytes` of execution memory for the current task and return the
  • * number of bytes obtained, or 0 if none can be allocated.
  • *
  • * This call may block until there is enough free memory in some situations, to make sure each
  • * task has a chance to ramp up to at least 1 / 2N of the total memory pool (where N is the # of
  • * active tasks) before it is forced to spill. This can happen if the number of tasks increase
  • * but an older task had a lot of memory already.
  • *
  • * Acquire execution memory
  • */
  • override private[memory] def acquireExecutionMemory(
  • numBytes: Long,
  • taskAttemptId: Long,
  • memoryMode: MemoryMode): Long = synchronized {
  • // Check the invariants
  • assertInvariants()
  • // Check the requested size
  • assert(numBytes >= 0)
  • /**
  • * Select, according to the memory mode, the on-heap or off-heap
  • * execution pool (executionPool), storage pool (storagePool),
  • * storage region size (storageRegionSize) and maximum memory (maxMemory) managed by UnifiedMemoryManager.
  • */
  • val (executionPool, storagePool, storageRegionSize, maxMemory) = memoryMode match {
  • // on-heap memory
  • case MemoryMode.ON_HEAP => (
  • onHeapExecutionMemoryPool,
  • onHeapStorageMemoryPool,
  • onHeapStorageRegionSize,
  • maxHeapMemory)
  • // off-heap memory
  • case MemoryMode.OFF_HEAP => (
  • offHeapExecutionMemoryPool,
  • offHeapStorageMemoryPool,
  • offHeapStorageMemory,
  • maxOffHeapMemory)
  • }
  • /**
  • * Grow the execution pool by evicting cached blocks, thereby shrinking the storage pool.
  • *
  • * When acquiring memory for a task, the execution pool may need to make multiple
  • * attempts. Each attempt must be able to evict storage in case another task jumps in
  • * and caches a large block between the attempts. This is called once per attempt.
  • *
  • * This function borrows or reclaims storage memory.
  • *
  • * If the storage pool's free space exceeds what storage has borrowed from the execution pool,
  • * then besides reclaiming the borrowed space, some additional free storage space is taken;
  • * if the free space is no larger than the borrowed amount, only the borrowed space is reclaimed.
  • */
  • def maybeGrowExecutionPool(extraMemoryNeeded: Long): Unit = {
  • if (extraMemoryNeeded > 0) {
  • // There is not enough free memory in the execution pool, so try to reclaim memory from
  • // storage. We can reclaim any free memory from the storage pool. If the storage pool
  • // has grown to become larger than `storageRegionSize`, we can evict blocks and reclaim
  • // the memory that storage has borrowed from execution.
  • // Amount of memory reclaimable from the storage pool
  • val memoryReclaimableFromStorage = math.max(
  • storagePool.memoryFree,
  • storagePool.poolSize - storageRegionSize)
  • // Greater than 0 means memory can be reclaimed from the storage region
  • if (memoryReclaimableFromStorage > 0) {
  • // Only reclaim as much space as is necessary and available:
  • // Reclaim only as much space as is needed and available
  • val spaceToReclaim = storagePool.freeSpaceToShrinkPool(
  • math.min(extraMemoryNeeded, memoryReclaimableFromStorage))
  • // Shrink the storage pool by the reclaimed amount
  • storagePool.decrementPoolSize(spaceToReclaim)
  • // Grow the execution pool by the reclaimed amount
  • executionPool.incrementPoolSize(spaceToReclaim)
  • }
  • }
  • }
  • /**
  • * The size the execution pool would have after evicting storage memory.
  • *
  • * The execution memory pool divides this quantity among the active tasks evenly to cap
  • * the execution memory allocation for each task. It is important to keep this greater
  • * than the execution pool size, which doesn't take into account potential memory that
  • * could be freed by evicting storage. Otherwise we may hit SPARK-12155.
  • *
  • * Additionally, this quantity should be kept below `maxMemory` to arbitrate fairness
  • * in execution memory allocation across tasks, Otherwise, a task may occupy more than
  • * its fair share of execution memory, mistakenly thinking that other tasks can acquire
  • * the portion of storage memory that cannot be evicted.
  • *
  • * When computing the maximum execution pool size: if the storage region boundary is larger
  • * than the memory storage already uses, execution's maximum space may cross the "soft"
  • * boundary between storage and execution memory;
  • * if the boundary is no larger than the memory storage already uses, storage has itself
  • * crossed the "soft" boundary, and execution may reclaim the space storage borrowed.
  • */
  • def computeMaxExecutionPoolSize(): Long = {
  • maxMemory - math.min(storagePool.memoryUsed, storageRegionSize)
  • }
  • // Call ExecutionMemoryPool's acquireMemory() method to obtain the requested amount of memory for the TaskAttempt identified by taskAttemptId.
  • executionPool.acquireMemory(
  • numBytes, taskAttemptId, maybeGrowExecutionPool, computeMaxExecutionPoolSize)
  • }

As the inner function maybeGrowExecutionPool(...) defined in the source shows, when execution memory is requested and falls short, space can be borrowed from or reclaimed out of the storage region. The final allocation of execution memory is carried out by the execution pool's acquireMemory(...) method.
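
As a closing worked example under assumed numbers: with maxMemory = 434MB, storageRegionSize = 217MB, and a storage pool of poolSize = 250MB of which memoryUsed = 240MB (10MB free), memoryReclaimableFromStorage = max(10, 250 - 217) = 33MB, so up to 33MB can be moved back to the execution pool by evicting cached blocks; meanwhile computeMaxExecutionPoolSize() = 434 - min(240, 217) = 217MB. In sketch form:

  • // Worked example of maybeGrowExecutionPool's arithmetic (sizes in MB, assumed).
  • val maxMemory = 434L; val storageRegionSize = 217L
  • val storagePoolSize = 250L; val storageMemoryUsed = 240L
  • val storageMemoryFree = storagePoolSize - storageMemoryUsed               // 10
  • val memoryReclaimableFromStorage =
  •   math.max(storageMemoryFree, storagePoolSize - storageRegionSize)        // max(10, 33) = 33
  • val maxExecutionPoolSize = maxMemory - math.min(storageMemoryUsed, storageRegionSize)  // 217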