Java 创建线程的方式,一般常用的是 Thread
、Runnable
,如果需要当前处理的任务有返回结果的话,
需要使用 Callable
,Callable
运行需要配合 Future
,那 Future
是个什么鬼?FutureTask
又是个什么鬼?你是否深入了解过源码的处理?接下来我们将分析 Future
接口和 FutureTask
类的关系,然后深入分析 FutureTask
的源码实现。
Future
表示 异步计算的结果,是 Java 并发包的核心接口,定义了异步计算结果的基本操作,它提供了检查计算是否完成、等待计算完成以及检索计算结果的方法。它通常与 ExecutorService
一起使用,以异步方式执行任务,一般会使用 FutureTask
实现类去接收 Callable
任务的返回结果。
Future
允许你启动一个可能耗时的计算,而不必等待它完成就能继续执行其他任务。然后,你可以使用 Future
对象来查询计算是否完成,等待计算完成,并检索计算的结果。
主要的方法包括:
boolean isDone()
:检查计算是否完成,如果任务完成,则返回 true。boolean isCancelled()
:检查任务是否已取消,如果任务被取消,则返回 true。V get()
:等待任务完成,获取计算结果(阻塞直到完成)。V get(long timeout, TimeUnit unit)
:等待任务在给定的时间范围内完成,并获取其结果。boolean cancel(boolean mayInterruptIfRunning)
:尝试取消任务的执行。FutureTask 属于 java.util.concurrent 包,是一个可以取消异步任务的类。
从源码中的文档注释可以看到,FutureTask
是 Future
的一个实现类,ta同时实现了 Runnable
和Future
接口。因此,它既可以作为Runnable被线程执行,又可以作为Future得到Callable的返回值。
FutureTask类提供了一个Future的基本实现 ,具有启动和取消计算的方法,查询计算是否完整,并检索计算结果。结果只能在计算完成后才能检索; 如果计算尚未完成,则get方法将阻止。一旦计算完成,则无法重新启动或取消计算(除非使用runAndReset()调用计算 )。
从上面的FutureTask类图中可以看出,FutureTask实现了RunnableFuture接口,RunnableFuture接口继承了Runnable接口和Future接口,所以FutureTask兼备Runnable和Future两种特性
异步计算:在多线程编程中,FutureTask用于封装一个可调用任务(例如实现了Callable接口的任务),并允许在一个单独的线程中执行该任务。这样可以在执行耗时操作(如网络请求、文件读取、复杂计算等)时,不会阻塞主线程或其他线程的执行。
结果获取:提供了一种机制来获取异步计算的结果。通过FutureTask的get方法,可以在任务完成后获取其执行结果,如果任务尚未完成,get方法可以阻塞当前线程,直到任务完成并返回结果。
大方向是FutureTask对任务的控制:
FutureTask的任务在执行run方法后,是无法被再次运行,需要使用runAndReset方法才可以。
javapublic static void main(String[] args) throws InterruptedException {
// 构建FutureTask,基于泛型执行返回结果类型
// 在有参构造中,声明Callable或者Runnable指定任务
FutureTask<String> futureTask = new FutureTask<>(() -> {
System.out.println("任务开始执行……");
Thread.sleep(2000);
System.out.println("任务执行完毕……");
return "OK!";
});
// 构建线程池
ExecutorService service = Executors.newFixedThreadPool(10);
// 线程池执行任务
service.execute(futureTask);
// futureTask提供了run方法,一般不会自己去调用run方法,让线程池去执行任务,由线程池去执行run方法
// run方法在执行时,是有任务状态的。任务已经执行了,再次调用run方法无效的。
// 如果希望任务可以反复被执行,需要去调用runAndReset方法
// futureTask.run();
// 对返回结果的获取,类似阻塞队列的poll方法
// 如果在指定时间内,没有拿到方法的返回结果,直接扔TimeoutException
// try {
// String s = futureTask.get(3000, TimeUnit.MILLISECONDS);
// System.out.println("返回结果:" + s);
// } catch (Exception e) {
// System.out.println("异常返回:" + e.getMessage());
// e.printStackTrace();
// }
// 对返回结果的获取,类似阻塞队列的take方法,死等结果
// try {
// String s = futureTask.get();
// System.out.println("任务结果:" + s);
// } catch (ExecutionException e) {
// e.printStackTrace();
// }
// 对任务状态的控制
// System.out.println("任务结束了么?:" + futureTask.isDone());
// Thread.sleep(1000);
// System.out.println("任务结束了么?:" + futureTask.isDone());
// Thread.sleep(1000);
// System.out.println("任务结束了么?:" + futureTask.isDone());
}
看FutureTask的源码,要从几个方向去看:
FutureTask 使用状态机管理任务生命周期,首先要清楚任务的流转流转状态是怎样的,其次对于核心属性要追到是干嘛的。
java/**
FutureTask的核心属性
FutureTask任务的状态流转
* NEW -> COMPLETING -> NORMAL 任务正常执行,并且返回结果也正常返回
* NEW -> COMPLETING -> EXCEPTIONAL 任务正常执行,但是结果是异常
* NEW -> CANCELLED 任务被取消
* NEW -> INTERRUPTING -> INTERRUPTED 任务被中断
*/
// 记录任务的状态
private volatile int state;
// 任务被构建之后的初始状态
private static final int NEW = 0; // 初始状态
private static final int COMPLETING = 1; // 执行完成,结果设置中
private static final int NORMAL = 2; // 正常完成
private static final int EXCEPTIONAL = 3; // 执行异常
private static final int CANCELLED = 4; // 任务取消
private static final int INTERRUPTING = 5; // 中断中
private static final int INTERRUPTED = 6; // 已中断
private Callable<V> callable; // 实际执行的任务
private Object outcome; // 存储任务结果或异常
private volatile Thread runner; // 执行任务的线程
private volatile WaitNode waiters; // 等待结果的线程链表
static final class WaitNode {
volatile Thread thread;
volatile WaitNode next;
WaitNode() { thread = Thread.currentThread(); }
}
LockSupport
LockSupport 是 Java 并发包中的一个基础工具类,提供线程阻塞和唤醒的基本能力,它是构建更高级同步器(如锁、屏障等)的基础。
主要特点:
此类与使用它的每个线程关联一个许可(在{@link Semaphore Semaphore
}类的意义上)。
如果许可证可用,将立即返回对{@code park
}的调用,并在此过程中使用它;否则可能会被阻塞。如果许可证尚未可用,则调用{@code unpark
}可使其可用。(不过,与信号量不同,许可证不会累积。最多只有一个。)
核心方法:
java// 阻塞当前线程
static void park()
// 阻塞当前线程,有超时时间
static void parkNanos(long nanos)
// 阻塞直到某个时间点
static void parkUntil(long deadline)
// 唤醒指定线程
static void unpark(Thread thread)
与 wait()/notify()
比较:
特性 | LockSupport | wait()/notify() |
---|---|---|
所属层次 | JUC 底层工具 | Object 原生方法 |
需要同步块 | 不需要 | 必须在 synchronized 块中使用 |
唤醒顺序 | 精确指定线程 | 随机唤醒一个等待线程 |
许可机制 | 有许可概念(先unpark后park有效) | 无此概念 |
中断响应 | 会响应中断 | 会响应中断 |
异常处理 | 不会抛出 IllegalMonitorStateException | 可能抛出 IllegalMonitorStateException |
任务执行前的一些判断,以及调用任务封装结果的方式,还有最后的一些后续处理
java// 当线程池执行FutureTask任务时,会调用的方法
public void run() {
// 如果当前任务状态不是NEW,直接return告辞
if (state != NEW ||
// 如果状态正确是NEW,这边需要基于CAS将runner属性设置为当前线程
// 如果CAS失败,直接return告辞
!UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
return;
try {
// 将要执行的任务拿到
Callable<V> c = callable;
// 健壮性判断,保证任务不是null
// 再次判断任务的状态是NEW(DCL)
if (c != null && state == NEW) {
// 执行任务
// result:任务的返回结果
// ran:如果为true,任务正常结束。 如果为false,任务异常结束。
V result;
boolean ran;
try {
// 执行任务
result = c.call();
// 正常结果,ran设置为true
ran = true;
} catch (Throwable ex) {
// 如果任务执行期间出了异常
// 返回结果置位null
result = null;
// ran设置为false
ran = false;
// 封装异常结果
setException(ex);
}
if (ran)
// 封装正常结果
set(result);
}
} finally {
// 将执行任务的线程置位null
runner = null;
// 拿到任务的状态
int s = state;
// 如果状态大于等于INTERRUPTING
if (s >= INTERRUPTING)
// 进来代表任务中断,做一些后续处理
handlePossibleCancellationInterrupt(s);
}
}
注意重复执行问题
javaFutureTask task = new FutureTask(...);
new Thread(task).start(); // 正确
new Thread(task).start(); // 错误!任务不会再次执行
任务执行完毕后,修改任务的状态以及封装任务的结果
java// 没有异常的时候,正常返回结果
protected void set(V v) {
// 因为任务执行完毕,需要将任务的状态从NEW,修改为COMPLETING
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
// 将返回结果赋值给 outcome 属性
outcome = v;
// 将任务状态变为NORMAL,正常结束
UNSAFE.putOrderedInt(this, stateOffset, NORMAL);
// 一会再说……
finishCompletion();
}
}
// 任务执行期间出现了异常,这边要封装结果
protected void setException(Throwable t) {
// 因为任务执行完毕,需要将任务的状态从NEW,修改为COMPLETING
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
// 将异常信息封装到 outcome 属性
outcome = t;
// 将任务状态变为EXCEPTIONAL,异常结束
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL);
// 一会再说……
finishCompletion();
}
}
任务取消的一个方式
java// 取消任务操作
public boolean cancel(boolean mayInterruptIfRunning) {
// 查看任务的状态是否是NEW,如果NEW状态,就基于传入的参数mayInterruptIfRunning
// 决定任务是直接从NEW转换为CANCEL,还是从NEW转换为INTERRUPTING
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try {
// 如果mayInterruptIfRunning为true
// 就需要中断线程
if (mayInterruptIfRunning) {
try {
// 拿到任务线程
Thread t = runner;
if (t != null)
// 如果线程不为null,直接interrupt
t.interrupt();
} finally {
// 将任务状态设置为INTERRUPTED
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
// 任务结束后的一些处理~~ 一会看~~
finishCompletion();
}
return true;
}
这个是线程获取FutureTask任务执行结果的方法
java// 拿任务结果
public V get() throws InterruptedException, ExecutionException {
// 获取任务的状态
int s = state;
// 要么是NEW,任务还没执行完
// 要么COMPLETING,任务执行完了,结果还没封装好。
if (s <= COMPLETING)
// 让当前线程阻塞,等待结果
s = awaitDone(false, 0L);
// 最终想要获取结果,需要执行report方法
return report(s);
}
// 线程等待FutureTask结果的过程
private int awaitDone(boolean timed, long nanos) throws InterruptedException {
// 针对get方法传入了等待时长时,需要计算等到什么时间点
final long deadline = timed ? System.nanoTime() + nanos : 0L;
// 声明好需要的Node,queued:放到链表中了么?
WaitNode q = null;
boolean queued = false;
for (;;) {
// 查看线程是否中断,如果中断,从等待链表中移除,甩个异常
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
// 拿到状态
int s = state;
// 到这,说明任务结束了。
if (s > COMPLETING) {
if (q != null)
// 如果之前封装了WaitNode,现在要清空
q.thread = null;
return s;
}
// 如果任务状态是COMPLETING,这就不需要去阻塞线程,让步一下,等待一小会,结果就有了
else if (s == COMPLETING)
Thread.yield();
// 如果还没初始化WaitNode,初始化
else if (q == null)
q = new WaitNode();
// 没放队列的话,直接放到waiters的前面
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
// 准备挂起线程,如果timed为true,挂起一段时间
else if (timed) {
// 计算出最多可以等待多久
nanos = deadline - System.nanoTime();
// 如果等待的时间没了
if (nanos <= 0L) {
// 移除当前的Node,返回任务状态
removeWaiter(q);
return state;
}
// 等一会
LockSupport.parkNanos(this, nanos);
}
else
// 死等
LockSupport.park(this);
}
}
// get的线程已经可以阻塞结束了,基于状态查看能否拿到返回结果
private V report(int s) throws ExecutionException {
// 拿到outcome 返回结果
Object x = outcome;
// 如果任务状态是NORMAL,任务正常结束,返回结果
if (s == NORMAL)
return (V)x;
// 如果任务状态大于等于取消
if (s >= CANCELLED)
// 直接抛出异常
throw new CancellationException();
// 到这就是异常结束
throw new ExecutionException((Throwable)x);
}
为什么任务会进入等待链表?
注意结果获取阻塞
java// 在主线程直接调用get()会导致阻塞
ExecutorService executor = ...;
Future<String> future = executor.submit(task);
// UI线程避免直接调用
future.get(); // 可能导致界面卡死
只要任务结束了,无论是正常返回,异常返回,还是任务被取消都会执行这个方法
而这个方法其实就是唤醒那些执行get方法等待任务结果的线程
java// 任务结束后触发
private void finishCompletion() {
// 在任务结束后,需要唤醒
for (WaitNode q; (q = waiters) != null;) {
// 第一步直接以CAS的方式将WaitNode置为null
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
// 拿到了Node中的线程
Thread t = q.thread;
// 如果线程不为null
if (t != null) {
// 第一步先置位null
q.thread = null;
// 直接唤醒这个线程
LockSupport.unpark(t);
}
// 拿到当前Node的next
WaitNode next = q.next;
// next为null,代表已经将全部节点唤醒了吗,跳出循环
if (next == null)
break;
// 将next置位null
q.next = null;
// q的引用指向next
q = next;
}
break;
}
}
// 任务结束后,可以基于这个扩展方法,记录一些信息
done();
// 任务执行完,把callable具体任务置位null
callable = null;
}
在 FutureTask 中,当调用 get() 方法的线程因为任务未完成而进入等待链表后,当任务正常完成时会执行/唤醒:
等待链表为什么会包含多个节点?
等待链表可以包含多个节点,典型场景包括:
示例:
javaFutureTask<String> task = new FutureTask<>(...);
// 线程A
new Thread(() -> task.get()).start();
// 线程B
new Thread(() -> task.get()).start();
// 线程C
new Thread(() -> task.get()).start();
状态变量:FutureTask内部使用一个volatile修饰的整数变量来表示状态,这个变量有不同的取值,对应不同的任务状态,如新建(NEW)、已完成(COMPLETED)、已取消(CANCELLED)等。这些状态的转换是原子操作,通过Unsafe类或CAS(Compare - and - Swap)机制来保证线程安全。
状态转换:例如,当任务开始执行时,状态从NEW转换为RUNNING,当任务执行成功完成后,状态转换为COMPLETED,如果任务被取消,状态转换为CANCELLED或INTERRUPTED(取决于取消的方式)。
基于 AQS(AbstractQueuedSynchronizer)的同步机制:FutureTask的底层实现依赖于AQS来实现同步和阻塞。AQS是一个用于构建锁和同步器的框架,它提供了基于队列的等待和唤醒机制。FutureTask通过继承AQS来实现自己的同步逻辑。
等待获取结果:当一个线程调用FutureTask的get方法时,如果任务尚未完成,该线程会被封装成一个Node添加到AQS的等待队列中,然后线程会被阻塞。这个等待队列是一个双向链表结构,用于管理等待获取结果的线程。
任务完成后的唤醒:当任务完成后,FutureTask会通过AQS的唤醒机制,将等待队列中的线程逐个唤醒。唤醒的线程会再次尝试获取任务的结果,如果任务已经完成,就可以成功获取结果,否则会再次被阻塞。
结果存储和可见性:任务的结果存储在FutureTask内部的一个变量中,通过volatile修饰来保证结果的内存可见性。当任务完成后,结果会被正确地写入这个变量,并且其他等待获取结果的线程可以立即看到这个结果。
在早期版本中:
这种实现方式确实直接依赖于AQS。
从JDK9开始,FutureTask 的实现进行了重构:
改变的主要原因
线程池中的任务调度:当FutureTask被提交给线程池(如ExecutorService)时,线程池会从自己的工作队列中取出FutureTask并分配给一个空闲的线程来执行。线程池中的线程在执行FutureTask时,与直接执行FutureTask的原理是一样的,都是通过AQS来实现同步和阻塞,以及通过状态转换来管理任务的执行过程。
线程池的资源管理和优化:线程池可以根据自身的配置和当前的负载情况,合理地分配资源来执行FutureTask。例如,一个ThreadPoolExecutor可以根据核心线程数、最大线程数、队列容量等参数来决定是立即执行FutureTask,还是将其放入队列中等待执行,或者拒绝执行(如果队列已满且线程数达到最大线程数)。这有助于提高系统的整体性能和资源利用率。
FutureTask 是一个同步非阻塞处理任务的方式,FutureTask 获取线程执行的结果前,主线程需要通过get方法一直阻塞等待子线程执行完call方法,才可以拿到返回结果,性能影响较大,所以需要一个异步非阻塞处理任务的方式。
CompletableFuture
在一定程度上就提供了各种异步非阻塞的处理方案,并且提供响应式编程,代码编写上,效果更佳。CompletableFuture是JDK1.8 Doug Lea研发,CompletableFuture也是实现了Future接口实现的功能,提供非常丰富的函数去执行各种异步操作,可以不使用FutureTask,直接使用CompletableFuture即可。
本文作者:柳始恭
本文链接:
版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!