Spark Source Code Analysis 13 - Storage System 06: Storage Layer (3) The Concrete Implementation of Memory Storage

Summary: Memory storage is implemented by the storage entity class MemoryEntry working together with the MemoryStore class, which performs the actual memory operations.

1. The concrete implementation of memory storage

Memory storage is implemented by two cooperating pieces: the storage entity class MemoryEntry and the MemoryStore class that performs the memory operations.

2. MemoryEntry

In memory storage, a data block is abstracted as the MemoryEntry type. It is a trait, defined as follows:

org.apache.spark.storage.memory.MemoryEntry
    // a Block in memory is abstracted as the trait MemoryEntry
    private sealed trait MemoryEntry[T] {
      def size: Long // the size of this Block
      def memoryMode: MemoryMode // the memory mode (on-heap or off-heap) in which this Block is stored
      def classTag: ClassTag[T] // the type tag of this Block's data
    }

MemoryEntry declares three methods, representing the block's size, the memory mode it is stored in, and its type tag. MemoryEntry has two implementations, both case classes, which represent the entities actually stored in memory: one for serialized data and one for deserialized data. The serialized form is the SerializedMemoryEntry class, whose source is as follows:

org.apache.spark.storage.memory.SerializedMemoryEntry
    // a MemoryEntry holding serialized data
    private case class SerializedMemoryEntry[T](
        buffer: ChunkedByteBuffer,
        memoryMode: MemoryMode,
        classTag: ClassTag[T]) extends MemoryEntry[T] {
      def size: Long = buffer.size
    }

The deserialized form is the DeserializedMemoryEntry class, whose source is as follows:

org.apache.spark.storage.memory.DeserializedMemoryEntry
    // a MemoryEntry holding deserialized data
    private case class DeserializedMemoryEntry[T](
        value: Array[T],
        size: Long,
        classTag: ClassTag[T]) extends MemoryEntry[T] {
      val memoryMode: MemoryMode = MemoryMode.ON_HEAP
    }
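
To make the two forms concrete, here is a small, hedged sketch of how the two entry kinds might be built for the same logical block. It is illustrative only: both case classes are private to Spark, so this will not compile outside the Spark source tree, and someSerializedBuffer stands in for a ChunkedByteBuffer produced elsewhere by a serializer.

    import scala.reflect.ClassTag
    import org.apache.spark.memory.MemoryMode
    import org.apache.spark.util.SizeEstimator
    import org.apache.spark.util.io.ChunkedByteBuffer

    val values: Array[String] = Array("a", "b", "c")
    // deserialized form: keeps the object array on heap; the caller supplies the estimated size
    val deserialized =
      DeserializedMemoryEntry[String](values, SizeEstimator.estimate(values), implicitly[ClassTag[String]])
    // serialized form: wraps already-serialized bytes; its size comes from the buffer itself
    def serialized(someSerializedBuffer: ChunkedByteBuffer) =
      SerializedMemoryEntry[String](someSerializedBuffer, MemoryMode.OFF_HEAP, implicitly[ClassTag[String]])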

3. MemoryStore

MemoryStore is the class that performs the memory operations. It depends on the MemoryManager discussed earlier and is responsible for storing blocks in memory. MemoryStore is defined as follows:

org.apache.spark.storage.memory.MemoryStore
    /**
     * Stores blocks in memory, either as Arrays of deserialized Java objects or as
     * serialized ByteBuffers.
     *
     * The memory store. It depends on MemoryManager and is responsible for storing Blocks in memory.
     *
     * @param blockInfoManager the block info manager, BlockInfoManager
     * @param serializerManager the serializer manager, SerializerManager
     * @param memoryManager the memory manager, MemoryManager
     * @param blockEvictionHandler the block eviction handler, used to evict Blocks from memory
     */
    private[spark] class MemoryStore(
        conf: SparkConf,
        blockInfoManager: BlockInfoManager,
        serializerManager: SerializerManager,
        memoryManager: MemoryManager,
        blockEvictionHandler: BlockEvictionHandler)
      extends Logging {
      ...
    }

Constructing a MemoryStore requires a BlockInfoManager, a SerializerManager, a MemoryManager and a BlockEvictionHandler. BlockInfoManager and MemoryManager have already been covered. SerializerManager is the serializer manager, used to serialize and deserialize data when blocks are written or read; BlockEvictionHandler is the block eviction handler, used to evict a given block from memory. These two auxiliary components will be covered later.

MemoryStore defines three collection fields that record specific relationships:

org.apache.spark.storage.memory.MemoryStore
    // cache of the mapping from BlockId to MemoryEntry (the in-memory form of a Block)
    private val entries = new LinkedHashMap[BlockId, MemoryEntry[_]](32, 0.75f, true)

    // A mapping from taskAttemptId to amount of memory used for unrolling a block (in bytes)
    // All accesses of this map are assumed to have manually synchronized on `memoryManager`
    // Mapping from a TaskAttempt's taskAttemptId to the total on-heap memory that
    // TaskAttempt has used for unrolling Blocks.
    private val onHeapUnrollMemoryMap = mutable.HashMap[Long, Long]()

    // Note: off-heap unroll memory is only used in putIteratorAsBytes() because off-heap caching
    // always stores serialized values.
    // Mapping from a TaskAttempt's taskAttemptId to the total off-heap memory that
    // TaskAttempt has used for unrolling Blocks.
    private val offHeapUnrollMemoryMap = mutable.HashMap[Long, Long]()

The unrollMemoryThreshold field is the amount of memory initially requested before unrolling any block; it defaults to 1MB:

    // Initial memory to request before unrolling any block
    // The initial amount of memory requested before unrolling any Block; it can be changed
    // via the property spark.storage.unrollMemoryThreshold (default 1MB).
    private val unrollMemoryThreshold: Long =
      conf.getLong("spark.storage.unrollMemoryThreshold", 1024 * 1024)
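
For reference, a minimal sketch of raising this threshold through SparkConf; the key string is the one read by the code above, and 4MB is just an illustrative value.

    import org.apache.spark.SparkConf

    // raise the initial unroll reservation from the default 1MB to 4MB (illustrative value)
    val conf = new SparkConf()
      .set("spark.storage.unrollMemoryThreshold", (4 * 1024 * 1024).toString)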

MemoryStore also defines several methods that describe how much storage memory is being used:

    /** Total amount of memory available for storage, in bytes.
     * The maximum memory MemoryStore can use for storing Blocks; in essence it is the sum of
     * MemoryManager's maxOnHeapStorageMemory and maxOffHeapStorageMemory.
     * - If the MemoryManager is a StaticMemoryManager, maxMemory is fixed.
     * - If the MemoryManager is a UnifiedMemoryManager, maxMemory changes dynamically.
     **/
    private def maxMemory: Long = {
      memoryManager.maxOnHeapStorageMemory + memoryManager.maxOffHeapStorageMemory
    }

    /** Total storage memory used including unroll memory, in bytes.
     * The amount of memory already used by the MemoryStore.
     * In essence it is the sum of the used sizes of MemoryManager's onHeapStorageMemoryPool
     * and offHeapStorageMemoryPool.
     **/
    private def memoryUsed: Long = memoryManager.storageMemoryUsed

    /**
     * Return the amount of memory currently occupied for unrolling blocks across all tasks.
     *
     * The memory used for unrolling blocks: the sum of all unroll memory recorded in
     * onHeapUnrollMemoryMap and all unroll memory recorded in offHeapUnrollMemoryMap.
     */
    def currentUnrollMemory: Long = memoryManager.synchronized {
      onHeapUnrollMemoryMap.values.sum + offHeapUnrollMemoryMap.values.sum
    }

    /**
     * Amount of storage memory, in bytes, used for caching blocks.
     * This does not include memory used for unrolling.
     *
     * The memory MemoryStore uses for storing Blocks (i.e. MemoryEntry instances), which is
     * the difference between memoryUsed and currentUnrollMemory.
     */
    private def blocksMemoryUsed: Long = memoryManager.synchronized {
      memoryUsed - currentUnrollMemory
    }

From the implementations above we can see that:

  1. maxMemory, the maximum memory MemoryStore can use for storage, consists of the maximum on-heap storage memory memoryManager.maxOnHeapStorageMemory plus the maximum off-heap storage memory memoryManager.maxOffHeapStorageMemory.
  2. The memory MemoryStore uses for unrolling all blocks consists of the on-heap unroll memory onHeapUnrollMemoryMap.values.sum plus the off-heap unroll memory offHeapUnrollMemoryMap.values.sum.
  3. memoryUsed, the memory already used by MemoryStore, is the sum of the unroll memory currentUnrollMemory and the memory used for storing blocks blocksMemoryUsed (see the short worked example after this list).
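
A short worked example of these accounting relations, with made-up numbers:

    // hypothetical limits reported by the MemoryManager
    val maxOnHeapStorageMemory  = 800L * 1024 * 1024
    val maxOffHeapStorageMemory = 200L * 1024 * 1024
    val maxMemory = maxOnHeapStorageMemory + maxOffHeapStorageMemory   // 1000MB available for storage

    // hypothetical usage figures
    val memoryUsed          = 300L * 1024 * 1024   // MemoryManager.storageMemoryUsed
    val currentUnrollMemory =  50L * 1024 * 1024   // sum over both unroll maps
    val blocksMemoryUsed    = memoryUsed - currentUnrollMemory   // 250MB actually held by MemoryEntry data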

Based on the above, the memory abstraction inside MemoryStore can be pictured as follows:

Figure: The memory abstraction of MemoryStore (1.MemoryStore的内存抽象.png)

MemoryStore's currentUnrollMemoryForThisTask method returns the amount of memory the TaskAttempt in the current context is using to unroll blocks:

org.apache.spark.storage.memory.MemoryStore#currentUnrollMemoryForThisTask
    /**
     * Return the amount of memory currently occupied for unrolling blocks by this task.
     * The memory the current TaskAttempt uses for unrolling Blocks, i.e. the sum of the
     * amount recorded for the current TaskAttempt in onHeapUnrollMemoryMap and the amount
     * recorded for it in offHeapUnrollMemoryMap.
     */
    def currentUnrollMemoryForThisTask: Long = memoryManager.synchronized {
      // the on-heap unroll memory recorded for the current TaskAttempt
      onHeapUnrollMemoryMap.getOrElse(currentTaskAttemptId(), 0L) +
        // the off-heap unroll memory recorded for the current TaskAttempt
        offHeapUnrollMemoryMap.getOrElse(currentTaskAttemptId(), 0L)
    }

As the source shows, it looks up the current TaskAttemptId in both the onHeapUnrollMemoryMap and offHeapUnrollMemoryMap maps and adds the on-heap and off-heap usage together.

MemoryStore's numTasksUnrolling method returns the number of TaskAttempts currently unrolling blocks; it is computed as the size of the union of the key sets of onHeapUnrollMemoryMap and offHeapUnrollMemoryMap:

org.apache.spark.storage.memory.MemoryStore#numTasksUnrolling
    /**
     * Return the number of tasks currently unrolling blocks.
     * The number of tasks currently using the MemoryStore to unroll Blocks, i.e. the size of
     * the union of the key sets of onHeapUnrollMemoryMap and offHeapUnrollMemoryMap.
     */
    private def numTasksUnrolling: Int = memoryManager.synchronized {
      (onHeapUnrollMemoryMap.keys ++ offHeapUnrollMemoryMap.keys).toSet.size
    }

Next we look at the methods MemoryStore provides.

3.1. Reserving and releasing unroll memory

When MemoryStore writes data into memory, the data of a block may consist of many elements. In that case, the memory the write will occupy has to be estimated and reserved in advance so that the block can be written in one go. MemoryStore therefore provides two methods for reserving and releasing unroll memory: reserveUnrollMemoryForThisTask(...) and releaseUnrollMemoryForThisTask(...), described below.

reserveUnrollMemoryForThisTask(blockId: BlockId, memory: Long, memoryMode: MemoryMode): Boolean takes three parameters:

  1. blockId: of type BlockId, the identifier of the block.
  2. memory: of type Long, the amount of unroll memory to reserve.
  3. memoryMode: of type MemoryMode, the memory mode in which to reserve the unroll memory.

As the name suggests, it reserves unroll memory of the given size and memory mode for the TaskAttempt in the current context. The source is as follows:

org.apache.spark.storage.memory.MemoryStore#reserveUnrollMemoryForThisTask
    /**
     * Reserve memory for unrolling the given block for this task.
     *
     * Reserves memory of the given size and memory mode for unrolling the given Block
     * on behalf of the current TaskAttempt.
     *
     * @return whether the request is granted.
     */
    def reserveUnrollMemoryForThisTask(
        blockId: BlockId,
        memory: Long,
        memoryMode: MemoryMode): Boolean = {
      memoryManager.synchronized { // take the lock
        // try to acquire unroll memory
        val success = memoryManager.acquireUnrollMemory(blockId, memory, memoryMode)
        if (success) { // the request was granted
          // get the TaskAttemptId of the current context
          val taskAttemptId = currentTaskAttemptId()
          // pick the map for the requested memory mode
          val unrollMemoryMap = memoryMode match {
            case MemoryMode.ON_HEAP => onHeapUnrollMemoryMap
            case MemoryMode.OFF_HEAP => offHeapUnrollMemoryMap
          }
          // update the mapping from TaskAttemptId to the amount of unroll memory it holds
          unrollMemoryMap(taskAttemptId) = unrollMemoryMap.getOrElse(taskAttemptId, 0L) + memory
        }
        // return whether the request succeeded
        success
      }
    }

reserveUnrollMemoryForThisTask(...) delegates the actual reservation to MemoryManager's acquireUnrollMemory(...) method. If the reservation succeeds, the reserved amount is recorded in the corresponding map, and the method returns whether the reservation succeeded.

In contrast to reserveUnrollMemoryForThisTask(...), releaseUnrollMemoryForThisTask(...) releases unroll memory of the given size and memory mode that was reserved for the TaskAttempt in the current context:

org.apache.spark.storage.memory.MemoryStore#releaseUnrollMemoryForThisTask
    /**
     * Release memory used by this task for unrolling blocks.
     * If the amount is not specified, remove the current task's allocation altogether.
     *
     * Releases unroll memory held by the current TaskAttempt.
     */
    def releaseUnrollMemoryForThisTask(memoryMode: MemoryMode, memory: Long = Long.MaxValue): Unit = {
      // get the TaskAttemptId
      val taskAttemptId = currentTaskAttemptId()
      memoryManager.synchronized { // take the lock
        // pick the map for the requested memory mode
        val unrollMemoryMap = memoryMode match {
          case MemoryMode.ON_HEAP => onHeapUnrollMemoryMap
          case MemoryMode.OFF_HEAP => offHeapUnrollMemoryMap
        }
        if (unrollMemoryMap.contains(taskAttemptId)) { // check whether this taskAttemptId holds any unroll memory
          // compute how much memory to release
          val memoryToRelease = math.min(memory, unrollMemoryMap(taskAttemptId))
          if (memoryToRelease > 0) { // release the unroll memory
            unrollMemoryMap(taskAttemptId) -= memoryToRelease
            memoryManager.releaseUnrollMemory(memoryToRelease, memoryMode)
          }
          // if the TaskAttempt no longer holds any unroll memory, remove its entry from the map
          if (unrollMemoryMap(taskAttemptId) == 0) {
            unrollMemoryMap.remove(taskAttemptId)
          }
        }
      }
    }

The actual release is delegated to MemoryManager's releaseUnrollMemory(...) method. During the release the requested amount is clamped so that no more memory is released than the task actually holds.
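
A small, self-contained sketch of that clamping logic (the map and task id are stand-ins, and the real method also forwards the released amount to MemoryManager): because memory defaults to Long.MaxValue, calling the method without an amount releases everything the task still holds, and never more than was recorded.

    import scala.collection.mutable

    val unrollMemoryMap = mutable.HashMap[Long, Long](1L -> 5L * 1024 * 1024) // task 1 holds 5MB

    def release(taskAttemptId: Long, memory: Long = Long.MaxValue): Long = {
      val held = unrollMemoryMap.getOrElse(taskAttemptId, 0L)
      val toRelease = math.min(memory, held)   // clamp, so we never release more than is held
      if (held - toRelease == 0) unrollMemoryMap.remove(taskAttemptId)
      else unrollMemoryMap(taskAttemptId) = held - toRelease
      toRelease
    }

    release(1L)   // returns 5MB and removes task 1's entry, mirroring the default-argument case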

3.2. Write operations

  1. The putBytes(...) method writes the given block into the memory store. In essence it wraps the data in a SerializedMemoryEntry object and puts it into the entries map:
org.apache.spark.storage.memory.MemoryStore#putBytes
    /**
     * Use `size` to test if there is enough space in MemoryStore. If so, create the ByteBuffer and
     * put it into MemoryStore. Otherwise, the ByteBuffer won't be created.
     *
     * The caller should guarantee that `size` is correct.
     *
     * Writes the Block identified by BlockId (already wrapped as a ChunkedByteBuffer) into memory.
     *
     * @return true if the put() succeeded, false otherwise.
     */
    def putBytes[T: ClassTag](
        blockId: BlockId,
        size: Long,
        memoryMode: MemoryMode,
        _bytes: () => ChunkedByteBuffer): Boolean = {
      require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")
      // request storage memory
      if (memoryManager.acquireStorageMemory(blockId, size, memoryMode)) { // the request was granted
        // We acquired enough memory for the block, so go ahead and put it
        // materialize the Block's data
        val bytes = _bytes()
        assert(bytes.size == size)
        // wrap the data in a SerializedMemoryEntry
        val entry = new SerializedMemoryEntry[T](bytes, memoryMode, implicitly[ClassTag[T]])
        entries.synchronized {
          // put the Block into entries, i.e. into memory
          entries.put(blockId, entry)
        }
        logInfo("Block %s stored as bytes in memory (estimated size %s, free %s)".format(
          blockId, Utils.bytesToString(size), Utils.bytesToString(maxMemory - blocksMemoryUsed)))
        true
      } else {
        false
      }
    }

putBytes(...) assigns the ChunkedByteBuffer obtained from the _bytes parameter to the buffer field of a new SerializedMemoryEntry instance, and then puts that SerializedMemoryEntry into the entries map under the corresponding BlockId.
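
The reason _bytes is a function rather than a ready-made buffer is that the ChunkedByteBuffer is only materialized after the storage memory request succeeds. A hedged caller sketch follows; memoryStore and blockId are assumed to already be in scope:

    import java.nio.ByteBuffer
    import org.apache.spark.memory.MemoryMode
    import org.apache.spark.util.io.ChunkedByteBuffer

    val payload: Array[Byte] = Array.fill(1024)(0.toByte)
    val stored: Boolean = memoryStore.putBytes[Array[Byte]](
      blockId,
      payload.length,                                        // the caller must pass an accurate size
      MemoryMode.ON_HEAP,
      () => new ChunkedByteBuffer(ByteBuffer.wrap(payload))) // only evaluated once memory is acquired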

  2. The putIteratorAsValues(...) method also writes the given block into memory; the difference from putBytes(...) is that its data parameter values is an iterator. The method is declared as follows:
org.apache.spark.storage.memory.MemoryStore#putIteratorAsValues
    private[storage] def putIteratorAsValues[T](
        blockId: BlockId,
        values: Iterator[T],
        classTag: ClassTag[T]): Either[PartiallyUnrolledIterator[T], Long] = {
      ...
    }

The method takes three parameters: the first is the BlockId of the block, the second is a data iterator of type Iterator[T], and the third is the block's ClassTag. The write it performs is fairly involved, so let us first outline the flow, which consists of the following steps:

  1. Estimate roughly how much memory all of the data will need. This requires iterating over the block's data iterator, accumulating the size of the elements and reserving unroll memory step by step, so that roughly enough memory is reserved for the whole block (a simplified sketch of this loop follows the list).
  2. Convert the data iterator into an array and assign it to the value field of a new DeserializedMemoryEntry object; at the same time release the unroll memory reserved in step 1 and re-acquire it as storage memory. This step is synchronized on the MemoryManager lock and is therefore thread safe.
  3. Check whether the rough amount reserved in step 1 covers the memory actually needed after unrolling. If it does not, acquire additional storage memory; if there is a surplus, release the excess.
  4. Once everything is in place and the acquired memory is enough to hold the block, put the BlockId and the DeserializedMemoryEntry created in step 2 into the entries map.
  5. If there is not enough memory, return a PartiallyUnrolledIterator object, which records the data iterator that was not stored as well as the amount of memory already reserved for it in the earlier steps.
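
Before reading the full source, the following is a heavily simplified sketch of the gradual-unroll pattern in step 1. The reservation call is stubbed out, and the sketch collapses the threshold and the reserved total into one variable, which the real method tracks separately.

    def unrollSketch[T](values: Iterator[T],
                        estimateSize: Vector[T] => Long,
                        tryReserve: Long => Boolean): Boolean = {
      val memoryCheckPeriod = 16        // re-estimate the size every 16 elements
      val memoryGrowthFactor = 1.5      // over-reserve so the estimate is not exceeded immediately
      var buffer = Vector.empty[T]
      var reserved = 1L * 1024 * 1024   // initial 1MB reservation, as in the real code
      var keepUnrolling = tryReserve(reserved)
      var elements = 0
      while (values.hasNext && keepUnrolling) {
        buffer :+= values.next()
        elements += 1
        if (elements % memoryCheckPeriod == 0) {
          val currentSize = estimateSize(buffer)
          if (currentSize >= reserved) {
            val extra = (currentSize * memoryGrowthFactor - reserved).toLong
            keepUnrolling = tryReserve(extra)
            if (keepUnrolling) reserved += extra
          }
        }
      }
      keepUnrolling   // true means the whole iterator fit within the reserved unroll memory
    }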

The source of putIteratorAsValues(...) is as follows:

org.apache.spark.storage.memory.MemoryStore#putIteratorAsValues
    /**
     * Attempt to put the given block in memory store as values.
     *
     * It's possible that the iterator is too large to materialize and store in memory. To avoid
     * OOM exceptions, this method will gradually unroll the iterator while periodically checking
     * whether there is enough free memory. If the block is successfully materialized, then the
     * temporary unroll memory used during the materialization is "transferred" to storage memory,
     * so we won't acquire more memory than is actually needed to store the block.
     *
     * Writes the Block identified by BlockId (already provided as an Iterator) into memory.
     *
     * @return in case of success, the estimated size of the stored data. In case of failure, return
     *         an iterator containing the values of the block. The returned iterator will be backed
     *         by the combination of the partially-unrolled block and the remaining elements of the
     *         original input iterator. The caller must either fully consume this iterator or call
     *         `close()` on it in order to free the storage memory consumed by the partially-unrolled
     *         block.
     */
    private[storage] def putIteratorAsValues[T](
        blockId: BlockId,
        values: Iterator[T],
        classTag: ClassTag[T]): Either[PartiallyUnrolledIterator[T], Long] = {
      require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")

      // Number of elements unrolled so far
      var elementsUnrolled = 0
      // Whether there is still enough memory for us to continue unrolling this block
      var keepUnrolling = true
      // Initial per-task memory to request for unrolling blocks (bytes).
      // This is unrollMemoryThreshold: the amount of memory requested before unrolling any Block,
      // configurable via spark.storage.unrollMemoryThreshold (default 1MB).
      val initialMemoryThreshold = unrollMemoryThreshold
      // How often to check whether we need to request more memory
      // Fixed at 16: the memory check runs once every 16 unrolled elements.
      val memoryCheckPeriod = 16
      // Memory currently reserved by this task for this particular unrolling operation
      var memoryThreshold = initialMemoryThreshold
      // Memory to request as a multiple of current vector size
      // Growth factor used when more unroll memory is needed; fixed at 1.5.
      val memoryGrowthFactor = 1.5
      // Keep track of unroll memory used by this particular block / putIterator() operation
      var unrollMemoryUsedByThisBlock = 0L
      // Underlying vector for unrolling the block
      // Tracks the data produced by each iteration over the Block.
      var vector = new SizeTrackingVector[T]()(classTag)

      // Request enough memory to begin unrolling
      // Request the initial unroll memory, initialMemoryThreshold (1MB by default)
      keepUnrolling =
        reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold, MemoryMode.ON_HEAP)

      if (!keepUnrolling) { // the initial reservation failed; log a warning
        logWarning(s"Failed to reserve initial memory threshold of " +
          s"${Utils.bytesToString(initialMemoryThreshold)} for computing block $blockId in memory.")
      } else {
        // add the reserved amount to the counter of unroll memory used by this block
        unrollMemoryUsedByThisBlock += initialMemoryThreshold
      }

      // Unroll this block safely, checking whether we have exceeded our threshold periodically
      // Keep going while there are elements left and the reservations keep succeeding
      while (values.hasNext && keepUnrolling) {
        // append the next element to the tracking vector
        vector += values.next()
        if (elementsUnrolled % memoryCheckPeriod == 0) { // time to check whether more memory is needed
          // If our vector's size has exceeded the threshold, request more memory
          val currentSize = vector.estimateSize() // estimated size of everything unrolled so far
          if (currentSize >= memoryThreshold) { // the estimate exceeds the memory reserved so far
            // compute how much more memory to request
            val amountToRequest = (currentSize * memoryGrowthFactor - memoryThreshold).toLong
            // try to reserve more unroll memory
            keepUnrolling =
              reserveUnrollMemoryForThisTask(blockId, amountToRequest, MemoryMode.ON_HEAP)
            if (keepUnrolling) { // the reservation succeeded
              // add it to the counter of unroll memory used by this block
              unrollMemoryUsedByThisBlock += amountToRequest
            }
            // New threshold is currentSize * memoryGrowthFactor
            // update the amount reserved for this unroll operation
            memoryThreshold += amountToRequest
          }
        }
        // one more element has been unrolled
        elementsUnrolled += 1
      }

      if (keepUnrolling) { // reaching here means every reservation made while unrolling succeeded
        // We successfully unrolled the entirety of this block
        val arrayValues = vector.toArray
        vector = null
        // wrap the Block's data in a DeserializedMemoryEntry
        val entry =
          new DeserializedMemoryEntry[T](arrayValues, SizeEstimator.estimate(arrayValues), classTag)
        // the final size of the data
        val size = entry.size

        // converts memory reserved for unrolling the Block into memory for storing the Block
        def transferUnrollToStorage(amount: Long): Unit = {
          // Synchronize so that transfer is atomic
          memoryManager.synchronized { // the lock guarantees the released memory can be re-acquired immediately
            // first release some unroll memory
            releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, amount)
            // then acquire storage memory
            val success = memoryManager.acquireStorageMemory(blockId, amount, MemoryMode.ON_HEAP)
            assert(success, "transferring unroll memory to storage memory failed")
          }
        }

        // Acquire storage memory if necessary to store this block in memory.
        val enoughStorageMemory = {
          // extra memory may need to be acquired
          if (unrollMemoryUsedByThisBlock <= size) { // the unroll memory reserved is not more than the actual size
            // acquire the missing amount as storage memory
            val acquiredExtra =
              memoryManager.acquireStorageMemory(
                blockId, size - unrollMemoryUsedByThisBlock, MemoryMode.ON_HEAP)
            if (acquiredExtra) { // the extra memory was acquired
              // convert the reserved unroll memory into storage memory
              transferUnrollToStorage(unrollMemoryUsedByThisBlock)
            }
            acquiredExtra
          } else { // unrollMemoryUsedByThisBlock > size
            // If this task attempt already owns more unroll memory than is necessary to store the
            // block, then release the extra memory that will not be used.
            // more unroll memory was reserved than the block actually needs, so release the excess
            val excessUnrollMemory = unrollMemoryUsedByThisBlock - size
            releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, excessUnrollMemory)
            // convert unroll memory equal to the actual size into storage memory
            transferUnrollToStorage(size)
            true
          }
        }

        if (enoughStorageMemory) { // enough storage memory was obtained
          entries.synchronized {
            // record the mapping in the entries dictionary
            entries.put(blockId, entry)
          }
          logInfo("Block %s stored as values in memory (estimated size %s, free %s)".format(
            blockId, Utils.bytesToString(size), Utils.bytesToString(maxMemory - blocksMemoryUsed)))
          Right(size)
        } else { // not enough storage memory
          assert(currentUnrollMemoryForThisTask >= unrollMemoryUsedByThisBlock,
            "released too much unroll memory")
          Left(new PartiallyUnrolledIterator(
            this,
            MemoryMode.ON_HEAP,
            unrollMemoryUsedByThisBlock,
            unrolled = arrayValues.toIterator,
            rest = Iterator.empty))
        }
      } else { // a reservation already failed while unrolling
        // We ran out of space while unrolling the values for this block
        logUnrollFailureMessage(blockId, vector.estimateSize())
        Left(new PartiallyUnrolledIterator(
          this,
          MemoryMode.ON_HEAP,
          unrollMemoryUsedByThisBlock,
          unrolled = vector.iterator,
          rest = values))
      }
    }

The code is long, but the inline comments explain it clearly, so it is left to the reader to work through.
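
One point worth spelling out is the Either return type. A hedged sketch of how a caller might handle it follows; memoryStore, blockId and values are assumed to be in scope (in Spark the actual caller is BlockManager):

    import scala.reflect.ClassTag

    memoryStore.putIteratorAsValues(blockId, values, implicitly[ClassTag[String]]) match {
      case Right(storedSize) =>
        // the block was fully unrolled and stored; storedSize is its estimated in-memory size
        println(s"stored $blockId, ~$storedSize bytes")
      case Left(partiallyUnrolled) =>
        // not enough memory: either fully consume the iterator (e.g. to write it elsewhere)
        // or close() it so that the partially-unrolled memory is released
        partiallyUnrolled.close()
    }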

  3. The putIteratorAsBytes(...) method follows essentially the same flow as putIteratorAsValues(...) above; the only difference is that the elements of the data iterator are serialized as they are stored. Its source is as follows:
org.apache.spark.storage.memory.MemoryStore#putIteratorAsBytes
    /**
     * Attempt to put the given block in memory store as bytes.
     *
     * It's possible that the iterator is too large to materialize and store in memory. To avoid
     * OOM exceptions, this method will gradually unroll the iterator while periodically checking
     * whether there is enough free memory. If the block is successfully materialized, then the
     * temporary unroll memory used during the materialization is "transferred" to storage memory,
     * so we won't acquire more memory than is actually needed to store the block.
     *
     * Writes the Block identified by BlockId (already provided as an Iterator) into memory as a
     * serialized byte buffer.
     *
     * @return in case of success, the estimated size of the stored data. In case of failure,
     *         return a handle which allows the caller to either finish the serialization by
     *         spilling to disk or to deserialize the partially-serialized block and reconstruct
     *         the original input iterator. The caller must either fully consume this result
     *         iterator or call `discard()` on it in order to free the storage memory consumed by the
     *         partially-unrolled block.
     */
    private[storage] def putIteratorAsBytes[T](
        blockId: BlockId,
        values: Iterator[T],
        classTag: ClassTag[T],
        memoryMode: MemoryMode): Either[PartiallySerializedBlock[T], Long] = {
      require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")

      // pick the buffer allocator according to the memory mode
      val allocator = memoryMode match {
        case MemoryMode.ON_HEAP => ByteBuffer.allocate _ // on-heap ByteBuffer
        case MemoryMode.OFF_HEAP => Platform.allocateDirectBuffer _ // off-heap DirectBuffer
      }

      // Whether there is still enough memory for us to continue unrolling this block
      var keepUnrolling = true
      // Initial per-task memory to request for unrolling blocks (bytes).
      // This is unrollMemoryThreshold: the amount of memory requested before unrolling any Block,
      // configurable via spark.storage.unrollMemoryThreshold (default 1MB).
      val initialMemoryThreshold = unrollMemoryThreshold
      // Keep track of unroll memory used by this particular block / putIterator() operation
      var unrollMemoryUsedByThisBlock = 0L
      // Underlying buffer for unrolling the block
      // wrap the output in a redirectable stream
      val redirectableStream = new RedirectableOutputStream
      val bbos = new ChunkedByteBufferOutputStream(initialMemoryThreshold.toInt, allocator)
      redirectableStream.setOutputStream(bbos)
      // wrap it further in a serialization stream
      val serializationStream: SerializationStream = {
        val autoPick = !blockId.isInstanceOf[StreamBlockId]
        val ser = serializerManager.getSerializer(classTag, autoPick).newInstance()
        ser.serializeStream(serializerManager.wrapStream(blockId, redirectableStream))
      }

      // Request enough memory to begin unrolling
      // Request the initial unroll memory, initialMemoryThreshold (1MB by default)
      keepUnrolling = reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold, memoryMode)

      if (!keepUnrolling) { // the initial reservation failed; log a warning
        logWarning(s"Failed to reserve initial memory threshold of " +
          s"${Utils.bytesToString(initialMemoryThreshold)} for computing block $blockId in memory.")
      } else {
        // add the reserved amount to the counter of unroll memory used by this block
        unrollMemoryUsedByThisBlock += initialMemoryThreshold
      }

      // reserve additional unroll memory whenever the buffer outgrows what has been reserved so far
      def reserveAdditionalMemoryIfNecessary(): Unit = {
        // the ChunkedByteBufferOutputStream has grown beyond the unroll memory reserved so far
        if (bbos.size > unrollMemoryUsedByThisBlock) {
          // the additional amount that needs to be reserved
          val amountToRequest = bbos.size - unrollMemoryUsedByThisBlock
          // try to reserve it
          keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest, memoryMode)
          if (keepUnrolling) {
            // the reservation succeeded; add it to unrollMemoryUsedByThisBlock
            unrollMemoryUsedByThisBlock += amountToRequest
          }
        }
      }

      // Unroll this block safely, checking whether we have exceeded our threshold
      // Keep going while there are elements left and the reservations keep succeeding
      while (values.hasNext && keepUnrolling) {
        // serialize the next element and write it to the stream
        serializationStream.writeObject(values.next())(classTag)
        // reserve additional unroll memory if necessary
        reserveAdditionalMemoryIfNecessary()
      }

      // Make sure that we have enough memory to store the block. By this point, it is possible that
      // the block's actual memory usage has exceeded the unroll memory by a small amount, so we
      // perform one final call to attempt to allocate additional memory if necessary.
      if (keepUnrolling) { // every reservation so far has succeeded
        // close the serialization stream
        serializationStream.close()
        // reserve additional unroll memory if necessary
        reserveAdditionalMemoryIfNecessary()
      }

      if (keepUnrolling) { // every reservation succeeded
        // hand the serialized data to the buffer field of a SerializedMemoryEntry
        val entry = SerializedMemoryEntry[T](bbos.toChunkedByteBuffer, memoryMode, classTag)
        // Synchronize so that transfer is atomic
        memoryManager.synchronized { // the lock keeps the release and the re-acquisition atomic
          // release the unroll memory reserved earlier
          releaseUnrollMemoryForThisTask(memoryMode, unrollMemoryUsedByThisBlock)
          // acquire the corresponding amount of storage memory for the actual store
          val success = memoryManager.acquireStorageMemory(blockId, entry.size, memoryMode)
          assert(success, "transferring unroll memory to storage memory failed")
        }
        entries.synchronized {
          // record the mapping from the BlockId to the SerializedMemoryEntry created above
          entries.put(blockId, entry)
        }
        logInfo("Block %s stored as bytes in memory (estimated size %s, free %s)".format(
          blockId, Utils.bytesToString(entry.size),
          Utils.bytesToString(maxMemory - blocksMemoryUsed)))
        // the block was stored successfully
        Right(entry.size)
      } else { // a reservation failed at some point
        // We ran out of space while unrolling the values for this block
        logUnrollFailureMessage(blockId, bbos.size)
        // return a PartiallySerializedBlock that keeps a handle on the remaining iterator
        Left(
          new PartiallySerializedBlock(
            this,
            serializerManager,
            blockId,
            serializationStream,
            redirectableStream,
            unrollMemoryUsedByThisBlock,
            memoryMode,
            bbos,
            values,
            classTag))
      }
    }
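
As with putIteratorAsValues(...), the caller has to deal with an Either. A hedged sketch follows; the names in scope are assumed, and discard() is the call the scaladoc above requires on failure if the caller does not finish the serialization elsewhere:

    import scala.reflect.ClassTag
    import org.apache.spark.memory.MemoryMode

    memoryStore.putIteratorAsBytes(blockId, values, implicitly[ClassTag[String]], MemoryMode.OFF_HEAP) match {
      case Right(size) =>
        // the block was serialized and stored; size is the size of the retained ChunkedByteBuffer
        println(s"stored $blockId as bytes, $size bytes")
      case Left(partiallySerialized) =>
        // not enough memory: either finish the serialization (e.g. by spilling to disk)
        // or discard() the handle so that the unroll memory is freed
        partiallySerialized.discard()
    }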

3.3. Read operations

  1. MemoryStore's getSize(...) method returns the size of the MemoryEntry object stored in memory for the given BlockId. The method is straightforward:
org.apache.spark.storage.memory.MemoryStore#getSize
    // returns the size of the MemoryEntry (the in-memory form of the Block) for the given BlockId
    def getSize(blockId: BlockId): Long = {
      entries.synchronized {
        entries.get(blockId).size
      }
    }
  2. The getBytes(...) method reads the data of the block identified by the given BlockId from memory:
org.apache.spark.storage.memory.MemoryStore#getBytes
    // reads the Block for the given BlockId (wrapped as a ChunkedByteBuffer) from memory
    def getBytes(blockId: BlockId): Option[ChunkedByteBuffer] = {
      // look it up under the entries lock
      val entry = entries.synchronized { entries.get(blockId) }
      // examine what was found
      entry match {
        case null => None
        case e: DeserializedMemoryEntry[_] =>
          throw new IllegalArgumentException("should only call getBytes on serialized blocks")
        case SerializedMemoryEntry(bytes, _, _) => Some(bytes)
      }
    }

This method looks up the block's MemoryEntry object directly in the entries map. As the source shows, the block it reads must be stored as a SerializedMemoryEntry object; otherwise an IllegalArgumentException is thrown.

  3. The getValues(...) method is similar to getBytes(...), except that its result is an iterator and the block it reads must be stored as a DeserializedMemoryEntry object:
org.apache.spark.storage.memory.MemoryStore#getValues
    // reads the Block for the given BlockId (as an Iterator) from memory
    def getValues(blockId: BlockId): Option[Iterator[_]] = {
      // look it up under the entries lock
      val entry = entries.synchronized { entries.get(blockId) }
      // examine what was found
      entry match {
        case null => None
        case e: SerializedMemoryEntry[_] =>
          throw new IllegalArgumentException("should only call getValues on deserialized blocks")
        case DeserializedMemoryEntry(values, _, _) =>
          val x = Some(values)
          x.map(_.iterator)
      }
    }
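
In other words, which getter applies depends on how the block was written: putBytes(...) and putIteratorAsBytes(...) produce a SerializedMemoryEntry, readable only through getBytes(...), while putIteratorAsValues(...) produces a DeserializedMemoryEntry, readable only through getValues(...). A minimal sketch, with memoryStore and the two BlockIds assumed to be in scope:

    import org.apache.spark.util.io.ChunkedByteBuffer

    // a block written via putBytes / putIteratorAsBytes can only be read back as bytes
    val maybeBuffer: Option[ChunkedByteBuffer] = memoryStore.getBytes(serializedBlockId)
    // a block written via putIteratorAsValues can only be read back as an iterator of values
    val maybeValues: Option[Iterator[_]] = memoryStore.getValues(deserializedBlockId)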

3.4. Helper operations

The contains(...) method checks whether the memory store currently holds a block for the given BlockId; the source is simple:

org.apache.spark.storage.memory.MemoryStore#contains
    def contains(blockId: BlockId): Boolean = {
      entries.synchronized { entries.containsKey(blockId) }
    }

3.5. Eviction

evictBlocksToFreeSpace(...) evicts blocks in an attempt to free a given amount of memory so that a new block can be stored. The source is as follows:

org.apache.spark.storage.memory.MemoryStore#evictBlocksToFreeSpace
    /**
     * Try to evict blocks to free up a given amount of space to store a particular block.
     * Can fail if either the block is bigger than our memory or it would require replacing
     * another block from the same RDD (which leads to a wasteful cyclic replacement pattern for
     * RDDs that don't fit into memory that we want to avoid).
     *
     * Evicts Blocks in order to free up the requested amount of space for storing a new Block.
     *
     * @param blockId the ID of the block we are freeing space for, if any
     *                (the BlockId of the Block that is about to be stored)
     * @param space the size of this block
     *              (the amount of memory that needs to be freed by evicting Blocks)
     * @param memoryMode the type of memory to free (on- or off-heap)
     *                   (the memory mode required for storing the Block)
     * @return the amount of memory (in bytes) freed by eviction
     */
    private[spark] def evictBlocksToFreeSpace(
        blockId: Option[BlockId],
        space: Long,
        memoryMode: MemoryMode): Long = {
      // validate the argument
      assert(space > 0)
      // all of the work is done while holding the MemoryManager lock
      memoryManager.synchronized {
        // the amount of memory freed so far
        var freedMemory = 0L
        // the RDD id of the Block that is about to be stored, if it belongs to an RDD
        val rddToAdd = blockId.flatMap(getRddId)
        // the BlockIds of the Blocks selected for eviction
        val selectedBlocks = new ArrayBuffer[BlockId]
        // decides whether the Block identified by blockId can be evicted
        def blockIsEvictable(blockId: BlockId, entry: MemoryEntry[_]): Boolean = {
          /** Two conditions must hold:
           * 1. the Block uses the same memory mode as the one requested;
           * 2. the Block does not belong to an RDD, or it belongs to a different RDD than the
           *    Block being stored.
           */
          entry.memoryMode == memoryMode && (rddToAdd.isEmpty || rddToAdd != getRddId(blockId))
        }
        // This is synchronized to ensure that the set of entries is not changed
        // (because of getValue or getBytes) while traversing the iterator, as that
        // can lead to exceptions.
        entries.synchronized {
          // iterate over the entries dictionary
          val iterator = entries.entrySet().iterator()
          // keep going while the freed space is still below the requested space
          // and there are more Blocks to examine
          while (freedMemory < space && iterator.hasNext) {
            // get the BlockId and the corresponding MemoryEntry
            val pair = iterator.next()
            val blockId = pair.getKey
            val entry = pair.getValue
            // check whether this Block can be evicted
            if (blockIsEvictable(blockId, entry)) {
              // We don't want to evict blocks which are currently being read, so we need to obtain
              // an exclusive write lock on blocks which are candidates for eviction. We perform a
              // non-blocking "tryLock" here in order to ignore blocks which are locked for reading:
              // the Block is evictable; try to take its write lock without blocking
              if (blockInfoManager.lockForWriting(blockId, blocking = false).isDefined) {
                // the write lock was obtained; mark the Block as selected for eviction
                selectedBlocks += blockId
                freedMemory += pair.getValue.size
              }
            }
          }
        }

        // drops a single Block
        def dropBlock[T](blockId: BlockId, entry: MemoryEntry[T]): Unit = {
          // extract the Block's data
          val data = entry match {
            case DeserializedMemoryEntry(values, _, _) => Left(values)
            case SerializedMemoryEntry(buffer, _, _) => Right(buffer)
          }
          // evict it from memory
          val newEffectiveStorageLevel =
            blockEvictionHandler.dropFromMemory(blockId, () => data)(entry.classTag)
          // check whether the Block is still stored somewhere after being dropped
          if (newEffectiveStorageLevel.isValid) {
            // The block is still present in at least one store, so release the lock
            // but don't delete the block info
            // release the write lock the current TaskAttempt obtained on the dropped Block
            blockInfoManager.unlock(blockId)
          } else { // the new storage level is invalid, so the Block has left the storage system entirely
            // The block isn't present in any store, so delete the block info so that the
            // block can be stored again
            // remove the dropped Block's info
            blockInfoManager.removeBlock(blockId)
          }
        }

        if (freedMemory >= space) { // the selected Blocks free up at least the requested space
          logInfo(s"${selectedBlocks.size} blocks selected for dropping " +
            s"(${Utils.bytesToString(freedMemory)} bytes)")
          // iterate over all of the selected BlockIds
          for (blockId <- selectedBlocks) {
            // get the corresponding MemoryEntry
            val entry = entries.synchronized { entries.get(blockId) }
            // This should never be null as only one task should be dropping
            // blocks and removing entries. However the check is still here for
            // future safety.
            if (entry != null) {
              // drop the corresponding Block
              dropBlock(blockId, entry)
            }
          }
          logInfo(s"After dropping ${selectedBlocks.size} blocks, " +
            s"free memory is ${Utils.bytesToString(maxMemory - blocksMemoryUsed)}")
          freedMemory
        } else { // even evicting every eligible Block would not free enough space
          // the space freed would still not be enough to store the Block for blockId; log it
          blockId.foreach { id =>
            logInfo(s"Will not store $id")
          }
          // release the write locks the current TaskAttempt took on every selected Block
          selectedBlocks.foreach { id =>
            blockInfoManager.unlock(id)
          }
          0L
        }
      }
    }

evictBlocksToFreeSpace(...) actually delegates the eviction itself to blockEvictionHandler's dropFromMemory(...) method. blockEvictionHandler is of type BlockEvictionHandler, a trait meaning "block eviction handler"; the BlockManager introduced earlier mixes in this trait and implements its dropFromMemory(...) method, which evicts a given block from memory:

org.apache.spark.storage.memory.BlockEvictionHandler#dropFromMemory
    private[storage] trait BlockEvictionHandler {
      /**
       * Drop a block from memory, possibly putting it on disk if applicable. Called when the memory
       * store reaches its limit and needs to free up space.
       *
       * If `data` is not put on disk, it won't be created.
       *
       * The caller of this method must hold a write lock on the block before calling this method.
       * This method does not release the write lock.
       *
       * @param blockId the BlockId of the block being evicted
       * @param data the data of the block being evicted
       * @tparam T the element type of the block's data; as the write paths above show, the data is
       *           either a ChunkedByteBuffer or an array of values
       * @return the block's new effective StorageLevel, i.e. the storage level the block has
       *         after it has been dropped from memory
       */
      private[storage] def dropFromMemory[T: ClassTag](
          blockId: BlockId,
          data: () => Either[Array[T], ChunkedByteBuffer]): StorageLevel
    }

For an evicted block, BlockManager first spills the data to disk when applicable and then removes it from memory; the concrete implementation will be covered later.
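
For intuition, here is a minimal, hedged sketch of a handler that simply discards whatever is evicted, similar in spirit to what a test harness might do. It is not how BlockManager behaves (BlockManager may first write the block to disk), and because the trait is private[storage] this only compiles inside the org.apache.spark.storage package.

    import scala.reflect.ClassTag
    import org.apache.spark.storage.{BlockId, StorageLevel}
    import org.apache.spark.util.io.ChunkedByteBuffer

    class DiscardingEvictionHandler extends BlockEvictionHandler {
      override private[storage] def dropFromMemory[T: ClassTag](
          blockId: BlockId,
          data: () => Either[Array[T], ChunkedByteBuffer]): StorageLevel = {
        // StorageLevel.NONE is not "valid", so evictBlocksToFreeSpace will go on to remove
        // the block's info from the BlockInfoManager after dropping it
        StorageLevel.NONE
      }
    }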

3.6. Cleanup operations

  1. The remove(...) method removes the block for the given BlockId from memory. The source is as follows:
org.apache.spark.storage.memory.MemoryStore#remove
    // removes the Block for the given BlockId from memory
    def remove(blockId: BlockId): Boolean = memoryManager.synchronized {
      // remove it under the entries lock
      val entry = entries.synchronized {
        entries.remove(blockId)
      }
      // examine the result of the removal
      if (entry != null) {
        entry match {
          case SerializedMemoryEntry(buffer, _, _) => buffer.dispose()
          case _ =>
        }
        // after removal, release the corresponding storage memory
        memoryManager.releaseStorageMemory(entry.size, entry.memoryMode)
        logDebug(s"Block $blockId of size ${entry.size} dropped " +
          s"from memory (free ${maxMemory - blocksMemoryUsed})")
        true
      } else {
        false
      }
    }
  2. The clear() method removes every record from the entries, onHeapUnrollMemoryMap and offHeapUnrollMemoryMap collections and releases all storage memory through the MemoryManager:
org.apache.spark.storage.memory.MemoryStore#clear
    // clears the MemoryStore
    def clear(): Unit = memoryManager.synchronized {
      // clear the entries dictionary
      entries.synchronized {
        entries.clear()
      }
      // clear the mapping of on-heap unroll memory
      onHeapUnrollMemoryMap.clear()
      // clear the mapping of off-heap unroll memory
      offHeapUnrollMemoryMap.clear()
      // release all of the storage memory held in the MemoryManager
      memoryManager.releaseAllStorageMemory()
      logInfo("MemoryStore cleared")
    }