使用 CompletableFuture 分批处理任务

一、无返回值任务函数

// 数据分批
List<List<StatisticsDTO>> batches = Lists.partition(statisticsList, BATCH_SIZE);
List<CompletableFuture<Void>> futures = new ArrayList<>(batches.size());

// 数据处理
for (int i = 0; i < batches.size(); i++) {
   
    logger.info("批次 " + i + " 开始处理...");
    String logId = LogIdThreadLocal.getLogId();  // 传递主线程的 logId
    List<StatisticsDTO> batchData = batches.get(i);
    CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
   
        try {
   
            LogIdThreadLocal.setLogId(logId);
            processBatch(batchData);
        } finally {
   
            LogIdThreadLocal.clean();
        }
    });
    futures.add(future);
}

// 等待所有的异步任务完成
CompletableFuture<Void> allOf = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
allOf.join();

二、带返回值任务函数

// 数据分批
List<List<StatisticsDTO>> batches = Lists.partition(statisticsList, BATCH_SIZE);
List<CompletableFuture<List<StatisticsDTO>>> futures = new ArrayList<>(batches.size());

// 数据处理
for (int i = 0; i < batches.size(); i++) {
   
    logger.info("批次 " + i + " 开始处理...");
    String logId = LogIdThreadLocal.getLogId();  // 传递主线程的 logId
    List<StatisticsDTO> batchData = batches.get(i);
    CompletableFuture<List<DoctorAvatarAnalysisDTO>> future = CompletableFuture.supplyAsync(() -> {
   
        try {
   
            LogIdThreadLocal.setLogId(logId);
            return processBatch(batchData);
        } finally {
   
            LogIdThreadLocal.clean();
        }
    });
    futures.add(future);
}

// 等待所有 CF 完成并合并结果
CompletableFuture<Void> allOf = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
List<StatisticsDTO> result = allOf.thenApply(
        v -> futures.stream().map(CompletableFuture::join).flatMap(List::stream).collect(Collectors.toList())
).join();

在这里插入图片描述

相关推荐

  1. CompletableFuture使用

    2024-01-12 16:36:04       19 阅读
  2. [AIGC] CompletableFuture如何实现任务链式调用?

    2024-01-12 16:36:04       11 阅读
  3. 使用CompletableFuture实现并发计算-结合实例

    2024-01-12 16:36:04       37 阅读
  4. PHP使用 enqueue/amqp-lib拓展实现rabbitmq任务处理

    2024-01-12 16:36:04       18 阅读

最近更新

  1. TCP协议是安全的吗?

    2024-01-12 16:36:04       16 阅读
  2. 阿里云服务器执行yum,一直下载docker-ce-stable失败

    2024-01-12 16:36:04       16 阅读
  3. 【Python教程】压缩PDF文件大小

    2024-01-12 16:36:04       15 阅读
  4. 通过文章id递归查询所有评论(xml)

    2024-01-12 16:36:04       18 阅读

热门阅读

  1. R语言【base】——sample():随机取样和排列

    2024-01-12 16:36:04       30 阅读
  2. 游戏服务器开发知识付费时代到来了

    2024-01-12 16:36:04       38 阅读
  3. 【mysql】有关mysql查询隐式类型转换的问题

    2024-01-12 16:36:04       34 阅读
  4. LinkedList和ArrayList

    2024-01-12 16:36:04       33 阅读
  5. git远程仓库配置

    2024-01-12 16:36:04       41 阅读
  6. openssl3.2 - 官方demo学习 - cipher - aesccm.c

    2024-01-12 16:36:04       28 阅读
  7. 常见的传感器技术汇总简介

    2024-01-12 16:36:04       37 阅读