写点什么

并发编程 -ReentrantLook 底层设计

  • 2023-05-12
    湖南
  • 本文字数:6149 字

    阅读完需:约 20 分钟

重入锁

顾名思义:就是可以重入的互斥锁,但是这个重入是有条件的,允许同一个线程多次获得同一个锁,避免了死锁的发生。


重入锁在实现上比 synchronized 关键字更加灵活,提供了一些额外的特性,比如可定时的锁等待(tryLock)、可中断的锁等待(lockInterruptibly)、公平性等。另外,使用重入锁需要手动加锁和解锁,使得线程控制更加精细。


使用重入锁可以避免死锁的发生,提高程序的可靠性。但是,过多地使用重入锁可能会影响程序的性能,因为重入锁需要进行额外的状态管理和线程切换操作。因此,在使用重入锁时需要谨慎考虑锁的范围和粒度,以及锁的公平性和性能等方面的问题。


这里可能有人对tryLocklockInterruptibly有疑问具体的使用方式如下:

  • tryLock

 ReentrantLock lock = new ReentrantLock(); if (lock.tryLock()) {     try {         // 获得锁后的操作     } finally {         lock.unlock();     } } else {     // 获取锁失败的操作 }
复制代码

它的作用可以让线程在获取锁的时候最多等待一定的时间,如果在等待时间内没有获取到锁,则返回 false,表示获取锁失败

  • lockInterruptibly

  Thread t = new Thread(() -> {      ReentrantLock lock = new ReentrantLock();      try {          lock.lockInterruptibly();          try {              // 获得锁后的操作              while (!Thread.currentThread().isInterrupted()) {                  System.out.println("1");              }          } finally {              lock.unlock();          }      } catch (InterruptedException e) {          // 当前线程被中断的操作          Thread.currentThread().interrupt();      }  });  t.start();  Thread.sleep(1000);  t.interrupt();
复制代码

作用就是可以抛出InterruptedException异常,所以线程可以使用interrupt的方式被中断

Lock

Lock(锁)是一种线程同步机制,用于实现互斥和协作访问共享资源。与 synchronized 关键字相比,Lock 接口提供了更加灵活的锁实现,可以实现更加精细的线程控制和高级功能。


其实Lock就是一个接口,定义了锁的一些方法:

  1. lock(): 获取锁,如果锁被其他线程持有,则线程会阻塞等待锁的释放。

  2. unlock(): 释放锁,如果当前线程持有锁,则释放锁,否则会抛出 IllegalMonitorStateException 异常。

  3. tryLock(): 尝试获取锁,如果锁当前没有被其他线程持有,则获取锁并立即返回 true,否则立即返回 false,不会阻塞线程。

  4. tryLock(long time, TimeUnit unit): 在一定时间内尝试获取锁,如果锁在给定的时间内未被其他线程持有,则获取锁并立即返回 true,否则等待指定时间,如果在指定时间内还未获取到锁,则返回 false。

  5. newCondition(): 获取与该锁关联的 Condition 对象,用于实现等待/通知模式。

ReentrantLock

ReentrantLock就是Lock的一种实现:

ReentrantLock是基于 AbstractQueuedSynchronizer 实现的可重入锁

AbstractQueuedSynchronizer

AbstractQueuedSynchronizer 基于 FIFO 等待队列的同步器框架,它为自定义同步器的实现提供了一种底层框架和算法,可以方便地实现各种类型的同步器,用于存储等待获取锁的线程我们先看一下 AQS 的代码结构:

lock 源码实现

发现是调用了sync.lock();而 sync 就是一个内部类并且集成 AQS:

并且我们可以发现 sync 是个抽象类一共有两个实现:

也就是公平锁和非公平锁(下文会说到详细的实现)至于用哪个是在实例化的时候赋值的如下:

接着看sync.lock()源码实现如下:

这里我们就拿非公平锁举例:源码实现是这样的

initialTryLock
final boolean initialTryLock() {    Thread current = Thread.currentThread();    if (compareAndSetState(0, 1)) { // first attempt is unguarded        setExclusiveOwnerThread(current);        return true;    } else if (getExclusiveOwnerThread() == current) {        int c = getState() + 1;        if (c < 0) // overflow            throw new Error("Maximum lock count exceeded");        setState(c);        return true;    } else        return false;}
复制代码

compareAndSetState

方法使用了 unsafe 类的一个方法具体实现在 jvm 里面


所以这个方法呢就是对比 state 状态如果是 0 则说明当前没有线程占用则会赋值 1 然后调用setExclusiveOwnerThread,而setExclusiveOwnerThread方法就是给exclusiveOwnerThread赋值记录一下当前这个锁是被这个线程占用的:

protected final void setExclusiveOwnerThread(Thread thread) {    exclusiveOwnerThread = thread;}
复制代码

如果不是state不是 0 则说明当前的锁被占用了,所以此时需要判断被占用的线程是否是自己,

这里也比较简单就是state加一加好了,然后返回 true,当前线程获取了所以可以执行对应的代码块否则就会返回 false

acquire

上文我们知道如果锁被占用了initialTryLock会返回 false 则此时会执行acquire方法

public final void acquire(int arg) {    if (!tryAcquire(arg))        acquire(null, arg, false, false, false, 0L);}
复制代码

首先是tryAcquire

protected final boolean tryAcquire(int acquires) {    if (getState() == 0 && compareAndSetState(0, acquires)) {        setExclusiveOwnerThread(Thread.currentThread());        return true;    }    return false;}
复制代码

这里又尝试着获取锁,因为可能到这一步刚好这个锁就释放掉了所以就可以直接获取到锁如果还是没有获取到锁才会走到acquire方法

final int acquire(Node node, int arg, boolean shared,                  boolean interruptible, boolean timed, long time) {    Thread current = Thread.currentThread();    byte spins = 0, postSpins = 0;   // retries upon unpark of first thread    boolean interrupted = false, first = false;    Node pred = null;                // predecessor of node when enqueued
for (;;) { if (!first && (pred = (node == null) ? null : node.prev) != null && !(first = (head == pred))) { if (pred.status < 0) { cleanQueue(); // predecessor cancelled continue; } else if (pred.prev == null) { Thread.onSpinWait(); // ensure serialization continue; } } if (first || pred == null) { boolean acquired; try { if (shared) acquired = (tryAcquireShared(arg) >= 0); else acquired = tryAcquire(arg); } catch (Throwable ex) { cancelAcquire(node, interrupted, false); throw ex; } if (acquired) { if (first) { node.prev = null; head = node; pred.next = null; node.waiter = null; if (shared) signalNextIfShared(node); if (interrupted) current.interrupt(); } return 1; } } if (node == null) { // allocate; retry before enqueue if (shared) node = new SharedNode(); else node = new ExclusiveNode(); } else if (pred == null) { // try to enqueue node.waiter = current; Node t = tail; node.setPrevRelaxed(t); // avoid unnecessary fence if (t == null) tryInitializeHead(); else if (!casTail(t, node)) node.setPrevRelaxed(null); // back out else t.next = node; } else if (first && spins != 0) { --spins; // reduce unfairness on rewaits Thread.onSpinWait(); } else if (node.status == 0) { node.status = WAITING; // enable signal and recheck } else { long nanos; spins = postSpins = (byte)((postSpins << 1) | 1); if (!timed) LockSupport.park(this); else if ((nanos = time - System.nanoTime()) > 0L) LockSupport.parkNanos(this, nanos); else break; node.clearStatus(); if ((interrupted |= Thread.interrupted()) && interruptible) break; } } return cancelAcquire(node, interrupted, interruptible);}
复制代码

梳理一下逻辑:

初始变量:spins = 0,   postSpins = 0;   interrupted = false,   first = false;  pred = null;  node = null  share = false        
第一次循环:第一个if中 !first -> true (pred = null) != null -> false 所以第一个if不执行
第二个if (first || pred == null) pred == null -> true所以此时执行了,执行的结果其实就是再一次尝试获取锁,获取成功就返回了否自继续往下执行
第三个if(node == null)第一个条件命中 node = new ExclusiveNode();
第一次循环结束
第二次循环此时条件有一些变化spins = 0, postSpins = 0; interrupted = false, first = false;pred = null;node = new ExclusiveNode()share = false
第一个if:(pred = (node == null) ? null : node.prev) != null -> false 所以没有执行
第二个if: pred == null -> true 一样的继续尝试获取锁
第三个if:走到了else if (pred == null) 这里这里将waiter赋值给当前线程然后将node->prev指向tail,但是此时tail = null继续走到tryInitializeHead方法private void tryInitializeHead() { Node h = new ExclusiveNode(); if (U.compareAndSetReference(this, HEAD, null, h)) tail = h;}就是新创建一个node,然后head和tail都指向新节点h
第三次循环第一个if和第二个if不说了还是一样到第三个else if (pred == null) { 此时就会将node结点的prev指针指向tail,然后tail的next指针指向node至此一个链表的结构也就行程了

第四次循环直接看最后一个if直接走到了最后一个elseLockSupport.park(this);开始阻塞线程,等待被唤醒了
复制代码

unlock 源码实现

看完上文的逻辑这里就很简单了

public void unlock() {    sync.release(1);}
复制代码


public final boolean release(int arg) {    if (tryRelease(arg)) {        signalNext(head);        return true;    }    return false;}
复制代码


@ReservedStackAccessprotected final boolean tryRelease(int releases) {    //这里其实就是将state-1    int c = getState() - releases;    if (getExclusiveOwnerThread() != Thread.currentThread())       //如果当前线程和记录的所占用的线程不符合则抛出异常        throw new IllegalMonitorStateException();        //判断是否释放成功了    //释放成功的条件是state == 0    boolean free = (c == 0);    if (free)        //如果释放成功则修改当前占用线程为null        setExclusiveOwnerThread(null);    setState(c);    return free;}
复制代码

如果没有释放成功则继续不修改锁的状态,释放成功了则会走到signalNext方法

private static void signalNext(Node h) {    Node s;    if (h != null && (s = h.next) != null && s.status != 0) {        s.getAndUnsetStatus(WAITING);        LockSupport.unpark(s.waiter);    }}
复制代码

可以发现是将当前等待的线程唤醒并且设置 status 为 0(这里也可以看出是 FIFO 结构,去的是头结点的 next 结点)这又回到了上文中的循环会走到第二个 if

if (first || pred == null) {    boolean acquired;    try {        if (shared)            acquired = (tryAcquireShared(arg) >= 0);        else            acquired = tryAcquire(arg);    } catch (Throwable ex) {        cancelAcquire(node, interrupted, false);        throw ex;    }    if (acquired) {        if (first) {            node.prev = null;            head = node;            pred.next = null;            node.waiter = null;            if (shared)                signalNextIfShared(node);            if (interrupted)                current.interrupt();        }        return 1;    }}
复制代码

尝试获取锁,如果获取不到

 else if (node.status == 0) {    node.status = WAITING;          // enable signal and recheck}
复制代码

又继续将 status 置为 1

公平性体现

我们可以看一下initialTryLock代码的实现FairSync:

final boolean initialTryLock() {    Thread current = Thread.currentThread();    int c = getState();    if (c == 0) {        if (!hasQueuedThreads() && compareAndSetState(0, 1)) {            setExclusiveOwnerThread(current);            return true;        }    } else if (getExclusiveOwnerThread() == current) {        if (++c < 0) // overflow            throw new Error("Maximum lock count exceeded");        setState(c);        return true;    }    return false;}
复制代码

在这里会先去判断一下当前的锁是不是被占用了,被占用的是谁,而且这里还多了一个判断是线程队列中是否有等待的线程,如果有直接返回 false 了 而后续的唤醒线程都是从队列的第一个开始唤醒一定是顺序的,所以这个是公平的 NonfairSync

final boolean initialTryLock() {    Thread current = Thread.currentThread();    if (compareAndSetState(0, 1)) { // first attempt is unguarded        setExclusiveOwnerThread(current);        return true;    } else if (getExclusiveOwnerThread() == current) {        int c = getState() + 1;        if (c < 0) // overflow            throw new Error("Maximum lock count exceeded");        setState(c);        return true;    } else        return false;}
复制代码

当然后续的唤醒线程都是从队列的第一个开始唤醒一定是顺序的,这个是公平的,但是区别于上文中的公平锁的实现是他没有判断当前是否有等待的队列,如果此时一个新的线程进来需要获取锁,恰巧一个老的线程正在释放锁已经将 state 置为 0 了所以此时新线程不就插队成功了,所以非公平就体现在这里

底层实现逻辑流程图


作者:Potato_土豆

链接:https://juejin.cn/post/7231447658565124155

来源:稀土掘金

用户头像

还未添加个人签名 2021-07-28 加入

公众号:该用户快成仙了

评论

发布
暂无评论
并发编程-ReentrantLook底层设计_Java_做梦都在改BUG_InfoQ写作社区