Flink算子简单测试样例

Flink算子简单测试样例

1. 创建执行环境
        // 创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

2. 创建数据流
        // 创建数据流
        DataStream<String> source = env.addSource(new DataGeneratorSource<>(new DataGenerator<String>() {
   
            final int CNT = 10000; // 模拟一万条数
            int i = 0;

            @Override
            public void open(String s, FunctionInitializationContext functionInitializationContext, RuntimeContext runtimeContext) throws Exception {
   }

            @Override
            public boolean hasNext() {
   
                return i < CNT;
            }

            @Override
            public String next() {
   
                i++;
                try {
   
                    Thread.sleep(new Random().nextInt(2000)); // 随机发生时间
                } catch (InterruptedException e) {
   
                }
                return "" + i;
            }
        })).returns(String.class).uid("source").name("source");

3. 数据补充
        // 数据补充-添加时间戳,增加金额
        SingleOutputStreamOperator<Map<String, String>> mapOperator = source.map((MapFunction<String, Map<String, String>>) s -> {
   
            HashMap<String, String> hashMap = new HashMap<>();
            hashMap.put("userid", s);
            hashMap.put("amt", new Random().nextInt(100) + "");
            hashMap.put("time", System.currentTimeMillis() + "");
            return hashMap;
        }).returns(TypeInformation.of(new TypeHint<Map<String, String>>() {
   
        })).uid("mapOperator").name("mapOperator");

4. 数据过滤
        // 数据过滤-只取时间戳为偶数的数据
        SingleOutputStreamOperator<Map<String, String>> filterOperator = mapOperator.filter((FilterFunction<Map<String, String>>) data -> {
   
//                System.out.println("从mapOperator接到数据:" + data);
            long time = Long.parseLong(data.get("time"));
            return time % 2 == 0;
        }).returns(TypeInformation.of(new TypeHint<Map<String, String>>() {
   
        })).uid("filterOperator").name("filterOperator");

5. 数据放大
        // 数据放大-时间戳是4的倍数,双倍奖励,8的倍数,三倍奖励
        SingleOutputStreamOperator<Map<String, String>> flatMapOperator = filterOperator.flatMap((FlatMapFunction<Map<String, String>, Map<String, String>>) (data, collector) -> {
   
            collector.collect(data);
            if (Long.parseLong(data.get("time")) % 4 == 0) {
   
                collector.collect(data);
            }
            if (Long.parseLong(data.get("time")) % 8 == 0) {
   
                collector.collect(data);
            }
        }).returns(TypeInformation.of(new TypeHint<Map<String, String>>() {
   
        })).uid("flatMapOperator").name("flatMapOperator");

6. 数据输出
        // 数据输出
        flatMapOperator.print();

        // 执行程序
        env.execute("FlinkTest");

7. 执行结果
{
   amt=45, time=1705048891056, userid=4}
{
   amt=45, time=1705048891056, userid=4}
{
   amt=45, time=1705048891056, userid=4}
{
   amt=56, time=1705048894374, userid=6}
{
   amt=96, time=1705048899462, userid=10}
{
   amt=65, time=1705048901638, userid=12}
{
   amt=33, time=1705048902544, userid=13}
{
   amt=33, time=1705048902544, userid=13}
{
   amt=33, time=1705048902544, userid=13}
{
   amt=10, time=1705048903748, userid=14}
{
   amt=10, time=1705048903748, userid=14}
...

Process finished with exit code 0

相关推荐

  1. Flink算子简单测试

    2024-01-13 11:58:02       38 阅读
  2. C#中UDP的简单使用+

    2024-01-13 11:58:02       39 阅读

最近更新

  1. TCP协议是安全的吗?

    2024-01-13 11:58:02       16 阅读
  2. 阿里云服务器执行yum,一直下载docker-ce-stable失败

    2024-01-13 11:58:02       16 阅读
  3. 【Python教程】压缩PDF文件大小

    2024-01-13 11:58:02       15 阅读
  4. 通过文章id递归查询所有评论(xml)

    2024-01-13 11:58:02       18 阅读

热门阅读

  1. 在矩阵回溯中进行累加和比较的注意点

    2024-01-13 11:58:02       35 阅读
  2. 数据分析---SQL(2)

    2024-01-13 11:58:02       36 阅读
  3. Python修改二值图像某特定颜色

    2024-01-13 11:58:02       34 阅读
  4. 微服务入门介绍(一)

    2024-01-13 11:58:02       25 阅读
  5. 编程笔记 html5&css&js 037 CSS选择器

    2024-01-13 11:58:02       25 阅读
  6. textarea文本框根据输入内容自动适应高度

    2024-01-13 11:58:02       30 阅读
  7. Linux部署excalidraw-cn白板

    2024-01-13 11:58:02       33 阅读