大数据
流式处理
Spark

Spark基础 08 - Spark Streaming

简介:Spark Streaming,其实就是一种Spark提供的,对于大数据,进行实时计算的一种框架。它的底层,其实,也是基于我们之前讲解的SparkCore的。基本的计算模型,还是基于内存的大数据实时计算模型。而且,它的底层的组件或者叫做概念,其实还是最核心的RDD。

1. 大数据实时计算介绍

Spark Streaming,其实就是一种Spark提供的,对于大数据,进行实时计算的一种框架。它的底层,其实,也是基于我们之前讲解的SparkCore的。基本的计算模型,还是基于内存的大数据实时计算模型。而且,它的底层的组件或者叫做概念,其实还是最核心的RDD。

Spark针对实时计算的特点,在RDD之上,进行了一层封装,叫做DStream。其实,学过了Spark SQL之后,你理解这种封装就容易了。之前学习Spark SQL是不是也是发现,它针对数据查询这种应用,提供了一种基于RDD之上的全新概念,DataFrame,但是,其底层还是基于RDD的。所以,RDD是整个Spark技术生态中的核心。要学好Spark在交互式查询、实时计算上的应用技术和框架,首先必须学好Spark核心编程,也就是Spark Core。

1.1. Spark Streaming简介

Spark Streaming是Spark Core
API的一种扩展,它可以用于进行大规模、高吞吐量、容错的实时数据流的处理。它支持从很多种数据源中读取数据,比如Kafka、Flume、Twitter、ZeroMQ、Kinesis或者是TCP
Socket。并且能够使用类似高阶函数的复杂算法来进行数据处理,比如map、reduce、join和window。处理后的数据可以被保存到文件系统、数据库、Dashboard等存储中。

1.Spark Streaming流程示意图.png

1.2. Spark Streaming基本工作原理

Spark Streaming内部的基本工作原理如下:接收实时输入数据流,然后将数据拆分成多个batch,比如每收集1秒的数据封装为一个batch,然后将每个batch交给Spark的计算引擎进行处理,最后会生产出一个结果数据流,其中的数据,也是由一个一个的batch所组成的。

2.Spark Streaming原理示意图.png

1.2.1. DStream

Spark Streaming提供了一种高级的抽象,叫做DStream,英文全称为DiscretizedStream,中文翻译为“离散流”,它代表了一个持续不断的数据流。DStream可以通过输入数据源来创建,比如Kafka、Flume和Kinesis;也可以通过对其他DStream应用高阶函数来创建,比如map、reduce、join、window。

DStream的内部,其实一系列持续不断产生的RDD。RDD是Spark Core的核心抽象,即,不可变的,分布式的数据集。DStream中的每个RDD都包含了一个时间段内的数据。

3.DStream示意图.png

对DStream应用的算子,比如map,其实在底层会被翻译为对DStream中每个RDD的操作。比如对一个DStream执行一个map操作,会产生一个新的DStream。但是,在底层,其实其原理为,对输入DStream中每个时间段的RDD,都应用一遍map操作,然后生成的新的RDD,即作为新的DStream中的那个时间段的一个RDD。底层的RDD的transformation操作,其实,还是由Spark
Core的计算引擎来实现的。Spark Streaming对Spark Core进行了一层封装,隐藏了细节,然后对开发人员提供了方便易用的高层次的API。

4.多个DStream示意图.png

1.3. Spark Streaming与Storm的对比

对比点 Storm Spark Streaming
实时计算模型 纯实时,来一条数据,处理一条数据 准实时,对一个时间段内的数据收集起来,作为一个RDD,再处理
实时计算延迟度 毫秒级 秒级
吞吐量
事务机制 支持完善 支持,但不够完善
健壮性 / 容错性 ZooKeeper,Acker,非常强 Checkpoint,WAL,一般
动态调整并行度 支持 不支持

事实上,Spark Streaming绝对谈不上比Storm优秀。这两个框架在实时计算领域中,都很优秀,只是擅长的细分场景并不相同。

Spark Streaming仅仅在吞吐量上比Storm要优秀,而吞吐量这一点,也是历来挺Spark Streaming,贬Storm的人着重强调的。但是问题是,是不是在所有的实时计算场景下,都那么注重吞吐量?不尽然。因此,通过吞吐量说Spark Streaming强于Storm,不靠谱。

事实上,Storm在实时延迟度上,比Spark Streaming就好多了,前者是纯实时,后者是准实时。而且,Storm的事务机制、健壮性 / 容错性、动态调整并行度等特性,都要比Spark Streaming更加优秀。

Spark Streaming,有一点是Storm绝对比不上的,就是:它位于Spark生态技术栈中,因此Spark Streaming可以和SparkCore、Spark SQL无缝整合,也就意味着,我们可以对实时处理出来的中间数据,立即在程序中无缝进行延迟批处理、交互式查询等操作。这个特点大大增强了Spark Streaming的优势和功能。

  • 对于Storm来说:
  1. 建议在那种需要纯实时,不能忍受1秒以上延迟的场景下使用,比如实时金融系统,要求纯实时进行金融交易和分析
  2. 此外,如果对于实时计算的功能中,要求可靠的事务机制和可靠性机制,即数据的处理完全精准,一条也不能多,一条也不能少,也可以考虑使用Storm
  3. 如果还需要针对高峰低峰时间段,动态调整实时计算程序的并行度,以最大限度利用集群资源(通常是在小型公司,集群资源紧张的情况),也可以考虑用Storm
  4. 如果一个大数据应用系统,它就是纯粹的实时计算,不需要在中间执行SQL交互式查询、复杂的transformation算子等,那么用Storm是比较好的选择
  • 对于Spark Streaming来说:
  1. 如果对上述适用于Storm的三点,一条都不满足的实时场景,即,不要求纯实时,不要求强大可靠的事务机制,不要求动态调整并行度,那么可以考虑使用Spark Streaming
  2. 考虑使用Spark Streaming最主要的一个因素,应该是针对整个项目进行宏观的考虑,即,如果一个项目除了实时计算之外,还包括了离线批处理、交互式查询等业务功能,而且实时计算中,可能还会牵扯到高延迟批处理、交互式查询等功能,那么就应该首选Spark生态,用SparkCore开发离线批处理,用Spark SQL开发交互式查询,用Spark Streaming开发实时计算,三者可以无缝整合,给系统提供非常高的可扩展性。

2. 实时Wordcount程序开发

下面我们通过Netcat工具来模拟一个实时的WordCount程序。通过Netcat工具实时向某个端口发送数据,利用Spark Streaming的Socket流处理的方式来实时计算单词个数。

  1. Java版本
  • package com.coderap.streaming;
  • import org.apache.spark.SparkConf;
  • import org.apache.spark.api.java.function.FlatMapFunction;
  • import org.apache.spark.api.java.function.Function2;
  • import org.apache.spark.api.java.function.PairFunction;
  • import org.apache.spark.streaming.Durations;
  • import org.apache.spark.streaming.api.java.JavaDStream;
  • import org.apache.spark.streaming.api.java.JavaPairDStream;
  • import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
  • import org.apache.spark.streaming.api.java.JavaStreamingContext;
  • import scala.Tuple2;
  • import java.util.Arrays;
  • import java.util.Iterator;
  • public class WordCount {
  • public static void main(String [] args) throws Exception {
  • /**
  • * 创建SparkConf对象
  • * 但是这里有一点不同,我们是要给它设置一个Master属性,但是我们测试的时候使用local模式
  • * local后面必须跟一个方括号,里面填写一个数字,数字代表了,我们用几个线程来执行我们的
  • * Spark Streaming程序
  • * */
  • SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("WordCount");
  • /**
  • * 创建JavaStreamingContext对象
  • * 该对象,就类似于Spark Core中的JavaSparkContext,就类似于Spark SQL中的SQLContext
  • * 该对象除了接收SparkConf对象对象之外
  • * 还必须接收一个batch interval参数,就是说,每收集多长时间的数据,划分为一个batch,进行处理
  • * 这里设置一秒
  • * */
  • JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
  • /**
  • * 首先,创建输入DStream,代表了一个从数据源(比如kafka、socket)来的持续不断的实时数据流
  • * 调用JavaStreamingContext的socketTextStream()方法,可以创建一个数据源为Socket网络端口的
  • * 数据流,JavaReceiverInputStream,代表了一个输入的DStream
  • * socketTextStream()方法接收两个基本参数,第一个是监听哪个主机上的端口,第二个是监听哪个端口
  • * */
  • JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);
  • /**
  • * 到这里为止,你可以理解为JavaReceiverInputDStream中的,每隔一秒,会有一个RDD,其中封装了这一秒发送过来的数据
  • * RDD的元素类型为String,即一行一行的文本
  • * 所以,这里JavaReceiverInputStream的泛型类型<String>,其实就代表了它底层的RDD的泛型类型
  • * 开始对接收到的数据,执行计算,使用Spark Core提供的算子,执行应用在DStream中即可
  • * 在底层,实际上是会对DStream中的一个一个的RDD,执行我们应用在DStream上的算子
  • * 产生的新RDD,会作为新DStream中的RDD
  • * */
  • JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
  • private static final long serialVersionUID = -1587295617165008483L;
  • @Override
  • public Iterator<String> call(String line) throws Exception {
  • return Arrays.asList(line.split(" ")).iterator();
  • }
  • });
  • /**
  • * 这个时候,每秒的数据,一行一行的文本,就会被拆分为多个单词,words DStream中的RDD的元素类型
  • * 即为一个一个的单词
  • * 接着,开始进行flatMap、reduceByKey操作
  • * */
  • JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
  • private static final long serialVersionUID = -4509091532034334852L;
  • @Override
  • public Tuple2<String, Integer> call(String word) throws Exception {
  • return new Tuple2<String, Integer>(word, 1);
  • }
  • });
  • /**
  • * 这里,正好说明一下,其实大家可以看到,用Spark Streaming开发程序,和Spark Core很相像
  • * 唯一不同的是Spark Core中的JavaRDD、JavaPairRDD,都变成了JavaDStream、JavaPairDStream
  • * */
  • JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
  • @Override
  • public Integer call(Integer v1, Integer v2) throws Exception {
  • return v1 + v2;
  • }
  • });
  • /**
  • * 到此为止,我们就实现了实时的wordcount程序了
  • * 大家总结一下思路,加深一下印象
  • * 每秒中发送到指定socket端口上的数据,都会被lines DStream接收到然后lines DStream会把每秒的数据,也就是一行一行的文本,诸如hell world,封装为一个RDD
  • * 然后呢,就会对每秒中对应的RDD,执行后续的一系列的算子操作
  • * 比如,对lins RDD执行了flatMap之后,得到一个words RDD,作为words DStream中的一个RDD
  • * 以此类推,直到生成最后一个,wordCounts RDD,作为wordCounts DStream中的一个RDD
  • * 此时,就得到了,每秒钟发送过来的数据的单词统计
  • * 但是,一定要注意,Spark Streaming的计算模型,就决定了,我们必须自己来进行中间缓存的控制,比如写入redis等缓存
  • * 它的计算模型跟Storm是完全不同的,storm是自己编写的一个一个的程序,运行在节点上,相当于一个一个的对象,可以自己在对象中控制缓存
  • * 但是Spark本身是函数式编程的计算模型,所以,比如在words或pairs DStream中,没法在实例变量中进行缓存
  • * 此时就只能将最后计算出的wordCounts中的一个一个的RDD,写入外部的缓存,或者持久化DB
  • * 最后,每次计算完,都打印一下这一秒钟的单词计数情况,并休眠5秒钟,以便于我们测试和观察
  • * */
  • wordCounts.print();
  • Thread.sleep(5000);
  • jssc.start();
  • jssc.awaitTermination();
  • jssc.close();
  • }
  • }
  1. Scala版本
  • package com.coderap.streaming
  • import org.apache.spark.SparkConf
  • import org.apache.spark.streaming.{Seconds, StreamingContext}
  • object WordCount {
  • def main(args: Array[String]): Unit = {
  • val conf = new SparkConf().setMaster("local[2]").setAppName("WordCount")
  • val sc = new StreamingContext(conf, Seconds(1))
  • val lines = sc.socketTextStream("localhost", 9999)
  • val words = lines.flatMap(_.split(" "))
  • val pairs = words.map(word => (word, 1))
  • val wordCounts = pairs.reduceByKey(_ + _)
  • Thread.sleep(5000)
  • wordCounts.print()
  • sc.start()
  • sc.awaitTermination()
  • }
  • }

我们可以先开启Netcat命令,打开9999端口的监听:

  • ubuntu@s100:~$ nc -lk 9999

然后使用下列脚本启动Spark Streaming程序:

  • spark-submit \
  • --class com.coderap.streaming.WordCount \
  • --num-executors 3 \
  • --driver-memory 500m \
  • --executor-memory 500m \
  • --executor-cores 3 \
  • /home/ubuntu/spark-study/java/streaming/spark-study-java-0.0.1-SNAPSHOT-jar-with-dependencies.jar \

会发现控制台打印如下信息:

  • -------------------------------------------
  • Time: 1519308060000 ms
  • -------------------------------------------

这是由于我们并没有向9999端口发送数据,此时我们可以使用Netcat向9999端口发送数据:

  • ubuntu@s100:~$ nc -lk 9999
  • hello world hello world you

可以发现,在Spark Streaming运行的控制台有数据打印:

  • -------------------------------------------
  • Time: 1519308061000 ms
  • -------------------------------------------
  • (hello,2)
  • (world,2)
  • (you,1)

3. StreamingContext详解

有两种创建StreamingContext的方式:

  • val conf = new SparkConf().setAppName(appName).setMaster(master)
  • val ssc = new StreamingContext(conf, Seconds(1))

StreamingContext,还可以使用已有的SparkContext来创建

  • val sc = new SparkContext(conf)
  • val ssc = new StreamingContext(sc, Seconds(1))

AppName,是用来在SparkUI上显示的应用名称。Master是一个Spark、Mesos或者Yarn集群的URL,或者是local[*]

batch interval可以根据你的应用程序的延迟要求以及可用的集群资源情况来设置。

一个StreamingContext定义之后,必须做以下几件事情:

  1. 通过创建输入DStream来创建输入数据源。
  2. 通过对DStream定义transformation和output算子操作,来定义实时计算逻辑。
  3. 调用StreamingContext的start()方法,来开始实时处理数据。
  4. 调用StreamingContext的awaitTermination()方法,来等待应用程序的终止。可以使用CTRL+C手动停止,或者就是让它持续不断的运行进行计算。
  5. 也可以通过调用StreamingContext的stop()方法,来停止应用程序。

需要注意的要点:

  1. 只要一个StreamingContext启动之后,就不能再往其中添加任何计算逻辑了。比如执行start()方法之后,还给某个DStream执行一个算子。
  2. 一个StreamingContext停止之后,是肯定不能够重启的。调用stop()之后,不能再调用start()
  3. 一个JVM同时只能有一个StreamingContext启动。在你的应用程序中,不能创建两个StreamingContext。
  4. 调用stop()方法时,会同时停止内部的SparkContext,如果不希望如此,还希望后面继续使用SparkContext创建其他类型的Context,比如SQLContext,那么就用stop(false)
  5. 一个SparkContext可以创建多个StreamingContext,只要上一个先用stop(false)停止,再创建下一个即可。

4. 输入DStream和Receiver详解

输入DStream代表了来自数据源的输入数据流。在之前的wordcount例子中,lines就是一个输入DStream(JavaReceiverInputDStream),代表了从Netcat服务接收到的数据流。除了文件数据流之外,所有的输入DStream都会绑定一个Receiver对象,该对象是一个关键的组件,用来从数据源接收数据,并将其存储在Spark的内存中,以供后续处理。

Spark Streaming提供了两种内置的数据源支持;

  1. 基础数据源:StreamingContext API中直接提供了对这些数据源的支持,比如文件、socket、Akka Actor等。
  2. 高级数据源:诸如Kafka、Flume、Kinesis、Twitter等数据源,通过第三方工具类提供支持。这些数据源的使用,需要引用其依赖。
  3. 自定义数据源:我们可以自己定义数据源,来决定如何接受和存储数据。

要注意的是,如果你想要在实时计算应用中并行接收多条数据流,可以创建多个输入DStream。这样就会创建多个Receiver,从而并行地接收多个数据流。但是要注意的是,一个Spark Streaming Application的Executor,是一个长时间运行的任务,因此,它会独占分配给Spark Streaming Application的CPU Core。从而只要Spark Streaming运行起来以后,这个节点上的CPU Core,就没法给其他应用使用了。

使用本地模式,运行程序时,绝对不能用local或者local[1],因为那样的话,只会给执行输入DStream的executor分配一个线程。而Spark Streaming底层的原理是,至少要有两条线程,一条线程用来分配给Receiver接收数据,一条线程用来处理接收到的数据。因此必须使用local[n],n>=2的模式。

如果不设置Master,也就是直接将Spark Streaming应用提交到集群上运行,那么首先,必须要求集群节点上,有>1个CPU Core,其次,给Spark Streaming的每个executor分配的core,必须>1,这样,才能保证分配到executor上运行的输入DStream,两条线程并行,一条运行Receiver,接收数据;一条处理数据。否则的话,只会接收数据,不会处理数据。

因此,基于此,特此声明,我们本系列课程所有的练习,都是基于local[2]的本地模式,因为我们的虚拟机上都只有一个1个CPU Core。但是大家在实际企业工作中,机器肯定是不只一个CPU Core的,现在都至少4核了。到时记得给每个executor的CPU Core,设置为超过1个即可。

5. 输入DStream之基础数据源

Socket数据源之前的wordcount例子,已经演示过了,即使用StreamingContext.socketTextStream();下面介绍HDFS文件作为数据源。基于HDFS文件的实时计算,其实就是监控一个HDFS目录,只要其中有新文件出现,就实时处理。相当于处理实时的文件流。

  • streamingContext.fileStream<KeyClass,ValueClass, InputFormatClass>(dataDirectory)
  • streamingContext.fileStream[KeyClass,ValueClass, InputFormatClass](dataDirectory)

Spark Streaming会监视指定的HDFS目录,并且处理出现在目录中的文件。要注意的是,所有放入HDFS目录中的文件,都必须有相同的格式;必须使用移动或者重命名的方式,将文件移入目录;一旦处理之后,文件的内容即使改变,也不会再处理了;基于HDFS文件的数据源是没有Receiver的,因此不会占用一个CPU Core。

  1. Java版本
  • package com.coderap.streaming;
  • import org.apache.spark.SparkConf;
  • import org.apache.spark.api.java.function.FlatMapFunction;
  • import org.apache.spark.api.java.function.Function2;
  • import org.apache.spark.api.java.function.PairFunction;
  • import org.apache.spark.streaming.Durations;
  • import org.apache.spark.streaming.api.java.JavaDStream;
  • import org.apache.spark.streaming.api.java.JavaPairDStream;
  • import org.apache.spark.streaming.api.java.JavaStreamingContext;
  • import scala.Tuple2;
  • import java.util.Arrays;
  • import java.util.Iterator;
  • public class HDFSWordCount {
  • public static void main(String [] args) throws Exception {
  • SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("HDFSWordCount");
  • JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));
  • JavaDStream<String> lines = jssc.textFileStream("hdfs://s100:8020/user/ubuntu/spark/streaming/wordcount_dir/");
  • JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
  • private static final long serialVersionUID = 6512742125504806981L;
  • @Override
  • public Iterator<String> call(String line) throws Exception {
  • return Arrays.asList(line.split(" ")).iterator();
  • }
  • });
  • JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
  • private static final long serialVersionUID = -4348165903301595868L;
  • @Override
  • public Tuple2<String, Integer> call(String word) throws Exception {
  • return new Tuple2<String, Integer>(word, 1);
  • }
  • });
  • JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
  • @Override
  • public Integer call(Integer v1, Integer v2) throws Exception {
  • return v1 + v2;
  • }
  • });
  • Thread.sleep(5000);
  • wordCounts.print();
  • jssc.start();
  • jssc.awaitTermination();
  • jssc.close();
  • }
  • }
  1. Scala版本
  • package com.coderap.streaming
  • import org.apache.spark.SparkConf
  • import org.apache.spark.streaming.{Seconds, StreamingContext}
  • object HDFSWordCount {
  • def main(args: Array[String]): Unit = {
  • val conf = new SparkConf().setMaster("local[2]").setAppName("HDFSWordCount")
  • val sc = new StreamingContext(conf, Seconds(1))
  • val lines = sc.textFileStream("hdfs://s100:8020/user/ubuntu/spark/streaming/wordcount_dir/")
  • val words = lines.flatMap(_.split(" "))
  • val pairs = words.map(word => (word, 1))
  • val wordCounts = pairs.reduceByKey(_ + _)
  • Thread.sleep(5000)
  • wordCounts.print()
  • sc.start()
  • sc.awaitTermination()
  • }
  • }

上面的代码中,会监测HDFS的/user/ubuntu/spark/streaming/wordcount_dir目录,一旦该目录有新的文件存储进去,就会实时计算单次数量。我们启动程序之后,向该目录存入文件words.txt,内容如下:

  • hello world hello you hello you

可以发现,在Spark Streaming运行的控制台有数据打印:

  • -------------------------------------------
  • Time: 1519313006000 ms
  • -------------------------------------------
  • (hello,3)
  • (world,1)
  • (you,2)

6. 输入DStream之Kafka数据源

6.1. 基于Receiver的方式

Receiver是使用Kafka的高层次ConsumerAPI来实现的。receiver从Kafka中获取的数据都是存储在Spark Executor的内存中的,然后Spark Streaming启动的job会去处理那些数据。

然而,在默认的配置下,这种方式可能会因为底层的失败而丢失数据。如果要启用高可靠机制,让数据零丢失,就必须启用Spark Streaming的预写日志机制(Write AheadLog,WAL)。该机制会同步地将接收到的Kafka数据写入分布式文件系统(比如HDFS)上的预写日志中。所以,即使底层节点出现了失败,也可以使用预写日志中的数据进行恢复。

6.1.1. 如何进行Kafka数据源连接?

  1. 在maven添加依赖
  • <dependency>
  • <groupId>org.apache.spark</groupId>
  • <artifactId>spark-streaming-kafka_2.11</artifactId>
  • <version>1.6.2</version>
  • </dependency>
  1. 使用第三方工具类创建输入DStream
  • JavaPairReceiverInputDStream<String,String> kafkaStream = KafkaUtils.createStream(streamingContext, [ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consumer]);

6.1.2. 需要注意的要点

  1. Kafka中的topic的partition,与Spark中的RDD的partition是没有关系的。所以,在KafkaUtils.createStream()中,提高partition的数量,只会增加一个Receiver中,读取partition的线程的数量。不会增加Spark处理数据的并行度。
  2. 可以创建多个Kafka输入DStream,使用不同的consumer group和topic,来通过多个receiver并行接收数据。
  3. 如果基于容错的文件系统,比如HDFS,启用了预写日志机制,接收到的数据都会被复制一份到预写日志中。因此,在KafkaUtils.createStream()中,设置的持久化级别是StorageLevel.MEMORY_AND_DISK_SER

注:Kafka命令

  • # 创建主题
  • kafka-topics.sh --zookeeper 192.168.127.101:2181,192.168.127.102:2181,192.168.127.103:2181 --topic TestTopic --replication-factor 1 --partitions 1 --create
  • # 创建生产者
  • kafka-console-producer.sh --broker-list 192.168.127.101:9092,192.168.127.102:9092,192.168.127.102:9092 --topic TestTopic 192.168.1.191:2181,192.168.1.192:2181,192.168.1.193:2181
  1. Java版本
  • package com.coderap.streaming;
  • import org.apache.spark.SparkConf;
  • import org.apache.spark.api.java.function.FlatMapFunction;
  • import org.apache.spark.api.java.function.Function2;
  • import org.apache.spark.api.java.function.PairFunction;
  • import org.apache.spark.streaming.Durations;
  • import org.apache.spark.streaming.api.java.JavaDStream;
  • import org.apache.spark.streaming.api.java.JavaPairDStream;
  • import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
  • import org.apache.spark.streaming.api.java.JavaStreamingContext;
  • import org.apache.spark.streaming.kafka.KafkaUtils;
  • import scala.Tuple2;
  • import java.util.Arrays;
  • import java.util.HashMap;
  • import java.util.Iterator;
  • import java.util.Map;
  • /**
  • * 基于Kafka receiver方式的实时wordcount程序
  • *
  • * @author Administrator
  • */
  • public class KafkaReceiverWordCount {
  • public static void main(String[] args) throws Exception {
  • SparkConf conf = new SparkConf()
  • .setMaster("local[2]")
  • .setAppName("KafkaWordCount");
  • JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));
  • // 使用KafkaUtils.createStream()方法,创建针对Kafka的输入数据流
  • Map<String, Integer> topicThreadMap = new HashMap<String, Integer>();
  • topicThreadMap.put("TestTopic", 1);
  • JavaPairReceiverInputDStream<String, String> lines = KafkaUtils.createStream(
  • jssc,
  • "192.168.127.101:2181,192.168.127.102:2181,192.168.127.103:2181",
  • "DefaultConsumerGroup",
  • topicThreadMap);
  • // 然后开发wordcount逻辑
  • JavaDStream<String> words = lines.flatMap(
  • new FlatMapFunction<Tuple2<String, String>, String>() {
  • private static final long serialVersionUID = 3639009894041641240L;
  • @Override
  • public Iterator<String> call(Tuple2<String, String> tuple) throws Exception {
  • return Arrays.asList(tuple._2.split(" ")).iterator();
  • }
  • });
  • JavaPairDStream<String, Integer> pairs = words.mapToPair(
  • new PairFunction<String, String, Integer>() {
  • private static final long serialVersionUID = 6109964897132056353L;
  • @Override
  • public Tuple2<String, Integer> call(String word)
  • throws Exception {
  • return new Tuple2<String, Integer>(word, 1);
  • }
  • });
  • JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey(
  • new Function2<Integer, Integer, Integer>() {
  • private static final long serialVersionUID = 5241698686413043053L;
  • @Override
  • public Integer call(Integer v1, Integer v2) throws Exception {
  • return v1 + v2;
  • }
  • });
  • wordCounts.print();
  • jssc.start();
  • jssc.awaitTermination();
  • jssc.close();
  • }
  • }

6.2. 基于Direct的方式

这种新的不基于Receiver的直接方式,是在Spark1.3中引入的,从而能够确保更加健壮的机制。替代掉使用Receiver来接收数据后,这种方式会周期性地查询Kafka,来获得每个Topic+Partition的最新的Offset,从而定义每个Batch的Offset的范围。当处理数据的Job启动时,就会使用Kafka的简单Consumer API来获取Kafka指定Offset范围的数据。

这种方式有如下优点:

  1. 简化并行读取:如果要读取多个partition,不需要创建多个输入DStream然后对它们进行union操作。Spark会创建跟Kafka Partition一样多的RDD Partition,并且会并行从Kafka中读取数据。所以在Kafka Partition和RDD Partition之间,有一个一对一的映射关系。

  2. 高性能:如果要保证零数据丢失,在基于Receiver的方式中,需要开启WAL机制。这种方式其实效率低下,因为数据实际上被复制了两份,Kafka自己本身就有高可靠的机制,会对数据复制一份,而这里又会复制一份到WAL中。而基于Direct的方式,不依赖Receiver,不需要开启WAL机制,只要Kafka中作了数据的复制,那么就可以通过Kafka的副本进行恢复。

  3. 一次且仅一次的事务机制:基于Receiver的方式,是使用Kafka的高阶API来在ZooKeeper中保存消费过的Offset的。这是消费Kafka数据的传统方式。这种方式配合着WAL机制可以保证数据零丢失的高可靠性,但是却无法保证数据被处理一次且仅一次,可能会处理两次。因为Spark和ZooKeeper之间可能是不同步的。基于Direct的方式,使用Kafka的简单API,Spark Streaming自己就负责追踪消费的Offset,并保存在Checkpoint中。Spark自己一定是同步的,因此可以保证数据是消费一次且仅消费一次。

  • JavaPairReceiverInputDStream<String,String> directKafkaStream = KafkaUtils.createDirectStream(streamingContext, [key class], [value class], [keydecoder class], [value decoder class], [map of Kafka parameters], [set oftopics to consume]);

注:Kafka命令

  • kafka-topics.sh --zookeeper 192.168.127.101:2181,192.168.127.102:2181,192.168.127.103:2181 --topic TestTopic --replication-factor 1 --partitions 1 --create
  • kafka-console-producer.sh --broker-list 192.168.127.101:9092,192.168.127.102:9092,192.168.127.102:9092 --topic TestTopic 192.168.127.101:2181,192.168.127.102:2181,192.168.127.103:2181 metadata.broker.list
  1. Java版本
  • package com.coderap.streaming;
  • import kafka.serializer.StringDecoder;
  • import org.apache.spark.SparkConf;
  • import org.apache.spark.api.java.function.FlatMapFunction;
  • import org.apache.spark.api.java.function.Function2;
  • import org.apache.spark.api.java.function.PairFunction;
  • import org.apache.spark.streaming.Durations;
  • import org.apache.spark.streaming.api.java.JavaDStream;
  • import org.apache.spark.streaming.api.java.JavaPairDStream;
  • import org.apache.spark.streaming.api.java.JavaPairInputDStream;
  • import org.apache.spark.streaming.api.java.JavaStreamingContext;
  • import org.apache.spark.streaming.kafka.KafkaUtils;
  • import scala.Tuple2;
  • import java.util.*;
  • /**
  • * 基于Kafka Direct方式的实时wordcount程序
  • *
  • * @author Administrator
  • */
  • public class KafkaDirectWordCount {
  • public static void main(String[] args) throws Exception {
  • SparkConf conf = new SparkConf()
  • .setMaster("local[2]")
  • .setAppName("KafkaDirectWordCount");
  • JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));
  • // 首先,要创建一份kafka参数map
  • Map<String, String> kafkaParams = new HashMap<String, String>();
  • kafkaParams.put("metadata.broker.list",
  • "192.168.1.107:9092,192.168.1.108:9092,192.168.1.109:9092");
  • // 然后,要创建一个set,里面放入,你要读取的topic
  • // 这个,就是我们所说的,它自己给你做的很好,可以并行读取多个topic
  • Set<String> topics = new HashSet<String>();
  • topics.add("TestTopic");
  • // 创建输入DStream
  • JavaPairInputDStream<String, String> lines = KafkaUtils.createDirectStream(
  • jssc,
  • String.class,
  • String.class,
  • StringDecoder.class,
  • StringDecoder.class,
  • kafkaParams,
  • topics);
  • // 执行wordcount操作
  • JavaDStream<String> words = lines.flatMap(
  • new FlatMapFunction<Tuple2<String, String>, String>() {
  • private static final long serialVersionUID = -7346613657073827409L;
  • @Override
  • public Iterator<String> call(Tuple2<String, String> tuple) throws Exception {
  • return Arrays.asList(tuple._2.split(" ")).iterator();
  • }
  • });
  • JavaPairDStream<String, Integer> pairs = words.mapToPair(
  • new PairFunction<String, String, Integer>() {
  • private static final long serialVersionUID = 5902497920775067453L;
  • @Override
  • public Tuple2<String, Integer> call(String word) throws Exception {
  • return new Tuple2<String, Integer>(word, 1);
  • }
  • });
  • JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey(
  • new Function2<Integer, Integer, Integer>() {
  • private static final long serialVersionUID = 3646276724226288912L;
  • @Override
  • public Integer call(Integer v1, Integer v2) throws Exception {
  • return v1 + v2;
  • }
  • });
  • wordCounts.print();
  • jssc.start();
  • jssc.awaitTermination();
  • jssc.close();
  • }
  • }

7. DStream的Transformation操作概览

Transformation Meaning
map 对传入的每个元素,返回一个新的元素
flatMap 对传入的每个元素,返回一个或多个元素
filter 对传入的元素返回true或false,返回的false的元素被过滤掉
union 将两个DStream进行合并
count 返回元素的个数
reduce 对所有values进行聚合
countByValue 对元素按照值进行分组,对每个组进行计数,最后返回的格式
reduceByKey 对key对应的values进行聚合
cogroup 对两个DStream进行连接操作,一个key连接起来的两个RDD的数据,都会以Iterable的形式,出现在一个Tuple中。
join 对两个DStream进行join操作,每个连接起来的pair,作为新DStream的RDD的一个元素
transform 对数据进行转换操作
updateStateByKey 为每个key维护一份state,并进行更新(这个,我认为,是在普通的实时计算中,最有用的一种操作)
window 对滑动窗口数据执行操作(实时计算中最有特色的一种操作)

7.1. updateStateByKey操作

在上面的操作中,大部分的操作都在Spark Core中讲解过,使用方式都大同小异。这里我们着重讲解三个Spark Streaming特有的Transformation,首先是updateStateByKey操作。

updateStateByKey操作可以让我们为每个key维护一份state,并持续不断的更新该state。使用该操作有以下的步骤:

  1. 首先,要定义一个state,可以是任意的数据类型;
  2. 其次,要定义state更新函数——指定一个函数如何使用之前的state和新值来更新state。

对于每个batch,Spark都会为每个之前已经存在的key去应用一次state更新函数,无论这个key在batch中是否有新的数据。如果state更新函数返回none,那么key对应的state就会被删除。当然,对于每个新出现的key,也会执行state更新函数。

注:updateStateByKey操作,要求必须开启Checkpoint机制。

案例:基于缓存的实时wordcount程序(在实际业务场景中,这种操作是非常有用的)

  1. Java版本
  • package com.coderap.streaming;
  • import org.apache.spark.SparkConf;
  • import org.apache.spark.api.java.Optional;
  • import org.apache.spark.api.java.function.FlatMapFunction;
  • import org.apache.spark.api.java.function.Function2;
  • import org.apache.spark.api.java.function.PairFunction;
  • import org.apache.spark.streaming.Durations;
  • import org.apache.spark.streaming.api.java.JavaDStream;
  • import org.apache.spark.streaming.api.java.JavaPairDStream;
  • import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
  • import org.apache.spark.streaming.api.java.JavaStreamingContext;
  • import scala.Tuple2;
  • import java.util.Arrays;
  • import java.util.Iterator;
  • import java.util.List;
  • public class UpdateStateByKeyWordCount {
  • public static void main(String [] args) throws Exception {
  • SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("UpdateStateByKeyWordCount");
  • JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));
  • /**
  • * 第一点,如果要使用updateStateByKey算子,就必须设置一个checkpoint目录,开启checkpoint机制
  • * 这样的话才能把每个key对应的state除了在内存中有,那么是不是也要checkpoint一份
  • * 因为你要长期保存一份key的state的话,那么spark streaming是要求必须用checkpoint的,以便于在
  • * 内存数据丢失的时候,可以从checkpoint中恢复数据
  • * 开启checkpoint机制,很简单,只要调用jssc的checkpoint()方法,设置一个hdfs目录即可
  • * */
  • jssc.checkpoint("hdfs://s100:8020/user/ubuntu/spark/wordcount_checkpoint");
  • JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);
  • JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
  • private static final long serialVersionUID = -427393453356094666L;
  • @Override
  • public Iterator<String> call(String line) throws Exception {
  • return Arrays.asList(line.split(" ")).iterator();
  • }
  • });
  • JavaPairDStream<String, Integer> wordPairs = words.mapToPair(new PairFunction<String, String, Integer>() {
  • private static final long serialVersionUID = -4505412005203166404L;
  • @Override
  • public Tuple2<String, Integer> call(String word) throws Exception {
  • return new Tuple2<String, Integer>(word, 1);
  • }
  • });
  • /**
  • * 到了这里,就不一样了,之前的话,是不是直接就是pairs.reduceByKey
  • * 然后,就可以得到每个时间段的batch对应的RDD,计算出来的单词计数
  • * 然后,可以打印出那个时间段的单词计数
  • * 但是,有个问题,你如果要统计每个单词的全局的计数呢?
  • * 就是说,统计出来,从程序启动开始,到现在为止,一个单词出现的次数,那么就之前的方式就不好实现
  • * 就必须基于redis这种缓存,或者是mysql这种db,来实现累加
  • * 但是,我们的updateStateByKey,就可以实现直接通过Spark维护一份每个单词的全局的统计次数
  • * */
  • JavaPairDStream<String, Integer> wordCounts = wordPairs.updateStateByKey(new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
  • private static final long serialVersionUID = -1239753594052251669L;
  • /**
  • * 这里两个参数
  • * 实际上,对于每个单词,每次batch计算的时候,都会调用这个函数
  • * 第一个参数,values,相当于是这个batch中,这个key的新的值,可能有多个吧
  • * 比如说一个hello,可能有2个1,(hello, 1) (hello, 1),那么传入的是(1,1)
  • * 第二个参数,就是指的是这个key之前的状态,state,其中泛型的类型是你自己指定的
  • * */
  • @Override
  • public Optional<Integer> call(List<Integer> values, Optional<Integer> state) throws Exception {
  • // 首先定义一个全局的单词计数
  • Integer newvalue = 0;
  • // 其次,判断,state是否存在,如果不存在,说明是一个key第一次出现
  • // 如果存在,说明这个key之前已经统计过全局的次数了
  • if (state.isPresent()) {
  • newvalue = state.get();
  • }
  • // 接着,将本次新出现的值,都累加到newValue上去,就是一个key目前的全局的统计次数
  • for (Integer value : values) {
  • newvalue += value;
  • }
  • return Optional.of(newvalue);
  • }
  • });
  • wordCounts.print();
  • jssc.start();
  • jssc.awaitTermination();
  • jssc.close();
  • }
  • }
  1. Scala版本
  • package com.coderap.streaming
  • import org.apache.spark.SparkConf
  • import org.apache.spark.streaming.{Seconds, StreamingContext}
  • object UpdateStateByKeyWordCount {
  • def main(args: Array[String]): Unit = {
  • val conf = new SparkConf().setMaster("local[2]").setAppName("UpdateStateByKeyWordCount")
  • val sc = new StreamingContext(conf, Seconds(1))
  • sc.checkpoint("hdfs://s100:8020/user/ubuntu/spark/wordcount_checkpoint")
  • val lines = sc.socketTextStream("localhost", 9999)
  • val words = lines.flatMap(_.split(" "))
  • val pairs = words.map(word => (word, 1))
  • val wordCounts = pairs.updateStateByKey((values: Seq[Int], state: Option[Int]) => {
  • var newValue = state.getOrElse(0)
  • for (elem <- values) {
  • newValue += elem
  • }
  • Option(newValue)
  • })
  • wordCounts.print()
  • sc.start()
  • sc.awaitTermination()
  • }
  • }

将代码打包上传到环境中并执行,在我们使用Netcat工具依次向本地9999端口发送以下数据时:

  • hello world
  • hello world
  • hello you
  • hello me

Spark运行时会依次打印以下信息:

  • -------------------------------------------
  • Time: 1520060645000 ms
  • -------------------------------------------
  • (hello,1)
  • (world,1)
  • -------------------------------------------
  • Time: 1520060650000 ms
  • -------------------------------------------
  • (hello,2)
  • (world,2)
  • -------------------------------------------
  • Time: 1520060655000 ms
  • -------------------------------------------
  • (hello,3)
  • (world,2)
  • (you,1)
  • -------------------------------------------
  • Time: 1520060675000 ms
  • -------------------------------------------
  • (hello,4)
  • (me,1)
  • (world,2)
  • (you,1)

7.2. 案例实战:实时黑名单过滤

Transform操作,应用在DStream上时,可以用于执行任意的RDD到RDD的转换操作。它可以用于实现,DStreamAPI中所没有提供的操作。比如说,DStreamAPI中,并没有提供将一个DStream中的每个batch,与一个特定的RDD进行join的操作。但是我们自己就可以使用Transform操作来实现该功能。

DStream.join(),只能join其他DStream。在DStream每个batch的RDD计算出来之后,会去跟其他DStream的RDD进行join。

案例:广告计费日志实时黑名单过滤

  1. Java版本
  • package com.coderap.streaming;
  • import org.apache.spark.SparkConf;
  • import org.apache.spark.api.java.JavaPairRDD;
  • import org.apache.spark.api.java.JavaRDD;
  • import org.apache.spark.api.java.Optional;
  • import org.apache.spark.api.java.function.Function;
  • import org.apache.spark.api.java.function.PairFunction;
  • import org.apache.spark.streaming.Durations;
  • import org.apache.spark.streaming.api.java.JavaDStream;
  • import org.apache.spark.streaming.api.java.JavaPairDStream;
  • import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
  • import org.apache.spark.streaming.api.java.JavaStreamingContext;
  • import scala.Tuple2;
  • import java.util.ArrayList;
  • public class TransformBlacklist {
  • public static void main(String[] args) throws Exception {
  • SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("TransformBlacklist");
  • JavaStreamingContext sc = new JavaStreamingContext(conf, Durations.seconds(5));
  • /**
  • * 用户对我们的网站上的广告可以进行点击
  • * 点击之后,是不是要进行实时计费,点一下,算一次钱
  • * 但是,对于那些帮助某些无良商家刷广告的人,那么我们有一个黑名单
  • * 只要是黑名单中的用户点击的广告,我们就给过滤掉
  • * 先做一份模拟的黑名单RDD
  • * */
  • ArrayList<Tuple2<String, Boolean>> blackList = new ArrayList<>();
  • blackList.add(new Tuple2("tom", true));
  • final JavaPairRDD<String, Boolean> blacklistRDD = sc.sparkContext().parallelizePairs(blackList);
  • // 这里的日志格式,就简化一下,就是date username的方式
  • JavaReceiverInputDStream<String> adsClickLogDstream = sc.socketTextStream("localhost", 9999);
  • /**
  • * 所以,要先对输入的数据,进行一下转换操作,变成,(username, date username)
  • * 以便于,后面对每个batch RDD,与定义好的黑名单RDD进行join操作
  • * */
  • JavaPairDStream<String, String> userAdsClickLogDstream = adsClickLogDstream.mapToPair(new PairFunction<String, String, String>() {
  • private static final long serialVersionUID = -2533843686307734571L;
  • @Override
  • public Tuple2<String, String> call(String adsClickLog) throws Exception {
  • return new Tuple2<>(adsClickLog.split(" ")[0], adsClickLog);
  • }
  • });
  • /**
  • * 然后,就可以执行transform操作了,将每个batch的RDD,与黑名单RDD进行join、filter、map等操作
  • * 实时进行黑名单过滤
  • * */
  • JavaDStream<String> validAdsClickLogDstream = userAdsClickLogDstream.transform(new Function<JavaPairRDD<String, String>, JavaRDD<String>>() {
  • private static final long serialVersionUID = -7175331835482893765L;
  • @Override
  • public JavaRDD<String> call(JavaPairRDD<String, String> userAdsClickLogRDD) throws Exception {
  • /**
  • * 这里为什么用左外连接?
  • * 因为,并不是每个用户都存在于黑名单中的,
  • * 所以,如果直接用join,那么没有存在于黑名单中的数据,会无法join到就给丢弃掉了
  • * 所以,这里用leftOuterJoin,就是说,哪怕一个user不在黑名单RDD中,没有join到
  • * 也还是会被保存下来的
  • * */
  • JavaPairRDD<String, Tuple2<String, Optional<Boolean>>> joinedRDD = userAdsClickLogRDD.leftOuterJoin(blacklistRDD);
  • // 连接之后,执行filter算子
  • JavaPairRDD<String, Tuple2<String, Optional<Boolean>>> filteredRDD = joinedRDD.filter(new Function<Tuple2<String, Tuple2<String, Optional<Boolean>>>, Boolean>() {
  • private static final long serialVersionUID = 3432539794243361742L;
  • @Override
  • public Boolean call(Tuple2<String, Tuple2<String, Optional<Boolean>>> tuple) throws Exception {
  • // 这里的tuple,就是每个用户,对应的访问日志,和在黑名单中的状态
  • if (tuple._2._2().isPresent() && tuple._2._2.get()) {
  • return false;
  • } else {
  • return true;
  • }
  • }
  • });
  • /**
  • * 此时,filteredRDD中,就只剩下没有被黑名单过滤的用户点击了
  • * 进行map操作,转换成我们想要的格式
  • * */
  • JavaRDD<String> validAdsClickLogRDD = filteredRDD.map(new Function<Tuple2<String, Tuple2<String, Optional<Boolean>>>, String>() {
  • private static final long serialVersionUID = -8586527271676288003L;
  • @Override
  • public String call(Tuple2<String, Tuple2<String, Optional<Boolean>>> tuple) throws Exception {
  • return tuple._2._1;
  • }
  • });
  • return validAdsClickLogRDD;
  • }
  • });
  • validAdsClickLogDstream.print();
  • sc.start();
  • sc.awaitTermination();
  • sc.close();
  • }
  • }
  1. Scala
  • package com.coderap.streaming
  • import org.apache.spark.SparkConf
  • import org.apache.spark.streaming.{Seconds, StreamingContext}
  • object TransformBlacklist {
  • def main(args: Array[String]): Unit = {
  • val conf = new SparkConf()
  • .setMaster("local[2]")
  • .setAppName("TransformBlacklist")
  • val ssc = new StreamingContext(conf, Seconds(5))
  • val blacklist = Array(("tom", true))
  • val blacklistRDD = ssc.sparkContext.parallelize(blacklist, 5)
  • val adsClickLogDStream = ssc.socketTextStream("localhost", 9999)
  • val userAdsClickLogDStream = adsClickLogDStream
  • .map { adsClickLog => (adsClickLog.split(" ")(1), adsClickLog) }
  • val validAdsClickLogDStream = userAdsClickLogDStream.transform(userAdsClickLogRDD => {
  • val joinedRDD = userAdsClickLogRDD.leftOuterJoin(blacklistRDD)
  • val filteredRDD = joinedRDD.filter(tuple => {
  • if (tuple._2._2.getOrElse(false)) {
  • false
  • } else {
  • true
  • }
  • })
  • val validAdsClickLogRDD = filteredRDD.map(tuple => tuple._2._1)
  • validAdsClickLogRDD
  • })
  • validAdsClickLogDStream.print()
  • ssc.start()
  • ssc.awaitTermination()
  • }
  • }

对于上述代码,运行后,如果我们输入用户为tom的广告点击数据就会被屏蔽掉,例如,输入:

  • 2018-03-03 tom
  • 2018-03-03 jack

只会输出:

  • 2018-03-03 jack

7.3. Window滑动窗口

Spark Streaming提供了滑动窗口操作的支持,从而让我们可以对一个滑动窗口内的数据执行计算操作。每次掉落在窗口内的RDD的数据,会被聚合起来执行计算操作,然后生成的RDD,会作为window
DStream的一个RDD。比如下图中,就是对每三秒钟的数据执行一次滑动窗口计算,这3秒内的3个RDD会被聚合起来进行处理,然后过了两秒钟,又会对最近三秒内的数据执行滑动窗口计算。所以每个滑动窗口操作,都必须指定两个参数,窗口长度以及滑动间隔,而且这两个参数值都必须是batch间隔的整数倍。(Spark
Streaming对滑动窗口的支持,是比Storm更加完善和强大的)

5.Spark Streaming滑动窗口示意图.png

7.3.1. window滑动窗口操作

Transform 意义
window 对每个滑动窗口的数据执行自定义的计算
countByWindow 对每个滑动窗口的数据执行count操作
reduceByWindow 对每个滑动窗口的数据执行reduce操作
reduceByKeyAndWindow 对每个滑动窗口的数据执行reduceByKey操作
countByValueAndWindow 对每个滑动窗口的数据执行countByValue操作

7.3.2. 案例:热点搜索词滑动统计

该案例的效果是,每隔10秒钟,统计最近60秒钟的搜索词的搜索频次,并打印出排名最靠前的3个搜索词以及出现次数。

  1. Java版本
  • package com.coderap.streaming;
  • import org.apache.spark.SparkConf;
  • import org.apache.spark.api.java.JavaPairRDD;
  • import org.apache.spark.api.java.function.Function;
  • import org.apache.spark.api.java.function.Function2;
  • import org.apache.spark.api.java.function.PairFunction;
  • import org.apache.spark.streaming.Durations;
  • import org.apache.spark.streaming.api.java.JavaDStream;
  • import org.apache.spark.streaming.api.java.JavaPairDStream;
  • import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
  • import org.apache.spark.streaming.api.java.JavaStreamingContext;
  • import scala.Tuple2;
  • import java.util.List;
  • public class WindowHotWord {
  • public static void main(String[] args) throws Exception {
  • SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("WindowHotWord");
  • JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(1));
  • // 这里的搜索日志的格式为 leo hello , 即 用户名 搜索关键字
  • JavaReceiverInputDStream<String> searchLogsDstream = jsc.socketTextStream("localhost", 9999);
  • // 将搜索日志给转换成,只有一个搜索词,即可
  • JavaDStream<String> searchWordsDstream = searchLogsDstream.map(new Function<String, String>() {
  • private static final long serialVersionUID = 3221467404127675753L;
  • @Override
  • public String call(String searchLog) throws Exception {
  • return searchLog.split(" ")[1];
  • }
  • });
  • // 将搜索词映射为(searchWord, 1)的tuple格式
  • JavaPairDStream<String, Integer> searchWordPairDstream = searchWordsDstream.mapToPair(new PairFunction<String, String, Integer>() {
  • private static final long serialVersionUID = 3327747219955435124L;
  • @Override
  • public Tuple2<String, Integer> call(String searchWord) throws Exception {
  • return new Tuple2(searchWord, 1);
  • }
  • });
  • /**
  • * 针对(searchWord, 1)的tuple格式的DStream,执行reduceByKeyAndWindow,滑动窗口操作
  • * 第二个参数,是窗口长度,这里是60秒
  • * 第三个参数,是滑动间隔,这里是10秒也就是说,每隔10秒钟,将最近60秒的数据,作为一个窗口,进行内部的RDD的聚合,然后统一对一个RDD进行后续计算
  • * 所以说,这里的意思,就是,之前的searchWordPairDStream为止,其实,都是不会立即进行计算的,而是只是放在那里
  • * 然后,等待我们的滑动间隔到了以后,10秒钟到了,会将之前60秒的RDD,因为一个batch间隔是,5秒,所以之前
  • * 60秒,就有12个RDD,给聚合起来,然后,统一执行redcueByKey操作
  • * 所以这里的reduceByKeyAndWindow,是针对每个窗口执行计算的,而不是针对某个DStream中的RDD
  • * */
  • JavaPairDStream<String, Integer> searchWordCountDstream = searchWordPairDstream.reduceByKeyAndWindow(new Function2<Integer, Integer, Integer>() {
  • private static final long serialVersionUID = 7444352444631770663L;
  • @Override
  • public Integer call(Integer v1, Integer v2) throws Exception {
  • return v1 + v2;
  • }
  • }, Durations.seconds(60), Durations.seconds(10));
  • /**
  • * 到这里为止,就已经可以做到,每隔10秒钟,出来,之前60秒的收集到的单词的统计次数
  • * 执行transform操作,因为,一个窗口,就是一个60秒钟的数据,会变成一个RDD,然后,对这一个RDD
  • * 根据每个搜索词出现的频率进行排序,然后获取排名前3的热点搜索词*/
  • JavaPairDStream<String, Integer> sortedSearchWordCountDStream = searchWordCountDstream.transformToPair(new Function<JavaPairRDD<String, Integer>, JavaPairRDD<String, Integer>>() {
  • private static final long serialVersionUID = -5485224624861088816L;
  • @Override
  • public JavaPairRDD<String, Integer> call(JavaPairRDD<String, Integer> searchWordCountRDD) throws Exception {
  • // 执行搜索词和出现频率的反转
  • JavaPairRDD<Integer, String> searchCountWordRDD = searchWordCountRDD.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
  • private static final long serialVersionUID = -2586136739321343269L;
  • @Override
  • public Tuple2<Integer, String> call(Tuple2<String, Integer> tuple) throws Exception {
  • return new Tuple2<>(tuple._2, tuple._1);
  • }
  • });
  • // 降序排序
  • JavaPairRDD<Integer, String> sortedSearchCountWordRDD = searchCountWordRDD.sortByKey(false);
  • // 然后再次执行反转,变成(searchWord, count)的这种格式
  • JavaPairRDD<String, Integer> sortedSearchWordCountRDD = sortedSearchCountWordRDD.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
  • private static final long serialVersionUID = 340691853545324716L;
  • @Override
  • public Tuple2<String, Integer> call(Tuple2<Integer, String> tuple) throws Exception {
  • return new Tuple2<>(tuple._2, tuple._1);
  • }
  • });
  • // 然后用take(),获取排名前3的热点搜索词
  • List<Tuple2<String, Integer>> top3SearchWordCount = sortedSearchWordCountRDD.take(3);
  • for (Tuple2<String, Integer> tuple : top3SearchWordCount) {
  • System.out.println((tuple._1 + " : " + tuple._2));
  • }
  • return sortedSearchWordCountRDD;
  • }
  • });
  • // 这个无关紧要,只是为了触发job的执行,所以必须有output操作
  • sortedSearchWordCountDStream.print();
  • jsc.start();
  • jsc.awaitTermination();
  • jsc.close();
  • }
  • }
  1. Scala版本
  • package com.coderap.streaming
  • import org.apache.spark.SparkConf
  • import org.apache.spark.streaming.{Seconds, StreamingContext}
  • object WindowHotWord {
  • def main(args: Array[String]): Unit = {
  • val conf = new SparkConf()
  • .setMaster("local[2]")
  • .setAppName("TransformBlacklist")
  • val ssc = new StreamingContext(conf, Seconds(1))
  • val searchLogDStream = ssc.socketTextStream("localhost", 9999)
  • val searchPairDStream = searchLogDStream.map(searchLog => {
  • (searchLog.split(" ")(1), 1)
  • })
  • val searchWordCountDStream = searchPairDStream.reduceByKeyAndWindow((v1: Int, v2: Int) => v1 + v2, Seconds(60), Seconds(10))
  • val finalDStream = searchWordCountDStream.transform(searchWordCountRDD => {
  • val sortedSearchWordCountsRDD = searchWordCountRDD.map(tuple => (tuple._2, tuple._1)).sortByKey(false).map(tuple => (tuple._2, tuple._1))
  • val top3SearchWordCountRDD = sortedSearchWordCountsRDD.take(3)
  • for (elem <- top3SearchWordCountRDD) {
  • println(elem)
  • }
  • sortedSearchWordCountsRDD
  • })
  • finalDStream.print()
  • ssc.start()
  • ssc.awaitTermination()
  • }
  • }

对于输入的搜索数据:

  • ubuntu@s100:~$ nc -lk 9999
  • tom hello
  • tom hello
  • tom world
  • tom me
  • tom you
  • leo world
  • leo me
  • leo hello

该程序运行后会打印:

  • -------------------------------------------
  • Time: 1520142347000 ms
  • -------------------------------------------
  • (hello,3)
  • (world,2)
  • (me,2)
  • (you,1)