
Spark Source Code Analysis 07 - Communication Architecture 05: The Event Bus


1. Introduction to the Event Bus

Generally speaking, I place the event bus in Spark's communication architecture layer. It is an implementation of the listener pattern, whose main components are the listeners, the event source, and the event dispatcher (that is, the event bus itself). Spark defines a number of event buses so that listeners can subscribe to exactly the events they are interested in.

2. ListenerBus

ListenerBus, found in the org.apache.spark.util package of the Spark Core module, is the top-level trait of all event buses. It defines several methods, most of which have default implementations; only doPostEvent(...) is left abstract for subclasses to implement. Its definition and its one important field are as follows:

org.apache.spark.util.ListenerBus
/**
 * An event bus which posts events to its listeners.
 * L is the listener type parameter; any listener type is supported.
 * E is the event type parameter.
 */
private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {

  // Marked `private[spark]` for access in tests.
  // Holds all registered listeners.
  private[spark] val listeners = new CopyOnWriteArrayList[L]

  ...
}

ListenerBus takes two type parameters: the first, L, is the type of listener the bus manages; the second, E, is the type of event it accepts. Its listeners field is a thread-safe list that holds all listeners managed by the bus.
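The choice of CopyOnWriteArrayList matters here: iterators over it see an immutable snapshot of the list, so the bus can deliver events while other threads concurrently add or remove listeners. A minimal standalone sketch of that snapshot behavior (plain JDK, nothing Spark-specific; all names are invented for illustration):

import java.util.concurrent.CopyOnWriteArrayList

object CopyOnWriteDemo {
  def main(args: Array[String]): Unit = {
    val listeners = new CopyOnWriteArrayList[String]
    listeners.add("listener-1")
    listeners.add("listener-2")

    // An iterator sees a snapshot of the backing array taken at creation
    // time; concurrent mutation replaces the underlying array instead of
    // touching the one being iterated.
    val iter = listeners.iterator()
    listeners.add("listener-3") // does not invalidate or affect `iter`
    while (iter.hasNext) {
      println(iter.next()) // prints listener-1 and listener-2 only
    }
  }
}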

ListenerBus also defines a number of methods, which fall roughly into the following categories:

  1. Listener management

Given the listeners list, there naturally need to be methods to add to, remove from, and query it. ListenerBus's addListener(...) and removeListener(...) add a listener to the list and remove one from it, respectively. Both are straightforward:

org.apache.spark.util.ListenerBus
/**
 * Add a listener to listen events. This method is thread-safe and can be called in any thread.
 * Adds a listener to `listeners`. Since `listeners` is a CopyOnWriteArrayList,
 * addListener is thread-safe.
 */
final def addListener(listener: L): Unit = {
  listeners.add(listener)
}

/**
 * Remove a listener and it won't receive any events. This method is thread-safe and can be called
 * in any thread.
 * Removes a listener from `listeners`. Since `listeners` is a CopyOnWriteArrayList,
 * removeListener is thread-safe.
 */
final def removeListener(listener: L): Unit = {
  listeners.remove(listener)
}

findListenersByClass(...) looks up listeners of a given type and returns them as a sequence:

org.apache.spark.util.ListenerBus#findListenersByClass
// Finds all listeners whose class matches the given type.
private[spark] def findListenersByClass[T <: L : ClassTag](): Seq[T] = {
  val c = implicitly[ClassTag[T]].runtimeClass
  listeners.asScala.filter(_.getClass == c).map(_.asInstanceOf[T]).toSeq
}

  2. Event notification

ListenerBus also provides two methods for delivering a given event to the listeners it maintains. doPostEvent(...) is abstract; it delivers a given event to a given listener:

org.apache.spark.util.ListenerBus#doPostEvent
/**
 * Post an event to the specified listener. `onPostEvent` is guaranteed to be called in the same
 * thread for all listeners.
 * Delivers an event to the given listener; to be implemented by subclasses.
 */
protected def doPostEvent(listener: L, event: E): Unit

postToAll(...) delivers a given event to every listener the bus maintains; it simply iterates over the listeners list and calls doPostEvent(...) for each one:

org.apache.spark.util.ListenerBus#postToAll
/**
 * Post the event to all registered listeners. The `postToAll` caller should guarantee calling
 * `postToAll` in the same thread for all events.
 * Delivers the event to all registered listeners.
 */
final def postToAll(event: E): Unit = {
  // JavaConverters can create a JIterableWrapper if we use asScala.
  // However, this method will be called frequently. To avoid the wrapper cost, here we use
  // Java Iterator directly.
  // Iterate over all listeners.
  val iter = listeners.iterator
  while (iter.hasNext) {
    val listener = iter.next()
    try {
      // Deliver the event to the listener; doPostEvent is subclass-specific.
      doPostEvent(listener, event)
    } catch {
      case NonFatal(e) =>
        logError(s"Listener ${Utils.getFormattedClassName(listener)} threw an exception", e)
    }
  }
}
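To see how little a concrete bus has to provide on top of this, here is a minimal, hypothetical subclass. GreetingListener, GreetingEvent, and GreetingListenerBus are all invented for illustration, and since ListenerBus is private[spark], this sketch would only compile from code living under the org.apache.spark namespace:

import org.apache.spark.util.ListenerBus

// Invented listener and event types, for illustration only.
trait GreetingListener {
  def onGreeting(msg: String): Unit
}
case class GreetingEvent(msg: String)

// A concrete bus only has to say how one event reaches one listener;
// listener bookkeeping, addListener(...) and postToAll(...) are inherited.
class GreetingListenerBus extends ListenerBus[GreetingListener, GreetingEvent] {
  override protected def doPostEvent(
      listener: GreetingListener,
      event: GreetingEvent): Unit = {
    listener.onGreeting(event.msg)
  }
}

// Usage:
val bus = new GreetingListenerBus
bus.addListener(new GreetingListener {
  override def onGreeting(msg: String): Unit = println(s"got: $msg")
})
bus.postToAll(GreetingEvent("hello")) // prints "got: hello"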

ListenerBus has several implementations; the class hierarchy is shown below:

[Figure 1: ListenerBus class hierarchy]

The following sections walk through these implementations one by one.

3. SparkListenerBus

SparkListenerBus is also a trait. It extends ListenerBus, fixing the listener type to SparkListenerInterface and the event type to SparkListenerEvent:

/**
 * A [[SparkListenerEvent]] bus that relays [[SparkListenerEvent]]s to its listeners
 * Delivers SparkListenerEvent events to SparkListenerInterface listeners.
 */
private[spark] trait SparkListenerBus
  extends ListenerBus[SparkListenerInterface, SparkListenerEvent] {
  ...
}

SparkListenerBus implements ListenerBus's doPostEvent(...), and that is in fact its only method:

org.apache.spark.scheduler.SparkListenerBus#doPostEvent
// Matches on the SparkListenerEvent and invokes the corresponding method of the
// SparkListenerInterface listener.
protected override def doPostEvent(
    listener: SparkListenerInterface,
    event: SparkListenerEvent): Unit = {
  // Dispatch on the event type and call the matching listener method.
  event match {
    case stageSubmitted: SparkListenerStageSubmitted =>
      listener.onStageSubmitted(stageSubmitted)
    case stageCompleted: SparkListenerStageCompleted =>
      listener.onStageCompleted(stageCompleted)
    case jobStart: SparkListenerJobStart =>
      listener.onJobStart(jobStart)
    case jobEnd: SparkListenerJobEnd =>
      listener.onJobEnd(jobEnd)
    case taskStart: SparkListenerTaskStart =>
      listener.onTaskStart(taskStart)
    case taskGettingResult: SparkListenerTaskGettingResult =>
      listener.onTaskGettingResult(taskGettingResult)
    case taskEnd: SparkListenerTaskEnd =>
      listener.onTaskEnd(taskEnd)
    case environmentUpdate: SparkListenerEnvironmentUpdate =>
      listener.onEnvironmentUpdate(environmentUpdate)
    case blockManagerAdded: SparkListenerBlockManagerAdded =>
      listener.onBlockManagerAdded(blockManagerAdded)
    case blockManagerRemoved: SparkListenerBlockManagerRemoved =>
      listener.onBlockManagerRemoved(blockManagerRemoved)
    case unpersistRDD: SparkListenerUnpersistRDD =>
      listener.onUnpersistRDD(unpersistRDD)
    case applicationStart: SparkListenerApplicationStart =>
      listener.onApplicationStart(applicationStart)
    case applicationEnd: SparkListenerApplicationEnd =>
      listener.onApplicationEnd(applicationEnd)
    case metricsUpdate: SparkListenerExecutorMetricsUpdate =>
      listener.onExecutorMetricsUpdate(metricsUpdate)
    case executorAdded: SparkListenerExecutorAdded =>
      listener.onExecutorAdded(executorAdded)
    case executorRemoved: SparkListenerExecutorRemoved =>
      listener.onExecutorRemoved(executorRemoved)
    case blockUpdated: SparkListenerBlockUpdated =>
      listener.onBlockUpdated(blockUpdated)
    case logStart: SparkListenerLogStart => // ignore event log metadata
    case _ => listener.onOtherEvent(event)
  }
}

doPostEvent(...) pattern-matches on the incoming event and routes it to the appropriate listener method. As their names suggest, these events span the whole Spark application lifecycle: stages, tasks, jobs, block managers, executors, and so on.
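The trailing `case _ => listener.onOtherEvent(event)` arm is the extension point for events the match does not know about, which is how module-specific events (SQL events, for example) reach listeners. As a hedged sketch, a user-defined event and a listener that picks it up might look like this (AppHealthCheckEvent and AppHealthListener are invented names; note that posting goes through the private[spark] LiveListenerBus, so in practice this pattern is used by code shipped inside Spark):

import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}

// A hypothetical user-defined event: any case class extending
// SparkListenerEvent falls through to the `case _` branch above.
case class AppHealthCheckEvent(ok: Boolean) extends SparkListenerEvent

class AppHealthListener extends SparkListener {
  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
    case AppHealthCheckEvent(ok) => println(s"health check: $ok")
    case _ => // ignore events we don't care about
  }
}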

3.1. SparkListenerInterface

SparkListenerBus requires its listeners to be of type SparkListenerInterface. SparkListenerInterface is likewise a trait; it declares one handler method per event type:

org.apache.spark.scheduler.SparkListenerInterface
/**
 * An internal class that describes the metadata of an event log.
 * This event is not meant to be posted to listeners downstream.
 *
 * Carries event log metadata; it is skipped during delivery.
 */
private[spark] case class SparkListenerLogStart(sparkVersion: String) extends SparkListenerEvent

/**
 * Interface for creating history listeners defined in other modules like SQL, which are used to
 * rebuild the history UI.
 */
private[spark] trait SparkHistoryListenerFactory {

  /**
   * Create listeners used to rebuild the history UI.
   */
  def createListeners(conf: SparkConf, sparkUI: SparkUI): Seq[SparkListener]
}

/**
 * Interface for listening to events from the Spark scheduler. Most applications should probably
 * extend SparkListener or SparkFirehoseListener directly, rather than implementing this class.
 *
 * Note that this is an internal interface which might change in different Spark releases.
 */
private[spark] trait SparkListenerInterface {

  /**
   * Called when a stage completes successfully or fails, with information on the completed stage.
   */
  def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit

  /**
   * Called when a stage is submitted
   */
  def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit

  /**
   * Called when a task starts
   */
  def onTaskStart(taskStart: SparkListenerTaskStart): Unit

  /**
   * Called when a task begins remotely fetching its result (will not be called for tasks that do
   * not need to fetch the result remotely).
   */
  def onTaskGettingResult(taskGettingResult: SparkListenerTaskGettingResult): Unit

  /**
   * Called when a task ends
   */
  def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit

  /**
   * Called when a job starts
   */
  def onJobStart(jobStart: SparkListenerJobStart): Unit

  /**
   * Called when a job ends
   */
  def onJobEnd(jobEnd: SparkListenerJobEnd): Unit

  /**
   * Called when environment properties have been updated
   */
  def onEnvironmentUpdate(environmentUpdate: SparkListenerEnvironmentUpdate): Unit

  /**
   * Called when a new block manager has joined
   */
  def onBlockManagerAdded(blockManagerAdded: SparkListenerBlockManagerAdded): Unit

  /**
   * Called when an existing block manager has been removed
   */
  def onBlockManagerRemoved(blockManagerRemoved: SparkListenerBlockManagerRemoved): Unit

  /**
   * Called when an RDD is manually unpersisted by the application
   */
  def onUnpersistRDD(unpersistRDD: SparkListenerUnpersistRDD): Unit

  /**
   * Called when the application starts
   */
  def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit

  /**
   * Called when the application ends
   */
  def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit

  /**
   * Called when the driver receives task metrics from an executor in a heartbeat.
   */
  def onExecutorMetricsUpdate(executorMetricsUpdate: SparkListenerExecutorMetricsUpdate): Unit

  /**
   * Called when the driver registers a new executor.
   */
  def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit

  /**
   * Called when the driver removes an executor.
   */
  def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved): Unit

  /**
   * Called when the driver receives a block update info.
   */
  def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit

  /**
   * Called when other events like SQL-specific events are posted.
   */
  def onOtherEvent(event: SparkListenerEvent): Unit
}

SparkListenerInterface has a default abstract implementation, SparkListener, which provides an empty body for every method SparkListenerInterface declares; I won't reproduce its source here.
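Thanks to those empty default implementations, application code extends SparkListener and overrides only what it needs. A minimal sketch (JobTimingListener is an invented name; addSparkListener is SparkContext's developer API for registering a listener with its LiveListenerBus):

import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd, SparkListenerJobStart}

class JobTimingListener extends SparkListener {
  // Only the callbacks we care about are overridden; everything else
  // keeps SparkListener's empty default body.
  override def onJobStart(jobStart: SparkListenerJobStart): Unit =
    println(s"Job ${jobStart.jobId} started at ${jobStart.time}")

  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit =
    println(s"Job ${jobEnd.jobId} finished at ${jobEnd.time}")
}

// Registration, given an existing SparkContext `sc`:
// sc.addSparkListener(new JobTimingListener)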

4. LiveListenerBus

LiveListenerBus implements the SparkListenerBus trait. It processes posted events on a dedicated thread and uses a semaphore to coordinate the threads posting events with the thread processing them. Let's first look at its definition and key fields:

/**
 * Asynchronously passes SparkListenerEvents to registered SparkListeners.
 *
 * Until `start()` is called, all posted events are only buffered. Only after this listener bus
 * has started will events be actually propagated to all attached listeners. This listener bus
 * is stopped when `stop()` is called, and it will drop further events after stopping.
 * Delivers SparkListenerEvent events to SparkListener listeners on an asynchronous thread.
 */
private[spark] class LiveListenerBus(val sparkContext: SparkContext) extends SparkListenerBus {

  self =>

  import LiveListenerBus._

  // Cap the capacity of the event queue so we get an explicit error (rather than
  // an OOM exception) if it's perpetually being added to more quickly than it's being drained.
  // Queue capacity, configured via spark.scheduler.listenerbus.eventqueue.size
  // (default 10000).
  private lazy val EVENT_QUEUE_CAPACITY = validateAndGetQueueSize()

  // Reads the queue capacity from the configuration; used to initialize EVENT_QUEUE_CAPACITY.
  private def validateAndGetQueueSize(): Int = {
    // Get the queue capacity from spark.scheduler.listenerbus.eventqueue.size (default 10000).
    val queueSize = sparkContext.conf.get(LISTENER_BUS_EVENT_QUEUE_SIZE)
    if (queueSize <= 0) {
      throw new SparkException("spark.scheduler.listenerbus.eventqueue.size must be > 0!")
    }
    queueSize
  }

  /**
   * Blocking queue of SparkListenerEvent events. Its size is configurable via the Spark
   * property spark.scheduler.listenerbus.eventqueue.size and defaults to 10000
   * (in earlier Spark versions it was a static value fixed at 10000).
   */
  private lazy val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](EVENT_QUEUE_CAPACITY)

  // Indicate if `start()` is called
  // AtomicBoolean marking whether the LiveListenerBus has started.
  private val started = new AtomicBoolean(false)

  // Indicate if `stop()` is called
  // AtomicBoolean marking whether the LiveListenerBus has stopped.
  private val stopped = new AtomicBoolean(false)

  /**
   * A counter for dropped events. It will be reset every time we log it.
   * AtomicLong counting dropped events; it is reset to 0 each time its value is logged.
   */
  private val droppedEventsCounter = new AtomicLong(0L)

  /**
   * When `droppedEventsCounter` was logged last time in milliseconds.
   * Timestamp of the last time droppedEventsCounter was logged.
   */
  @volatile private var lastReportTimestamp = 0L

  // Indicate if we are processing some event
  // Guarded by `self`
  // Marks that listenerThread is currently processing an event.
  private var processingEvent = false

  // Marks whether an event has been dropped because eventQueue was full.
  private val logDroppedEvent = new AtomicBoolean(false)

  // A counter that represents the number of events produced and consumed in the queue
  // A permit is released when a new event arrives and acquired when an event is processed.
  private val eventLock = new Semaphore(0)

  ...
}

As the eventQueue field shows, LiveListenerBus buffers posted events in a LinkedBlockingQueue whose capacity defaults to 10000, alongside several flag fields and counters. eventLock is a Semaphore-based lock through which the posting threads and the processing thread coordinate.
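This coordination is a textbook semaphore-guarded producer/consumer loop; the sections below walk through Spark's actual implementation. As a preview, here is a standalone sketch of the same pattern (TinyEventLoop is invented and is not Spark code); it mirrors the roles of eventQueue, eventLock, stopped, and the poll-null-means-stop trick:

import java.util.concurrent.{LinkedBlockingQueue, Semaphore}
import java.util.concurrent.atomic.AtomicBoolean

class TinyEventLoop[E <: AnyRef](capacity: Int)(handle: E => Unit) {
  private val queue = new LinkedBlockingQueue[E](capacity)
  private val permits = new Semaphore(0) // one permit per queued event
  private val stopped = new AtomicBoolean(false)

  private val worker = new Thread {
    setDaemon(true)
    override def run(): Unit = {
      while (true) {
        permits.acquire()         // block until an event (or stop) arrives
        val event = queue.poll()
        if (event == null) return // a permit without an event means stop()
        handle(event)
      }
    }
  }

  def start(): Unit = worker.start()

  def post(event: E): Unit =
    if (!stopped.get && queue.offer(event)) permits.release()

  def stop(): Unit =
    if (stopped.compareAndSet(false, true)) {
      permits.release() // wake the worker so it polls null and exits
      worker.join()
    }
}

// Usage:
//   val loop = new TinyEventLoop[String](capacity = 8)(e => println(s"handled $e"))
//   loop.start(); loop.post("event-1"); loop.stop()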

4.1. Event Processing

LiveListenerBus defines a daemon thread named listenerThread, which continuously takes events off the eventQueue and broadcasts them to all listeners:

org.apache.spark.scheduler.LiveListenerBus#listenerThread
// The event-processing thread.
private val listenerThread = new Thread(name) {
  // Run as a daemon thread.
  setDaemon(true)

  // The main processing loop.
  override def run(): Unit = Utils.tryOrStopSparkContext(sparkContext) {
    LiveListenerBus.withinListenerThread.withValue(true) {
      // Loop forever.
      while (true) {
        // Acquire a permit.
        eventLock.acquire()
        self.synchronized {
          // Mark that an event is being processed.
          processingEvent = true
        }
        try {
          // Take an event from eventQueue and check whether it is null.
          val event = eventQueue.poll
          if (event == null) {
            // Get out of the while loop and shutdown the daemon thread
            if (!stopped.get) {
              throw new IllegalStateException("Polling `null` from eventQueue means" +
                " the listener bus has been stopped. So `stopped` must be true")
            }
            return
          }
          // Process the event.
          postToAll(event)
        } finally {
          self.synchronized {
            // Mark that no event is being processed.
            processingEvent = false
          }
        }
      }
    }
  }
}

listenerThread's run() method is an infinite loop that tries to acquire one permit from the eventLock semaphore. Acquiring a permit means eventQueue holds an unprocessed event, so the thread polls one event off the queue and delivers it via postToAll(...). As we saw for SparkListenerBus, that delivery goes through SparkListenerBus's doPostEvent(...), which notifies every SparkListenerInterface listener.

4.2. Event Posting

Permits are added to the eventLock semaphore when events are posted to the LiveListenerBus. The posting method is post(...):

org.apache.spark.scheduler.LiveListenerBus#post
// Puts an event onto the eventQueue.
def post(event: SparkListenerEvent): Unit = {
  // Check whether the event bus is still running.
  if (stopped.get) {
    // Drop further events to make `listenerThread` exit ASAP
    logError(s"$name has already stopped! Dropping event $event")
    return
  }
  // Enqueue the event.
  val eventAdded = eventQueue.offer(event)
  if (eventAdded) { // Enqueued successfully: release a permit.
    eventLock.release()
  } else { // Enqueue failed, most likely because the queue is full:
    // drop the event...
    onDropEvent(event)
    // ...and increment the dropped-events counter.
    droppedEventsCounter.incrementAndGet()
  }

  // If any events have been dropped, log it periodically.
  val droppedEvents = droppedEventsCounter.get
  if (droppedEvents > 0) {
    // Don't log too frequently
    // Only report if more than 60 seconds have passed since the last report.
    if (System.currentTimeMillis() - lastReportTimestamp >= 60 * 1000) {
      // There may be multiple threads trying to decrease droppedEventsCounter.
      // Use "compareAndSet" to make sure only one thread can win.
      // And if another thread is increasing droppedEventsCounter, "compareAndSet" will fail and
      // then that thread will update it.
      // Reset the dropped-events counter to 0.
      if (droppedEventsCounter.compareAndSet(droppedEvents, 0)) {
        // Update the report timestamp.
        val prevLastReportTimestamp = lastReportTimestamp
        lastReportTimestamp = System.currentTimeMillis()
        // Log a warning.
        logWarning(s"Dropped $droppedEvents SparkListenerEvents since " +
          new java.util.Date(prevLastReportTimestamp))
      }
    }
  }
}

So when an event is successfully enqueued onto eventQueue, a permit is released on eventLock. When the enqueue fails, the queue is probably full; the event is discarded via onDropEvent(...) and the drop counter is incremented. onDropEvent(...) merely flips logDroppedEvent from false to true and logs a message, so I won't show its source.
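The rate-limited logging at the end of post(...) is a reusable idiom in its own right: an AtomicLong counts failures, and compareAndSet guarantees that exactly one of several racing threads resets the counter and emits the log line, while racing increments simply carry over to the next report window. Isolated as a standalone sketch (ThrottledDropLogger is invented for illustration):

import java.util.concurrent.atomic.AtomicLong

object ThrottledDropLogger {
  private val dropped = new AtomicLong(0L)
  @volatile private var lastReport = 0L
  private val intervalMs = 60 * 1000

  def recordDrop(): Unit = dropped.incrementAndGet()

  def maybeReport(): Unit = {
    val count = dropped.get
    if (count > 0 && System.currentTimeMillis() - lastReport >= intervalMs) {
      // Only the thread whose compareAndSet succeeds resets and reports;
      // concurrent increments make the CAS fail and are counted next time.
      if (dropped.compareAndSet(count, 0)) {
        lastReport = System.currentTimeMillis()
        println(s"Dropped $count events in the last interval")
      }
    }
  }
}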

4.3. Waiting for the LiveListenerBus to Become Idle

waitUntilEmpty(...) takes a timeout and waits, for at most that long, for the LiveListenerBus to become idle. "Idle" here means the event queue holds no pending events and no event is currently being processed:

org.apache.spark.scheduler.LiveListenerBus#waitUntilEmpty
/**
 * For testing only. Wait until there are no more events in the queue, or until the specified
 * time has elapsed. Throw `TimeoutException` if the specified time elapsed before the queue
 * emptied.
 * Exposed for testing.
 *
 * Waits until the bus holds no more events, where "no more events" means both:
 * 1. the event queue is empty; and
 * 2. no event is currently being processed.
 *
 * Throws a TimeoutException if this does not happen before the timeout.
 */
@throws(classOf[TimeoutException])
def waitUntilEmpty(timeoutMillis: Long): Unit = {
  // Compute the deadline.
  val finishTime = System.currentTimeMillis + timeoutMillis
  // Keep looping while the queue is non-empty or an event is still being processed.
  while (!queueIsEmpty) {
    // Once the deadline has passed, throw a TimeoutException.
    if (System.currentTimeMillis > finishTime) {
      throw new TimeoutException(
        s"The event queue is not empty after $timeoutMillis milliseconds")
    }
    /* Sleep rather than using wait/notify, because this is used only for testing and
     * wait/notify add overhead in the general case.
     * Sleep briefly between checks.
     */
    Thread.sleep(10)
  }
}

If the LiveListenerBus does not become idle within the timeout, a TimeoutException is thrown.
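As the docstring notes, this method exists for tests: delivery is asynchronous, so a test posts events, calls waitUntilEmpty(...), and only then makes assertions. A sketch of that pattern (sc is an existing SparkContext; listenerBus and waitUntilEmpty are private[spark], so this only compiles from code inside the org.apache.spark package, such as Spark's own test suites):

import org.apache.spark.scheduler.{JobSucceeded, SparkListenerJobEnd}

// Post an event, then block (up to 10 seconds) until the queue has drained
// and no event is in flight; assertions afterwards are deterministic.
sc.listenerBus.post(SparkListenerJobEnd(jobId = 0, time = System.currentTimeMillis, JobSucceeded))
sc.listenerBus.waitUntilEmpty(10000)
// ...assert against whatever the registered listeners recorded...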

4.4. Starting and Stopping the LiveListenerBus

start() and stop() start and stop the LiveListenerBus. start() mainly flips the started flag to true and then starts the listenerThread:

/**
 * Start sending events to attached listeners.
 *
 * This first sends out all buffered events posted before this listener bus has started, then
 * listens for any additional events asynchronously while the listener bus is still running.
 * This should only be called once.
 */
def start(): Unit = {
  // Mark the bus as started via CAS.
  if (started.compareAndSet(false, true)) {
    // Start listenerThread so event processing begins.
    listenerThread.start()
  } else {
    throw new IllegalStateException(s"$name already started!")
  }
}

stop() is slightly more involved: after flipping the stopped flag to true, it releases one permit on eventLock and then waits for listenerThread to finish the event it is currently processing:

/**
 * Stop the listener bus. It will wait until the queued events have been processed, but drop the
 * new events after stopping.
 * Stops the event bus.
 */
def stop(): Unit = {
  // Check the state first.
  if (!started.get()) {
    throw new IllegalStateException(s"Attempted to stop $name that has not yet started!")
  }
  // Mark the bus as stopped via CAS.
  if (stopped.compareAndSet(false, true)) {
    // Call eventLock.release() so that listenerThread will poll `null` from `eventQueue` and know
    // `stop` is called.
    eventLock.release()
    /**
     * Join the processing thread until it finishes the event it is working on.
     * Once listenerThread sees the stopped flag set to true it exits directly,
     * so subsequent events will not be processed.
     */
    listenerThread.join()
  } else {
    // Keep quiet
  }
}

5. ReplayListenerBus

ReplayListenerBus replays event delivery from a serialized event log. It extends SparkListenerBus without overriding anything, but adds two overloaded replay(...) methods. The first takes an input stream, reads its data into an iterator of lines, and delegates to the second overload for the actual replay:

org.apache.spark.scheduler.ReplayListenerBus#replay
/**
 * Replay each event in the order maintained in the given stream. The stream is expected to
 * contain one JSON-encoded SparkListenerEvent per line.
 *
 * This method can be called multiple times, but the listener behavior is undefined after any
 * error is thrown by this method.
 *
 * @param logData Stream containing event log data.
 * @param sourceName Filename (or other source identifier) from whence @logData is being read
 * @param maybeTruncated Indicate whether log file might be truncated (some abnormal situations
 *        encountered, log file might not finished writing) or not
 * @param eventsFilter Filter function to select JSON event strings in the log data stream that
 *        should be parsed and replayed. When not specified, all event strings in the log data
 *        are parsed and replayed.
 */
def replay(
    logData: InputStream,
    sourceName: String,
    maybeTruncated: Boolean = false,
    eventsFilter: ReplayEventsFilter = SELECT_ALL_FILTER): Unit = {
  // Read event data from the logData input stream as an iterator of strings.
  val lines = Source.fromInputStream(logData).getLines()
  // Delegate to the overloaded replay() for the actual replay.
  replay(lines, sourceName, maybeTruncated, eventsFilter)
}

The overloaded replay(...) first filters the lines with the given ReplayEventsFilter (by default all events are kept), then iterates over the remaining events and delivers each one to all listeners via postToAll(...). The source is fairly straightforward:

org.apache.spark.scheduler.ReplayListenerBus#replay
/**
 * Overloaded variant of [[replay()]] which accepts an iterator of lines instead of an
 * [[InputStream]]. Exposed for use by custom ApplicationHistoryProvider implementations.
 *
 * @param lines the events, one JSON string per line
 * @param sourceName name of the event source
 * @param maybeTruncated whether the log file may be truncated
 * @param eventsFilter the event filter, a (String) => Boolean function
 */
def replay(
    lines: Iterator[String],
    sourceName: String,
    maybeTruncated: Boolean,
    eventsFilter: ReplayEventsFilter): Unit = {
  // The line currently being processed.
  var currentLine: String = null
  // The current line number.
  var lineNumber: Int = 0

  try {
    // Preprocessing:
    val lineEntries = lines
      // zip the iterator with an index so every line carries its position, ...
      .zipWithIndex
      // ...then apply the given event filter.
      .filter { case (line, _) => eventsFilter(line) }

    // Iterate over the filtered events.
    while (lineEntries.hasNext) {
      try {
        // Fetch the next event, ...
        val entry = lineEntries.next()
        // ...record the current line and advance the line number, ...
        currentLine = entry._1
        lineNumber = entry._2 + 1
        // ...and deliver it.
        postToAll(JsonProtocol.sparkEventFromJson(parse(currentLine)))
      } catch {
        case e: ClassNotFoundException if KNOWN_REMOVED_CLASSES.contains(e.getMessage) =>
          // Ignore events generated by Structured Streaming in Spark 2.0.0 and 2.0.1.
          // It's safe since no place uses them.
          logWarning(s"Dropped incompatible Structured Streaming log: $currentLine")
        case e: UnrecognizedPropertyException if e.getMessage != null && e.getMessage.startsWith(
            "Unrecognized field \"queryStatus\" " +
            "(class org.apache.spark.sql.streaming.StreamingQueryListener$") =>
          // Ignore events generated by Structured Streaming in Spark 2.0.2
          // It's safe since no place uses them.
          logWarning(s"Dropped incompatible Structured Streaming log: $currentLine")
        case jpe: JsonParseException =>
          // We can only ignore exception from last line of the file that might be truncated
          // the last entry may not be the very last line in the event log, but we treat it
          // as such in a best effort to replay the given input
          /**
           * A JSON parse error here may be caused by a truncated file, so whether to
           * rethrow depends on the maybeTruncated parameter: if truncation is allowed
           * and there are no further entries, the exception is swallowed.
           */
          if (!maybeTruncated || lineEntries.hasNext) {
            throw jpe
          } else {
            logWarning(s"Got JsonParseException from log file $sourceName" +
              s" at line $lineNumber, the file might not have finished writing cleanly.")
          }
      }
    }
  } catch {
    case ioe: IOException =>
      throw ioe
    case e: Exception =>
      logError(s"Exception parsing Spark event log: $sourceName", e)
      logError(s"Malformed line #$lineNumber: $currentLine\n")
  }
}
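A hedged sketch of driving a replay end to end: create the bus, attach a listener, and feed it an event log stream. ReplayListenerBus is private[spark], so real callers live inside Spark (the History Server's FsHistoryProvider is the main one), and the file path below is hypothetical:

import java.io.FileInputStream

import org.apache.spark.scheduler.{ReplayListenerBus, SparkListener, SparkListenerApplicationStart}

val bus = new ReplayListenerBus()
// Any SparkListenerInterface works; here an anonymous SparkListener.
bus.addListener(new SparkListener {
  override def onApplicationStart(event: SparkListenerApplicationStart): Unit =
    println(s"replayed application start: ${event.appName}")
})

val in = new FileInputStream("/tmp/spark-events/app-0000") // hypothetical log file
try {
  // One JSON-encoded SparkListenerEvent per line is parsed and re-delivered.
  bus.replay(in, sourceName = "app-0000")
} finally {
  in.close()
}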

6. StreamingQueryListenerBus

StreamingQueryListenerBus serves the Structured Streaming component of Spark SQL. By extending the SparkListener class it is itself a listener, and it also extends ListenerBus, with StreamingQueryListener as the listener type and StreamingQueryListener.Event as the event type. Its constructor takes a LiveListenerBus parameter named sparkListenerBus, and during initialization it registers itself with that bus:

org.apache.spark.sql.execution.streaming.StreamingQueryListenerBus
/**
 * A bus to forward events to [[StreamingQueryListener]]s. This one will send received
 * [[StreamingQueryListener.Event]]s to the Spark listener bus. It also registers itself with
 * Spark listener bus, so that it can receive [[StreamingQueryListener.Event]]s and dispatch them
 * to StreamingQueryListeners.
 *
 * Note that each bus and its registered listeners are associated with a single SparkSession
 * and StreamingQueryManager. So this bus will dispatch events to registered listeners for only
 * those queries that were started in the associated SparkSession.
 * Delivers StreamingQueryListener.Event events to StreamingQueryListener listeners,
 * and also hands StreamingQueryListener.Event events over to the SparkListenerBus.
 *
 * StreamingQueryListenerBus is itself a SparkListener: it adds itself to
 * sparkListenerBus and implements onOtherEvent().
 */
class StreamingQueryListenerBus(sparkListenerBus: LiveListenerBus)
  extends SparkListener with ListenerBus[StreamingQueryListener, StreamingQueryListener.Event] {

  import StreamingQueryListener._

  // Register this bus as a listener on the sparkListenerBus.
  sparkListenerBus.addListener(this)

  /**
   * RunIds of active queries whose events are supposed to be forwarded by this ListenerBus
   * to registered `StreamingQueryListeners`.
   *
   * Note 1: We need to track runIds instead of ids because the runId is unique for every started
   * query, even it its a restart. So even if a query is restarted, this bus will identify them
   * separately and correctly account for the restart.
   *
   * Note 2: This list needs to be maintained separately from the
   * `StreamingQueryManager.activeQueries` because a terminated query is cleared from
   * `StreamingQueryManager.activeQueries` as soon as it is stopped, but the this ListenerBus
   * must clear a query only after the termination event of that query has been posted.
   *
   * Records the runIds of the events this bus should forward.
   */
  private val activeQueryRunIds = new mutable.HashSet[UUID]

  ...
}

Events posted through StreamingQueryListenerBus's post(...) are inspected: for a QueryStartedEvent, its runId is recorded in the activeQueryRunIds set, the event is forwarded to the sparkListenerBus, and it is then delivered to all listeners this bus itself maintains. Every other event is forwarded straight to the sparkListenerBus:

org.apache.spark.sql.execution.streaming.StreamingQueryListenerBus#post
/**
 * Post a StreamingQueryListener event to the added StreamingQueryListeners.
 * Note that only the QueryStarted event is posted to the listener synchronously. Other events
 * are dispatched to Spark listener bus. This method is guaranteed to be called by queries in
 * the same SparkSession as this listener.
 *
 * The event-posting method.
 */
def post(event: StreamingQueryListener.Event) {
  event match {
    case s: QueryStartedEvent => // QueryStartedEvent gets special treatment:
      // record the query's runId, ...
      activeQueryRunIds.synchronized { activeQueryRunIds += s.runId }
      // ...forward the event to sparkListenerBus, ...
      sparkListenerBus.post(s)
      // post to local listeners to trigger callbacks
      // ...and deliver it to all listeners maintained by this bus.
      postToAll(s)
    case _ =>
      // All other events are forwarded straight to sparkListenerBus.
      sparkListenerBus.post(event)
  }
}

doPostEvent(...), the method that delivers an event to a specific listener, dispatches on the event type and filters out events whose runId is not tracked in activeQueryRunIds:

/**
 * Dispatch events to registered StreamingQueryListeners. Only the events associated queries
 * started in the same SparkSession as this ListenerBus will be dispatched to the listeners.
 */
override protected def doPostEvent(
    listener: StreamingQueryListener,
    event: StreamingQueryListener.Event): Unit = {
  // Checks whether activeQueryRunIds contains the given runId.
  def shouldReport(runId: UUID): Boolean = {
    activeQueryRunIds.synchronized { activeQueryRunIds.contains(runId) }
  }

  event match {
    case queryStarted: QueryStartedEvent =>
      // If activeQueryRunIds contains this query's runId, ...
      if (shouldReport(queryStarted.runId)) {
        // ...invoke the listener's onQueryStarted() method.
        listener.onQueryStarted(queryStarted)
      }
    case queryProgress: QueryProgressEvent =>
      // If activeQueryRunIds contains this query's runId, ...
      if (shouldReport(queryProgress.progress.runId)) {
        // ...invoke the listener's onQueryProgress() method.
        listener.onQueryProgress(queryProgress)
      }
    case queryTerminated: QueryTerminatedEvent =>
      // If activeQueryRunIds contains this query's runId, ...
      if (shouldReport(queryTerminated.runId)) {
        // ...invoke the listener's onQueryTerminated() method.
        listener.onQueryTerminated(queryTerminated)
        // The query has terminated, so remove its runId from activeQueryRunIds.
        activeQueryRunIds.synchronized { activeQueryRunIds -= queryTerminated.runId }
      }
    case _ =>
  }
}

onOtherEvent(...) is inherited from SparkListener. Since StreamingQueryListenerBus is itself a listener, it receives StreamingQueryListener.Event events back from the sparkListenerBus and forwards them to all listeners it maintains:

org.apache.spark.sql.execution.streaming.StreamingQueryListenerBus#onOtherEvent
override def onOtherEvent(event: SparkListenerEvent): Unit = {
  event match {
    case e: StreamingQueryListener.Event =>
      // SPARK-18144: we broadcast QueryStartedEvent to all listeners attached to this bus
      // synchronously and the ones attached to LiveListenerBus asynchronously. Therefore,
      // we need to ignore QueryStartedEvent if this method is called within SparkListenerBus
      // thread
      /**
       * When a StreamingQueryListener.Event arrives from sparkListenerBus, deliver it to
       * all listeners maintained by this bus, unless we are running inside the
       * LiveListenerBus processing thread and the event is a QueryStartedEvent
       * (in that case post() has already delivered it synchronously).
       */
      if (!LiveListenerBus.withinListenerThread.value || !e.isInstanceOf[QueryStartedEvent]) {
        postToAll(e)
      }
    case _ =>
  }
}

From the analysis above, StreamingQueryListenerBus behaves as follows:

  1. StreamingQueryListenerBus handles three kinds of events posted to it: QueryStartedEvent, QueryProgressEvent, and QueryTerminatedEvent, all of which it forwards to the sparkListenerBus.
  2. For a QueryStartedEvent it additionally records the event's runId in activeQueryRunIds and notifies all of its own listeners synchronously. From then on it only dispatches QueryStartedEvent, QueryProgressEvent, and QueryTerminatedEvent events whose runId has been recorded.
  3. StreamingQueryListenerBus also registers itself as a listener on the sparkListenerBus. When the sparkListenerBus delivers a StreamingQueryListener.Event back to it, the event is forwarded to all listeners that StreamingQueryListenerBus itself maintains, unless the call happens inside the LiveListenerBus processing thread and the event is a QueryStartedEvent (which post() has already delivered synchronously).

The diagram below shows how events flow through the StreamingQueryListenerBus:

[Figure 2: Event flow through the StreamingQueryListenerBus]
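From the application side, none of this plumbing is visible: you register a StreamingQueryListener through StreamingQueryManager.addListener, and the bus above routes the three event types to it. A minimal sketch using the public API (the session setup is illustrative):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

val spark = SparkSession.builder().appName("listener-demo").master("local[*]").getOrCreate()

// addListener hands the listener to the session's StreamingQueryListenerBus.
spark.streams.addListener(new StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit =
    println(s"query started: ${event.id} (run ${event.runId})")
  override def onQueryProgress(event: QueryProgressEvent): Unit =
    println(s"progress: ${event.progress.numInputRows} input rows")
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit =
    println(s"query terminated: ${event.runId}")
})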

7. StreamingListenerBus

StreamingListenerBus is similar to StreamingQueryListenerBus: it extends the SparkListener class to become a listener itself, and it also extends ListenerBus, with StreamingListener as the listener type and StreamingListenerEvent as the event type:

org.apache.spark.streaming.scheduler.StreamingListenerBus
/**
 * A Streaming listener bus to forward events to StreamingListeners. This one will wrap received
 * Streaming events as WrappedStreamingListenerEvent and send them to Spark listener bus. It also
 * registers itself with Spark listener bus, so that it can receive WrappedStreamingListenerEvents,
 * unwrap them as StreamingListenerEvent and dispatch them to StreamingListeners.
 * Delivers StreamingListenerEvent events to StreamingListener listeners, and also
 * hands StreamingListenerEvent events over to the SparkListenerBus.
 */
private[streaming] class StreamingListenerBus(sparkListenerBus: LiveListenerBus)
  extends SparkListener with ListenerBus[StreamingListener, StreamingListenerEvent] {
  ...
}

StreamingListenerBus provides a post(...) method for posting StreamingListenerEvent events; it wraps each event and forwards it to the internal sparkListenerBus:

/**
 * Post a StreamingListenerEvent to the Spark listener bus asynchronously. This event will be
 * dispatched to all StreamingListeners in the thread of the Spark listener bus.
 */
def post(event: StreamingListenerEvent) {
  // Wrap the event and forward it to sparkListenerBus.
  sparkListenerBus.post(new WrappedStreamingListenerEvent(event))
}

Its doPostEvent(...) matches on the event type and invokes the corresponding method of the given listener:

org.apache.spark.streaming.scheduler.StreamingListenerBus#doPostEvent
// Delivers an event to the given listener.
protected override def doPostEvent(
    listener: StreamingListener,
    event: StreamingListenerEvent): Unit = {
  // Dispatch on the event type.
  event match {
    case receiverStarted: StreamingListenerReceiverStarted =>
      listener.onReceiverStarted(receiverStarted)
    case receiverError: StreamingListenerReceiverError =>
      listener.onReceiverError(receiverError)
    case receiverStopped: StreamingListenerReceiverStopped =>
      listener.onReceiverStopped(receiverStopped)
    case batchSubmitted: StreamingListenerBatchSubmitted =>
      listener.onBatchSubmitted(batchSubmitted)
    case batchStarted: StreamingListenerBatchStarted =>
      listener.onBatchStarted(batchStarted)
    case batchCompleted: StreamingListenerBatchCompleted =>
      listener.onBatchCompleted(batchCompleted)
    case outputOperationStarted: StreamingListenerOutputOperationStarted =>
      listener.onOutputOperationStarted(outputOperationStarted)
    case outputOperationCompleted: StreamingListenerOutputOperationCompleted =>
      listener.onOutputOperationCompleted(outputOperationCompleted)
    case _ =>
  }
}

StreamingListenerBus provides start() and stop() methods. Starting adds the bus as a listener to the internal sparkListenerBus; stopping removes it:

org.apache.spark.streaming.scheduler.StreamingListenerBus
/**
 * Register this one with the Spark listener bus so that it can receive Streaming events and
 * forward them to StreamingListeners.
 */
def start(): Unit = {
  sparkListenerBus.addListener(this) // for getting callbacks on spark events
}

/**
 * Unregister this one with the Spark listener bus and all StreamingListeners won't receive any
 * events after that.
 */
def stop(): Unit = {
  sparkListenerBus.removeListener(this)
}

Since StreamingListenerBus is itself a listener, it receives WrappedStreamingListenerEvent events, unwraps the contained StreamingListenerEvent, and forwards it to all listeners it maintains:

org.apache.spark.streaming.scheduler.StreamingListenerBus#onOtherEvent
// Invoked when a SparkListenerEvent is received.
override def onOtherEvent(event: SparkListenerEvent): Unit = {
  event match {
    // For a WrappedStreamingListenerEvent, unwrap it and deliver the inner
    // event to all listeners maintained by this bus.
    case WrappedStreamingListenerEvent(e) =>
      postToAll(e)
    case _ =>
  }
}

WrappedStreamingListenerEvent is defined as follows:

org.apache.spark.streaming.scheduler.StreamingListenerBus.WrappedStreamingListenerEvent
/**
 * Wrapper for StreamingListenerEvent as SparkListenerEvent so that it can be posted to Spark
 * listener bus.
 *
 * @param streamingListenerEvent the wrapped StreamingListenerEvent
 */
private case class WrappedStreamingListenerEvent(streamingListenerEvent: StreamingListenerEvent)
  extends SparkListenerEvent {

  // Do not log streaming events in event log as history server does not support streaming
  // events (SPARK-12140). TODO Once SPARK-12140 is resolved we should set it to true.
  protected[spark] override def logEvent: Boolean = false
}

In short, StreamingListenerBus is mostly a forwarder: it wraps each StreamingListenerEvent it receives as a WrappedStreamingListenerEvent and forwards it to the internal sparkListenerBus, and conversely it unwraps the WrappedStreamingListenerEvent events it receives and forwards them to its own listeners.
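As with the Structured Streaming bus, applications only see the public registration API: StreamingContext.addStreamingListener hands the listener to this StreamingListenerBus. A minimal sketch (the context setup is illustrative):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

val conf = new SparkConf().setAppName("streaming-listener-demo").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(1))

// addStreamingListener registers the listener with the context's StreamingListenerBus;
// StreamingListener's other callbacks keep their empty default bodies.
ssc.addStreamingListener(new StreamingListener {
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    val info = batchCompleted.batchInfo
    println(s"batch ${info.batchTime} processed in ${info.processingDelay.getOrElse(-1L)} ms")
  }
})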

That concludes this introduction to Spark's event buses. This article has focused on how they work rather than on where they are used; later articles will cover their usage scenarios as they come up.