1. CyclicBarrier的介绍与源码分析

CyclicBarrier 的字面意思是可循环(Cyclic)使用的屏障(Barrier)。它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活。线程进入屏障通过CyclicBarrier的await()方法。

CyclicBarrier默认的构造方法是CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每个线程调用await方法告诉CyclicBarrier我已经到达了屏障,然后当前线程被阻塞。

CyclicBarrier还提供一个更高级的构造函数CyclicBarrier(int parties, Runnable barrierAction),用于在线程到达屏障时,优先执行barrierAction这个Runnable对象,方便处理更复杂的业务场景。

构造函数

public CyclicBarrier(int parties) {
this(parties, null);
}
public int getParties() {
return parties;
}

实现原理:在CyclicBarrier的内部定义了一个Lock对象,每当一个线程调用CyclicBarrier的await方法时,将剩余拦截的线程数减1,然后判断剩余拦截数是否为0,如果不是,进入Lock对象的条件队列等待。如果是,执行barrierAction对象的Runnable方法,然后将锁的条件队列中的所有线程放入锁等待队列中,这些线程会依次的获取锁、释放锁,接着先从await方法返回,再从CyclicBarrier的await方法中返回。

await源码

public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}

dowait源码

private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Generation g = generation; if (g.broken)
throw new BrokenBarrierException(); if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
} int index = --count;
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
} // loop until tripped, broken, interrupted, or timed out
for (;;) {
try {
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
} if (g.broken)
throw new BrokenBarrierException(); if (g != generation)
return index; if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}

当最后一个线程到达屏障点,也就是执行dowait方法时,会在return 0 返回之前调用finally块中的breakBarrier方法。

breakBarrier源代码

private void breakBarrier() {
generation.broken = true;
count = parties;
trip.signalAll();
}

CyclicBarrier主要用于一组线程之间的相互等待,而CountDownLatch一般用于一组线程等待另一组些线程。实际上可以通过CountDownLatch的countDown()和await()来实现CyclicBarrier的功能。即 CountDownLatch中的countDown()+await() = CyclicBarrier中的await()。注意:在一个线程中先调用countDown(),然后调用await()。

其它方法:CycliBarrier对象可以重复使用,重用之前应当调用CyclicBarrier对象的reset方法。

reset源码

public void reset() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
breakBarrier(); // break the current generation
nextGeneration(); // start a new generation
} finally {
lock.unlock();
}
}

2. 使用示例

package javalearning;

import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; public class CyclicBarrierDemo {
private CyclicBarrier cb = new CyclicBarrier(4);
private Random rnd = new Random(); class TaskDemo implements Runnable{
private String id;
TaskDemo(String id){
this.id = id;
}
@Override
public void run(){
try {
Thread.sleep(rnd.nextInt(1000));
System.out.println("Thread " + id + " will wait");
cb.await();
System.out.println("-------Thread " + id + " is over");
} catch (InterruptedException e) {
} catch (BrokenBarrierException e) {
}
}
} public static void main(String[] args){
CyclicBarrierDemo cbd = new CyclicBarrierDemo();
ExecutorService es = Executors.newCachedThreadPool();
es.submit(cbd.new TaskDemo("a"));
es.submit(cbd.new TaskDemo("b"));
es.submit(cbd.new TaskDemo("c"));
es.submit(cbd.new TaskDemo("d"));
es.shutdown();
}
}

在这个示例中,我们创建了四个线程a、b、c、d,这四个线程提交给了线程池。四个线程不同时间到达cb.await()语句,当四个线程都输出“Thread x will wait”以后才会输出“Thread x is over”。

运行结果

Thread d will wait

Thread a will wait

Thread c will wait

Thread b will wait

-------Thread b is over

-------Thread d is over

-------Thread a is over

-------Thread c is over

3. 参考内容

[1] http://ifeve.com/concurrency-cyclicbarrier/

最新文章

  1. 个人整理的一些web前端面试题
  2. java查看当前项目所有线程列表界面
  3. ABAP 锁机制
  4. JavaScript学习之窗口
  5. Setup Tensorflow with GPU on Mac OSX 10.11
  6. Java基础知识强化33:String类之String类的获取功能
  7. HDOJ 1325 并查集
  8. 20170114 - Mac 向上一级文件夹快捷键
  9. runtime基础
  10. HDU 2489 Minimal Ratio Tree 最小生成树+DFS
  11. Angular4中路由Router类的跳转navigate
  12. SpringBoot集成redis,使用@Cachexxxx
  13. Bellman_ford模板
  14. Hibernate常见面试题(转)
  15. 20145221高其_MSF基础应用
  16. matlab生成滤波器系数组
  17. Jenkins权限控制-Role Strategy Plugin插件使用
  18. SqlServer日常积累(三)
  19. Ubuntu : 在主机和虚拟机之间传文件
  20. 「AH2017/HNOI2017」礼物

热门文章

  1. c++中的指针
  2. Spring笔记--0907
  3. Windows &amp; Office完美结合,助力办公
  4. border-width和border其它属性配合实现的小三角形标签效果
  5. C++ 非阻塞套接字的使用 (1)
  6. 使用的组件:JQuery UI
  7. 深入理解openstack网络架构(1)
  8. Oracle expdp/impdp导出导入命令及数据库备份
  9. js操作Dom的一些方法简化
  10. Demystifying ASP.NET MVC 5 Error Pages and Error Logging