CyclicBarrier中文翻译是循环栏栅,用在并发中,可以理解为它要求指定数量的线程必须都到达某个指定位置时才能往下走。为什么叫循环,因为这个栏栅可以多次循环使用,一次放开栏栅后,所有线程都会通过。然后关闭栏栅,后面就可以接着使用。
生活中的例子是,比如一个团队准备去爬山,约定在某个时间地点,要每个成员都到达才会出发。先到达的只能在集合地等待了,只要最后一个成员到达,大家才会出发。代码例子如下:
public static void main(String[] args) throws InterruptedException {CyclicBarrier barrier = new CyclicBarrier(2);Thread t1 = new Thread(()->{System.out.println("小明到达了集合地");try {barrier.await();} catch (Exception e) {e.printStackTrace();}System.out.println("小明出发爬山");});Thread t2 = new Thread(()->{System.out.println("小张到达了集合地");try {barrier.await();} catch (Exception e) {e.printStackTrace();}System.out.println("小张出发爬山");});t1.start();Thread.sleep(1000); t2.start();t1.join();t2.join();System.out.println("大部队出发爬山");}
运行结果:
小明到达了集合地
小张到达了集合地
小张出发爬山
小明出发爬山
大部队出发爬山
分析源码之前我们整理下通过栏栅的情况,大概有以下四种:
从以下代码中看到,加锁使用了重入锁,等待通过栏栅使用了Condition
public class CyclicBarrier {/** 由于是循环栏栅, Generation的broken用来记录每次栏栅是否被破坏*/private static class Generation {boolean broken = false;}/** 进入栏栅需要加锁 */private final ReentrantLock lock = new ReentrantLock();/** 等待栏栅通过的线程 */private final Condition trip = wCondition();/** 阻拦的线程数,到达这个数量就能通过栏栅 */private final int parties;/** 栏栅放开后运行的任务,可为空 */private final Runnable barrierCommand;/** The current generation */private Generation generation = new Generation();/** 剩余等待通过栏栅的线程数 */private int count;public CyclicBarrier(int parties) {this(parties, null);}public CyclicBarrier(int parties, Runnable barrierAction) {if (parties <= 0) throw new IllegalArgumentException();this.parties = unt = parties;this.barrierCommand = barrierAction;}
}
要加入栏栅阻拦线程,我们只需要调用await()方法即可,而调用数量达到parties后,栏栅会自动放开。下面我们一起看看await()的源码。
public int await() throws InterruptedException, BrokenBarrierException {try {// 第一个参数传false说明不指定等待栏栅放开的时间,会无限时间等待return dowait(false, 0L);} catch (TimeoutException toe) {throw new Error(toe); // cannot happen}}private int dowait(boolean timed, long nanos)throws InterruptedException, BrokenBarrierException,TimeoutException {// 首先加重入锁,控制并发 final ReentrantLock lock = this.lock;lock.lock();try {final Generation g = generation;// 当前栏栅被破坏抛出异常if (g.broken)throw new BrokenBarrierException();// 如果当前线程被中断,那么放开栏栅,让栏栅处阻塞的线程往下执行if (Thread.interrupted()) {// 该方法会放开栏栅,并唤醒栏栅处的线程breakBarrier();throw new InterruptedException();}int index = --count;// index=0说明栏栅可以放开if (index == 0) { // trippedboolean ranAction = false;try {final Runnable command = barrierCommand;// 放开栏栅之前看是否需要执行给定的任务if (command != null)command.run();ranAction = true;// 该方法会放开栏栅,并唤醒栏栅处的线程,并且重置栏栅nextGeneration();return 0;} finally {if (!ranAction)breakBarrier();}}// loop until tripped, broken, interrupted, or timed out// 这里轮询直到栏栅放开for (;;) {try {if (!timed)trip.await();else if (nanos > 0L)nanos = trip.awaitNanos(nanos);} catch (InterruptedException ie) {if (g == generation && ! g.broken) {// 发生中断异常,并且栏栅没被破坏,那么就破坏栏栅breakBarrier();throw ie;} else {// 栏栅被重置,或者栏栅破坏了,就中断当前线程Thread.currentThread().interrupt();}}// 如果栏栅被中断,那么抛出异常if (g.broken)throw new BrokenBarrierException();// 正常情况返回当前的计数if (g != generation)return index;if (timed && nanos <= 0L) {breakBarrier();throw new TimeoutException();}}} finally {lock.unlock();}}/** 破坏栏栅,也会唤醒栏栅处的线程,破坏栏栅的方法没有重置栏栅,栏栅将不能再次使用 */private void breakBarrier() {generation.broken = true;count = parties;trip.signalAll();}/** 放开栏栅,并重置栏栅,下次就可以 */private void nextGeneration() {// 唤醒在栏栅处的线程trip.signalAll();// set up next generationcount = parties;// 重置栏栅generation = new Generation();}
从以上代码分析中发现,如果栏栅被中断或其他异常破坏,那么阻塞的线程会抛出异常,栏栅也不能再次使用。只有正常通过的栏栅才能够循环使用。
重置逻辑比较简单,首先加锁,然后调用breakBarrier()破坏栏栅,最后再nextGeneration()重置generation。
public void reset() {final ReentrantLock lock = this.lock;lock.lock();try {breakBarrier(); // break the current generationnextGeneration(); // start a new generation} finally {lock.unlock();}}
CyclicBarrier用在多线程的统一行动上,在某个位置拦住线程,必须等待指定数量的线程到达后才会放开阻碍。
注意的是如果栏栅被异常破坏了,那么栏栅将不能循环再次使用了。
本文发布于:2024-01-28 05:56:55,感谢您对本站的认可!
本文链接:https://www.4u4v.net/it/17063926195291.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
留言与评论(共有 0 条评论) |