pyspark基础 -- DataFrame的理解与案例

DataFrame(df)介绍

datafram就是一个内存中的二维表结构,具备表结构的三个基本属性:

  • 表结构描述
  1. 在结构层面,pyspark中的StructType对象描述了表结构,StructField对象描述了表的一个列信息;
  2. 在数据层面,Row对象记录了一行数据,Column对象记录了一列数据并包含列信息;

在这里插入图片描述

DataFrame对象的构建方式

方式一 基于RDD构建

dataframe是可以从RDD中直接转化而来的,通过sparksession对象的createDataFrame方法可以将RDD转化为DataFrame,下例:
假设我们有一个people.txt,内容如下:

zhangsan,28
lisi,29
wangwu,30
# coding:utf8
from pyspark.sql import SparkSession
if __name__ == '__main__':
    # 构建spark
    spark = SparkSession.builder.\
        appName("Create df").\
        master("local[*]").\
        getOrCreate()
    # 获取context
    sc = spark.sparkContext
    # 构建一个RDD rdd[(name, age), ()]
    rdd = sc.textFile("people.txt").\
        map(lambda x : x.split(',')).\
        map(lambda x : [x[0], int(x[1])]) # RDD会做类型自动探测,这里需要做类型转换
    
    # 构建df,RDD按照提供的字段顺序一次获取信息
    df = spark.createDataFrame(rdd, schema = ["name", "age"])
    # 打印df结构
    df.printSchema()
    # 打印20行数据
    # show有两个参数,参数1指定展示多少条数据,默认20
    # 参数2表示是否对列进行截断,如果列的长度超过20个字符传长度,后续的内容以...代替,不全打印
    df.show()
    # df.show(15, False)
    # 构建临时视图表,让我们可以用sparksql的方式查询表内容
    df.createTempView("peopleTable")
    spark.sql("select * from peopleTable where age < 29").show()
        
方式二 通过StructType对象构建

structtype类可以定义整个DataFrame中的schema,也即 df = spark.createDataFrame(rdd, schema = [“name”, “age”])这里的schema可以通过structType来指定

# 需导入以下类
from pyspark.sql import SturctType StringType IntegerType
# 定义表结构,第一个为列名称,第二个参数列数据类型,第三个是否允许为空
schema = StructTpe.\
    add("id", IntegerType(), nullable=False).\
    add("name", StringType(), nullable=False).\
    add("score", IntegerType(), nullable=False)
# 将rdd转为scheme结构
df = spark.createDataFrame(rdd, schema)
通过rdd.toDF()的方法

rdd对象本身有toDF()方法,可以通过参数指定表结构来将rdd转为DataFrame,参数也细分为两种,一种是直接指定表头,如下:

df1 = rdd.toDF(["name", "age"])

这种方法只能指定列名,无法指定列数据类型,只能靠编译器自动推断类型,在类型不敏感时可用,另外一种则是利用上面提到的StructType类先定义号表结构,传入指定表结构变量

df2 = rdd.toDF(schema=schema)
基于pandas的DataFrame转化为sparksql的DataFrame

createDataFrame直接接收pandas的df即可转换

from pyspark.sql import SparkSession
import pandas as pd

if __name__ == '__main__':
    spark = SparkSession.builder.\
        appName("test").\
        master("local[*]").\
        getOrCreate()
    sc = spark.sparkContext

    # 定义pandas的DataFrame
    pdf = pd.DataFrame(
        {
            "id" : [1, 2, 5],
            "name" : ["zhang", "wang", "li"],
            "age" : [11, 23, 55]
        }
    )
    # 转换为spark的DF
    df = spark.createDataFrame(pdf)

相关推荐

  1. 机器学习---pySpark案例

    2024-03-20 12:36:02       38 阅读
  2. Python 中 DataFrame 对象该怎么理解

    2024-03-20 12:36:02       34 阅读
  3. MySQL中索引:深入理解案例解析

    2024-03-20 12:36:02       35 阅读
  4. 深入理解SpringApplicationContext:案例详解应用

    2024-03-20 12:36:02       21 阅读
  5. 使用PySpark处理DataFrame以拆分数组列

    2024-03-20 12:36:02       31 阅读
  6. 【Spark基础】-- RDD 转 Dataframe 三种方式

    2024-03-20 12:36:02       33 阅读

最近更新

  1. TCP协议是安全的吗?

    2024-03-20 12:36:02       18 阅读
  2. 阿里云服务器执行yum,一直下载docker-ce-stable失败

    2024-03-20 12:36:02       19 阅读
  3. 【Python教程】压缩PDF文件大小

    2024-03-20 12:36:02       19 阅读
  4. 通过文章id递归查询所有评论(xml)

    2024-03-20 12:36:02       20 阅读

热门阅读

  1. react中useContext的用法

    2024-03-20 12:36:02       20 阅读
  2. 分流器电阻的工艺结构原理及选型参数总结

    2024-03-20 12:36:02       24 阅读
  3. 常见的排序算法有哪些?用Python实现一个

    2024-03-20 12:36:02       17 阅读
  4. websocket 中 request-line 中的URI编码问题

    2024-03-20 12:36:02       18 阅读
  5. 【大模型学习记录】db-gpt源码安装问题汇总

    2024-03-20 12:36:02       19 阅读
  6. Android学习进阶

    2024-03-20 12:36:02       22 阅读
  7. docker基础(二)之docker build

    2024-03-20 12:36:02       19 阅读
  8. 布隆过滤器的实现及使用

    2024-03-20 12:36:02       19 阅读