摸鱼大数据——Spark Structured Steaming——结构化流的编程综合案例

7、综合案例

需求: 已知文件中存储了多个单词,要求计算统计出现的次数

7.1 词频统计_读取文件方式
 # 导包
 import os
 from pyspark.sql import SparkSession,functions as F
 ​
 # 绑定指定的python解释器
 os.environ['SPARK_HOME'] = '/export/server/spark'
 os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
 os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
 ​
 # 创建main函数
 if __name__ == '__main__':
     # 1.创建SparkContext对象
     spark = SparkSession.builder\
         .config('spark.sql.shuffle.partitions',1)\
         .appName('pyspark_demo')\
         .master('local[*]')\
         .getOrCreate()
 ​
     # 2.数据输入
     # 注意: 路径必须是目录路径,因为readStream会自动读取此目录下的所有文件,有新增会触发接着读
     df = spark.readStream\
         .format('text')\
         .load('file:///export/data/spark_project/structured_Streaming/data/')
 ​
     # 查看数据类型
     print(type(df))
     # 3.数据处理(切分,转换,分组聚合)
     # 和SparkSQL操作一模一样,支持sql和dsl两种风格
     # SQL方式
     df.createTempView('tb')
     sql_df = spark.sql("""
         select words,count(1) as cnt
         from (
           select explode(split(value,' ')) as words from tb
         ) t group by words
     """)
     # DSL方式
     dsl_df = df.select(
         F.explode(F.split('value',' ')).alias('words')
     ).groupBy('words').agg(
         F.count('words').alias('cnt')
     )
 ​
     # 4.数据输出
     # 注意: 输出不能使用原来sparksql的show()
     # 注意: 如果需要多开启多个输出,.awaitTermination()只需要在最后一个出现即可
     sql_df.writeStream.format('console').outputMode('complete').start()
     dsl_df.writeStream.format('console').outputMode('complete').start().awaitTermination()
     # 5.关闭资源
     spark.stop()

7.2 词频统计_Socket方式
 # 导包
 import os
 from pyspark.sql import SparkSession,functions as F
 ​
 # 绑定指定的python解释器
 os.environ['SPARK_HOME'] = '/export/server/spark'
 os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
 os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
 ​
 # 创建main函数
 if __name__ == '__main__':
     # 1.创建SparkContext对象
     spark = SparkSession.builder\
         .config('spark.sql.shuffle.partitions',1)\
         .appName('pyspark_demo')\
         .master('local[*]')\
         .getOrCreate()
 ​
     # 2.数据输入
     # 注意: 路径必须是目录路径,因为readStream会自动读取此目录下的所有文件,有新增会触发接着读
     df = spark.readStream\
         .format('socket')\
         .option('host',"192.168.88.161")\
         .option('port',"55555")\
         .load()
 ​
     # 查看数据类型
     print(type(df))
     # 3.数据处理(切分,转换,分组聚合)
     # 和SparkSQL操作一模一样,支持sql和dsl两种风格
     # SQL方式
     df.createTempView('tb')
     sql_df = spark.sql("""
         select words,count(1) as cnt
         from (
           select explode(split(value,' ')) as words from tb
         ) t group by words
     """)
     # DSL方式
     dsl_df = df.select(
         F.explode(F.split('value',' ')).alias('words')
     ).groupBy('words').agg(
         F.count('words').alias('cnt')
     )
 ​
     # 4.数据输出
     # 注意: 输出不能使用原来sparksql的show()
     # 注意: 如果需要多开启多个输出,.awaitTermination()只需要在最后一个出现即可
     sql_df.writeStream.format('console').outputMode('complete').start()
     dsl_df.writeStream.format('console').outputMode('complete').start().awaitTermination()
     # 5.关闭资源
     spark.stop()
 ​

7.3 自动生成数据_Rate方式
 from pyspark.sql import SparkSession
 import os
 ​
 os.environ["SPARK_HOME"] = "/export/server/spark"
 os.environ["PYSPARK_PYTHON"] = "/root/anaconda3/bin/python"
 os.environ["PYSPARK_DRIVER_PYTHON"] = "/root/anaconda3/bin/python"
 ​
 if __name__ == '__main__':
     # 1.创建SparkSession对象
     spark = SparkSession.builder \
         .appName("StructuredStream_rate") \
         .master('local[*]') \
         .getOrCreate()
         
     # 2。读取数据
     df = spark.readStream \
         .format('rate') \
         .option("rowsPerSecond", "5") \
         .option('numPartitions', 1) \
         .load()
 ​
     # 3.数据处理
     # 略
 ​
     # 4.数据输出:
     df.writeStream \
         .format('console') \
         .outputMode('update') \
         .option('truncate', 'false') \
         .start() \
         .awaitTermination()
 ​
     # 5.关闭资源
     spark.stop()

相关推荐

  1. 数据——Hive调优10-12

    2024-07-21 05:12:03       23 阅读

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-07-21 05:12:03       49 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-07-21 05:12:03       53 阅读
  3. 在Django里面运行非项目文件

    2024-07-21 05:12:03       42 阅读
  4. Python语言-面向对象

    2024-07-21 05:12:03       53 阅读

热门阅读

  1. 渗透测试过程中如何做好个人防护?

    2024-07-21 05:12:03       18 阅读
  2. [ptrade交易实战] 第十七篇 期货交易类函数!

    2024-07-21 05:12:03       21 阅读
  3. 【C++11】initializer_list、可变参数模板详解

    2024-07-21 05:12:03       20 阅读
  4. 踏进互动科技世界使用Arduino

    2024-07-21 05:12:03       16 阅读
  5. 第五节shell脚本中的运行流程控制(1)(2)

    2024-07-21 05:12:03       18 阅读
  6. Oracle外键约束的三种删除行为

    2024-07-21 05:12:03       16 阅读
  7. SpringBoot整合ElasticSearch

    2024-07-21 05:12:03       18 阅读
  8. 分层评估的艺术:sklearn中的策略与实践

    2024-07-21 05:12:03       20 阅读