一种允许多个线程全部等待彼此都到达某个屏障的同步机制

使用

多个线程并发执行同一个CyclicBarrier实例的await方法时,每个线程执行这个方法后,都会被暂停,只有当最后一个线程执行完await方法时,它自身不会暂停,且会唤醒所有等待线程。因为CyclicBarrier内部维护了一个显式锁,它可以识别最后一个执行线程。

CyclicBarrier内部维护一个trip变量来实现等待/通知,所有除最后一个线程的保护条件都是“当前generation中,仍未执行的线程数大于0”,这个仍未执行的线程数默认为总参与线程的数量,await方法每执行一次,这个数量递减1.

使用场景

模拟高并发,或放在一个循环中,使得当前迭代操作的结果作为下一个迭代的基础输入;不然可以直接使用Thread.join()或CoutDwonLatch

问题:

1.可以设置多个屏障吗:可以,以generation区分(cyclic)

示例:
class MyThread extends Thread {
private CyclicBarrier cb;
public MyThread(String name, CyclicBarrier cb) {
super(name);
this.cb = cb;
} public void run() {
System.out.println(Thread.currentThread().getName() + " going to await");
try {
cb.await();
System.out.println(Thread.currentThread().getName() + " continue");
} catch (Exception e) {
e.printStackTrace();
}
}
}
public class CyclicBarrierDemo {
public static void main(String[] args) throws InterruptedException, BrokenBarrierException {
CyclicBarrier cb = new CyclicBarrier(3, new Thread("barrierAction") {
public void run() {
System.out.println(Thread.currentThread().getName() + " barrier action"); }
});
MyThread t1 = new MyThread("t1", cb);
MyThread t2 = new MyThread("t2", cb);
t1.start();
t2.start();
System.out.println(Thread.currentThread().getName() + " going to await");
cb.await();
System.out.println(Thread.currentThread().getName() + " continue");

}
}

源码阅读

public class CyclicBarrier {
//屏障的每一次使用都代表一代示例
private static class Generation {
//屏障是否被破坏
boolean broken = false;
}
//防止线程跳过屏障的锁 默认非公平锁
private final ReentrantLock lock = new ReentrantLock();
//等待线程跳过屏障的条件
private final Condition trip = lock.newCondition();
//参与线程数量
private final int parties;
//跳过屏障后执行的指令
private final Runnable barrierCommand;
//当前阶段????
private Generation generation = new Generation();
//每一阶段仍在等待的线程
private int count; //barrierAction:跳过屏障后执行的指令
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
//线程到达屏障 则等待:外部等待调用方法
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
/**
* 执行过程:
* 加锁 -> 判断屏障是否被破坏(抛异常) -> 判断线程是否被中断(毁坏屏障,抛异常) ->
* 判断是否仍有等待线程(N -> 执行屏障命令; Y -> 进入屏障等待) -> 解锁
*/
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
//锁住当前线程
lock.lock();
try {
final Generation g = generation;
if (g.broken)
throw new BrokenBarrierException();
//线程被中断 唤醒所有等待线程 损坏当前屏障 抛出异常
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
//获得并减少正在等待进入屏障的线程个数
int index = --count;
if (index == 0) { // 全部进入屏障了 执行后续命令
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
//唤醒所有等待线程 进入下一代
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
// 自旋直到跳过屏障/中断/超时 停止
for (;;) {
try {
//没有设置时间 直接等待
if (!timed)
              //线程等待
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
//不是当前代的 返回索引
if (g != generation)
return index;
//设置了等待时间且时间小于等于0
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
//条件:所有线程处于等待状态
private void nextGeneration() {
// 唤醒所有线程
trip.signalAll();
// 恢复等待线程数
count = parties;
generation = new Generation();
}
}

最新文章

  1. Execl数据导入sql server方法
  2. 最全的 JavaScript 知识总结
  3. IE浏览器中Image对象onload失效的解决办法
  4. centos7 tomcat service 自启动
  5. [Android]解决ClickableSpan中点击后ListView中item的长按冲突的问题
  6. c# select标签绑定枚举,并以Description做Text显示
  7. php redis 代码实例
  8. rqnoj-342-最不听话的机器人-dp
  9. [python]实现单机版一行wordcount
  10. 用Delphi创建服务程序
  11. POJ Countries in War 3114
  12. TMS320F28335项目开发记录5_28335之CCS编程基础
  13. js动态获取时间的方式
  14. cfDNA基本知识
  15. Application 、Cookie和 Session 两种会话有什么不同
  16. PAT A1103 Integer Factorization (30 分)——dfs,递归
  17. ARIMA模型识别、计算p、q值
  18. mysql 不区分大小写的解决
  19. MyEclipse10.7安装Aptana后重启:An internal error has occurred. No more handles [Could not detect registered XULRunner to use]
  20. Swift3 根据秒数获取视频时长(转换成00:00:00时间格式)以及将时长转换成秒

热门文章

  1. Java排序--排序算法(内排序)
  2. IdentitiServser4 + Mysql实现Authorization Server
  3. 对xxl-job进行simpleTrigger并动态创建任务扩展
  4. Js阻止冒泡,Vue中如何阻止冒泡事件
  5. weex 轮播如何使用?
  6. CSS-百分百布局
  7. spring cloud 入门
  8. java 枚举enum的使用(与在switch中的使用)
  9. VUE 从零开始 学习笔记 一
  10. 多线程编程-- part 9 信号量:Semaphore