Flink 计数器Accumulator

简述

在 Apache Flink 中,Accumulator 是一个用于收集作业执行期间聚合信息的工具。它允许在用户定义的函数(如 MapFunction, FlatMapFunction, ProcessFunction 等)中累积值,并在作业完成后检索这些值。这对于跟踪诸如事件数量、处理延迟等统计信息非常有用。

要使用 Accumulator,需要首先定义一个 Accumulator 接口的实现,然后在用户定义函数中注册和使用它。

1. 定义 Accumulator:

通常,不需要直接定义 Accumulator 接口的实现,因为 Flink 已经为提供了一些内置的 Accumulator 类型,如 IntCounter, LongCounter, DoubleCounter 等。但如果需要自定义的聚合逻辑,可以实现 Accumulator 接口。

2. 在函数中使用 Accumulator:

在用户定义函数中,可以通过 getRuntimeContext().getAccumulator(“name”) 来获取或注册一个 Accumulator。然后,可以在逻辑中更新它的值。

3. 检索 Accumulator 的值:

在作业执行完成后,可以通过 JobExecutionResult 的 getAccumulatorResult() 方法来检索 Accumulator 的值。

但请注意,由于 Accumulator 已经被 Metric 系统所取代,以下是一个使用 Metric 的示例,它提供了类似的功能:

import org.apache.flink.api.common.functions.RuntimeContext;  
import org.apache.flink.metrics.Counter;  
import org.apache.flink.streaming.api.functions.source.SourceFunction;  
  
public class MySourceFunction implements SourceFunction<String> {  
  
    private transient Counter counter;  
  
    @Override  
    public void open(Configuration parameters) throws Exception {  
        super.open(parameters);  
        // 获取或注册一个 Counter  
        this.counter = getRuntimeContext().getMetricGroup().counter("my-counter");  
    }  
  
    @Override  
    public void run(SourceContext<String> ctx) throws Exception {  
        // ... 数据源逻辑 ...  
  
        // 更新 Counter 的值  
        counter.inc();  
  
        // 发送数据到下游  
        ctx.collect("some data");  
    }  
  
    // ... 其他必要的方法 ...  
}

在这个示例中,我们使用了 Flink 的 Metric 系统来创建一个计数器,并在数据源函数中更新它的值。这样,就可以在作业执行期间跟踪和检索这个计数器的值了。

代码

package com.wfg.flink.connector.mongodb;


import cn.hutool.core.date.DateUtil;
import com.alibaba.fastjson2.JSONObject;
import com.wfg.flink.connector.dto.KafkaPvDto;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.accumulators.LongCounter;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.mongodb.source.MongoSource;
import org.apache.flink.connector.mongodb.source.enumerator.splitter.PartitionStrategy;
import org.apache.flink.connector.mongodb.source.reader.deserializer.MongoDeserializationSchema;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.bson.BsonDocument;

import static com.wfg.flink.connector.constants.Constants.MONGO_TEST_PV_COLLECTION;

/**
 * @author wfg
 * 根据名字统计访问次数
 */
public class MongoAccumulatorCounts {

    public static void main(String[] args) throws Exception {
        String startTime = DateUtil.format(DateUtil.date(), "yyyy-MM-dd HH:mm:ss");
        System.out.println("StartTime:" + startTime);
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 开启Checkpointing,设置Checkpoint间隔
        env.enableCheckpointing(30000);
        // 设置Checkpoint模式
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // 设置最小Checkpoint间隔
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
        // 设置最大并发Checkpoint数目
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        // 使用RocksDB作为状态后端
        env.setStateBackend(new HashMapStateBackend());
        env.setParallelism(10);

        // 配置MongoDB源
        MongoSource<String> mongoSource = MongoSource.<String>builder()
                .setUri("mongodb://root:123456@127.0.0.1:27017,127.0.0.1:27018,127.0.0.1:27019/admin?replicaSet=rs0&authSource=admin")
                .setDatabase("sjzz")
                .setCollection(MONGO_TEST_PV_COLLECTION)
                .setFetchSize(2048)
//                .setLimit(10000)
                .setNoCursorTimeout(true)
                .setPartitionStrategy(PartitionStrategy.SINGLE)
                .setPartitionSize(MemorySize.ofMebiBytes(64))
                .setDeserializationSchema(new MongoDeserializationSchema<>() {
                    @Override
                    public String deserialize(BsonDocument document) {
                        return document.toJson();
                    }

                    @Override
                    public TypeInformation<String> getProducedType() {
                        return BasicTypeInfo.STRING_TYPE_INFO;
                    }
                })
                .build();
        // 创建MongoDB数据流
        DataStream<String> sourceStream = env.fromSource(mongoSource, WatermarkStrategy.noWatermarks(), "kafka Mongo Source");
// 转换数据,提取人名作为Key
        DataStream<Tuple2<String, Integer>> nameCountStream = sourceStream
                .map(new RichMapFunction<String, Tuple2<String, Integer>>() {
                    private LongCounter elementCounter = new LongCounter();
                    Long count = 0L;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                        //-2注册累加器
                        getRuntimeContext().addAccumulator("elementCounter", elementCounter);
                    }

                    @Override
                    public Tuple2<String, Integer> map(String value) {
                        KafkaPvDto data = JSONObject.parseObject(value, KafkaPvDto.class);
                        //-3.使用累加器
                        this.elementCounter.add(1);
                        count += 1;
                        System.out.println("不使用累加器统计的结果:" + count);
                        return Tuple2.of(data.getUserName(), 1);
                    }
                }).setParallelism(10);
//                .keyBy(value->value.f0)
//                 .sum("f1");
        sourceStream.writeAsText("data/output/test", FileSystem.WriteMode.OVERWRITE);
//-4.获取加强结果
        JobExecutionResult jobResult = env.execute();
        long nums = jobResult.getAccumulatorResult("elementCounter");
        System.out.println("使用累加器统计的结果:" + nums);
        System.out.println("-----------------------------------");
        System.out.println("startTime: " + startTime);
        System.out.println("EndTime: " + DateUtil.format(DateUtil.date(), "yyyy-MM-dd HH:mm:ss"));
    }
}

相关推荐

  1. Flink 计数器Accumulator

    2024-06-18 21:30:07       6 阅读
  2. Flink去重计数统计用户数

    2024-06-18 21:30:07       47 阅读
  3. gradient accumulate举例子解释

    2024-06-18 21:30:07       38 阅读
  4. <span style='color:red;'>Flink</span>

    Flink

    2024-06-18 21:30:07      35 阅读
  5. Spark如何用累加器Accumulator收集日志

    2024-06-18 21:30:07       29 阅读
  6. 【LeetCode】2620. 计数器

    2024-06-18 21:30:07       36 阅读

最近更新

  1. TCP协议是安全的吗?

    2024-06-18 21:30:07       18 阅读
  2. 阿里云服务器执行yum,一直下载docker-ce-stable失败

    2024-06-18 21:30:07       19 阅读
  3. 【Python教程】压缩PDF文件大小

    2024-06-18 21:30:07       18 阅读
  4. 通过文章id递归查询所有评论(xml)

    2024-06-18 21:30:07       20 阅读

热门阅读

  1. MySQL触发器基本结构

    2024-06-18 21:30:07       8 阅读
  2. Roboflow对YOLO数据集、标注、训练、下载

    2024-06-18 21:30:07       8 阅读
  3. Bean 的生命周期

    2024-06-18 21:30:07       5 阅读
  4. web前端开发哪个城市:探索最佳发展地

    2024-06-18 21:30:07       9 阅读
  5. Linux中的进程控制

    2024-06-18 21:30:07       7 阅读
  6. 高并发系统中面临的问题 及 解决方案

    2024-06-18 21:30:07       4 阅读
  7. Vue 3 的 setup 函数使用及避坑指南

    2024-06-18 21:30:07       6 阅读
  8. leetcode-11-二叉树前中后序遍历以及层次遍历

    2024-06-18 21:30:07       6 阅读
  9. API接口被刷 如何解决

    2024-06-18 21:30:07       6 阅读
  10. 机器学习中的神经网络入门

    2024-06-18 21:30:07       6 阅读
  11. C++中的访问者模式

    2024-06-18 21:30:07       4 阅读
  12. 双指针练习:和为s的两个数字

    2024-06-18 21:30:07       5 阅读
  13. CVPR2024 分割Segmentation相关论文37篇速览

    2024-06-18 21:30:07       6 阅读