一次业务的批量数据任务的处理优化

一次业务的批量数据任务的处理优化

业务背景

一个重新生成所有客户的财务业务指标数据的批量数据处理任务。

1.0版本 分批处理模式

根据要处理的客户数量,按照最大线程数切分成多个段,尽量保证每个线程处理相同的客户数量。

    private void updateForRegenerateByCustomer(List<Integer> customerIdList,
        SystemUserCommonDTO user, LocalDateTime now) {
        List<CustomerBaseInfo> baseInfoList = CollectionUtils.isEmpty(customerIdList)?customerInfoService.listAll():
            customerInfoService.listByIdList(customerIdList);
        //先清理客户的数据
        updateForCleanByCustomerIdList(baseInfoList,user,now);

        int maxSize = baseInfoList.size();
        //计算当前任务数量
        int currentMaxPoolSize = maxPoolSize<maxSize?maxPoolSize:maxSize;
        CompletableFuture[] tasks = new CompletableFuture[currentMaxPoolSize];
        //计算每个任务分段的数量
        int size = maxSize / currentMaxPoolSize;
        for(int i=0;i<currentMaxPoolSize;i++){
            final int begin = i * size;
            final int end = i==currentMaxPoolSize-1?maxSize:(i+1)*size;
            //创建异步处理的分段任务
            tasks[i] = CompletableFuture.runAsync(
                ()->updateForGenerateByCustomerIdList(baseInfoList,begin,end,user,now)
                ,executorService)
                .whenCompleteAsync((k,v)-> log.info("重新生成财务业务指标客户的所有数据-线程【{}】完成",Thread.currentThread().getName()));
        }
        // 向线程池提交任务
        CompletableFuture.allOf(tasks).whenComplete((v, th) -> log.info("重新生成财务业务指标客户的所有数据-【{}】个子线程处理完成",tasks.length)).join();
    }

	/**
	  * 生成指定客户列表的所有数据
	  **/
    private void updateForGenerateByCustomerIdList(List<CustomerBaseInfo> baseInfoList,int begin,int end,
        SystemUserCommonDTO user, LocalDateTime now){
        //每个线程只处理自己的分段的数据
        for(int i=begin;i<end;i++){
            CustomerBaseInfo baseInfo = baseInfoList.get(i);
            //每个客户独立事务
            TransactionalUtils.runWithNewTransactional(
                ()->updateForGenerateByCustomerId(baseInfo.getId(),user,now));
        }
    }

	/**
	* 生成指定客户的所有数据
	**/
	private void updateForGenerateByCustomerId(Integer customerId,SystemUserCommonDTO user, LocalDateTime now){
        //1、重新生成客户的所有业务类型的数据
        List<FinanceBiMaintainDto> maintainDtoList =
            financeBiBusinessTypeSupport.getMaintainListByCustomerId(customerId);
        if(CollectionUtils.isEmpty(maintainDtoList)){
            return ;
        }
        //生成每个指标的数据
        Map<BusinessIndicatorEnum,List<FinanceBiMaintainDto>> indicatorMaintainDtoMap = maintainDtoList
            .stream().collect(Collectors.groupingBy(FinanceBiMaintainDto::getIndicator));
        indicatorMaintainDtoMap.forEach((k,v)->{
            log.info("重新生成财务业务指标指定客户【{}】的【{}】支持处理开始",customerId,k);
            financeBiManager.updateForBiMaintain(k, v,user,now);
        });
    }

运行耗时:1420.145秒

2.0版本 平衡任务队列模式

1.0 版本 由于不同客户的数据量不同,导致生成数据的耗时不同,因此按照客户数量均分任务的的方式对于每个线程来说,任务量是不一样的,因此可能会导致部分线程太忙,部分线程太空的情况。因此调整为使用队列方式来解决任务分配的问题,每个线程自己取队列中取要处理的客户,直到所有队列中的客户都被处理完,所有的线程结束。这样就避免的线程任务量不平衡问题。

updateForGenerateByCustomerId 方法不需要改造,只需要调整任务分配的相关方法就可以。

private void updateForRegenerateByCustomer(List<Integer> customerIdList, SystemUserCommonDTO user,
        LocalDateTime now) {
        List<CustomerBaseInfo> baseInfoList = CollectionUtils.isEmpty(customerIdList) ? customerInfoService.listAll() :
            customerInfoService.listByIdList(customerIdList);
        //先清理客户的数据
        updateForCleanByCustomerIdList(baseInfoList, user, now);

        int maxSize = baseInfoList.size();
        int currentMaxPoolSize = Math.min(maxPoolSize, maxSize);
        //根据线程数,构建固定的任务数量
        CompletableFuture<?>[] tasks = new CompletableFuture<?>[currentMaxPoolSize];

		//构建待处理的客户队列,由于这里没有并发读写的情况,因此用ConcurrentLinkedQueue效率会更高一点。
        ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>(
            baseInfoList.stream().map(CustomerBaseInfo::getId).collect(Collectors.toList()));

		//创建多个线程去消耗客户队列
        for (int i = 0; i < currentMaxPoolSize; i++) {
            tasks[i] =
                CompletableFuture.runAsync(() -> updateForGenerateByCustomerIdList(queue, user, now), executorService)
                    .whenCompleteAsync((k, v) -> {
                        if (v != null) {
                            log.error(String.format("重新生成财务业务指标客户的所有数据-线程【%s】发生异常",
                                Thread.currentThread().getName()), v);
                        } else {
                            log.info("重新生成财务业务指标客户的所有数据-线程【{}】完成",
                                Thread.currentThread().getName());
                        }
                    });
        }
        // 向线程池提交任务
        CompletableFuture.allOf(tasks)
            .whenComplete((v, th) -> log.info("重新生成财务业务指标客户的所有数据-【{}】个子线程处理完成", tasks.length))
            .join();
    }

	/**
	  * 生成指定客户列表的所有数据
	  **/
    private void updateForGenerateByCustomerIdList(ConcurrentLinkedQueue<Integer> queue, SystemUserCommonDTO user,
        LocalDateTime now) {
        Integer customerId = queue.poll();
        //循环从客户队列中取出待处理的客户,直到所有客户都处理完毕。
        while (customerId != null) {
            final Integer currentCustomerId = customerId;
            TransactionalUtils.runWithNewTransactional(
                () -> updateForGenerateByCustomerId(currentCustomerId, user, now));
            customerId = queue.poll();
        }
    }
    

优化后的耗时:1037.059秒

3.0版本 优化调度平衡任务队列模式

2.0版本虽然解决的了每个线程任务量不平衡的问题,但可能出现某个数据量很大的客户在队列的尾部,导致当其他线程都处理完所有的客户时,取到最大数据量的客户的线程仍在运行,任务整体的耗时被增加。因此需要优化调度,将耗时高的客户调度到队列头部,保证耗时最长的客户的优先处理,从而避免最后等待耗时长的线程。

updateForGenerateByCustomerIdList 方法不需要改造,只需要队列构造处理就可以。


private void updateForRegenerateByCustomer(List<Integer> customerIdList, SystemUserCommonDTO user,
        LocalDateTime now) {
        List<CustomerBaseInfo> baseInfoList = CollectionUtils.isEmpty(customerIdList) ? customerInfoService.listAll() :
            customerInfoService.listByIdList(customerIdList);
        //先清理客户的数据
        updateForCleanByCustomerIdList(baseInfoList, user, now);
		//获取客户的统计数据
        Map<Integer, CustomerStatisticsInfo> customerStatisticsInfoMap =
            customerStatisticsInfoService.listAll().stream()
                .collect(Collectors.toMap(CustomerStatisticsInfo::getCustomerId, Function.identity()));

        int maxSize = baseInfoList.size();
        int currentMaxPoolSize = Math.min(maxPoolSize, maxSize);
        CompletableFuture<String>[] tasks = new CompletableFuture[currentMaxPoolSize];
		
		//根据客户的统计数据,构建待处理的客户队列
        ConcurrentLinkedQueue<Integer> queue =
            baseInfoList.stream().map(item -> customerStatisticsInfoMap.get(item.getId())).filter(Objects::nonNull)
//队列按照客户数据量倒序排列               .sorted(Comparator.comparing(CustomerStatisticsInfo::getNumberOfCheckedSatisfactoryActivitys,
                    Comparator.reverseOrder())).map(CustomerStatisticsInfo::getCustomerId)
                .collect(Collectors.toCollection(ConcurrentLinkedQueue::new));

        for (int i = 0; i < currentMaxPoolSize; i++) {
            tasks[i] = CompletableFuture.supplyAsync(() -> {
                updateForGenerateByCustomerIdList(queue, user, now);
                return Thread.currentThread().getName();
            }, executorService).whenCompleteAsync((k, ex) -> {
                if (ex != null) {
                    log.error(String.format("重新生成财务业务指标客户的所有数据-线程【%s】发生异常", k), ex);
                } else {
                    log.info("重新生成财务业务指标客户的所有数据-线程【{}】完成", k);
                }
            });
        }
        // 向线程池提交任务
        CompletableFuture.allOf(tasks)
            .whenComplete((v, th) -> log.info("重新生成财务业务指标客户的所有数据-【{}】个子线程处理完成", tasks.length))
            .join();
    }

耗时:726.725秒

总结

最终的耗时从1400多秒 降低到700多秒。降低了一半左右。

相关推荐

  1. 业务批量数据任务处理优化

    2024-07-11 04:50:05       18 阅读
  2. 用Python代码实现数据批量处理

    2024-07-11 04:50:05       37 阅读
  3. 记录业务遇到sql问题

    2024-07-11 04:50:05       36 阅读
  4. 记录关于线程池任务编排和共享数据尝试

    2024-07-11 04:50:05       34 阅读
  5. 若依项目组装树型结构数据效率优化

    2024-07-11 04:50:05       31 阅读
  6. 优选算法刷题」:只出现数字

    2024-07-11 04:50:05       55 阅读

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-07-11 04:50:05       66 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-07-11 04:50:05       70 阅读
  3. 在Django里面运行非项目文件

    2024-07-11 04:50:05       57 阅读
  4. Python语言-面向对象

    2024-07-11 04:50:05       68 阅读

热门阅读

  1. 力扣之有序链表去重

    2024-07-11 04:50:05       24 阅读
  2. PyTorch DataLoader 学习

    2024-07-11 04:50:05       17 阅读
  3. 微生活服务平台与元宇宙的融合

    2024-07-11 04:50:05       20 阅读
  4. C++ 入门05:类和对象

    2024-07-11 04:50:05       26 阅读
  5. mysqli 与mysql 区别和联系, 举例说明

    2024-07-11 04:50:05       24 阅读
  6. SQL Server镜像与日志:数据保护的双重保障

    2024-07-11 04:50:05       19 阅读
  7. 系统设计题-路由表最长匹配

    2024-07-11 04:50:05       22 阅读
  8. springboot+vue项目实战2024第三集修改用户信息

    2024-07-11 04:50:05       26 阅读
  9. stm32实现软件spi

    2024-07-11 04:50:05       23 阅读