大数据
流式处理
Spark

Spark基础 07 - Spark数据源的使用

简介:Spark可以加载各类数据源

1. JSON数据源实战

Spark SQL可以自动推断JSON文件的元数据,并且加载其数据,创建一个DataFrame。可以使用SQLContext.read.json()方法,针对一个元素类型为String的RDD,或者是一个JSON文件。

但是要注意的是,这里使用的JSON文件与传统意义上的JSON文件是不一样的。每行都必须,也只能包含一个,单独的,自包含的,有效的JSON对象。不能让一个JSON对象分散在多行。否则会报错。

综合性复杂案例:查询成绩为80分以上的学生的基本信息与成绩信息

  1. Java版本
  • package com.coderap.sql;
  • import java.util.ArrayList;
  • import java.util.List;
  • import org.apache.spark.SparkConf;
  • import org.apache.spark.api.java.JavaPairRDD;
  • import org.apache.spark.api.java.JavaRDD;
  • import org.apache.spark.api.java.JavaSparkContext;
  • import org.apache.spark.api.java.function.Function;
  • import org.apache.spark.api.java.function.PairFunction;
  • import org.apache.spark.sql.Dataset;
  • import org.apache.spark.sql.Row;
  • import org.apache.spark.sql.RowFactory;
  • import org.apache.spark.sql.SQLContext;
  • import org.apache.spark.sql.types.DataTypes;
  • import org.apache.spark.sql.types.StructField;
  • import org.apache.spark.sql.types.StructType;
  • import scala.Tuple2;
  • /**
  • * JSON数据源
  • */
  • public class JSONDataSource {
  • public static void main(String[] args) {
  • SparkConf conf = new SparkConf()
  • .setAppName("JSONDataSource");
  • JavaSparkContext sc = new JavaSparkContext(conf);
  • SQLContext sqlContext = new SQLContext(sc);
  • // 针对json文件,创建DataFrame(针对json文件创建DataFrame)
  • Dataset studentScoresDF = sqlContext.read().json(
  • "hdfs://s100:8020/user/ubuntu/spark/students.json");
  • // 针对学生成绩信息的DataFrame,注册临时表,查询分数大于80分的学生的姓名
  • // (注册临时表,针对临时表执行sql语句)
  • studentScoresDF.registerTempTable("student_scores");
  • Dataset goodStudentScoresDF = sqlContext.sql(
  • "select name,score from student_scores where score>=80");
  • // (将DataFrame转换为rdd,执行transformation操作)
  • List<String> goodStudentNames = goodStudentScoresDF.javaRDD().map(new Function<Row, String>() {
  • private static final long serialVersionUID = 1L;
  • @Override
  • public String call(Row row) throws Exception {
  • return row.getString(0);
  • }
  • }).collect();
  • // 然后针对JavaRDD<String>,创建DataFrame
  • // (针对包含json串的JavaRDD,创建DataFrame)
  • List<String> studentInfoJSONs = new ArrayList<String>();
  • studentInfoJSONs.add("{\"name\":\"Leo\", \"age\":18}");
  • studentInfoJSONs.add("{\"name\":\"Marry\", \"age\":17}");
  • studentInfoJSONs.add("{\"name\":\"Jack\", \"age\":19}");
  • JavaRDD<String> studentInfoJSONsRDD = sc.parallelize(studentInfoJSONs);
  • Dataset studentInfosDF = sqlContext.read().json(studentInfoJSONsRDD);
  • // 针对学生基本信息DataFrame,注册临时表,然后查询分数大于80分的学生的基本信息
  • studentInfosDF.registerTempTable("student_infos");
  • String sql = "select name,age from student_infos where name in (";
  • for (int i = 0; i < goodStudentNames.size(); i++) {
  • sql += "'" + goodStudentNames.get(i) + "'";
  • if (i < goodStudentNames.size() - 1) {
  • sql += ",";
  • }
  • }
  • sql += ")";
  • Dataset goodStudentInfosDF = sqlContext.sql(sql);
  • // 然后将两份数据的DataFrame,转换为JavaPairRDD,执行join transformation
  • // (将DataFrame转换为JavaRDD,再map为JavaPairRDD,然后进行join)
  • JavaPairRDD<String, Tuple2<Integer, Integer>> goodStudentsRDD = goodStudentScoresDF.javaRDD().mapToPair(new PairFunction<Row, String, Integer>() {
  • private static final long serialVersionUID = 1L;
  • @Override
  • public Tuple2<String, Integer> call(Row row) throws Exception {
  • return new Tuple2<String, Integer>(row.getString(0),
  • Integer.valueOf(String.valueOf(row.getLong(1))));
  • }
  • }).join(goodStudentInfosDF.javaRDD().mapToPair(new PairFunction<Row, String, Integer>() {
  • private static final long serialVersionUID = 1L;
  • @Override
  • public Tuple2<String, Integer> call(Row row) throws Exception {
  • return new Tuple2<String, Integer>(row.getString(0),
  • Integer.valueOf(String.valueOf(row.getLong(1))));
  • }
  • }));
  • // 然后将封装在RDD中的好学生的全部信息,转换为一个JavaRDD<Row>的格式
  • // (将JavaRDD,转换为DataFrame)
  • JavaRDD<Row> goodStudentRowsRDD = goodStudentsRDD.map(
  • new Function<Tuple2<String, Tuple2<Integer, Integer>>, Row>() {
  • private static final long serialVersionUID = 1L;
  • @Override
  • public Row call(
  • Tuple2<String, Tuple2<Integer, Integer>> tuple)
  • throws Exception {
  • return RowFactory.create(tuple._1, tuple._2._1, tuple._2._2);
  • }
  • });
  • // 创建一份元数据,将JavaRDD<Row>转换为DataFrame
  • List<StructField> structFields = new ArrayList<StructField>();
  • structFields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
  • structFields.add(DataTypes.createStructField("score", DataTypes.IntegerType, true));
  • structFields.add(DataTypes.createStructField("age", DataTypes.IntegerType, true));
  • StructType structType = DataTypes.createStructType(structFields);
  • Dataset goodStudentsDF = sqlContext.createDataFrame(goodStudentRowsRDD, structType);
  • // 将好学生的全部信息保存到一个json文件中去
  • // (将DataFrame中的数据保存到外部的json文件中去)
  • goodStudentsDF.write().format("json").save("hdfs://s100:8020/user/ubuntu/spark/good-students");
  • }
  • }
  1. Scala版本
  • package com.coderap.sql
  • import org.apache.spark.{SparkConf, SparkContext}
  • import org.apache.spark.sql.{Row, SQLContext}
  • import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
  • object JSONDataSource {
  • def main(args: Array[String]): Unit = {
  • val conf = new SparkConf()
  • .setAppName("JSONDataSource")
  • val sc = new SparkContext(conf)
  • val sqlContext = new SQLContext(sc)
  • // 创建学生成绩DataFrame
  • val studentScoresDF = sqlContext.read.json("hdfs://s100:8020/user/ubuntu/spark/students.json")
  • // 查询出分数大于80分的学生成绩信息,以及学生姓名
  • studentScoresDF.registerTempTable("student_scores")
  • val goodStudentScoresDF = sqlContext.sql("select name,score from student_scores where score>=80")
  • val goodStudentNames = goodStudentScoresDF.rdd.map { row => row(0) }.collect()
  • // 创建学生基本信息DataFrame
  • val studentInfoJSONs = Array("{\"name\":\"Leo\", \"age\":18}",
  • "{\"name\":\"Marry\", \"age\":17}",
  • "{\"name\":\"Jack\", \"age\":19}")
  • val studentInfoJSONsRDD = sc.parallelize(studentInfoJSONs, 3);
  • val studentInfosDF = sqlContext.read.json(studentInfoJSONsRDD)
  • // 查询分数大于80分的学生的基本信息
  • studentInfosDF.registerTempTable("student_infos")
  • var sql = "select name,age from student_infos where name in ("
  • for(i <- 0 until goodStudentNames.length) {
  • sql += "'" + goodStudentNames(i) + "'"
  • if(i < goodStudentNames.length - 1) {
  • sql += ","
  • }
  • }
  • sql += ")"
  • val goodStudentInfosDF = sqlContext.sql(sql)
  • // 将分数大于80分的学生的成绩信息与基本信息进行join
  • val goodStudentsRDD =
  • goodStudentScoresDF.rdd.map { row => (row.getAs[String]("name"), row.getAs[Long]("score")) }
  • .join(goodStudentInfosDF.rdd.map { row => (row.getAs[String]("name"), row.getAs("age")) })
  • // 将rdd转换为dataframe
  • val goodStudentRowsRDD = goodStudentsRDD.map(
  • info => Row(info._1, info._2._1.toInt, info._2._2.toString.toInt))
  • val structType = StructType(Array(
  • StructField("name", StringType, true),
  • StructField("score", IntegerType, true),
  • StructField("age", IntegerType, true)))
  • val goodStudentsDF = sqlContext.createDataFrame(goodStudentRowsRDD, structType)
  • // 将dataframe中的数据保存到json中
  • goodStudentsDF.write.format("json").save("hdfs://s100:8020/user/ubuntu/spark/good-students-scala")
  • }
  • }

2. Hive数据源实战

Spark SQL支持对Hive中存储的数据进行读写。操作Hive中的数据时,必须创建HiveContext,而不是SQLContext。HiveContext继承自SQLContext,但是增加了在Hive元数据库中查找表,以及用HiveQL语法编写SQL的功能。除了sql()方法,HiveContext还提供了hql()方法,从而用Hive语法来编译sql。

使用HiveContext,可以执行Hive的大部分功能,包括创建表、往表里导入数据以及用SQL语句查询表中的数据。查询出来的数据是一个Row数组。

注:使用Hive操作需要将hive-site.xml拷贝到spark/conf目录下,将mysql connector拷贝到spark/jars目录下。

Spark SQL还允许将数据保存到Hive表中。调用DataFrame的saveAsTable()命令,即可将DataFrame中的数据保存到Hive表中。与registerTempTable不同,saveAsTable是会将DataFrame中的数据物化到Hive表中的,而且还会在Hive元数据库中创建表的元数据。

默认情况下,saveAsTable会创建一张Hive Managed Table,也就是说,数据的位置都是由元数据库中的信息控制的。当Managed Table被删除时,表中的数据也会一并被物理删除。

registerTempTable只是注册一个临时的表,只要Spark Application重启或者停止了,那么表就没了。而saveAsTable创建的是物化的表,无论Spark Application重启或者停止,表都会一直存在。

调用HiveContext.table()方法,还可以直接针对Hive中的表,创建一个DataFrame。

案例:查询分数大于80分的学生的完整信息

  1. Java版本
  • package com.coderap.core;
  • import org.apache.spark.SparkConf;
  • import org.apache.spark.api.java.JavaSparkContext;
  • import org.apache.spark.sql.Dataset;
  • import org.apache.spark.sql.Row;
  • import org.apache.spark.sql.hive.HiveContext;
  • public class HiveDataSource {
  • public static void main(String[] args) {
  • SparkConf conf = new SparkConf().setAppName("HiveDataSource");
  • JavaSparkContext sc = new JavaSparkContext(conf);
  • // 创建HiveContext,注意,这里,它接收的是SparkContext作为参数,不是JavaSparkContext
  • HiveContext hiveContext = new HiveContext(sc.sc());
  • // 第一个功能,使用HiveContext的sql()方法,可以执行Hive中能够执行的HiveQL语句
  • // 判断是否存在student_infos表,如果存在则删除
  • hiveContext.sql("DROP TABLE IF EXISTS student_infos");
  • // 判断student_infos表是否不存在,如果不存在,则创建该表
  • hiveContext.sql("CREATE TABLE IF NOT EXISTS student_infos (name STRING, age INT)");
  • // 将学生基本信息数据导入student_infos表
  • hiveContext.sql("LOAD DATA LOCAL INPATH '/home/ubuntu/spark-study/java/sql/HiveDataSource/student_infos.txt' INTO TABLE student_infos");
  • // 用同样的方式给student_scores导入数据
  • hiveContext.sql("DROP TABLE IF EXISTS student_scores");
  • hiveContext.sql("CREATE TABLE IF NOT EXISTS student_scores (name STRING, score INT)");
  • hiveContext.sql("LOAD DATA LOCAL INPATH '/home/ubuntu/spark-study/java/sql/HiveDataSource/student_scores.txt' INTO TABLE student_scores");
  • // 第二个功能,执行sql还可以返回Dataset,用于查询
  • Dataset<Row> goodStudentDataSet = hiveContext.sql("SELECT si.name, si.age, ss.score FROM student_infos si JOIN student_scores ss ON si.name=ss.name WHERE ss.score>=80");
  • // 第三个功能,可以将DataFrame中的数据,理论上来说,DataFrame对应的RDD的元素,是Row即可
  • // 将Dataset中的数据保存到hive表中
  • hiveContext.sql("DROP TABLE IF EXISTS good_student_infos");
  • goodStudentDataSet.write().saveAsTable("good_student_infos");
  • // 第四个功能,可以用table()方法,针对hive表,直接创建DataFrame
  • // 然后针对good_student_infos表,直接创建DataFrame
  • Row[] goodStudentInfos = hiveContext.read().table("good_student_infos").collect();
  • for (Row goodStudentInfo : goodStudentInfos) {
  • System.out.println(goodStudentInfo);
  • }
  • sc.close();
  • }
  • }
  1. Scala版本
  • package com.coderap.sql
  • import org.apache.spark.SparkConf
  • import org.apache.spark.api.java.JavaSparkContext
  • import org.apache.spark.sql.hive.HiveContext
  • object HiveDataSource {
  • def main(args: Array[String]): Unit = {
  • val conf = new SparkConf().setAppName("HiveDataSource")
  • val sc = new JavaSparkContext(conf)
  • // 创建HiveContext,注意,这里,它接收的是SparkContext作为参数,不是JavaSparkContext
  • val hiveContext = new HiveContext(sc.sc)
  • // 第一个功能,使用HiveContext的sql()方法,可以执行Hive中能够执行的HiveQL语句
  • // 判断是否存在student_infos表,如果存在则删除
  • hiveContext.sql("DROP TABLE IF EXISTS student_infos")
  • // 判断student_infos表是否不存在,如果不存在,则创建该表
  • hiveContext.sql("CREATE TABLE IF NOT EXISTS student_infos (name STRING, age INT)")
  • // 将学生基本信息数据导入student_infos表
  • hiveContext.sql("LOAD DATA LOCAL INPATH '/home/ubuntu/spark-study/java/sql/HiveDataSource/student_infos.txt' INTO TABLE student_infos")
  • // 用同样的方式给student_scores导入数据
  • hiveContext.sql("DROP TABLE IF EXISTS student_scores")
  • hiveContext.sql("CREATE TABLE IF NOT EXISTS student_scores (name STRING, score INT)")
  • hiveContext.sql("LOAD DATA LOCAL INPATH '/home/ubuntu/spark-study/java/sql/HiveDataSource/student_scores.txt' INTO TABLE student_scores")
  • // 第二个功能,执行sql还可以返回Dataset,用于查询
  • val goodStudentDataSet = hiveContext.sql("SELECT si.name, si.age, ss.score FROM student_infos si JOIN student_scores ss ON si.name=ss.name WHERE ss.score>=80")
  • // 第三个功能,可以将DataFrame中的数据,理论上来说,DataFrame对应的RDD的元素,是Row即可
  • // 将Dataset中的数据保存到hive表中
  • hiveContext.sql("DROP TABLE IF EXISTS good_student_infos")
  • goodStudentDataSet.write.saveAsTable("good_student_infos")
  • // 第四个功能,可以用table()方法,针对hive表,直接创建DataFrame
  • // 然后针对good_student_infos表,直接创建DataFrame
  • val goodStudentInfos = hiveContext.read.table("good_student_infos").collect
  • for (goodStudentInfo <- goodStudentInfos) {
  • println(goodStudentInfo)
  • }
  • }
  • }

对于输入数据有:

  • student_infos.txt
  • leo18
  • marry17
  • jack19
  • student_scores.txt
  • leo88
  • marry99
  • jack76

输出数据为:

  • [leo,18,88]
  • [marry,17,99]

我们可以查看Hive中的数据:

  • hive> select * from student_infos;
  • OK
  • leo 18
  • marry 17
  • jack 19
  • Time taken: 1.13 seconds, Fetched: 3 row(s)
  • hive> select * from student_scores;
  • OK
  • leo 88
  • marry 99
  • jack 76
  • Time taken: 0.103 seconds, Fetched: 3 row(s)
  • hive> select * from good_student_infos;
  • OK
  • SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
  • SLF4J: Defaulting to no-operation (NOP) logger implementation
  • SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
  • leo 18 88
  • marry 17 99
  • Time taken: 0.11 seconds, Fetched: 2 row(s)

3. JDBC数据源实战

SparkSQL支持使用JDBC从关系型数据库(比如MySQL)中读取数据。读取的数据,依然由DataFrame表示,可以很方便地使用SparkCore提供的各种算子进行处理。

这里有一个经验之谈,实际上用SparkSQL处理JDBC中的数据是非常有用的。比如说,你的MySQL业务数据库中,有大量的数据,比如1000万,然后,你现在需要编写一个程序,对线上的脏数据某种复杂业务逻辑的处理,甚至复杂到可能涉及到要用SparkSQL反复查询MySQL中的数据,来进行关联处理。

那么此时,用SparkSQL来通过JDBC数据源,加载MySQL中的数据,然后通过各种算子进行处理,是最好的选择。因为Spark是分布式的计算框架,对于1000万数据,肯定是分布式处理的。而如果你自己手工编写一个Java程序,那么不好意思,你只能分批次处理了,先处理2万条,再处理2万条,可能运行完你的Java程序,已经是几天以后的事情了。

案例:查询分数大于80分的学生信息

  1. Java版本
  • package com.coderap.sql;
  • import org.apache.spark.SparkConf;
  • import org.apache.spark.api.java.JavaPairRDD;
  • import org.apache.spark.api.java.JavaRDD;
  • import org.apache.spark.api.java.JavaSparkContext;
  • import org.apache.spark.api.java.function.Function;
  • import org.apache.spark.api.java.function.PairFunction;
  • import org.apache.spark.api.java.function.VoidFunction;
  • import org.apache.spark.sql.Dataset;
  • import org.apache.spark.sql.Row;
  • import org.apache.spark.sql.RowFactory;
  • import org.apache.spark.sql.SQLContext;
  • import org.apache.spark.sql.types.DataTypes;
  • import org.apache.spark.sql.types.StructField;
  • import org.apache.spark.sql.types.StructType;
  • import scala.Tuple2;
  • import java.sql.Connection;
  • import java.sql.DriverManager;
  • import java.sql.Statement;
  • import java.util.ArrayList;
  • import java.util.HashMap;
  • import java.util.Map;
  • public class JDBCDataSource {
  • public static void main(String[] args) {
  • SparkConf conf = new SparkConf().setAppName("HiveDataSource");
  • JavaSparkContext sc = new JavaSparkContext(conf);
  • SQLContext sqlContext = new SQLContext(sc);
  • Map<String, String> options = new HashMap<String, String>();
  • options.put("url", "jdbc:mysql://127.0.0.1:3306/spark_hive_test");
  • options.put("driver", "com.mysql.jdbc.Driver");
  • options.put("user", "root");
  • options.put("password", "12345678");
  • options.put("dbtable", "student_infos");
  • Dataset<Row> studentInfosDataSet = sqlContext.read().format("jdbc").options(options).load();
  • options.put("dbtable", "student_scores");
  • Dataset<Row> studentscoresDataSet = sqlContext.read().format("jdbc").options(options).load();
  • // 将两个DataFrame转换为JavaPairRDD,执行join操作
  • JavaPairRDD<String, Tuple2<Integer, Integer>> studentsRDD = studentInfosDataSet.javaRDD().mapToPair(new PairFunction<Row, String, Integer>() {
  • private static final long serialVersionUID = -7687296014714712656L;
  • @Override
  • public Tuple2<String, Integer> call(Row row) throws Exception {
  • return new Tuple2<String, Integer>(row.getString(0),
  • Integer.valueOf(String.valueOf(row.get(1))));
  • }
  • }).join(studentscoresDataSet.javaRDD().mapToPair(new PairFunction<Row, String, Integer>() {
  • private static final long serialVersionUID = 2291225490149462322L;
  • @Override
  • public Tuple2<String, Integer> call(Row row) throws Exception {
  • return new Tuple2<String, Integer>(String.valueOf(row.get(0)),
  • Integer.valueOf(String.valueOf(row.get(1))));
  • }
  • }));
  • // 将JavaPairRDD转换为JavaRDD<Row>
  • JavaRDD<Row> studentRowsRDD = studentsRDD.map(new Function<Tuple2<String, Tuple2<Integer, Integer>>, Row>() {
  • private static final long serialVersionUID = 3844922024822848771L;
  • @Override
  • public Row call(Tuple2<String, Tuple2<Integer, Integer>> tuple) throws Exception {
  • return RowFactory.create(tuple._1, tuple._2._1, tuple._2._2);
  • }
  • });
  • // 过滤出分数大于80分的数据
  • JavaRDD<Row> filteredStudentRowsRDD = studentRowsRDD.filter(new Function<Row, Boolean>() {
  • @Override
  • public Boolean call(Row row) throws Exception {
  • return row.getInt(2) > 80;
  • }
  • });
  • // 转换为DataFrame
  • ArrayList<StructField> structFields = new ArrayList<StructField>();
  • structFields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
  • structFields.add(DataTypes.createStructField("age", DataTypes.IntegerType, true));
  • structFields.add(DataTypes.createStructField("score", DataTypes.IntegerType, true));
  • StructType structType = DataTypes.createStructType(structFields);
  • Dataset<Row> studentsDF = sqlContext.createDataFrame(filteredStudentRowsRDD, structType);
  • Row[] rows = (Row[]) studentsDF.collect();
  • for (Row row : rows) {
  • System.out.println(row);
  • }
  • // 将DataFrame中的数据保存到mysql表中
  • // 这种方式是在企业里很常用的,有可能是插入mysql、有可能是插入hbase,还有可能是插入redis缓存
  • studentsDF.javaRDD().foreach(new VoidFunction<Row>() {
  • private static final long serialVersionUID = 1L;
  • @Override
  • public void call(Row row) throws Exception {
  • String sql = "insert into good_student_infos values("
  • + "'" + String.valueOf(row.getString(0)) + "',"
  • + Integer.valueOf(String.valueOf(row.get(1))) + ","
  • + Integer.valueOf(String.valueOf(row.get(2))) + ")";
  • Class.forName("com.mysql.jdbc.Driver");
  • Connection conn = null;
  • Statement stmt = null;
  • try {
  • conn = DriverManager.getConnection(
  • "jdbc:mysql://127.0.0.1:3306/spark_hive_test", "root", "12345678");
  • stmt = conn.createStatement();
  • stmt.executeUpdate(sql);
  • } catch (Exception e) {
  • e.printStackTrace();
  • } finally {
  • if (stmt != null) {
  • stmt.close();
  • }
  • if (conn != null) {
  • conn.close();
  • }
  • }
  • }
  • });
  • sc.close();
  • }
  • }

对于之前的数据,运行以上代码有如下结果:

  • [marry,17,95]
  • [jack,19,82]

同时查看数据库也有数据如下:

  • mysql> select * from good_student_infos;
  • +-------+------+-------+
  • | name | age | score |
  • +-------+------+-------+
  • | marry | 17 | 95 |
  • | jack | 19 | 82 |
  • +-------+------+-------+
  • 2 rows in set (0.00 sec)

4. 内置函数

在Spark 1.5.x版本,增加了一系列内置函数到DataFrame API中,并且实现了code-generation的优化。与普通的函数不同,DataFrame的函数并不会执行后立即返回一个结果值,而是返回一个Column对象,用于在并行作业中进行求值。Column可以用在DataFrame的操作之中,比如select,filter,groupBy等。函数的输入值,也可以是Column。

种类 函数
聚合函数 approxCountDistinct, avg, count, countDistinct, first, last, max, mean, min, sum, sumDistinct
集合函数 array_contains, explode, size, sort_array
日期/时间函数 日期时间转换 unix_timestamp, from_unixtime, to_date, quarter, day, dayofyear, weekofyear, from_utc_timestamp, to_utc_timestamp 从日期时间中提取字段 year, month, dayofmonth, hour, minute, second
日期/时间函数 日期/时间计算 datediff, date_add, date_sub, add_months, last_day, next_day, months_between 获取当前时间等 current_date, current_timestamp, trunc, date_format
数学函数 abs, acros, asin, atan, atan2, bin, cbrt, ceil, conv, cos, sosh, exp, expm1, factorial, floor, hex, hypot, log, log10, log1p, log2, pmod, pow, rint, round, shiftLeft, shiftRight, shiftRightUnsigned, signum, sin, sinh, sqrt, tan, tanh, toDegrees, toRadians, unhex
混合函数 array, bitwiseNOT, callUDF, coalesce, crc32, greatest, if, inputFileName, isNaN, isnotnull, isnull, least, lit, md5, monotonicallyIncreasingId, nanvl, negate, not, rand, randn, sha, sha1, sparkPartitionId, struct, when
字符串函数 ascii, base64, concat, concat_ws, decode, encode, format_number, format_string, get_json_object, initcap, instr, length, levenshtein, locate, lower, lpad, ltrim, printf, regexp_extract, regexp_replace, repeat, reverse, rpad, rtrim, soundex, space, split, substring, substring_index, translate, trim, unbase64, upper
窗口函数 cumeDist, denseRank, lag, lead, ntile, percentRank, rank, rowNumber

案例实战:根据每天的用户访问日志和用户购买日志,统计每日的UV和销售额

  1. 统计访问UV
  • package com.coderap.sql
  • import org.apache.spark.{SparkConf, SparkContext}
  • import org.apache.spark.sql.{Row, SQLContext}
  • import org.apache.spark.sql.functions._
  • import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
  • object DailyUV {
  • def main(args: Array[String]): Unit = {
  • val conf = new SparkConf()
  • .setMaster("local")
  • .setAppName("DailyUV")
  • val sc = new SparkContext(conf)
  • val sqlContext = new SQLContext(sc)
  • // 这里着重说明一下!!!
  • // 要使用Spark SQL的内置函数,就必须在这里导入SQLContext下的隐式转换
  • import sqlContext.implicits._
  • // 构造用户访问日志数据,并创建DataFrame
  • // 模拟用户访问日志,日志用逗号隔开,第一列是日期,第二列是用户id
  • val userAccessLog = Array(
  • "2015-10-01,1122",
  • "2015-10-01,1122",
  • "2015-10-01,1123",
  • "2015-10-01,1124",
  • "2015-10-01,1124",
  • "2015-10-02,1122",
  • "2015-10-02,1121",
  • "2015-10-02,1123",
  • "2015-10-02,1123");
  • val userAccessLogRDD = sc.parallelize(userAccessLog, 5)
  • // 将模拟出来的用户访问日志RDD,转换为DataFrame
  • // 首先,将普通的RDD,转换为元素为Row的RDD
  • val userAccessLogRowRDD = userAccessLogRDD
  • .map { log => Row(log.split(",")(0), log.split(",")(1).toInt) }
  • // 构造DataFrame的元数据
  • val structType = StructType(Array(
  • StructField("date", StringType, true),
  • StructField("userid", IntegerType, true)))
  • // 使用SQLContext创建DataFrame
  • val userAccessLogRowDF = sqlContext.createDataFrame(userAccessLogRowRDD, structType)
  • // 这里讲解一下uv的基本含义和业务
  • // 每天都有很多用户来访问,但是每个用户可能每天都会访问很多次
  • // 所以,uv,指的是,对用户进行去重以后的访问总数
  • // 这里,正式开始使用Spark 1.5.x版本提供的最新特性,内置函数,countDistinct
  • // 讲解一下聚合函数的用法
  • // 首先,对DataFrame调用groupBy()方法,对某一列进行分组
  • // 然后,调用agg()方法 ,第一个参数,必须,必须,传入之前在groupBy()方法中出现的字段
  • // 第二个参数,传入countDistinct、sum、first等,Spark提供的内置函数
  • // 内置函数中,传入的参数,也是用单引号作为前缀的,其他的字段
  • userAccessLogRowDF.groupBy("date")
  • .agg('date, countDistinct('userid))
  • .map { row => Row(row(1), row(2)) }
  • .collect()
  • .foreach(println)
  • }
  • }
  1. 统计销售额
  • package com.coderap.sql
  • import org.apache.spark.{SparkConf, SparkContext}
  • import org.apache.spark.sql.{Row, SQLContext}
  • import org.apache.spark.sql.functions._
  • import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType}
  • object DailySale {
  • def main(args: Array[String]): Unit = {
  • val conf = new SparkConf()
  • .setMaster("local")
  • .setAppName("DailySale")
  • val sc = new SparkContext(conf)
  • val sqlContext = new SQLContext(sc)
  • import sqlContext.implicits._
  • // 说明一下,业务的特点
  • // 实际上呢,我们可以做一个,单独统计网站登录用户的销售额的统计
  • // 有些时候,会出现日志的上报的错误和异常,比如日志里丢了用户的信息,那么这种,我们就一律不统计了
  • // 模拟数据
  • val userSaleLog = Array("2015-10-01,55.05,1122",
  • "2015-10-01,23.15,1133",
  • "2015-10-01,15.20,",
  • "2015-10-02,56.05,1144",
  • "2015-10-02,78.87,1155",
  • "2015-10-02,113.02,1123")
  • val userSaleLogRDD = sc.parallelize(userSaleLog, 5)
  • // 进行有效销售日志的过滤
  • val filteredUserSaleLogRDD = userSaleLogRDD
  • .filter { log => if (log.split(",").length == 3) true else false }
  • val userSaleLogRowRDD = filteredUserSaleLogRDD
  • .map { log => Row(log.split(",")(0), log.split(",")(1).toDouble) }
  • val structType = StructType(Array(
  • StructField("date", StringType, true),
  • StructField("sale_amount", DoubleType, true)))
  • val userSaleLogDF = sqlContext.createDataFrame(userSaleLogRowRDD, structType)
  • // 开始进行每日销售额的统计
  • userSaleLogDF.groupBy("date")
  • .agg('date, sum('sale_amount))
  • .map { row => Row(row(1), row(2)) }
  • .collect()
  • .foreach(println)
  • }
  • }

Spark 1.4.x版本以后,为SparkSQL和DataFrame引入了开窗函数,比如最经典,最常用的,row_number(),可以让我们实现分组取topn的逻辑。

案例:统计每个种类的销售额排名前3的产品

  • package com.coderap.sql;
  • import org.apache.spark.SparkConf;
  • import org.apache.spark.api.java.JavaSparkContext;
  • import org.apache.spark.sql.Dataset;
  • import org.apache.spark.sql.hive.HiveContext;
  • public class RowNumberWindowFunction {
  • @SuppressWarnings("deprecation")
  • public static void main(String[] args) {
  • SparkConf conf = new SparkConf()
  • .setAppName("RowNumberWindowFunction");
  • JavaSparkContext sc = new JavaSparkContext(conf);
  • HiveContext hiveContext = new HiveContext(sc.sc());
  • // 创建销售额表,sales表
  • hiveContext.sql("DROP TABLE IF EXISTS sales");
  • hiveContext.sql("CREATE TABLE IF NOT EXISTS sales ("
  • + "product STRING,"
  • + "category STRING,"
  • + "revenue BIGINT)");
  • hiveContext.sql("LOAD DATA "
  • + "LOCAL INPATH '/home/ubuntu/spark-study/java/sql/RowNumberWindowFunction/sales.txt' "
  • + "INTO TABLE sales");
  • /**
  • * 开始编写我们的统计逻辑,使用row_number()开窗函数
  • * 先说明一下,row_number()开窗函数的作用
  • * 其实,就是给每个分组的数据,按照其排序顺序,打上一个分组内的行号
  • * 比如说,有一个分组date=20151001,里面有3条数据,1122,1121,1124,
  • * 那么对这个分组的每一行使用row_number()开窗函数以后,三行,依次会获得一个组内的行号
  • * 行号从1开始递增,比如1122 1,1121 2,1124 3
  • * */
  • Dataset top3SalesDF = hiveContext.sql(""
  • + "SELECT product,category,revenue "
  • + "FROM ("
  • + "SELECT "
  • + "product,"
  • + "category,"
  • + "revenue,"
  • /**
  • * row_number()开窗函数的语法说明
  • * 首先可以在SELECT查询时,使用row_number()函数
  • * 其次,row_number()函数后面先跟上OVER关键字
  • * 然后括号中,是PARTITION BY,也就是说根据哪个字段进行分组
  • * 其次是可以用ORDER BY进行组内排序
  • * 然后row_number()就可以给每个组内的行,一个组内行号
  • * 这里会先根据category进行分组,然后根据revenue进行组内排序
  • * row_number()会给每个组内的数据添加一个行号,标记为rank列
  • * 最后根据rank <= 3可以筛选出每个组内revenue排在前三的数据
  • * */
  • + "row_number() OVER (PARTITION BY category ORDER BY revenue DESC) rank "
  • + "FROM sales "
  • + ") tmp_sales "
  • + "WHERE rank<=3");
  • // 将每组排名前3的数据,保存到一个表中
  • hiveContext.sql("DROP TABLE IF EXISTS top3_sales");
  • top3SalesDF.write().saveAsTable("top3_sales");
  • sc.close();
  • }
  • }

5. UDF自定义函数实战

  • package com.coderap.sql
  • import org.apache.spark.SparkConf
  • import org.apache.spark.SparkContext
  • import org.apache.spark.sql.SQLContext
  • import org.apache.spark.sql.Row
  • import org.apache.spark.sql.types.StructType
  • import org.apache.spark.sql.types.StructField
  • import org.apache.spark.sql.types.StringType
  • object UDF {
  • def main(args: Array[String]): Unit = {
  • val conf = new SparkConf().setMaster("local").setAppName("UDF")
  • val sc = new SparkContext(conf)
  • val sqlContext = new SQLContext(sc)
  • // 构造模拟数据
  • val names = Array("Leo", "Marry", "Jack", "Tom")
  • val namesRDD = sc.parallelize(names, 5)
  • val namesRowRDD = namesRDD.map { name => Row(name) }
  • val structType = StructType(Array(StructField("name", StringType, true)))
  • val namesDF = sqlContext.createDataFrame(namesRowRDD, structType)
  • // 注册一张names表
  • namesDF.registerTempTable("names")
  • /**
  • * 定义和注册自定义函数
  • * 定义函数:自己写匿名函数
  • * 注册函数:SQLContext.udf.register()
  • * */
  • sqlContext.udf.register("strLen", (str: String) => str.length())
  • // 使用自定义函数
  • sqlContext.sql("select name,strLen(name) from names")
  • .collect()
  • .foreach(println)
  • }
  • }

上述代码运行结果如下:

  • [Leo,3]
  • [Marry,5]
  • [Jack,4]
  • [Tom,3]

6. UDAF自定义函数实战

UDAF:User Defined AggregateFunction。用户自定义聚合函数。是Spark 1.5.x引入的最新特性。

上节课讲解了UDF,其实更多的是针对单行输入,返回一个输出

这里的UDAF,则可以针对多行输入,进行聚合计算,返回一个输出,功能更加强大

  • StringCount用户自定义聚合函数类
  • package com.coderap.sql
  • import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
  • import org.apache.spark.sql.types.StructType
  • import org.apache.spark.sql.types.DataType
  • import org.apache.spark.sql.expressions.MutableAggregationBuffer
  • import org.apache.spark.sql.Row
  • import org.apache.spark.sql.types.StructField
  • import org.apache.spark.sql.types.StringType
  • import org.apache.spark.sql.types.IntegerType
  • /**
  • * @author Administrator
  • */
  • class StringCount extends UserDefinedAggregateFunction {
  • // inputSchema,指的是,输入数据的类型
  • def inputSchema: StructType = {
  • StructType(Array(StructField("str", StringType, true)))
  • }
  • // bufferSchema,指的是,中间进行聚合时,所处理的数据的类型
  • def bufferSchema: StructType = {
  • StructType(Array(StructField("count", IntegerType, true)))
  • }
  • // dataType,指的是,函数返回值的类型
  • def dataType: DataType = {
  • IntegerType
  • }
  • def deterministic: Boolean = {
  • true
  • }
  • // 为每个分组的数据执行初始化操作
  • def initialize(buffer: MutableAggregationBuffer): Unit = {
  • buffer(0) = 0
  • }
  • // 指的是,每个分组,有新的值进来的时候,如何进行分组对应的聚合值的计算
  • def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
  • buffer(0) = buffer.getAs[Int](0) + 1
  • }
  • // 由于Spark是分布式的,所以一个分组的数据,可能会在不同的节点上进行局部聚合,就是update
  • // 但是,最后一个分组,在各个节点上的聚合值,要进行merge,也就是合并
  • def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
  • buffer1(0) = buffer1.getAs[Int](0) + buffer2.getAs[Int](0)
  • }
  • // 最后,指的是,一个分组的聚合值,如何通过中间的缓存聚合值,最后返回一个最终的聚合值
  • def evaluate(buffer: Row): Any = {
  • buffer.getAs[Int](0)
  • }
  • }
  • UDAF类
  • package com.coderap.sql
  • import org.apache.spark.SparkConf
  • import org.apache.spark.SparkContext
  • import org.apache.spark.sql.SQLContext
  • import org.apache.spark.sql.Row
  • import org.apache.spark.sql.types.StructType
  • import org.apache.spark.sql.types.StructField
  • import org.apache.spark.sql.types.StringType
  • /**
  • * @author Administrator
  • */
  • object UDAF {
  • def main(args: Array[String]): Unit = {
  • val conf = new SparkConf()
  • .setMaster("local")
  • .setAppName("UDAF")
  • val sc = new SparkContext(conf)
  • val sqlContext = new SQLContext(sc)
  • // 构造模拟数据
  • val names = Array("Leo", "Marry", "Jack", "Tom", "Tom", "Tom", "Leo")
  • val namesRDD = sc.parallelize(names, 5)
  • val namesRowRDD = namesRDD.map { name => Row(name) }
  • val structType = StructType(Array(StructField("name", StringType, true)))
  • val namesDF = sqlContext.createDataFrame(namesRowRDD, structType)
  • // 注册一张names表
  • namesDF.registerTempTable("names")
  • // 定义和注册自定义函数
  • // 定义函数:自己写匿名函数
  • // 注册函数:SQLContext.udf.register()
  • sqlContext.udf.register("strCount", new StringCount)
  • // 使用自定义函数
  • sqlContext.sql("select name,strCount(name) from names group by name")
  • .collect()
  • .foreach(println)
  • }
  • }

运行结果:

  • [Jack,1]
  • [Tom,3]
  • [Marry,1]
  • [Leo,2]

7. Spark SQL工作原理和性能优化

7.1. 工作原理

  1. SqlParse
  2. Analyser
  3. Optimizer
  4. SparkPlan

![1.SparkSQL原理.png](https://material.coderap.com/2018/2/G91Y3dl6hbimyDMtluHOjnsIN8WEmewb9dFL6DlcI8X4Uj7Nv9C6P4EUiSntRChV "1.SparkSQL原理.png")

7.2. 性能优化

  1. 设置Shuffle过程中的并行度:spark.sql.shuffle.partitions(SQLContext.setConf())
  2. 在Hive数据仓库建设过程中,合理设置数据类型,比如能设置为INT的,就不要设置为BIGINT。减少数据类型导致的不必要的内存开销。
  3. 编写SQL时,尽量给出明确的列名,比如select name from students。不要写select *的方式。
  4. 并行处理查询结果:对于SparkSQL查询的结果,如果数据量比较大,比如超过1000条,那么就不要一次性collect()到Driver再处理。使用foreach()算子,并行处理查询结果。
  5. 缓存表:对于一条SQL语句中可能多次使用到的表,可以对其进行缓存,使用SQLContext.cacheTable(tableName),或者DataFrame.cache()即可。SparkSQL会用内存列存储的格式进行表的缓存。然后SparkSQL就可以仅仅扫描需要使用的列,并且自动优化压缩,来最小化内存使用和GC开销。SQLContext.uncacheTable(tableName)可以将表从缓存中移除。用SQLContext.setConf(),设置spark.sql.inMemoryColumnarStorage.batchSize参数(默认10000),可以配置列存储的单位。
  6. 广播Join表:spark.sql.autoBroadcastJoinThreshold,默认10485760(10 MB)。在内存够用的情况下,可以增加其大小,该参数设置了一个表在join的时候,最大在多大以内,可以被广播出去优化性能。
  7. 钨丝计划:spark.sql.tungsten.enabled,默认是true,自动管理内存。

其中第四、五、六点对于性能优化比较有效。

8. Hive On Spark

Hive是目前大数据领域,事实上的SQL标准。其底层默认是基于MapReduce实现的,但是由于MapReduce速度实在比较慢,因此这两年,陆续出来了新的SQL查询引擎。包括SparkSQL,Hive On Tez,Hive On Spark等。

Spark SQL与Hive On Spark是不一样的。SparkSQL是Spark自己研发出来的针对各种数据源,包括Hive、JSON、Parquet、JDBC、RDD等都可以执行查询的,一套基于Spark计算引擎的查询引擎。因此它是Spark的一个项目,只不过提供了针对Hive执行查询的工功能而已。适合在一些使用Spark技术栈的大数据应用类系统中使用。

而Hive OnSpark,是Hive的一个项目,它是指,不通过MapReduce作为唯一的查询引擎,而是将Spark作为底层的查询引擎。Hive OnSpark,只适用于Hive。在可预见的未来,很有可能Hive默认的底层引擎就从MapReduce切换为Spark了。适合于将原有的Hive数据仓库以及数据统计分析替换为Spark引擎,作为全公司通用的大数据统计分析引擎。

8.1. Hive工作原理

Hive QL语句 =>

语法分析 => AST =>

生成逻辑执行计划 => Operator Tree =>

优化逻辑执行计划 => Optimized OperatorTree =>

生成物理执行计划 => Task Tree =>

优化物理执行计划 => Optimized Task Tree=>

执行优化后的Optimized Task Tree

8.2. Hive On Spark的计算原理要点

  1. 将Hive表作为Spark RDD来进行操作:这个是没有疑问的
  2. 使用Hive原语

对于一些针对RDD的操作,比如groupByKey、sortByKey等。不使用Spark的transformation操作和原语。如果那样做的话,那么就需要重新实现一套Hive的原语,而且如果Hive增加了新功能,那么又要实现新的Spark原语。因此选择将Hive的原语包装为针对RDD的操作即可。

  1. 新的物理执行计划生成机制

使用SparkCompiler将逻辑执行计划,即OperatorTree,转换为Task Tree。提交SparkTask给Spark进行执行。SparkTask包装了DAG,DAG包装为SparkWork。SparkTask根据SparkWork表示的DAG计算。

  1. SparkContext生命周期

Hive OnSpark会为每个用户的会话,比如执行一次SQL语句,创建一个SparkContext。但是Spark不允许在一个JVM内创建多个SparkContext。因此,需要在单独的JVM中启动每个会话的SparkContext,然后通过RPC与远程JVM中的SparkContext进行通信。

  1. 本地和远程运行模式

Hive OnSpark提供两种运行模式,本地和远程。如果将Spark Master设置为local,比如set spark.master=local,那么就是本地模式,SparkContext与客户端运行在一个JVM中。否则,如果将SparkMaster设置为Master的地址,那么就是远程模式,SparkContext会在远程的JVM中启动。

远程模式下,每个用户Session都会创建一个SparkClient,SparkClient启动RemoteDriver,RemoteDriver负责创建SparkContext。

Hive On Spark做了一些优化:

  1. Map Join

SparkSQL默认对join是支持使用broadcast机制将小表广播到各个节点上,以进行join的。但是问题是,这会给Driver和Worker带来很大的内存开销。因为广播的数据要一直保留在Driver内存中。所以目前采取的是,类似乎MapReduce的DistributedCache机制,即提高HDFS replica factor的复制因子,以让数据在每个计算节点上都有一个备份,从而可以在本地进行数据读取。

  1. Cache Table

对于某些需要对一张表执行多次操作的场景,Hive OnSpark内部做了优化,即将要多次操作的表cache到内存中,以便于提升性能。但是这里要注意,并不是对所有的情况都会自动进行cache。所以说,Hive OnSpark还有很多不完善的地方。

9. 案例:每日Top3热点搜索词统计

对于数据格式为日期 用户 搜索词 城市 平台 版本的数据,我们有以下的需求:

  1. 筛选出符合查询条件(城市、平台、版本)的数据;
  2. 统计出每天搜索uv排名前3的搜索词;
  3. 按照每天的top3搜索词的uv搜索总次数,倒序排序;
  4. 将数据保存到hive表中;

实现思路如下:

  1. 针对原始数据(HDFS文件),获取输入的RDD;

  2. 使用filter算子,去针对输入RDD中的数据,进行数据过滤,过滤出符合查询条件的数据;

  • 普通的做法:直接在fitler算子函数中,使用外部的查询条件(Map),但是,这样做的话,是不是查询条件Map,会发送到每一个task上一份副本。(性能并不好)

  • 优化后的做法:将查询条件,封装为Broadcast广播变量,在filter算子中使用Broadcast广播变量进行数据筛选。

  1. 将数据转换为(日期_搜索词,用户)格式,然后呢,对它进行分组,然后再次进行映射,对每天每个搜索词的搜索用户进行去重操作,并统计去重后的数量,即为每天每个搜索词的uv,最后获得(日期_搜索词,uv)
  2. 将得到的每天每个搜索词的uv,RDD,映射为元素类型为Row的RDD,将该RDD转换为DataFrame;
  3. 将DataFrame注册为临时表,使用SparkSQL的开窗函数,来统计每天的uv数量排名前3的搜索词,以及它的搜索uv,最后获取的是一个DataFrame;
  4. 将DataFrame转换为RDD,继续操作,按照每天日期来进行分组,并进行映射,计算出每天的top3搜索词的搜索uv的总数,然后将uv总数作为key,将每天的top3搜索词以及搜索次数,拼接为一个字符串;
  5. 按照每天的top3搜索总uv,进行排序,倒序排序;
  6. 将排好序的数据,再次映射回来,变成“日期_搜索词_uv”的格式;
  7. 再次映射为DataFrame,并将数据保存到Hive中即可。

实现代码如下:

  1. Java代码
  • package com.coderap.sql;
  • import org.apache.spark.SparkConf;
  • import org.apache.spark.api.java.JavaPairRDD;
  • import org.apache.spark.api.java.JavaRDD;
  • import org.apache.spark.api.java.JavaSparkContext;
  • import org.apache.spark.api.java.function.FlatMapFunction;
  • import org.apache.spark.api.java.function.Function;
  • import org.apache.spark.api.java.function.PairFunction;
  • import org.apache.spark.broadcast.Broadcast;
  • import org.apache.spark.sql.Dataset;
  • import org.apache.spark.sql.Row;
  • import org.apache.spark.sql.RowFactory;
  • import org.apache.spark.sql.hive.HiveContext;
  • import org.apache.spark.sql.types.DataTypes;
  • import org.apache.spark.sql.types.StructField;
  • import org.apache.spark.sql.types.StructType;
  • import scala.Tuple2;
  • import java.util.*;
  • public class DailyTop3Keyword {
  • @SuppressWarnings("deprecation")
  • public static void main(String[] args) {
  • SparkConf conf = new SparkConf()
  • .setAppName("DailyTop3Keyword");
  • JavaSparkContext sc = new JavaSparkContext(conf);
  • HiveContext sqlContext = new HiveContext(sc.sc());
  • // 伪造出一份数据,查询条件
  • // 备注:实际上,在实际的企业项目开发中,很可能,这个查询条件,是通过J2EE平台插入到某个MySQL表中的
  • // 然后,这里呢,实际上,通常是会用Spring框架和ORM框架(MyBatis)的,去提取MySQL表中的查询条件
  • Map<String, List<String>> queryParamMap = new HashMap<String, List<String>>();
  • queryParamMap.put("city", Arrays.asList("beijing"));
  • queryParamMap.put("platform", Arrays.asList("android"));
  • queryParamMap.put("version", Arrays.asList("1.0", "1.2", "1.5", "2.0"));
  • // 根据我们实现思路中的分析,这里最合适的方式,是将该查询参数Map封装为一个Broadcast广播变量
  • // 这样可以进行优化,每个Worker节点,就拷贝一份数据即可
  • final Broadcast<Map<String, List<String>>> queryParamMapBroadcast =
  • sc.broadcast(queryParamMap);
  • // 针对HDFS文件中的日志,获取输入RDD
  • JavaRDD<String> rawRDD = sc.textFile("hdfs://s100:8020/user/ubuntu/spark/keyword.txt");
  • // 使用查询参数Map广播变量,进行筛选
  • JavaRDD<String> filterRDD = rawRDD.filter(new Function<String, Boolean>() {
  • private static final long serialVersionUID = 1L;
  • @Override
  • public Boolean call(String log) throws Exception {
  • // 切割原始日志,获取城市、平台和版本
  • String[] logSplited = log.split("\t");
  • String city = logSplited[3];
  • String platform = logSplited[4];
  • String version = logSplited[5];
  • // 与查询条件进行比对,任何一个条件,只要该条件设置了,且日志中的数据没有满足条件
  • // 则直接返回false,过滤该日志
  • // 否则,如果所有设置的条件,都有日志中的数据,则返回true,保留日志
  • Map<String, List<String>> queryParamMap = queryParamMapBroadcast.value();
  • List<String> cities = queryParamMap.get("city");
  • if (cities.size() > 0 && !cities.contains(city)) {
  • return false;
  • }
  • List<String> platforms = queryParamMap.get("platform");
  • if (platforms.size() > 0 && !platforms.contains(platform)) {
  • return false;
  • }
  • List<String> versions = queryParamMap.get("version");
  • if (versions.size() > 0 && !versions.contains(version)) {
  • return false;
  • }
  • return true;
  • }
  • });
  • // 过滤出来的原始日志,映射为(日期_搜索词, 用户)的格式
  • JavaPairRDD<String, String> dateKeywordUserRDD = filterRDD.mapToPair(
  • new PairFunction<String, String, String>() {
  • private static final long serialVersionUID = 1L;
  • @Override
  • public Tuple2<String, String> call(String log) throws Exception {
  • String[] logSplited = log.split("\t");
  • String date = logSplited[0];
  • String user = logSplited[1];
  • String keyword = logSplited[2];
  • return new Tuple2<String, String>(date + "_" + keyword, user);
  • }
  • });
  • // 进行分组,获取每天每个搜索词,有哪些用户搜索了(没有去重)
  • JavaPairRDD<String, Iterable<String>> dateKeywordUsersRDD = dateKeywordUserRDD.groupByKey();
  • // 对每天每个搜索词的搜索用户,执行去重操作,获得其uv
  • JavaPairRDD<String, Long> dateKeywordUvRDD = dateKeywordUsersRDD.mapToPair(
  • new PairFunction<Tuple2<String, Iterable<String>>, String, Long>() {
  • private static final long serialVersionUID = 1L;
  • @Override
  • public Tuple2<String, Long> call(
  • Tuple2<String, Iterable<String>> dateKeywordUsers) throws Exception {
  • String dateKeyword = dateKeywordUsers._1;
  • Iterator<String> users = dateKeywordUsers._2.iterator();
  • // 对用户进行去重,并统计去重后的数量
  • List<String> distinctUsers = new ArrayList<String>();
  • while (users.hasNext()) {
  • String user = users.next();
  • if (!distinctUsers.contains(user)) {
  • distinctUsers.add(user);
  • }
  • }
  • // 获取uv
  • long uv = distinctUsers.size();
  • return new Tuple2<String, Long>(dateKeyword, uv);
  • }
  • });
  • // 将每天每个搜索词的uv数据,转换成DataFrame
  • JavaRDD<Row> dateKeywordUvRowRDD = dateKeywordUvRDD.map(
  • new Function<Tuple2<String, Long>, Row>() {
  • private static final long serialVersionUID = 1L;
  • @Override
  • public Row call(Tuple2<String, Long> dateKeywordUv) throws Exception {
  • String date = dateKeywordUv._1.split("_")[0];
  • String keyword = dateKeywordUv._1.split("_")[1];
  • long uv = dateKeywordUv._2;
  • return RowFactory.create(date, keyword, uv);
  • }
  • });
  • List<StructField> structFields = Arrays.asList(
  • DataTypes.createStructField("date", DataTypes.StringType, true),
  • DataTypes.createStructField("keyword", DataTypes.StringType, true),
  • DataTypes.createStructField("uv", DataTypes.LongType, true));
  • StructType structType = DataTypes.createStructType(structFields);
  • Dataset dateKeywordUvDF = sqlContext.createDataFrame(dateKeywordUvRowRDD, structType);
  • // 使用Spark SQL的开窗函数,统计每天搜索uv排名前3的热点搜索词
  • dateKeywordUvDF.registerTempTable("daily_keyword_uv");
  • Dataset dailyTop3KeywordDF = sqlContext.sql(""
  • + "SELECT date,keyword,uv "
  • + "FROM ("
  • + "SELECT "
  • + "date,"
  • + "keyword,"
  • + "uv,"
  • + "row_number() OVER (PARTITION BY date ORDER BY uv DESC) rank "
  • + "FROM daily_keyword_uv"
  • + ") tmp "
  • + "WHERE rank<=3");
  • // 将DataFrame转换为RDD,然后映射,计算出每天的top3搜索词的搜索uv总数
  • JavaRDD<Row> dailyTop3KeywordRDD = dailyTop3KeywordDF.javaRDD();
  • JavaPairRDD<String, String> top3DateKeywordUvRDD = dailyTop3KeywordRDD.mapToPair(
  • new PairFunction<Row, String, String>() {
  • private static final long serialVersionUID = 1L;
  • @Override
  • public Tuple2<String, String> call(Row row)
  • throws Exception {
  • String date = String.valueOf(row.get(0));
  • String keyword = String.valueOf(row.get(1));
  • Long uv = Long.valueOf(String.valueOf(row.get(2)));
  • return new Tuple2<String, String>(date, keyword + "_" + uv);
  • }
  • });
  • JavaPairRDD<String, Iterable<String>> top3DateKeywordsRDD = top3DateKeywordUvRDD.groupByKey();
  • JavaPairRDD<Long, String> uvDateKeywordsRDD = top3DateKeywordsRDD.mapToPair(
  • new PairFunction<Tuple2<String, Iterable<String>>, Long, String>() {
  • private static final long serialVersionUID = 1L;
  • @Override
  • public Tuple2<Long, String> call(
  • Tuple2<String, Iterable<String>> tuple)
  • throws Exception {
  • String date = tuple._1;
  • Long totalUv = 0L;
  • String dateKeywords = date;
  • Iterator<String> keywordUvIterator = tuple._2.iterator();
  • while (keywordUvIterator.hasNext()) {
  • String keywordUv = keywordUvIterator.next();
  • Long uv = Long.valueOf(keywordUv.split("_")[1]);
  • totalUv += uv;
  • dateKeywords += "," + keywordUv;
  • }
  • return new Tuple2<Long, String>(totalUv, dateKeywords);
  • }
  • });
  • // 按照每天的总搜索uv进行倒序排序
  • JavaPairRDD<Long, String> sortedUvDateKeywordsRDD = uvDateKeywordsRDD.sortByKey(false);
  • // 再次进行映射,将排序后的数据,映射回原始的格式,Iterable<Row>
  • JavaRDD<Row> sortedRowRDD = sortedUvDateKeywordsRDD.flatMap(
  • new FlatMapFunction<Tuple2<Long, String>, Row>() {
  • private static final long serialVersionUID = 1L;
  • @Override
  • public Iterator<Row> call(Tuple2<Long, String> tuple) throws Exception {
  • String dateKeywords = tuple._2;
  • String[] dateKeywordsSplited = dateKeywords.split(",");
  • String date = dateKeywordsSplited[0];
  • List<Row> rows = new ArrayList<Row>();
  • rows.add(RowFactory.create(date,
  • dateKeywordsSplited[1].split("_")[0],
  • Long.valueOf(dateKeywordsSplited[1].split("_")[1])));
  • rows.add(RowFactory.create(date,
  • dateKeywordsSplited[2].split("_")[0],
  • Long.valueOf(dateKeywordsSplited[2].split("_")[1])));
  • rows.add(RowFactory.create(date,
  • dateKeywordsSplited[3].split("_")[0],
  • Long.valueOf(dateKeywordsSplited[3].split("_")[1])));
  • return rows.iterator();
  • }
  • });
  • // 将最终的数据,转换为DataFrame,并保存到Hive表中
  • Dataset finalDF = sqlContext.createDataFrame(sortedRowRDD, structType);
  • finalDF.write().saveAsTable("daily_top3_keyword_uv");
  • sc.close();
  • }
  • }

对于给定数据: