
Spark Source Code Analysis 02 - SparkConf Configuration Management

Overview: This article covers the configuration-management machinery in Spark.

1. Introduction to SparkConf

SparkConf manages all of Spark's configuration items. In day-to-day development, whether we use the older SparkContext as the entry point to Spark or the newer unified SparkSession entry point, we build a SparkConf to set the various parameters and then pass it to the SparkContext or SparkSession. The definition of the SparkConf class and its important fields are as follows:

org.apache.spark.SparkConf

    /**
     * Configuration for a Spark application. Used to set various Spark parameters as key-value pairs.
     *
     * Most of the time, you would create a SparkConf object with `new SparkConf()`, which will load
     * values from any `spark.*` Java system properties set in your application as well. In this case,
     * parameters you set directly on the `SparkConf` object take priority over system properties.
     *
     * For unit tests, you can also call `new SparkConf(false)` to skip loading external settings and
     * get the same configuration no matter what the system properties are.
     *
     * All setter methods in this class support chaining. For example, you can write
     * `new SparkConf().setMaster("local").setAppName("My app")`.
     *
     * @param loadDefaults whether to also load values from Java system properties
     *
     * @note Once a SparkConf object is passed to Spark, it is cloned and can no longer be modified
     * by the user. Spark does not support modifying the configuration at runtime.
     */
    class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Serializable {

      import SparkConf._

      // Map that stores all configuration entries
      private val settings = new ConcurrentHashMap[String, String]()

      // Config reader, initialized lazily
      @transient private lazy val reader: ConfigReader = {
        // SparkConfigProvider wraps the settings map
        val _reader = new ConfigReader(new SparkConfigProvider(settings))
        _reader.bindEnv(new ConfigProvider {
          override def get(key: String): Option[String] = Option(getenv(key))
        })
        _reader
      }

      ...
    }
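As the class doc comment above describes, a typical application builds a SparkConf with chained setters and hands it to either entry point. Below is a minimal sketch of that usage; the object name, master URL, app name, and memory value are illustrative placeholders, not values from the Spark source:

    import org.apache.spark.SparkConf
    import org.apache.spark.sql.SparkSession

    object SparkConfUsage {
      def main(args: Array[String]): Unit = {
        // Chained setters, as shown in the class doc comment
        val conf = new SparkConf()              // also loads spark.* system properties
          .setMaster("local[2]")                // placeholder master URL
          .setAppName("SparkConf demo")
          .set("spark.executor.memory", "1g")

        // The same SparkConf can back either entry point
        val spark = SparkSession.builder().config(conf).getOrCreate()
        // ... run jobs through spark or spark.sparkContext ...
        spark.stop()
      }
    }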

In the SparkConf constructor, the loadDefaults parameter decides whether properties from the Java system properties are loaded. SparkConf also provides a no-argument constructor, which passes loadDefaults as true by default:

    /**
     * Create a SparkConf that loads defaults from system properties and the classpath
     * (the default no-argument constructor passes loadDefaults = true)
     */
    def this() = this(true)

When a SparkConf is initialized, if loadDefaults is true, the loadFromSystemProperties(...) method is called to load the system properties:

org.apache.spark.SparkConf

    // When loadDefaults is true, Spark configuration is loaded from the system properties
    if (loadDefaults) {
      loadFromSystemProperties(false)
    }

    private[spark] def loadFromSystemProperties(silent: Boolean): SparkConf = {
      // Load any spark.* system properties
      // Fetch the system properties and keep only those whose keys start with "spark."
      for ((key, value) <- Utils.getSystemProperties if key.startsWith("spark.")) {
        set(key, value, silent)
      }
      this
    }

As we can see, only system properties whose keys are prefixed with spark. are taken. The implementation of Utils.getSystemProperties is straightforward: it iterates over the JVM's property names, calls System.getProperty(...) for each, and returns a Map:

org.apache.spark.util.Utils#getSystemProperties

    /**
     * Returns the system properties map that is thread-safe to iterator over. It gets the
     * properties which have been set explicitly, as well as those for which only a default value
     * has been defined.
     */
    def getSystemProperties: Map[String, String] = {
      System.getProperties.stringPropertyNames().asScala
        .map(key => (key, System.getProperty(key))).toMap
    }

SparkConf stores all configuration in the thread-safe ConcurrentHashMap named settings. As its instantiation shows, every configuration key and its value in SparkConf is of type String.
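A quick way to see the loadDefaults behavior described above is to set a spark.-prefixed system property before constructing a SparkConf. A minimal sketch (the object name and property value are placeholders):

    import org.apache.spark.SparkConf

    object LoadDefaultsDemo {
      def main(args: Array[String]): Unit = {
        // A spark.* system property is picked up by the default constructor...
        System.setProperty("spark.app.name", "from-system-properties")
        val withDefaults = new SparkConf()                    // loadDefaults = true
        println(withDefaults.getOption("spark.app.name"))     // expected: Some(from-system-properties)

        // ...but ignored when loadDefaults is false
        val withoutDefaults = new SparkConf(false)
        println(withoutDefaults.getOption("spark.app.name"))  // expected: None
      }
    }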

SparkConf's companion object also defines the deprecated configuration keys as well as keys whose names have changed across versions:

org.apache.spark.SparkConf

    /**
     * Maps deprecated config keys to information about the deprecation.
     *
     * The extra information is logged as a warning when the config is present in the user's
     * configuration.
     *
     * Deprecated configuration keys
     */
    private val deprecatedConfigs: Map[String, DeprecatedConfig] = {
      val configs = Seq(
        DeprecatedConfig("spark.cache.class", "0.8",
          "The spark.cache.class property is no longer being used! Specify storage levels using " +
          "the RDD.persist() method instead."),
        // Many more entries, omitted here
        ...
      )
      Map(configs.map { cfg => (cfg.key -> cfg) } : _*)
    }

    /**
     * Maps a current config key to alternate keys that were used in previous version of Spark.
     *
     * The alternates are used in the order defined in this map. If deprecated configs are
     * present in the user's configuration, a warning is logged.
     *
     * Configuration keys whose names changed across versions
     */
    private val configsWithAlternatives = Map[String, Seq[AlternateConfig]](
      "spark.executor.userClassPathFirst" -> Seq(
        AlternateConfig("spark.files.userClassPathFirst", "1.3")),
      // Many more entries, omitted here
      ...
    )

Here, the three string parameters of the DeprecatedConfig entries used by deprecatedConfigs are the name of the deprecated key, the version in which it was deprecated, and the deprecation message. configsWithAlternatives expresses renamed keys as key-value pairs: the key is a string holding the current name of the configuration item, and the value is a sequence of AlternateConfig objects, each of whose two string parameters are the old name of the configuration item and the Spark version in which that old name was deprecated.
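To see this bookkeeping in action, setting a key that appears in deprecatedConfigs, such as spark.cache.class, should produce a warning in the logs while the value is still stored (the exact warning text is not shown in this article). A sketch, with an arbitrary value and object name:

    import org.apache.spark.SparkConf

    object DeprecationWarningDemo {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf(false)

        // "spark.cache.class" is listed in deprecatedConfigs, so the non-silent set()
        // shown in the next section logs a deprecation warning, but the entry is still
        // placed into settings.
        conf.set("spark.cache.class", "unused-value")
        println(conf.get("spark.cache.class"))   // expected: unused-value
      }
    }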

2. Setting Configuration Items

All Spark configuration is set in one of the following three ways:

  1. From system properties (i.e., the properties obtained via System.getProperties) whose keys start with the spark. prefix.
  2. Through the SparkConf API.
  3. By cloning another SparkConf.

The first way was covered above. The second is the one we use most in day-to-day development: SparkConf provides a large number of methods starting with "set" for setting configuration, such as the common setMaster(...) and setAppName(...):

org.apache.spark.SparkConf

    /**
     * The master URL to connect to, such as "local" to run locally with one thread, "local[4]" to
     * run locally with 4 cores, or "spark://master:7077" to run on a Spark standalone cluster.
     */
    def setMaster(master: String): SparkConf = {
      set("spark.master", master)
    }

    /** Set a name for your application. Shown in the Spark web UI. */
    def setAppName(name: String): SparkConf = {
      set("spark.app.name", name)
    }
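Since these setters are thin wrappers over set() with fixed keys, the values they write can be read back under the corresponding spark.* keys. A small sketch (object name and values are placeholders):

    import org.apache.spark.SparkConf

    object SetterDemo {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf(false)
          .setMaster("local[4]")
          .setAppName("setter-demo")

        // setMaster/setAppName simply populate these keys
        println(conf.get("spark.master"))     // expected: local[4]
        println(conf.get("spark.app.name"))   // expected: setter-demo
      }
    }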

The third way is provided by SparkConf's clone() method. SparkConf mixes in the Cloneable interface and offers a clone operation whose implementation is quite simple:

org.apache.spark.SparkConf#clone

    /**
     * Copy this object
     * clone() produces a new SparkConf, so a fresh configuration can be derived from the current one
     */
    override def clone: SparkConf = {
      val cloned = new SparkConf(false)
      settings.entrySet().asScala.foreach { e =>
        cloned.set(e.getKey(), e.getValue(), true)
      }
      cloned
    }

In fact, all three ways ultimately go through SparkConf's set(key: String, value: String, silent: Boolean): SparkConf method, which validates the key and value and then puts them into the settings map:

org.apache.spark.SparkConf#set

    private[spark] def set(key: String, value: String, silent: Boolean): SparkConf = {
      // Validate the arguments
      if (key == null) {
        throw new NullPointerException("null key")
      }
      if (value == null) {
        throw new NullPointerException("null value for " + key)
      }
      // Unless the key is being set silently, log a warning if it is a deprecated one
      if (!silent) {
        logDeprecationWarning(key)
      }
      // Store the entry in settings
      settings.put(key, value)
      this
    }

3. Getting Configuration Items

For reading configuration, SparkConf provides a large number of methods starting with "get" that return values of different types. They all ultimately call getOption(...), which first looks up the given key and, if nothing is found, falls back to the corresponding deprecated key:

org.apache.spark.SparkConf#getOption

    /** Get a parameter as an Option */
    def getOption(key: String): Option[String] = {
      Option(settings.get(key)).orElse(getDeprecatedConfig(key, this))
    }
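Combined with the configsWithAlternatives mapping shown earlier, this fallback means a value written under the pre-1.3 key can still be read through the current key. A sketch:

    import org.apache.spark.SparkConf

    object AlternateKeyDemo {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf(false)

        // Only the old key from configsWithAlternatives is set (this also logs a warning)
        conf.set("spark.files.userClassPathFirst", "true")

        // getOption on the current key falls back to the old key via getDeprecatedConfig
        println(conf.getOption("spark.executor.userClassPathFirst"))   // expected: Some(true)
      }
    }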

4. Validating Configuration Items

SparkConf provides the validateSettings() method to check the configuration for validity and to log warnings where needed. Its source is long, as shown below:

org.apache.spark.SparkConf#validateSettings

    /**
     * Checks for illegal or deprecated config settings. Throws an exception for the former. Not
     * idempotent - may mutate this conf object to convert deprecated settings to supported ones.
     */
    private[spark] def validateSettings() {
      // Since Spark 1.0 this setting is overridden by the cluster manager's
      // SPARK_LOCAL_DIRS (Mesos/Standalone) or LOCAL_DIRS (YARN)
      if (contains("spark.local.dir")) {
        val msg = "In Spark 1.0 and later spark.local.dir will be overridden by the value set by " +
          "the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone and LOCAL_DIRS in YARN)."
        logWarning(msg)
      }

      // Names of the configuration keys used below
      val executorOptsKey = "spark.executor.extraJavaOptions"
      val executorClasspathKey = "spark.executor.extraClassPath"
      val driverOptsKey = "spark.driver.extraJavaOptions"
      val driverClassPathKey = "spark.driver.extraClassPath"
      val driverLibraryPathKey = "spark.driver.extraLibraryPath"
      val sparkExecutorInstances = "spark.executor.instances"

      // Used by Yarn in 1.1 and before
      // spark.driver.libraryPath was deprecated in Spark 1.2
      sys.props.get("spark.driver.libraryPath").foreach { value =>
        val warning =
          s"""
            |spark.driver.libraryPath was detected (set to '$value').
            |This is deprecated in Spark 1.2+.
            |
            |Please instead use: $driverLibraryPathKey
          """.stripMargin
        logWarning(warning)
      }

      // Validate spark.executor.extraJavaOptions
      getOption(executorOptsKey).foreach { javaOpts =>
        if (javaOpts.contains("-Dspark")) {
          val msg = s"$executorOptsKey is not allowed to set Spark options (was '$javaOpts'). " +
            "Set them directly on a SparkConf or in a properties file when using ./bin/spark-submit."
          throw new Exception(msg)
        }
        if (javaOpts.contains("-Xmx")) {
          val msg = s"$executorOptsKey is not allowed to specify max heap memory settings " +
            s"(was '$javaOpts'). Use spark.executor.memory instead."
          throw new Exception(msg)
        }
      }

      // Validate memory fractions
      val deprecatedMemoryKeys = Seq(
        "spark.storage.memoryFraction",
        "spark.shuffle.memoryFraction",
        "spark.shuffle.safetyFraction",
        "spark.storage.unrollFraction",
        "spark.storage.safetyFraction")
      val memoryKeys = Seq(
        "spark.memory.fraction",
        "spark.memory.storageFraction") ++
        deprecatedMemoryKeys
      for (key <- memoryKeys) {
        val value = getDouble(key, 0.5)
        if (value > 1 || value < 0) {
          throw new IllegalArgumentException(s"$key should be between 0 and 1 (was '$value').")
        }
      }

      // Warn against deprecated memory fractions (unless legacy memory management mode is enabled)
      val legacyMemoryManagementKey = "spark.memory.useLegacyMode"
      val legacyMemoryManagement = getBoolean(legacyMemoryManagementKey, false)
      if (!legacyMemoryManagement) {
        val keyset = deprecatedMemoryKeys.toSet
        val detected = settings.keys().asScala.filter(keyset.contains)
        if (detected.nonEmpty) {
          logWarning("Detected deprecated memory fraction settings: " +
            detected.mkString("[", ", ", "]") + ". As of Spark 1.6, execution and storage " +
            "memory management are unified. All memory fractions used in the old model are " +
            "now deprecated and no longer read. If you wish to use the old memory management, " +
            s"you may explicitly enable `$legacyMemoryManagementKey` (not recommended).")
        }
      }

      // Check for legacy configs
      // Check the legacy SPARK_JAVA_OPTS environment variable
      sys.env.get("SPARK_JAVA_OPTS").foreach { value =>
        val warning =
          s"""
            |SPARK_JAVA_OPTS was detected (set to '$value').
            |This is deprecated in Spark 1.0+.
            |
            |Please instead use:
            | - ./spark-submit with conf/spark-defaults.conf to set defaults for an application
            | - ./spark-submit with --driver-java-options to set -X options for a driver
            | - spark.executor.extraJavaOptions to set -X options for executors
            | - SPARK_DAEMON_JAVA_OPTS to set java options for standalone daemons (master or worker)
          """.stripMargin
        logWarning(warning)

        for (key <- Seq(executorOptsKey, driverOptsKey)) {
          if (getOption(key).isDefined) {
            throw new SparkException(s"Found both $key and SPARK_JAVA_OPTS. Use only the former.")
          } else {
            logWarning(s"Setting '$key' to '$value' as a work-around.")
            set(key, value)
          }
        }
      }

      // Check the legacy SPARK_CLASSPATH environment variable
      sys.env.get("SPARK_CLASSPATH").foreach { value =>
        val warning =
          s"""
            |SPARK_CLASSPATH was detected (set to '$value').
            |This is deprecated in Spark 1.0+.
            |
            |Please instead use:
            | - ./spark-submit with --driver-class-path to augment the driver classpath
            | - spark.executor.extraClassPath to augment the executor classpath
          """.stripMargin
        logWarning(warning)

        for (key <- Seq(executorClasspathKey, driverClassPathKey)) {
          if (getOption(key).isDefined) {
            throw new SparkException(s"Found both $key and SPARK_CLASSPATH. Use only the former.")
          } else {
            logWarning(s"Setting '$key' to '$value' as a work-around.")
            set(key, value)
          }
        }
      }

      // Check executor-related settings
      if (!contains(sparkExecutorInstances)) {
        sys.env.get("SPARK_WORKER_INSTANCES").foreach { value =>
          val warning =
            s"""
              |SPARK_WORKER_INSTANCES was detected (set to '$value').
              |This is deprecated in Spark 1.0+.
              |
              |Please instead use:
              | - ./spark-submit with --num-executors to specify the number of executors
              | - Or set SPARK_EXECUTOR_INSTANCES
              | - spark.executor.instances to configure the number of instances in the spark config.
            """.stripMargin
          logWarning(warning)
          set("spark.executor.instances", value)
        }
      }

      // Check deploy-mode-related settings
      if (contains("spark.master") && get("spark.master").startsWith("yarn-")) {
        val warning = s"spark.master ${get("spark.master")} is deprecated in Spark 2.0+, please " +
          "instead use \"yarn\" with specified deploy mode."

        get("spark.master") match {
          case "yarn-cluster" =>
            logWarning(warning)
            set("spark.master", "yarn")
            set("spark.submit.deployMode", "cluster")
          case "yarn-client" =>
            logWarning(warning)
            set("spark.master", "yarn")
            set("spark.submit.deployMode", "client")
          case _ => // Any other unexpected master will be checked when creating scheduler backend.
        }
      }

      if (contains("spark.submit.deployMode")) {
        get("spark.submit.deployMode") match {
          case "cluster" | "client" =>
          case e => throw new SparkException("spark.submit.deployMode can only be \"cluster\" or " +
            "\"client\".")
        }
      }
    }
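Since validateSettings() is private[spark], application code cannot call it directly; it is exercised during Spark's own initialization (in the SparkContext constructor, as far as I can tell). The sketch below assumes that behavior and feeds in an out-of-range memory fraction, which should hit the IllegalArgumentException branch shown above; the object name and master URL are placeholders:

    import org.apache.spark.{SparkConf, SparkContext}

    object ValidateSettingsDemo {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf(false)
          .setMaster("local[2]")
          .setAppName("validate-demo")
          .set("spark.memory.fraction", "1.5")   // outside the allowed [0, 1] range

        // Expected during initialization: IllegalArgumentException
        //   "spark.memory.fraction should be between 0 and 1 (was '1.5')."
        val sc = new SparkContext(conf)
        sc.stop()
      }
    }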