写点什么

读写锁 ReentranReadWriteLock 源码分析

作者:Ayue、
  • 2021 年 12 月 27 日
  • 本文字数:8740 字

    阅读完需:约 29 分钟

什么是读写锁

在上篇我们聊到了可重入锁ReentrantLcok ,但它也是一把独占锁(也叫排他锁),也就是说在同一时刻只能允许一个线程持有,但在大多数场景下,都是读多写少,并且读并不存在数据竞争的问题,因此也不存在线程安全问题,因此,如果这个时候去使用ReentrantLcok,效率必然低下,所以就有了读写锁ReentrantReadWriteLock

内部结构

读写锁也是基于 AQS 来实现的,其内部维护着一对锁,一个读锁(ReadLock)和一个写锁(WriteLock)。通过分离读锁和写锁,使得并发性比一般独占锁有着显著的提升。同时也具备公平锁和非公平锁的性质,因此在读写锁中主要基于非公平锁来分析。


/** Inner class providing readlock */private final ReentrantReadWriteLock.ReadLock readerLock;/** Inner class providing writelock */private final ReentrantReadWriteLock.WriteLock writerLock;
复制代码


类图如下:


简单使用

读写锁一般分为以下三种情形:


  • 读读并发,即同一时刻可以允许多个读线程访问,共享。

  • 读写互斥,即在写线程访问时,所有的读线程和其他写线程均被阻塞。

  • 写写互斥,在写线程访问时,其他写线程均被阻塞。


用下面的代码来演示这 3 种情形:


1、线程 1 拿到写锁,因为写锁是独占锁,所以其他线程拿不到锁。


2、线程 2 去拿读锁。


3、线程 3 去拿读锁。


4、线程 4 再去拿写锁。


5、线程 5 去拿读锁。


问:t5 是读锁 会不会和t2、t3 一起执行?


@Slf4j(topic = "ayue")public class RWTest {
public static void main(String[] args) throws InterruptedException { ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); //读锁 Lock rl = rwl.readLock(); //写锁 Lock wl = rwl.writeLock();
/** * t1是写锁,让它休眠 5s 此时,其他线程拿不到锁 */ new Thread(() -> { wl.lock(); try { log.info("t1获取写锁"); TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); } finally { wl.unlock(); log.info("t1释放写锁"); } }, "t1").start();
//休眠1s让其按 t1 先执行 TimeUnit.SECONDS.sleep(1);
/** * 此时 t2 在 5s 内拿不到锁阻塞 */ new Thread(() -> { rl.lock(); try { log.info("t2获取读锁"); TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } finally { rl.unlock(); log.info("t2释放读锁"); } }, "t2").start();
/** * 此时 t3 在 5s 内拿不到锁阻塞 * * 但在 t1释放之后 t2 和 t3 能同时拿到锁 */ new Thread(() -> { rl.lock(); try { log.info("t3获取读锁"); TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } finally { rl.unlock(); log.info("t3释放读锁"); } }, "t3").start();
/** * t4 获取读锁,并睡秒 10s 问释放后 t5能不能获取读锁 */ new Thread(() -> { wl.lock(); try { log.info("t4获取写锁"); TimeUnit.SECONDS.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } finally { wl.unlock(); log.info("t4释放写锁"); } }, "t4").start();
/** * t5 是读锁 会不会和t2 t3 一起执行? * */ new Thread(() -> { rl.lock(); try { log.info("t5获取读锁"); } finally { rl.unlock(); log.info("t5释放读锁"); } }, "t5").start(); }}
复制代码


输出:


11:43:36.390 [t1] INFO  ayue - t1获取写锁11:43:41.398 [t1] INFO  ayue - t1释放写锁11:43:41.398 [t2] INFO  ayue - t2获取读锁11:43:41.398 [t3] INFO  ayue - t3获取读锁11:43:42.411 [t3] INFO  ayue - t3释放读锁11:43:42.411 [t2] INFO  ayue - t2释放读锁11:43:42.411 [t4] INFO  ayue - t4获取写锁11:43:52.423 [t4] INFO  ayue - t4释放写锁11:43:52.423 [t5] INFO  ayue - t5获取读锁11:43:52.423 [t5] INFO  ayue - t5释放读锁
复制代码


根据结果我们可以知道:


假如先有一个t1写锁拿到锁,后面有一些其他锁(可能是读锁或者写锁),当t1释放锁之后按照 FIFO 的原则唤醒等待的线程如果第一个被唤醒线程是写锁(假如t2是写锁)则不会再跟着唤醒t3及后续的锁,只有等t2执行完成之后才会去唤醒t3。假设被唤醒的t2是读锁,那么t2会去判断他的下一个t3是不是读锁,如果是则把t3唤醒,t3唤醒之后会判断t4是不是读锁,如果t4也是则唤醒t5,依次类推。但是如果t4被唤醒是写锁则不会唤醒t5了,即使后面的t5是读锁也不会唤醒。


因此就会出现上面的情况t2、t3 一起执行,t5不会一起执行。

锁降级

锁降级是指先获取写锁,再获取读锁,然后再释放写锁的过程 。锁降级是为了保证数据的可见性。锁降级是 ReentrantReadWriteLock 重要特性之一。


public class RWDowngrade {
public static void main(String[] args) { ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); //读锁 Lock rl = rwl.readLock(); //写锁 Lock wl = rwl.writeLock();
/** * 锁降级 */ new Thread(() -> { wl.lock(); try { System.out.println("先获取写锁"); rl.lock(); System.out.println("后获取读锁"); } finally { rl.unlock(); wl.unlock(); } }, "t").start(); }}
复制代码


输出:


先获取写锁后获取读锁
复制代码


注意:ReentrantReadWriteLock 并不能实现锁升级。(在源码中将会知道原因)


如下,如果先获取读锁在获取写锁,则会死锁:


new Thread(() -> {    rl.lock();    try {        System.out.println("先获取读锁");        wl.lock();        System.out.println("后获取写锁");    } finally {        wl.unlock();        rl.unlock();    }}, "t").start();
复制代码


输出,直接卡住:


源码分析

ReentrantReadWriteLock 和 ReentrantLock 其实都一样,锁核心都是 Sync, 读锁和写锁都是基于 Sync 来实现的。从这分析其实 ReentrantReadWriteLock 就是一个锁,只不过内部根据不同的场景设计了两个不同的实现方式。其读写锁为两个内部类: ReadLock、WriteLock 都实现了 Lock 接口。


读写锁也是基于 AQS 实现的,但我们知道在 AQS 中是根据其中的state而且是int 整型变量来判断同步状态的,而读写锁是两把锁,怎么通过一个属性来表示两把锁呢?


如何在一个整型上维护多种状态,我们知道int类型是 32 位的,因此我们需要按位切割这个变量,读写锁将变量切割成两部分,高 16 位表示读,低 16 位表示写。



分割之后,读写锁是如何迅速确定读锁和写锁的状态呢?通过位运算(忘记位运算的可以看看我之前的文章:Java中的位运算),假如当前同步状态为 c,那么写状态等于 c & 0x0000FFFF(将高 16 位全部抹去),读状态等于c >>> 16(无符号补 0 右移 16 位)。


代码如下:


static final int SHARED_SHIFT   = 16;static final int SHARED_UNIT    = (1 << SHARED_SHIFT);static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
/** 返回读状态 */static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
/** 返回写状态 */static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
复制代码


根据状态的划分能得出一个推论:c 不等于 0 时,当写状态等于 0 时,则读状态大于 0,即读锁已被获取。 在源码中也能看到这样的注释。


(Note: if c != 0 and w == 0 then shared count != 0)
复制代码

写锁的获取与释放

写锁是一个支持重进入的排它锁。如果当前线程已经获取了写锁,则增加写状态。如果当前线程在获取写锁时,读锁已经被获取(读状态不为 0)或者该线程不是已经获取写锁的线程,则当前线程进入等待状态。


写锁的获取


写锁的获取主要在于tryAcquire()方法,源码如下:


protected final boolean tryAcquire(int acquires) {    //当前线程    Thread current = Thread.currentThread();  //获取锁的状态    int c = getState();    //写锁状态    int w = exclusiveCount(c);    if (c != 0) {//如果已经上锁        /**         * 1、w == 0,用来判断当前锁是什么锁,w为0则表示这把锁只上过读锁,没上过写锁         * 也就是说当前线程是来获取读锁的,但本来该方法是用来获取写锁,因此当前线程         * 相当于锁升级,故而获取锁失败。         * 2、如果进入 ||,则 w != 0,说明这已经上了写锁,判断当前线程是不是重入,如果不是失败。         */        if (w == 0 || current != getExclusiveOwnerThread())            return false;        /**         * 如果获取写锁的数量超过最大值65535 ,直接异常,一般都不会超过。         */        if (w + exclusiveCount(acquires) > MAX_COUNT)            throw new Error("Maximum lock count exceeded");        // Reentrant acquire        setState(c + acquires);        return true;    }    /**     * writerShouldBlock(),简单来说就是判断是否需要排队。     * 如果是非公平锁,直接返回false,即不管有没有线程排队直接抢锁。     * 如果是公平锁,其底层调用了hasQueuedPredecessors()方法,即会判断队列中是否还存在排队的线程,如果有就   * 排队,没有则尝试加锁。     */    if (writerShouldBlock() || !compareAndSetState(c, c + acquires))        return false;    setExclusiveOwnerThread(current);    return true;}
复制代码


通过分析源码可知:


如果存在读锁,则写锁不能被获取,原因在于:读写锁要确保写锁的操作对读锁可见,如果允许读锁在已被获取的情况下对写锁的获取,那么正在运行的其他读线程就无法感知到当前写线程的操作。因此,只有等待其他读线程都释放了读锁,写锁才能被当前线程获取,而写锁一旦被获取,则其他读写线程的后续访问均被阻塞。


写锁的释放


写锁的释放与ReentrantLock的释放过程基本类似,上篇文章也有讲解,可自行查看,这里不在重复。

读锁的获取与释放

读锁是一个支持重进入的共享锁,它能够被多个线程同时获取,在没有其他写线程访问(或者写状态为 0)时,读锁总会被成功地获取。需要注意的是,如果当前线程已经获取了读锁,则增加读状态。如果当前线程在获取读锁时,写锁已被其他线程获取,则进入等待状态。


读锁的获取


写锁的获取主要在于tryAcquireShared()方法,源码如下:


//尝试获取共享状态,如果共享状态大于等于0则说明获取锁成功,否则加入同步队列。protected final int tryAcquireShared(int unused) {    Thread current = Thread.currentThread();    int c = getState();  /*     * 1、exclusiveCount(c) != 0,判断是否被上了写锁。     * 2、如果上了写锁,进入&&,为什么上了写锁还要继续判断?这就是锁的降级。     * 如果重入则一定是降级,如果不是重入则失败,因为读写需要互斥。     */    if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current)        return -1;    /**     * 如果上面代码没有返回执行到这里有两种情况标识     * 1、没有人上写锁      * 2、重入降级     */    int r = sharedCount(c);    /**     * readerShouldBlock():判断锁是否需要等待     * r < MAX_COUNT:判断锁的数量是否超过最大值65535     * compareAndSetState(c, c + SHARED_UNIT): 设置共享状态(读锁状态)     */    if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) {        // r==0,表示当前没有任何线程获取读锁        if (r == 0) {            // 设置当前线程为第一个获取读锁的线程            firstReader = current;            // 计数设置为1            firstReaderHoldCount = 1;        } else if (firstReader == current) {            // 表示重入锁,在计数其上+1            firstReaderHoldCount++;        } else {            /**             * HoldCounter 主要是一个类来记录线程获取锁的数量             * cachedHoldCounter 缓存的是最后一个获取锁线程的HoldCounter对象             */            HoldCounter rh = cachedHoldCounter;            if (rh == null || rh.tid != getThreadId(current))                cachedHoldCounter = rh = readHolds.get();            else if (rh.count == 0)                readHolds.set(rh);            rh.count++;        }        return 1;    }    //自旋方式再次尝试获取    return fullTryAcquireShared(current);}
复制代码


tryAcquireShared(int unused)方法中,如果其他线程已经获取了写锁,则当前线程获取读锁失败,进入等待状态。如果当前线程获取了写锁或者写锁未被获取,则当前线程(线程安全,依靠 CAS 保证)增加读状态,成功获取读锁。


读锁的释放


protected final boolean tryReleaseShared(int unused) {    Thread current = Thread.currentThread();    //判断当前线程释放是第一个获取读锁的线程    if (firstReader == current) {        // assert firstReaderHoldCount > 0;        // 判断获取锁的次数释放为1,如果为1说明没有重入情况,直接释放firstReader = null;否则将该线程持有锁的数量 -1        if (firstReaderHoldCount == 1)            firstReader = null;        else            firstReaderHoldCount--;    } else {        // 如果当前线程不是第一个获取读锁的线程。        // 获取缓存中的HoldCounter        HoldCounter rh = cachedHoldCounter;        // 如果缓存中的HoldCounter 不属于当前线程则获取当前线程的HoldCounter。        if (rh == null || rh.tid != getThreadId(current))            rh = readHolds.get();        int count = rh.count;        if (count <= 1) {            // 如果线程持有锁的数量小于等1 直接删除HoldCounter            readHolds.remove();            if (count <= 0)                throw unmatchedUnlockException();        }        // 持有锁数量大于1 则执行 - 1操作        --rh.count;    }    // 自旋释放同步状态    for (; ; ) {        int c = getState();        int nextc = c - SHARED_UNIT;        if (compareAndSetState(c, nextc))            // Releasing the read lock has no effect on readers,            // but it may allow waiting writers to proceed if            // both read and write locks are now free.            return nextc == 0;    }}
复制代码


锁的释放比较简单,首先看当前线程是否是第一个获取读锁的线程,如果是并且没有发生重入,则将首次获取读锁变量设为 null, 如果发生重入,则将首次获取读锁计数器 -1 其次 查看缓存中计数器是否为空或者是否是当前线程,如果为空或者不是则获取当前线程的计数器,如果计数器个数小于等 1, 从 ThreadLocl 中删除计数器,并计数器值-1,如果小于等于 0 异常 。最后自旋修改同步状态。

高性能读写锁 StampedLock

ReentrantReadWriteLock 的性能已经很好了,但是底层还是需要进行一系列的 CAS 操作去加锁,因此 JUC 还提供了另外一个读写锁 StampedLock。


StampedLock 如果是读锁上锁是没有这种 CAS 操作的,因此性能比 ReentrantReadWriteLock 更好,也称为乐观读锁,即读获取锁的时候不加锁,直接返回一个值,然后执行临界区的时候去验证这个值是否有被人修改(写操作加锁)如果没有被人修改则直接执行临界区的代码,如果被人修改了则需要升级为读写锁 ReadLock 。

基本语法

//获取戳 不存在锁 long stamp = lock.tryOptimisticRead(); //验证戳 if(lock.validate(stamp)){     //成立则执行临界区的代码     //返回 }//如果没有返回则表示被人修改了 需要升级成为readLock lock.readLock();
复制代码

代码示例

import lombok.SneakyThrows;import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.TimeUnit;import java.util.concurrent.locks.StampedLock;
@Slf4j(topic = "ayue")public class DataContainer { int i; long stampw = 0l;
public void setI(int i) { this.i = i; }
private final StampedLock lock = new StampedLock(); //首先 加 StampedLock
@SneakyThrows public int read() { //尝试一次乐观读 long stamp = lock.tryOptimisticRead(); log.debug("StampedLock 读锁拿到的戳{}", stamp); //1s之后验戳 TimeUnit.SECONDS.sleep(1); //验戳 if (lock.validate(stamp)) { log.debug("StampedLock 验证完毕stamp{}, data.i:{}", stamp, i); return i; } //一定验证失败 log.debug("验证失败 被写线程给改变了{}", stampw); try { //锁的升级 也会改戳 stamp = lock.readLock(); log.debug("升级之后的加锁成功 {}", stamp);
TimeUnit.SECONDS.sleep(1); log.debug("升级读锁完毕{}, data.i:{}", stamp, i); return i; } finally { log.debug("升级锁解锁 {}", stamp); lock.unlockRead(stamp); } }
@SneakyThrows public void write(int i) { //cas 加锁 stampw = lock.writeLock(); log.debug("写锁加锁成功 {}", stampw); try { TimeUnit.SECONDS.sleep(5); this.i = i; } finally { log.debug("写锁解锁 {},data.i:{}", stampw, i); lock.unlockWrite(stampw); } }
public static void main(String[] args) throws InterruptedException { //实例化数据容器 DataContainer dataContainer = new DataContainer(); //给了一个初始值 不算写 构造方法赋值 dataContainer.setI(1); //读取 new Thread(() -> { dataContainer.read(); }, "t1").start();
new Thread(() -> { dataContainer.read(); }, "t2").start();
// new Thread(() -> {// dataContainer.write(9);// }, "t3").start(); }}
复制代码


都是读锁输出(t1,t2):


11:58:48.839 [t1] DEBUG ayue - StampedLock 读锁拿到的戳25611:58:48.839 [t2] DEBUG ayue - StampedLock 读锁拿到的戳25611:58:49.854 [t2] DEBUG ayue - StampedLock 验证完毕stamp256, data.i:111:58:49.854 [t1] DEBUG ayue - StampedLock 验证完毕stamp256, data.i:1
复制代码


如果有修改(t1,t3):


12:07:36.234 [t1] DEBUG ayue - StampedLock 读锁拿到的戳25612:07:37.238 [t3] DEBUG ayue - 写锁加锁成功 38412:07:41.213 [t1] DEBUG ayue - 验证失败 被写线程给改变了38412:07:42.240 [t3] DEBUG ayue - 写锁解锁 384,data.i:912:07:42.240 [t1] DEBUG ayue - 升级之后的加锁成功 51312:07:43.252 [t1] DEBUG ayue - 升级读锁完毕513, data.i:912:07:43.252 [t1] DEBUG ayue - 升级锁解锁 513
复制代码

StampedLock 能否替代 ReentrantReadWriteLock

StampedLock 性能虽然高于 ReentrantReadWriteLock,但在使用过程中也需要按场景,StampedLock 具有一下特点:


  1. 不支持重入

  2. 不支持条件队列

  3. 存在一定的并发问题

总结

在线程持有读锁的情况下,该线程不能取得写锁(为了保证写操作对后续所有的读操作保持可见性)。


在线程持有写锁的情况下,该线程可以继续获取读锁(获取读锁时如果发现写锁被占用,只有写锁没有被当前线程占用的情况才会获取失败)。


仔细想想,这个设计是合理的:因为当线程获取读锁的时候,可能有其他线程同时也在持有读锁,因此不能把获取读锁的线程“升级”为写锁;而对于获得写锁的线程,它一定独占了读写锁,因此可以继续让它获取读锁,当它同时获取了写锁和读锁后,还可以先释放写锁继续持有读锁,这样一个写锁就“降级”为了读一个线程要想同时持有写锁和读锁,必须先获取写锁再获取读锁。


写锁可以“降级”为读锁,读锁不能“升级”为写锁。

发布于: 2021 年 12 月 27 日
用户头像

Ayue、

关注

个人站点:javatv.net 2019.10.16 加入

学习知识,目光坚毅

评论

发布
暂无评论
读写锁ReentranReadWriteLock源码分析