Java 并发包(JUC)提供了一系列强大的并发工具类,本文将深入解析常用工具类的原理,并通过完整代码示例展示其实际应用场景
基于 AQS(AbstractQueuedSynchronizer)实现,初始化计数器值(state),线程通过countDown()释放共享锁递减计数state-1),调用await()的线程阻塞直到计数器归零(state=0)。
一次性协作机制,主线程等待指定数量的子任务完成。
CountDownLatch(int count):构造函数接收一个int类型的参数作为计数器,如果你想等待N个点完成,这里就传入N
countDown()
调用CountDownLatch的countDown方法时,N就会减1,CountDownLatch的await方法会阻塞当前线程,直到N变成零 由于countDown方法可以用在任何地方,所以这里说的N个 点,可以是N个线程,也可以是1个线程里的N个执行步骤 在多个线程时,只需要把这个 CountDownLatch 的引用传递到线程里调用 countDown 方法即可
await(long time,TimeUnit unit):方法等待特定时间后,就会不再阻塞当前线程
java // 模拟启动服务器(需所有服务就绪) public class ServerInitializer { public static void main(String[] args) throws InterruptedException { int serviceCount = 3; CountDownLatch latch = new CountDownLatch(serviceCount); new Thread(new Service("Auth", 2000, latch)).start(); new Thread(new Service("Database", 3000, latch)).start(); new Thread(new Service("Cache", 1500, latch)).start(); latch.await(); // 阻塞直到计数器归零 System.out.println("所有服务启动完成!"); } static class Service implements Runnable { private final String name; private final int bootTime; private final CountDownLatch latch; Service(String name, int bootTime, CountDownLatch latch) { this.name = name; this.bootTime = bootTime; this.latch = latch; } @Override public void run() { try { Thread.sleep(bootTime); System.out.println(name + " 服务已启动"); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } finally { latch.countDown(); // 计数器减1 } } } }
通过 ReentrantLock 和 Condition 实现,线程调用await()时计数减1并阻塞,当计数归零时唤醒所有线程并重置计数器。
可复用的同步点,用于多线程相互等待到达屏障点。
让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续运行
CyclicBarrier(int parties):其参数表示屏障拦截的线程数量,每个线程调用 await 方法告诉CyclicBarrier 我已经到达了屏障,然后当前线程被阻塞
java // 多玩家游戏准备场景 public class GameSession { public static void main(String[] args) { int playerCount = 4; CyclicBarrier barrier = new CyclicBarrier(playerCount, () -> System.out.println("所有玩家准备完成!开始游戏...")); for (int i = 1; i <= playerCount; i++) { new Thread(new Player("玩家" + i, barrier)).start(); } } static class Player implements Runnable { private final String name; private final CyclicBarrier barrier; Player(String name, CyclicBarrier barrier) { this.name = name; this.barrier = barrier; } @Override public void run() { try { // 模拟加载资源时间 int loadTime = ThreadLocalRandom.current().nextInt(1000, 3000); Thread.sleep(loadTime); System.out.println(name + " 准备完成 (" + loadTime + "ms)"); barrier.await(); // 等待其他玩家 // 游戏开始后的操作 System.out.println(name + " 开始行动"); } catch (Exception e) { e.printStackTrace(); } } } }
基于 AQS 实现,通过acquire()获取许可计数器减1(state-1),release()释放许可计数器加1(state+1),当计数器为0(state=0)时阻塞获取线程。
控制同时访问特定资源的线程数量,流量控制,限流
Semaphore(int permits): 构造函数定义许可证数量
java // 数据库连接池模拟 public class ConnectionPool { private static final int POOL_SIZE = 5; private final Semaphore semaphore = new Semaphore(POOL_SIZE, true); private final Set<Connection> connections = new HashSet<>(); public ConnectionPool() { for (int i = 0; i < POOL_SIZE; i++) { connections.add(createMockConnection()); } } public Connection getConnection() throws InterruptedException { semaphore.acquire(); // 获取许可 return getAvailableConnection(); } public void releaseConnection(Connection conn) { if (conn != null) { returnConnection(conn); semaphore.release(); // 释放许可 } } private synchronized Connection getAvailableConnection() { Connection conn = connections.iterator().next(); connections.remove(conn); return conn; } private synchronized void returnConnection(Connection conn) { connections.add(conn); } private Connection createMockConnection() { return new Connection() { // 模拟连接实现 }; } // 使用示例 public static void main(String[] args) { ConnectionPool pool = new ConnectionPool(); ExecutorService executor = Executors.newFixedThreadPool(10); for (int i = 0; i < 10; i++) { executor.execute(() -> { try { Connection conn = pool.getConnection(); System.out.println(Thread.currentThread().getName() + " 获取连接,执行查询..."); Thread.sleep(1000); // 模拟查询 pool.releaseConnection(conn); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }); } executor.shutdown(); } }
Exchanger(交换者)是一个用于线程间协作的工具类
基于 LockSupport.park/unpark,每个交换点维护一个 "槽位",首个到达线程存入数据并阻塞,第二个线程到达时交换数据
用于进行线程间的数据交换
它提供一个同步点,在这个同步点,两个线程可以交换彼此的数据
这两个线程通过 exchange 方法交换数据,如果第一个线程先执行exchange()方法,它会一直等待第二个线程也 执行exchange方法,当两个线程都到达同步点时,这两个线程就可以交换数据,将本线程生产出来的数据传递给对方
应用
遗传算法:遗传算法里需要选出两个人作为交配对象,这时候会交换两人的数据,并使用交叉规则得出2个交配结果
校对工作:采用AB岗两人进行录入,录入到Excel之后,系统需要加载这两个Excel,并对两个Excel数据进行校对,看看是否录入一致
可重复使用的同步屏障,功能类似于 CyclicBarrier 和 CountDownLatch,但支持更灵活的使用。
基于树形结构减少竞争,每个阶段(phase)独立计数,支持动态注册/注销参与者,分层移除减少同步开销
可重用的同步屏障,支持动态调整参与者
java // 多阶段考试模拟 public class ExaminationSystem { public static void main(String[] args) { int studentCount = 5; Phaser phaser = new Phaser(1); // 注册主线程 System.out.println("考试开始,学生入场"); // 注册学生线程 for (int i = 0; i < studentCount; i++) { phaser.register(); new Thread(new Student(phaser), "学生" + (i+1)).start(); } // 第一阶段:笔试 phaser.arriveAndAwaitAdvance(); System.out.println("\n笔试结束,开始机试"); // 第二阶段:机试 phaser.arriveAndAwaitAdvance(); System.out.println("\n机试结束,开始面试"); // 第三阶段:面试 phaser.arriveAndAwaitAdvance(); System.out.println("\n所有考试结束"); phaser.arriveAndDeregister(); // 主线程退出 } static class Student implements Runnable { private final Phaser phaser; Student(Phaser phaser) { this.phaser = phaser; } @Override public void run() { // 参加笔试 System.out.println(Thread.currentThread().getName() + " 正在笔试..."); sleepRandom(1000, 3000); System.out.println(Thread.currentThread().getName() + " 完成笔试"); phaser.arriveAndAwaitAdvance(); // 参加机试 System.out.println(Thread.currentThread().getName() + " 正在机试..."); sleepRandom(1500, 4000); System.out.println(Thread.currentThread().getName() + " 完成机试"); phaser.arriveAndAwaitAdvance(); // 参加面试 System.out.println(Thread.currentThread().getName() + " 正在面试..."); sleepRandom(2000, 5000); System.out.println(Thread.currentThread().getName() + " 完成面试"); phaser.arriveAndDeregister(); // 退出考试 } private void sleepRandom(int min, int max) { try { Thread.sleep(ThreadLocalRandom.current().nextInt(min, max)); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } } }
一种基于功能的锁,有三种模式用于控制读写访问。
StampedLock 的状态由版本和模式组成。锁获取方法返回一个戳,该戳表示并控制对锁状态的访问;这些方法的“try”版本可能会返回特殊值零,表示无法获取访问权限。锁释放和转换方法需要图章作为参数,如果它们与锁的状态不匹配,则会失败。这三种模式是:
写锁:writeLock方法可能会在等待独占访问时阻塞,并返回一个戳记,该戳记可在unlockWrite方法中使用以释放锁。还提供了无计时和计时版本的tryWriteLock。当锁处于写模式时,无法获取读锁,所有乐观读验证都将失败。
读锁:readLock方法可能会阻塞等待非独占访问,并返回一个戳记,该戳记可在unlockRead方法中使用以释放锁。同时,还提供了无计时和有计时的tryReadLock版本。
乐观读锁:方法tryOptimisticRead仅在锁当前未以写模式持有时返回非零戳记。如果自获取给定戳记以来,锁尚未以写模式被获取,则方法validate返回true,在这种情况下,最近一次写锁释放之前的所有操作都发生在调用tryOptimisticRead之后的操作之前。这种模式可以被视为一种极其弱的读锁版本,写者可以随时破坏它。对于简短的只读代码段使用乐观读取模式通常可以减少争用并提高吞吐量。然而,其使用本身具有脆弱性。乐观读取部分应仅读取字段,并将其保存在局部变量中,以便在验证后稍后使用。在乐观读取模式下读取的字段可能非常不一致,因此仅当您足够熟悉数据表示以检查一致性和/或反复调用方法validate()时,才适用此用法。例如,在首次读取对象或数组引用,然后访问其某个字段、元素或方法时,通常需要执行这些步骤。
乐观读锁优化
分为三种模式:写锁、悲观读锁、乐观读,通过戳标记(stamp)用于锁状态验证,自动进行锁升级/降级机制
java // 使用StampedLock实现高效缓存 public class OptimisticCache { private final StampedLock lock = new StampedLock(); private Map<String, String> cache = new HashMap<>(); // 写操作使用排他锁 public void put(String key, String value) { long stamp = lock.writeLock(); try { cache.put(key, value); } finally { lock.unlockWrite(stamp); } } // 读操作使用乐观锁 public String get(String key) { // 1. 尝试乐观读 long stamp = lock.tryOptimisticRead(); String value = cache.get(key); // 2. 检查是否被修改 if (!lock.validate(stamp)) { // 升级为悲观读锁 stamp = lock.readLock(); try { value = cache.get(key); } finally { lock.unlockRead(stamp); } } return value; } // 使用示例 public static void main(String[] args) { OptimisticCache cache = new OptimisticCache(); ExecutorService executor = Executors.newFixedThreadPool(10); // 并发写 for (int i = 0; i < 5; i++) { final int id = i; executor.execute(() -> { cache.put("key" + id, "value" + id); }); } // 并发读 for (int i = 0; i < 10; i++) { executor.execute(() -> { for (int j = 0; j < 5; j++) { String value = cache.get("key" + ThreadLocalRandom.current().nextInt(5)); System.out.println("Read: " + value); } }); } executor.shutdown(); } }
通过理解这些工具的核心原理和适用场景,开发者可以:
在调试的过程中进行调试与监控,可以使用 JConsole 查看锁竞争,开启 -XX:+PrintConcurrentLocks,或者使用 Arthas 监控线程阻塞
最终建议:根据实际场景组合使用工具类(如 Phaser+CompletableFuture 处理多阶段异步任务),并配合监控工具持续优化。
本文作者:张豪
本文链接:
版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!