大企业网站建设方案,建设ca网站,南昌网站开发机构,专业建网站的学校CyclicBarrier#xff0c;一个同步辅助类#xff0c;在API中是这么介绍的#xff1a;
它允许一组线程互相等待#xff0c;直到到达某个公共屏障点 (common barrier point)。在涉及一组固定大小的线程的程序中#xff0c;这些线程必须不时地互相等待#xff0c;此时 Cycl…CyclicBarrier一个同步辅助类在API中是这么介绍的
它允许一组线程互相等待直到到达某个公共屏障点 (common barrier point)。在涉及一组固定大小的线程的程序中这些线程必须不时地互相等待此时 CyclicBarrier 很有用。因为该 barrier 在释放等待线程后可以重用所以称它为循环 的 barrier。
通俗点讲就是就像在跑步比赛的时候只有运动员全部到达的时候裁判才会让他们开始跑。
实现分析
首先这个工具类可以用两个构造方法调用一个是默认都构造方法单个参数只设置屏障点一个是用于线程到达屏障时优先执行barrierAction方便处理更复杂的业务场景但一般选用单个参数的。
public CyclicBarrier(int parties, Runnable barrierAction) {if (parties 0) throw new IllegalArgumentException();this.parties parties;this.count parties;this.barrierCommand barrierAction;
}public CyclicBarrier(int parties) {this(parties, null);
}在CyclicBarrier中最重要的方法莫过于await()方法在所有参与者都已经在此 barrier 上调用 await 方法之前将一直等待。如下
public int await() throws InterruptedException, BrokenBarrierException {try {return dowait(false, 0L);//不超时等待} catch (TimeoutException toe) {throw new Error(toe); // cannot happen}
}await()方法内部调用dowait(boolean timed, long nanos)方法 private int dowait(boolean timed, long nanos)throws InterruptedException, BrokenBarrierException,TimeoutException {//获取锁final ReentrantLock lock this.lock;lock.lock();try {//分代final Generation g generation;//当前generation“已损坏”抛出BrokenBarrierException异常//抛出该异常一般都是某个线程在等待某个处于“断开”状态的CyclicBarrieif (g.broken)//当某个线程试图等待处于断开状态的 barrier 时或者 barrier 进入断开状态而线程处于等待状态时抛出该异常throw new BrokenBarrierException();//如果线程中断终止CyclicBarrierif (Thread.interrupted()) {breakBarrier();throw new InterruptedException();}//进来一个线程 count - 1int index --count;//count 0 表示所有线程均已到位触发Runnable任务if (index 0) { // trippedboolean ranAction false;try {final Runnable command barrierCommand;//触发任务if (command ! null)command.run();ranAction true;//唤醒所有等待线程并更新generationnextGeneration();return 0;} finally {if (!ranAction)breakBarrier();}}for (;;) {try {//如果不是超时等待则调用Condition.await()方法等待if (!timed)trip.await();else if (nanos 0L)//超时等待调用Condition.awaitNanos()方法等待nanos trip.awaitNanos(nanos);} catch (InterruptedException ie) {if (g generation ! g.broken) {breakBarrier();throw ie;} else {// Were about to finish waiting even if we had not// been interrupted, so this interrupt is deemed to// belong to subsequent execution.Thread.currentThread().interrupt();}}if (g.broken)throw new BrokenBarrierException();//generation已经更新返回indexif (g ! generation)return index;//“超时等待”并且时间已到,终止CyclicBarrier并抛出异常if (timed nanos 0L) {breakBarrier();throw new TimeoutException();}}} finally {//释放锁lock.unlock();}}其实await()的处理逻辑还是比较简单的如果该线程不是到达的最后一个线程则他会一直处于等待状态除非发生以下情况
最后一个线程到达即index 0超出了指定时间超时等待其他的某个线程中断当前线程其他的某个线程中断另一个等待的线程其他的某个线程在等待barrier超时其他的某个线程在此barrier调用reset()方法。reset()方法用于将屏障重置为初始状态。
在上面的源代码中我们可能需要注意Generation 对象在上述代码中我们总是可以看到抛出BrokenBarrierException异常那么什么时候抛出异常呢如果一个线程处于等待状态时如果其他线程调用reset()或者调用的barrier原本就是被损坏的则抛出BrokenBarrierException异常。同时任何线程在等待时被中断了则其他所有线程都将抛出BrokenBarrierException异常并将barrier置于损坏状态。
同时Generation描述着CyclicBarrier的更显换代。在CyclicBarrier中同一批线程属于同一代。当有parties个线程到达barriergeneration就会被更新换代。其中broken标识该当前CyclicBarrier是否已经处于中断状态。
private static class Generation {boolean broken false;
}默认barrier是没有损坏的。
当barrier损坏了或者有一个线程中断了则通过breakBarrier()来终止所有的线程
private void breakBarrier() {generation.broken true;count parties;trip.signalAll();
}在breakBarrier()中除了将broken设置为true还会调用signalAll将在CyclicBarrier处于等待状态的线程全部唤醒。
当所有线程都已经到达barrier处index 0则会通过nextGeneration()进行更新换地操作在这个步骤中做了三件事唤醒所有线程重置countgeneration。
private void nextGeneration() {trip.signalAll();count parties;generation new Generation();
}CyclicBarrier同时也提供了await(long timeout, TimeUnit unit) 方法来做超时控制内部还是通过调用doawait()实现的。
应用示例
1、比如我们开会只有等所有的人到齐了才会开会如下
public class CyclicBarrierTest {private static CyclicBarrier cyclicBarrier;static class CyclicBarrierThread extends Thread{public void run() {System.out.println(Thread.currentThread().getName() 到了);//等待try {cyclicBarrier.await();} catch (Exception e) {e.printStackTrace();}}}public static void main(String[] args){cyclicBarrier new CyclicBarrier(5, new Runnable() {Overridepublic void run() {System.out.println(人到齐了开会吧....);}});for(int i 0 ; i 5 ; i){new CyclicBarrierThread().start();}}
}结果 2、使用了一个缓存线程执行18个任务任务要求每次处理的线程达到6个时候就释放一次不足6个等待。
Testpublic void test1() {ExecutorService executorService Executors.newCachedThreadPool();CyclicBarrier cyclicBarrier new CyclicBarrier(6);for (int i 1; i 18; i) {executorService.execute(new Runnable() {Overridepublic void run() {System.out.println(Thread.currentThread().getName() 开始等待其他线程);try {cyclicBarrier.await();System.out.println(Thread.currentThread().getName() 开始执行业务逻辑耗时0.5秒);// 工作线程开始处理这里用Thread.sleep()来模拟业务处理Thread.sleep(500);System.out.println(Thread.currentThread().getName() 业务逻辑执行完毕);} catch (InterruptedException e) {e.printStackTrace();} catch (BrokenBarrierException e) {e.printStackTrace();}}});}executorService.shutdown();}结果