2025-06-09
内卷九股文
0

目录

线程池
JDK自带的构建线程池的方式 Executors
固定大小的线程池 (newFixedThreadPool)
单线程的线程池 (newSingleThreadExecutor)
可缓存的线程池 (newCachedThreadPool)
定时或周期性任务的线程池 (newScheduledThreadPool)
ThreadPoolExecutor
为什么要自定义线程池
ThreadPoolExecutor 应用
ThreadPoolExecutor 源码剖析
核心属性
有参构造
execute 方法
addWorker 方法
Worker 工作线程
ThreadPoolExecutor 的 runWorker 方法
getTask 方法
shutdownNow 关闭方法
线程池的核心参数设计规则
线程池参数示例
队列选型指南
线程池处理任务的核心流程
ScheduledThreadPoolExecutor
ScheduledThreadPoolExecutor 应用
ScheduledThreadPoolExecutor 源码剖析
核心属性
schedule 方法
At 和 With 方法 & 任务的 run 方法
总结

在开始以前,先思考下什么是线程创建有哪几种方式,为了避免频繁创建和销毁线程造成不必要的性能,一般在使用线程时,会采用线程池,那什么是线程池呢?

线程池

Java线程池是并发编程的核心组件,而 ThreadPoolExecutor 作为其最核心的实现,通过池化技术和精细化任务调度,解决了频繁创建/销毁线程的资源消耗问题。

本文将从设计思想、工作原理到源码实现,全方位解析 ThreadPoolExecutor 的底层机制。

JDK自带的构建线程池的方式 Executors

JDK 中基于 Executors 提供了很多种线程池

固定大小的线程池 (newFixedThreadPool)

创建一个固定大小的线程池,可控制线程最大并发数,超出的线程会在队列中等待。

java
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }

构建时,需要给 newFixedThreadPool 方法提供一个 nThreads 的属性,而这个属性其实就是当前线程池中线程的个数。当前线程池的本质其实就是使用 ThreadPoolExecutor

构建好当前线程池后,线程个数已经固定好。如果线程没构建,线程会等待着任务执行时被创建(线程是懒加载,在构建之初,线程并没有构建出来,而是随着人任务的提交才会将线程在线程池中构建出来)。

如果线程都已经构建好了,此时任务会被放到 LinkedBlockingQueue 无界队列中存放,等待线程从LinkedBlockingQueue 中去 take 出任务,然后执行。

java
public static void main(String[] args) throws Exception { ExecutorService threadPool = Executors.newFixedThreadPool(3); threadPool.execute(() -> { System.out.println("1号任务:" + Thread.currentThread().getName() + System.currentTimeMillis()); try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } }); threadPool.execute(() -> { System.out.println("2号任务:" + Thread.currentThread().getName() + System.currentTimeMillis()); try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } }); threadPool.execute(() -> { System.out.println("3号任务:" + Thread.currentThread().getName() + System.currentTimeMillis()); try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } }); }

单线程的线程池 (newSingleThreadExecutor)

这个线程池看名字就知道是 单例线程池,线程池中只有一个工作线程在处理任务。当你希望所有的任务都在一个线程中顺序执行时使用。这可以保证任务的顺序执行,适用于需要串行处理任务的场景。

java
// 当前这里就是构建单例线程池的方式 public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService // 在内部依然是构建了ThreadPoolExecutor,设置的线程个数为1 // 当任务投递过来后,第一个任务会被工作线程处理,后续的任务会被扔到阻塞队列中 // 投递到阻塞队列中任务的顺序,就是工作线程处理的顺序 // 当前这种线程池可以用作顺序处理的一些业务中 (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }

示例代码:

java
public static void main(String[] args) throws Exception { ExecutorService threadPool = Executors.newSingleThreadExecutor(); threadPool.execute(() -> { System.out.println(Thread.currentThread().getName() + "," + "111"); }); threadPool.execute(() -> { System.out.println(Thread.currentThread().getName() + "," + "222"); }); threadPool.execute(() -> { System.out.println(Thread.currentThread().getName() + "," + "333"); }); threadPool.execute(() -> { System.out.println(Thread.currentThread().getName() + "," + "444"); }); }

可缓存的线程池 (newCachedThreadPool)

适用于执行大量短期异步任务的情况。线程池会根据需要创建新线程,并且在长时间闲置后自动终止线程。适用于执行大量独立且耗时短的任务。

java
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }

最大的一个特点,任务只要提交到当前的 newCachedThreadPool 中,就必然有工作线程可以处理,其本质是 SynchronousQueue 不存储任何元素,直接传递,是一个 生产者-消费者 模型,这也是线程数量设置无界的原因。

  • 当第一次提交任务到线程池时,会直接构建一个工作线程
  • 这个工作线程带执行完后,默认60秒没有任务可以执行后,会结束
  • 如果在等待60秒期间有任务进来,他会再次拿到这个任务去执行
  • 如果后续提升任务时,没有线程是空闲的,那么就构建工作线程去执行。

代码示例:

java
public static void main(String[] args) throws Exception { ExecutorService executorService = Executors.newCachedThreadPool(); for (int i = 1; i <= 200; i++) { final int j = i; executorService.execute(() -> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + ":" + j); }); } }

定时或周期性任务的线程池 (newScheduledThreadPool)

用于需要定时或定期执行任务的场景。你可以使用 schedulescheduleAtFixedRate 方法来安排任务的执行。

来看一下源码中如何构建的:

java
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); }

基于这个方法可以看到,构建的是 ScheduledThreadPoolExecutor 线程池

java
public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor{ //.... }

所以本质上还是正常线程池,只不过在原来的线程池基础上实现了定时任务的功能。

原理是基于 DelayQueue 实现的延迟执行,周期性执行是任务执行完毕后,再次扔回到阻塞队列。

代码示例:

java
public static void main(String[] args) throws Exception { ScheduledExecutorService pool = Executors.newScheduledThreadPool(10); // 正常执行 // pool.execute(() -> { // System.out.println(Thread.currentThread().getName() + ":1"); // }); // 延迟执行,执行当前任务延迟5s后再执行 // pool.schedule(() -> { // System.out.println(Thread.currentThread().getName() + ":2"); // },5,TimeUnit.SECONDS); // 周期执行,当前任务第一次延迟5s执行,然后没3s执行一次 // 这个方法在计算下次执行时间时,是从任务刚刚开始时就计算。 // pool.scheduleAtFixedRate(() -> { // try { // Thread.sleep(3000); // } catch (InterruptedException e) { // e.printStackTrace(); // } // System.out.println(System.currentTimeMillis() + ":3"); // },2,1,TimeUnit.SECONDS); // 周期执行,当前任务第一次延迟5s执行,然后没3s执行一次 // 这个方法在计算下次执行时间时,会等待任务结束后,再计算时间 pool.scheduleWithFixedDelay(() -> { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(System.currentTimeMillis() + ":3"); },2,1,TimeUnit.SECONDS); }

至于 Executors 提供的 newSingleThreadScheduledExecutor 单例的定时任务线程池就不说了,一个线程的线程池可以延迟或者以一定的周期执行一个任务。

ThreadPoolExecutor

前面讲到的 Executors 中的构建线程池的方式,大多数还是基于 ThreadPoolExecutor 去 new 出来的。

为什么要自定义线程池

首先 ThreadPoolExecutor 中,一共提供了7个参数,每个参数都是非常核心的属性,在线程池去执行任务时,每个参数都有决定性的作用。

但是如果直接采用JDK提供的方式去构建,可以设置的核心参数最多就两个,这样就会导致对线程池的控制粒度很粗。所以在阿里规范中也推荐自己去自定义线程池。手动的去 new ThreadPoolExecutor 手动设置一些核心属性。

自定义构建线程池,可以细粒度的控制线程池,去管理内存的属性,并且针对一些参数的设置可能更好的在后期排查问题。

查看一下ThreadPoolExecutor提供的七个核心参数

java
public ThreadPoolExecutor( int corePoolSize, // 核心工作线程(当前任务执行结束后,不会被销毁) int maximumPoolSize, // 最大工作线程(代表当前线程池中,一共可以有多少个工作线程) long keepAliveTime, // 非核心工作线程在阻塞队列位置等待的时间 TimeUnit unit, // 非核心工作线程在阻塞队列位置等待时间的单位 BlockingQueue<Runnable> workQueue, // 任务在没有核心工作线程处理时,任务先扔到阻塞队列中 ThreadFactory threadFactory, // 构建线程的线程工作,可以设置thread的一些信息 RejectedExecutionHandler handler) { // 当线程池无法处理投递过来的任务时,执行当前的拒绝策略 // 初始化线程池的操作 }

线程池的工作流程

当一个任务执行时,优先启用核心线程;当核心线程占满以后,任务会进入到阻塞队列中;当阻塞队列打满时,会开始创建最大线程,开始处理任务;当核心线程、任务队列、最大线程都打满,就触发了拒绝策略;当任务处理完,在达到设定的等待时间后,最大线程会被销毁。如果设置了 allowCoreThreadTimeOut 为true,那核心线程达到设定的等待时间,也会被销毁。

ThreadPoolExecutor 应用

手动new一下,处理的方式还是执行 execute 或者 submit 方法。

JDK提供的几种拒绝策略:

  • AbortPolicy:当前拒绝策略会在无法处理任务时,直接抛出一个异常
  • CallerRunsPolicy:当前拒绝策略会在线程池无法处理任务时,将任务交给调用者处理
  • DiscardPolicy:当前拒绝策略会在线程池无法处理任务时,直接将任务丢弃掉
  • DiscardOldestPolicy:当前拒绝策略会在线程池无法处理任务时,将队列中最早的任务丢弃掉,将当前任务再次尝试交给线程池处理
  • 自定义Policy:根据自己的业务,可以将任务扔到数据库,也可以做其他操作。

代码构建线程池,并处理有无返回结果的任务

java
public static void main(String[] args) throws ExecutionException, InterruptedException { //1. 构建线程池 ThreadPoolExecutor threadPool = new ThreadPoolExecutor( 2, 5, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(5), new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r); thread.setName("test-ThreadPoolExecutor"); return thread; } }, new MyRejectedExecution() ); //2. 让线程池处理任务,没返回结果 threadPool.execute(() -> { System.out.println("没有返回结果的任务"); }); //3. 让线程池处理有返回结果的任务 Future<Object> future = threadPool.submit(new Callable<Object>() { @Override public Object call() throws Exception { System.out.println("我有返回结果!"); return "返回结果"; } }); Object result = future.get(); System.out.println(result); //4. 如果是局部变量的线程池,记得用完要shutdown threadPool.shutdown(); } private static class MyRejectedExecution implements RejectedExecutionHandler{ @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { System.out.println("根据自己的业务情况,决定编写的代码!"); } }

ThreadPoolExecutor 源码剖析

线程池的源码内容会比较多一点,需要一点一点的去查看,内部比较多。

核心属性

核心属性主要就是ctl,定义一个原子类,基于ctl拿到线程池的状态以及工作线程个数,来看下源码文档中的注释翻译:

主池控制状态 ctl 是一个原子整数,它打包了两个概念字段:

  • workerCount:表示线程的有效数量。
  • runState:表示是否正在运行、关闭等(也就是运行状态)。

为了将它们打包成一个int,我们将workerCount限制为(2^29)-1(约5亿)个线程,而不是(2^31)-1(20亿)个可表示的线程。如果将来出现问题,可以将变量更改为AtomicLong,并调整下面的shiftmask常量。但是,在需要之前,使用int可以更快、更简单地执行此代码。

workerCount 是允许启动和不允许停止的工人数量。该值可能暂时不同于实际活动线程的数量,例如当ThreadFactory在被询问时无法创建线程,以及当退出的线程在终止前仍在执行簿记时。用户可见的池大小报告为工作集的当前大小。

runState 提供主要的生命周期控制,取值为:

  • RUNNING:接受新任务和进程排队任务
  • SHUTDOWN:不接受新任务,但进程排队任务
  • STOP:不接受新任务,不处理排队任务,中断正在进行的任务
  • TIDYING:所有任务均已终止,workerCount 为0,转换到状态 TIDING 的线程将运行 terminated() 钩子方法
  • TERMINATED:terminated() 已完成。

状态的转换为:

  • RUNNING -> SHUTDOWN : 调用 shutdown() 时
  • (RUNNING 或 SHUTDOWN)-> STOP : 调用 shutdownNow()时
  • SHUTDOWN -> TIDING : 队列和池都为空时
  • STOP -> TIDING : 池为空时
  • TIDING -> TERMINATED 当 terminated() 钩子方法完成时

在整个线程池的执行流程中,会基于ctl判断上述两个内容

java
// 当前是线程池的核心属性 // 当前的ctl其实就是一个int类型的数值,内部是基于AtomicInteger套了一层,进行运算时,是原子性的。 // ctl表示着线程池中的2个核心状态: // 线程池的状态:ctl的高3位,表示线程池状态 // 工作线程的数量:ctl的低29位,表示工作线程的个数 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); // Integer.SIZE:在获取Integer的bit位个数 // 声明了一个常量:COUNT_BITS = 29 private static final int COUNT_BITS = Integer.SIZE - 3; // 00000000 00000000 00000000 00000001 // 00100000 00000000 00000000 00000000 // 00011111 11111111 11111111 11111111 // CAPACITY就是当前工作线程能记录的工作线程的最大个数 private static final int CAPACITY = (1 << COUNT_BITS) - 1; // 线程池状态的表示 // 当前五个状态中,只有RUNNING状态代表线程池没问题,可以正常接收任务处理 // 111:代表RUNNING状态,RUNNING可以处理任务,并且处理阻塞队列中的任务。 private static final int RUNNING = -1 << COUNT_BITS; // 000:代表SHUTDOWN状态,不会接收新任务,正在处理的任务正常进行,阻塞队列的任务也会做完。 private static final int SHUTDOWN = 0 << COUNT_BITS; // 001:代表STOP状态,不会接收新任务,正在处理任务的线程会被中断,阻塞队列的任务一个不管。 private static final int STOP = 1 << COUNT_BITS; // 010:代表TIDYING状态,这个状态是否SHUTDOWN或者STOP转换过来的,代表当前线程池马上关闭,就是过渡状态。 private static final int TIDYING = 2 << COUNT_BITS; // 011:代表TERMINATED状态,这个状态是TIDYING状态转换过来的,转换过来只需要执行一个terminated方法。 private static final int TERMINATED = 3 << COUNT_BITS; // 在使用下面这几个方法时,需要传递ctl进来 // 基于&运算的特点,保证只会拿到ctl高三位的值。 private static int runStateOf(int c) { return c & ~CAPACITY; } // 基于&运算的特点,保证只会拿到ctl低29位的值。 private static int workerCountOf(int c) { return c & CAPACITY; }

线程池状态的特点以及转换的方式

image.png

有参构造

有参构造没啥说的,记住核心线程个数是允许为0的。

java
// 有参构造。无论调用哪个有参构造,最终都会执行当前的有参构造 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { // 健壮性校验 // 核心线程个数是允许为0个的。 // 最大线程数必须大于0,最大线程数要大于等于核心线程数 // 非核心线程的最大空闲时间,可以等于0 if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) // 不满足要求就抛出参数异常 throw new IllegalArgumentException(); // 阻塞队列,线程工厂,拒绝策略都不允许为null,为null就扔空指针异常 if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); // 不要关注当前内容,系统资源访问决策,和线程池核心业务关系不大。 this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); // 各种赋值,JUC包下,几乎所有涉及到线程挂起的操作,单位都用纳秒。 // 有参构造的值,都赋值给成员变量。 // Doug Lea的习惯就是将成员变量作为局部变量单独操作。 this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }

execute 方法

execute 方法是提交任务到线程池的核心方法,线程池的执行流程其实说的就是 execute 方法内部做了哪些判断,接下来看 execute 源码的分析

java
// 提交任务到线程池的核心方法 // command就是提交过来的任务 public void execute(Runnable command) { // 提交的任务不能为null if (command == null) throw new NullPointerException(); // 获取核心属性ctl,用于后面的判断 int c = ctl.get(); // 如果工作线程个数,小于核心线程数。 // 满足要求,添加核心工作线程 if (workerCountOf(c) < corePoolSize) { // addWorker(任务,是核心线程吗) // addWorker返回true:代表添加工作线程成功 // addWorker返回false:代表添加工作线程失败 // addWorker中会基于线程池状态,以及工作线程个数做判断,查看能否添加工作线程 if (addWorker(command, true)) // 工作线程构建出来了,任务也交给command去处理了。 return; // 说明线程池状态或者是工作线程个数发生了变化,导致添加失败,重新获取一次ctl c = ctl.get(); } // 添加核心工作线程失败,往这走 // 判断线程池状态是否是RUNNING,如果是,正常基于阻塞队列的offer方法,将任务添加到阻塞队列 if (isRunning(c) && workQueue.offer(command)) { // 如果任务添加到阻塞队列成功,走if内部 // 如果任务在扔到阻塞队列之前,线程池状态突然改变了。 // 重新获取ctl int recheck = ctl.get(); // 如果线程池的状态不是RUNNING,将任务从阻塞队列移除, if (!isRunning(recheck) && remove(command)) // 并且直接拒绝策略 reject(command); // 在这,说明阻塞队列有我刚刚放进去的任务 // 查看一下工作线程数是不是0个 // 如果工作线程为0个,需要添加一个非核心工作线程去处理阻塞队列中的任务 // 发生这种情况有两种: // 1. 构建线程池时,核心线程数是0个。 // 2. 即便有核心线程,可以设置核心线程也允许超时,设置allowCoreThreadTimeOut为true,代表核心线程也可以超时 else if (workerCountOf(recheck) == 0) // 为了避免阻塞队列中的任务饥饿,添加一个非核心工作线程去处理 addWorker(null, false); } // 任务添加到阻塞队列失败 // 构建一个非核心工作线程 // 如果添加非核心工作线程成功,直接完事,告辞 else if (!addWorker(command, false)) // 添加失败,执行决绝策略 reject(command); }

execute 方法的完整执行流程图

image.png

addWorker 方法

addWorker 中主要分成两大块去看

  • 第一块:校验线程池的状态以及工作线程个数
  • 第二块:添加工作线程并且启动工作线程

校验线程池的状态以及工作线程个数

java
// 添加工作线程之校验源码 private boolean addWorker(Runnable firstTask, boolean core) { // 外层for循环在校验线程池的状态 // 内层for循环是在校验工作线程的个数 // retry是给外层for循环添加一个标记,是为了方便在内层for循坏跳出外层for循环 retry: for (;;) { // 获取ctl int c = ctl.get(); // 拿到ctl的高3位的值 int rs = runStateOf(c); // ========================== 线程池状态判断 ========================== // 如果线程池状态是SHUTDOWN,并且此时阻塞队列有任务,工作线程个数为0,添加一个工作线程去处理阻塞队列的任务 // 判断线程池的状态是否大于等于SHUTDOWN,如果满足,说明线程池不是RUNNING if (rs >= SHUTDOWN && // 如果这三个条件都满足,就代表是要添加非核心工作线程去处理阻塞队列任务 // 如果三个条件有一个没满足,返回false,配合!,就代表不需要添加 !(rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) // 不需要添加工作线程 return false; for (;;) { // ========================== 工作线程个数判断 ========================== // 基于ctl拿到低29位的值,代表当前工作线程个数 int wc = workerCountOf(c); // 如果工作线程个数大于最大值了,不可以添加了,返回false if (wc >= CAPACITY || // 基于core来判断添加的是否是核心工作线程 // 如果是核心:基于corePoolSize去判断 // 如果是非核心:基于maximumPoolSize去判断 wc >= (core ? corePoolSize : maximumPoolSize)) // 代表不能添加,工作线程个数不满足要求 return false; // 针对ctl进行 + 1,采用CAS的方式 if (compareAndIncrementWorkerCount(c)) // CAS成功后,直接退出外层循环,代表可以执行添加工作线程操作了。 break retry; // 重新获取一次ctl的值 c = ctl.get(); // 判断重新获取到的ctl中,表示的线程池状态跟之前的是否有区别 // 如果状态不一样,说明有变化,重新的去判断线程池状态 if (runStateOf(c) != rs) // 跳出一次外层for循环 continue retry; } } // 省略添加工作线程以及启动的过程,下面讲解 ... }

添加工作线程并且启动工作线程

java
private boolean addWorker(Runnable firstTask, boolean core) { // 省略校验部分的代码,上面为校验代码 ... // 添加工作线程以及启动工作线程~~~ // 声明了三个变量 // 工作线程启动了没,默认false boolean workerStarted = false; // 工作线程添加了没,默认false boolean workerAdded = false; // 工作线程,默认为null Worker w = null; try { // 构建工作线程,并且将任务传递进去 w = new Worker(firstTask); // 获取了Worker中的Thread对象 final Thread t = w.thread; // 判断Thread是否不为null,在new Worker时,内部会通过给予的ThreadFactory去构建Thread交给Worker // 一般如果为null,代表ThreadFactory有问题。 if (t != null) { // 加锁,保证使用workers成员变量以及对largestPoolSize赋值时,保证线程安全 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 再次获取线程池状态。 int rs = runStateOf(ctl.get()); // 再次判断 // 如果满足 rs < SHUTDOWN 说明线程池是RUNNING,状态正常,执行if代码块 // 如果线程池状态为SHUTDOWN,并且firstTask为null,添加非核心工作处理阻塞队列任务 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { // 到这,可以添加工作线程。 // 校验ThreadFactory构建线程后,不能自己启动线程,如果启动了,抛出异常 if (t.isAlive()) throw new IllegalThreadStateException(); // private final HashSet<Worker> workers = new HashSet<Worker>(); // 将new好的Worker添加到HashSet中。 workers.add(w); // 获取了HashSet的size,拿到工作线程个数 int s = workers.size(); // largestPoolSize在记录最大线程个数的记录 // 如果当前工作线程个数,大于最大线程个数的记录,就赋值 if (s > largestPoolSize) largestPoolSize = s; // 添加工作线程成功 workerAdded = true; } } finally { mainLock.unlock(); } // 如果工作线程添加成功, if (workerAdded) { // 直接启动Worker中的线程 t.start(); // 启动工作线程成功 workerStarted = true; } } } finally { // 做补偿的操作,如果工作线程启动失败,将这个添加失败的工作线程处理掉 if (!workerStarted) addWorkerFailed(w); } // 返回工作线程是否启动成功 return workerStarted; } // 工作线程启动失败,需要不的步长操作 private void addWorkerFailed(Worker w) { // 因为操作了workers,需要加锁 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 如果w不为null,之前Worker已经new出来了。 if (w != null) // 从HashSet中移除 workers.remove(w); // 同时对ctl进行 - 1,代表去掉了一个工作线程个数 decrementWorkerCount(); // 因为工作线程启动失败,判断一下状态的问题,是不是可以走TIDYING状态最终到TERMINATED状态了。 tryTerminate(); } finally { // 释放锁 mainLock.unlock(); } }

Worker 工作线程

Worker 对象主要包含了两个内容

  • 工作线程要执行任务
  • 工作线程可能会被中断,控制中断
java
// Worker 继承了AQS,目的就是为了控制工作线程的中断。 // Worker 实现了 Runnable,内置 Thread 对象,在执行start时,必然要执行 Worker 中断额外一些操作 private final class Worker extends AbstractQueuedSynchronizer implements Runnable{ // =======================Worker管理任务================================ // 线程工厂构建的线程 final Thread thread; // 当前Worker要执行的任务 Runnable firstTask; // 记录当前工作线程处理了多少个任务。 volatile long completedTasks; // 有参构造 Worker(Runnable firstTask) { // 将State设置为-1,代表当前不允许中断线程 setState(-1); // 任务赋值 this.firstTask = firstTask; // 基于线程工作构建Thread,并且传入的Runnable是Worker this.thread = getThreadFactory().newThread(this); } // 当thread执行start方法时,调用的是Worker的run方法, public void run() { // 任务执行时,执行的是runWorker方法 runWorker(this); } // =======================Worker管理中断================================ // 当前方法是中断工作线程时,执行的方法 void interruptIfStarted() { Thread t; // 只有Worker中的state >= 0的时候,可以中断工作线程 if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { // 如果状态正常,并且线程未中断,这边就中断线程 t.interrupt(); } catch (SecurityException ignore) { } } } protected boolean isHeldExclusively() { return getState() != 0; } protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } public void lock() { acquire(1); } public boolean tryLock() { return tryAcquire(1); } public void unlock() { release(1); } public boolean isLocked() { return isHeldExclusively(); } }

ThreadPoolExecutor 的 runWorker 方法

runWorker就是让工作线程拿到任务去执行即可。

并且在内部也处理了在工作线程正常结束和异常结束时的处理方案

java
// 工作线程启动后执行的任务。 final void runWorker(Worker w) { // 拿到当前线程 Thread wt = Thread.currentThread(); // 从worker对象中拿到任务 Runnable task = w.firstTask; // 将Worker中的firstTask置位空 w.firstTask = null; // 将Worker中的state置位0,代表当前线程可以中断的 w.unlock(); // allow interrupts // 判断工作线程是否是异常结束,默认就是异常结束 boolean completedAbruptly = true; try { // 获取任务 // 直接拿到第一个任务去执行 // 如果第一个任务为null,去阻塞队列中获取任务 while (task != null || (task = getTask()) != null) { // 执行了Worker的lock方法,当前在lock时,shutdown操作不能中断当前线程,因为当前线程正在处理任务 w.lock(); // 比较ctl >= STOP,如果满足找个状态,说明线程池已经到了STOP状态甚至已经要凉凉了 // 线程池到STOP状态,并且当前线程还没有中断,确保线程是中断的,进到if内部执行中断方法 // if(runStateAtLeast(ctl.get(), STOP) && !wt.isInterrupted()) {中断线程} // 如果线程池状态不是STOP,确保线程不是中断的。 // 如果发现线程中断标记位是true了,再次查看线程池状态是大于STOP了,再次中断线程 // 这里其实就是做了一个事情,如果线程池状态 >= STOP,确保线程中断了。 if ( ( runStateAtLeast(ctl.get(), STOP) || ( Thread.interrupted() && runStateAtLeast(ctl.get(), STOP) ) ) && !wt.isInterrupted()) wt.interrupt(); try { // 勾子函数在线程池中没有做任何的实现,如果需要在线程池执行任务前后做一些额外的处理,可以重写勾子函数 // 前置勾子函数 beforeExecute(wt, task); Throwable thrown = null; try { // 执行任务。 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { // 前后置勾子函数 afterExecute(task, thrown); } } finally { // 任务执行完,丢掉任务 task = null; // 当前工作线程处理的任务数+1 w.completedTasks++; // 执行unlock方法,此时shutdown方法才可以中断当前线程 w.unlock(); } } // 如果while循环结束,正常走到这,说明是正常结束 // 正常结束的话,在getTask中就会做一个额外的处理,将ctl - 1,代表工作线程没一个。 completedAbruptly = false; } finally { // 考虑干掉工作线程 processWorkerExit(w, completedAbruptly); } } // 工作线程结束前,要执行当前方法 private void processWorkerExit(Worker w, boolean completedAbruptly) { // 如果是异常结束 if (completedAbruptly) // 将ctl - 1,扣掉一个工作线程 decrementWorkerCount(); // 操作Worker,为了线程安全,加锁 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 当前工作线程处理的任务个数累加到线程池处理任务的个数属性中 completedTaskCount += w.completedTasks; // 将工作线程从hashSet中移除 workers.remove(w); } finally { // 释放锁 mainLock.unlock(); } // 只要工作线程凉了,查看是不是线程池状态改变了。 tryTerminate(); // 获取ctl int c = ctl.get(); // 判断线程池状态,当前线程池要么是RUNNING,要么是SHUTDOWN if (runStateLessThan(c, STOP)) { // 如果正常结束工作线程 if (!completedAbruptly) { // 如果核心线程允许超时,min = 0,否则就是核心线程个数 int min = allowCoreThreadTimeOut ? 0 : corePoolSize; // 如果min == 0,可能会出现没有工作线程,并且阻塞队列有任务没有线程处理 if (min == 0 && ! workQueue.isEmpty()) // 至少要有一个工作线程处理阻塞队列任务 min = 1; // 如果工作线程个数 大于等于1,不怕没线程处理,正常return if (workerCountOf(c) >= min) return; } // 异常结束,为了避免出现问题,添加一个空任务的非核心线程来填补上刚刚异常结束的工作线程 addWorker(null, false); } }

getTask 方法

工作线程在去阻塞队列获取任务前,要先查看线程池状态

如果状态没问题,去阻塞队列 take 或者是 poll 任务

第二个循环时,不但要判断线程池状态,还要判断当前工作线程是否可以被干掉

java
// 当前方法就在阻塞队列中获取任务 // 前面半部分是判断当前工作线程是否可以返回null,结束。 // 后半部分就是从阻塞队列中拿任务 private Runnable getTask() { // timeOut默认值是false。 boolean timedOut = false; // 死循环 for (;;) { // 拿到ctl int c = ctl.get(); // 拿到线程池的状态 int rs = runStateOf(c); // 如果线程池状态是STOP,没有必要处理阻塞队列任务,直接返回null // 如果线程池状态是SHUTDOWN,并且阻塞队列是空的,直接返回null if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { // 如果可以返回null,先扣减工作线程个数 decrementWorkerCount(); // 返回null,结束runWorker的while循环 return null; } // 基于ctl拿到工作线程个数 int wc = workerCountOf(c); // 核心线程允许超时,timed为true // 工作线程个数大于核心线程数,timed为true boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ( // 如果工作线程个数,大于最大线程数。(一般情况不会满足),把他看成false // 第二个判断代表,只要工作线程数小于等于核心线程数,必然为false // 即便工作线程个数大于核心线程数了,此时第一次循环也不会为true,因为timedOut默认值是false // 考虑第二次循环了,因为循环内部必然有修改timeOut的位置 (wc > maximumPoolSize || (timed && timedOut)) && // 要么工作线程还有,要么阻塞队列为空,并且满足上述条件后,工作线程才会走到if内部,结束工作线程 (wc > 1 || workQueue.isEmpty()) ) { // 第二次循环才有可能到这。 // 正常结束,工作线程 - 1,因为是CAS操作,如果失败了,重新走for循环 if (compareAndDecrementWorkerCount(c)) return null; continue; } // 工作线程从阻塞队列拿任务 try { // 如果是核心线程,timed是false,如果是非核心线程,timed就是true Runnable r = timed ? // 如果是非核心,走poll方法,拿任务,等待一会 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : // 如果是核心,走take方法,死等。 workQueue.take(); // 从阻塞队列拿到的任务不为null,这边就正常返回任务,去执行 if (r != null) return r; // 说明当前线程没拿到任务,将timeOut设置为true,在上面就可以返回null退出了。 timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }

shutdownNow 关闭方法

首先查看 shutdownNow 方法,可以从 RUNNING 状态转变为 STOP

java
// shutDownNow方法,shutdownNow不会处理阻塞队列的任务,将任务全部给你返回了。 public List<Runnable> shutdownNow() { // 声明返回结果 List<Runnable> tasks; // 加锁 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 不关注这个方法…… checkShutdownAccess(); // 将线程池状态修改为STOP advanceRunState(STOP); // 无论怎么,直接中断工作线程。 interruptWorkers(); // 将阻塞队列的任务全部扔到List集合中。 tasks = drainQueue(); } finally { // 释放锁 mainLock.unlock(); } tryTerminate(); return tasks; } // 将线程池状态修改为STOP private void advanceRunState(int STOP) { // 死循环。 for (;;) { // 获取ctl属性的值 int c = ctl.get(); // 第一个判断:如果当前线程池状态已经大于等于STOP了,不管了,告辞。 if (runStateAtLeast(c, STOP) || // 基于CAS,将ctl从c修改为STOP状态,不修改工作线程个数,但是状态变为了STOP // 如果修改成功结束 ctl.compareAndSet(c, ctlOf(STOP, workerCountOf(c)))) break; } } // 无论怎么,直接中断工作线程。 private void interruptWorkers() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 遍历HashSet,拿到所有的工作线程,直接中断。 for (Worker w : workers) w.interruptIfStarted(); } finally { mainLock.unlock(); } } // 移除阻塞队列,内容全部扔到List集合中 private List<Runnable> drainQueue() { BlockingQueue<Runnable> q = workQueue; ArrayList<Runnable> taskList = new ArrayList<Runnable>(); // 阻塞队列自带的,直接清空阻塞队列,内容扔到List集合 q.drainTo(taskList); // 为了避免任务丢失,重新判断,是否需要编辑阻塞队列,重新扔到List if (!q.isEmpty()) { for (Runnable r : q.toArray(new Runnable[0])) { if (q.remove(r)) taskList.add(r); } } return taskList; } // 查看当前线程池是否可以变为TERMINATED状态 final void tryTerminate() { // 死循环。 for (;;) { // 拿到ctl int c = ctl.get(); // 如果是RUNNING,直接告辞。 // 如果状态已经大于等于TIDYING,马上就要凉凉,直接告辞。 // 如果状态是SHUTDOWN,但是阻塞队列还有任务,直接告辞。 if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) return; // 如果还有工作线程 if (workerCountOf(c) != 0) { // 再次中断工作线程 interruptIdleWorkers(ONLY_ONE); // 告辞,等你工作线程全完事,我这再尝试进入到TERMINATED状态 return; } // 加锁,为了可以执行Condition的释放操作 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 将线程池状态修改为TIDYING状态,如果成功,继续往下走 if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { // 这个方法是空的,如果你需要在线程池关闭后做一些额外操作,这里你可以自行实现 terminated(); } finally { // 最终修改为TERMINATED状态 ctl.set(ctlOf(TERMINATED, 0)); // 线程池提供了一个方法,主线程在提交任务到线程池后,是可以继续做其他操作的。 // 咱们也可以让主线程提交任务后,等待线程池处理完毕,再做后续操作 // 这里线程池凉凉后,要唤醒哪些调用了awaitTermination方法的线程 termination.signalAll(); } return; } } finally { mainLock.unlock(); } // else retry on failed CAS } }

再次 shutdown 方法,可以从 RUNNING 状态转变为 SHUTDOWN

SHUTDOWN 状态下,不会中断正在干活的线程,而且会处理阻塞队列中的任务

java
public void shutdown() { // 加锁。。 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 不看。 checkShutdownAccess(); // 里面是一个死循环,将线程池状态修改为SHUTDOWN advanceRunState(SHUTDOWN); // 中断空闲线程 interruptIdleWorkers(); // 说了,这个是为了ScheduleThreadPoolExecutor准备的,不管 onShutdown(); } finally { mainLock.unlock(); } // 尝试结束线程 tryTerminate(); } // 中断空闲线程 private void interruptIdleWorkers(boolean onlyOne) { // 加锁 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) { Thread t = w.thread; // 如果线程没有中断,那么就去获取Worker的锁,基于tryLock可知,不会中断正在干活的线程 if (!t.isInterrupted() && w.tryLock()) { try { // 会中断空闲线程 t.interrupt(); } catch (SecurityException ignore) { } finally { w.unlock(); } } if (onlyOne) break; } } finally { mainLock.unlock(); } }

线程池的核心参数设计规则

线程池的使用难度不大,难度在于线程池的参数并不好配置。

主要难点在于任务类型无法控制,比如任务有CPU密集型,还有IO密集型,甚至还有混合型的。因为IO咱们无法直接控制,所以很多时间按照一些书上提供的一些方法,是无法解决问题的。

想调试出一个符合当前任务情况的核心参数,最好的方式就是测试。需要将项目部署到测试环境或者是沙箱环境中,结果各种压测得到一个相对符合的参数。如果每次修改项目都需要重新部署,成本太高了。此时咱们可以实现一个动态监控以及修改线程池的方案。

因为线程池的核心参数无非就是:

  • corePoolSize:核心线程数
  • maximumPoolSize:最大线程数
  • workQueue:工作队列

线程池中提供了获取核心信息的 get 方法,同时也提供了动态修改核心属性的 set 方法。

image.png

结合以上思路,可以采用动态线程池的思想进行调整,采用一些开源项目提供的方式去做监控和修改,比如 hippo4jdynamic-tp 就可以对线程池进行监控,而且可以和SpringBoot整合,也可以基于 Nacos 配置中心自定义一个轻量级的动态线程池组件。

动态线程池思路

在启动时加载 Nacos 配置,注册成 Bean 被 spring 管理,可通过 ImportBeanDefinitionRegistrar 注册器实现,在 Nacos 配置变更时,通过 ApplicationListener<EnvironmentChangeEvent> 进行监听,如果是动态线程池的变更,则更新线程池对象即可。

注意:如果是整个对象更新,则需要将原对象删除,创建新对象添加到spring容器中,需要实现其他依赖此对象类的刷新效果,也就是 @RefreshSocpe 的效果(此处有个简单实现,就是通过工厂模式管理线程池,比如线程池都有一个名字,也就是 Key,作为工厂缓存的值,使用者只需要根据 key 取对应线程池对象即可,然后填充新对象即可,也要注意并发问题与任务完整性的处理)。

线程池参数示例

参数设置准则典型值示例
corePoolSizeCPU密集型:CPU核心数;
IO 密集型:CPU核心数 * (1 + 平均等待时间 W/计算时间 C)
8核CPU:IO密集型可设32
maximumPoolSize核心线程数 * 2 ~ 3倍core=8 → max=24
keepAliveTime非核心线程存活时间,建议30~60秒30, TimeUnit.SECONDS

队列选型指南

队列类型特点适用场景
SynchronousQueue零容量,直接传递任务高吞吐量短任务
LinkedBlockingQueue无界队列(默认Integer.MAX_VALUE)需积压任务的批处理系统
ArrayBlockingQueue有界队列,FIFO流量可控的实时系统
PriorityBlockingQueue优先级队列任务有优先级的调度系统

线程池处理任务的核心流程

基于 addWorker 添加工作线程的流程切入到整体处理任务的位置

image.png

ScheduledThreadPoolExecutor

从名字上就可以看出,当前线程池是用于执行定时任务的线程池。

Java比较早的定时任务工具是 Timer 类。但是 Timer 问题很多,串行的,不靠谱,会影响到其他的任务执行。

其实除了 Timer 以及 ScheduleThreadPoolExecutor 之外,正常在企业中一般会采用 Quartz 或者是 SpringBoot 提供的 Schedule 的方式去实现定时任务的功能。

ScheduledThreadPoolExecutor 支持延迟执行以及周期性执行的功能。

ScheduledThreadPoolExecutor 应用

定时任务线程池的有参构造

java
public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory, handler); }

发现 ScheduledThreadPoolExecutor 在构建时,直接调用了父类的构造方法

ScheduledThreadPoolExecutor 的父类就是 ThreadPoolExecutor

首先 ScheduleThreadPoolExecutor 最多允许设置3个参数:

  • 核心线程数
  • 线程工厂
  • 拒绝策略

首先没有设置阻塞队列,以及最大线程数和空闲时间以及单位

阻塞队列设置的是 DelayedWorkQueue,其实本质就是 DelayQueue,一个延迟队列。DelayQueue 是一个无界队列。所以最大线程数以及非核心线程的空闲时间是不需要设置的。

代码落地使用

java
public static void main(String[] args) { //1. 构建定时任务线程池 ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor( 5, new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); return t; } }, new ThreadPoolExecutor.AbortPolicy() ); //2. 应用ScheduledThreadPoolExecutor // 跟直接执行线程池的execute没啥区别 pool.execute(() -> { System.out.println("execute"); }); // 指定延迟时间执行 System.out.println(System.currentTimeMillis()); pool.schedule(() -> { System.out.println("schedule"); System.out.println(System.currentTimeMillis()); },2, TimeUnit.SECONDS); // 指定第一次的延迟时间,并且确认后期的周期执行时间,周期时间是在任务开始时就计算 // 周期性执行就是将执行完毕的任务再次设置好延迟时间,并且重新扔到阻塞队列 // 计算的周期执行,也是在原有的时间上做累加,不关注任务的执行时长。 System.out.println(System.currentTimeMillis()); pool.scheduleAtFixedRate(() -> { System.out.println("scheduleAtFixedRate"); System.out.println(System.currentTimeMillis()); },2,3,TimeUnit.SECONDS); // 指定第一次的延迟时间,并且确认后期的周期执行时间,周期时间是在任务结束后再计算下次的延迟时间 System.out.println(System.currentTimeMillis()); pool.scheduleWithFixedDelay(() -> { System.out.println("scheduleWithFixedDelay"); System.out.println(System.currentTimeMillis()); try { Thread.sleep(4000); } catch (InterruptedException e) { e.printStackTrace(); } },2,3,TimeUnit.SECONDS); }

ScheduledThreadPoolExecutor 源码剖析

核心属性

后面的方法业务流程会涉及到这些属性。

java
// 这里是针对任务取消时的一些业务判断会用到的标记 private volatile boolean continueExistingPeriodicTasksAfterShutdown; private volatile boolean executeExistingDelayedTasksAfterShutdown = true; private volatile boolean removeOnCancel = false; // 计数器,如果两个任务的执行时间节点一模一样,根据这个序列来判断谁先执行 private static final AtomicLong sequencer = new AtomicLong(); // 这个方法是获取当前系统时间的毫秒值 final long now() { return System.nanoTime(); } // 内部类。核心类之一。 private class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> { // 全局唯一的序列,如果两个任务时间一直,基于当前属性判断 private final long sequenceNumber; // 任务执行的时间,单位纳秒 private long time; /** * period == 0:执行一次的延迟任务 * period > 0:代表是At * period < 0:代表是With */ private final long period; // 周期性执行时,需要将任务重新扔回阻塞队列,基础当前属性拿到任务,方便扔回阻塞队列 RunnableScheduledFuture<V> outerTask = this; /** * 构建schedule方法的任务 */ ScheduledFutureTask(Runnable r, V result, long ns) { super(r, result); this.time = ns; this.period = 0; this.sequenceNumber = sequencer.getAndIncrement(); } /** * 构建At和With任务的有参构造 */ ScheduledFutureTask(Runnable r, V result, long ns, long period) { super(r, result); this.time = ns; this.period = period; this.sequenceNumber = sequencer.getAndIncrement(); } } // 核心组件:延时阻塞队列(DelayedWorkQueue) private class DelayedWorkQueue extends AbstractQueue<Runnable> implements BlockingQueue<Runnable> { // 基于最小堆(优先队列)管理待执行任务 private RunnableScheduledFuture<?>[] queue = new RunnableScheduledFuture<?>[16]; // 堆排序保证队首任务过期时间最近 }
  • 时间驱动调度:任务封装为 ScheduledFutureTask,记录下次执行时间 time(纳秒精度)

  • 任务排序:队列按 time 排序,队首任务最先到期(peek().getDelay() <= 0)

schedule 方法

execute 方法也是调用的 schedule 方法,只不过传入的延迟时间是0纳秒

schedule 方法就是将任务和延迟时间封装到一起,并且将任务扔到阻塞队列中,再去创建工作线程去 take 阻塞队列。

java
// 延迟任务执行的方法。 // command:任务 // delay:延迟时间 // unit:延迟时间的单位 public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) { // 健壮性校验。 if (command == null || unit == null) throw new NullPointerException(); // 将任务和延迟时间封装到一起,最终组成ScheduledFutureTask // 要分成三个方法去看 // triggerTime:计算延迟时间。最终返回的是当前系统时间 + 延迟时间 // triggerTime就是将延迟时间转换为纳秒,并且+当前系统时间,再做一些健壮性校验 // ScheduledFutureTask有参构造:将任务以及延迟时间封装到一起,并且设置任务执行的方式 // decorateTask:当前方式是让用户基于自身情况可以动态修改任务的一个扩展口 RunnableScheduledFuture<?> t = decorateTask(command, new ScheduledFutureTask<Void>(command, null, triggerTime(delay, unit))); // 任务封装好,执行delayedExecute方法,去执行任务 delayedExecute(t); // 返回FutureTask return t; } // triggerTime做的事情 // 外部方法,对延迟时间做校验,如果小于0,就直接设置为0 // 并且转换为纳秒单位 private long triggerTime(long delay, TimeUnit unit) { return triggerTime(unit.toNanos((delay < 0) ? 0 : delay)); } // 将延迟时间+当前系统时间 // 后面的校验是为了避免延迟时间超过Long的取值范围 long triggerTime(long delay) { return now() + ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay)); } // ScheduledFutureTask有参构造 ScheduledFutureTask(Runnable r, V result, long ns) { super(r, result); // time就是任务要执行的时间 this.time = ns; // period,为0,代表任务是延迟执行,不是周期执行 this.period = 0; // 基于AtmoicLong生成的序列 this.sequenceNumber = sequencer.getAndIncrement(); } // delayedExecute 执行延迟任务的操作 private void delayedExecute(RunnableScheduledFuture<?> task) { // 查看当前线程池是否还是RUNNING状态,如果不是RUNNING,进到if if (isShutdown()) // 不是RUNNING。 // 执行拒绝策略。 reject(task); else { // 线程池状态是RUNNING // 直接让任务扔到延迟的阻塞队列中 super.getQueue().add(task); // DCL的操作,再次查看线程池状态 // 如果线程池在添加任务到阻塞队列后,状态不是RUNNING if (isShutdown() && // task.isPeriodic():现在反回的是false,因为任务是延迟执行,不是周期执行 // 默认情况,延迟队列中的延迟任务,可以执行 !canRunInCurrentRunState(task.isPeriodic()) && // 从阻塞队列中移除任务。 remove(task)) task.cancel(false); else // 线程池状态正常,任务可以执行 ensurePrestart(); } } // 线程池状态不为RUNNING,查看任务是否可以执行 // 延迟执行:periodic==false // 周期执行:periodic==true // continueExistingPeriodicTasksAfterShutdown:周期执行任务,默认为false // executeExistingDelayedTasksAfterShutdown:延迟执行任务,默认为true boolean canRunInCurrentRunState(boolean periodic) { return isRunningOrShutdown(periodic ? continueExistingPeriodicTasksAfterShutdown : executeExistingDelayedTasksAfterShutdown); } // 当前情况,shutdownOK为true final boolean isRunningOrShutdown(boolean shutdownOK) { int rs = runStateOf(ctl.get()); // 如果状态是RUNNING,正常可以执行,返回true // 如果状态是SHUTDOWN,根据shutdownOK来决定 return rs == RUNNING || (rs == SHUTDOWN && shutdownOK); } // 任务可以正常执行后,做的操作 void ensurePrestart() { // 拿到工作线程个数 int wc = workerCountOf(ctl.get()); // 如果工作线程个数小于核心线程数 if (wc < corePoolSize) // 添加核心线程去处理阻塞队列中的任务 addWorker(null, true); else if (wc == 0) // 如果工作线程数为0,核心线程数也为0,这是添加一个非核心线程去处理阻塞队列任务 addWorker(null, false); }

At 和 With 方法 & 任务的 run 方法

scheduleAtFixedRatescheduleWithFixedDelay 的差异主要体现在任务间隔计算上。

前者像地铁时刻表,固定发车间隔;后者更像每次任务完成后才开始倒计时下次执行。

这个区别在任务执行时间超过间隔时会产生截然不同的行为:At会堆积任务,With会顺延。它们的适用场景,比如数据同步用At,资源清理用With。

这两个方法在源码层面上的第一个区别,就是在计算周期时间时,需要将这个值传递给period,基于正负数在区别At和With

所以查看一个方法就ok,查看At方法

java
// At方法, // command:任务 // initialDelay:第一次执行的延迟时间 // period:任务的周期执行时间 // unit:上面两个时间的单位 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { // 健壮性校验 if (command == null || unit == null) throw new NullPointerException(); // 周期时间不能小于等于0. if (period <= 0) throw new IllegalArgumentException(); // 将任务以及第一次的延迟时间,和后续的周期时间封装好。 ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit), unit.toNanos(period)); // 扩展口,可以对任务做修改。 RunnableScheduledFuture<Void> t = decorateTask(command, sft); // 周期性任务,需要在任务执行完毕后,重新扔会到阻塞队列,为了方便拿任务,将任务设置到outerTask成员变量中 sft.outerTask = t; // 和schedule方法一样的方式 // 如果任务刚刚扔到阻塞队列,线程池状态变为SHUTDOWN,默认情况,当前任务不执行 delayedExecute(t); return t; } // 延迟任务以及周期任务在执行时,都会调用当前任务的run方法。 public void run() { // periodic == false:一次性延迟任务 // periodic == true:周期任务 boolean periodic = isPeriodic(); // 任务执行前,会再次判断状态,能否执行任务 if (!canRunInCurrentRunState(periodic)) cancel(false); // 判断是周期执行还是一次性任务 else if (!periodic) // 一次性任务,让工作线程直接执行command的逻辑 ScheduledFutureTask.super.run(); // 到这个else if,说明任务是周期执行 else if (ScheduledFutureTask.super.runAndReset()) { // 设置下次任务执行的时间 setNextRunTime(); // 将任务重新扔回线程池做处理 reExecutePeriodic(outerTask); } } // 设置下次任务执行的时间 private void setNextRunTime() { // 拿到period值,正数:At,负数:With long p = period; if (p > 0) // 拿着之前的执行时间,直接追加上周期时间 time += p; else // 如果走到else,代表任务是With方式,这种方式要重新计算延迟时间 // 拿到当前系统时间,追加上延迟时间, time = triggerTime(-p); } // 将任务重新扔回线程池做处理 void reExecutePeriodic(RunnableScheduledFuture<?> task) { // 如果状态ok,可以执行 if (canRunInCurrentRunState(true)) { // 将任务扔到延迟队列 super.getQueue().add(task); // DCL,判断线程池状态 if (!canRunInCurrentRunState(true) && remove(task)) task.cancel(false); else // 添加工作线程 ensurePrestart(); } }

总结

ThreadPoolExecutor 的核心价值在于通过三层缓冲机制实现资源的最优调度:

  1. 核心线程池:常驻线程处理稳态流量
  2. 任务队列:缓冲突发流量
  3. 非核心线程:应对队列满载的极端场景

配合状态机和 CAS无 锁操作,实现了高并发环境下的线程安全调度。其源码中展现的位运算技巧(如ctl设计,存储工作线程数量+线程池状态)和 Worker 线程复用机制,是Java并发编程的经典范例。

性能数据参考: 在8核服务器上,合理配置的 ThreadPoolExecutor 可支撑20,000+ QPS的任务调度,相比单线程执行效率提升8-10倍,同时保持95%的CPU利用率。

ScheduledThreadPoolExecutor 继承了 ThreadPoolExecutor,且定时执行是基于延迟队列 DelayedWorkQueue 实现的,底层任务是基于 ScheduledFutureTask 优先队列,内部记录下次执行时间 time,队列按 time 排序,队首任务最先到期的。At/With 方法主要是通过 period 参数控制,是执行一次,还是按固定时间或固定延迟时间执行, At 会堆积任务,With 会顺延。融合了 时间轮算法思想(通过优先队列模拟)和 线程池资源管理,是 Java 并发库中高精度调度的经典设计

掌握 ThreadPoolExecutor 的原理与实现,是构建高并发、高可靠Java应用的基石。建议结合 ForkJoinPool(分治场景)和 ScheduledThreadPoolExecutor(定时任务)形成完整的线程池技术体系。

本文作者:柳始恭

本文链接:

版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!