一、简介

  Exchanger类允许在两个线程之间定义同步点,当两个线程都到达同步点时,它们交换数据。也就是第一个线程的数据进入到第二个线程中,第二线程的数据进入到第一个线程中。

  Exchanger可以用于校对工作的场景。

  Exchanger只有一个构造函数:

public Exchanger() {
participant = new Participant();
}

  这个类提供对外的接口非常简洁,两个重载的范型exchange方法:

// 除非当前线程被中断,否则一直等待另一个线程到达这个交换点,然后将交换的数据    x传输给它,并收到另一个线程传过来的数据。
public V exchange(V x) throws InterruptedException // 和上一个方法功能基本一样,只不过这个方法增加了等待超时时间
public V exchange(V x, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException

1.1、Exchanger源码详解

  Exchanger算法的核心是通过一个可交换数据的slot,以及一个可以带有数据item的参与者。源码中的描述如下:

for (;;) {
if (slot is empty) { // offer
place item in a Node;
if (can CAS slot from empty to node) {
wait for release;
return matching item in node;
}
}
else if (can CAS slot from node to empty) { // release
get the item in node;
set matching item in node;
release waiting thread;
}
// else retry on CAS failure
}

  Exchanger中定义了如下几个重要的成员变量:

/**
* Per-thread state
*/
private final Participant participant; /**
* Elimination array; null until enabled (within slotExchange).
* Element accesses use emulation of volatile gets and CAS.
*/
private volatile Node[] arena; /**
* Slot used until contention detected.
*/
private volatile Node slot;

participant的作用是为每个线程关联一个Node对象。Participant继承自ThreadLocal:

/** The corresponding thread local class */
static final class Participant extends ThreadLocal<Node> {
public Node initialValue() { return new Node(); }
}

Node类定义如下:

@sun.misc.Contended static final class Node {
    int index;              // Arena index
    int bound;              // Last recorded value of Exchanger.bound
    int collides;           // Number of CAS failures at current bound
    int hash;               // Pseudo-random for spins
    Object item;            // This thread's current item
    volatile Object match;  // Item provided by releasing thread
    volatile Thread parked; // Set to this thread when parked, else null
}

index:arena的下标,多个槽位的时候利用;
bound:上一次记录的Exchanger.bound;
collides:在当前bound下CAS失败的次数;
hash:伪随机数,用于自旋;
item:这个线程的当前项,也就是需要交换的数据;
match:交换的数据;
parked:挂起时设置线程值,其他情况下为null;

看exchange(V x)方法。

exchange(V x)方法

如果一个线程先执行exchange方法,那么它会同步等待另一个线程也执行exchange方法,这个时候两个线程就都达到了同步点,两个线程就可以交换数据。该方法源码如下:

public V exchange(V x) throws InterruptedException {
Object v;
Object item = (x == null) ? NULL_ITEM : x; // translate null args
if ((arena != null ||
(v = slotExchange(item, false, 0L)) == null) &&
((Thread.interrupted() || // disambiguates null return
(v = arenaExchange(item, false, 0L)) == null)))
throw new InterruptedException();
return (v == NULL_ITEM) ? null : (V)v;
}

exchange(V x)方法主要步骤如下:

1. 判断程序要交换的数据是否为空指针,若为空指针,则将NULL_ITEM设置为要交换的数据,NULL_ITEM是一个来代替空指针的对象,它定义为:

private static final Object NULL_ITEM = new Object();

2. 若arena为null,则通过slotExchange(Object item, boolean timed, long ns)方法来交换数据;否则,若arena不为null,则运行下一步骤。

3. 判断程序中断状态,若程序没有被中断,则运行arenaExchange(Object item, boolean timed, long ns)方法来交换数据;否则,抛出InterruptedException异常。

4. 返回交换后的数据,若数据为NULL_ITEM,则将其转换为空指针null。

在整个过程中,最主要的就是那两个数据交换方法,我们先来看一看slotExchange(Object item, boolean timed, long ns)方法。

slotExchange(Object item, boolean timed, long ns)方法
该方法源码如下:

private final Object slotExchange(Object item, boolean timed, long ns) {
// 获取与线程相关联的Node对象
Node p = participant.get();
// 获取当前线程对象
Thread t = Thread.currentThread();
// 判断线程中断状态
if (t.isInterrupted()) // preserve interrupt status so caller can recheck
return null; // 进入自旋
for (Node q;;) {
// 如果slot不为null,表明已经有其他线程等待交换数据
if ((q = slot) != null) {
// 通过CAS交换数据信息,成功则返回交换数据
if (U.compareAndSwapObject(this, SLOT, q, null)) {
// 获取其他线程交换的数据
Object v = q.item;
// 槽位内值被改为参数item,这是等待线程需要的数据
q.match = item;
// 获取等待线程
Thread w = q.parked;
// 等待线程不为null,则将其唤醒
if (w != null)
U.unpark(w);
// 返回拿到的数据
return v;
}
// CAS操作失败,则创建arena用于竞争
if (NCPU > 1 && bound == 0 &&
U.compareAndSwapInt(this, BOUND, 0, SEQ))
arena = new Node[(FULL + 2) << ASHIFT];
}
// 如果arena不为null,方法返回null,随后进入arenaExchange方法
else if (arena != null)
return null; // caller must reroute to arenaExchange
// 否则,q(slot)为空,通过CAS尝试将slot设置为p,失败之后自旋重试,成功则跳出自旋,进入spin+block模式
else {
p.item = item;
// 将slot设置为占据该slot线程所对应的Node
if (U.compareAndSwapObject(this, SLOT, null, p))
break;
p.item = null;
}
} // 等待release
// 若exchange操作有时间限制,则先计算结束时间和自旋次数,进入自旋+阻塞
int h = p.hash;
long end = timed ? System.nanoTime() + ns : 0L;
int spins = (NCPU > 1) ? SPINS : 1;
Object v;
// 直到成功交换到数据
while ((v = p.match) == null) {
// 自旋
if (spins > 0) {
h ^= h << 1; h ^= h >>> 3; h ^= h << 10;
if (h == 0)
h = SPINS | (int)t.getId();
else if (h < 0 && (--spins & ((SPINS >>> 1) - 1)) == 0)
Thread.yield();
}
// 其它线程来交换数据了,另一个线程修改了solt,但是还没有设置match数据,这时可以再稍等一会
else if (slot != p)
spins = SPINS;
// 需要阻塞当前线程,等待其它线程来交换数据
else if (!t.isInterrupted() && arena == null &&
(!timed || (ns = end - System.nanoTime()) > 0L)) {
U.putObject(t, BLOCKER, this);
// 设置Node节点中的parked属性为当前线程,当其他线程要交换数据时,需要通过parked属性来唤醒该线程
p.parked = t;
// 阻塞当前线程
if (slot == p)
U.park(false, ns);
// 当前线程被唤醒之后,做一些清除操作
p.parked = null;
U.putObject(t, BLOCKER, null);
}
// 交换失败,重置slot
else if (U.compareAndSwapObject(this, SLOT, p, null)) {
v = timed && ns <= 0L && !t.isInterrupted() ? TIMED_OUT : null;
break;
}
}
// 清除match信息
U.putOrderedObject(p, MATCH, null);
p.item = null;
p.hash = h;
// 返回交换得到的数据,若失败,则返回值为null
return v;
}

slotExchange(Object item, boolean timed, long ns)方法的整个业务逻辑如下所示:

当一个线程来交换数据时,槽位(slot)有两种状态:

如果发现槽位(solt)没有数据,即slot为null,这时,当前线程就可以通过CAS操作占据slot,若CAS操作成功,则slot就已经被当前线程占据。如果失败,则有可能其他线程抢先了占据了slot,当前线程需要重头开始循环。占据slot成功的线程,需要等待其它线程来进行数据交换,此时,当前线程需要进行一段时间的自旋:

若在线程自旋期间,有其他线程来交换数据,则获取交换数据后,直接返回数据,而不用阻塞该进程。

若在线程自旋期间,没有其他线程来交换数据,那么就需要阻塞当前线程,在阻塞之前,还需要进行一次槽位判断,若槽位发生了变化,说明有其它线程来交换数据了,此时会延长当前线程的自旋时间,可能数据交换马上就完成;若槽位没有发生变化,则直接挂起当前线程,等待其他线程来交换数据,在另一个线程交换数据完成之后,另一个线程会唤醒与之配对交换的线程(即前面被挂起的线程),被唤醒的线程,继续执行,拿到交换的数据之后,直接返回,若出现了超时、被中断的情况,则返回值为null。

如果发现槽位(solt)已有数据,即slot不为null,这表明已经有其它线程占据了槽位,正在等待交换数据,那么当前线程就可以尝试进行数据交换,首先要通过CAS操作设置slot变量值,若CAS成功,则表示当前线程可以进行数据交换,否则,若CAS失败,则表示有其他线程抢先交换了数据,那这时,多个线程产生了竞争,然后,当前线程就会创建arena数组来避免竞争,用于后续的数据交换。

slotExchange(Object item, boolean timed, long ns)方法的整个执行步骤就是这样了,下面我们再看一看arenaExchange(Object item, boolean timed, long ns)方法的执行步骤。

arenaExchange(Object item, boolean timed, long ns)方法
在介绍该方法之前,我们需要先来了解一下与之相关的数据结构

在前面介绍的Node类,被加上了一个@sun.misc.Contended注解,这个是用来避免伪共享的,关于伪共享的详解,可以看这篇博客。在Exchanger类中,ASHIFT就是用来避免伪共享的:

/**
* The byte distance (as a shift value) between any two used slots
* in the arena. 1 << ASHIFT should be at least cacheline size.
*/
private static final int ASHIFT = 7;

对ASHIFT进行详细说明,下面看一看arenaExchange(Object item, boolean timed, long ns)方法:

private final Object arenaExchange(Object item, boolean timed, long ns) {
// Node数组,具有多个槽位
Node[] a = arena;
// 获取与线程相关联的Node对象
Node p = participant.get();
// p.index初始值为0
for (int i = p.index;;) { // access slot at i
int b, m, c; long j; // j is raw array offset
// 在数组中,根据索引i取出数据,j相当于该线程要访问的第一个槽位
Node q = (Node)U.getObjectVolatile(a, j = (i << ASHIFT) + ABASE);
// 该槽位有数据,即已经有线程在此槽位等待交换数据
if (q != null && U.compareAndSwapObject(a, j, q, null)) {
// 进行数据交换
Object v = q.item; // release
q.match = item;
Thread w = q.parked;
if (w != null)
U.unpark(w);
return v;
}
// bound是最大的有效位置,和MMASK相与,得到真正存储数据的最大索引值
// 如果i小于最大索引,且对应槽位为空
else if (i <= (m = (b = bound) & MMASK) && q == null) {
// 将需要交换的数据赋值给p
p.item = item; // offer
// 通过CAS来设置该槽位的数据,等待其他线程来交换
if (U.compareAndSwapObject(a, j, null, p)) {
long end = (timed && m == 0) ? System.nanoTime() + ns : 0L;
Thread t = Thread.currentThread(); // wait
// 自旋
for (int h = p.hash, spins = SPINS;;) {
Object v = p.match;
// 有其他线程来和该线程交换数据
if (v != null) {
U.putOrderedObject(p, MATCH, null);
p.item = null; // clear for next use
p.hash = h;
return v;
}
else if (spins > 0) {
h ^= h << 1; h ^= h >>> 3; h ^= h << 10; // xorshift
if (h == 0) // initialize hash
h = SPINS | (int)t.getId();
else if (h < 0 && // approx 50% true
(--spins & ((SPINS >>> 1) - 1)) == 0)
Thread.yield(); // two yields per wait
}
// 其它线程来交换数据了,另一个线程修改了槽位数据,但是还没有设置match数据,这时可以再稍等一会
else if (U.getObjectVolatile(a, j) != p)
spins = SPINS; // releaser hasn't set match yet
// m == 0表明已经到达arena数组中最小的存储数据槽位,当前线程需要阻塞在这里
else if (!t.isInterrupted() && m == 0 &&
(!timed ||
(ns = end - System.nanoTime()) > 0L)) {
U.putObject(t, BLOCKER, this); // emulate LockSupport
p.parked = t; // minimize window
// 再次检查槽位,看看在阻塞前,有没有线程来交换数据
if (U.getObjectVolatile(a, j) == p)
U.park(false, ns);
p.parked = null;
U.putObject(t, BLOCKER, null);
}
// 当前这个槽位一直没有线程来交换数据,可以换个槽位试试
else if (U.getObjectVolatile(a, j) == p &&
U.compareAndSwapObject(a, j, p, null)) {
// 更新bound
if (m != 0) // try to shrink
U.compareAndSwapInt(this, BOUND, b, b + SEQ - 1);
p.item = null;
p.hash = h;
// 减小索引值,往最小的存储数据槽位的方向挪动
i = p.index >>>= 1; // descend
if (Thread.interrupted())
return null;
// 超时
if (timed && m == 0 && ns <= 0L)
return TIMED_OUT;
break; // expired; restart
}
}
}
// 占据槽位失败
else
p.item = null; // clear offer
}
// i不在有效索引范围内,或者对应槽位已经被其它线程抢先交换了
else {
// 更新p.bound
if (p.bound != b) { // stale; reset
p.bound = b;
// bound的CAS失败次数初始为0
p.collides = 0;
// i如果到达最大值,就递减
i = (i != m || m == 0) ? m : m - 1;
}
else if ((c = p.collides) < m || m == FULL ||
!U.compareAndSwapInt(this, BOUND, b, b + SEQ + 1)) {
// 更新bound的CAS失败次数
p.collides = c + 1;
i = (i == 0) ? m : i - 1; // cyclically traverse
}
// 递增i
else
i = m + 1; // grow
// 更新index
p.index = i;
}
}
}

在slotExchange方法中,当存在竞争时,会创建arena数组:

arena = new Node[(FULL + 2) << ASHIFT];

在创建arena数组之前,会先设置bound为SEQ(SEQ=MMASK + 1),即bound的初始值为256:

/**
* The maximum supported arena index. The maximum allocatable
* arena size is MMASK + 1. Must be a power of two minus one, less
* than (1<<(31-ASHIFT)). The cap of 255 (0xff) more than suffices
* for the expected scaling limits of the main algorithms.
*/
private static final int MMASK = 0xff; /**
* Unit for sequence/version bits of bound field. Each successful
* change to the bound also adds SEQ.
*/
private static final int SEQ = MMASK + 1;

arena的大小为(FULL + 2) << ASHIFT,因为1 << ASHIFT 是用于避免伪共享的,因此实际有效的Node 只有FULL + 2 个。

然后通过以下代码来获取arena中的节点:

Node q = (Node)U.getObjectVolatile(a, j = (i << ASHIFT) + ABASE);

ABASE定义如下:

Class<?> ak = Node[].class;
// ABASE absorbs padding in front of element 0
ABASE = U.arrayBaseOffset(ak) + (1 << ASHIFT);

U.arrayBaseOffset(ak)就是计算存放第一个元素的内存地址,相对于数组对象起始地址的内存偏移量。可以看出,ABASE是计算出了一个新的起始地址,其前面的(1 << ASHIFT)位置都没有被利用。

当一个线程来交换数据时,若计算出的槽位索引有效,那对应的槽位有两种状态:

如果发现槽位(solt)没有数据,即slot为null,这时,当前线程就可以通过CAS操作占据slot,

若CAS操作成功
则slot就已经被当前线程占据。然后该线程会采用自旋+阻塞的方式进行等待交换数据。只有当槽位是第一个(m==0,i <= m)时,线程才会阻塞,否则,若长时间没有其他线程来交换数据,当前线程会换个槽位等待,首先,线程会将旧槽位的值通过CAS置为null,然后更新bound,索引值减半(i = p.index >>>= 1),如果设置了超时,需要进行超时判断,若发生超时,则直接返回。

若CAS操作失败
则有可能其他线程抢先了占据了slot,则将p.item设置为null,重新自旋。

如果发现槽位(solt)已有数据,即slot不为null,这表明已经有其它线程占据了槽位,正在等待交换数据,那么当前线程就可以尝试进行数据交换,首先要通过CAS操作设置slot变量值,若CAS成功,则表示当前线程可以进行数据交换,否则,若CAS失败,则表示有其他线程抢先交换了数据,那这时,多个线程产生了竞争,那么就更新bound和p.index。

arenaExchange(Object item, boolean timed, long ns)方法的运行逻辑总结如下:

当一个线程来交换的时候,如果”第一个”槽位是空的,那么自己就在那里等待,如果发现”第一个”槽位有等待线程,那么就直接交换,如果交换失败,说明其它线程在进行交换,那么就往后挪一个槽位,如果有数据就交换,没数据就等一会,但是不会阻塞在这里,在这里等了一会,发现还没有其它线程来交换数据,那么就往“第一个”槽位的方向挪,如果反复这样过后,挪到了第一个槽位,没有线程来交换数据了,那么自己就在”第一个”槽位阻塞等待。 第一个槽位并不是指的数组中的第一个,而是逻辑第一个,因为存在伪共享,多槽位中,部分空间没有被利用。

最新文章

  1. 【C语言入门教程】7.3 结构体指针的定义和引用
  2. 21SpringMvc_异步发送表单数据到Bean,并响应JSON文本返回(这篇可能是最重要的一篇了)
  3. [改善Java代码]使用匿名类的构造函数
  4. Redis参数配置和运维说明
  5. vue数组语法兼容问题
  6. 阿里云服务器云数据库免费体验(Java Web详细实例)
  7. 贪吃蛇游戏——C语言双向链表实现
  8. 论C++的发家史以及相对其他语言优缺
  9. int main(int argc,char *argv[])与int main(int argc,char **argv)区别?
  10. Js_protoType_原型
  11. 文件IO(2)
  12. java同步代码(synchronized)中使用BlockingQueue
  13. Python——errno
  14. Disruptor学习笔记(一):基本原理和概念
  15. 一步一步教你使用 LSMW 批量处理数据
  16. 安装完Linux Mint后,发现系统中竟没有中文输入法
  17. mybatis 高级映射 简单例子
  18. 【beta】Scrum站立会议第3次....11.6
  19. libxml2 在mingw中 xmlfree连接错误问题
  20. 数据结构----线性表顺序和链式结构的使用(c)

热门文章

  1. sudo 权限的管理
  2. iptables详解(5)iptables的icmp扩展
  3. Linux CPU问题排查
  4. C#一些不太熟悉的类——扩展学习
  5. Codeforces 1206 D - Shortest Cycle
  6. P2085 最小函数值[优先队列]
  7. 小知识——c++关于指针的理解
  8. XSLT可扩展样式表语言转换 System.Xml.Xsl、XslCompiledTransform类
  9. 并发编程:Thread和Runable-01
  10. javascript权威指南第22章高级技巧