大数据
流式处理
Spark

Spark基础 05 - Spark性能优化和错误诊断

简介:由于Spark的计算本质是基于内存的,所以Spark性能程序的性能可能因为集群中的任何因素出现瓶颈:CPU、网络带宽、或者是内存。如果内存能够容纳得下所有的数据,那么网络传输和通信就会导致性能出现瓶颈。

1. Spark性能优化概览

由于Spark的计算本质是基于内存的,所以Spark性能程序的性能可能因为集群中的任何因素出现瓶颈:CPU、网络带宽、或者是内存。如果内存能够容纳得下所有的数据,那么网络传输和通信就会导致性能出现瓶颈。但是如果内存比较紧张,不足以放下所有的数据(比如在针对10亿以上的数据量进行计算时),还是需要对内存的使用进行性能优化的,比如说使用一些手段来减少内存的消耗。

Spark性能优化,其实主要就是在于对内存的使用进行调优。因为通常情况下来说,如果你的Spark应用程序计算的数据量比较小,并且你的内存足够使用,那么只要运维可以保障网络通常,一般是不会有大的性能问题的。但是Spark应用程序的性能问题往往出现在针对大数据量(比如10亿级别)进行计算时出现,因此通常来说,Spark性能优化,主要是对内存进行性能优化。当然,除了内存调优之外,还有很多手段可以优化Spark应用程序的性能。

1.1. Spark性能优化技术

Spark的性能优化,主要手段包括:

  1. 使用高性能序列化类库;
  2. 优化数据结构;
  3. 对多次使用的RDD进行持久化 / Checkpoint;
  4. 使用序列化的持久化级别;
  5. Java虚拟机垃圾回收调优;
  6. 提高并行度;
  7. 广播共享数据;
  8. 数据本地化;
  9. reduceByKey()groupByKey()的合理使用;
  10. Shuffle调优(核心中的核心,重中之重)。

1.2. Spark性能优化的重要性

实际上Spark到目前为止,在大数据业界的影响力和覆盖度,还远没有达到Hadoop的水平,——虽然说,我们之前一再强调,Spark Core、Spark SQL、Spark Streaming,可以替代MapReduce、Hive查询引擎、Storm。但是事实就是,Spark还没有达到已经替代了它们的地步。

根据我在研究Spark,并且在一线使用Spark,与大量行业内的大数据相关从业人员沟通的情况来看。Spark最大的优点,其实也是它目前最大的问题——基于内存的计算模型。Spark由于使用了基于内存的计算模型,因此导致了其稳定性,远远不如Hadoop。虽然我也很喜欢和热爱Spark,但是这就是事实,Spark的速度的确达到了hadoop的几倍、几十倍、甚至上百倍(极端情况)。但是基于内存的模型,导致它经常出现各种OOM(内存溢出)、内部异常等问题。

说一个亲身经历的例子,曾经用Spark改写几个复杂的MapReduce程序,虽然MapReduce很慢,但是它很稳定,至少慢慢跑,是可以跑出来数据的。但是用Spark Core很快就改写完了程序,问题是,在整整半个月之内,Spark程序根本跑不起来,因为数据量太大,10亿+。导致它出现了各种各样的问题,包括OOM、文件丢失、Task Lost、内部异常等等各种问题。最后耗费了大量时间,最一个Spark程序进行了大量的性能调优,才最终让它可以跑起来。

的确,用了Spark,比MapReduce的速度快了十倍,但是付出的代价是惨痛的,花了整整一个月的时间做这个事情。

因此,当我在公司推广Spark的使用时,很多人都不无担心地说,听说Spark还不够稳定,经常出现问题,比如OOM等,它的稳定性,导致业界的人们不太敢轻易尝试它,在复杂的大数据系统,要求极高稳定性的线程系统中使用。——当然,如果你就是开发一个针对公司内部的,稳定性要求不高的系统,当然不用担心这个问题。

所以,我认为,Spark的基于内存的本质,就导致了上述的问题,导致了它目前还无法完全提到Hadoop中的某些技术。

但是,纵然Spark有各种问题,其优点就是缺点,缺点也是优点——它实在是很快。优秀的Spark应用程序,性能完全可以达到MapReduce、Hive查询引擎的数倍、甚至数十倍。因此,纵使有各种担忧,Spark还是吸引着大量的人们以及公司去探索,和尝试攻克它,使用它,让它为我们所用,用它开放更棒的大数据系统。

因此,正是基于上述背景,Spark工程师的要求是非常高的。比如我们这里,我们正在用Spark开发大型复杂的线上大数据系统,所以针对Spark的招聘,我们是要求Spark工程师必须精通Spark内核源码,能够对程序进行性能优化。——打个广告,实际上,我认为如果能精通本系列课程,那么成为一个行业内优秀的Spark工程师,是一定没有问题的。

所以,Spark虽然有它的问题所在,但是它的优势还是让它以极快的速度,极强的劲头在行业内快速发展。行业内各个公司,也大量缺乏着优秀的Spark工程师。而如果是想转型进行Spark开发的朋友,基于上述种种背景,就应该明白了,Spark性能优化,对于你找工作,对于你在实际工作中解决问题的重要性了!

要成为优秀的Spark工程师,顺利实现转型,那么就必须能够彻底精通Spark内核源码,能够基于对Spark内核原理的深度理解,对线上复杂的Spark大数据系统 / 程序出现的报错和故障,进行排查和解决;能够对运行较慢的Spark应用程序,进行精准的性能问题排查,并且对症下游,针对各种性能问题,使用对应的技术手段,进行解决。

只有这样,我认为,你才能够顺利实现转型,出去成功面试Spark工程师,甚至是高级Spark工程师的岗位。才能在实际工作中,真正让Spark发挥出其巨大的威力。而不仅仅是处于对新技术的喜爱,对Spark进行浅尝辄止的学习——那是没有任何用的。

不精通Spark内核源码,不精通Spark性能优化,也许你能找到Spark大数据的工作,但是通常情况下,也只能进入比较缺人的小公司。要进入大公司,找到更好的职业机会,那么就一起在精通了之前的Spark内核源码深度剖析阶段之后,来进入Spark性能优化阶段的学习吧。

2. 诊断内存消耗

2.1. 内存都花费在哪里了?

  1. 每个Java对象,都有一个对象头,会占用16个字节,主要是包括了一些对象的元信息,比如指向它的类的指针。如果一个对象本身很小,比如就包括了一个int类型的字段,那么它的对象头实际上比对象自己还要大。
  2. Java的String对象,会比它内部的原始数据,要多出40个字节。因为它内部使用char数组来保存内部的字符序列的,并且还得保存诸如数组长度之类的信息。而且因为String使用的是UTF-16编码,所以每个字符会占用2个字节。比如,包含10个字符的String,会占用60个字节。
  3. Java中的集合类型,比如HashMap和LinkedList,内部使用的是链表数据结构,所以对链表中的每一个数据,都使用了Entry对象来包装。Entry对象不光有对象头,还有指向下一个Entry的指针,通常占用8个字节。
  4. 元素类型为原始数据类型(比如int)的集合,内部通常会使用原始数据类型的包装类型,比如Integer,来存储元素。

2.2. 如何判断程序消耗了多少内存?

这里有一个非常简单的办法来判断,你的spark程序消耗了多少内存。

  1. 首先,自己设置RDD的并行度,有两种方式:要不然,在parallelize()textFile()等方法中,传入第二个参数,设置RDD的Task / Partition的数量;要不然,用SparkConf.set()方法,设置一个参数,spark.default.parallelism,可以统一设置这个Application所有RDD的Partition数量。
  2. 其次,在程序中将RDD Cache到内存中,调用RDD.cache()方法即可。
  3. 最后,观察Driver的log,你会发现类似于:INFO BlockManagerMasterActor: Added rdd_0_1 in memory on mbk.local:50311 (size: 717.5 KB, free: 332.3 MB)的日志信息。这就显示了每个Partition占用了多少内存。
  4. 将这个内存信息乘以Partition数量,即可得出RDD的内存占用量。

3. 高性能序列化类库

3.1. 数据序列化概述

在任何分布式系统中,序列化都是扮演着一个重要的角色的。如果使用的序列化技术,在执行序列化操作的时候很慢,或者是序列化后的数据还是很大,那么会让分布式应用程序的性能下降很多。所以,进行Spark性能优化的第一步,就是进行序列化的性能优化。

Spark自身默认就会在一些地方对数据进行序列化,比如Shuffle。还有就是,如果我们的算子函数使用到了外部的数据(比如Java内置类型,或者自定义类型),那么也需要让其可序列化。

而Spark自身对于序列化的便捷性和性能进行了一个取舍和权衡。默认,Spark倾向于序列化的便捷性,使用了Java自身提供的序列化机制——基于ObjectInputStreamObjectOutputStream的序列化机制。因为这种方式是Java原生提供的,很方便使用。

但是问题是,Java序列化机制的性能并不高。序列化的速度相对较慢,而且序列化以后的数据,还是相对来说比较大,还是比较占用内存空间。因此,如果你的Spark应用程序对内存很敏感,那么,实际上默认的Java序列化机制并不是最好的选择。

3.2. Spark提供的两种序列化机制

Spark实际上提供了两种序列化机制,它只是默认使用了第一种:

  1. Java序列化机制:默认情况下,Spark使用Java自身的ObjectInputStreamObjectOutputStream机制进行对象的序列化。只要你的类实现了Serializable接口,那么都是可以序列化的。而且Java序列化机制是提供了自定义序列化支持的,只要你实现Externalizable接口即可实现自己的更高性能的序列化算法。Java序列化机制的速度比较慢,而且序列化后的数据占用的内存空间比较大。
  2. Kryo序列化机制:Spark也支持使用Kryo类库来进行序列化。Kryo序列化机制比Java序列化机制更快,而且序列化后的数据占用的空间更小,通常比Java序列化的数据占用的空间要小10倍。Kryo序列化机制之所以不是默认序列化机制的原因是,有些类型虽然实现了Seriralizable接口,但是它也不一定能够进行序列化;此外,如果你要得到最佳的性能,Kryo还要求你在Spark应用程序中,对所有你需要序列化的类型都进行注册。

3.3. 如何使用Kryo序列化机制

如果要使用Kryo序列化机制,首先要用SparkConf设置一个参数,使用new SparkConf().set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")即可,即将Spark的序列化器设置为KryoSerializer。这样,Spark在内部的一些操作,比如Shuffle,进行序列化时,就会使用Kryo类库进行高性能、快速、更低内存占用量的序列化了。

使用Kryo时,它要求是需要序列化的类,是要预先进行注册的,以获得最佳性能——如果不注册的话,那么Kryo必须时刻保存类型的全限定名,反而占用不少内存。Spark默认是对Scala中常用的类型自动注册了Kryo的,都在AllScalaRegistry类中。

但是,比如自己的算子中,使用了外部的自定义类型的对象,那么还是需要将其进行注册。如下代码:

  1. Scala版本:
  • val conf = new SparkConf().setMaster(...).setAppName(...)
  • conf.registerKryoClasses(Array(classOf[Counter] ))
  • val sc = new SparkContext(conf)
  1. Java版本:
  • SparkConf conf = new SparkConf().setMaster(...).setAppName(...)
  • conf.registerKryoClasses(Counter.class)
  • JavaSparkContext sc = new JavaSparkContext(conf)

3.4. 优化Kryo类库的使用

  1. 优化缓存大小

如果注册的要序列化的自定义的类型,本身特别大,比如包含了超过100个字段。那么就会导致要序列化的对象过大。此时就需要对Kryo本身进行优化。因为Kryo内部的缓存可能不够存放那么大的class对象。此时就需要调用SparkConf.set()方法,设置spark.kryoserializer.buffer.mb参数的值,将其调大。

默认情况下它的值是2,就是说最大能缓存2M的对象,然后进行序列化。可以在必要时将其调大。比如设置为10。

  1. 预先注册自定义类型

虽然不注册自定义类型,Kryo类库也能正常工作,但是那样的话,对于它要序列化的每个对象,都会保存一份它的全限定类名。此时反而会耗费大量内存。因此通常都建议预先注册号要序列化的自定义的类。

3.5. 在什么场景下使用Kryo序列化类库?

首先,这里讨论的都是Spark的一些普通的场景,一些特殊的场景,比如RDD的持久化,在后面会讲解。这里先不说。

那么,这里针对的Kryo序列化类库的使用场景,就是算子函数使用到了外部的大数据的情况。比如说吧,我们在外部定义了一个封装了应用所有配置的对象,比如自定义了一个MyConfiguration对象,里面包含了100m的数据。然后,在算子函数里面,使用到了这个外部的大对象。

此时呢,如果默认情况下,让Spark用java序列化机制来序列化这种外部的大对象,那么就会导致,序列化速度缓慢,并且序列化以后的数据还是比较大,比较占用内存空间。

因此,在这种情况下,比较适合,切换到Kryo序列化类库,来对外部的大对象进行序列化操作。一是,序列化速度会变快;二是,会减少序列化后的数据占用的内存空间。

4. 优化数据结构

要减少内存的消耗,除了使用高效的序列化类库以外,还有一个很重要的事情,就是优化数据结构。从而避免Java语法特性中所导致的额外内存的开销,比如基于指针的Java数据结构,以及包装类型。

有一个关键的问题,就是优化什么数据结构?其实主要就是优化你的算子函数,内部使用到的局部数据,或者是算子函数外部的数据。都可以进行数据结构的优化。优化之后,都会减少其对内存的消耗和占用。

4.1. 如何优化数据结构

  1. 优先使用数组以及字符串,而不是集合类。也就是说,优先用Array,而不是ArrayList、LinkedList、HashMap等集合。

比如,有个List<Integer> list = new ArrayList<Integer>(),将其替换为int[] arr = new int[]。这样的话,array既比List少了额外信息的存储开销,还能使用原始数据类型(int)来存储数据,比List中用Integer这种包装类型存储数据,要节省内存的多。

还比如,通常企业级应用中的做法是,对于HashMap、List这种数据,统一用String拼接成特殊格式的字符串,比如Map<Integer, Person> persons = new HashMap<Integer, Person>()。可以优化为,特殊的字符串格式:id:name,address|id:name,address...

  1. 避免使用多层嵌套的对象结构。比如说,public class Teacher { private List<Student> students = new ArrayList<Student>() }。就是非常不好的例子。因为Teacher类的内部又嵌套了大量的小Student对象。

比如说,对于上述例子,也完全可以使用特殊的字符串来进行数据的存储。比如,用json字符串来存储数据,就是一个很好的选择。

{"teacherId": 1, "teacherName": "leo", students:[{"studentId": 1, "studentName": "tom"},{"studentId":2, "studentName":"marry"}]}

  1. 对于有些能够避免的场景,尽量使用int替代String。因为String虽然比ArrayList、HashMap等数据结构高效多了,占用内存量少多了,但是之前分析过,还是有额外信息的消耗。比如之前用String表示id,那么现在完全可以用数字类型的int,来进行替代。

这里提醒,在Spark应用中,id就不要用常用的uuid了,因为无法转成int,就用自增的int类型的id即可。

5. 对多次使用的RDD进行持久化或Checkpoint

如果程序中,对某一个RDD,基于它进行了多次Transformation或者Action操作。那么就非常有必要对其进行持久化操作,以避免对一个RDD反复进行计算。

此外,如果要保证在RDD的持久化数据可能丢失的情况下,还要保证高性能,那么可以对RDD进行Checkpoint操作。

1.对多次操作的RDD进行持久化和Checkpoint.png

6. 使用序列化的持久化级别

除了对多次使用的RDD进行持久化操作之外,还可以进一步优化其性能。因为很有可能,RDD的数据是持久化到内存,或者磁盘中的。如果内存大小不是特别充足,完全可以使用序列化的持久化级别,比如MEMORY_ONLY_SERMEMORY_AND_DISK_SER等。使用RDD.persist(StorageLevel.MEMORY_ONLY_SER)这样的语法即可。

注:cache()persist()的区别:cache()底层调用了无参的persist(),默认的缓存级别为MEMORY_ONLY,而persist()可以根据情况设置其它的缓存级别。

这样的话,将数据序列化之后,再持久化,可以大大减小对内存的消耗。此外,数据量小了之后,如果要写入磁盘,那么磁盘IO性能消耗也比较小。

对RDD持久化序列化后,RDD的每个Partition的数据,都是序列化为一个巨大的字节数组。这样,对于内存的消耗就小的多了。但是唯一的缺点就是,获取RDD数据时,需要对其进行反序列化,会增大其性能开销。

因此,对于序列化的持久化级别,还可以进一步优化,也就是说,使用Kryo序列化类库,这样,可以获得更快的序列化速度,并且占用更小的内存空间。但是要记住,如果RDD的元素(RDD<T>的泛型类型),是自定义类型的话,在Kryo中提前注册自定义类型。

7. Java虚拟机垃圾回收调优

7.1. Java虚拟机垃圾回收调优的背景

如果在持久化RDD的时候,持久化了大量的数据,那么Java虚拟机的垃圾回收就可能成为一个性能瓶颈。因为Java虚拟机会定期进行垃圾回收,此时就会追踪所有的java对象,并且在垃圾回收时,找到那些已经不在使用的对象,然后清理旧的对象,来给新的对象腾出内存空间。

垃圾回收的性能开销,是跟内存中的对象的数量,成正比的。所以,对于垃圾回收的性能问题,首先要做的就是,使用更高效的数据结构,比如Array和String;其次就是在持久化RDD时,使用序列化的持久化级别,而且用Kryo序列化类库,这样,每个Partition就只是一个对象——一个字节数组。

7.2. 垃圾回收对性能的影响

如果内存中数据量较大,会频繁地造成内存空间占满,这个时候就会造成垃圾回收频繁地发生,而垃圾回收本身也是有性能消耗的。此外,如果数据量过大,每次垃圾回收时回收的数据量也特别大,也会导致垃圾回收的速度变慢。

除此之外,垃圾回收是工作在一条线程上的,当垃圾回收进行时,其他的线程,比如我们的Task任务线程就会直接停止工作,这也就直接影响到了Task的运行性能。

7.3. 监测垃圾回收

我们可以对垃圾回收进行监测,包括多久进行一次垃圾回收,以及每次垃圾回收耗费的时间。只要在spark-submit脚本中,增加一个配置即可,--conf "spark.executor.extraJavaOptions=-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps"

但是要记住,这里虽然会打印出Java虚拟机的垃圾回收的相关信息,但是是输出到了Worker上的日志中,而不是Driver的日志中。

但是这种方式也只是一种,其实也完全可以通过Spark UI(4040端口)来观察每个Stage的垃圾回收的情况。

7.4. 优化Executor内存比例

对于垃圾回收来说,最重要的就是调节RDD缓存占用的内存空间,与算子执行时创建的对象占用的内存空间的比例。默认情况下,Spark使用每个Executor的60%的内存空间来缓存RDD,那么在Task执行期间创建的对象,只有40%的内存空间来存放。

在这种情况下,很有可能因为你的内存空间的不足,Task创建的对象过大,那么一旦发现40%的内存空间不够用了,就会触发Java虚拟机的垃圾回收操作。因此在极端情况下,垃圾回收操作可能会被频繁触发。

在上述情况下,如果发现垃圾回收频繁发生。那么就需要对那个比例进行调优,使用new SparkConf().set("spark.storage.memoryFraction", "0.5")即可,可以将RDD缓存占用空间的比例降低,从而给更多的空间让Task创建的对象进行使用。

因此,对于RDD持久化,完全可以使用Kryo序列化,加上降低其Executor内存占比的方式,来减少其内存消耗。给Task提供更多的内存,从而避免Task的执行频繁触发垃圾回收。

2.调节Executor的内存比例.png

7.5. 调节Executor堆外内存

当Spark作业处理的数据量特别大时,可能会出现时不时的报“shuffle output file cannot found”等错误,出现Task丢失或OOM内存溢出,此时可能是由于Executor的堆外内存不够用,导致Executor在运行的过程中出现内存溢出,以致于后续的Stage的Task在运行的时候,从一些Executor中拉取Shuffle Map Output文件时,Executor已经挂掉了,找不到关联的BlockManager对象,导致报错;同时在出错后Spark还会尝试重新提交Task(会出现“resubmitting task”的日志记录),多次提交后依然出错最终Spark作业将失败。

注:在两个Stage之间,数据的交换是通过BlockManager来进行的,上一个Stage会将产生的结果数据保存在本地文件中,在BlockManager中记录文件与数据对应的映射信息,同时将所有Executor与产生的结果数据文件的映射信息报告给Driver结点的MapOutputTracker对象,下一个Stage的Task会通过Driver的MapOutputTracker对象来获取自己需要的数据的Executor所在的Worker的地址,然后根据该地址向对应的Executor的BlockManager获取数据。

在这种情况下,可以考虑调节Executor的堆外内存避免报错,同时当堆外内存调节得比较大的时候,对于性能也会有一定的提升。可以通过在spark-submit脚本提交作业时使用--conf spark.yarn.executor.memoryOverhead=2048进行调节。注意,该配置不可使用SparkConf的set()方式设置。

默认情况下这个堆外内存上限大概是300M左右,在真正处理大数据的时候通过会调节该参数为1G以上。

7.6. 调节连接等待超时时常

在Stage中的Task向前一个Stage获取数据的过程中,可能需要与其他的Executor进行数据交换,此时需要与该Executor的BlockManager建立网络连接,但如果恰好该Executor所在的JVM进程发生了Full GC,就会导致网络连接失败,此时可能会出现某文件无法找到(File Can not be Found)、文件丢失(File Lost)等报错信息;当尝试重试后都拉取不到数据的时会导致会导致DAGScheduler反复提交Stage,TaskScheduler反复提交Task,延长Spark作业的运行时间,甚至会导致Spark作业的崩溃。

此时可以考虑调节连接的超时时长,可以通过在spark-submit脚本提交作业时使用--conf spark.core.connection.ack.wait.timeout=300进行调节。注意,该配置不可使用SparkConf的set()方式设置。通常来说,可以避免部分的偶尔出现的某文件拉取失败或某文件丢失的错误了。

7.7. 高级垃圾回收调优

Java堆空间被划分成了两块空间,一个是年轻代,一个是老年代。年轻代放的是短时间存活的对象,老年代放的是长时间存活的对象。年轻代又被划分了三块空间,Eden、Survivor1、Survivor2。

首先,Eden区域和Survivor1区域用于存放对象,Survivor2区域备用。创建的对象,首先放入Eden区域和Survivor1区域,如果Eden区域满了,那么就会触发一次Minor GC,进行年轻代的垃圾回收。Eden和Survivor1区域中存活的对象,会被移动到Survivor2区域中。然后Survivor1和Survivor2的角色调换,Survivor1变成了备用。

如果一个对象,在年轻代中,撑过了多次垃圾回收,都没有被回收掉,那么会被认为是长时间存活的,此时就会被移入老年代。此外,如果在将Eden和Survivor1中的存活对象,尝试放入Survivor2中时,发现Survivor2放满了,那么会直接放入老年代。此时就出现了,短时间存活的对象,进入老年代的问题。

如果老年代的空间满了,那么就会触发Full GC,进行老年代的垃圾回收操作。

Spark中,垃圾回收调优的目标就是,只有真正长时间存活的对象,才能进入老年代,短时间存活的对象,只能呆在年轻代。不能因为某个Survivor区域空间不够,在Minor GC时,就进入了老年代。从而造成短时间存活的对象,长期呆在老年代中占据了空间,而且Full GC时要回收大量的短时间存活的对象,导致Full GC速度缓慢。

如果发现,在Task执行期间,大量Full GC发生了,那么说明,年轻代的Eden区域,给的空间不够大。此时可以执行一些操作来优化垃圾回收行为:

  1. 包括降低spark.storage.memoryFraction的比例,调整Cache操作占用的内存占比,提供更多内存用于算子函数的执行,给年轻代更多的空间,来存放短时间存活的对象;
  2. 给Eden区域分配更大的空间,使用-Xmn即可,通常建议给Eden区域,预计大小的4/3;
  3. 如果使用的是HDFS文件,那么很好估计Eden区域大小,如果每个Executor有4个Task,然后每个hdfs压缩块解压缩后大小是3倍,此外每个hdfs块的大小是64M,那么Eden区域的预计大小就是:4 * 3 * 64MB,然后呢,再通过-Xmn参数,将Eden区域大小设置为4 * 3 * 64 * 4/3。

根据经验来看,对于垃圾回收的调优,尽量就是说,调节Executor内存的比例就可以了。因为jvm的调优是非常复杂和敏感的。除非是,真的到了万不得已的地方,然后呢,自己本身又对jvm相关的技术很了解,那么此时进行eden区域的调节,调优,是可以的。

JVM的一些高级的参数:

  • -XX:SurvivorRatio=4:如果值为4,那么就是两个Survivor跟Eden的比例是2:4,也就是说每个Survivor占据的年轻代的比例是1/6,所以,你其实也可以尝试调大Survivor区域的大小。
  • -XX:NewRatio=4:调节新生代和老年代的比例。

8. 提高并行度

实际上Spark集群的资源并不一定会被充分利用到,所以要尽量设置合理的并行度,来充分地利用集群的资源。才能充分提高Spark应用程序的性能。

Spark会自动设置以文件作为输入源的RDD的并行度,依据其大小,比如HDFS,就会给每一个block创建一个Partition,也依据这个设置并行度。对于reduceByKey等会发生shuffle的操作,就使用并行度最大的父RDD的并行度即可。

比如说,在spark-submit中,我们配置了2个Executor,每个Executor有5个CPU Core,当我们设置new SparkConf().set("spark.default.parallelism", "5")时,所有的RDD的Partition就都被设置成了5个,也就是说每个RDD的数据会被拆分为5份,那么针对RDD的Partition,每个Partition会启动一个Task进行计算,所以对于所有的算子操作都只会创建5个Task在集群中运行,这个时候有5个CPU Core是空闲的。

可以手动使用textFile()parallelize()等方法的第二个参数来设置并行度;也可以使用spark.default.parallelism参数,来设置统一的并行度。Spark官方的推荐是,给集群中的每个CPU Core设置2~3个Task。

例如spark-submit设置了Executor数量是10个,每个Executor要求分配2个core,那么application总共会有20个core。此时可以设置new SparkConf().set("spark.default.parallelism", "60")来设置合理的并行度,从而充分利用资源。

9. 广播共享数据

如果你的算子函数中,使用到了特别大的数据,那么,这个时候,推荐将该数据进行广播。这样的话,就不至于将一个大数据拷贝到每一个Task上去。而是给每个节点拷贝一份,然后节点上的Task共享该数据。

这样的话,就可以减少大数据在节点上的内存消耗。并且可以减少数据到节点的网络传输消耗。

例如在我们进行类似MapReduce的Map-Side Join操作,即在每个Task运行时需要把输入的数据与一个表进行Join操作,在运行这个Job时,Spark会首先将表中的数据集读取出来并加载到Driver上,然后在算子函数中进行Join操作;默认情况下,算子函数使用到的外部的数据,都会被拷贝到每一个Task中,此时如果这份外部数据非常大的话就会产生大量的网络传输和内存占用。一般这种情况就应该对外部数据进行BroadCast广播,这样在每个节点上就只会存在一份副本,而不是每个Task上都有一份副本,可以大大减少内存占用的网络传输消耗。

10. 数据本地化

数据本地化对于Spark Job性能有着巨大的影响。如果数据以及要计算它的代码是在一起的,那么性能当然会非常高。但是,如果数据和计算它的代码是分开的,那么其中之一必须到另外一方的机器上。通常来说,移动代码到其他节点,会比移动数据到代码所在的节点上去,速度要快得多,因为代码比较小。Spark也正是基于这个数据本地化的原则来构建Task调度算法的。

数据本地化,指的是,数据离计算它的代码有多近。基于数据距离代码的距离,有几种数据本地化级别:

  1. PROCESS_LOCAL:数据和计算它的代码在同一个JVM进程中。
  2. NODE_LOCAL:数据和计算它的代码在一个节点上,但是不在一个进程中,比如在不同的Executor进程中,或者是数据在HDFS文件的block中。
  3. NO_PREF:数据从哪里过来,性能都是一样的。
  4. RACK_LOCAL:数据和计算它的代码在一个机架上。
  5. ANY:数据可能在任意地方,比如其他网络环境内,或者其他机架上。

10.1. 数据本地化优化

Spark倾向于使用最好的本地化级别来调度Task,但是这是不可能的。如果没有任何未处理的数据在空闲Eexecutor上,那么Spark就会放低本地化级别。这时有两个选择:第一,等待,直到Executor上的CPU释放出来,那么就分配Task过去;第二,立即在任意一个Executor上启动一个Task。

Spark默认会等待一会儿,来期望Task要处理的数据所在的节点上的Executor空闲出一个CPU,从而将Task分配过去。只要超过了时间,那么Spark就会将Task分配到其他任意一个空闲的Executor上。

可以设置参数,spark.locality系列参数,来调节Spark等待Task可以进行数据本地化的时间。spark.locality.wait(3000毫秒)、spark.locality.wait.nodespark.locality.wait.processspark.locality.wait.rack

3.数据本地化原理.png

11. reduceByKey和groupByKey

如果能用reduceByKey,那就用reduceByKey,因为它会在Map端,先进行本地Combine,可以大大减少要传输到Reduce端的数据量,减小网络传输的开销。

只有在reduceByKey()处理不了时,才用groupByKey().map()来替代。如下代码:

  • val counts = pairs.reduceByKey(_ + _)
  • val counts = pairs.groupByKey().map(wordCounts => (wordCounts._1, wordCounts._2.sum))

4.reduceByKey和groupByKey.png

12. shuffle性能优化

什么样的情况下会发生Shuffle?在Spark中,主要是这几个算子groupByKey()reduceByKey()countByKey()join()的执行会发生Shuffle操作。以这四个算子为例,它们的执行过程如下:

  • groupByKey()算子需要把分布在集群各个节点上的数据中的相同的key对应的values,都集中到集群中同一个节点上,严格一点说,是集中到一个节点的一个Executor的一个Task中,存储为<key, Iterable<value>>类型的键值对;
  • reduceByKey()算子函数会对values集合进行聚合操作,最后变成一个value;
  • countByKey()算子则需要在一个Task中,获取到一个key对应的所有的values,然后进行计数,统计总共有多少个value;
  • join()算子则需要将两个RDD中相同key所对应的两个value,聚集到同一个节点的Executor的Task中进行后面的处理。

对于这四个算子来说,共同的问题在于,相同的key所对应的数据可能散落在不同的节点上,对相同的key所对应的数据进行聚合操作时,必须让所有键值对都到同一个节点的一个Task中,由一个Task来进行处理。

每一个Shuffle的前半部分Stage的每个Task都会创建下一个Stage的Task数量相同的文件,比如下一个Stage有100个Task,那么当前Stage的每个Task都会创建100份文件,并将同一个key对应的values写入同一个文件中;不同节点上的Task也会将同一个key对应的values写入下一个Stage的同一个Task对应的文件中。

Shuffle的后半部分Stage的每个Task都会从各个节点上的Task写的属于自己的那一份文件中,拉取key-value对;然后Task会有一个内存缓冲区,然后会用Map对key和values进行汇聚;Task会用我们自己定义的聚合算子函数,比如reduceByKey(_ + _),把所有values进行一对一的累加;聚合的得到最终的值,就完成了Shuffle。

Shuffle一定是分为两个Stage来完成的,这其实是个逆向的过程,不是由Stage决定Shuffle,而是由Shuffle决定Stage。以reduceByKey(_ + _)为例,在某个Action触发Job的执行时,DAGScheduler会负责划分Job为多个Stage,划分的依据是:如果发现有会触发Shuffle操作的算子,就将这个操作的前半部分,以及之前所有的RDD和Transformation操作,划分为一个Stage;这个操作的后半部分,以及后面直到Action为止的RDD和Transformation操作,划分为另外一个Stage。

Shuffle前半部分的Task在写入数据到磁盘文件之前,都会先写入一个个的内存缓冲中,内存缓冲满溢之后,再Spill溢写到磁盘文件中。

reduceByKey(_ + _)算子为例的过程示意图如下:

5.6.reduceByKey算子的Shuffle操作过程.png
5.reduceByKey算子的Shuffle操作过程.png

12.1. 合并Map端输出文件

默认的Shuffle行为可能会产生巨量的Map端输出文件;以集群中有100个节点的规模为例,每个节点运行1个Executor,每个Executor分配2个CPU Core,假设总共需要计算1000个Task,且Reduce端也有1000个Task,那么每个Executor平均计算10个Task,则每个节点会输出多少份1万(10 * 1000)份文件,所以的Task在Map端总共会产生100万(10000 * 100)个输出文件;如下示意图:

5.7.普通的Shuffle操作原理.png
6.普通的Shuffle操作原理.png

Shuffle中的写磁盘的操作,基本上就是Shuffle中性能消耗最为严重的部分。磁盘IO对性能和Spark作业执行速度的影响,是极其惊人的。基本上Spark作业的性能,都消耗在Shuffle过程中了。

针对这种情况,可以通过new SparkConf().set("spark.shuffle.consolidateFiles", "true")配置来开启Shuffle操作在Map端输出文件的合并机制;默认情况下该配置是不开启的,会发生上述的大量Map端输出文件的操作,严重影响性能。

在开启了Map端输出文件的合并机制之后,第一个Stage,同时就运行2个Task(CPU Core分配是2),并行运行2个Task;每个Task都以下一个Stage的Task数量创建相应个数的文件;并行运行的2个Task执行完以后就会执行另外两个Task,这接下来的2个Task不会再重新创建输出文件,而是复用之前Task创建的Map端输出文件,将数据写入上一批Task的输出文件中。

第二个Stage中,Task在拉取数据的时候,就不会去拉取上一个Stage每一个Task为自己创建的那份输出文件了;而是拉取少量的输出文件,每个输出文件中可能包含了多个Task给自己的Map端输出。

只有并行执行的Task会去创建新的输出文件,下一批并行执行的Task会复用之前已有的输出文件;但是有一个例外,比如2个Task并行在执行,但是此时又启动要执行2个Task,此时就无法去复用之前2个Task创建的输出文件了,而是还是只能去创建新的输出文件。要实现输出文件的合并的效果,必须是一批Task先执行,然后下一批Task再执行,才能复用之前的输出文件;多批Task同时并行执行是做不到复用的。如下示意图:

5.8.优化后的Shuffle操作原理.png
7.优化后的Shuffle操作原理.png

开启了Map端输出文件合并机制之后,生产环境上的例子,会有什么样的变化?还是以集群中有100个节点的规模为例,每个节点运行1个Executor,每个Executor分配2个CPU Core,假设总共需要计算1000个Task,且Reduce端也有1000个Task,那么每个Executor平均计算10个Task,但由于开启了Map端文件合并,每个节点会输出2000份(2 * 1000)文件,所以Task在Map端总共会产生20万(2000 * 100)个输出文件。

合并Map端输出文件,对Spark的性能有以下的影响:

  1. Map端Task写入磁盘文件的IO次数减少;
  2. 第二个Stage要拉取第一个Stage的文件的数量减少,带来对网络传输的性能的优化。

12.2. 调节Map端内存缓冲与Reduce端内存占比

默认情况下,Shuffle的Map Task,输出到磁盘文件的时候,统一都会先写入每个Task自己关联的一个内存缓冲区,这个缓冲区大小默认是32KB。当内存缓冲区满溢之后,才会进行Spill溢写操作,溢写到磁盘文件中去。

Reduce端Task,在拉取到数据之后会用Map数据格式来对各个key对应的values进行汇聚。针对每个key对应的values,执行我们自定义的聚合函数的代码,比如_ + _。Reduce Task,在进行汇聚、聚合等操作的时候,实际上使用的就是自己对应的Executor的内存,默认Executor内存中划分给Reduce Task进行聚合的比例是0.2(20%),由于这个比例比较小,很有可能会出现拉取过来的数据很多,内存中放不下;这个时候默认将放不下的数据Spill(溢写)到磁盘文件中去。

如果Map端的Task处理的数据量比较大,但内存缓冲大小是固定的,就会导致频繁的磁盘溢写,发生大量磁盘IO操作,从而降低性能。Reduce端聚合内存占比默认是0.2,如果数据量比较大,Reduce Task拉取过来的数据很多,也会由于Reduce端聚合内存不够用,发生频繁Spill溢写操作。而且在后面进行聚合操作的时候,还需要多次重新读取磁盘中的数据进行聚合。

我们可以通过spark.Shuffle.file.buffer参数调节Map Task内存缓冲,该配置默认32k(在Spark 1.3.x版本该参数需要加上单位后缀,Spark 1.5.x以后不需要单位后缀),同时通过spark.shuffle.memoryFraction参数调节Reduce端聚合内存占比。

在实际生产环境中,可以观察每个Stage的Executor、Task详情,及每个Task的Shuffle Write和Shuffle Read的数据量、Shuffle的磁盘和内存读写的数据量;如果发现Shuffle磁盘的Write和Read操作数据量很大,就意味着需要调节这两个些Shuffle的参数进行调优。在调节上面说spark.Shuffle.file.buffer参数时,调节的原则是每次扩大一倍,然后观察效果;spark.Shuffle.memoryFraction每次提高0.1,然后观察效果。Map Task内存缓冲变大了,会减少Spill到磁盘文件的次数;Reduce端聚合内存变大了,也减少Spill到磁盘的次数,同时减少了后面聚合读取磁盘文件的数量。

对于Map Task和Reduce Task有以下的参数可以调整

  • spark.shuffle.consolidateFiles:是否开启Shuffle Block File的合并,默认为false。
  • spark.reducer.maxSizeInFlight:Reduce Task的拉取缓存,默认48MB。
  • spark.shuffle.file.buffer:Map Task的写磁盘缓存,默认32KB。
  • spark.shuffle.io.maxRetries:拉取失败的最大重试次数,默认3次。
  • spark.shuffle.io.retryWait:拉取失败的重试间隔,默认5s。
  • spark.shuffle.memoryFraction:用于Reduce端聚合的内存比例,默认0.2,超过比例就会溢出到磁盘上。

12.3. Shuffle Manager的选择

在Spark旧版本中,HashShuffleManager是默认的比较老旧的一种Shuffle Manager,从Spark 1.2.0版本以后Shuffle Manager就不再是默认的选择了。

Spark 1.2.0版本以后,默认的Shuffle manager改为SortShuffleManager。SortShuffleManager与HashShuffleManager两点不同如下:

  1. SortShuffleManager会对每个Reduce Task要处理的数据进行排序(默认的)。
  2. SortShuffleManager会避免像HashShuffleManager那样创建多份磁盘文件;使用SortShuffleManager时,一个Task只会写入一个磁盘文件,不同ReduceTtask的数据,用偏移量来划分界定。

上面讲解的一些调优方式,比如Consolidate Files机制、Map端缓冲、Reduce端内存占比,这些对任何Shuffle Manager都是有用的。

使用SortShuffleManager时,可以设定一个阈值(默认是200),当Map Task创建的输出文件数量小于等于200的、Reduce Task数量少于等于200时,最后会将所有的输出文件合并为一份文件。这样做避免了Sort排序,节省了性能开销。而且还能将多个Reduce Task的文件合并成一份文件,节省了Reduce Task拉取数据的时候的磁盘IO的开销。

在Spark 1.5.0以后,又出现了一种新的Shuffle Manager:Tungsten-Sort Shuffle Manager(钨丝),该Shuffle manager的效果与SortShuffleManager差不多的,唯一的不同之处在于,Tungsten-Sort Manager使用了自己实现的一套内存管理机制,在性能上有很大的提升,而且可以避免Shuffle过程中产生的大量的OOM、GC等等内存相关的异常。

三种Shuffle Manager如何选择?

  1. 首先考虑需不需要默认Spark对数据进行排序?如果不需要的话,其实还是建议搭建就使用最基本的HashShuffleManager,减少排序所消耗的性能;
  2. 如果需要数据按key排序了,那么就选择SortShuffleManager,而且此时要注意,Reduce Task的数量应该是超过200的,这样Sort、Merge(多个文件合并成一个)的机制,才能生效。同时也需要考量有没有必要在Shuffle的过程中就进行排序,毕竟排序操作对性能是有影响的。
  3. 如果不需要排序,而且希望每个Task输出的文件最终是会合并成一份的,以此减少性能开销;可以去调节spark.Shuffle.sort.bypassMergeThreshold阈值,比如Reduce Task数量是500,默认阈值是200,所以默认还是会进行Sort和直接Merge的;可以将阈值调节成550,则不会进行Sort而默认按照Hash的做法,但每个Reduce Task创建一份输出文件在最后合并成一份文件。
  4. 如果想选用SortShuffleManager,而且Spark版本比较高,那么可以考虑尝试使用Tungsten-Sort Shuffle Manager。

相关的设置参数如下:

  • spark.Shuffle.manager:配置Shuffle Manager,值可以是hashsorttungsten-sort
  • spark.Shuffle.sort.bypassMergeThreshold:默认为200,当Map Task创建的输出文件数量小于等于该值、Reduce Task数量少于等于该值时,最后会将所有的输出文件合并为一份文件。

13. 算子调优

Spark中有一些算子操作可以提升作业的运行速度,但是在使用这些算子的时候也会带有或多或少的弊端。

13.1. MapPartitions操作

Spark中,最基本的原则,就是每个Task处理一个RDD的Partition。MapPartitions相较于Map操作,它每次变量获得的是一个Partition的数据;在普通的Map操作中,当Partition中有1万条数据,那传入Map算子的方法需要执行和计算1万次。但是使用MapPartitions操作之后,一个Task仅仅会执行一次传入的方法,该方法一次接收所有的Partition数据,性能比较高。

正如MapPartitions带有的优点,该遍历方式也有相应的缺点。如果是普通的Map操作,一次方法的执行就处理一条数据,当内存不够用的情况下,可以将已经处理完的1千条数据从内存里面垃圾回收掉以腾出空间,因此普通的Map操作通常不会导致内存的OOM异常。但是MapPartitions操作,对于大量数据来说,当一个Partition内的数据过大时,可能会导致内存不够,但是又没无法腾出内存空间,因此会发生OOM内存溢出。

当数据量不是特别大的时候,可以使用MapPartitions系列操作,性能会有一定的提升;但在处理较大的数据量时就需要考虑内存是否足够的情况了。

13.2. Coalesce算子压缩Partition数量

默认情况下,经过filter算子过滤之后,RDD中的每个Partition的数据量可能都不太一样了。当某些Partition的数据量变少了,但在后面进行处理的时候,还使用与Partition数量一样Task来进行处理,有点浪费Task计算资源。而且由于每个Task要处理的数据量差距可能过大,很容易发生数据倾斜。

针对上述的两个问题,我们可以对Partition的数量进行压缩;由于数据量变少了,因此Partition的数量也完全可以对应的减少;缩减Partition的数量之后,在后面可以用较少的Task来处理数据,不会造成Task计算资源的浪费。同时由于在压缩Partition数量时,会尽量让每个Partition的数据量相差不大,以避免后面Task出现数据倾斜的问题。

coalesce算子可以用于压缩Partition的数量,该算子在实际生产环境中,主要用于在filter操作之后,针对每个Partition的数据量各不相同的情况来压缩Partition的数量,在一定程度上提升性能。

coalesce算子和repartition算子都是RDD的分区进行重新划分,它们有以下区别:

  1. repartition只是coalesce接口中shuffle参数为true的简易实现,源码如下:
  • /**
  • * Return a new RDD that has exactly numPartitions partitions.
  • *
  • * Can increase or decrease the level of parallelism in this RDD. Internally, this uses
  • * a shuffle to redistribute data.
  • *
  • * If you are decreasing the number of partitions in this RDD, consider using `coalesce`,
  • * which can avoid performing a shuffle.
  • */
  • def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
  • coalesce(numPartitions, shuffle = true)
  • }
  1. coalesce()方法的作用是创建CoalescedRDD,假设RDD有N个分区,需要重新划分成M个分区:
  • 如果N > M并且N和M相差不多(假如N是1000,M是100),那么就可以将N个分区中的若干个分区合并成一个新的分区,最终合并为M个分区,这时可以将shuffe参数设置为false;在shuffle参数为false的情况下,如果M > N时,coalesce是无效的,不进行shuffle过程,父RDD和子RDD之间是窄依赖关系。
  • 如果N > M并且两者相差悬殊,这时如果将shuffle设置为false,父子RDD是窄依赖关系,他们同处在一个Stage中,就可能造成Spark程序的并行度不够,从而影响性能,如果在M为1的时候,为了使coalesce之前的操作有更好的并行度,可以将shuffle设置为true,会增加一个shuffle的步骤。
  • 如果N < M,一般情况下N个分区有数据分布不均匀的状况,利用HashPartitioner函数将数据重新分区为M个,这时需要将shuffle设置为true。

总之:如果shuffe为false时,如果传入的参数大于现有的分区数目,RDD的分区数不变,也就是说不经过shuffle,是无法将RDD的分区数变多的。

13.3. foreachPartition算子

默认的foreach操作中,对于每条数据,都要单独调用一次相应的方法,Task需要为每条数据执行一次方法。如果遍历的是100万条数据那么需要调用100万次,性能比较差。这一点尤其体现在向数据库中写入数据的时候,如果每个数据都创建一个数据库连接的话,需要创建100万次数据库连接。数据库连接的创建和销毁,都是非常非常消耗性能的。

foreachPartition算子与mapPartitions算子类似,对于传入foreachPartition算子的方法,每次调用会传入一个Partition所有的数据;对于写入数据库操作来说,每次遍历只需要创建一个数据库连接,并进行批量写入即可,可以极大地改善性能。

在实际生产环境中基本都是使用foreachPartition算子来进行类似的操作,但是跟mapPartitions操作一样,如果当一个Partition中的数量特别大,也有可能会发生OOM内存溢出的问题。

13.4. repartition算子解决Spark SQL并行度过低的问题

并行度是可以由开发者自行调节的,使用spark.default.parallelism配置项,或者在textFile()类似的方法中,传入第二个参数指定Partition数量(这种用法比较少);在生产环境中,一般会根据作业所使用的所有的Executor所分配的CPU Core数量来进行并行度设置,官方推荐指定为CPU Core总数的2 ~ 3倍。

并行度的设置一般是有效的,但有一个例外,在使用用Spark SQL进行数据计算时,SQL执行所在的Stage的并行度无法自己指定。Spark SQL自己会默认根据Hive表对应的HDFS文件的Block块数量,自动设置Spark SQL查询所在的那个Stage的并行度,此时通过spark.default.parallelism参数指定的并行度,只会在没有Spark SQL的Stage中生效。

Spark SQL默认情况下的并行度,开发者无法设置,因此可能会在Spark SQL所在的Stage后面的那些各类算子操作中由于业务逻辑过于复杂,而并行度太低而导致计算资源无法满足所需要要求。

解决上述Spark SQL无法设置并行度和Task数量的办法,是使用repartition算子对Spark SQL查询获得的RDD进行重新分区,以提高后续计算的并行度,repartition重分区之后的RDD的并行度和Task数量就会按照预期的进行分配,避免因与Spark SQL绑定在一个Stage中的导致后续计算的并行度过低。

13.5. reduceByKey算子的Map端本地聚合

reduceByKey算子相较于普通的Shuffle操作(比如groupByKey算子)来说,它会进行Map端的本地聚合,对Map端在为下个Stage每个Task创建的输出文件中,写数据之前会进行本地的Combine操作,也即是说会对每一个key所对应的values先执行一次算子函数。用reduceByKey算子对性能有以下的提升:

  1. 在本地进行聚合以后,在Map端的数据量就变少了,可以减少磁盘IO,而且可以减少磁盘空间的占用。
  2. 下一个Stage拉取数据的量也变少了,可以减少网络的数据传输的性能消耗。
  3. 在Reduce端进行数据缓存的内存占用变少了。
  4. Reduce端要进行聚合的数据量也变少了。

因此在实际开发中,对于一些类似于要对每个key进行一些字符串拼接的这种较为复杂的操作,可以衡量一下是否可以使用reduceByKey来实现的,如果能够实现对性能是有很大的帮助的。

14. Trouble Shooting

在实际生产环境中,作业执行过程往往会出现各种错误,下面将介绍一些比较常见的错误的解决方式。

15. 数据倾斜