IDEA编写各种WordCount运行

目录

一、编写WordCount(Spark_scala)提交到spark高可用集群

1.项目结构

2.导入依赖

3.编写scala版的WordCount

4.maven打包

5.运行jar包

​6.查询hdfs的输出结果

二、本地编写WordCount(Spark_scala)读取本地文件

1.项目结构

2.编写scala版的WordCount

3.编辑Edit Configurations配置文件

4.直接本地运行LocalWordCount.scala文件

三、本地编写WordCount(Spark_java版)读取本地文件,并输出到本地

1.项目结构

2.编写java版的WordCount

3.编辑Edit Configurations配置文件

4.运行后查看结果

四、本地编写WordCount(Spark_java_lambda版)读取本地文件,并输出到本地

1.项目结构

2.编写java-lambda版的WordCount

3.编辑Edit Configurations配置文件

4.运行后查看结果


搞了一个晚上加一个白天,总算搞出来了,呼~~

本地编写IDEA之前需要在windows下安装scala、hadoop和spark环境,参考文章如下:

Scala安装》 《Windows环境部署Hadoop-3.3.2和Spark3.3.2

一、编写WordCount(Spark_scala)提交到spark高可用集群

首先安装好scala,然后在IDEA创建一个maven项目,开始编写代码

1.项目结构

2.导入依赖

<name>spark-in-action</name>
    <url>http://maven.apache.org</url>

    <!-- 定义的一些常量 -->
    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <encoding>UTF-8</encoding>
        <spark.version>3.3.2</spark.version>
        <scala.version>2.12.15</scala.version>
    </properties>

    <dependencies>
        <!-- scala的依赖 -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>

        <!-- spark core 即为spark内核 ,其他高级组件都要依赖spark core -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>

    </dependencies>

    <!-- 配置Maven的镜像库 -->
    <!-- 依赖下载国内镜像库 -->
    <repositories>
        <repository>
            <id>nexus-aliyun</id>
            <name>Nexus aliyun</name>
            <layout>default</layout>
            <url>http://maven.aliyun.com/nexus/content/groups/public</url>
            <snapshots>
                <enabled>false</enabled>
                <updatePolicy>never</updatePolicy>
            </snapshots>
            <releases>
                <enabled>true</enabled>
                <updatePolicy>never</updatePolicy>
            </releases>
        </repository>
    </repositories>

    <!-- maven插件下载国内镜像库 -->
    <pluginRepositories>
        <pluginRepository>
            <id>ali-plugin</id>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
            <snapshots>
                <enabled>false</enabled>
                <updatePolicy>never</updatePolicy>
            </snapshots>
            <releases>
                <enabled>true</enabled>
                <updatePolicy>never</updatePolicy>
            </releases>
        </pluginRepository>
    </pluginRepositories>

    <build>
        <pluginManagement>
            <plugins>
                <!-- 编译scala的插件 -->
                <plugin>
                    <groupId>net.alchim31.maven</groupId>
                    <artifactId>scala-maven-plugin</artifactId>
                    <version>3.2.2</version>
                </plugin>
                <!-- 编译java的插件 -->
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.5.1</version>
                </plugin>
            </plugins>
        </pluginManagement>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <executions>
                    <execution>
                        <id>scala-compile-first</id>
                        <phase>process-resources</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>scala-test-compile</id>
                        <phase>process-test-resources</phase>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <executions>
                    <execution>
                        <phase>compile</phase>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

            <!-- 打jar插件 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

3.编写scala版的WordCount

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("WordCount")
    // 1.创建SparkContext
    val sc = new SparkContext(conf)

    // 2.创建RDD(神奇的大集合,该集合中不装真正要计算的数据,而是装描述信息)
    val lines: RDD[String] = sc.textFile(args(0))

    // 3.对RDD进行操作,调用RDD的方法
    // -------------Transformation (转换算子开始) --------------
    // 切分压平
    val words: RDD[String] = lines.flatMap(_.split(" "))

    // 将单词和1组合放入到元组中
    val wordAndOne: RDD[(String, Int)] = words.map((_, 1))

    // 将key相同的数据进行分组聚合
    val reduced: RDD[(String, Int)] = wordAndOne.reduceByKey(_ + _)

    // 按照次数排序
    val sorted: RDD[(String, Int)] = reduced.sortBy(_._2, false) // 降序
    // -------------Transformation (转换算子结束) --------------

    // 4.调用Action
    // 将数据写入到外部的存储系统中
    sorted.saveAsTextFile(args(1))

    // 5.释放资源
    sc.stop()
  }
}

4.maven打包

这个是胖包,除了项目本身的依赖,还有其他依赖,上面的original是瘦包,只有项目本身的依赖

5.运行jar包

首先启动zk、hdfs和spark高可用集群,这里我搭建的是standalone模式的高可用集群,不是on Yarn的

创建/opt/soft/spark-3.2.3/submit目录,将jar包上传到该目录下

提交命令

[root@node141 submit]# ../bin/spark-submit --master spark://node141:7077 --class cn.doitedu.day01.WordCount --executor-memory 1g --total-executor-cores 4 ./spark-in-action-1.0.jar hdfs://node141:9000/words.txt hdfs://node141:9000/out-1

--master spark://node141:7077     spark的master节点

--class cn.doitedu.day01.WordCount   运行的类名

--executor-memory 1g   占用的内存

--total-executor-cores 4  占用的核数

./spark-in-action-1.0.jar  运行的jar包地址

hdfs://node141:9000/words.txt   代码中args[0]对应的参数

hdfs://node141:9000/out-1  代码中args[1]对应的参数

6.查询hdfs的输出结果

二、本地编写WordCount(Spark_scala)读取本地文件

1.项目结构

2.编写scala版的WordCount

package cn.doitedu.day01

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * 使用本地模式运行Spark程序(开发调试的时候使用)
 */
object LocalWordCount {
  def main(args: Array[String]): Unit = {
    // 指定当前用户为root
    System.setProperty("HADOOP_USER_NAME", "root")

    val conf = new SparkConf()
      .setAppName("WordCount")
      .setMaster("local[*]") // 本地模式,*表示根据当前机器的核数开多个线程

    // 1.创建SparkContext
    val sc = new SparkContext(conf)

    // 2.创建RDD(神奇的大集合,该集合中不装真正要计算的数据,而是装描述信息)
    val lines: RDD[String] = sc.textFile(args(0))

    // 3.对RDD进行操作,调用RDD的方法
    // -------------Transformation (转换算子开始) --------------
    // 切分压平
    val words: RDD[String] = lines.flatMap(line => {
      val words = line.split(" ")
      println(words)  // debug
      words
    })

    // 将单词和1组合放入到元祖中
    val wordAndOne: RDD[(String, Int)] = words.map((_, 1))

    // 将key相同的数据进行分组聚合
    val reduced: RDD[(String, Int)] = wordAndOne.reduceByKey(_ + _)

    // 按照次数排序
    val sorted: RDD[(String, Int)] = reduced.sortBy(_._2, false) // 降序
    // -------------Transformation (转换算子结束) --------------

    // 4.调用Action
    // 将数据写入到外部的存储系统中
    sorted.saveAsTextFile(args(1))

    // 5.释放资源
    sc.stop()
  }
}

3.编辑Edit Configurations配置文件

4.直接本地运行LocalWordCount.scala文件

查看运行结果

三、本地编写WordCount(Spark_java版)读取本地文件,并输出到本地

1.项目结构

2.编写java版的WordCount

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;

public class JavaWordCount {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("JavaWordCount")
                .setMaster("local[*]");
        JavaSparkContext jsc = new JavaSparkContext(conf);

        JavaRDD<String> lines = jsc.textFile(args[0]);

        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String lines) throws Exception {
                String[] words = lines.split("\\s+");
                return Arrays.asList(words).iterator();
            }
        });

        JavaPairRDD<String, Integer> wordAndOne = words.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String word) throws Exception {
                return Tuple2.apply(word, 1);
            }
        });

        JavaPairRDD<String, Integer> reduced = wordAndOne.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });

        // 将原来的kv顺序颠倒  (flink,3)  ----> (3,flink)
        JavaPairRDD<Integer, String> swapped = reduced.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
            @Override
            public Tuple2<Integer, String> call(Tuple2<String, Integer> tp) throws Exception {
                return tp.swap(); // 交换
            }
        });

        JavaPairRDD<Integer, String> sorted = swapped.sortByKey(false);

        JavaPairRDD<String, Integer> result = sorted.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(Tuple2<Integer, String> tp) throws Exception {
                return tp.swap();
            }
        });

        result.saveAsTextFile(args[1]);

        jsc.stop();
    }
}

3.编辑Edit Configurations配置文件

4.运行后查看结果

四、本地编写WordCount(Spark_java_lambda版)读取本地文件,并输出到本地

1.项目结构

2.编写java-lambda版的WordCount

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Arrays;

public class JavaLambdaWordCount {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("JavaLambdaWordCount")
                .setMaster("local[*]");
        JavaSparkContext jsc = new JavaSparkContext(conf);

        JavaRDD<String> lines = jsc.textFile(args[0]);
        JavaRDD<String> words = lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator());
        JavaPairRDD<String, Integer> wordAndOne = words.mapToPair(w -> Tuple2.apply(w, 1));
//        JavaPairRDD<String, Integer> reduced = wordAndOne.reduceByKey((a, b) -> a + b);
        JavaPairRDD<String, Integer> reduced = wordAndOne.reduceByKey(Integer::sum);
        JavaPairRDD<Integer, String> swapped = reduced.mapToPair(Tuple2::swap);
        // 排序
        JavaPairRDD<Integer, String> sorted = swapped.sortByKey(false);
        // 调回来
        JavaPairRDD<String, Integer> result = sorted.mapToPair(Tuple2::swap);
        result.saveAsTextFile(args[1]);
        jsc.stop();
    }
}

3.编辑Edit Configurations配置文件

4.运行后查看结果

好啦~~

相关推荐

  1. Idea可以运行Python!

    2024-03-12 09:22:05       9 阅读
  2. PyTorch中各种求和运算

    2024-03-12 09:22:05       41 阅读

最近更新

  1. TCP协议是安全的吗?

    2024-03-12 09:22:05       18 阅读
  2. 阿里云服务器执行yum,一直下载docker-ce-stable失败

    2024-03-12 09:22:05       19 阅读
  3. 【Python教程】压缩PDF文件大小

    2024-03-12 09:22:05       18 阅读
  4. 通过文章id递归查询所有评论(xml)

    2024-03-12 09:22:05       20 阅读

热门阅读

  1. 用python实现选择排序

    2024-03-12 09:22:05       29 阅读
  2. 软考笔记--层次式架构之表现层框架设计

    2024-03-12 09:22:05       20 阅读
  3. CSS note

    2024-03-12 09:22:05       27 阅读
  4. Spring Boot面试系列-02

    2024-03-12 09:22:05       22 阅读
  5. kuberadm搭建k8s集群

    2024-03-12 09:22:05       17 阅读
  6. 第九节 JDBC数据类型

    2024-03-12 09:22:05       23 阅读
  7. Spring Boot实现热部署有哪几种方式

    2024-03-12 09:22:05       18 阅读