Spark Basics 03 - Basic Examples

Summary: This article implements several basic Spark examples.

1. A WordCount program with sorted output

In the WordCount program we implemented earlier, we simply counted how many times each word appears in the text; the final output was not sorted in any particular order. If we want the output sorted by occurrence count, a few extra steps are needed in the processing, as the example code below shows:

  1. Java version

    package com.coderap.core;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    import org.apache.spark.api.java.function.VoidFunction;
    import scala.Tuple2;

    import java.util.Arrays;
    import java.util.Iterator;

    public class SortWordCount {

        public static void main(String[] args) {
            SparkConf conf = new SparkConf().setAppName("SortWordCount").setMaster("local");
            JavaSparkContext sc = new JavaSparkContext(conf);
            JavaRDD<String> linesRDD = sc.textFile("D:\\_Code\\Spark\\spark.txt");
            // Flatten each line into individual words
            JavaRDD<String> words = linesRDD.flatMap(new FlatMapFunction<String, String>() {
                private static final long serialVersionUID = -5917013483245022495L;

                @Override
                public Iterator<String> call(String line) throws Exception {
                    return Arrays.asList(line.split(" ")).iterator();
                }
            });
            // Map each word to a (word, 1) tuple
            JavaPairRDD<String, Integer> pairsRDD = words.mapToPair(new PairFunction<String, String, Integer>() {
                @Override
                public Tuple2<String, Integer> call(String word) throws Exception {
                    return new Tuple2<String, Integer>(word, 1);
                }
            });
            // Sum up the counts for each word
            JavaPairRDD<String, Integer> wordCounts = pairsRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
                @Override
                public Integer call(Integer v1, Integer v2) throws Exception {
                    return v1 + v2;
                }
            });
            // Swap key and value so that the count becomes the key
            JavaPairRDD<Integer, String> countWords = wordCounts.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
                private static final long serialVersionUID = -5774332907846784452L;

                @Override
                public Tuple2<Integer, String> call(Tuple2<String, Integer> t) throws Exception {
                    return new Tuple2<Integer, String>(t._2, t._1);
                }
            });
            // Sort by key (the count), in descending order
            JavaPairRDD<Integer, String> sortedCountWords = countWords.sortByKey(false);
            // Swap key and value back
            JavaPairRDD<String, Integer> sortedWordCounts = sortedCountWords.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
                private static final long serialVersionUID = -1734692109120488915L;

                @Override
                public Tuple2<String, Integer> call(Tuple2<Integer, String> t) throws Exception {
                    return new Tuple2<String, Integer>(t._2, t._1);
                }
            });
            sortedWordCounts.foreach(new VoidFunction<Tuple2<String, Integer>>() {
                @Override
                public void call(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
                    System.out.printf("%s appeared %d times%n", stringIntegerTuple2._1, stringIntegerTuple2._2);
                }
            });
            sc.close();
        }
    }

  2. Scala version

    package com.coderap.core

    import org.apache.spark.{SparkConf, SparkContext}

    object SortWordCount {

      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("SortWordCount").setMaster("local")
        val sc = new SparkContext(conf)
        val lines = sc.textFile("D:\\_Code\\Spark\\spark.txt", 1)
        val words = lines.flatMap(line => line.split(" "))
        val wordCountPairs = words.map(word => (word, 1))
        val wordCounts = wordCountPairs.reduceByKey(_ + _)
        val countWords = wordCounts.map(wordCount => (wordCount._2, wordCount._1))
        val sortedCountWords = countWords.sortByKey(false)
        val sortedWordCounts = sortedCountWords.map(sortedCountWord => (sortedCountWord._2, sortedCountWord._1))
        sortedWordCounts.foreach(sortedWordCount => {
          println(sortedWordCount._1 + " appeared " + sortedWordCount._2 + " times")
        })
      }
    }

The output is as follows:

    of appeared 10 times
    Spark appeared 10 times
    to appeared 10 times
    the appeared 7 times
     appeared 7 times
    and appeared 7 times
    our appeared 5 times
    in appeared 5 times
    has appeared 4 times
    or appeared 4 times
    cluster appeared 4 times
    Hadoop appeared 4 times
    data appeared 4 times
    that appeared 4 times
    it appeared 3 times
    have appeared 3 times
    technology appeared 3 times
    on appeared 3 times
    at appeared 3 times
    new appeared 3 times
    for appeared 3 times
    with appeared 3 times
    a appeared 3 times
    For appeared 2 times
    we appeared 2 times
    software appeared 2 times
    leverage appeared 2 times
    is appeared 2 times
    We appeared 2 times
    50 appeared 2 times
    without appeared 2 times
    latest appeared 2 times
    up appeared 2 times
    advanced appeared 2 times
    (MLlib), appeared 1 times
    Product appeared 1 times
    operators appeared 1 times
    ...
    additional appeared 1 times
    certified appeared 1 times

As you can see, the output is now sorted by occurrence count.
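
Incidentally, the two swap steps exist only because sortByKey can sort by the key alone. In the Scala API the same result can be obtained more directly with sortBy, which sorts an RDD by an arbitrary field. Below is a minimal sketch under the same assumptions as above (same input file, local master); the object name SortWordCountWithSortBy is made up for illustration.

    package com.coderap.core

    import org.apache.spark.{SparkConf, SparkContext}

    // Sketch: sort the (word, count) pairs by the count directly with sortBy,
    // avoiding the swap / sortByKey / swap-back sequence.
    object SortWordCountWithSortBy {

      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("SortWordCountWithSortBy").setMaster("local")
        val sc = new SparkContext(conf)
        val lines = sc.textFile("D:\\_Code\\Spark\\spark.txt", 1)
        val wordCounts = lines
          .flatMap(_.split(" "))
          .map(word => (word, 1))
          .reduceByKey(_ + _)
        // Sort by the second tuple element (the count), in descending order
        val sortedWordCounts = wordCounts.sortBy(_._2, ascending = false)
        sortedWordCounts.foreach(wc => println(wc._1 + " appeared " + wc._2 + " times"))
      }
    }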

2. Secondary sort

Secondary sort typically means the following: suppose we have a multi-line text file in which every line has two columns, and we want to sort the lines by the first column, falling back to the second column when the first columns are equal. That is a secondary sort. Suppose we have the following text:

    1 2
    3 2
    5 2
    1 3
    2 3
    4 5
    3 5

A secondary sort is generally implemented in the following steps:

  1. Define a custom key class that implements the Ordered and Serializable interfaces; the comparison logic across multiple columns lives in this class;
  2. Map the RDD of text lines to a JavaPairRDD whose key is the custom key and whose value is the original line;
  3. Sort with the sortByKey operator, which applies the ordering rules defined in the custom key;
  4. Map once more to drop the custom key and keep only the original text lines.

Let's now implement this secondary sort.

  1. Java version

First we define a custom key for the secondary sort:

    package com.coderap.core;

    import scala.math.Ordered;

    import java.io.Serializable;

    public class SecondarySortKey implements Ordered<SecondarySortKey>, Serializable {

        private static final long serialVersionUID = -7060447022187284835L;

        private int first;
        private int second;

        public SecondarySortKey(int first, int second) {
            this.first = first;
            this.second = second;
        }

        @Override
        public int compare(SecondarySortKey that) {
            if (this.first - that.getFirst() != 0) {
                return this.first - that.getFirst();
            } else {
                return this.second - that.getSecond();
            }
        }

        @Override
        public boolean $less(SecondarySortKey that) {
            if (this.first < that.getFirst()) {
                return true;
            } else if (this.first == that.getFirst()) {
                return this.second < that.getSecond();
            }
            return false;
        }

        @Override
        public boolean $greater(SecondarySortKey that) {
            if (this.first > that.getFirst()) {
                return true;
            } else if (this.first == that.getFirst()) {
                return this.second > that.getSecond();
            }
            return false;
        }

        @Override
        public boolean $less$eq(SecondarySortKey that) {
            if (this.$less(that)) {
                return true;
            } else if (this.first == that.getFirst()) {
                return this.second == that.getSecond();
            }
            return false;
        }

        @Override
        public boolean $greater$eq(SecondarySortKey that) {
            if (this.$greater(that)) {
                return true;
            } else if (this.first == that.getFirst()) {
                return this.second == that.getSecond();
            }
            return false;
        }

        @Override
        public int compareTo(SecondarySortKey that) {
            if (this.first - that.getFirst() != 0) {
                return this.first - that.getFirst();
            } else {
                return this.second - that.getSecond();
            }
        }

        public int getFirst() {
            return first;
        }

        public void setFirst(int first) {
            this.first = first;
        }

        public int getSecond() {
            return second;
        }

        public void setSecond(int second) {
            this.second = second;
        }
    }

Then the main sorting program:

    package com.coderap.core;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.PairFunction;
    import org.apache.spark.api.java.function.VoidFunction;
    import scala.Tuple2;

    public class SecondarySort {

        public static void main(String[] args) {
            SparkConf conf = new SparkConf().setAppName("SecondarySort").setMaster("local");
            JavaSparkContext sc = new JavaSparkContext(conf);
            JavaRDD<String> lines = sc.textFile("D:\\_Code\\Spark\\secondaryKey.txt");
            // Map each line to a (SecondarySortKey, line) pair
            JavaPairRDD<SecondarySortKey, String> secondarySortKeyStringJavaPairRDD = lines.mapToPair(new PairFunction<String, SecondarySortKey, String>() {
                private static final long serialVersionUID = -6394735032905140996L;

                @Override
                public Tuple2<SecondarySortKey, String> call(String line) throws Exception {
                    String[] splitLine = line.split(" ");
                    return new Tuple2<SecondarySortKey, String>(new SecondarySortKey(Integer.valueOf(splitLine[0]), Integer.valueOf(splitLine[1])), line);
                }
            });
            // Sort by the custom key
            JavaPairRDD<SecondarySortKey, String> sortedPairs = secondarySortKeyStringJavaPairRDD.sortByKey();
            // Drop the custom key and keep only the original lines
            JavaRDD<String> sortedLines = sortedPairs.map(new Function<Tuple2<SecondarySortKey, String>, String>() {
                private static final long serialVersionUID = -6874882290261432355L;

                @Override
                public String call(Tuple2<SecondarySortKey, String> v1) throws Exception {
                    return v1._2;
                }
            });
            sortedLines.foreach(new VoidFunction<String>() {
                @Override
                public void call(String s) throws Exception {
                    System.out.println(s);
                }
            });
            sc.close();
        }
    }

The result is as follows:

    1 2
    1 3
    2 3
    3 2
    3 5
    4 5
    5 2

  2. Scala version

The custom key class used for sorting:

    package com.coderap.core

    import scala.math.Ordered

    class SecondarySortKey(val first: Int, val second: Int) extends Ordered[SecondarySortKey] with Serializable {

      override def compare(that: SecondarySortKey) = {
        if (this.first - that.first != 0) {
          this.first - that.first
        } else {
          this.second - that.second
        }
      }
    }

The main sorting object:

    package com.coderap.core

    import org.apache.spark.{SparkConf, SparkContext}

    object SecondarySort {

      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("SecondarySort").setMaster("local")
        val sc = new SparkContext(conf)
        val lines = sc.textFile("D:\\_Code\\Spark\\secondaryKey.txt", 1)
        val secondaryKeyLines = lines.map {
          line => (new SecondarySortKey(line.split(" ")(0).toInt, line.split(" ")(1).toInt), line)
        }
        val sortedPairs = secondaryKeyLines.sortByKey()
        val sortedLines = sortedPairs.map(sortedPair => sortedPair._2)
        sortedLines.foreach(line => println(line))
      }
    }
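
By the way, since the key here only orders two integer columns, the Scala version does not strictly need a custom key class: Scala provides an implicit Ordering for tuples of ordered elements, so an (Int, Int) tuple can serve as the key directly. Below is a minimal sketch under the same assumptions as above (same input file, local master); the object name SecondarySortWithTupleKey is made up for illustration.

    package com.coderap.core

    import org.apache.spark.{SparkConf, SparkContext}

    // Sketch: use an (Int, Int) tuple as the sort key instead of a custom class.
    // Tuples of ordered elements have an implicit Ordering, so sortByKey works on them directly.
    object SecondarySortWithTupleKey {

      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("SecondarySortWithTupleKey").setMaster("local")
        val sc = new SparkContext(conf)
        val lines = sc.textFile("D:\\_Code\\Spark\\secondaryKey.txt", 1)
        val keyedLines = lines.map { line =>
          val cols = line.split(" ")
          // Key is (first column, second column); value is the original line
          ((cols(0).toInt, cols(1).toInt), line)
        }
        // Sorts by the first column, breaking ties with the second column
        keyedLines.sortByKey().map(_._2).foreach(println)
      }
    }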

3. Top N: finding the three largest numbers in a text file

Generally, we first sort with sortByKey and then use take to grab the first three elements (a shorter alternative based on top is sketched after the output below). Example code:

  1. Java version

    package com.coderap.core;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.PairFunction;
    import org.apache.spark.api.java.function.VoidFunction;
    import scala.Tuple2;

    import java.util.List;

    public class Top3 {

        public static void main(String[] args) {
            SparkConf conf = new SparkConf().setAppName("Top3").setMaster("local");
            JavaSparkContext sc = new JavaSparkContext(conf);
            JavaRDD<String> lines = sc.textFile("D:\\_Code\\Spark\\top3.txt");
            JavaPairRDD<Integer, String> integerStringJavaPairRDD = lines.mapToPair(new PairFunction<String, Integer, String>() {
                private static final long serialVersionUID = -2386526168094926307L;

                @Override
                public Tuple2<Integer, String> call(String s) throws Exception {
                    return new Tuple2<Integer, String>(Integer.valueOf(s), s);
                }
            });
            JavaPairRDD<Integer, String> sortedPairs = integerStringJavaPairRDD.sortByKey(false);
            JavaRDD<String> sortedLines = sortedPairs.map(new Function<Tuple2<Integer, String>, String>() {
                @Override
                public String call(Tuple2<Integer, String> v1) throws Exception {
                    return v1._2;
                }
            });
            List<String> top3Numbers = sortedLines.take(3);
            for (String number : top3Numbers) {
                System.out.println(number);
            }
            sc.close();
        }
    }

  2. Scala version

    package com.coderap.core

    import org.apache.spark.{SparkConf, SparkContext}

    object Top3 {

      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("Top3").setMaster("local")
        val sc = new SparkContext(conf)
        val lines = sc.textFile("D:\\_Code\\Spark\\top3.txt", 1)
        val pairs = lines.map(line => (line.toInt, line))
        val top3Pairs = pairs.sortByKey(false)
        val top3 = top3Pairs.map(top3Pair => top3Pair._2).take(3)
        for (elem <- top3) {
          println(elem)
        }
      }
    }

For the input text:

    9
    2
    5
    2
    3
    2
    8

The output is:

    9
    8
    5
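
As mentioned above, a shorter alternative is possible: RDDs also provide top(n), which returns the n largest elements by their natural ordering without an explicit sortByKey/take pair. Below is a minimal Scala sketch under the same assumptions (same input file, local master); the object name Top3WithTop is made up for illustration.

    package com.coderap.core

    import org.apache.spark.{SparkConf, SparkContext}

    // Sketch: take the three largest numbers with top(), which orders elements internally.
    object Top3WithTop {

      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("Top3WithTop").setMaster("local")
        val sc = new SparkContext(conf)
        val numbers = sc.textFile("D:\\_Code\\Spark\\top3.txt", 1).map(_.toInt)
        // top(3) returns the three largest values, already in descending order
        numbers.top(3).foreach(println)
      }
    }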

4. Grouped Top N: finding the top three scores in each class

In this scenario, the scores of several classes are stored in a single file and we want the top three scores for each class. We therefore need to group the data first and then take the top three within each group. For example, given the following data:

    class1 87
    class2 98
    class3 80
    class1 78
    class3 76
    class2 78
    class1 90
    class3 85
    class3 76
    class1 96
    class2 76
    class3 88
    class1 96
    class3 78
    class2 85

We first group the records by key, then take the top three values within each group. The implementation is as follows:

  1. Java version

    package com.coderap.core;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.PairFunction;
    import org.apache.spark.api.java.function.VoidFunction;
    import scala.Tuple2;

    import java.util.*;

    public class GroupTop {

        public static void main(String[] args) {
            SparkConf conf = new SparkConf().setAppName("GroupTop").setMaster("local");
            JavaSparkContext sc = new JavaSparkContext(conf);
            JavaRDD<String> lines = sc.textFile("D:\\_Code\\Spark\\groupTop.txt");
            // First map each line to a (className, score) pair
            JavaPairRDD<String, Integer> classScorePairs = lines.mapToPair(new PairFunction<String, String, Integer>() {
                private static final long serialVersionUID = -7775411643708409351L;

                @Override
                public Tuple2<String, Integer> call(String line) throws Exception {
                    String[] splitLine = line.split(" ");
                    return new Tuple2<String, Integer>(splitLine[0], Integer.valueOf(splitLine[1]));
                }
            });
            // Group the records by key
            JavaPairRDD<String, Iterable<Integer>> groupByKeyPairs = classScorePairs.groupByKey();
            // Map each group to a new pair: the key is the class name, the value is that class's top three scores
            JavaPairRDD<String, Iterable<Integer>> sortedClassScores = groupByKeyPairs.mapToPair(new PairFunction<Tuple2<String, Iterable<Integer>>, String, Iterable<Integer>>() {
                private static final long serialVersionUID = -604038820221415400L;

                @Override
                public Tuple2<String, Iterable<Integer>> call(Tuple2<String, Iterable<Integer>> classScores) throws Exception {
                    String className = classScores._1;
                    Iterable<Integer> scores = classScores._2;
                    // Copy the scores into a List so they can be sorted and the top three taken
                    Iterator<Integer> scoresIterator = scores.iterator();
                    List<Integer> scoresList = new ArrayList<Integer>();
                    while (scoresIterator.hasNext()) {
                        scoresList.add(scoresIterator.next());
                    }
                    // Sort in descending order
                    scoresList.sort(new Comparator<Integer>() {
                        @Override
                        public int compare(Integer o1, Integer o2) {
                            return o2 - o1;
                        }
                    });
                    return new Tuple2<String, Iterable<Integer>>(className, scoresList.subList(0, scoresList.size() > 3 ? 3 : scoresList.size()));
                }
            });
            sortedClassScores.foreach(new VoidFunction<Tuple2<String, Iterable<Integer>>>() {
                @Override
                public void call(Tuple2<String, Iterable<Integer>> classScore) throws Exception {
                    System.out.printf("Class: %s%n", classScore._1);
                    Iterator<Integer> scores = classScore._2.iterator();
                    while (scores.hasNext()) {
                        System.out.println(scores.next());
                    }
                    System.out.println("***********************");
                }
            });
            sc.close();
        }
    }

  2. Scala version

    package com.coderap.core

    import org.apache.spark.{SparkConf, SparkContext}

    import scala.collection.mutable.ArrayBuffer

    object GroupTop {

      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("GroupTop").setMaster("local")
        val sc = new SparkContext(conf)
        val lines = sc.textFile("D:\\_Code\\Spark\\groupTop.txt", 1)
        val pairs = lines.map(line => {
          val splitLine = line.split(" ")
          (splitLine(0), splitLine(1).toInt)
        })
        val groupedPairs = pairs.groupByKey()
        val sortedClassScores = groupedPairs.map(pair => {
          val scores = pair._2
          val scoresArray = ArrayBuffer[Int]()
          for (elem <- scores) {
            scoresArray += elem
          }
          val sortedScoresArray = scoresArray.sortWith(_ > _)
          sortedScoresArray.trimEnd(sortedScoresArray.length - 3 max 0)
          (pair._1, sortedScoresArray.toIterable)
        })
        sortedClassScores.foreach(sortedClassScore => {
          println("Class: " + sortedClassScore._1)
          for (elem <- sortedClassScore._2) {
            println(elem)
          }
          println("*********************")
        })
      }
    }

Output:

    Class: class3
    88
    85
    80
    *********************
    Class: class1
    96
    96
    90
    *********************
    Class: class2
    98
    85
    78
    *********************
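
One caveat worth noting: groupByKey materializes every score of a class before the top three are taken, which can use a lot of memory when a group is large. The per-key top N can also be maintained incrementally with aggregateByKey, so each partition only keeps at most three scores per class. The following is only a rough sketch of that idea under the same assumptions (same input file, local master); the object name GroupTopWithAggregate is made up for illustration.

    package com.coderap.core

    import org.apache.spark.{SparkConf, SparkContext}

    // Sketch: keep only the top three scores per class while aggregating,
    // instead of materializing every score of a class with groupByKey.
    object GroupTopWithAggregate {

      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("GroupTopWithAggregate").setMaster("local")
        val sc = new SparkContext(conf)
        val lines = sc.textFile("D:\\_Code\\Spark\\groupTop.txt", 1)
        val pairs = lines.map(line => {
          val splitLine = line.split(" ")
          (splitLine(0), splitLine(1).toInt)
        })
        val topScores = pairs.aggregateByKey(List.empty[Int])(
          // Merge a new score into a list that never holds more than three elements
          (top, score) => (score :: top).sortWith(_ > _).take(3),
          // Merge two partial top-three lists from different partitions
          (a, b) => (a ++ b).sortWith(_ > _).take(3)
        )
        topScores.foreach(classTop => {
          println("Class: " + classTop._1)
          classTop._2.foreach(println)
          println("*********************")
        })
      }
    }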