详解 Spark 核心编程之 RDD 持久化

一、问题引出

/**
案例:对同一份数据文件分别做 WordCount 聚合操作和 Word 分组操作
期望:针对数据文件只进行一次分词、转换操作得到 RDD 对象,然后再对该对象分别进行聚合和分组,实现数据重用
*/
object TestRDDPersist {
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[*]").setAppName("persist")
    	val sc = new SparkContext(conf)
        
        val rdd = sc.makeRDD(List(
        	"hello world", "hello spark"
        ))
        
        val flatRdd = rdd.flatMap(_.split(" "))
        
        val mapRdd = flatRdd.map(word => {
            println("@@@@@@@@@@")
            (word, 1)
        })
        
        // 聚合操作
        val reduceRdd = mapRdd.reduceByKey(_ + _)
        reduceRdd.collect().foreach(println)
        
        println("**********")
        
        // 分组操作
        val groupRdd = mapRdd.groupByKey()
        groupRdd.collect().foreach(println)
    }
}

/**
结果:flatRdd.map过程在聚合时和分组时分别都执行了,说明针对数据文件的分词、转换操作被重复执行了,只有对象被重用,而数据没有被重用
解析:
	1.RDD是不会存储数据的,当某个 RDD 转换成新的 RDD 后,该 RDD 中的数据就没有了
	2.如果需要再次用到该 RDD 的数据,则需要从数据源开始重新执行到该 RDD 来获取数据
解决:针对某个需要被重复使用的 RDD 对象在其进行下一步操作时先将数据进行缓存持久化或checkpoint,后续的其它操作从缓存持久化或checkpoint中获取数据
*/

二、RDD Cache

/**
缓存或持久化方法:
	1.rdd.cache():底层调用 persist() 方法,默认是将数据保存到 JVM 堆内存中
	2.rdd.persist(StorageLevel):可以指定数据的保存级别
说明:
	1.持久化方法被调用时不会立即进行缓存,而是在触发action算子时,数据才会被缓存在计算节点的内存中
	2.缓存除了用于数据重用,还可以提高容错性
*/
object TestRDDPersist {
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[*]").setAppName("persist")
    	val sc = new SparkContext(conf)
        
        val rdd = sc.makeRDD(List(
        	"hello world", "hello spark"
        ))
        
        val flatRdd = rdd.flatMap(_.split(" "))
        
        val mapRdd = flatRdd.map(word => {
            println("@@@@@@@@@@")
            (word, 1)
        })
        
        //mapRdd.cache()
        mapRdd.persist()
        
        // 聚合操作
        val reduceRdd = mapRdd.reduceByKey(_ + _)
        reduceRdd.collect().foreach(println)
        
        println("**********")
        
        // 分组操作
        val groupRdd = mapRdd.groupByKey()
        groupRdd.collect().foreach(println)
        
        /*
        结果:聚合和分组前的操作过程只执行了一遍,实现了数据重用
        */
    }
}

// 存储级别
object StorageLevel {
    val NONE = new StorageLevel(false, false, false, false)
    val DISK_ONLY = new StorageLevel(true, false, false, false)
    val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2) // 副本
    val MEMORY_ONLY = new StorageLevel(false, true, false, true) // 内存不足丢弃数据
    val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
    val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
    val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
    val MEMORY_AND_DISK = new StorageLevel(true, true, false, true) // 内存不足溢写磁盘
    val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
    val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
    val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
    val OFF_HEAP = new StorageLevel(true, true, true, false, 1)
}

三、RDD CheckPoint

/**
方法:rdd.checkpoint(),将 RDD 中间结果写入磁盘
说明:
	1.对 RDD 进行 checkpoint 操作并不会马上被执行,必须执行 Action 操作才能触发
	2.checkpoint保存由于在job执行完不会被删除,所以必须指定保存路径,一般保存在分布式文件系统
*/
object TestRDDPersist {
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[*]").setAppName("persist")
    	val sc = new SparkContext(conf)
        
        // 指定checkpoint保存路径
        sc.setCheckpointDir("checkpoint")
        
        val rdd = sc.makeRDD(List(
        	"hello world", "hello spark"
        ))
        
        val flatRdd = rdd.flatMap(_.split(" "))
        
        val mapRdd = flatRdd.map(word => {
            println("@@@@@@@@@@")
            (word, 1)
        })
        
        mapRdd.checkpoint()
        
        // 聚合操作
        val reduceRdd = mapRdd.reduceByKey(_ + _)
        reduceRdd.collect().foreach(println)
        
        println("**********")
        
        // 分组操作
        val groupRdd = mapRdd.groupByKey()
        groupRdd.collect().foreach(println)
        
        /*
        结果:聚合和分组前的操作过程只执行了一遍,实现了数据重用
        */
    }
}

四、缓存和检查点区别

  • cache 和 persist 会在原有的血缘关系中添加新的依赖,一旦数据出错可以重头读取数据;checkpoint 检查点会切断原有的血缘关系,重新建立新的血缘关系,相当于改变数据源
  • cache 是将数据临时存储在 JVM 堆内存中,性能较高,但安全性低,persist 可以指定存储级别,将数据临时存储在磁盘文件中,涉及到 IO,性能较低,作业执行完毕后临时文件会被删除;checkpoint 是将数据长久地存储分布式文件系统中,安全性较高,但涉及 IO 且会独立开启一个作业从数据源开始获取数据,所以性能较低,一般在 checkpoint 前先进行 cache,当 checkpoint 时 job 只需从缓存中读取数据即可,可以提高性能

相关推荐

  1. 详解 Spark 核心编程 RDD 持久

    2024-06-05 20:00:07       35 阅读

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-06-05 20:00:07       94 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-06-05 20:00:07       100 阅读
  3. 在Django里面运行非项目文件

    2024-06-05 20:00:07       82 阅读
  4. Python语言-面向对象

    2024-06-05 20:00:07       91 阅读

热门阅读

  1. 【HarmonyOS】应用通知广播的使用

    2024-06-05 20:00:07       34 阅读
  2. Vue基础(2)响应式基础

    2024-06-05 20:00:07       30 阅读
  3. AIGC如何改变人类生活20240529

    2024-06-05 20:00:07       30 阅读
  4. Oracle拉链表

    2024-06-05 20:00:07       29 阅读
  5. PostgreSQL 和Oracle锁机制对比

    2024-06-05 20:00:07       33 阅读
  6. Neo4J中构建的知识图谱,如何使用推理算法

    2024-06-05 20:00:07       31 阅读
  7. EasyExcel实现导入导出

    2024-06-05 20:00:07       25 阅读
  8. QT常用快捷键

    2024-06-05 20:00:07       30 阅读
  9. 华为欧拉 openEuler 23.09 一键安装 Oracle 12CR2 单机

    2024-06-05 20:00:07       28 阅读
  10. go语言进阶 包

    2024-06-05 20:00:07       28 阅读
  11. [12] 使用 CUDA 加速排序算法

    2024-06-05 20:00:07       27 阅读
  12. 将vector/array从非托管c++传递到c#

    2024-06-05 20:00:07       32 阅读
  13. ubuntu使用Docker笔记

    2024-06-05 20:00:07       35 阅读