什么是读写锁
在上篇我们聊到了可重入锁ReentrantLcok ,但它也是一把独占锁(也叫排他锁),也就是说在同一时刻只能允许一个线程持有,但在大多数场景下,都是读多写少,并且读并不存在数据竞争的问题,因此也不存在线程安全问题,因此,如果这个时候去使用ReentrantLcok,效率必然低下,所以就有了读写锁ReentrantReadWriteLock。
内部结构
读写锁也是基于 AQS 来实现的,其内部维护着一对锁,一个读锁(ReadLock)和一个写锁(WriteLock)。通过分离读锁和写锁,使得并发性比一般独占锁有着显著的提升。同时也具备公平锁和非公平锁的性质,因此在读写锁中主要基于非公平锁来分析。
/** Inner class providing readlock */private final ReentrantReadWriteLock.ReadLock readerLock;/** Inner class providing writelock */private final ReentrantReadWriteLock.WriteLock writerLock;
复制代码
类图如下:
简单使用
读写锁一般分为以下三种情形:
用下面的代码来演示这 3 种情形:
1、线程 1 拿到写锁,因为写锁是独占锁,所以其他线程拿不到锁。
2、线程 2 去拿读锁。
3、线程 3 去拿读锁。
4、线程 4 再去拿写锁。
5、线程 5 去拿读锁。
问:t5 是读锁 会不会和t2、t3 一起执行?
@Slf4j(topic = "ayue")public class RWTest {
public static void main(String[] args) throws InterruptedException { ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); //读锁 Lock rl = rwl.readLock(); //写锁 Lock wl = rwl.writeLock();
/** * t1是写锁,让它休眠 5s 此时,其他线程拿不到锁 */ new Thread(() -> { wl.lock(); try { log.info("t1获取写锁"); TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); } finally { wl.unlock(); log.info("t1释放写锁"); } }, "t1").start();
//休眠1s让其按 t1 先执行 TimeUnit.SECONDS.sleep(1);
/** * 此时 t2 在 5s 内拿不到锁阻塞 */ new Thread(() -> { rl.lock(); try { log.info("t2获取读锁"); TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } finally { rl.unlock(); log.info("t2释放读锁"); } }, "t2").start();
/** * 此时 t3 在 5s 内拿不到锁阻塞 * * 但在 t1释放之后 t2 和 t3 能同时拿到锁 */ new Thread(() -> { rl.lock(); try { log.info("t3获取读锁"); TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } finally { rl.unlock(); log.info("t3释放读锁"); } }, "t3").start();
/** * t4 获取读锁,并睡秒 10s 问释放后 t5能不能获取读锁 */ new Thread(() -> { wl.lock(); try { log.info("t4获取写锁"); TimeUnit.SECONDS.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } finally { wl.unlock(); log.info("t4释放写锁"); } }, "t4").start();
/** * t5 是读锁 会不会和t2 t3 一起执行? * */ new Thread(() -> { rl.lock(); try { log.info("t5获取读锁"); } finally { rl.unlock(); log.info("t5释放读锁"); } }, "t5").start(); }}
复制代码
输出:
11:43:36.390 [t1] INFO ayue - t1获取写锁11:43:41.398 [t1] INFO ayue - t1释放写锁11:43:41.398 [t2] INFO ayue - t2获取读锁11:43:41.398 [t3] INFO ayue - t3获取读锁11:43:42.411 [t3] INFO ayue - t3释放读锁11:43:42.411 [t2] INFO ayue - t2释放读锁11:43:42.411 [t4] INFO ayue - t4获取写锁11:43:52.423 [t4] INFO ayue - t4释放写锁11:43:52.423 [t5] INFO ayue - t5获取读锁11:43:52.423 [t5] INFO ayue - t5释放读锁
复制代码
根据结果我们可以知道:
假如先有一个t1写锁拿到锁,后面有一些其他锁(可能是读锁或者写锁),当t1释放锁之后按照 FIFO 的原则唤醒等待的线程如果第一个被唤醒线程是写锁(假如t2是写锁)则不会再跟着唤醒t3及后续的锁,只有等t2执行完成之后才会去唤醒t3。假设被唤醒的t2是读锁,那么t2会去判断他的下一个t3是不是读锁,如果是则把t3唤醒,t3唤醒之后会判断t4是不是读锁,如果t4也是则唤醒t5,依次类推。但是如果t4被唤醒是写锁则不会唤醒t5了,即使后面的t5是读锁也不会唤醒。
因此就会出现上面的情况t2、t3 一起执行,t5不会一起执行。
锁降级
锁降级是指先获取写锁,再获取读锁,然后再释放写锁的过程 。锁降级是为了保证数据的可见性。锁降级是 ReentrantReadWriteLock 重要特性之一。
public class RWDowngrade {
public static void main(String[] args) { ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); //读锁 Lock rl = rwl.readLock(); //写锁 Lock wl = rwl.writeLock();
/** * 锁降级 */ new Thread(() -> { wl.lock(); try { System.out.println("先获取写锁"); rl.lock(); System.out.println("后获取读锁"); } finally { rl.unlock(); wl.unlock(); } }, "t").start(); }}
复制代码
输出:
注意:ReentrantReadWriteLock 并不能实现锁升级。(在源码中将会知道原因)
如下,如果先获取读锁在获取写锁,则会死锁:
new Thread(() -> { rl.lock(); try { System.out.println("先获取读锁"); wl.lock(); System.out.println("后获取写锁"); } finally { wl.unlock(); rl.unlock(); }}, "t").start();
复制代码
输出,直接卡住:
源码分析
ReentrantReadWriteLock 和 ReentrantLock 其实都一样,锁核心都是 Sync, 读锁和写锁都是基于 Sync 来实现的。从这分析其实 ReentrantReadWriteLock 就是一个锁,只不过内部根据不同的场景设计了两个不同的实现方式。其读写锁为两个内部类: ReadLock、WriteLock 都实现了 Lock 接口。
读写锁也是基于 AQS 实现的,但我们知道在 AQS 中是根据其中的state而且是int 整型变量来判断同步状态的,而读写锁是两把锁,怎么通过一个属性来表示两把锁呢?
如何在一个整型上维护多种状态,我们知道int类型是 32 位的,因此我们需要按位切割这个变量,读写锁将变量切割成两部分,高 16 位表示读,低 16 位表示写。
分割之后,读写锁是如何迅速确定读锁和写锁的状态呢?通过位运算(忘记位运算的可以看看我之前的文章:Java中的位运算),假如当前同步状态为 c,那么写状态等于 c & 0x0000FFFF(将高 16 位全部抹去),读状态等于c >>> 16(无符号补 0 右移 16 位)。
代码如下:
static final int SHARED_SHIFT = 16;static final int SHARED_UNIT = (1 << SHARED_SHIFT);static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
/** 返回读状态 */static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
/** 返回写状态 */static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
复制代码
根据状态的划分能得出一个推论:c 不等于 0 时,当写状态等于 0 时,则读状态大于 0,即读锁已被获取。 在源码中也能看到这样的注释。
(Note: if c != 0 and w == 0 then shared count != 0)
复制代码
写锁的获取与释放
写锁是一个支持重进入的排它锁。如果当前线程已经获取了写锁,则增加写状态。如果当前线程在获取写锁时,读锁已经被获取(读状态不为 0)或者该线程不是已经获取写锁的线程,则当前线程进入等待状态。
写锁的获取
写锁的获取主要在于tryAcquire()方法,源码如下:
protected final boolean tryAcquire(int acquires) { //当前线程 Thread current = Thread.currentThread(); //获取锁的状态 int c = getState(); //写锁状态 int w = exclusiveCount(c); if (c != 0) {//如果已经上锁 /** * 1、w == 0,用来判断当前锁是什么锁,w为0则表示这把锁只上过读锁,没上过写锁 * 也就是说当前线程是来获取读锁的,但本来该方法是用来获取写锁,因此当前线程 * 相当于锁升级,故而获取锁失败。 * 2、如果进入 ||,则 w != 0,说明这已经上了写锁,判断当前线程是不是重入,如果不是失败。 */ if (w == 0 || current != getExclusiveOwnerThread()) return false; /** * 如果获取写锁的数量超过最大值65535 ,直接异常,一般都不会超过。 */ if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); // Reentrant acquire setState(c + acquires); return true; } /** * writerShouldBlock(),简单来说就是判断是否需要排队。 * 如果是非公平锁,直接返回false,即不管有没有线程排队直接抢锁。 * 如果是公平锁,其底层调用了hasQueuedPredecessors()方法,即会判断队列中是否还存在排队的线程,如果有就 * 排队,没有则尝试加锁。 */ if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false; setExclusiveOwnerThread(current); return true;}
复制代码
通过分析源码可知:
如果存在读锁,则写锁不能被获取,原因在于:读写锁要确保写锁的操作对读锁可见,如果允许读锁在已被获取的情况下对写锁的获取,那么正在运行的其他读线程就无法感知到当前写线程的操作。因此,只有等待其他读线程都释放了读锁,写锁才能被当前线程获取,而写锁一旦被获取,则其他读写线程的后续访问均被阻塞。
写锁的释放
写锁的释放与ReentrantLock的释放过程基本类似,上篇文章也有讲解,可自行查看,这里不在重复。
读锁的获取与释放
读锁是一个支持重进入的共享锁,它能够被多个线程同时获取,在没有其他写线程访问(或者写状态为 0)时,读锁总会被成功地获取。需要注意的是,如果当前线程已经获取了读锁,则增加读状态。如果当前线程在获取读锁时,写锁已被其他线程获取,则进入等待状态。
读锁的获取
写锁的获取主要在于tryAcquireShared()方法,源码如下:
//尝试获取共享状态,如果共享状态大于等于0则说明获取锁成功,否则加入同步队列。protected final int tryAcquireShared(int unused) { Thread current = Thread.currentThread(); int c = getState(); /* * 1、exclusiveCount(c) != 0,判断是否被上了写锁。 * 2、如果上了写锁,进入&&,为什么上了写锁还要继续判断?这就是锁的降级。 * 如果重入则一定是降级,如果不是重入则失败,因为读写需要互斥。 */ if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1; /** * 如果上面代码没有返回执行到这里有两种情况标识 * 1、没有人上写锁 * 2、重入降级 */ int r = sharedCount(c); /** * readerShouldBlock():判断锁是否需要等待 * r < MAX_COUNT:判断锁的数量是否超过最大值65535 * compareAndSetState(c, c + SHARED_UNIT): 设置共享状态(读锁状态) */ if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { // r==0,表示当前没有任何线程获取读锁 if (r == 0) { // 设置当前线程为第一个获取读锁的线程 firstReader = current; // 计数设置为1 firstReaderHoldCount = 1; } else if (firstReader == current) { // 表示重入锁,在计数其上+1 firstReaderHoldCount++; } else { /** * HoldCounter 主要是一个类来记录线程获取锁的数量 * cachedHoldCounter 缓存的是最后一个获取锁线程的HoldCounter对象 */ HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; } return 1; } //自旋方式再次尝试获取 return fullTryAcquireShared(current);}
复制代码
在tryAcquireShared(int unused)方法中,如果其他线程已经获取了写锁,则当前线程获取读锁失败,进入等待状态。如果当前线程获取了写锁或者写锁未被获取,则当前线程(线程安全,依靠 CAS 保证)增加读状态,成功获取读锁。
读锁的释放
protected final boolean tryReleaseShared(int unused) { Thread current = Thread.currentThread(); //判断当前线程释放是第一个获取读锁的线程 if (firstReader == current) { // assert firstReaderHoldCount > 0; // 判断获取锁的次数释放为1,如果为1说明没有重入情况,直接释放firstReader = null;否则将该线程持有锁的数量 -1 if (firstReaderHoldCount == 1) firstReader = null; else firstReaderHoldCount--; } else { // 如果当前线程不是第一个获取读锁的线程。 // 获取缓存中的HoldCounter HoldCounter rh = cachedHoldCounter; // 如果缓存中的HoldCounter 不属于当前线程则获取当前线程的HoldCounter。 if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); int count = rh.count; if (count <= 1) { // 如果线程持有锁的数量小于等1 直接删除HoldCounter readHolds.remove(); if (count <= 0) throw unmatchedUnlockException(); } // 持有锁数量大于1 则执行 - 1操作 --rh.count; } // 自旋释放同步状态 for (; ; ) { int c = getState(); int nextc = c - SHARED_UNIT; if (compareAndSetState(c, nextc)) // Releasing the read lock has no effect on readers, // but it may allow waiting writers to proceed if // both read and write locks are now free. return nextc == 0; }}
复制代码
锁的释放比较简单,首先看当前线程是否是第一个获取读锁的线程,如果是并且没有发生重入,则将首次获取读锁变量设为 null, 如果发生重入,则将首次获取读锁计数器 -1 其次 查看缓存中计数器是否为空或者是否是当前线程,如果为空或者不是则获取当前线程的计数器,如果计数器个数小于等 1, 从 ThreadLocl 中删除计数器,并计数器值-1,如果小于等于 0 异常 。最后自旋修改同步状态。
高性能读写锁 StampedLock
ReentrantReadWriteLock 的性能已经很好了,但是底层还是需要进行一系列的 CAS 操作去加锁,因此 JUC 还提供了另外一个读写锁 StampedLock。
StampedLock 如果是读锁上锁是没有这种 CAS 操作的,因此性能比 ReentrantReadWriteLock 更好,也称为乐观读锁,即读获取锁的时候不加锁,直接返回一个值,然后执行临界区的时候去验证这个值是否有被人修改(写操作加锁)如果没有被人修改则直接执行临界区的代码,如果被人修改了则需要升级为读写锁 ReadLock 。
基本语法
//获取戳 不存在锁 long stamp = lock.tryOptimisticRead(); //验证戳 if(lock.validate(stamp)){ //成立则执行临界区的代码 //返回 }//如果没有返回则表示被人修改了 需要升级成为readLock lock.readLock();
复制代码
代码示例
import lombok.SneakyThrows;import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.TimeUnit;import java.util.concurrent.locks.StampedLock;
@Slf4j(topic = "ayue")public class DataContainer { int i; long stampw = 0l;
public void setI(int i) { this.i = i; }
private final StampedLock lock = new StampedLock(); //首先 加 StampedLock
@SneakyThrows public int read() { //尝试一次乐观读 long stamp = lock.tryOptimisticRead(); log.debug("StampedLock 读锁拿到的戳{}", stamp); //1s之后验戳 TimeUnit.SECONDS.sleep(1); //验戳 if (lock.validate(stamp)) { log.debug("StampedLock 验证完毕stamp{}, data.i:{}", stamp, i); return i; } //一定验证失败 log.debug("验证失败 被写线程给改变了{}", stampw); try { //锁的升级 也会改戳 stamp = lock.readLock(); log.debug("升级之后的加锁成功 {}", stamp);
TimeUnit.SECONDS.sleep(1); log.debug("升级读锁完毕{}, data.i:{}", stamp, i); return i; } finally { log.debug("升级锁解锁 {}", stamp); lock.unlockRead(stamp); } }
@SneakyThrows public void write(int i) { //cas 加锁 stampw = lock.writeLock(); log.debug("写锁加锁成功 {}", stampw); try { TimeUnit.SECONDS.sleep(5); this.i = i; } finally { log.debug("写锁解锁 {},data.i:{}", stampw, i); lock.unlockWrite(stampw); } }
public static void main(String[] args) throws InterruptedException { //实例化数据容器 DataContainer dataContainer = new DataContainer(); //给了一个初始值 不算写 构造方法赋值 dataContainer.setI(1); //读取 new Thread(() -> { dataContainer.read(); }, "t1").start();
new Thread(() -> { dataContainer.read(); }, "t2").start();
// new Thread(() -> {// dataContainer.write(9);// }, "t3").start(); }}
复制代码
都是读锁输出(t1,t2):
11:58:48.839 [t1] DEBUG ayue - StampedLock 读锁拿到的戳25611:58:48.839 [t2] DEBUG ayue - StampedLock 读锁拿到的戳25611:58:49.854 [t2] DEBUG ayue - StampedLock 验证完毕stamp256, data.i:111:58:49.854 [t1] DEBUG ayue - StampedLock 验证完毕stamp256, data.i:1
复制代码
如果有修改(t1,t3):
12:07:36.234 [t1] DEBUG ayue - StampedLock 读锁拿到的戳25612:07:37.238 [t3] DEBUG ayue - 写锁加锁成功 38412:07:41.213 [t1] DEBUG ayue - 验证失败 被写线程给改变了38412:07:42.240 [t3] DEBUG ayue - 写锁解锁 384,data.i:912:07:42.240 [t1] DEBUG ayue - 升级之后的加锁成功 51312:07:43.252 [t1] DEBUG ayue - 升级读锁完毕513, data.i:912:07:43.252 [t1] DEBUG ayue - 升级锁解锁 513
复制代码
StampedLock 能否替代 ReentrantReadWriteLock
StampedLock 性能虽然高于 ReentrantReadWriteLock,但在使用过程中也需要按场景,StampedLock 具有一下特点:
不支持重入
不支持条件队列
存在一定的并发问题
总结
在线程持有读锁的情况下,该线程不能取得写锁(为了保证写操作对后续所有的读操作保持可见性)。
在线程持有写锁的情况下,该线程可以继续获取读锁(获取读锁时如果发现写锁被占用,只有写锁没有被当前线程占用的情况才会获取失败)。
仔细想想,这个设计是合理的:因为当线程获取读锁的时候,可能有其他线程同时也在持有读锁,因此不能把获取读锁的线程“升级”为写锁;而对于获得写锁的线程,它一定独占了读写锁,因此可以继续让它获取读锁,当它同时获取了写锁和读锁后,还可以先释放写锁继续持有读锁,这样一个写锁就“降级”为了读一个线程要想同时持有写锁和读锁,必须先获取写锁再获取读锁。
写锁可以“降级”为读锁,读锁不能“升级”为写锁。
评论