写点什么

并发编程 -ReentrantLook 底层设计

作者:Java你猿哥
  • 2023-05-11
    湖南
  • 本文字数:6057 字

    阅读完需:约 20 分钟

重入锁

顾名思义:就是可以重入的互斥锁,但是这个重入是有条件的,允许同一个线程多次获得同一个锁,避免了死锁的发生。

重入锁在实现上比 synchronized 关键字更加灵活,提供了一些额外的特性,比如可定时的锁等待(tryLock)、可中断的锁等待(lockInterruptibly)、公平性等。另外,使用重入锁需要手动加锁和解锁,使得线程控制更加精细。

使用重入锁可以避免死锁的发生,提高程序的可靠性。但是,过多地使用重入锁可能会影响程序的性能,因为重入锁需要进行额外的状态管理和线程切换操作。因此,在使用重入锁时需要谨慎考虑锁的范围和粒度,以及锁的公平性和性能等方面的问题。

这里可能有人对 tryLock 和 lockInterruptibly 有疑问 具体的使用方式如下:

  • tryLock

 ReentrantLock lock = new ReentrantLock(); if (lock.tryLock()) {     try {         // 获得锁后的操作     } finally {         lock.unlock();     } } else {     // 获取锁失败的操作 }
复制代码

它的作用可以让线程在获取锁的时候最多等待一定的时间,如果在等待时间内没有获取到锁,则返回 false,表示获取锁失败

  • lockInterruptibly

  Thread t = new Thread(() -> {      ReentrantLock lock = new ReentrantLock();      try {          lock.lockInterruptibly();          try {              // 获得锁后的操作              while (!Thread.currentThread().isInterrupted()) {                  System.out.println("1");              }          } finally {              lock.unlock();          }      } catch (InterruptedException e) {          // 当前线程被中断的操作          Thread.currentThread().interrupt();      }  });  t.start();  Thread.sleep(1000);  t.interrupt();
复制代码

作用就是可以抛出 InterruptedException 异常,所以线程可以使用 interrupt 的方式被中断

Lock

Lock(锁)是一种线程同步机制,用于实现互斥和协作访问共享资源。与 synchronized 关键字相比,Lock 接口提供了更加灵活的锁实现,可以实现更加精细的线程控制和高级功能。

其实 Lock 就是一个接口,定义了锁的一些方法:


  1. lock():获取锁,如果锁被其他线程持有,则线程会阻塞等待锁的释放。

  2. unlock():释放锁,如果当前线程持有锁,则释放锁,否则会抛出 IllegalMonitorStateException 异常。

  3. tryLock():尝试获取锁,如果锁当前没有被其他线程持有,则获取锁并立即返回 true,否则立即返回 false,不会阻塞线程。

  4. tryLock(long time, TimeUnit unit):在一定时间内尝试获取锁,如果锁在给定的时间内未被其他线程持有,则获取锁并立即返回 true,否则等待指定时间,如果在指定时间内还未获取到锁,则返回 false。

  5. newCondition():获取与该锁关联的 Condition 对象,用于实现等待/通知模式。

ReentrantLock

ReentrantLock 就是 Lock 的一种实现:


而 ReentrantLock 是基于 AbstractQueuedSynchronizer 实现的可重入锁

AbstractQueuedSynchronizer

AbstractQueuedSynchronizer 基于 FIFO 等待队列的同步器框架,它为自定义同步器的实现提供了一种底层框架和算法,可以方便地实现各种类型的同步器,用于存储等待获取锁的线程 我们先看一下 AQS 的代码结构:


lock 源码实现


发现是调用了 sync.lock();而 sync 就是一个内部类并且集成 AQS:


并且我们可以发现 sync 是个抽象类一共有两个实现:


也就是公平锁和非公平锁(下文会说到详细的实现)至于用哪个是在实例化的时候赋值的如下:


接着看 sync.lock()源码实现如下:


这里我们就拿非公平锁举例:源码实现是这样的

initialTryLock

final boolean initialTryLock() {    Thread current = Thread.currentThread();    if (compareAndSetState(0, 1)) { // first attempt is unguarded        setExclusiveOwnerThread(current);        return true;    } else if (getExclusiveOwnerThread() == current) {        int c = getState() + 1;        if (c < 0) // overflow            throw new Error("Maximum lock count exceeded");        setState(c);        return true;    } else        return false;}
复制代码

compareAndSetState 方法使用了 unsafe 类的一个方法具体实现在 jvm 里面



所以这个方法呢就是对比 state 状态如果是 0 则说明当前没有线程占用则会赋值 1 然后调用 setExclusiveOwnerThread,而 setExclusiveOwnerThread 方法就是给 exclusiveOwnerThread 赋值记录一下当前这个锁是被这个线程占用的:

protected final void setExclusiveOwnerThread(Thread thread) {    exclusiveOwnerThread = thread;}
复制代码

如果不是 state 不是 0 则说明当前的锁被占用了,所以此时需要判断被占用的线程是否是自己。


这里也比较简单就是 state 加一加好了,然后返回 true,当前线程获取了所以可以执行对应的代码块否则就会返回 false

acquire

上文我们知道如果锁被占用了 initialTryLock 会返回 false 则此时会执行 acquire 方法

public final void acquire(int arg) {    if (!tryAcquire(arg))        acquire(null, arg, false, false, false, 0L);}
复制代码

首先是 tryAcquire

protected final boolean tryAcquire(int acquires) {    if (getState() == 0 && compareAndSetState(0, acquires)) {        setExclusiveOwnerThread(Thread.currentThread());        return true;    }    return false;}
复制代码

这里又尝试着获取锁,因为可能到这一步刚好这个锁就释放掉了所以就可以直接获取到锁如果还是没有获取到锁才会走到 acquire 方法

final int acquire(Node node, int arg, boolean shared,                  boolean interruptible, boolean timed, long time) {    Thread current = Thread.currentThread();    byte spins = 0, postSpins = 0;   // retries upon unpark of first thread    boolean interrupted = false, first = false;    Node pred = null;                // predecessor of node when enqueued
for (;;) { if (!first && (pred = (node == null) ? null : node.prev) != null && !(first = (head == pred))) { if (pred.status < 0) { cleanQueue(); // predecessor cancelled continue; } else if (pred.prev == null) { Thread.onSpinWait(); // ensure serialization continue; } } if (first || pred == null) { boolean acquired; try { if (shared) acquired = (tryAcquireShared(arg) >= 0); else acquired = tryAcquire(arg); } catch (Throwable ex) { cancelAcquire(node, interrupted, false); throw ex; } if (acquired) { if (first) { node.prev = null; head = node; pred.next = null; node.waiter = null; if (shared) signalNextIfShared(node); if (interrupted) current.interrupt(); } return 1; } } if (node == null) { // allocate; retry before enqueue if (shared) node = new SharedNode(); else node = new ExclusiveNode(); } else if (pred == null) { // try to enqueue node.waiter = current; Node t = tail; node.setPrevRelaxed(t); // avoid unnecessary fence if (t == null) tryInitializeHead(); else if (!casTail(t, node)) node.setPrevRelaxed(null); // back out else t.next = node; } else if (first && spins != 0) { --spins; // reduce unfairness on rewaits Thread.onSpinWait(); } else if (node.status == 0) { node.status = WAITING; // enable signal and recheck } else { long nanos; spins = postSpins = (byte)((postSpins << 1) | 1); if (!timed) LockSupport.park(this); else if ((nanos = time - System.nanoTime()) > 0L) LockSupport.parkNanos(this, nanos); else break; node.clearStatus(); if ((interrupted |= Thread.interrupted()) && interruptible) break; } } return cancelAcquire(node, interrupted, interruptible);}
复制代码

梳理一下逻辑:

初始变量:

spins = 0,

postSpins = 0;

interrupted = false,

first = false;

pred = null;

node = null

share = false

第一次循环:

第一个 if 中 !first -> true (pred = null) != null -> false 所以第一个 if 不执行

第二个 if (first || pred == null) pred == null -> true

所以此时执行了,执行的结果其实就是再一次尝试获取锁,获取成功就返回了否自

继续往下执行

第三个 if(node == null)第一个条件命中 node = new ExclusiveNode();

第一次循环结束

第二次循环

此时条件有一些变化

spins = 0,

postSpins = 0;

interrupted = false,

first = false;

pred = null;

node = new ExclusiveNode()

share = false

第一个 if:(pred = (node == null) ? null : node.prev) != null -> false 所以没有执行

第二个 if: pred == null -> true 一样的继续尝试获取锁

第三个 if:走到了 else if (pred == null) 这里

这里将 waiter 赋值给当前线程

然后将 node->prev 指向 tail,但是此时 tail = null

继续走到 tryInitializeHead 方法

private void tryInitializeHead() {

Node h = new ExclusiveNode();

if (U.compareAndSetReference(this, HEAD, null, h))

tail = h;

}

就是新创建一个 node,然后 head 和 tail 都指向新节点 h

第三次循环

第一个 if 和第二个 if 不说了还是一样

到第三个 else if (pred == null) {

此时就会将 node 结点的 prev 指针指向 tail,然后 tail 的 next 指针指向 node

至此一个链表的结构也就行程了

第四次循环

直接看最后一个 if 直接走到了最后一个 else

LockSupport.park(this);

开始阻塞线程,等待被唤醒了

unlock 源码实现

看完上文的逻辑这里就很简单了

public void unlock() {    sync.release(1);}
复制代码


public final boolean release(int arg) {    if (tryRelease(arg)) {        signalNext(head);        return true;    }    return false;}
复制代码


@ReservedStackAccessprotected final boolean tryRelease(int releases) {    //这里其实就是将state-1    int c = getState() - releases;    if (getExclusiveOwnerThread() != Thread.currentThread())       //如果当前线程和记录的所占用的线程不符合则抛出异常        throw new IllegalMonitorStateException();        //判断是否释放成功了    //释放成功的条件是state == 0    boolean free = (c == 0);    if (free)        //如果释放成功则修改当前占用线程为null        setExclusiveOwnerThread(null);    setState(c);    return free;}
复制代码

如果没有释放成功则继续不修改锁的状态,释放成功了则会走到 signalNext 方法

private static void signalNext(Node h) {    Node s;    if (h != null && (s = h.next) != null && s.status != 0) {        s.getAndUnsetStatus(WAITING);        LockSupport.unpark(s.waiter);    }}
复制代码

可以发现是将当前等待的线程唤醒并且设置 status 为 0(这里也可以看出是 FIFO 结构,去的是头结点的 next 结点)这又回到了上文中的循环会走到第二个 if

if (first || pred == null) {    boolean acquired;    try {        if (shared)            acquired = (tryAcquireShared(arg) >= 0);        else            acquired = tryAcquire(arg);    } catch (Throwable ex) {        cancelAcquire(node, interrupted, false);        throw ex;    }    if (acquired) {        if (first) {            node.prev = null;            head = node;            pred.next = null;            node.waiter = null;            if (shared)                signalNextIfShared(node);            if (interrupted)                current.interrupt();        }        return 1;    }}
复制代码

尝试获取锁,如果获取不到

 else if (node.status == 0) {    node.status = WAITING;          // enable signal and recheck}
复制代码

又继续将 status 置为 1

公平性体现

我们可以看一下 initialTryLock 代码的实现 FairSync:

final boolean initialTryLock() {    Thread current = Thread.currentThread();    int c = getState();    if (c == 0) {        if (!hasQueuedThreads() && compareAndSetState(0, 1)) {            setExclusiveOwnerThread(current);            return true;        }    } else if (getExclusiveOwnerThread() == current) {        if (++c < 0) // overflow            throw new Error("Maximum lock count exceeded");        setState(c);        return true;    }    return false;}
复制代码

在这里会先去判断一下当前的锁是不是被占用了,被占用的是谁,而且这里还多了一个判断是线程队列中是否有等待的线程,如果有直接返回 false 了 而后续的唤醒线程都是从队列的第一个开始唤醒一定是顺序的,所以这个是公平的 NonfairSync:

final boolean initialTryLock() {    Thread current = Thread.currentThread();    if (compareAndSetState(0, 1)) { // first attempt is unguarded        setExclusiveOwnerThread(current);        return true;    } else if (getExclusiveOwnerThread() == current) {        int c = getState() + 1;        if (c < 0) // overflow            throw new Error("Maximum lock count exceeded");        setState(c);        return true;    } else        return false;}
复制代码

当然后续的唤醒线程都是从队列的第一个开始唤醒一定是顺序的,这个是公平的,但是区别于上文中的公平锁的实现是他没有判断当前是否有等待的队列,如果此时一个新的线程进来需要获取锁,恰巧一个老的线程正在释放锁已经将 state 置为 0 了所以此时新线程不就插队成功了,所以非公平就体现在这里

底层实现逻辑流程图


用户头像

Java你猿哥

关注

一只在编程路上渐行渐远的程序猿 2023-03-09 加入

关注我,了解更多Java、架构、Spring等知识

评论

发布
暂无评论
并发编程-ReentrantLook底层设计_Java_Java你猿哥_InfoQ写作社区