使用CompletableFuture实现并发计算-结合实例

一、什么是CompletableFuture?

      CompletableFuture 是Java 8引入的一个强大的并发工具类,它是Future接口的扩展实现。它提供了更丰富的异步编程模型和功能,允许开发者以非阻塞的方式处理异步计算的结果,并且可以将多个异步任务链式组合起来形成复杂的流程。

二、主要特点

1. 异步执行:

     通过supplyAsync()、runAsync()等方法可以提交一个Runnable或Callable到线程池中异步执行。

2. 结果获取:

     与传统的Future类似,可以通过get()方法来阻塞地获取异步任务的结果,或者使用thenApply(), thenAccept(), thenRun()等方法在前一个异步操作完成后进行后续处理,这些方法都不会阻塞当前线程。

3. 异常处理:

     提供.exceptionally()方法来捕获并处理Future执行过程中可能抛出的异常。

4. 组合异步操作:

    支持多种组合方式,如:
    - .thenCompose()用于基于前一个Future的结果启动另一个异步任务。
    - .allOf()用于等待一组CompletableFuture都完成。
    - .anyOf()用于等待一组CompletableFuture中的任何一个完成。

5. 流式API:

    其设计鼓励函数式的编程风格,使得代码更加简洁易读。

6. 取消操作:

    支持通过cancel(boolean mayInterruptIfRunning)方法取消正在进行的异步任务。

三、实例

1、以同时获取两个ETA,并将结果相加为例:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class ETAExample {

    // 假设这是异步获取ETA的方法
    public static int getETA() {
        try {
            Thread.sleep(1000); // 模拟耗时操作
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new Random().nextInt(100); // 返回一个模拟的ETA值
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> getETA());
        CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> getETA());

        CompletableFuture<Integer> combinedFuture = CompletableFuture.allOf(future1, future2)
                .thenApply(v -> future1.join() + future2.join());

        int finalETA = combinedFuture.get(); // 获取最终的ETA之和
        System.out.println("Final ETA: " + finalETA);
    }
}

.supplyAsync(),用于创建并执行两个异步任务来获取ETA;

.allof(),当两个获取ETA的CompletableFuture完成后,返回一个新的、已完成状态的 CompletableFuture。

.thenApply(),等.allof()执行完后,执行.thenApply()方法。本文中是将两个ETA加和;

.join(),CompletableFuture完成时返回结果值。执行过程中如果发生异常,会抛出一个unchecked异常。

2、异常处理

2.1 .exceptionally(Function<Throwable, T>)

      .exceptionally()可以为Future在执行过程中可能抛出的任何异常提供一个恢复策略。参数是一个函数,它接受一个Throwable对象(即异常),并返回一个替代的结果值。当Future由于异常而未能正常完成时,该函数会被调用,并且返回值将作为Future的新结果。

CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
    if (/* 某些条件导致异常 */) {
        throw new RuntimeException("获取ETA失败");
    }
    return getETA(); // 假设这个方法会返回一个整数类型的ETA
});

future.exceptionally(throwable -> {
    System.out.println("发生异常: " + throwable.getMessage());
    return -1; // 返回一个默认的ETA值或错误码
});

int result = future.get(); // 此时无论是否发生异常,都会得到一个非null的结果

2.2 .handle(BiFunction<? super T, Throwable, U>)
     这个方法更通用,它同时处理成功和失败的情况。

     参数是一个BiFunction,其第一个参数是正常的计算结果(如果有的话),第二个参数是可能出现的异常。不管是正常结束还是异常终止,handle方法都会被执行,它可以统一处理两种情况下的结果。

CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> getETA());

// 使用 handle 方法统一处理成功和失败的结果
future.handle((result, throwable) -> {
    if (throwable != null) { // 处理异常情况
        System.out.println("发生异常: " + throwable.getMessage());
        return -1;
    } else { // 处理正常情况
        return result * 2; // 假设这里对正常结果进行某种操作后再返回
    }
});

Integer finalResult = future.get(); // 获取最终处理后的结果

2.3 try ... catch 

    此方法比较常见,本文就不再赘述了,大家可移步:【Java基础】[异常处理]try,catch,finally_catch块完全有可能得不到执行-CSDN博客

【JAVA基础】[异常处理]项目中悄无声息的RuntimeException_runtimeexception生成随机序列化的id-CSDN博客

     以上异常处理的方法,都可以帮我们保证系统的健壮性。

3、线程池:ForkJoinPool.commonPool()

      前面我们说过,.supplyAsync(),用于创建并执行两个异步任务来获取ETA,那这两个任务由哪个线程池管理并执行呢?答: ForkJoinPool.commonPool();

      ForkJoinPool.commonPool()不同于ThreadPoolExecutor,ForkJoinPool.commonPool()利用工作窃取机制,如果一个线程在其本地队列上没有找到任务时,它会尝试从其他线程的队列中“窃取”任务来执行。后面我计划单独出一篇文章来介绍。

      ForkJoinPool.commonPool()是一个全局共享的线程池,它为整个JVM进程提供服务。当在同一个工程中,多个线程或任务使用CompletableFuture.supplyAsync(...)方法时,如果不指定自
 定义的ExecutorService,那么默认情况下这些异步计算任务会提交到这个公共的ForkJoinPool中执行。这意味着所有依赖于commonPool()的任务将会共享同一组工作线程来并发执行。如果这些任务是CPU密集型且数量很大,可能会导致线程池资源紧张,从而影响性能。
    对于I/O密集型任务或者短生命周期的任务,公共池通常可以很好地复用线程资源。然而,在特定场景下,比如需要对线程数进行精细化控制、确保优先级或者隔离不同类型的计算任务时,建议创建并指定自定义的ForkJoinPool或其它类型的ExecutorService来处理异步任务。

    从官方文档中,我们可以看到,给出了两个异步执行方法:

1)static <U> CompletableFuture<U>    supplyAsync(Supplier<U> supplier)

返回一个新的CompletableFuture,它是由ForkJoinPool.commonPool()中运行的任务异步完成的,其值通过调用给定的Supplier获得。
2)static <U> CompletableFuture<U>    supplyAsync(Supplier<U> supplier, Executor executor)

返回一个新的CompletableFuture,它由给定执行器中运行的任务异步完成,其值通过调用给定的Supplier获得。

使用2),我们可以传入自定义的Executor。

四、使用过程中的注意事项

1. 异常处理:
     由于CompletableFuture的异步操作可能抛出异常,因此需要正确处理get()方法调用或者通过.exceptionally()、.handle()等方法捕获和处理可能出现的异常。前面我们结合实例也介绍过了。

2. Future的完成状态不可变:
    一旦CompletableFuture完成了(成功或失败),其结果是不可变的。所以我们不能更改已完成Future的结果。

3. 线程池配置:
    默认情况下,supplyAsync()会使用ForkJoinPool.commonPool()执行任务,所以如果我们的任务数量非常大或者执行时间较长,可能会对公共线程池造成压力,不同的业务之前可能引起资源竞争。在这种情况下,应该自定义ExecutorService来管理并发任务。

4. 取消任务:
    如果需要支持取消异步操作,可以调用CompletableFuture.cancel(true),但要确保任务是可以被中断的,并且在内部逻辑中检查Thread.currentThread().isInterrupted()以响应中断请求。

5. 链式操作顺序:
    使用.thenApply(), .thenAccept(), .thenCompose()等方法构建异步计算流程时,注意这些操作的执行顺序与它们在链中的位置保持一致。

6. 资源管理:
    在异步操作中打开的任何资源(如数据库连接、文件句柄)应在操作完成后正确关闭或释放。

7. allOf() vs anyOf():
    确保使用正确的组合器。CompletableFuture.allOf()等待所有Future都完成,而anyOf()只要任何一个完成就返回。这些就是字面意思,我们就不再赘述了。

8. 避免阻塞主线程:
    尽量避免在主线程或其他关键路径上直接调用get()阻塞等待结果。如果必须同步等待,考虑使用join()方法,它不会抛出InterruptedException。

9. 超时处理:
    可以结合CompletableFuture.get(long, TimeUnit)设置超时时间,防止因某个异步任务长时间未完成而导致整个程序挂起。

五、总结

     以上我们简要的了解了CompletableFuture的主要功能和在并发计算中的应用,我们可以看到CompletableFuture极大地简化了Java中的异步编程模型,提高了代码可读性和程序的响应能力,是构建高效、灵活并发应用的重要组件之一。后面将在应用中不断使用和加深理解。

参考:

Java Platform SE 8
 ,GPT

相关推荐

  1. 使用CompletableFuture实现并发计算-结合实例

    2024-01-25 01:58:01       39 阅读
  2. Matlab并行计算实践

    2024-01-25 01:58:01       37 阅读
  3. 实现CompletableFuture的返回数据,放入每个list中

    2024-01-25 01:58:01       37 阅读
  4. [AIGC] CompletableFuture如何实现任务链式调用?

    2024-01-25 01:58:01       13 阅读

最近更新

  1. TCP协议是安全的吗?

    2024-01-25 01:58:01       18 阅读
  2. 阿里云服务器执行yum,一直下载docker-ce-stable失败

    2024-01-25 01:58:01       19 阅读
  3. 【Python教程】压缩PDF文件大小

    2024-01-25 01:58:01       18 阅读
  4. 通过文章id递归查询所有评论(xml)

    2024-01-25 01:58:01       20 阅读

热门阅读

  1. 数据结构:单链表

    2024-01-25 01:58:01       33 阅读
  2. 微信小程序腾讯地图定位转高德地图定位

    2024-01-25 01:58:01       28 阅读
  3. c语言不定参数

    2024-01-25 01:58:01       36 阅读
  4. GO——recover

    2024-01-25 01:58:01       32 阅读
  5. 解决Milo连接OPU UA报错的两个常见报错

    2024-01-25 01:58:01       38 阅读
  6. 开发安全之:Server-Side Request Forgery

    2024-01-25 01:58:01       30 阅读
  7. 动态规划最后一天(回文串)

    2024-01-25 01:58:01       38 阅读
  8. 【力扣每日一题】力扣2765最长交替子数组

    2024-01-25 01:58:01       43 阅读
  9. python基础——进程

    2024-01-25 01:58:01       36 阅读
  10. WPF关键帧动画

    2024-01-25 01:58:01       43 阅读