在Java的并发包中,Semaphore类表示信号量。Semaphore内部主要通过AQS(AbstractQueuedSynchronizer)实现线程的管理。Semaphore有两个构造函数,参数permits表示许可数,它最后传递给了AQS的state值。线程在运行时首先获取许可,

如果成功,许可数就减1,线程运行,当线程运行结束就释放许可,许可数就加1。

如果许可数为0,则获取失败,线程位于AQS的等待队列中,它会被其它释放许可的线程唤醒。在创建Semaphore对象的时候还可以指定它的公平性。

一般常用非公平的信号量,非公平信号量是指在获取许可时先尝试获取许可,

而不必关心是否已有需要获取许可的线程位于等待队列中,如果获取失败,才会入列。

而公平的信号量在获取许可时首先要查看等待队列中是否已有线程,如果有则入列。

先看测试案例:

package com.cxy.cyclicBarrier;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit; /**
* Created by Administrator on 2017/4/10.
*/
public class CxyDemo {
private final static int threadCount = ; public static void main(String[] args) throws Exception { ExecutorService exec = Executors.newCachedThreadPool(); final Semaphore semaphore = new Semaphore(); for (int i = ; i < threadCount; i++) {
System.out.println(i+"----------------------");
final int threadNum = i;
exec.execute(() -> {
try {
if (semaphore.tryAcquire(, TimeUnit.MILLISECONDS)) { // 尝试获取一个许可
test(threadNum);
semaphore.release(); // 释放一个许可
}
} catch (Exception e) {
// log.error("exception" , e);
System.out.println(e);
}
});
}
exec.shutdown();
} private static void test(int threadNum) throws Exception {
// log.info("{}" , threadNum);
System.out.println("a"+threadNum);
Thread.sleep();
} }

执行结果:

源码分析:

构造方法:

 public Semaphore(int permits) {
sync = new NonfairSync(permits);
} /**
* Creates a {@code Semaphore} with the given number of
* permits and the given fairness setting.
*
* @param permits the initial number of permits available.
* This value may be negative, in which case releases
* must occur before any acquires will be granted.
* @param fair {@code true} if this semaphore will guarantee
* first-in first-out granting of permits under contention,
* else {@code false}
*/
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

第一个构造方法:是允许的信号量

第二个,里面传入的boolean参数,采用的是公平锁还是分公平锁

tryAcquire源码
 public boolean tryAcquire(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(, unit.toNanos(timeout));
} public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquireShared(arg) >= ||
doAcquireSharedNanos(arg, nanosTimeout);
} private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= ) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return true;
}
}
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L)
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
release源码
    public void release() {
sync.releaseShared();
} public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
} private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, ))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == &&
!compareAndSetWaitStatus(h, , Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}

最新文章

  1. No matching provisioning profile found: Your build settings specify a provis...
  2. net不安装Oracle11g客户端直接使用ODAC
  3. strcmp函数使用总结
  4. Sencha Toucha之Ext.Ajax
  5. DB2 的create or update方法
  6. 建表的sql
  7. JSON特殊字符处理
  8. Servlet &amp; JSP - Cookie
  9. 【JQuery学习历程】2.JQuery选择器
  10. [转]Ubuntu Tweak 0.8.7 发布:支持 Ubuntu 14.04
  11. 本地化SilverLight应用程序(多语言支持)
  12. POJ 2234 Matches Game 尼姆博弈
  13. linux的学习系列 8---进程管理
  14. Java对象序列化
  15. CCF认证考试——折点计数
  16. HashPayloadPcapReader
  17. Linux文件管理笔记
  18. MySQL Server and Server-Startup Programs
  19. 有用的git命令
  20. MYSQL判断不存在时创建表或创建数据库

热门文章

  1. 【原】Coursera—Andrew Ng机器学习—课程笔记 Lecture 5 Octave Tutorial—5.4 绘制数据图
  2. array_splice()函数 ,删除数组中的某个值
  3. php扩展开发2--添加类
  4. Java 基于quartz实现定时 之二(XML方式配置)
  5. 【转】LVS/Nginx如何处理session问题
  6. Shell +Cygwinterminal+WinMySQL 传参数授权
  7. Django-----restframework图解
  8. bootstrap列排序
  9. [GO]随机数的使用
  10. The Three Models of ASP.NET MVC Apps