2025-06-08
内卷九股文
0

目录

Future
FutureTask
FutureTask 作用
FutureTask 应用
FutureTask 源码分析
FutureTask 中的核心属性
FutureTask 的 run 方法
FutureTask 的 set&setException 方法
FutureTask 的 cancel 方法
FutureTask 的 get 方法
FutureTask 的 finishCompletion 方法
FutureTask 原理
内部状态
JDK8 及之前的实现机制
JDK9 及以后的改变
与线程池的协作
总结

Java 创建线程的方式,一般常用的是 ThreadRunnable,如果需要当前处理的任务有返回结果的话, 需要使用 CallableCallable 运行需要配合 Future,那 Future 是个什么鬼?FutureTask 又是个什么鬼?你是否深入了解过源码的处理?接下来我们将分析 Future 接口和 FutureTask 类的关系,然后深入分析 FutureTask 的源码实现。

Future

Future 表示 异步计算的结果,是 Java 并发包的核心接口,定义了异步计算结果的基本操作,它提供了检查计算是否完成、等待计算完成以及检索计算结果的方法。它通常与 ExecutorService 一起使用,以异步方式执行任务,一般会使用 FutureTask 实现类去接收 Callable 任务的返回结果。

Future 允许你启动一个可能耗时的计算,而不必等待它完成就能继续执行其他任务。然后,你可以使用 Future 对象来查询计算是否完成,等待计算完成,并检索计算的结果。

主要的方法包括:

  • boolean isDone():检查计算是否完成,如果任务完成,则返回 true。
  • boolean isCancelled():检查任务是否已取消,如果任务被取消,则返回 true。
  • V get():等待任务完成,获取计算结果(阻塞直到完成)。
  • V get(long timeout, TimeUnit unit):等待任务在给定的时间范围内完成,并获取其结果。
  • boolean cancel(boolean mayInterruptIfRunning):尝试取消任务的执行。

FutureTask

FutureTask 属于 java.util.concurrent 包,是一个可以取消异步任务的类。

image.png

从源码中的文档注释可以看到,FutureTaskFuture 的一个实现类,ta同时实现了 RunnableFuture 接口。因此,它既可以作为Runnable被线程执行,又可以作为Future得到Callable的返回值。

FutureTask类提供了一个Future的基本实现 ,具有启动和取消计算的方法,查询计算是否完整,并检索计算结果。结果只能在计算完成后才能检索; 如果计算尚未完成,则get方法将阻止。一旦计算完成,则无法重新启动或取消计算(除非使用runAndReset()调用计算 )。

image.png

从上面的FutureTask类图中可以看出,FutureTask实现了RunnableFuture接口,RunnableFuture接口继承了Runnable接口和Future接口,所以FutureTask兼备Runnable和Future两种特性

FutureTask 作用

异步计算:在多线程编程中,FutureTask用于封装一个可调用任务(例如实现了Callable接口的任务),并允许在一个单独的线程中执行该任务。这样可以在执行耗时操作(如网络请求、文件读取、复杂计算等)时,不会阻塞主线程或其他线程的执行。

结果获取:提供了一种机制来获取异步计算的结果。通过FutureTask的get方法,可以在任务完成后获取其执行结果,如果任务尚未完成,get方法可以阻塞当前线程,直到任务完成并返回结果。

FutureTask 应用

大方向是FutureTask对任务的控制:

  • 任务执行过程中状态的控制
  • 任务执行完毕后,返回结果的获取

FutureTask的任务在执行run方法后,是无法被再次运行,需要使用runAndReset方法才可以。

java
public static void main(String[] args) throws InterruptedException { // 构建FutureTask,基于泛型执行返回结果类型 // 在有参构造中,声明Callable或者Runnable指定任务 FutureTask<String> futureTask = new FutureTask<>(() -> { System.out.println("任务开始执行……"); Thread.sleep(2000); System.out.println("任务执行完毕……"); return "OK!"; }); // 构建线程池 ExecutorService service = Executors.newFixedThreadPool(10); // 线程池执行任务 service.execute(futureTask); // futureTask提供了run方法,一般不会自己去调用run方法,让线程池去执行任务,由线程池去执行run方法 // run方法在执行时,是有任务状态的。任务已经执行了,再次调用run方法无效的。 // 如果希望任务可以反复被执行,需要去调用runAndReset方法 // futureTask.run(); // 对返回结果的获取,类似阻塞队列的poll方法 // 如果在指定时间内,没有拿到方法的返回结果,直接扔TimeoutException // try { // String s = futureTask.get(3000, TimeUnit.MILLISECONDS); // System.out.println("返回结果:" + s); // } catch (Exception e) { // System.out.println("异常返回:" + e.getMessage()); // e.printStackTrace(); // } // 对返回结果的获取,类似阻塞队列的take方法,死等结果 // try { // String s = futureTask.get(); // System.out.println("任务结果:" + s); // } catch (ExecutionException e) { // e.printStackTrace(); // } // 对任务状态的控制 // System.out.println("任务结束了么?:" + futureTask.isDone()); // Thread.sleep(1000); // System.out.println("任务结束了么?:" + futureTask.isDone()); // Thread.sleep(1000); // System.out.println("任务结束了么?:" + futureTask.isDone()); }

FutureTask 源码分析

看FutureTask的源码,要从几个方向去看:

  • 先查看FutureTask中提供的一些状态
  • 在查看任务的执行过程

FutureTask 中的核心属性

FutureTask 使用状态机管理任务生命周期,首先要清楚任务的流转流转状态是怎样的,其次对于核心属性要追到是干嘛的。

java
/** FutureTask的核心属性 FutureTask任务的状态流转 * NEW -> COMPLETING -> NORMAL 任务正常执行,并且返回结果也正常返回 * NEW -> COMPLETING -> EXCEPTIONAL 任务正常执行,但是结果是异常 * NEW -> CANCELLED 任务被取消 * NEW -> INTERRUPTING -> INTERRUPTED 任务被中断 */ // 记录任务的状态 private volatile int state; // 任务被构建之后的初始状态 private static final int NEW = 0; // 初始状态 private static final int COMPLETING = 1; // 执行完成,结果设置中 private static final int NORMAL = 2; // 正常完成 private static final int EXCEPTIONAL = 3; // 执行异常 private static final int CANCELLED = 4; // 任务取消 private static final int INTERRUPTING = 5; // 中断中 private static final int INTERRUPTED = 6; // 已中断 private Callable<V> callable; // 实际执行的任务 private Object outcome; // 存储任务结果或异常 private volatile Thread runner; // 执行任务的线程 private volatile WaitNode waiters; // 等待结果的线程链表 static final class WaitNode { volatile Thread thread; volatile WaitNode next; WaitNode() { thread = Thread.currentThread(); } }

LockSupport

LockSupport 是 Java 并发包中的一个基础工具类,提供线程阻塞和唤醒的基本能力,它是构建更高级同步器(如锁、屏障等)的基础。

主要特点:

  • 提供线程阻塞(park)和唤醒(unpark)的原始操作
  • 每个线程关联一个"许可"(permit)概念
  • 是 Java 并发框架的基础构建块
  • 被广泛用于 AQS、FutureTask 等并发工具实现中

此类与使用它的每个线程关联一个许可(在{@link Semaphore Semaphore}类的意义上)。

如果许可证可用,将立即返回对{@code park}的调用,并在此过程中使用它;否则可能会被阻塞。如果许可证尚未可用,则调用{@code unpark}可使其可用。(不过,与信号量不同,许可证不会累积。最多只有一个。)

核心方法:

java
// 阻塞当前线程 static void park() // 阻塞当前线程,有超时时间 static void parkNanos(long nanos) // 阻塞直到某个时间点 static void parkUntil(long deadline) // 唤醒指定线程 static void unpark(Thread thread)

wait()/notify() 比较:

特性LockSupportwait()/notify()
所属层次JUC 底层工具Object 原生方法
需要同步块不需要必须在 synchronized 块中使用
唤醒顺序精确指定线程随机唤醒一个等待线程
许可机制有许可概念(先unpark后park有效)无此概念
中断响应会响应中断会响应中断
异常处理不会抛出 IllegalMonitorStateException可能抛出 IllegalMonitorStateException

FutureTask 的 run 方法

任务执行前的一些判断,以及调用任务封装结果的方式,还有最后的一些后续处理

java
// 当线程池执行FutureTask任务时,会调用的方法 public void run() { // 如果当前任务状态不是NEW,直接return告辞 if (state != NEW || // 如果状态正确是NEW,这边需要基于CAS将runner属性设置为当前线程 // 如果CAS失败,直接return告辞 !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { // 将要执行的任务拿到 Callable<V> c = callable; // 健壮性判断,保证任务不是null // 再次判断任务的状态是NEW(DCL) if (c != null && state == NEW) { // 执行任务 // result:任务的返回结果 // ran:如果为true,任务正常结束。 如果为false,任务异常结束。 V result; boolean ran; try { // 执行任务 result = c.call(); // 正常结果,ran设置为true ran = true; } catch (Throwable ex) { // 如果任务执行期间出了异常 // 返回结果置位null result = null; // ran设置为false ran = false; // 封装异常结果 setException(ex); } if (ran) // 封装正常结果 set(result); } } finally { // 将执行任务的线程置位null runner = null; // 拿到任务的状态 int s = state; // 如果状态大于等于INTERRUPTING if (s >= INTERRUPTING) // 进来代表任务中断,做一些后续处理 handlePossibleCancellationInterrupt(s); } }

注意重复执行问题

java
FutureTask task = new FutureTask(...); new Thread(task).start(); // 正确 new Thread(task).start(); // 错误!任务不会再次执行

FutureTask 的 set&setException 方法

任务执行完毕后,修改任务的状态以及封装任务的结果

java
// 没有异常的时候,正常返回结果 protected void set(V v) { // 因为任务执行完毕,需要将任务的状态从NEW,修改为COMPLETING if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { // 将返回结果赋值给 outcome 属性 outcome = v; // 将任务状态变为NORMAL,正常结束 UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // 一会再说…… finishCompletion(); } } // 任务执行期间出现了异常,这边要封装结果 protected void setException(Throwable t) { // 因为任务执行完毕,需要将任务的状态从NEW,修改为COMPLETING if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { // 将异常信息封装到 outcome 属性 outcome = t; // 将任务状态变为EXCEPTIONAL,异常结束 UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // 一会再说…… finishCompletion(); } }

FutureTask 的 cancel 方法

任务取消的一个方式

  • 任务直接从NEW状态转换为CANCEL
  • 任务从NEW状态变成INTERRUPTING,然后再转换为INTERRUPTED
java
// 取消任务操作 public boolean cancel(boolean mayInterruptIfRunning) { // 查看任务的状态是否是NEW,如果NEW状态,就基于传入的参数mayInterruptIfRunning // 决定任务是直接从NEW转换为CANCEL,还是从NEW转换为INTERRUPTING if (!(state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) return false; try { // 如果mayInterruptIfRunning为true // 就需要中断线程 if (mayInterruptIfRunning) { try { // 拿到任务线程 Thread t = runner; if (t != null) // 如果线程不为null,直接interrupt t.interrupt(); } finally { // 将任务状态设置为INTERRUPTED UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); } } } finally { // 任务结束后的一些处理~~ 一会看~~ finishCompletion(); } return true; }

FutureTask 的 get 方法

这个是线程获取FutureTask任务执行结果的方法

java
// 拿任务结果 public V get() throws InterruptedException, ExecutionException { // 获取任务的状态 int s = state; // 要么是NEW,任务还没执行完 // 要么COMPLETING,任务执行完了,结果还没封装好。 if (s <= COMPLETING) // 让当前线程阻塞,等待结果 s = awaitDone(false, 0L); // 最终想要获取结果,需要执行report方法 return report(s); } // 线程等待FutureTask结果的过程 private int awaitDone(boolean timed, long nanos) throws InterruptedException { // 针对get方法传入了等待时长时,需要计算等到什么时间点 final long deadline = timed ? System.nanoTime() + nanos : 0L; // 声明好需要的Node,queued:放到链表中了么? WaitNode q = null; boolean queued = false; for (;;) { // 查看线程是否中断,如果中断,从等待链表中移除,甩个异常 if (Thread.interrupted()) { removeWaiter(q); throw new InterruptedException(); } // 拿到状态 int s = state; // 到这,说明任务结束了。 if (s > COMPLETING) { if (q != null) // 如果之前封装了WaitNode,现在要清空 q.thread = null; return s; } // 如果任务状态是COMPLETING,这就不需要去阻塞线程,让步一下,等待一小会,结果就有了 else if (s == COMPLETING) Thread.yield(); // 如果还没初始化WaitNode,初始化 else if (q == null) q = new WaitNode(); // 没放队列的话,直接放到waiters的前面 else if (!queued) queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); // 准备挂起线程,如果timed为true,挂起一段时间 else if (timed) { // 计算出最多可以等待多久 nanos = deadline - System.nanoTime(); // 如果等待的时间没了 if (nanos <= 0L) { // 移除当前的Node,返回任务状态 removeWaiter(q); return state; } // 等一会 LockSupport.parkNanos(this, nanos); } else // 死等 LockSupport.park(this); } } // get的线程已经可以阻塞结束了,基于状态查看能否拿到返回结果 private V report(int s) throws ExecutionException { // 拿到outcome 返回结果 Object x = outcome; // 如果任务状态是NORMAL,任务正常结束,返回结果 if (s == NORMAL) return (V)x; // 如果任务状态大于等于取消 if (s >= CANCELLED) // 直接抛出异常 throw new CancellationException(); // 到这就是异常结束 throw new ExecutionException((Throwable)x); }

为什么任务会进入等待链表?

  • 如果任务已经完成,get()会立即返回结果
  • 如果任务未完成,调用线程会被包装成WaitNode并加入等待链表
  • 线程随后会通过LockSupport.park()被阻塞

注意结果获取阻塞

java
// 在主线程直接调用get()会导致阻塞 ExecutorService executor = ...; Future<String> future = executor.submit(task); // UI线程避免直接调用 future.get(); // 可能导致界面卡死

FutureTask 的 finishCompletion 方法

只要任务结束了,无论是正常返回,异常返回,还是任务被取消都会执行这个方法

而这个方法其实就是唤醒那些执行get方法等待任务结果的线程

java
// 任务结束后触发 private void finishCompletion() { // 在任务结束后,需要唤醒 for (WaitNode q; (q = waiters) != null;) { // 第一步直接以CAS的方式将WaitNode置为null if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { for (;;) { // 拿到了Node中的线程 Thread t = q.thread; // 如果线程不为null if (t != null) { // 第一步先置位null q.thread = null; // 直接唤醒这个线程 LockSupport.unpark(t); } // 拿到当前Node的next WaitNode next = q.next; // next为null,代表已经将全部节点唤醒了吗,跳出循环 if (next == null) break; // 将next置位null q.next = null; // q的引用指向next q = next; } break; } } // 任务结束后,可以基于这个扩展方法,记录一些信息 done(); // 任务执行完,把callable具体任务置位null callable = null; }

在 FutureTask 中,当调用 get() 方法的线程因为任务未完成而进入等待链表后,当任务正常完成时会执行/唤醒:

  • 当 FutureTask 包装的任务执行完成(包括正常返回和抛出异常)
  • 会触发 finishCompletion() 方法
  • 法会遍历等待链表,逐个唤醒所有等待线程

等待链表为什么会包含多个节点?

等待链表可以包含多个节点,典型场景包括:

  • 多个线程同时调用get()方法等待同一个FutureTask的结果
  • 每个等待线程都会被封装为一个WaitNode并按调用顺序链接起来

示例:

java
FutureTask<String> task = new FutureTask<>(...); // 线程A new Thread(() -> task.get()).start(); // 线程B new Thread(() -> task.get()).start(); // 线程C new Thread(() -> task.get()).start();

FutureTask 原理

内部状态

状态变量:FutureTask内部使用一个volatile修饰的整数变量来表示状态,这个变量有不同的取值,对应不同的任务状态,如新建(NEW)、已完成(COMPLETED)、已取消(CANCELLED)等。这些状态的转换是原子操作,通过Unsafe类或CAS(Compare - and - Swap)机制来保证线程安全。

状态转换:例如,当任务开始执行时,状态从NEW转换为RUNNING,当任务执行成功完成后,状态转换为COMPLETED,如果任务被取消,状态转换为CANCELLED或INTERRUPTED(取决于取消的方式)。

JDK8 及之前的实现机制

基于 AQS(AbstractQueuedSynchronizer)的同步机制:FutureTask的底层实现依赖于AQS来实现同步和阻塞。AQS是一个用于构建锁和同步器的框架,它提供了基于队列的等待和唤醒机制。FutureTask通过继承AQS来实现自己的同步逻辑。

等待获取结果:当一个线程调用FutureTask的get方法时,如果任务尚未完成,该线程会被封装成一个Node添加到AQS的等待队列中,然后线程会被阻塞。这个等待队列是一个双向链表结构,用于管理等待获取结果的线程。

任务完成后的唤醒:当任务完成后,FutureTask会通过AQS的唤醒机制,将等待队列中的线程逐个唤醒。唤醒的线程会再次尝试获取任务的结果,如果任务已经完成,就可以成功获取结果,否则会再次被阻塞。

结果存储和可见性:任务的结果存储在FutureTask内部的一个变量中,通过volatile修饰来保证结果的内存可见性。当任务完成后,结果会被正确地写入这个变量,并且其他等待获取结果的线程可以立即看到这个结果。

JDK9 及以后的改变

在早期版本中:

  • FutureTask内部维护了一个Sync内部类,它继承自AQS
  • 使用AQS的状态(state)来表示任务的状态(未开始、运行中、已完成等)
  • 依赖AQS的等待队列机制来管理等待任务结果的线程

这种实现方式确实直接依赖于AQS。

从JDK9开始,FutureTask 的实现进行了重构:

  • 移除了对AQS的直接依赖:不再使用继承AQS的内部类
  • 改用更简单的状态管理:使用volatile int state和 Unsafe.CAS 操作直接管理状态
  • 简化等待机制:使用LockSupport进行线程阻塞/唤醒
  • 使用简单的单向链表管理等待线程

改变的主要原因

  • AQS对于FutureTask的需求来说过于重量级
  • 直接使用CAS和volatile可以获得更好的性能
  • 简化实现,减少维护成本

与线程池的协作

线程池中的任务调度:当FutureTask被提交给线程池(如ExecutorService)时,线程池会从自己的工作队列中取出FutureTask并分配给一个空闲的线程来执行。线程池中的线程在执行FutureTask时,与直接执行FutureTask的原理是一样的,都是通过AQS来实现同步和阻塞,以及通过状态转换来管理任务的执行过程。

线程池的资源管理和优化:线程池可以根据自身的配置和当前的负载情况,合理地分配资源来执行FutureTask。例如,一个ThreadPoolExecutor可以根据核心线程数、最大线程数、队列容量等参数来决定是立即执行FutureTask,还是将其放入队列中等待执行,或者拒绝执行(如果队列已满且线程数达到最大线程数)。这有助于提高系统的整体性能和资源利用率。

总结

FutureTask 是一个同步非阻塞处理任务的方式,FutureTask 获取线程执行的结果前,主线程需要通过get方法一直阻塞等待子线程执行完call方法,才可以拿到返回结果,性能影响较大,所以需要一个异步非阻塞处理任务的方式。

CompletableFuture 在一定程度上就提供了各种异步非阻塞的处理方案,并且提供响应式编程,代码编写上,效果更佳。CompletableFuture是JDK1.8 Doug Lea研发,CompletableFuture也是实现了Future接口实现的功能,提供非常丰富的函数去执行各种异步操作,可以不使用FutureTask,直接使用CompletableFuture即可。

本文作者:柳始恭

本文链接:

版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!