2025-06-08
内卷九股文
0

目录

CompletableFuture
扩展 - 函数式接口
实际应用
零依赖
一元依赖
二元依赖
多元依赖
Future 方法
CompletableFuture 源码分析
当前任务执行方式
任务编排的存储&执行方式
任务编排流程
查看后置任务执行时机
源码执行流程图
总结

平时多线程开发一般就是使用 Runnable,Callable,Thread,FutureTask,ThreadPoolExecutor 这些内容,之前聊到 FutureTask 是同步非阻塞,将这些内容组合在一起去解决一些并发编程的问题时,很多时候不能去完成多线程的编排,这个时候就需要异步非阻塞的 CompletableFuture 来处理这些任务之间的逻辑关系了。

CompletableFuture

CompletableFuture 是一个异步任务编排工具,底层是对 Future 的扩展和增强, implements Future 接口和 CompletionStage,通过 CompletionStage 接口实现任务链式调用。其本质就是 Future 的增强版,支持函数式编程和流式调用。

异步执行,默认线程池是 ForkJoinPool.commonPool(),可通过构造函数指定线程池

扩展 - 函数式接口

在正式开始之前,需要先掌握一些函数式编程的基础,需要了解以下函数式接口

接口调用方法响应
Runnablerun()无入参,没有返回值
Supplier<T>T get()无入参、有返回值
Function<T, R>R apply(T t);1个入参、有返回值
Consumer<T>void accept(T t)1个入参、没有返回值
BiFunction<T, U, R>R apply(T t, U u)2个入参、有返回值
BiConsumer<T, U>void accept(T t, U u)2个入参、没有返回值

实际应用

CompletableFuture 如果不提供线程池的话,默认使用的 ForkJoinPool,而 ForkJoinPool 内部是守护线程,如果main线程结束了,守护线程会跟着一起结束。

零依赖

零依赖(不依赖链式调用的任何任务)包含 supplyAsyncrunAsync两个方法,核心是异步任务执行,使用 Supplier<U> supplierRunnable 作为入参,属于零依赖范畴,调用时即立刻执行

  • CompletableFuture<U> supplyAsync(Supplier<U> supplier) 异步执行任务、有返回值
  • CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor) 异步执行任务、自定义线程池、有返回值
  • CompletableFuture<Void> runAsync(Runnable runnable) 异步执行任务、没有返回值`
  • CompletableFuture<Void> runAsync(Runnable runnable, Executor executor) 异步执行任务、自定义线程池、没有返回值
  • CompletableFuture<U> completedFuture(U value)
  • T join() 完成时返回结果值,如果异常完成则抛出 CompletionException 异常。

一元依赖

一元依赖包含 thenApplythenAcceptthenRunthenComposewhenCompletehandleexceptionally 方法,其流程依赖于 上一个异步任务执行完成后,用于执行一个新的异步任务,依赖1个 CompletableFuture,也就是先后执行的关系。

  • thenApply 把前面任务的执行结果,做为Function参数,并返回当前Function处理的结果
  • thenAccept 把前面任务的执行结果,做为Function参数没有返回值
  • thenRun 无视前面任务的结果,执行 Runnable无入参,无返回值
  • thenCompose 把前面任务的执行结果,做为Function参数返回 CompletableFuture 类型对象
  • whenComplete 以前面任务结果作为第一个参数,会出现的异常信息为第二个参数没有返回值
  • handle 以前面任务结果作为第一个参数,会出现的异常信息为第二个参数有返回值
  • exceptionally 前面任务执行正常则返回前面任务的结果如果异常不为 null 则返回 Function 执行的结果

开发时注意

需要结合状态机管理思想(未完成/完成/异常),通过 handle()/exceptionally() 方法正确处理异常

代码示例:

java
public static void main(String[] args) throws IOException { ExecutorService executor = Executors.newFixedThreadPool(10); /*CompletableFuture<String> taskA = CompletableFuture.supplyAsync(() -> { String id = UUID.randomUUID().toString(); System.out.println("执行任务A:" + id); return id; }); CompletableFuture<String> taskB = taskA.thenApply(result -> { System.out.println("任务B获取到任务A结果:" + result); result = result.replace("-", ""); return result; }); System.out.println("main线程拿到结果:" + taskB.join());*/ CompletableFuture<String> taskB = CompletableFuture.supplyAsync(() -> { String id = UUID.randomUUID().toString(); System.out.println("执行任务A:" + id + "," + Thread.currentThread().getName()); return id; }).thenApplyAsync(result -> { System.out.println("任务B获取到任务A结果:" + result + "," + Thread.currentThread().getName()); result = result.replace("-", ""); return result; },executor); System.out.println("main线程拿到结果:" + taskB.join()); }

二元依赖

二元依赖包含 thenCombinethenAcceptBothrunAfterBothapplyToEitheracceptEitherrunAfterEither 方法,其流程依赖于 上一个异步任务与一个其他任务执行完成后,用于执行一个新的异步任务,依赖2个 CompletableFuture 调用链任务。

  • thenCombine 以前面任务与other入参任务的结果做为参数计算合并任务,有返回值(返回计算后的结果)
  • thenAcceptBoth 以前面任务与other入参任务的结果做为参数计算合并任务,没有返回值
  • runAfterBoth 在前面任务与other任务之后运行 Runnable没有返回值
  • applyToEither 前面任务与other任务,任意任务完成后作为入参运行有返回值
  • acceptEither 前面任务与other任务,任意任务完成后作为入参运行没有返回值
  • runAfterEither 在任一异步之后运行,执行 Runnable 没有入参没有返回值

例如:任务A和任务B并行执行,等到任务A和任务B全部执行完毕后,再执行任务C。

java
// api CompletableFuture<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn) CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn) CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor) // demo public static void main(String[] args) throws IOException { CompletableFuture<Integer> taskC = CompletableFuture.supplyAsync(() -> { System.out.println("任务A"); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return 78; }).thenCombine(CompletableFuture.supplyAsync(() -> { System.out.println("任务B"); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return 66; }), (resultA, resultB) -> { System.out.println("任务C"); int resultC = resultA + resultB; return resultC; }); System.out.println(taskC.join()); System.in.read(); }

多元依赖

多元依赖包含 thenCombinethenAcceptBothrunAfterBothapplyToEitheracceptEitherrunAfterEither 方法,其流程依赖于 上一个异步任务与一个其他任务执行完成后,用于执行一个新的异步任务,多元依赖依赖多个 CompletableFuture 调用链任务。

  • CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) 阻塞等待所有给定的CompletableFuture 执行结束,当所有 CompletableFuture 任务完成时返回一个无返回值类型的CompletableFuture

  • CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) 阻塞等待任意一个给定的CompletableFuture 对象执行结束当任一 CompletableFuture 任务完成时返回一个Object类型的CompletableFuture

场景对比

  • 同步模型: CPU资源大量浪费在阻塞等待上,导致CPU资源利用率低
  • 异步模型: Future用于表示异步计算的结果,只能通过阻塞或者轮询的方式获取结果
  • CompletableFuture: 对Future进行了扩展,可以通过设置回调的方式处理计算结果,同时也支持组合操作,支持进一步的编排"

Future 方法

  • boolean cancel(boolean mayInterruptIfRunning) 根据 mayInterruptIfRunning 是否中断任务
  • boolean isCancelled() 在正常完成之前,如果取消了此 CompletableFuture,则返回 true
  • boolean isDone() 已完成返回 true
  • V get() 等待计算完成,然后返回结果
  • V get(long timeout, TimeUnit unit) 最大等待timeout时间计算完成,然后返回结果

CompletableFuture 源码分析

CompletableFuture 的源码内容特别多。不需要把所有源码都看了,更多的是要掌握整个CompletableFuture 的源码执行流程,以及任务的执行时机。

CompletableFuture 中比较简单的方法作为分析的入口,从而掌握整体执行的流程。

当前任务执行方式

将任务和 CompletableFuture 封装到一起,再执行封住好的具体对象的 run 方法即可

java
// 提交任务到CompletableFuture public static CompletableFuture<Void> runAsync(Runnable runnable) { // asyncPool:执行任务的线程池 // runnable:具体任务。 return asyncRunStage(asyncPool, runnable); } // 内部执行的方法 static CompletableFuture<Void> asyncRunStage(Executor e, Runnable f) { // 对任务做非空校验 if (f == null) throw new NullPointerException(); // 直接构建了CompletableFuture的对象,作为最后的返回结果 CompletableFuture<Void> d = new CompletableFuture<Void>(); // 将任务和CompletableFuture对象封装为了AsyncRun的对象 // 将封装好的任务交给了线程池去执行 e.execute(new AsyncRun(d, f)); // 返回构建好的CompletableFuture return d; } // 封装任务的AsyncRun类信息 static final class AsyncRun extends ForkJoinTask<Void> implements Runnable, AsynchronousCompletionTask { // 声明存储CompletableFuture对象以及任务的成员变量 CompletableFuture<Void> dep; Runnable fn; // 将传入的属性赋值给成员变量 AsyncRun(CompletableFuture<Void> dep, Runnable fn) { this.dep = dep; this.fn = fn; } // 当前对象作为任务提交给线程池之后,必然会执行当前方法 public void run() { // 声明局部变量 CompletableFuture<Void> d; Runnable f; // 将成员变量赋值给局部变量,并且做非空判断 if ((d = dep) != null && (f = fn) != null) { // help GC,将成员变量置位null,只要当前任务结束后,成员变量也拿不到引用。 dep = null; fn = null; // 先确认任务没有执行。 if (d.result == null) { try { // 直接执行任务 f.run(); // 当前方法是针对Runnable任务的,不能将结果置位null // 要给没有返回结果的Runnable做一个返回结果 d.completeNull(); } catch (Throwable ex) { // 异常结束! d.completeThrowable(ex); } } d.postComplete(); } } }

任务编排的存储&执行方式

首先如果要在前继任务处理后,执行后置任务的话。

有两种情况:

  • 前继任务如果没有执行完毕,后置任务需要先放在stack栈结构中存储
  • 前继任务已经执行完毕了,后置任务就应该直接执行,不需要在往stack中存储了。

如果单独采用thenRun在一个任务后面指定多个后继任务,CompletableFuture无法保证具体的执行顺序,而影响执行顺序的是前继任务的执行时间,以及后置任务编排的时机。

任务编排流程

java
// 编排任务,前继任务搞定,后继任务再执行 public CompletableFuture<Void> thenRun(Runnable action) { // 执行了内部的uniRunStage方法, // null:线程池,现在没给。 // action:具体要执行的任务 return uniRunStage(null, action); } // 内部编排任务方法 private CompletableFuture<Void> uniRunStage(Executor e, Runnable f) { // 后继任务不能为null,健壮性判断 if (f == null) throw new NullPointerException(); // 创建CompletableFuture对象d,与后继任务f绑定 CompletableFuture<Void> d = new CompletableFuture<Void>(); // 如果线程池不为null,代表异步执行,将任务压栈 // 如果线程池是null,先基于uniRun尝试下,看任务能否执行 if (e != null || !d.uniRun(this, f, null)) { // 如果传了线程池,这边需要走一下具体逻辑 // e:线程池 // d:后继任务的CompletableFuture // this:前继任务的CompletableFuture // f:后继任务 UniRun<T> c = new UniRun<T>(e, d, this, f); // 将封装好的任务,push到stack栈结构 // 只要前继任务没结束,这边就可以正常的将任务推到栈结构中 // 放入栈中可能会失败 push(c); // 无论压栈成功与否,都要尝试执行以下。 c.tryFire(SYNC); } // 无论任务执行完毕与否,都要返回后继任务的CompletableFuture return d; }

查看后置任务执行时机

任务在编排到前继任务时,因为前继任务已经结束了,这边后置任务会主动的执行

java
// 后置任务无论压栈成功与否,都需要执行tryFire方法 static final class UniRun<T> extends UniCompletion<T,Void> { Runnable fn; // executor:线程池 // dep:后置任务的CompletableFuture // src:前继任务的CompletableFuture // fn:具体的任务 UniRun(Executor executor, CompletableFuture<Void> dep,CompletableFuture<T> src, Runnable fn) { super(executor, dep, src); this.fn = fn; } final CompletableFuture<Void> tryFire(int mode) { // 声明局部变量 CompletableFuture<Void> d; CompletableFuture<T> a; // 赋值局部变量 // (d = dep) == null:赋值加健壮性校验 if ((d = dep) == null || // 调用uniRun。 // a:前继任务的CompletableFuture // fn:后置任务 // 第三个参数:传入的是this,是UniRun对象 !d.uniRun(a = src, fn, mode > 0 ? null : this)) // 进到这,说明前继任务没结束,等! return null; dep = null; src = null; fn = null; return d.postFire(a, mode); } } // 是否要主动执行任务 final boolean uniRun(CompletableFuture<?> a, Runnable f, UniRun<?> c) { // 方法要么正常结束,要么异常结束 Object r; Throwable x; // a == null:健壮性校验 // (r = a.result) == null:判断前继任务结束了么? // f == null:健壮性校验 if (a == null || (r = a.result) == null || f == null) // 到这代表任务没结束。 return false; // 后置任务执行了没? == null,代表没执行 if (result == null) { // 如果前继任务的结果是异常结束。如果前继异常结束,直接告辞,封装异常结果 if (r instanceof AltResult && (x = ((AltResult)r).ex) != null) completeThrowable(x, r); else // 到这,前继任务正常结束,后置任务正常执行 try { // 如果基于tryFire(SYNC)进来,这里的C不为null,执行c.claim // 如果是因为没有传递executor,c就是null,不会执行c.claim if (c != null && !c.claim()) // 如果返回false,任务异步执行了,直接return false return false; // 如果claim没有基于线程池运行任务,那这里就是同步执行 // 直接f.run了。 f.run(); // 封装Null结果 completeNull(); } catch (Throwable ex) { // 封装异常结果 completeThrowable(ex); } } return true; } // 异步的线程池处理任务 final boolean claim() { Executor e = executor; if (compareAndSetForkJoinTaskTag((short)0, (short)1)) { // 只要有线程池对象,不为null if (e == null) return true; executor = null; // disable // 基于线程池的execute去执行任务 e.execute(this); } return false; }

前继任务执行完毕后,基于嵌套的方式执行后置。

java
// A:嵌套了B+C, B:嵌套了D+E // 前继任务搞定,遍历stack执行后置任务 // A任务处理完,解决嵌套的B和C final void postComplete() { // f:前继任务的CompletableFuture // h:存储后置任务的栈结构 CompletableFuture<?> f = this; Completion h; // (h = f.stack) != null:赋值加健壮性判断,要确保栈中有数据 while ((h = f.stack) != null || // 循环一次后,对后续节点的赋值以及健壮性判断,要确保栈中有数据 (f != this && (h = (f = this).stack) != null)) { // t:当前栈中任务的后续任务 CompletableFuture<?> d; Completion t; // 拿到之前的栈顶h后,将栈顶换数据 if (f.casStack(h, t = h.next)) { if (t != null) { if (f != this) { pushStack(h); continue; } h.next = null; // detach } // 执行tryFire方法, f = (d = h.tryFire(NESTED)) == null ? this : d; } } } // 回来了 NESTED == -1 final CompletableFuture<Void> tryFire(int mode) { CompletableFuture<Void> d; CompletableFuture<T> a; if ((d = dep) == null || !d.uniRun(a = src, fn, mode > 0 ? null : this)) return null; dep = null; src = null; fn = null; // 内部会执行postComplete,运行B内部嵌套的D和E return d.postFire(a, mode); }

源码执行流程图

image.png

总结

CompletableFuture 最重要的就是解决了异步回调的问题,优雅的组合多个异步任务,帮助我们简化回调逻辑,避免了“回调地狱”。其中需要使用自定义线程池避免 ForkJoinPool 通用资源的竞争,结合状态机管理(未完成/完成/异常),通过 handle()/exceptionally() 方法正确处理异常,同时 CompletableFuture 是基于 Future 的增强,避免 get() 的阻塞调用。

本文作者:柳始恭

本文链接:

版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!