并发容器学习—ConcurrentSkipListMap与ConcurrentSkipListSet 原
2024-10-13 19:31:24
一、ConcurrentSkipListMap并发容器
1.ConcurrentSkipListMap的底层数据结构
要学习ConcurrentSkipListMap,首先要知道什么是跳表或跳跃表(SkipList),跳表链表的升级版,且是有序的,另外跳表还是一种不论查找、添加或是删除效率可以和平衡二叉树媲美的层次结构(其时间复杂度是O(n)=log2n),更重要的是,跳表的实现要简单明了的多。
上图就是一个跳表的层次结构图,跳表的特征有:
1.跳表分为若干层,层级越高跳跃性越大。
2.跳表的最底层的链表包含所有数据。
3.跳表是有序的。
4.跳表的头结点永远指向最高层的第一个元素。
下面在来看看跳表的索引结点的结构:一般分为三个区域:两个个指针域和一个数据存储区域;两个指针域分别是指向同层级下个索引的的next指针,和指向下个层级拥有相同数据的的down指针,如下图所示。
跳表的查找相对简单明了的多,从最高层的头指针开始查询,例如下图查找14,从最高层3开始找,14大于3,找下个22,14小于22,说明最高层没有14这个数。进入下一层,14比8大,比22小说明这一层也没有该元素,在进入下一层。比8大往后找,找到14。
下图展示了14的查找路线,红色线条表示对比后比14大,不进入索引的线路。
2.底层跳表的实现
最底层链表结点Node的定义:
static final class Node<K,V> {
final K key; //存储键
volatile Object value; //存储值
volatile Node<K,V> next; //下个结点
Node(K key, Object value, Node<K,V> next) {
this.key = key;
this.value = value;
this.next = next;
}
Node(Node<K,V> next) {
this.key = null;
this.value = this;
this.next = next;
}
//不安全的操作变量,用于实现CAS操作
private static final sun.misc.Unsafe UNSAFE;
//value的内存地址偏移量
private static final long valueOffset;
//下个结点的内存地址偏移量
private static final long nextOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = Node.class;
valueOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("value"));
nextOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("next"));
} catch (Exception e) {
throw new Error(e);
}
}
}
Index的定义:
static class Index<K,V> {
final Node<K,V> node; //链表中结点的引用
final Index<K,V> down; //指向下一层的Index索引
volatile Index<K,V> right; //右边的索引
Index(Node<K,V> node, Index<K,V> down, Index<K,V> right) {
this.node = node;
this.down = down;
this.right = right;
}
//不安全的操作变量
private static final sun.misc.Unsafe UNSAFE;
private static final long rightOffset; //right的内存偏移量
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = Index.class;
rightOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("right"));
} catch (Exception e) {
throw new Error(e);
}
}
}
HeadIndex是专门用在每个层级的头结点上的定义,它是继承了Index的基础上增加一个代表层级的level属性:
static final class HeadIndex<K,V> extends Index<K,V> {
final int level; //层级索引
HeadIndex(Node<K,V> node, Index<K,V> down, Index<K,V> right, int level) {
super(node, down, right);
this.level = level;
}
}
HeadIndex、Index和Node之间的联系,如下图所示:
3.ConcurrentSkipListMap的继承关系
ConcurrentSkipListMap的继承关系如下图所示,它继承了AbstractMap类,并且实现了 ConcurrentNavigableMap和ConcurrentMap接口。由此可知,ConcurrentSkipListMap必然是个有序的、并发操作安全的、具有伸缩视图功能的Map集合。
之前数据结构及源码的学习过程中,上图的大部分接口及抽象类都已经学习过,仅有一个 ConcurrentNavigableMap接口没有了解过:
public interface ConcurrentNavigableMap<K,V>
extends ConcurrentMap<K,V>, NavigableMap<K,V>
{
//返回当前map的子map,即将fromKey到toKey的子集合截取出来,fromInclusive和toInclusive
//表示子集合中是否包含fromKey和toKey这两个临界键值对
ConcurrentNavigableMap<K,V> subMap(K fromKey, boolean fromInclusive,
K toKey, boolean toInclusive);
//返回不大于toKey的子集合,是否包含toKey由inclusive决定
ConcurrentNavigableMap<K,V> headMap(K toKey, boolean inclusive);
//返回不小于fromKey的子集合,是否包含fromKey由inclusive决定
ConcurrentNavigableMap<K,V> tailMap(K fromKey, boolean inclusive);
//返回大约fromKey,且小于等于toKey的子集合
ConcurrentNavigableMap<K,V> subMap(K fromKey, K toKey);
//返回小于toKey的子集合
ConcurrentNavigableMap<K,V> headMap(K toKey);
//返回大于等于fromKey的子集合
ConcurrentNavigableMap<K,V> tailMap(K fromKey);
//返回一个排序是反序的map(即将原本正序的Map进行反序排列)
ConcurrentNavigableMap<K,V> descendingMap();
//返回所有key组成的set集合
public NavigableSet<K> navigableKeySet();
//返回所有key组成的set集合
NavigableSet<K> keySet();
//返回一个key的set视图,并且这个set中key是反序的
public NavigableSet<K> descendingKeySet();
}
了解了接口,再来看ConcurrentSkipListMap的构造方法及一些重要的属性:
public class ConcurrentSkipListMap<K,V> extends AbstractMap<K,V>
implements ConcurrentNavigableMap<K,V>, Cloneable, Serializable {
//特殊值,用于
private static final Object BASE_HEADER = new Object();
//跳表的头索引
private transient volatile HeadIndex<K,V> head;
//比较器,用于key的比较,若为null,则使用自然排序(key自带的Comparable实现)
final Comparator<? super K> comparator;
//键的视图
private transient KeySet<K> keySet;
//键值对的视图
private transient EntrySet<K,V> entrySet;
//值的视图
private transient Values<V> values;
//反序排列的map
private transient ConcurrentNavigableMap<K,V> descendingMap;
private static final int EQ = 1;
private static final int LT = 2;
private static final int GT = 0;
//空构造
public ConcurrentSkipListMap() {
this.comparator = null;
initialize();
}
//带比较器的构造方法
public ConcurrentSkipListMap(Comparator<? super K> comparator) {
this.comparator = comparator;
initialize();
}
//带初始元素集合的构造方法
public ConcurrentSkipListMap(Map<? extends K, ? extends V> m) {
this.comparator = null;
initialize();
putAll(m);
}
//使用m集合的比较器构造方法
public ConcurrentSkipListMap(SortedMap<K, ? extends V> m) {
this.comparator = m.comparator();
initialize();
buildFromSorted(m);
}
//初始化变量
private void initialize() {
keySet = null;
entrySet = null;
values = null;
descendingMap = null;
//新建HeadIndex结点,head指向该头结点
head = new HeadIndex<K,V>(new Node<K,V>(null, BASE_HEADER, null),
null, null, 1);
}
}
4.put的过程
public V put(K key, V value) {
if (value == null) //这里可以知道Value要求不能为null
throw new NullPointerException();
return doPut(key, value, false); //实际执行put操作的方法
}
private V doPut(K key, V value, boolean onlyIfAbsent) {
Node<K,V> z; // added node
if (key == null) //有序的map,key不能为null,否则无法比较
throw new NullPointerException();
Comparator<? super K> cmp = comparator; //获取比较器,若无则为null,使用自然排序
outer: for (;;) { //外层循环
//findPredecessor查找key的前驱索引对应的结点b
for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
//判断b结点是否是链表的尾结点,即b结点是否还有后继结点
//若有后继结点(n不为null表示有后继结点),那么要将key对应的结点插入b和n之间
//若b为尾结点,则key对应结点将为新的尾结点
if (n != null) {
Object v; int c;
Node<K,V> f = n.next; //获取n的后继结点
//判断n还是不是b的后继结点
//若n不在是b的后继结点,说明已经有线程抢先在b结点之后插入一个新结点
//退出内层循环,重新定位插入
if (n != b.next)
break;
//判断n结点是否要被删除(value为null表示逻辑删除),辅助删除后重新定位key
if ((v = n.value) == null) { // n is deleted
n.helpDelete(b, f); //让当前线程去辅助删除(物理删除)
break;
}
//判断b是否要被删除,若要被删除,则退出当前循环,重新定位key
if (b.value == null || v == n) // b is deleted
break;
//判断n结点的键与key的大小
//这里开始精准定位key在链表中的位置,找出key实际在链表中的前驱结点
//key比n结点的key大,则继续比较后续结点的key
if ((c = cpr(cmp, key, n.key)) > 0) {
b = n;
n = f;
continue;
}
//若key与n.key相等(c==0),说明key对应的结点已经存在
//那么根据onlyIfAbsent来决定是否覆盖旧value值
if (c == 0) {
if (onlyIfAbsent || n.casValue(v, value)) {
@SuppressWarnings("unchecked") V vv = (V)v;
return vv; //更新成功,直接返回旧值
}
break; //更新失败继续循环尝试
}
// else c < 0; fall through
}
//到这说明b就是key的前驱结点
//新建key的结点z,并尝试更新b的next为z,成功就退出外层循环
//更新失败继续尝试
z = new Node<K,V>(key, value, n);
if (!b.casNext(n, z))
break; // restart if lost race to append to b
break outer;
}
}
/**
* 上面的步骤是将新结点插入到跳表最底层的链表中
* 接下来则是要向上生成各层的索引,是否生成某一层次的索引通过抛硬币的方式
* 决定,即从最底层开始随机决定是否在上一层建议索引,若为true则在上一层生成索引
* 并且继续抛硬币决定是否再往上层建立索引;若失败则布建立索引,直接结束
*/
//生成一个随机数
int rnd = ThreadLocalRandom.nextSecondarySeed();
//判断是否要更新层级(随机数必须是正偶数才更新层级)
//这里决定的是当前新增的结点是否要向上建立索引
if ((rnd & 0x80000001) == 0) { // test highest and lowest bits
int level = 1, max;
//决定level的等级,也就是结点要向上建立基层索引
//level的值有rnd从低二位开始有几个连续的1决定
//例如:rnd==0010 0100 0001 1110 那么level就为5
//由此可以看出跳表的最高层次不能超过31层(level<=31)
while (((rnd >>>= 1) & 1) != 0)
++level;
Index<K,V> idx = null;
HeadIndex<K,V> h = head;
//判断level是否超过当前跳表的最高等级(即是否大于头索引的level)
//若未超过,那么直接建立一个从1到level的纵向索引列
if (level <= (max = h.level)) {
//新建一个索引列,后建的索引的down指针指向前一个索引
for (int i = 1; i <= level; ++i)
idx = new Index<K,V>(z, idx, null);
}
//若新建结点的level大于头结点的level,则要新建一个新的level层
else { // try to grow by one level
level = max + 1; // 只能新增一层,为一个结点就新增2层以上没有意义
@SuppressWarnings("unchecked")
Index<K,V>[] idxs = (Index<K,V>[])new Index<?,?>[level+1];
//新建一个结点z的索引列,并将这个索引列的所有索引都放到数组中
for (int i = 1; i <= level; ++i)
idxs[i] = idx = new Index<K,V>(z, idx, null);
for (;;) {
h = head; //获取头索引
int oldLevel = h.level; //获取旧层次level
//判断旧层次level是否发生改变(即是否有其他线程抢先更新了跳表的level,出现竞争)
if (level <= oldLevel) // lost race to add level
break; //竞争失败,直接结束当前循环
HeadIndex<K,V> newh = h;
Node<K,V> oldbase = h.node;
//生成新的HeadIndex节点
//正常情况下,循环只会执行一次,如果由于其他线程的并发操作导致 oldLevel 的值不稳定,那么会执行多次循环体
for (int j = oldLevel+1; j <= level; ++j)
newh = new HeadIndex<K,V>(oldbase, newh, idxs[j], j);
//尝试更新头索引的值
if (casHead(h, newh)) {
h = newh;
idx = idxs[level = oldLevel];
break;
}
}
}
//将新增的idx插入跳表之中
splice: for (int insertionLevel = level;;) {
int j = h.level; //获取最高level
//查找需要清理的索引
for (Index<K,V> q = h, r = q.right, t www.michenggw.com= idx;;) {
//其他线程并发操作导致头结点被删除,直接退出外层循环
//这种情况发生的概率很小,除非并发量实在太大
if (q == null || t == null)
break splice;
//判断q是否为尾结点
if (r != null) {
Node<K,V> n = r.node; //获取索引r对应的结点n
// compare before deletion check avoids needing recheck
int c = cpr(cmp, key, n.key); //比较新增结点的key与n.key的大小
//判断n是否要被删除
if (n.value == null) {
if (!q.unlink(r)) //删除r索引
break;
r = q.right;
continue;
}
//c大于0,说明当前结点n.key小于新增结点的key
// 继续向右查找一个结点的key大于新增的结点key的索引
//新结点对应的索引要插入q,r之间
if (c > 0) {
q = r;
r = r.right;
continue;
}
}
//判断是否
if (j == insertionLevel) {
//尝试着将 t 插在 q 和 r 之间,如果失败了,退出内循环重试
if (!q.link(r, t))
break; // restart
//如果插入完成后,t 结点被删除了,那么结束插入操作
if (t.node.value == null) {
// 查找节点,查找过程中会删除需要删除的节点
findNode(key);
break splice;
}
//到达最底层,没有要插入新索引的索引层了
if (--insertionLevel == 0)
break splice;
}
//向下继续链接其它index 层
if (--j >= insertionLevel && j < level)
t = t.down;
q = q.down;
r = q.right;
}
}
}
return null;
}
//查找key在map中的位置的前驱结点
private Node<K,V> findPredecessor(Object key, Comparator<? super K> cmp) {
if (key == null)
throw new NullPointerException(); // don't postpone errors
for (;;) {
//获取head头索引,遍历跳表
for (Index<K,V> q = head, r = q.right, d;;) {
//判断是否有后继索引,若有
if (r != null) {
Node<K,V> n = r.node; //获取索引对应的结点
K k = n.key; //获取结点对应的key
//判断结点的value是够为null,为null表示结点已经被删除了
//一个结点从链表中被删除时,会将value赋为null,因为此时跳表其他层级的索引还可能
//会引用着该结点,value赋null,标识该结点已经被废弃,对应的索引(如果存在)也应该从跳表中删除
//那么调用unlink方法删除索引
if (n.value == null) {
//尝试将r索引移除出跳表,成功就继续查找
//失败就继续尝试
if (!q.unlink(r))
break; // restart
r = q.right; // reread r
continue;
}
//比较当前索引中结点的键k与key的大小
//若key大于k,则继续查找比较下个索引
if (cpr(cmp, key, k) > 0) {
q = r;
r = r.right; //下个右边索引
continue;
}
}
//若是key小于k,则判断是够到达跳表的最底层的上一层,
//若是到达最底层的上一层,则说明当前索引就是key的前驱索引,node就是对应的结点
//若不是最底层的上一层,则继续比较查找
if ((d = q.down) == null)
return q.node;
q = d;
r = d.right;
}
}
}
//Index中的方法,断开两个索引之间的联系
final boolean unlink(Index<K,V> succ) {
return node.value != null && casRight(succ, succ.right);
}
//CAS方式更新当前索引的右边索引
final boolean casRight(Index<K,V> cmp, Index<K,V> val) {
return UNSAFE.compareAndSwapObject(this, rightOffset, cmp, val);
}
//帮助将b结点从链表中删除
void helpDelete(Node<K,V> b, Node<K,V> f) {
//判断是否有其他线程已经将n删除(有竞争),若是已经删除直接返回
if (f == next && this == b.next) {
//判断f是否为null,即当前要删除的this结点是否为尾结点
//若this为尾结点,则尝试更新this的后继为new Node<K,V>(f),这里没看懂。。
if (f == null || f.value != f) // not already marked
casNext(f, new Node<K,V>(f));
else
b.casNext(this, f.next); //尝试将当前结点this的前驱结点b的next更新为this的后继结点f
}
}
//CAS方式更新当前结点的后继为val
boolean casNext(Node<K,V> cmp, Node<K,V> val) {
return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
上面doPut的过程代码比较多,总的来说干了这么几件事:1.查找确认key在跳表中的位置。2.清理跳表,并检查跳表是否被其他线程改变,若是改变重新定位key。3.判断key是否已经在跳表中存在,存在就根据onlyIfAbsent决定是否覆盖。4。将key生成的结点node插入底层链表中。5.决定是否创建key对应的索引列,索引列要建几层。6.最后将新增的索引列链接到跳表中。
5.get的过程
public V get(Object key) {
return doGet(key);
}
private V doGet(Object key) {
if (key == null)
throw new NullPointerException();
Comparator<? super K> cmp = comparator;
outer: for (;;) { //外层循环
//查找key的前驱索引对应的结点
for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
Object v; int c;
//n为null,说明b是为结点,则key不存在
if (n == null)
break outer;
Node<K,V> f = n.next;
//b的后继不为n,说明b的后继别其他线程改变,重新定位key
if (n != b.next) // inconsistent read
break;
//判断n是否已经被删除,若是则辅助删除,并重新定位key
if ((v = n.value) www.shengchanyule.com== null) { // n is deleted
n.helpDelete(b, f);
break;
}
//判断b是否已经被删除,若是则辅助删除,并重新定位key
if (b.value == null || v == n) // b is deleted
break;
//判断n是否是要查找的结点
//c==0,表示key==n.key,找到要查找的额结点
、 //c若是小于0,那么key就不存在,结束查找
//c大于0,说明还没找到,则继续遍历查找
if ((c = cpr(cmp, key, n.key)) == 0) {
@SuppressWarnings("unchecked") V vv = (V)v;
return vv;
}
if (c < 0)
break outer;
b = n;
n = f;
}
}
return null;
}
6.remove的过程
public V remove(Object key) {
return doRemove(key,www.iqushipin.com null);
}
final V doRemove(Object key, Object value) {
if (key == null)
throw new NullPointerException();
Comparator<? super K> cmp = comparator;
outer: for (;;) {
//findPredecessor查找key的前驱索引对应的结点
for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
Object v; int c;
//n为null,说明b是为结点,则key不存在
if (n == null)
break outer;
Node<K,V> f = n.next;
//b的后继不为n,说明b的后继别其他线程改变,重新定位key
if (n != b.next) // inconsistent read
break;
//判断n是否已经被删除,若是则辅助删除,并重新定位key
if ((v = n.value) == null) { // n is deleted
n.helpDelete(b, f);
break;
}
//判断b是否已经被删除,若是则辅助删除,并重新定位key
if (b.value == null || www.shengchangyule.net v == n) // b is deleted
break;
//key不存在,直接结束删除
if ((c = cpr(cmp, key, n.key)) < 0)
break outer;
//当前结点不是要删除的结点,继续遍历查找
if (c > 0) {
b = n;
n = f;
continue;
}
//找到要删除的结点,但已被别的线程抢先删除,则直接结束删除
if (value != null && !value.equals(v))
break outer;
//尝试逻辑删除,失败则继续尝试
if (!n.casValue(v, null))
break;
//给节点添加删除标识(next节点改为一个指向自身的节点)
//然后把前继节点的next节点CAS修改为next.next节点(彻底解除n节点的链接)
if (!n.appendMarker(f) || !b.casNext(n, f))
findNode(key); // 清除已删除的结点
else {
//删除n节点对应的index,findPredecessor中有清除索引的步骤
findPredecessor(key, cmp); // clean index
//判断是否要减少跳表的层级
if (head.right == null)
tryReduceLevel();
}
@SuppressWarnings("unchecked") V vv = (V)v;
return vv; //返回删除结点的value
}
}
return null;
}
//自环起来标志该结点要被删除
boolean appendMarker(Node<K,V> f) {
return casNext(f, new Node<K,V>(f));
}
7.size的统计
ConcurrentSkipListMap的size也是个瞬时值,并能过分依靠:
//遍历底层链表来统计结点的个数
public int size() {
long count = 0;
for (Node<K,V> n = findFirst(www.syylegw.com/); n != null; n = n.next) {
if (n.getValidValue() != null)
++count;
}
return (count www.365soke.com>= Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int) count;
}
//获取底层链表的头结点
final Node<K,V> findFirst(www.078881.cn/ ) {
for (Node<K,V> b, n;;) {
if ((n = (b = head.node).next) == null)
return null;
if (n.value != null)
return n;
n.helpDelete(b, n.next); //协助删除已被标记删除的结点
}
}
二、ConcurrentSkipListSet并发容器
1.ConcurrentSkipListSet
ConcurrentSkipListSet的底层实现就是利用的ConcurrentSkipListMap,以加入的数据作为ConcurrentSkipListMap的key,而value则恒为一个布尔对象 Boolean. TRUE。因此其源码就不在过多分析
最新文章
- python之路 - 基础1
- ul、li分列显示
- TJI读书笔记16-异常处理
- MySQL MHA配置常见问题
- [deviceone开发]-仿微信主界面示例
- gjd
- Maven 3.3.3 Win10环境下的使用实例(中)
- 受限玻尔兹曼机(RBM)学习笔记(八)RBM 的评估
- 如何调试MFC中的内存泄漏
- js基本框架
- java基础(一) 深入解析基本类型
- IOS 模块并且发布到NPM
- python验证码识别接口及识别思路代码
- 正则表达式--C#正则表达式的符号及例子
- thread run 和 start 的区别
- 关于字符串split一些用法
- LeetCode - Kth Largest Element in a Stream
- Android DevArt4:IntentFilter学习及深入~问题描述:在不指定具体action前提下,如果有两个以上的Activity,具有完全相同的intent-filter,项目同步是否会出现异常?程序运行是否会崩溃?
- Rafy框架
- JavaMelody - 常用配置
热门文章
- iOS----------UITextField实现过滤选中状态拼音
- 前后端交互实现(nginx,json,以及datatable的问题相关)
- 【NodeJS】Node.JS 开发环境安装
- 转摘app测试方法总结
- restful api与传统api的区别(方式及语法)
- [转]QQ空间、新浪微博、腾讯微博等一键分享API链接代码
- Spring MVC 响应视图(六)
- c/c++ linux epoll系列1 创建epoll
- LeetCode算法题-License Key Formatting(Java实现)
- 阿里Canal安装和代码示例