ReentrantReadWriteLock读写锁源码分析 - Sanarous的博客

ReentrantReadWriteLock读写锁源码分析

读写锁的概念

常见的锁(Synchronized 和 Lock 的实现类)基本都是排他锁,同一时刻只允许一个线程进行访问。提起 ReentrantReadWriteLock,不得不想起 ReentrantLock 这个可重入排他锁。实际上这两者之间没有太大关系,都是 Lock 的独立实现。

在介绍原理之前,先简单叙述一下什么是读写锁:

读写锁在同一时刻可以允许多个读线程访问,但是在写线程访问时,所有的读线程和其它写线程均被阻塞。具体而言,一个线程要获取读锁的前提条件是:没有其它线程的写锁;没有写请求或者有写请求,但调用线程和持有锁的线程是同一个 。一个线程要获取写锁的前提条件是:没有其它线程的读锁;没有其它线程的写锁。

为了表示方便,以下叙述读写锁均指 ReentrantReadWriteLock 这个实现类,读写锁维护了一对锁:读锁和写锁,通过分离读写锁,使得其吞吐量和并发性在某些应用场景下(尤其是读多写少的多线程环境下)较排他锁提升较大。除了保证写操作对读操作的可见性以及并发性的提升之外,读写锁还简化了读写交互场景的编程方式,使其变得更加简单明了。

在没有读写锁支持之前(Java 5 之前),如果要完成这种场景的代码编写,往往需要使用“等待-通知”机制,具体而言,就是当写操作开始时,所有晚于写操作的读操作均会进入等待状态,只有写操作完成并进行通知后,所有等待的读操作才能继续执行,这样做的目的主要是使读操作能够读到正确的数据,不会出现脏读。而使用读写锁实现上述功能时,只需要在读操作的时候获取读锁,写操作的时候获取写锁就可以了,内部线程等待通知由内部进行实现,留给程序员的接口变得更加方便和直观。

ReentrantReadWriteLock 具备的特性:

特性说明
公平性选择支持非公平(默认)和公平两种锁获取方式
重进入支持重进入,读线程获取读锁后,能够再次获取读锁,写线程获取写锁后能再次获取写锁,同时也可以获取读锁
锁降级遵循获取写锁、获取读锁再释放写锁的锁降级,写锁能够降级为读锁

读写锁的接口和示例

首先看下接口的 ReadWriteLock,这个接口仅定义两个方法:读写 ReadLock() 和写锁 WriteLock()。

1
2
3
4
5
6
7
8
9
10
11
public interface ReadWriteLock {
/**
* Returns the lock used for reading.
*/
Lock readLock();

/**
* Returns the lock used for writing.
*/
Lock writeLock();
}

而 ReentrantReadWriteLock 是其实现类:

1
public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable

读写锁中有五个静态内部类:

  • Sync
  • NonfairSync
  • FairSync
  • ReadLock
  • WriteLock

它们之间的关系如下图所示:

其中 Sync 继承自 AQS,是读写锁实现的核心,关于 AQS 可以参考我的另外一篇博客。FairSync 是公平锁的底层实现,NonfairSync 是非公平锁的底层实现。

ReentrantReadWriteLock 类中,除了 readLock() 和 writeLock() 的实现方法外,还提供了一些便于外界监控内部工作状态的方法,这些方法和描述如下:

方法名称描述
int getReadLockCount()返回当前读锁被获取的次数,该次数不等于读锁的线程数,因为可重入的原因,一个线程连续获取了 n 次读锁,那么占据该读锁的线程数是 1,但该方法返回 n
int getReadHoldCount()返回当前线程获取读锁的次数,该方法在 Java 6 中加入,使用 ThreadLocal 保存当前线程获取的次数,这也使得 Java 6的实现变得更复杂
boolean isWriteLocked()判断写锁是否被获取
int getWriteHoldCount()返回当前线写锁被获取的次数

我们可以使用一个例子来表示读写锁的使用方式,示例代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public class Cache{
static Map<String,Object> map = new HashMap<>();
static ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
static Lock r = rwl.readLock();
static Lock w = rwl.writeLock();

//获取一个Key对应的 value
public static final Object get(String key){
r.lock();
try{
return map.get(key);
}finally{
r.unlock();
}
}

//设置key对应的 value,并返回旧的value
public static final Object put(String key,Object value){
w.lock();
try{
return map.put(key,value);
}finally{
w.unlock();
}
}

//清空所有的内容
public static final void clear(){
w.lock();
try{
map.clear();
}finally{
w.unlock();
}
}
}

上述示例中的 Cache 组合一个非线程安全的 HashMap 作为缓存的实现,同时使用读写锁来保证 Cache 是线程安全的。其中在 get 方法中,需要获取读锁,这使得并发访问该方法时线程不会被阻塞;在 put 方法和 clear 方法中,更新 HashMap 时必须提前获取写锁,并且其它获取读锁和写锁的线程均被阻塞,只有写锁释放后,其它读写操作才能继续进行。

由此可见,使用读写锁提高了多线程环境下读操作的并发性,并且保证了写操作的互斥性和读写操作之间的可见性。尤其是简化了代码的编写方式。

读写锁的源码分析

了解了常用的使用方法后,我们接下来就需要探析一下读写锁的源码实现和核心思想。

读写状态的设计

通过前面分析我们知道,读写锁是依赖 AQS 实现同步功能的,其核心就是读写锁的静态内部类 Sync,其中实现的方法如下:

我们知道,AQS 实现同步是依赖一个 volatile 修饰的 int 类型的 state 状态变量和一个 CLH 队列,那么要靠这个 int 类型的变量同时表示“读”和“写”状态,就只能按照位切割了。事实上读写锁也是这么去实现的,用高 16 位表示读,低 16 位表示写,其划分方式如下图所示:

通过位运算可以很快的计算并确定读写锁各自的状态。假设当前同步状态 state 值为 S,写状态就可以表示为 S & 0x0000FFFF(将高 16 位全部抹去),读状态可以表示为S >> 16(无符号补 0 右移 16 位)。当写状态增加 1 时,就可以表示为 S + 1,当读状态增加 1 时,等于 S + (1 << 16),也就是S + 0x00010000

根据状态的划分能得出一个推论: S 不等于 0 时,当写状态( S & 0x0000FFFF)等于 0 时,则读状态(S >> 16)大于 0,即读锁已被获取。

写锁的获取与释放

写锁的获取入口:

1
2
3
4
5
6
7
8
9
10
11
// WriteLock
public void lock() {
sync.acquire(1);
}

// AQS
public final void acquire(int arg) {
// 尝试获取,获取失败后入队,入队失败则interrupt当前线程
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

写锁的获取依赖tryAcquire()方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);

// 操作1:c != 0,说明存在读锁或者写锁
if (c != 0) {
// (Note: if c != 0 and w == 0 then shared count != 0)
// 写锁为0,读锁不为0 或者获取写锁的线程并不是当前线程,直接失败
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");

// Reentrant acquire
// 执行到这里说明是写锁线程的重入操作,直接修改状态,也不需要CAS因为没有竞争
setState(c + acquires);
return true;
}

// 操作2:获取写锁,writerShouldBlock对于非公平模式直接返回fasle,对于公平模式则线程需要排队,因此需要阻塞。
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}

从源码可以看到写锁是一个支持重进入的排他锁,如果当前线程已经获取到写锁,就增加写状态;如果当前线程在获取写锁时,读锁已经被获取(判断条件是读状态不为 0)或者该线程不是已经获取写锁的线程,则当前线程进入等待状态。

为什么存在读锁时,写锁不能被获取呢?这是为了保证写锁的操作对读锁可见,如果允许读锁在已经存在的条件下获取写锁,那么正在运行的其它线程就无法感知到当前线程的写操作。

写锁的释放入口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// WriteLock
public void unlock() {
sync.release(1);
}
// AQS
public final boolean release(int arg) {
// 释放锁成功后唤醒队列中第一个线程
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}

写锁的释放与 ReentrantLock 的释放操作基本一致,这里没有什么好说的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
protected final boolean tryRelease(int releases) {
// 如果当前线程没有获取写锁却释放,则直接抛异常
if (!isHeldExclusively())
throw new IllegalMonitorStateException();

// 状态变更至nextc
int nextc = getState() - releases;

// 因为写锁是可以重入,所以在都释放完毕后要把独占标识清空
boolean free = exclusiveCount(nextc) == 0;
if (free)
setExclusiveOwnerThread(null);

// 修改状态
setState(nextc);
return free;
}

读锁的获取与释放

读锁的获取入口:

1
2
3
4
5
6
7
8
9
// ReadLock
public void lock() {
sync.acquireShared(1);
}
// AQS
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}

可以看到依赖于 tryAcquireShared 方法,这个方法源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
// 操作1:存在写锁,并且写锁不是当前线程则直接去排队
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
int r = sharedCount(c);

// 操作2:读锁是否该阻塞,对于非公平模式下写锁获取优先级会高,如果存在要获取写锁的线程则读锁需要让步,公平模式下则先来先到
if (!readerShouldBlock() &&
// 读锁使用高16位,因此存在获取上限为2^16-1
r < MAX_COUNT &&
// 操作3:CAS修改读锁状态,实际上是读锁状态+1
compareAndSetState(c, c + SHARED_UNIT)) {
// 操作4:执行到这里说明读锁已经获取成功,因此需要记录线程状态。
if (r == 0) {
firstReader = current; // firstReader是把读锁状态从0变成1的那个线程
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
// 这些代码实际上是从ThreadLocal中获取当前线程重入读锁的次数,然后自增下。
HoldCounter rh = cachedHoldCounter; // cachedHoldCounter是上一个获取锁成功的线程
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
// 当操作2,操作3失败时执行该逻辑
return fullTryAcquireShared(current);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
final int fullTryAcquireShared(Thread current) {
HoldCounter rh = null;
// 最外层嵌套循环
for (;;) {
int c = getState();
// 操作5:存在写锁,且写锁并非当前线程则直接返回失败
if (exclusiveCount(c) != 0) {
if (getExclusiveOwnerThread() != current)
return -1;
// else we hold the exclusive lock; blocking here
// would cause deadlock.
// 操作6:如果当前线程是重入读锁则放行
} else if (readerShouldBlock()) {
// Make sure we're not acquiring read lock reentrantly
// 当前是firstReader,则直接放行,说明是已获取的线程重入读锁
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
} else {
// 执行到这里说明是其他线程,如果是cachedHoldCounter(其count不为0)也就是上一个获取锁的线程则可以重入,否则进入AQS中排队
// **这里也是对写锁的让步**,如果队列中头结点为写锁,那么当前获取读锁的线程要进入队列中排队
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
}
// 说明是上述刚初始化的rh,所以直接去AQS中排队
if (rh.count == 0)
return -1;
}
}
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// 操作7:修改读锁状态,实际上读锁自增操作
if (compareAndSetState(c, c + SHARED_UNIT)) {
// 操作8:对ThreadLocal中维护的获取锁次数进行更新。
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}

由上可以看到读锁是一个支持重进入的共享锁,能够被多个线程同时获取,在没有其它写线程访问时,读锁总会被成功获取,所做的也只是增加读状态。如果当前线程已经获取了读锁,则使用 CAS 增加读状态;如果当前线程在获取读锁时,写锁已经被其它线程获取,则进入等待状态。

获取读锁在 JDK1.6 后变得复杂了许多,例如新增了使用 ThreadLocal 的实现的 getReadCount() 方法来返回当前线程获取读锁的次数。

读锁的释放入口:

1
2
3
4
5
6
7
8
9
10
11
12
// ReadLock
public void unlock() {
sync.releaseShared(1);
}
// Sync
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared(); // 这里实际上是释放读锁后唤醒写锁的线程操作
return true;
}
return false;
}

其中释放的具体方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
// 操作1:清理ThreadLocal对应的信息
if (firstReader == current) {;
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
// 已释放完的读锁的线程清空操作
if (count <= 1) {
readHolds.remove();
// 如果没有获取锁却释放则会报该错误
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
// 操作2:循环中利用CAS修改读锁状态
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
// Releasing the read lock has no effect on readers,
// but it may allow waiting writers to proceed if
// both read and write locks are now free.
return nextc == 0;
}
}

读锁的释放主要是:

  1. 清理 ThreadLocal 中保存的获取锁数量信息
  2. CAS 修改读锁个数,实际上是自减一

锁降级

锁降级指的是写锁降级为读锁。

如果当前线程拥有写锁,然后将其释放,最后再获取读锁。这种分段完成的过程不能称为锁降级。锁降级实际上是指把持住(当前拥有的)写锁,再获取到读锁,随后释放(先前拥有的)写锁的过程。但是从读锁升级到写锁是不可能的。

那么这个锁降级体现在哪里呢?

在 tryAcquireShared 方法和 fullTryAcquireShared 中都有体现,例如下面的判断:

1
2
3
if (exclusiveCount(c) != 0) {
if (getExclusiveOwnerThread() != current)
return -1;

上面的代码的意思是:当写锁被持有时,如果持有该锁的线程不是当前线程,就返回 “获取锁失败”,反之当前线程可以继续获取读锁。这就称之为锁降级。

用《Java 并发编程的艺术》书中的例子可以说明:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public void processData(){
readLock.lock();
if(!update){
//必须先释放读锁
readLock.unlock();

//锁降级从写锁获取到开始
writeLock.lock();
try{
if(!update){
//准备数据的流程(略)
...
update = true;
}
readLock.unlock();
}finally{
writeLock.unlock();
}
//锁降级完成,写锁降级为读锁
}
try{
//使用数据的流程(略)
...
}finally{
readLock.unlock();
}
}

上述示例中,当数据发生变更后,update 变量(布尔类型且使用 volatile 修饰)被设置为 false,此时所有访问 processDate() 方法的线程都能感知到变化,但只有一个线程能够获取到写锁,其它线程会被阻塞在读锁和写锁的 unlock() 方法上。当前线程获取写锁完成数据准备后,再获取读锁,随后释放写锁,完成锁降级。
那么锁降级的意义是什么?很多书上或者博客上都会解释如下:

锁降级中读锁的获取是有必要的,主要是为了保证数据的可见性,如果当前线程不获取读锁而直接释放写锁,假设此时另外一个线程(记作线程 T)获取了写锁并修改了数据,那么当前线程无法感知线程 T 的数据更新。如果当前 线程获取读锁,即遵循锁降级的步骤,则线程 T 将会被阻塞,知道当前线程使用数据并释放读锁后,线程 T 才能获取写锁进行数据更新

不过这里解释的是锁降级步骤中读锁的获取是否有必要,跟锁降级的意义似乎并没有太大的关联。根据参考文章 [4],作者提出上面这种解释也是存在问题的。
上面提到,锁降级中,读锁的获取的目的是 “为了保证数据的可见性”。而得到这个结论的依据是 “如果当前线程不获取读锁而是直接释放写锁,假设此刻另一个线程(记作线程 T)获取了写锁并修改了数据,那么当前线程无法感知线程 T 的数据更新”。
这里貌似有个漏洞:如果另一个线程获取了写锁(并修改了数据),那么这个锁就被独占了,没有任何其他线程可以读到数据,更不用谈 “感知数据更新”。

作者认为,锁降级说白了就是写锁的一种特殊重入机制。通过这种重入,可以减少一步流程——释放写锁后再次获取读锁。

使用了锁降级,就可以减去释放写锁的步骤。直接获取读锁。效率更高。而且没有线程争用。和 “可见性” 并没有关系。我个人通过阅读源码也觉得该作者的解释更加合理。

用一幅图来展示锁降级:

总的来说,锁降级就是一种特殊的锁重入机制,JDK 使用 先获取写入锁,然后获取读取锁,最后释放写入锁 这个步骤,是为了提高获取锁的效率,而不是所谓的可见。

最后再总结一下获取锁的逻辑,首先判断写锁释放被持有了,如果被持有了,且是当前线程,使用锁降级,如果没有,读锁正常获取。

获取过程中,会使用 firstReader 和 cachedHoldCounter 提高性能。

总结

  • 读写锁适用于读多写少的应用场景,相比于其它排他锁可以提升吞吐量
  • 读写锁利用同步状态 state 位切割方式分为读锁和写锁
  • 读锁是共享锁,写锁是排他锁
  • 锁降级有一定争议,但个人认为其意义是提升获取锁的效率

参考文章

  1. 方腾飞 等著 《Java 并发编程的艺术》
  2. Read/Write Locks in Java (ReentrantReadWriteLock)
  3. JUC 可重入 读写锁 ReentrantReadWriteLock
  4. 并发编程之——读锁源码分析(解释关于锁降级的争议)
  5. Java多线程——ReentrantReadWriteLock源码阅读
如果这篇文章对您很有帮助,不妨
-------------    本文结束  感谢您的阅读    -------------
0%