本文总结了J.U.C里在基于AQS的基础上实现的几种用于线程的并发控制同步组件。
CountDownLatch
等待其它线程完成的CountDownLatch,组件中维护了一个计数器,在初始化组件时给定计数器一个参数作为倒数起点,在一个线程完成时调用组件倒数方法latch.countDown();
使计数器-1。当计数器数字为0时,线程从线程阻塞处 latch.await();
返回。
public static void main(String[] args) throws Exception {
// latch
final CountDownLatch latch = new CountDownLatch(threadCount);
// 线程池
ExecutorService executors = Executors.newCachedThreadPool();
for (int i = 0; i < threadCount; i++) {
final int param = i;
executors.execute(()->{
try{
execute(param);
}catch (Exception e){
log.error("error : ",e);
}finally {
latch.countDown();
}
});
}
// 等待latch倒数为0
latch.await();
log.info("execute finish");
executors.shutdown();
}
execute();
方法打印倒数参数:
private static void execute(int param) throws InterruptedException {
log.info("receive parameter: {}", param);
Thread.sleep(500);
}
console输出:
15:36:56.340 [pool-1-thread-3] INFO net.check321.concurrency.clazz.aqs.CountDownLatchTest - receive parameter: 2 15:36:56.340 [pool-1-thread-2] INFO net.check321.concurrency.clazz.aqs.CountDownLatchTest - receive parameter: 1 15:36:56.340 [pool-1-thread-4] INFO net.check321.concurrency.clazz.aqs.CountDownLatchTest - receive parameter: 3 15:36:56.339 [pool-1-thread-1] INFO net.check321.concurrency.clazz.aqs.CountDownLatchTest - receive parameter: 0 15:36:56.340 [pool-1-thread-5] INFO net.check321.concurrency.clazz.aqs.CountDownLatchTest - receive parameter: 4 15:36:56.843 [main] INFO net.check321.concurrency.clazz.aqs.CountDownLatchTest - execute finish
可以看到在new CountDownLatch(5)
场景下线程在执行完5次execute()
后从 latch.await()
返回,随即输出"execute finish"。
在使用CountDownLatch分解处理任务时,需要考虑到单个线程因不可知问题执行时间过长导致CountDownLatch的长时间阻塞。因此await()
支持传入时间参数实现超时返回:
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
可以看到源码里造成线程阻塞的实现sync
对象即是CountDownLatch维护的基于AQS的内部静态同步器。
改写一下上述测试:
- 超过1s即从阻塞返回
latch.await(1000,TimeUnit.MILLISECONDS);
- 当线程3进入时阻塞程序2s造成超时
private static void execute(int param) throws InterruptedException { if(param == 3){ Thread.sleep(2000); } log.info("receive parameter: {}", param); }
console输出:
15:44:45.998 [pool-1-thread-2] INFO net.check321.concurrency.clazz.aqs.CountDownLatchTest - receive parameter: 1 15:44:45.998 [pool-1-thread-3] INFO net.check321.concurrency.clazz.aqs.CountDownLatchTest - receive parameter: 2 15:44:45.998 [pool-1-thread-1] INFO net.check321.concurrency.clazz.aqs.CountDownLatchTest - receive parameter: 0 15:44:45.998 [pool-1-thread-5] INFO net.check321.concurrency.clazz.aqs.CountDownLatchTest - receive parameter: 4 15:44:46.997 [main] INFO net.check321.concurrency.clazz.aqs.CountDownLatchTest - execute finish 15:44:47.996 [pool-1-thread-4] INFO net.check321.concurrency.clazz.aqs.CountDownLatchTest - receive parameter: 3
可见超过1s时main线程直接输出"execute finish"并没有等待线程3执行完毕。
CyclicBarrier
同步屏障是一个可循环使用的多线程同步控制器。当足够数量(屏障初始化数量)的线程到达屏障点时,线程同时从阻塞处返回。
初始化屏障,一个参数定义屏障数量,第二个参数(可选)称为"barrier-action",支持传入一个
java.lang.Runnable
类型线程处理,在到达屏障数量时优先执行传入的线程逻辑。private final static CyclicBarrier cyclicBarrier = new CyclicBarrier(threadCount, () -> { log.info("callback is runing..."); });
在线程ready后屏障阻塞执行逻辑
private static void execute(int threadNum) throws InterruptedException, BrokenBarrierException {
Thread.sleep(2000);
log.info("thread {} get ready!", threadNum);
cyclicBarrier.await();
log.info("thread {} finished!", threadNum);
}
- 启动5个线程调用
public static void main(String[] args) throws Exception {
// 线程池
ExecutorService executors = Executors.newCachedThreadPool();
try {
for (int i = 0; i < threadCount; i++) {
Thread.sleep(1000);
final int threadNum = i;
executors.execute(() -> {
try {
execute(threadNum);
} catch (Exception e) {
log.error("error : ", e);
}
});
}
} finally {
executors.shutdown();
}
}
console输出:
16:16:15.639 [pool-1-thread-1] INFO net.check321.concurrency.clazz.aqs.CyclicBarrierTest - thread 0 get ready! 16:16:16.634 [pool-1-thread-2] INFO net.check321.concurrency.clazz.aqs.CyclicBarrierTest - thread 1 get ready! 16:16:17.634 [pool-1-thread-3] INFO net.check321.concurrency.clazz.aqs.CyclicBarrierTest - thread 2 get ready! 16:16:18.634 [pool-1-thread-4] INFO net.check321.concurrency.clazz.aqs.CyclicBarrierTest - thread 3 get ready! 16:16:19.634 [pool-1-thread-5] INFO net.check321.concurrency.clazz.aqs.CyclicBarrierTest - thread 4 get ready! 16:16:19.634 [pool-1-thread-5] INFO net.check321.concurrency.clazz.aqs.CyclicBarrierTest - callback is runing... 16:16:19.634 [pool-1-thread-5] INFO net.check321.concurrency.clazz.aqs.CyclicBarrierTest - thread 4 finished! 16:16:19.634 [pool-1-thread-1] INFO net.check321.concurrency.clazz.aqs.CyclicBarrierTest - thread 0 finished! 16:16:19.634 [pool-1-thread-3] INFO net.check321.concurrency.clazz.aqs.CyclicBarrierTest - thread 2 finished! 16:16:19.634 [pool-1-thread-2] INFO net.check321.concurrency.clazz.aqs.CyclicBarrierTest - thread 1 finished! 16:16:19.635 [pool-1-thread-4] INFO net.check321.concurrency.clazz.aqs.CyclicBarrierTest - thread 3 finished!
可以看到执行线程在0-4 ready被阻塞,并且在到达屏障数量时(5)优先执行了barrier-action,随即各线程从阻塞处返回执行之后逻辑。
CountDownLatch 与 CyclicBarrier
上述两种同步组件在使用感受上差别不多,最大的差别在于CountDownLatch
的计数器是一次性的,当倒数计数器的数字后该latch便不可继续使用。而CyclicBarrier
弥补了这一不足,该barrier是可循环重复使用的。
举个例子:
在上述CyclicBarrier
测试代码里加入一个主线程barrier,在线程barrier执行完成后输出该barrier的计数器数字。
private final static CyclicBarrier mainBarrier = new CyclicBarrier(1, () -> {
log.info("mainBarrier callback is runing...");
log.info("final barrier: {}",cyclicBarrier.getNumberWaiting());
});
console输出:
17:24:18.403 [pool-1-thread-5] INFO net.check321.concurrency.clazz.aqs.CyclicBarrierTest - mainBarrier callback is runing... 17:24:18.404 [pool-1-thread-5] INFO net.check321.concurrency.clazz.aqs.CyclicBarrierTest - final barrier: 0
当线程cyclicBarrier
执行完成后,waitingNumbers又被归零,该barrier又可以重复使用。
在实验CyclicBarrier
的过程中试图调用reset()
方法去重置屏障数,结果程序抛出异常。查找相关资料后得知CyclicBarrier在释放所有屏障后会自动重置屏障数且如果要手动重置屏障数推荐使用新建屏障对象的方式而不是调用reset()
方法。因为该方法会让因屏障而阻塞的线程抛出java.util.concurrent.BrokenBarrierException
。
附上源码中的注释:
/**
* Resets the barrier to its initial state. If any parties are
* currently waiting at the barrier, they will return with a
* {@link BrokenBarrierException}. Note that resets <em>after</em>
* a breakage has occurred for other reasons can be complicated to
* carry out; threads need to re-synchronize in some other way,
* and choose one to perform the reset. It may be preferable to
* instead create a new barrier for subsequent use.
*/
Semaphore
理解信号量的含义一定要配合多线程并发里一张经典的图:
信号量就像十字路口的信号灯一样控制车流(线程)的定量(并发量)通行(执行)或停止(阻塞)。
举个例子:
- 定义一个信号量并初始化并发量(3):
// 信号量
final Semaphore semaphore = new Semaphore(3);
- 在并发逻辑的前后调用
acquire()
与release()
控制信号量的开启与关闭:
for (int i = 0; i < threadCount; i++) {
final int param = i;
executors.execute(() -> {
try {
semaphore.tryAcquire(20000,TimeUnit.MILLISECONDS); // 尝试加锁
execute(param);
semaphore.release(); // 释放锁
} catch (Exception e) {
log.error("error : ", e);
}
});
}
console输出:
22:58:18.773 [pool-1-thread-2] INFO net.check321.concurrency.clazz.aqs.SemaphoreTest - receive parameter: 1 22:58:18.773 [pool-1-thread-3] INFO net.check321.concurrency.clazz.aqs.SemaphoreTest - receive parameter: 2 22:58:18.773 [pool-1-thread-1] INFO net.check321.concurrency.clazz.aqs.SemaphoreTest - receive parameter: 0 22:58:20.788 [pool-1-thread-4] INFO net.check321.concurrency.clazz.aqs.SemaphoreTest - receive parameter: 3 22:58:20.788 [pool-1-thread-6] INFO net.check321.concurrency.clazz.aqs.SemaphoreTest - receive parameter: 5 22:58:20.788 [pool-1-thread-5] INFO net.check321.concurrency.clazz.aqs.SemaphoreTest - receive parameter: 4 22:58:22.789 [pool-1-thread-9] INFO net.check321.concurrency.clazz.aqs.SemaphoreTest - receive parameter: 8 22:58:22.789 [pool-1-thread-7] INFO net.check321.concurrency.clazz.aqs.SemaphoreTest - receive parameter: 6 22:58:22.789 [pool-1-thread-8] INFO net.check321.concurrency.clazz.aqs.SemaphoreTest - receive parameter: 7
在线程数为9的情况下可以从输出时间上看到每3个线程形成一次并发,信号量很好的控制了多线程同一时间的并发数。
在实际开发中经常使用信号量来控制对公共资源的限流获取,比如数据库连接这种珍贵的公共资源。当请求数量超过数据库的连接数时,可以用一个与连接数数目相等的信号量实现并发情况下数据库连接的安全使用。
参考:
- 《Java并发编程的艺术》第8章 Java中的并发工具类
- When to reset CyclicBarrier in java multithreading
Comments
请在后台配置评论类型和相关的值。