引入ConcurrentHashMap

模拟使用hashmap在多线程场景下发生线程不安全现象

import java.util.HashMap;
import java.util.Map;
import java.util.UUID; /**
* 模拟hashmap在多线程场景下的出现的不安全现象之一
* hashmap还有put丢失,jdk1.7扩容成环的问题
*/
public class Demo2 {
public static void main(String[] args) {
Map<String, String> hashmap = new HashMap<>();
//开30个线程去往hashmap中添加元素
for (int i = 1; i <= 30; i++) {
new Thread(() -> {
hashmap.put(Thread.currentThread().getName(), UUID.randomUUID().toString().substring(0, 5));
System.out.println(hashmap);
}, String.valueOf(i)).start();
}
}
}

运行结果如下,由于fail-fast机制的存在,出现了并发修改失败的错误

如何解决该问题呢?

方式一:使用hashtable

Map<String, String> hashmap = new Hashtable<>();

方式二:使用Collections.synchronizedMap

Map<String, String> hashmap = Collections.synchronizedMap(new HashMap<>());

方式三:使用并发集合容器ConcurrentHashMap

Map<String, String> hashmap = new ConcurrentHashMap<>();

浅析Java7中ConcurrentHashMap源码

数据结构

ConcurrentHashMap JDK1.7的数据结构是由Segment数组+HashEntry数组组成,其解决hash冲突的方式与jdk1.7中的hashmap方式差不多,解决线程安全是采用一种分段锁的思想,多个线程操作多个Segment是相互独立的,这样一来相比于传统的hashtable就大大提高了并发度。

我们在简单画个图来理解分段锁的思想:数组套数组,多个线程独立访问Segment,扩容嵌套数组

Segment与HashEntry

我们在来看下其Segment数组以及HashEntry数组在源码中是如何定义的。

先来看看Segment的定义:由以下我们可以看到每个Segment都是继承的ReentLock,且其内部嵌套的是HashEntry数组,Segment的数量相当于锁的数量,这些锁彼此之间福独立,即“分段锁”

//以内部类的形式定义,并且继承的ReentratLock
static final class Segment<K,V> extends ReentrantLock implements Serializable { private static final long serialVersionUID = 2249069246763182397L; static final int MAX_SCAN_RETRIES =
Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1; //由此处也可以看出Segment内部嵌套的是HashEntry数组
transient volatile HashEntry<K,V>[] table; //Segment的个数
transient int count;
//modCount代表被修改的次数,每次Remove、put都相当于一次修改
transient int modCount;
//阈值
transient int threshold;
//负载因子
final float loadFactor; Segment(float lf, int threshold, HashEntry<K,V>[] tab) {
this.loadFactor = lf;
this.threshold = threshold;
this.table = tab;
} //以下是Segment内部的一些操作
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
.......
} private void rehash(HashEntry<K,V> node) {
......
} final V remove(Object key, int hash, Object value) {
....
} ......

在来看看HashEntry的定义

//以内部类的形式定义
static final class HashEntry<K,V> {
final int hash;
final K key;
//采用volatile修饰,保证其可见性和有序性
volatile V value;
volatile HashEntry<K,V> next; HashEntry(int hash, K key, V value, HashEntry<K,V> next) {
this.hash = hash;
this.key = key;
this.value = value;
this.next = next;
} //在HashEntry数组后面链上HashEntry对象
final void setNext(HashEntry<K,V> n) {
UNSAFE.putOrderedObject(this, nextOffset, n);
} // Unsafe类是Java提供的操作内存的类,
static final sun.misc.Unsafe UNSAFE;
static final long nextOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class k = HashEntry.class;
nextOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("next"));
} catch (Exception e) {
throw new Error(e);
}
}
}

关于Unsafe类中的putOrderedObject方法,摘自Java魔法类:Unsafe应用解析

//存储变量的引用到对象的指定的偏移量处,使用volatile的存储语义
public native void putObjectVolatile(Object o, long offset, Object x);
//有序、延迟版本的putObjectVolatile方法,不保证值的改变被其他线程立即看到。只有在field被volatile修饰符修饰时有效,而我们的HashEntry就是被volatile修饰的
public native void putOrderedObject(Object o, long offset, Object x);

关于Unsafe类,是Java提供的操作内存的类,其内容博大精深。可参考美团技术团队写的:Java魔法类:Unsafe应用解析

构造函数

我们来看下ConcurrentHashMap的构造函数在源码中是如何定义的

public class ConcurrentHashMap<K, V> extends AbstractMap<K, V>
implements ConcurrentMap<K, V>, Serializable {
private static final long serialVersionUID = 7249069246763182397L;
//默认初始容量
static final int DEFAULT_INITIAL_CAPACITY = 16;
//默认加载因子
static final float DEFAULT_LOAD_FACTOR = 0.75f;
//默认并发等级
static final int DEFAULT_CONCURRENCY_LEVEL = 16;
//最大容量
static final int MAXIMUM_CAPACITY = 1 << 30;
//最小Segment数量
static final int MIN_SEGMENT_TABLE_CAPACITY = 2;
//最大Segment数量
static final int MAX_SEGMENTS = 1 << 16; //默认构造函数
public ConcurrentHashMap() {
this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
} public ConcurrentHashMap(int initialCapacity) {
this(initialCapacity, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
} public ConcurrentHashMap(int initialCapacity, float loadFactor) {
this(initialCapacity, loadFactor, DEFAULT_CONCURRENCY_LEVEL);
} /**
* initialCapacity:初始参数
* loadFactor:加载因子
* concurrencyLevel:并发级别即Segment的数量
*/
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
//非法数校验
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (concurrencyLevel > MAX_SEGMENTS)
concurrencyLevel = MAX_SEGMENTS;
// 用来记录向左按位移动的次数
int sshift = 0;
//用来记录Segment的数量
int ssize = 1;
//该段while循环保证Segment的数量是2的幂
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}
this.segmentShift = 32 - sshift;
//这里SegmentMask先提前减一了,
//在hashmap中计算数组下标索引是(table.length-1)&hash
//这里也可以推断出Segment数量一旦确定不能在变,扩容是扩Segment数组内的HashEntry数组
this.segmentMask = ssize - 1;
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
//每个Segment数组内要放置多少个HashEntry数组
int c = initialCapacity / ssize;
//确保无余数
if (c * ssize < initialCapacity)
++c;
//确保每个Segment内部的HashEntry数组的大小一定为2的幂,当三个参数皆为默认值时,其Segment内部的table大小是2,
int cap = MIN_SEGMENT_TABLE_CAPACITY;
while (cap < c)
cap <<= 1;
// create segments and segments[0]
//初始化Segment数组,并填充Segment[0],阈值是(int)(cap * loadFactor),当参数皆为默认时,该值为1,当put第一个元素时不会扩容,在put就会触发扩容
Segment<K,V> s0 =
new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
(HashEntry<K,V>[])new HashEntry[cap]);
Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
this.segments = ss;
} .....

由构造函数可以看出来

  • Segment数量默认是16,初始容量默认是16,负载因子默认是0.75,最小Segment是2
  • Segment的数量即为并发级别,且内部保证是2的幂,Segment内部的table大小也保证为2的幂
  • Segment数量一旦确定不会在更改,后续添加元素不会增加Segment的数量,而是增加Segment中链表数组的容量,这样的好处是扩容也不用针对整个ConcurrentHashMap来进行了,而是针对Segment里面的数组
  • 初始化了Segment[0],其他Segment还是null

put函数

来看看put函数

    public V put(K key, V value) {
Segment<K,V> s;
//value不能为空
if (value == null)
throw new NullPointerException();
//通过hash函数获取关于key的hash值
int hash = hash(key);
//计算要插入的Segment数组的下标,位运算提高计算速度,由于此处使用位运算,所以得保证是2的幂可以减少hash冲突,具体原因不详述
int j = (hash >>> segmentShift) & segmentMask;
//如果要插入的Segment为初始化,调用ensureSeggment函数进行初始化(初始化concurrentHashMap时只初始化了第一个Segment[0])
if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
s = ensureSegment(j);
//调用Segment的put函数
return s.put(key, hash, value, false);
}

到现在我们还没有发现加锁,在接着看Segment中的put函数,可见是在该函数中加的锁,这又一次验证了是分段锁,计算完了Segment位置后,在针对某一个Segment内部进行插入的时候上锁。

        final V put(K key, int hash, V value, boolean onlyIfAbsent) {
//去获取独占锁,获取锁失败进入scanAndLockForPut函数
HashEntry<K,V> node = tryLock() ? null :
scanAndLockForPut(key, hash, value);
V oldValue;
//到此处肯定已经获取到锁了
try {
//Segment内部的HashEntry数组
HashEntry<K,V>[] tab = table;
//计算元素插入的位置
int index = (tab.length - 1) & hash;
//定位到第index个HashEntry
HashEntry<K,V> first = entryAt(tab, index);
//该段for循环使用头插法将元素进行插入
for (HashEntry<K,V> e = first;;) {
if (e != null) {
K k;
//如果在链表中找到相同的key,则新值替换旧值,并退出函数
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
oldValue = e.value;
//onlyIfAbsent默认为false,!onlyIfAbsent表示替换旧值
if (!onlyIfAbsent) {
e.value = value;
//修改次数+1
++modCount;
}
break;
}
//如果没有key值相同的则遍历到链表尾部
e = e.next;
}
else {//已经遍历到链表尾部
if (node != null)//在scanAndLockForPut函数中已经建立好node
node.setNext(first); //把node插入链表的头部
else
//新建node,插入到链表头部
node = new HashEntry<K,V>(hash, key, value, first);
//该count代表元素的个数
int c = count + 1;
//判断是否超过阈值,超过调用rehash扩容
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
rehash(node);
else
//把node赋值给tab[index]
setEntryAt(tab, index, node);
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
//释放锁
unlock();
}
return oldValue;
}

Segment内部的put函数涉及到一个scanAndLockForPut函数,多个线程去进行put操作,去竞争锁,那那些没获取到锁的线程它是如何处理的呢,我们来看一下scanAndLockForPut函数

        private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
HashEntry<K,V> first = entryForHash(this, hash);
HashEntry<K,V> e = first;
HashEntry<K,V> node = null;
int retries = -1; // negative while locating node
while (!tryLock()) {
HashEntry<K,V> f; // to recheck first below
//自旋过程中遍历链表,若发现没有重复的key值,则提前先新建一个节点为后续的插入节约时间
if (retries < 0) {
if (e == null) {
if (node == null) // speculatively create node
node = new HashEntry<K,V>(hash, key, value, null);
retries = 0;
}
else if (key.equals(e.key))
retries = 0;
else
e = e.next;
}
//自旋次数达到若干次后就调用lock()进行阻塞,阻塞后的线程由AQS进行管理入队列
else if (++retries > MAX_SCAN_RETRIES) {
lock();
break;
}
else if ((retries & 1) == 0 &&
(f = entryForHash(this, hash)) != first) {
e = first = f; // re-traverse if entry changed
retries = -1;
}
}
return node;
}

该函数简化简化下来的思想如下:

//线程竞争锁失败后进入该函数
private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
//tryLock函数与Lock函数的区别就是tryLock函数获取锁失败会返回false,而不是阻塞
while(!tryLock()){//自旋操作
......
System.out.println("干点自己的事情...")
}
}

所以scanAndLockForPut函数的策略就是拿不到锁的线程不让它直接阻塞,而是让其自旋,自旋达到一定次数之后在调用lock()进行阻塞,另外在自旋的过程中遍历了后面的HashEntry链表,如果没有发现重复的节点就提前先建立一个,为线程之后拿到锁插入节省时间。

ensureSegment函数

在ConcurrentHashMap初始化时,只初始化了Segment[0],其他的Segment数组都是null,多个线程可能同时调用ensureSegment去初始化Segment[j],所以在该函数内部应该避免重复初始化的问题,保证其线程安全。

    private Segment<K,V> ensureSegment(int k) {
//赋值ss=this.segments
final Segment<K,V>[] ss = this.segments;
long u = (k << SSHIFT) + SBASE; // raw offset
Segment<K,V> seg;
//第一次判断segment[j]是否被初始化
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
//使用segment[0]为原型去初始化新的segment
Segment<K,V> proto = ss[0]; // use segment 0 as prototype
int cap = proto.table.length;
float lf = proto.loadFactor;
int threshold = (int)(cap * lf);
HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
//第二次判断segment[j]是否被初始化
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
== null) { // recheck
Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
//while循环+CAS操作,当前线程成功设值或其他线程成功设值后,退出
while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
== null) {//第三次判断segment[j]是否被初始化
if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
break;
}
}
}
return seg;
}

可见UNSAFE.getObjectVolatile(ss, u)) == null出现了三次,多次去判断segment[j]是否被初始化了,即使如此也不能完全避免重复初始化,最后还采用CAS操作保证其只被初始化

rehash函数

我们在来看看具体是如何扩容的,在Segment内部的put函数我们看到,超过阈值后会进行扩容操作

        private void rehash(HashEntry<K,V> node) {
//获取旧数组和其容量
HashEntry<K,V>[] oldTable = table;
int oldCapacity = oldTable.length;
//扩容为旧容量的2倍、设置新的阈值
int newCapacity = oldCapacity << 1;
threshold = (int)(newCapacity * loadFactor);
//创建新的数组
HashEntry<K,V>[] newTable =
(HashEntry<K,V>[]) new HashEntry[newCapacity];
//sizeMask提前减1了
int sizeMask = newCapacity - 1;
//遍历原数组
for (int i = 0; i < oldCapacity ; i++) {
//获取旧数组中的元素
HashEntry<K,V> e = oldTable[i];
if (e != null) {
HashEntry<K,V> next = e.next;
//计算插入的索引
int idx = e.hash & sizeMask;
if (next == null) // 链表中只有单个元素时,直接放入新数组中去
newTable[idx] = e;
else { // Reuse consecutive sequence at same slot
HashEntry<K,V> lastRun = e;
int lastIdx = idx;
//寻找链表中最后一个hash值不等于lastIdx的元素
for (HashEntry<K,V> last = next;last != null;last = last.next) {
int k = last.hash & sizeMask;
if (k != lastIdx) {
lastIdx = k;
lastRun = last;
}
}
//一个优化,把在lastRun之后的链表元素直接链到新hash表中的lastIdx位置
newTable[lastIdx] = lastRun;
//在lastrun之前的所有链表元素,需要在新的位置逐个拷贝
for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
V v = p.value;
int h = p.hash;
int k = h & sizeMask;
HashEntry<K,V> n = newTable[k];
newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
}
}
}
}
// 把新的节点加入Hash表
int nodeIndex = node.hash & sizeMask;
node.setNext(newTable[nodeIndex]);
newTable[nodeIndex] = node;
table = newTable;
}

可见扩容函数是扩容为原来数组的两倍大小,且扩容进行了一次优化,并没有对元素依次拷贝,而是先通过for循环找到lastRun位置。lastRun到链表末尾的所有元素,其hash值没有改变,所以不需要一次重新拷贝,只需要把这部分链表链到新hash表中所对应的位置即可。lastRun之前的节点则需要依次拷贝。

get函数

整个get函数相对来是实现思路不复杂,先找到在哪个Segment数组中,再去寻找具体在哪个table上,整个过程没加锁,因为Sigment中的HashEntry和HashEntry中的value都是由volatile修饰的,volatile保证了内存的可见性。

    public V get(Object key) {
Segment<K,V> s; // manually integrate access methods to reduce overhead
HashEntry<K,V>[] tab;
int h = hash(key);
//先计算在哪个segment数组中
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
(tab = s.table) != null) {
//计算在segment数组中的哪个HashEntry上
for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
e != null; e = e.next) {
K k;
//key值和当前节点的key指向同一片地址,或者当前节点的hash等于key的hash并且equals比价后相同则说明是目标节点
if ((k = e.key) == key || (e.hash == h && key.equals(k)))
return e.value;
}
}
return null;
}

小结

ConcurrentHashMap内容颇多且有难度,以上为简单阅读,如有不对的恳请指正。

  • 在JDK1.7中,ConcurrentHashMap是基于分段锁的思想来提高并发能力,数据结构采用Segment数组+HashEntry数组+链表来实现,每个Segment都相当于一把锁(其继承自ReentrantLock),多个线程操作多个Segment是相互独立的,Segment有多少个即为并发级别有多大。
  • Segment在ConcurrentHashMap初始化后就不会改变了,其扩容是针对每个Segment内部的HashEntry数组扩容,扩容为原来的两倍大小且进行了优化。
  • 多个线程put操作时候,竞争锁失败的线程会进行自旋,自旋达到一定次数在直接调用lock进行阻塞。
  • 初始化ConcurrentHashMap的时候只会填充第一个Segment[0],需要在多线程情况下避免重复初始化Segment[j]
  • 读操作未上锁,Segment中的HashEntry数组和hashEntry对象中的value都是用volatile修饰的

最新文章

  1. Windows Server 2012 R2桌面化详细设置图解
  2. physx之刚体运动
  3. SQL SERVER调用textcopy写文件
  4. 请求php返回json生成自定义对象
  5. ubuntu grub配置
  6. C#,新建的系统服务项目有些机器不能运行
  7. Android FragmentStatePageAdapter的使用Demo
  8. 基于Asp.Net Core Mvc和EntityFramework Core 的实战入门教程系列-3
  9. JavaSe: String的编译期优化
  10. android使用sharesdk的小感
  11. 项目sql统计
  12. poj3278_kuagnbin带你飞专题一
  13. 矩阵乘法code
  14. el表达式用==和eq的注意事项
  15. (zhuan) Recurrent Neural Network
  16. Spring MVC @PathVariable被截断
  17. Spring AMQP 源码分析 02 - CachingConnectionFactory
  18. dubbo的一些默认变量
  19. python:使用Fabric自动化你的任务
  20. MVC设计模式实现权限管理登录,超详细

热门文章

  1. zookeeper java代码实现master 选举
  2. 线程安全,syncronized 用法
  3. Prism 源码解读5-数据绑定和命令
  4. iOS 图片加载速度优化
  5. iOS pch
  6. [vijos]1083小白逛公园&lt;线段树&gt;
  7. Hystrix 使用手册 | 官方文档翻译
  8. Java实验五参考答案
  9. 7.Metasploit后渗透
  10. DG磁盘分区提示错误