
Spark Source Code Analysis 10 - Storage System 03: Communication Layer (2) Data Transfer

Overview: Given the distributed nature of Spark computation, Map tasks and Reduce tasks are very likely to involve a Shuffle phase, in which Map tasks save the data they produce into the storage system and Reduce tasks then pull it; when the data is not local, it has to be transferred across nodes. BlockManager exposes its block-fetching service through BlockTransferService, and when no external Shuffle service is configured, BlockTransferService also acts as the Shuffle client used to pull data.

1. BlockTransferService

As we know, given the distributed nature of Spark computation, Map tasks and Reduce tasks are very likely to involve a Shuffle phase, in which Map tasks save the data they produce into the storage system and Reduce tasks then pull it; when the data is not local, it has to be transferred across nodes. BlockManager exposes its block-fetching service through BlockTransferService, and when no external Shuffle service is configured, BlockTransferService also acts as the Shuffle client used to pull data.

BlockTransferService is defined in the org.apache.spark.network package of the Spark Core module. An instance of it is created in SparkEnv's create(...) method and handed over to the BlockManager. Let's review how it is constructed:

org.apache.spark.SparkEnv#create
  • // Port on which the block transfer service (BlockTransferService) is exposed
  • val blockManagerPort = if (isDriver) { // Driver
  • conf.get(DRIVER_BLOCK_MANAGER_PORT) // spark.driver.blockManager.port
  • } else { // Executor
  • conf.get(BLOCK_MANAGER_PORT) // spark.blockManager.port
  • }
  • // Create the block transfer service
  • val blockTransferService =
  • new NettyBlockTransferService(conf, securityManager, bindAddress, advertiseAddress,
  • blockManagerPort, numUsableCores)

As the snippet shows, its construction depends on the port configured by spark.driver.blockManager.port or spark.blockManager.port: on the Driver the port comes from spark.driver.blockManager.port, and on an Executor from spark.blockManager.port. Note that if spark.driver.blockManager.port is not set on the Driver, the value of spark.blockManager.port is used instead; and if spark.blockManager.port is not set either, a random port is chosen.
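
If you need deterministic ports (for example to open firewall rules), they can be pinned through SparkConf. This is only a minimal sketch; the port numbers below are arbitrary examples, not defaults:

  • import org.apache.spark.SparkConf
  • // Sketch: fix the block manager ports instead of letting Spark choose random ones
  • val conf = new SparkConf()
  •   .set("spark.driver.blockManager.port", "7089") // port used by the Driver's BlockTransferService
  •   .set("spark.blockManager.port", "7090")        // port used by each Executor's BlockTransferService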

BlockTransferService has only two implementations: NettyBlockTransferService and MockBlockTransferService. Since MockBlockTransferService exists only for testing, NettyBlockTransferService is the only one usable in a real deployment.

The NettyBlockTransferService instance is ultimately handed to and managed by BlockManager, as can be seen from how BlockManager is constructed in SparkEnv's create(...) method. During BlockManager's initialization, which ShuffleClient to use is decided by whether the external Shuffle service is enabled. The relevant snippet is:

org.apache.spark.storage.BlockManager#shuffleClient
  • // Client to read other executors' shuffle files. This is either an external service, or just the
  • // standard BlockTransferService to directly connect to other Executors.
  • /**
  • * Creates the ShuffleClient.
  • * If an external Shuffle service is deployed, spark.shuffle.service.enabled must be set to true (default false),
  • * in which case an ExternalShuffleClient is created.
  • * By default, NettyBlockTransferService acts as the Shuffle client.
  • */
  • private[spark] val shuffleClient = if (externalShuffleServiceEnabled) {
  • val transConf = SparkTransportConf.fromSparkConf(conf, "shuffle", numUsableCores)
  • new ExternalShuffleClient(transConf, securityManager, securityManager.isAuthenticationEnabled(),
  • securityManager.isSaslEncryptionEnabled())
  • } else {
  • blockTransferService
  • }

Why talk about the ShuffleClient here? Because when the external Shuffle service is not enabled (controlled by spark.shuffle.service.enabled, false by default), the NettyBlockTransferService instance blockTransferService also serves as the BlockManager's ShuffleClient.
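
As a hedged configuration sketch, switching to the external Shuffle service is purely a matter of configuration; 7337 is the commonly used default port, but treat the values below as illustrative:

  • import org.apache.spark.SparkConf
  • // Sketch: enable the external shuffle service so that an ExternalShuffleClient is created
  • // instead of NettyBlockTransferService acting as the shuffle client
  • val conf = new SparkConf()
  •   .set("spark.shuffle.service.enabled", "true")
  •   .set("spark.shuffle.service.port", "7337")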

NettyBlockTransferService may therefore play two roles at the same time, the server side and the client side of data transfer. The implementation of each role is described below.

1.1. Acting as the Data Transfer Client

To make the overall communication flow of block transfer easier to follow, we first analyze NettyBlockTransferService in its client role. Its definition and key fields are:

org.apache.spark.network.netty.NettyBlockTransferService
  • /**
  • * A BlockTransferService that uses Netty to fetch a set of blocks at at time.
  • *
  • * @param conf SparkConf
  • * @param securityManager the security manager
  • * @param bindAddress the address to bind to
  • * @param hostName the host name to bind to
  • * @param _port the port to bind to
  • * @param numCores the number of CPU cores to use
  • */
  • private[spark] class NettyBlockTransferService(
  • conf: SparkConf,
  • securityManager: SecurityManager,
  • bindAddress: String,
  • override val hostName: String,
  • _port: Int,
  • numCores: Int)
  • extends BlockTransferService {
  • // TODO: Don't use Java serialization, use a more cross-version compatible serialization format.
  • // Java serializer; the TODO above notes that a more cross-version compatible serializer should be used instead
  • private val serializer = new JavaSerializer(conf)
  • // Whether authentication is enabled
  • private val authEnabled = securityManager.isAuthenticationEnabled()
  • // Transport-layer configuration (TransportConf)
  • private val transportConf = SparkTransportConf.fromSparkConf(conf, "shuffle", numCores)
  • // Transport-layer context (TransportContext)
  • private[this] var transportContext: TransportContext = _
  • // The TransportServer (server side)
  • private[this] var server: TransportServer = _
  • // Factory for creating TransportClients
  • private[this] var clientFactory: TransportClientFactory = _
  • // ID of the application this BlockTransferService serves
  • private[this] var appId: String = _
  • ...
  • }

You have probably recognized several familiar objects among NettyBlockTransferService's fields: TransportConf, TransportContext, TransportServer and TransportClientFactory. These are exactly the transport-layer components used to build the server and client sides of the communication.

NettyBlockTransferService offers two methods for its client role, fetchBlocks(...) and uploadBlock(...); as their names suggest, one fetches data and the other uploads data.

1.1.1. Fetching Data

Let's first analyze the fetchBlocks(...) method used for fetching data. Its source is as follows:

org.apache.spark.network.netty.NettyBlockTransferService#fetchBlocks
  • // Downloads blocks when acting as the default Shuffle client
  • override def fetchBlocks(
  • host: String,
  • port: Int,
  • execId: String,
  • blockIds: Array[String],
  • listener: BlockFetchingListener): Unit = {
  • logTrace(s"Fetch blocks from $host:$port (executor id $execId)")
  • try {
  • // Create an anonymous RetryingBlockFetcher.BlockFetchStarter
  • val blockFetchStarter = new RetryingBlockFetcher.BlockFetchStarter {
  • override def createAndStart(blockIds: Array[String], listener: BlockFetchingListener) {
  • // Create a TransportClient
  • val client = clientFactory.createClient(host, port)
  • // Create a OneForOneBlockFetcher and call its start() method
  • new OneForOneBlockFetcher(client, appId, execId, blockIds.toArray, listener).start()
  • }
  • }
  • // Maximum number of retries, determined by spark.<module>.io.maxRetries; here it is spark.shuffle.io.maxRetries
  • val maxRetries = transportConf.maxIORetries()
  • if (maxRetries > 0) { // retries are allowed
  • // Note this Fetcher will correctly handle maxRetries == 0; we avoid it just in case there's
  • // a bug in this code. We should remove the if statement once we're sure of the stability.
  • // Create a RetryingBlockFetcher with the BlockFetchStarter created above and call its start() method
  • new RetryingBlockFetcher(transportConf, blockFetchStarter, blockIds, listener).start()
  • } else {
  • // Call createAndStart() directly on blockFetchStarter (the RetryingBlockFetcher.BlockFetchStarter above)
  • blockFetchStarter.createAndStart(blockIds, listener)
  • }
  • } catch {
  • case e: Exception =>
  • logError("Exception while beginning fetchBlocks", e)
  • blockIds.foreach(listener.onBlockFetchFailure(_, e))
  • }
  • }

The fetchBlocks(...) method takes five parameters: host, port and execId identify the Executor on the node from which data should be fetched, blockIds is the array of BlockIds of the blocks to fetch, and listener is the listener that receives the fetch results. The method introduces a new component, the retry-capable fetcher RetryingBlockFetcher. At the beginning, a RetryingBlockFetcher.BlockFetchStarter is created; its overridden createAndStart(...) method contains the code that creates the TransportClient, which is ultimately wrapped in a OneForOneBlockFetcher whose start() method is called. The creation of the TransportClient has been covered before, so let's focus on what OneForOneBlockFetcher does.
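
To make the client role concrete, here is a hypothetical caller-side sketch: it implements a minimal BlockFetchingListener and passes it to fetchBlocks(...). The host, port, executor ID and block ID below are made-up values, and blockTransferService stands for the NettyBlockTransferService instance held by BlockManager:

  • import org.apache.spark.network.buffer.ManagedBuffer
  • import org.apache.spark.network.shuffle.BlockFetchingListener
  • // Minimal listener: just report success or failure for each block
  • val fetchListener = new BlockFetchingListener {
  •   override def onBlockFetchSuccess(blockId: String, data: ManagedBuffer): Unit =
  •     println(s"fetched $blockId, ${data.size()} bytes")
  •   override def onBlockFetchFailure(blockId: String, e: Throwable): Unit =
  •     println(s"failed to fetch $blockId: $e")
  • }
  • // Fetch one shuffle block from a (hypothetical) remote Executor
  • blockTransferService.fetchBlocks("host-1", 7090, "exec-3", Array("shuffle_0_1_2"), fetchListener)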

1.1.1.1. OneForOneBlockFetcher

As its name suggests, OneForOneBlockFetcher is a one-for-one block fetcher. It wraps a TransportClient internally and combines sending the RPC request with sending the chunk-fetch requests. Its definition and key fields are:

org.apache.spark.network.shuffle.OneForOneBlockFetcher
  • /**
  • * Simple wrapper on top of a TransportClient which interprets each chunk as a whole block, and
  • * invokes the BlockFetchingListener appropriately. This class is agnostic to the actual RPC
  • * handler, as long as there is a single "open blocks" message which returns a ShuffleStreamHandle,
  • * and Java serialization is used.
  • *
  • * Note that this typically corresponds to a
  • * {@link org.apache.spark.network.server.OneForOneStreamManager} on the server side.
  • */
  • public class OneForOneBlockFetcher {
  • private static final Logger logger = LoggerFactory.getLogger(OneForOneBlockFetcher.class);
  • // TransportClient used to send requests to the server.
  • private final TransportClient client;
  • /**
  • * The OpenBlocks message.
  • * OpenBlocks carries the remote side's:
  • * - appId (application identifier)
  • * - execId (Executor identifier)
  • * - blockIds (array of BlockIds)
  • * i.e. which blocks to fetch from which remote instance, and which remote Executor produced them.
  • */
  • private final OpenBlocks openMessage;
  • // Array of BlockIds, identical to openMessage's blockIds.
  • private final String[] blockIds;
  • // Called back when fetching a block succeeds or fails.
  • private final BlockFetchingListener listener;
  • // Per-chunk callback invoked on success or failure, used together with the BlockFetchingListener.
  • private final ChunkReceivedCallback chunkCallback;
  • /**
  • * After the client sends the OpenBlocks message, the server caches the ManagedBuffer sequence
  • * read from the storage system in OneForOneStreamManager's streams cache, generates a streamId
  • * for that sequence, and returns a StreamHandle message containing the streamId and the size of
  • * the ManagedBuffer sequence; the client's streamHandle field holds that StreamHandle message.
  • */
  • private StreamHandle streamHandle = null;
  • ...
  • }

In its constructor, OneForOneBlockFetcher records the arguments passed in and builds the OpenBlocks message and the ChunkCallback callback object:

org.apache.spark.network.shuffle.OneForOneBlockFetcher#OneForOneBlockFetcher
  • public OneForOneBlockFetcher(
  • TransportClient client,
  • String appId,
  • String execId,
  • String[] blockIds,
  • BlockFetchingListener listener) {
  • this.client = client;
  • this.openMessage = new OpenBlocks(appId, execId, blockIds);
  • this.blockIds = blockIds;
  • this.listener = listener;
  • this.chunkCallback = new ChunkCallback();
  • }

The ChunkCallback created here is a callback defined inside OneForOneBlockFetcher. Its main purpose is to adapt to the ChunkReceivedCallback parameter required by TransportClient's fetchChunk(...) method, an application of the adapter pattern. It is defined as follows:

org.apache.spark.network.shuffle.OneForOneBlockFetcher.ChunkCallback
  • /** Callback invoked on receipt of each chunk. We equate a single chunk to a single block.
  • * Callback invoked after data has been fetched
  • **/
  • private class ChunkCallback implements ChunkReceivedCallback {
  • // A single chunk (block) was fetched successfully
  • @Override
  • public void onSuccess(int chunkIndex, ManagedBuffer buffer) {
  • // On receipt of a chunk, pass it upwards as a block.
  • // Forwarded to the listener; note that the server returns data in the same order as the BlockId array
  • listener.onBlockFetchSuccess(blockIds[chunkIndex], buffer);
  • }
  • // Fetching a single chunk (block) failed
  • @Override
  • public void onFailure(int chunkIndex, Throwable e) {
  • // On receipt of a failure, fail every block from chunkIndex onwards.
  • String[] remainingBlockIds = Arrays.copyOfRange(blockIds, chunkIndex, blockIds.length);
  • // failRemainingBlocks() notifies the listener
  • failRemainingBlocks(remainingBlockIds, e);
  • }
  • }

As can be seen, ChunkCallback forwards the callback information through the corresponding methods of listener, adapting to whatever listener was passed into OneForOneBlockFetcher from the outside.

As we know, createAndStart(...) eventually calls OneForOneBlockFetcher's start() method, which is the main method for fetching blocks. Its source is:

org.apache.spark.network.shuffle.OneForOneBlockFetcher#start
  • /**
  • * Begins the fetching process, calling the listener with every block fetched.
  • * The given message will be serialized with the Java serializer, and the RPC must return a
  • * {@link StreamHandle}. We will send all fetch requests immediately, without throttling.
  • */
  • public void start() {
  • // Argument check
  • if (blockIds.length == 0) {
  • throw new IllegalArgumentException("Zero-sized blockIds array");
  • }
  • // Send the OpenBlocks message with TransportClient's sendRpc()
  • client.sendRpc(openMessage.toByteBuffer(),
  • // and register an anonymous RpcResponseCallback in the client's outstandingRpcs cache
  • new RpcResponseCallback() {
  • // The RPC succeeded
  • @Override
  • public void onSuccess(ByteBuffer response) {
  • try {
  • // Deserialize the response into a StreamHandle
  • streamHandle = (StreamHandle) BlockTransferMessage.Decoder.fromByteBuffer(response);
  • logger.trace("Successfully opened blocks {}, preparing to fetch chunks.", streamHandle);
  • // Immediately request all chunks -- we expect that the total size of the request is
  • // reasonable due to higher level chunking in [[ShuffleBlockFetcherIterator]].
  • // Iterate over StreamHandle's numChunks
  • for (int i = 0; i < streamHandle.numChunks; i++) {
  • /**
  • * Call TransportClient's fetchChunk() to fetch the blocks one by one.
  • * Note that when a chunk completes, the result comes back through chunkCallback,
  • * whose callbacks invoke the listener with the fetched data and the outcome.
  • * See {@link ChunkCallback}.
  • */
  • client.fetchChunk(streamHandle.streamId, i, chunkCallback);
  • }
  • } catch (Exception e) {
  • logger.error("Failed while starting block fetches after success", e);
  • failRemainingBlocks(blockIds, e);
  • }
  • }
  • @Override
  • public void onFailure(Throwable e) {
  • logger.error("Failed while starting block fetches", e);
  • failRemainingBlocks(blockIds, e);
  • }
  • });
  • }

This method proceeds in four steps:

  1. First, the TransportClient sends an RpcRequest to the node holding the blocks; the message is an OpenBlocks object containing the array of BlockIds of the blocks to fetch.
  2. The server's response is then decoded by BlockTransferMessage.Decoder into a StreamHandle object, which records the stream ID allocated for this fetch and the number of chunks.
  3. Using the stream ID and the chunk count, the client sends ChunkFetchRequest requests in a loop to fetch the blocks; the requests follow the order of the BlockId array, and the server returns the data in that same order.
  4. The fetched data is finally handed to the chunkCallback, which in turn passes it back to the listener.

As the flow shows, fetching blocks is accomplished by combining two kinds of requests, RpcRequest and ChunkFetchRequest, and the server returns each block's data in the BlockId order specified by the BlockId array in the OpenBlocks message.
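
The messages exchanged in steps 1 and 2 are plain, self-describing byte buffers. As a small sketch (the appId, execId and BlockIds are made up), an OpenBlocks message can be encoded and decoded symmetrically, which is essentially what happens on the two ends of the RpcRequest; the protocol classes are assumed here to live in org.apache.spark.network.shuffle.protocol:

  • import org.apache.spark.network.shuffle.protocol.{BlockTransferMessage, OpenBlocks}
  • // Encode an OpenBlocks message the way OneForOneBlockFetcher does...
  • val open = new OpenBlocks("app-1", "exec-3", Array("shuffle_0_1_2", "shuffle_0_1_3"))
  • val bytes = open.toByteBuffer
  • // ...and decode it back the way NettyBlockRpcServer does on the server side
  • val decoded = BlockTransferMessage.Decoder.fromByteBuffer(bytes).asInstanceOf[OpenBlocks]
  • assert(decoded.blockIds.sameElements(Array("shuffle_0_1_2", "shuffle_0_1_3")))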

1.1.1.2. RetryingBlockFetcher

In fetchBlocks(...), whether the retry mechanism is used depends on whether the configured maximum number of retries (determined by the spark.<module>.io.maxRetries parameter; since this is the Shuffle client, it is spark.shuffle.io.maxRetries) is greater than 0. Recall the snippet:

org.apache.spark.network.netty.NettyBlockTransferService#fetchBlocks
  • ...
  • // Maximum number of retries, determined by spark.<module>.io.maxRetries; here it is spark.shuffle.io.maxRetries
  • val maxRetries = transportConf.maxIORetries()
  • if (maxRetries > 0) { // retries are allowed
  • // Note this Fetcher will correctly handle maxRetries == 0; we avoid it just in case there's
  • // a bug in this code. We should remove the if statement once we're sure of the stability.
  • // Create a RetryingBlockFetcher with the BlockFetchStarter created above and call its start() method
  • new RetryingBlockFetcher(transportConf, blockFetchStarter, blockIds, listener).start()
  • } else {
  • // Call createAndStart() directly on blockFetchStarter (the RetryingBlockFetcher.BlockFetchStarter above)
  • blockFetchStarter.createAndStart(blockIds, listener)
  • }
  • ...
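
As a hedged configuration sketch, the retry behaviour of shuffle fetches is driven by two transport-layer settings; the values below are illustrative, not the defaults:

  • import org.apache.spark.SparkConf
  • // Sketch: allow up to 5 retries and wait 10 seconds between attempts
  • val conf = new SparkConf()
  •   .set("spark.shuffle.io.maxRetries", "5")  // read through TransportConf.maxIORetries()
  •   .set("spark.shuffle.io.retryWait", "10s") // read through TransportConf.ioRetryWaitTimeMs()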

When there is no retry mechanism, OneForOneBlockFetcher fetches exactly once and does not retry on failure. When the maximum number of retries is greater than 0, blockFetchStarter is wrapped again in a RetryingBlockFetcher to provide retries. Let's first look at RetryingBlockFetcher's definition and key fields:

org.apache.spark.network.shuffle.RetryingBlockFetcher
  • /**
  • * Wraps another BlockFetcher with the ability to automatically retry fetches which fail due to
  • * IOExceptions, which we hope are due to transient network conditions.
  • *
  • * This fetcher provides stronger guarantees regarding the parent BlockFetchingListener. In
  • * particular, the listener will be invoked exactly once per blockId, with a success or failure.
  • */
  • public class RetryingBlockFetcher {
  • ...
  • /** Shared executor service used for waiting and retrying.
  • * Thread pool used for retries
  • **/
  • private static final ExecutorService executorService = Executors.newCachedThreadPool(
  • NettyUtils.createThreadFactory("Block Fetch Retry"));
  • /** Used to initiate new Block Fetches on our remaining blocks.
  • * The fetch starter
  • **/
  • private final BlockFetchStarter fetchStarter;
  • /** Parent listener which we delegate all successful or permanently failed block fetches to.
  • * Records the listener passed in from outside
  • **/
  • private final BlockFetchingListener listener;
  • /** Max number of times we are allowed to retry.
  • * Maximum number of retries
  • **/
  • private final int maxRetries;
  • /** Milliseconds to wait before each retry.
  • * Wait time between two retries, determined by spark.<module>.io.retryWait
  • **/
  • private final int retryWaitTime;
  • // NOTE:
  • // All of our non-final fields are synchronized under 'this' and should only be accessed/mutated
  • // while inside a synchronized block.
  • /** Number of times we've attempted to retry so far.
  • * Records how many retries have been attempted so far
  • **/
  • private int retryCount = 0;
  • /**
  • * Set of all block ids which have not been fetched successfully or with a non-IO Exception.
  • * A retry involves requesting every outstanding block. Note that since this is a LinkedHashSet,
  • * input ordering is preserved, so we always request blocks in the same order the user provided.
  • *
  • * Records the set of BlockIds of the blocks that still need to be fetched
  • */
  • private final LinkedHashSet<String> outstandingBlocksIds;
  • /**
  • * The BlockFetchingListener that is active with our current BlockFetcher.
  • * When we start a retry, we immediately replace this with a new Listener, which causes all any
  • * old Listeners to ignore all further responses.
  • *
  • * The retry listener
  • */
  • private RetryingBlockFetchListener currentListener;
  • ...
  • }

RetryingBlockFetcher initializes these fields in its constructor:

org.apache.spark.network.shuffle.RetryingBlockFetcher#RetryingBlockFetcher
  • public RetryingBlockFetcher(
  • TransportConf conf,
  • BlockFetchStarter fetchStarter,
  • String[] blockIds,
  • BlockFetchingListener listener) {
  • // Record the fetch starter
  • this.fetchStarter = fetchStarter;
  • // Record the BlockFetchingListener passed in
  • this.listener = listener;
  • // Get the configured maximum number of retries
  • this.maxRetries = conf.maxIORetries();
  • // Get the configured wait time between two retries
  • this.retryWaitTime = conf.ioRetryWaitTimeMs();
  • // Put the BlockIds of all blocks to fetch into outstandingBlocksIds
  • this.outstandingBlocksIds = Sets.newLinkedHashSet();
  • Collections.addAll(outstandingBlocksIds, blockIds);
  • // Create a fresh listener
  • this.currentListener = new RetryingBlockFetchListener();
  • }

Similar to OneForOneBlockFetcher, RetryingBlockFetcher defines an inner class, RetryingBlockFetchListener, to adapt the BlockFetchingListener passed in. It is defined as follows:

org.apache.spark.network.shuffle.RetryingBlockFetcher.RetryingBlockFetchListener
  • /**
  • * Our RetryListener intercepts block fetch responses and forwards them to our parent listener.
  • * Note that in the event of a retry, we will immediately replace the 'currentListener' field,
  • * indicating that any responses from non-current Listeners should be ignored.
  • */
  • private class RetryingBlockFetchListener implements BlockFetchingListener {
  • // Callback for a successful fetch
  • @Override
  • public void onBlockFetchSuccess(String blockId, ManagedBuffer data) {
  • // We will only forward this success message to our parent listener if this block request is
  • // outstanding and we are still the active listener.
  • // Whether the successfully fetched data should be forwarded to listener
  • boolean shouldForwardSuccess = false;
  • synchronized (RetryingBlockFetcher.this) {
  • /**
  • * 1. Check whether the listener has been replaced: every retry resets currentListener to a new
  • * RetryingBlockFetchListener, so if currentListener differs from this object, this fetch is stale.
  • * 2. Check whether the fetched block is still wanted: outstandingBlocksIds removes a block's
  • * BlockId after each successful fetch to prevent duplicates, so if the BlockId is not in
  • * outstandingBlocksIds, this is a duplicate fetch.
  • */
  • if (this == currentListener && outstandingBlocksIds.contains(blockId)) {
  • // Conditions met: first remove this block's BlockId from outstandingBlocksIds
  • outstandingBlocksIds.remove(blockId);
  • // Mark that this block's data can be forwarded to listener
  • shouldForwardSuccess = true;
  • }
  • }
  • // Now actually invoke the parent listener, outside of the synchronized block.
  • if (shouldForwardSuccess) {
  • // Forward the fetched data to listener
  • listener.onBlockFetchSuccess(blockId, data);
  • }
  • }
  • @Override
  • public void onBlockFetchFailure(String blockId, Throwable exception) {
  • // We will only forward this failure to our parent listener if this block request is
  • // outstanding, we are still the active listener, AND we cannot retry the fetch.
  • // Whether the exception should be forwarded
  • boolean shouldForwardFailure = false;
  • synchronized (RetryingBlockFetcher.this) {
  • // The same check as in onBlockFetchSuccess() above
  • if (this == currentListener && outstandingBlocksIds.contains(blockId)) {
  • // Decide whether to retry: only on an IOException and only while retries remain
  • if (shouldRetry(exception)) {
  • // Prepare a retry
  • initiateRetry();
  • } else {
  • logger.error(String.format("Failed to fetch block %s, and will not retry (%s retries)",
  • blockId, retryCount), exception);
  • // No retries left, or a different kind of exception: remove this block's BlockId from outstandingBlocksIds
  • outstandingBlocksIds.remove(blockId);
  • // Mark that the exception can be forwarded to listener
  • shouldForwardFailure = true;
  • }
  • }
  • }
  • // Now actually invoke the parent listener, outside of the synchronized block.
  • if (shouldForwardFailure) {
  • // Forward the exception to listener
  • listener.onBlockFetchFailure(blockId, exception);
  • }
  • }
  • }

The implementation of RetryingBlockFetchListener is clearly more complex, because of the added retry mechanism and the need for precise control over each block's data and any exception that may occur. Readers can study the source in detail.

With this understanding of the listener, let's look at how RetryingBlockFetcher works. NettyBlockTransferService's fetchBlocks(...) method calls start() on the RetryingBlockFetcher instance to kick off the fetch; start() does only one thing, namely calling fetchAllOutstanding(), whose source is:

org.apache.spark.network.shuffle.RetryingBlockFetcher#fetchAllOutstanding
  • /**
  • * Fires off a request to fetch all blocks that have not been fetched successfully or permanently
  • * failed (i.e., by a non-IOException).
  • */
  • private void fetchAllOutstanding() {
  • // Start by retrieving our shared state within a synchronized block.
  • // BlockIds of the blocks that still need to be fetched
  • String[] blockIdsToFetch;
  • // Number of retries so far
  • int numRetries;
  • RetryingBlockFetchListener myListener;
  • synchronized (this) {
  • // Record the BlockIds of the blocks still to be fetched
  • blockIdsToFetch = outstandingBlocksIds.toArray(new String[outstandingBlocksIds.size()]);
  • // Record the retry count
  • numRetries = retryCount;
  • // Record the current listener
  • myListener = currentListener;
  • }
  • // Now initiate the fetch on all outstanding blocks, possibly initiating a retry if that fails.
  • try {
  • // This calls createAndStart() on the RetryingBlockFetcher.BlockFetchStarter, which creates and starts a OneForOneBlockFetcher
  • fetchStarter.createAndStart(blockIdsToFetch, myListener);
  • } catch (Exception e) {
  • logger.error(String.format("Exception while beginning fetch of %s outstanding blocks %s",
  • blockIdsToFetch.length, numRetries > 0 ? "(after " + numRetries + " retries)" : ""), e);
  • // An exception occurred; check whether we can still retry
  • if (shouldRetry(e)) { // retries remain
  • // Retry again: a new task running fetchAllOutstanding() is submitted to the thread pool
  • initiateRetry();
  • } else {
  • /**
  • * No retries left: notify listener of the exception.
  • * Note that bid here iterates over the BlockIds of the blocks that have not been fetched yet.
  • */
  • for (String bid : blockIdsToFetch) {
  • listener.onBlockFetchFailure(bid, e);
  • }
  • }
  • }
  • }

As shown, fetchAllOutstanding() still uses the BlockFetchStarter (which internally uses the OneForOneBlockFetcher discussed earlier) to fetch the data. When an exception occurs during fetching, shouldRetry(...) decides whether another retry is possible; its implementation is simple, returning true only when an IOException occurred and there are retries remaining. If a retry is possible, initiateRetry() starts a new retry; otherwise the BlockIds of the failed blocks and the exception are passed back to the listener and the process ends.
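
Based on the description above, shouldRetry(...) boils down to two checks. The following is a hedged reconstruction in Scala, not the verbatim source (the real method is Java; treating an IOException wrapped as the cause of another exception as retryable is an assumption worth double-checking against the source):

  • import java.io.IOException
  • // retryCount and maxRetries stand in for the RetryingBlockFetcher fields shown earlier
  • var retryCount = 0
  • val maxRetries = 3
  • // Sketch: retry only for IOExceptions (directly or as the cause) while retries remain
  • def shouldRetry(e: Throwable): Boolean = {
  •   val isIOException = e.isInstanceOf[IOException] ||
  •     (e.getCause != null && e.getCause.isInstanceOf[IOException])
  •   val hasRemainingRetries = retryCount < maxRetries
  •   isIOException && hasRemainingRetries
  • }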

initiateRetry() starts a new retry: it mainly maintains the retry count, creates a new RetryingBlockFetchListener, and then submits a new task to the executorService thread pool to fetch again. The source is straightforward:

org.apache.spark.network.shuffle.RetryingBlockFetcher#initiateRetry
  • /**
  • * Lightweight method which initiates a retry in a different thread. The retry will involve
  • * calling fetchAllOutstanding() after a configured wait time.
  • *
  • * Initiates a new retry
  • */
  • private synchronized void initiateRetry() {
  • // Increment the retry count
  • retryCount += 1;
  • // Create a new RetryingBlockFetchListener
  • currentListener = new RetryingBlockFetchListener();
  • logger.info("Retrying fetch ({}/{}) for {} outstanding blocks after {} ms",
  • retryCount, maxRetries, outstandingBlocksIds.size(), retryWaitTime);
  • // Submit a task to the thread pool whose job is to run fetchAllOutstanding()
  • executorService.submit(new Runnable() {
  • @Override
  • public void run() {
  • Uninterruptibles.sleepUninterruptibly(retryWaitTime, TimeUnit.MILLISECONDS);
  • // Call fetchAllOutstanding() to try fetching again
  • fetchAllOutstanding();
  • }
  • });
  • }

At this point the fetch path should be clear; next we analyze how data is uploaded.

1.1.2. Uploading Data

Uploading data is handled by NettyBlockTransferService's uploadBlock(...). Its implementation is much simpler: it just sends an RpcRequest with the TransportClient:

org.apache.spark.network.netty.NettyBlockTransferService#uploadBlock
  • /**
  • * Uploads a block
  • * @param hostname host address of the destination node
  • * @param port port of the destination node
  • * @param execId Executor ID on the destination node
  • * @param blockId BlockId of the block being uploaded
  • * @param blockData data of the block being uploaded
  • * @param level storage level of the block being uploaded
  • * @return
  • */
  • override def uploadBlock(
  • hostname: String,
  • port: Int,
  • execId: String,
  • blockId: BlockId,
  • blockData: ManagedBuffer,
  • level: StorageLevel,
  • classTag: ClassTag[_]): Future[Unit] = {
  • // Create an empty Promise; the caller will hold this Promise's Future
  • val result = Promise[Unit]()
  • // Create a TransportClient
  • val client = clientFactory.createClient(hostname, port)
  • // StorageLevel and ClassTag are serialized as bytes using our JavaSerializer.
  • // Everything else is encoded using our binary protocol.
  • // Serialize the metadata, i.e. the StorageLevel and the ClassTag
  • val metadata = JavaUtils.bufferToArray(serializer.newInstance().serialize((level, classTag)))
  • // Convert or copy nio buffer into array in order to serialize it.
  • // Call nioByteBuffer on the ManagedBuffer (actually a NettyManagedBuffer) to convert or copy the block data into an NIO ByteBuffer.
  • val array = JavaUtils.bufferToArray(blockData.nioByteBuffer())
  • /**
  • * Send the UploadBlock RPC message with TransportClient's sendRpc method, where:
  • * - appId is the application identifier
  • * - execId is the identifier of the destination Executor
  • *
  • * RpcResponseCallback is an anonymous implementation.
  • * On success its onSuccess method is called, which calls the Promise's success method;
  • * on failure its onFailure method is called, which calls the Promise's failure method.
  • */
  • client.sendRpc(new UploadBlock(appId, execId, blockId.toString, metadata, array).toByteBuffer,
  • new RpcResponseCallback {
  • // Upload succeeded
  • override def onSuccess(response: ByteBuffer): Unit = {
  • logTrace(s"Successfully uploaded block $blockId")
  • // Complete the Promise
  • result.success((): Unit)
  • }
  • // Upload failed
  • override def onFailure(e: Throwable): Unit = {
  • logError(s"Error while uploading block $blockId", e)
  • // Fail the Promise
  • result.failure(e)
  • }
  • })
  • result.future
  • }

The comments in the source above already explain the upload path quite clearly; readers can go through it on their own.
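
For completeness, here is a hypothetical caller-side sketch that uploads a small in-memory block and waits for the returned Future; the host, port, executor ID, block ID and payload are all made-up values, and blockTransferService again stands for the instance held by BlockManager:

  • import java.nio.ByteBuffer
  • import scala.concurrent.Await
  • import scala.concurrent.duration._
  • import scala.reflect.classTag
  • import org.apache.spark.network.buffer.NioManagedBuffer
  • import org.apache.spark.storage.{BlockId, StorageLevel}
  • // Wrap a few bytes in a ManagedBuffer and upload them as a test block
  • val payload = new NioManagedBuffer(ByteBuffer.wrap(Array[Byte](1, 2, 3)))
  • val uploadFuture = blockTransferService.uploadBlock(
  •   "host-1", 7090, "exec-3",
  •   BlockId("test_my-block"), payload,
  •   StorageLevel.MEMORY_AND_DISK, classTag[Array[Byte]])
  • Await.result(uploadFuture, 30.seconds) // block until the remote BlockManager acknowledges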

1.2. Acting as the Data Transfer Server

Now that we know how block-fetch requests are sent, analyzing the server side becomes much easier.

BlockManager's initialize(...) method is what puts the BlockManager to work; in it, blockTransferService.init(this) is called to initialize the NettyBlockTransferService it holds. The snippet is:

Note: BlockManager's initialize(...) method will be covered later; it can be ignored for now.

org.apache.spark.storage.BlockManager#initialize
  • def initialize(appId: String): Unit = {
  • // Initialize the BlockTransferService
  • blockTransferService.init(this)
  • ...
  • }

So we need to look at NettyBlockTransferService's init(BlockDataManager) method:

org.apache.spark.network.netty.NettyBlockTransferService#init
  • // Initialization method
  • override def init(blockDataManager: BlockDataManager): Unit = {
  • /**
  • * Create the NettyBlockRpcServer. NettyBlockRpcServer extends RpcHandler;
  • * the server delegates the handling of clients' block read/write requests to the RpcHandler implementation,
  • * so NettyBlockRpcServer will handle the RPC requests for blocks.
  • */
  • val rpcHandler = new NettyBlockRpcServer(conf.getAppId, serializer, blockDataManager)
  • // Prepare the server-side and client-side bootstraps
  • var serverBootstrap: Option[TransportServerBootstrap] = None
  • var clientBootstrap: Option[TransportClientBootstrap] = None
  • // If authentication is enabled, add bootstraps that support authentication on both the client and the server
  • if (authEnabled) {
  • serverBootstrap = Some(new SaslServerBootstrap(transportConf, securityManager))
  • clientBootstrap = Some(new SaslClientBootstrap(transportConf, conf.getAppId, securityManager,
  • securityManager.isSaslEncryptionEnabled()))
  • }
  • // Create the TransportContext
  • transportContext = new TransportContext(transportConf, rpcHandler)
  • // Create the TransportClientFactory
  • clientFactory = transportContext.createClientFactory(clientBootstrap.toSeq.asJava)
  • // Create the TransportServer
  • server = createServer(serverBootstrap.toList)
  • // Get the ID of the current application
  • appId = conf.getAppId
  • logInfo(s"Server created on ${hostName}:${server.getPort}")
  • }

What this method does should be clear: it creates the Netty server that serves requests and the factory for creating TransportClients. Creating the server is the job of createServer(...):

org.apache.spark.network.netty.NettyBlockTransferService#createServer
  • /** Creates and binds the TransportServer, possibly trying multiple ports.
  • * Creates the TransportServer
  • **/
  • private def createServer(bootstraps: List[TransportServerBootstrap]): TransportServer = {
  • // Define the function that starts the TransportServer
  • def startService(port: Int): (TransportServer, Int) = {
  • // Create the server with the TransportContext and bind it to the given address and port
  • val server = transportContext.createServer(bindAddress, port, bootstraps.asJava)
  • // Return the TransportServer and the bound port
  • (server, server.getPort)
  • }
  • // This call actually creates and starts the TransportServer, retrying on successive ports if a port is occupied
  • Utils.startServiceOnPort(_port, startService, conf, getClass.getName)._1
  • }

The implementation of this method is very similar to how the server is created in the NettyRpcEnvFactory factory.

Starting the TransportServer is simple. What deserves more attention is which RpcHandler the init(BlockDataManager) method provides to handle RPC communication, and which StreamManager it provides to handle stream requests; this is already clear from init(BlockDataManager): NettyBlockRpcServer and OneForOneStreamManager.

NettyBlockRpcServer is the RpcHandler used by the server side of NettyBlockTransferService to handle RPC requests, and OneForOneStreamManager, provided through NettyBlockRpcServer's getStreamManager() method, is the StreamManager that handles stream requests. These two components are covered next.

1.2.1. NettyBlockRpcServer

NettyBlockRpcServer extends RpcHandler and is defined as follows:

org.apache.spark.network.netty.NettyBlockRpcServer
  • /**
  • * Serves requests to open blocks by simply registering one chunk per block requested.
  • * Handles opening and uploading arbitrary BlockManager blocks.
  • *
  • * Opened blocks are registered with the "one-for-one" strategy, meaning each Transport-layer Chunk
  • * is equivalent to one Spark-level shuffle block.
  • *
  • * @param appId the Application ID of the current application
  • * @param serializer the serializer
  • * @param blockManager the BlockManager (as a BlockDataManager) this server belongs to
  • */
  • class NettyBlockRpcServer(
  • appId: String,
  • serializer: Serializer,
  • blockManager: BlockDataManager)
  • extends RpcHandler with Logging {
  • ...
  • }

Its definition is simple; it merely records the environment it works in. As an RpcHandler, NettyBlockRpcServer implements the required receive(...) method to handle RPC messages. Here is the overall skeleton of that method:

org.apache.spark.network.netty.NettyBlockRpcServer#receive
  • override def receive(
  • client: TransportClient,
  • rpcMessage: ByteBuffer,
  • responseContext: RpcResponseCallback): Unit = {
  • // Fetch and decode the message
  • val message = BlockTransferMessage.Decoder.fromByteBuffer(rpcMessage)
  • logTrace(s"Received request: $message")
  • // Dispatch on the message type
  • message match {
  • case openBlocks: OpenBlocks => // Message that opens blocks
  • ...
  • case uploadBlock: UploadBlock => // Message that uploads a block
  • ...
  • }
  • }

From this skeleton, it can handle two kinds of messages, OpenBlocks and UploadBlock, which are described below.

1.2.2. Handling the OpenBlocks Message

The code in receive(...) that handles the OpenBlocks message is:

org.apache.spark.network.netty.NettyBlockRpcServer#receive
  • ...
  • case openBlocks: OpenBlocks => // Message that opens blocks
  • // Take the BlockId array carried by the message, get the ManagedBuffer of each BlockId's block, and wrap them in a sequence
  • val blocks: Seq[ManagedBuffer] =
  • openBlocks.blockIds.map(BlockId.apply).map(blockManager.getBlockData)
  • // Register the sequence of the blocks' ManagedBuffers into the streams cache via the OneForOneStreamManager
  • val streamId = streamManager.registerStream(appId, blocks.iterator.asJava)
  • logTrace(s"Registered streamId $streamId with ${blocks.size} buffers")
  • // Respond with a StreamHandle message containing the stream ID and the size of the ManagedBuffer sequence
  • responseContext.onSuccess(new StreamHandle(streamId, blocks.size).toByteBuffer)
  • ...

The OpenBlocks message asks for the data of the specified blocks. To handle it, the BlockId collection is first parsed out of the message, and then fetching the actual block data is delegated to the getBlockData(...) method of the BlockManager this NettyBlockRpcServer belongs to, which returns a ManagedBuffer for each BlockId. Once the sequence of ManagedBuffers for all BlockIds has been obtained, it is registered with the StreamManager, here the OneForOneStreamManager, and a StreamHandle message containing the stream ID and the size of the ManagedBuffer sequence is returned.

Note: BlockManager's getBlockData(...) method will be described in detail in the storage layer of the storage system.

The client that sent the OpenBlocks message parses the StreamHandle in the response, and then uses the stream ID and the chunk count to fetch the data from the OneForOneStreamManager chunk by chunk.
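
To illustrate the server side of this exchange, here is a hedged, self-contained sketch using in-memory buffers (the appId and the byte contents are made up): it registers two ManagedBuffers as one stream, which is essentially what the OpenBlocks handler above does with real block data, and then reads the first chunk back the way the transport layer serves a ChunkFetchRequest. That getChunk may be called in isolation like this is an assumption made purely for illustration:

  • import java.nio.ByteBuffer
  • import scala.collection.JavaConverters._
  • import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
  • import org.apache.spark.network.server.OneForOneStreamManager
  • // Register two in-memory buffers as a single stream
  • val streamManager = new OneForOneStreamManager()
  • val blocks: Seq[ManagedBuffer] = Seq(
  •   new NioManagedBuffer(ByteBuffer.wrap(Array[Byte](1, 2))),
  •   new NioManagedBuffer(ByteBuffer.wrap(Array[Byte](3, 4))))
  • val streamId = streamManager.registerStream("app-1", blocks.iterator.asJava)
  • // Chunks must be read in order; chunk 0 corresponds to the first BlockId
  • val firstChunk = streamManager.getChunk(streamId, 0)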

The implementation of StreamManager and OneForOneStreamManager, as well as how the client handles the chunk data, was covered in detail in Sections 1.4.1.7 and 1.4.1.7.1 of the article Spark源码分析04 - 通信架构02:传输层原理(2)消息处理, so it is not repeated here.

1.2.3. Handling the UploadBlock Message

The code in NettyBlockRpcServer that handles the UploadBlock message is:

org.apache.spark.network.netty.NettyBlockRpcServer#receive
  • case uploadBlock: UploadBlock => // Message that uploads a block
  • // StorageLevel and ClassTag are serialized as bytes using our JavaSerializer.
  • // Deserialize the metadata carried by the message to get the storage level and the class tag
  • val (level: StorageLevel, classTag: ClassTag[_]) = {
  • serializer
  • .newInstance()
  • .deserialize(ByteBuffer.wrap(uploadBlock.metadata))
  • .asInstanceOf[(StorageLevel, ClassTag[_])]
  • }
  • // Wrap the block data carried by the UploadBlock message (i.e. blockData) in a NioManagedBuffer.
  • val data = new NioManagedBuffer(ByteBuffer.wrap(uploadBlock.blockData))
  • // Get the BlockId carried by the UploadBlock message.
  • val blockId = BlockId(uploadBlock.blockId)
  • // Call BlockManager's putBlockData method to store the block into the local storage system.
  • blockManager.putBlockData(blockId, data, level, classTag)
  • // Reply to the client through the response context.
  • responseContext.onSuccess(ByteBuffer.allocate(0))

Handling UploadBlock is simpler than handling OpenBlocks: NettyBlockRpcServer reads the data and the corresponding metadata the client sent in the UploadBlock message, wraps the data in a NioManagedBuffer, and writes it into the storage system through the putBlockData(...) method of the BlockManager it belongs to; if the write succeeds, an empty ByteBuffer is returned directly to the client.

Note: BlockManager's putBlockData(...) method will be described in detail in the storage layer of the storage system.

2. Initialization of BlockManager

With the message and data communication mechanisms above in mind, let's round things out with the initialization flow of BlockManager.

When BlockManager is instantiated, it internally creates many related components, most of which serve the storage layer and will be covered later. A BlockManager is only initialized when its initialize(...) method is called, as in the initialization in Executor:

org.apache.spark.executor.Executor
  • // Only initialized when not running in local mode
  • if (!isLocal) {
  • ...
  • // Initialize the BlockManager
  • env.blockManager.initialize(conf.getAppId)
  • }

The Driver, on the other hand, initializes the BlockManager inside SparkContext while the SparkContext is being constructed:

org.apache.spark.SparkContext
  • // Initialize the block manager
  • _env.blockManager.initialize(_applicationId)

BlockManager's initialize(...) method does several things. Its source is:

org.apache.spark.storage.BlockManager#initialize
  • /**
  • * Initializes the BlockManager with the given appId. This is not performed in the constructor as
  • * the appId may not be known at BlockManager instantiation time (in particular for the driver,
  • * where it is only learned after registration with the TaskScheduler).
  • *
  • * This method initializes the BlockTransferService and ShuffleClient, registers with the
  • * BlockManagerMaster, starts the BlockManagerWorker endpoint, and registers with a local shuffle
  • * service if configured.
  • *
  • * Initialization method. The BlockManager only becomes functional after this method is called.
  • */
  • def initialize(appId: String): Unit = {
  • // Initialize the BlockTransferService
  • blockTransferService.init(this)
  • // Initialize the Shuffle client
  • shuffleClient.init(appId)
  • // Set the block replication policy
  • blockReplicationPolicy = {
  • // Defaults to RandomBlockReplicationPolicy
  • val priorityClass = conf.get(
  • "spark.storage.replication.policy", classOf[RandomBlockReplicationPolicy].getName)
  • val clazz = Utils.classForName(priorityClass)
  • val ret = clazz.newInstance.asInstanceOf[BlockReplicationPolicy]
  • logInfo(s"Using $priorityClass for block replication policy")
  • ret
  • }
  • /**
  • * Generate the BlockManagerId of the current BlockManager.
  • * The BlockManagerId created here is only a reference for BlockManagerMaster when registering this BlockManager;
  • * BlockManagerMaster will create a new BlockManagerId enriched with topology information as the official identity assigned to this BlockManager.
  • */
  • val id =
  • BlockManagerId(executorId, blockTransferService.hostName, blockTransferService.port, None)
  • /**
  • * Register the current BlockManager with BlockManagerMaster, passing:
  • * 1. the BlockManagerId;
  • * 2. the maximum memory managed by this BlockManager;
  • * 3. this BlockManager's BlockManagerSlaveEndpoint
  • */
  • val idFromMaster = master.registerBlockManager(
  • id,
  • maxMemory,
  • slaveEndpoint)
  • // Reset blockManagerId with the ID returned from the registration
  • blockManagerId = if (idFromMaster != null) idFromMaster else id
  • /**
  • * Generate the shuffleServerId.
  • * When the external Shuffle service is enabled (spark.shuffle.service.enabled, false by default),
  • * a new BlockManagerId is created as the shuffleServerId;
  • * otherwise it is the BlockManager's own BlockManagerId.
  • */
  • shuffleServerId = if (externalShuffleServiceEnabled) {
  • logInfo(s"external shuffle service port = $externalShuffleServicePort")
  • BlockManagerId(executorId, blockTransferService.hostName, externalShuffleServicePort)
  • } else {
  • blockManagerId
  • }
  • // Register Executors' configuration with the local shuffle service, if one should exist.
  • // When the external Shuffle service is enabled and this BlockManager is not on the Driver, register with the external Shuffle service.
  • if (externalShuffleServiceEnabled && !blockManagerId.isDriver) {
  • registerWithExternalShuffleServer()
  • }
  • logInfo(s"Initialized BlockManager: $blockManagerId")
  • }

Among these steps, the current BlockManager is registered with the BlockManagerMaster; this registration passes the BlockManager's BlockManagerId, the maximum memory size maxMemory, and, most importantly, slaveEndpoint, the RpcEndpointRef of the BlockManagerSlaveEndpoint.

slaveEndpoint is obtained by registering the current BlockManager's BlockManagerSlaveEndpoint with the RpcEnv; this code is defined in BlockManager:

org.apache.spark.storage.BlockManager
  • // The BlockManagerSlaveEndpoint of this BlockManager
  • private val slaveEndpoint = rpcEnv.setupEndpoint(
  • "BlockManagerEndpoint" + BlockManager.ID_GENERATOR.next,
  • new BlockManagerSlaveEndpoint(rpcEnv, this, mapOutputTracker))

As we know, when an RpcEndpoint is registered with a NettyRpcEnv, it is registered with the corresponding Dispatcher, which creates an EndpointData and an Inbox for it. The Inbox then posts an OnStart message to itself, and handling that message calls the RpcEndpoint's onStart() method; BlockManagerSlaveEndpoint, however, does not implement onStart().

BlockManagerMaster's registerBlockManager(...), on the other hand, registers the current BlockManager with the BlockManagerMasterEndpoint. Its source is:

org.apache.spark.storage.BlockManagerMaster#registerBlockManager
  • /**
  • * Register the BlockManager's id with the driver. The input BlockManagerId does not contain
  • * topology information. This information is obtained from the master and we respond with an
  • * updated BlockManagerId fleshed out with this information.
  • *
  • * Registers the BlockManager
  • *
  • * @param blockManagerId unique identifier of the BlockManager
  • * @param maxMemSize maximum memory managed by the BlockManager (in bytes)
  • * @param slaveEndpoint the BlockManagerSlaveEndpoint on the BlockManager,
  • * mainly used by BlockManagerMasterEndpoint to communicate with this BlockManager
  • * @return the official BlockManagerId assigned by BlockManagerMasterEndpoint
  • */
  • def registerBlockManager(
  • blockManagerId: BlockManagerId,
  • maxMemSize: Long,
  • slaveEndpoint: RpcEndpointRef): BlockManagerId = {
  • logInfo(s"Registering BlockManager $blockManagerId")
  • /**
  • * Send a RegisterBlockManager message to BlockManagerMasterEndpoint.
  • * RegisterBlockManager carries the blockManagerId of the BlockManager to register,
  • * the maximum memory size and the BlockManagerSlaveEndpoint
  • */
  • val updatedId = driverEndpoint.askWithRetry[BlockManagerId](
  • RegisterBlockManager(blockManagerId, maxMemSize, slaveEndpoint))
  • logInfo(s"Registered BlockManager $updatedId")
  • updatedId
  • }

As shown, registerBlockManager(...) sends a RegisterBlockManager message to BlockManagerMasterEndpoint, carrying the BlockManager's BlockManagerId, the maximum memory size and the RpcEndpointRef of the BlockManagerSlaveEndpoint (how BlockManagerMasterEndpoint handles the RegisterBlockManager message was reviewed in Section 3.2). This way, BlockManagerMasterEndpoint knows that a new BlockManager has joined and, since it now holds the RpcEndpointRef of that BlockManager's BlockManagerSlaveEndpoint, it can communicate with it directly from then on.

3. Summary

Looking back at the communication layer of the whole storage system, we can summarize as follows:

  1. The Driver owns a BlockManagerMasterEndpoint that manages the information of all BlockManagers, while each Executor holds an RpcEndpointRef to that BlockManagerMasterEndpoint.
  2. Every Executor has its own BlockManager, BlockManagerMaster and BlockManagerSlaveEndpoint.
  3. An Executor's BlockManagerMaster communicates with the Driver's BlockManagerMasterEndpoint through the BlockManagerMasterEndpoint's RpcEndpointRef.
  4. In its initialize(...) method, a BlockManager registers its BlockManagerSlaveEndpoint through its own BlockManagerMaster; the BlockManagerMaster sends a RegisterBlockManager message to BlockManagerMasterEndpoint to register the BlockManager it belongs to, handing over the RpcEndpointRef of the BlockManagerSlaveEndpoint to BlockManagerMasterEndpoint.
  5. Once BlockManagerMasterEndpoint holds the RpcEndpointRef of a BlockManager's BlockManagerSlaveEndpoint, it can communicate with that BlockManager.
  6. Data transfer is implemented by the NettyBlockTransferService component, which can act as the server side and, when no external Shuffle service is specified, also as the ShuffleClient; NettyBlockRpcServer acts as the TransportServer's RpcHandler. The transfer is negotiated between TransportClient and TransportServer through RpcRequest and ChunkFetchRequest requests, while streaming the actual data is handled by OneForOneStreamManager.

Below is the sequence diagram for building the communication layer of the storage system on the Driver:

1.Driver的存储体系初始化流程.png

Below is the sequence diagram for building the communication layer of the storage system on an Executor:

2.Executor的存储体系初始化流程.png