什么是读写锁
在上篇我们聊到了可重入锁ReentrantLcok ,但它也是一把独占锁(也叫排他锁),也就是说在同一时刻只能允许一个线程持有,但在大多数场景下,都是读多写少,并且读并不存在数据竞争的问题,因此也不存在线程安全问题,因此,如果这个时候去使用ReentrantLcok,效率必然低下,所以就有了读写锁ReentrantReadWriteLock
。
内部结构
读写锁也是基于 AQS 来实现的,其内部维护着一对锁,一个读锁(ReadLock
)和一个写锁(WriteLock
)。通过分离读锁和写锁,使得并发性比一般独占锁有着显著的提升。同时也具备公平锁和非公平锁的性质,因此在读写锁中主要基于非公平锁来分析。
/** Inner class providing readlock */
private final ReentrantReadWriteLock.ReadLock readerLock;
/** Inner class providing writelock */
private final ReentrantReadWriteLock.WriteLock writerLock;
复制代码
类图如下:
简单使用
读写锁一般分为以下三种情形:
用下面的代码来演示这 3 种情形:
1、线程 1 拿到写锁,因为写锁是独占锁,所以其他线程拿不到锁。
2、线程 2 去拿读锁。
3、线程 3 去拿读锁。
4、线程 4 再去拿写锁。
5、线程 5 去拿读锁。
问:t5
是读锁 会不会和t2、t3
一起执行?
@Slf4j(topic = "ayue")
public class RWTest {
public static void main(String[] args) throws InterruptedException {
ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
//读锁
Lock rl = rwl.readLock();
//写锁
Lock wl = rwl.writeLock();
/**
* t1是写锁,让它休眠 5s 此时,其他线程拿不到锁
*/
new Thread(() -> {
wl.lock();
try {
log.info("t1获取写锁");
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
wl.unlock();
log.info("t1释放写锁");
}
}, "t1").start();
//休眠1s让其按 t1 先执行
TimeUnit.SECONDS.sleep(1);
/**
* 此时 t2 在 5s 内拿不到锁阻塞
*/
new Thread(() -> {
rl.lock();
try {
log.info("t2获取读锁");
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
rl.unlock();
log.info("t2释放读锁");
}
}, "t2").start();
/**
* 此时 t3 在 5s 内拿不到锁阻塞
*
* 但在 t1释放之后 t2 和 t3 能同时拿到锁
*/
new Thread(() -> {
rl.lock();
try {
log.info("t3获取读锁");
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
rl.unlock();
log.info("t3释放读锁");
}
}, "t3").start();
/**
* t4 获取读锁,并睡秒 10s 问释放后 t5能不能获取读锁
*/
new Thread(() -> {
wl.lock();
try {
log.info("t4获取写锁");
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
wl.unlock();
log.info("t4释放写锁");
}
}, "t4").start();
/**
* t5 是读锁 会不会和t2 t3 一起执行?
* */
new Thread(() -> {
rl.lock();
try {
log.info("t5获取读锁");
} finally {
rl.unlock();
log.info("t5释放读锁");
}
}, "t5").start();
}
}
复制代码
输出:
11:43:36.390 [t1] INFO ayue - t1获取写锁
11:43:41.398 [t1] INFO ayue - t1释放写锁
11:43:41.398 [t2] INFO ayue - t2获取读锁
11:43:41.398 [t3] INFO ayue - t3获取读锁
11:43:42.411 [t3] INFO ayue - t3释放读锁
11:43:42.411 [t2] INFO ayue - t2释放读锁
11:43:42.411 [t4] INFO ayue - t4获取写锁
11:43:52.423 [t4] INFO ayue - t4释放写锁
11:43:52.423 [t5] INFO ayue - t5获取读锁
11:43:52.423 [t5] INFO ayue - t5释放读锁
复制代码
根据结果我们可以知道:
假如先有一个t1
写锁拿到锁,后面有一些其他锁(可能是读锁或者写锁),当t1
释放锁之后按照 FIFO 的原则唤醒等待的线程如果第一个被唤醒线程是写锁(假如t2
是写锁)则不会再跟着唤醒t3
及后续的锁,只有等t2
执行完成之后才会去唤醒t3
。假设被唤醒的t2
是读锁,那么t2
会去判断他的下一个t3
是不是读锁,如果是则把t3
唤醒,t3
唤醒之后会判断t4
是不是读锁,如果t4
也是则唤醒t5
,依次类推。但是如果t4
被唤醒是写锁则不会唤醒t5
了,即使后面的t5
是读锁也不会唤醒。
因此就会出现上面的情况t2、t3
一起执行,t5
不会一起执行。
锁降级
锁降级是指先获取写锁,再获取读锁,然后再释放写锁的过程 。锁降级是为了保证数据的可见性。锁降级是 ReentrantReadWriteLock 重要特性之一。
public class RWDowngrade {
public static void main(String[] args) {
ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
//读锁
Lock rl = rwl.readLock();
//写锁
Lock wl = rwl.writeLock();
/**
* 锁降级
*/
new Thread(() -> {
wl.lock();
try {
System.out.println("先获取写锁");
rl.lock();
System.out.println("后获取读锁");
} finally {
rl.unlock();
wl.unlock();
}
}, "t").start();
}
}
复制代码
输出:
注意:ReentrantReadWriteLock 并不能实现锁升级。(在源码中将会知道原因)
如下,如果先获取读锁在获取写锁,则会死锁:
new Thread(() -> {
rl.lock();
try {
System.out.println("先获取读锁");
wl.lock();
System.out.println("后获取写锁");
} finally {
wl.unlock();
rl.unlock();
}
}, "t").start();
复制代码
输出,直接卡住:
源码分析
ReentrantReadWriteLock 和 ReentrantLock 其实都一样,锁核心都是 Sync, 读锁和写锁都是基于 Sync 来实现的。从这分析其实 ReentrantReadWriteLock 就是一个锁,只不过内部根据不同的场景设计了两个不同的实现方式。其读写锁为两个内部类: ReadLock、WriteLock 都实现了 Lock 接口。
读写锁也是基于 AQS 实现的,但我们知道在 AQS 中是根据其中的state
而且是int
整型变量来判断同步状态的,而读写锁是两把锁,怎么通过一个属性来表示两把锁呢?
如何在一个整型上维护多种状态,我们知道int
类型是 32 位的,因此我们需要按位切割这个变量,读写锁将变量切割成两部分,高 16 位表示读,低 16 位表示写。
分割之后,读写锁是如何迅速确定读锁和写锁的状态呢?通过位运算(忘记位运算的可以看看我之前的文章:Java中的位运算),假如当前同步状态为 c,那么写状态等于 c & 0x0000FFFF
(将高 16 位全部抹去),读状态等于c >>> 16
(无符号补 0 右移 16 位)。
代码如下:
static final int SHARED_SHIFT = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
/** 返回读状态 */
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
/** 返回写状态 */
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
复制代码
根据状态的划分能得出一个推论:c 不等于 0 时,当写状态等于 0 时,则读状态大于 0,即读锁已被获取。 在源码中也能看到这样的注释。
(Note: if c != 0 and w == 0 then shared count != 0)
复制代码
写锁的获取与释放
写锁是一个支持重进入的排它锁。如果当前线程已经获取了写锁,则增加写状态。如果当前线程在获取写锁时,读锁已经被获取(读状态不为 0)或者该线程不是已经获取写锁的线程,则当前线程进入等待状态。
写锁的获取
写锁的获取主要在于tryAcquire()
方法,源码如下:
protected final boolean tryAcquire(int acquires) {
//当前线程
Thread current = Thread.currentThread();
//获取锁的状态
int c = getState();
//写锁状态
int w = exclusiveCount(c);
if (c != 0) {//如果已经上锁
/**
* 1、w == 0,用来判断当前锁是什么锁,w为0则表示这把锁只上过读锁,没上过写锁
* 也就是说当前线程是来获取读锁的,但本来该方法是用来获取写锁,因此当前线程
* 相当于锁升级,故而获取锁失败。
* 2、如果进入 ||,则 w != 0,说明这已经上了写锁,判断当前线程是不是重入,如果不是失败。
*/
if (w == 0 || current != getExclusiveOwnerThread())
return false;
/**
* 如果获取写锁的数量超过最大值65535 ,直接异常,一般都不会超过。
*/
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
setState(c + acquires);
return true;
}
/**
* writerShouldBlock(),简单来说就是判断是否需要排队。
* 如果是非公平锁,直接返回false,即不管有没有线程排队直接抢锁。
* 如果是公平锁,其底层调用了hasQueuedPredecessors()方法,即会判断队列中是否还存在排队的线程,如果有就 * 排队,没有则尝试加锁。
*/
if (writerShouldBlock() || !compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}
复制代码
通过分析源码可知:
如果存在读锁,则写锁不能被获取,原因在于:读写锁要确保写锁的操作对读锁可见,如果允许读锁在已被获取的情况下对写锁的获取,那么正在运行的其他读线程就无法感知到当前写线程的操作。因此,只有等待其他读线程都释放了读锁,写锁才能被当前线程获取,而写锁一旦被获取,则其他读写线程的后续访问均被阻塞。
写锁的释放
写锁的释放与ReentrantLock的释放过程基本类似,上篇文章也有讲解,可自行查看,这里不在重复。
读锁的获取与释放
读锁是一个支持重进入的共享锁,它能够被多个线程同时获取,在没有其他写线程访问(或者写状态为 0)时,读锁总会被成功地获取。需要注意的是,如果当前线程已经获取了读锁,则增加读状态。如果当前线程在获取读锁时,写锁已被其他线程获取,则进入等待状态。
读锁的获取
写锁的获取主要在于tryAcquireShared()
方法,源码如下:
//尝试获取共享状态,如果共享状态大于等于0则说明获取锁成功,否则加入同步队列。
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
/*
* 1、exclusiveCount(c) != 0,判断是否被上了写锁。
* 2、如果上了写锁,进入&&,为什么上了写锁还要继续判断?这就是锁的降级。
* 如果重入则一定是降级,如果不是重入则失败,因为读写需要互斥。
*/
if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current)
return -1;
/**
* 如果上面代码没有返回执行到这里有两种情况标识
* 1、没有人上写锁
* 2、重入降级
*/
int r = sharedCount(c);
/**
* readerShouldBlock():判断锁是否需要等待
* r < MAX_COUNT:判断锁的数量是否超过最大值65535
* compareAndSetState(c, c + SHARED_UNIT): 设置共享状态(读锁状态)
*/
if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) {
// r==0,表示当前没有任何线程获取读锁
if (r == 0) {
// 设置当前线程为第一个获取读锁的线程
firstReader = current;
// 计数设置为1
firstReaderHoldCount = 1;
} else if (firstReader == current) {
// 表示重入锁,在计数其上+1
firstReaderHoldCount++;
} else {
/**
* HoldCounter 主要是一个类来记录线程获取锁的数量
* cachedHoldCounter 缓存的是最后一个获取锁线程的HoldCounter对象
*/
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
//自旋方式再次尝试获取
return fullTryAcquireShared(current);
}
复制代码
在tryAcquireShared(int unused)
方法中,如果其他线程已经获取了写锁,则当前线程获取读锁失败,进入等待状态。如果当前线程获取了写锁或者写锁未被获取,则当前线程(线程安全,依靠 CAS 保证)增加读状态,成功获取读锁。
读锁的释放
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
//判断当前线程释放是第一个获取读锁的线程
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
// 判断获取锁的次数释放为1,如果为1说明没有重入情况,直接释放firstReader = null;否则将该线程持有锁的数量 -1
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {
// 如果当前线程不是第一个获取读锁的线程。
// 获取缓存中的HoldCounter
HoldCounter rh = cachedHoldCounter;
// 如果缓存中的HoldCounter 不属于当前线程则获取当前线程的HoldCounter。
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
// 如果线程持有锁的数量小于等1 直接删除HoldCounter
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
// 持有锁数量大于1 则执行 - 1操作
--rh.count;
}
// 自旋释放同步状态
for (; ; ) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
// Releasing the read lock has no effect on readers,
// but it may allow waiting writers to proceed if
// both read and write locks are now free.
return nextc == 0;
}
}
复制代码
锁的释放比较简单,首先看当前线程是否是第一个获取读锁的线程,如果是并且没有发生重入,则将首次获取读锁变量设为 null, 如果发生重入,则将首次获取读锁计数器 -1 其次 查看缓存中计数器是否为空或者是否是当前线程,如果为空或者不是则获取当前线程的计数器,如果计数器个数小于等 1, 从 ThreadLocl 中删除计数器,并计数器值-1,如果小于等于 0 异常 。最后自旋修改同步状态。
高性能读写锁 StampedLock
ReentrantReadWriteLock 的性能已经很好了,但是底层还是需要进行一系列的 CAS 操作去加锁,因此 JUC 还提供了另外一个读写锁 StampedLock。
StampedLock 如果是读锁上锁是没有这种 CAS 操作的,因此性能比 ReentrantReadWriteLock 更好,也称为乐观读锁,即读获取锁的时候不加锁,直接返回一个值,然后执行临界区的时候去验证这个值是否有被人修改(写操作加锁)如果没有被人修改则直接执行临界区的代码,如果被人修改了则需要升级为读写锁 ReadLock 。
基本语法
//获取戳 不存在锁
long stamp = lock.tryOptimisticRead();
//验证戳
if(lock.validate(stamp)){
//成立则执行临界区的代码
//返回
}
//如果没有返回则表示被人修改了 需要升级成为readLock
lock.readLock();
复制代码
代码示例
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.StampedLock;
@Slf4j(topic = "ayue")
public class DataContainer {
int i;
long stampw = 0l;
public void setI(int i) {
this.i = i;
}
private final StampedLock lock = new StampedLock(); //首先 加 StampedLock
@SneakyThrows
public int read() {
//尝试一次乐观读
long stamp = lock.tryOptimisticRead();
log.debug("StampedLock 读锁拿到的戳{}", stamp);
//1s之后验戳
TimeUnit.SECONDS.sleep(1);
//验戳
if (lock.validate(stamp)) {
log.debug("StampedLock 验证完毕stamp{}, data.i:{}", stamp, i);
return i;
}
//一定验证失败
log.debug("验证失败 被写线程给改变了{}", stampw);
try {
//锁的升级 也会改戳
stamp = lock.readLock();
log.debug("升级之后的加锁成功 {}", stamp);
TimeUnit.SECONDS.sleep(1);
log.debug("升级读锁完毕{}, data.i:{}", stamp, i);
return i;
} finally {
log.debug("升级锁解锁 {}", stamp);
lock.unlockRead(stamp);
}
}
@SneakyThrows
public void write(int i) {
//cas 加锁
stampw = lock.writeLock();
log.debug("写锁加锁成功 {}", stampw);
try {
TimeUnit.SECONDS.sleep(5);
this.i = i;
} finally {
log.debug("写锁解锁 {},data.i:{}", stampw, i);
lock.unlockWrite(stampw);
}
}
public static void main(String[] args) throws InterruptedException {
//实例化数据容器
DataContainer dataContainer = new DataContainer();
//给了一个初始值 不算写 构造方法赋值
dataContainer.setI(1);
//读取
new Thread(() -> {
dataContainer.read();
}, "t1").start();
new Thread(() -> {
dataContainer.read();
}, "t2").start();
// new Thread(() -> {
// dataContainer.write(9);
// }, "t3").start();
}
}
复制代码
都是读锁输出(t1,t2):
11:58:48.839 [t1] DEBUG ayue - StampedLock 读锁拿到的戳256
11:58:48.839 [t2] DEBUG ayue - StampedLock 读锁拿到的戳256
11:58:49.854 [t2] DEBUG ayue - StampedLock 验证完毕stamp256, data.i:1
11:58:49.854 [t1] DEBUG ayue - StampedLock 验证完毕stamp256, data.i:1
复制代码
如果有修改(t1,t3):
12:07:36.234 [t1] DEBUG ayue - StampedLock 读锁拿到的戳256
12:07:37.238 [t3] DEBUG ayue - 写锁加锁成功 384
12:07:41.213 [t1] DEBUG ayue - 验证失败 被写线程给改变了384
12:07:42.240 [t3] DEBUG ayue - 写锁解锁 384,data.i:9
12:07:42.240 [t1] DEBUG ayue - 升级之后的加锁成功 513
12:07:43.252 [t1] DEBUG ayue - 升级读锁完毕513, data.i:9
12:07:43.252 [t1] DEBUG ayue - 升级锁解锁 513
复制代码
StampedLock 能否替代 ReentrantReadWriteLock
StampedLock 性能虽然高于 ReentrantReadWriteLock,但在使用过程中也需要按场景,StampedLock 具有一下特点:
不支持重入
不支持条件队列
存在一定的并发问题
总结
在线程持有读锁的情况下,该线程不能取得写锁(为了保证写操作对后续所有的读操作保持可见性)。
在线程持有写锁的情况下,该线程可以继续获取读锁(获取读锁时如果发现写锁被占用,只有写锁没有被当前线程占用的情况才会获取失败)。
仔细想想,这个设计是合理的:因为当线程获取读锁的时候,可能有其他线程同时也在持有读锁,因此不能把获取读锁的线程“升级”为写锁;而对于获得写锁的线程,它一定独占了读写锁,因此可以继续让它获取读锁,当它同时获取了写锁和读锁后,还可以先释放写锁继续持有读锁,这样一个写锁就“降级”为了读一个线程要想同时持有写锁和读锁,必须先获取写锁再获取读锁。
写锁可以“降级”为读锁,读锁不能“升级”为写锁。
评论