一、Semaphore(信号量),底层基于AQS(AbstractQueuedSynchronizer)实现
使用:
/**
 * Demonstrates {@link Semaphore} as a concurrency limiter: at most {@code SH_SIZE} tasks hold a
 * permit at the same time (think of stalls in a restroom); every further task blocks in
 * {@code acquire()} until a running task releases its permit.
 *
 * @author Binglong
 * @date 2018-11-12
 */
public class SemaphoreUtils {
    public static void main(String[] args) {
        final int SH_SIZE = 10;
        Semaphore semaphore = new Semaphore(SH_SIZE);
        final int TH_NUM = 20;
        // Submit more tasks than there are permits so that some of them have to queue.
        for (int i = 0; i < TH_NUM; i++) {
            ThreadPoolUtils.getSingle().threadPoolDo(new TaskSemaphore(semaphore));
        }
    }
}

/**
 * Task that takes one permit, simulates work for up to ten seconds and gives the permit back.
 */
class TaskSemaphore implements Runnable {
    private final Semaphore semaphore;

    TaskSemaphore(Semaphore semaphore) {
        this.semaphore = semaphore;
    }

    @Override
    public void run() {
        String threadName = Thread.currentThread().getName();

        // Acquire OUTSIDE the try/finally that releases: if acquire() is interrupted no permit
        // was obtained, and releasing anyway would silently add a permit to the semaphore.
        try {
            semaphore.acquire();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt visible to the pool
            e.printStackTrace();
            return;
        }

        try {
            System.out.println(threadName + ":occupy...");
            Thread.sleep(new Random().nextInt(10000));
            System.out.println(threadName + ":over...");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            // The permit is held at this point, so it must always be returned.
            semaphore.release();
        }
    }
}

源码

package java.util.concurrent;
import java.util.*;
import java.util.concurrent.locks.*;
import java.util.concurrent.atomic.*; public class Semaphore implements java.io.Serializable {
// Counting semaphore. Every operation below delegates to the Sync object, which keeps the
// number of available permits in the AbstractQueuedSynchronizer state.
private static final long serialVersionUID = -3222578661600680210L; // the synchronizer type is an inner class (Sync)
private final Sync sync; abstract static class Sync extends AbstractQueuedSynchronizer {
// Common base of NonfairSync and FairSync.
private static final long serialVersionUID = 1192457210091910933L; Sync(int permits) {
// The initial permit count becomes the synchronizer state.
setState(permits);
} final int getPermits() {
// Available permits == current state.
return getState();
} final int nonfairTryAcquireShared(int acquires) {
// Retry loop. Returns a negative value when there are not enough permits,
// otherwise the number of permits left after a successful CAS.
for (;;) {
int available = getState();
int remaining = available - acquires;
// A negative remainder short-circuits the CAS, so the state is left unchanged on failure.
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
} protected final boolean tryReleaseShared(int releases) {
// Adds the released permits back to the state; retries until the CAS succeeds.
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
} final void reducePermits(int reductions) {
// Subtracts directly from the state; the result is not checked against zero here,
// only against integer wrap-around.
for (;;) {
int current = getState();
int next = current - reductions;
if (next > current) // underflow
throw new Error("Permit count underflow");
if (compareAndSetState(current, next))
return;
}
} final int drainPermits() {
// Sets the state to 0 and returns the value it replaced; returns 0 without a CAS
// when no permits are available.
for (;;) {
int current = getState();
if (current == 0 || compareAndSetState(current, 0))
return current;
}
}
} static final class NonfairSync extends Sync {
// Non-fair variant: acquisition goes straight to the CAS loop without looking at the queue.
private static final long serialVersionUID = -2694183684443567898L; NonfairSync(int permits) {
super(permits);
} protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
} static final class FairSync extends Sync {
// Fair variant: same CAS loop, but it gives up first whenever other threads are queued ahead.
private static final long serialVersionUID = 2014338818796000944L; FairSync(int permits) {
super(permits);
} protected int tryAcquireShared(int acquires) {
for (;;) {
// Checked on every iteration, before the state is even read.
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
} public Semaphore(int permits) {
// Single-argument constructor always uses the non-fair synchronizer.
sync = new NonfairSync(permits);
} public Semaphore(int permits, boolean fair) {
// fair == true selects FairSync, otherwise NonfairSync.
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
} public void acquire() throws InterruptedException {
// Acquires one permit through the interruptible shared-mode entry point of the synchronizer.
sync.acquireSharedInterruptibly(1);
} public void acquireUninterruptibly() {
// Acquires one permit through the non-interruptible shared-mode entry point.
sync.acquireShared(1);
} public boolean tryAcquire() {
// Calls the non-fair path directly, even when this semaphore was created with fair == true.
return sync.nonfairTryAcquireShared(1) >= 0;
} public boolean tryAcquire(long timeout, TimeUnit unit)
throws InterruptedException {
// The timeout is converted to nanoseconds before being handed to the synchronizer.
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
} public void release() {
// Returns one permit.
sync.releaseShared(1);
} public void acquire(int permits) throws InterruptedException {
// Multi-permit variants below reject negative counts with IllegalArgumentException.
if (permits < 0) throw new IllegalArgumentException();
sync.acquireSharedInterruptibly(permits);
} public void acquireUninterruptibly(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireShared(permits);
} public boolean tryAcquire(int permits) {
if (permits < 0) throw new IllegalArgumentException();
// Non-fair path, regardless of the fairness setting.
return sync.nonfairTryAcquireShared(permits) >= 0;
} public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
} public void release(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.releaseShared(permits);
} public int availablePermits() {
// Snapshot of the current state.
return sync.getPermits();
} public int drainPermits() {
// Takes all currently available permits and returns how many were taken.
return sync.drainPermits();
} protected void reducePermits(int reduction) {
// Shrinks the number of available permits by the given non-negative amount.
if (reduction < 0) throw new IllegalArgumentException();
sync.reducePermits(reduction);
} public boolean isFair() {
// Fairness is derived from the concrete synchronizer type, not stored separately.
return sync instanceof FairSync;
} public final boolean hasQueuedThreads() {
return sync.hasQueuedThreads();
} public final int getQueueLength() {
return sync.getQueueLength();
} protected Collection<Thread> getQueuedThreads() {
return sync.getQueuedThreads();
} public String toString() {
// Appends the current permit count to the default Object.toString() text.
return super.toString() + "[Permits = " + sync.getPermits() + "]";
}
}
二、CyclicBarrier
使用:
/**
 * CyclicBarrier demo: N worker threads each finish their own computation and then call
 * {@code await()}; once the last worker arrives, the barrier action runs and only afterwards
 * are all workers released.
 */
public static void main(String[] args) {
    // Create the barrier for N parties together with a barrier action.
    final int N = 5;
    final CyclicBarrier cyclic = new CyclicBarrier(N, new Runnable() {
        @Override
        public void run() {
            // Barrier action: runs once per trip, after the last worker has called await()
            // and before any waiting worker is released. It is executed by the worker that
            // arrived last, not by the main thread.
            try {
                System.out.println("汇总计算开始");
                Thread.sleep(10);
                System.out.println("汇总计算完成");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // do not swallow the interrupt
                e.printStackTrace();
            }
        }
    });

    for (int i = 0; i < N; i++) {
        final int t = i;
        new Thread(new Runnable() {
            @Override
            public void run() {
                // Each worker does its own part, then waits at the barrier until all N workers
                // have arrived and the barrier action has completed.
                try {
                    System.out.println(t + "中心数据已计算开始");
                    Thread.sleep(new Random().nextInt(10000));
                    System.out.println(t + "中心数据已计算结束");
                    cyclic.await();
                    System.out.println(t + "中心数据退出");
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    e.printStackTrace();
                } catch (Exception e) {
                    // Remaining failures of await(), e.g. a broken barrier.
                    e.printStackTrace();
                }
            }
        }).start();
    }
}
0中心数据已计算开始
3中心数据已计算开始
4中心数据已计算开始
2中心数据已计算开始
4中心数据已计算结束
1中心数据已计算开始
1中心数据已计算结束
3中心数据已计算结束
2中心数据已计算结束
0中心数据已计算结束
汇总计算开始
汇总计算完成
0中心数据退出
1中心数据退出
4中心数据退出
2中心数据退出
3中心数据退出
源码:
1.构造方法

// Creates a barrier for the given number of parties with no barrier action
// (delegates to the two-argument constructor with a null action).
public CyclicBarrier(int parties) {
this(parties, null);
}
// Creates a barrier for the given number of parties; barrierAction is stored as the command
// to run when the barrier trips. A non-positive party count is rejected.
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
// parties is the fixed size, count is the countdown that starts at the same value.
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
 
package java.util.concurrent;
import java.util.concurrent.locks.*; public class CyclicBarrier { private static class Generation {
// A Generation object stands for one use of the barrier; a fresh one is created each time
// the barrier trips or is reset. broken records that this use has failed.
boolean broken = false;
} /** The lock for guarding barrier entry */
private final ReentrantLock lock = new ReentrantLock();
/** Condition to wait on until tripped */
private final Condition trip = lock.newCondition();
/** The number of parties */
private final int parties;
/* The command to run when tripped */
private final Runnable barrierCommand;
/** The current generation */
private Generation generation = new Generation(); private int count; private void nextGeneration() {
// count (declared on the line above) is the number of parties still awaited in the
// current generation; it starts at parties and is decremented on every arrival.
// signal completion of last generation
trip.signalAll();
// set up next generation
count = parties;
generation = new Generation();
} private void breakBarrier() {
// Marks the current generation as broken, restores the countdown and wakes every waiter.
generation.broken = true;
count = parties;
trip.signalAll();
} private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException,
TimeoutException {
// Core of both await() variants. Returns the arrival index of the calling thread:
// parties - 1 for the first thread to arrive, 0 for the last.
final ReentrantLock lock = this.lock;
     // acquire the lock
lock.lock();
try {
final Generation g = generation; if (g.broken)
throw new BrokenBarrierException(); if (Thread.interrupted()) {
// An already-interrupted caller breaks the barrier for everybody before failing itself.
breakBarrier();
throw new InterruptedException();
} int index = --count;
if (index == 0) { // tripped
// Last arrival: run the barrier command in THIS thread, while still holding the lock,
// then start the next generation (which wakes the waiting threads).
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
nextGeneration();
return 0;
} finally {
// ranAction is still false only when command.run() threw; the barrier is then broken.
if (!ranAction)
breakBarrier();
}
} // loop until tripped, broken, interrupted, or timed out
for (;;) {
try {
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
// Interrupted while waiting in the still-current, unbroken generation: break it and fail.
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
} if (g.broken)
throw new BrokenBarrierException(); if (g != generation)
return index; if (timed && nanos <= 0L) {
// Timed wait expired before the barrier tripped: break it for all parties.
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
} public CyclicBarrier(int parties, Runnable barrierAction) {
// A non-positive party count is rejected; count starts at parties.
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
} public CyclicBarrier(int parties) {
// Barrier without a command to run on tripping.
this(parties, null);
} public int getParties() {
return parties;
} public int await() throws InterruptedException, BrokenBarrierException {
// Untimed wait; dowait is called with timed == false.
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen;
}
} public int await(long timeout, TimeUnit unit)
throws InterruptedException,
BrokenBarrierException,
TimeoutException {
// Timed wait; the timeout is converted to nanoseconds.
return dowait(true, unit.toNanos(timeout));
} public boolean isBroken() {
// Reads the broken flag of the current generation under the lock.
final ReentrantLock lock = this.lock;
lock.lock();
try {
return generation.broken;
} finally {
lock.unlock();
}
} public void reset() {
// Breaking first makes the threads waiting on the old generation fail with
// BrokenBarrierException; the new generation then starts unbroken with a full count.
final ReentrantLock lock = this.lock;
lock.lock();
try {
breakBarrier(); // break the current generation
nextGeneration(); // start a new generation
} finally {
lock.unlock();
}
} public int getNumberWaiting() {
// parties - count == number of parties that have already arrived in this generation.
final ReentrantLock lock = this.lock;
lock.lock();
try {
return parties - count;
} finally {
lock.unlock();
}
}
}
 
 
 
2.重要方法
a. await()方法:当调用await()方法的线程数量达到CyclicBarrier构造方法指定的parties(N)时,先由最后一个到达的线程执行构造方法传入的Runnable barrierAction(如果有),barrierAction执行完成后,所有等待的线程继续执行。调用await()的线程会一直等待,直到出现以下情况之一:
在CyclicBarrier上等待的线程数量达到parties,则所有线程被释放,继续执行。
当前线程被中断,则抛出InterruptedException异常,并停止等待,继续执行。
当前线程等待超时,则抛出TimeoutException异常,并停止等待,继续执行。
其他等待的线程被中断,则当前线程抛出BrokenBarrierException异常,并停止等待,继续执行。
其他等待的线程超时,则当前线程抛出BrokenBarrierException异常,并停止等待,继续执行。
其他线程调用CyclicBarrier.reset()方法,则当前线程抛出BrokenBarrierException异常,并停止等待,继续执行
public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException,TimeoutException {
// Timed variant: waits at most the given timeout, converted to nanoseconds for dowait.
return dowait(true, unit.toNanos(timeout));
} public int await() throws InterruptedException, BrokenBarrierException {
// Untimed variant: dowait is called with timed == false, so a TimeoutException is not
// expected here and is wrapped in an Error if it ever occurs.
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
b. getParties():获取CyclicBarrier需要等待的线程总数,也就是构造方法参数parties的值。
c. getNumberWaiting():获取当前已经到达屏障、正在等待的线程数量(即 parties - count)。
d. reset():重置屏障。
如果有正在等待的线程,则这些线程会抛出BrokenBarrierException异常,停止等待,继续执行。
随后开启新的一代(Generation):计数恢复为parties,是否破损标志位broken为false。
 
三、CountDownLatch
使用:
/**
 * Demonstrates {@link CountDownLatch}: every call to {@code countDown()} decrements the count
 * by one, and once it reaches zero all threads blocked in {@code await()} continue.
 *
 * <p>The same class can serve as a start gate: create the latch with a count of 1, let the
 * worker threads call {@code await()} and have the main thread call {@code countDown()}.
 *
 * @author Binglong
 * @date 2018-11-12
 */
public class CountDownLatchUtils {
    public static void main(String[] args) throws InterruptedException {
        final int SIZE = 20;
        CountDownLatch countDownLatch = new CountDownLatch(SIZE);
        for (int i = 0; i < SIZE; i++) {
            ThreadPoolUtils.getSingle().threadPoolDo(new TaskCountDownLatch(countDownLatch));
        }
        // NOTE(review): nothing in this demo calls countDownLatch.await(), so the main thread
        // does not actually block here; confirm whether an await() was intended.
        System.out.println("waiting.....");
        // Thread.sleep(10000);
        // countDownLatch.countDown();
    }
}

/**
 * Task that simulates up to ten seconds of work and then counts the shared latch down once.
 */
class TaskCountDownLatch implements Runnable {
    private final CountDownLatch countDownLatch;

    TaskCountDownLatch(CountDownLatch countDownLatch) {
        this.countDownLatch = countDownLatch;
    }

    @Override
    public void run() {
        String name = Thread.currentThread().getName();
        try {
            System.out.println(name + ":waiting.." + countDownLatch.getCount());
            // Simulated work of random length.
            Thread.sleep(new Random().nextInt(10000));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt visible to the pool
            e.printStackTrace();
            return; // as before, an interrupted task does not report ":over..."
        } finally {
            // Count down even when the task was interrupted; otherwise the latch could never
            // reach zero and any thread blocked in await() would wait forever.
            countDownLatch.countDown();
        }
        System.out.println(name + ":over...");
    }
}
 
1.使用
三个方法
CountDownLatch(int count):构造器中的计数值(count)。
 
void await():会一直阻塞当前线程,直到计数器的值为0(或当前线程被中断)
 
void countDown():计数减一
 
 
 
 
2.原理
CountDownLatch的源码由内部类Sync实现,而Sync继承自AQS(AbstractQueuedSynchronizer,抽象队列同步器)
private static final class Sync extends AbstractQueuedSynchronizer {
// Synchronizer behind CountDownLatch: the AQS state holds the remaining count.
private static final long serialVersionUID = 4982264981922014374L; Sync(int count) {
setState(count);
} int getCount() {
return getState();
} protected int tryAcquireShared(int acquires) {
// Succeeds (1) only once the count has reached zero; the acquires argument is not used.
return (getState() == 0) ? 1 : -1;
} protected boolean tryReleaseShared(int releases) {
// Decrements the count by exactly one per call (the releases argument is not used) and
// returns true only for the call that moves the count to zero.
for (;;) {
int c = getState();
// Already at zero: nothing to decrement.
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
} // Constructor
public CountDownLatch(int count) {
// A negative count is rejected; the count is handed to the Sync object as its initial state.
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
} // countDown method
public void countDown() {
// releaseShared is a method of AbstractQueuedSynchronizer
sync.releaseShared(1);
} // await method
public void await() throws InterruptedException {
// acquireSharedInterruptibly is a method of AbstractQueuedSynchronizer
sync.acquireSharedInterruptibly(1);
}
 

最新文章

  1. 获取移除指定Url参数(原创)
  2. iOS 设置铃声---加载音乐和音频然后进行播放
  3. 【解题报告】POJ-1106 Transmitters
  4. linux yum install resource - epel
  5. 移动平台的meta标签-----神奇的功效
  6. hibernate中的缓存机制
  7. 【转】IOS缓存机制详解
  8. 小型Mp3播放器
  9. TypeScript入门指南(JavaScript的超集)
  10. spring-定时器(1)
  11. [解决]Linux Tomcat启动慢--Creation of SecureRandom instance for session ID generation using [SHA1PRNG] took [236,325] milliseconds
  12. java之Spring(AOP)-Annotation实现添加切面
  13. slim.arg_scope中python技巧
  14. [数据结构] 希尔排序 C语言程序
  15. MPD软件工作坊上海站本周末在上海举行
  16. 第一天 hello world
  17. UVA - 11853 Paintball(dfs)
  18. 项目笔记---事半功倍之StyleCop(一)
  19. MongoDB学习笔记(二)--Capped集合 && GridFS存储文件
  20. 定义路由的state参数

热门文章

  1. SQL Server使用sys.master_files计算tempdb大小不正确
  2. PIC单片机基础1
  3. Swift 访问控制
  4. 【spring源码分析】IOC容器初始化(四)
  5. [解读REST] 3.基于网络应用的架构
  6. 《css世界》笔记之流、元素与基本尺寸
  7. JavaScript判断对象是否是NULL
  8. 安装maven,并配置eclipse
  9. (二)jdk8学习心得之Lambda表达式
  10. docker企业实战视频教程