DataFrame相关的API

目录

DataFrame的操作方案

 SQL相关的API

创建一个视图/表

 DSL相关的API

DSL的传递方式

 SQL的函数库

Spark SQL的综合应用

直接基于DataFrame来处理

SQL方式

 DSL方式

 基于RDD转换DataFrame的方式


DataFrame的操作方案

        操作DataFrame一般有两种操作方案:一种为DSL方式,一种为SQL方式

SQL方式:通过编写SQL语句完成统计分析操作

DSL操作:特定领域语言,使用DataFrame特有的API完成计算,也就是代码形式

从使用角度来说:SQL更加方便一些,当适应了DSL写法后,会发现DSL比SQL更好用

从Soark角度来说:推荐使用DSL方案,更有利于Spark底层优化处理

 SQL相关的API

创建一个视图/表

df.createTempview('视图名称'):创建一个临时的视图(表名)

df.createorReplaceTempview('视图名称'):创建一个临时的视图(表名),如果视图存在,直接替换

临时视图:仅能在当前这个spark session的会话中使用

df.createGlobalTempview('视图名称'):创建一个全局视图,运行在一个Spark应用中,多个spark会话中读可以使用,使用的时候必须通过global_temp.视图名称方式才可以加载到,较少使用

执行SQL语句:

        spark.sql('书写SQL')

 DSL相关的API

show():用于展示DF中数据,默认仅展示前20行

             参数1:设置默认展示多少行,默认为20

               参数2:是否为阶段列,默认仅展示前20个字符数据,如果过长,不展示(一般不设置)

printSchema():用于打印当前这个DF的表结构信息

select():类似于SQL中的select,SQL中的select后面可以些什么,这里也一样

filter()和where():用于对数据进行过滤操作,一般在SparkSQL中只要使用where

groupBy():用于执行分组操作

orderBy():用于执行排序操作

DSL的传递方式

DSL主要支持一下几种传递的方式: str | column对象 | 列表

        str格式: '字段'

        column对象:

                              DataFrame含有的字段  df['字段']

                               执行过程产生:F.col('字段')

        列表:

                ['字段1','字段2'...]

                [df['字段1'],df['字段2']]

 SQL的函数库

为了能够支持在编写Spark SQL的DSL时候,在DSL中使用SQL函数,专门提供一个SQL的函数库。直接加载使用即可

导入这个函数库:import pyspark.sql.functions as F

通过F调用对应的函数即可

SparkSQL中所支持的函数,都可以通过以下地址查询到:
https://spark.apache.org/docs/3.1.2/api/sql/index.html

Spark SQL的综合应用

world count 案例

已知HDFS又一个words.txt的文件,words.txt文件的内容如下:

hadoop hive hadoop sqoop hive
sqoop hadoop zookeeper hive hue
hue sqoop hue zookeeper hive
spark oozie spark hadoop oozie
hive oozie spark hadoop

直接基于DataFrame来处理

SQL方式

SQL方式一:子查询

SQL方式二:侧视图

炸裂函数配合侧视图使用如下:

        格式:select 原表别名.字段名,侧视图名.字段名 from 原表 原表别名 lateral view explode(要炸开的字段) 侧视图名 as 字段名

# 直接基于DataFrame来处理
# 导包
import os
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, IntegerType, StringType, StructField
import pyspark.sql.functions as F

# 绑定指定的python解释器
"""
1.2 直接基于DataFrame来处理

需求分析:

1- 将每行内容切分得到单个的单词

2- 组织DataFrame的数据结构

2.1- 有两列。一列是单词,一列是次数
2.2- 只有一列。单词
"""

os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
# 创建main函数
if __name__ == '__main__':
    print('直接基于DataFrame来处理')
    spark = SparkSession \
        .builder \
        .appName('dataFrame_world_count_demo') \
        .master('local[*]') \
        .getOrCreate()
    # 数据输入
    # text方式读取hdfs上的文件
    init_df = spark.read.text(paths='hdfs://node1:8020/source/word.txt')
    # # 查看数据
    # init_df.show()
    # # 打印dataframe表结构信息
    # init_df.printSchema()
    # 创建临时视图
    init_df.createTempView('words')
    # 数据处理
    """
    sparksql方式处理数据-子查询
    1.先切分每一行的数据
    2.使用炸裂函数获得一个word单词列
    3.使用子查询方式聚合统计每个单词出现的次数
    """
    spark.sql("""select word,count(*) as cnt 
    from (select explode(split(value,' ')) as word from words)
    group by word order by cnt desc
    """).show()
    """
       sparksql方式处理数据-侧视图
       1.先切分每一行的数据
       2.使用炸裂函数获得一个word单词列
       3.使用侧视图方式聚合统计每个单词出现的次数
       炸裂函数配合侧视图使用如下:
       格式:select 原表别名.字段名,侧视图名.字段名 from 原表 原表别名 lateral view explode(要炸开的字段)
       侧视图名 as 字段名
       """
    spark.sql("""select word,count(*) as cnt
    from words w 
    lateral view explode(split(value,' ')) t as word
    group by word order by cnt desc
    """).show()

 DSL方式

DSL方式总结:
            withColumnRenamed(参数1,参数2):给字段重命名操作。参数1是旧字段名,参数2是新字段名
            agg():推荐使用,更加通用。执行聚合操作。如果有多个聚合,聚合之间使用逗号分隔即可
            withColumn(参数1,参数2):用来产生新列。参数1是新列的名称;参数2是新列数据的来源

 

# 直接基于DataFrame来处理
# 导包
import os
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, IntegerType, StringType, StructField
import pyspark.sql.functions as F

# 绑定指定的python解释器
"""
1.2 直接基于DataFrame来处理

需求分析:

1- 将每行内容切分得到单个的单词

2- 组织DataFrame的数据结构

2.1- 有两列。一列是单词,一列是次数
"""

os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
# 创建main函数
if __name__ == '__main__':
    print('直接基于DataFrame来处理')
    spark = SparkSession \
        .builder \
        .appName('dataFrame_world_count_demo') \
        .master('local[*]') \
        .getOrCreate()
    # 数据输入
    # text方式读取hdfs上的文件
    init_df = spark.read.text(paths='hdfs://node1:8020/source/word.txt')
    # # 查看数据
    # init_df.show()
    # # 打印dataframe表结构信息
    # init_df.printSchema()
    # 创建临时视图
    init_df.createTempView('words')
    # 数据处理
    """
           DSL方式处理数据-方式一
           1.先切分每一行的数据
           2.使用炸裂函数获得一个word单词列
           3.调用API聚合统计单词个数再排序
    """
    init_df.select(
        F.explode(F.split('value', ' ')).alias('word')
    ).groupBy('word').count().orderBy('count', ascending=False).show()

    """
           DSL方式处理数据-方式二
           1.先切分每一行的数据
           2.使用炸裂函数获得一个word单词列
           3.调用API聚合统计单词个数再排序
           4.agg():推荐使用,更加通用。执行聚合操作。如果有多个聚合,聚合之间使用逗号分隔即可
    """
    init_df.select(
        F.explode(F.split('value', ' ')).alias('word')
    ).groupBy('word').agg(
        F.count('word').alias('cnt'),
        F.max('word').alias('max_word'),
        F.min('word').alias('min_word'),
    ).orderBy('cnt', ascending=False).show()

    """
    DSL方式处理数据-方式三
        withColumnRenamed(参数1,参数2):给字段重命名操作。参数1是旧字段名,参数2是新字段名
        withColumn(参数1,参数2):用来产生新列。参数1是新列的名称;参数2是新列数据的来源
    """
    init_df.withColumn(
        'word',
        F.explode(F.split('value', ' '))
    ).groupBy('word').agg(
        F.count('word').alias('cnt'),
        F.max('word').alias('max_word'),
        F.min('word').alias('min_word')
    ).orderBy('cnt', ascending=False).show()

    # 数据输出
    # 是否资源
    spark.stop()

 基于RDD转换DataFrame的方式

# 直接基于DataFrame来处理
# 导包
import os
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, IntegerType, StringType, StructField
import pyspark.sql.functions as F

# 绑定指定的python解释器
"""
基于RDD转换DataFrame的方式

需求分析:

1- 将每行内容切分得到单个的单词

2- 组织DataFrame的数据结构

2.1- 有两列。一列是单词,一列是次数
"""

os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
# 创建main函数
if __name__ == '__main__':
    print('直接基于DataFrame来处理')
    # 创建SparkSession对象
    spark = SparkSession \
        .builder \
        .appName('dataFrame_world_count_demo') \
        .master('local[*]') \
        .getOrCreate()
    # 创建sparkContext顶级对象
    sc = spark.sparkContext
    # 数据输入
    # text方式读取hdfs上的文件
    init_rdd = sc.textFile('hdfs://node1:8020/source/word.txt')
    # RDD数据结构转化为二维数据
    map_rdd = init_rdd.flatMap(lambda line: line.split()).map(lambda word: (word,))
    # 查看数据
    # print(map_rdd.collect())
    # 通过Rdd构建DataFrame
    schema = StructType([StructField("value", StringType(), True)])
    init_df = spark.createDataFrame(data=map_rdd, schema=schema)
    # 打印dataframe表结构信息
    # init_df.show()
    # init_df.printSchema()
    # 创建临时视图
    init_df.createTempView('words')
    # 数据处理
    """
       sparksql方式处理数据
    """
    spark.sql("""select value as word,count(*) as cnt
    from words group by value order by cnt desc""").show()

    print('=' * 50)
    """
           DSL方式处理数据
    """
    init_df.withColumn(
        'word',
        init_df.value
    ).groupBy('word').agg(
        F.count('word').alias('cnt'),
        F.max('word').alias('max_word'),
        F.min('word').alias('min_word'),
    ).orderBy('cnt', ascending=False).show()

    # 释放资源
    spark.stop()

相关推荐

  1. DataFrame相关API

    2024-01-10 03:28:01       33 阅读
  2. [spark] DataFrame checkpoint

    2024-01-10 03:28:01       39 阅读
  3. Spark---创建DataFrame方式

    2024-01-10 03:28:01       29 阅读
  4. [spark] dataframecache方法

    2024-01-10 03:28:01       41 阅读
  5. 【Spark系列5】Dataframe下常用算子API

    2024-01-10 03:28:01       28 阅读

最近更新

  1. TCP协议是安全的吗?

    2024-01-10 03:28:01       14 阅读
  2. 阿里云服务器执行yum,一直下载docker-ce-stable失败

    2024-01-10 03:28:01       16 阅读
  3. 【Python教程】压缩PDF文件大小

    2024-01-10 03:28:01       15 阅读
  4. 通过文章id递归查询所有评论(xml)

    2024-01-10 03:28:01       18 阅读

热门阅读

  1. 正则表达式手册

    2024-01-10 03:28:01       39 阅读
  2. android系列-init 初始化日志

    2024-01-10 03:28:01       33 阅读
  3. 什么是跨境电商独立站?

    2024-01-10 03:28:01       50 阅读
  4. MySQL运维实战(2.4) SSL认证在MySQL中的应用

    2024-01-10 03:28:01       27 阅读
  5. 【Leetcode】24. 两两交换链表中的节点

    2024-01-10 03:28:01       41 阅读
  6. 什么是OOM error

    2024-01-10 03:28:01       37 阅读
  7. Lazada商品API接口:item_search接口中指定搜索范围

    2024-01-10 03:28:01       36 阅读
  8. VSCode安装GitHub Copilot插件方法

    2024-01-10 03:28:01       49 阅读
  9. Python高级用法:property

    2024-01-10 03:28:01       28 阅读
  10. infer。。。。

    2024-01-10 03:28:01       36 阅读