ConcurrentHashMap源码分析 - Sanarous的博客

ConcurrentHashMap源码分析

为什么要使用 ConcurrentHashMap

ConcurrentHashMap 是线程安全且高效的 HashMap。使用 ConcurrentHashMap 的主要原因是 HashMap 是非线程安全的(HashMap 线程不安全的原因可以参考这里),如果要在多线程环境下使用 Map 结构,可以有以下方式:

  1. 使用 Collections.synchronizedMap(Mao<K,V> m) 方法把 HashMap 变成一个线程安全的 Map
  2. 使用 HashTable
  3. 使用 ConcurrentHashMap

为什么不用 HashTable 呢,这是因为 HashTable 的效率非常低,其内部方法都是使用 Synchronized 来保证线程安全,并且若线程竞争激烈时,HashTable 的效率会变得更加低,因为当一个线程访问 HashTable 的同步方法,其它线程也访问 HashTable 的同步方法,那么会进入阻塞或者轮询状态。比如线程 1 使用 put 方法进行元素添加,线程 2 不但不能使用 put 方法添加元素,还不能使用 get 方法查询元素。所以多线程环境下往往不会使用 HashTable。

为了解决上述问题,Java 官方提供了 ConcurrentHashMap 类来实现并发访问性能较高的 HashMap。其优点如下:

  • 线程安全
  • 相比于 HashTable 和 Collections.synchronizedMap() 效率高

ConcurrentHashMap的结构

ConcurrentHashMap 在 JDK 1.7 和 JDK 1.8 的结构是不相同的,这里我们先解析 JDK 1.7 下的 ConcurrentHashMap,在这个基础上再分析 JDK 1.8 下 ConcurrentHashMap 的实现并比较与 1.7 做出了什么优化。

JDK1.7下的ConcurrentHashMap

在 JDK 1.7 中,ConcurrentHashMap 的结构是分段锁(segment) + HashEntry 数组。ConcurrentHashMap 和 HashMap 实现上类似,最主要的差别是 ConcurrentHashMap 采用了分段锁(Segment),每个分段锁维护着几个桶(HashEntry),多个线程可以同时访问不同分段锁上的桶,从而使其并发度更高(并发度就是 Segment 的个数)。

其中 segment 是一种可重入锁(ReentrantLock),在 ConcurrentHashMap 中扮演锁的角,HashEntry 则用于寸储键值对数据。一个 ConcurrentHashMap 中包含一个 Segment 数组,Segment 的结构和 HashMap 类似,是一种数组和链表的结构,一个 Segment 中包含一个 HashEntry 数组,每个 HashEntry 是一个链表结构的元素。每个 Segment 守护着一个 HashEntry 数组里面的元素,对 HashEntry 数组中元素进行修改时,必须先获取对应的 Segment 锁。

JDK1.8下的ConcurrentHashMap

JDK 1.8 中 ,ConcurrentHashMap 的结构是数组 + 链表 + 红黑树,不再是分段锁(segment)机制,而是采用了 CAS + Synchronized 细化了锁粒度,从而提升了并发性能,并且在数组链表长度超过 8 时,会自动将链表转化为红黑树,这样解决了散列不均匀产生链表过长从而使得搜索时间复杂度过大的问题。

Synchronized 只锁定当前链表或者红黑树的首节点,这样只要 hash 不冲突,就不会产生并发,从而提升了一定的并发性能。

ConcurrentHashMap的源码分析

为了更全面的理解 ConcurrentHashMap 的源码和发展历史,这里也分为 JDK 1.7 和 JDK 1.8 中的 ConcurrentHashMap 源码分析。

JDK 1.7 源码

由上面小节我们初步了解了 ConcurrentHashMap 在 1.7 中的数据结构,其中我们需要关注的点:

  • ConcurrentHashMap 如何定位 Segment, 如何定位 HashEntry

  • 修改的加锁逻辑,如何进行扩容

  • 读数据时,如何做到不加锁但保证线程安全的?

ConcurrentHashMap 的默认常量字段有:

常量含义初始值
DEFAULT_INITIAL_CAPACITY初始容量16
DEFAULT_LOAD_FACTOR初始负载因子0.75
DEFAULT_CONCURRENCY_LEVEL并发度16
MAX_SEGMENTSsegment最大容量2^16
RETRIES_BEFORE_LOCK重试次数2

ConcurrentHashMap 的初始化方法也是通过 initialCapacity、loadFactor 和 ConcurrencyLevel 等几个参数初始化 segement 数组、段偏移量 segmentShift、段掩码 segmentMask 和每个 segment 中的 HashEntry 数组来实现的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();

if (concurrencyLevel > MAX_SEGMENTS)
concurrencyLevel = MAX_SEGMENTS;

// Find power-of-two sizes best matching arguments
int sshift = 0;
int ssize = 1;
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}
segmentShift = 32 - sshift;
segmentMask = ssize - 1;
this.segments = Segment.newArray(ssize);

if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
int c = initialCapacity / ssize;
if (c * ssize < initialCapacity)
++c;
int cap = 1;
while (cap < c)
cap <<= 1;

for (int i = 0; i < this.segments.length; ++i)
this.segments[i] = new Segment<K,V>(cap, loadFactor);
}

初始化segment数组

初始化 segment 数组源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
//并发度最大只有segmengs数组大小
if (concurrencyLevel > MAX_SEGMENTS)
concurrencyLevel = MAX_SEGMENTS;

// Find power-of-two sizes best matching arguments
// 为了保证segemnts数组长度为2的N次方
int sshift = 0;
int ssize = 1; //segment数组的长度
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}
segmentShift = 32 - sshift;
segmentMask = ssize - 1;
this.segments = Segment.newArray(ssize);

由上可知,segments 数组的长度 ssize 是通过 concurrencyLevel 计算得出的。为了能通过按为与的散列算法来定位 segments 数组的索引,必须保证 segments 数组的长度为 2 的 N 次方(power-of-two size),所以必须计算出一个大于或等于 concurrencyLevel 的最小的 2 的 N 次方来作为 segments 数组的长度。

比如假如concurrencyLevel 等于 14 、15 或者 16,ssize 都会等于 16,即容器里面锁的个数也是 16。

初始化segmentShift和segmentMask

这两个全局变量需要在定位 segment 时的散列算法中使用,sshift 等于 ssize 从 1 向左移位的次数,在默认情况下 concurrencyLevel 等于 16,1 需要往左移位移动 4 次,所以 sshift 等于 4。segmentShift 用于定位参与散列函数运算的位数, segmentShift = 32 - sshift,所以等于 28,这里之所以用 32 是因为 ConcurrentHashMap 里的 hash() 方法输出的最大数是 32 位的,后面的测试中我们可以看到这点。 segmentMask 是散列运算的掩码,等于 ssize - 1,即最大值是 16, segmentMask 的最大值为 65535。

初始化每个segment

源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
int c = initialCapacity / ssize;
if (c * ssize < initialCapacity)
++c;
int cap = 1; // c就是segment中HashEntry数组的长度
while (cap < c)
cap <<= 1;

for (int i = 0; i < this.segments.length; ++i)
this.segments[i] = new Segment<K,V>(cap, loadFactor);
}

上面代码中的 cap 就是每个 segment 中 HashEntry 数组的长度,它等于 initialCapacity 除以 ssize 的倍数 c,如果 c 大于 1,就会取大于等于 c 的 2 的 N 次方值,所以 cap 不是 1,就是 2 的 N 次方。 segment 的容量 threshold = (int) cap * loadFactor,默认情况下 initialCapacity 等于 16,loadFactor 等于 0.75,通过运算 cap 等于 1,threshold 等于零。

定位Segment

在插入和获取元素的时候,必须先通过散列(hash)算法定位到对应的 Segment。翻源码可以看到,ConcurrentHashMap 中的 hash 算法是单独写的方法,使用的是 Wang/Jenkins hash 的变种算法对元素的 hashCode 进行一次再散列:

1
2
3
4
5
6
7
8
9
10
private static int hash(int h) {
// Spread bits to regularize both segment and index locations,
// using variant of single-word Wang/Jenkins hash.
h += (h << 15) ^ 0xffffcd7d;
h ^= (h >>> 10);
h += (h << 3);
h ^= (h >>> 6);
h += (h << 2) + (h << 14);
return h ^ (h >>> 16);
}

之所以进行再散列,目的是减少散列冲突,使元素能均匀的分布在不同的 Segment 上,从而提高容器的存储效率。假如散列的质量差到极点,所有的元素都在一个 Segment 中,不仅存取元素缓慢,分段锁也会失去意义。

ConcurrentHashMap 通过以下散列算法定位 segment:

1
2
3
final Segment<K,V> segmentFor(int hash) {
return segments[(hash >>> segmentShift) & segmentMask];
}

在默认情况下,segmentShift 的值为 28,segmentMask 的值为15,再散列后的数的最大是 32 位二进制数据,向右无符号移动 28 位,意思是让高 4 位参与到散列运算中。

get 操作

先看源码:

1
2
3
4
public V get(Object key) {
int hash = hash(key.hashCode());
return segmentFor(hash).get(key, hash); //源码如下
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
//Segment类中的get方法
V get(Object key, int hash) {
if (count != 0) { // read-volatile
HashEntry<K,V> e = getFirst(hash); //源码如下
while (e != null) {
if (e.hash == hash && key.equals(e.key)) {
V v = e.value;
if (v != null)
return v;
return readValueUnderLock(e); // recheck
}
e = e.next;
}
}
return null;
}

可以看到 get 操作比较简单和高效,先经过一次再 hash,然后使用这个散列值通过散列运算定位到 Segment,再通过散列算法定位到元素。

1
2
3
4
5
//定位到具体的HashEntry
HashEntry<K,V> getFirst(int hash) {
HashEntry<K,V>[] tab = table;
return tab[hash & (tab.length - 1)];
}

这个操作高效的原因是整个 get 操作没有加锁,除非读到的值为空时才会加锁重读。而原理就是使用 volatile 修饰 get 方法中需要用到的共享变量,比如 count 字段和 存储值的 HashEntry 的 value。volatile修饰的变量可以保证可见性,能被多线程同时读并且不会读到过期的值。

1
2
transient volatile int count;
volatile V value;

在定位元素的代码中可以发现,定位 HashEntry 和 Segment 的散列算法虽然一样,都是与数组的长度减 1 再相与,但是相与的值不一样。定位 segment 使用的是元素的 hashCode 经过再散列后得到的值的高位,而定位 HashEntry 直接使用的是再散列后的值。这么做的目的是避免两次散列后的值一样,虽然元素在 Segment 里面散开了,但是却没有在 HashEntry 中散开。

put 操作

源码:

1
2
3
4
5
6
public V put(K key, V value) {
if (value == null)
throw new NullPointerException();
int hash = hash(key.hashCode());
return segmentFor(hash).put(key, hash, value, false); //源码如下
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
V put(K key, int hash, V value, boolean onlyIfAbsent) {
lock();
try {
int c = count;
if (c++ > threshold) // ensure capacity
rehash(); //大于阈值,需要进行rehash扩容
HashEntry<K,V>[] tab = table;
int index = hash & (tab.length - 1); //定位HashEntry位置
HashEntry<K,V> first = tab[index];
HashEntry<K,V> e = first;
while (e != null && (e.hash != hash || !key.equals(e.key)))
e = e.next;

V oldValue;
//如果对应的key已经有值,覆盖
if (e != null) {
oldValue = e.value;
if (!onlyIfAbsent)
e.value = value;
}
//写入新值
else {
oldValue = null;
++modCount;
tab[index] = new HashEntry<K,V>(key, hash, first, value);
count = c; // write-volatile
}
return oldValue;
} finally {
unlock();
}
}

put 操作为了线程安全性,首先需要定位到对应的 Segment,并在操作共享变量过程中加锁。插入操作主要有两个步骤:

  1. 判断 Segment 中的 HashEntry 数组是否需要扩容
  2. 定位添加元素的位置,然后将其放到 HashEntry 数组中

值得一提的是,Segment 的扩容判断比 HashMap 的扩容判断更加合理,因为 HashMap 是在插入元素后判断元素是否已经达到容量的,如果达到了就进行扩容,但是有可能扩容后并没有新元素进行添加,那么这时候就相当于扩容无效。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
void rehash() {
HashEntry<K,V>[] oldTable = table;
int oldCapacity = oldTable.length;
if (oldCapacity >= MAXIMUM_CAPACITY)
return;

/*
* Reclassify nodes in each list to new Map. Because we are
* using power-of-two expansion, the elements from each bin
* must either stay at same index, or move with a power of two
* offset. We eliminate unnecessary node creation by catching
* cases where old nodes can be reused because their next
* fields won't change. Statistically, at the default
* threshold, only about one-sixth of them need cloning when
* a table doubles. The nodes they replace will be garbage
* collectable as soon as they are no longer referenced by any
* reader thread that may be in the midst of traversing table
* right now.
*/

HashEntry<K,V>[] newTable = HashEntry.newArray(oldCapacity<<1);
threshold = (int)(newTable.length * loadFactor);
int sizeMask = newTable.length - 1;
for (int i = 0; i < oldCapacity ; i++) {
// We need to guarantee that any existing reads of old Map can
// proceed. So we cannot yet null out each bin.
HashEntry<K,V> e = oldTable[i];

if (e != null) {
HashEntry<K,V> next = e.next;
int idx = e.hash & sizeMask;

// Single node on list
if (next == null)
newTable[idx] = e;

else {
// Reuse trailing consecutive sequence at same slot
HashEntry<K,V> lastRun = e;
int lastIdx = idx;
for (HashEntry<K,V> last = next;
last != null;
last = last.next) {
int k = last.hash & sizeMask;
if (k != lastIdx) {
lastIdx = k;
lastRun = last;
}
}
newTable[lastIdx] = lastRun;

// Clone all remaining nodes
for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
int k = p.hash & sizeMask;
HashEntry<K,V> n = newTable[k];
newTable[k] = new HashEntry<K,V>(p.key, p.hash,
n, p.value);
}
}
}
}
table = newTable;
}
size 操作

ConcurrentHashMap 统计容量很有意思,统计整个 ConcurrentHashMap 中元素的数量,就必须统计所有 Segment 里元素的大小后求和。Segment 中的全局变量 count 是一个 volatile 变量,那么在多线程环境下,是不是直接把所有 Segment 里的 count 相加就可以得到整个 ConcurrentHashMap 大小了呢?

在多线程环境下显然不是的,虽然相加时可以获取每个 Segment 的 count 最新值,但是可能累加前使用的 count 发生了变化,那么统计结果就不准了,所以,最安全的方法是在统计 size 的时候把所有的 Segment 的 put 、remove 和 clean 方法都锁住,但是这样做显然效率非常低下。

我们看一下源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public int size() {
final Segment<K,V>[] segments = this.segments;
long sum = 0;
long check = 0;
int[] mc = new int[segments.length];
// Try a few times to get accurate count. On failure due to
// continuous async changes in table, resort to locking.
for (int k = 0; k < RETRIES_BEFORE_LOCK; ++k) {
check = 0;
sum = 0;
int mcsum = 0;
for (int i = 0; i < segments.length; ++i) {
sum += segments[i].count;
mcsum += mc[i] = segments[i].modCount;
}
if (mcsum != 0) {
for (int i = 0; i < segments.length; ++i) {
check += segments[i].count;
if (mc[i] != segments[i].modCount) {
check = -1; // force retry
break;
}
}
}
if (check == sum)
break;
}
if (check != sum) { // Resort to locking all segments
sum = 0;
for (int i = 0; i < segments.length; ++i)
segments[i].lock();
for (int i = 0; i < segments.length; ++i)
sum += segments[i].count;
for (int i = 0; i < segments.length; ++i)
segments[i].unlock();
}
if (sum > Integer.MAX_VALUE)
return Integer.MAX_VALUE;
else
return (int)sum;
}

其中的常量RETRIES_BEFORE_LOCK在最上面已经提到过,默认值是 2,表示重试次数。我们可以看到,源码中的做法是先尝试 2 次(初始的重试次数)通过不锁住 Segment 的方式来统计各个 Segment 大小,如果统计过程中,容器的 count 发生了变化,再采用加锁的方式来统计所有 Segment 的大小。而判断统计过程中容器是否发生变化是通过 modCount 这个变量感知的,在 put 、remove 和 clean 方法里操作元素前都会将变量 modCount 值加 1,那么在统计 size 前后比较 modCount 是否发生变化,从而得知容器的大小是否发生变化。

JDK 1.8 源码

JDK 1.8 数据结构见上,为了优化性能,提升并发度,JDK 1.8 开始去掉了 Segment 分段锁机制,直接采取了与 HashMap 相同的数据结构,不同的是为了保证线程安全,采用了 CAS + Synchronized 进行 put 、remove 等操作。

不同于 JDK 1.7 中的 HashEntry 存储键值对,JDK 1.8 中采用了 Node 这个内部类进行键值对的存储。

先看一下 JDK 1.8 中 ConcurrentHashMap 的字段:

字段名含义初始值
MAXIMUM_CAPACITY最大容量2^30
DEFAULT_CAPACITY默认容量16
MAX_ARRAY_SIZE最大数组容量Integer.MAX_VALUE - 8
DEFAULT_CONCURRENCY_LEVEL默认并发度16
LOAD_FACTOR负载因子0.75f

当然还有很多字段,这里只挑一些非常重要的。

JDK 1.8 中 ConcurrentHashMap 比较复杂,但是分析时只分析如何利用 CAS + Synchronized 进行高效的同步更新数据。

put 操作

先看源码:

1
2
3
public V put(K key, V value) {
return putVal(key, value, false); //源码如下
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
final V putVal(K key, V value, boolean onlyIfAbsent) {
//ConcurrentHashMap 不允许插入null键,HashMap允许插入一个null键
if (key == null || value == null) throw new NullPointerException();
//计算key的hash值
int hash = spread(key.hashCode());
int binCount = 0;
//for循环的作用:因为更新元素是使用CAS机制更新,需要不断的失败重试,直到成功为止。
for (Node<K,V>[] tab = table;;) {
// f:链表或红黑二叉树头结点,向链表中添加元素时,需要synchronized获取f的锁。
Node<K,V> f; int n, i, fh;
//判断Node[]数组是否初始化,没有则进行初始化操作
if (tab == null || (n = tab.length) == 0)
tab = initTable();
//通过hash定位Node[]数组的索引坐标,是否有Node节点,如果没有则使用CAS进行添加(链表的头结点),添加失败则进入下次循环。
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
//检查到内部正在移动元素(Node[] 数组扩容)
else if ((fh = f.hash) == MOVED)
//帮助它扩容
tab = helpTransfer(tab, f);
else {
V oldVal = null;
//锁住链表或红黑二叉树的头结点
synchronized (f) {
//判断f是否是链表的头结点
if (tabAt(tab, i) == f) {
//如果fh>=0 是链表节点
if (fh >= 0) {
binCount = 1;
//遍历链表所有节点
for (Node<K,V> e = f;; ++binCount) {
K ek;
//如果节点存在,则更新value
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
//不存在则在链表尾部添加新节点。
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
//TreeBin是红黑二叉树节点
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
//添加树节点
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}

if (binCount != 0) {
//如果链表长度已经达到临界值8 就需要把链表转换为树结构
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
//将当前ConcurrentHashMap的size数量+1
addCount(1L, binCount);
return null;
}

简而言之,put 操作的逻辑如下:

  1. 判断 Node[] 数组是否初始化,没有则进行初始化操作

  2. 通过 hash 定位 Node[] 数组的索引坐标,是否有 Node 节点,如果没有则使用 CAS 进行添加(链表的头结点),添加失败则进入下次循环。

  3. 检查到内部正在扩容,如果正在扩容,就帮助它一块扩容。

  4. 如果 f != null,则使用 synchronized 锁住 f 元素(链表/红黑二叉树的头元素)

    4.1 如果是 Node (链表结构)则执行链表的添加操作。
    4.2 如果是 TreeNode (树型结果)则执行树添加操作。

  5. 判断链表长度已经达到临界值 8 就需要把链表转换为树结构。

    get 操作

先看源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}

还是不加锁,但是同 JDK 1.7 相同,Node 节点中的 value 和 next 都是 volatile 的,即线程对这些数据的修改对其他线程是立马可见的。

参考文章

  1. 方腾飞 等著 《Java 并发编程的艺术》
  2. Doug Lea 等著 《Java 并发编程实战》
  3. ConcurrentHashMap (Java Platform SE 8 ) - Oracle Help Center
  4. 探索 ConcurrentHashMap 高并发性的实现机制
  5. 聊聊并发(四)——深入分析 ConcurrentHashMap
  6. ConcurrentHashMap实现原理及源码分析
  7. ConcurrentHashMap 原理解析(JDK1.8)
  8. ConcurrentHashMap之1.7与1.8小结
  9. HashMap为什么是线程不安全的?
如果这篇文章对您很有帮助,不妨
-------------    本文结束  感谢您的阅读    -------------
0%