[自研开源] MyData 数据集成之数据过滤 v0.7.2

开源地址:gitee | github
详细介绍:MyData 基于 Web API 的数据集成平台
部署文档:用 Docker 部署 MyData
使用手册:MyData 使用手册
试用体验:https://demo.mydata.work
交流Q群:430089673

概述

本篇基于 数据集成之任务流程 介绍任务执行时的数据过滤的使用场景和配置操作。

使用场景

业务系统与mydata集成时,核心是数据的来和去,在这两个方向上分别实现:数据预清洗数据权限控制

  1. 数据预清洗,从api获取数据后 过滤排除掉“脏”数据,然后再入库用于数据集成;

    在这里插入图片描述

    例如:接口返回的某字段值不能为空、字段值长度在指定范围等;

    以下代码是 提供数据 类型的任务执行过程:

    // 提供数据
    case MdConstant.DATA_PRODUCER:
        // 调用api 获取json
        String json = ApiUtil.read(taskInfo);
        // 将json按字段映射 解析为业务数据
        jobDataService.parseData(taskInfo, json);
        // 根据条件过滤数据
        jobDataFilterService.doFilter(taskInfo);
        // 保存业务数据
        jobDataService.saveTaskData(taskInfo);
        // 更新环境变量
        jobVarService.saveVarValue(taskInfo, json);
    	break;
    

    jobDataFilterService.doFilter 是对数据的预过滤处理,详见 JobDataFilterService.java

    public void doFilter(TaskInfo task) {
        Assert.notNull(task);
    	// 获取业务数据
        List<Map> dataList = task.getProduceDataList();
        // 获取配置的过滤条件
        List<BizDataFilter> dataFilters = task.getDataFilters();
    
        if (CollUtil.isEmpty(dataList) || CollUtil.isEmpty(dataFilters)) {
            return;
        }
    
        // 定义新的数据集合,用于存储 过滤后的数据
        List<Map> filterDatas = ListUtil.toList();
        // 遍历数据,并进行过滤
        dataList.forEach(data -> {
    
            boolean isCorrect = false;
    
            for (BizDataFilter filter : dataFilters) {
                String key = filter.getKey();
                Object filterValue = filter.getValue();
                String op = filter.getOp();
    
                // 当数据中 不包含 过滤的字段名,则执行下一项过滤
                if (!data.containsKey(key)) {
                    continue;
                }
    
                // 当数据中 指定字段的值 无效,则过滤该数据
                Object dataValue = data.get(key);
                if (ObjectUtil.isNull(dataValue)) {
                    isCorrect = true;
                    break;
                }
    
                // 判断业务数据值 和 过滤数据值 都可对比,否则过滤条件无效
                if (!(dataValue instanceof Comparable && filterValue instanceof Comparable)) {
                    break;
                }
    
                String cDataValue = dataValue.toString();
                String cFilterValue = filterValue.toString();
                // 根据op类型,过滤数据
                switch (op) {
                    case MdConstant.DATA_OP_EQ:
                        // 等于
                        isCorrect = (ObjectUtil.compare(cDataValue, cFilterValue) == 0);
                        break;
                    case MdConstant.DATA_OP_NE:
                        // 不等于
                        isCorrect = (ObjectUtil.compare(cDataValue, cFilterValue) != 0);
                        break;
                    case MdConstant.DATA_OP_GT:
                        // 大于
                        isCorrect = (ObjectUtil.compare(cDataValue, cFilterValue) > 0);
                        break;
                    case MdConstant.DATA_OP_GTE:
                        // 大于等于
                        isCorrect = (ObjectUtil.compare(cDataValue, cFilterValue) >= 0);
                        break;
                    case MdConstant.DATA_OP_LT:
                        // 小于
                        isCorrect = (ObjectUtil.compare(cDataValue, cFilterValue) < 0);
                        break;
                    case MdConstant.DATA_OP_LTE:
                        // 小于等于
                        isCorrect = (ObjectUtil.compare(cDataValue, cFilterValue) <= 0);
                        break;
    
                    default:
                        throw new RuntimeException("JobDataFilter: 不支持的过滤操作");
                }
            }
    
            // 当 未被过滤,则添加到过滤结果
            if (isCorrect) {
                filterDatas.add(data);
            }
        });
    
        task.setProduceDataList(filterDatas);
    
        task.appendLog("过滤前的业务数据:{}", dataList);
        task.appendLog("过滤条件:{}", dataFilters);
        task.appendLog("过滤后的业务数据:{}", filterDatas);
    }
    

    注:目前0.7版本暂时实现了关系运算,后续增加函数处理;

  2. 数据权限控制,限制应用接收的数据范围,即符合条件的数据才能共享给应用;

    在这里插入图片描述

    以下代码是 消费数据 类型任务的执行过程:

    // 消费数据
    case MdConstant.DATA_CONSUMER:
        List<BizDataFilter> filters = taskInfo.getDataFilters();
        if (CollUtil.isNotEmpty(filters)) {
            // 解析过滤条件值中的 自定义字符串
            parseFilterValue(filters);
            // 排除值为null的条件
            filters = filters.stream().filter(filter -> filter.getValue() != null).collect(Collectors.toList());
        }
        String dataCode = taskInfo.getDataCode();
        if (StrUtil.isNotEmpty(dataCode)) {
            // 根据过滤条件 查询数据
            List<Map> dataList = bizDataDAO.list(MdUtil.getBizDbCode(taskInfo.getTenantId(), taskInfo.getProjectId(), taskInfo.getEnvId()), dataCode, filters);
            taskInfo.setConsumeDataList(dataList);
            // 根据字段映射转换为api参数
            jobDataService.convertData(taskInfo);
        }
        // 调用api传输数据
        ApiUtil.write(taskInfo);
        break;
    

    bizDataDAO.list 方法支持按配置条件查询数据,详见 BizDataDAO.java

    public List<Map> list(String dbCode, String dataCode, List<BizDataFilter> bizDataFilters) {
            MongoTemplate mongoTemplate = mongoFactory.getTemplate(dbCode);
            Query query = new Query();
            // 遍历数据过滤条件
            if (CollUtil.isNotEmpty(bizDataFilters)) {
                // mongodb的查询条件集合
                List<Criteria> criteriaList = CollUtil.newArrayList();
                for (BizDataFilter bizDataFilter : bizDataFilters) {
                    // 条件key
                    String key = bizDataFilter.getKey();
                    // 条件操作
                    String op = bizDataFilter.getOp();
                    // 条件值
                    Object value = bizDataFilter.getValue();
    
                    // 根据条件操作类型 调用mongodb对应的查询方法
                    Criteria criteria = Criteria.where(key);
                    switch (op) {
                        case MdConstant.DATA_OP_EQ:
                            criteria.is(value);
                            break;
                        case MdConstant.DATA_OP_NE:
                            criteria.ne(value);
                            break;
                        case MdConstant.DATA_OP_GT:
                            criteria.gt(value);
                            break;
                        case MdConstant.DATA_OP_GTE:
                            criteria.gte(value);
                            break;
                        case MdConstant.DATA_OP_LT:
                            criteria.lt(value);
                            break;
                        case MdConstant.DATA_OP_LTE:
                            criteria.lte(value);
                            break;
    
                        default:
                            throw new RuntimeException("BizDataDAO: 不支持的过滤操作");
                    }
                    // 存入mongodb的查询条件集合
                    criteriaList.add(criteria);
                }
    
                // mongodb查询条件集合 加入查询中
                query.addCriteria(new Criteria().andOperator(criteriaList));
            }
    
            // 执行查询
            return mongoTemplate.find(query, Map.class, dataCode);
        }
    

配置操作

  1. 创建任务过程请参考 使用手册

  2. 在创建任务界面中,添加数据过滤条件

    如下图过滤条件是 salary > 600
    在这里插入图片描述

  3. 执行任务后 通过日志详情可以看到数据入库前预清洗;
    在这里插入图片描述

相关推荐

  1. [开源] MyData v0.7.2 更新日志

    2024-03-17 19:36:03       45 阅读
  2. 荒原梦·考数学:2025 考每日一题(002

    2024-03-17 19:36:03       65 阅读
  3. 数据仓库数据集成开源工具

    2024-03-17 19:36:03       37 阅读

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-03-17 19:36:03       98 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-03-17 19:36:03       106 阅读
  3. 在Django里面运行非项目文件

    2024-03-17 19:36:03       87 阅读
  4. Python语言-面向对象

    2024-03-17 19:36:03       96 阅读

热门阅读

  1. [AIGC] Kafka分区分配策略详解

    2024-03-17 19:36:03       68 阅读
  2. 3.17Code

    3.17Code

    2024-03-17 19:36:03      35 阅读
  3. Android/iOS APP备案:遇到的问题汇总指南!

    2024-03-17 19:36:03       45 阅读
  4. JWT原理

    JWT原理

    2024-03-17 19:36:03      36 阅读
  5. (容斥原理例题)洛谷P1287 盒子与球

    2024-03-17 19:36:03       50 阅读
  6. LeetCode350:两个数组的交集Ⅱ

    2024-03-17 19:36:03       42 阅读
  7. 配置服务器自启动极简方式 /etc/rc.d/rc.local

    2024-03-17 19:36:03       44 阅读