大数据
流式处理
源码解析
Spark

Spark源码分析20 - 调度系统05:调度算法和调度池的构建

简介:对于不同的调度模式会创建不同的调度池构建器。

1. 调度池构建器

在上一篇文章中我们提到过,TaskScheduleImpl的initialize(...)方法在初始化时,会根据调度模式构建调度池。回顾源码如下:

org.apache.spark.scheduler.TaskSchedulerImpl#initialize
  • // 初始化方法
  • def initialize(backend: SchedulerBackend) {
  • this.backend = backend
  • // temporarily set rootPool name to empty
  • // 创建根调度池
  • rootPool = new Pool("", schedulingMode, 0, 0)
  • // 根据调度模式,创建相应的调度池构建器,默认为FIFOSchedulableBuilder
  • schedulableBuilder = {
  • schedulingMode match {
  • case SchedulingMode.FIFO =>
  • new FIFOSchedulableBuilder(rootPool)
  • case SchedulingMode.FAIR =>
  • new FairSchedulableBuilder(rootPool, conf)
  • case _ =>
  • throw new IllegalArgumentException(s"Unsupported spark.scheduler.mode: $schedulingMode")
  • }
  • }
  • // 构建调度池
  • schedulableBuilder.buildPools()
  • }

可见,对于不同的调度模式会创建不同的调度池构建器,SchedulingMode.FIFO模式下创建的是FIFOSchedulableBuilder构建器,而SchedulingMode.FAIR模式下则创建的是FairSchedulableBuilder构建器;initialize(...)方法的最后会使用构建器构建调度池。

1.1. FIFOSchedulableBuilder

FIFOSchedulableBuilder和FairSchedulableBuilder都实现了SchedulableBuilder特质,该特质比较简单,它规定了三个方法:

org.apache.spark.scheduler.SchedulableBuilder
  • /**
  • * An interface to build Schedulable tree
  • * buildPools: build the tree nodes(pools)
  • * addTaskSetManager: build the leaf nodes(TaskSetManagers)
  • */
  • private[spark] trait SchedulableBuilder {
  • // 返回根调度池
  • def rootPool: Pool
  • // 对调度池进行构建
  • def buildPools(): Unit
  • // 向调度池内添加TaskSetManager
  • def addTaskSetManager(manager: Schedulable, properties: Properties): Unit
  • }

FIFOSchedulableBuilder构建器的实现也非常简单:

org.apache.spark.scheduler.FairSchedulableBuilder
  • private[spark] class FIFOSchedulableBuilder(val rootPool: Pool)
  • extends SchedulableBuilder with Logging {
  • override def buildPools() {
  • // nothing
  • }
  • override def addTaskSetManager(manager: Schedulable, properties: Properties) {
  • // 直接向根调度池添加TaskSetManager
  • rootPool.addSchedulable(manager)
  • }
  • }

可见,对于FIFOSchedulableBuilder构建器来说,它的buildPools()方法没有做任何事情,而addTaskSetManager(...)方法也只会将Schedulable调度对象添加到rootPool中;这种模式是由FIFO模式的调度池决定的,在FIFO调度池中,只存在rootPool一个根调度池,不存在下级调度器,所有添加到FIFO调度池中的Schedulable调度实体,都会直接放入rootPool中。

1.2. FairSchedulableBuilder

FairSchedulableBuilder构建器则不一样,它构建的调度池中的rootPool调度池包含了多个下级调度池,这些下级调度池中又包含多个Schedulable调度实体。我们先来看一下FairSchedulableBuilder的定义和重要字段:

org.apache.spark.scheduler.FairSchedulableBuilder
  • private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf)
  • extends SchedulableBuilder with Logging {
  • /**
  • * 用户指定的文件系统中的调度分配文件。
  • * 此文件可以通过spark.scheduler.allocation.file属性配置,
  • * FairSchedulableBuilder将从文件系统中读取此文件提供的公平调度配置。
  • */
  • val schedulerAllocFile = conf.getOption("spark.scheduler.allocation.file")
  • /**
  • * 默认的调度文件名。
  • * 常量DEFAULT_SCHEDULER_FILE的值固定为"fairscheduler.xml",
  • * FairSchedulableBuilder将从ClassPath中读取此文件提供的公平调度配置。
  • *
  • * <allocations>
  • * <pool name="production">
  • * <schedulingMode>FAIR</schedulingMode>
  • * <weight>1</weight>
  • * <minShare>2</minShare>
  • * </pool>
  • * <pool name="test">
  • * <schedulingMode>FIFO</schedulingMode>
  • * <weight>2</weight>
  • * <minShare>3</minShare>
  • * </pool>
  • * </allocations>
  • */
  • val DEFAULT_SCHEDULER_FILE = "fairscheduler.xml"
  • // 即spark.scheduler.pool,此属性的值作为放置TaskSetManager的公平调度池的名称。
  • val FAIR_SCHEDULER_PROPERTIES = "spark.scheduler.pool"
  • // 默认的调度池名。常量DEFAULT_POOL_NAME的值固定为"default"。
  • val DEFAULT_POOL_NAME = "default"
  • // 固定为"minShare",即XML文件的<Pool>节点的子节点<mindshare>。节点<mindshare>的值将作为Pool的minShare属性。
  • val MINIMUM_SHARES_PROPERTY = "minShare"
  • // 固定为"schedulingMode",即XML文件的<Pool>节点的子节点<schedulingMode>。
  • // 节点<schedulingMode>的值将作为Pool的调度模式(schedulingMode)属性。
  • val SCHEDULING_MODE_PROPERTY = "schedulingMode"
  • // 权重属性。固定为"weight",即XML文件的<Pool>节点的子节点<weight>。节点<weight>的值将作为Pool的权重(weight)属性。
  • val WEIGHT_PROPERTY = "weight"
  • // 固定为"@name",即XML文件的<Pool>节点的name属性。name属性的值将作为Pool的调度池名(poolName)属性。
  • val POOL_NAME_PROPERTY = "@name"
  • // 调度池属性。常量POOLS_PROPERTY的值固定为"pool"。
  • val POOLS_PROPERTY = "pool"
  • // 默认的调度模式FIFO
  • val DEFAULT_SCHEDULING_MODE = SchedulingMode.FIFO
  • // 公平调度算法中Schedulable的minShare属性的默认值,固定为0。
  • val DEFAULT_MINIMUM_SHARE = 0
  • // 默认的权重,固定为1。
  • val DEFAULT_WEIGHT = 1
  • ...
  • }

FairSchedulableBuilder其实是需要通过配置文件来指定子级调度池的,配置文件名默认为fairscheduler.xml,也可以通过spark.scheduler.allocation.file参数来指定。FairSchedulableBuilder的大多数字段都与该XML文件的解析相关。

FairSchedulableBuilder用于构建调度池的buildPools()方法的源码如下:

org.apache.spark.scheduler.FairSchedulableBuilder#buildPools
  • // 构建公平调度池
  • override def buildPools() {
  • var is: Option[InputStream] = None
  • try {
  • is = Option {
  • schedulerAllocFile.map { f =>
  • // 从文件系统中读取公平调度配置的文件输入流
  • new FileInputStream(f)
  • }.getOrElse {
  • // 或者获取fairscheduler.xml文件的输入流
  • Utils.getSparkClassLoader.getResourceAsStream(DEFAULT_SCHEDULER_FILE)
  • }
  • }
  • // 解析文件输入流并构建调度池
  • is.foreach { i => buildFairSchedulerPool(i) }
  • } finally {
  • is.foreach(_.close())
  • }
  • // finally create "default" pool
  • // 构建默认的调度池
  • buildDefaultPool()
  • }

可见,当通过schedulerAllocFile成功读取到配置文件的输入流后,会调用buildFairSchedulerPool(...)方法来构建调度池,否则使用buildDefaultPool()方法构造默认的调度池。

我们先来分析一下buildDefaultPool()方法的源码:

org.apache.spark.scheduler.FairSchedulableBuilder#buildDefaultPool
  • // 当根调度池及其子调度池中不存在名为default的调度池时,构建默认调度池
  • private def buildDefaultPool() {
  • if (rootPool.getSchedulableByName(DEFAULT_POOL_NAME) == null) {
  • // 创建默认调度池
  • val pool = new Pool(DEFAULT_POOL_NAME, DEFAULT_SCHEDULING_MODE,
  • DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)
  • // 向根调度池的调度队列中添加默认的子调度池
  • rootPool.addSchedulable(pool)
  • logInfo("Created default pool %s, schedulingMode: %s, minShare: %d, weight: %d".format(
  • DEFAULT_POOL_NAME, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT))
  • }
  • }

buildDefaultPool()方法会判断根调度池rootPool是否已存在名为DEFAULT_POOL_NAME(即“default”)的下级调度池,如果没有,就构建名为DEFAULT_POOL_NAME(值为“default”字符串),调度模式为DEFAULT_SCHEDULING_MODE(值为SchedulingMode.FIFO),initMinShare为DEFAULT_MINIMUM_SHARE(值为1),initWeight为DEFAULT_WEIGHT(值为1)的Pool调度池,并添加到根调度池rootPool中作为子级调度池。

buildFairSchedulerPool()方法则会解析配置文件,并按照其中指定的配置创建子级调度器,添加到根调度池rootPool中:

org.apache.spark.scheduler.FairSchedulableBuilder#buildFairSchedulerPool
  • // 对文件输入流进行解析并构建调度池
  • private def buildFairSchedulerPool(is: InputStream) {
  • // 将文件输入流转换为XML
  • val xml = XML.load(is)
  • // 读取XML的每一个<Pool>节点
  • for (poolNode <- (xml \\ POOLS_PROPERTY)) {
  • // 读取<Pool>的name属性作为调度池的名称
  • val poolName = (poolNode \ POOL_NAME_PROPERTY).text
  • var schedulingMode = DEFAULT_SCHEDULING_MODE
  • var minShare = DEFAULT_MINIMUM_SHARE
  • var weight = DEFAULT_WEIGHT
  • val xmlSchedulingMode = (poolNode \ SCHEDULING_MODE_PROPERTY).text
  • if (xmlSchedulingMode != "") {
  • try {
  • // 读取<Pool>的子节点<schedulingMode>的值作为调度池的调度模式属性
  • schedulingMode = SchedulingMode.withName(xmlSchedulingMode)
  • } catch {
  • case e: NoSuchElementException =>
  • logWarning(s"Unsupported schedulingMode: $xmlSchedulingMode, " +
  • s"using the default schedulingMode: $schedulingMode")
  • }
  • }
  • // 读取<Pool>的子节点<minShare>的值作为调度池的minShare属性
  • val xmlMinShare = (poolNode \ MINIMUM_SHARES_PROPERTY).text
  • if (xmlMinShare != "") {
  • minShare = xmlMinShare.toInt
  • }
  • // 读取<Pool>的子节点<weight>的值作为调度池的权重(weight)属性
  • val xmlWeight = (poolNode \ WEIGHT_PROPERTY).text
  • if (xmlWeight != "") {
  • weight = xmlWeight.toInt
  • }
  • // 创建子调度池
  • val pool = new Pool(poolName, schedulingMode, minShare, weight)
  • // 将创建的子调度池添加到根调度池的调度队列
  • rootPool.addSchedulable(pool)
  • logInfo("Created pool %s, schedulingMode: %s, minShare: %d, weight: %d".format(
  • poolName, schedulingMode, minShare, weight))
  • }
  • }

一般来说,配置文件的格式如下:

  • <allocations>
  • <pool name="production">
  • <schedulingMode>FAIR</schedulingMode>
  • <weight>1</weight>
  • <minShare>2</minShare>
  • </pool>
  • <pool name="test">
  • <schedulingMode>FIFO</schedulingMode>
  • <weight>2</weight>
  • <minShare>3</minShare>
  • </pool>
  • </allocations>

也即是说,子级调度池是可以配置多个的。

FairSchedulableBuilder的addTaskSetManager(...)方法源码如下:

org.apache.spark.scheduler.FairSchedulableBuilder#addTaskSetManager
  • // 添加TaskSetManager
  • override def addTaskSetManager(manager: Schedulable, properties: Properties) {
  • // "default"
  • var poolName = DEFAULT_POOL_NAME
  • // 以默认调度池作为TaskSetManager的父调度池
  • var parentPool = rootPool.getSchedulableByName(poolName)
  • // 判断默认调度池是否存在
  • if (properties != null) { // 指定了配置信息
  • // 以spark.scheduler.pool属性指定的调度池作为TaskSetManager的父调度池,如果没有指定则默认为"default"调度池
  • poolName = properties.getProperty(FAIR_SCHEDULER_PROPERTIES, DEFAULT_POOL_NAME)
  • // 获取poolName指定的父调度池
  • parentPool = rootPool.getSchedulableByName(poolName)
  • if (parentPool == null) { // 指定的父调度池不存在
  • // we will create a new pool that user has configured in app
  • // instead of being defined in xml file
  • // 创建新的父调度池
  • parentPool = new Pool(poolName, DEFAULT_SCHEDULING_MODE,
  • DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)
  • // 将父调度池添加到根调度池中
  • rootPool.addSchedulable(parentPool)
  • logInfo("Created pool %s, schedulingMode: %s, minShare: %d, weight: %d".format(
  • poolName, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT))
  • }
  • }
  • // 将TaskSetManager放入指定的父调度池
  • parentPool.addSchedulable(manager)
  • logInfo("Added task set " + manager.name + " tasks to pool " + poolName)
  • }

相对来说,FairSchedulableBuilder的addTaskSetManager(...)方法则要复杂一点,其接收的properties参数可以指定目标调度池(如果没有指定则默认为“default”调度池),addTaskSetManager(...)方法会从调度池树中查找,如果查找不到就创建新的目标调度池并添加rootPool根调度池中作为子级调度池,然后将Schedulable调度对象添加到该子级调度池中。

2. 调度算法

从上面介绍的调度池构建器相关内容可知,构建器在构造调度池时会指定调度模式,Pool调度池则会在初始化时根据调度模式选择创建特定的调度算法。调度池依赖于调度算法,通过不同的调度算法可以实现不同的调度策略。Spark Core模块在org.apache.spark.scheduler包下提供了调度算法的顶层特质SchedulingAlgorithm,定义非常简单:

org.apache.spark.scheduler.SchedulingAlgorithm
  • /**
  • * An interface for sort algorithm
  • * FIFO: FIFO algorithm between TaskSetManagers
  • * FS: FS algorithm between Pools, and FIFO or FS within Pools
  • */
  • private[spark] trait SchedulingAlgorithm {
  • // 用于对两个Schedulable进行比较
  • def comparator(s1: Schedulable, s2: Schedulable): Boolean
  • }

comparator(...)方法将用于对两个Schedulable进行比较,决定调度顺序。

SchedulingAlgorithm的实现主要有两种:FIFOSchedulingAlgorithm和FairSchedulingAlgorithm,即先进先出调度算法和公平调度算法;下面将分别介绍。

2.1. FIFOSchedulingAlgorithm

FIFOSchedulingAlgorithm会先比较两个调度对象的优先级,再比较调度对象的Stage ID,实现比较简单:

org.apache.spark.scheduler.FIFOSchedulingAlgorithm
  • // 先进先出算法,先比较优先级,再比较Stage ID
  • private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm {
  • override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
  • val priority1 = s1.priority
  • val priority2 = s2.priority
  • // 对s1和s2两个Schedulable的优先级进行比较
  • var res = math.signum(priority1 - priority2)
  • if (res == 0) {
  • val stageId1 = s1.stageId
  • val stageId2 = s2.stageId
  • // 对s1和s2所属的Stage的身份标识进行比较
  • res = math.signum(stageId1 - stageId2)
  • }
  • res < 0
  • }
  • }

2.2. FairSchedulingAlgorithm

FairSchedulingAlgorithm的实现则相对复杂一些,它需要考虑多个综合因素;源码如下:

org.apache.spark.scheduler.FairSchedulingAlgorithm
  • // 公平调度算法
  • private[spark] class FairSchedulingAlgorithm extends SchedulingAlgorithm {
  • override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
  • val minShare1 = s1.minShare
  • val minShare2 = s2.minShare
  • val runningTasks1 = s1.runningTasks
  • val runningTasks2 = s2.runningTasks
  • // 处于运行状态的Task的数量是否小于s1的minShare
  • val s1Needy = runningTasks1 < minShare1
  • val s2Needy = runningTasks2 < minShare2
  • // 正在运行的任务数量与minShare之间的比值
  • val minShareRatio1 = runningTasks1.toDouble / math.max(minShare1, 1.0)
  • val minShareRatio2 = runningTasks2.toDouble / math.max(minShare2, 1.0)
  • // 正在运行的任务数量与权重(weight)之间的比值
  • val taskToWeightRatio1 = runningTasks1.toDouble / s1.weight.toDouble
  • val taskToWeightRatio2 = runningTasks2.toDouble / s2.weight.toDouble
  • var compare = 0
  • if (s1Needy && !s2Needy) {
  • /**
  • * 如果s1中处于运行状态的Task的数量小于s1的minShare,
  • * 并且s2中处于运行状态的Task的数量大于等于s2的minShare,
  • * 那么优先调度s1。
  • */
  • return true
  • } else if (!s1Needy && s2Needy) {
  • /**
  • * 如果s1中处于运行状态的Task的数量大于等于s1的minShare,
  • * 并且s2中处于运行状态的Task的数量小于s2的minShare,
  • * 那么优先调度s2。
  • */
  • return false
  • } else if (s1Needy && s2Needy) {
  • /**
  • * 如果s1中处于运行状态的Task的数量小于s1的minShare,
  • * 并且s2中处于运行状态的Task的数量小于s2的minShare,
  • * 那么再对minShareRatio1和minShareRatio2进行比较。
  • * minShareRatio是正在运行的任务数量与minShare之间的比值。
  • *
  • * 如果minShareRatio1小于minShareRatio2,则优先调度s1;
  • * 如果minShareRatio2小于minShareRatio1,则优先调度s2。
  • * 如果minShareRatio1和minShareRatio2相等,还需要对s1和s2的名字进行比较。
  • */
  • compare = minShareRatio1.compareTo(minShareRatio2)
  • } else {
  • /**
  • * 如果s1中处于运行状态的Task的数量大于等于s1的minShare,
  • * 并且s2中处于运行状态的Task的数量大于等于s2的minShare,
  • * 那么再对taskToWeightRatio1和taskToWeightRatio2进行比较。
  • * taskToWeightRatio是正在运行的任务数量与权重(weight)之间的比值。
  • *
  • * 如果taskToWeightRatio1小于taskToWeightRatio2,则优先调度s1;
  • * 如果taskToWeightRatio2小于taskToWeightRatio1,则优先调度s2。
  • * 如果taskToWeightRatio1和taskToWeightRatio2相等,还需要对s1和s2的名字进行比较。
  • */
  • compare = taskToWeightRatio1.compareTo(taskToWeightRatio2)
  • }
  • if (compare < 0) {
  • true
  • } else if (compare > 0) {
  • false
  • } else {
  • // 如果s1的名字小于s2的名字,则优先调度s1,否则优先调度s2。
  • s1.name < s2.name
  • }
  • }
  • }

我们可以将FairSchedulingAlgorithm的comparator(...)方法中涉及到的needyminShareRatiotaskToWeightRatio分别理解为调度实体的饥饿程度、资源比和权重比;comparator(...)方法的比较方式如下:

  1. 如果某个调度实体处于饥饿状态而另一个不处于饥饿状态,先调度处于饥饿状态的调度实体。
  2. 如果两个调度实体都处于饥饿状态,则比较它们的资源比,先调度资源比小的调度实体。
  3. 如果两个调度实体都处于饥饿状态,且资源比相同,则比较它们的权重比,先调度权重比小的调度实体。
  4. 如果两个调度实体都处于饥饿状态,且资源比、权重比都相同,则比较它们的名称,按名称排序进行先后调度。

3. 调度池的运作方式

有了对调度池构建器、调度池和调度算法的理解,我们来看一下TaskSchedulerImpl中调度池的构建方式。

TaskSchedulerImpl在其initialize(...)方法中直接创建了rootPool,这也是最基本的根调度池。当调度模式为SchedulingMode.FIFO时,调度池只有一级,即rootPool根调度池,所有的Task都会交由该调度池进行调度;而当调度模式为SchedulingMode.FAIR时,调度池至少有两级,在rootPool根调度池下还会有子级调度池,子级调度池的构建取决于具体配置。经过这些分析我们可以抽象出FIFO调度池和FAIR调度池树的示意图:

1.FIFO和FAIR任务调度池示意图.png

TaskSchedulerImpl的submitTasks(...)方法在接收来自DAGScheduler提交的TaskSet时,会将TaskSet封装到TaskSetManager中,并使用调度池构建器的addTaskSetManager(...)方法将其添加到调度池中,这里只截取源码片段,具体实现将在后面的文章中介绍:

org.apache.spark.scheduler.TaskSchedulerImpl#submitTasks
  • // 处理传入的TaskSet
  • override def submitTasks(taskSet: TaskSet) {
  • // 获取TaskSet中的所有Task
  • val tasks = taskSet.tasks
  • this.synchronized {
  • // 创建TaskSetManager
  • val manager = createTaskSetManager(taskSet, maxTaskFailures)
  • ...
  • // 将刚创建的TaskSetManager添加到调度池构建器的调度池中
  • schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
  • ...
  • }
  • // 给Task分配资源并运行Task
  • backend.reviveOffers()
  • }

通过这种方式,就可以实现对TaskSet的初始提交了。而TaskSchedulerImpl的resourceOffers(...)方法则用于从调度池中选择合适的Task运行,源码片段如下:

org.apache.spark.scheduler.TaskSchedulerImpl#resourceOffers
  • /**
  • * Called by cluster manager to offer resources on slaves. We respond by asking our active task
  • * sets for tasks in order of priority. We fill each node with tasks in a round-robin manner so
  • * that tasks are balanced across the cluster.
  • *
  • * 用于给Task分配资源
  • */
  • def resourceOffers(offers: IndexedSeq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
  • ...
  • // 对rootPool中所有TaskSetManager按照调度算法排序
  • val sortedTaskSets = rootPool.getSortedTaskSetQueue
  • ...
  • }

可见,resourceOffers(...)方法会使用rootPool根调度池的getSortedTaskSetQueue()方法对其管理的所有TaskSetManager按照调度算法进行排序,回顾源码:

org.apache.spark.scheduler.Pool#getSortedTaskSetQueue
  • // 对当前Pool中的所有TaskSetManager按照调度算法进行排序,并返回排序后的TaskSetManager
  • override def getSortedTaskSetQueue: ArrayBuffer[TaskSetManager] = {
  • var sortedTaskSetQueue = new ArrayBuffer[TaskSetManager]
  • // 对schedulableQueue内的元素进行排序
  • val sortedSchedulableQueue =
  • schedulableQueue.asScala.toSeq.sortWith(taskSetSchedulingAlgorithm.comparator)
  • for (schedulable <- sortedSchedulableQueue) {
  • sortedTaskSetQueue ++= schedulable.getSortedTaskSetQueue
  • }
  • sortedTaskSetQueue
  • }

对于FIFO调度池来说,它只有一级调度池,也就是rootPool根调度池,调度池内的调度实体都是TaskSetManager,因此这里会使用调度算法对所有TaskSetManager进行排序;而对于FAIR调度池来说,它可能有多级调度池,rootPool根调度池的getSortedTaskSetQueue()方法会对所有层级调度池中的TaskSetManager调度实体使用调度算法进行排序。显然,FAIR调度器拥有更高的灵活性。