大数据
流式处理
Spark

Spark基础 09 - Spark缓存和持久化

简介:与RDD类似,SparkStreaming也可以让开发人员手动控制,将数据流中的数据持久化到内存中。对DStream调用persist()方法,就可以让SparkStreaming自动将该数据流中的所有产生的RDD,都持久化到内存中。如果要对一个DStream多次执行操作,那么,对DStream持久化是非常有用的。因为多次操作,可以共享使用内存中的一份缓存数据。

1. DStream的output操作以及foreachRDD详解

1.1. output操作概览

Output Meaning
print 打印每个batch中的前10个元素,主要用于测试,或者是不需要执行什么output操作时,用于简单触发一下job。
saveAsTextFile(prefix, [suffix]) 将每个batch的数据保存到文件中。每个batch的文件的命名格式为:prefix-TIME_IN_MS[.suffix]
saveAsObjectFile 同上,但是将每个batch的数据以序列化对象的方式,保存到SequenceFile中。
saveAsHadoopFile 同上,将数据保存到Hadoop文件中
foreachRDD 最常用的output操作,遍历DStream中的每个产生的RDD,进行处理。可以将每个RDD中的数据写入外部存储,比如文件、数据库、缓存等。通常在其中,是针对RDD执行action操作的,比如foreach。

DStream中的所有计算,都是由output操作触发的,比如print()。如果没有任何output操作,那么就不会执行定义的计算逻辑。

此外,即使你使用了foreachRDDoutput操作,也必须在里面对RDD执行action操作,才能触发对每一个batch的计算逻辑。否则,光有foreachRDDoutput操作,在里面没有对RDD执行action操作,也不会触发任何逻辑。

1.2. foreachRDD详解

通常在foreachRDD中,都会创建一个Connection,比如JDBCConnection,然后通过Connection将数据写入外部存储。

  • 误区一:在RDD的foreach操作外部,创建Connection

这种方式是错误的,因为它会导致Connection对象被序列化后传输到每个Task中。而这种Connection对象,实际上一般是不支持序列化的,也就无法被传输。

  • dstream.foreachRDD { rdd =>
  • val connection = createNewConnection()
  • rdd.foreach { record => connection.send(record)
  • }
  • }
  • 误区二:在RDD的foreach操作内部,创建Connection

这种方式是可以的,但是效率低下。因为它会导致对于RDD中的每一条数据,都创建一个Connection对象。而通常来说,Connection的创建,是很消耗性能的。

  • dstream.foreachRDD { rdd =>
  • rdd.foreach { record =>
  • val connection = createNewConnection()
  • connection.send(record)
  • connection.close()
  • }
  • }
  • 合理方式一:使用RDD的foreachPartition操作,并且在该操作内部,创建Connection对象,这样就相当于是,为RDD的每个partition创建一个Connection对象,节省资源的多了。
  • dstream.foreachRDD { rdd =>
  • rdd.foreachPartition { partitionOfRecords =>
  • val connection = createNewConnection()
  • partitionOfRecords.foreach(record => connection.send(record))
  • connection.close()
  • }
  • }
  • 合理方式二:自己手动封装一个静态连接池,使用RDD的foreachPartition操作,并且在该操作内部,从静态连接池中,通过静态方法,获取到一个连接,使用之后再还回去。这样的话,甚至在多个RDD的partition之间,也可以复用连接了。而且可以让连接池采取懒创建的策略,并且空闲一段时间后,将其释放掉。
  • dstream.foreachRDD { rdd =>
  • rdd.foreachPartition { partitionOfRecords =>
  • val connection = ConnectionPool.getConnection()
  • partitionOfRecords.foreach(record => connection.send(record))
  • ConnectionPool.returnConnection(connection)
  • }
  • }
  • 案例:改写UpdateStateByKeyWordCount,将每次统计出来的全局的单词计数,写入一份,到MySQL数据库中。

我们先完成一个简易的MySQL连接池类:

  • package com.coderap.streaming;
  • import java.sql.Connection;
  • import java.sql.DriverManager;
  • import java.util.LinkedList;
  • public class ConnectionPool {
  • // 静态的Connection队列
  • private static LinkedList<Connection> connectionQueue;
  • /**
  • * 加载驱动
  • */
  • static {
  • try {
  • Class.forName("com.mysql.jdbc.Driver");
  • } catch (ClassNotFoundException e) {
  • e.printStackTrace();
  • }
  • }
  • /**
  • * 获取连接,多线程访问并发控制
  • * @return
  • */
  • public synchronized static Connection getConnection() {
  • try {
  • if(connectionQueue == null) {
  • connectionQueue = new LinkedList<Connection>();
  • for(int i = 0; i < 10; i++) {
  • Connection conn = DriverManager.getConnection(
  • "jdbc:mysql://localhost:3306/spark_testdb",
  • "root",
  • "12345678");
  • connectionQueue.push(conn);
  • }
  • }
  • } catch (Exception e) {
  • e.printStackTrace();
  • }
  • return connectionQueue.poll();
  • }
  • /**
  • * 还回去一个连接
  • */
  • public static void returnConnection(Connection conn) {
  • connectionQueue.push(conn);
  • }
  • }

然后编写主体代码:

  • package com.coderap.streaming;
  • import org.apache.spark.SparkConf;
  • import org.apache.spark.api.java.JavaPairRDD;
  • import org.apache.spark.api.java.Optional;
  • import org.apache.spark.api.java.function.FlatMapFunction;
  • import org.apache.spark.api.java.function.Function2;
  • import org.apache.spark.api.java.function.PairFunction;
  • import org.apache.spark.api.java.function.VoidFunction;
  • import org.apache.spark.streaming.Durations;
  • import org.apache.spark.streaming.api.java.JavaDStream;
  • import org.apache.spark.streaming.api.java.JavaPairDStream;
  • import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
  • import org.apache.spark.streaming.api.java.JavaStreamingContext;
  • import scala.Tuple2;
  • import java.sql.Connection;
  • import java.sql.Statement;
  • import java.util.Arrays;
  • import java.util.Iterator;
  • import java.util.List;
  • public class PersistWordCount {
  • public static void main(String[] args) throws Exception {
  • SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("PersistWordCount");
  • JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));
  • jssc.checkpoint("hdfs://s100:8020/user/ubuntu/spark/wordcount_checkpoint");
  • JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);
  • JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
  • private static final long serialVersionUID = -7328793434152520905L;
  • @Override
  • public Iterator<String> call(String line) throws Exception {
  • return Arrays.asList(line.split(" ")).iterator();
  • }
  • });
  • JavaPairDStream<String, Integer> wordPairs = words.mapToPair(new PairFunction<String, String, Integer>() {
  • private static final long serialVersionUID = 7260565320630895877L;
  • @Override
  • public Tuple2<String, Integer> call(String word) throws Exception {
  • return new Tuple2<String, Integer>(word, 1);
  • }
  • });
  • JavaPairDStream<String, Integer> wordCountsRDD = wordPairs.updateStateByKey(new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
  • private static final long serialVersionUID = -293047490356371480L;
  • @Override
  • public Optional<Integer> call(List<Integer> values, Optional<Integer> state) throws Exception {
  • Integer newvalue = 0;
  • if (state.isPresent()) {
  • newvalue = state.get();
  • }
  • for (Integer value : values) {
  • newvalue += value;
  • }
  • return Optional.of(newvalue);
  • }
  • });
  • // 每次得到当前所有单词的统计次数之后,将其写入mysql存储,进行持久化,以便于后续的J2EE应用程序进行显示
  • wordCountsRDD.foreachRDD(new VoidFunction<JavaPairRDD<String, Integer>>() {
  • private static final long serialVersionUID = 2179397022286419332L;
  • @Override
  • public void call(JavaPairRDD<String, Integer> wordCountsRDD) throws Exception {
  • // 调用RDD的foreachPartition()方法
  • wordCountsRDD.foreachPartition(new VoidFunction<Iterator<Tuple2<String, Integer>>>() {
  • private static final long serialVersionUID = -436068627105344893L;
  • @Override
  • public void call(Iterator<Tuple2<String, Integer>> wordCounts) throws Exception {
  • // 给每个partition,获取一个连接
  • Connection conn = ConnectionPool.getConnection();
  • // 遍历partition中的数据,使用一个连接,插入数据库
  • Tuple2<String, Integer> wordCount = null;
  • while (wordCounts.hasNext()) {
  • wordCount = wordCounts.next();
  • String sql = "insert into wordcount(word,count) "
  • + "values('" + wordCount._1 + "'," + wordCount._2 + ")";
  • Statement stmt = conn.createStatement();
  • stmt.executeUpdate(sql);
  • }
  • // 用完以后,将连接还回去
  • ConnectionPool.returnConnection(conn);
  • }
  • });
  • }
  • });
  • jssc.start();
  • jssc.awaitTermination();
  • jssc.close();
  • }
  • }

然后我们在数据库中建立一张表,下面是建表语句:

  • create table wordcount (
  • id integer auto_increment primary key,
  • updated_time timestamp NOT NULL default CURRENT_TIMESTAMP on update CURRENT_TIMESTAMP,
  • word varchar(255),
  • count integer
  • );

运行代码后,在Netcat中向9999端口发送以下数据:

  • ubuntu@s100:~$ nc -lk 9999
  • leo hello
  • tom hello

然后查看数据库,可以发现这些数据已经被持久化了:

  • mysql> select * from wordcount;
  • +----+---------------------+-------+-------+
  • | id | updated_time | word | count |
  • +----+---------------------+-------+-------+
  • | 1 | 2018-03-03 23:54:40 | leo | 1 |
  • | 2 | 2018-03-03 23:54:40 | tom | 1 |
  • | 3 | 2018-03-03 23:54:40 | hello | 2 |
  • +----+---------------------+-------+-------+
  • 3 rows in set (0.01 sec)

2. 与Spark SQL结合使用

Spark Streaming最强大的地方在于,可以与SparkCore、Spark SQL整合使用,之前已经通过transform、foreachRDD等算子看到,如何将DStream中的RDD使用SparkCore执行批处理操作。现在就来看看,如何将DStream中的RDD与Spark SQL结合起来使用。

案例:每隔10秒,统计最近60秒的,每个种类的每个商品的点击次数,然后统计出每个种类top3热门的商品。

  1. Java版本
  • package com.coderap.streaming;
  • import java.util.ArrayList;
  • import java.util.List;
  • import org.apache.spark.SparkConf;
  • import org.apache.spark.api.java.JavaPairRDD;
  • import org.apache.spark.api.java.JavaRDD;
  • import org.apache.spark.api.java.function.Function;
  • import org.apache.spark.api.java.function.Function2;
  • import org.apache.spark.api.java.function.PairFunction;
  • import org.apache.spark.sql.DataFrame;
  • import org.apache.spark.sql.Row;
  • import org.apache.spark.sql.RowFactory;
  • import org.apache.spark.sql.hive.HiveContext;
  • import org.apache.spark.sql.types.DataTypes;
  • import org.apache.spark.sql.types.StructField;
  • import org.apache.spark.sql.types.StructType;
  • import org.apache.spark.streaming.Durations;
  • import org.apache.spark.streaming.api.java.JavaPairDStream;
  • import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
  • import org.apache.spark.streaming.api.java.JavaStreamingContext;
  • import scala.Tuple2;
  • /**
  • * 与Spark SQL整合使用,top3热门商品实时统计
  • * @author Administrator
  • *
  • */
  • public class Top3HotProduct {
  • public static void main(String[] args) {
  • SparkConf conf = new SparkConf()
  • .setMaster("local[2]")
  • .setAppName("Top3HotProduct");
  • JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
  • // 首先看一下,输入日志的格式
  • // leo iphone mobile_phone
  • // 首先,获取输入数据流
  • // 这里顺带提一句,之前没有讲过,就是说,我们的Spark Streaming的案例为什么都是基于socket的呢?
  • // 因为方便啊。。。
  • // 其实,企业里面,真正最常用的,都是基于Kafka这种数据源
  • // 但是我觉得我们的练习,用socket也无妨,比较方便,而且一点也不影响学习
  • // 因为不同的输入来源的,不同之处,只是在创建输入DStream的那一点点代码
  • // 所以,核心是在于之后的Spark Streaming的实时计算
  • // 所以只要我们掌握了各个案例和功能的使用
  • // 在企业里,切换到Kafka,易如反掌把,因为我们之前都详细讲过,而且实验过,实战编码过,将Kafka作为
  • // 数据源的两种方式了
  • // 获取输入数据流
  • JavaReceiverInputDStream<String> productClickLogsDStream = jssc.socketTextStream("spark1", 9999);
  • // 然后,应该是做一个映射,将每个种类的每个商品,映射为(category_product, 1)的这种格式
  • // 从而在后面可以使用window操作,对窗口中的这种格式的数据,进行reduceByKey操作
  • // 从而统计出来,一个窗口中的每个种类的每个商品的,点击次数
  • JavaPairDStream<String, Integer> categoryProductPairsDStream = productClickLogsDStream
  • .mapToPair(new PairFunction<String, String, Integer>() {
  • private static final long serialVersionUID = 1L;
  • @Override
  • public Tuple2<String, Integer> call(String productClickLog)
  • throws Exception {
  • String[] productClickLogSplited = productClickLog.split(" ");
  • return new Tuple2<String, Integer>(productClickLogSplited[2] + "_" +
  • productClickLogSplited[1], 1);
  • }
  • });
  • // 然后执行window操作
  • // 到这里,就可以做到,每隔10秒钟,对最近60秒的数据,执行reduceByKey操作
  • // 计算出来这60秒内,每个种类的每个商品的点击次数
  • JavaPairDStream<String, Integer> categoryProductCountsDStream =
  • categoryProductPairsDStream.reduceByKeyAndWindow(
  • new Function2<Integer, Integer, Integer>() {
  • private static final long serialVersionUID = 1L;
  • @Override
  • public Integer call(Integer v1, Integer v2) throws Exception {
  • return v1 + v2;
  • }
  • }, Durations.seconds(60), Durations.seconds(10));
  • // 然后针对60秒内的每个种类的每个商品的点击次数
  • // foreachRDD,在内部,使用Spark SQL执行top3热门商品的统计
  • categoryProductCountsDStream.foreachRDD(new Function<JavaPairRDD<String,Integer>, Void>() {
  • private static final long serialVersionUID = 1L;
  • @Override
  • public Void call(JavaPairRDD<String, Integer> categoryProductCountsRDD) throws Exception {
  • // 将该RDD,转换为JavaRDD<Row>的格式
  • JavaRDD<Row> categoryProductCountRowRDD = categoryProductCountsRDD.map(
  • new Function<Tuple2<String,Integer>, Row>() {
  • private static final long serialVersionUID = 1L;
  • @Override
  • public Row call(Tuple2<String, Integer> categoryProductCount)
  • throws Exception {
  • String category = categoryProductCount._1.split("_")[0];
  • String product = categoryProductCount._1.split("_")[1];
  • Integer count = categoryProductCount._2;
  • return RowFactory.create(category, product, count);
  • }
  • });
  • // 然后,执行DataFrame转换
  • List<StructField> structFields = new ArrayList<StructField>();
  • structFields.add(DataTypes.createStructField("category", DataTypes.StringType, true));
  • structFields.add(DataTypes.createStructField("product", DataTypes.StringType, true));
  • structFields.add(DataTypes.createStructField("click_count", DataTypes.IntegerType, true));
  • StructType structType = DataTypes.createStructType(structFields);
  • HiveContext hiveContext = new HiveContext(categoryProductCountsRDD.context());
  • DataFrame categoryProductCountDF = hiveContext.createDataFrame(
  • categoryProductCountRowRDD, structType);
  • // 将60秒内的每个种类的每个商品的点击次数的数据,注册为一个临时表
  • categoryProductCountDF.registerTempTable("product_click_log");
  • // 执行SQL语句,针对临时表,统计出来每个种类下,点击次数排名前3的热门商品
  • DataFrame top3ProductDF = hiveContext.sql(
  • "SELECT category,product,click_count "
  • + "FROM ("
  • + "SELECT "
  • + "category,"
  • + "product,"
  • + "click_count,"
  • + "row_number() OVER (PARTITION BY category ORDER BY click_count DESC) rank "
  • + "FROM product_click_log"
  • + ") tmp "
  • + "WHERE rank<=3");
  • // 这里说明一下,其实在企业场景中,可以不是打印的
  • // 案例说,应该将数据保存到redis缓存、或者是mysql db中
  • // 然后,应该配合一个J2EE系统,进行数据的展示和查询、图形报表
  • top3ProductDF.show();
  • return null;
  • }
  • });
  • jssc.start();
  • jssc.awaitTermination();
  • jssc.close();
  • }
  • }
  1. Scala版本
  • package com.coderap.streaming
  • import org.apache.spark.SparkConf
  • import org.apache.spark.streaming.StreamingContext
  • import org.apache.spark.streaming.Seconds
  • import org.apache.spark.sql.Row
  • import org.apache.spark.sql.types.StructType
  • import org.apache.spark.sql.types.StructField
  • import org.apache.spark.sql.types.StringType
  • import org.apache.spark.sql.types.IntegerType
  • import org.apache.spark.sql.hive.HiveContext
  • /**
  • * @author Administrator
  • */
  • object Top3HotProduct {
  • def main(args: Array[String]): Unit = {
  • val conf = new SparkConf()
  • .setMaster("local[2]")
  • .setAppName("Top3HotProduct")
  • val ssc = new StreamingContext(conf, Seconds(1))
  • val productClickLogsDStream = ssc.socketTextStream("spark1", 9999)
  • val categoryProductPairsDStream = productClickLogsDStream
  • .map { productClickLog => (productClickLog.split(" ")(2) + "_" + productClickLog.split(" ")(1), 1)}
  • val categoryProductCountsDStream = categoryProductPairsDStream.reduceByKeyAndWindow(
  • (v1: Int, v2: Int) => v1 + v2,
  • Seconds(60),
  • Seconds(10))
  • categoryProductCountsDStream.foreachRDD(categoryProductCountsRDD => {
  • val categoryProductCountRowRDD = categoryProductCountsRDD.map(tuple => {
  • val category = tuple._1.split("_")(0)
  • val product = tuple._1.split("_")(1)
  • val count = tuple._2
  • Row(category, product, count)
  • })
  • val structType = StructType(Array(
  • StructField("category", StringType, true),
  • StructField("product", StringType, true),
  • StructField("click_count", IntegerType, true)))
  • val hiveContext = new HiveContext(categoryProductCountsRDD.context)
  • val categoryProductCountDF = hiveContext.createDataFrame(categoryProductCountRowRDD, structType)
  • categoryProductCountDF.registerTempTable("product_click_log")
  • val top3ProductDF = hiveContext.sql(
  • "SELECT category,product,click_count "
  • + "FROM ("
  • + "SELECT "
  • + "category,"
  • + "product,"
  • + "click_count,"
  • + "row_number() OVER (PARTITION BY category ORDER BY click_count DESC) rank "
  • + "FROM product_click_log"
  • + ") tmp "
  • + "WHERE rank<=3")
  • top3ProductDF.show()
  • })
  • ssc.start()
  • ssc.awaitTermination()
  • }
  • }

3. 缓存与持久化机制

与RDD类似,SparkStreaming也可以让开发人员手动控制,将数据流中的数据持久化到内存中。对DStream调用persist()方法,就可以让SparkStreaming自动将该数据流中的所有产生的RDD,都持久化到内存中。如果要对一个DStream多次执行操作,那么,对DStream持久化是非常有用的。因为多次操作,可以共享使用内存中的一份缓存数据。

对于基于窗口的操作,比如reduceByWindow、reduceByKeyAndWindow,以及基于状态的操作,比如updateStateByKey,默认就隐式开启了持久化机制。即SparkStreaming默认就会将上述操作产生的Dstream中的数据,缓存到内存中,不需要开发人员手动调用persist()方法。

对于通过网络接收数据的输入流,比如socket、Kafka、Flume等,默认的持久化级别,是将数据复制一份,以便于容错。相当于是,用的是类似MEMORY_ONLY_SER_2。

与RDD不同的是,默认的持久化级别,统一都是要序列化的。

4. Checkpoint机制

每一个Spark Streaming应用,正常来说,都是要7 *24小时运转的,这就是实时计算程序的特点。因为要持续不断的对数据进行计算。因此,对实时计算应用的要求,应该是必须要能够对与应用程序逻辑无关的失败,进行容错。

如果要实现这个目标,SparkStreaming程序就必须将足够的信息checkpoint到容错的存储系统上,从而让它能够从失败中进行恢复。有两种数据需要被进行checkpoint:

  1. 元数据checkpoint——将定义了流式计算逻辑的信息,保存到容错的存储系统上,比如HDFS。当运行SparkStreaming应用程序的Driver进程所在节点失败时,该信息可以用于进行恢复。元数据信息包括了:
  • 配置信息——创建Spark Streaming应用程序的配置信息,比如SparkConf中的信息。

  • DStream的操作信息——定义了Spark Stream应用程序的计算逻辑的DStream操作信息。

  • 未处理的batch信息——那些job正在排队,还没处理的batch信息。
  1. 数据checkpoint——将实时计算过程中产生的RDD的数据保存到可靠的存储系统中。

对于一些将多个batch的数据进行聚合的,有状态的transformation操作,这是非常有用的。在这种transformation操作中,生成的RDD是依赖于之前的batch的RDD的,这会导致随着时间的推移,RDD的依赖链条变得越来越长。

要避免由于依赖链条越来越长,导致的一起变得越来越长的失败恢复时间,有状态的transformation操作执行过程中间产生的RDD,会定期地被checkpoint到可靠的存储系统上,比如HDFS。从而削减RDD的依赖链条,进而缩短失败恢复时,RDD的恢复时间。

一句话概括,元数据checkpoint主要是为了从driver失败中进行恢复;而RDDcheckpoint主要是为了,使用到有状态的transformation操作时,能够在其生产出的数据丢失时,进行快速的失败恢复。

4.1. 何时启用Checkpoint机制?

  1. 使用了有状态的transformation操作——比如updateStateByKey,或者reduceByKeyAndWindow操作,被使用了,那么checkpoint目录要求是必须提供的,也就是必须开启checkpoint机制,从而进行周期性的RDDcheckpoint。
  2. 要保证可以从Driver失败中进行恢复——元数据checkpoint需要启用,来进行这种情况的恢复。

要注意的是,并不是说,所有的SparkStreaming应用程序,都要启用checkpoint机制,如果即不强制要求从Driver失败中自动进行恢复,又没使用有状态的transformation操作,那么就不需要启用checkpoint。事实上,这么做反而是有助于提升性能的。

4.2. 如何启用Checkpoint机制?

  1. 对于有状态的transformation操作,启用checkpoint机制,定期将其生产的RDD数据checkpoint,是比较简单的。

可以通过配置一个容错的、可靠的文件系统(比如HDFS)的目录,来启用checkpoint机制,checkpoint数据就会写入该目录。使用StreamingContext的checkpoint()方法即可。然后,你就可以放心使用有状态的transformation操作了。

  1. 如果为了要从Driver失败中进行恢复,那么启用checkpoint机制,是比较复杂的。需要改写SparkStreaming应用程序。

当应用程序第一次启动的时候,需要创建一个新的StreamingContext,并且调用其start()方法,进行启动。当Driver从失败中恢复过来时,需要从checkpoint目录中记录的元数据中,恢复出来一个StreamingContext。

4.3. 为Driver失败的恢复机制重写程序

  1. Java版本
  • JavaStreamingContextFactory contextFactory = new JavaStreamingContextFactory() {
  • @Override
  • public JavaStreamingContext create() {
  • JavaStreamingContext jssc = new JavaStreamingContext(...);
  • JavaDStream<String> lines = jssc.socketTextStream(...);
  • jssc.checkpoint(checkpointDirectory);
  • return jssc;
  • }
  • };
  • JavaStreamingContext context = JavaStreamingContext.getOrCreate(checkpointDirectory, contextFactory);
  • context.start();
  • context.awaitTermination();
  1. Scala版本
  • def functionToCreateContext(): StreamingContext = {
  • val ssc = new StreamingContext(...)
  • val lines = ssc.socketTextStream(...)
  • ssc.checkpoint(checkpointDirectory)
  • ssc
  • }
  • val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
  • context.start()
  • context.awaitTermination()

4.4. 配置spark-submit提交参数

按照上述方法,进行SparkStreaming应用程序的重写后,当第一次运行程序时,如果发现checkpoint目录不存在,那么就使用定义的函数来第一次创建一个StreamingContext,并将其元数据写入checkpoint目录;当从Driver失败中恢复过来时,发现checkpoint目录已经存在了,那么会使用该目录中的元数据创建一个StreamingContext。

但是上面的重写应用程序的过程,只是实现Driver失败自动恢复的第一步。第二步是,必须确保Driver可以在失败时,自动被重启。

要能够自动从Driver失败中恢复过来,运行SparkStreaming应用程序的集群,就必须监控Driver运行的过程,并且在它失败时将它重启。对于Spark自身的standalone模式,需要进行一些配置去supervisedriver,在它失败时将其重启。

首先,要在spark-submit中,添加–deploy-mode参数,默认其值为client,即在提交应用的机器上启动Driver;但是,要能够自动重启Driver,就必须将其值设置为cluster;此外,需要添加–supervise参数。

使用上述第二步骤提交应用之后,就可以让driver在失败时自动被重启,并且通过checkpoint目录的元数据恢复StreamingContext。

4.5. Checkpoint的说明

将RDDcheckpoint到可靠的存储系统上,会耗费很多性能。当RDD被checkpoint时,会导致这些batch的处理时间增加。因此,checkpoint的间隔,需要谨慎的设置。对于那些间隔很多的batch,比如1秒,如果还要执行checkpoint操作,则会大幅度削减吞吐量。而另外一方面,如果checkpoint操作执行的太不频繁,那就会导致RDD的lineage变长,又会有失败恢复时间过长的风险。

对于那些要求checkpoint的有状态的transformation操作,默认的checkpoint间隔通常是batch间隔的数倍,至少是10秒。使用DStream的checkpoint()方法,可以设置这个DStream的checkpoint的间隔时长。通常来说,将checkpoint间隔设置为窗口操作的滑动间隔的5~10倍,是个不错的选择。