Big Data
Stream Processing
Spark
Source Code Analysis

Spark Source Code Analysis 09 - Storage System 02: The Communication Layer (1) Message Passing

Overview: This article explains the components of the storage system's message communication layer and how they are implemented.

1. Structure of the Communication Layer

The communication layer of the storage system consists mainly of the following components, all of which live in the Spark Core module:

  1. BlockManagerMaster: located in the org.apache.spark.storage package. It acts as the proxy for communication between the BlockManagers on the Driver and Executors. Both the Driver and every Executor hold a BlockManagerMaster instance.
  2. BlockManagerMasterEndpoint: located in the org.apache.spark.storage package. This RpcEndpoint exists only on the Driver; the Driver and Executors hold an RpcEndpointRef to the BlockManagerMasterEndpoint, which they use to communicate with it.
  3. BlockManagerSlaveEndpoint: located in the org.apache.spark.storage package. The Driver and every Executor have their own BlockManagerSlaveEndpoint, and their BlockManagers each hold the RpcEndpointRef of their own BlockManagerSlaveEndpoint.
  4. BlockTransferService: located in the org.apache.spark.network package. The Driver and every Executor have a BlockTransferService component, used to read, write, and transfer stored data between tasks of different stages.

This description may not be very intuitive, so here is some additional explanation. The Driver and the Executors each have their own storage system, but the Driver plays the Master role, managing and maintaining the metadata of the entire storage system, while the Driver and Executors also both play the Slave role, each managing its own local storage. The Driver therefore hosts the BlockManagerMasterEndpoint component, which communicates with the BlockManagers on the Driver and Executors. The storage system on the Driver and on each Executor is managed by its BlockManager, and every BlockManager owns a BlockManagerMaster, which uses the RpcEndpointRef of the BlockManagerMasterEndpoint to send it messages; the BlockManagerMasterEndpoint in turn uses the BlockManagerSlaveEndpoint RpcEndpointRefs to send messages to the BlockManagerSlaveEndpoints on the Driver and Executors. Note that the Driver's communication with the BlockManagerMasterEndpoint is local and involves no network transfer, whereas communication between an Executor and the BlockManagerMasterEndpoint goes over the network. BlockTransferService is the component that transfers data between the Driver and Executors.

The structure formed by these components is illustrated below:

1.存储体系通信层的结构.png (Figure: structure of the storage system's communication layer)

2. BlockManagerMaster

In the previous article we mentioned that constructing a BlockManager requires a BlockManagerMaster. The BlockManagerMaster acts as the proxy for communication between the BlockManagers on the Driver and Executors, and it too is created by SparkEnv.

BlockManagerMaster is defined in the org.apache.spark.storage package of the Spark Core module. The Driver and every Executor create their own BlockManagerMaster, but the creation process differs by role. The relevant snippet from SparkEnv is:

org.apache.spark.SparkEnv#create
    /**
     * Create the BlockManagerMaster. Both the Driver and the Executors hold a
     * BlockManagerMaster, so this code behaves differently depending on the role;
     * it either registers or looks up the BlockManagerMasterEndpoint:
     * - On the Driver, create the BlockManagerMasterEndpoint and register it with
     *   the Dispatcher under the name "BlockManagerMaster".
     * - On an Executor, look up a reference to the BlockManagerMasterEndpoint in
     *   the Dispatcher of the remote Driver's NettyRpcEnv.
     */
    val blockManagerMaster = new BlockManagerMaster(
      // Register or look up the RpcEndpoint
      registerOrLookupEndpoint(
        BlockManagerMaster.DRIVER_ENDPOINT_NAME, // the string "BlockManagerMaster"
        new BlockManagerMasterEndpoint(rpcEnv, isLocal, conf, listenerBus)),
      conf, isDriver)

When the BlockManagerMaster is created, a BlockManagerMasterEndpoint object is constructed from the rpcEnv and the LiveListenerBus event bus discussed earlier, and the registerOrLookupEndpoint(...) method decides, based on the node's role, whether to register the BlockManagerMasterEndpoint or to look it up. Its source is:

org.apache.spark.SparkEnv#create
    // Register an RpcEndpoint or look one up
    def registerOrLookupEndpoint(name: String, endpointCreator: => RpcEndpoint): RpcEndpointRef = {
      if (isDriver) { // on the Driver
        logInfo("Registering " + name)
        // Register via the RpcEnv, which returns the corresponding RpcEndpointRef
        rpcEnv.setupEndpoint(name, endpointCreator)
      } else { // on an Executor
        // Ask the remote Driver's NettyRpcEnv for the RpcEndpointRef of the named endpoint
        RpcUtils.makeDriverRef(name, conf, rpcEnv)
      }
    }

As we can see, on the Driver the newly created BlockManagerMasterEndpoint is registered with the rpcEnv under the name BlockManagerMaster.DRIVER_ENDPOINT_NAME, i.e. the string "BlockManagerMaster", and the corresponding RpcEndpointRef is returned. Otherwise we are on an Executor, in which case RpcUtils.makeDriverRef(...) asks the remote Driver's NettyRpcEnv for the RpcEndpointRef of the BlockManagerMasterEndpoint on the Driver. The source is:

org.apache.spark.util.RpcUtils#makeDriverRef
    /**
     * Retrieve a `RpcEndpointRef` which is located in the driver via its name.
     */
    def makeDriverRef(name: String, conf: SparkConf, rpcEnv: RpcEnv): RpcEndpointRef = {
      val driverHost: String = conf.get("spark.driver.host", "localhost")
      val driverPort: Int = conf.getInt("spark.driver.port", 7077)
      Utils.checkHost(driverHost, "Expected hostname")
      // Use the Executor's NettyRpcEnv to query the Driver by address and name
      rpcEnv.setupEndpointRef(RpcAddress(driverHost, driverPort), name)
    }

You may recall NettyRpcEnv's setupEndpointRef(...) method: it is ultimately implemented by its own asyncSetupEndpointRefByURI(...), which uses the NettyRpcEnv to send a verification request for the remote, as-yet-unknown RpcEndpoint. If a response arrives, the remote RpcEndpoint exists and the corresponding NettyRpcEndpointRef is returned directly; otherwise an exception is thrown. The source is:

org.apache.spark.rpc.netty.NettyRpcEnv#asyncSetupEndpointRefByURI
    def asyncSetupEndpointRefByURI(uri: String): Future[RpcEndpointRef] = {
      // Build the RpcEndpointAddress object
      val addr = RpcEndpointAddress(uri)
      // Build a NettyRpcEndpointRef for the remote RpcEndpoint
      val endpointRef = new NettyRpcEndpointRef(conf, addr, this)
      // Build the NettyRpcEndpointRef used to send the verification request
      val verifier = new NettyRpcEndpointRef(
        conf, RpcEndpointAddress(addr.rpcAddress, RpcEndpointVerifier.NAME), this)
      // Ask the remote NettyRpcEnv's verifier endpoint with a RpcEndpointVerifier.CheckExistence message
      verifier.ask[Boolean](RpcEndpointVerifier.CheckExistence(endpointRef.name)).flatMap { find =>
        if (find) {
          // The endpoint was found
          Future.successful(endpointRef)
        } else {
          // The endpoint was not found
          Future.failed(new RpcEndpointNotFoundException(uri))
        }
      }(ThreadUtils.sameThread)
    }

After all of this, the Driver and the Executors each end up with their own BlockManagerMaster, and each records the RpcEndpointRef of the BlockManagerMasterEndpoint in the BlockManagerMaster's driverEndpoint field. Holding this RpcEndpointRef is the precondition for communicating with the BlockManagerMasterEndpoint.

Since the BlockManagerMasterEndpoint manages BlockManager-related information for every node in the cluster, BlockManagerMaster defines a large number of methods for communicating with it, to report or fetch information. They are summarized in the table below:

| Function | Message sent | Information carried | Return value |
| --- | --- | --- | --- |
| Register a BlockManager | RegisterBlockManager | the BlockManagerId identifying the BlockManager, the memory size it manages, and the RpcEndpointRef of the BlockManagerSlaveEndpoint on its node | the assigned BlockManagerId |
| Remove the specified Executor (synchronous) | RemoveExecutor | the Executor ID | |
| Remove the specified Executor (asynchronous) | RemoveExecutor | the Executor ID | |
| Remove the specified block | RemoveBlock | the BlockId | |
| Remove all blocks of the specified RDD | RemoveRdd | the RDD ID | |
| Remove all blocks of the specified Broadcast | RemoveBroadcast | the Broadcast ID | |
| Remove all blocks of the specified Shuffle | RemoveShuffle | the Shuffle ID | |
| Update the information of the specified block | UpdateBlockInfo | the BlockManagerId, the BlockId, and the new block configuration (storage level, memory size, disk size) | Boolean: whether the update succeeded |
| Check whether the specified block is managed | GetLocations | the BlockId | Boolean: whether it is managed |
| Check whether an Executor caches any blocks | HasCachedBlocks | the Executor ID | Boolean: whether it caches blocks |
| Get the status of the specified block | GetBlockStatus | the BlockId | Map[BlockManagerId, BlockStatus] |
| Get an Executor's RpcEndpointRef | GetExecutorEndpointRef | the Executor ID | Option[RpcEndpointRef]: the RpcEndpointRef of its BlockManagerSlaveEndpoint |
| Get the locations of the specified block | GetLocations | the BlockId | Seq[BlockManagerId]: the BlockManagers holding the block |
| Get the locations of multiple blocks | GetLocationsMultipleBlockIds | an array of BlockIds | IndexedSeq[Seq[BlockManagerId]]: the BlockManagers holding each block |
| Get matching blocks | GetMatchingBlockIds | a filter function | Seq[BlockId]: the matching BlockIds |
| Get the memory status of all BlockManagers | GetMemoryStatus | | Map[BlockManagerId, (Long, Long)]: the memory status of each BlockManager |
| Get the storage status of all BlockManagers | GetStorageStatus | | Array[StorageStatus]: the storage status of each BlockManager |
| Get the set of peer BlockManagerIds | GetPeers | the current BlockManager's BlockManagerId | Seq[BlockManagerId] |
| Stop the BlockManagerMasterEndpoint on the Driver (only the Driver's own BlockManagerMaster may send this) | StopBlockManagerMaster | | |

Although there are many methods, there is no need to worry: their implementations are generally quite simple. Each of them is introduced below.

Note: these methods are essentially all implemented by sending a specific message to the BlockManagerMasterEndpoint. How the BlockManagerMasterEndpoint handles each message is set aside for now; it is covered in detail in the later sections on BlockManagerMasterEndpoint.

2.1. Registering a BlockManager

A BlockManager registers itself with the BlockManagerMasterEndpoint through its BlockManagerMaster's registerBlockManager(...) method. The implementation sends the BlockManagerMasterEndpoint's RpcEndpointRef a RegisterBlockManager message carrying the BlockManagerId identifying the BlockManager, the memory size the BlockManager manages, and the RpcEndpointRef of the BlockManagerSlaveEndpoint on the BlockManager's node. The source is:

Note: BlockManagerSlaveEndpoint is introduced in a later section.

Scala
    /**
     * Register the BlockManager's id with the driver. The input BlockManagerId does not contain
     * topology information. This information is obtained from the master and we respond with an
     * updated BlockManagerId fleshed out with this information.
     *
     * Register a BlockManager.
     *
     * @param blockManagerId the BlockManager's unique identifier
     * @param maxMemSize     the maximum amount of memory managed by the BlockManager (bytes)
     * @param slaveEndpoint  the BlockManagerSlaveEndpoint of this BlockManager, used mainly by
     *                       the BlockManagerMasterEndpoint to communicate with the BlockManager
     * @return the official BlockManagerId assigned by the BlockManagerMasterEndpoint
     */
    def registerBlockManager(
        blockManagerId: BlockManagerId,
        maxMemSize: Long,
        slaveEndpoint: RpcEndpointRef): BlockManagerId = {
      logInfo(s"Registering BlockManager $blockManagerId")
      /**
       * Send a RegisterBlockManager message to the BlockManagerMasterEndpoint, carrying
       * the blockManagerId of the BlockManager to register, its maximum memory size,
       * and its BlockManagerSlaveEndpoint.
       */
      val updatedId = driverEndpoint.askWithRetry[BlockManagerId](
        RegisterBlockManager(blockManagerId, maxMemSize, slaveEndpoint))
      logInfo(s"Registered BlockManager $updatedId")
      updatedId
    }

Note that the BlockManagerId carried in the RegisterBlockManager message is only a tentative identifier proposed by the BlockManager; once the BlockManagerMasterEndpoint receives and successfully processes the message, it returns an assigned BlockManagerId, and that is the effective identifier.
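
To illustrate the difference (a sketch with invented values; these ids are hypothetical, not taken from the source), the tentative id carries no topology information, while the id built by the master in register(...) (see section 3.2) has it filled in by the configured TopologyMapper:

Scala
    // Invented values, for illustration only
    val tentative = BlockManagerId("exec-1", "10.0.0.5", 43211) // topologyInfo defaults to None
    // What register(...) sends back: the same identity, fleshed out with topology information
    val assigned = BlockManagerId("exec-1", "10.0.0.5", 43211, Some("/rack-2"))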

2.2. Removing an Executor

The removeExecutor(...) method removes the specified Executor. It is implemented by sending the BlockManagerMasterEndpoint a RemoveExecutor message carrying the Executor ID:

    /** Remove a dead executor from the driver endpoint. This is only called on the driver side.
     * Remove the Executor identified by the given Executor ID.
     **/
    def removeExecutor(execId: String) {
      // Notify the BlockManagerMasterEndpoint by sending it a RemoveExecutor message
      tell(RemoveExecutor(execId))
      logInfo("Removed " + execId + " successfully in removeExecutor")
    }

This method calls tell(...), which simply sends the message to the BlockManagerMasterEndpoint and expects it to acknowledge with true:

org.apache.spark.storage.BlockManagerMaster#tell
    /** Send a one-way message to the master endpoint, to which we expect it to reply with true.
     * Sends a message whose result is not needed to the BlockManagerMasterEndpoint,
     * though the endpoint is still expected to acknowledge with true.
     **/
    private def tell(message: Any) {
      if (!driverEndpoint.askWithRetry[Boolean](message)) {
        // A reply of false raises an exception
        throw new SparkException("BlockManagerMasterEndpoint returned false, expected true.")
      }
    }

removeExecutor(...) also has an asynchronous variant, implemented by calling the ask(...) method of the BlockManagerMasterEndpoint's RpcEndpointRef directly:

org.apache.spark.storage.BlockManagerMaster#removeExecutorAsync
    /** Request removal of a dead executor from the driver endpoint.
     * This is only called on the driver side. Non-blocking
     *
     * Asynchronously remove the Executor identified by the given Executor ID.
     */
    def removeExecutorAsync(execId: String) {
      // Send the RemoveExecutor message to the BlockManagerMasterEndpoint;
      // ask is asynchronous and returns a Future
      driverEndpoint.ask[Boolean](RemoveExecutor(execId))
      logInfo("Removal of executor " + execId + " requested")
    }

The difference between ask(...) and askWithRetry(...) is that the former returns a Future immediately after sending, while the latter blocks and retries according to the configured timeout and retry count.
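
The minimal sketch below (illustrative only; the example message, the result handling, and the surrounding class mixing in org.apache.spark.internal.Logging are assumptions, not code from BlockManagerMaster) contrasts the two calling styles:

Scala
    import scala.concurrent.Future
    import scala.util.{Failure, Success}

    val msg = RemoveExecutor("exec-1") // example message

    // Non-blocking: ask returns immediately with a Future of the reply
    val futureReply: Future[Boolean] = driverEndpoint.ask[Boolean](msg)
    futureReply.onComplete {
      case Success(ok) => logInfo(s"master replied: $ok")
      case Failure(e)  => logWarning("ask failed", e)
    }(ThreadUtils.sameThread)

    // Blocking: askWithRetry waits for the reply (retrying on timeout) and returns it directly
    val reply: Boolean = driverEndpoint.askWithRetry[Boolean](msg)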

2.3. Removing a Block

BlockManagerMaster's removeBlock(...) method removes the specified block. It is implemented by sending the BlockManagerMasterEndpoint a RemoveBlock message carrying the BlockId:

org.apache.spark.storage.BlockManagerMaster#removeBlock
    /**
     * Remove a block from the slaves that have it. This can only be used to remove
     * blocks that the driver knows about.
     *
     * Remove the specified block.
     */
    def removeBlock(blockId: BlockId) {
      // Send a RemoveBlock message to the BlockManagerMasterEndpoint to delete the block
      driverEndpoint.askWithRetry[Boolean](RemoveBlock(blockId))
    }

2.4. Removing All Blocks of an RDD

The removeRdd(...) method removes all blocks belonging to the specified RDD, by sending the BlockManagerMasterEndpoint a RemoveRdd message carrying the RDD ID:

org.apache.spark.storage.BlockManagerMaster#removeRdd
    /** Remove all blocks belonging to the given RDD.
     * The blocking parameter controls whether the call blocks.
     **/
    def removeRdd(rddId: Int, blocking: Boolean) {
      // Send a RemoveRdd message to the BlockManagerMasterEndpoint to delete the blocks
      val future = driverEndpoint.askWithRetry[Future[Seq[Int]]](RemoveRdd(rddId))
      future.onFailure {
        case e: Exception =>
          logWarning(s"Failed to remove RDD $rddId - ${e.getMessage}", e)
      }(ThreadUtils.sameThread)
      if (blocking) {
        timeout.awaitResult(future)
      }
    }

2.5. Removing All Blocks of a Broadcast

Broadcast data may be stored as multiple blocks. The removeBroadcast(...) method removes all blocks belonging to the specified Broadcast, by sending the BlockManagerMasterEndpoint a RemoveBroadcast message carrying the Broadcast ID:

org.apache.spark.storage.BlockManagerMaster#removeBroadcast
    /** Remove all blocks belonging to the given broadcast.
     * The blocking parameter controls whether the call blocks.
     **/
    def removeBroadcast(broadcastId: Long, removeFromMaster: Boolean, blocking: Boolean) {
      // Send a RemoveBroadcast message to the BlockManagerMasterEndpoint to delete the blocks
      val future = driverEndpoint.askWithRetry[Future[Seq[Int]]](
        RemoveBroadcast(broadcastId, removeFromMaster))
      future.onFailure {
        case e: Exception =>
          logWarning(s"Failed to remove broadcast $broadcastId" +
            s" with removeFromMaster = $removeFromMaster - ${e.getMessage}", e)
      }(ThreadUtils.sameThread)
      if (blocking) {
        timeout.awaitResult(future)
      }
    }

2.6. Removing All Blocks of a Shuffle

The removeShuffle(...) method removes all blocks belonging to the specified Shuffle, by sending the BlockManagerMasterEndpoint a RemoveShuffle message carrying the Shuffle ID:

org.apache.spark.storage.BlockManagerMaster#removeShuffle
    /** Remove all blocks belonging to the given shuffle.
     * The blocking parameter controls whether the call blocks.
     **/
    def removeShuffle(shuffleId: Int, blocking: Boolean) {
      // Send a RemoveShuffle message to the BlockManagerMasterEndpoint to delete the blocks
      val future = driverEndpoint.askWithRetry[Future[Seq[Boolean]]](RemoveShuffle(shuffleId))
      future.onFailure {
        case e: Exception =>
          logWarning(s"Failed to remove shuffle $shuffleId - ${e.getMessage}", e)
      }(ThreadUtils.sameThread)
      if (blocking) {
        timeout.awaitResult(future)
      }
    }

2.7. Updating a Block's Information

The updateBlockInfo(...) method updates the information of a block managed by a given BlockManager, by sending the BlockManagerMasterEndpoint an UpdateBlockInfo message carrying the BlockManagerId, the BlockId, and the new block configuration (storage level, memory size, disk size):

org.apache.spark.storage.BlockManagerMaster#updateBlockInfo
    /**
     * Update the information of the given block managed by the given BlockManager.
     * @param blockManagerId the BlockManager's unique identifier
     * @param blockId        the BlockId of the block to update
     * @param storageLevel   the new storage level
     * @param memSize        the new memory usage in bytes
     * @param diskSize       the new disk usage in bytes
     *
     * @return whether the update succeeded
     */
    def updateBlockInfo(
        blockManagerId: BlockManagerId,
        blockId: BlockId,
        storageLevel: StorageLevel,
        memSize: Long,
        diskSize: Long): Boolean = {
      // Communicate via the BlockManagerMasterEndpoint's RpcEndpointRef with an UpdateBlockInfo message
      val res = driverEndpoint.askWithRetry[Boolean](
        UpdateBlockInfo(blockManagerId, blockId, storageLevel, memSize, diskSize))
      logDebug(s"Updated info of block $blockId")
      res
    }

2.8. Checking Whether a Block Is Managed

The contains(...) method checks whether any of the current BlockManagers manages the specified block, by sending the BlockManagerMasterEndpoint a GetLocations message carrying the BlockId:

org.apache.spark.storage.BlockManagerMaster#contains
    /**
     * Check if block manager master has a block. Note that this can be used to check for only
     * those blocks that are reported to block manager master.
     *
     * Check whether any of the current BlockManagers manages the specified block.
     */
    def contains(blockId: BlockId): Boolean = {
      // Communicates via the BlockManagerMasterEndpoint's RpcEndpointRef by sending a GetLocations message
      !getLocations(blockId).isEmpty
    }

As we can see, the method is built on getLocations(...), which returns the block's locations as a sequence of BlockManagerIds of the BlockManagers holding it; that method is covered below.

2.9. Checking Whether an Executor Caches Blocks

The hasCachedBlocks(...) method checks whether the specified Executor caches any blocks, by sending the BlockManagerMasterEndpoint a HasCachedBlocks message carrying the Executor ID:

org.apache.spark.storage.BlockManagerMaster#hasCachedBlocks
    /**
     * Find out if the executor has cached blocks. This method does not consider broadcast blocks,
     * since they are not reported the master.
     *
     * Check whether the specified Executor caches any blocks.
     */
    def hasCachedBlocks(executorId: String): Boolean = {
      // Send a HasCachedBlocks message to the BlockManagerMasterEndpoint to find out
      driverEndpoint.askWithRetry[Boolean](HasCachedBlocks(executorId))
    }

2.10. Getting a Block's Status

getBlockStatus(...) fetches the status of the specified block, by sending the BlockManagerMasterEndpoint a GetBlockStatus message carrying the BlockId:

org.apache.spark.storage.BlockManagerMaster#getBlockStatus
    /**
     * Return the block's status on all block managers, if any. NOTE: This is a
     * potentially expensive operation and should only be used for testing.
     *
     * If askSlaves is true, this invokes the master to query each block manager for the most
     * updated block statuses. This is useful when the master is not informed of the given block
     * by all block managers.
     *
     * Get the status of the specified block.
     */
    def getBlockStatus(
        blockId: BlockId,
        askSlaves: Boolean = true): Map[BlockManagerId, BlockStatus] = {
      // Build the GetBlockStatus message
      val msg = GetBlockStatus(blockId, askSlaves)
      /*
       * To avoid potential deadlocks, the use of Futures is necessary, because the master endpoint
       * should not block on waiting for a block manager, which can in turn be waiting for the
       * master endpoint for a response to a prior message.
       *
       * The BlockManagerMasterEndpoint must not block on a BlockManager's processing;
       * the GetBlockStatus message is therefore answered with Futures.
       */
      val response = driverEndpoint.
        askWithRetry[Map[BlockManagerId, Future[Option[BlockStatus]]]](msg)
      val (blockManagerIds, futures) = response.unzip
      implicit val sameThread = ThreadUtils.sameThread
      val cbf =
        implicitly[
          CanBuildFrom[Iterable[Future[Option[BlockStatus]]],
            Option[BlockStatus],
            Iterable[Option[BlockStatus]]]]
      val blockStatus = timeout.awaitResult(
        Future.sequence[Option[BlockStatus], Iterable](futures)(cbf, ThreadUtils.sameThread))
      if (blockStatus == null) {
        throw new SparkException("BlockManager returned null for BlockStatus query: " + blockId)
      }
      blockManagerIds.zip(blockStatus).flatMap { case (blockManagerId, status) =>
        status.map { s => (blockManagerId, s) }
      }.toMap
    }

2.11. Getting an Executor's RpcEndpointRef

The getExecutorEndpointRef(...) method fetches the RpcEndpointRef of the specified Executor's BlockManagerSlaveEndpoint, by sending the BlockManagerMasterEndpoint a GetExecutorEndpointRef message carrying the Executor ID:

org.apache.spark.storage.BlockManagerMaster#getExecutorEndpointRef
    /**
     * Get the RpcEndpointRef of the Executor identified by the given Executor ID.
     * @param executorId the Executor ID
     * @return the RpcEndpointRef, if the Executor is known
     */
    def getExecutorEndpointRef(executorId: String): Option[RpcEndpointRef] = {
      // Send a GetExecutorEndpointRef message via the BlockManagerMasterEndpoint's RpcEndpointRef
      driverEndpoint.askWithRetry[Option[RpcEndpointRef]](GetExecutorEndpointRef(executorId))
    }

2.12. Getting a Block's Locations

The getLocations(...) method fetches the locations of the specified block, by sending the BlockManagerMasterEndpoint a GetLocations message carrying the BlockId:

org.apache.spark.storage.BlockManagerMaster#getLocations
    /** Get locations of the blockId from the driver
     * Get the locations of the specified block.
     * @param blockId the BlockId of the block
     *
     * @return the BlockManagerIds of the BlockManagers holding the block
     **/
    def getLocations(blockId: BlockId): Seq[BlockManagerId] = {
      // Communicate via the BlockManagerMasterEndpoint's RpcEndpointRef with a GetLocations message
      driverEndpoint.askWithRetry[Seq[BlockManagerId]](GetLocations(blockId))
    }

getLocations(...) also has an overload that fetches the locations of multiple blocks in a single call, by sending the BlockManagerMasterEndpoint a GetLocationsMultipleBlockIds message carrying an array of BlockIds; its return type is the indexed sequence IndexedSeq[Seq[BlockManagerId]]:

org.apache.spark.storage.BlockManagerMaster#getLocations
    /** Get locations of multiple blockIds from the driver
     * Get the locations of multiple blocks.
     *
     * @param blockIds the BlockIds of the blocks
     *
     * @return an indexed sequence of BlockManagerId sequences; each index corresponds to
     *         the index of the BlockId passed in, and holds the ids of the BlockManagers
     *         storing that block
     **/
    def getLocations(blockIds: Array[BlockId]): IndexedSeq[Seq[BlockManagerId]] = {
      // Communicate via the BlockManagerMasterEndpoint's RpcEndpointRef
      // with a GetLocationsMultipleBlockIds message
      driverEndpoint.askWithRetry[IndexedSeq[Seq[BlockManagerId]]](
        GetLocationsMultipleBlockIds(blockIds))
    }

2.13. Getting Matching Blocks

The getMatchingBlockIds(...) method fetches the blocks matching a predicate, by sending the BlockManagerMasterEndpoint a GetMatchingBlockIds message carrying the filter function:

org.apache.spark.storage.BlockManagerMaster#getMatchingBlockIds
    /**
     * Return a list of ids of existing blocks such that the ids match the given filter. NOTE: This
     * is a potentially expensive operation and should only be used for testing.
     *
     * If askSlaves is true, this invokes the master to query each block manager for the most
     * updated block statuses. This is useful when the master is not informed of the given block
     * by all block managers.
     *
     * Get the matching blocks; the first parameter is a predicate used to filter BlockIds.
     */
    def getMatchingBlockIds(
        filter: BlockId => Boolean,
        askSlaves: Boolean): Seq[BlockId] = {
      // Build the GetMatchingBlockIds message
      val msg = GetMatchingBlockIds(filter, askSlaves)
      // Send it to the BlockManagerMasterEndpoint
      val future = driverEndpoint.askWithRetry[Future[Seq[BlockId]]](msg)
      // Block until the result is available
      timeout.awaitResult(future)
    }

2.14. Getting the Memory Status of All BlockManagers

The getMemoryStatus method fetches the memory status of every BlockManager, by sending the BlockManagerMasterEndpoint a GetMemoryStatus message:

org.apache.spark.storage.BlockManagerMaster#getMemoryStatus
    /**
     * Return the memory status for each block manager, in the form of a map from
     * the block manager's id to two long values. The first value is the maximum
     * amount of memory allocated for the block manager, while the second is the
     * amount of remaining memory.
     *
     * Get the memory status of all BlockManagers.
     */
    def getMemoryStatus: Map[BlockManagerId, (Long, Long)] = {
      // Send a GetMemoryStatus message to the BlockManagerMasterEndpoint
      driverEndpoint.askWithRetry[Map[BlockManagerId, (Long, Long)]](GetMemoryStatus)
    }

The method returns a Map whose keys are the BlockManagerIds of the BlockManagers and whose values are pairs of Longs: the maximum amount of memory allocated to that BlockManager, and the amount of memory remaining.
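
For example (an illustrative sketch; the `master` value standing for a BlockManagerMaster instance is an assumption), each BlockManager's used memory can be derived from the pair:

Scala
    // `master` is assumed to be a BlockManagerMaster instance
    val memoryStatus: Map[BlockManagerId, (Long, Long)] = master.getMemoryStatus
    memoryStatus.foreach { case (bmId, (maxMem, remainingMem)) =>
      val usedMem = maxMem - remainingMem // bytes currently occupied by stored blocks
      println(s"$bmId uses $usedMem of $maxMem bytes")
    }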

2.15. Getting the Storage Status of All BlockManagers

The getStorageStatus method fetches the storage status of every BlockManager, by sending the BlockManagerMasterEndpoint a GetStorageStatus message:

org.apache.spark.storage.BlockManagerMaster#getStorageStatus
    // Get the storage status of all BlockManagers
    def getStorageStatus: Array[StorageStatus] = {
      // Send a GetStorageStatus message to the BlockManagerMasterEndpoint
      driverEndpoint.askWithRetry[Array[StorageStatus]](GetStorageStatus)
    }

The return value is an array of StorageStatus; no Map is needed here because each StorageStatus object already records its BlockManagerId. A StorageStatus also records the BlockManager's maximum storage size and the BlockStatus of every block it holds.

2.16. Getting the Set of Peer BlockManagerIds

The getPeers(...) method fetches the BlockManagerIds of all BlockManagers other than the current one, by sending the BlockManagerMasterEndpoint a GetPeers message carrying the current BlockManager's BlockManagerId:

org.apache.spark.storage.BlockManagerMaster#getPeers
    /** Get ids of other nodes in the cluster from the driver
     * Get the BlockManagerIds of the other BlockManagers.
     **/
    def getPeers(blockManagerId: BlockManagerId): Seq[BlockManagerId] = {
      // Send a GetPeers message
      driverEndpoint.askWithRetry[Seq[BlockManagerId]](GetPeers(blockManagerId))
    }

2.17. Stopping the BlockManagerMasterEndpoint

The stop() method stops the BlockManagerMasterEndpoint on the Driver, by sending it a StopBlockManagerMaster message:

org.apache.spark.storage.BlockManagerMaster#stop
    /** Stop the driver endpoint, called only on the Spark driver node
     *
     * Stop the BlockManagerMasterEndpoint on the Driver.
     **/
    def stop() {
      // Only proceed when the BlockManagerMasterEndpoint's RpcEndpointRef is non-null and we
      // are the Driver; in other words, only the Driver itself may stop the
      // BlockManagerMasterEndpoint on its own node.
      if (driverEndpoint != null && isDriver) {
        // Tell the master endpoint to stop via the StopBlockManagerMaster message
        tell(StopBlockManagerMaster)
        // Clear the reference to the BlockManagerMasterEndpoint
        driverEndpoint = null
        logInfo("BlockManagerMaster stopped")
      }
    }

That covers how every message is sent, so let us briefly revisit the message-sending flow in Spark's communication layer. When BlockManagerMaster sends a message through the BlockManagerMasterEndpoint's RpcEndpointRef, the call is handled by the method of the same name on NettyRpcEnv; NettyRpcEnv decides, based on the message's destination, whether to deliver it to the local Inbox or to an Outbox. When the Outbox processes the message, it uses a TransportClient to send it to the remote RpcEnv server. For RPC request messages that require a reply, the TransportClient builds a callback object and records it in the TransportResponseHandler; when the server's reply arrives, the TransportClient looks up the recorded callback to process the response. In the BlockManagerMaster operations above, almost all request messages that need a reply use the RpcEndpointRef's askWithRetry(...) method, which blocks, processes the response internally, and returns the result directly to the caller; this is why none of the BlockManagerMaster methods that need results handle callbacks themselves, leaving that entirely to askWithRetry(...).

Almost all of BlockManagerMaster's messages are sent from Executors to the Driver; in other words, the storage system has exactly one BlockManagerMasterEndpoint, located on the Driver, which matches what we described earlier. Note that the Driver and the Executors each have their own RpcEnv environment, their own RpcHandler, and their own RpcEndpoint (as we will see later, these are the BlockManagerSlaveEndpoints).

The messages themselves also show that BlockManagerMaster generally either fetches information from the BlockManagerMasterEndpoint or reports its own operations to it, so that the BlockManagerMasterEndpoint can keep the information it maintains up to date.

Later sections introduce the implementation of BlockManagerMasterEndpoint in detail; there readers will see how these messages are processed and answered.

3. BlockManagerMasterEndpoint

From the analysis of BlockManagerMaster above, we know that a BlockManagerMasterEndpoint instance exists only on the Driver and is registered with the Driver's rpcEnv (see section 2 of this article if you need a refresher). BlockManagerMasterEndpoint lives in the org.apache.spark.storage package of the Spark Core module; its definition and important fields are as follows:

org.apache.spark.storage.BlockManagerMasterEndpoint
    /**
     * BlockManagerMasterEndpoint is an [[ThreadSafeRpcEndpoint]] on the master node to track statuses
     * of all slaves' block managers.
     *
     * Created by the Driver's SparkEnv and registered with the Driver's RpcEnv.
     * BlockManagerMasterEndpoint exists only in the Driver's SparkEnv; the driverEndpoint
     * field of the BlockManagerMaster on the Driver or an Executor holds the
     * BlockManagerMasterEndpoint's RpcEndpointRef.
     *
     * It mainly manages the BlockManagers on all nodes, the mapping between Executors
     * and BlockManagers, and block location information (i.e. which BlockManagers hold
     * a given block).
     *
     * BlockManagerMasterEndpoint receives the messages sent by the BlockManagerMaster on
     * the Driver or Executors, managing all BlockManagers centrally.
     */
    private[spark]
    class BlockManagerMasterEndpoint(
        override val rpcEnv: RpcEnv,
        val isLocal: Boolean,
        conf: SparkConf,
        listenerBus: LiveListenerBus)
      extends ThreadSafeRpcEndpoint with Logging {
      // Mapping from block manager id to the block manager's information.
      // Cache of the mapping from BlockManagerId to BlockManagerInfo.
      private val blockManagerInfo = new mutable.HashMap[BlockManagerId, BlockManagerInfo]
      // Mapping from executor ID to block manager ID.
      // Cache of the mapping from Executor ID to BlockManagerId.
      private val blockManagerIdByExecutor = new mutable.HashMap[String, BlockManagerId]
      // Mapping from block id to the set of block managers that have the block.
      // One-to-many cache from a BlockId to the BlockManagerIds of the BlockManagers storing it.
      private val blockLocations = new JHashMap[BlockId, mutable.HashSet[BlockManagerId]]
      // Thread pool used for ask operations
      private val askThreadPool = ThreadUtils.newDaemonCachedThreadPool("block-manager-ask-thread-pool")
      // Wrapped as an implicit ExecutionContext
      private implicit val askExecutionContext = ExecutionContext.fromExecutorService(askThreadPool)
      // Topology mapper, used to resolve the topology of all nodes in the cluster
      private val topologyMapper = {
        /**
         * Read the class name of the topology mapper from the
         * spark.storage.replication.topologyMapper parameter;
         * defaults to the class name of DefaultTopologyMapper.
         */
        val topologyMapperClassName = conf.get(
          "spark.storage.replication.topologyMapper", classOf[DefaultTopologyMapper].getName)
        // Instantiate the topology mapper class reflectively
        val clazz = Utils.classForName(topologyMapperClassName)
        val mapper =
          clazz.getConstructor(classOf[SparkConf]).newInstance(conf).asInstanceOf[TopologyMapper]
        logInfo(s"Using $topologyMapperClassName for getting topology information")
        // Return the created instance
        mapper
      }
      ...
    }

BlockManagerMasterEndpoint maintains three dictionaries: blockManagerInfo, blockManagerIdByExecutor, and blockLocations. All three exist to track BlockManager-related information, which reflects one of BlockManagerMasterEndpoint's main duties: maintaining the information of every BlockManager in the storage system.
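
To make the relationships concrete, here is a minimal illustrative sketch (the ids and contents are invented, and the snippet simply reads the three maps the way the endpoint's own methods do):

Scala
    // Hypothetical contents:
    //   blockManagerIdByExecutor: "exec-1" -> BlockManagerId("exec-1", "10.0.0.5", 43211)
    //   blockManagerInfo:         that BlockManagerId -> its BlockManagerInfo (maxMem, slaveEndpoint, blocks)
    //   blockLocations:           RDDBlockId(0, 3) -> HashSet of the BlockManagerIds holding the block

    // "Which slave endpoints hold block rdd_0_3?" chains the maps like this:
    val holders = Option(blockLocations.get(RDDBlockId(0, 3)))
      .getOrElse(mutable.HashSet.empty[BlockManagerId]) // BlockManagers holding the block
    val endpoints = holders
      .flatMap(blockManagerInfo.get) // their BlockManagerInfo entries
      .map(_.slaveEndpoint)          // and their BlockManagerSlaveEndpoint refs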

With that understanding, we can turn to how it handles incoming messages. From its definition, BlockManagerMasterEndpoint is a ThreadSafeRpcEndpoint, so the methods of interest are receive(...) and receiveAndReply(...). BlockManagerMasterEndpoint only implements receiveAndReply(...); receive(...) keeps the default implementation inherited from RpcEndpoint, which throws a SparkException.

3.1. Routing Messages

BlockManagerMasterEndpoint's receiveAndReply(...) method is the entry point for all of its RPC messages. Its implementation is fairly simple: it dispatches each message type to a dedicated internal method and replies to the sender with the result:

    // Receives BlockManager-related messages
    override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
      case RegisterBlockManager(blockManagerId, maxMemSize, slaveEndpoint) =>
        context.reply(register(blockManagerId, maxMemSize, slaveEndpoint))
      case _updateBlockInfo @
          UpdateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size) =>
        context.reply(updateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size))
        listenerBus.post(SparkListenerBlockUpdated(BlockUpdatedInfo(_updateBlockInfo)))
      case GetLocations(blockId) =>
        context.reply(getLocations(blockId))
      case GetLocationsMultipleBlockIds(blockIds) =>
        context.reply(getLocationsMultipleBlockIds(blockIds))
      case GetPeers(blockManagerId) =>
        context.reply(getPeers(blockManagerId))
      case GetExecutorEndpointRef(executorId) =>
        context.reply(getExecutorEndpointRef(executorId))
      case GetMemoryStatus =>
        context.reply(memoryStatus)
      case GetStorageStatus =>
        context.reply(storageStatus)
      case GetBlockStatus(blockId, askSlaves) =>
        context.reply(blockStatus(blockId, askSlaves))
      case GetMatchingBlockIds(filter, askSlaves) =>
        context.reply(getMatchingBlockIds(filter, askSlaves))
      case RemoveRdd(rddId) =>
        context.reply(removeRdd(rddId))
      case RemoveShuffle(shuffleId) =>
        context.reply(removeShuffle(shuffleId))
      case RemoveBroadcast(broadcastId, removeFromDriver) =>
        context.reply(removeBroadcast(broadcastId, removeFromDriver))
      case RemoveBlock(blockId) =>
        // The block must also be removed from the workers holding it
        removeBlockFromWorkers(blockId)
        // then reply true
        context.reply(true)
      case RemoveExecutor(execId) =>
        removeExecutor(execId)
        context.reply(true)
      case StopBlockManagerMaster =>
        context.reply(true)
        stop()
      case BlockManagerHeartbeat(blockManagerId) =>
        context.reply(heartbeatReceived(blockManagerId))
      case HasCachedBlocks(executorId) =>
        blockManagerIdByExecutor.get(executorId) match {
          case Some(bm) =>
            if (blockManagerInfo.contains(bm)) {
              val bmInfo = blockManagerInfo(bm)
              context.reply(bmInfo.cachedBlocks.nonEmpty)
            } else {
              context.reply(false)
            }
          case None => context.reply(false)
        }
    }

The following subsections describe how each kind of message is handled.

3.2. Handling the RegisterBlockManager Message

RegisterBlockManager is the message a BlockManagerMaster sends to register its BlockManager with the BlockManagerMasterEndpoint, which hands it to the register(...) method. Its source is:

org.apache.spark.storage.BlockManagerMasterEndpoint#register
    /**
     * Returns the BlockManagerId with topology information populated, if available.
     *
     * Called when the BlockManagerMasterEndpoint receives a RegisterBlockManager message;
     * records the given BlockManager in the information it manages.
     */
    private def register(
        idWithoutTopologyInfo: BlockManagerId,
        maxMemSize: Long,
        slaveEndpoint: RpcEndpointRef): BlockManagerId = {
      // the dummy id is not expected to contain the topology information.
      // we get that info here and respond back with a more fleshed out block manager id
      /**
       * Build a new BlockManagerId from the one carried in the BlockManager's message,
       * mainly so that the topologyMapper can fill in topology information.
       */
      val id = BlockManagerId(
        idWithoutTopologyInfo.executorId,
        idWithoutTopologyInfo.host,
        idWithoutTopologyInfo.port,
        topologyMapper.getTopologyForHost(idWithoutTopologyInfo.host))
      // Record the current time
      val time = System.currentTimeMillis()
      if (!blockManagerInfo.contains(id)) { // no BlockManagerInfo is managed yet for this BlockManagerId
        // Look up the old BlockManagerId recorded for the Executor ID carried in the new BlockManagerId
        blockManagerIdByExecutor.get(id.executorId) match {
          case Some(oldId) => // one exists, so remove it
            // A block manager of the same executor already exists, so remove it (assumed dead)
            logError("Got two different block manager registrations on same executor - "
              + s" will replace old one $oldId with new one $id")
            removeExecutor(id.executorId)
          case None =>
        }
        logInfo("Registering block manager %s with %s RAM, %s".format(
          id.hostPort, Utils.bytesToString(maxMemSize), id))
        // Update the blockManagerIdByExecutor map
        blockManagerIdByExecutor(id.executorId) = id
        /**
         * Update the blockManagerInfo map with a new BlockManagerInfo object; this will
         * eventually trigger the onBlockManagerAdded method of every SparkListener,
         * enabling monitoring.
         */
        blockManagerInfo(id) = new BlockManagerInfo(
          id, System.currentTimeMillis(), maxMemSize, slaveEndpoint)
      }
      // Post a BlockManager-added event to the event bus
      listenerBus.post(SparkListenerBlockManagerAdded(time, id, maxMemSize))
      // Return the new BlockManagerId
      id
    }

The main job of register(...) is to maintain the blockManagerIdByExecutor and blockManagerInfo dictionaries. Note that the registering BlockManager's BlockManagerSlaveEndpoint ends up stored in the corresponding BlockManagerInfo object. register(...) returns the newly built BlockManagerId, now including topology information, to the sender.

3.3. Handling the RemoveExecutor Message

The RemoveExecutor message removes the specified Executor. It is handled by the BlockManagerMasterEndpoint's removeExecutor(...) method, after which true is returned directly:

org.apache.spark.storage.BlockManagerMasterEndpoint#removeExecutor
    // Remove the specified Executor
    private def removeExecutor(execId: String) {
      logInfo("Trying to remove executor " + execId + " from BlockManagerMaster.")
      // Look up the corresponding BlockManagerId in blockManagerIdByExecutor
      // and hand it to removeBlockManager()
      blockManagerIdByExecutor.get(execId).foreach(removeBlockManager)
    }

removeExecutor(...) looks up the BlockManagerId corresponding to the Executor and hands it to removeBlockManager(...):

org.apache.spark.storage.BlockManagerMasterEndpoint#removeBlockManager
    // Remove the specified BlockManagerId
    private def removeBlockManager(blockManagerId: BlockManagerId) {
      // Fetch the corresponding BlockManagerInfo
      val info = blockManagerInfo(blockManagerId)
      // Remove the block manager from blockManagerIdByExecutor.
      // Drop the recorded BlockManagerId
      blockManagerIdByExecutor -= blockManagerId.executorId
      // Remove it from blockManagerInfo and remove all the blocks.
      // Drop the recorded BlockManagerInfo
      blockManagerInfo.remove(blockManagerId)
      // From the BlockManagerInfo, get an iterator over the blocks this BlockManager managed
      val iterator = info.blocks.keySet.iterator
      // Iterate over the blocks
      while (iterator.hasNext) {
        val blockId = iterator.next
        // Fetch the block's set of locations
        val locations = blockLocations.get(blockId)
        // The Executor is gone, so this BlockManager must be dropped from the block's locations
        locations -= blockManagerId
        // If the block has no locations left, it no longer exists anywhere
        if (locations.size == 0) {
          // so drop its location record from blockLocations
          blockLocations.remove(blockId)
        }
      }
      listenerBus.post(SparkListenerBlockManagerRemoved(System.currentTimeMillis(), blockManagerId))
      logInfo(s"Removing block manager $blockManagerId")
    }

removeBlockManager(...) not only removes the BlockManager's BlockManagerInfo; it also maintains the location information of the blocks held by that BlockManager, removing the BlockManager from each block's recorded location set.

3.4. Handling the RemoveBlock Message

The RemoveBlock message removes a specified block and carries its BlockId. The BlockManagerMasterEndpoint extracts the BlockId, hands it to removeBlockFromWorkers(...), and replies true directly:

    // Remove a block from the slaves that have it. This can only be used to remove
    // blocks that the master knows about.
    private def removeBlockFromWorkers(blockId: BlockId) {
      // Fetch the block's locations
      val locations = blockLocations.get(blockId)
      if (locations != null) {
        // Walk every location
        locations.foreach { blockManagerId: BlockManagerId =>
          // Fetch the corresponding BlockManagerInfo object
          val blockManager = blockManagerInfo.get(blockManagerId)
          if (blockManager.isDefined) {
            // Remove the block from the slave's BlockManager.
            // Doesn't actually wait for a confirmation and the message might get lost.
            // If message loss becomes frequent, we should add retry logic here.
            // Send a RemoveBlock message to its BlockManagerSlaveEndpoint
            blockManager.get.slaveEndpoint.ask[Boolean](RemoveBlock(blockId))
          }
        }
      }
    }

Note that this method only consults the BlockManagerMasterEndpoint's internal dictionaries, blockLocations and blockManagerInfo; the actual block deletion is left to the BlockManagers holding the block. The BlockManagerMasterEndpoint notifies them by sending a RemoveBlock message to each of their BlockManagerSlaveEndpoints. How BlockManagerSlaveEndpoint handles these messages is explained later.

3.5. Handling the RemoveRdd Message

The RemoveRdd message removes all blocks of a given RDD and carries the RDD's ID. The BlockManagerMasterEndpoint extracts the RDD ID and passes it to removeRdd(...):

org.apache.spark.storage.BlockManagerMasterEndpoint#removeRdd
    // Remove all blocks of the specified RDD
    private def removeRdd(rddId: Int): Future[Seq[Int]] = {
      // First remove the metadata for the given RDD, and then asynchronously remove the blocks
      // from the slaves.
      // Find all blocks for the given RDD, remove the block from both blockLocations and
      // the blockManagerInfo that is tracking the blocks.
      /**
       * Filter the BlockIds in blockLocations down to the blocks of this RDD, based on:
       * 1. the BlockId must be an RDDBlockId;
       * 2. the RDDBlockId's rddId must equal the rddId parameter.
       */
      val blocks = blockLocations.asScala.keys.flatMap(_.asRDDId).filter(_.rddId == rddId)
      // Walk these BlockIds
      blocks.foreach { blockId =>
        // Fetch the BlockManagerIds of the BlockManagers holding the block
        val bms: mutable.HashSet[BlockManagerId] = blockLocations.get(blockId)
        // Fetch each corresponding BlockManagerInfo and remove the block via its removeBlock() method
        bms.foreach(bm => blockManagerInfo.get(bm).foreach(_.removeBlock(blockId)))
        // Drop the block's location record
        blockLocations.remove(blockId)
      }
      // Ask the slaves to remove the RDD, and put the result in a sequence of Futures.
      // The dispatcher is used as an implicit argument into the Future sequence construction.
      // Build the RemoveRdd message
      val removeMsg = RemoveRdd(rddId)
      Future.sequence(
        // Broadcast the RemoveRdd message to every BlockManager to notify them
        blockManagerInfo.values.map { bm =>
          bm.slaveEndpoint.ask[Int](removeMsg)
        }.toSeq
      )
    }

removeRdd(...) involves several steps: it finds the blocks belonging to the given RDD, then removes them through the removeBlock(...) method of the BlockManagerInfo of each BlockManager holding them; that method was explained in section 6.2 of the previous article, which readers can revisit.

Finally, removeRdd(...) builds a RemoveRdd message containing the RDD ID and sends it to every BlockManager, notifying them that all blocks of the given RDD are being removed. How BlockManagerSlaveEndpoint handles RemoveRdd is explained later.

3.6. Handling the RemoveBroadcast Message

The RemoveBroadcast message removes all blocks of a given Broadcast and carries the Broadcast's ID. The BlockManagerMasterEndpoint extracts the Broadcast ID and passes it to removeBroadcast(...):

org.apache.spark.storage.BlockManagerMasterEndpoint#removeBroadcast
    /**
     * Delegate RemoveBroadcast messages to each BlockManager because the master may not notified
     * of all broadcast blocks. If removeFromDriver is false, broadcast blocks are only removed
     * from the executors, but not from the driver.
     */
    private def removeBroadcast(broadcastId: Long, removeFromDriver: Boolean): Future[Seq[Int]] = {
      // Build the RemoveBroadcast message
      val removeMsg = RemoveBroadcast(broadcastId, removeFromDriver)
      /**
       * Use removeFromDriver to filter the BlockManagers that must be notified;
       * the parameter decides whether the blocks on the Driver are removed as well.
       */
      val requiredBlockManagers = blockManagerInfo.values.filter { info =>
        removeFromDriver || !info.blockManagerId.isDriver
      }
      Future.sequence(
        // Send the RemoveBroadcast message to every BlockManager that must be notified
        requiredBlockManagers.map { bm =>
          bm.slaveEndpoint.ask[Int](removeMsg)
        }.toSeq
      )
    }

The flow is straightforward: a RemoveBroadcast message is built and sent to the BlockManagers, delegating the deletion to each of them. Note the removeFromDriver parameter, which decides whether the matching blocks on the Driver are deleted as well.

3.7. Handling the RemoveShuffle Message

The RemoveShuffle message removes all blocks of a given Shuffle and carries the Shuffle's ID. The BlockManagerMasterEndpoint extracts the Shuffle ID and passes it to removeShuffle(...):

org.apache.spark.storage.BlockManagerMasterEndpoint#removeShuffle
    private def removeShuffle(shuffleId: Int): Future[Seq[Boolean]] = {
      // Nothing to do in the BlockManagerMasterEndpoint data structures
      // Build the RemoveShuffle message
      val removeMsg = RemoveShuffle(shuffleId)
      // Delegate to every BlockManager
      Future.sequence(
        blockManagerInfo.values.map { bm =>
          bm.slaveEndpoint.ask[Boolean](removeMsg)
        }.toSeq
      )
    }

In effect, removeShuffle(...) simply delegates the operation to all BlockManagers.

3.8. Handling the UpdateBlockInfo Message

The UpdateBlockInfo message updates the information of a block managed by a given BlockManager; it carries the BlockManagerId, the BlockId, and the new block configuration (storage level, memory size, disk size). The BlockManagerMasterEndpoint extracts these and passes them to updateBlockInfo(...):

    // Update the specified block
    private def updateBlockInfo(
        blockManagerId: BlockManagerId,
        blockId: BlockId,
        storageLevel: StorageLevel,
        memSize: Long,
        diskSize: Long): Boolean = {
      // Check whether the BlockManager is known
      if (!blockManagerInfo.contains(blockManagerId)) {
        if (blockManagerId.isDriver && !isLocal) {
          // We intentionally do not register the master (except in local mode),
          // so we should not indicate failure.
          return true
        } else {
          return false
        }
      }
      // Validate the parameters
      if (blockId == null) {
        // A null BlockId means only the BlockManagerInfo's last-seen timestamp is refreshed
        blockManagerInfo(blockManagerId).updateLastSeenMs()
        return true
      }
      // Delegate to the updateBlockInfo(...) method of the BlockManager's BlockManagerInfo
      blockManagerInfo(blockManagerId).updateBlockInfo(blockId, storageLevel, memSize, diskSize)
      // Maintain the block's location information
      var locations: mutable.HashSet[BlockManagerId] = null
      if (blockLocations.containsKey(blockId)) {
        locations = blockLocations.get(blockId)
      } else {
        locations = new mutable.HashSet[BlockManagerId]
        blockLocations.put(blockId, locations)
      }
      // A valid storage level means the BlockManagerId is added to the block's locations
      if (storageLevel.isValid) {
        locations.add(blockManagerId)
      } else {
        locations.remove(blockManagerId)
      }
      // Remove the block from master tracking if it has been removed on all slaves.
      // If the block has no locations left, drop it from blockLocations
      if (locations.size == 0) {
        blockLocations.remove(blockId)
      }
      true
    }

updateBlockInfo(...) hands the actual update to the updateBlockInfo(...) method of the BlockManager's corresponding BlockManagerInfo, keeping only validation and bookkeeping for itself. For BlockManagerInfo's updateBlockInfo(...), see section 6.1 of the previous article.

Note that after updateBlockInfo(...) completes, the BlockManagerMasterEndpoint also posts a SparkListenerBlockUpdated event, which wraps the UpdateBlockInfo message, to the event bus.

3.9. Handling the HasCachedBlocks Message

The HasCachedBlocks message checks whether a given Executor caches any blocks and carries the Executor ID. The BlockManagerMasterEndpoint extracts the Executor ID and handles the message inline in receiveAndReply(...):

org.apache.spark.storage.BlockManagerMasterEndpoint#receiveAndReply
    // Look up the corresponding BlockManagerId in blockManagerIdByExecutor
    blockManagerIdByExecutor.get(executorId) match {
      case Some(bm) =>
        // Fetch the corresponding BlockManagerInfo
        if (blockManagerInfo.contains(bm)) {
          val bmInfo = blockManagerInfo(bm)
          // Decide using the BlockManagerInfo
          context.reply(bmInfo.cachedBlocks.nonEmpty)
        } else {
          context.reply(false)
        }
      case None => context.reply(false)
    }

As we can see, the decision ultimately rests on the corresponding BlockManagerInfo.

3.10. Handling the GetBlockStatus Message

The GetBlockStatus message fetches the status of a given block and carries its BlockId. The BlockManagerMasterEndpoint extracts the BlockId and passes it to blockStatus(...):

org.apache.spark.storage.BlockManagerMasterEndpoint#blockStatus
    /**
     * Return the block's status for all block managers, if any. NOTE: This is a
     * potentially expensive operation and should only be used for testing.
     *
     * If askSlaves is true, the master queries each block manager for the most updated block
     * statuses. This is useful when the master is not informed of the given block by all block
     * managers.
     */
    private def blockStatus(
        blockId: BlockId,
        askSlaves: Boolean): Map[BlockManagerId, Future[Option[BlockStatus]]] = {
      // Build the GetBlockStatus message
      val getBlockStatus = GetBlockStatus(blockId)
      /*
       * Rather than blocking on the block status query, master endpoint should simply return
       * Futures to avoid potential deadlocks. This can arise if there exists a block manager
       * that is also waiting for this master endpoint's response to a previous message.
       *
       * Delegate the query to the BlockManagers.
       */
      blockManagerInfo.values.map { info =>
        val blockStatusFuture =
          // If the slave BlockManagers should be asked,
          if (askSlaves) {
            // send them the message
            info.slaveEndpoint.ask[Option[BlockStatus]](getBlockStatus)
          } else {
            // otherwise read the cached information from the BlockManagerInfo directly
            Future { info.getStatus(blockId) }
          }
        (info.blockManagerId, blockStatusFuture)
      }.toMap
    }

blockStatus(...) uses the askSlaves parameter to decide between reading the cached information from each BlockManagerInfo and sending each BlockManager a GetBlockStatus message for the latest status.

3.11. Handling the GetExecutorEndpointRef Message

The GetExecutorEndpointRef message fetches the RpcEndpointRef of a given Executor's BlockManagerSlaveEndpoint and carries the Executor ID. The BlockManagerMasterEndpoint extracts the Executor ID and passes it to getExecutorEndpointRef(...):

org.apache.spark.storage.BlockManagerMasterEndpoint#getExecutorEndpointRef
    /**
     * Returns an [[RpcEndpointRef]] of the [[BlockManagerSlaveEndpoint]] for sending RPC messages.
     */
    private def getExecutorEndpointRef(executorId: String): Option[RpcEndpointRef] = {
      // Resolve the BlockManagerInfo via blockManagerIdByExecutor and blockManagerInfo
      for (
        blockManagerId <- blockManagerIdByExecutor.get(executorId);
        info <- blockManagerInfo.get(blockManagerId)
      ) yield {
        // Read the RpcEndpointRef straight out of the BlockManagerInfo
        info.slaveEndpoint
      }
    }

getExecutorEndpointRef(...) is straightforward: it resolves the BlockManagerInfo through blockManagerIdByExecutor and blockManagerInfo, then returns the RpcEndpointRef stored in that BlockManagerInfo.

3.12. Handling the GetLocations Message

The GetLocations message fetches the locations of a given block and carries its BlockId. The BlockManagerMasterEndpoint extracts the BlockId and passes it to getLocations(...):

org.apache.spark.storage.BlockManagerMasterEndpoint#getLocations
    private def getLocations(blockId: BlockId): Seq[BlockManagerId] = {
      // Read from blockLocations
      if (blockLocations.containsKey(blockId)) blockLocations.get(blockId).toSeq else Seq.empty
    }

3.13. Handling the GetLocationsMultipleBlockIds Message

The GetLocationsMultipleBlockIds message fetches the locations of multiple blocks in one call and carries an array of BlockIds. The BlockManagerMasterEndpoint extracts the array and passes it to getLocationsMultipleBlockIds(...):

    private def getLocationsMultipleBlockIds(
        blockIds: Array[BlockId]): IndexedSeq[Seq[BlockManagerId]] = {
      // Map over the BlockId array, delegating each lookup to getLocations()
      blockIds.map(blockId => getLocations(blockId))
    }

The method maps over the BlockId array, resolving each entry with the getLocations(...) method from the previous subsection.

3.14. Handling the GetMatchingBlockIds Message

The GetMatchingBlockIds message fetches the blocks matching a predicate and carries a filter function. The BlockManagerMasterEndpoint extracts the filter and passes it to getMatchingBlockIds(...):

    /**
     * Return the ids of blocks present in all the block managers that match the given filter.
     * NOTE: This is a potentially expensive operation and should only be used for testing.
     *
     * If askSlaves is true, the master queries each block manager for the most updated block
     * statuses. This is useful when the master is not informed of the given block by all block
     * managers.
     */
    private def getMatchingBlockIds(
        filter: BlockId => Boolean,
        askSlaves: Boolean): Future[Seq[BlockId]] = {
      // Build the GetMatchingBlockIds message
      val getMatchingBlockIds = GetMatchingBlockIds(filter)
      // The implicit askExecutionContext is used here
      Future.sequence(
        blockManagerInfo.values.map { info =>
          val future =
            // Decide whether the latest state must come from the slave BlockManagers
            if (askSlaves) {
              // If so, send them the GetMatchingBlockIds message
              info.slaveEndpoint.ask[Seq[BlockId]](getMatchingBlockIds)
            } else {
              // Otherwise filter the locally cached block set
              Future { info.blocks.asScala.keys.filter(filter).toSeq }
            }
          future
        }
      ).map(_.flatten.toSeq)
    }

The implementation resembles the earlier blockStatus(...) method and may need to send the BlockManagers a GetMatchingBlockIds message. Note that when the local cache is used, the filter is applied directly; when the work is delegated to a BlockManager, the filtering happens on that BlockManager.

3.15. Handling the GetMemoryStatus Message

The GetMemoryStatus message fetches the memory status of all BlockManagers. The BlockManagerMasterEndpoint hands it to the memoryStatus method; the implementation is simple:

org.apache.spark.storage.BlockManagerMasterEndpoint#memoryStatus
    // Return a map from the block manager id to max memory and remaining memory.
    private def memoryStatus: Map[BlockManagerId, (Long, Long)] = {
      // Read from the cached BlockManagerInfo objects
      blockManagerInfo.map { case(blockManagerId, info) =>
        (blockManagerId, (info.maxMem, info.remainingMem))
      }.toMap
    }

3.16. Handling the GetStorageStatus Message

The GetStorageStatus message fetches the storage status of all BlockManagers. The BlockManagerMasterEndpoint hands it to the storageStatus method; the implementation is simple:

org.apache.spark.storage.BlockManagerMasterEndpoint#storageStatus
    private def storageStatus: Array[StorageStatus] = {
      // Read from the cached BlockManagerInfo objects
      blockManagerInfo.map { case (blockManagerId, info) =>
        new StorageStatus(blockManagerId, info.maxMem, info.blocks.asScala)
      }.toArray
    }

3.17. Handling the GetPeers Message

The GetPeers message carries a BlockManagerId and fetches the BlockManagerIds of all BlockManagers other than that one. The BlockManagerMasterEndpoint extracts the BlockManagerId and passes it to getPeers(...); the implementation is simple:

org.apache.spark.storage.BlockManagerMasterEndpoint#getPeers
    /** Get the list of the peers of the given block manager */
    private def getPeers(blockManagerId: BlockManagerId): Seq[BlockManagerId] = {
      // Fetch all BlockManagerIds from the blockManagerInfo map
      val blockManagerIds = blockManagerInfo.keySet
      if (blockManagerIds.contains(blockManagerId)) {
        // Filter out the Driver's BlockManagerId as well as the one passed in
        blockManagerIds.filterNot { _.isDriver }.filterNot { _ == blockManagerId }.toSeq
      } else {
        Seq.empty
      }
    }

3.18. Handling the StopBlockManagerMaster Message

The StopBlockManagerMaster message stops the BlockManagerMasterEndpoint on the Driver; only the Driver may send it, to itself. The BlockManagerMasterEndpoint first replies true and then invokes stop(); stop() is inherited from RpcEndpoint and calls the RpcEnv's stop(...) method:

org.apache.spark.rpc.RpcEndpoint#stop
    /**
     * A convenient method to stop [[RpcEndpoint]].
     *
     * Stops the current RpcEndpoint.
     */
    final def stop(): Unit = {
      val _self = self
      if (_self != null) {
        // Actually calls the RpcEnv's stop method
        rpcEnv.stop(_self)
      }
    }

NettyRpcEnv's stop(...) method stops its internal Dispatcher, unregisters the BlockManagerMasterEndpoint, and removes the corresponding EndpointData; for the details, revisit the earlier articles on Spark's communication architecture.

This completes the walkthrough of how BlockManagerMasterEndpoint handles its messages.

4. BlockManagerSlaveEndpoint

As an RPC endpoint, BlockManagerSlaveEndpoint mainly receives commands issued by the BlockManagerMasterEndpoint. As seen while discussing the BlockManagerMasterEndpoint, some operations requested of it are delegated, by forwarding messages, to the BlockManagerSlaveEndpoints.

The Driver and every Executor have their own BlockManagerSlaveEndpoint, and their BlockManagers each hold the RpcEndpointRef of their own BlockManagerSlaveEndpoint. The following snippet is from the BlockManager class:

org.apache.spark.storage.BlockManager
    // The BlockManagerSlaveEndpoint
    private val slaveEndpoint = rpcEnv.setupEndpoint(
      "BlockManagerEndpoint" + BlockManager.ID_GENERATOR.next,
      new BlockManagerSlaveEndpoint(rpcEnv, this, mapOutputTracker))

When a BlockManager initializes, it creates its BlockManagerSlaveEndpoint and registers it with the RpcEnv of its Driver or Executor. The registered name is generated as "BlockManagerEndpoint" + BlockManager.ID_GENERATOR.next, where BlockManager.ID_GENERATOR.next draws a monotonically increasing Integer from a global AtomicInteger.
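
The generator behind ID_GENERATOR is essentially a thin wrapper over java.util.concurrent.atomic.AtomicInteger; the sketch below is modeled on Spark's org.apache.spark.util.IdGenerator and simplified here for illustration:

Scala
    import java.util.concurrent.atomic.AtomicInteger

    // Hands out unique, increasing ids; thread-safe thanks to AtomicInteger
    class IdGenerator {
      private val id = new AtomicInteger
      def next: Int = id.incrementAndGet
    }

    // yielding names like "BlockManagerEndpoint1", "BlockManagerEndpoint2", ...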

BlockManagerSlaveEndpoint lives in the org.apache.spark.storage package of the Spark Core module; its definition is as follows:

org.apache.spark.storage.BlockManagerSlaveEndpoint
    /**
     * An RpcEndpoint to take commands from the master to execute options. For example,
     * this is used to remove blocks from the slave's BlockManager.
     *
     * Every Executor's or the Driver's SparkEnv has its own BlockManagerSlaveEndpoint,
     * created by that SparkEnv and registered with that node's RpcEnv.
     * The slaveEndpoint field of each BlockManager holds the RpcEndpointRef of its
     * own BlockManagerSlaveEndpoint.
     * BlockManagerSlaveEndpoint receives the commands issued by the BlockManagerMasterEndpoint,
     * e.g. removing a block, getting a block's status, or getting matching BlockIds,
     * and executes the corresponding operations.
     *
     * @param rpcEnv           the RpcEnv this endpoint belongs to
     * @param blockManager     the BlockManager this endpoint belongs to
     * @param mapOutputTracker the MapOutputTracker, which tracks map task output; covered later
     */
    private[storage]
    class BlockManagerSlaveEndpoint(
        override val rpcEnv: RpcEnv,
        blockManager: BlockManager,
        mapOutputTracker: MapOutputTracker)
      extends ThreadSafeRpcEndpoint with Logging {
      ...
    }

From the definition, it too is an RPC endpoint extending ThreadSafeRpcEndpoint. BlockManagerSlaveEndpoint likewise implements only the receiveAndReply(...) message-handling method, which it uses mainly to process command messages from the BlockManagerMasterEndpoint.

4.1. Routing Messages

Like BlockManagerMasterEndpoint's version, BlockManagerSlaveEndpoint's receiveAndReply(...) dispatches each message type to its BlockManager or to a dedicated internal method, then replies to the BlockManagerMasterEndpoint with the result. The source is:

org.apache.spark.storage.BlockManagerSlaveEndpoint#receiveAndReply
    // Operations that involve removing blocks may be slow and should be done asynchronously
    // Receives commands from the BlockManagerMasterEndpoint and executes the matching operations
    override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
      case RemoveBlock(blockId) =>
        // Executed asynchronously
        doAsync[Boolean]("removing block " + blockId, context) {
          blockManager.removeBlock(blockId)
          true
        }
      case RemoveRdd(rddId) =>
        doAsync[Int]("removing RDD " + rddId, context) {
          blockManager.removeRdd(rddId)
        }
      case RemoveShuffle(shuffleId) =>
        doAsync[Boolean]("removing shuffle " + shuffleId, context) {
          if (mapOutputTracker != null) {
            mapOutputTracker.unregisterShuffle(shuffleId)
          }
          SparkEnv.get.shuffleManager.unregisterShuffle(shuffleId)
        }
      case RemoveBroadcast(broadcastId, _) =>
        doAsync[Int]("removing broadcast " + broadcastId, context) {
          blockManager.removeBroadcast(broadcastId, tellMaster = true)
        }
      case GetBlockStatus(blockId, _) =>
        context.reply(blockManager.getStatus(blockId))
      case GetMatchingBlockIds(filter, _) =>
        context.reply(blockManager.getMatchingBlockIds(filter))
      case TriggerThreadDump =>
        context.reply(Utils.getThreadDump())
    }

The following subsections describe how each kind of message is handled.

4.2. The Asynchronous Execution Helper

Before looking at the message handling, let us examine doAsync(...), the helper BlockManagerSlaveEndpoint defines for asynchronous processing. Some operations are slow, so they are run through this method to avoid blocking the BlockManagerSlaveEndpoint. The source is:

org.apache.spark.storage.BlockManagerSlaveEndpoint#doAsync
    // Processes a message asynchronously
    private def doAsync[T](actionMessage: String, context: RpcCallContext)(body: => T) {
      // Create the Future
      val future = Future {
        logDebug(actionMessage)
        // Run the actual operation
        body
      }
      // Invoked on success
      future.onSuccess { case response =>
        logDebug("Done " + actionMessage + ", response is " + response)
        // Reply with the response
        context.reply(response)
        logDebug("Sent response: " + response + " to " + context.senderAddress)
      }
      // Invoked on failure
      future.onFailure { case t: Throwable =>
        logError("Error in " + actionMessage, t)
        // Reply with the failure's exception
        context.sendFailure(t)
      }
    }

doAsync is a curried method that takes a message description, the RPC callback context, and the actual operation. Internally it wraps the operation in a Future and hands the outcome to the Future's success and failure callbacks, which decide how to use the callback context to reply to the BlockManagerMasterEndpoint.
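
As a reminder of the mechanics (a generic sketch, unrelated to Spark's API), a curried method whose second parameter list takes a by-name argument enables exactly the call-site block syntax used in receiveAndReply(...):

Scala
    // A generic curried helper: the second parameter list is a single by-name block
    def timed[T](label: String)(body: => T): T = {
      val start = System.nanoTime()
      val result = body // the block is evaluated here, not at the call site
      println(s"$label took ${(System.nanoTime() - start) / 1000000} ms")
      result
    }

    // Call-site syntax mirrors doAsync's: timed("sum") { (1 to 100).sum }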

4.3. Handling the RemoveBlock Message

Since the BlockManager is the component that actually manages storage, removing a block is delegated by the BlockManagerMasterEndpoint to the BlockManager, and that is exactly how BlockManagerSlaveEndpoint handles the RemoveBlock message; the snippet is:

org.apache.spark.storage.BlockManagerSlaveEndpoint#receiveAndReply
    ...
    case RemoveBlock(blockId) =>
      // Executed asynchronously
      doAsync[Boolean]("removing block " + blockId, context) {
        blockManager.removeBlock(blockId)
        true
      }
    ...

As we can see, the actual removal is performed by the BlockManager's removeBlock(...) method; the storage operations it involves are covered in detail later, in the storage-layer articles.

4.4. Handling the RemoveRdd Message

BlockManagerSlaveEndpoint handles the RemoveRdd message the same way, handing it to the corresponding BlockManager method; the snippet is:

org.apache.spark.storage.BlockManagerSlaveEndpoint#receiveAndReply
    ...
    case RemoveRdd(rddId) =>
      doAsync[Int]("removing RDD " + rddId, context) {
        blockManager.removeRdd(rddId)
      }
    ...

4.5. Handling the RemoveShuffle Message

For the RemoveShuffle message, BlockManagerSlaveEndpoint delegates to the corresponding methods of the MapOutputTracker and the ShuffleManager; the snippet is:

org.apache.spark.storage.BlockManagerSlaveEndpoint#receiveAndReply
    ...
    case RemoveShuffle(shuffleId) =>
      doAsync[Boolean]("removing shuffle " + shuffleId, context) {
        if (mapOutputTracker != null) {
          mapOutputTracker.unregisterShuffle(shuffleId)
        }
        SparkEnv.get.shuffleManager.unregisterShuffle(shuffleId)
      }
    ...

4.6. Handling the RemoveBroadcast Message

BlockManagerSlaveEndpoint delegates the RemoveBroadcast message to the corresponding BlockManager method; the snippet is:

org.apache.spark.storage.BlockManagerSlaveEndpoint#receiveAndReply
    ...
    case RemoveBroadcast(broadcastId, _) =>
      doAsync[Int]("removing broadcast " + broadcastId, context) {
        blockManager.removeBroadcast(broadcastId, tellMaster = true)
      }
    ...

4.7. Handling the GetBlockStatus Message

BlockManagerSlaveEndpoint delegates the GetBlockStatus message to the corresponding BlockManager method; the snippet is:

org.apache.spark.storage.BlockManagerSlaveEndpoint#receiveAndReply
    ...
    case GetBlockStatus(blockId, _) =>
      context.reply(blockManager.getStatus(blockId))
    ...

4.8. Handling the GetMatchingBlockIds Message

BlockManagerSlaveEndpoint delegates the GetMatchingBlockIds message to the corresponding BlockManager method; the snippet is:

org.apache.spark.storage.BlockManagerSlaveEndpoint#receiveAndReply
    ...
    case GetMatchingBlockIds(filter, _) =>
      context.reply(blockManager.getMatchingBlockIds(filter))
    ...

Having walked through BlockManagerMasterEndpoint and BlockManagerSlaveEndpoint, you should now have a clear picture of how messages flow through the communication layer and how they are processed.