2025-06-12
虚拟机与并发
0

目录

AbstractQueuedSynchronizer
核心设计
深入理解两个核心队列
核心方法解析
独占模式
共享模式
需重写的模板方法
内部机制详解
CLH 队列(FIFO)
线程阻塞与唤醒
条件变量(ConditionObject)
加锁流程源码剖析
lock方法
tryLock 方法
lockInterruptibly 方法
释放锁流程源码剖析
unlock 方法
AQS 中常见的问题
AQS 中为什么要有一个虚拟的 head 节点
AQS中为什么使用双向链表
ConditionObject
ConditionObject 的介绍 & 应用
Condition 的构建方式 & 核心属性
Condition的 await 方法分析(前置分析)
Condition的 signal 方法分析
Conditiond 的 await 方法分析(后置分析)
Condition 的 awaitNanos & signalAll 方法分析
总结

之前阅读过了 ReentrantLockReentrantReadWriteLock 的源码,我们都发现其继承了 AbstractQueuedSynchronizer 类,平常我们说的 AQS 就是 AbstractQueuedSynchronizer 的简称。其实在我们的juc包下,很多的工具都是依赖于它,今天我们就将了解下AQS的魅力。

AbstractQueuedSynchronizer

AQS 就是 AbstractQueuedSynchronizer 抽象类,AQS其实就是JUC包下的一个基类,JUC下的很多内容都是基于AQS实现了部分功能,比如 ReentrantLockReentrantReadWriteLockThreadPoolExecutorCountDownLatchSemaphoreThreadPoolExecutor.Worker 等等都是基于AQS实现。

image.png

核心设计

AQS 采用 模板方法模式,开发者只需重写特定方法(如 tryAcquiretryRelease),而 队列管理、线程阻塞/唤醒 等复杂逻辑由 AQS 完成。其核心包括:

  1. 同步状态(state)

首先 AQS 中提供了一个由 volatile 修饰,并且采用 CAS 方式修改的int类型的 state 变量表示资源状态(如锁的重入次数、信号量许可数)。

  1. CLH 队列

AQS 中维护了一个双向链表结构的等待队列,存储阻塞线程。AQS 中声明了队列的 headtail,队列中每个节点都是 Node 对象,每个 Node 都声明了 prevnext

java
static final class Node { static final Node SHARED = new Node(); static final Node EXCLUSIVE = null; static final int CANCELLED = 1; static final int SIGNAL = -1; static final int CONDITION = -2; static final int PROPAGATE = -3; volatile int waitStatus; volatile Node prev; volatile Node next; volatile Thread thread; }

什么是 CLH

CLH(Craig-Landin-Hagersten)是一种基于单向链表的自旋锁实现方式,主要用于解决多线程环境下的锁竞争问题。其核心思想是通过队列排队减少锁的争用,降低CPU资源的消耗。

CLH 锁采用单向链表结构,线程申请锁时会在链表尾部添加节点,并通过CAS操作确保节点正确入队。入队后,线程会在前驱节点上自旋等待锁释放,而非持续进行CAS操作。这种机制在锁竞争激烈时能显著减少CPU总线风暴。

实现机制:

  • 入队操作‌:线程创建新节点并尝试通过CAS操作插入链表尾部,若失败则循环重试;
  • 等待机制‌:成功入队后,线程在前驱节点的locked属性上普通自旋,直到前驱节点释放锁;
  • 公平性‌:通过FIFO队列保证线程按顺序获取锁(非严格意义上的公平锁,但能避免饥饿)
  1. 条件队列

每个条件变量对应一个单向链表(ConditionObject),用于实现等待/通知机制。

image.png

深入理解两个核心队列

在开始之前,我们首先需要理解 AQS(AbstractQueuedSynchronizer)中的两个核心队列:

  • CLH 队列(通常称为同步队列)
  • Condition 队列(由 ConditionObject 维护)。

这两个队列在AQS中扮演不同的角色,但又相互关联

CLH 队列(同步队列)

  • 作用: 用于管理那些在获取锁(或同步状态)时失败的线程。当线程尝试获取锁但锁不可用时,该线程会被包装成一个节点(Node)并加入 CLH 队列的尾部,然后进入阻塞状态。当锁释放时,队列中的下一个节点(通常是头节点的后继节点)会被唤醒并尝试获取锁。

  • 特点: 这是一个双向队列(实际上是一个双向链表),采用FIFO(先进先出)策略,保证了公平性(在公平锁模式下)。

Condition 队列(条件队列)

  • 作用:与 ConditionObject 相关,用于实现条件等待。当线程调用Condition.await() 方法时,当前线程会释放锁,并进入该条件队列等待。当其他线程调用 Condition.signal()signalAll() 时,条件队列中的一个或全部节点会被转移到 CLH 队列中,重新参与锁的竞争。

  • 特点:这是一个单向队列,每个 ConditionObject 都有自己的条件队列。因此,一个 AQS 可以有多个条件队列(对应多个 Condition )。

关联:

  • 当线程调用 Condition.await() 时,该线程会被包装成 Node 节点并加入条件队列(注意:此时线程已经释放了锁)。

  • 当其他线程调用 Condition.signal() 时,它会将条件队列中的第一个节点(等待时间最长的节点)转移到 CLH 队列的尾部。这样,被转移的节点就有机会再次竞争锁。一旦它成功获取锁,就可以从 await() 方法返回。

如何让小白快速理解?

我们可以用一个现实场景来比喻:

场景:医院候诊

  • CLH队列(同步队列):就像医院挂号窗口前的排队队伍。每个来挂号的人(线程)都需要排队(加入队列),按照先来后到的顺序等待叫号(获取锁)进入诊室。

  • 条件队列(Condition队列):假设有一个病人需要先做检查(比如拍X光)才能继续看病。那么他就会暂时离开挂号窗口(释放锁),进入检查室门口等待(进入条件队列)。当他做完检查(被 signal() 通知),他需要重新回到挂号窗口前的队伍(CLH 队列)中排队(重新竞争锁),等待再次叫号。

这样,条件队列就是针对特定条件(比如需要检查)而临时等待的地方,而同步队列则是最终获取资源(看病)必须经过的排队。

总结:

  • CLH 队列:锁竞争的主战场,所有想要获取锁的线程都在这里排队。

  • Condition 队列:用于条件等待,当条件不满足时,线程暂时离开锁竞争,进入条件队列等待。当条件满足时,再重新回到CLH队列中排队。

注意:一个线程在同一时间只能存在于一个队列中(要么在CLH队列,要么在某个条件队列,不会同时存在于两个队列)。

核心方法解析

AQS 的核心方法分为两类:独占模式共享模式

独占模式

  • 获取锁

acquire(int arg) → 调用tryAcquire(需重写)尝试获取,失败则入队阻塞。

  • 释放锁

release(int arg) → 调用tryRelease(需重写)释放状态,唤醒后继线程。

共享模式

  • 获取许可

acquireShared(int arg) → 调用tryAcquireShared(需重写)获取,失败则入队。

  • 释放许可

releaseShared(int arg) → 调用tryReleaseShared(需重写)释放,唤醒等待线程。

需重写的模板方法

方法名作用
tryAcquire(int arg)尝试独占获取资源(返回boolean)
tryRelease(int arg)尝试独占释放资源(返回boolean)
tryAcquireShared(int arg)尝试共享获取资源(返回int:负数为失败,0为成功但无剩余,
正数为成功且有剩余)
tryReleaseShared(int arg)尝试共享释放资源(返回boolean)
isHeldExclusively()当前线程是否独占资源

内部机制详解

CLH 队列(FIFO)

  • 节点(Node):存储线程引用、等待状态(WAITING/CANCELLED)、前驱/后继指针。

  • 入队逻辑:竞争失败的线程被包装为 Node,通过 CAS 加入队尾。

  • 出队逻辑:头节点(虚拟节点)释放资源后唤醒后继节点。

线程阻塞与唤醒

  • LockSupport.park() 阻塞线程,LockSupport.unpark(thread) 唤醒线程。

  • 中断处理:AQS 在唤醒时检查中断标志并恢复中断状态。

条件变量(ConditionObject)

  • await():释放锁,创建 Node 加入条件队列,阻塞线程。

  • signal():将条件队列的头节点转移到 CLH 队列,等待重新获取锁。

exclusiveOwnerThread

除了上述的机制外,还有一个 Thread exclusiveOwnerThread; 属性,它是从 AQS 的父类 AbstractOwnableSynchronizer 中继承来的,主要作用是记录独占模式下的当前线程,在 ReentrantLockReentrantReadWriteLockThreadPoolExecutor.Worker 中都有应用。

image.png

加锁流程源码剖析

AQS 中主要有三种加锁,以下这个是非公平锁的流程,接下来我们对加锁源码分析

image.png

lock方法

执行 lock 方法后,公平锁和非公平锁的执行套路不一样

java
// 非公平锁 final void lock() { // 上来就先基于CAS的方式,尝试将state从0改为1 if (compareAndSetState(0, 1)) // 获取锁资源成功,会将当前线程设置到exclusiveOwnerThread属性,代表是当前线程持有着锁资源 setExclusiveOwnerThread(Thread.currentThread()); else // 执行acquire,尝试获取锁资源 acquire(1); } // 公平锁 final void lock() { // 执行acquire,尝试获取锁资源 acquire(1); }

acquire 方法,是公平锁和非公平锁的逻辑一样

java
public final void acquire(int arg) { // tryAcquire:再次查看,当前线程是否可以尝试获取锁资源 if (!tryAcquire(arg) && // 没有拿到锁资源 // addWaiter(Node.EXCLUSIVE):将当前线程封装为Node节点,插入到AQS的双向链表的结尾 // acquireQueued:查看我是否是第一个排队的节点,如果是可以再次尝试获取锁资源,如果长时间拿不到,挂起线程 // 如果不是第一个排队的节点,就尝试挂起线程即可 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 中断线程的操作 selfInterrupt(); }

tryAcquire 方法竞争锁最资源的逻辑,分为公平锁和非公平锁

java
// 非公平锁实现 final boolean nonfairTryAcquire(int acquires) { // 获取当前线程 final Thread current = Thread.currentThread(); // 获取了state熟属性 int c = getState(); // 判断state当前是否为0,之前持有锁的线程释放了锁资源 if (c == 0) { // 再次抢一波锁资源 if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); // 拿锁成功返回true return true; } } // 不是0,有线程持有着锁资源,如果是,证明是锁重入操作 else if (current == getExclusiveOwnerThread()) { // 将state + 1 int nextc = c + acquires; if (nextc < 0) // 说明对重入次数+1后,超过了int正数的取值范围 // 01111111 11111111 11111111 11111111 // 10000000 00000000 00000000 00000000 // 说明重入的次数超过界限了。 throw new Error("Maximum lock count exceeded"); // 正常的将计算结果,复制给state setState(nextc); // 锁重入成功 return true; } // 返回false return false; } // 公平锁实现 protected final boolean tryAcquire(int acquires) { // 获取当前线程 final Thread current = Thread.currentThread(); // .... int c = getState(); if (c == 0) { // 查看AQS中是否有排队的Node // 没人排队抢一手 。有人排队,如果我是第一个,也抢一手 if (!hasQueuedPredecessors() && // 抢一手~ compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } // 锁重入~~~ else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } // 查看是否有线程在AQS的双向队列中排队 // 返回false,代表没人排队 public final boolean hasQueuedPredecessors() { // 头尾节点 Node t = tail; Node h = head; // s为头结点的next节点 Node s; // 如果头尾节点相等,证明没有线程排队,直接去抢占锁资源 return h != t && // s节点不为null,并且s节点的线程为当前线程(排在第一名的是不是我) (s == null || s.thread != Thread.currentThread()); }

addWaite 方法,将没有拿到锁资源的线程扔到 AQS 队列中去排队

java
// 没有拿到锁资源,过来排队, mode:代表互斥锁 private Node addWaiter(Node mode) { // 将当前线程封装为Node, Node node = new Node(Thread.currentThread(), mode); // 拿到尾结点 Node pred = tail; // 如果尾结点不为null if (pred != null) { // 当前节点的prev指向尾结点 node.prev = pred; // 以CAS的方式,将当前线程设置为tail节点 if (compareAndSetTail(pred, node)) { // 将之前的尾结点的next指向当前节点 pred.next = node; return node; } } // 如果CAS失败,以死循环的方式,保证当前线程的Node一定可以放到AQS队列的末尾 enq(node); return node; } private Node enq(final Node node) { for (;;) { // 拿到尾结点 Node t = tail; // 如果尾结点为空,AQS中一个节点都没有,构建一个伪节点,作为head和tail if (t == null) { if (compareAndSetHead(new Node())) tail = head; } else { // 比较熟悉了,以CAS的方式,在AQS中有节点后,插入到AQS队列的末尾 node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }

acquireQueued 方法,判断当前线程是否还能再次尝试获取锁资源,如果不能再次获取锁资源,或者又没获取到,尝试将当前线程挂起

java
// 当前没有拿到锁资源后,并且到AQS排队了之后触发的方法。 中断操作这里不用考虑 final boolean acquireQueued(final Node node, int arg) { // 不考虑中断 // failed:获取锁资源是否失败(这里简单掌握落地,真正触发的,还是tryLock和lockInterruptibly) boolean failed = true; try { boolean interrupted = false; // 死循环………… for (;;) { // 拿到当前节点的前继节点 final Node p = node.predecessor(); // 前继节点是否是head,如果是head,再次执行tryAcquire尝试获取锁资源。 if (p == head && tryAcquire(arg)) { // 获取锁资源成功 // 设置头结点为当前获取锁资源成功Node,并且取消thread信息 setHead(node); // help GC p.next = null; // 获取锁失败标识为false failed = false; return interrupted; } // 没拿到锁资源…… // shouldParkAfterFailedAcquire:基于上一个节点转改来判断当前节点是否能够挂起线程,如果可以返回true, // 如果不能,就返回false,继续下次循环 if (shouldParkAfterFailedAcquire(p, node) && // 这里基于Unsafe类的park方法,将当前线程挂起 parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) // 在lock方法中,基本不会执行。 cancelAcquire(node); } } // 获取锁资源成功后,先执行setHead private void setHead(Node node) { // 当前节点作为头结点 伪 head = node; // 头结点不需要线程信息 node.thread = null; node.prev = null; } // 当前Node没有拿到锁资源,或者没有资格竞争锁资源,看一下能否挂起当前线程 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { // -1,SIGNAL状态:代表当前节点的后继节点,可以挂起线程,后续我会唤醒我的后继节点 // 1,CANCELLED状态:代表当前节点以及取消了 int ws = pred.waitStatus; if (ws == Node.SIGNAL) // 上一个节点为-1之后,当前节点才可以安心的挂起线程 return true; if (ws > 0) { // 如果当前节点的上一个节点是取消状态,我需要往前找到一个状态不为1的Node,作为他的next节点 // 找到状态不为1的节点后,设置一下next和prev do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { // 上一个节点的状态不是1或者-1,那就代表节点状态正常,将上一个节点的状态改为-1 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }

tryLock 方法

tryLock 方法,无论公平锁还有非公平锁。都会走非公平锁抢占锁资源的操作。

其本质就是拿到 state 的值, 如果是0,直接 CAS 浅尝一下,state 不是0,那就看下是不是锁重入操作,如果没抢到,或者不是锁重入操作,告辞,返回 false

java
public boolean tryLock() { // 非公平锁的竞争锁操作 return sync.nonfairTryAcquire(1); } final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }

tryAcquireNanos 尝试以独占模式获取,如果中断则中止,如果超过给定的超时则失败。

java
// tryLock(time,unit)执行的方法 public final boolean tryAcquireNanos(int arg, long nanosTimeout)throws InterruptedException { // 线程的中断标记位,是不是从false,别改为了true,如果是,直接抛异常 if (Thread.interrupted()) throw new InterruptedException(); // tryAcquire分为公平和非公平锁两种执行方式,如果拿锁成功, 直接告辞, return tryAcquire(arg) || // 如果拿锁失败,在这要等待指定时间 doAcquireNanos(arg, nanosTimeout); } private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { // 如果等待时间是0秒,直接告辞,拿锁失败 if (nanosTimeout <= 0L) return false; // 设置结束时间。 final long deadline = System.nanoTime() + nanosTimeout; // 先扔到AQS队列 final Node node = addWaiter(Node.EXCLUSIVE); // 拿锁失败,默认true boolean failed = true; try { for (;;) { // 如果在AQS中,当前node是head的next,直接抢锁 final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return true; } // 结算剩余的可用时间 nanosTimeout = deadline - System.nanoTime(); // 判断是否是否用尽的位置 if (nanosTimeout <= 0L) return false; // shouldParkAfterFailedAcquire:根据上一个节点来确定现在是否可以挂起线程 if (shouldParkAfterFailedAcquire(p, node) && // 避免剩余时间太少,如果剩余时间少就不用挂起线程 nanosTimeout > spinForTimeoutThreshold) // 如果剩余时间足够,将线程挂起剩余时间 LockSupport.parkNanos(this, nanosTimeout); // 如果线程醒了,查看是中断唤醒的,还是时间到了唤醒的。 if (Thread.interrupted()) // 是中断唤醒的! throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }

cancelAcquire 取消正在进行的获取尝试,源码分析:

image.png

java
// 取消在AQS中排队的Node private void cancelAcquire(Node node) { // 如果当前节点为null,直接忽略。 if (node == null) return; //1. 线程设置为null node.thread = null; //2. 往前跳过被取消的节点,找到一个有效节点 Node pred = node.prev; while (pred.waitStatus > 0) node.prev = pred = pred.prev; //3. 拿到了上一个节点之前的next Node predNext = pred.next; //4. 当前节点状态设置为1,代表节点取消 node.waitStatus = Node.CANCELLED; // 脱离AQS队列的操作 // 当前Node是尾结点,将tail从当前节点替换为上一个节点 if (node == tail && compareAndSetTail(node, pred)) { compareAndSetNext(pred, predNext, null); } else { // 到这,上面的操作CAS操作失败 int ws = pred.waitStatus; // 不是head的后继节点 if (pred != head && // 拿到上一个节点的状态,只要上一个节点的状态不是取消状态,就改为-1 (ws == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) { // 上面的判断都是为了避免后面节点无法被唤醒。 // 前继节点是有效节点,可以唤醒后面的节点 Node next = node.next; if (next != null && next.waitStatus <= 0) compareAndSetNext(pred, predNext, next); } else { // 当前节点是head的后继节点 unparkSuccessor(node); } node.next = node; // help GC } }

lockInterruptibly 方法

java
// 这个是 lockInterruptibly 和 tryLock(time,unit) 唯一的区别 // lockInterruptibly,拿不到锁资源,就死等,等到锁资源释放后,被唤醒,或者是被中断唤醒 private void doAcquireInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) // 中断唤醒抛异常! throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } } private final boolean parkAndCheckInterrupt() { LockSupport.park(this); // 这个方法可以确认,当前挂起的线程,是被中断唤醒的,还是被正常唤醒的。 // 中断唤醒,返回true,如果是正常唤醒,返回false return Thread.interrupted(); }

释放锁流程源码剖析

image.png

unlock 方法

java
public void unlock() { // 释放锁资源不分为公平锁和非公平锁,都是一个sync对象 sync.release(1); } // 释放锁的核心流程 public final boolean release(int arg) { // 核心释放锁资源的操作之一 if (tryRelease(arg)) { // 如果锁已经释放掉了,走这个逻辑 Node h = head; // h不为null,说明有排队的(录课时估计脑袋蒙圈圈。) // 如果h的状态不为0(为-1),说明后面有排队的Node,并且线程已经挂起了。 if (h != null && h.waitStatus != 0) // 唤醒排队的线程 unparkSuccessor(h); return true; } return false; } // ReentrantLock释放锁资源操作 protected final boolean tryRelease(int releases) { // 拿到state - 1(并没有赋值给state) int c = getState() - releases; // 判断当前持有锁的线程是否是当前线程,如果不是,直接抛出异常 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); // free,代表当前锁资源是否释放干净了。 boolean free = false; if (c == 0) { // 如果state - 1后的值为0,代表释放干净了。 free = true; // 将持有锁的线程置位null setExclusiveOwnerThread(null); } // 将c设置给state setState(c); // 锁资源释放干净返回true,否则返回false return free; } // 唤醒后面排队的Node private void unparkSuccessor(Node node) { // 拿到头节点状态 int ws = node.waitStatus; if (ws < 0) // 先基于CAS,将节点状态从-1,改为0 compareAndSetWaitStatus(node, ws, 0); // 拿到头节点的后续节点。 Node s = node.next; // 如果后续节点为null或者,后续节点的状态为1,代表节点取消了。 if (s == null || s.waitStatus > 0) { s = null; // 如果后续节点为null,或者后续节点状态为取消状态,从后往前找到一个有效节点环境 for (Node t = tail; t != null && t != node; t = t.prev) // 从后往前找到状态小于等于0的节点 // 找到离head最新的有效节点,并赋值给s if (t.waitStatus <= 0) s = t; } // 只要找到了这个需要被唤醒的节点,执行unpark唤醒 if (s != null) LockSupport.unpark(s.thread); }

AQS 中常见的问题

AQS 中为什么要有一个虚拟的 head 节点

AQS 可以没有 head,设计之初指定 head 只是为了更方便的操作。方便管理双向链表而已,一个哨兵节点的存在。

比如 ReentrantLock 中释放锁资源时,会考虑是否需要唤醒后继节点。如果头结点的状态不是-1。就不需要去唤醒后继节点。唤醒后继节点时,需要找到 head.next 节点,如果 head.next 为 null,或者是取消了,此时需要遍历整个双向链表,从后往前遍历,找到离 head 最近的 Node。规避了一些不必要的唤醒操作。

如果不用虚拟节点(哨兵节点),当前节点挂起,当前节点的状态设置为-1。可行。AQS 本身就是使用了哨兵节点做双向链表的一些操作。

网上说了,虚拟的head,可以避免重复唤醒操作。虚拟的head并没有处理这个问题。

AQS中为什么使用双向链表

AQS 的双向链表就为了更方便的操作 Node 节点。

在执行 tryLocklockInterruptibly 方法时,如果在线程阻塞时,中断了线程,此时线程会执行cancelAcquire 取消当前节点,不在 AQS 的双向链表中排队。如果是单向链表,此时会导致取消节点,无法直接将当前节点的 prev 节点的 next 指针,指向当前节点的 next 节点。

如何利用 AQS 实现一个自定义的 CountDownLatch ?

java
// 经典用法示例:使用AQS实现CountDownLatch public class MyLatch { private static class Sync extends AbstractQueuedSynchronizer { protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } protected boolean tryReleaseShared(int releases) { // CAS递减state至0 for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } } } // 使用代码省略... }

ConditionObject

ConditionObject 的介绍 & 应用

ConditionObjectAbstractQueuedSynchronizer 类中锁条件机制的子类实现。像 synchronized提供了 waitnotify 的方法实现线程在持有锁时,可以实现挂起,已经唤醒的操作。

基于 AbstractQueuedSynchronizer 实现的 ReentrantLock 也拥有这个功能,它的实现就是 ConditionObject。想执行 await 或者是 signal 就必须先持有lock锁的资源。

先look一下 Condition 的代码示例:

java
// 源码 public class ReentrantLock implements Lock, java.io.Serializable { final ConditionObject newCondition() { return new ConditionObject(); } } // demo public static void main(String[] args) throws InterruptedException, IOException { ReentrantLock lock = new ReentrantLock(); Condition condition = lock.newCondition(); new Thread(() -> { lock.lock(); System.out.println("子线程获取锁资源并await挂起线程"); try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } try { condition.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("子线程挂起后被唤醒!持有锁资源"); }).start(); Thread.sleep(100); // =================main====================== lock.lock(); System.out.println("主线程等待5s拿到锁资源,子线程执行了await方法"); condition.signal(); System.out.println("主线程唤醒了await挂起的子线程"); lock.unlock(); }

Condition 的构建方式 & 核心属性

发现在通过 lock 锁对象执行 newCondition 方法时,本质就是直接 new 的 AQS 提供的ConditionObject 对象

java
final ConditionObject newCondition() { return new ConditionObject(); }

其实 lock 锁中可以有多个 Condition 对象。

在对 Condition1 进行操作时,不会影响到 Condition2 的单向链表。

其次可以发现 ConditionObject 中,只有两个核心属性:

java
/** First node of condition queue. */ private transient Node firstWaiter; /** Last node of condition queue. */ private transient Node lastWaiter;

虽然 Node 对象有 prevnext,但是在 ConditionObject 中是不会使用这两个属性的,只要在Condition 队列中,这两个属性都是 null。在 ConditionObject 中只会使用 nextWaiter 的属性实现单向链表的效果。

Condition的 await 方法分析(前置分析)

持有锁的线程在执行 await 方法后会做几个操作:

  • 判断线程是否中断,如果中断了,什么都不做。
  • 没有中断,就讲当前线程封装为 Node 添加到 Condition 的单向链表中
  • 一次性释放掉锁资源。
  • 如果当前线程没有在 AQS 队列,就正常执行 LockSupport.park(this) 挂起线程。
java
// await方法的前置分析,只分析到线程挂起 public final void await() throws InterruptedException { // 先判断线程的中断标记位是否是true if (Thread.interrupted()) // 如果是true,就没必要执行后续操作挂起了。 throw new InterruptedException(); // 在线程挂起之前,先将当前线程封装为Node,并且添加到Condition队列中 Node node = addConditionWaiter(); // fullyRelease在释放锁资源,一次性将锁资源全部释放,并且保留重入的次数 int savedState = fullyRelease(node); // 省略一行代码…… // 当前Node是否在AQS队列中? // 执行fullyRelease方法后,线程就释放锁资源了,如果线程刚刚释放锁资源,其他线程就立即执行了signal方法, // 此时当前线程就被放到了AQS的队列中,这样一来线程就不需要执行LockSupport.park(this);去挂起线程了 while (!isOnSyncQueue(node)) { // 如果没有在AQS队列中,正常在Condition单向链表里,正常挂起线程。 LockSupport.park(this); // 省略部分代码…… } // 省略部分代码…… } // 线程挂起先,添加到Condition单向链表的业务~~ private Node addConditionWaiter() { // 拿到尾节点。 Node t = lastWaiter; // 如果尾节点有值,并且尾节点的状态不正常,不是-2,尾节点可能要拜拜了~ if (t != null && t.waitStatus != Node.CONDITION) { // 如果尾节点已经取消了,需要干掉取消的尾节点~ unlinkCancelledWaiters(); // 重新获取lastWaiter t = lastWaiter; } // 构建当前线程的Node,并且状态设置为-2 Node node = new Node(Thread.currentThread(), Node.CONDITION); // 如果last节点为null。直接将当前节点设置为firstWaiter if (t == null) firstWaiter = node; else // 如果last节点不为null,说明有值,就排在lastWaiter的后面 t.nextWaiter = node; // 把当前节点设置为最后一个节点 lastWaiter = node; // 返回当前节点 return node; } // 干掉取消的尾节点。 private void unlinkCancelledWaiters() { // 拿到头节点 Node t = firstWaiter; // 声明一个节点,爱啥啥~~~ Node trail = null; // 如果t不为null,就正常执行~~ while (t != null) { // 拿到t的next节点 Node next = t.nextWaiter; // 如果t的状态不为-2,说明有问题 if (t.waitStatus != Node.CONDITION) { // t节点的next为null t.nextWaiter = null; // 如果trail为null,代表头结点状态就是1, if (trail == null) // 将头结点指向next节点 firstWaiter = next; else // 如果trail有值,说明不是头结点位置 trail.nextWaiter = next; // 如果next为null,说明单向链表遍历到最后了,直接结束 if (next == null) lastWaiter = trail; } // 如果t的状态是-2,一切正常 else { // 临时存储t trail = t; } // t指向之前的next t = next; } } // 一次性释放锁资源 final int fullyRelease(Node node) { // 标记位,释放锁资源默认失败! boolean failed = true; try { // 拿到现在state的值 int savedState = getState(); // 一次性释放干净全部锁资源 if (release(savedState)) { // 释放锁资源失败了么? 没有! failed = false; // 返回对应的锁资源信息 return savedState; } else { throw new IllegalMonitorStateException(); } } finally { if (failed) // 如果释放锁资源失败,将节点状态设置为取消 node.waitStatus = Node.CANCELLED; } }

Condition的 signal 方法分析

分为了几个部分:

  • 确保执行 signal 方法的是持有锁的线程
  • 脱离 Condition 的队列
  • 将 Node 状态从 -2 改为 0
  • 将 Node 添加到 AQS 队列
  • 为了避免当前 Node 无法在 AQS 队列正常唤醒做了一些判断和操作
java
// 线程挂起后,可以基于signal唤醒~ public final void signal() { // 在ReentrantLock中,如果执行signal的线程没有持有锁资源,直接扔异常 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); // 拿到排在Condition首位的Node Node first = firstWaiter; // 有Node在排队,才需要唤醒,如果没有,直接告辞~~ if (first != null) doSignal(first); } // 开始唤醒Condition中的Node中的线程 private void doSignal(Node first) { // 先一波do-while走你~~~ do { // 获取到第二个节点,并且将第二个节点设置为firstWaiter if ( (firstWaiter = first.nextWaiter) == null) // 说明就一个节点在Condition队列中,那么直接将firstWaiter和lastWaiter置位null lastWaiter = null; // 如果还有nextWaiter节点,因为当前节点要被唤醒了,脱离整个Condition队列。将nextWaiter置位null first.nextWaiter = null; // 如果transferForSignal返回true,一切正常,退出while循环 } while (!transferForSignal(first) && // 如果后续节点还有,往后面继续唤醒,如果没有,退出while循环 (first = firstWaiter) != null); } // 准备开始唤醒在Condition中排队的Node final boolean transferForSignal(Node node) { // 将在Condition队列中的Node的状态从-2,改为0,代表要扔到AQS队列了。 if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) // 如果失败了,说明在signal之前应当是线程被中断了,从而被唤醒了。 return false; // 如果正常的将Node的状态从-2改为0,这是就要将Condition中的这个Node扔到AQS的队列。 // 将当前Node扔到AQS队列,返回的p是当前Node的prev Node p = enq(node); // 获取上一个Node的状态 int ws = p.waitStatus; // 如果ws > 0 ,说明这个Node已经被取消了。 // 如果ws状态不是取消,将prev节点的状态改为-1,。 if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) // 如果prev节点已经取消了,可能会导致当前节点永远无法被唤醒。立即唤醒当前节点,基于acquireQueued方法, // 让当前节点找到一个正常的prev节点,并挂起线程 // 如果prev节点正常,但是CAS修改prev节点失败了。证明prev节点因为并发原因导致状态改变。还是为了避免当前 // 节点无法被正常唤醒,提前唤醒当前线程,基于acquireQueued方法,让当前节点找到一个正常的prev节点,并挂起线程 LockSupport.unpark(node.thread); // 返回true return true; }

Conditiond 的 await 方法分析(后置分析)

分为了几个部分:

  • 唤醒之后,要先确认是中断唤醒还是 signal 唤醒,还是 signal 唤醒后被中断
  • 确保当前线程的 Node 已经在 AQS 队列中
  • 执行 acquireQueued 方法,等待锁资源。
  • 在获取锁资源后,要确认是否在获取锁资源的阶段被中断过,如果被中断过,并且不是THROW_IE,那就确保interruptMode是REINTERRUPT。
  • 确认当前 Node 已经不在 Condition 队列中了
  • 最终根据 interruptMode 来决定具体做的事情
  • 0:嘛也不做。
  • THROW_IE:抛出异常
  • REINTERRUPT:执行线程的interrupt方法
java
// 现在分析await方法的后半部分 public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); // 中断模式~ int interruptMode = 0; while (!isOnSyncQueue(node)) { LockSupport.park(this); // 如果线程执行到这,说明现在被唤醒了。 // 线程可以被signal唤醒。(如果是signal唤醒,可以确认线程已经在AQS队列中) // 线程可以被interrupt唤醒,线程被唤醒后,没有在AQS队列中。 // 如果线程先被signal唤醒,然后线程中断了。。。。(做一些额外处理) // checkInterruptWhileWaiting可以确认当前中如何唤醒的。 // 返回的值,有三种 // 0:正常signal唤醒,没别的事(不知道Node是否在AQS队列) // THROW_IE(-1):中断唤醒,并且可以确保在AQS队列 // REINTERRUPT(1):signal唤醒,但是线程被中断了,并且可以确保在AQS队列 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } // Node一定在AQS队列 // 执行acquireQueued,尝试在ReentrantLock中获取锁资源。 // acquireQueued方法返回true:代表线程在AQS队列中挂起时,被中断过 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) // 如果线程在AQS队列排队时,被中断了,并且不是THROW_IE状态,确保线程的interruptMode是REINTERRUPT // REINTERRUPT:await不是中断唤醒,但是后续被中断过!!! interruptMode = REINTERRUPT; // 如果当前Node还在condition的单向链表中,脱离Condition的单向链表 if (node.nextWaiter != null) unlinkCancelledWaiters(); // 如果interruptMode是0,说明线程在signal后以及持有锁的过程中,没被中断过,什么事都不做! if (interruptMode != 0) // 如果不是0~ reportInterruptAfterWait(interruptMode); } // 判断当前线程被唤醒的模式,确认interruptMode的值。 private int checkInterruptWhileWaiting(Node node) { // 判断线程是否中断了。 return Thread.interrupted() ? // THROW_IE:代表线程是被interrupt唤醒的,需要向上排除异常 // REINTERRUPT:代表线程是signal唤醒的,但是在唤醒之后,被中断了。 (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : // 线程是正常的被signal唤醒,并且线程没有中断过。 0; } // 判断线程到底是中断唤醒的,还是signal唤醒的! final boolean transferAfterCancelledWait(Node node) { // 基于CAS将Node的状态从-2改为0 if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) { // 说明是中断唤醒的线程。因为CAS成功了。 // 将Node添加到AQS队列中~(如果是中断唤醒的,当前线程同时存在Condition的单向链表以及AQS的队列中) enq(node); // 返回true return true; } // 判断当前的Node是否在AQS队列(signal唤醒的,但是可能线程还没放到AQS队列) // 等到signal方法将线程的Node扔到AQS队列后,再做后续操作 while (!isOnSyncQueue(node)) // 如果没在AQS队列上,那就线程让步,稍等一会,Node放到AQS队列再处理(看CPU) Thread.yield(); // signal唤醒的,返回false return false; } // 确认Node是否在AQS队列上 final boolean isOnSyncQueue(Node node) { // 如果线程状态为-2,肯定没在AQS队列 // 如果prev节点的值为null,肯定没在AQS队列 if (node.waitStatus == Node.CONDITION || node.prev == null) // 返回false return false; // 如果节点的next不为null。说明已经在AQS队列上。、 if (node.next != null) // 确定AQS队列上有! return true; // 如果上述判断都没有确认节点在AQS队列上,在AQS队列中寻找一波 return findNodeFromTail(node); } // 在AQS队列中找当前节点 private boolean findNodeFromTail(Node node) { // 拿到尾节点 Node t = tail; for (;;) { // tail是否是当前节点,如果是,说明在AQS队列 if (t == node) // 可以跳出while循环 return true; // 如果节点为null,AQS队列中没有当前节点 if (t == null) // 进入while,让步一手 return false; // t向前引用 t = t.prev; } } private void reportInterruptAfterWait(int interruptMode) throws InterruptedException { // 如果是中断唤醒的await,直接抛出异常! if (interruptMode == THROW_IE) throw new InterruptedException(); // 如果是REINTERRUPT,signal后被中断过 else if (interruptMode == REINTERRUPT) // 确认线程的中断标记位是true // Thread.currentThread().interrupt(); selfInterrupt(); }

Condition 的 awaitNanos & signalAll 方法分析

awaitNanos:仅仅是在 await 方法的基础上,做了一内内的改变,整体的逻辑思想都是一样的。

挂起线程时,传入要阻塞的时间,时间到了,自动唤醒,走添加到 AQS 队列的逻辑

java
// await指定时间,多了个时间到了自动醒。 public final long awaitNanos(long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); // deadline:当前线程最多挂起到什么时间点 final long deadline = System.nanoTime() + nanosTimeout; int interruptMode = 0; while (!isOnSyncQueue(node)) { // nanosTimeout的时间小于等于0,直接告辞!! if (nanosTimeout <= 0L) { // 正常扔到AQS队列 transferAfterCancelledWait(node); break; } // nanosTimeout的时间大于1000纳秒时,才可以挂起线程 if (nanosTimeout >= spinForTimeoutThreshold) // 如果大于,正常挂起 LockSupport.parkNanos(this, nanosTimeout); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; // 计算剩余的挂起时间,可能需要重新的走while循环,再次挂起线程 nanosTimeout = deadline - System.nanoTime(); } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); // 剩余的挂起时间 return deadline - System.nanoTime(); }

signalAll 方法。这个方法一看就懂,之前 signal 是唤醒1个,这个是全部唤醒

java
// 以do-while的形式,将Condition单向链表中的所有Node,全部唤醒并扔到AQS队列 private void doSignalAll(Node first) { // 将头尾都置位null~ lastWaiter = firstWaiter = null; do { // 拿到next节点的引用 Node next = first.nextWaiter; // 断开当前Node的nextWaiter first.nextWaiter = null; // 修改Node状态,扔AQS队列,是否唤醒! transferForSignal(first); // 指向下一个节点 first = next; } while (first != null); }

总结

AQS 是 Java 并发包的引擎,理解其原理能帮助开发者更高效地使用锁和同步器,甚至自定义高性能同步组件。核心价值在于:将复杂的线程排队、阻塞/唤醒逻辑标准化,开发者只需关注状态管理。

本文作者:柳始恭

本文链接:

版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!