详解 Spark SQL 核心编程知识

一、SparkSQL 概述

1. 概念

Spark SQL 是 Spark 用于结构化数据 (structured data) 处理的 Spark 模块,使用 SQL 的方式简化 RDD 的开发

2. Hive VS SparkSQL

  • Hive 是早期唯一运行在 Hadoop 上的 SQL-on-Hadoop 工具,但是 MapReduce 计算过程中大量的中间磁盘落地过程消耗了大量的 I/O,降低的运行效率
  • Shark 是为了提高 SQL-on-Hadoop的效率而产生的 SQL-on-Hadoop 工具,是基于 Hive 所开发的,它修
    改了 Hive 中的内存管理、物理计划和执行三个模块,并使之能运行在 Spark 引擎上
  • 由于 Shark 对于 Hive 存在太多依赖(如采用 Hive 的语法解析器、查询优化器等等),制约了其发展,SparkSQL 由此应运而生,它抛弃了原有 Shark 的代码,但汲取了 Shark 的一些优点,如内存列存储(In-Memory Columnar Storage)、 Hive 兼容性等
    • 数据兼容方面 SparkSQL 不但兼容 Hive,还可以从 RDD、parquet 文件、JSON 文件中获取数据,未来版本甚至支持获取 RDBMS 数据以及 cassandra 等 NOSQL 数据
    • 性能优化方面 除了采取 In-Memory Columnar Storage、byte-code generation 等优化技术外、将会引进 Cost Model 对查询进行动态评估、获取最佳物理计划等等
    • 组件扩展方面 无论是 SQL 的语法解析器、分析器还是优化器都可以重新定义,进行扩展
  • Shark 停止开发后,发展出了两个支线,其中 SparkSQL 作为 Spark 生态的一员继续发展,而不再受限于 Hive,只是兼容 Hive;而 Hive on Spark 是一个 Hive 的发展计划,该计划将 Spark 作为 Hive 的底层引擎之一,由此 Hive 将不再受限于一个引擎,可以采用 Map-Reduce、Tez、Spark 等引擎

3. SparkSQL 特点

  • 易整合:无缝的整合了 SQL 查询和 Spark 编程
  • 统一的数据访问:使用相同的方式连接不同的数据源
  • 兼容 Hive:在已有的仓库上直接运行 SQL 或者 HiveQL
  • 标准数据连接:通过 JDBC 或者 ODBC 来连接

4. 两大数据模型

  • DataFrame:一种以 RDD 为基础的分布式数据集,类似于传统数据库中的二维表格
  • DataSet:是 Spark 1.6 中添加的一个新抽象,是 DataFrame 的一个扩展

二、SparkSQL 核心编程

1. 上下文对象

  • SparkCore 中执行应用程序前构建的上下文环境对象为 SparkContext
  • SparkSQL 中执行应用程序前构建的上下文环境对象为:
    • 老版本:SQLContext,用于 Spark 自己提供的 SQL 查询;HiveContext,用于连接 Hive 的查询
    • 新版本:SparkSession,实质上是 SQLContext 和 HiveContext 的组合,内部封装了 SparkContext

2. DataFrame

2.1 介绍
  • DataFrame 是一种以 RDD 为基础的分布式数据集,类似于传统数据库中的二维表格。与 RDD 的主要区别在于,DataFrame 带有 schema 元信息,即 DataFrame 所表示的二维表数据集的每一列都带有名称和类型
  • 与 Hive 类似,DataFrame 也支持嵌套数据类型(struct、array 和 map)
  • DataFrame 是为数据提供了 Schema 的视图,可以把它当做数据库中的一张表来看待
  • DataFrame 是懒执行的,但性能上比 RDD 要高,主要原因:优化的执行计划,即查询计划通过 Spark catalyst optimiser 进行优化
2.2 创建
2.2.1 从 Spark 数据源创建
/**
	基本语法:SparkSession.read.[data_format]
	可用的数据源格式:csv format jdbc json load option options orc parquet schema table text textFile
*/
// 从一个 json 文件中创建 DataFrame
val df = sparkSession.read.json("data/user.json")

// 展示 DataFrame 数据
df.show()
2.2.2 从 RDD 创建
/**
	基本语法:RDD.toDF(col1, col2, ...)
	注意:
		1.必须先创建 SparkSession 对象,使用 val 修饰且命名为 spark
		2.引入:import spark.implicits._  (此处的 spark 为 SparkSession 对象)
*/
val rdd = sc.makeRDD(List(1,2,3,4))
val df = rdd.toDF("id")
df.show()

//使用样例类
case class User(val name: String, val age: Int)

val rdd1 = sc.makeRDD(List(
	("tom", 18), ("jerry", 17)
)).map(t => User(t._1, t._2))

rdd1.toDF.show

// DataFrame 转换为 RDD
val rdd2: RDD[org.apache.spark.sql.Row] = df.rdd

2.2.3 从 Hive Table 创建
/**
	基本语法:SparkSession.sql("select * from hive_table")
	
*/
val df = sparkSession.sql("select * from user")
df.show()

2.3 SQL 语法
// 1.创建临时视图
// 1.1 session 范围有效
df.createTempView("user")
df.createOrReplaceTempView("user")

// 1.2 应用范围有效
df.createGlobalTempView("emp")
df.createOrReplaceGlobalTempView("emp")

// 2.使用 SQL 语法查询视图
sparkSession.sql("select * from user").show()
// 访问全局视图需要加上 global_temp 前缀
sparkSession.newSession().sql("select * from global_temp.emp").show()

2.4 DSL 语法

domain-specific language,可以直接在 DataFrame 中管理结构化的数据

// 1.查看 DataFrame 的 Schema 信息
df.printSchema()

// 2.查看某列数据
df.select("username").show()
// 2.1 查看某列计算后的数据:涉及到运算的时候,每列都必须使用 $ 或者单引号表达式
// 注意:必须先创建 SparkSession 对象,使用 val 修饰且命名为 spark,然后 import spark.implicits._
df.select($"age" + 1).show()
df.select($"username", $"age" + 1).show()
df.select('username, 'age + 1).show()

// 3.按条件查询数据
df.filter('age > 20).show()

// 4.分组计数
df.groupBy("age").count().show()


3. DataSet

3.1 介绍
  • DataSet 是具有强类型的数据集合,需要提供对应的类型信息,比如 DataSet[Car],DataSet[Person]
  • DataSet 是 DataFrame API 的一个扩展,是 SparkSQL 最新的数据抽象
  • 用户友好的 API 风格,既具有类型安全检查也具有 DataFrame 的查询优化特性
  • 用样例类来对 DataSet 中定义数据的结构信息,样例类中每个属性的名称直接映射到 DataSet 中的字段名称
  • DataFrame 是 DataSet 的特列,DataFrame = DataSet[Row] ,所以可以通过 as 方法将 DataFrame 转换为 DataSet。Row 是一个类型,跟 Car、Person 这些的类型一样,所有的表结构信息都用 Row 来表示。获取数据时需要指定顺序
3.2 创建
3.2.1 从序列创建
/**
	基本语法:List/Seq.toDS()
*/
// 1.基本类型序列创建
val list = List(1,2,3,4)
val ds = list.toDS()
ds.show()

// 2.样例类序列创建
case class Person(name: String, age: Int)
val list1 = List(Person("tom", 20), Person("jerry", 18))
val ds1 = list1.toDS()
ds1.show()

3.2.2 从 RDD 创建
/**
	基本语法:RDD.toDS(),最好使用样例类类型的 RDD
	DataSet 转成 RDD:DataSet.rdd()
*/
case class Person(name: String, age: Int)

val rdd = sc.makeRDD(Person("tom", 20), Person("jerry", 18))

val ds = rdd.toDS()
ds.show()

val rdd1 = ds.rdd()
3.2.3 从 DataFrame 创建
/**
	基本语法:DataFrame.as[Class]
	DataSet 转成 DataFrame:DataSet.toDF
*/
val df = sc.makeRDD(List(
	("tom", 20), ("jerry", 18)
)).toDF("name", "age")

df.show()

// 定义样例类型
case class Person(name: String, age: Int)

val ds = df.as[Person]
ds.show()

val df1 = ds.toDF()
df1.show()

4. RDD/DataFrame/DataSet 的关系

在这里插入图片描述

  • RDD 产生于 Spark1.0 版本,DataFrame 产生于 Spark1.3 版本,Dataset 产生于 Spark1.6 版本
  • 三者都是 Spark 平台下的分布式弹性数据集,为处理超大型数据提供便利,都有惰性机制,都会根据 Spark 的内存情况自动缓存运算,都有 partition 的概念
  • RDD 不支持 SparkSQL 操作,而 DataFrame 与 DataSet 均支持 SparkSQL 的操作;DataFrame 其实是 DataSet 的一个特例

相关推荐

  1. sparksql DSL编程风格

    2024-06-06 15:56:05       17 阅读
  2. SparkSql Join Types详解

    2024-06-06 15:56:05       34 阅读
  3. (二)PySpark3:SparkSQL编程

    2024-06-06 15:56:05       22 阅读
  4. sparksql的SQL风格编程

    2024-06-06 15:56:05       23 阅读

最近更新

  1. TCP协议是安全的吗?

    2024-06-06 15:56:05       18 阅读
  2. 阿里云服务器执行yum,一直下载docker-ce-stable失败

    2024-06-06 15:56:05       19 阅读
  3. 【Python教程】压缩PDF文件大小

    2024-06-06 15:56:05       18 阅读
  4. 通过文章id递归查询所有评论(xml)

    2024-06-06 15:56:05       20 阅读

热门阅读

  1. 【SAP HANA 34】HANA查找函数LOCATE的使用

    2024-06-06 15:56:05       10 阅读
  2. Mybatis学习之Spring boot整合Mybatis示例

    2024-06-06 15:56:05       9 阅读
  3. 什么是HTTPS协议,与HTTP协议有什么区别

    2024-06-06 15:56:05       10 阅读
  4. 【杂记-webshell恶意脚本木马】

    2024-06-06 15:56:05       8 阅读
  5. PTA R6-1 寻找链表元素的前驱结点

    2024-06-06 15:56:05       9 阅读
  6. 逆序数求解算法

    2024-06-06 15:56:05       8 阅读
  7. CSRF 令牌的生成过程和检查过程

    2024-06-06 15:56:05       8 阅读
  8. Xilinx FPGA 管脚的默认电平配置方法 XDC约束

    2024-06-06 15:56:05       10 阅读