大数据
流式处理
Spark
源码解析

Spark源码分析06 - 通信架构04:高层实现(2)RpcHandler与消息的处理

简介:NettyRpcEnv将使用NettyRpcHandler来处理入站的RPC消息。

1. NettyRpcHandler

在前面分析NettyRpcEnv的初始化过程提到过,NettyRpcEnv使用的RpcHandler实现类是NettyRpcHandler;综合前面讲解传输层原理的内容可知,NettyRpcEnv将使用NettyRpcHandler来处理入站的RPC消息。NettyRpcHandler的定义和重要字段如下:

org.apache.spark.rpc.netty.NettyRpcHandler
  • /**
  • * Dispatches incoming RPCs to registered endpoints.
  • *
  • * The handler keeps track of all client instances that communicate with it, so that the RpcEnv
  • * knows which `TransportClient` instance to use when sending RPCs to a client endpoint (i.e.,
  • * one that is not listening for incoming connections, but rather needs to be contacted via the
  • * client socket).
  • *
  • * Events are sent on a per-connection basis, so if a client opens multiple connections to the
  • * RpcEnv, multiple connection / disconnection events will be created for that client (albeit
  • * with different `RpcAddress` information).
  • */
  • private[netty] class NettyRpcHandler(
  • dispatcher: Dispatcher,
  • nettyEnv: NettyRpcEnv,
  • streamManager: StreamManager) extends RpcHandler with Logging {
  • // A variable to track the remote RpcEnv addresses of all clients
  • // 用于跟踪远程客户端的RpcEnv地址的字典
  • private val remoteAddresses = new ConcurrentHashMap[RpcAddress, RpcAddress]()
  • ...
  • }

NettyRpcEnv在实例化NettyRpcHandler对象时,传入了Dispatcher、StreamManager和自己;另外,NettyRpcHandler还有一个线程安全的字典remoteAddresses,它是用于记录发送请求的客户端的地址的,键和值都是RpcAddress对象,键是发送请求的客户端地址,值是发送请求的客户端的RpcEnv地址(也即是客户端上同时构建的服务端地址)。

注:读者可能对remoteAddresses的结构有疑惑,请继续往下阅读,会在后面进行解释。

1.1. 处理Channel状态

NettyRpcHandler实现了RpcHandler要求的所有方法,其中与Channel通道相关的有channelActive(...)exceptionCaught(...)channelInactive(...)三个。

1.1.1. Channel Active

channelActive(...)方法会在Channel会激活时被调用,此事件代表着有新的客户端连接到服务端了,channelActive(...)方法的源码如下:

org.apache.spark.rpc.netty.NettyRpcHandler#channelActive
  • // Channel激活时调用
  • override def channelActive(client: TransportClient): Unit = {
  • val addr = client.getChannel().remoteAddress().asInstanceOf[InetSocketAddress]
  • assert(addr != null)
  • val clientAddr = RpcAddress(addr.getHostString, addr.getPort)
  • // 投递RemoteProcessConnected消息,包含的是远端Client地址
  • dispatcher.postToAll(RemoteProcessConnected(clientAddr))
  • }

可见,channelActive(...)方法内会解析客户端的地址,将其包装为RpcAddress对象,然后封装到RemoteProcessConnected消息中,使用Dispatcher的postToAll(...)方法进行投递。

1.1.2. Exception Caught

exceptionCaught(...)方法会在出现异常的时候被调用,源码如下:

org.apache.spark.rpc.netty.NettyRpcHandler#exceptionCaught
  • override def exceptionCaught(cause: Throwable, client: TransportClient): Unit = {
  • val addr = client.getChannel.remoteAddress().asInstanceOf[InetSocketAddress]
  • if (addr != null) {
  • val clientAddr = RpcAddress(addr.getHostString, addr.getPort)
  • // 投递RemoteProcessConnectionError事件,包含的是远端Client地址
  • dispatcher.postToAll(RemoteProcessConnectionError(cause, clientAddr))
  • // If the remove RpcEnv listens to some address, we should also fire a
  • // RemoteProcessConnectionError for the remote RpcEnv listening address
  • val remoteEnvAddress = remoteAddresses.get(clientAddr)
  • if (remoteEnvAddress != null) {
  • // 投递RemoteProcessConnectionError事件,包含的是远端Server地址
  • dispatcher.postToAll(RemoteProcessConnectionError(cause, remoteEnvAddress))
  • }
  • } else {
  • // If the channel is closed before connecting, its remoteAddress will be null.
  • // See java.net.Socket.getRemoteSocketAddress
  • // Because we cannot get a RpcAddress, just log it
  • logError("Exception before connecting to the client", cause)
  • }
  • }

该方法同样会解析客户端地址信息,将其包装为RpcAddress对象,然后封装到RemoteProcessConnectionError消息中,使用Dispatcher的postToAll(...)方法进行投递。

不过这里多了一步操作,它会根据客户端的RpcAddress地址对象从remoteAddresses中获取对应的remoteEnvAddress,然后将其也封装到RemoteProcessConnectionError消息中进行投递。

这里即是读者们可能存在疑惑的地方,即前面讲到的,字典remoteAddresses的键和值都是RpcAddress对象,键是发送请求的客户端地址,值是发送请求的客户端的RpcEnv地址(也即是客户端上同时构建的服务端地址)。为什么客户端还会存在对应的remoteEnvAddress地址?

这是因为,Spark中的节点在同一时刻可能同时充当着服务端与客户端两种角色,在它接收其他节点的消息时是服务端角色,而它也需要向其他节点发送消息,此时它又是客户端角色;该节点会同时创建TransportServer和TransportClient。而NettyRpcHandler之所以存储了两种地址,是为了保持节点间的双向断开;例如,当A节点的客户端连接着B节点的服务端时,可能B节点的客户端也连接着A节点的服务端,当A节点的客户端与B节点的服务端断开连接时,此时B节点的服务端会感知并移除A节点客户端的本地记录,同时也会尝试断开B节点客户端与A节点服务端的连接。

exceptionCaught(...)方法的上述操作,正是在实现这部分功能。

1.1.3. Channel Inactive

exceptionCaught(...)方法类似的,channelInactive(...)在Channel激活失效时会被调用,此时它也实现了这部分功能,源码如下:

  • // Channel断开连接时调用;在当远端Client与本当前RPC环境断开连接时,还需要断开当前RPC环境到远端Server的连接
  • override def channelInactive(client: TransportClient): Unit = {
  • val addr = client.getChannel.remoteAddress().asInstanceOf[InetSocketAddress]
  • if (addr != null) {
  • val clientAddr = RpcAddress(addr.getHostString, addr.getPort)
  • // 移除对应的Outbox
  • nettyEnv.removeOutbox(clientAddr)
  • // 投递RemoteProcessDisconnected消息,包含的是远端Client地址
  • dispatcher.postToAll(RemoteProcessDisconnected(clientAddr))
  • // 从remoteAddresses中移除
  • val remoteEnvAddress = remoteAddresses.remove(clientAddr)
  • // If the remove RpcEnv listens to some address, we should also fire a
  • // RemoteProcessDisconnected for the remote RpcEnv listening address
  • // 断开与远端服务端RpcEnv的连接
  • if (remoteEnvAddress != null) {
  • // 投递RemoteProcessDisconnected消息,包含的是远端Server地址
  • dispatcher.postToAll(RemoteProcessDisconnected(remoteEnvAddress))
  • }
  • } else {
  • // If the channel is closed before connecting, its remoteAddress will be null. In this case,
  • // we can ignore it since we don't fire "Associated".
  • // See java.net.Socket.getRemoteSocketAddress
  • }
  • }

1.2. 处理消息接收

由于NettyRpcHandler继承了RpcHandler类,因此它必然是需要实现重载的两个receive(...)方法的,它们将用于处理接收的消息,源码如下:

org.apache.spark.rpc.netty.NettyRpcHandler#receive
  • /**
  • * @param client A channel client which enables the handler to make requests back to the sender
  • * of this RPC. This will always be the exact same object for a particular channel.
  • * 发送消息的TransportClient
  • * @param message The serialized bytes of the RPC. 消息序列化后的数据
  • * @param callback Callback which should be invoked exactly once upon success or failure of the RPC
  • * 用于对请求处理结束后进行回调,无论处理结果是成功还是失败,该回调都会被调用一次
  • */
  • override def receive(
  • client: TransportClient,
  • message: ByteBuffer,
  • callback: RpcResponseCallback): Unit = {
  • // 转换消息数据为RequestMessage对象
  • val messageToDispatch = internalReceive(client, message)
  • // 将消息投递到Dispatcher中对应的Inbox中
  • dispatcher.postRemoteMessage(messageToDispatch, callback)
  • }
  • // 无需回复的消息
  • override def receive(
  • client: TransportClient,
  • message: ByteBuffer): Unit = {
  • val messageToDispatch = internalReceive(client, message)
  • dispatcher.postOneWayMessage(messageToDispatch)
  • }

两个方法对消息数据的处理是类似的,首先将发送数据的客户端对象和消息数据缓冲通过internalReceive(...)方法封装为RequestMessage,然后使用Dispatcher进行投递。internalReceive(...)方法的源码如下:

org.apache.spark.rpc.netty.NettyRpcHandler#internalReceive
  • // 反序列化消息数据为RequestMessage对象
  • private def internalReceive(client: TransportClient, message: ByteBuffer): RequestMessage = {
  • // 获取发送消息的TransportClient的InetSocketAddress,并构造为客户端地址对象RpcAddress
  • val addr = client.getChannel().remoteAddress().asInstanceOf[InetSocketAddress]
  • assert(addr != null)
  • val clientAddr = RpcAddress(addr.getHostString, addr.getPort)
  • // 反序列化消息为RequestMessage对象
  • val requestMessage = nettyEnv.deserialize[RequestMessage](client, message)
  • // 检查客户端RpcEnv地址是否为空
  • if (requestMessage.senderAddress == null) {
  • // Create a new message with the socket address of the client as the sender.
  • // 客户端RpcEnv地址为空,重新构造一个RequestMessage对象,客户端RpcEnv地址就是客户端的地址
  • RequestMessage(clientAddr, requestMessage.receiver, requestMessage.content)
  • } else { // 客户端RpcEnv地址不为空
  • // The remote RpcEnv listens to some port, we should also fire a RemoteProcessConnected for
  • // the listening address
  • val remoteEnvAddress = requestMessage.senderAddress
  • /**
  • * 将客户端RpcEnv地址存入remoteAddresses字典
  • * - 键为保存客户端节点的host和port的RpcAddress实例
  • * - 值为保存客户端节点的RpcEnv地址的host和port的RpcAddress实例
  • */
  • if (remoteAddresses.putIfAbsent(clientAddr, remoteEnvAddress) == null) {
  • // put后返回为null即表示成功,说明这是首次与该地址通信,则向Dispatcher投递RemoteProcessConnected事件
  • dispatcher.postToAll(RemoteProcessConnected(remoteEnvAddress))
  • }
  • requestMessage
  • }
  • }

注意,这里的RequestMessage与前面讲解传输层时的RequestMessage不一样,它是一个样例类,作用于高层实现,用于包装传输层解析RequestMessage得到的消息数据:

org.apache.spark.rpc.netty.RequestMessage
  • /**
  • * The message that is sent from the sender to the receiver.
  • *
  • * @param senderAddress 发送消息的客户端的RpcEnv地址
  • * @param receiver 消息接收者的RpcEndpointRef
  • * @param content 消息内容
  • */
  • private[netty] case class RequestMessage(
  • senderAddress: RpcAddress, receiver: NettyRpcEndpointRef, content: Any)

internalReceive(...)方法会反序列化解析得到RequestMessage对象,这里有一个细节操作,即判断得到的RequestMessage对象的senderAddress字段是否为空,该值表示发送消息的客户端的RpcEnv地址;如果为空,会以客户端的地址作为该字段值重新构建RequestMessage对象,否则会以客户端地址为键、senderAddress字段为值,向remoteAddresses字典中添加映射关系,然后使用Dispatcher投递RemoteProcessConnected消息。相信大家对这个操作是非常熟悉的,与处理Channel状态时的操作一样,用于实现前面提到的“双向断开”。

1.3. 获取StreamManager

NettyRpcHandler还实现了RpcHandler要求实现的getStreamManager()方法获取StreamManager,不过其实现很简单,即直接返回NettyRpcEnv传递给它的StreamManager对象:

org.apache.spark.rpc.netty.NettyRpcHandler#getStreamManager
  • override def getStreamManager: StreamManager = streamManager

2. RpcEndpoint的启动

不知道大家在阅读完上面的内容后是否对RpcEnv有了一个深入的理解;接下来的内容,让我们回到当初以Master启动的流程中,我们回顾一下:Master的startRpcEnvAndEndpoint(...)方法是启动Master的入口方法,它内部会调用RpcEnv伴生对象的create(...)方法创建RpcEnv,而创建RpcEnv的任务是有NettyRpcEnvFactory工厂的create(...)方法完成的,回顾该方法的实现:

  • private[rpc] class NettyRpcEnvFactory extends RpcEnvFactory with Logging {
  • // 创建NettyRpcEnv
  • def create(config: RpcEnvConfig): RpcEnv = {
  • // 创建序列化器
  • val sparkConf = config.conf
  • // Use JavaSerializerInstance in multiple threads is safe. However, if we plan to support
  • // KryoSerializer in future, we have to use ThreadLocal to store SerializerInstance
  • val javaSerializerInstance =
  • new JavaSerializer(sparkConf).newInstance().asInstanceOf[JavaSerializerInstance]
  • // 通过序列化器、监听地址、安全管理器等构造NettyRpcEnv
  • val nettyEnv =
  • new NettyRpcEnv(sparkConf, javaSerializerInstance, config.advertiseAddress,
  • config.securityManager)
  • if (!config.clientMode) { // 如果是Driver
  • // 定义启动RpcEnv的偏函数,该偏函数中会创建TransportServer
  • val startNettyRpcEnv: Int => (NettyRpcEnv, Int) = { actualPort =>
  • // 调用NettyRpcEnv的startServer(),这里会创建TransportServer
  • nettyEnv.startServer(config.bindAddress, actualPort)
  • // 返回NettyRpcEnv和端口
  • (nettyEnv, nettyEnv.address.port)
  • }
  • try {
  • /**
  • * 在指定端口启动NettyRpcEnv,
  • * 该操作中会从config.port指定端口开始尝试创建并启动TransportServer,默认为7077
  • * 如果启动失败说明端口可能被占了,就自增端口号重试启动,默认重试16次
  • */
  • Utils.startServiceOnPort(config.port, startNettyRpcEnv, sparkConf, config.name)._1
  • } catch {
  • case NonFatal(e) =>
  • nettyEnv.shutdown()
  • throw e
  • }
  • }
  • // 返回NettyRpcEnv
  • nettyEnv
  • }
  • }

注:这里使用Utils的startServiceOnPort(...)方法辅助TransportServer的端口绑定操作,会从指定端口(默认为7077)开始尝试绑定,如果端口被占用就自增端口号重试绑定,默认重试16次。具体实现比较简单,就不赘述了。

在前面的介绍中,我们分析到创建NettyRpcEnv这一步时就转而去分析NettyRpcEnv的实现了;有了对NettyRpcEnv的理解,让我们回到上面的创建过程中;config.clientMode参数配置决定了是否使用NettyRpcEnv的startServer(...)方法启动TransportServer,对于Master来说,该参数自始至终一直是false,因此我们接下来考察一下NettyRpcEnv的startServer(...)方法,源码如下:

org.apache.spark.rpc.netty.NettyRpcEnv#startServer
  • // 创建TransportServer
  • def startServer(bindAddress: String, port: Int): Unit = {
  • // 先创建TransportServerBootstrap列表
  • val bootstraps: java.util.List[TransportServerBootstrap] =
  • if (securityManager.isAuthenticationEnabled()) {
  • java.util.Arrays.asList(new SaslServerBootstrap(transportConf, securityManager))
  • } else {
  • java.util.Collections.emptyList()
  • }
  • // 创建TransportServer
  • server = transportContext.createServer(bindAddress, port, bootstraps)
  • // 向Dispatcher注册RpcEndpointVerifier,注册名为endpoint-verifier。
  • dispatcher.registerRpcEndpoint(RpcEndpointVerifier.NAME, // endpoint-verifier
  • new RpcEndpointVerifier(this, dispatcher)) // RpcEndpointVerifier用于校验指定名称的RpcEndpoint是否存在。
  • }

NettyRpcEnv的startServer(...)方法会根据指定的绑定主机地址及端口,使用TransportContext创建TransportServer,由server字段持有引用,同时使用Dispatcher的registerRpcEndpoint(...)方法住了一个名为RpcEndpointVerifier.NAME的RpcEndpoint端点RpcEndpointVerifier。

RpcEndpointVerifier是用于校验指定名称的RpcEndpoint是否存在于Dispatcher的,它的实现比较简单:

org.apache.spark.rpc.netty.RpcEndpointVerifier
  • /**
  • * An [[RpcEndpoint]] for remote [[RpcEnv]]s to query if an `RpcEndpoint` exists.
  • *
  • * This is used when setting up a remote endpoint reference.
  • */
  • private[netty] class RpcEndpointVerifier(override val rpcEnv: RpcEnv, dispatcher: Dispatcher)
  • extends RpcEndpoint {
  • // 接收并回复消息
  • override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
  • /**
  • * 接收CheckExistence类型的消息,匹配得到name参数,该参数代表要查询的RpcEndpoint的名称,
  • * 使用Dispatcher的verify()方法进行查询,
  • * 将查询到的结果通过RpcCallContext的reply()方法回复客户端
  • */
  • case RpcEndpointVerifier.CheckExistence(name) => context.reply(dispatcher.verify(name))
  • }
  • }
  • private[netty] object RpcEndpointVerifier {
  • val NAME = "endpoint-verifier"
  • /** A message used to ask the remote [[RpcEndpointVerifier]] if an `RpcEndpoint` exists. */
  • case class CheckExistence(name: String)
  • }

可见,RpcEndpointVerifier只实现了RpcEndpoint的receiveAndReply(...)方法,具体检查方式是通过Dispatcher的verify(...)方法,即去Dispatcher的endpoints集合中查找:

org.apache.spark.rpc.netty.Dispatcher#verify
  • /**
  • * Return if the endpoint exists
  • * 查找对应名称的RpcEndpoint是否存在
  • */
  • def verify(name: String): Boolean = {
  • endpoints.containsKey(name)
  • }

NettyRpcEnvFactory工厂的create(...)方法在创建好NettyRpcEnv之后,会返回NettyRpcEnv对象给Master的startRpcEnvAndEndpoint(...)方法,回顾该方法:

  • def startRpcEnvAndEndpoint(
  • host: String,
  • port: Int,
  • webUiPort: Int,
  • conf: SparkConf): (RpcEnv, Int, Option[Int]) = {
  • // 创建SecurityManager
  • val securityMgr = new SecurityManager(conf)
  • // 创建RpcEnv,名为sparkMaster
  • val rpcEnv = RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr)
  • // 创建Master,将Master(Master继承了ThreadSafeRpcEndpoint)注册到RpcEnv中,获得Master的RpcEndpointRef。
  • val masterEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME,
  • new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))
  • // 向Master发送BoundPortsRequest消息,并获得返回的BoundPortsResponse消息。
  • val portsResponse = masterEndpoint.askWithRetry[BoundPortsResponse](BoundPortsRequest)
  • // 返回创建的RpcEnv、BoundPortsResponse消息携带的WebUIPort、REST服务的端口(restPort)等信息。
  • (rpcEnv, portsResponse.webUIPort, portsResponse.restPort)
  • }

startRpcEnvAndEndpoint(...)中其实会使用NettyRpcEnv的setupEndpoint(...)方法注册一个名为ENDPOINT_NAME(即“Master”)的RpcEndpoint:Master实例,得到Master这个RPC端点的RpcEndpointRef引用。

不知道大家还记不记得前面在讲解RpcEndpoint时,有讲到Master、Worker和Client其实都是RpcEndpoint的实现类。这里用于举例的Master正是一个RPC端点,我们看一下它的定义:

  • /**
  • * Master是local-cluster部署模式和Standalone部署模式中,整个Spark集群最为重要的组件之一,
  • * 它的设计将直接决定整个集群的可扩展性、可用性和容错性。
  • * Master的职责包括Worker的管理、Application的管理、Driver的管理等。
  • * Master负责对整个集群中所有资源的统一管理和分配,它接收各个Worker的注册、更新状态、心跳等消息,
  • * 也接收Driver和Application的注册。
  • *
  • * @param rpcEnv Master会向该RpcEnv注册自己
  • * @param address RpcEnv的地址
  • * @param webUiPort WebUI的端口
  • * @param securityMgr 安全管理器
  • * @param conf SparkConf
  • */
  • private[deploy] class Master(
  • override val rpcEnv: RpcEnv,
  • address: RpcAddress,
  • webUiPort: Int,
  • val securityMgr: SecurityManager,
  • val conf: SparkConf)
  • extends ThreadSafeRpcEndpoint with Logging with LeaderElectable {
  • ...
  • }

Master实现的ThreadSafeRpcEndpoint继承了RpcEndpoint特质,因此Master简介实现了RpcEndpoint特质。

回顾前面讲解的内容我们知道,当向NettyRpcEnv注册Master自己时,对应的EndpointData和Inbox会被创建,且Inbox会向自己的消息队列放入一个OnStart方法,而Inbox对OnStart消息的处理就是调用对应RpcEndpoint(即这里的Master)的onStart()方法,Master的onStart()方法初始化了很多辅助组件,在后面我们会详细介绍Master的实现,这里就不赘述了。

startRpcEnvAndEndpoint(...)方法中,在Master向NettyRpcEnv注册了自己之后,会使用对应的RpcEndpointRef的askWithRetry(...)方法向自己发送一个BoundPortsRequest消息获得自己绑定的Web UI端口和Restful服务端口,并由startRpcEnvAndEndpoint(...)方法返回。

RpcEndpointRef发送消息的操作是非常重要的方法,我们将在下一节中详细讨论。

3. 消息发送

在上一篇文章中我们详细讨论了消息调度器Dispatcher,它用于处理来自各个节点的消息,它会为每一个RpcEndpoint创建对应的Inbox收件箱,并将发往该RpcEndpoint的消息存放在Inbox,由专门的线程顺序处理。

Dispatcher调度器和Inbox收件箱是用来处理入站消息的,对于发送出站的消息,一般是由RpcEndpointRef来处理的。

当需要向某个RpcEndpoint端点发送消息时,只需要取得该端点对应的RpcEndpointRef即可;在前面我们介绍过,RpcEndpointRef中定义了很多消息发送的规范方法,但其中只有两个是留给子类实现的,即send(...)ask(...)

在向RpcEnv注册RpcEndpoint时,会创建对应的NettyRpcEndpointRef并存放在Dispatcher中,NettyRpcEndpointRef也是Spark2.1.0版本中RpcEndpointRef的唯一实现,它的定义和重要字段如下:

org.apache.spark.rpc.netty.NettyRpcEndpointRef
  • /**
  • * The NettyRpcEnv version of RpcEndpointRef.
  • *
  • * This class behaves differently depending on where it's created. On the node that "owns" the
  • * RpcEndpoint, it's a simple wrapper around the RpcEndpointAddress instance.
  • *
  • * On other machines that receive a serialized version of the reference, the behavior changes. The
  • * instance will keep track of the TransportClient that sent the reference, so that messages
  • * to the endpoint are sent over the client connection, instead of needing a new connection to
  • * be opened.
  • *
  • * The RpcAddress of this ref can be null; what that means is that the ref can only be used through
  • * a client connection, since the process hosting the endpoint is not listening for incoming
  • * connections. These refs should not be shared with 3rd parties, since they will not be able to
  • * send messages to the endpoint.
  • *
  • * @param conf Spark configuration.
  • * @param endpointAddress The address where the endpoint is listening.
  • * @param nettyEnv The RpcEnv associated with this ref.
  • */
  • private[netty] class NettyRpcEndpointRef(
  • @transient private val conf: SparkConf,
  • endpointAddress: RpcEndpointAddress,
  • @transient @volatile private var nettyEnv: NettyRpcEnv)
  • extends RpcEndpointRef(conf) with Serializable with Logging {
  • // 类型为TransportClient。NettyRpcEndpointRef将利用此TransportClient向远端的RpcEndpoint发送请求。
  • @transient @volatile var client: TransportClient = _
  • // 远端RpcEndpoint的地址RpcEndpointAddress。
  • private val _address = if (endpointAddress.rpcAddress != null) endpointAddress else null
  • // 远端RpcEndpoint的名称。
  • private val _name = endpointAddress.name
  • ...
  • }

NettyRpcEndpointRef持有了当前的NettyRpcEnv对象,同时还持有了RpcEndpoint的地址及名称;client用于记录向该RpcEndpoint发送消息的TransportClient对象,在NettyRpcEndpointRef初始化时,该字段为null。

NettyRpcEndpointRef的ask(...)方法和send(...)都调用了NettyRpcEnv的同名方法:

org.apache.spark.rpc.netty.NettyRpcEndpointRef
  • // 首先将message封装为RequestMessage,然后调用NettyRpcEnv的ask方法。
  • override def ask[T: ClassTag](message: Any, timeout: RpcTimeout): Future[T] = {
  • // 第二个参数是receiver,这里传入了当前NettyRpcEndpointRef自己
  • nettyEnv.ask(RequestMessage(nettyEnv.address, this, message), timeout)
  • }
  • // 首先将message封装为RequestMessage,然后调用NettyRpcEnv的send方法。
  • override def send(message: Any): Unit = {
  • require(message != null, "Message is null")
  • // 第二个参数是receiver,这里传入了当前NettyRpcEndpointRef自己
  • nettyEnv.send(RequestMessage(nettyEnv.address, this, message))
  • }

这两个方法的区别是ask(...)需要回复,而send(...)方法不需要。

3.1. 发送需要回复的消息

我们先看NettyRpcEnv的ask(...)方法的实现,源码如下:

org.apache.spark.rpc.netty.NettyRpcEnv#ask
  • private[netty] def ask[T: ClassTag](message: RequestMessage, timeout: RpcTimeout): Future[T] = {
  • val promise = Promise[Any]()
  • // 获取消息的接收者地址
  • val remoteAddr = message.receiver.address
  • // 失败回调
  • def onFailure(e: Throwable): Unit = {
  • if (!promise.tryFailure(e)) {
  • logWarning(s"Ignored failure: $e")
  • }
  • }
  • // 成功回调
  • def onSuccess(reply: Any): Unit = reply match {
  • case RpcFailure(e) => onFailure(e)
  • case rpcReply =>
  • if (!promise.trySuccess(rpcReply)) {
  • logWarning(s"Ignored message: $reply")
  • }
  • }
  • try {
  • if (remoteAddr == address) {
  • // 接收者地址与当前NettyRpcEnv的地址相同,说明处理请求的RpcEndpoint位于本地的NettyRpcEnv中
  • // 构造Promise对象
  • val p = Promise[Any]()
  • p.future.onComplete { // 在消息被处理完成后会被调用
  • case Success(response) => onSuccess(response)
  • case Failure(e) => onFailure(e)
  • }(ThreadUtils.sameThread)
  • // 调用Dispatcher的postLocalMessage()方法投递消息
  • dispatcher.postLocalMessage(message, p)
  • } else {
  • // 接收者地址与当前NettyRpcEnv的地址不同,说明处理请求的RpcEndpoint位于其他节点的NettyRpcEnv中
  • // 序列化消息数据,构造为RpcOutboxMessage对象,传递的失败和成功的回调都定义在上面
  • val rpcMessage = RpcOutboxMessage(serialize(message),
  • onFailure,
  • (client, response) => onSuccess(deserialize[Any](client, response)))
  • // 投递消息
  • postToOutbox(message.receiver, rpcMessage)
  • // 超时
  • promise.future.onFailure {
  • case _: TimeoutException => rpcMessage.onTimeout()
  • case _ =>
  • }(ThreadUtils.sameThread)
  • }
  • // 使用timeoutScheduler设置一个定时器,用于超时处理。此定时器在等待指定的超时时间后将抛出TimeoutException异常。
  • val timeoutCancelable = timeoutScheduler.schedule(new Runnable {
  • override def run(): Unit = {
  • onFailure(new TimeoutException(s"Cannot receive any reply in ${timeout.duration}"))
  • }
  • }, timeout.duration.toNanos, TimeUnit.NANOSECONDS)
  • // 请求如果在超时时间内处理完毕,则会调用timeoutScheduler的cancel方法取消timeoutCancelable超时定时器。
  • promise.future.onComplete { v =>
  • timeoutCancelable.cancel(true)
  • }(ThreadUtils.sameThread)
  • } catch {
  • case NonFatal(e) =>
  • onFailure(e)
  • }
  • promise.future.mapTo[T].recover(timeout.addMessageIfTimeout)(ThreadUtils.sameThread)
  • }

由于ask(...)方法发送的请求需要被回复,因此在一开始会创建一个Promise对象,并且定义了触发该Promise回调的两个函数;发送的操作分为两种情况,当消息的接收者地址与当前NettyRpcEnv的地址相同,说明消息实际上是发给本地的NettyRpcEnv,因此直接使用Dispatcher的postLocalMessage(...)投递到当前RpcEndpoint端点对应的的Inbox内即可;否则说明是发送给远程RpcEndpoint的消息,此时会将消息数据构造为RpcOutboxMessage,然后通过NettyRpcEnv的postToOutbox(...)方法进行发送。最后会使用timeoutScheduler线程池提交一个超时检测任务,以满足超时机制。

3.2. 发送无需回复的消息

我们后面再详细讨论postToOutbox(...)方法,先来讲解send(...)方法的实现:

org.apache.spark.rpc.netty.NettyRpcEnv#send
  • // 发送消息
  • private[netty] def send(message: RequestMessage): Unit = {
  • // 获取消息的接收者地址
  • val remoteAddr = message.receiver.address
  • if (remoteAddr == address) {
  • // 接收者地址与当前NettyRpcEnv的地址相同,说明处理请求的RpcEndpoint位于本地的NettyRpcEnv中
  • // Message to a local RPC endpoint.
  • try {
  • // 由Dispatcher的postOneWayMessage()方法发送
  • dispatcher.postOneWayMessage(message)
  • } catch {
  • case e: RpcEnvStoppedException => logWarning(e.getMessage)
  • }
  • } else {
  • // 请求消息的接收者的地址与当前NettyRpcEnv的地址不同
  • // Message to a remote RPC endpoint.
  • // 将message序列化,封装为OneWayOutboxMessage类型的消息,调用postToOutbox()方法将消息投递出去。
  • postToOutbox(message.receiver, OneWayOutboxMessage(serialize(message)))
  • }
  • }

send(...)方法其实与ask(...)方法在发送流程上是类似的,只是少了回调和超时机制的处理。从源码可知,当发送的消息是给本地RpcEndpoint的,则使用Dispatcher的postOneWayMessage(...)进行投递,否则消息数据被包装为OneWayOutboxMessage对象,然后也会使用postToOutbox(...)方法进行发送。

3.3. postToOutbox方法

postToOutbox(...)ask(...)send(...)在发送远程消息时都会使用的方法,它的实现如下:

org.apache.spark.rpc.netty.NettyRpcEnv#postToOutbox
  • // 向远端RpcEndpoint发送消息
  • private def postToOutbox(receiver: NettyRpcEndpointRef, message: OutboxMessage): Unit = {
  • if (receiver.client != null) { // 消息对应的TransportClient存在,直接发送
  • message.sendWith(receiver.client)
  • } else { // TransportClient不存在
  • require(receiver.address != null,
  • "Cannot send message to client endpoint with no listen address.")
  • val targetOutbox = {
  • // 获取远端RpcEndpoint对应的Outbox
  • val outbox = outboxes.get(receiver.address)
  • if (outbox == null) { // Outbox为空
  • // 新建一个Outbox并添加到outboxes字典中
  • val newOutbox = new Outbox(this, receiver.address)
  • val oldOutbox = outboxes.putIfAbsent(receiver.address, newOutbox)
  • if (oldOutbox == null) { // 返回为空,说明添加成功
  • // 将添加的Outbox返回
  • newOutbox
  • } else {
  • // 否则说明有其他线程抢先添加了,返回已经添加的Outbox
  • oldOutbox
  • }
  • } else {
  • // 已存在,直接返回
  • outbox
  • }
  • }
  • if (stopped.get) { // 如果当前RpcEndpoint处于停止状态
  • // It's possible that we put `targetOutbox` after stopping. So we need to clean it.
  • // 则从outboxes移除对应的的消息接受者的Outbox
  • outboxes.remove(receiver.address)
  • // 停止该Outbox
  • targetOutbox.stop()
  • } else {
  • // 否则使用Outbox的send()方法发送消息
  • targetOutbox.send(message)
  • }
  • }
  • }

postToOutbox(...)方法中,如果消息接收者NettyRpcEndpointRef的client属性不为空,说明之前已经向该RpcEndpoint发送过消息了,已经构建了与该RpcEndpoint相连接的TransportClient客户端,因此直接通过OutboxMessage消息的sendWith(client: TransportClient): Unit方法使用该TransportClient客户端发送即可。

如果消息接收者NettyRpcEndpointRef的client属性为空,说明这可能是第一次向该RpcEndpoint发送消息,因此先从outboxes字典查找对应的Outbox发件箱是否存在,如果不存在就进行创建并添加到outboxes字典中进行记录,最后会通过Outbox的send(...)方法对消息进行发送。

3.3.1. Outbox

这里涉及到一个Outbox发件箱的概念。在NettyRpcEnv中,在需要向远端RpcEndpoint发送消息时,会为该远端RpcEndpoint维护一个Outbox对象,发送给该RpcEndpoint的消息都会先投递到对应的Outbox中;Outbox的定义和重要字段如下:

  • /**
  • * @param nettyEnv 当前Outbox所在节点上的NettyRpcEnv。
  • * @param address Outbox所对应的远端NettyRpcEnv的地址。
  • */
  • private[netty] class Outbox(nettyEnv: NettyRpcEnv, val address: RpcAddress) {
  • outbox => // Give this an alias so we can use it more clearly in closures.
  • // 向其他远端NettyRpcEnv上的所有RpcEndpoint发送的消息列表。
  • @GuardedBy("this")
  • private val messages = new java.util.LinkedList[OutboxMessage]
  • // 当前Outbox内的TransportClient。消息的发送都依赖于此传输客户端。
  • @GuardedBy("this")
  • private var client: TransportClient = null
  • /**
  • * connectFuture points to the connect task. If there is no connect task, connectFuture will be
  • * null.
  • * 指向当前Outbox内连接任务的Future引用。
  • * 如果当前没有连接任务,则connectFuture为null。
  • */
  • @GuardedBy("this")
  • private var connectFuture: java.util.concurrent.Future[Unit] = null
  • // 当前Outbox是否停止的状态。
  • @GuardedBy("this")
  • private var stopped = false
  • /**
  • * If there is any thread draining the message queue
  • * 表示当前Outbox内是否正有线程在处理messages列表中消息。
  • */
  • @GuardedBy("this")
  • private var draining = false
  • ...
  • }

Outbox的构造与Inbox有点类似,它内部也有一个LinkedList类型的链表messages,另外,每个Outbox都会持有一个与远端RpcEndpoint建立了连接的TransportClient,但它是懒加载的,只在消息被发送时发现为空才会被创建。

我们关注Outbox中被NettyRpcEnv的postToOutbox(...)调用的send(...)方法,源码如下:

org.apache.spark.rpc.netty.Outbox#send
  • /**
  • * Send a message. If there is no active connection, cache it and launch a new connection. If
  • * [[Outbox]] is stopped, the sender will be notified with a [[SparkException]].
  • * 发送消息
  • */
  • def send(message: OutboxMessage): Unit = {
  • val dropped = synchronized {
  • if (stopped) { // 如果Outbox已停止则丢弃休息
  • true
  • } else {
  • // 否则放入messages链表
  • messages.add(message)
  • false
  • }
  • }
  • if (dropped) { // 丢弃消息需要调用message发送失败的回调
  • message.onFailure(new SparkException("Message is dropped because Outbox is stopped"))
  • } else {
  • // 调用drainOutbox()方法处理messages中的消息
  • drainOutbox()
  • }
  • }

源码比较简单,它会先把OutboxMessage消息放入到messages链表中,然后调用drainOutbox()方法处理消息,该方法源码如下:

org.apache.spark.rpc.netty.Outbox#drainOutbox
  • /**
  • * Drain the message queue. If there is other draining thread, just exit. If the connection has
  • * not been established, launch a task in the `nettyEnv.clientConnectionExecutor` to setup the
  • * connection.
  • */
  • private def drainOutbox(): Unit = {
  • var message: OutboxMessage = null
  • synchronized { // 加锁
  • // 当前Outbox已停止,直接返回
  • if (stopped) {
  • return
  • }
  • // 正在连接远端服务,直接返回
  • if (connectFuture != null) {
  • // We are connecting to the remote address, so just exit
  • return
  • }
  • // TransportClient为空,说明还未连接远端服务。
  • if (client == null) {
  • // There is no connect task but client is null, so we need to launch the connect task.
  • // 需要调用launchConnectTask方法运行连接远端服务的任务,然后返回。
  • launchConnectTask()
  • return
  • }
  • // 正有线程在处理(即发送)messages列表中的消息,则返回。
  • if (draining) {
  • // There is some thread draining, so just exit
  • return
  • }
  • // 取出消息,如果取出为空,则返回
  • message = messages.poll()
  • if (message == null) {
  • return
  • }
  • // 走到这里说明有可以处理的消息,将draining置为true
  • draining = true
  • }
  • while (true) { // 循环处理messages列表中的消息
  • try {
  • val _client = synchronized { client }
  • if (_client != null) {
  • // 调用OutboxMessage的sendWith方法发送消息
  • message.sendWith(_client)
  • } else {
  • assert(stopped == true)
  • }
  • } catch {
  • case NonFatal(e) =>
  • handleNetworkFailure(e)
  • return
  • }
  • synchronized {
  • if (stopped) {
  • return
  • }
  • // 不断从messages列表中取出消息
  • message = messages.poll()
  • if (message == null) {
  • draining = false
  • return
  • }
  • }
  • }
  • }

可见,在drainOutbox()方法的一开始,就会判断client是否为空,如果为空则调用launchConnectTask()进行TransportClient的创建及连接操作,并直接返回;这个方法我们后面介绍。

如果client不为空,则会将draining标记置为true,表示已经有线程在处理消息了;处理的方式是通过开启一个while死循环,不断地取出messages中的OutboxMessage消息对象,通过OutboxMessage消息的sendWith(client: TransportClient): Unit方法使用client客户端进行发送。

上面有讲到,当client是为空时会调用launchConnectTask()进行TransportClient的创建及连接操作,我们来看看这个方法的实现:

org.apache.spark.rpc.netty.Outbox#launchConnectTask
  • // 连接远程服务
  • private def launchConnectTask(): Unit = {
  • // 向NettyRpcEnv的clientConnectionExecutor提交一个连接任务
  • connectFuture = nettyEnv.clientConnectionExecutor.submit(new Callable[Unit] {
  • override def call(): Unit = {
  • try {
  • // 根据远端NettyRpcEnv的TransportServer的地址创建TransportClient对象
  • val _client = nettyEnv.createClient(address)
  • outbox.synchronized {
  • // 记录TransportClient
  • client = _client
  • if (stopped) { // 如果Outbox停止了,则关闭TransportClient
  • // 这里的关闭仅仅是将client置为null,并没有真正关闭TransportClient,方便后面复用
  • closeClient()
  • }
  • }
  • } catch {
  • case ie: InterruptedException =>
  • // exit
  • return
  • case NonFatal(e) =>
  • outbox.synchronized { connectFuture = null }
  • handleNetworkFailure(e)
  • return
  • }
  • outbox.synchronized { connectFuture = null }
  • // It's possible that no thread is draining now. If we don't drain here, we cannot send the
  • // messages until the next message arrives.
  • // 调用drainOutbox()处理messages中的消息
  • drainOutbox()
  • }
  • })
  • }

launchConnectTask()会向NettyRpcEnv的clientConnectionExecutor线程池中提交一个创建TransportClient的任务,该任务是通过NettyRpcEnv的createClient(...)来创建客户端的,源码比较简单,即使用当前NettyRpcEnv中已经准备好的TransportClientFactory工厂进行创建,如下:

  • private[netty] def createClient(address: RpcAddress): TransportClient = {
  • // 创建TransportClient
  • clientFactory.createClient(address.host, address.port)
  • }

这个创建过程在前面已经讲过了,这里就不多赘述。创建好的TransportClient会被Outbox的client字段缓存,以便下次发送使用。

同时,在launchConnectTask()的最后还会调用drainOutbox()处理messages中的OutboxMessage消息。

3.3.2. OutboxMessage

上面postToOutbox(...)和Outbox处理的消息,都是OutboxMessage类型的,这一节我们来分析一下它。OutboxMessage是一个特质:

org.apache.spark.rpc.netty.OutboxMessage
  • private[netty] sealed trait OutboxMessage {
  • // 发送消息
  • def sendWith(client: TransportClient): Unit
  • // 发送失败的情况
  • def onFailure(e: Throwable): Unit
  • }

sendWith(...)方法的作用是使用指定的TransportClient发送消息,onFailure()则用于处理发送失败的情况

OutboxMessage只有OneWayOutboxMessage和RpcOutboxMessage两种实现类型,我们先来看看OneWayOutboxMessage的实现:

org.apache.spark.rpc.netty.OneWayOutboxMessage
  • // 不需要回复的消息
  • private[netty] case class OneWayOutboxMessage(content: ByteBuffer) extends OutboxMessage
  • with Logging {
  • override def sendWith(client: TransportClient): Unit = {
  • // 直接使用Transport的send()方法发送
  • client.send(content)
  • }
  • override def onFailure(e: Throwable): Unit = {
  • e match {
  • case e1: RpcEnvStoppedException => logWarning(e1.getMessage)
  • case e1: Throwable => logWarning(s"Failed to send one-way RPC.", e1)
  • }
  • }
  • }

实现比较简单,它会使用TransportClient的send(...)方法发送消息,大家应该还记得,这个方法是专门用于发送无需回复的OneWayMessage消息的。

RpcOutboxMessage的实现则要复杂一点,因为它需要被回复:

org.apache.spark.rpc.netty.RpcOutboxMessage
  • // 需要回复的RPC消息
  • private[netty] case class RpcOutboxMessage(
  • content: ByteBuffer,
  • _onFailure: (Throwable) => Unit,
  • _onSuccess: (TransportClient, ByteBuffer) => Unit)
  • extends OutboxMessage with RpcResponseCallback {
  • private var client: TransportClient = _
  • private var requestId: Long = _
  • override def sendWith(client: TransportClient): Unit = {
  • // 记录TransportClient
  • this.client = client
  • // 使用TransportClient发送,记录返回的Request ID,注意最后的callback参数
  • this.requestId = client.sendRpc(content, this)
  • }
  • // 超时
  • def onTimeout(): Unit = {
  • require(client != null, "TransportClient has not yet been set.")
  • // 从Transport中根据Request ID移除对应的消息
  • client.removeRpcRequest(requestId)
  • }
  • // 发送失败的处理
  • override def onFailure(e: Throwable): Unit = {
  • // 交给回调方法
  • _onFailure(e)
  • }
  • // 发送成功的处理
  • override def onSuccess(response: ByteBuffer): Unit = {
  • // 交给回调方法
  • _onSuccess(client, response)
  • }
  • }

我们可以看到,它还实现了RpcResponseCallback接口,提供了onSuccess(...)onFailure(...)方法,这两个方法内会调用构造RpcOutboxMessage时传入的两个回调函数_onSuccess_onFailure

而它的sendWith(...)方法中会使用TransportClient的sendRpc(...)方法发送消息,更为重要的是,它会将自己作为sendRpc(...)方法的类型为RpcResponseCallback的第二个参数,并记录得到的Request ID。

相信大家一定还不会忘记,TransportClient发送RpcRequest的流程,它会将Request ID与RpcResponseCallback回调的映射关系存放到TransportResponseHandler的outstandingRpcs字典中,当收到回复的RpcResponse消息时,TransportResponseHandler的handle(...)方法会根据RpcResponse中携带的Request ID去outstandingRpcs字典中找到当时存入的RpcResponseCallback回调,然后使用该回调处理消息回复。

4. Inbox中的消息处理

在上一篇文章,我们讲到了Inbox对OnStart、OnStop两类消息的处理,在这一节,我们补充讲解Inbox内的其它几种消息的处理流程。

4.1. RpcMessage

RpcMessage表示需要进行回复的消息,RpcEndpoint在收到该消息后,需要通过它的receiveAndReply(...)方法进行处理,这里以BoundPortsRequest请求为例。

不知道读者是否还记得BoundPortsRequest请求;在前面讲解Master启动流程时,Master会使用自己的RpcEndpointRef向自己发送一条BoundPortsRequest消息,回顾startRpcEnvAndEndpoint(...)的源码:

  • def startRpcEnvAndEndpoint(
  • host: String,
  • port: Int,
  • webUiPort: Int,
  • conf: SparkConf): (RpcEnv, Int, Option[Int]) = {
  • // 创建SecurityManager
  • val securityMgr = new SecurityManager(conf)
  • // 创建RpcEnv
  • val rpcEnv = RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr)
  • // 创建Master,将Master(Master继承了ThreadSafeRpcEndpoint)注册到RpcEnv中,获得Master的RpcEndpointRef。
  • val masterEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME,
  • new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))
  • // 向Master发送BoundPortsRequest消息,并获得返回的BoundPortsResponse消息。
  • val portsResponse = masterEndpoint.askWithRetry[BoundPortsResponse](BoundPortsRequest)
  • // 返回创建的RpcEnv、BoundPortsResponse消息携带的WebUIPort、REST服务的端口(restPort)等信息。
  • (rpcEnv, portsResponse.webUIPort, portsResponse.restPort)
  • }

由于是向自己发送,最终消息还是会放入Master自己所对应的Inbox中,不过BoundPortsRequest消息被封装为了RpcMessage对象。Inbox的process(...)方法在处理RpcMessage消息时,会使用对应RpcEndpoint的receiveAndReply(...)方法进行回复,代码片段如下:

org.apache.spark.rpc.netty.Inbox#process
  • case RpcMessage(_sender, content, context) =>
  • try {
  • // 发送并要求回复
  • endpoint.receiveAndReply(context).applyOrElse[Any, Unit](content, { msg =>
  • throw new SparkException(s"Unsupported message $message from ${_sender}")
  • })
  • } catch {
  • case NonFatal(e) =>
  • context.sendFailure(e)
  • // Throw the exception -- this exception will be caught by the safelyCall function.
  • // The endpoint's onError function will be called.
  • throw e
  • }

Master自然也实现了receiveAndReply(...)方法,它对BoundPortsRequest消息的处理的代码片段如下:

org.apache.spark.deploy.master.Master#receiveAndReply
  • case BoundPortsRequest =>
  • context.reply(BoundPortsResponse(address.port, webUi.boundPort, restServerBoundPort))

即使用RpcCallContext回调向发送者回复了一条包含RpcEnv端口、Web UI端口和Restful服务端口的BoundPortsResponse响应消息。

4.2. OneWayMessage

Inbox对于OneWayMessage的消息也比较简单,即直接使用对应的RpcEndpoint的receive(...)方法处理,代码片段如下:

  • case OneWayMessage(_sender, content) =>
  • // 发送不要求回复
  • endpoint.receive.applyOrElse[Any, Unit](content, { msg =>
  • throw new SparkException(s"Unsupported message $message from ${_sender}")
  • })

Master也实现了该方法,不过内部处理的消息类型非常多,这里举一个简单的例子。Worker会向Master发送心跳以更新自己的存活时间,Master需要定期检查Worker是否超时,在Master中会开启一个名为checkForWorkerTimeOutTask的任务,它会定期向Master自己发送CheckForWorkerTimeOut消息,这个消息就是OneWayMessage类型的消息,源码展示如下:

org.apache.spark.deploy.master.Master
  • /**
  • * 包含一个线程的ScheduledThreadPoolExecutor,启动的线程以master-forward-message-thread作为名称。
  • * forwardMessageThread主要用于运行checkForWorkerTimeOutTask和recoveryCompletionTask。
  • */
  • private val forwardMessageThread =
  • ThreadUtils.newDaemonSingleThreadScheduledExecutor("master-forward-message-thread")
  • ...
  • // 启动检查Worker超时的任务
  • checkForWorkerTimeOutTask = forwardMessageThread.scheduleAtFixedRate(new Runnable {
  • override def run(): Unit = Utils.tryLogNonFatalError {
  • // 向自己发送CheckForWorkerTimeOut消息,会调用timeOutDeadWorkers()方法
  • self.send(CheckForWorkerTimeOut)
  • }
  • }, 0, WORKER_TIMEOUT_MS, TimeUnit.MILLISECONDS)
  • ...
  • // self()方法用于获取RpcEndpoint自己的RpcEndpointRef
  • final def self: RpcEndpointRef = {
  • require(rpcEnv != null, "rpcEnv has not been initialized")
  • // 实际调用了RpcEnv的endpointRef方法
  • rpcEnv.endpointRef(this)
  • }

发送方式很简单,即使用自己的RpcEndpointRef的send(...)方法。

CheckForWorkerTimeOut消息经过Inbox处理后会交给Master的receive(...)方法,内部的处理代码片段如下:

org.apache.spark.deploy.master.Master#receive
  • case CheckForWorkerTimeOut =>
  • // 处理CheckForWorkerTimeOut消息,检查Worker是否超时
  • timeOutDeadWorkers()

即调用timeOutDeadWorkers()检测Worker超时情况。

4.3. RemoteProcess相关消息

在检测到有新的客户端与RpcEndpoint服务端的Channel连接状态发生变化时,会向对应的Inbox投递RemoteProcess相关消息,这个操作在前面NettyRpcHandler的方法中是有体现的,读者可以自行回顾本文1.1节。

RemoteProcess相关消息有三类:RemoteProcessConnected、RemoteProcessDisconnected和RemoteProcessConnectionError,分别在新的客户端与RpcEndpoint建立连接、断开连接或连接出错时被调用;Inbox对它们的处理都是交给RpcEndpoint的相关方法,代码片段如下:

  • case RemoteProcessConnected(remoteAddress) =>
  • // 调用RpcEndpoint的onConnected()方法告知该RpcEndpoint收到远程连接
  • endpoint.onConnected(remoteAddress)
  • case RemoteProcessDisconnected(remoteAddress) =>
  • // 调用RpcEndpoint的onDisconnected()方法告知该RpcEndpoint断开远程连接
  • endpoint.onDisconnected(remoteAddress)
  • case RemoteProcessConnectionError(cause, remoteAddress) =>
  • // 调用RpcEndpoint的onNetworkError()方法告知该RpcEndpoint处理连接错误
  • endpoint.onNetworkError(cause, remoteAddress)

以Master为例,Master没有重写onConnected(...)onNetworkError(...)方法,都沿用了RpcEndpoint的实现;唯一实现的onDisconnected(...)方法如下:

  • override def onDisconnected(address: RpcAddress): Unit = {
  • // The disconnected client could've been either a worker or an app; remove whichever it was
  • logInfo(s"$address got disassociated, removing it.")
  • addressToWorker.get(address).foreach(removeWorker)
  • // 调用finishApplication()方法处理Application
  • addressToApp.get(address).foreach(finishApplication)
  • if (state == RecoveryState.RECOVERING && canCompleteRecovery) { completeRecovery() }
  • }

可见,当连接断开后,它会尝试移除连接对应的Worker,并停止相关的Application。