CompletableFuture的一些常用方法
CompletableFuture
是Java 8中提供的一个强大的异步编程工具,它极大地简化了并发编程和异步任务的管理和组合。下面是一些核心方法及其使用场景的概述:
创建异步任务:
runAsync(Runnable runnable)
创建一个不返回结果的异步任务。当你只需要执行某个动作,而不关心结果时,可以使用此方法。CompletableFuture<Void> future = CompletableFuture.runAsync(() -> { // 执行耗时操作 longRunningTask(); });
supplyAsync(Supplier<U> supplier)
创建一个返回结果的异步任务。任务完成后可以通过get()
方法获取结果。CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { // 返回计算结果 return heavyComputation(); }); String result = future.get(); // 阻塞直到结果准备好
上述方法还可以传入一个自定义的
Executor
,用于执行异步任务,默认情况下,如果没有指定,会使用ForkJoinPool.commonPool()
。
结果转换:
thenApply(Function<? super T,? extends U> fn)
在前一个任务完成之后,根据前一个任务的结果执行一个函数并返回新的CompletableFuture
。CompletableFuture<Integer> futureInt = CompletableFuture.supplyAsync(() -> 10); CompletableFuture<String> futureStr = futureInt.thenApply(result -> "Result is: " + result);
thenCompose(Function<? super T,? extends CompletionStage<U>> fn)
类似于thenApply
,但是返回的是一个新的CompletableFuture
,适用于任务之间有依赖的情况。
消费结果:
thenAccept(Consumer<? super T> action)
不返回新值,只接受并消费前一个任务的结果。CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello"); future.thenAccept(value -> System.out.println("Received value: " + value));
处理异常:
exceptionally(Function<Throwable,? extends T> fn)
当CompletableFuture遇到异常时,使用给定的函数处理异常并返回一个替代结果。CompletableFuture<String> future = ...; CompletableFuture<String> fallback = future.exceptionally(ex -> "Fallback Value");
handle(BiFunction<? super T, Throwable, ? extends U> fn)
不管是正常完成还是异常完成,都会调用给定的函数来处理结果或异常。CompletableFuture<String> future = ...; CompletableFuture<Object> handled = future.handle((result, ex) -> { if (ex != null) { return handleException(ex); } else { return processSuccess(result); } });
组合多个CompletableFuture:
allOf(CompletableFuture<?>... cfs)
返回一个新的CompletableFuture,当所有给定的CompletableFuture都完成后,这个新的Future才完成。anyOf(CompletableFuture<?>... cfs)
当任何一个给定的CompletableFuture完成后,这个新的Future就完成。
监听任务完成:
thenRun(Runnable action)
当前CompletableFuture完成后,不管是否有结果,都运行给定的动作。whenComplete(BiConsumer<? super T,? super Throwable> action)
当前CompletableFuture完成后,无论成功还是失败,都会执行给定的BiConsumer。
以上只是CompletableFuture
的部分核心方法,实际使用时可以根据需求组合这些方法实现复杂的异步逻辑。
实际运用案例代码
以下是几个CompletableFuture
的常用方法的示例,以帮助理解它们在实际编程中的应用:
创建异步任务
// 使用runAsync执行无返回值的异步任务 CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> { // 执行耗时操作,如IO或计算 performHeavyOperation(); }); // 使用supplyAsync执行有返回值的异步任务 CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> { return longRunningCalculation(); });
链式处理结果
CompletableFuture<Integer> futureInt = CompletableFuture.supplyAsync(() -> calculate()); // 结果转换 CompletableFuture<Double> futureDouble = futureInt.thenApply(result -> result * 1.2); // 消费结果 futureDouble.thenAccept(result -> log.info("Computed result: {}", result)); // 处理异常并提供默认值 CompletableFuture<Double> safeFuture = futureDouble.exceptionally(ex -> { log.error("Error occurred", ex); return DEFAULT_VALUE; });
组合多个异步任务
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> loadFromDB1()); CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> loadFromDB2()); // 等待所有任务完成 CompletableFuture<Void> allDone = CompletableFuture.allOf(future1, future2); // 获取所有结果 CompletableFuture<List<String>> combined = allDone.thenApply(v -> Arrays.asList(future1.join(), future2.join()));
任意一个任务完成时执行后续操作
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> expensiveOp1()); CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> expensiveOp2()); // 当任意一个任务完成时执行后续操作 CompletableFuture<Object> anyFinished = CompletableFuture.anyOf(future1, future2) .thenApply(result -> "At least one operation completed with result: " + result);
错误处理
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { throw new RuntimeException("Simulated error"); }); // 全局错误处理器 CompletableFuture<String> withErrorHandler = future.handle((result, exception) -> { if (exception != null) { log.error("An exception occurred", exception); return "Error"; } return result; });
并行处理并聚合结果
List<CompletableFuture<Integer>> futures = IntStream.rangeClosed(1, 10) .mapToObj(i -> CompletableFuture.supplyAsync(() -> calculateNumber(i))) .collect(Collectors.toList()); CompletableFuture<List<Integer>> combinedFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])) .thenApply(v -> futures.stream().map(CompletableFuture::join).collect(Collectors.toList()));
以上示例展示了CompletableFuture
如何创建异步任务、处理结果、组合多个任务以及处理异常情况。在实际项目中,你可以根据具体需求灵活运用这些方法。
CompletableFuture的使用场景
在日常Java开发中,尤其是涉及到异步编程、多线程和并发处理的情况下,CompletableFuture
类的使用相当普遍。自从Java 8引入以来,随着异步编程范式的流行以及对于提升系统性能和响应能力的需求增加,CompletableFuture
已经成为现代Java应用中不可或缺的一部分。
CompletableFuture
提供了一种声明式、链式编程风格来处理异步操作的结果,它支持组合多个异步任务,通过方法如thenApply()
, thenAccept()
, thenCompose()
等实现任务间的依赖关系,并且能够方便地处理异常和最终结果。此外,CompletableFuture
还能够与Java的ExecutorService
配合,更有效地利用线程资源。
在日常开发中,以下是一些常见的使用场景:
网络请求/IO密集型操作:当发起HTTP请求、数据库查询或其他I/O操作时,为了不阻塞主线程或工作线程,可以使用
CompletableFuture
将这些操作异步化。微服务架构:在微服务间调用时,异步响应可以帮助构建非阻塞的服务间通信,提高系统的整体吞吐量。
批处理与流式处理:在处理大量数据时,特别是大数据处理框架中,常常需要将计算步骤分解为一系列可以并发执行的任务。
事件驱动编程:在事件驱动的系统中,异步处理事件非常关键,
CompletableFuture
可用于构建复杂的事件处理流程。延迟加载或缓存刷新:当需要异步加载或刷新数据时,可以使用
CompletableFuture
来确保主流程不受阻塞,同时也能得到数据准备完成的通知。
综上所述,由于其功能强大且适应现代编程范式,CompletableFuture在许多复杂系统和高性能应用的日常开发中得到了广泛应用。
底层原理
CompletableFuture
底层主要依赖于 Java 的 Future
和 CompletionStage
接口,以及 ForkJoinPool
并发框架。
Future 和 CompletionStage 接口:
Future
接口代表一个异步计算的结果,它提供了判断任务是否已完成,以及获取计算结果或抛出异常的方法。CompletionStage
是 Java 8 引入的一个新接口,用于表示异步计算阶段,它扩展了 Future 的概念,增加了更多关于异步任务结果处理和组合的方法。
ForkJoinPool:
CompletableFuture
默认使用ForkJoinPool.commonPool()
来执行异步任务,ForkJoinPool
是一个高效的线程池,采用了分治算法和工作窃取(work-stealing)策略,尤其适合处理大量小颗粒度的任务。- 当通过
runAsync
或supplyAsync
创建一个CompletableFuture
时,如果没有提供自定义的Executor
,那么任务会被提交到ForkJoinPool
中执行。
内部状态机和数据结构:
CompletableFuture
内部实现了一个状态机,通过volatile变量跟踪任务的状态(未初始化、运行中、已完成等)和结果(成功或失败)。- 它还维护了一个同步队列,用于保存等待任务完成的回调函数,当异步任务完成时,会依次执行这些回调函数,实现任务结果的处理和进一步的异步任务链。
依赖和组合:
CompletableFuture
的强大之处在于它可以方便地将多个异步任务链接起来,形成任务流水线。例如thenApply
、thenCompose
、thenAccept
等方法就是通过添加任务到内部队列并在当前任务完成后触发这些后续任务来实现的。
内存可见性和线程安全:
- 为了保证在多线程环境下的正确性和一致性,
CompletableFuture
的内部状态变更、任务调度和结果通知都遵循 Java 内存模型的规则,使用 volatile 变量和 CAS(Compare and Swap)操作来确保线程安全和内存可见性。
- 为了保证在多线程环境下的正确性和一致性,
简而言之,CompletableFuture
底层通过状态机和内部数据结构管理异步任务的生命周期,利用 ForkJoinPool
实现高效的任务执行,同时提供了丰富的 API 用于任务的组合和结果处理,从而实现了高级别的异步编程模型。
CompletableFuture跟自己创建线程放到线程池实现异步有何区别,哪个更好呢
CompletableFuture
和 自己创建线程放入线程池实现异步的主要区别在于抽象层次、控制复杂性和易用性等方面:
抽象层次:
- 自己创建线程并放入线程池:你需要直接处理线程的创建、管理、任务提交以及结果的同步。这通常涉及
ExecutorService
的使用,例如ThreadPoolExecutor
,并且可能需要编写额外的代码来协调不同任务的执行顺序和结果合并。 - CompletableFuture:作为Java 8引入的高级API,它提供了一种更高层次的抽象,可以直接定义任务和它们之间的依赖关系,而无需手动管理线程的生命周期。CompletableFuture封装了线程池执行和结果处理逻辑,简化了异步编程模型。
- 自己创建线程并放入线程池:你需要直接处理线程的创建、管理、任务提交以及结果的同步。这通常涉及
控制复杂性:
- 自己创建线程:如果你直接处理线程,需要自行管理线程安全、资源回收等问题,当任务之间有复杂的依赖关系时,逻辑可能会变得难以维护。
- CompletableFuture:通过方法链式调用(如
thenApply
、thenCompose
等),可以直观地描述任务的执行顺序和结果传递,它内置了线程安全和内存可见性保证,更适合构建复杂的异步流程。
易用性:
- 自己创建线程:在简单的场景下,手动创建线程并提交至线程池相对简单,但对于涉及多个步骤的异步操作,尤其是需要组合多个异步任务时,代码可能变得冗长和复杂。
- CompletableFuture:提供了丰富的API,可以很容易地处理异常、取消任务、组合多个异步操作,以及支持灵活的回调逻辑,大大提高了异步编程的便捷性和可读性。
至于哪个更好,这取决于具体的应用场景和需求:
- 如果你的需求很简单,只需单个或少量独立异步任务,且不需要复杂的任务间依赖关系,直接使用线程池可能足够简洁高效。
- 对于复杂的异步编程场景,尤其是在微服务、高并发处理、响应式编程等场景下,
CompletableFuture
因其高级抽象、易于组合和管理的优点而更为推荐。它能帮助你更好地组织异步逻辑,减少潜在的并发错误,并提高代码的可读性和可维护性。
总的来说,CompletableFuture
在大多数情况下是一个更好的选择,因为它提供了更现代的异步编程模型和更强的功能支持。然而,在一些特殊场合,例如需要对线程池行为有精细控制时,直接使用线程池可能更加灵活。不过即使在这种情况下,也可以结合使用线程池和CompletableFuture
,利用后者进行更高级别的任务编排。