Integrating Flink with Spring Boot

Flink is a unified computing framework that combines batch and stream processing. Its core is a streaming dataflow engine that provides data distribution and parallel computation.
Its biggest strength is stream processing, and its sweet spot is low-latency data processing.
Typical scenario: high-concurrency pipeline processing of data with millisecond-level latency and strong reliability.

Environment setup:

① Install Flink

https://nightlies.apache.org/flink/flink-docs-master/zh/docs/try-flink/local_installation/
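
Following that guide, a local cluster can be started from the extracted distribution (the archive name below is illustrative; use the version you actually download):

tar -xzf flink-*.tgz
cd flink-*/
./bin/start-cluster.sh # start a local JobManager and TaskManager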

② Install Netcat

Netcat (also known as nc) is a networking utility that can establish TCP or UDP connections between two machines.
It is commonly used to test network ports, transfer files, and do general network debugging and probing; it also supports more advanced operations such as encrypted connections and remote administration.

yum install -y nc # install the nc command

nc -lk 8888 # start a listening socket on port 8888
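
Once the Flink job shown below connects to this port as a client, every line typed into the nc terminal becomes one record of the unbounded input stream, for example:

hello world
hello flink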

Unbounded streams: reading a socket text stream

1. Dependencies


<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>springboot-demo</artifactId>
        <groupId>com.et</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>flink</artifactId>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>
    <dependencies>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-autoconfigure</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!-- Flink dependencies -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>1.17.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.17.0</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>1.17.0</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-base -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-base</artifactId>
            <version>1.17.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-files</artifactId>
            <version>1.17.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>1.17.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web</artifactId>
            <version>1.17.0</version>
        </dependency>


    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>META-INF/spring.handlers</resource>
                                </transformer>
                                <transformer
                                        implementation="org.springframework.boot.maven.PropertiesMergingResourceTransformer">
                                    <resource>META-INF/spring.factories</resource>
                                </transformer>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>META-INF/spring.schemas</resource>
                                </transformer>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.et.flink.job.SocketJob</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>
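
The shade plugin configuration above merges Spring's META-INF metadata (spring.handlers, spring.factories, spring.schemas) so the fat jar still works as a Spring Boot module, and registers the Flink job class as the jar's main class. The module's Spring Boot entry point is not shown in the original post; a minimal sketch of what it might look like (class name and package are assumptions) is:

package com.et.flink;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

// Hypothetical Spring Boot entry point for this module (not part of the original post).
@SpringBootApplication
public class FlinkDemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(FlinkDemoApplication.class, args);
    }
}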

2. SocketJob

package com.et.flink.job;

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class SocketJob {

    public static void main(String[] args) throws Exception {

        // Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set the parallelism; it defaults to the number of CPU threads on the machine
        env.setParallelism(3);
        // Read the socket text stream from the given host and port; records are processed only as they arrive
        DataStreamSource<String> socketDS = env.socketTextStream("172.24.4.193", 8888);

        // Process the data: split, transform, key by word, and aggregate to get the counts
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = socketDS
                .flatMap(
                        (String value, Collector<Tuple2<String, Integer>> out) -> {
                            String[] words = value.split(" ");
                            for (String word : words) {
                                out.collect(Tuple2.of(word, 1));
                            }
                        }
                )
                .setParallelism(2)
                // Provide the type information explicitly: with a lambda, Flink can only infer that flatMap
                // returns a Tuple2, not Tuple2<String, Integer>, so the full return type must be declared
                .returns(new TypeHint<Tuple2<String, Integer>>() {
                })
//                .returns(Types.TUPLE(Types.STRING, Types.INT))
                .keyBy(value -> value.f0)
                .sum(1);

        // Print the results
        sum.print();

        // Execute the job
        env.execute();
    }
}
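
Since flink-runtime-web is on the classpath, a local run can also expose Flink's embedded web UI (by default at http://localhost:8081) for inspecting the running job. A minimal sketch, assuming a local environment and a hypothetical class name:

package com.et.flink.job;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Sketch: minimal local run with the embedded Flink web UI enabled (class name is hypothetical).
public class SocketJobWithWebUI {

    public static void main(String[] args) throws Exception {
        // Requires flink-runtime-web on the classpath, which the pom above already declares
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        // Simply echo each incoming line so there is something to watch in the UI
        env.socketTextStream("172.24.4.193", 8888).print();
        env.execute("socket-echo-with-web-ui");
    }
}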

Testing:

Start the socket stream:

nc -lk 8888

Local execution: run the main method directly from IDEA, then type the following into the socket stream:

abc bcd cde
bcd cde fgh
cde fgh hij
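
The counts accumulate per key, so the console output should look roughly like the following; the "N>" prefixes identify the printing subtask and the exact interleaving will vary between runs because the parallelism is 3:

1> (abc,1)
2> (bcd,1)
3> (cde,1)
2> (bcd,2)
3> (cde,2)
1> (fgh,1)
3> (cde,3)
1> (fgh,2)
2> (hij,1)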


Cluster execution:
Run the Maven package build and upload the resulting jar to the cluster.
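
Assuming the module builds to flink-1.0-SNAPSHOT.jar under target/ (artifactId flink, version inherited from the parent), packaging and submitting the job would look roughly like this; adjust the jar path to whatever your build actually produces:

mvn clean package
./bin/flink run -c com.et.flink.job.SocketJob flink-1.0-SNAPSHOT.jar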
