FlinkSql一个简单的测试程序

FlinkSql一个简单的测试程序

以下是一个简单的 Flink SQL 示例,展示了如何使用 Flink Table API 和 Flink SQL 进行基本的数据流处理。


  1. 定义数据实体 CC :
    - CC 类表示数据流中的元素,包含两个字段: character (字符)和 count (计数)。
    - 提供了无参构造函数和带参构造函数,用于创建 CC 对象。
    // 1. 定义数据实体
    public static class CC {
   
        public String character;
        public long count;

        public CC() {
   
        }

        public CC(String character, long count) {
   
            this.character = character;
            this.count = count;
        }
    } 

  1. 创建执行环境并模拟数据流:
    - 创建了 Flink 执行环境 StreamExecutionEnvironment 和 StreamTableEnvironment 。
    - 创建了一个包含字符串元素的数据流 inputStream ,其中包括 “hello”, “world” 和 “!!!”。
        // 2. 创建执行环境并模拟数据流
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        EnvironmentSettings environmentSettings = EnvironmentSettings
                .newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, environmentSettings);

        DataStream<String> inputStream = env.fromElements(
                "hello",
                "world",
                "!!!"
        ).uid("source").name("source");

  1. 对数据流进行 flatMap 操作:
    - 使用 flatMap 对每个输入字符串进行拆分,并将每个字符映射为一个 CC 对象。
        // 3. 对数据流进行flatMap()操作
        SingleOutputStreamOperator<CC> streamOperator = inputStream.flatMap(new FlatMapFunction<String, CC>() {
   
            @Override
            public void flatMap(String value, Collector<CC> out) throws Exception {
   
                for (char c : value.toCharArray()) {
   
                    out.collect(new CC(c + "",1L));
                }
            }
        });

  1. 将数据流转为 Table :
    - 使用 tableEnv.fromDataStream 将 streamOperator 转换为一个 Table 对象。
        // 4. 将数据流转为Table
        Table table = tableEnv.fromDataStream(streamOperator);

  1. 使用 Table API 操作数据流:
    - 对 table 进行选择和过滤操作,保留字符不为空的记录。
    - 对过滤后的数据进行分组,并计算每个字符的计数总和,将结果存储在 result 中。
        // 5. 使用tableApi操作数据流,并输出结果
        Table filter = table
                .select($("character"), $("count"))
                .filter($("character").isNotEqual(""));
        Table result = filter
                .groupBy($("character"))
                .select($("character"), $("count").sum().as("character_count"));

        tableEnv.toRetractStream(result, Row.class).print();

  1. 使用 Flink SQL 操作数据流:
    - 将 table 注册为临时视图 “CC”。
    - 执行 SQL 查询,对 “CC” 进行分组,计算每个字符的计数总和,并将结果存储在 result2 中。
        // 6. 使用FlinkSql操作数据流,并输出结果
        tableEnv.createTemporaryView("CC", table);
        Table result2 = tableEnv.sqlQuery("SELECT `character`, SUM(`count`) FROM CC group by `character`");

        tableEnv.toRetractStream(result2, Row.class).print();

  1. 执行任务:
    - 使用 env.execute(“Flink Sql Test”) 启动 Flink 作业,处理数据流并输出结果。
        // 7.执行任务
        env.execute("Flink Sql Test");

  1. 执行结果:
(true,+I[h, 1])
(true,+I[e, 1])
(true,+I[l, 1])
(false,-U[l, 1])
(true,+U[l, 2])
(true,+I[o, 1])
(true,+I[w, 1])
(false,-U[o, 1])
(true,+U[o, 2])
(true,+I[r, 1])
(false,-U[l, 2])
(true,+U[l, 3])
(true,+I[d, 1])
(true,+I[!, 1])
(false,-U[!, 1])
(true,+U[!, 2])
(false,-U[!, 2])
(true,+U[!, 3])

Process finished with exit code 0

通过这段代码,您可以了解如何使用 Flink Table API 和 Flink SQL 对数据流进行简单的处理和分析,包括数据拆分、选择、过滤、分组和计算。最后,通过 toRetractStream 方法将结果打印输出。

相关推荐

  1. FlinkSql一个简单测试程序

    2024-02-20 15:28:03       25 阅读
  2. 一个简单程序

    2024-02-20 15:28:03       9 阅读
  3. 【Flink】FlinkSQLDataGen连接器(测试利器)

    2024-02-20 15:28:03       30 阅读
  4. 微信小程序实现一个简单登录功能

    2024-02-20 15:28:03       37 阅读
  5. <span style='color:red;'>FlinkSQL</span>

    FlinkSQL

    2024-02-20 15:28:03      30 阅读
  6. flinksql

    2024-02-20 15:28:03       12 阅读

最近更新

  1. TCP协议是安全的吗?

    2024-02-20 15:28:03       16 阅读
  2. 阿里云服务器执行yum,一直下载docker-ce-stable失败

    2024-02-20 15:28:03       16 阅读
  3. 【Python教程】压缩PDF文件大小

    2024-02-20 15:28:03       15 阅读
  4. 通过文章id递归查询所有评论(xml)

    2024-02-20 15:28:03       18 阅读

热门阅读

  1. Python基础笔记11

    2024-02-20 15:28:03       25 阅读
  2. python将pdf转换成图片

    2024-02-20 15:28:03       26 阅读
  3. 【层序遍历】429. N 叉树的层序遍历

    2024-02-20 15:28:03       28 阅读
  4. docker的底层原理

    2024-02-20 15:28:03       27 阅读
  5. Spring源码笔记之SpringIOC--(3)什么是BeanFactory?

    2024-02-20 15:28:03       29 阅读
  6. Android 应用使用情况统计

    2024-02-20 15:28:03       32 阅读
  7. Vue:Vuex模块化编码(非常实用)

    2024-02-20 15:28:03       27 阅读
  8. 嵌出式------001

    2024-02-20 15:28:03       26 阅读
  9. docker 运行 tdengine 并且mybatis 连接

    2024-02-20 15:28:03       27 阅读
  10. python用websockets创建服务端websocket创建客户端

    2024-02-20 15:28:03       28 阅读
  11. 基于python+mysql的宠物领养网站系统

    2024-02-20 15:28:03       27 阅读
  12. Python 进阶语法:正则表达式

    2024-02-20 15:28:03       27 阅读