大数据
流式处理
Spark

Spark基础 06 - Spark SQL

简介:Spark 1.0版本开始,推出了Spark SQL。其实最早使用的,都是Hadoop自己的Hive查询引擎;但是后来Spark提供了Shark;再后来Shark被淘汰,推出了Spark SQL。Shark的性能比Hive就要高出一个数量级,而Spark SQL的性能又比Shark高出一个数量级。

1. Spark新特性

1.1. Spark 1.4.x的新特性

1、Spark Core

1.1 提供REST API供外界开发者获取Spark内部的各种信息(jobs / stages / tasks / storageinfo),基于这些API,可以搭建自己的Spark监控系统。

1.2 shuffle阶段,默认将map端写入磁盘的数据进行序列化,优化io性能。

1.3 钨丝计划(ProjectTungsten),提供了UnsafeShuffleManager,使用缓存友好的排序算法,降低了shuffle的内存使用,提高了排序性能。

2、Spark Streaming

2.1 提供了新的Spark Streaming的UI,能够更好,更清晰的监控Spark Streaming应用程序的运行状况。

2.2 支持Kafka 0.8.2版本

3、Spark SQL and DataFrame

3.1 支持ORCFile

3.2 提供了一些window function(窗口函数)

3.3 优化了join的性能

1.2. Spark 1.5.x的新特性

1、DataFrame底层执行的性能优化(钨丝计划第一阶段)

1.1 Spark自己来管理内存,而不再依靠JVM管理内容。这样就可以避免JVM GC的性能开销,并且能够控制OOM的问题。

1.2 Java对象直接使用内部的二进制格式存储和计算,省去了序列化和反序列化的性能开销,而且更加节省内存开销。

1.3 完善了Shuffle阶段的UnsafeShuffleManager,增加了不少新功能,优化shuffle性能。

1.4默认使用code-gen,使用cache-aware算法,加强了join、aggregation、shuffle、sorting的性能,增强了windowfunction的性能,性能比1.4.x版本提高数倍

2、DataFrame

2.1 实现了新的聚合函数接口,AggregateFunction2,并且提供了7个新的内置聚合函数。

2.2 实现了100多个新的expression function,例如unix_timestamp等,增强了对NaN的处理

2.3 支持连接不同版本的hive metastore

2.4 支持Parquet 1.7

3、SparkStreaming:更完善的python支持、非实验的Kafka Direct API等等。

1.3. Spark 1.5.1源码编译

掌握了源码编译,就具备了对Spark进行二次开发的基本条件了!如果你要修改Spark源码,进行二次开发,那么首先就得从官网下载指定版本的源码,然后倒入你的ide开发环境,进行源码的修改;接着修改完了,你希望能够将修改后的源码部署到集群上面去,那么是不是得对源码进行编译,编译成可以在linux集群上进行部署的格式包吧!

1、http://d3kbcqa49mib13.cloudfront.net/spark-1.5.1.tgz
2、准备好JDK、Scala、Maven环境
3、针对指定hadoop版本进行编译:./make-distribution.sh –tgz -Phadoop-2.6 -Pyarn -DskipTests -Dhadoop.version=2.6.0 -Phive
4、经常长时间的编译之后,得到spark-1.4.0-bin-2.6.0.tgz

1.4. 安装spark包

1、停止Spark 1.3.0集群:SPARK_HOME/sbin/stop-all.sh
2、将spark-1.5.1-bin-hadoop2.4.tgz使用WinSCP上传到/usr/local目录下。
3、解压缩spark包:tar zxvf spark-1.5.1-bin-hadoop2.4.tgz。
4、修改spark环境变量

  • vi .bashrc
  • export SPARK_HOME=/usr/local/spark-1.5.1-bin-hadoop2.4
  • export PATH=$SPARK_HOME/bin
  • export CLASSPATH=.:$CLASSPATH:$JAVA_HOME/lib:$JAVA_HOME/jre/lib
  • source .bashrc

1.5. 修改spark-env.sh文件

1、cd /usr/local/spark/conf
2、cp spark-env.sh.template spark-env.sh
3、vi spark-env.sh

  • export JAVA_HOME=/usr/java/latest
  • export SCALA_HOME=/usr/local/scala
  • export SPARK_MASTER_IP=192.168.1.107
  • export SPARK_WORKER_MEMORY=1g
  • export HADOOP_CONF_DIR=/usr/local/hadoop/etc/hadoop

1.6. 修改slaves文件

  • spark2
  • spark3

1.7. 安装spark集群

在另外两个节点进行一模一样的配置,使用scp将spark和.bashrc拷贝到spark2和spark3即可。

1.8. 启动spark集群

1、在spark目录下的sbin目录
2、执行./start-all.sh
3、使用jsp和8080端口可以检查集群是否启动成功
4、进入spark-shell查看是否正常

2. Spark SQL

2.1. Spark SQL前世今生

Spark 1.0版本开始,推出了Spark SQL。其实最早使用的,都是Hadoop自己的Hive查询引擎;但是后来Spark提供了Shark;再后来Shark被淘汰,推出了Spark SQL。Shark的性能比Hive就要高出一个数量级,而Spark SQL的性能又比Shark高出一个数量级。

最早来说,Hive的诞生,主要是因为要让那些不熟悉Java,无法深入进行MapReduce编程的数据分析师,能够使用他们熟悉的关系型数据库的SQL模型,来操作HDFS上的数据。因此推出了Hive。Hive底层基于MapReduce实现SQL功能,能够让数据分析人员,以及数据开发人员,方便的使用Hive进行数据仓库的建模和建设,然后使用SQL模型针对数据仓库中的数据进行统计和分析。但是Hive有个致命的缺陷,就是它的底层基于MapReduce,而MapReduce的shuffle又是基于磁盘的,因此导致Hive的性能异常低下。经常出现复杂的SQL ETL,要运行数个小时,甚至数十个小时的情况。

后来,Spark推出了Shark,Shark与Hive实际上还是紧密关联的,Shark底层很多东西还是依赖于Hive,但是修改了内存管理、物理计划、执行三个模块,底层使用Spark的基于内存的计算模型,从而让性能比Hive提升了数倍到上百倍。

然而,Shark还是它的问题所在,Shark底层依赖了Hive的语法解析器、查询优化器等组件,因此对于其性能的提升还是造成了制约。所以后来Spark团队决定,完全抛弃Shark,推出了全新的Spark SQL项目。Spark SQL就不只是针对Hive中的数据了,而且可以支持其他很多数据源的查询。

2.2. Spark SQL的特点

  1. 支持多种数据源:Hive、RDD、Parquet、JSON、JDBC等。
  2. 多种性能优化技术:in-memory columnar storage、byte-code generation、cost model动态评估等。
  3. 组件扩展性:对于SQL的语法解析器、分析器以及优化器,用户都可以自己重新开发,并且动态扩展。

在2014年6月1日的时候,Spark宣布了不再开发Shark,全面转向Spark SQL的开发。

Spark SQL的性能比Shark来说,又有了数倍的提升。

2.3. Spark SQL的性能优化技术简介

  1. 内存列存储(in-memory columnar storage)

内存列存储意味着,Spark SQL的数据,不是使用Java对象的方式来进行存储,而是使用面向列的内存存储的方式来进行存储。也就是说,每一列,作为一个数据存储的单位。从而大大优化了内存使用的效率。采用了内存列存储之后,减少了对内存的消耗,也就避免了GC大量数据的性能开销。

  1. 字节码生成技术(byte-code generation)

Spark SQL在其catalyst模块的expressions中增加了codegen模块,对于SQL语句中的计算表达式,比如select num + num from t这种的sql,就可以使用动态字节码生成技术来优化其性能。

  1. Scala代码编写的优化

对于Scala代码编写中,可能会造成较大性能开销的地方,通过使用更加复杂的方式重写,来获取更好的性能。比如Option样例类、for循环、map/filter/foreach等高阶函数,以及不可变对象,都改成了用null、while循环等来实现,及重用可变的对象。

2.4. Spark SQL and DataFrame引言

Spark SQL是Spark中的一个模块,主要用于进行结构化数据的处理。它提供的最核心的编程抽象,就是DataFrame。同时Spark SQL还可以作为分布式的SQL查询引擎。Spark SQL最重要的功能之一,就是从Hive中查询数据。

DataFrame,可以理解为是,以列的形式组织的,分布式的数据集合。它其实和关系型数据库中的表非常类似,但是底层做了很多的优化。DataFrame可以通过很多来源进行构建,包括结构化的数据文件,Hive中的表,外部的关系型数据库,以及RDD。

2.4.1. SQLContext

要使用Spark SQL,首先就得创建一个创建一个SQLContext对象,或者是它的子类的对象,比如HiveContext的对象。

  1. Java版本:

  • JavaSparkContext sc = ...;
  • SQLContext sqlContext = new SQLContext(sc);

2. Scala版本:

  • val sc: SparkContext = ...
  • val sqlContext = new SQLContext(sc)
  • import sqlContext.implicits._

2.4.2. HiveContext

除了基本的SQLContext以外,还可以使用它的子类——HiveContext。HiveContext的功能除了包含SQLContext提供的所有功能之外,还包括了额外的专门针对Hive的一些功能。这些额外功能包括:使用HiveQL语法来编写和执行SQL,使用Hive中的UDF函数,从Hive表中读取数据。

要使用HiveContext,就必须预先安装好Hive,SQLContext支持的数据源,HiveContext也同样支持——而不只是支持Hive。对于Spark 1.3.x以上的版本,都推荐使用HiveContext,因为其功能更加丰富和完善。

Spark SQL还支持用spark.sql.dialect参数设置SQL的方言。使用SQLContext的setConf()即可进行设置。对于SQLContext,它只支持SQL一种方言。对于HiveContext,它默认的方言是HiveQL。

2.4.3. 创建DataFrame

使用SQLContext,可以从RDD、Hive表或者其他数据源,来创建一个DataFrame。以下是一个使用JSON文件创建DataFrame的例子:

对于以下的原始数据:

  • {"id":1, "name":"leo", "age":18}
  • {"id":2, "name":"jack", "age":19}
  • {"id":3, "name":"marry", "age":17}
  1. Java版本:
  • package com.coderap.sql;
  • import org.apache.spark.SparkConf;
  • import org.apache.spark.api.java.JavaSparkContext;
  • import org.apache.spark.sql.Dataset;
  • import org.apache.spark.sql.Row;
  • import org.apache.spark.sql.SQLContext;
  • public class DataSetCreate {
  • public static void main(String[] args) {
  • SparkConf conf = new SparkConf().setAppName("DataSetCreate");
  • JavaSparkContext sc = new JavaSparkContext(conf);
  • SQLContext sqlContext = new SQLContext(sc);
  • Dataset<Row> dataset = sqlContext.read().json("hdfs://s100:8020/user/ubuntu/spark/students.json");
  • dataset.show();
  • sc.close();
  • }
  • }
  1. Scala版本:
  • package com.coderap.sql
  • import org.apache.spark.sql.SQLContext
  • import org.apache.spark.{SparkConf, SparkContext}
  • object DataSetCreate {
  • def main(args: Array[String]): Unit = {
  • val conf = new SparkConf().setAppName("DataSetCreate")
  • val sc = new SparkContext(conf)
  • val sQLContext = new SQLContext(sc)
  • val dataSet = sQLContext.read.json("hdfs://s100:8020/user/ubuntu/spark/students.json")
  • dataSet.show()
  • }
  • }

运行结果:

  • +---+---+-----+
  • |age| id| name|
  • +---+---+-----+
  • | 18| 1| leo|
  • | 19| 2| jack|
  • | 17| 3|marry|
  • +---+---+-----+

2.4.4. DataFrame的常用操作

DataSet有一些常用的操作,示例代码如下:

  1. Java版本
  • package com.coderap.sql;
  • import org.apache.spark.SparkConf;
  • import org.apache.spark.api.java.JavaSparkContext;
  • import org.apache.spark.sql.Dataset;
  • import org.apache.spark.sql.Row;
  • import org.apache.spark.sql.SQLContext;
  • public class DataSetOperation {
  • public static void main(String[] args) {
  • SparkConf conf = new SparkConf().setAppName("DataSetCreate");
  • JavaSparkContext sc = new JavaSparkContext(conf);
  • SQLContext sqlContext = new SQLContext(sc);
  • Dataset<Row> dataset = sqlContext.read().json("hdfs://s100:8020/user/ubuntu/spark/students.json");
  • // 查看所有数据
  • dataset.show();
  • // 查看某一列数据
  • dataset.select("name").show();
  • // 查看某几列数据,并对某列数据进行处理
  • dataset.select(dataset.col("name"), dataset.col("age").plus(1)).show();
  • // 查看条件过滤某列数据
  • dataset.filter(dataset.col("age").gt(18)).show();
  • // 根据某一列进行分组聚合
  • dataset.groupBy(dataset.col("age")).count().show();
  • }
  • }
  1. Scala版本
  • package com.coderap.sql
  • import org.apache.spark.{SparkConf, SparkContext}
  • import org.apache.spark.sql.SQLContext
  • class DataSetOperation {
  • def main(args: Array[String]): Unit = {
  • val conf = new SparkConf().setAppName("DataSetCreate")
  • val sc = new SparkContext(conf)
  • val sQLContext = new SQLContext(sc)
  • val dataSet = sQLContext.read.json("hdfs://s100:8020/user/ubuntu/spark/students.json")
  • dataSet.show()
  • dataSet.select("name").show()
  • dataSet.select(dataSet.col("name"), dataSet.col("age").plus(1)).show()
  • dataSet.filter(dataSet.col("age").gt(18)).show()
  • dataSet.groupBy("age").count().show()
  • }
  • }

对于之前的数据,运行结果如下:

  • +---+---+-----+
  • |age| id| name|
  • +---+---+-----+
  • | 18| 1| leo|
  • | 19| 2| jack|
  • | 17| 3|marry|
  • +---+---+-----+
  • +-----+
  • | name|
  • +-----+
  • | leo|
  • | jack|
  • |marry|
  • +-----+
  • +-----+---------+
  • | name|(age + 1)|
  • +-----+---------+
  • | leo| 19|
  • | jack| 20|
  • |marry| 18|
  • +-----+---------+
  • +---+---+----+
  • |age| id|name|
  • +---+---+----+
  • | 19| 2|jack|
  • +---+---+----+
  • +---+-----+
  • |age|count|
  • +---+-----+
  • | 19| 1|
  • | 17| 1|
  • | 18| 1|
  • +---+-----+

2.5. RDD转换为DataFrame

为什么要将RDD转换为DataFrame?因为这样的话,我们就可以直接针对HDFS等任何可以构建为RDD的数据,使用Spark SQL进行SQL查询了。这个功能是无比强大的。想象一下,针对HDFS中的数据,直接就可以使用SQL进行查询。

Spark SQL支持两种方式来将RDD转换为DataFrame。

第一种方式,是使用反射来推断包含了特定数据类型的RDD的元数据。这种基于反射的方式,代码比较简洁,当你已经知道你的RDD的元数据时,是一种非常不错的方式。

第二种方式,是通过编程接口来创建DataFrame,你可以在程序运行时动态构建一份元数据,然后将其应用到已经存在的RDD上。这种方式的代码比较冗长,但是如果在编写程序时,还不知道RDD的元数据,只有在程序运行时,才能动态得知其元数据,那么只能通过这种动态构建元数据的方式。

2.5.1. 使用反射方式推断元数据

Java版本:Spark SQL是支持将包含了JavaBean的RDD转换为DataFrame的。JavaBean的信息,就定义了元数据。Spark SQL现在是不支持将包含了嵌套JavaBean或者List等复杂数据的JavaBean,作为元数据的。只支持一个包含简单数据类型的field的JavaBean。

Scala版本:而Scala由于其具有隐式转换的特性,所以Spark SQL的Scala接口,是支持自动将包含了case class的RDD转换为DataFrame的。case class就定义了元数据。Spark SQL会通过反射读取传递给case class的参数的名称,然后将其作为列名。与Java不同的是,Spark SQL是支持将包含了嵌套数据结构的case class作为元数据的,比如包含了Array等。

我们有以下数据:

  • 1,leo,17
  • 2,marry,17
  • 3,jack,18
  • 4,tom,19

在这份数据中,第一列是id,第二列是name,第三列是age;使用DataSet的方式查询上述数据中年龄大于18的人;示例代码如下:

  1. Java版本

首先需要一个Student类:

  • package com.coderap.sql;
  • import java.io.Serializable;
  • public class Student implements Serializable {
  • private static final long serialVersionUID = -615045082992784887L;
  • Integer id;
  • String name;
  • Integer age;
  • public Student(Integer id, String name, Integer age) {
  • this.id = id;
  • this.name = name;
  • this.age = age;
  • }
  • public Integer getId() {
  • return id;
  • }
  • public void setId(Integer id) {
  • this.id = id;
  • }
  • public String getName() {
  • return name;
  • }
  • public void setName(String name) {
  • this.name = name;
  • }
  • public Integer getAge() {
  • return age;
  • }
  • public void setAge(Integer age) {
  • this.age = age;
  • }
  • @Override
  • public String toString() {
  • return "Student{" +
  • "id=" + id +
  • ", name='" + name + '\'' +
  • ", age=" + age +
  • '}';
  • }
  • }

然后是主要的业务代码:

  • package com.coderap.sql;
  • import org.apache.spark.SparkConf;
  • import org.apache.spark.api.java.JavaRDD;
  • import org.apache.spark.api.java.JavaSparkContext;
  • import org.apache.spark.api.java.function.Function;
  • import org.apache.spark.sql.Dataset;
  • import org.apache.spark.sql.Row;
  • import org.apache.spark.sql.SQLContext;
  • import java.util.List;
  • public class RDD2DataSetReflection {
  • public static void main(String[] args) {
  • SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("RDD2DataSetReflection");
  • JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
  • SQLContext sqlContext = new SQLContext(javaSparkContext);
  • JavaRDD<String> lines = javaSparkContext.textFile("D:\\_Code\\Spark\\RDD2DataSetReflection\\students.txt");
  • JavaRDD<Student> studentsRDD = lines.map(new Function<String, Student>() {
  • private static final long serialVersionUID = -3029841765739785984L;
  • @Override
  • public Student call(String line) throws Exception {
  • String[] columns = line.split(",");
  • return new Student(Integer.valueOf(columns[0].trim()), columns[1].trim(), Integer.valueOf(columns[2].trim()));
  • }
  • });
  • /**
  • * 使用反射方式,将RDD转换为DataFrame
  • * 将Student.class传入进去,其实就是用反射的方式来创建DataFrame
  • * 因为Student.class本身就是反射的一个应用
  • * 然后底层还得通过对Student Class进行反射,来获取其中的field
  • * 这里要求,JavaBean必须实现Serializable接口,是可序列化的
  • */
  • Dataset<Row> studentsDataSet = sqlContext.createDataFrame(studentsRDD, Student.class);
  • // 拿到了一个DataFrame之后,就可以将其注册为一个临时表,然后针对其中的数据执行SQL语句
  • studentsDataSet.registerTempTable("students");
  • // 针对students临时表执行SQL语句,查询年龄小于等于18岁的学生,就是teenageer
  • Dataset<Row> teenagerDataSet = sqlContext.sql("select id,name,age from students where age<=18");
  • // 将查询出来的DataFrame,再次转换为RDD
  • JavaRDD<Row> teenagerRDD = teenagerDataSet.javaRDD();
  • // 将RDD中的数据,进行映射,映射为Student
  • JavaRDD<Student> teenagerResultRDD = teenagerRDD.map(new Function<Row, Student>() {
  • private static final long serialVersionUID = 767976589610443175L;
  • @Override
  • public Student call(Row row) throws Exception {
  • return new Student(row.getInt(0), row.getString(1), row.getInt(2));
  • }
  • });
  • // 将数据collect回来,打印出来
  • List<Student> teenagerList = teenagerResultRDD.collect();
  • for (Student student : teenagerList) {
  • System.out.println(student.toString());
  • }
  • javaSparkContext.close();
  • }
  • }
  1. Scala版本
  • package com.coderap.sql
  • import org.apache.spark.rdd.RDD
  • import org.apache.spark.sql.{DataFrame, Row, SQLContext}
  • import org.apache.spark.{SparkConf, SparkContext}
  • object RDD2DataSetReflection extends App {
  • private val conf: SparkConf = new SparkConf().setMaster("local").setAppName("RDD2DataSetReflection")
  • private val sc = new SparkContext(conf)
  • private val sQLContext = new SQLContext(sc)
  • private val lines: RDD[String] = sc.textFile("D:\\_Code\\Spark\\RDD2DataSetReflection\\students.txt", 1)
  • // 在Scala中使用反射方式,进行RDD到DataFrame的转换,需要手动导入一个隐式转换
  • import sQLContext.implicits._
  • case class Student(id: Int, name: String, age: Int)
  • /*
  • 这里其实就是一个普通的,元素为case class的RDD
  • 直接对它使用toDF()方法,即可转换为DataFrame
  • */
  • val studentsDataSet = lines.map(line => line.split(","))
  • .map(colums => Student(colums(0).trim().toInt, colums(1), colums(2).trim().toInt)).toDF()
  • studentsDataSet.registerTempTable("students")
  • private val teenagersDataSet: DataFrame = sQLContext.sql("select id,name,age from students where age<=18")
  • private val teenagersRDD: RDD[Row] = teenagersDataSet.rdd
  • // 在scala中,对row的使用,比java中的row的使用,更加丰富
  • teenagersRDD.map(row => Student(row(0).toString.toInt, row(1).toString, row(2).toString.toInt))
  • .collect()
  • .foreach(student => println(student))
  • // 在scala中,可以用row的getAs()方法,获取指定列名的列
  • teenagersRDD.map(row => Student(row.getAs[Int]("id"), row.getAs[String]("name"), row.getAs[Int]("age")))
  • .collect()
  • .foreach(student => println(student))
  • // 还可以通过row的getValuesMap()方法,获取指定几列的值,返回的是个map
  • teenagersRDD.map { row => {
  • val map = row.getValuesMap[Any](Array("id", "name", "age"));
  • Student(map("id").toString().toInt, map("name").toString(), map("age").toString().toInt)
  • }
  • }.collect().foreach(student => println(student))
  • }

运行结果如下:

  • Student{id=1, name='leo', age=17}
  • Student{id=2, name='marry', age=17}
  • Student{id=3, name='jack', age=18}

2.5.2. 使用编程方式指定元数据

Java版本:当JavaBean无法预先定义和知道的时候,比如要动态从一个文件中读取数据结构,那么就只能用编程方式动态指定元数据了。首先要从原始RDD创建一个元素为Row的RDD;其次要创建一个StructType,来代表Row;最后将动态定义的元数据应用到RDD上。

依旧是上面的数据,我们有以下的示例代码:

  1. Java版本
  • package com.coderap.sql;
  • import org.apache.spark.SparkConf;
  • import org.apache.spark.api.java.JavaRDD;
  • import org.apache.spark.api.java.JavaSparkContext;
  • import org.apache.spark.api.java.function.Function;
  • import org.apache.spark.sql.Dataset;
  • import org.apache.spark.sql.Row;
  • import org.apache.spark.sql.RowFactory;
  • import org.apache.spark.sql.SQLContext;
  • import org.apache.spark.sql.types.DataTypes;
  • import org.apache.spark.sql.types.StructField;
  • import org.apache.spark.sql.types.StructType;
  • import java.util.ArrayList;
  • import java.util.List;
  • public class RDD2DataSetProgrammatically {
  • public static void main(String[] args) {
  • SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("RDD2DataSetReflection");
  • JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
  • SQLContext sqlContext = new SQLContext(javaSparkContext);
  • // 第一步,创建一个普通的RDD,但是,必须将其转换为RDD<Row>的这种格式
  • JavaRDD<String> lines = javaSparkContext.textFile("D:\\_Code\\Spark\\RDD2DataSetReflection\\students.txt");
  • JavaRDD<Row> studentRowsRDD = lines.map(new Function<String, Row>() {
  • private static final long serialVersionUID = 6046313347580422123L;
  • @Override
  • public Row call(String line) throws Exception {
  • String[] columns = line.split(",");
  • return RowFactory.create(Integer.valueOf(columns[0]),
  • columns[1],
  • Integer.valueOf(columns[2]));
  • }
  • });
  • /**
  • * 第二步,动态构造元数据
  • * 比如说,id、name等,field的名称和类型,可能都是在程序运行过程中,动态从mysql db里
  • * 或者是配置文件中,加载出来的,是不固定的
  • * 所以特别适合用这种编程的方式,来构造元数据
  • */
  • ArrayList<StructField> structFields = new ArrayList<StructField>();
  • structFields.add(DataTypes.createStructField("id", DataTypes.IntegerType, true));
  • structFields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
  • structFields.add(DataTypes.createStructField("age", DataTypes.IntegerType, true));
  • StructType structType = DataTypes.createStructType(structFields);
  • // 第三步,使用动态构造的元数据,将RDD转换为DataFrame
  • Dataset<Row> studentsDataSet = sqlContext.createDataFrame(studentRowsRDD, structType);
  • // 后面,就可以使用DataFrame了
  • studentsDataSet.registerTempTable("students");
  • Dataset<Row> teenagersDataSet = sqlContext.sql("select id,name,age from students where age<=18");
  • List<Row> teenagersRows = teenagersDataSet.javaRDD().collect();
  • for (Row teenagersRow : teenagersRows) {
  • System.out.println(teenagersRow);
  • }
  • }
  • }
  1. Scala版本
  • package com.coderap.sql
  • import org.apache.spark.sql.Row
  • import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}
  • import org.apache.spark.SparkConf
  • import org.apache.spark.SparkContext
  • import org.apache.spark.sql.SQLContext
  • object RDD2DataFrameProgrammatically extends App {
  • val conf = new SparkConf()
  • .setMaster("local")
  • .setAppName("RDD2DataFrameProgrammatically")
  • val sc = new SparkContext(conf)
  • val sqlContext = new SQLContext(sc)
  • // 第一步,构造出元素为Row的普通RDD
  • val studentRDD = sc.textFile("C://Users//Administrator//Desktop//students.txt", 1)
  • .map { line => Row(line.split(",")(0).toInt, line.split(",")(1), line.split(",")(2).toInt) }
  • // 第二步,编程方式动态构造元数据
  • val structType = StructType(Array(
  • StructField("id", IntegerType, true),
  • StructField("name", StringType, true),
  • StructField("age", IntegerType, true)))
  • // 第三步,进行RDD到DataFrame的转换
  • val studentDF = sqlContext.createDataFrame(studentRDD, structType)
  • // 继续正常使用
  • studentDF.registerTempTable("students")
  • val teenagerDF = sqlContext.sql("select * from students where age<=18")
  • val teenagerRDD = teenagerDF.rdd.collect().foreach { row => println(row) }
  • }

运行结果如下:

  • [1,leo,17]
  • [2,marry,17]
  • [3,jack,18]

2.6. 通用的load和save操作

对于Spark SQL的DataFrame来说,无论是从什么数据源创建出来的DataFrame,都有一些共同的load和save操作。load操作主要用于加载数据,创建出DataFrame;save操作,主要用于将DataFrame中的数据保存到文件中。下面我们用示例代码来演示;

首先是将一份parquet读取出来,查询指定的列的数据,然后存储到某个文件中:

  1. Java版本代码
  • package com.coderap.sql;
  • import org.apache.spark.SparkConf;
  • import org.apache.spark.api.java.JavaSparkContext;
  • import org.apache.spark.sql.Dataset;
  • import org.apache.spark.sql.Row;
  • import org.apache.spark.sql.SQLContext;
  • public class GenericLoadAndSave {
  • public static void main(String [] args) {
  • SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("GenericLoadAndSave");
  • JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
  • SQLContext sqlContext = new SQLContext(javaSparkContext);
  • Dataset<Row> loadDataSet = sqlContext.read().load("D:\\_Code\\Spark\\GenericLoadAndSave\\users.parquet");
  • loadDataSet.select("name", "favorite_color").write().save("D:\\_Code\\Spark\\GenericLoadAndSave\\namesAndFavColors.parquet");
  • javaSparkContext.close();
  • }
  • }
  1. Scala版本
  • package com.coderap.sql
  • import org.apache.spark.sql.SQLContext
  • import org.apache.spark.{SparkConf, SparkContext}
  • object GenericLoadAndSave {
  • def main(args: Array[String]): Unit = {
  • val conf = new SparkConf().setMaster("local").setAppName("GenericLoadAndSave")
  • val sc = new SparkContext(conf)
  • val sqlContext = new SQLContext(sc)
  • val dataSet = sqlContext.read.load("D:\\_Code\\Spark\\GenericLoadAndSave\\users.parquet")
  • dataSet.select("name", "favorite_color").write.save("D:\\_Code\\Spark\\GenericLoadAndSave\\namesAndFavColors_scala.parquet")
  • }
  • }

在运行过程中,会有以下的打印信息:

  • 18/01/31 21:42:36 INFO ParquetWriteSupport: Initialized Parquet WriteSupport with Catalyst schema:
  • {
  • "type" : "struct",
  • "fields" : [ {
  • "name" : "name",
  • "type" : "string",
  • "nullable" : true,
  • "metadata" : { }
  • }, {
  • "name" : "favorite_color",
  • "type" : "string",
  • "nullable" : true,
  • "metadata" : { }
  • } ]
  • }
  • and corresponding Parquet message type:
  • message spark_schema {
  • optional binary name (UTF8);
  • optional binary favorite_color (UTF8);
  • }

运行完后,会生成相应的文件目录:

1.Load和Save操作的结果文件.png

注:parquet文件是无法直接查看的。

2.6.1. Save Mode

Spark SQL对于save操作,提供了不同的save
mode。主要用来处理,当目标位置,已经有数据时,应该如何处理。而且save操作并不会执行锁操作,并且不是原子的,因此是有一定风险出现脏数据的。

Save Mode 意义
SaveMode.ErrorIfExists (默认) 如果目标位置已经存在数据,那么抛出一个异常
SaveMode.Append 如果目标位置已经存在数据,那么将数据追加进去
SaveMode.Overwrite 如果目标位置已经存在数据,那么就将已经存在的数据删除,用新数据进行覆盖
SaveMode.Ignore 如果目标位置已经存在数据,那么就忽略,不做任何操作。

如下示例代码:

  1. Java版本
  • package com.coderap.sql;
  • import org.apache.spark.SparkConf;
  • import org.apache.spark.api.java.JavaSparkContext;
  • import org.apache.spark.sql.Dataset;
  • import org.apache.spark.sql.Row;
  • import org.apache.spark.sql.SQLContext;
  • public class ManuallySpecifyOptions {
  • public static void main(String [] args) {
  • SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("GenericLoadAndSave");
  • JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
  • SQLContext sqlContext = new SQLContext(javaSparkContext);
  • Dataset<Row> loadDataSet = sqlContext.read().format("json").load("D:\\_Code\\Spark\\ManuallySpecifyOptions\\people.json");
  • loadDataSet.select("name").write().format("parquet").save("D:\\_Code\\Spark\\ManuallySpecifyOptions\\ManuallySpecifyOptions");
  • javaSparkContext.close();
  • }
  • }
  1. Scala版本
  • package com.coderap.sql
  • import org.apache.spark.{SparkConf, SparkContext}
  • import org.apache.spark.sql.{SQLContext, SaveMode}
  • object SpecifySaveMode {
  • def main(args: Array[String]): Unit = {
  • val conf = new SparkConf().setMaster("local").setAppName("GenericLoadAndSave")
  • val sc = new SparkContext(conf)
  • val sqlContext = new SQLContext(sc)
  • val dataSet = sqlContext.read.format("json").load("D:\\_Code\\Spark\\ManuallySpecifyOptions\\people.json")
  • dataSet.select("name").write.format("parquet").mode(SaveMode.Append).save("D:\\_Code\\Spark\\ManuallySpecifyOptions\\ManuallySpecifyOptions_scala")
  • }
  • }

运行后可以观察输出数据的目录,会发现比原来新增了一些文件:

2.SaveMode的使用.png

2.7. 手动指定数据源类型

也可以手动指定用来操作的数据源类型。数据源通常需要使用其全限定名来指定,比如parquet是org.apache.spark.sql.parquet。但是Spark SQL内置了一些数据源类型,比如json,parquet,jdbc等等。实际上,通过这个功能,就可以在不同类型的数据源之间进行转换了。比如将json文件中的数据保存到parquet文件中。默认情况下,如果不指定数据源类型,那么就是parquet。

下面的示例将指定读入数据为json格式,输出数据为parquet格式:

  1. Java版本
  • package com.coderap.sql;
  • import org.apache.spark.SparkConf;
  • import org.apache.spark.api.java.JavaSparkContext;
  • import org.apache.spark.sql.Dataset;
  • import org.apache.spark.sql.Row;
  • import org.apache.spark.sql.SQLContext;
  • public class ManuallySpecifyOptions {
  • public static void main(String [] args) {
  • SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("GenericLoadAndSave");
  • JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
  • SQLContext sqlContext = new SQLContext(javaSparkContext);
  • Dataset<Row> loadDataSet = sqlContext.read().format("json").load("D:\\_Code\\Spark\\ManuallySpecifyOptions\\people.json");
  • loadDataSet.select("name").write().format("parquet").save("D:\\_Code\\Spark\\ManuallySpecifyOptions\\ManuallySpecifyOptions");
  • javaSparkContext.close();
  • }
  • }
  1. Scala版本
  • package com.coderap.sql
  • import org.apache.spark.{SparkConf, SparkContext}
  • import org.apache.spark.sql.SQLContext
  • object ManuallySpecifyOptions {
  • def main(args: Array[String]): Unit = {
  • val conf = new SparkConf().setMaster("local").setAppName("GenericLoadAndSave")
  • val sc = new SparkContext(conf)
  • val sqlContext = new SQLContext(sc)
  • val dataSet = sqlContext.read.format("json").load("D:\\_Code\\Spark\\ManuallySpecifyOptions\\people.json")
  • dataSet.select("name").write.format("parquet").save("D:\\_Code\\Spark\\ManuallySpecifyOptions\\ManuallySpecifyOptions_scala")
  • }
  • }

同样的,运行后可以得到输出文件:

3.手动指定输入输出格式.png

2.8. 使用编程方式加载数据

Parquet是面向分析型业务的列式存储格式,由Twitter和Cloudera合作开发,2015年5月从Apache的孵化器里毕业成为Apache顶级项目,最新的版本是1.8.0。

列式存储和行式存储相比有哪些优势呢?

  1. 可以跳过不符合条件的数据,只读取需要的数据,降低IO数据量。
  2. 压缩编码可以降低磁盘存储空间。由于同一列的数据类型是一样的,可以使用更高效的压缩编码(例如RunLength Encoding和Delta Encoding)进一步节约存储空间。
  3. 只读取需要的列,支持向量运算,能够获取更好的扫描性能。

这里讲解Parquet数据源的第一个知识点,使用编程的方式加载Parquet文件中的数据。

案例:查询用户数据中的用户姓名。

  1. Java版本
  • package com.coderap.sql;
  • import org.apache.spark.SparkConf;
  • import org.apache.spark.api.java.JavaSparkContext;
  • import org.apache.spark.api.java.function.Function;
  • import org.apache.spark.sql.Dataset;
  • import org.apache.spark.sql.Row;
  • import org.apache.spark.sql.SQLContext;
  • import java.util.List;
  • public class ParquetLoadData {
  • public static void main(String[] args) {
  • SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("GenericLoadAndSave");
  • JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
  • SQLContext sqlContext = new SQLContext(javaSparkContext);
  • Dataset<Row> userParquet = sqlContext.read().parquet("D:\\_Code\\Spark\\ParquetLoadData\\users.parquet");
  • userParquet.registerTempTable("users");
  • Dataset<Row> userNamesDataSet = sqlContext.sql("select name from users");
  • List<String> userNames = userNamesDataSet.javaRDD().map(new Function<Row, String>() {
  • private static final long serialVersionUID = 9073388819826938197L;
  • @Override
  • public String call(Row row) throws Exception {
  • return "Name: " + row.getString(0);
  • }
  • }).collect();
  • for (String userName : userNames) {
  • System.out.println(userName);
  • }
  • }
  • }
  1. Scala版本
  • package com.coderap.sql
  • import org.apache.spark.{SparkConf, SparkContext}
  • import org.apache.spark.sql.SQLContext
  • object ParquetLoadData {
  • def main(args: Array[String]): Unit = {
  • val conf = new SparkConf().setMaster("local").setAppName("GenericLoadAndSave")
  • val sc = new SparkContext(conf)
  • val sqlContext = new SQLContext(sc)
  • val userDataSet = sqlContext.read.parquet("D:\\_Code\\Spark\\ParquetLoadData\\users.parquet")
  • userDataSet.registerTempTable("users")
  • val dataSet = sqlContext.sql("select name from users")
  • dataSet.rdd.map(row => {
  • "Name: " + row.get(0)
  • }).collect().foreach(userName => println(userName))
  • }
  • }

运行后的输出结果为:

  • Name: Alyssa
  • Name: Ben

2.9. 自动分区推断

表分区是一种常见的优化方式,比如Hive中就提供了表分区的特性。在一个分区表中,不同分区的数据通常存储在不同的目录中,分区列的值通常就包含在了分区目录的目录名中。Spark
SQL中的Parquet数据源,支持自动根据目录名推断出分区信息。例如,如果将人口数据存储在分区表中,并且使用性别和国家作为分区列。那么目录结构可能如下所示:

  • tableName
  • |- gender=male
  • |- country=US
  • ...
  • ...
  • ...
  • |- country=CN
  • ...
  • |- gender=female
  • |- country=US
  • ...
  • |- country=CH
  • ...

如果将tableName传入SQLContext.read.parquet()或者SQLContext.read.load()方法,那么SparkSQL就会自动根据目录结构,推断出分区信息,是gender和country。即使数据文件中只包含了两列值,name和age,但是SparkSQL返回的DataFrame,调用printSchema()方法时,会打印出四个列的值:name,age,country,gender。这就是自动分区推断的功能。

此外,分区列的数据类型,也是自动被推断出来的。目前,SparkSQL仅支持自动推断出数字类型和字符串类型。有时,用户也许不希望Spark SQL自动推断分区列的数据类型。此时只要设置一个配置即可,spark.sql.sources.partitionColumnTypeInference.enabled,默认为true,即自动推断分区列的类型,设置为false,即不会自动推断类型。禁止自动推断分区列的类型时,所有分区列的类型,就统一默认都是String。

案例:自动推断用户数据的性别和国家

我们首先在HDFS中创建一些分区,并将文件存入其中:

  • ubuntu@s100:~$ hdfs dfs -mkdir -p /user/ubuntu/spark/users/gender=male/country=US
  • ubuntu@s100:~$ hdfs dfs -ls -R /user/ubuntu/spark/users
  • drwxr-xr-x - ubuntu supergroup 0 2018-01-31 07:22 /user/ubuntu/spark/users/gender=male
  • drwxr-xr-x - ubuntu supergroup 0 2018-01-31 07:22 /user/ubuntu/spark/users/gender=male/country=US
  • ubuntu@s100:~$ hdfs dfs -put users.parquet /user/ubuntu/spark/users/gender=male/country=US/
  • ubuntu@s100:~$ hdfs dfs -ls -R /user/ubuntu/spark/users
  • drwxr-xr-x - ubuntu supergroup 0 2018-01-31 07:22 /user/ubuntu/spark/users/gender=male
  • drwxr-xr-x - ubuntu supergroup 0 2018-01-31 07:23 /user/ubuntu/spark/users/gender=male/country=US
  • -rw-r--r-- 3 ubuntu supergroup 615 2018-01-31 07:23 /user/ubuntu/spark/users/gender=male/country=US/users.parquet

然后编写代码如下:

  1. Java版本
  • package com.coderap.sql;
  • import org.apache.spark.SparkConf;
  • import org.apache.spark.api.java.JavaSparkContext;
  • import org.apache.spark.sql.Dataset;
  • import org.apache.spark.sql.Row;
  • import org.apache.spark.sql.SQLContext;
  • public class ParquetPartitionDiscovery {
  • public static void main(String [] args) {
  • SparkConf sparkConf = new SparkConf().setAppName("ParquetPartitionDiscovery");
  • JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
  • SQLContext sqlContext = new SQLContext(javaSparkContext);
  • Dataset<Row> userParquet = sqlContext.read().parquet("hdfs://s100:8020/user/ubuntu/spark/users/gender=male/country=US/users.parquet");
  • userParquet.printSchema();
  • userParquet.show();
  • javaSparkContext.close();
  • }
  • }
  1. Scala版本
  • package com.coderap.sql
  • import org.apache.spark.{SparkConf, SparkContext}
  • import org.apache.spark.sql.SQLContext
  • object ParquetPartitionDiscovery {
  • def main(args: Array[String]): Unit = {
  • val conf = new SparkConf().setMaster("local").setAppName("GenericLoadAndSave")
  • val sc = new SparkContext(conf)
  • val sqlContext = new SQLContext(sc)
  • val dataSet = sqlContext.read.parquet("hdfs://s100:8020/user/ubuntu/spark/users/gender=male/country=US/users.parquet")
  • dataSet.printSchema()
  • dataSet.show()
  • }
  • }

运行结果如下:

  • root
  • |-- name: string (nullable = true)
  • |-- favorite_color: string (nullable = true)
  • |-- favorite_numbers: array (nullable = true)
  • | |-- element: integer (containsNull = true)
  • |-- gender: string (nullable = true)
  • |-- country: string (nullable = true)
  • +------+--------------+----------------+------+-------+
  • | name|favorite_color|favorite_numbers|gender|country|
  • +------+--------------+----------------+------+-------+
  • |Alyssa| null| [3, 9, 15, 20]| male| US|
  • | Ben| red| []| male| US|
  • +------+--------------+----------------+------+-------+

2.10. 合并元数据

如同ProtocolBuffer,Avro,Thrift一样,Parquet也是支持元数据合并的。用户可以在一开始就定义一个简单的元数据,然后随着业务需要,逐渐往元数据中添加更多的列。在这种情况下,用户可能会创建多个Parquet文件,有着多个不同的但是却互相兼容的元数据。Parquet数据源支持自动推断出这种情况,并且进行多个Parquet文件的元数据的合并。

因为元数据合并是一种相对耗时的操作,而且在大多数情况下不是一种必要的特性,从Spark1.5.0版本开始,默认是关闭Parquet文件的自动合并元数据的特性的。可以通过以下两种方式开启Parquet数据源的自动合并元数据的特性:

1、读取Parquet文件时,将数据源的选项,mergeSchema,设置为true

2、使用SQLContext.setConf()方法,将spark.sql.parquet.mergeSchema参数设置为true

案例:合并学生的基本信息,和成绩信息的元数据

  1. Scala代码
  • package com.coderap.sql
  • import org.apache.spark.{SparkConf, SparkContext}
  • import org.apache.spark.sql.{SQLContext, SaveMode}
  • object ParquetMergeSchema {
  • def main(args: Array[String]): Unit = {
  • val conf = new SparkConf()
  • .setAppName("ParquetMergeSchema")
  • val sc = new SparkContext(conf)
  • val sqlContext = new SQLContext(sc)
  • import sqlContext.implicits._
  • // 创建一个DataFrame,作为学生的基本信息,并写入一个parquet文件中
  • val studentsWithNameAge = Array(("leo", 23), ("jack", 25)).toSeq
  • val studentsWithNameAgeDF = sc.parallelize(studentsWithNameAge, 2).toDF("name", "age")
  • studentsWithNameAgeDF.write.format("parquet").mode(SaveMode.Append).save("hdfs://spark1:9000/spark-study/students")
  • // 创建第二个DataFrame,作为学生的成绩信息,并写入一个parquet文件中
  • val studentsWithNameGrade = Array(("marry", "A"), ("tom", "B")).toSeq
  • val studentsWithNameGradeDF = sc.parallelize(studentsWithNameGrade, 2).toDF("name", "grade")
  • studentsWithNameGradeDF.write.format("parquet").mode(SaveMode.Append).save("hdfs://s100:8020/user/ubuntu/spark/students")
  • /**
  • * 首先,第一个DataFrame和第二个DataFrame的元数据肯定是不一样的吧
  • * 一个是包含了name和age两个列,一个是包含了name和grade两个列
  • * 所以, 这里期望的是,读取出来的表数据,自动合并两个文件的元数据,出现三个列,name、age、grade
  • */
  • // 用mergeSchema的方式,读取students表中的数据,进行元数据的合并
  • val students = sqlContext.read.option("mergeSchema", "true")
  • .parquet("hdfs://s100:8020/user/ubuntu/spark/students")
  • students.printSchema()
  • students.show()
  • }
  • }