大数据
流式处理
Spark
源码解析

Spark源码分析08 - 存储体系01:辅助类

简介:Spark存储体系由各个Driver和Executor实例中的BlockManager所构成,实现了分布式管理,而从Driver和Executor单个节点来看,Spark存储体系属于节点的SparkEnv的内部组成部分;存储体系主要分为两层:通信层和存储层

1. Spark存储体系简介

Spark存储体系由各个Driver和Executor实例中的BlockManager所构成,实现了分布式管理,而从Driver和Executor单个节点来看,Spark存储体系属于节点的SparkEnv的内部组成部分;存储体系主要分为两层:

  1. 通信层:由于需要进行分布式管理,存储体系使用了Master-Slave的模式来实现通信层,Master和Slave(这里以及后面内容中所讨论的Master和Slave除非特殊说明,都是指存储体系涉及的角色,并非Spark的RpcEndpoint端点)之间的控制信息和状态信息都在通信层上流动。Master负责整个应用程序运行期间的数据块元数据的管理和维护,Slave负责将本地数据块的状态信息报告给Master,同时接收Master回传的执行指令。
  2. 存储层:存储体系将数据存储在磁盘或内存中,因此存储层又分为内存存储和磁盘存储;同时节点之间还可能会发生存储副本的拷贝,这些功能都是由存储层来实现的。

在后面的内容中,我们将从这两个层次讲解Spark存储体系的构成和原理。不过在讲解之前,先让我们充分了解相关的辅助类。

2. BlockManager初识

BlockManager定义于Spark Core模块的org.apache.spark.storage包下。它主要用于提供对本地或远端节点上的内存、磁盘及堆外内存中数据的存储管理功能。对于单个节点,它负责本地存储功能涉及的各个组件之间的相互协调,而所有节点上的BlockManager则构成了Spark的整个存储体系。

对于Driver或Executor来说,它们的构建过程都是以任务执行环境SparkEnv的初始化为前提的,在每个SparkEnv初始化时,会构造相应的BlockManager。SparkEnv伴生对象提供了针对Driver和Executor分别创建SparkEnv实例的方法createDriverEnv(...)createExecutorEnv(...),它们都调用了私有的create(...)方法,只是参数传递不一样。SparkEnv的create(...)方法中创建BlockManager的源码片段如下:

org.apache.spark.SparkEnv#create
  • ...
  • // NB: blockManager is not valid until initialize() is called later.
  • // 创建BlockManager
  • val blockManager = new BlockManager(executorId, rpcEnv, blockManagerMaster,
  • serializerManager, conf, memoryManager, mapOutputTracker, shuffleManager,
  • blockTransferService, securityManager, numUsableCores)
  • ...

从BlockManager的构造参数可以得知,BlockManager的构建过程依赖于多个相关相关的组件,这里我们分别进行简短的介绍,具体原理会在后面的内容中一一讲解:

  • executorId:Executor的标识,对于Executor来说,该标识是在启动Executor时被分配的,将会由Master负责分配,是格式为“app-日期格式字符串-数字”的字符串;对于Driver来说该标识一直是SparkContext.DRIVER_IDENTIFIER,即字符串“driver”。
  • rpcEnv:NettyRpcEnv实例,即前面讲解的RpcEnv环境,它负责协调节点上BlockManager与本地及远端的通信,是由SparkEnv自己创建的。
  • blockManagerMaster:BlockManagerMaster实例,由SparkEnv自行创建,用于代理BlockManager与Driver上的BlockManagerMasterEndpoint进行通信,它持有BlockManagerMasterEndpoint的RpcEndpointRef。
  • serializerManager:SerializerManager实例,序列化管理器,由SparkEnv创建,用于Spark中对象在通过网络传输或写入存储体系时使用。
  • memoryManager:MemoryManager实例,内存管理器,由SparkEnv创建,负责对节点上内存存储的分配和回收。
  • mapOutputTracker:MapOutputTracker实例,用于跟踪Map任务输出数据的跟踪器,由SparkEnv创建。
  • shuffleManager:ShuffleManager实例,Shuffle操作管理器,由SparkEnv创建。
  • blockTransferService:NettyBlockTransferService实例,用于数据的传输服务,由SparkEnv创建。
  • securityManager:SecurityManager实例,安全管理器,由SparkEnv创建。

关于BlockManager在初始化时涉及的所有组件,包括已列举和未列举的,都将在后面的文章中详细介绍。

3. 存储体系的RpcEnv

在SparkEnv中创建rpcEnv属性的源码片段如下:

org.apache.spark.SparkEnv#create
  • // RpcEnv
  • /**
  • * 生成系统名称:
  • * - 如果当前应用为Driver,那么systemName为sparkDriver;
  • * - 否则systemName为sparkExecutor。
  • */
  • val systemName = if (isDriver) driverSystemName else executorSystemName
  • // 创建RpcEnv,注意最后一个参数,只有在不是Driver的时候才为true
  • val rpcEnv = RpcEnv.create(systemName, bindAddress, advertiseAddress, port, conf,
  • securityManager, clientMode = !isDriver)

rpcEnv的创建过程可见,创建的RpcEnv依旧是NettyRpcEnv实例,名称根据角色的不同而不同;另外需要注意的是,如果创建的是Executor的NettyRpcEnv,在创建过程中不会构建并启动TransportServer服务器(由clientMode参数决定),这是由于Executor大部分情况下只需要向Driver报告的自己的状态信息并接受操作指令,并不需要处理来自其它节点的请求;同时,Executor是运行在Worker上的,Worker节点是拥有包括TransportServer在内的完整的RPC环境的。

NettyRpcEnv的工作原理在前面通信架构相关的文章中已经得非常清楚了,如有不明白的读者请翻阅前面的文章Spark源码分析05 - 通信架构03:高层实现(1)RpcEnv和Dispatcher

4. 辅助状态类

在讲解BlockManager的其它通信层组件之前,我们需要先了解与存储状态相关的一些辅助类,有BlockManagerId、BlockId、StorageLevel、BlockStatus、BlockInfo等。

  • BlockManagerId:用于唯一标识一个BlockManager。
  • BlockId:存储数据的数据块的唯一标识,针对不同类型的数据块,对应的BlockId也不相同。
  • StorageLevel:存储级别标识。
  • BlockStatus:用于表示数据块的状态,其中记录了数据块的存储级别,以及它在内存和磁盘中各占用的大小。
  • BlockInfo:用于描述BlockManager管理的数据块的元数据信息,包括数据块的存储级别,数据块的类型及大小,以及相关的锁信息。

4.1. BlockManagerId

先说BlockManagerId类,它用于唯一标识一个BlockManager;由于BlockManager位于不同的Driver或者Executor节点和实例上,而BlockManager之间需要通过通信层进行通信,因此每一个BlockManager都需要在集群内有一个属于自己的唯一的标识,BlockManagerId就用于标识节点内的BlockManager实例,它的定义如下:

org.apache.spark.storage.BlockManagerId
  • /**
  • * :: DeveloperApi ::
  • * This class represent an unique identifier for a BlockManager.
  • *
  • * The first 2 constructors of this class are made private to ensure that BlockManagerId objects
  • * can be created only using the apply method in the companion object. This allows de-duplication
  • * of ID objects. Also, constructor parameters are private to ensure that parameters cannot be
  • * modified from outside this class.
  • *
  • * @param executorId_ 当前BlockManager所在的实例的ID。
  • * 如果实例是Driver,那么ID为字符串"driver",
  • * 否则由Master负责给各个Executor分配,ID格式为app-日期格式字符串-数字。
  • * @param host_ BlockManager所在的主机地址
  • * @param port_ BlockManager中BlockTransferService对外服务的端口
  • * @param topologyInfo_ 拓扑信息
  • */
  • @DeveloperApi
  • class BlockManagerId private (
  • private var executorId_ : String,
  • private var host_ : String,
  • private var port_ : Int,
  • private var topologyInfo_ : Option[String])
  • extends Externalizable {
  • ...
  • }

BlockManagerId内的大部分方法都是在操作executorId_host_port_topologyInfo_四个参数,比较简单这里就不贴出来了;需要我们注意的是,BlockManagerId提供了序列化writeExternal(...)和反序列化readExternal(...)方法用将BlockManagerId写出到二进制刘或者从二进制流读取BlockManagerId实例;另外,只有两个BlockManagerId的executorId_host_port_topologyInfo_四个字段都相同才表示它们是同一个BlockManagerId实例。

4.2. BlockId

Spark在管理存储的数据时,将数据按块(Block)进行划分,以块作为存储单位,而每个块都有唯一的BlockId标识。BlockId中记录了数据块的基本属性,它的定义如下:

org.apache.spark.storage.BlockId
  • /**
  • * :: DeveloperApi ::
  • * Identifies a particular Block of data, usually associated with a single file.
  • * A Block can be uniquely identified by its filename, but each type of Block has a different
  • * set of keys which produce its unique name.
  • *
  • * If your BlockId should be serializable, be sure to add it to the BlockId.apply() method.
  • *
  • * 在Spark的存储体系中,数据的读写是以块为单位,只不过这个块并非操作系统的块,而是设计用于Spark存储体系的块Block。
  • * 每个块都有唯一的标识,Spark把这个标识抽象为BlockId。
  • */
  • @DeveloperApi
  • sealed abstract class BlockId {
  • /** A globally unique identifier for this Block. Can be used for ser/de.
  • * Block全局唯一的标识名。
  • **/
  • def name: String
  • // convenience methods
  • // 将当前BlockId转换为RDDBlockId。如果当前BlockId是RDDBlockId,则转换为RDDBlockId,否则返回None。
  • def asRDDId: Option[RDDBlockId] = if (isRDD) Some(asInstanceOf[RDDBlockId]) else None
  • // 当前BlockId是否是RDDBlockId。
  • def isRDD: Boolean = isInstanceOf[RDDBlockId]
  • // 当前BlockId是否是ShuffleBlockId。
  • def isShuffle: Boolean = isInstanceOf[ShuffleBlockId]
  • // 当前BlockId是否是BroadcastBlockId。
  • def isBroadcast: Boolean = isInstanceOf[BroadcastBlockId]
  • override def toString: String = name
  • override def hashCode: Int = name.hashCode
  • // 判断BlockId是否相等,需要类相同,name也相同
  • override def equals(other: Any): Boolean = other match {
  • case o: BlockId => getClass == o.getClass && name.equals(o.name)
  • case _ => false
  • }
  • }

BlockId的name字段用于唯一标识一个数据块;BlockId是一个抽象类,在Spark中,数据块分为了多种类型,每种类型的数据块都有对应的BlockId类型的实现,具体有以下十种:

org.apache.spark.storage
  • // RDDBlock块的Id,name由"rdd_"开头
  • @DeveloperApi
  • case class RDDBlockId(rddId: Int, splitIndex: Int) extends BlockId {
  • override def name: String = "rdd_" + rddId + "_" + splitIndex
  • }
  • // Format of the shuffle block ids (including data and index) should be kept in sync with
  • // org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getBlockData().
  • // ShuffleBlockId块,name由"shuffle_"开头
  • @DeveloperApi
  • case class ShuffleBlockId(shuffleId: Int, mapId: Int, reduceId: Int) extends BlockId {
  • override def name: String = "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId
  • }
  • // ShuffleDataBlock块的Id,name由"shuffle_"开头,以".data"结尾
  • @DeveloperApi
  • case class ShuffleDataBlockId(shuffleId: Int, mapId: Int, reduceId: Int) extends BlockId {
  • override def name: String = "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId + ".data"
  • }
  • // ShuffleIndexBlock块的Id,name由"shuffle_"开头,以".index"结尾
  • @DeveloperApi
  • case class ShuffleIndexBlockId(shuffleId: Int, mapId: Int, reduceId: Int) extends BlockId {
  • override def name: String = "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId + ".index"
  • }
  • // BroadcastBlock块的Id,name由"broadcast_"开头
  • @DeveloperApi
  • case class BroadcastBlockId(broadcastId: Long, field: String = "") extends BlockId {
  • override def name: String = "broadcast_" + broadcastId + (if (field == "") "" else "_" + field)
  • }
  • // TaskResultBlock块的Id,name由"taskresult_"开头
  • @DeveloperApi
  • case class TaskResultBlockId(taskId: Long) extends BlockId {
  • override def name: String = "taskresult_" + taskId
  • }
  • // StreamBlock块的Id,name由"input-"开头
  • @DeveloperApi
  • case class StreamBlockId(streamId: Int, uniqueId: Long) extends BlockId {
  • override def name: String = "input-" + streamId + "-" + uniqueId
  • }
  • /** Id associated with temporary local data managed as blocks. Not serializable.
  • * TempLocalBlock块的Id,name由"temp_local_"开头
  • **/
  • private[spark] case class TempLocalBlockId(id: UUID) extends BlockId {
  • override def name: String = "temp_local_" + id
  • }
  • /** Id associated with temporary shuffle data managed as blocks. Not serializable.
  • * TempShuffleBlock块的Id,name由"temp_shuffle_"开头
  • **/
  • private[spark] case class TempShuffleBlockId(id: UUID) extends BlockId {
  • override def name: String = "temp_shuffle_" + id
  • }
  • // Intended only for testing purposes
  • // TestBlock块的Id,name由"test_"开头
  • private[spark] case class TestBlockId(id: String) extends BlockId {
  • override def name: String = "test_" + id
  • }

从定义中我们可以得知几个熟悉的概念,类似于RDD、Shuffle、Broadcast、TaskResult、Stream等,可以看出,Block是根据使用的业务场景的不同而进行区分的。在BlockId的伴生对象中,定义了除TempLocalBlockId、TempShuffleBlockId之外的BlockId的匹配正则以及方便创建对应类型BlockId的apply(...)方法:

org.apache.spark.storage.BlockId
  • @DeveloperApi
  • object BlockId {
  • // 方便使用的正则表达式
  • val RDD = "rdd_([0-9]+)_([0-9]+)".r
  • val SHUFFLE = "shuffle_([0-9]+)_([0-9]+)_([0-9]+)".r
  • val SHUFFLE_DATA = "shuffle_([0-9]+)_([0-9]+)_([0-9]+).data".r
  • val SHUFFLE_INDEX = "shuffle_([0-9]+)_([0-9]+)_([0-9]+).index".r
  • val BROADCAST = "broadcast_([0-9]+)([_A-Za-z0-9]*)".r
  • val TASKRESULT = "taskresult_([0-9]+)".r
  • val STREAM = "input-([0-9]+)-([0-9]+)".r
  • val TEST = "test_(.*)".r
  • /** Converts a BlockId "name" String back into a BlockId.
  • * 用于根据传入的id,即name名称来创建对应的BlockId
  • **/
  • def apply(id: String): BlockId = id match {
  • case RDD(rddId, splitIndex) =>
  • RDDBlockId(rddId.toInt, splitIndex.toInt)
  • case SHUFFLE(shuffleId, mapId, reduceId) =>
  • ShuffleBlockId(shuffleId.toInt, mapId.toInt, reduceId.toInt)
  • case SHUFFLE_DATA(shuffleId, mapId, reduceId) =>
  • ShuffleDataBlockId(shuffleId.toInt, mapId.toInt, reduceId.toInt)
  • case SHUFFLE_INDEX(shuffleId, mapId, reduceId) =>
  • ShuffleIndexBlockId(shuffleId.toInt, mapId.toInt, reduceId.toInt)
  • case BROADCAST(broadcastId, field) =>
  • BroadcastBlockId(broadcastId.toLong, field.stripPrefix("_"))
  • case TASKRESULT(taskId) =>
  • TaskResultBlockId(taskId.toLong)
  • case STREAM(streamId, uniqueId) =>
  • StreamBlockId(streamId.toInt, uniqueId.toLong)
  • case TEST(value) =>
  • TestBlockId(value)
  • case _ =>
  • throw new IllegalStateException("Unrecognized BlockId: " + id)
  • }
  • }

4.3. StorageLevel

StorageLevel想必大家已经很熟悉了,它是用于表示存储级别的。所有的存储级别都以静态字段在StorageLevel的伴生对象表示:

  • /**
  • * Various [[org.apache.spark.storage.StorageLevel]] defined and utility functions for creating
  • * new storage levels.
  • */
  • object StorageLevel {
  • val NONE = new StorageLevel(false, false, false, false)
  • val DISK_ONLY = new StorageLevel(true, false, false, false)
  • val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
  • val MEMORY_ONLY = new StorageLevel(false, true, false, true)
  • val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
  • val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
  • val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
  • val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
  • val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
  • val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
  • val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
  • val OFF_HEAP = new StorageLevel(true, true, true, false, 1)
  • /**
  • * :: DeveloperApi ::
  • * Return the StorageLevel object with the specified name.
  • *
  • * 从指定的字符串得到对应的存储级别
  • */
  • @DeveloperApi
  • def fromString(s: String): StorageLevel = s match {
  • case "NONE" => NONE
  • case "DISK_ONLY" => DISK_ONLY
  • case "DISK_ONLY_2" => DISK_ONLY_2
  • case "MEMORY_ONLY" => MEMORY_ONLY
  • case "MEMORY_ONLY_2" => MEMORY_ONLY_2
  • case "MEMORY_ONLY_SER" => MEMORY_ONLY_SER
  • case "MEMORY_ONLY_SER_2" => MEMORY_ONLY_SER_2
  • case "MEMORY_AND_DISK" => MEMORY_AND_DISK
  • case "MEMORY_AND_DISK_2" => MEMORY_AND_DISK_2
  • case "MEMORY_AND_DISK_SER" => MEMORY_AND_DISK_SER
  • case "MEMORY_AND_DISK_SER_2" => MEMORY_AND_DISK_SER_2
  • case "OFF_HEAP" => OFF_HEAP
  • case _ => throw new IllegalArgumentException(s"Invalid StorageLevel: $s")
  • }
  • ...
  • }

可见,每个存储级别最终都会构造对应的StorageLevel对象,我们来分析一下StorageLevel的定义和重要字段:

org.apache.spark.storage.StorageLevel
  • /**
  • * :: DeveloperApi ::
  • * Flags for controlling the storage of an RDD. Each StorageLevel records whether to use memory,
  • * or ExternalBlockStore, whether to drop the RDD to disk if it falls out of memory or
  • * ExternalBlockStore, whether to keep the data in memory in a serialized format, and whether
  • * to replicate the RDD partitions on multiple nodes.
  • *
  • * The [[org.apache.spark.storage.StorageLevel$]] singleton object contains some static constants
  • * for commonly useful storage levels. To create your own storage level object, use the
  • * factory method of the singleton object (`StorageLevel(...)`).
  • *
  • * @param _useDisk 能否写入磁盘。当Block的StorageLevel中的_useDisk为true时,存储体系将允许Block写入磁盘。
  • * @param _useMemory 能否写入堆内存。当Block的StorageLevel中的_useMemory为true时,存储体系将允许Block写入堆内存。
  • * @param _useOffHeap 能否写入堆外内存。当Block的StorageLevel中的_useOffHeap为true时,存储体系将允许Block写入堆外内存。
  • * @param _deserialized 表示数据是否已被反序列化。当Block本身经过了序列化后,Block的StorageLevel中的_deserialized将被设置为false,即表示在使用数据时需要对Block进行反序列化。
  • * @param _replication Block的复制份数。Block的StorageLevel中的_replication默认等于1,可以在构造Block的StorageLevel时明确指定_replication的数量。
  • * 当_replication大于1时,Block除了在本地的存储体系中写入一份,还会复制到其他不同节点的存储体系中写入,达到复制备份的目的。
  • */
  • @DeveloperApi
  • class StorageLevel private(
  • private var _useDisk: Boolean,
  • private var _useMemory: Boolean,
  • private var _useOffHeap: Boolean,
  • private var _deserialized: Boolean,
  • private var _replication: Int = 1)
  • extends Externalizable {
  • ...
  • }

StorageLevel提供了两个重载的构造方法用于方便地创建特定的StorageLevel对象:

org.apache.spark.storage.StorageLevel
  • // TODO: Also add fields for caching priority, dataset ID, and flushing.
  • private def this(flags: Int, replication: Int) {
  • this((flags & 8) != 0, // 是否使用磁盘
  • (flags & 4) != 0, // 是否使用内存
  • (flags & 2) != 0, // 是否使用堆外内存
  • (flags & 1) != 0, // 是否已被反序列化
  • replication) // 副本数量
  • }
  • // 创建用于反序列化的StorageLevel对象
  • def this() = this(false, true, false, false) // For deserialization

另外,通过两个检查可以得知StorageLevel有两个限制:副本数量必须小于40;使用堆外存储级别时数据是必须要反序列化的:

org.apache.spark.storage.StorageLevel
  • // 副本数量必须小于40
  • assert(replication < 40, "Replication restricted to be less than 40 for calculating hash codes")
  • // 当使用堆外内存存储时,数据必须序列化存储
  • if (useOffHeap) {
  • require(!deserialized, "Off-heap storage level does not support deserialized storage")
  • }

StorageLevel中还提供了一些辅助方法,实现都比较简单:

org.apache.spark.storage.StorageLevel
  • // 内存模式
  • private[spark] def memoryMode: MemoryMode = {
  • // 如果useOffHeap为true,则返回MemoryMode.OFF_HEAP
  • if (useOffHeap) MemoryMode.OFF_HEAP
  • else MemoryMode.ON_HEAP // 否则返回MemoryMode.ON_HEAP
  • }
  • // 克隆操作,会返回一个新的StorageLevel对象
  • override def clone(): StorageLevel = {
  • new StorageLevel(useDisk, useMemory, useOffHeap, deserialized, replication)
  • }
  • // 判断是否相等,需要所有属性都相同,才代表两个StorageLevel相等
  • override def equals(other: Any): Boolean = other match {
  • case s: StorageLevel =>
  • s.useDisk == useDisk &&
  • s.useMemory == useMemory &&
  • s.useOffHeap == useOffHeap &&
  • s.deserialized == deserialized &&
  • s.replication == replication
  • case _ =>
  • false
  • }
  • // 当前的StorageLevel是否有效:使用了内存或磁盘级别,且副本数量大于1
  • def isValid: Boolean = (useMemory || useDisk) && (replication > 0)
  • /**
  • * 将当前StorageLevel转换为整型表示。
  • * 四位二进制位,每一位都表示是否开启对应的级别。
  • */
  • def toInt: Int = {
  • var ret = 0
  • if (_useDisk) { // 1000
  • ret |= 8
  • }
  • if (_useMemory) { // 0100
  • ret |= 4
  • }
  • if (_useOffHeap) { // 0010
  • ret |= 2
  • }
  • if (_deserialized) { // 0001
  • ret |= 1
  • }
  • ret
  • }
  • /**
  • * 将StorageLevel首先通过toInt方法将_useDisk、_useMemory、_useOffHeap、_deserialized四个属性
  • * 设置到四位数的状态位,
  • * 然后与_replication一起被序列化写入外部二进制流。
  • */
  • override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
  • out.writeByte(toInt)
  • out.writeByte(_replication)
  • }
  • // 从外部二进制流中读取StorageLevel的各个属性。
  • override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
  • val flags = in.readByte()
  • _useDisk = (flags & 8) != 0
  • _useMemory = (flags & 4) != 0
  • _useOffHeap = (flags & 2) != 0
  • _deserialized = (flags & 1) != 0
  • _replication = in.readByte()
  • }
  • // 缓存操作,会把当前StorageLevel缓存到伴生对象的storageLevelCache字典中
  • @throws(classOf[IOException])
  • private def readResolve(): Object = StorageLevel.getCachedStorageLevel(this)

4.4. BlockStatus

BlockStatus则是一个样例类,实现非常简单。它用于表示数据块的状态,其中记录了数据块的存储级别,以及它在内存和磁盘中各占用的大小:

org.apache.spark.storage.BlockStatus
  • /**
  • * 用于封装Block的状态信息
  • * @param storageLevel Block的StorageLevel。
  • * @param memSize Block占用的内存大小。
  • * @param diskSize Block占用的磁盘大小。
  • */
  • @DeveloperApi
  • case class BlockStatus(storageLevel: StorageLevel, memSize: Long, diskSize: Long) {
  • // 是否存储到存储体系中,即memSize与diskSize的大小之和是否大于0。
  • def isCached: Boolean = memSize + diskSize > 0
  • }
  • @DeveloperApi
  • object BlockStatus {
  • // 提供一个静态方法用于获取没有记录任何状态的BlockStatus对象
  • def empty: BlockStatus = BlockStatus(StorageLevel.NONE, memSize = 0L, diskSize = 0L)
  • }

4.5. BlockInfo

BlockInfo用于描述BlockManager管理的数据块的元数据信息,包括数据块的存储级别,数据块的类型及大小,以及相关的锁信息;它的定义如下:

org.apache.spark.storage.BlockInfo
  • /**
  • * Tracks metadata for an individual block.
  • *
  • * Instances of this class are _not_ thread-safe and are protected by locks in the
  • * [[BlockInfoManager]].
  • *
  • * BlockInfo用于描述块的元数据信息,包括存储级别、Block类型、大小、锁信息等。
  • *
  • * @param level the block's storage level. This is the requested persistence level, not the
  • * effective storage level of the block (i.e. if this is MEMORY_AND_DISK, then this
  • * does not imply that the block is actually resident in memory).
  • * BlockInfo所描述的Block的存储级别,即StorageLevel。
  • * @param classTag the block's [[ClassTag]], used to select the serializer
  • * BlockInfo所描述的Block的类型。
  • * @param tellMaster whether state changes for this block should be reported to the master. This
  • * is true for most blocks, but is false for broadcast blocks.
  • * BlockInfo所描述的Block是否需要告知Master。
  • */
  • private[storage] class BlockInfo(
  • val level: StorageLevel,
  • val classTag: ClassTag[_],
  • val tellMaster: Boolean) {
  • ...
  • }

BlockInfo有三个字段,分别是_size_readerCount_writerTask,定义如下:

org.apache.spark.storage.BlockInfo
  • // BlockInfo所描述的Block的大小。
  • private[this] var _size: Long = 0
  • // 读锁被锁次数
  • private[this] var _readerCount: Int = 0
  • /**
  • * TaskAttempt在对Block进行写操作前,首先必须获得对应BlockInfo的写锁。
  • * _writerTask用于保存TaskAttempt的ID(每个任务在实际执行时,会多次尝试,每次尝试都会分配一个ID)。
  • */
  • private[this] var _writerTask: Long = BlockInfo.NO_WRITER

其中_size表示数据块的大小,比较简单;_readerCount字段用于表示当前数据块被锁的次数,即当前数据块正在被多少个任务读取。_writerTask则用于记录当前正在对数据块进行写操作的TaskAttempt的ID。我们知道,读写锁的原则一般是,读共享,写互斥,即同一时刻可以被多个线程读取,但不能被多个线程修改;_readerCount_writerTask正体现了这种读写锁机制。

注:关于TaskAttempt会在后面的章节中介绍,这里将其理解为Task即可。

BlockInfo对_size_readerCount_writerTask三个字段都提供了Getter和Setter方法,Getter方法的实现都比较简单,但Setter方法则需要考虑“不可变性”:

org.apache.spark.storage.BlockInfo
  • // 设置数据块的大小
  • def size_=(s: Long): Unit = {
  • _size = s
  • // 检查
  • checkInvariants()
  • }
  • // 设置读锁被锁次数
  • def readerCount_=(c: Int): Unit = {
  • _readerCount = c
  • // 读锁次数增加需要在数据块不可变的情况下执行
  • checkInvariants()
  • }
  • // 设置执行写操作的任务,即TaskAttemptId
  • def writerTask_=(t: Long): Unit = {
  • _writerTask = t
  • // 设置写锁任务需要在数据块不可变的情况下执行
  • checkInvariants()
  • }

可见,它们在设置了相应字段之后,都调用了checkInvariants()检查不可变性,该方法源码如下:

org.apache.spark.storage.BlockInfo
  • /**
  • * 检查是否不可变,当数据块可变时,该方法会抛出异常;
  • * 这个方法是为了保证一些操作只在数据块不可变的情况下被执行,即下列情况之一:
  • * 1. 数据块没有进行读操作;
  • * 2. 数据块正在被读,且没有写操作。
  • */
  • private def checkInvariants(): Unit = {
  • // A block's reader count must be non-negative:
  • // 读锁被锁次数必须非负
  • assert(_readerCount >= 0)
  • // A block is either locked for reading or for writing, but not for both at the same time:
  • // 读锁被锁次数大于0时,必须是没有写任务;即不能在读的时候写数据
  • assert(_readerCount == 0 || _writerTask == BlockInfo.NO_WRITER)
  • }

从该方法的实现可知,当设置完相应的字段后,如果发现此时读操作和写操作同时存在,就会抛出异常。其中BlockInfo.NO_WRITER用于标识当前没有写操作,它定义在BlockInfo的伴生对象里:

org.apache.spark.storage.BlockInfo
  • private[storage] object BlockInfo {
  • /**
  • * Special task attempt id constant used to mark a block's write lock as being unlocked.
  • *
  • * 无写操作的标识
  • */
  • val NO_WRITER: Long = -1
  • /**
  • * Special task attempt id constant used to mark a block's write lock as being held by
  • * a non-task thread (e.g. by a driver thread or by unit test code).
  • *
  • * 读锁被持有了,但不是被Task线程持有;如,可能被Driver线程持有,或其他测试线程持有
  • */
  • val NON_TASK_WRITER: Long = -1024
  • }

5. BlockInfoManager

有了对上面的状态辅助类的理解,我们来分析一下BlockInfoManager类。在存储体系中,还有一个命名类似的BlockManagerInfo,这里先澄清一下它们的区别:

  • BlockInfoManager:是BlockManager用于管理数据块相关信息的类;BlockManager中会维护存储数据的元数据信息,BlockInfoManager就是辅助BlockManager专门用于组织和管理元数据信息的管理器。
  • BlockManagerInfo:在BlockManagerMasterEndpoint中,需要知道所有节点的BlockManager的状态,而BlockManagerInfo就是用于描述BlockManager信息的类;BlockManagerMasterEndpoint中有一个HashMap[BlockManagerId, BlockManagerInfo]类型的字典blockManagerInfo,键是标识唯一BlockManager的BlockManagerId对象,值则是记录了该BlockManager相关信息的BlockManagerInfo对象。这些在后面的内容中会详细介绍。

在BlockManager中,会实例化一个BlockInfoManager管理器,所有元数据管理工作就交给了该管理器,源码如下:

org.apache.spark.storage.BlockManager#blockInfoManager
  • // Visible for testing
  • private[storage] val blockInfoManager = new BlockInfoManager

BlockInfoManager类定义比较简单,它只是继承了Logging类提供日志功能;在BlockInfoManager中,使用infos字典记录数据块的信息,键为数据块的BlockId,值为数据块的BlockInfo对象:

org.apache.spark.storage.BlockInfoManager#infos
  • /**
  • * Used to look up metadata for individual blocks. Entries are added to this map via an atomic
  • * set-if-not-exists operation ([[lockNewBlockForWriting()]]) and are removed
  • * by [[removeBlock()]].
  • *
  • * BlockId与BlockInfo之间映射关系的缓存字典
  • */
  • @GuardedBy("this")
  • private[this] val infos = new mutable.HashMap[BlockId, BlockInfo]

5.1. 锁记录维护

BlockInfoManager的一个很重要的功能就是维护BlockInfo的读锁和写锁记录并进行控制,它通过两个字典分别记录TaskAttempt持有的BlockInfo对应的读锁和写锁:

  • /**
  • * Tracks the set of blocks that each task has locked for writing.
  • *
  • * 每次TaskAttempt的标识TaskAttemptId与执行获取的Block的写锁之间的映射关系。
  • * TaskAttemptId与写锁之间是一对多的关系,即一次TaskAttempt执行会获取零到多个Block的写锁。
  • */
  • @GuardedBy("this")
  • private[this] val writeLocksByTask =
  • new mutable.HashMap[TaskAttemptId, mutable.Set[BlockId]]
  • with mutable.MultiMap[TaskAttemptId, BlockId] // 复合类型
  • /**
  • * Tracks the set of blocks that each task has locked for reading, along with the number of times
  • * that a block has been locked (since our read locks are re-entrant).
  • *
  • * 每次TaskAttempt执行的标识TaskAttemptId与获取的Block的读锁之间的映射关系。
  • * TaskAttemptId与读锁之间是一对多的关系,即一次TaskAttempt执行会获取零到多个Block的读锁,
  • * 并且会记录对于同一个Block的读锁的占用次数。
  • */
  • @GuardedBy("this")
  • private[this] val readLocksByTask =
  • new mutable.HashMap[TaskAttemptId, ConcurrentHashMultiset[BlockId]]

writeLocksByTask是一个复合字典,它的键为TaskAttempt的ID,值为单个BlockInfo或一个BlockInfo的集合,这是因为,一个TaskAttempt可能同时需要访问多个数据块,它会获取所有这些BlockInfo的写锁。

readLocksByTask结构则比较简单,就是一个HashMap,它的键也是TaskAttempt,值表示该TaskAttempt正在读的数据块的BlockInfo。

对于TaskAttempt对读锁和写锁的持有状态,有以下示意图:

1.BlockInfoManager的锁持有.png

这里对上述示意图进行几点解释:

  1. TaskAttempt - 0拥有BlockInfo - 0的写锁,因此其它的TaskAttempt不可对其写,也不可对其读。
  2. 对于BlockInfo - 1和BlockInfo - N来说,因为没有TaskAttempt拥有它的写锁,所以多个TaskAttempt可以同时拥有它的读锁并进行读操作。

5.2. 读锁定

BlockInfoManager的lockForReading(...)方法可以对指定的BlockInfo进行读锁定,它的源码如下:

org.apache.spark.storage.BlockInfoManager#lockForReading
  • /**
  • * Lock a block for reading and return its metadata.
  • *
  • * If another task has already locked this block for reading, then the read lock will be
  • * immediately granted to the calling task and its lock count will be incremented.
  • *
  • * If another task has locked this block for writing, then this call will block until the write
  • * lock is released or will return immediately if `blocking = false`.
  • *
  • * A single task can lock a block multiple times for reading, in which case each lock will need
  • * to be released separately.
  • *
  • * 锁定读:
  • * 1. 获取要读的BlockInfo,判断它是否正在被写;
  • * 2. 如果正在被写,就判断是否指定了阻塞等待;
  • * - 如果指定了阻塞等待,则阻塞等待直到写锁释放后被唤醒,然后重新获取读锁;
  • * - 如果没有指定阻塞等待,就放弃,返回NONE
  • * 3. 如果没有正在被写,就将BlockInfo的读锁次数加1,
  • * 然后将维护readLocksByTask字典中的记录,并返回BlockInfo。
  • *
  • * @param blockId the block to lock.
  • * @param blocking if true (default), this call will block until the lock is acquired. If false,
  • * this call will return immediately if the lock acquisition fails.
  • * @return None if the block did not exist or was removed (in which case no lock is held), or
  • * Some(BlockInfo) (in which case the block is locked for reading).
  • */
  • def lockForReading(
  • blockId: BlockId,
  • blocking: Boolean = true): Option[BlockInfo] = synchronized { // 加锁
  • logTrace(s"Task $currentTaskAttemptId trying to acquire read lock for $blockId")
  • do {
  • // 从infos中获取BlockId对应的BlockInfo
  • infos.get(blockId) match {
  • case None => return None // 获取不到则返回None
  • case Some(info) =>
  • if (info.writerTask == BlockInfo.NO_WRITER) { // 没有写锁
  • // 由当前TaskAttempt线程持有读锁并返回BlockInfo
  • info.readerCount += 1
  • readLocksByTask(currentTaskAttemptId).add(blockId)
  • logTrace(s"Task $currentTaskAttemptId acquired read lock for $blockId")
  • return Some(info)
  • }
  • }
  • // 走到这里说明无法获取读锁,有其他TaskAttempt线程正在写
  • if (blocking) {
  • // 如果设置了阻塞,则等待,阻塞在BlockManager对象上
  • wait()
  • }
  • } while (blocking) // 如果设置了阻塞,则一直循环获取直到获取到
  • None
  • }

实现比较简单,由于是进行读锁定,因此要求不能有正在写的TaskAttempt;获取读锁后,会将该BlockInfo的BlockId记录到readLocksByTask字典的TaskAttempt对应的集合中,然后将BlockInfo返回。该方法的blocking参数决定是否开启阻塞模式,如果开启阻塞模式,当无法进行读锁定时,阻塞等待写锁被释放,写锁被释放时会唤醒阻塞的线程,此时读锁定线程将重试。

5.3. 写锁定

相应的,BlockInfoManager也提供了写锁定,源码如下:

org.apache.spark.storage.BlockInfoManager#lockForWriting
  • /**
  • * Lock a block for writing and return its metadata.
  • *
  • * If another task has already locked this block for either reading or writing, then this call
  • * will block until the other locks are released or will return immediately if `blocking = false`.
  • *
  • * 写锁定:
  • * 1. 获取要读的BlockInfo,判断它是否正在被写或者被读;
  • * 2. 如果是,就判断是否指定了阻塞等待;
  • * - 如果指定了阻塞等待,则阻塞等待直到读锁和写锁都释放后被唤醒,然后重新获取写锁;
  • * - 如果没有指定阻塞等待,就放弃,返回NONE
  • * 3. 如果没有正在被写或被读,就使用BlockInfo的writerTask记录当前TaskAttempt的ID,
  • * 然后将维护writeLocksByTask字典中的记录,并返回BlockInfo。
  • *
  • * @param blockId the block to lock.
  • * @param blocking if true (default), this call will block until the lock is acquired. If false,
  • * this call will return immediately if the lock acquisition fails.
  • * @return None if the block did not exist or was removed (in which case no lock is held), or
  • * Some(BlockInfo) (in which case the block is locked for writing).
  • */
  • def lockForWriting(
  • blockId: BlockId,
  • blocking: Boolean = true): Option[BlockInfo] = synchronized {
  • logTrace(s"Task $currentTaskAttemptId trying to acquire write lock for $blockId")
  • do {
  • // 从infos中获取BlockId对应的BlockInfo
  • infos.get(blockId) match {
  • case None => return None // 获取不到则返回None
  • case Some(info) =>
  • if (info.writerTask == BlockInfo.NO_WRITER && info.readerCount == 0) {
  • // 没有写锁,且没有读锁重入,则由当前TaskAttempt线程持有写锁并返回BlockInfo
  • info.writerTask = currentTaskAttemptId
  • writeLocksByTask.addBinding(currentTaskAttemptId, blockId)
  • logTrace(s"Task $currentTaskAttemptId acquired write lock for $blockId")
  • return Some(info)
  • }
  • }
  • // 走到这里说明无法获取写锁,有其他TaskAttempt线程正在写或读
  • if (blocking) {
  • // 如果设置了阻塞,则等待,阻塞在BlockManager对象上
  • wait()
  • }
  • } while (blocking) // 如果设置了阻塞,则一直循环获取直到获取到
  • None
  • }

写锁定与读锁定的实现逻辑类似,唯一不同的是,写锁定需要指定的BlockInfo当前没有被写也没有被读。

5.4. 释放锁

BlockInfoManager将释放锁的操作放在了一个方法中,源码如下:

org.apache.spark.storage.BlockInfoManager#unlock
  • /**
  • * Release a lock on the given block.
  • *
  • * 释放BlockId对应的Block上的锁
  • */
  • def unlock(blockId: BlockId): Unit = synchronized {
  • // 当前TaskAttempt释放了锁
  • logTrace(s"Task $currentTaskAttemptId releasing lock for $blockId")
  • // 获取对应的BlockInfo
  • val info = get(blockId).getOrElse {
  • throw new IllegalStateException(s"Block $blockId not found")
  • }
  • if (info.writerTask != BlockInfo.NO_WRITER) { // 持有写锁
  • info.writerTask = BlockInfo.NO_WRITER // 释放写锁
  • writeLocksByTask.removeBinding(currentTaskAttemptId, blockId)
  • } else { // 持有读锁
  • assert(info.readerCount > 0, s"Block $blockId is not locked for reading")
  • // 读锁重入次数减1
  • info.readerCount -= 1
  • // 获取对应的BlockId集合
  • val countsForTask = readLocksByTask(currentTaskAttemptId)
  • // 对TaskAttempt在readLocksByTask集合中对应的BlockId的出现次数减1,返回的次数是BlockId之前的出现次数
  • val newPinCountForTask: Int = countsForTask.remove(blockId, 1) - 1
  • /**
  • * newPinCountForTask表示当前TaskAttempt持有BlockId对应的Block的读锁次数与1的差值
  • * 如果newPinCountForTask次数小于0,表示读锁释放次数大于加锁次数,会抛出异常
  • */
  • assert(newPinCountForTask >= 0, // 该条件不成立时才会抛出异常
  • s"Task $currentTaskAttemptId release lock on block $blockId more times than it acquired it")
  • }
  • // 唤醒其它获取读锁或写锁失败的TaskAttempt线程,让它们重新尝试获取
  • notifyAll()
  • }

该方法也比较简单,对BlockInfo持有的锁的释放无非是读锁和写锁,持有读锁,将BlockInfo的读锁计数减1即可,持有写锁则直接将BlockInfo记录的_writerTask置为BlockInfo.NO_WRITER即可。而这个的操作都会维护对应的writeLocksByTaskreadLocksByTask字典;释放锁之后,会使用notifyAll()唤醒其它获取读锁或写锁失败的TaskAttempt线程,让它们重新尝试获取。

5.5. 锁降级

对锁比较了解的读者应该知道锁降级的概念,它是用于将当前持有的写锁降级为读锁;如果没有锁降级,在写锁释放后,然后获取读锁可能由于其他线程竞争而获取失败,锁降级能够保证写锁释放之后一定能够获取到读锁。BlockInfo上的实现锁降级的方法如下:

org.apache.spark.storage.BlockInfoManager#downgradeLock
  • /**
  • * Downgrades an exclusive write lock to a shared read lock.
  • *
  • * 锁降级,即写锁 -> 降级为读锁
  • */
  • def downgradeLock(blockId: BlockId): Unit = synchronized {
  • logTrace(s"Task $currentTaskAttemptId downgrading write lock for $blockId")
  • // 获取blockId对应的BlockInfo
  • val info = get(blockId).get
  • require(info.writerTask == currentTaskAttemptId,
  • s"Task $currentTaskAttemptId tried to downgrade a write lock that it does not hold on" +
  • s" block $blockId")
  • // 调用unlock方法释放当前TaskAttempt线程从BlockId对应Block获取的写锁。
  • unlock(blockId)
  • // 非阻塞方式获取读锁
  • val lockOutcome = lockForReading(blockId, blocking = false)
  • assert(lockOutcome.isDefined)
  • }

注意,该方法体都包裹在synchronized同步块中,只有这种方式才能保证锁降级成功。如果没有同步块控制,释放写锁和获取读锁的间隔可能因为竞争线程抢先获取读锁而导致锁降级失败。

5.6. 判断读锁持有

assertBlockIsLockedForWriting(...)可用于判断当前任务上下文TaskContext中当前正在执行的TaskAttempt是否持有了某个BlockInfo的读锁,源码比较简单:

  • /**
  • * Throws an exception if the current task does not hold a write lock on the given block.
  • * Otherwise, returns the block's BlockInfo.
  • *
  • * 判断当前TaskAttempt是否获取了指定BlockInfo的读锁
  • */
  • def assertBlockIsLockedForWriting(blockId: BlockId): BlockInfo = synchronized {
  • infos.get(blockId) match {
  • case Some(info) =>
  • if (info.writerTask != currentTaskAttemptId) {
  • // 当前TaskAttempt没有获取该BlockInfo的读锁,抛出异常
  • throw new SparkException(
  • s"Task $currentTaskAttemptId has not locked block $blockId for writing")
  • } else {
  • // 当前TaskAttempt获取了该BlockInfo的读锁,将该BlockInfo返回
  • info
  • }
  • case None =>
  • throw new SparkException(s"Block $blockId does not exist")
  • }
  • }

这个操作用到了currentTaskAttemptId()方法,用于获取任务上下文TaskContext中当前正在执行的TaskAttempt的TaskAttemptId,源码如下:

org.apache.spark.storage.BlockInfoManager#currentTaskAttemptId
  • /**
  • * Returns the current task's task attempt id (which uniquely identifies the task), or
  • * [[BlockInfo.NON_TASK_WRITER]] if called by a non-task thread.
  • *
  • * 获取任务上下文TaskContext中当前正在执行的TaskAttempt的TaskAttemptId
  • */
  • private def currentTaskAttemptId: TaskAttemptId = {
  • // 如果没有将返回BlockInfo.NON_TASK_WRITER(-1024)
  • Option(TaskContext.get()).map(_.taskAttemptId()).getOrElse(BlockInfo.NON_TASK_WRITER)
  • }

5.7. 释放所有锁

releaseAllLocksForTask(...)方法用于释放指定TaskAttempt在所有BlockInfo上获取的锁,该方法的实现比较简单,源码如下:

org.apache.spark.storage.BlockInfoManager#releaseAllLocksForTask
  • /**
  • * Release all lock held by the given task, clearing that task's pin bookkeeping
  • * structures and updating the global pin counts. This method should be called at the
  • * end of a task (either by a task completion handler or in `TaskRunner.run()`).
  • *
  • * @return the ids of blocks whose pins were released
  • */
  • def releaseAllLocksForTask(taskAttemptId: TaskAttemptId): Seq[BlockId] = {
  • // 记录释放了锁的数据库的BlockId
  • val blocksWithReleasedLocks = mutable.ArrayBuffer[BlockId]()
  • // 获取TaskAttempt读锁定的BlockInfo的BlockId的集合
  • val readLocks = synchronized {
  • readLocksByTask.remove(taskAttemptId).get
  • }
  • // 获取该TaskAttempt写锁定的BlockInfo的BlockId的集合
  • val writeLocks = synchronized {
  • writeLocksByTask.remove(taskAttemptId).getOrElse(Seq.empty)
  • }
  • // 遍历写锁定的BlockId的集合
  • for (blockId <- writeLocks) {
  • // 获取对应的BlockInfo
  • infos.get(blockId).foreach { info =>
  • // 判断当前持有该BlockInfo写锁的TaskAttempt是否是指定的TaskAttempt
  • assert(info.writerTask == taskAttemptId)
  • // 如果是,清除该BlockInfo的写锁
  • info.writerTask = BlockInfo.NO_WRITER
  • }
  • // 对该BlockInfo的BlockId进行记录
  • blocksWithReleasedLocks += blockId
  • }
  • // 遍历读锁定的BlockId的集合
  • readLocks.entrySet().iterator().asScala.foreach { entry =>
  • // 获取BlockId
  • val blockId = entry.getElement
  • // 获取读锁的次数
  • val lockCount = entry.getCount
  • // 记录该BlockInfo的BlockId
  • blocksWithReleasedLocks += blockId
  • synchronized {
  • // 获取对应BlockInfo
  • get(blockId).foreach { info =>
  • // 释放1次读锁
  • info.readerCount -= lockCount
  • assert(info.readerCount >= 0)
  • }
  • }
  • }
  • synchronized {
  • // 唤醒其它获取读锁或写锁失败的TaskAttempt线程,让它们重新尝试获取
  • notifyAll()
  • }
  • // 返回记录的释放了锁的BlockInfo的BlockId集合
  • blocksWithReleasedLocks
  • }

5.8. 添加新的BlockInfo

lockNewBlockForWriting(blockId: BlockId, newBlockInfo: BlockInfo): Boolean会尝试添加一个新的BlockInfo,它的第二个参数是新BlockInfo,第一个参数则是该BlockInfo对应的BlockId,返回值表示是否添加成功;源码如下:

org.apache.spark.storage.BlockInfoManager#lockNewBlockForWriting
  • /**
  • * Attempt to acquire the appropriate lock for writing a new block.
  • *
  • * This enforces the first-writer-wins semantics. If we are the first to write the block,
  • * then just go ahead and acquire the write lock. Otherwise, if another thread is already
  • * writing the block, then we wait for the write to finish before acquiring the read lock.
  • *
  • * 尝试添加新的BlockInfo并获取其写锁
  • * 如果对应的BlockInfo已经存在,就返回false
  • *
  • * @return true if the block did not already exist, false otherwise. If this returns false, then
  • * a read lock on the existing block will be held. If this returns true, a write lock on
  • * the new block will be held.
  • */
  • def lockNewBlockForWriting(
  • blockId: BlockId,
  • newBlockInfo: BlockInfo): Boolean = synchronized {
  • logTrace(s"Task $currentTaskAttemptId trying to put $blockId")
  • // 获取读锁
  • lockForReading(blockId) match {
  • case Some(info) => // 能获取得到,说明BlockId对应的Block已经存在
  • // 说明有其它TaskAttempt线程竞争操作,直接返回false即可
  • // Block already exists. This could happen if another thread races with us to compute
  • // the same block. In this case, just keep the read lock and return.
  • false
  • case None => // 如果没有获取到,说明BlockId对应的Block还不存在
  • // Block does not yet exist or is removed, so we are free to acquire the write lock
  • // 将新的BlockInfo放入infos字典
  • infos(blockId) = newBlockInfo
  • // 尝试获取写锁,默认阻塞
  • lockForWriting(blockId)
  • // 获取到则返回true
  • true
  • }
  • }

如果添加的BlockInfo已经存在,该方法会直接返回false,否则会将新的BlockInfo添加到infos字典,同时非阻塞方式获取它的读锁,最后返回true。

5.9. 移除BlockInfo

removeBlock(...)用于根据指定的BlockId移除对应的BlockInfo,源码如下:

org.apache.spark.storage.BlockInfoManager#removeBlock
  • /**
  • * Removes the given block and releases the write lock on it.
  • *
  • * This can only be called while holding a write lock on the given block.
  • *
  • * 移除BlockId对应的BlockInfo。
  • */
  • def removeBlock(blockId: BlockId): Unit = synchronized {
  • logTrace(s"Task $currentTaskAttemptId trying to remove block $blockId")
  • infos.get(blockId) match { // 获取对应的BlockInfo
  • case Some(blockInfo) => // 能获取到
  • if (blockInfo.writerTask != currentTaskAttemptId) { // 查看拥有写锁的线程是否就是当前线程
  • // 如果不是则抛出异常
  • throw new IllegalStateException(
  • s"Task $currentTaskAttemptId called remove() on block $blockId without a write lock")
  • } else { // 拥有写锁的线程就是当前线程
  • // 从infos中移除对应的BlockInfo
  • infos.remove(blockId)
  • // 将BlockInfo的读锁重入次数置为0
  • blockInfo.readerCount = 0
  • // 将BlockInfo的读TaskAttempt线程置为NO_WRITER
  • blockInfo.writerTask = BlockInfo.NO_WRITER
  • // 从writeLocksByTask移除对应TaskAttempt对应的blockId
  • writeLocksByTask.removeBinding(currentTaskAttemptId, blockId)
  • }
  • case None =>
  • throw new IllegalArgumentException(
  • s"Task $currentTaskAttemptId called remove() on non-existent block $blockId")
  • }
  • // 唤醒其它阻塞获取的线程
  • notifyAll()
  • }

在移除操作中,如果有其它的TaskAttempt正在对该BlockInfo进行读锁定是无法移除该Block的。移除操作会同时会清空该BlockInfo的读锁和写锁,并且从writeLocksByTask字典移除对应的记录。

5.10. 清空所有记录

clear()操作将当前BlockInfoManager中记录的BlockInfo的相关信息全部清除,源码比较简单:

org.apache.spark.storage.BlockInfoManager#clear
  • /**
  • * Delete all state. Called during shutdown.
  • * 清除BlockInfoManager中的所有信息,并通知所有在BlockInfoManager管理的Block的锁上等待的线程。
  • */
  • def clear(): Unit = synchronized {
  • // 清理所有BlockInfo的读锁重入次数和TaskAttempt读线程
  • infos.valuesIterator.foreach { blockInfo =>
  • blockInfo.readerCount = 0
  • blockInfo.writerTask = BlockInfo.NO_WRITER
  • }
  • // 清空infos数组
  • infos.clear()
  • // 清空读写映射关系
  • readLocksByTask.clear()
  • writeLocksByTask.clear()
  • // 唤醒其它阻塞获取的线程
  • notifyAll()
  • }

6. BlockManagerInfo

前面有提到过,BlockManagerInfo是用于记录BlockManager相关信息的类,每个BlockManager实例对应一个BlockManagerInfo实例;在BlockManagerMasterEndpoint中定义了一个HashMap类型的字典blockManagerInfo,它的键为BlockManagerId,值为BlockManagerInfo对象:

org.apache.spark.storage.BlockManagerMasterEndpoint#blockManagerInfo
  • // Mapping from block manager id to the block manager's information.
  • // BlockManagerId与BlockManagerInfo之间映射关系的缓存。
  • private val blockManagerInfo = new mutable.HashMap[BlockManagerId, BlockManagerInfo]

我们现在不必关注BlockManagerMasterEndpoint的原理,在后面会讲解。BlockManagerInfo的定义和重要字段如下:

org.apache.spark.storage.BlockManagerInfo
  • private[spark] class BlockManagerInfo(
  • val blockManagerId: BlockManagerId, // 对应BlockManager的BlockManagerId
  • timeMs: Long, // 创建时间
  • val maxMem: Long, // BlockManager中剩余可用内存的大小
  • val slaveEndpoint: RpcEndpointRef) // 对应的BlockManager所在节点的BlockManagerSlaveEndpointRef
  • extends Logging {
  • // 记录最后一次访问当前BlockManagerInfo的时间
  • private var _lastSeenMs: Long = timeMs
  • // 记录当前BlockManagerInfo对应的BlockManager管理的所有数据块的剩余的可用内存大小
  • private var _remainingMem: Long = maxMem
  • // Mapping from block id to its status.
  • // 记录当前BlockManagerInfo对应的BlockManager管理的所有数据块的BlockStatus状态对象
  • private val _blocks = new JHashMap[BlockId, BlockStatus]
  • // Cached blocks held by this BlockManager. This does not include broadcast blocks.
  • // 记录当前BlockManagerInfo对应的BlockManager管理的数据块,但不包括Broadcast数据块
  • private val _cachedBlocks = new mutable.HashSet[BlockId]
  • ...
  • }

可见,_blocks字典记录着对应的BlockManager管理的数据块的状态。这里需要注意的是_remainingMem字段,它用于记录当前BlockManagerInfo对应的BlockManager管理的所有数据块的剩余的可用内存大小。

BlockManagerInfo中大部分方法都比较简单,只有updateBlockInfo(...)removeBlock(...)两个方法实现较负责,下面将详细介绍。

6.1. 更新数据块状态

updateBlockInfo(...)方法用于更新指定的数据块的BlockStatus,源码如下:

org.apache.spark.storage.BlockManagerInfo#updateBlockInfo
  • // 更新指定数据块的BlockStatus信息
  • def updateBlockInfo(
  • blockId: BlockId,
  • storageLevel: StorageLevel,
  • memSize: Long,
  • diskSize: Long) {
  • // 更新时间最后一次访问当前BlockManagerInfo的时间
  • updateLastSeenMs()
  • // 判断当前BlockManagerInfo对应的BlockManager是否管理着指定的数据块
  • if (_blocks.containsKey(blockId)) { // 存在,即管理着
  • // The block exists on the slave already.
  • // 获取数据块对应的BlockStatus
  • val blockStatus: BlockStatus = _blocks.get(blockId)
  • // 记录旧的值
  • val originalLevel: StorageLevel = blockStatus.storageLevel
  • val originalMemSize: Long = blockStatus.memSize
  • // 检查原来的存储级别是否使用了内存存储
  • if (originalLevel.useMemory) {
  • /**
  • * 因为原来的存储级别使用的内存,现在要对其进行修改,
  • * 则先将旧的内存值累加到_remainingMem中,说明将该数据块原来使用的内存大小归还了,
  • * 在后面的操作还会将新的内存值从_remainingMem中减去,说明又将特定的内存大小分配给该数据块了。
  • */
  • _remainingMem += originalMemSize
  • }
  • }
  • // 检查设置的存储级别是否有效,即数据块使用了内存或磁盘存储级别,且副本数量大于1
  • if (storageLevel.isValid) {
  • /* isValid means it is either stored in-memory or on-disk.
  • * The memSize here indicates the data size in or dropped from memory,
  • * externalBlockStoreSize here indicates the data size in or dropped from externalBlockStore,
  • * and the diskSize here indicates the data size in or dropped to disk.
  • * They can be both larger than 0, when a block is dropped from memory to disk.
  • * Therefore, a safe way to set BlockStatus is to set its info in accurate modes. */
  • // 设置的存储级别有效
  • var blockStatus: BlockStatus = null
  • // Q: 疑问:为什么内存和存储只会使用一个???
  • if (storageLevel.useMemory) { // 使用内存存储级别
  • // 构建新的BlockStatus对象,磁盘存储为0
  • blockStatus = BlockStatus(storageLevel, memSize = memSize, diskSize = 0)
  • // 更新_blocks字典
  • _blocks.put(blockId, blockStatus)
  • // 将特定的内存大小分配给该数据块了,所以需要从总的剩余内存中减去
  • _remainingMem -= memSize
  • logInfo("Added %s in memory on %s (size: %s, free: %s)".format(
  • blockId, blockManagerId.hostPort, Utils.bytesToString(memSize),
  • Utils.bytesToString(_remainingMem)))
  • }
  • if (storageLevel.useDisk) { // 使用磁盘存储级别
  • // 构建新的BlockStatus对象,内存存储为0
  • blockStatus = BlockStatus(storageLevel, memSize = 0, diskSize = diskSize)
  • // 更新_blocks字典
  • _blocks.put(blockId, blockStatus)
  • logInfo("Added %s on disk on %s (size: %s)".format(
  • blockId, blockManagerId.hostPort, Utils.bytesToString(diskSize)))
  • }
  • // 检查数据块是否不是Broadcast数据块,且已经存在缓存数据
  • if (!blockId.isBroadcast && blockStatus.isCached) {
  • // 如果是,将其记录到_cachedBlocks缓存集合
  • _cachedBlocks += blockId
  • }
  • } else if (_blocks.containsKey(blockId)) { // 存储级别无效,检查当前BlockManager是否管理该数据块
  • // 说明此时是需要将该数据块进行移除的
  • // If isValid is not true, drop the block.
  • // 获取对应的BlockStatus
  • val blockStatus: BlockStatus = _blocks.get(blockId)
  • // 从_blocks中移除
  • _blocks.remove(blockId)
  • // 从_cachedBlocks中移除
  • _cachedBlocks -= blockId
  • // 打印日志
  • if (blockStatus.storageLevel.useMemory) {
  • logInfo("Removed %s on %s in memory (size: %s, free: %s)".format(
  • blockId, blockManagerId.hostPort, Utils.bytesToString(blockStatus.memSize),
  • Utils.bytesToString(_remainingMem)))
  • }
  • if (blockStatus.storageLevel.useDisk) {
  • logInfo("Removed %s on %s on disk (size: %s)".format(
  • blockId, blockManagerId.hostPort, Utils.bytesToString(blockStatus.diskSize)))
  • }
  • }
  • }

该方法的整体逻辑比较简单,但需要注意各个步骤对_remainingMem字段的维护;在更新数据块使用的内存大小时,先将旧的使用量还给_remainingMem,然后从_remainingMem分配新的使用量,以保证更新前后内存的总量是不变的。

6.2. 移除数据块

removeBlock(...)方法用于移除指定的数据块,源码如下:

org.apache.spark.storage.BlockManagerInfo#removeBlock
  • // 移除指定数据块
  • def removeBlock(blockId: BlockId) {
  • // 先判断是否存在该数据块
  • if (_blocks.containsKey(blockId)) { // 存在
  • // 归还数据块占用的内存给_remainingMem
  • _remainingMem += _blocks.get(blockId).memSize
  • // 从_blocks中移除
  • _blocks.remove(blockId)
  • }
  • // 从_cachedBlocks中移除
  • _cachedBlocks -= blockId
  • }

可见,在移除操作中,也向_remainingMem字段归还了该数据块使用的内存量。

有了对以上辅助类的了解,我们可以得到存储体系中对于数据块状态的层级管理示意图:

2.存储体系的状态结构.png

至此,关于Spark存储体系相关的辅助类都已介绍完了,在下一篇文章中我们将探讨存储体系中通信层的实现和工作原理。