大数据
流式处理
Spark
源码解析

Spark源码分析04 - 通信架构02:传输层原理(2)消息处理

简介:主要讲解TransportServer对四类消息的处理流程

1. 出入站处理器

在前面讲解的内容中,我们知道,TransportServer和TransportClient底层分别使用了Netty提供的ServerBootstrap和Bootstrap来构建通信架构,二者有一个相同点,即使用的处理器链都是一样的。对于这个设计在前面已经简单探讨过它的考虑。但是我们需要考虑的问题在于,服务端和客户端收到的消息完全一样吗?对各类消息的处理是一致的吗?如果存在差异,如何使用相同的处理器链完成不同的处理?这也就是我们下面将要探讨的内容。

回顾前面的内容,处理器链一共添加了下列五个处理器:

  • 消息编解码器MessageEncoder和MessageDecoder。
  • 通过NettyUtils的createFrameDecoder(...)方法创建的TransportFrameDecoder帧解码器。
  • Netty自带的心跳检测处理器IdleStateHandler,只监听了读写空闲,默认超时时长为120秒。
  • 通过TransportContext的createChannelHandler(...)方法创建的TransportChannelHandler处理器。

我们首先要明确下面的几个要点:

  1. 客户端和服务端的出站数据都会依次经过IdleStateHandler处理器和MessageEncoder编码器。
  2. 客户端和服务端的入站数据都会依次经过TransportFrameDecoder帧解码器、MessageDecoder解码器、IdleStateHandler处理器和TransportChannelHandler处理器。

对于Netty自带的心跳检测处理器IdleStateHandler这里不过多介绍,后面将分别分析其余的四种处理器。

1.1. MessageEncoder编码器

MessageEncoder继承自Netty的MessageToMessageEncoder,它是一个出站处理器,用于客户端和服务端在发送数据时对数据进行编码,继承体系如下:

  • ChannelOutboundHandler
  • ↳ ChannelOutboundHandlerAdapter
  • ↳ MessageToMessageEncoder
  • ↳ MessageEncoder

MessageEncoder的定义如下:

  • /**
  • * Encoder used by the server side to encode server-to-client responses.
  • * This encoder is stateless so it is safe to be shared by multiple threads.
  • *
  • * 在将消息放入管道前,先对消息内容进行编码,防止管道另一端读取时丢包和解析错误。
  • */
  • @ChannelHandler.Sharable
  • public final class MessageEncoder extends MessageToMessageEncoder<Message> {
  • ...
  • }

可见,传入MessageEncoder的数据是Message对象,我们主要关注其encode(...)方法的实现,源码如下:

org.apache.spark.network.protocol.MessageEncoder#encode
  • /***
  • * Encodes a Message by invoking its encode() method. For non-data messages, we will add one
  • * ByteBuf to 'out' containing the total frame length, the message type, and the message itself.
  • * In the case of a ChunkFetchSuccess, we will also add the ManagedBuffer corresponding to the
  • * data to 'out', in order to enable zero-copy transfer.
  • */
  • @Override
  • public void encode(ChannelHandlerContext ctx, Message in, List<Object> out) throws Exception {
  • // 用于存放消息体
  • Object body = null;
  • // 用于记录消息体长度
  • long bodyLength = 0;
  • // 用于记录消息体是否包含在消息的同一帧中
  • boolean isBodyInFrame = false;
  • // If the message has a body, take it out to enable zero-copy transfer for the payload.
  • if (in.body() != null) { // 在消息体不为null时
  • try {
  • // 读消息体大小
  • bodyLength = in.body().size();
  • // 读消息体,返回值为io.netty.buffer.ByteBuf和io.netty.channel.FileRegion其中一种。
  • body = in.body().convertToNetty();
  • // 读消息体是否包含在消息的同一帧中的标记
  • isBodyInFrame = in.isBodyInFrame();
  • } catch (Exception e) { // 遇见异常
  • // 释放消息体
  • in.body().release();
  • if (in instanceof AbstractResponseMessage) {
  • AbstractResponseMessage resp = (AbstractResponseMessage) in;
  • // Re-encode this message as a failure response.
  • String error = e.getMessage() != null ? e.getMessage() : "null";
  • logger.error(String.format("Error processing %s for client %s",
  • in, ctx.channel().remoteAddress()), e);
  • encode(ctx, resp.createFailureResponse(error), out);
  • } else {
  • throw e;
  • }
  • return;
  • }
  • }
  • // 读取消息类型
  • Message.Type msgType = in.type();
  • // All messages have the frame length, message type, and message itself. The frame length
  • // may optionally include the length of the body data, depending on what message is being
  • // sent.
  • /**
  • * 计算消息头长度:表示帧大小的8字节 + 消息类型编码后的长度 + 消息编码后的长度
  • */
  • int headerLength = 8 + msgType.encodedLength() + in.encodedLength();
  • // 计算帧大小
  • long frameLength = headerLength + (isBodyInFrame ? bodyLength : 0);
  • // 存放消息头的ByteBuf
  • ByteBuf header = ctx.alloc().heapBuffer(headerLength);
  • // 写入帧大小
  • header.writeLong(frameLength);
  • // 写入消息类型
  • msgType.encode(header);
  • /**
  • * 写入消息体相关信息,这个方法在每种消息的实现是不一样的。例如:
  • * OneWayMessage只写入了消息体大小,
  • * RpcRequest消息写入了Request ID和消息体大小,
  • * StreamRequest消息写入了Stream ID。
  • */
  • in.encode(header);
  • // 检查消息头是否合法
  • assert header.writableBytes() == 0;
  • if (body != null) {
  • // We transfer ownership of the reference on in.body() to MessageWithHeader.
  • // This reference will be freed when MessageWithHeader.deallocate() is called.
  • // 消息体不为空,构建一个MessageWithHeader对象,保存了消息体、消息头、消息体大小、消息头大小
  • out.add(new MessageWithHeader(in.body(), header, body, bodyLength));
  • } else {
  • // 消息体为空,只保存消息头
  • out.add(header);
  • }
  • }

encode(...)方法内部实现比较简单,就是将Message中的消息数据进行解析,构造消息头和消息体;最终会根据是否有消息体决定是返回MessageWithHeader对象还是只包含消息头的ByteBuf对象。从上述对消息的编码过程,我们可以推测出消息内部的结构,如下:

1.Message编码后的抽象结构.png

1.1.1. Message消息体系

Spark将消息分为了两个大类:RequestMessage和ResponseMessage,和一个用于SASL验证的SaslMessage。Message接口是消息体系的顶层接口,它的源码比较简单:

org.apache.spark.network.protocol.Message
  • /** An on-the-wire transmittable message. */
  • public interface Message extends Encodable {
  • /**
  • * Used to identify this request type.
  • * 返回消息类型
  • **/
  • Type type();
  • /**
  • * An optional body for the message.
  • * 返回消息中可选的内容体
  • **/
  • ManagedBuffer body();
  • /**
  • * Whether to include the body of the message in the same frame as the message.
  • * 用于判断消息体是否包含在消息的同一帧中
  • **/
  • boolean isBodyInFrame();
  • /** Preceding every serialized Message is its type, which allows us to deserialize it. */
  • enum Type implements Encodable {
  • // 请求获取流的单个块的序列
  • ChunkFetchRequest(0),
  • ChunkFetchSuccess(1), // 处理ChunkFetchRequest成功返回的消息
  • ChunkFetchFailure(2), // 处理ChunkFetchRequest失败返回的消息
  • // 此消息由远程RPC服务端进行处理,需要服务端向客户端回复的RPC请求信息
  • RpcRequest(3),
  • RpcResponse(4), // 处理RpcRequest成功返回的消息
  • RpcFailure(5), // 处理RpcRequest失败返回的消息
  • // 表示向远程的服务发起请求,以获取流式数据
  • StreamRequest(6),
  • StreamResponse(7), // 处理StreamRequest成功返回的消息
  • StreamFailure(8), // 处理StreamRequest失败返回的消息
  • // 此消息由远程RPC服务端进行处理,但不需要服务端向客户端回复
  • OneWayMessage(9),
  • // 用户自定义类型的消息,是无法被decode的
  • User(-1);
  • private final byte id;
  • Type(int id) {
  • assert id < 128 : "Cannot have more than 128 message types";
  • this.id = (byte) id;
  • }
  • public byte id() { return id; }
  • @Override public int encodedLength() { return 1; }
  • @Override public void encode(ByteBuf buf) { buf.writeByte(id); }
  • public static Type decode(ByteBuf buf) {
  • byte id = buf.readByte();
  • switch (id) {
  • case 0: return ChunkFetchRequest;
  • case 1: return ChunkFetchSuccess;
  • case 2: return ChunkFetchFailure;
  • case 3: return RpcRequest;
  • case 4: return RpcResponse;
  • case 5: return RpcFailure;
  • case 6: return StreamRequest;
  • case 7: return StreamResponse;
  • case 8: return StreamFailure;
  • case 9: return OneWayMessage;
  • case -1: throw new IllegalArgumentException("User type messages cannot be decoded.");
  • default: throw new IllegalArgumentException("Unknown message type: " + id);
  • }
  • }
  • }
  • }

从源码可知,它只规定了用于确定消息类型、获取消息体和判断消息体是否包含在消息的同一帧中三个接口方法。更重要的是它内部定义了Type枚举,规定了消息的类型。源码列出的10种消息,加上没有列入SASLMessage,Spark中存在一共11种消息,它们的继承体系如下:

2.Message消息的继承体系.png

从上述示意图可知,RpcMessage、StreamRequest、ChunkFetchRequest和OneWayMessage都属于RequestMessage,RpcResponse、RpcFailure、StreamResponse、StreamFailure、ChunkFetchSuccess、ChunkFetchFailure则属于ResponseMessage;SaslMessage则不属于任何一类。这种分类其实很好理解,对于RpcMessage请求而言,它需要的回复结果有两种:表示成功的RpcResponse和表示失败的RpcFailure,像StreamRequest和ChunkFetchRequest也是类似的,但OneWayMessage是唯一一种不需要回复的请求消息。

从前面对TransportClient的分析可知,TransportClient会向服务端发送RpcMessage、StreamRequest、ChunkFetchRequest和OneWayMessage;而且与此对应的,服务端向客户端返回的消息正是RpcResponse、RpcFailure、StreamResponse、StreamFailure、ChunkFetchSuccess和ChunkFetchFailure。这也就将服务端和客户端发送的消息类型给区分开了。

1.2. TransportFrameDecoder帧解码器

上一节我们介绍了消息的编码操作,从编码实现可知,消息在出站时会通过MessageEncoder编码为ByteBuf对象,然后以字节形式写出到网络中。在TransportServer收到其他节点发来的消息数据时,则需要对字节形式的数据进行解码,熟悉Netty的读者可能知道,由于TCP协议中的数据是以流的形式进行传输的,因此可能会发生“粘包”问题,而TransportFrameDecoder帧解码器就是为了解决“粘包”问题,对数据进行拆包。

从MessageEncoder实现可知,编码得到的最终数据里,最前面记录了帧的大小,TransportFrameDecoder帧解码器就是通过该记录实现了消息分界。我们先来看看TransportFrameDecoder类的定义和重要的字段:

org.apache.spark.network.util.TransportFrameDecoder
  • /**
  • * A customized frame decoder that allows intercepting raw data.
  • * <p>
  • * This behaves like Netty's frame decoder (with harcoded parameters that match this library's
  • * needs), except it allows an interceptor to be installed to read data directly before it's
  • * framed.
  • * <p>
  • * Unlike Netty's frame decoder, each frame is dispatched to child handlers as soon as it's
  • * decoded, instead of building as many frames as the current buffer allows and dispatching
  • * all of them. This allows a child handler to install an interceptor if needed.
  • * <p>
  • * If an interceptor is installed, framing stops, and data is instead fed directly to the
  • * interceptor. When the interceptor indicates that it doesn't need to read any more data,
  • * framing resumes. Interceptors should not hold references to the data buffers provided
  • * to their handle() method.
  • *
  • * 对从管道中读取的ByteBuf按照数据帧进行解析。
  • */
  • public class TransportFrameDecoder extends ChannelInboundHandlerAdapter {
  • // 处理器名
  • public static final String HANDLER_NAME = "frameDecoder";
  • // 表示帧大小的数据长度
  • private static final int LENGTH_SIZE = 8;
  • // 最大帧大小,为Integer.MAX_VALUE
  • private static final int MAX_FRAME_SIZE = Integer.MAX_VALUE;
  • // 标记字段,用于标记帧大小记录是无效的
  • private static final int UNKNOWN_FRAME_SIZE = -1;
  • // 存储ByteBuf的链表,收到的ByteBuf会存入该链表
  • private final LinkedList<ByteBuf> buffers = new LinkedList<>();
  • // 存放帧大小的ByteBuf
  • private final ByteBuf frameLenBuf = Unpooled.buffer(LENGTH_SIZE, LENGTH_SIZE);
  • // 当次总共可读字节
  • private long totalSize = 0;
  • // 下一个帧的大小
  • private long nextFrameSize = UNKNOWN_FRAME_SIZE;
  • // 拦截器
  • private volatile Interceptor interceptor;
  • ...
  • }

TransportFrameDecoder中最重要的方法是channelRead(...)方法,它实现了对数据的拆包,源码如下:

org.apache.spark.network.util.TransportFrameDecoder#channelRead
  • @Override
  • public void channelRead(ChannelHandlerContext ctx, Object data) throws Exception {
  • // 将传入的数据转换为Netty的ByteBuf
  • ByteBuf in = (ByteBuf) data;
  • // 添加到LinkedList类型的buffers链表中进行记录
  • buffers.add(in);
  • // 增加总共可读取的字节数
  • totalSize += in.readableBytes();
  • // 遍历buffers链表
  • while (!buffers.isEmpty()) {
  • // First, feed the interceptor, and if it's still, active, try again.
  • if (interceptor != null) { // 有拦截器,让拦截器处理,拦截器只会处理一次
  • // 取出链表头的ByteBuf
  • ByteBuf first = buffers.getFirst();
  • // 计算可读字节数
  • int available = first.readableBytes();
  • // 先使用intercepter处理数据
  • if (feedInterceptor(first)) {
  • assert !first.isReadable() : "Interceptor still active but buffer has data.";
  • }
  • // 计算已读字节数
  • int read = available - first.readableBytes();
  • // 如果全部读完,就将该ByteBuf从buffers链表中移除
  • if (read == available) {
  • buffers.removeFirst().release();
  • }
  • // 维护可读字节计数
  • totalSize -= read;
  • } else { // 没有拦截器
  • // Interceptor is not active, so try to decode one frame.
  • // 尝试解码帧数据
  • ByteBuf frame = decodeNext();
  • // 解码出来的帧数据为null,直接跳出循环
  • if (frame == null) {
  • break;
  • }
  • // 能够解码得到数据,传递给下一个Handler
  • ctx.fireChannelRead(frame);
  • }
  • }
  • }

从源码可知,当收到一个ByteBuf,channelRead(...)方法会先将其存入到buffers链表,并将其可读字节数添加到totalSize,然后在buffers链表不为空的情况下,不断取出链表头的ByteBuf进行处理,其实这里buffers链表充当了一个FIFO的队列,保证了ByteBuf的顺序。

取到的ByteBuf会根据是否有拦截器分别进行处理;拦截器会对消息进行拦截,在Spark中实现的可用拦截器只有StreamInterceptor,是用于流传输的场景,后面会讲解;在有拦截器的情况下,仅仅是将在拦截操作中读取的数据丢弃并维护可读字节数记录,并没有做其他的操作;另外需要注意的是,拦截器会在检查通过后被移除,否则会被理由为“Interceptor still active but buffer has data.”的断言终止。

在没有拦截器的情况下,会调用decodeNext()方法进行解码帧数据,该方法才是拆包的关键,源码如下:

org.apache.spark.network.util.TransportFrameDecoder#decodeNext
  • private ByteBuf decodeNext() throws Exception {
  • // 得到帧大小
  • long frameSize = decodeFrameSize();
  • // 检查帧大小的合法性
  • if (frameSize == UNKNOWN_FRAME_SIZE || totalSize < frameSize) {
  • return null;
  • }
  • // Reset size for next frame.
  • nextFrameSize = UNKNOWN_FRAME_SIZE;
  • Preconditions.checkArgument(frameSize < MAX_FRAME_SIZE, "Too large frame: %s", frameSize);
  • Preconditions.checkArgument(frameSize > 0, "Frame length should be positive: %s", frameSize);
  • // If the first buffer holds the entire frame, return it.
  • // 剩余可读帧数
  • int remaining = (int) frameSize;
  • /**
  • * 如果buffers中第一个ByteBuf的可读字节数大于等于可读帧数,
  • * 表示这一个ByteBuf包含了一整个帧的数据,可以一次读到一个帧
  • */
  • if (buffers.getFirst().readableBytes() >= remaining) {
  • // 读取一个帧的数据并返回
  • return nextBufferForFrame(remaining);
  • }
  • // Otherwise, create a composite buffer.
  • // 此时说明一个ByteBuf的数据不够一个帧,构造一个复合ByteBuf
  • CompositeByteBuf frame = buffers.getFirst().alloc().compositeBuffer(Integer.MAX_VALUE);
  • // 当还没读到一个帧的数据时,循环处理
  • while (remaining > 0) {
  • /**
  • * 获取buffers链表头的ByteBuf中remaining长度的数据
  • * 读取过程中如果将链表头的ByteBuf读完了,会将其从buffers中移除
  • */
  • ByteBuf next = nextBufferForFrame(remaining);
  • // 减去读到的数据
  • remaining -= next.readableBytes();
  • // 添加到CompositeByteBuf
  • frame.addComponent(next).writerIndex(frame.writerIndex() + next.readableBytes());
  • }
  • assert remaining == 0;
  • // 终于读完一个帧了,返回复合ByteBuf
  • return frame;
  • }

decodeNext()方法的实现其实也比较简单,它会先读取帧大小记录,如果帧大小记录合法,就根据其表示的帧大小读取相应长度的字节;这里需要注意的是,由于可能一个ByteBuf无法读满一个帧,在这种情况下使用了CompositeByteBuf,将同一个帧的所有ByteBuf存放在其中并返回。

TransportFrameDecoder在对数据进行拆包之后,会将得到的ByteBuf传递给下一个入站处理器,也即是MessageDecoder解码器。

1.3. MessageDecoder解码器

MessageDecoder与MessageEncoder刚好相反,它会将经过TransportFrameDecoder帧解码器解析后得到的ByteBuf类型的数据缓冲对象转换为Message类型的消息数据,定义如下:

org.apache.spark.network.protocol.MessageDecoder
  • /**
  • * Decoder used by the client side to encode server-to-client responses.
  • * This encoder is stateless so it is safe to be shared by multiple threads.
  • *
  • * 对从管道中读取的ByteBuf进行解析,防止丢包和解析错误。
  • */
  • @ChannelHandler.Sharable
  • public final class MessageDecoder extends MessageToMessageDecoder<ByteBuf> {
  • ...
  • }

我们只需要关注它的decode(...)方法即可:

org.apache.spark.network.protocol.MessageDecoder#decode
  • @Override
  • public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
  • // 从ByteBuf中获取消息类型
  • Message.Type msgType = Message.Type.decode(in);
  • // 使用重载的decode()方法进行解码
  • Message decoded = decode(msgType, in);
  • // 检查解码后的消息类型是否正确
  • assert decoded.type() == msgType;
  • logger.trace("Received message {}: {}", msgType, decoded);
  • // 将解码后的消息添加到out中
  • out.add(decoded);
  • }

decode(...)方法其实调用了重载的方法,源码如下:

org.apache.spark.network.protocol.MessageDecoder#decode
  • private Message decode(Message.Type msgType, ByteBuf in) {
  • // 根据消息类型,选择不同类型的消息类,使用它们的静态方法decode(...)进行解码
  • switch (msgType) {
  • case ChunkFetchRequest:
  • return ChunkFetchRequest.decode(in);
  • case ChunkFetchSuccess:
  • return ChunkFetchSuccess.decode(in);
  • case ChunkFetchFailure:
  • return ChunkFetchFailure.decode(in);
  • case RpcRequest:
  • return RpcRequest.decode(in);
  • case RpcResponse:
  • return RpcResponse.decode(in);
  • case RpcFailure:
  • return RpcFailure.decode(in);
  • case OneWayMessage:
  • return OneWayMessage.decode(in);
  • case StreamRequest:
  • return StreamRequest.decode(in);
  • case StreamResponse:
  • return StreamResponse.decode(in);
  • case StreamFailure:
  • return StreamFailure.decode(in);
  • default:
  • throw new IllegalArgumentException("Unexpected message type: " + msgType);
  • }
  • }

可见,MessageDecoder其实将消息的解码委托给了各种Message具体的实现类,具体的实现大家可以参阅对应的源码,这里不多赘述。

1.4. TransportChannelHandler处理器

从前面的代码可知,TransportChannelHandler是通过TransportContext的createChannelHandler(...)方法创建的,源码如下:

org.apache.spark.network.TransportContext#createChannelHandler
  • /**
  • * Creates the server- and client-side handler which is used to handle both RequestMessages and
  • * ResponseMessages. The channel is expected to have been successfully created, though certain
  • * properties (such as the remoteAddress()) may not be available yet.
  • *
  • * TransportChannelHandler在服务端将代理Transport-RequestHandler对请求消息进行处理,
  • * 并在客户端代理TransportResponseHandler对响应消息进行处理。
  • */
  • private TransportChannelHandler createChannelHandler(Channel channel, RpcHandler rpcHandler) {
  • // 创建处理传输响应的处理器
  • TransportResponseHandler responseHandler = new TransportResponseHandler(channel);
  • // 创建TransportClient
  • TransportClient client = new TransportClient(channel, responseHandler);
  • // 创建处理传输请求的处理器
  • TransportRequestHandler requestHandler = new TransportRequestHandler(channel, client, rpcHandler);
  • // 将TransportClient、TransportResponseHandler和TransportRequestHandler绑定在一个TransportChannelHandler上
  • return new TransportChannelHandler(client, responseHandler, requestHandler,
  • conf.connectionTimeoutMs(), closeIdleConnections);
  • }

TransportChannelHandler是继承自SimpleChannelInboundHandler,说明它也是一个进站处理器,定义如下:

org.apache.spark.network.server.TransportChannelHandler
  • /**
  • * The single Transport-level Channel handler which is used for delegating requests to the
  • * {@link TransportRequestHandler} and responses to the {@link TransportResponseHandler}.
  • *
  • * All channels created in the transport layer are bidirectional. When the Client initiates a Netty
  • * Channel with a RequestMessage (which gets handled by the Server's RequestHandler), the Server
  • * will produce a ResponseMessage (handled by the Client's ResponseHandler). However, the Server
  • * also gets a handle on the same Channel, so it may then begin to send RequestMessages to the
  • * Client.
  • * This means that the Client also needs a RequestHandler and the Server needs a ResponseHandler,
  • * for the Client's responses to the Server's requests.
  • *
  • * This class also handles timeouts from a {@link io.netty.handler.timeout.IdleStateHandler}.
  • * We consider a connection timed out if there are outstanding fetch or RPC requests but no traffic
  • * on the channel for at least `requestTimeoutMs`. Note that this is duplex traffic; we will not
  • * timeout if the client is continuously sending but getting no responses, for simplicity.
  • *
  • * 代理由TransportRequestHandler处理的请求和由TransportResponseHandler处理的响应,并加入传输层的处理。
  • * 继承了SimpleChannelInboundHandler,说明是入站处理器
  • */
  • public class TransportChannelHandler extends SimpleChannelInboundHandler<Message> {
  • ...
  • private final TransportClient client;
  • // 代理的响应处理器和请求处理器
  • private final TransportResponseHandler responseHandler;
  • private final TransportRequestHandler requestHandler;
  • // 超时时间(纳秒)
  • private final long requestTimeoutNs;
  • // 是否关闭空闲连接
  • private final boolean closeIdleConnections;
  • ...
  • }

从TransportChannelHandler的创建过程可知,TransportChannelHandler中包装了TransportResponseHandler、TransportRequestHandler和TransportClient三个对象。

这里我们需要思考,为什么向TransportChannelHandler处理器中添加了请求和响应两种处理器?

其实,这里的请求和响应是针对于消息类型而言的,从前面对TransportClient的分析可知,TransportClient会向服务端发送RpcMessage、StreamRequest、ChunkFetchRequest和OneWayMessage;而且与此对应的,服务端向客户端返回的消息正是RpcResponse、RpcFailure、StreamResponse、StreamFailure、ChunkFetchSuccess和ChunkFetchFailure。这也就将服务端和客户端发送的消息类型给区分开了。

这两类消息对于服务端和客户端来说都是入站数据,但是会根据类别委托给TransportRequestHandler和TransportResponseHandler分别处理。这也就解释了,在客户端和服务端使用相同的处理器链时,如何对消息区别处理。

TransportRequestHandler负责处理服务端的入站消息,而构建的TransportClient对象中保存了Channel对象以及TransportResponseHandler,它们将协同工作负责客户端的入站消息。

在TransportChannelHandler重写的各类方法中,几乎都是将响应的操作代理给了TransportRequestHandler和TransportResponseHandler两个处理器,以其中最重要的channelRead0(...)方法为例:

org.apache.spark.network.server.TransportChannelHandler#channelRead0
  • @Override
  • public void channelRead0(ChannelHandlerContext ctx, Message request) throws Exception {
  • if (request instanceof RequestMessage) { // Request消息,交给requestHandler处理
  • requestHandler.handle((RequestMessage) request);
  • } else { // Response消息,交给responseHandler处理
  • responseHandler.handle((ResponseMessage) request);
  • }
  • }

可见,当TransportChannelHandler收到的是RequestMessage消息时,会将其交给TransportRequestHandler处理,当收到的是ResponseMessage消息时,会交给TransportResponseHandler处理。

1.4.1. TransportRequestHandler处理器

TransportRequestHandler继承自MessageHandler类,MessageHandler类比较简单,它仅仅是定义了一类抽象方法,如下:

注:TransportResponseHandler也继承自MessageHandler类。

org.apache.spark.network.server.MessageHandler
  • /**
  • * Handles either request or response messages coming off of Netty. A MessageHandler instance
  • * is associated with a single Netty Channel (though it may have multiple clients on the same
  • * Channel.)
  • */
  • public abstract class MessageHandler<T extends Message> {
  • /**
  • * Handles the receipt of a single message.
  • * 用于对接收到的单个消息进行处理
  • **/
  • public abstract void handle(T message) throws Exception;
  • /**
  • * Invoked when the channel this MessageHandler is on is active.
  • * 当Channel激活时调用
  • **/
  • public abstract void channelActive();
  • /**
  • * Invoked when an exception was caught on the Channel.
  • * 当捕获到异常时调用
  • **/
  • public abstract void exceptionCaught(Throwable cause);
  • /**
  • * Invoked when the channel this MessageHandler is on is inactive.
  • * 当Channel非激活时调用
  • **/
  • public abstract void channelInactive();
  • }

TransportRequestHandler的定义中,有几个比较重要的字段:

  • /**
  • * A handler that processes requests from clients and writes chunk data back. Each handler is
  • * attached to a single Netty channel, and keeps track of which streams have been fetched via this
  • * channel, in order to clean them up if the channel is terminated (see #channelUnregistered).
  • *
  • * The messages should have been processed by the pipeline setup by {@link TransportServer}.
  • *
  • * 用于处理客户端的请求并在写完块数据后返回的处理程序。
  • */
  • public class TransportRequestHandler extends MessageHandler<RequestMessage> {
  • /** The Netty channel that this handler is associated with. */
  • private final Channel channel;
  • /** Handles all RPC messages.
  • * 用于处理RPC请求消息
  • */
  • private final RpcHandler rpcHandler;
  • /** Returns each chunk part of a stream.
  • * 用于处理流请求消息
  • */
  • private final StreamManager streamManager;
  • ...
  • }

Channel类型的channel字段比较简单,它是Netty提供的通信抽象,是服务端与客户端进行通信的通道,该对象主要用于数据直接的通信。

由于服务端接收的RequestMessage中,有简单的RPC请求,也有拉取数据的请求(例如Shuffle过程),这里的RpcHandler和StreamManager则是分别处理两类请求消息的处理器。

在前面的分析中,TransportChannelHandler会将RequestMessage类型的消息交给TransportRequestHandler的handle(...)方法进行处理,该方法源码如下:

org.apache.spark.network.server.TransportRequestHandler#handle
  • // 根据请求类型处理各类请求
  • @Override
  • public void handle(RequestMessage request) {
  • if (request instanceof ChunkFetchRequest) {
  • // 处理块获取请求
  • processFetchRequest((ChunkFetchRequest) request);
  • } else if (request instanceof RpcRequest) {
  • // 处理需要回复的RPC请求
  • processRpcRequest((RpcRequest) request);
  • } else if (request instanceof OneWayMessage) {
  • // 处理无需回复的RPC请求
  • processOneWayMessage((OneWayMessage) request);
  • } else if (request instanceof StreamRequest) {
  • // 处理流请求
  • processStreamRequest((StreamRequest) request);
  • } else {
  • throw new IllegalArgumentException("Unknown request type: " + request);
  • }
  • }

可见,对于不同类型的RequestMessage,handle(...)方法又会委托给内部的其他方法。

1.4.1.1. 处理ChunkFetchRequest

processFetchRequest(...)方法用于处理ChunkFetchRequest类型的消息,它的源码如下:

org.apache.spark.network.server.TransportRequestHandler#processFetchRequest
  • // 处理块获取请求
  • private void processFetchRequest(final ChunkFetchRequest req) {
  • if (logger.isTraceEnabled()) {
  • logger.trace("Received req from {} to fetch block {}", getRemoteAddress(channel),
  • req.streamChunkId);
  • }
  • ManagedBuffer buf;
  • try {
  • // 检查权限,校验客户端是否有权限从给定的流读取数据
  • streamManager.checkAuthorization(reverseClient, req.streamChunkId.streamId);
  • // 将流与客户端的一个TCP连接进行关联,保证对于单个的流只会有一个客户端读取,流关闭后就不能重用了
  • streamManager.registerChannel(channel, req.streamChunkId.streamId);
  • // 获取单个的块,被封装为ManagedBuffer对象,不能并行调用
  • buf = streamManager.getChunk(req.streamChunkId.streamId, req.streamChunkId.chunkIndex);
  • } catch (Exception e) {
  • logger.error(String.format("Error opening block %s for request from %s",
  • req.streamChunkId, getRemoteAddress(channel)), e);
  • // 读取出错,封装为ChunkFetchFailure后由respond()方法返回
  • respond(new ChunkFetchFailure(req.streamChunkId, Throwables.getStackTraceAsString(e)));
  • return;
  • }
  • // 读取成功,封装为ChunkFetchSuccess后由respond()方法返回
  • respond(new ChunkFetchSuccess(req.streamChunkId, buf));
  • }

在上述方法中是通过StreamManager流管理器来负责所有的操作的;Spark中StreamManager流管理器的实现有OneForOneStreamManager和NettyStreamManager,我们将在后面探讨它们的实现。

1.4.1.2. 处理StreamRequest

与处理ChunkFetchRequest消息类似,StreamRequest消息也借助于StreamManager,源码如下:

org.apache.spark.network.server.TransportRequestHandler#processStreamRequest
  • // 处理流请求
  • private void processStreamRequest(final StreamRequest req) {
  • ManagedBuffer buf;
  • try {
  • // 使用StreamManager将获取到的流数据封装为ManagedBuffer
  • buf = streamManager.openStream(req.streamId);
  • } catch (Exception e) {
  • logger.error(String.format(
  • "Error opening stream %s for request from %s", req.streamId, getRemoteAddress(channel)), e);
  • // 失败时将响应包装为StreamFailure进行响应
  • respond(new StreamFailure(req.streamId, Throwables.getStackTraceAsString(e)));
  • return;
  • }
  • if (buf != null) {
  • // 成功时将响应包装为StreamResponse进行响应
  • respond(new StreamResponse(req.streamId, buf.size(), buf));
  • } else {
  • // 失败时将响应包装为StreamFailure进行响应
  • respond(new StreamFailure(req.streamId, String.format(
  • "Stream '%s' was not found.", req.streamId)));
  • }
  • }

1.4.1.3. 处理RpcRequest

RpcRequest消息则由processRpcRequest(...)方法处理,它内部借助了RpcHandler的receive(...)方法:

org.apache.spark.network.server.TransportRequestHandler#processRpcRequest
  • // 处理需要回复的RPC请求
  • private void processRpcRequest(final RpcRequest req) {
  • try {
  • /**
  • * 将RpcRequest消息的内容体、发送消息的客户端及RpcResponseCallback回调传递给RpcHandler的receive方法
  • * 具体的处理有RPCHandler具体的实现类的receive()方法处理,
  • * 最终一定会调用RpcResponseCallback回调对象的相关方法响应处理
  • */
  • rpcHandler.receive(reverseClient, req.body().nioByteBuffer(), new RpcResponseCallback() {
  • @Override
  • public void onSuccess(ByteBuffer response) {
  • respond(new RpcResponse(req.requestId, new NioManagedBuffer(response)));
  • }
  • @Override
  • public void onFailure(Throwable e) {
  • respond(new RpcFailure(req.requestId, Throwables.getStackTraceAsString(e)));
  • }
  • });
  • } catch (Exception e) {
  • logger.error("Error while invoking RpcHandler#receive() on RPC id " + req.requestId, e);
  • respond(new RpcFailure(req.requestId, Throwables.getStackTraceAsString(e)));
  • } finally {
  • req.body().release();
  • }
  • }

1.4.1.4. 处理OneWayMessage

OneWayMessage消息则由processOneWayMessage(...)方法处理,它内部也借助了RpcHandler的receive(...)方法:

org.apache.spark.network.server.TransportRequestHandler#processOneWayMessage
  • // 处理无需回复的RPC请求
  • private void processOneWayMessage(OneWayMessage req) {
  • try {
  • // 使用RpcHandler具体实现类的receive()方法处理,没有传入回调参数,即默认回调为ONE_WAY_CALLBACK
  • rpcHandler.receive(reverseClient, req.body().nioByteBuffer());
  • } catch (Exception e) {
  • logger.error("Error while invoking RpcHandler#receive() for one-way message.", e);
  • } finally {
  • req.body().release();
  • }
  • }

1.4.1.5. 消息的回复

在上面的代码中,对于需要回复的ChunkFetchRequest、StreamRequest和RpcRequest三类消息,最终都调用了respond(...)方法进行消息回复。回复操作比较简单,直接通过Channel的writeAndFlush(...)将回复数据写出即可:

org.apache.spark.network.server.TransportRequestHandler#respond
  • /**
  • * Responds to a single message with some Encodable object. If a failure occurs while sending,
  • * it will be logged and the channel closed.
  • */
  • private void respond(final Encodable result) {
  • // 获取远程地址用于打印日志
  • final SocketAddress remoteAddress = channel.remoteAddress();
  • // 写出数据
  • channel.writeAndFlush(result).addListener(
  • new ChannelFutureListener() {
  • @Override
  • public void operationComplete(ChannelFuture future) throws Exception {
  • if (future.isSuccess()) {
  • logger.trace("Sent result {} to client {}", result, remoteAddress);
  • } else {
  • logger.error(String.format("Error sending result %s to %s; closing connection",
  • result, remoteAddress), future.cause());
  • channel.close();
  • }
  • }
  • }
  • );
  • }

根据前面对MessageEncoder的分析,这里回复数据会被编码后发出。同时会添加监听器,在写出操作发生错误时关闭Channel。

1.4.1.6. RpcHandler

在上面讲解TransportRequestHandler中我们已经知道,ChunkFetchRequest和StreamRequest消息将交给StreamManager处理,而RpcRequest和OneWayMessage消息则交给RpcHandler处理,实现职责分工。这一节我们先来探讨一下RpcHandler,它是一个抽象类,定义了处理RPC请求的高层规范,典型的模板方法设计模式的实现;源码如下:

org.apache.spark.network.server.RpcHandler
  • /**
  • * Handler for sendRPC() messages sent by {@link org.apache.spark.network.client.TransportClient}s.
  • * 对调用传输客户端(TransportClient)的sendRPC方法发送的消息进行处理的程序。
  • * 定义了RPC处理器的规范。
  • */
  • public abstract class RpcHandler {
  • private static final RpcResponseCallback ONE_WAY_CALLBACK = new OneWayRpcCallback();
  • /**
  • * Receive a single RPC message. Any exception thrown while in this method will be sent back to
  • * the client in string form as a standard RPC failure.
  • *
  • * This method will not be called in parallel for a single TransportClient (i.e., channel).
  • *
  • * 用来接收单一的RPC消息,具体处理逻辑需要子类去实现。
  • *
  • * @param client A channel client which enables the handler to make requests back to the sender
  • * of this RPC. This will always be the exact same object for a particular channel.
  • * @param message The serialized bytes of the RPC.
  • * @param callback Callback which should be invoked exactly once upon success or failure of the
  • * RPC. 用于对请求处理结束后进行回调,无论处理结果是成功还是失败,该回调都会被调用一次
  • */
  • public abstract void receive(
  • TransportClient client,
  • ByteBuffer message,
  • RpcResponseCallback callback);
  • /**
  • * Returns the StreamManager which contains the state about which streams are currently being
  • * fetched by a TransportClient.
  • * 获取StreamManager,主要是用于跟踪当前正在被TransportClient拉取的流的状态。
  • */
  • public abstract StreamManager getStreamManager();
  • /**
  • * Receives an RPC message that does not expect a reply. The default implementation will
  • * call "{@link #receive(TransportClient, ByteBuffer, RpcResponseCallback)}" and log a warning if
  • * any of the callback methods are called.
  • *
  • * @param client A channel client which enables the handler to make requests back to the sender
  • * of this RPC. This will always be the exact same object for a particular channel.
  • * @param message The serialized bytes of the RPC.
  • */
  • public void receive(TransportClient client, ByteBuffer message) {
  • // 默认传入的callback是ONE_WAY_CALLBACK,它的回调方法只打印了一些日志,没有其他操作
  • receive(client, message, ONE_WAY_CALLBACK);
  • }
  • /**
  • * Invoked when the channel associated with the given client is active.
  • * 当与给定客户端相关联的Channel处于活动状态时调用
  • */
  • public void channelActive(TransportClient client) { }
  • /**
  • * Invoked when the channel associated with the given client is inactive.
  • * No further requests will come from this client.
  • * 当与给定客户端相关联的Channel处于非活动状态时调用
  • */
  • public void channelInactive(TransportClient client) { }
  • // 当Channel产生异常时调用
  • public void exceptionCaught(Throwable cause, TransportClient client) { }
  • }

从RpcHandler的源码可以看出,除了需要传入三个参数重载receive(...)getStreamManager()两个方法,其它的方法都已经默认实现了;channelActive(...)channelInactive(...)exceptionCaught(...)是针对Channel状态的处理,真正需要我们关注的,其实是receive(...)getStreamManager()两个方法。

getStreamManager()是抽象方法,需要子类实现,用于获取使用的StreamManager,这个后面会讲解。

receive(...)方法有两个重载版本,抽象版本需要传入clientmessagecallback三个参数,分别代表消息的来源客户端,接收到的消息数据以及回调函数,在具体的RpcHandler实现类中,一般会将处理后需要返回给来源客户端的数据通过回调函数callback传递到下层TransportRequestHandler进行响应处理;有具体实现的版本内部直接调用了抽象版本,传入了OneWayRpcCallback类型的回调;OneWayRpcCallback内部只是做了简单的日志打印,源码如下:

org.apache.spark.network.server.RpcHandler.OneWayRpcCallback
  • // 定义的无需回复的消息的回调
  • private static class OneWayRpcCallback implements RpcResponseCallback {
  • private static final Logger logger = LoggerFactory.getLogger(OneWayRpcCallback.class);
  • // 仅仅记录日志
  • @Override
  • public void onSuccess(ByteBuffer response) {
  • logger.warn("Response provided for one-way RPC.");
  • }
  • // 仅仅记录日志
  • @Override
  • public void onFailure(Throwable e) {
  • logger.error("Error response provided for one-way RPC.", e);
  • }
  • }

在后面的实现中,我们会详细讲解RpcHandler的实现类。

1.4.1.7. StreamManager

StreamManager主要用于处理ChunkFetchRequest和StreamRequest两种消息,它们分别代表块获取请求和流请求。StreamManager也是一个抽象类,源码如下:

org.apache.spark.network.server.StreamManager
  • /**
  • * The StreamManager is used to fetch individual chunks from a stream. This is used in
  • * {@link TransportRequestHandler} in order to respond to fetchChunk() requests. Creation of the
  • * stream is outside the scope of the transport layer, but a given stream is guaranteed to be read
  • * by only one client connection, meaning that getChunk() for a particular stream will be called
  • * serially and that once the connection associated with the stream is closed, that stream will
  • * never be used again.
  • */
  • public abstract class StreamManager {
  • /**
  • * Called in response to a fetchChunk() request. The returned buffer will be passed as-is to the
  • * client. A single stream will be associated with a single TCP connection, so this method
  • * will not be called in parallel for a particular stream.
  • *
  • * Chunks may be requested in any order, and requests may be repeated, but it is not required
  • * that implementations support this behavior.
  • *
  • * The returned ManagedBuffer will be release()'d after being written to the network.
  • *
  • * 用于从Stream ID指定的流中获取索引从0至chunkIndex的块数据,返回的是ManagedBuffer对象
  • *
  • * @param streamId id of a stream that has been previously registered with the StreamManager.
  • * Stream ID用于从StreamManager中获取对应的流
  • * @param chunkIndex 0-indexed chunk of the stream that's requested
  • * 索引从0至chunkIndex为该方法需要拉取的块
  • */
  • public abstract ManagedBuffer getChunk(long streamId, int chunkIndex);
  • /**
  • * Called in response to a stream() request. The returned data is streamed to the client
  • * through a single TCP connection.
  • *
  • * Note the <code>streamId</code> argument is not related to the similarly named argument in the
  • * {@link #getChunk(long, int)} method.
  • *
  • * 打开Stream ID对应的流,返回的是ManagedBuffer对象
  • *
  • * @param streamId id of a stream that has been previously registered with the StreamManager.
  • * @return A managed buffer for the stream, or null if the stream was not found.
  • */
  • public ManagedBuffer openStream(String streamId) {
  • throw new UnsupportedOperationException();
  • }
  • /**
  • * Associates a stream with a single client connection, which is guaranteed to be the only reader
  • * of the stream. The getChunk() method will be called serially on this connection and once the
  • * connection is closed, the stream will never be used again, enabling cleanup.
  • *
  • * This must be called before the first getChunk() on the stream, but it may be invoked multiple
  • * times with the same channel and stream id.
  • *
  • * 将指定的Channel与流绑定
  • */
  • public void registerChannel(Channel channel, long streamId) { }
  • /**
  • * Indicates that the given channel has been terminated. After this occurs, we are guaranteed not
  • * to read from the associated streams again, so any state can be cleaned up.
  • *
  • * 关闭Channel。该操作执行后Channel绑定的流将不会被读取,Channel与流的绑定也会被清除
  • */
  • public void connectionTerminated(Channel channel) { }
  • /**
  • * Verify that the client is authorized to read from the given stream.
  • *
  • * 检查认证的方法,验证对应的客户端是否有权限从指定的流读取数据
  • *
  • * @throws SecurityException If client is not authorized.
  • */
  • public void checkAuthorization(TransportClient client, long streamId) { }
  • }

从StreamManager所规定的方法来看,我们不难发现,StreamManager内部会将服务端与客户端通信的Channel与具体的流绑定在一起,保证对于特定的流同时只会有一个客户端读取。

StreamManager的实现有两个:NettyStreamManager和OneForOneStreamManager。这里以OneForOneStreamManager简单介绍各个方法的具体实现。

1.4.1.7.1. OneForOneStreamManager

从OneForOneStreamManager的命名就可以看出,该流管理器是用于一对一流传输操作的。OneForOneStreamManager实现了StreamManager中除openStream(...)以外所有的方法,我们将从功能特性上分别讨论。

  1. OneForOneStreamManager的结构。

OneForOneStreamManager继承自StreamManager抽象类,它有两个重要的字段,如下:

  • // 用于生成数据流的标识,类型为AtomicLong。
  • private final AtomicLong nextStreamId;
  • // 维护streamId与StreamState之间映射关系的缓存。
  • private final ConcurrentHashMap<Long, StreamState> streams;

其中nextStreamId用于生成流的ID标识,streams是一个线程安全的Map,用于存放流的ID与表示流的StreamState状态对象的映射关系。

StreamState对象是OneForOneStreamManager的私有静态内部类,定义如下:

org.apache.spark.network.server.OneForOneStreamManager.StreamState
  • /** State of a single stream. */
  • private static class StreamState {
  • // 请求流所属的应用程序ID。此属性只有在ExternalShuffleClient启用后才会用到。
  • final String appId;
  • // ManagedBuffer的缓冲。
  • final Iterator<ManagedBuffer> buffers;
  • // The channel associated to the stream
  • // 与当前流相关联的Channel。
  • Channel associatedChannel = null;
  • // Used to keep track of the index of the buffer that the user has retrieved, just to ensure
  • // that the caller only requests each chunk one at a time, in order.
  • // 为了保证客户端按顺序每次请求一个块,所以用此属性跟踪客户端当前接收到的ManagedBuffer的索引。
  • int curChunk = 0;
  • StreamState(String appId, Iterator<ManagedBuffer> buffers) {
  • this.appId = appId;
  • this.buffers = Preconditions.checkNotNull(buffers);
  • }
  • }

这里我们只需要关注,每个流只会所属于一个Application,同时流的表示其实是一个Iterator类型的迭代器对象。

在OneForOneStreamManager实例被构造时,它会初始化nextStreamIdstreams两个字段:

org.apache.spark.network.server.OneForOneStreamManager#OneForOneStreamManager
  • public OneForOneStreamManager() {
  • // For debugging purposes, start with a random stream id to help identifying different streams.
  • // This does not need to be globally unique, only unique to this class.
  • nextStreamId = new AtomicLong((long) new Random().nextInt(Integer.MAX_VALUE) * 1000);
  • streams = new ConcurrentHashMap<>();
  • }
  1. 流从哪里来?

registerStream(...)方法是OneForOneStreamManager自有的方法,用于将指定的流注册到OneForOneStreamManager,实现如下:

org.apache.spark.network.server.OneForOneStreamManager#registerStream
  • /**
  • * Registers a stream of ManagedBuffers which are served as individual chunks one at a time to
  • * callers. Each ManagedBuffer will be release()'d after it is transferred on the wire. If a
  • * client connection is closed before the iterator is fully drained, then the remaining buffers
  • * will all be release()'d.
  • *
  • * If an app ID is provided, only callers who've authenticated with the given app ID will be
  • * allowed to fetch from this stream.
  • *
  • * 向OneForOneStreamManager的streams缓存中注册流。
  • */
  • public long registerStream(String appId, Iterator<ManagedBuffer> buffers) {
  • // 生成新的Stream ID
  • long myStreamId = nextStreamId.getAndIncrement();
  • // 添加到streams字典
  • streams.put(myStreamId, new StreamState(appId, buffers));
  • // 返回生成的Stream ID
  • return myStreamId;
  • }

所谓的注册,其实就是使用nextStreamId分配一个流ID,然后根据传入的appIdbuffers创建一个StreamState对象,存入streams字典中即可。

  1. 如何验证客户端是否有权限访问流?

通过checkAuthorization(...)方法,它是StreamManager指定需要实现的方法,在OneForOneStreamManager中的实现如下:

org.apache.spark.network.server.OneForOneStreamManager#checkAuthorization
  • @Override
  • public void checkAuthorization(TransportClient client, long streamId) {
  • /**
  • * 如果没有配置对管道进行SASL认证,TransportClient的clientId为null,因而实际上并不走权限检查。
  • * 当启用了SASL认证,客户端需要给TransportClient的clientId赋值,因此才会走此检查。
  • */
  • if (client.getClientId() != null) { // 需要进行权限检查
  • // 获取streamId对应的流状态
  • StreamState state = streams.get(streamId);
  • // 检查对应的StreamState是否为空
  • Preconditions.checkArgument(state != null, "Unknown stream ID.");
  • // TransportClient的clientId属性值是否与streamId对应的StreamState的appId的值相等
  • if (!client.getClientId().equals(state.appId)) {
  • // 不相等说明权限验证失败
  • throw new SecurityException(String.format(
  • "Client %s not authorized to read stream %d (app %s).",
  • client.getClientId(),
  • streamId,
  • state.appId));
  • }
  • }
  • }

实现比较简单,其实就是校验TransportClient中保存的Client ID是否与streamId指定的流的appId相同,如果不同则不允许访问该流。

  1. 如何将指定的Channel与流绑定在一起?

通过registerChannel(...)方法,它也是StreamManager指定需要实现的方法,在OneForOneStreamManager中的实现如下:

org.apache.spark.network.server.OneForOneStreamManager#registerChannel
  • /**
  • * 注册Channel。将一个流和一条(只能是一条)客户端的TCP连接关联起来,
  • * 这可以保证对于单个的流只会有一个客户端读取。流关闭之后就永远不能够重用了。
  • */
  • @Override
  • public void registerChannel(Channel channel, long streamId) {
  • if (streams.containsKey(streamId)) {
  • // 将传入Channel关联到传入的streamId对应的StreamState的associatedChannel字段上
  • streams.get(streamId).associatedChannel = channel;
  • }
  • }

实现很简单,即使用流对应的StreamState对象中的associatedChannel字段记录指定的Channel对象。

  1. 如何获取流中单个的块?

getChunk(...)方法用于获取单个块,重写了StreamManager中的抽象方法,源码如下:

org.apache.spark.network.server.OneForOneStreamManager#getChunk
  • // 获取单个的块(块被封装为ManagedBuffer)。
  • @Override
  • public ManagedBuffer getChunk(long streamId, int chunkIndex) {
  • // 从streams中获取StreamState
  • StreamState state = streams.get(streamId);
  • // 获取的块不等于当前块,抛出异常
  • if (chunkIndex != state.curChunk) {
  • throw new IllegalStateException(String.format(
  • "Received out-of-order chunk index %s (expected %s)", chunkIndex, state.curChunk));
  • } else if (!state.buffers.hasNext()) {
  • // buffers缓冲中的ManagedBuffer,已经全部被客户端获取,抛出异常
  • throw new IllegalStateException(String.format(
  • "Requested chunk index beyond end %s", chunkIndex));
  • }
  • // 将StreamState的curChunk加1,为下次接收请求做好准备
  • state.curChunk += 1;
  • // 从buffers缓冲中获取ManagedBuffer
  • ManagedBuffer nextChunk = state.buffers.next();
  • if (!state.buffers.hasNext()) { // buffers缓冲中的ManagedBuffer,已经全部被客户端获取
  • logger.trace("Removing stream id {}", streamId);
  • // 移除对应的StreamState
  • streams.remove(streamId);
  • }
  • return nextChunk;
  • }

从该方法的实现可知,所谓的获取块,其实就是迭代获取流对应的Iterator类型的迭代器buffers中的元素并返回。这里需要注意的一点是,StreamState的curChunk记录了下次将要拉取的块索引,在每次获取块时都会校验该索引,以保证拉取操作是按序进行的。

注:ManagedBuffer就是用于装载块数据的包装类,其中封装了获取块数据长度、块数据ByteBuffer缓冲和块数据对应的流的方法,具体实现类有多种,将在后面介绍。

  1. 如何取消指定Channel与流的绑定?

connectionTerminated(...)方法用于取消Channel与流的绑定,源码如下:

org.apache.spark.network.server.OneForOneStreamManager#connectionTerminated
  • @Override
  • public void connectionTerminated(Channel channel) {
  • // Close all streams which have been associated with the channel.
  • for (Map.Entry<Long, StreamState> entry: streams.entrySet()) {
  • StreamState state = entry.getValue();
  • if (state.associatedChannel == channel) {
  • streams.remove(entry.getKey());
  • // Release all remaining buffers.
  • while (state.buffers.hasNext()) {
  • state.buffers.next().release();
  • }
  • }
  • }
  • }

从源码可知,它会校验二者是否存在绑定关系,如果存在则将它们从streams字典中移除,然后释放流对应的所有的ManagedBuffer对象。

注:OneForOneStreamManager没有实现StreamManager中的openStream(...)方法,该方法在另一个StreamManager的子类NettyStreamManager中有实现,主要是用于返回包含了特定文件的File实例的FileSegmentManagedBuffer对象,以流的形式对文件进行访问。后面会讲解。

有了对RpcHandler和StreamManager的理解,我们可以得知TransportRequestHandler中对RpcHandler和StreamManager的定位,示意图如下:

3.RpcHandler和StreamManager的抽象.png

1.4.2. TransportResponseHandler处理器

TransportResponseHandler与TransportRequestHandler类似,也继承自MessageHandler类。在前面我们提到过,TransportChannelHandler在收到ResponseMessage的时候,将会交给TransportResponseHandler的handle(...)方法进行处理,不过,在讲解该方法之前,我们需要回顾TransportClient发送消息的过程。

不知道大家还是否记得,在前面讲解的内容中,TransportClient发送RpcRequest、ChunkFetchRequest和StreamRequest时,都会将对应的回调对象保存到TransportResponseHandler处理器中,对应于TransportResponseHandler中的三个字段如下:

注:可回顾本文前面的2.5.2 ~ 2.5.4节。

org.apache.spark.network.client.TransportResponseHandler
  • // 存放ChunkFetchRequest请求对应的回调
  • private final Map<StreamChunkId, ChunkReceivedCallback> outstandingFetches;
  • // 存放RpcRequest请求对应的回调
  • private final Map<Long, RpcResponseCallback> outstandingRpcs;
  • // 存放StreamRequest请求对应的回调
  • private final Queue<StreamCallback> streamCallbacks;

它们在TransportResponseHandler初始化时就会创建了:

org.apache.spark.network.client.TransportResponseHandler#TransportResponseHandler
  • public TransportResponseHandler(Channel channel) {
  • this.channel = channel;
  • this.outstandingFetches = new ConcurrentHashMap<>();
  • this.outstandingRpcs = new ConcurrentHashMap<>();
  • this.streamCallbacks = new ConcurrentLinkedQueue<>();
  • this.timeOfLastRequestNs = new AtomicLong(0);
  • }

同时,TransportResponseHandler对象提供了操作这些集合的方法:

org.apache.spark.network.client.TransportResponseHandler
  • // 添加ChunkFetchRequest请求对应回调
  • public void addFetchRequest(StreamChunkId streamChunkId, ChunkReceivedCallback callback) {
  • // 将更新最后一次请求的时间为当前系统时间
  • updateTimeOfLastRequest();
  • // 将StreamChunk ID和对应的ChunkReceivedCallback回调存入outstandingRpcs(ConcurrentHashMap)
  • outstandingFetches.put(streamChunkId, callback);
  • }
  • // 移除ChunkFetchRequest请求对应回调
  • public void removeFetchRequest(StreamChunkId streamChunkId) {
  • outstandingFetches.remove(streamChunkId);
  • }
  • // 添加RpcRequest请求对应回调
  • public void addRpcRequest(long requestId, RpcResponseCallback callback) {
  • // 更新最后一次请求的时间为当前系统时间
  • updateTimeOfLastRequest();
  • // 将Request ID和对应的RpcResponseCallback回调存入outstandingRpcs(ConcurrentHashMap)
  • outstandingRpcs.put(requestId, callback);
  • }
  • // 移除RpcRequest请求对应回调
  • public void removeRpcRequest(long requestId) {
  • outstandingRpcs.remove(requestId);
  • }
  • // 添加StreamRequest请求对应回调
  • public void addStreamCallback(StreamCallback callback) {
  • timeOfLastRequestNs.set(System.nanoTime());
  • streamCallbacks.offer(callback);
  • }
  • // 标识streamActive为非激活状态
  • @VisibleForTesting
  • public void deactivateStream() {
  • streamActive = false;
  • }

有了这些了解,我们分析一下TransportResponseHandler的handle(...)方法的大体结构:

  • @Override
  • public void handle(ResponseMessage message) throws Exception {
  • if (message instanceof ChunkFetchSuccess) { // 块获取请求成功的响应
  • // 转换消息类型
  • ChunkFetchSuccess resp = (ChunkFetchSuccess) message;
  • ...
  • } else if (message instanceof ChunkFetchFailure) { // 块获取请求失败的响应
  • // 转换消息类型
  • ChunkFetchFailure resp = (ChunkFetchFailure) message;
  • ...
  • } else if (message instanceof RpcResponse) { // RPC请求成功的响应
  • // 转换消息类型
  • RpcResponse resp = (RpcResponse) message;
  • ...
  • } else if (message instanceof RpcFailure) { // RPC请求失败的响应
  • // 转换消息类型
  • RpcFailure resp = (RpcFailure) message;
  • ...
  • } else if (message instanceof StreamResponse) { // 流获取请求成功的响应
  • // 转换消息类型
  • StreamResponse resp = (StreamResponse) message;
  • ...
  • } else if (message instanceof StreamFailure) { // 流获取请求失败的响应
  • // 转换消息类型
  • StreamFailure resp = (StreamFailure) message;
  • ...
  • } else {
  • throw new IllegalStateException("Unknown response type: " + message.type());
  • }
  • }

可见,对于六类响应详细,分别在不同的分支中进行的处理。下面我们将分析它们的详细处理过程。

1.4.2.1. 处理ChunkFetchSuccess

handle(...)方法中处理ChunkFetchSuccess消息的分支代码如下:

org.apache.spark.network.client.TransportResponseHandler#handle
  • if (message instanceof ChunkFetchSuccess) { // 块获取请求成功的响应
  • // 转换消息类型
  • ChunkFetchSuccess resp = (ChunkFetchSuccess) message;
  • // 根据响应中的StreamChunk ID从outstandingRpcs中获取对应的回调监听器
  • ChunkReceivedCallback listener = outstandingFetches.get(resp.streamChunkId);
  • if (listener == null) { // 回调监听器为空
  • logger.warn("Ignoring response for block {} from {} since it is not outstanding",
  • resp.streamChunkId, getRemoteAddress(channel));
  • resp.body().release();
  • } else { // 回调监听器存在
  • // 先将其从outstandingFetches中移除
  • outstandingFetches.remove(resp.streamChunkId);
  • // 调用其onSuccess()回调方法
  • listener.onSuccess(resp.streamChunkId.chunkIndex, resp.body());
  • resp.body().release();
  • }
  • }

从源码可知,处理流程是将消息数据转换为ChunkFetchSuccess对象,然后根据ChunkFetchSuccess对象中的streamChunkIdoutstandingFetches取出当时发送ChunkFetchRequest消息时保存的ChunkReceivedCallback,将响应的ChunkFetchSuccess对象交给该ChunkReceivedCallback进行处理,最终将ChunkReceivedCallback从outstandingFetches移除即可。

1.4.2.2. 处理RpcResponse

handle(...)方法中处理RpcResponse消息的分支代码如下:

org.apache.spark.network.client.TransportResponseHandler#handle
  • if (message instanceof RpcResponse) { // RPC请求成功的响应
  • // 转换消息类型
  • RpcResponse resp = (RpcResponse) message;
  • // 根据响应中的Request ID从outstandingRpcs中获取对应的回调监听器
  • RpcResponseCallback listener = outstandingRpcs.get(resp.requestId);
  • if (listener == null) { // 回调监听器为空
  • logger.warn("Ignoring response for RPC {} from {} ({} bytes) since it is not outstanding",
  • resp.requestId, getRemoteAddress(channel), resp.body().size());
  • } else { // 回调监听器存在
  • // 先将其从outstandingRpcs中移除
  • outstandingRpcs.remove(resp.requestId);
  • try {
  • // 调用其onSuccess()方法
  • listener.onSuccess(resp.body().nioByteBuffer());
  • } finally {
  • resp.body().release();
  • }
  • }
  • }

上述对RpcResponse消息的处理流程,其实与处理ChunkFetchSuccess消息是类似的,不在赘述。

1.4.2.3. 处理StreamResponse

handle(...)方法中处理StreamResponse消息的分支代码如下:

org.apache.spark.network.client.TransportResponseHandler#handle
  • if (message instanceof StreamResponse) { // 流获取请求成功的响应
  • StreamResponse resp = (StreamResponse) message;
  • StreamCallback callback = streamCallbacks.poll();
  • if (callback != null) {
  • if (resp.byteCount > 0) {
  • StreamInterceptor interceptor = new StreamInterceptor(this, resp.streamId, resp.byteCount,
  • callback);
  • try {
  • TransportFrameDecoder frameDecoder = (TransportFrameDecoder)
  • channel.pipeline().get(TransportFrameDecoder.HANDLER_NAME);
  • frameDecoder.setInterceptor(interceptor);
  • streamActive = true;
  • } catch (Exception e) {
  • logger.error("Error installing stream handler.", e);
  • deactivateStream();
  • }
  • } else {
  • try {
  • callback.onComplete(resp.streamId);
  • } catch (Exception e) {
  • logger.warn("Error in stream handler onComplete().", e);
  • }
  • }
  • } else {
  • logger.error("Could not find callback for StreamResponse.");
  • }
  • }

在处理StreamResponse消息时,也是交给从TransportResponseHandler的streamCallbacks队列取出StreamCallback回调进行的;不同处在于,在回调对象不为null时,会根据StreamResponse消息的byteCount字段决定是否给处理器链中的TransportFrameDecoder帧解码器设置StreamInterceptor,这个操作是为了接收接下来的流数据。注意,此处创建的StreamInterceptor对象会关联Stream ID、流字节总数以及StreamCallback回调。

1.4.2.3.1. StreamInterceptor流拦截器

在讲解TransportFrameDecoder帧解码器的时候,我们提到过它的拦截器Interceptor,当时没有详细分析。其实在Spark中,Interceptor的实现类只有两种:StreamInterceptor和MockInterceptor,其中MockInterceptor还是用于测试的,我们不用关注;而处理StreamResponse消息时则会使用到StreamInterceptor拦截器。

Spark中服务端对于流请求的处理,会首先返回一个StreamResponse消息,其中标记了流的ID标识streamId和该流包含的数据的总大小byteCount,在客户端收到StreamResponse消息之后,往后的消息数据将会是源源不断的、有序的、所属于该流的正式数据;如下示意图:

4.StreamResponse响应及对应的流数据.png

正因为流数据的传输机制,在客户端收到StreamResponse消息时,如果解析得到的byteCount大于0,说明接下来需要接收正式的流数据,因此会往TransportFrameDecoder帧解码器设置StreamInterceptor拦截器;通过前面讲解的内容中我们知道,如果TransportFrameDecoder帧解码器的Interceptor拦截器不为空,数据将交给拦截器的handle(...)方法处理,回顾这部分源码:

org.apache.spark.network.util.TransportFrameDecoder#channelRead
  • @Override
  • public void channelRead(ChannelHandlerContext ctx, Object data) throws Exception {
  • ...
  • // 遍历buffers链表
  • while (!buffers.isEmpty()) {
  • // First, feed the interceptor, and if it's still, active, try again.
  • if (interceptor != null) { // 有拦截器,让拦截器处理,拦截器只会处理一次
  • // 取出链表头的ByteBuf
  • ByteBuf first = buffers.getFirst();
  • // 计算可读字节数
  • int available = first.readableBytes();
  • // 先使用intercepter处理数据
  • if (feedInterceptor(first)) {
  • assert !first.isReadable() : "Interceptor still active but buffer has data.";
  • }
  • ...
  • } else { // 没有拦截器
  • ...
  • // 能够解码得到数据,传递给下一个Handler
  • ctx.fireChannelRead(frame);
  • }
  • }
  • }

我们关注StreamInterceptor的handle(...)方法,源码如下:

org.apache.spark.network.client.StreamInterceptor#handle
  • @Override
  • public boolean handle(ByteBuf buf) throws Exception {
  • // 本次要读取的数据字节数
  • int toRead = (int) Math.min(buf.readableBytes(), byteCount - bytesRead);
  • // 构造buf的分片并转换为NIO的ByteBuffer,该操作会从readerIndex开始,创建长度为toRead的分片
  • ByteBuffer nioBuffer = buf.readSlice(toRead).nioBuffer();
  • // 获取可读字节数
  • int available = nioBuffer.remaining();
  • // 将数据传给StreamCallback回调对象
  • callback.onData(streamId, nioBuffer);
  • // 累计已读字节数
  • bytesRead += available;
  • // 判断读了多少
  • if (bytesRead > byteCount) { // 已读字节大于响应消息指定的总字节数
  • // 构造异常
  • RuntimeException re = new IllegalStateException(String.format(
  • "Read too many bytes? Expected %d, but read %d.", byteCount, bytesRead));
  • // 将异常交给StreamCallback回调对象
  • callback.onFailure(streamId, re);
  • // 关闭流
  • handler.deactivateStream();
  • throw re;
  • } else if (bytesRead == byteCount) { // 已读字节等于响应消息指定的总字节数
  • // 即已经读完了,关闭流
  • handler.deactivateStream();
  • // 调用StreamCallback回调对象的方法告知读完了
  • callback.onComplete(streamId);
  • }
  • // 返回值表示是否读完
  • return bytesRead != byteCount;
  • }

对于StreamResponse之后的流数据,都会交给该方法处理,它会读取本次传输的数据,传递给callbackonData(...)方法处理,这里的callback即是在TransportClient发送StreamRequest时存放在TransportResponseHandler的streamCallbacks队列里的StreamCallback回调。

handle(...)方法后面的代码会判断流数据的读取情况,在读取过多、读取完成的情况下都会将TransportResponseHandler中用于记录流的状态字段streamActive置为false,清除流的激活状态,然后使用StreamCallback回调对象的相应方法进行处理。

注:流数据被StreamInterceptor处理后,并不会交给TransportFrameDecoder后续的处理器进行处理,因为没有必要;读者可以回顾TransportFrameDecoder的channelRead(...)方法,只有在没有拦截器的情况下,才会将解码得到数据通过ctx.fireChannelRead(frame)传递给下一个Handler。

1.4.2.4. 处理Failure消息

对于ChunkFetchFailure、RpcFailure和StreamFailure三类消息的处理其实都是类似的,所以放在一节中介绍:

  • if (message instanceof ChunkFetchFailure) { // 块获取请求失败的响应
  • // 转换消息类型
  • ChunkFetchFailure resp = (ChunkFetchFailure) message;
  • // 根据响应中的StreamChunk ID从outstandingRpcs中获取对应的回调监听器
  • ChunkReceivedCallback listener = outstandingFetches.get(resp.streamChunkId);
  • if (listener == null) { // 回调监听器为空
  • logger.warn("Ignoring response for block {} from {} ({}) since it is not outstanding",
  • resp.streamChunkId, getRemoteAddress(channel), resp.errorString);
  • } else { // 回调监听器存在
  • // 先将其从outstandingRpcs中移除
  • outstandingFetches.remove(resp.streamChunkId);
  • // 调用其onFailure()方法
  • listener.onFailure(resp.streamChunkId.chunkIndex, new ChunkFetchFailureException(
  • "Failure while fetching " + resp.streamChunkId + ": " + resp.errorString));
  • }
  • }
  • ...
  • if (message instanceof RpcFailure) { // RPC请求失败的响应
  • // 转换消息类型
  • RpcFailure resp = (RpcFailure) message;
  • // 根据响应中的Request ID从outstandingRpcs中获取对应的回调监听器
  • RpcResponseCallback listener = outstandingRpcs.get(resp.requestId);
  • if (listener == null) { // 回调监听器为空
  • logger.warn("Ignoring response for RPC {} from {} ({}) since it is not outstanding",
  • resp.requestId, getRemoteAddress(channel), resp.errorString);
  • } else { // 回调监听器存在
  • // 先将其从outstandingRpcs中移除
  • outstandingRpcs.remove(resp.requestId);
  • // 调用其onFailure()方法,传入RuntimeException
  • listener.onFailure(new RuntimeException(resp.errorString));
  • }
  • }
  • ...
  • if (message instanceof StreamFailure) { // 流获取请求失败的响应
  • StreamFailure resp = (StreamFailure) message;
  • StreamCallback callback = streamCallbacks.poll();
  • if (callback != null) {
  • try {
  • callback.onFailure(resp.streamId, new RuntimeException(resp.error));
  • } catch (IOException ioe) {
  • logger.warn("Error in stream failure handler.", ioe);
  • }
  • } else {
  • logger.warn("Stream failure with unknown callback: {}", resp.error);
  • }
  • }

可见,在收到失败响应时,都会调用响应对象的onFailure(...)方法,并将回调对象从各自所在的容器中移除。

2. 传输层架构总结

经过前面的一系列讲解,相信大家对于Spark的传输层应该有一定的了解了,在本节将对前面的内容进行一个总结。

Spark传输层由TransportServer和TransportClient进行支撑,使用Netty框架作为底层实现;传输层负责服务端构建、客户端构建、客户端与服务端的连接与数据交互、数据的编解码等操作。

TransportServerBootstrap和TransportClientBootstrap分别负责服务端接收请求和客户端发送请求的引导程序,二者实现了SASL的认证操作。

在使用Netty的底层实现中,使用ServerBootstrap构建服务端,使用Bootstrap作为客户端,二者使用的处理器链是一致的,其中TransportFrameDecoder负责入站数据的帧解码,MessageEncoder、MessageDecoder分别负责出站数据的编码和入站数据的解码,TransportChannelHandler负责入站消息的逻辑处理。

TransportFrameDecoder会将入站消息分为RequestMessage和ResponseMessage,分别交给TransportRequestHandler和TransportResponseHandler处理。

对于Spark通信传输层底层的Netty处理器链,有以下的数据流示意图:

5.传输层处理器链结构示意图.png