写点什么

图解 ReentrantReadWriteLock 读写锁的实现原理

作者:JAVA旭阳
  • 2022-10-17
    浙江
  • 本文字数:8124 字

    阅读完需:约 1 分钟

概述

ReentrantReadWriteLock读写锁是使用 AQS 的集大成者,用了独占模式和共享模式。本文和大家一起理解下ReentrantReadWriteLock读写锁的实现原理。在这之前建议大家阅读下下面 3 篇关联文章:


深入浅出理解Java并发AQS的独占锁模式


深入浅出理解Java并发AQS的共享锁模式


通俗易懂读写锁ReentrantReadWriteLock的使用

原理概述


上图是ReentrantReadWriteLock读写锁的类结构图:


  • 实现了ReadWriteLock接口,该接口提供了获取读锁和写锁的 API。

  • ReentrantReadWriteLock读写锁内部的成员变量 readLock 是读锁,指向内部类 ReadLock。

  • ReentrantReadWriteLock读写锁内部的成员变量 writeLock 是写锁,指向内部类 WriteLock。

  • ReentrantReadWriteLock读写锁内部的成员变量 sync 是继承 AQS 的同步器,他有两个子类FairSync公平同步器和NoFairSync非公平同步器,读写锁内部也有一个 sync,他们使用的是同一个 sync。


读写锁用的同一个 sync 同步器,那么他们共享同一个 state, 这样不会混淆吗?


不会,ReentrantReadWriteLock读写锁使用了 AQS 中 state 值得低 16 位表示写锁得计数,用高 16 位表示读锁得计数,这样就可以使用同一个 AQS 同时管理读锁和写锁。


  1. ReentrantReadWriteLock 类重要成员变量


// 读锁private final ReentrantReadWriteLock.ReadLock readerLock;// 写锁private final ReentrantReadWriteLock.WriteLock writerLock;// 同步器final Sync sync;
复制代码


  1. ReentrantReadWriteLock 构造方法


//默认是非公平锁,可以指定参数创建公平锁public ReentrantReadWriteLock(boolean fair) {    // true 为公平锁    sync = fair ? new FairSync() : new NonfairSync();    // 这两个 lock 共享同一个 sync 实例,都是由 ReentrantReadWriteLock 的 sync 提供同步实现    readerLock = new ReadLock(this);    writerLock = new WriteLock(this);}
复制代码


  1. Sync 类重要成员变量


// 用来移位static final int SHARED_SHIFT   = 16;// 高16位的1static final int SHARED_UNIT    = (1 << SHARED_SHIFT);// 65535,16个1,代表写锁的最大重入次数static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;// 低16位掩码:0b 1111 1111 1111 1111,用来获取写锁重入的次数static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
// 获取读写锁的读锁分配的总次数static int sharedCount(int c) { return c >>> SHARED_SHIFT; }// 写锁(独占)锁的重入次数static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
复制代码

加锁原理

图解过程

设计一个加锁场景,t1 线程加写锁,t2 线程加读锁,我们看下它们整个加锁得流程。


  1. t1 加写锁w.lock()成功,占了 state 的低 16 位。



  • 这里得 state 分为两部分0_1,0 表示高 16 位的值,1 表示低 16 位的值。

  • AQS 当前占用线程exclusiveOwnerThread属性指向 t1 线程。


  1. t2 线程执行加读锁 r.lock(),尝试获取锁,发现已经被写锁占据了,加锁失败。



  1. t2 线程被封装成一个共享模式 Node.SHARED 的节点,加入到 AQS 的队列中。



  1. 在阻塞前,t2 线程发现自己是队列中的老二,会尝试再次获取读锁,因为 t1 没有释放,它会失败,然后它会把队列的前驱节点的状态改为-1,然后阻塞自身,也就是 t2 线程。



  • 上面中黄色三角形就是等待状态的值,前驱节点变成-1

  • 上面中的灰色表示节点所在的线程阻塞了


  1. 后面如过有其他线程如 t3,t4 加读锁或者写锁,由于 t1 线程没有释放锁,会变成下面的状态。



上面是整个解锁的流程,下面深入源码验证这个流程。

源码解析

  1. 写锁加锁源码


WriteLock 类的 lock()方法是加写锁的入口方法。


static final class NonfairSync extends Sync {    // ... 省略无关代码     // 外部类 WriteLock 方法, 方便阅读, 放在此处    public void lock() {        sync.acquire(1);    }     // AQS 继承过来的方法, 方便阅读, 放在此处    public final void acquire(int arg) {        if (            // 尝试获得写锁失败                !tryAcquire(arg) &&                        // 将当前线程关联到一个 Node 对象上, 模式为独占模式                        // 进入 AQS 队列阻塞                        acquireQueued(addWaiter(Node.EXCLUSIVE), arg)        ) {            selfInterrupt();        }    }         protected final boolean tryAcquire(int acquires) {        // 获取当前线程        Thread current = Thread.currentThread();        //获得锁的状态        int c = getState();        // 获得低 16 位, 代表写锁的 state 计数        int w = exclusiveCount(c);         // c不等于0表示加了读锁或者写锁        if (c != 0) {            if (                // c != 0 and w == 0 表示有读锁返回错误,读锁不支持锁升级, 或者                    w == 0 ||                          // w != 0 说明有写锁,写锁的拥有者不是自己,获取失败                            current != getExclusiveOwnerThread()            ) {                // 获得锁失败                return false;            }            // 写锁计数超过低 16 位最大数量, 报异常            if (w + exclusiveCount(acquires) > MAX_COUNT)                throw new Error("Maximum lock count exceeded");            // 写锁重入, 获得锁成功,没有并发,所以不使用 CAS            setState(c + acquires);            return true;        }        if (             // c == 0,说明没有任何锁,判断写锁是否该阻塞,是 false 就尝试获取锁,失败返回 false                writerShouldBlock() ||                        // 尝试更改计数失败                        !compareAndSetState(c, c + acquires)        ) {            // 获得锁失败            return false;        }        // 获得锁成功,设置锁的持有线程为当前线程        setExclusiveOwnerThread(current);        return true;    }     // 非公平锁 writerShouldBlock 总是返回 false, 无需阻塞    final boolean writerShouldBlock() {        return false;     }    // 公平锁会检查 AQS 队列中是否有前驱节点, 没有(false)才去竞争    final boolean writerShouldBlock() {        return hasQueuedPredecessors();    }}
复制代码


  • tryAcquire()方法是模板方法,由子类自定义实现获取锁的逻辑。

  • 线程如果获取写锁失败的话,通过acquireQueued()方法封装成独占 Node 加入到 AQS 队列中。


2.读锁加锁源码


ReadLock类的lock()方法是加读锁的入口方法,调用tryAcquireShared()方法尝试获取读锁,返回负数,失败,加入到队列中。


// 加读锁的方法入口public void lock() {    sync.acquireShared(1);}public final void acquireShared(int arg) {    // tryAcquireShared 返回负数, 表示获取读锁失败,加入到队列中    if (tryAcquireShared(arg) < 0)        doAcquireShared(arg);}
复制代码


tryAcquireShared()方法是一个模板方法,AQS 类中定义语义,子类实现,如果返回 1,表示获取锁成功,还有剩余资源,返回 0 表示获取成功,没有剩余资源,返回-1 表示失败。


// 尝试以共享模式获取,返回1表示获取锁成功,还有剩余资源,返回0表示获取成功,没有剩余资源,返回-1,表示失败protected final int tryAcquireShared(int unused) {    Thread current = Thread.currentThread();    int c = getState();    // exclusiveCount(c) 代表低 16 位, 写锁的 state,成立说明有线程持有写锁    // 写锁的持有者不是当前线程,则获取读锁失败,【写锁允许降级】    if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current)        return -1;        // 高 16 位,代表读锁的 state,共享锁分配出去的总次数    int r = sharedCount(c);    // 读锁是否应该阻塞    if (!readerShouldBlock() &&  r < MAX_COUNT &&        compareAndSetState(c, c + SHARED_UNIT)) {    // 尝试增加读锁计数        // 加锁成功        // 加锁之前读锁为 0,说明当前线程是第一个读锁线程        if (r == 0) {            firstReader = current;            firstReaderHoldCount = 1;        // 第一个读锁线程是自己就发生了读锁重入        } else if (firstReader == current) {            firstReaderHoldCount++;        } else {            // cachedHoldCounter 设置为当前线程的 holdCounter 对象,即最后一个获取读锁的线程            HoldCounter rh = cachedHoldCounter;            // 说明还没设置 rh            if (rh == null || rh.tid != getThreadId(current))                // 获取当前线程的锁重入的对象,赋值给 cachedHoldCounter                cachedHoldCounter = rh = readHolds.get();            // 还没重入            else if (rh.count == 0)                readHolds.set(rh);            // 重入 + 1            rh.count++;        }        // 读锁加锁成功        return 1;    }    // 逻辑到这 应该阻塞,或者 cas 加锁失败    // 会不断尝试 for (;;) 获取读锁, 执行过程中无阻塞    return fullTryAcquireShared(current);}
// 非公平锁 readerShouldBlock 偏向写锁一些,看 AQS 阻塞队列中第一个节点是否是写锁,是则阻塞,反之不阻塞// 防止一直有读锁线程,导致写锁线程饥饿// true 则该阻塞, false 则不阻塞final boolean readerShouldBlock() { return apparentlyFirstQueuedIsExclusive();}
// 下面是公平锁的readerShouldBlock// 公平锁会检查 AQS 队列中是否有前驱节点, 没有(false)才去竞争final boolean readerShouldBlock() { return hasQueuedPredecessors();}
复制代码


  • fullTryAcquireShared()方法是通过自旋的方式不断获取读锁,因为由于前面的readerShouldBlock返回 false 或者 cas 失败,导致没有获取到锁,需要不断重试。


final int fullTryAcquireShared(Thread current) {    // 当前读锁线程持有的读锁次数对象    HoldCounter rh = null;    for (;;) {        int c = getState();        // 说明有线程持有写锁        if (exclusiveCount(c) != 0) {            // 写锁不是自己则获取锁失败            if (getExclusiveOwnerThread() != current)                return -1;        } else if (readerShouldBlock()) {            // 条件成立说明当前线程是 firstReader,当前锁是读忙碌状态,而且当前线程也是读锁重入            if (firstReader == current) {                // assert firstReaderHoldCount > 0;            } else {                if (rh == null) {                    // 最后一个读锁的 HoldCounter                    rh = cachedHoldCounter;                    // 说明当前线程也不是最后一个读锁                    if (rh == null || rh.tid != getThreadId(current)) {                        // 获取当前线程的 HoldCounter                        rh = readHolds.get();                        // 条件成立说明 HoldCounter 对象是上一步代码新建的                        // 当前线程不是锁重入,在 readerShouldBlock() 返回 true 时需要去排队                        if (rh.count == 0)                            // 防止内存泄漏                            readHolds.remove();                    }                }                if (rh.count == 0)                    return -1;            }        }        // 越界判断        if (sharedCount(c) == MAX_COUNT)            throw new Error("Maximum lock count exceeded");        // 读锁加锁,条件内的逻辑与 tryAcquireShared 相同        if (compareAndSetState(c, c + SHARED_UNIT)) {            if (sharedCount(c) == 0) {                firstReader = current;                firstReaderHoldCount = 1;            } else if (firstReader == current) {                firstReaderHoldCount++;            } else {                if (rh == null)                    rh = cachedHoldCounter;                if (rh == null || rh.tid != getThreadId(current))                    rh = readHolds.get();                else if (rh.count == 0)                    readHolds.set(rh);                rh.count++;                cachedHoldCounter = rh; // cache for release            }            return 1;        }    }}
复制代码


doAcquireShared()是在获取读锁失败的时候加入 AQS 队列的逻辑。


private void doAcquireShared(int arg) {    // 将当前线程关联到一个 Node 对象上, 模式为共享模式    final Node node = addWaiter(Node.SHARED);    boolean failed = true;    try {        boolean interrupted = false;        for (;;) {            // 获取前驱节点            final Node p = node.predecessor();            // 如果前驱节点就头节点就去尝试获取锁            if (p == head) {                // 再一次尝试获取读锁                int r = tryAcquireShared(arg);                // r >= 0 表示获取成功                if (r >= 0) {                    //【这里会设置自己为头节点,唤醒相连的后序的共享节点】                    setHeadAndPropagate(node, r);                    p.next = null; // help GC                    if (interrupted)                        selfInterrupt();                    failed = false;                    return;                }            }            // 是否在获取读锁失败时阻塞                 park 当前线程            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())                interrupted = true;        }    } finally {        if (failed)            cancelAcquire(node);    }}
复制代码


  • setHeadAndPropagate()方法是在后续读锁被唤醒后,抢到锁要处理的逻辑,包括修改队列的头结点,以及唤醒队列中的下一个共享节点。


private void setHeadAndPropagate(Node node, int propagate) {    Node h = head;     // 设置自己为 head 节点    setHead(node);    // propagate 表示有共享资源(例如共享读锁或信号量),为 0 就没有资源    if (propagate > 0 || h == null || h.waitStatus < 0 ||        (h = head) == null || h.waitStatus < 0) {        // 获取下一个节点        Node s = node.next;        // 如果当前是最后一个节点,或者下一个节点是【等待共享读锁的节点】        if (s == null || s.isShared())            // 唤醒后继节点            doReleaseShared();    }}
复制代码

解锁原理

图解过程

由于上面 t1 线程加的写锁,所有其他的线程都被阻塞了,只有在 t1 线程解锁以后,其他线程才能被唤醒,我们现在看下 t1 线程被唤醒了,会发生什么?


  1. t1 线程执行解锁w.unlock()成功,修改 AQS 中的 state。



  • 这里的 state 变为了 0_0。

  • AQS 当前占用线程 exclusiveOwnerThread 属性变为 null。


  1. t1 线程唤醒队列中等待的老二, 为什么不是老大,因为老大是一个空节点,不会设置任何的线程。t2 线程被唤醒后,抢锁成功,修改 state 中高 16 位为 1。



  • 老二的线程节点变为蓝色节点

  • AQS 中的 state 变为 1_0。


  1. t2 线程恢复运行,设置原来的老二节点为头节点



  1. t2 线程要做的事情还没结束呢,因为是共享模式,它现在释放了,就此时也唤醒队列中的下一个共享节点。



  1. t3 线程恢复去竞争读锁成功,这时 state 的高位+1,变成 2。



  1. 这时候 t3 线程所在的 Node 设置为头节点,同时发现对列的下一个节点不是共享节点,而是独占节点,就不会唤醒后面的节点了。



  1. 之后 t2 线程和 t3 线程进入尾声,执行r.unlock操作,state 的计数减一,直到变为 0。



  1. 最后写锁线程 t4 被唤醒,去抢占锁成功,整个流程结束。



上面是整个解锁的流程,下面深入源码验证这个流程。

源码解析

  1. 写锁释放流程


WriteLock类的unlock()方法是入口方法,调用 tryRelease()方法释放锁,如果成功,调用unparkSuccessor()方法唤醒线程。


public void unlock() {    // 释放锁    sync.release(1);}public final boolean release(int arg) {    // 尝试释放锁    if (tryRelease(arg)) {        Node h = head;        // 头节点不为空并且不是等待状态不是 0,唤醒后继的非取消节点        if (h != null && h.waitStatus != 0)            unparkSuccessor(h);        return true;    }    return false;}
复制代码


tryRelease()方法是 AQS 提供的模板方法,返回 true 表示成功,false 失败,由自定义同步器实现。


protected final boolean tryRelease(int releases) {    if (!isHeldExclusively())        throw new IllegalMonitorStateException();    int nextc = getState() - releases;    // 因为可重入的原因, 写锁计数为 0, 才算释放成功    boolean free = exclusiveCount(nextc) == 0;    if (free)        // 设置占用线程为null        setExclusiveOwnerThread(null);    setState(nextc);    return free;}
复制代码


  1. 读锁释放流程


ReadLock类的unlock()方法是释放共享锁的入口方法。


public void unlock() {    sync.releaseShared(1);}public final boolean releaseShared(int arg) {    if (tryReleaseShared(arg)) {        doReleaseShared();        return true;    }    return false;}
复制代码


tryReleaseShared()方法是由 AQS 提供的模板方法,由自定义同步器实现。


protected final boolean tryReleaseShared(int unused) {    //自选    for (;;) {        int c = getState();        int nextc = c - SHARED_UNIT;        // 读锁的计数不会影响其它获取读锁线程, 但会影响其它获取写锁线程,计数为 0 才是真正释放        if (compareAndSetState(c, nextc))            // 返回是否已经完全释放了             return nextc == 0;    }}
复制代码


调用doReleaseShared()方法唤醒等待的线程,这个方法调用的地方有两处,还记得吗,一个这是里的解锁,还有一个是前面加共享锁阻塞的地方,唤醒后获取锁成功,也会调用doReleaseShared()方法。


private void doReleaseShared() {    // 如果 head.waitStatus == Node.SIGNAL ==> 0 成功, 下一个节点 unpark  // 如果 head.waitStatus == 0 ==> Node.PROPAGATE    for (;;) {        Node h = head;        if (h != null && h != tail) {            int ws = h.waitStatus;            // SIGNAL 唤醒后继            if (ws == Node.SIGNAL) {                // 因为读锁共享,如果其它线程也在释放读锁,那么需要将 waitStatus 先改为 0              // 防止 unparkSuccessor 被多次执行                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))                    continue;                  // 唤醒后继节点                unparkSuccessor(h);            }            // 如果已经是 0 了,改为 -3,用来解决传播性            else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))                continue;                        }        // 条件不成立说明被唤醒的节点非常积极,直接将自己设置为了新的 head,        // 此时唤醒它的节点(前驱)执行 h == head 不成立,所以不会跳出循环,会继续唤醒新的 head 节点的后继节点        if (h == head)                               break;    }}
复制代码

总结

本文讲解了读写锁ReentrantReadWriteLock的整个加锁、解锁的实现原理,并从源码的角度深入分析,希望对大家有帮助。

发布于: 刚刚阅读数: 4
用户头像

JAVA旭阳

关注

还未添加个人签名 2018-07-18 加入

还未添加个人简介

评论

发布
暂无评论
图解ReentrantReadWriteLock读写锁的实现原理_Java_JAVA旭阳_InfoQ写作社区