Structured Streaming基础--学习笔记

Structured streaming介绍

spark进行实时数据流计算时有两个工具:

  • Spark Streaming:编写rdd代码处理数据流,可以解决非结构化的流式数据
  • Structured Streaming:编写df代码处理数据流,可以解决结构化和半结构化的流式数据

1,数据相关介绍

有界数据和无界数据

①有界数据:

  • 有起始位置,有结束位置。比如文件数据 有起始行,有结束行
  • 有明确的数据容量大小。处理数据时就能知道处理的数据大小
  • 在处理数据时,按批次处理。数据处理完成程序就结束
  • 离线计算时处理的都是有界数据

②无界数据

  • 有起始位置,没有结束位置,知道数据的起始位置在哪里,但是数据到哪结束不知道(因为数据在不断产生,什么时候结束不知道)
  • 流式数据都是无界数据
  • 无界数据的总量是不确定的
  • 数据是不断产生的
  • 数据有时效性 (有效期)
  • 处理无界数据时,程序是持续运行的
  • 实时计算时处理的都是无界数据
  • 近期实时计算处理的微批数据

离线计算:

  • 离线计算就是在计算开始前已知所有输入数据,输入数据不会产生变化,且在解决一个问题后就要立即得出结果的前提下进行的计算。
  • 数据处理时间大于1个小时,一般离线计算的处理时间都是t+1天
  • mapreduce框架/spark框架

近实时计算:

  • 近实时计算就是在计算开始前将多条数据(流数据)放在一起处理,同时处理的是几条数据
  • 数据处理时间在5分钟到1小时范围内
  • spark框架

实时计算:

  • 实时计算就是一条一条的处理数据,处理的时间延迟很低
  • 数据处理时间小于5分钟
  • flink框架

2,Structured streaming基本使用

没有ncat服务的话,在线安装或离线导入

命令:yum install nc

执行前需要先启动ncat服务

命令:ncat -lk 8888

from pyspark.sql import SparkSession

ss = SparkSession.builder.getOrCreate()

#读取socket工具中的流数据


options = {
   
   
    #指定ip地址
    'host':'192.168.88.100',
    #指定socket的端口号
    'port':'8888'
}

df1 = ss.readStream.load(format='socket',**options)
#查看里面的数据不能通过show()方法查看
df1.printSchema()
#展示数据
#start:启动流计算
#awaitTermiantion():使应用程序一直运行
df1.writeStream.start(format='console',outputMode='append').awaitTermination()

3,Structured Streaming编程模型

1、Input Table 输入数据表 无界表

2、Query 对数据进行查询计算

3、Result Table 保存计算结果

4、Output 输出结果

变成模型遵循ETL处理流程:
①E->读取流数据,转换成无界表
②T->使用sparkSql处理流数据,流计算,查询计算
③L->存储E的结果

from pyspark.sql import SparkSession,functions as F

ss = SparkSession.builder.getOrCreate()

options = {
   
   
    # 指定ip地址
    'host': '192.168.88.100',
    # 指定socket的端口号
    'port': '8888'
}

df1 = ss.readStream.load(format='socket',**options)
df_split = df1.select(F.split('value',','

相关推荐

  1. html 基础学习笔记

    2024-01-24 03:52:02       49 阅读
  2. photoshop基础学习笔记

    2024-01-24 03:52:02       42 阅读
  3. Ps基础学习笔记

    2024-01-24 03:52:02       30 阅读
  4. Ps基础学习笔记

    2024-01-24 03:52:02       34 阅读

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-01-24 03:52:02       94 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-01-24 03:52:02       100 阅读
  3. 在Django里面运行非项目文件

    2024-01-24 03:52:02       82 阅读
  4. Python语言-面向对象

    2024-01-24 03:52:02       91 阅读

热门阅读

  1. NVIDIA 驱动和 CUDA 版本信息速查

    2024-01-24 03:52:02       57 阅读
  2. 代码随想录二刷 | 回溯 |复原IP地址

    2024-01-24 03:52:02       60 阅读
  3. 【C++PCL】点云处理K-Means点云分割

    2024-01-24 03:52:02       59 阅读
  4. 化妆-护肤品选购

    2024-01-24 03:52:02       55 阅读
  5. sql server 修改表前 先判断是否有这个列

    2024-01-24 03:52:02       50 阅读
  6. 动态规划Day16(编辑距离,删除元素待写完)

    2024-01-24 03:52:02       47 阅读
  7. python爬虫之协程

    2024-01-24 03:52:02       43 阅读