Java 并发包(JUC)提供了一系列强大的并发工具类,本文将深入解析常用工具类的原理,并通过完整代码示例展示其实际应用场景
基于AQS(AbstractQueuedSynchronizer)实现,初始化计数器值(state),线程通过countDown()释放共享锁递减计数state-1),调用await()的线程阻塞直到计数器归零(state=0)。
一次性协作机制,主线程等待指定数量的子任务完成。
CountDownLatch(int count):构造函数接收一个int类型的参数作为计数器,如果你想等待N个点完成,这里就传入N
countDown()
调用CountDownLatch的countDown方法时,N就会减1,CountDownLatch的await方法会阻塞当前线程,直到N变成零 由于countDown方法可以用在任何地方,所以这里说的N个 点,可以是N个线程,也可以是1个线程里的N个执行步骤 在多个线程时,只需要把这个 CountDownLatch 的引用传递到线程里调用 countDown 方法即可
await(long time,TimeUnit unit):方法等待特定时间后,就会不再阻塞当前线程
java// 模拟启动服务器(需所有服务就绪)
public class ServerInitializer {
public static void main(String[] args) throws InterruptedException {
int serviceCount = 3;
CountDownLatch latch = new CountDownLatch(serviceCount);
new Thread(new Service("Auth", 2000, latch)).start();
new Thread(new Service("Database", 3000, latch)).start();
new Thread(new Service("Cache", 1500, latch)).start();
latch.await(); // 阻塞直到计数器归零
System.out.println("所有服务启动完成!");
}
static class Service implements Runnable {
private final String name;
private final int bootTime;
private final CountDownLatch latch;
Service(String name, int bootTime, CountDownLatch latch) {
this.name = name;
this.bootTime = bootTime;
this.latch = latch;
}
@Override
public void run() {
try {
Thread.sleep(bootTime);
System.out.println(name + " 服务已启动");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
latch.countDown(); // 计数器减1
}
}
}
}
通过ReentrantLock和Condition实现,线程调用await()时计数减1并阻塞,当计数归零时唤醒所有线程并重置计数器。
可复用的同步点,用于多线程相互等待到达屏障点。
让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续运行
CyclicBarrier(int parties):其参数表示屏障拦截的线程数量,每个线程调用 await
方法告诉CyclicBarrier
我已经到达了屏障,然后当前线程被阻塞
java// 多玩家游戏准备场景
public class GameSession {
public static void main(String[] args) {
int playerCount = 4;
CyclicBarrier barrier = new CyclicBarrier(playerCount,
() -> System.out.println("所有玩家准备完成!开始游戏..."));
for (int i = 1; i <= playerCount; i++) {
new Thread(new Player("玩家" + i, barrier)).start();
}
}
static class Player implements Runnable {
private final String name;
private final CyclicBarrier barrier;
Player(String name, CyclicBarrier barrier) {
this.name = name;
this.barrier = barrier;
}
@Override
public void run() {
try {
// 模拟加载资源时间
int loadTime = ThreadLocalRandom.current().nextInt(1000, 3000);
Thread.sleep(loadTime);
System.out.println(name + " 准备完成 (" + loadTime + "ms)");
barrier.await(); // 等待其他玩家
// 游戏开始后的操作
System.out.println(name + " 开始行动");
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
基于AQS实现,通过acquire()获取许可计数器减1(state-1),release()释放许可计数器加1(state+1),当计数器为0(state=0)时阻塞获取线程。
控制同时访问特定资源的线程数量,流量控制,限流
公平模式(fair=true)避免线程饥饿
使用tryAcquire()尝试获取避免死锁
释放许可数量必须等于获取数量
Semaphore(int permits): 构造函数定义许可证数量
java// 数据库连接池模拟
public class ConnectionPool {
private static final int POOL_SIZE = 5;
private final Semaphore semaphore = new Semaphore(POOL_SIZE, true);
private final Set<Connection> connections = new HashSet<>();
public ConnectionPool() {
for (int i = 0; i < POOL_SIZE; i++) {
connections.add(createMockConnection());
}
}
public Connection getConnection() throws InterruptedException {
semaphore.acquire(); // 获取许可
return getAvailableConnection();
}
public void releaseConnection(Connection conn) {
if (conn != null) {
returnConnection(conn);
semaphore.release(); // 释放许可
}
}
private synchronized Connection getAvailableConnection() {
Connection conn = connections.iterator().next();
connections.remove(conn);
return conn;
}
private synchronized void returnConnection(Connection conn) {
connections.add(conn);
}
private Connection createMockConnection() {
return new Connection() {
// 模拟连接实现
};
}
// 使用示例
public static void main(String[] args) {
ConnectionPool pool = new ConnectionPool();
ExecutorService executor = Executors.newFixedThreadPool(10);
for (int i = 0; i < 10; i++) {
executor.execute(() -> {
try {
Connection conn = pool.getConnection();
System.out.println(Thread.currentThread().getName()
+ " 获取连接,执行查询...");
Thread.sleep(1000); // 模拟查询
pool.releaseConnection(conn);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
executor.shutdown();
}
}
Exchanger(交换者)是一个用于线程间协作的工具类
基于 LockSupport.park/unpark
,每个交换点维护一个 "槽位",首个到达线程存入数据并阻塞,第二个线程到达时交换数据
用于进行线程间的数据交换
它提供一个同步点,在这个同步点,两个线程可以交换彼此的数据
这两个线程通过 exchange 方法交换数据,如果第一个线程先执行exchange()方法,它会一直等待第二个线程也 执行exchange方法,当两个线程都到达同步点时,这两个线程就可以交换数据,将本线程生产出来的数据传递给对方
应用
遗传算法:遗传算法里需要选出两个人作为交配对象,这时候会交换 两人的数据,并使用交叉规则得出2个交配结果
校对工作:采用AB岗两人进行录入,录入到Excel之后,系统需要加载这两个Excel,并对两个Excel数据进行校对,看看是否录入一致
基于树形结构减少竞争,每个阶段(phase)独立计数,支持动态注册/注销参与者,分层移相器减少同步开销
可重用的同步屏障,支持动态调整参与者
java// 多阶段考试模拟
public class ExaminationSystem {
public static void main(String[] args) {
int studentCount = 5;
Phaser phaser = new Phaser(1); // 注册主线程
System.out.println("考试开始,学生入场");
// 注册学生线程
for (int i = 0; i < studentCount; i++) {
phaser.register();
new Thread(new Student(phaser), "学生" + (i+1)).start();
}
// 第一阶段:笔试
phaser.arriveAndAwaitAdvance();
System.out.println("\n笔试结束,开始机试");
// 第二阶段:机试
phaser.arriveAndAwaitAdvance();
System.out.println("\n机试结束,开始面试");
// 第三阶段:面试
phaser.arriveAndAwaitAdvance();
System.out.println("\n所有考试结束");
phaser.arriveAndDeregister(); // 主线程退出
}
static class Student implements Runnable {
private final Phaser phaser;
Student(Phaser phaser) {
this.phaser = phaser;
}
@Override
public void run() {
// 参加笔试
System.out.println(Thread.currentThread().getName() + " 正在笔试...");
sleepRandom(1000, 3000);
System.out.println(Thread.currentThread().getName() + " 完成笔试");
phaser.arriveAndAwaitAdvance();
// 参加机试
System.out.println(Thread.currentThread().getName() + " 正在机试...");
sleepRandom(1500, 4000);
System.out.println(Thread.currentThread().getName() + " 完成机试");
phaser.arriveAndAwaitAdvance();
// 参加面试
System.out.println(Thread.currentThread().getName() + " 正在面试...");
sleepRandom(2000, 5000);
System.out.println(Thread.currentThread().getName() + " 完成面试");
phaser.arriveAndDeregister(); // 退出考试
}
private void sleepRandom(int min, int max) {
try {
Thread.sleep(ThreadLocalRandom.current().nextInt(min, max));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
乐观读锁优化
分为三种模式:写锁、悲观读锁、乐观读,通过戳标记(stamp)用于锁状态验证,自动进行锁升级/降级机制
乐观读后必须验证 stamp
不支持重入(与 ReentrantLock 不同)
避免在锁中使用条件变量
写锁不可直接降级为乐观读
java// 使用StampedLock实现高效缓存
public class OptimisticCache {
private final StampedLock lock = new StampedLock();
private Map<String, String> cache = new HashMap<>();
// 写操作使用排他锁
public void put(String key, String value) {
long stamp = lock.writeLock();
try {
cache.put(key, value);
} finally {
lock.unlockWrite(stamp);
}
}
// 读操作使用乐观锁
public String get(String key) {
// 1. 尝试乐观读
long stamp = lock.tryOptimisticRead();
String value = cache.get(key);
// 2. 检查是否被修改
if (!lock.validate(stamp)) {
// 升级为悲观读锁
stamp = lock.readLock();
try {
value = cache.get(key);
} finally {
lock.unlockRead(stamp);
}
}
return value;
}
// 使用示例
public static void main(String[] args) {
OptimisticCache cache = new OptimisticCache();
ExecutorService executor = Executors.newFixedThreadPool(10);
// 并发写
for (int i = 0; i < 5; i++) {
final int id = i;
executor.execute(() -> {
cache.put("key" + id, "value" + id);
});
}
// 并发读
for (int i = 0; i < 10; i++) {
executor.execute(() -> {
for (int j = 0; j < 5; j++) {
String value = cache.get("key" + ThreadLocalRandom.current().nextInt(5));
System.out.println("Read: " + value);
}
});
}
executor.shutdown();
}
}
通过理解这些工具的核心原理和适用场景,开发者可以:
在调试的过程中进行调试与监控,可以使用 JConsole
查看锁竞争,开启 -XX:+PrintConcurrentLocks
,或者使用 Arthas
监控线程阻塞
最终建议:根据实际场景组合使用工具类(如 Phaser+CompletableFuture 处理多阶段异步任务),并配合监控工具持续优化。
本文作者:柳始恭
本文链接:
版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!