大数据
流式处理
源码解析
Spark

Spark源码分析11 - 存储体系04:存储层(1)内存池

简介:Spark中数据的存储按照位置来分,可以分为磁盘和内存,存储体系也分别根据这两种存储位置做出了不同的实现;同时,存储层还有一项非常重要的工作,就是对这两种存储进行管理。

1. 存储层简介

在前两篇文章中,详细介绍了Spark存储体系的通信层实现;通信层通过各类消息来控制对存储数据的具体操作,最终对数据的操作依旧是委托给了存储层,那么在本文中我们将详细讨论Spark存储体系的存储层的实现。

Spark中数据的存储按照位置来分,可以分为磁盘和内存,存储体系也分别根据这两种存储位置做出了不同的实现;同时,存储层还有一项非常重要的工作,就是对这两种存储进行管理。

对于磁盘存储,Spark中使用DiskBlockManager和DiskStore两个类来配合实现,它们都属于Spark Core模块的org.apache.spark.storage包下,每个Driver或Executor节点都存在这两种角色。DiskBlockManager主要负责建立逻辑上的数据块与该数据块在磁盘上的写入位置之间的映射关系,DiskStore则负责数据块在磁盘上的进行存储的具体实现方式。

对于内存存储,Spark将数据块抽象为具体的MemoryEntry对象,通过具体的MemoryManager内存管理器组织节点实现内存的分配与回收,而MemoryStore的角色则与DiskStore类似,它负责数据块在内存中进行存储的具体实现方式。

2. 内存存储

我们先来考察Spark存储体系中对内存存储的实现,内存存储体系涉及到多个实现类,我们这里先做一个简短的介绍:

  1. MemoryPool:内存池。内存池用于实现对操作系统分配给Spark的物理内存的逻辑规划,协助Spark管理可调节的内存区域;Spark中将内存规划分为两个部分,分别是执行内存池和存储内存池,对应于ExecutionMemoryPool和StorageMemoryPool两个实现类;从它们的命名就可以得知,一部分是用于执行各类操作,一部分是用于存储。
  2. MemoryManager:内存管理器。负责协调内存存储各类组件的相互协作,有静态内存管理器和统一内存管理器两种实现,对应于StaticMemoryManager和UnifiedMemoryManager两个实现类;前者对执行内存和存储内存的比例控制是固定的,后者则实现了动态比例划分。
  3. MemoryEntry:数据块的内存抽象。数据块存储在内存中的具体表现形式,有DeserializedMemoryEntry和SerializedMemoryEntry两个实现,分别对应于未序列化的对象存储和序列化的二进制数据存储。
  4. MemoryStore:内存存储的具体执行类。它是执行各类内存存储操作的主要实现类。

3. 内存池

Spark在每个实现存储体系的节点上都不止有一个内存池。MemoryPool是内存池的顶层抽象类,定义和重要字段如下:

org.apache.spark.memory.MemoryPool
  • /**
  • * Manages bookkeeping for an adjustable-sized region of memory. This class is internal to
  • * the [[MemoryManager]]. See subclasses for more details.
  • *
  • * 无论是堆内存还是堆外内存,都需要一个内存池对内存进行资源管理。
  • *
  • * @param lock a [[MemoryManager]] instance, used for synchronization. We purposely erase the type
  • * to `Object` to avoid programming errors, since this object should only be used for
  • * synchronization purposes.
  • * 对内存池提供线程安全保证的锁对象。
  • */
  • private[memory] abstract class MemoryPool(lock: Object) {
  • // 内存池的大小(单位为字节)。
  • @GuardedBy("lock")
  • private[this] var _poolSize: Long = 0
  • ...
  • }

从上述源码可知,MemoryPool定义了一个私有的_poolSize字段用于表示内存池的总大小(以字节为单位),不过MemoryPool的还定义了一个Object类型的构造参数lock,该参数是用于同步控制的锁对象;在MemoryPool在它已实现的方法中,利用lock锁保证了对_poolSize字段的安全修改:

org.apache.spark.memory.MemoryPool
  • /**
  • * Returns the amount of used memory in this pool (in bytes).
  • * 获取已经使用的内存大小(单位为字节)。由子类实现。
  • */
  • def memoryUsed: Long
  • /**
  • * Returns the current size of the pool, in bytes.
  • * 返回内存池的大小(即_poolSize,单位为字节)的方法。
  • */
  • final def poolSize: Long = lock.synchronized {
  • _poolSize
  • }
  • /**
  • * Returns the amount of free memory in the pool, in bytes.
  • * 获取内存池的空闲空间(即_poolSize减去memoryUsed的大小,单位为字节)。
  • */
  • final def memoryFree: Long = lock.synchronized {
  • _poolSize - memoryUsed
  • }
  • /**
  • * Expands the pool by `delta` bytes.
  • * 给内存池扩展delta给定的大小(单位为字节)。delta必须为正整数。
  • */
  • final def incrementPoolSize(delta: Long): Unit = lock.synchronized {
  • require(delta >= 0)
  • _poolSize += delta
  • }
  • /**
  • * Shrinks the pool by `delta` bytes.
  • * 将内存池缩小delta给定的大小(单位为字节)。
  • * delta必须为正整数且_poolSize与delta的差要大于等于memoryUsed(即已经使用的内存不能从内存池中移除)。
  • */
  • final def decrementPoolSize(delta: Long): Unit = lock.synchronized {
  • require(delta >= 0)
  • require(delta <= _poolSize)
  • require(_poolSize - delta >= memoryUsed)
  • _poolSize -= delta
  • }

MemoryPool中要求子类必须实现的方法只有def memoryUsed: Long,它用于获取已用内存大小;其它的方法几乎都是对_poolSize字段的值进行获取或修改,大家仔细观察会发现这些方法都是用了lock.synchronized进行同步控制。

在Spark中MemoryPool有两个实现了:ExecutionMemoryPool和StorageMemoryPool。下面将分别介绍。

3.1. ExecutionMemoryPool

ExecutionMemoryPool实现了执行内存池的分配和管理,它用于保证Task合理地进行内存使用,避免由于某些Task过度使用内存导致其它的Task频繁将数据溢写到磁盘。

ExecutionMemoryPool会根据Task的数量来动态控制每个Task所能申请的用于执行操作的内存大小范围。假如有N个Task,那么每个Task所能申请的执行内存大小在总内存的1 / 2N ~ 1 / N这个范围之间。由于Task数量是动态变化的,因此ExecutionMemoryPool会跟踪所有激活的Task的数量以便动态更新1 / 2N ~ 1 / N范围值。

ExecutionMemoryPool的定义和重要字段如下:

org.apache.spark.memory.ExecutionMemoryPool
  • /**
  • * Implements policies and bookkeeping for sharing an adjustable-sized pool of memory between tasks.
  • *
  • * Tries to ensure that each task gets a reasonable share of memory, instead of some task ramping up
  • * to a large amount first and then causing others to spill to disk repeatedly.
  • *
  • * If there are N tasks, it ensures that each task can acquire at least 1 / 2N of the memory
  • * before it has to spill, and at most 1 / N. Because N varies dynamically, we keep track of the
  • * set of active tasks and redo the calculations of 1 / 2N and 1 / N in waiting tasks whenever this
  • * set changes. This is all done by synchronizing access to mutable state and using wait() and
  • * notifyAll() to signal changes to callers. Prior to Spark 1.6, this arbitration of memory across
  • * tasks was performed by the ShuffleMemoryManager.
  • *
  • * 执行内存池的实现。
  • * 该类用于保证Task合理地进行内存使用,避免因为某些Task过度使用内存导致其它的Task频繁将数据溢写到磁盘。
  • *
  • * 如果有N个Task,则每个Task所能分配到的内存在总内存的 1 / 2N ~ 1 / N 之间。
  • * 由于Task数量是动态的,因此会跟踪所有激活的Task的数量以便重新计算 1 / 2N 和 1 / N 的值。
  • *
  • * @param lock a [[MemoryManager]] instance to synchronize on
  • * @param memoryMode the type of memory tracked by this pool (on- or off-heap)
  • * 内存模式。用于执行的内存池包括堆内存和堆外内存两种。
  • */
  • private[memory] class ExecutionMemoryPool(
  • lock: Object,
  • memoryMode: MemoryMode
  • ) extends MemoryPool(lock) with Logging {
  • /**
  • * 内存池的名称。
  • * 如果memoryMode是MemoryMode.ON_HEAP,则内存池名称为on-heap execution。
  • * 如果memoryMode是MemoryMode.OFF_HEAP,则内存池名称为off-heap execution。
  • */
  • private[this] val poolName: String = memoryMode match {
  • case MemoryMode.ON_HEAP => "on-heap execution"
  • case MemoryMode.OFF_HEAP => "off-heap execution"
  • }
  • /**
  • * Map from taskAttemptId -> memory consumption in bytes
  • *
  • * TaskAttempt的身份标识(taskAttemptId)与所消费内存的大小之间的映射关系。
  • */
  • @GuardedBy("lock")
  • private val memoryForTask = new mutable.HashMap[Long, Long]()
  • ...
  • }

ExecutionMemoryPool在其父类的基础上,增加了MemoryMode类型的参数memoryMode,MemoryMode用于表示使用的内存模式,它的定义如下:

org.apache.spark.memory.MemoryMode
  • @Private
  • public enum MemoryMode {
  • ON_HEAP, // 堆内存;并不是JVM中的Java堆,它只是JVM堆内存的一部分。
  • OFF_HEAP // 堆外内存;是Spark使用sun.misc.Unsafe的API直接在工作节点的系统内存中开辟的空间。
  • }

可见,MemoryMode是一个枚举类,定义了ON_HEAP和OFF_HEAP两种内存模式,分别表示堆内存模式和堆外内存模式。

ExecutionMemoryPool还使用HashMap[Long, Long]类型的字典memoryForTask记录每个TaskAttempt的ID与其所使用的执行内存的大小之间的映射关系。ExecutionMemoryPool复写了父类的memoryUsed()方法,提供了统计已使用总内存大小的方法,它是通过累加memoryForTask字典所有的值来实现的:

org.apache.spark.memory.ExecutionMemoryPool#memoryUsed
  • /**
  • * 已经使用的内存大小(单位为字节)。
  • * 实际为所有TaskAttempt所消费的内存大小之和,即memoryForTask这个Map中所有value的和。
  • */
  • override def memoryUsed: Long = lock.synchronized {
  • memoryForTask.values.sum
  • }

getMemoryUsageForTask(...)方法可以根据TaskAttempt的ID从ExecutionMemoryPool中获取指定TaskAttempt使用的执行内存的大小:

org.apache.spark.memory.ExecutionMemoryPool#getMemoryUsageForTask
  • /**
  • * Returns the memory consumption, in bytes, for the given task.
  • * 获取TaskAttempt使用的内存大小,即memoryForTask中taskAttemptId对应的value值。
  • */
  • def getMemoryUsageForTask(taskAttemptId: Long): Long = lock.synchronized {
  • memoryForTask.getOrElse(taskAttemptId, 0L)
  • }

3.1.1. 申请内存

可以使用ExecutionMemoryPool的acquireMemory(...)方法为指定的TaskAttempt申请指定大小的执行内存,它的实现比较复杂,源码如下:

org.apache.spark.memory.ExecutionMemoryPool#acquireMemory
  • /**
  • * Try to acquire up to `numBytes` of memory for the given task and return the number of bytes
  • * obtained, or 0 if none can be allocated.
  • *
  • * This call may block until there is enough free memory in some situations, to make sure each
  • * task has a chance to ramp up to at least 1 / 2N of the total memory pool (where N is the # of
  • * active tasks) before it is forced to spill. This can happen if the number of tasks increase
  • * but an older task had a lot of memory already.
  • *
  • * 用于给taskAttemptId对应的TaskAttempt获取指定大小(即numBytes)的内存
  • *
  • * @param numBytes number of bytes to acquire
  • * 分配的内存大小
  • * @param taskAttemptId the task attempt acquiring memory
  • * 指定的TaskAttempt的ID
  • * @param maybeGrowPool a callback that potentially grows the size of this pool. It takes in
  • * one parameter (Long) that represents the desired amount of memory by
  • * which this pool should be expanded.
  • * 回调函数,用于处理潜在的内存池增长情况
  • * @param computeMaxPoolSize a callback that returns the maximum allowable size of this pool
  • * at this given moment. This is not a field because the max pool
  • * size is variable in certain cases. For instance, in unified
  • * memory management, the execution pool can be expanded by evicting
  • * cached blocks, thereby shrinking the storage pool.
  • * 用于限制本次分配的最大内存的回调函数,默认传入() => poolSize,即可分配所有内存。
  • * 传入回调函数的原因在于,不同的内存管理器对执行内存和存储内存的划分方式是不同的,
  • * 例如UnifiedMemoryManager可以通过挤压存储内存区域以扩大执行内存区域。
  • *
  • * @return the number of bytes granted to the task.
  • */
  • private[memory] def acquireMemory(
  • numBytes: Long,
  • taskAttemptId: Long,
  • maybeGrowPool: Long => Unit = (additionalSpaceNeeded: Long) => Unit,
  • computeMaxPoolSize: () => Long = () => poolSize): Long = lock.synchronized {
  • assert(numBytes > 0, s"invalid number of bytes requested: $numBytes")
  • // TODO: clean up this clunky method signature
  • // Add this task to the taskMemory map just so we can keep an accurate count of the number
  • // of active tasks, to let other tasks ramp down their memory in calls to `acquireMemory`
  • if (!memoryForTask.contains(taskAttemptId)) { // 如果memoryForTask中还没有记录taskAttemptId
  • // 将taskAttemptId放入memoryForTask,初始状态taskAttemptId所消费的内存为0
  • memoryForTask(taskAttemptId) = 0L
  • // This will later cause waiting tasks to wake up and check numTasks again
  • // 唤醒其他等待获取ExecutionMemoryPool的锁的线程
  • lock.notifyAll()
  • }
  • // Keep looping until we're either sure that we don't want to grant this request (because this
  • // task would have more than 1 / numActiveTasks of the memory) or we have enough free
  • // memory to give it (we always let each task get at least 1 / (2 * numActiveTasks)).
  • // TODO: simplify this to limit each task to its own slot
  • while (true) {
  • // 获取当前激活的Task的数量
  • val numActiveTasks = memoryForTask.keys.size
  • // 获取当前TaskAttempt所消费的内存
  • val curMem = memoryForTask(taskAttemptId)
  • // In every iteration of this loop, we should first try to reclaim any borrowed execution
  • // space from storage. This is necessary because of the potential race condition where new
  • // storage blocks may steal the free execution memory that this task was waiting for.
  • /**
  • * numBytes - memoryFree计算出不够分配的内存大小,然后尝试从StorageMemoryPool回收或借用内存。
  • * 这里使用的即是maybeGrowPool回调函数,对于不同的内存管理器,实现方式是不同的,
  • */
  • maybeGrowPool(numBytes - memoryFree)
  • // Maximum size the pool would have after potentially growing the pool.
  • // This is used to compute the upper bound of how much memory each task can occupy. This
  • // must take into account potential free memory as well as the amount this pool currently
  • // occupies. Otherwise, we may run into SPARK-12155 where, in unified memory management,
  • // we did not take into account space that could have been freed by evicting cached blocks.
  • // 从这里可以看出内存池对每次每个TaskAttempt可申请的内存范围是动态进行计算的
  • // 计算当前内存池的最大大小
  • val maxPoolSize = computeMaxPoolSize()
  • // 计算每个TaskAttempt最大可以使用的内存大小,即 可用总内存大小 / 激活任务数量
  • val maxMemoryPerTask = maxPoolSize / numActiveTasks
  • // 计算每个TaskAttempt最小保证使用的内存大小,即 当前内存池大小 / (激活任务数量 * 2)
  • val minMemoryPerTask = poolSize / (2 * numActiveTasks)
  • // How much we can grant this task; keep its share within 0 <= X <= 1 / numActiveTasks
  • // 计算本次可分配给TaskAttempt的最大可保证的大小
  • val maxToGrant = math.min(numBytes, math.max(0, maxMemoryPerTask - curMem))
  • // Only give it as much memory as is free, which might be none if it reached 1 / numTasks
  • // 计算当前TaskAttempt真正可以申请获取的内存大小
  • val toGrant = math.min(maxToGrant, memoryFree)
  • // We want to let each task get at least 1 / (2 * numActiveTasks) before blocking;
  • // if we can't give it this much now, wait for other tasks to free up memory
  • // (this happens if older tasks allocated lots of memory before N grew)
  • /**
  • * 判断内存是否满足这一次的申请:
  • * 1. toGrant < numBytes:表示可分配大小小于本次申请需要的大小;
  • * 2. curMem + toGrant < minMemoryPerTask:表示该TaskAttempt申请的大小已经超过了
  • * 单个TaskAttempt可申请的最大大小
  • */
  • if (toGrant < numBytes && curMem + toGrant < minMemoryPerTask) {
  • logInfo(s"TID $taskAttemptId waiting for at least 1/2N of $poolName pool to be free")
  • // 内存不足,使当前线程处于等待状态
  • lock.wait()
  • } else {
  • // 成功获取到toGrant指定大小的内存
  • memoryForTask(taskAttemptId) += toGrant
  • return toGrant
  • }
  • }
  • 0L // Never reached
  • }

acquireMemory(...)方法由四个参数,我们需要先理清这四个参数的用处:

  1. numBytes:Long类型,用于表示本次申请的内存大小,以字节为单位。
  2. taskAttemptId:Long类型,用于表示发起本次申请操作的TaskAttempt的ID。
  3. maybeGrowPool:它是类型为Long => Unit的回调函数,默认值为(additionalSpaceNeeded: Long) => Unit空实现。它需要传入一个Long类型的参数additionalSpaceNeeded表示额外需要的内存空间,这个回调函数用于在内存池内存无法满足本次分配时,尝试进行额外的内存增长操作;读者可能有疑惑,为什么内存池的内存还可以增长?这是由于在Spark中实现了两种内存管理器,其中UnifiedMemoryManager内存管理器可以让执行内存和存储内存的划分比例相互挤压,当执行内存不够时,如果存储内存有空闲,可以暂时向存储内存借用一些空间。maybeGrowPool参数正是用于这种情况下尝试进行额外内存增长的回调函数。
  4. computeMaxPoolSize:它是类型为() => Long的回调函数,默认值为() => poolSize,即返回当前内存池的最大大小;该回调是用于在执行maybeGrowPool进行额外内存增长后重新计算当前内存池可用的最大内存大小。

acquireMemory(...)方法正是通过这四个参数来决定分配操作如何执行,执行过程有以下几步:

  1. 首先判断当前内存池是否记录有指定TaskAttempt的内存分配记录,如果没有就将其分配记录置为0,同时唤醒其它的内存分配申请者。
  2. while死循环体内是内存分配的主要实现;首先计算当前激活的TaskAttempt的数量,然后通过numBytes - memoryFree计算内存池剩余内存是否满足本次分配,如果不满足会使用maybeGrowPool回调函数尝试进行额外内存增长操作,增长完后会重新计算当前内存池最大可用内存。
  3. 通过激活TaskAttempt数量分别计算每个TaskAttempt可申请的内存大小范围;这里的激活TaskAttempt表示当前内存池中已经记录的申请过内存分配的TaskAttempt。
  4. 计算本次申请理论上可分配给指定TaskAttempt的内存大小,以及根据内存池当前的空闲内存大小计算实际可分配给指定TaskAttempt的内存大小。
  5. 如果实际可分配的内存大小无法满足此次分配,且该TaskAttempt所使用的总内存大小小于单个TaskAttempt可申请的最小大小,就阻塞本次的申请线程;在有其它TaskAttempt释放内存时会被唤醒,进而重新尝试申请。
  6. 如果实际可分配的内存大小满足此次分配;或者虽然实际可分配的内存大小不满足此次分配,但分配给该TaskAttempt的总内存大小大于单个TaskAttempt可申请的最小大小,就将实际可分配的内存分配给该TaskAttempt。

注意,这第5和第6步的操作中,限制了每个TaskAttempt申请的内存大小必须大于每个TaskAttempt可申请的最小大小;即就算实际可分配的内存大小满足该TaskAttempt的本次申请,但分配给该TaskAttempt的累计内存大小如果少于单个TaskAttempt可申请的最小大小,也不会进行分配。

3.1.2. 释放内存

ExecutionMemoryPool的releaseMemory(...)方法用于释放指定的TaskAttempt占用的指定大小的执行内存,该方法的实现是比较简单的,源码如下:

org.apache.spark.memory.ExecutionMemoryPool#releaseMemory
  • /**
  • * Release `numBytes` of memory acquired by the given task.
  • *
  • * 用于给taskAttemptId对应的TaskAttempt释放指定大小(即numBytes)的内存。
  • */
  • def releaseMemory(numBytes: Long, taskAttemptId: Long): Unit = lock.synchronized {
  • // 获取taskAttemptId代表的TaskAttempt消费的内存(即curMem)
  • val curMem = memoryForTask.getOrElse(taskAttemptId, 0L)
  • // 计算能够释放的内存大小
  • var memoryToFree = if (curMem < numBytes) { // 可释放内存大小小于指定释放的内存
  • logWarning(
  • s"Internal error: release called on $numBytes bytes but task only has $curMem bytes " +
  • s"of memory from the $poolName pool")
  • // 只释放可释放的内存大小
  • curMem
  • } else { // 可释放内存大小大于指定释放的内存
  • // 释放指定的内存大小
  • numBytes
  • }
  • /**
  • * taskAttemptId代表的TaskAttempt占用的内存大小减去memoryToFree。
  • * 如果taskAttemptId代表的TaskAttempt占用的内存大小小于等于零,
  • * 还需要将taskAttemptId与所消费内存的映射关系从memoryForTask中清除。
  • */
  • if (memoryForTask.contains(taskAttemptId)) { // 释放内存(逻辑内存)
  • memoryForTask(taskAttemptId) -= memoryToFree
  • // 如果TaskAttempt占用的内存小于等于0,就将其移除
  • if (memoryForTask(taskAttemptId) <= 0) {
  • memoryForTask.remove(taskAttemptId)
  • }
  • }
  • // 唤醒所有申请获得内存,但是处于等待状态的线程
  • lock.notifyAll() // Notify waiters in acquireMemory() that memory has been freed
  • }

除了releaseMemory(...)方法,还有一个releaseAllMemoryForTask(...)方法用于释放指定TaskAttempt所占用的所有执行内存,内部调用了releaseMemory(...)方法:

org.apache.spark.memory.ExecutionMemoryPool#releaseAllMemoryForTask
  • /**
  • * Release all memory for the given task and mark it as inactive (e.g. when a task ends).
  • *
  • * 用于释放taskAttemptId对应的TaskAttempt所消费的所有内存。
  • *
  • * @return the number of bytes freed.
  • */
  • def releaseAllMemoryForTask(taskAttemptId: Long): Long = lock.synchronized {
  • // 获取taskAttemptId对应的TaskAttempt消费的内存
  • val numBytesToFree = getMemoryUsageForTask(taskAttemptId)
  • // 进行释放
  • releaseMemory(numBytesToFree, taskAttemptId)
  • // 返回释放的内存大小
  • numBytesToFree
  • }

3.2. StorageMemoryPool

上一节我们讨论了执行内存池的实现,本节我们将讨论存储内存池的实现。StorageMemoryPool是用于管理存储内存池的实现类,位于Spark Core模块的org.apache.spark.memory包下,同样继承自MemoryPool类。StorageMemoryPool是针对数据块存储而设计的,它实现相对于ExecutionMemoryPool来说要简单不少;我们先来看一下它的定义和重要字段,如下:

org.apache.spark.memory.StorageMemoryPool
  • /**
  • * Performs bookkeeping for managing an adjustable-size pool of memory that is used for storage
  • * (caching).
  • *
  • * StorageMemoryPool是对用于存储的物理内存的逻辑抽象,通过对存储内存的逻辑管理,提高Spark存储体系对内存的使用效率。
  • *
  • * @param lock a [[MemoryManager]] instance to synchronize on
  • * @param memoryMode the type of memory tracked by this pool (on- or off-heap)
  • * 内存模式;用于存储的内存池包括堆内存的内存池和堆外内存的内存池。
  • */
  • private[memory] class StorageMemoryPool(
  • lock: Object,
  • memoryMode: MemoryMode
  • ) extends MemoryPool(lock) with Logging {
  • /**
  • * 内存池的名称。
  • * - 如果memoryMode是MemoryMode.ON_HEAP,则内存池名称为on-heap storage。
  • * - 如果memoryMode是MemoryMode.OFF_HEAP,则内存池名称为off-heap storage。
  • */
  • private[this] val poolName: String = memoryMode match {
  • case MemoryMode.ON_HEAP => "on-heap storage"
  • case MemoryMode.OFF_HEAP => "off-heap storage"
  • }
  • // 已经使用的内存大小(单位为字节)。
  • @GuardedBy("lock")
  • private[this] var _memoryUsed: Long = 0L
  • // 当前StorageMemoryPool所关联的MemoryStore。
  • private var _memoryStore: MemoryStore = _
  • ...
  • }

StorageMemoryPool中,并不会记录每个TaskAttempt使用的存储量;它提供了_memoryUsed字段用于表示已经使用的内存大小,另外还提供了MemoryStore类型的字段_memoryStore及其Getter和Setter方法,用于实现数据块在内存中的存储。关于MemoryStore在后面的章节中会详细介绍。

3.2.1. 申请内存

同样的,StorageMemoryPool提供了acquireMemory(...)方法用于申请内存,它有两个重载版本,其中acquireMemory(blockId: BlockId, numBytes: Long): Boolean方法仅仅指定了需要申请内存的数据块的BlockId以及申请的内存大小,其内部调用了另外一个重载版本:

org.apache.spark.memory.StorageMemoryPool#acquireMemory
  • /**
  • * Acquire N bytes of memory to cache the given block, evicting existing ones if necessary.
  • *
  • * 用于给BlockId对应的Block获取numBytes指定大小的内存。
  • *
  • * @return whether all N bytes were successfully granted.
  • */
  • def acquireMemory(blockId: BlockId, numBytes: Long): Boolean = lock.synchronized {
  • // 计算申请内存和可用内存的差值,避免申请过量
  • val numBytesToFree = math.max(0, numBytes - memoryFree)
  • // 调用重载方法分配
  • acquireMemory(blockId, numBytes, numBytesToFree)
  • }

可见,该方法会根据内存池当前剩余的内存来计算不足内存,然后交给重载版本的acquireMemory(...)方法处理,重载版本的源码如下:

org.apache.spark.memory.StorageMemoryPool#acquireMemory
  • /**
  • * Acquire N bytes of storage memory for the given block, evicting existing ones if necessary.
  • *
  • * @param blockId the ID of the block we are acquiring storage memory for
  • * 申请内存的BlockId
  • * @param numBytesToAcquire the size of this block
  • * 申请的内存大小
  • * @param numBytesToFree the amount of space to be freed through evicting blocks
  • * 本次申请需要额外空出来的内存大小
  • * @return whether all N bytes were successfully granted.
  • * 所需要的内存是否申请成功了
  • */
  • def acquireMemory(
  • blockId: BlockId,
  • numBytesToAcquire: Long,
  • numBytesToFree: Long): Boolean = lock.synchronized { // 加锁
  • // 检查参数
  • assert(numBytesToAcquire >= 0)
  • assert(numBytesToFree >= 0)
  • // 已用内存大小需要小于或等于内存池总大小
  • assert(memoryUsed <= poolSize)
  • // 判断是否需要腾出额外的内存大小
  • if (numBytesToFree > 0) { // 腾出numBytesToFree属性指定大小的空间
  • // 使用MemoryStore的evictBlocksToFreeSpace()方法进行腾出
  • memoryStore.evictBlocksToFreeSpace(Some(blockId), numBytesToFree, memoryMode)
  • }
  • // NOTE: If the memory store evicts blocks, then those evictions will synchronously call
  • // back into this StorageMemoryPool in order to free memory. Therefore, these variables
  • // should have been updated.
  • // 判断可用内存是否充足
  • val enoughMemory = numBytesToAcquire <= memoryFree
  • if (enoughMemory) { // 可用内存充足,增加已经使用的内存大小
  • _memoryUsed += numBytesToAcquire
  • }
  • // 返回是否成功获得了用于存储Block的内存空间
  • enoughMemory
  • }

相对于ExecutionMemoryPool来说,StorageMemoryPool对申请内存的操作要简单不少。这里需要注意acquireMemory(...)方法的第三个参数,该参数用于控制需要额外腾出的内存大小,如果大于0,acquireMemory(...)方法会尝试使用MemoryStore的evictBlocksToFreeSpace(...)方法腾出指定大小即内存模式的空间。

3.2.2. 释放内存

StorageMemoryPool用于释放内存的方法也比较简单,源码如下:

org.apache.spark.memory.StorageMemoryPool#releaseMemory
  • // 释放内存
  • def releaseMemory(size: Long): Unit = lock.synchronized {
  • if (size > _memoryUsed) {
  • // 释放的大小大于已使用大小,则释放当前内存池的所有内存,即将_memoryUsed设置为0。
  • logWarning(s"Attempted to release $size bytes of storage " +
  • s"memory when we only have ${_memoryUsed} bytes")
  • _memoryUsed = 0
  • } else {
  • // 否则从已使用内存大小中减去释放的大小
  • _memoryUsed -= size
  • }
  • }

可见,其内部仅仅是做了参数校验,并维护_memoryUsed字段的值。同样的,releaseAllMemory()方法用于释放所有的内存,它内部会将_memoryUsed字段置为0:

org.apache.spark.memory.StorageMemoryPool#releaseAllMemory
  • // 释放当前内存池的所有内存,即将_memoryUsed设置为0。
  • def releaseAllMemory(): Unit = lock.synchronized {
  • _memoryUsed = 0
  • }

3.2.3. 缩小内存池大小

StorageMemoryPool比ExecutionMemoryPool多提供了一个方法freeSpaceToShrinkPool(...)用于释放指定大小的内存以缩小内存池的大小,不过该方法内部仅仅是进行了可释放大小的计算,并未真正对内存池的大小进行修改;它的源码如下:

org.apache.spark.memory.StorageMemoryPool#freeSpaceToShrinkPool
  • /**
  • * Free space to shrink the size of this storage memory pool by `spaceToFree` bytes.
  • * Note: this method doesn't actually reduce the pool size but relies on the caller to do so.
  • *
  • * 用于释放指定大小的空间,缩小内存池的大小。
  • *
  • * @return number of bytes to be removed from the pool's capacity.
  • */
  • def freeSpaceToShrinkPool(spaceToFree: Long): Long = lock.synchronized {
  • // 计算释放的大小和空闲大小的最小值
  • val spaceFreedByReleasingUnusedMemory = math.min(spaceToFree, memoryFree)
  • // 计算可释放的内存是否足够
  • val remainingSpaceToFree = spaceToFree - spaceFreedByReleasingUnusedMemory
  • if (remainingSpaceToFree > 0) { // 如果可释放的内存不够
  • // If reclaiming free memory did not adequately shrink the pool, begin evicting blocks:
  • // 使用evictBlocksToFreeSpace()方法尝试腾出一些内存
  • val spaceFreedByEviction =
  • memoryStore.evictBlocksToFreeSpace(None, remainingSpaceToFree, memoryMode)
  • // When a block is released, BlockManager.dropFromMemory() calls releaseMemory(), so we do
  • // not need to decrement _memoryUsed here. However, we do need to decrement the pool size.
  • // 返回最终释放的大小
  • spaceFreedByReleasingUnusedMemory + spaceFreedByEviction
  • } else {
  • // 可释放的内存足够,直接返回释放的大小即可
  • spaceFreedByReleasingUnusedMemory
  • }
  • }

该方法会判断当前内存池空闲大小是否满足要求释放的大小,如果满足则返回可释放的大小,否则会使用MemoryStore的evictBlocksToFreeSpace(...)尝试腾出一些内存,然后返回最终可释放的大小。从该方法的整体实现可以看出,它并没有修改当前内存池的大小。