Spark-Scala语言实战(7)

在之前的文章中,我们学习了如何在IDEA中导入jars包,并做了一道例题,了解了RDD。想了解的朋友可以查看这篇文章。同时,希望我的文章能帮助到你,如果觉得我的文章写的不错,请留下你宝贵的点赞,谢谢。

Spark-Scala语言实战(6)-CSDN博客文章浏览阅读695次,点赞15次,收藏24次。今天我会给大家带来如何在IDEA中导入jars包,以及使用SparkRDD,并正确使用它们同时也会给大家讲解一道实训题例。希望在本篇文章中,大家有所收获。也欢迎朋友们到评论区下一起交流学习,共同进步。https://blog.csdn.net/qq_49513817/article/details/137121524?spm=1001.2014.3001.5502

今天开始的文章,我会带给大家如何在spark的中使用我们的RDD方法,今天学习RDD方法中的map,sortby,collect三种方法。

目录

一、知识回顾

二、RDD方法

1.map

2.sortby

3.collect

拓展-RDD和DStream

1.RDD和DStream的区别

2.RDD和DStream的联系


一、知识回顾

导入jars包的过程在上一篇文章中以及讲解的很清楚了,图文一步一步带着做。

主要就是进入Libraries 添加java,然后选择spark的jars文件夹即可

如果还有不懂的朋友可以直接评论问我。

在就是文件的这几行代码

import org.apache.spark.{SparkConf, SparkContext}

 val conf=new SparkConf().setMaster("local").setAppName("123456")
    val sc=new SparkContext(conf)

这是配置与方法,记住它们的作用。

现在,开始今天的学习吧

二、RDD方法

1.map

  • map()方法是一种基础的RDD转换操作,可以对RDD中的每一个数据元素通过某种函数进行转换并返回新的RDD
  • map()方法是转换操作,不会立即进行计算。
  • 转换操作是创建RDD的第二种方法,通过转换已有RDD生成新的RDD。因为RDD是一个不可变的集合,所以如果对RDD数据进行了某种转换,那么会生成一个新的RDD

例:

import org.apache.spark.{SparkConf, SparkContext}  
  
// 定义一个名为p1的Scala对象  
object p1 {  
  // 定义main方法,作为程序的入口点  
  def main(args: Array[String]): Unit = {  
    // 创建一个Spark配置对象,并设置运行模式为"local"(本地模式),应用程序名称为"p2"  
    val conf = new SparkConf().setMaster("local").setAppName("p2")  
      
    // 使用Spark配置对象创建一个SparkContext对象,SparkContext是Spark功能的入口点  
    val sc = new SparkContext(conf)  
      
    // 创建一个包含整数的列表,并使用parallelize方法将其转换为RDD  
    val ppp = sc.parallelize(List(1, 2, 3, 4, 5))  
      
    // 使用map操作将RDD中的每个元素乘以2,并返回一个新的RDD  
    val ppppp = ppp.map(x => x * 2)  
      
    //oreach方法遍历并打印每个元素  
    ppppp.collect().foreach(println)  
         
  }  
}

可以看到我们输出的在原列表上*2,达到了代码预期效果

2.sortby

  • sortBy()方法用于对标准RDD进行排序,有3个可输入参数,说明如下。
  • 1个参数是一个函数f:(T) => K,左边是要被排序对象中的每一个元素,右边返回的值是元素中要进行排序的值。
  • 2个参数是ascending,决定排序后RDD中的元素是升序的还是降序的,默认是true,即升序排序,如果需要降序排序那么需要将参数的值设置为false
  • 3个参数是numPartitions,决定排序后的RDD的分区个数,默认排序后的分区个数和排序之前的分区个数相等,即this.partitions.size
  • 第一个参数是必须输入的,而后面的两个参数可以不输入。

例:

import org.apache.spark.{SparkConf, SparkContext}  
    
object p1 {   
  def main(args: Array[String]): Unit = {     
    val conf = new SparkConf().setMaster("local").setAppName("p2")  
      
    // 使用配置好的conf对象创建一个SparkContext对象sc。   
    val sc = new SparkContext(conf)  
    // 使用SparkContext的parallelize方法将包含整数的序列转换成一个RDD。  
    // 这个RDD现在可以在Spark上并行处理。  
    val ppp = sc.parallelize(Seq(5, 1, 9, 3, 7))  
    // 对ppp RDD中的元素进行排序。  
    // 使用sortBy方法,并传递一个函数x => x作为参数,表示按照元素本身的值进行排序(升序)。  
    val pppp = ppp.sortBy(x => x)   
    // 这将返回一个包含RDD所有元素的数组,存储在ppppp中。  
    val ppppp = pppp.collect()  
      
    // 使用foreach方法遍历数组ppppp中的每个元素,并使用println函数打印它们。  
    ppppp.foreach(println)  

  }  
}

看下输出可以看到我们的元素已经排序了

3.collect

  • collect()方法是一种行动操作,可以将RDD中所有元素转换成数组并返回到Driver端,适用于返回处理后的少量数据。
  • 因为需要从集群各个节点收集数据到本地,经过网络传输,并且加载到Driver内存中,所以如果数据量比较大,会给网络传输造成很大的压力。
  • 因此,数据量较大时,尽量不使用collect()方法,否则可能导致Driver端出现内存溢出问题。

例:

import org.apache.spark.{SparkConf, SparkContext}

object p1 {
  def main(args: Array[String]): Unit = {
    val conf=new SparkConf().setMaster("local").setAppName("p2")
    val sc=new SparkContext(conf)
    val pp = sc.parallelize(Seq(1, 2, 3, 4, 5))
    val ppp = pp.collect()
    ppp.foreach(println)
  }
}

collect的作用是将RDD中的数据收集到驱动程序中,所以这里运行看不出区别。

拓展-RDD和DStream

在上一篇文章中,我们了解到了RDD,那么DStream是什么呢,我们先来了解一下:

DStream(离散流)是Spark Streaming提供的一种高级抽象,代表了一个持续不断的数据流。DStream的内部实际上是一系列持续不断产生的RDD,每个RDD包含特定时间间隔的数据。DStream的创建可以通过输入数据源如Kafka、Flume,或者通过对其他DStream应用高阶函数如map、reduce、join、window来实现。

1.RDD和DStream的区别

RDD DStream
定义 弹性分布式数据集,是Spark中最基本的数据处理模型。 离散流,是Spark Streaming提供的一种高级抽象,代表一个持续不断的数据流。
数据结构 静态的、不可变的数据集,可以划分为多个分区。 动态的、连续的数据流,内部由一系列RDD组成。
数据处理方式 批处理,适用于静态数据的处理和分析。 流处理,适用于实时数据流的处理和分析。
时间维度 无特定的时间维度,主要关注数据的分区和处理。 具有时间维度,每个RDD代表一段时间内的数据。
操作方式 对整个RDD进行操作,结果生成新的RDD。 对DStream进行操作,结果生成新的DStream,底层转换为RDD操作。
应用场景 大规模数据的批处理任务,如机器学习、数据挖掘等。 实时数据流处理任务,如日志分析、实时监控等。
容错性 具有容错性,数据丢失可以自动恢复。 继承了RDD的容错性特点。
与Spark的关系 Spark的核心组件,用于构建各种数据处理和分析任务。 Spark Streaming的核心组件,用于处理实时数据流。

2.RDD和DStream的联系

RDD DStream
基础构建单元 RDD是Spark的基本数据处理单元。 DStream基于RDD构建,每个时间间隔内的数据对应一个RDD。
计算模型 RDD支持分布式计算模型,数据被划分为多个分区进行并行处理。 DStream继承了RDD的计算模型,对流数据进行分布式处理。
容错性 RDD具有容错性,可以自动恢复丢失的数据。 DStream同样具有容错性,因为它基于RDD构建。
操作方式 RDD提供了一系列转换操作(如map、reduce)和动作操作(如collect、save)。 DStream也提供了类似的操作,这些操作最终会转换为底层RDD的操作。
数据处理能力 RDD适用于批处理任务,可以对大规模数据集进行处理和分析。 DStream适用于实时流处理任务,可以对连续的数据流进行实时分析和处理。
底层实现 DStream内部实际上是由一系列RDD组成的,每个RDD代表一段时间内的数据。 DStream的操作最终会转换为RDD的操作,利用RDD的分布式计算能力。
扩展性 RDD可以通过自定义操作进行扩展,支持更多的数据处理场景。 DStream同样可以通过自定义操作和转换函数进行扩展,以满足特定的实时处理需求

相关推荐

最近更新

  1. TCP协议是安全的吗?

    2024-03-30 07:56:05       18 阅读
  2. 阿里云服务器执行yum,一直下载docker-ce-stable失败

    2024-03-30 07:56:05       19 阅读
  3. 【Python教程】压缩PDF文件大小

    2024-03-30 07:56:05       18 阅读
  4. 通过文章id递归查询所有评论(xml)

    2024-03-30 07:56:05       20 阅读

热门阅读

  1. 2024年github开源top100中文

    2024-03-30 07:56:05       20 阅读
  2. Qt学习建议

    2024-03-30 07:56:05       17 阅读
  3. linux正则表达式之*

    2024-03-30 07:56:05       18 阅读
  4. 机器视觉学习(九)—— 边缘检测

    2024-03-30 07:56:05       19 阅读
  5. BIM自动化简介

    2024-03-30 07:56:05       20 阅读
  6. VUE——mixins混入

    2024-03-30 07:56:05       20 阅读
  7. 如何避免公网IP安全风险

    2024-03-30 07:56:05       16 阅读
  8. MongoDB聚合运算符:$last

    2024-03-30 07:56:05       20 阅读
  9. Linux内网提权

    2024-03-30 07:56:05       19 阅读
  10. 机器学习:scikit-learn库的主要组件

    2024-03-30 07:56:05       17 阅读
  11. Leetcode 15. 三数之和

    2024-03-30 07:56:05       19 阅读