并发栅栏CyclicBarrier—简单问

  • 2019 年 10 月 3 日
  • 笔记

并发栅栏CyclicBarrier—简单问

 

背景:前几天在网上看到关于Java并发包java.concurrent中一个连环炮的面试题,整理下以备不时之需。

CyclicBarrier简介:

栅栏类似于闭锁,它能够阻塞一组线程直到某个事件发生;它与闭锁(CountDownLatch)的区分关键在于,闭锁是所有线程等待一个外部事件的发生;而栅栏则是所有线程相互等待,直到所有线程都到达某一点时才打开栅栏,然后线程可以继续执行。

问题:

如果想实现所有的线程一起等待某个事件的发生,当某个事件发生时,所有线程一起开始往下执行的话,有什么好的办法吗?

回答:

可以使用栅栏,Java并发包中的CyclicBarrier。

又问:

你知道CyclicBarrier的实现原理吗?

回答:

CyclicBarrier.await方法调用CyclicBarrier.dowait方法,每次调用await方法都会使计数器-1,当减少到0时就会唤醒所有的线程。(计数器count就是线程总数,CyclicBarrier cyclicBarrier = new CyclicBarrier(100);)最核心的部分就是 int index = –count; 和 nextGeneration();方法。

1 public int await() throws InterruptedException, BrokenBarrierException {  2 try {  3 return dowait(false, 0L);  4 } catch (TimeoutException toe) {  5 throw new Error(toe); // cannot happen  6 }  7 }

1 public int await(long timeout, TimeUnit unit)  2 throws InterruptedException,  3 BrokenBarrierException,  4 TimeoutException {  5 return dowait(true, unit.toNanos(timeout));  6 }

 1 private int dowait(boolean timed, long nanos)   2 throws InterruptedException, BrokenBarrierException,   3 TimeoutException {   4 final ReentrantLock lock = this.lock;   5 lock.lock();   6 try {   7 final Generation g = generation;   8   9 if (g.broken)  10 throw new BrokenBarrierException();  11  12 if (Thread.interrupted()) {  13 breakBarrier();  14 throw new InterruptedException();  15 }  16  17 int index = --count; // 最核心的部分就是此处1  18 if (index == 0) { // tripped  19 boolean ranAction = false;  20 try {  21 final Runnable command = barrierCommand;  22 if (command != null)  23 command.run();  24 ranAction = true;  25 nextGeneration(); // 最核心的部分就是此处2  26 return 0;  27 } finally {  28 if (!ranAction)  29 breakBarrier();  30 }  31 }  32  33 // loop until tripped, broken, interrupted, or timed out  34 for (;;) {  35 try {  36 if (!timed)  37 trip.await();  38 else if (nanos > 0L)  39 nanos = trip.awaitNanos(nanos);  40 } catch (InterruptedException ie) {  41 if (g == generation && ! g.broken) {  42 breakBarrier();  43 throw ie;  44 } else {  45 // We're about to finish waiting even if we had not  46 // been interrupted, so this interrupt is deemed to  47 // "belong" to subsequent execution.  48 Thread.currentThread().interrupt();  49 }  50 }  51  52 if (g.broken)  53 throw new BrokenBarrierException();  54  55 if (g != generation)  56 return index;  57  58 if (timed && nanos <= 0L) {  59 breakBarrier();  60 throw new TimeoutException();  61 }  62 }  63 } finally {  64 lock.unlock();  65 }  66 }

1 private void nextGeneration() {  2 // signal completion of last generation  3 trip.signalAll();  4 // set up next generation  5 count = parties;  6 generation = new Generation();  7 }

又问:

除此之外,您还知道其它的实现方式吗?

回答:

方案1:读写锁,刚开始主线程获取写锁,然后所有子线程获取读锁,然后等事件发生时主线程释放写锁;

方案2:CountDownLatch闭锁,CountDownLatch初始值设为1,所有子线程调用await方法等待,等事件发生时调用countDown方法计数减为0;

方案3:Semaphore,Semaphore初始值设为N,刚开始主线程先调用acquire(N)申请N个信号量,其他线程调用acquire()阻塞等待,等事件发生时同时主线程释放N个信号量。

CountDownLatch闭锁实现模拟如下:

 1 import java.util.concurrent.CountDownLatch;   2   3 public class CountDownLatchDemo {   4   5 /**   6 * 模拟老爸去饭店   7 */   8 public static void fatherToRes()   9 {  10 System.out.println("老爸步行去饭店需要3小时。");  11 }  12  13 /**  14 * 模拟老妈去饭店  15 */  16 public static void motherToRes()  17 {  18 System.out.println("老妈挤公交去饭店需要2小时。");  19 }  20  21 /**  22 * 模拟我去饭店  23 */  24 public static void meToRes()  25 {  26 System.out.println("我乘地铁去饭店需要1小时。");  27 }  28  29 /**  30 * 模拟一家人到齐了  31 */  32 public static void togetherToEat()  33 {  34 System.out.println("一家人到齐了,开始吃饭");  35 }  36  37  38 private static CountDownLatch latch = new CountDownLatch(3);  39  40 public static void main(String[] args) throws InterruptedException  41 {  42  43 new Thread()  44 {  45 public void run()  46 {  47 fatherToRes();  48 latch.countDown();  49 };  50 }.start();  51 new Thread()  52 {  53 public void run()  54 {  55 motherToRes();  56 latch.countDown();  57 };  58 }.start();  59 new Thread()  60 {  61 public void run()  62 {  63 meToRes();  64 latch.countDown();  65 };  66 }.start();  67  68 latch.await();  69 togetherToEat();  70 }  71 }

又问:

您觉得这些方式里哪个方式更好呢?

回答:

CountDownLatch闭锁是等待一组线程执行完毕后才能继续执行;

CyclicBarrier栅栏是能让一组线程达到一个同步点时被阻塞,直到最后一个线程达到,阻塞才会消失,其是可以循环使用的;

Semaphore信号量是只允许一定数量的线程同时执行,一般用来限制访问资源的线程数量。

又问:

如果你这个时候依然可以说出来你自己更好的实现方式,那么面试官肯定还会揪着这个继续问你。