ReentrantReadWriteLock读写锁源码分析

ReentrantReadWriteLock读写锁源码分析

请注意,本文编写于  1,075  天前,最后修改于  1,075  天前,其中某些信息可能已经过时。

读写锁的概念

常见的锁(Synchronized 和 Lock 的实现类)基本都是排他锁,同一时刻只允许一个线程进行访问。提起 ReentrantReadWriteLock,不得不想起 ReentrantLock 这个可重入排他锁。实际上这两者之间没有太大关系,都是 Lock 的独立实现。

在介绍原理之前,先简单叙述一下什么是读写锁:

读写锁在同一时刻可以允许多个读线程访问,但是在写线程访问时,所有的读线程和其它写线程均被阻塞。具体而言,一个线程要获取读锁的前提条件是:没有其它线程的写锁;没有写请求或者有写请求,但调用线程和持有锁的线程是同一个 。一个线程要获取写锁的前提条件是:没有其它线程的读锁;没有其它线程的写锁。

为了表示方便,以下叙述读写锁均指 ReentrantReadWriteLock 这个实现类,读写锁维护了一对锁:读锁和写锁,通过分离读写锁,使得其吞吐量和并发性在某些应用场景下(尤其是读多写少的多线程环境下)较排他锁提升较大。除了保证写操作对读操作的可见性以及并发性的提升之外,读写锁还简化了读写交互场景的编程方式,使其变得更加简单明了。

在没有读写锁支持之前(Java 5 之前),如果要完成这种场景的代码编写,往往需要使用“等待-通知”机制,具体而言,就是当写操作开始时,所有晚于写操作的读操作均会进入等待状态,只有写操作完成并进行通知后,所有等待的读操作才能继续执行,这样做的目的主要是使读操作能够读到正确的数据,不会出现脏读。而使用读写锁实现上述功能时,只需要在读操作的时候获取读锁,写操作的时候获取写锁就可以了,内部线程等待通知由内部进行实现,留给程序员的接口变得更加方便和直观。

ReentrantReadWriteLock 具备的特性:

特性说明
公平性选择支持非公平(默认)和公平两种锁获取方式
重进入支持重进入,读线程获取读锁后,能够再次获取读锁,写线程获取写锁后能再次获取写锁,同时也可以获取读锁
锁降级遵循获取写锁、获取读锁再释放写锁的锁降级,写锁能够降级为读锁

读写锁的接口和示例

首先看下接口的 ReadWriteLock,这个接口仅定义两个方法:读写 ReadLock() 和写锁 WriteLock()。

public interface ReadWriteLock {
    /**
     * Returns the lock used for reading.
     */
    Lock readLock();

    /**
     * Returns the lock used for writing.
     */
    Lock writeLock();
}

而 ReentrantReadWriteLock 是其实现类:

public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable

读写锁中有五个静态内部类:

  • Sync
  • NonfairSync
  • FairSync
  • ReadLock
  • WriteLock

它们之间的关系如下图所示:

其中 Sync 继承自 AQS,是读写锁实现的核心,关于 AQS 可以参考我的另外一篇博客。FairSync 是公平锁的底层实现,NonfairSync 是非公平锁的底层实现。

ReentrantReadWriteLock 类中,除了 readLock() 和 writeLock() 的实现方法外,还提供了一些便于外界监控内部工作状态的方法,这些方法和描述如下:

方法名称描述
int getReadLockCount()返回当前读锁被获取的次数,该次数不等于读锁的线程数,因为可重入的原因,一个线程连续获取了 n 次读锁,那么占据该读锁的线程数是 1,但该方法返回 n
int getReadHoldCount()返回当前线程获取读锁的次数,该方法在 Java 6 中加入,使用 ThreadLocal 保存当前线程获取的次数,这也使得 Java 6的实现变得更复杂
boolean isWriteLocked()判断写锁是否被获取
int getWriteHoldCount()返回当前线写锁被获取的次数

我们可以使用一个例子来表示读写锁的使用方式,示例代码如下:

public class Cache{
    static Map<String,Object> map = new HashMap<>();
    static ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
    static Lock r = rwl.readLock();
    static Lock w = rwl.writeLock();
    
    //获取一个Key对应的 value
    public static final Object get(String key){
        r.lock();
        try{
            return map.get(key);
        }finally{
            r.unlock();
        }
    }
    
    //设置key对应的 value,并返回旧的value
    public static final Object put(String key,Object value){
        w.lock();
        try{
            return map.put(key,value);
        }finally{
            w.unlock();
        }
    }
    
    //清空所有的内容
    public static final void clear(){
        w.lock();
        try{
            map.clear();
        }finally{
            w.unlock();
        }
    }
}

上述示例中的 Cache 组合一个非线程安全的 HashMap 作为缓存的实现,同时使用读写锁来保证 Cache 是线程安全的。其中在 get 方法中,需要获取读锁,这使得并发访问该方法时线程不会被阻塞;在 put 方法和 clear 方法中,更新 HashMap 时必须提前获取写锁,并且其它获取读锁和写锁的线程均被阻塞,只有写锁释放后,其它读写操作才能继续进行。

由此可见,使用读写锁提高了多线程环境下读操作的并发性,并且保证了写操作的互斥性和读写操作之间的可见性。尤其是简化了代码的编写方式。

读写锁的源码分析

了解了常用的使用方法后,我们接下来就需要探析一下读写锁的源码实现和核心思想。

读写状态的设计

通过前面分析我们知道,读写锁是依赖 AQS 实现同步功能的,其核心就是读写锁的静态内部类 Sync,其中实现的方法如下:

我们知道,AQS 实现同步是依赖一个 volatile 修饰的 int 类型的 state 状态变量和一个 CLH 队列,那么要靠这个 int 类型的变量同时表示“读”和“写”状态,就只能按照位切割了。事实上读写锁也是这么去实现的,用高 16 位表示读,低 16 位表示写,其划分方式如下图所示:

通过位运算可以很快的计算并确定读写锁各自的状态。假设当前同步状态 state 值为 S,写状态就可以表示为 S & 0x0000FFFF(将高 16 位全部抹去),读状态可以表示为S >> 16(无符号补 0 右移 16 位)。当写状态增加 1 时,就可以表示为 S + 1,当读状态增加 1 时,等于 S + (1 << 16),也就是S + 0x00010000

根据状态的划分能得出一个推论: S 不等于 0 时,当写状态( S & 0x0000FFFF)等于 0 时,则读状态(S >> 16)大于 0,即读锁已被获取。

写锁的获取与释放

写锁的获取入口:

// WriteLock
public void lock() {
    sync.acquire(1);
}

// AQS
public final void acquire(int arg) {
    // 尝试获取,获取失败后入队,入队失败则interrupt当前线程
    if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

写锁的获取依赖tryAcquire()方法:

protected final boolean tryAcquire(int acquires) {
    Thread current = Thread.currentThread();
    int c = getState();
    int w = exclusiveCount(c);
    
    // 操作1:c != 0,说明存在读锁或者写锁
    if (c != 0) {
        // (Note: if c != 0 and w == 0 then shared count != 0)  
        // 写锁为0,读锁不为0 或者获取写锁的线程并不是当前线程,直接失败
        if (w == 0 || current != getExclusiveOwnerThread())
            return false;
        if (w + exclusiveCount(acquires) > MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        
        // Reentrant acquire
        // 执行到这里说明是写锁线程的重入操作,直接修改状态,也不需要CAS因为没有竞争
        setState(c + acquires);
        return true;
    }
    
    // 操作2:获取写锁,writerShouldBlock对于非公平模式直接返回fasle,对于公平模式则线程需要排队,因此需要阻塞。
    if (writerShouldBlock() ||
        !compareAndSetState(c, c + acquires))
        return false;
    setExclusiveOwnerThread(current);
    return true;
}

从源码可以看到写锁是一个支持重进入的排他锁,如果当前线程已经获取到写锁,就增加写状态;如果当前线程在获取写锁时,读锁已经被获取(判断条件是读状态不为 0)或者该线程不是已经获取写锁的线程,则当前线程进入等待状态。

为什么存在读锁时,写锁不能被获取呢?这是为了保证写锁的操作对读锁可见,如果允许读锁在已经存在的条件下获取写锁,那么正在运行的其它线程就无法感知到当前线程的写操作。

写锁的释放入口:

// WriteLock
public void unlock() {
    sync.release(1);
}
// AQS
public final boolean release(int arg) {
    // 释放锁成功后唤醒队列中第一个线程
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

写锁的释放与 ReentrantLock 的释放操作基本一致,这里没有什么好说的。

protected final boolean tryRelease(int releases) {
    // 如果当前线程没有获取写锁却释放,则直接抛异常
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    
    // 状态变更至nextc
    int nextc = getState() - releases;
    
    // 因为写锁是可以重入,所以在都释放完毕后要把独占标识清空
    boolean free = exclusiveCount(nextc) == 0;
    if (free)
        setExclusiveOwnerThread(null);
    
    // 修改状态
    setState(nextc);
    return free;
}

读锁的获取与释放

读锁的获取入口:

// ReadLock
public void lock() {
    sync.acquireShared(1);
}
// AQS
public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

可以看到依赖于 tryAcquireShared 方法,这个方法源码如下:

protected final int tryAcquireShared(int unused) {
    Thread current = Thread.currentThread();
    int c = getState();
    // 操作1:存在写锁,并且写锁不是当前线程则直接去排队
    if (exclusiveCount(c) != 0 &&
        getExclusiveOwnerThread() != current)
        return -1;
    int r = sharedCount(c);
    
    // 操作2:读锁是否该阻塞,对于非公平模式下写锁获取优先级会高,如果存在要获取写锁的线程则读锁需要让步,公平模式下则先来先到
    if (!readerShouldBlock() && 
        // 读锁使用高16位,因此存在获取上限为2^16-1
        r < MAX_COUNT &&
        // 操作3:CAS修改读锁状态,实际上是读锁状态+1
        compareAndSetState(c, c + SHARED_UNIT)) {
        // 操作4:执行到这里说明读锁已经获取成功,因此需要记录线程状态。
        if (r == 0) {
            firstReader = current; // firstReader是把读锁状态从0变成1的那个线程
            firstReaderHoldCount = 1;
        } else if (firstReader == current) { 
            firstReaderHoldCount++;
        } else {
            // 这些代码实际上是从ThreadLocal中获取当前线程重入读锁的次数,然后自增下。
            HoldCounter rh = cachedHoldCounter; // cachedHoldCounter是上一个获取锁成功的线程
            if (rh == null || rh.tid != getThreadId(current))
                cachedHoldCounter = rh = readHolds.get();
            else if (rh.count == 0)
                readHolds.set(rh);
            rh.count++;
        }
        return 1;
    }
    // 当操作2,操作3失败时执行该逻辑
    return fullTryAcquireShared(current);
}
final int fullTryAcquireShared(Thread current) {
    HoldCounter rh = null;
    // 最外层嵌套循环
    for (;;) {
        int c = getState();
        // 操作5:存在写锁,且写锁并非当前线程则直接返回失败
        if (exclusiveCount(c) != 0) {
            if (getExclusiveOwnerThread() != current)
                return -1;
            // else we hold the exclusive lock; blocking here
            // would cause deadlock.
            // 操作6:如果当前线程是重入读锁则放行
        } else if (readerShouldBlock()) {
            // Make sure we're not acquiring read lock reentrantly
            // 当前是firstReader,则直接放行,说明是已获取的线程重入读锁
            if (firstReader == current) {
                // assert firstReaderHoldCount > 0;
            } else {
                // 执行到这里说明是其他线程,如果是cachedHoldCounter(其count不为0)也就是上一个获取锁的线程则可以重入,否则进入AQS中排队
                // **这里也是对写锁的让步**,如果队列中头结点为写锁,那么当前获取读锁的线程要进入队列中排队
                if (rh == null) {
                    rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current)) {
                        rh = readHolds.get();
                        if (rh.count == 0)
                            readHolds.remove();
                    }
                }
                // 说明是上述刚初始化的rh,所以直接去AQS中排队
                if (rh.count == 0)
                    return -1;
            }
        }
        if (sharedCount(c) == MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        // 操作7:修改读锁状态,实际上读锁自增操作
        if (compareAndSetState(c, c + SHARED_UNIT)) {
            // 操作8:对ThreadLocal中维护的获取锁次数进行更新。
            if (sharedCount(c) == 0) {
                firstReader = current;
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {
                firstReaderHoldCount++;
            } else {
                if (rh == null)
                    rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    rh = readHolds.get();
                else if (rh.count == 0)
                    readHolds.set(rh);
                rh.count++;
                cachedHoldCounter = rh; // cache for release
            }
            return 1;
        }
    }
}

由上可以看到读锁是一个支持重进入的共享锁,能够被多个线程同时获取,在没有其它写线程访问时,读锁总会被成功获取,所做的也只是增加读状态。如果当前线程已经获取了读锁,则使用 CAS 增加读状态;如果当前线程在获取读锁时,写锁已经被其它线程获取,则进入等待状态。

获取读锁在 JDK1.6 后变得复杂了许多,例如新增了使用 ThreadLocal 的实现的 getReadCount() 方法来返回当前线程获取读锁的次数。

读锁的释放入口:

// ReadLock
public void unlock() {
    sync.releaseShared(1);
}
// Sync
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared(); // 这里实际上是释放读锁后唤醒写锁的线程操作
        return true;
    }
    return false;
}

其中释放的具体方法如下:

protected final boolean tryReleaseShared(int unused) {
    Thread current = Thread.currentThread();
    // 操作1:清理ThreadLocal对应的信息
    if (firstReader == current) {;
                                 if (firstReaderHoldCount == 1)
                                     firstReader = null;
                                 else
                                     firstReaderHoldCount--;
                                } else {
        HoldCounter rh = cachedHoldCounter;
        if (rh == null || rh.tid != getThreadId(current))
            rh = readHolds.get();
        int count = rh.count;
        // 已释放完的读锁的线程清空操作
        if (count <= 1) {
            readHolds.remove();
            // 如果没有获取锁却释放则会报该错误
            if (count <= 0)
                throw unmatchedUnlockException();
        }
        --rh.count;
    }
    // 操作2:循环中利用CAS修改读锁状态
    for (;;) {
        int c = getState();
        int nextc = c - SHARED_UNIT;
        if (compareAndSetState(c, nextc))
            // Releasing the read lock has no effect on readers,
            // but it may allow waiting writers to proceed if
            // both read and write locks are now free.
            return nextc == 0;
    }
}

读锁的释放主要是:

  1. 清理 ThreadLocal 中保存的获取锁数量信息
  2. CAS 修改读锁个数,实际上是自减一

锁降级

锁降级指的是写锁降级为读锁。

如果当前线程拥有写锁,然后将其释放,最后再获取读锁。这种分段完成的过程不能称为锁降级。锁降级实际上是指把持住(当前拥有的)写锁,再获取到读锁,随后释放(先前拥有的)写锁的过程。但是从读锁升级到写锁是不可能的。

那么这个锁降级体现在哪里呢?

在 tryAcquireShared 方法和 fullTryAcquireShared 中都有体现,例如下面的判断:

if (exclusiveCount(c) != 0) {
    if (getExclusiveOwnerThread() != current)
        return -1;

上面的代码的意思是:当写锁被持有时,如果持有该锁的线程不是当前线程,就返回 “获取锁失败”,反之当前线程可以继续获取读锁。这就称之为锁降级。

用《Java 并发编程的艺术》书中的例子可以说明:

public void processData(){
    readLock.lock();
    if(!update){
        //必须先释放读锁
        readLock.unlock();
        
        //锁降级从写锁获取到开始
        writeLock.lock();
        try{
            if(!update){
                //准备数据的流程(略)
                ...
                update = true;
            }
            readLock.unlock();
        }finally{
            writeLock.unlock();
        }
        //锁降级完成,写锁降级为读锁
    }
    try{
        //使用数据的流程(略)
        ...
    }finally{
        readLock.unlock();
    }
}

上述示例中,当数据发生变更后,update 变量(布尔类型且使用 volatile 修饰)被设置为 false,此时所有访问 processDate() 方法的线程都能感知到变化,但只有一个线程能够获取到写锁,其它线程会被阻塞在读锁和写锁的 unlock() 方法上。当前线程获取写锁完成数据准备后,再获取读锁,随后释放写锁,完成锁降级。
那么锁降级的意义是什么?很多书上或者博客上都会解释如下:

锁降级中读锁的获取是有必要的,主要是为了保证数据的可见性,如果当前线程不获取读锁而直接释放写锁,假设此时另外一个线程(记作线程 T)获取了写锁并修改了数据,那么当前线程无法感知线程 T 的数据更新。如果当前 线程获取读锁,即遵循锁降级的步骤,则线程 T 将会被阻塞,知道当前线程使用数据并释放读锁后,线程 T 才能获取写锁进行数据更新

不过这里解释的是锁降级步骤中读锁的获取是否有必要,跟锁降级的意义似乎并没有太大的关联。根据参考文章 [4],作者提出上面这种解释也是存在问题的。
上面提到,锁降级中,读锁的获取的目的是 “为了保证数据的可见性”。而得到这个结论的依据是 “如果当前线程不获取读锁而是直接释放写锁,假设此刻另一个线程(记作线程 T)获取了写锁并修改了数据,那么当前线程无法感知线程 T 的数据更新”。
这里貌似有个漏洞:如果另一个线程获取了写锁(并修改了数据),那么这个锁就被独占了,没有任何其他线程可以读到数据,更不用谈 “感知数据更新”。

作者认为,锁降级说白了就是写锁的一种特殊重入机制。通过这种重入,可以减少一步流程——释放写锁后再次获取读锁。

使用了锁降级,就可以减去释放写锁的步骤。直接获取读锁。效率更高。而且没有线程争用。和 “可见性” 并没有关系。我个人通过阅读源码也觉得该作者的解释更加合理。

用一幅图来展示锁降级:

总的来说,锁降级就是一种特殊的锁重入机制,JDK 使用 先获取写入锁,然后获取读取锁,最后释放写入锁 这个步骤,是为了提高获取锁的效率,而不是所谓的可见。

最后再总结一下获取锁的逻辑,首先判断写锁释放被持有了,如果被持有了,且是当前线程,使用锁降级,如果没有,读锁正常获取。

获取过程中,会使用 firstReader 和 cachedHoldCounter 提高性能。

总结

  • 读写锁适用于读多写少的应用场景,相比于其它排他锁可以提升吞吐量
  • 读写锁利用同步状态 state 位切割方式分为读锁和写锁
  • 读锁是共享锁,写锁是排他锁
  • 锁降级有一定争议,但个人认为其意义是提升获取锁的效率

参考文章

  1. 方腾飞 等著 《Java 并发编程的艺术》
  2. Read/Write Locks in Java (ReentrantReadWriteLock)
  3. JUC 可重入 读写锁 ReentrantReadWriteLock
  4. 并发编程之——读锁源码分析(解释关于锁降级的争议)
  5. Java多线程——ReentrantReadWriteLock源码阅读

本文由 Sanarous 创作,如果您觉得本文不错,请随意赞赏
采用 知识共享署名4.0 国际许可协议进行许可,转载前请务必署名
本文链接:https://bestzuo.cn/posts/1665595927.html
最后更新于:2019-07-17 16:35:18

切换主题 | SCHEME TOOL