Spark-Transformation以及Action开发实战

创建RDD

  • RDD是Spark的编程核心,在进行Spark编程是,首要任务就是创建一个初始的RDD
  • Spark提供三种创建RDD方式:集合、本地文件、HDFS文件
    • 集合:主要用于本地测试,在实际部署到集群运行之前,自己使用集合构造测试数据,测试Spark流程
    • 本地文件:临时性的处理工作
    • HDFS:最常用的生产上的方式

使用集合创建RDD

  • 通过SparkContext.parallelize方法将集合转化为RDD
  • 通过parallelize方法可以设置RDD的partition数量,Spark会为每一个partition运行一个task来处理


    public static void main(String[] args) {
   
        SparkConf sparkConf = new SparkConf();
        sparkConf.setAppName("CreateRDDArrayJava").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(sparkConf);

        List<Integer> list = Arrays.asList(1, 2, 3, 4);
        JavaRDD<Integer> rdd = sc.parallelize(list, 2);
        Integer sum = rdd.reduce(new Function2<Integer, Integer, Integer>() {
   
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
   
                return v1 + v2;
            }
        });
        System.out.println("sum:" + sum);

    }
  def main(args: Array[String]): Unit = {
   
    val conf = new SparkConf();
    conf.setAppName("CreateRDDArray").setMaster("local")
    val context = new SparkContext(conf);
    val arr = Array(1,2,3,4)

    // 集合创建RDD
    val rdd = context.parallelize(arr, 2)
    val sum = rdd.reduce(_ + _)

    println("sum=:" + sum)
  }
  

在这里插入图片描述

使用本地文件以及HDFS文件创建RDD

  • 通过SparkContext.textFile方法创建RDD,这时的RDD就是一行一行的文件数据
  • textFile方法支持针对目录、压缩文件以及通配符的方式
  • 默认会为HDFS文件的每一个Block创建一个partition,也可以通过textFile手动设置分区数量,只能比Block多,不能比Block少

这个可以参考上一篇blog(文件路径可以是hdfs://hadoop01:9000/test/hello,也可以是本地路径):https://blog.csdn.net/Grady00/article/details/136736362

Transformation以及Action

  • Spark支持两种RDD操作:Transformation、Action
    • Transformation可以理解为转换的意思,表示针对RDD数据的一个转换操作,主要对已有的RDD创建一个新的RDD,常见的有MAP,flatMap,filter等。
    • Transformation的特性:lazy,如果一个Spark任务只定义了Transformation算子,即使执行这个任务,任务中的算子也不会真正执行,也就是Transformation算子是不会出发Spark任务执行的,只是记录了对RDD的一些操作,只有进行了Action操作之后所有的Transformation才会真正执行。Spark通过lazy这种特性,来执行底层的Spark任务执行的优化,避免产生过多的中间结果
    • Action可以理解为执行,出发任务执行的操作,主要是对RDD进行最后的操作,比如遍历、reduce、保存到文件等,还可以把结果返回给Driver
    • Action特性:执行Action操作才会出发一个Spark Job的运行,从而触发这个Action之前所有的Transformation操作

不管是Transformation还是Action中的操作,我们都把它称为算子,比如map算子,reduce算子等等

Transformation开发

public static void main(String[] args) {
   
        SparkConf sparkConf = new SparkConf();
        sparkConf.setAppName("TransformationJava").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(sparkConf);
		
    // 将RDD中的每个元素进行处理,一进一出
        mapOp(sc);
    // 对RDD的每个元素进行判断,返回true则保存
        filterOp(sc);
    // 与map类似,但是每个元素都可以返回一个或多个新元素
        flatMapOp(sc);
    // 根据key进行分组,每个key对应一个Iterable<value>
    //BN:15003 15005 
	//US:150001 15002 
	//IN:15004 	
        groupByKeyOp(sc);
        groupByKeyOp2(sc);
    // 对每个相同的key对应的value进行reduce操作
    //reduceByKeyOp result:(BN,2)
	//reduceByKeyOp result:(US,2)
	//reduceByKeyOp result:(IN,1)
        reduceByKeyOp(sc);
    // 对每个相同的key对应的value进行排序操作
        sortedByKeyOp(sc);
    // 对两个包含<key,value>对的RDD进行join操作
        joinOp(sc);
    // 对RDD中的元素去重
        distinctOp(sc);
    }

    /**
     * 分组
     * @param sc
     */
    private static void groupByKeyOp(JavaSparkContext sc) {
   
        Tuple2<Integer, String> t1 = new Tuple2<>(150001, "US");
        Tuple2<Integer, String> t2 = new Tuple2<>(15002, "US");
        Tuple2<Integer, String> t3 = new Tuple2<>(15003, "BN");
        Tuple2<Integer, String> t4 = new Tuple2<>(15004, "IN");
        Tuple2<Integer, String> t5 = new Tuple2<>(15005, "BN");

        List<Tuple2<Integer, String>> list = Arrays.asList(t1, t2, t3, t4, t5);
        JavaRDD<Tuple2<Integer, String>> rdd = sc.parallelize(list);
        rdd.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
   
            @Override
            public Tuple2<String, Integer> call(Tuple2<Integer, String> tup) throws Exception {
   
                return new Tuple2<>(tup._2, tup._1);
            }
        }).groupByKey().foreach(new VoidFunction<Tuple2<String, Iterable<Integer>>>() {
   
            @Override
            public void call(Tuple2<String, Iterable<Integer>> tup) throws Exception {
   
                String area = tup._1;
                System.out.print(area + ":");
                Iterable<Integer> id = tup._2;
                for (Integer item: id
                     ) {
   
                    System.out.print(item + " ");
                }
                System.out.println();
            }
        });
    }

    /**
     * 拆分
     * @param sc
     */
    private static void flatMapOp(JavaSparkContext sc) {
   
        JavaRDD<String> javaRDD = sc.parallelize(Arrays.asList("good work", "work hard", "tom good", "take it easy"));
        javaRDD.flatMap(new FlatMapFunction<String, String>() {
   
            @Override
            public Iterator<String> call(String line) throws Exception {
   
                String[] words = line.split(" ");

                return Arrays.asList(words).iterator();
            }
        }).

相关推荐

  1. spark sql实践开发后端引擎

    2024-03-17 11:36:03       49 阅读
  2. transformer实战

    2024-03-17 11:36:03       29 阅读

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-03-17 11:36:03       98 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-03-17 11:36:03       106 阅读
  3. 在Django里面运行非项目文件

    2024-03-17 11:36:03       87 阅读
  4. Python语言-面向对象

    2024-03-17 11:36:03       97 阅读

热门阅读

  1. 码云使用 创建项目

    2024-03-17 11:36:03       45 阅读
  2. 比特币,区块链及相关概念简介(三)

    2024-03-17 11:36:03       44 阅读
  3. 容器只适用于微服务吗?

    2024-03-17 11:36:03       38 阅读
  4. docker的基本知识点

    2024-03-17 11:36:03       39 阅读
  5. stm32_f103c8点亮led(01)

    2024-03-17 11:36:03       43 阅读
  6. 1.AD域控如何强制删除不可以用域控服务器

    2024-03-17 11:36:03       38 阅读
  7. 如何调整服务器系统时间

    2024-03-17 11:36:03       42 阅读
  8. 【Http】三握四挥

    2024-03-17 11:36:03       46 阅读