写点什么

JUC 并发—AQS 源码分析一

  • 2025-02-19
    福建
  • 本文字数:29094 字

    阅读完需:约 95 分钟

1.JUC 中的 Lock 接口


(1)Lock 接口定义了抢占锁和释放锁的方法


一.lock()方法

抢占锁,如果没抢到锁则阻塞。


二.tryLock()方法

尝试抢占锁,成功返回 true,失败返回 false。


三.unlock()方法

释放锁。

 

(2)Lock 接口的实现类 ReentrantLock


ReentractLock 是重入锁,属于排他锁,功能和 synchronized 类似。但是在实际中,其实比较少会使用 ReentrantLock。因为 ReentrantLock 的实现及性能和 syncrhonized 差不多,所以一般推荐使用 synchronized 而不是 ReentrantLock。


public class ReentractLockDemo {    static int data = 0;    static ReentrantLock lock = new ReentrantLock();    public static void main(String[] args) {        new Thread() {            public void run() {                for (int i = 0; i < 10; i++) {                    lock.lock();//获取锁                    try {                        ReentractLockDemo.data++;                        System.out.println(ReentractLockDemo.data);                    } finally {                        lock.unlock();//释放锁                    }                }            }        }.start();        new Thread() {            public void run() {                for (int i = 0; i < 10; i++) {                    lock.lock();//获取锁                    try {                        ReentractLockDemo.data++;                        System.out.println(ReentractLockDemo.data);                    } finally {                        lock.unlock();                    }                }            }        }.start();    }}
复制代码


(3)Lock 接口的实现类 ReentrantReadWriteLock


ReentrantReadWriteLock 是可重入的读写锁,ReentrantReadWriteLock 维护了两个锁:一是 ReadLock,二是 WriteLock。ReadLock 和 WriteLock 都分别实现了 Lock 接口。

 

ReadLock 和 WriteLock 适用于读多写少场景,具有特性:读读不互斥、读写互斥、写写互斥。


public class ReadWriteLockExample {    private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();    private final Lock readLock = readWriteLock.readLock();    private final Lock writeLock = readWriteLock.writeLock();    private List<String> dataList = new ArrayList<>();        public void add(String data) {        writeLock.lock();        try {            dataList.add(data);        } finally {            writeLock.unlock();        }    }        public String get(int idx) {        readLock.lock();        try {            return dataList.get(idx);        } finally {            readLock.unlock();        }    }}
复制代码


(4)Lock 接口的实现类 StampedLock


ReentrantReadWriteLock 锁有一个问题,如果当前有线程在调用 get()方法,那么所有调用 add()方法的线程必须要等调用 get()方法的线程释放锁才能写,所以如果调用 get()方法的线程非常多,那么就会导致写线程一直被阻塞。

 

StampedLock 优化了 ReentrantReadWriteLock 的乐观锁,当有线程调用 get()方法读取数据时,不会阻塞准备执行写操作的线程。

 

StampedLock 提供了三种锁;


一.writeLock

功能与 ReentrantReadWriteLock 的写锁一样。


二.readLock

功能与 ReentrantReadWriteLock 的读锁一样。


三.tryOptimisticRead

当有线程获得该读锁时不会阻塞其他线程的写操作。

 

StampedLock 的 tryOptimisticRead()方法会返回一个 stamp 版本号,用来表示当前线程在读操作期间数据是否被修改过。

 

StampedLock 提供了一个 validate()方法来验证 stamp 版本号,如果线程在读取过程中没有其他线程对数据做修改,那么 stamp 的值不会变。

 

StampedLock 使用了乐观锁的思想,避免了在读多写少场景中,大量线程占用读锁造成的写阻塞,在一定程度上提升了读写锁的并发性能。


public class Point {    private double x, y;    private final StampedLock stampedLock = new StampedLock();
public void move(double deltaX, double deltaY) { //获得一个写锁,和ReentrantReadWriteLock相同 long stamp = stampedLock.writeLock(); try { x += deltaX; y += deltaY; } finally { stampedLock.unlock(stamp); } }
public double distanceFromOrigin() { //获得一个乐观锁,不阻塞写操作 long stamp = stampedLock.tryOptimisticRead(); double currentX = x, currentY = y; if (stampedLock.validate(stamp)) { stamp = stampedLock.readLock(); try { currentX = x; currentY = y; } finally { stampedLock.unlock(stamp); } } else { //可以使用readLock()方法来获取带阻塞机制的读锁,类似于synchronized中的锁升级 } return Math.sqrt(currentX * currentX + currentY * currentY); }}
复制代码


(5)三种锁的并发度


一.ReentrantLock

读读互斥、读写互斥、写写互斥。


二.ReentrantReadWriteLock

读读不互斥、读写互斥、写写互斥。


三.StampedLock

读读不互斥、读写不互斥、写写互斥。

 

ReentrantReadWriteLock 采用的是悲观读策略。当第一个读线程获取锁后,第二个、第三个读线程还可以获取锁。这样可能会使得写线程一直拿不到锁,从而导致写线程饿死。所以其公平和非公平实现中,都会尽量避免这种情形。比如非公平锁的实现中,如果读线程在尝试获取锁时发现,AQS 的等待队列中的头结点的后继结点是独占锁结点,那么读线程会阻塞。

 

StampedLock 采用的是乐观读策略,类似于 MVCC。读的时候不加锁,读出来发现数据被修改了,再升级为悲观锁。

 

2.如何实现具有阻塞或唤醒功能的锁


(1)需要一个共享变量标记锁的状态


AQS 有一个 int 变量 state 用来记录锁的状态,通过 CAS 操作确保对 state 变量操作的线程安全。

 

(2)需要记录当前是哪个线程持有锁


AQS 有一个 Thread 变量 exclusiveOwnerThread 用来记录持有锁的线程。

 

当 state = 0 时,没有线程持有锁,此时 exclusiveOwnerThread = null。

当 state = 1 时,有一线程持有锁,此时 exclusiveOwnerThread = 该线程。

当 state > 1,说明有一线程重入了锁。

 

(3)需要支持对线程进行阻塞和唤醒


AQS 使用 LockSupport 工具类的 park()方法和 unpark()方法,通过 Unsafe 类提供的 native 方法实现阻塞和唤醒线程。

 

(4)需要一个无锁队列来维护阻塞线程


AQS 通过一个双向链表和 CAS 实现了一个无锁的阻塞队列来维护阻塞的线程。

 

3.AQS 抽象队列同步器的理解


(1)什么是 AQS


AQS 就是 AbstractQueuedSynchronizer 的缩写,中文意思就是抽象队列同步器。

 

其中 ReentractLock、ReadWriteReentractLock,都是基于 AQS 来实现的。AQS 是专门用来支撑各种 Java 并发类底层实现的抽象类。

 

AQS 中最关键的两部分是:Node 等待队列和 state 变量。其中 Node 等待队列是一个双向链表,用来存放阻塞等待的线程,而 state 变量则用来在加锁和释放锁时标记锁的状态。



(2)如何理解 AQS


AQS 是用来构建锁或者其他同步组件的基础框架,它使用了一个 int 成员变量表示同步状态,通过一个无锁队列实现线程排队。

 

AQS 的主要使用方式是继承,子类通过继承 AQS 并实现 AQS 的抽象方法来管理同步状态,子类被推荐定义为自定义同步组件的静态内部类。

 

AQS 自身没有实现任何接口,AQS 仅仅定义若干同步状态获取和释放的方法来供同步组件使用。AQS 既可以支持独占式获取同步状态,也可以支持共享式获取同步状态。

 

AQS 面向的是锁或者同步组件的实现者,基于模版方法设计模式,简化了锁的实现,屏蔽了同步状态管理、线程排队、线程等待与唤醒等操作。

 

4.基于 AQS 实现的 ReentractLock


(1)ReentrantLock 的构造函数


在 ReentrantLock 的构造函数中,初始化 sync 变量为一个 NonfairSync 对象。NonfairSync 是非公平锁,所以 ReentrantLock 默认使用非公平锁。


public class ReentrantLock implements Lock, java.io.Serializable {    ...    //Synchronizer providing all implementation mechanics    private final Sync sync;        //Creates an instance of ReentrantLock.    //This is equivalent to using ReentrantLock(false).    public ReentrantLock() {        sync = new NonfairSync();    }        //Creates an instance of ReentrantLock with the given fairness policy.    public ReentrantLock(boolean fair) {        sync = fair ? new FairSync() : new NonfairSync();    }    ...}
复制代码


(2)ReentrantLock 的加锁方法


ReentrantLock 获取锁是通过 Sync 的 lock()方法来实现的,也就是 ReentrantLock 的 lock()方法默认会执行继承自 ReentrantLock 内部类 Sync 的 NonfairSync 类的 lock()方法。


public class ReentrantLock implements Lock, java.io.Serializable {    ...    //Acquires the lock.    //Acquires the lock if it is not held by another thread and returns immediately,     //setting the lock hold count to one.        //If the current thread already holds the lock,    //then the hold count is incremented by one and the method returns immediately.       //If the lock is held by another thread,    //then the current thread becomes disabled for thread scheduling purposes     //and lies dormant until the lock has been acquired, at which time the lock hold count is set to one.    public void lock() {        //执行继承自ReentrantLock内部类Sync的NonfairSync类或FairSync类的lock()方法        sync.lock();    }
//Acquires the lock unless the current thread is Thread#interrupt interrupted. public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); }
//Acquires the lock only if it is not held by another thread at the time of invocation. public boolean tryLock() { return sync.nonfairTryAcquire(1); }
//Acquires the lock if it is not held by another thread within the given waiting time //and the current thread has not been Thread#interrupt interrupted. public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1, unit.toNanos(timeout)); }
//Attempts to release this lock. public void unlock() { sync.release(1); } ...}
复制代码


(3)ReentrantLock 的 sync 变量


ReentrantLock 的 sync 变量就是一个 Sync 对象,Sync 类则是 ReentrantLock 的抽象静态内部类。Sync 类继承了 AQS 类(抽象队列同步器),所以可以认为 Sync 就是 AQS。此外,NonfairSync 是 Sync 的一个子类,FairSync 也是 Sync 的一个子类。


public class ReentrantLock implements Lock, java.io.Serializable {    ...    //Synchronizer providing all implementation mechanics    private final Sync sync;        //Base of synchronization control for this lock.     //Subclassed into fair and nonfair versions below.     //Uses AQS state to represent the number of holds on the lock.    abstract static class Sync extends AbstractQueuedSynchronizer {        ...    }        //Sync object for fair locks    static final class FairSync extends Sync {        ...    }        //Sync object for non-fair locks    static final class NonfairSync extends Sync {        ...    }    ...}
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { ... //Head of the wait queue, lazily initialized. //Except for initialization, it is modified only via method setHead. //Note: If head exists, its waitStatus is guaranteed not to be CANCELLED. private transient volatile Node head;
//Tail of the wait queue, lazily initialized. //Modified only via method enq to add new wait node. private transient volatile Node tail;
//The synchronization state. private volatile int state; ...}
复制代码


5.ReentractLock 如何获取锁


(1)compareAndSetState()方法尝试加锁


ReentractLock 是基于 NonfairSync 的 lock()方法来实现加锁的。AQS 里有一个核心的变量 state,代表了锁的状态。在 NonfairSync.lock()方法中,会通过 CAS 操作来设置 state 从 0 变为 1。如果 state 原来是 0,那么就代表此时还没有线程获取锁,当前线程执行 AQS 的 compareAndSetState()方法便能成功将 state 设置为 1。

 

所以 AQS 的 compareAndSetState()方法相当于在尝试加锁。AQS 的 compareAndSetState()方法是基于 Unsafe 类来实现 CAS 操作的,Atomic 原子类也是基于 Unsafe 来实现 CAS 操作的。

 

(2)setExclusiveOwnerThread()方法设置加锁线程为当前线程


如果执行 compareAndSetState(0, 1)返回的是 true,那么就说明加锁成功,于是执行 AQS 的 setExclusiveOwnerThread()方法设置加锁线程为当前线程。


public class ReentrantLock implements Lock, java.io.Serializable {    //Synchronizer providing all implementation mechanics    private final Sync sync;    ...        //If the lock is held by another thread,    //then the current thread becomes disabled for thread scheduling purposes     //and lies dormant until the lock has been acquired, at which time the lock hold count is set to one.    public void lock() {        //执行继承自ReentrantLock内部类Sync的NonfairSync类或FairSync类的lock()方法        sync.lock();    }
//Sync object for non-fair locks static final class NonfairSync extends Sync { //Performs lock. Try immediate barge, backing up to normal acquire on failure. final void lock() { //执行AQS的compareAndSetState()方法 if (compareAndSetState(0, 1)) { //执行AQS的setExclusiveOwnerThread()方法 setExclusiveOwnerThread(Thread.currentThread()); } else { //执行AQS的acquire()方法 acquire(1); } } ... } ...}
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { private static final Unsafe unsafe = Unsafe.getUnsafe(); private static final long stateOffset; private volatile int state;
static { try { stateOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("state")); ... } catch (Exception ex) { throw new Error(ex); } }
//Atomically sets synchronization state to the given updated value if the current state value equals the expected value. //This operation has memory semantics of a volatile read and write. protected final boolean compareAndSetState(int expect, int update) { //See below for intrinsics setup to support this return unsafe.compareAndSwapInt(this, stateOffset, expect, update); } ...}
public abstract class AbstractOwnableSynchronizer implements java.io.Serializable { ... //The current owner of exclusive mode synchronization. private transient Thread exclusiveOwnerThread; //Sets the thread that currently owns exclusive access. //A null argument indicates that no thread owns access. //This method does not otherwise impose any synchronization or volatile field accesses. protected final void setExclusiveOwnerThread(Thread thread) { exclusiveOwnerThread = thread; } ...}
复制代码


6.AQS 如何基于 state 变量实现可重入锁


(1)线程重入锁时 CAS 操作失败


假如线程 1 加锁后又调用 ReentrantLock.lock()方法,应如何实现可重入锁?此时 state = 1,故执行 AQS 的 compareAndSetState(0, 1)方法会返回 false。所以首先通过 CAS 操作,尝试获取锁会失败,然后返回 false,于是便会执行 AQS 的 acquire(1)方法。

 

(2)Sync 的 nonfairTryAcquire()实现可重入锁


在 AQS 的 acquire()方法中,首先会执行 AQS 的 tryAcquire()方法尝试获取锁。但 AQS 的 tryAcquire()方法是个保护方法,需要由子类重写。所以其实会执行继承自 AQS 子类 Sync 的 NonfairSync 的 tryAcquire()方法,而该方法最终又执行回 AQS 子类 Sync 的 nonfairTryAcquire()方法。

 

在 AQS 子类 Sync 的 nonfairTryAcquire()方法中:首先判断 state 是否为 0,如果是则表明此时已释放锁,可通过 CAS 来获取锁。否则判断持有锁的线程是否为当前线程,如果是则对 state 进行累加。也就是通过对 state 进行累加,实现持有锁的线程可以重入锁。


public class ReentrantLock implements Lock, java.io.Serializable {    ...    //Synchronizer providing all implementation mechanics    private final Sync sync;
public void lock() { //执行继承自ReentrantLock内部类Sync的NonfairSync类或FairSync类的lock()方法 sync.lock(); }
//NonfairSync是ReentractLock的内部类,继承自ReentractLock的另一内部类Sync static final class NonfairSync extends Sync { final void lock() { //执行AQS的compareAndSetState()方法 if (compareAndSetState(0, 1)) { //执行AQS的setExclusiveOwnerThread()方法 setExclusiveOwnerThread(Thread.currentThread()); } else { //执行AQS的acquire()方法 acquire(1); } } //判断是否是重入锁 + 是否已释放锁 protected final boolean tryAcquire(int acquires) { //执行继承自AQS的Sync的nonfairTryAcquire()方法 return nonfairTryAcquire(acquires); } }
abstract static class Sync extends AbstractQueuedSynchronizer { ... //判断是否是重入锁 + 是否已释放锁,尝试获取锁 final boolean nonfairTryAcquire(int acquires) { //先获取当前的线程 final Thread current = Thread.currentThread(); //获取volatile修饰的state变量值 int c = getState(); //有可能在获取当前线程和state变量值的时候,持有锁的线程释放了锁 //所以需要再次判断一下state是否为0,如果state是0,那么再次尝试加锁 if (c == 0) {//表示无锁状态 //执行AQS的compareAndSetState()方法,CAS设置state成功表明当前线程获取到锁 if (compareAndSetState(0, acquires)) { //执行AQS的setExclusiveOwnerThread()方法,保存获得锁的线程为当前线程 setExclusiveOwnerThread(current); return true; } } //代码执行到这里还是没有线程释放锁,state还是 != 0 //所以先通过AQS的getExclusiveOwnerThread()方法获取当前线程 //然后再判断持有锁的线程是否为当前线程,即当前线程 == exclusiveOwnerThread //如果持有锁的线程是当前线程,则代表当前线程在重复加锁,所以增加重入次数 else if (current == getExclusiveOwnerThread()) { //此时,c = 1,而nextc = c(1) + acquires(1) = 2 //这代表的是当前线程重入锁2次,nextc代表重入次数 int nextc = c + acquires; if (nextc < 0) { throw new Error("Maximum lock count exceeded"); } //执行AQS的setState()方法,修改volatile修饰的state setState(nextc); return true; } return false; } ... } ...}
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { ... //The synchronization state. private volatile int state;
//Returns the current value of synchronization state. //This operation has memory semantics of a volatile read. //@return current state value protected final int getState() { return state; } //Sets the value of synchronization state. //This operation has memory semantics of a volatile write. //@param newState the new state value protected final void setState(int newState) { state = newState; }
//Acquires in exclusive mode, ignoring interrupts. //Implemented by invoking at least once tryAcquire, returning on success. //Otherwise the thread is queued, possibly repeatedly blocking and unblocking, invoking tryAcquire until success. //This method can be used to implement method Lock#lock. public final void acquire(int arg) { //首先执行继承自ReentrantLock内部类Sync的NonfairSync的tryAcquire()方法,用来判断是否是重入锁+是否已释放锁,从而尝试获取锁 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) { selfInterrupt(); } } //Attempts to acquire in exclusive mode. //This method should query if the state of the object permits it to be acquired in the exclusive mode, and if so to acquire it. //This method is always invoked by the thread performing acquire. //If this method reports failure, the acquire method may queue the thread, if it is not already queued, //until it is signalled by a release from some other thread. //This can be used to implement method Lock#tryLock(). //The default implementation throws UnsupportedOperationException. protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); } ...}
public abstract class AbstractOwnableSynchronizer implements java.io.Serializable { ... //The current owner of exclusive mode synchronization. private transient Thread exclusiveOwnerThread; //Sets the thread that currently owns exclusive access. //A null argument indicates that no thread owns access. //This method does not otherwise impose any synchronization or volatile field accesses. protected final void setExclusiveOwnerThread(Thread thread) { exclusiveOwnerThread = thread; }
//Returns the thread last set by setExclusiveOwnerThread, or null if never set. //This method does not otherwise impose any synchronization or volatile field accesses. protected final Thread getExclusiveOwnerThread() { return exclusiveOwnerThread; } ...}
复制代码


7.AQS 如何处理 CAS 加锁失败的线程


(1)加锁失败的线程的核心处理逻辑


加锁失败时会将获取锁失败的线程维护成一个双向链表,也就是队列。同时会将线程挂起进行阻塞等待,等待被持有锁的线程释放锁时唤醒。



(2)加锁失败的线程的具体处理流程


首先在 ReentrantLock 内部类 NonfairSync 的 lock()方法中,执行 AQS 的 compareAndSetState()方法尝试获取锁是失败的。

 

于是执行 AQS 的 acquire()方法 -> 执行 NonfairSync 的 tryAcquire()方法。也就是执行继承自 AQS 的 Sync 的 nonfairTryAcquire()方法,进行判断是否是重入锁 + 是否已释放锁。发现也是失败的,所以继承自 Sync 的 NonfairSync 的 tryAcquire()方法返回 false。

 

然后在 AQS 的 acquire()方法中,if 判断的第一个条件 tryAcquire()便是 false,所以接着会执行 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)。也就是先执行 AQS 的 addWaiter()方法将当前线程加入等待队列,然后再去执行 AQS 的 acquireQueued()方法将当前线程挂起阻塞等待。


public class ReentrantLock implements Lock, java.io.Serializable {    ...    //Synchronizer providing all implementation mechanics    private final Sync sync;
public void lock() { //执行继承ReentrantLock内部类Sync的NonfairSync类的lock()方法 sync.lock(); } ...}
public class ReentrantLock implements Lock, java.io.Serializable { ... //这是ReentractLock里的内部类 static final class NonfairSync extends Sync { final void lock() { //执行AQS的compareAndSetState()方法 if (compareAndSetState(0, 1)) { //执行AQS的setExclusiveOwnerThread()方法 setExclusiveOwnerThread(Thread.currentThread()); } else { //通过CAS获取锁失败时,执行AQS的acquire()方法 acquire(1); } } //判断是否是重入锁 + 是否已释放锁 protected final boolean tryAcquire(int acquires) { //执行继承自AQS的Sync的nonfairTryAcquire()方法 return nonfairTryAcquire(acquires); } }
abstract static class Sync extends AbstractQueuedSynchronizer { ... //判断是否是重入锁 + 是否已释放锁,尝试获取锁 final boolean nonfairTryAcquire(int acquires) { //先获取当前的线程 final Thread current = Thread.currentThread(); //获取volatile修饰的state变量值 int c = getState(); //有可能在获取当前线程和state变量值的时候,持有锁的线程释放了锁 //所以需要再次判断一下state是否为0,如果state是0,那么再次尝试加锁 if (c == 0) {//表示无锁状态 //执行AQS的compareAndSetState()方法,CAS设置state成功表明当前线程获取到锁 if (compareAndSetState(0, acquires)) { //执行AQS的setExclusiveOwnerThread()方法,保存获得锁的线程为当前线程 setExclusiveOwnerThread(current); return true; } } //代码执行到这里还是没有线程释放锁,state还是 != 0 //所以先通过AQS的getExclusiveOwnerThread()方法获取当前线程 //然后再判断持有锁的线程是否为当前线程,即当前线程 == exclusiveOwnerThread //如果持有锁的线程是当前线程,则代表当前线程在重复加锁,所以增加重入次数 else if (current == getExclusiveOwnerThread()) { //此时,c = 1,而nextc = c(1) + acquires(1) = 2 //这代表的是当前线程重入锁2次,nextc代表重入次数 int nextc = c + acquires; if (nextc < 0) { throw new Error("Maximum lock count exceeded"); } //修改这个volatile修饰的state,volatile保证了可见性 setState(nextc); return true; } return false; } ... } ...}
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { ... //Acquires in exclusive mode, ignoring interrupts. //Implemented by invoking at least once tryAcquire, returning on success. //Otherwise the thread is queued, possibly repeatedly blocking and unblocking, invoking tryAcquire until success. //This method can be used to implement method Lock#lock. public final void acquire(int arg) { //首先执行继承自ReentrantLock内部类Sync的NonfairSync的tryAcquire()方法,用来判断是否是重入锁+是否已释放锁,从而尝试获取锁 //当前线程加锁失败时,执行NonfairSync的tryAcquire()方法会返回false,即执行ReentrantLock内部类Sync的nonfairTryAcquire()方法返回false //所以当前线程加锁失败时会先执行AQS的addWaiter()方法,再执行AQS的acquireQueued()方法 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) { selfInterrupt(); } } //Attempts to acquire in exclusive mode. //This method should query if the state of the object permits it to be acquired in the exclusive mode, and if so to acquire it. //This method is always invoked by the thread performing acquire. //If this method reports failure, the acquire method may queue the thread, if it is not already queued, //until it is signalled by a release from some other thread. //This can be used to implement method Lock#tryLock(). //The default implementation throws UnsupportedOperationException. protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); } ...}
复制代码


ReentrantLock 内部类 NonfairSync 的 lock()方法总结:如果 CAS 操作失败,则说明有线程正在持有锁,此时会继续调用 acquire(1)。然后通过 NonfairSync 的 tryAcquire()方法尝试获取独占锁,也就是通过 Sync 的 nonfairTryAcquire()方法尝试获取独占锁。如果 NonfairSync 的 tryAcquire()方法返回 false,说明锁已被占用。于是执行 AQS 的 addWaiter()方法将当前线程封装成 Node 并添加到等待队列,接着执行 AQS 的 acquireQueued()方法通过自旋尝试获取锁以及挂起线程。

 

(3)执行 AQS 的 addWaiter()方法维护等待队列


在 AQS 的 addWaiter()方法中:首先会将当前获取锁失败的线程封装为一个 Node 对象,然后判断等待队列(双向链表)的尾结点是否为空。如果尾结点不为空,则使用 CAS 操作 + 尾插法将 Node 对象插入等待队列中。如果尾结点为空或者尾结点不为空时 CAS 操作失败,则调用 enq()方法通过自旋 + CAS 构建等待队列或把 Node 对象插入等待队列。

 

注意:等待队列的队头是个空的 Node 结点,新增一个结点时会从尾部插入。


public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {    private transient volatile Node head;    private transient volatile Node tail;    private static final long headOffset;    private static final long tailOffset;        static {        try {            headOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("head"));            tailOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("tail"));            ...        } catch (Exception ex) {            throw new Error(ex);        }    }        ...    //Acquires in exclusive mode, ignoring interrupts.     //Implemented by invoking at least once tryAcquire, returning on success.    //Otherwise the thread is queued, possibly repeatedly blocking and unblocking, invoking tryAcquire until success.      //This method can be used to implement method Lock#lock.    public final void acquire(int arg) {        //首先执行继承自ReentrantLock内部类Sync的NonfairSync的tryAcquire()方法,用来判断是否是重入锁+是否已释放锁,从而尝试获取锁        //当前线程加锁失败时,执行NonfairSync的tryAcquire()方法会返回false,即执行ReentrantLock内部类Sync的nonfairTryAcquire()方法返回false        //所以当前线程加锁失败时会先执行AQS的addWaiter()方法,再执行AQS的acquireQueued()方法        if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) {            selfInterrupt();        }    }        //Attempts to acquire in exclusive mode.     //This method should query if the state of the object permits it to be acquired in the exclusive mode, and if so to acquire it.    //This method is always invoked by the thread performing acquire.    //If this method reports failure, the acquire method may queue the thread, if it is not already queued,     //until it is signalled by a release from some other thread.     //This can be used to implement method Lock#tryLock().    //The default implementation throws UnsupportedOperationException.    protected boolean tryAcquire(int arg) {        throw new UnsupportedOperationException();    }        //Creates and enqueues node for current thread and given mode.    //@param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared    //@return the new node    private Node addWaiter(Node mode) {        //首先将获取锁失败的线程封装为一个Node对象;        //然后使用尾插法将这个Node对象插入双向链表中,形成一个等待队列;        Node node = new Node(Thread.currentThread(), mode);        Node pred = tail;//tail是AQS等待队列的尾结点        if (pred != null) {//当tail不为空时            node.prev = pred;//把根据当前线程封装的node结点的prev指向tail            if (compareAndSetTail(pred, node)) {//通过CAS把node结点插入等待队列尾部                pred.next = node;//把原tail的next指向node结点                return node;            }        }        //当tail为空时,把node结点添加到等待队列        enq(node);        return node;    }        private Node enq(final Node node) {        for (;;) {            //首先获取等待队列的队尾结点            Node t = tail;            //队列的头是个空Node,新增一个结点会从尾部插入            if (t == null) {                //初始化一个空的Node结点并赋值给head和tail                if (compareAndSetHead(new Node())) {                    tail = head;                }            } else {                node.prev = t;                //尝试比较tail变量是否为t,如果为t的话,那么tail指针就指向node                if (compareAndSetTail(t, node)) {                    t.next = node;                    return t;                }            }        }    }        private final boolean compareAndSetHead(Node update) {        //headOffset是head变量在AQS类的位置,判断head变量是否为null        //如果是null就将head设置为空的Node结点        //所以队列的头是个空Node,新增一个结点会从尾部插入        return unsafe.compareAndSwapObject(this, headOffset, null, update);    }        private final boolean compareAndSetTail(Node expect, Node update) {        return unsafe.compareAndSwapObject(this, tailOffset, expect, update);    }
static final class Node { //如果当前线程获取锁失败,则会进入阻塞等待的状态,当前线程会被挂起; //阻塞状态可以分为很多种不同的阻塞状态: //默认(0)、CANCELED(1)、SIGNAL(-1)、CONDITION(-2)、PROPAGATE(-3) volatile int waitStatus; //一个结点的上一个结点,prev指针,指向Node结点的上一个Node volatile Node prev; //一个结点的下一个结点,next指针,指向Node结点的下一个Node volatile Node next; //Node结点里封装的一个线程 volatile Thread thread; //可以认为是下一个等待线程 Node nextWaiter; } ...}
复制代码


(4)执行 AQS 的 acquireQueued()方法挂起线程


执行完 AQS 的 addWaiter()方法后便执行 AQS 的 acquireQueued()方法。

 

在 AQS 的 acquireQueued()方法中:首先会判断传入结点的上一个结点是否为等待队列的头结点。如果是,则再次调用 NonfairSync 的 tryAcquire()方法尝试获取锁。如果获取锁成功,则将传入的 Node 结点从等待队列中移除。同时设置传入的 Node 结点为头结点,然后将该结点设置为空。从而确保等待队列的头结点是一个空的 Node 结点。

 

注意:NonfairSync 的 tryAcquire()方法会判断是否重入锁 + 是否已释放锁。

 

在 AQS 的 acquireQueued()方法中:如果首先进行的尝试获取锁失败了,那么就执行 shouldParkAfterFailedAcquire()方法判断是否要将当前线程挂起。如果需要将当前线程挂起,则会调用 parkAndCheckInterrupt()方法进行挂起,也就是通过调用 LockSupport 的 park()方法挂起当前线程。

 

需要注意的是:如果线程被中断,只会暂时设置 interrupted 为 true。然后还要继续等待被唤醒获取锁,才能调用 selfInterrupt()方法对线程中断。


public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {    ...    //Acquires in exclusive mode, ignoring interrupts.     //Implemented by invoking at least once tryAcquire, returning on success.    //Otherwise the thread is queued, possibly repeatedly blocking and unblocking, invoking tryAcquire until success.      //This method can be used to implement method Lock#lock.    public final void acquire(int arg) {        //首先执行继承自ReentrantLock内部类Sync的NonfairSync的tryAcquire()方法,用来判断是否是重入锁+是否已释放锁,从而尝试获取锁        //当前线程加锁失败时,执行NonfairSync的tryAcquire()方法会返回false,即执行ReentrantLock内部类Sync的nonfairTryAcquire()方法返回false        //所以当前线程加锁失败时会先执行AQS的addWaiter()方法,再执行AQS的acquireQueued()方法        if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) {            //对当前线程进行中断            selfInterrupt();        }    }        //Convenience method to interrupt current thread.    static void selfInterrupt() {        Thread.currentThread().interrupt();    }        //Acquires in exclusive uninterruptible mode for thread already in queue.     //Used by condition wait methods as well as acquire.    //@param node the node    //@param arg the acquire argument    //@return true if interrupted while waiting    final boolean acquireQueued(final Node node, int arg) {        boolean failed = true;        try {            boolean interrupted = false;            for (;;) {                //获取node结点的上一个结点,也就是prev指针指向的结点                final Node p = node.predecessor();                //这里会先判断传入结点的上一个结点是否为等待队列的头结点                //如果是,则再次调用继承自ReentrantLock内部类Sync的NonfairSync的tryAcquire()方法,尝试获取锁                //如果获取锁成功,那么就将当前线程对应的Node结点从等待队列中移除                if (p == head && tryAcquire(arg)) {                    //重新设置传入的Node结点为头结点,同时将该结点设置为空                    setHead(node);//线程唤醒后,头结点会后移                    p.next = null;//help GC                    failed = false;                    return interrupted;                }                //如果再次尝试获取锁失败,则执行shouldParkAfterFailedAcquire()方法                //判断是否需要将当前线程挂起,然后进行阻塞等待                //如果需要挂起,那么就会使用park操作挂起当前线程                //执行shouldParkAfterFailedAcquire()方法会设置node结点的前驱结点的状态为SIGNAL                //执行parkAndCheckInterrupt()方法挂起当前线程                if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) {                    //如果线程被中断,只会暂时设置interrupted为true,还需要继续等待被唤醒来获取锁才能真正中断                    interrupted = true;                }            }        } finally {            if (failed) {                cancelAcquire(node);            }        }    }        //Sets head of queue to be node, thus dequeuing.     //Called only by acquire methods.      //Also nulls out unused fields for sake of GC and to suppress unnecessary signals and traversals.    private void setHead(Node node) {        //重新设置传入的Node结点为头结点,同时将该结点设置为空        head = node;        node.thread = null;        node.prev = null;    }        //Checks and updates status for a node that failed to acquire.    //Returns true if thread should block.     //This is the main signal control in all acquire loops.    //Requires that pred == node.prev.    //@param pred node's predecessor holding status    //@param node the node    //@return true if thread should block    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {        //pred -> 空Node        //Node结点的状态watiStatus可以分为如下几种:        //默认(0)、CANCELED(1)、SIGNAL(-1)、CONDITION(-2)、PROPAGATE(-3)        //默认情况下,watiStatus应该是0,或者是空        int ws = pred.waitStatus;        //如果前驱结点的状态为SIGNAL,那么当前线程就需要被挂起进行阻塞        if (ws == Node.SIGNAL) {            return true;        }        //如果前驱结点的状态为CANCELED,那么需要移除前驱结点        if (ws > 0) {            do {                node.prev = pred = pred.prev;            } while (pred.waitStatus > 0);            pred.next = node;        } else {            //修改前驱结点的状态为SIGNAL            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);        }        return false;    }
private final boolean parkAndCheckInterrupt() { //LockSupport的park操作,就是将一个线程进行挂起 //必须得有另外一个线程来对当前线程执行unpark操作,才能唤醒挂起的线程 LockSupport.park(this); return Thread.interrupted(); } ...}
复制代码


AQS 的 acquireQueued()方法总结:


如果当前结点的前驱结点不是队头结点或者当前线程尝试抢占锁失败,那么都会调用 shouldParkAfterFailedAcquire()方法,修改当前线程结点的前驱结点的状态为 SIGNAL + 决定是否应挂起当前线程。shouldParkAfterFailedAcquire()方法作用是检查当前结点的前驱结点状态。如果状态是 SIGNAL,则可以挂起线程。如果状态是 CANCELED,则要移除该前驱结点。如果状态是其他,则通过 CAS 操作修改该前驱结点的状态为 SIGNAL。

 

(5)如何处理正常唤醒和中断唤醒


LockSupport 的 park 操作,会挂起一个线程。LockSupport 的 unpark 操作,会唤醒被挂起的线程。下面是挂起一个线程和唤醒一个线程的 demo:


public static void main(String[] args) throws Exception {    final Thread thread1 = new Thread() {        public void run() {            System.out.println("挂起之前执行的动作");            LockSupport.park();            System.out.println("唤醒之前挂起的线程继续执行");        }    };    thread1.start();
Thread thread2 = new Thread() { public void run() { for (int i = 0; i < 10; i++) { try { System.out.println("等待"+i+"秒"); Thread.sleep(1000); } catch (Exception e) { e.printStackTrace(); } } System.out.println("尝试唤醒第一个线程"); LockSupport.unpark(thread1); } }; thread2.start();}
复制代码


被 LockSupport.park()方法阻塞的线程被其他线程唤醒有两种情况:

情况一:其他线程调用了 LockSupport.unpark()方法,正常唤醒。

情况二:其他线程调用了阻塞线程 Thread 的 interrupt()方法,中断唤醒。

 

正是因为被 LockSupport.park()方法阻塞的线程可能会被中断唤醒,所以 AQS 的 acquireQueued()方法才写了一个 for 自旋。当阻塞的线程被唤醒后,如果发现自己的前驱结点是头结点,那么就去获取锁。如果获取不到锁,那么就再次阻塞自己,不断重复直到获取到锁为止。

 

被 LockSupport.park()方法阻塞的线程不管是正常唤醒还是被中断唤醒,唤醒后都会通过 Thread.interruptrd()方法来判断是否是中断唤醒。如果是中断唤醒,唤醒后不会立刻响应中断,而是再次获取锁,获取到锁后才能响应中断。

 

8.AQS 的 acquire()方法获取锁的流程总结


AQS 的 acquire()方法代码如下:


public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {    ...    public final void acquire(int arg) {        //首先调用AQS子类重写的tryAcquire()方法,尝试加锁        //如果加锁失败,则调用AQS的addWaiter()方法将当前线程封装成Node结点,插入到等待队列尾部        //接着调用AQS的acquireQueued()方法,通过LockSupport的park操作挂起当前线程,让当前线程阻塞等待        if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) {            selfInterrupt();        }    }    ...}
复制代码


public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {    ...    public final void acquire(int arg) {        //首先调用AQS子类重写的tryAcquire()方法,尝试加锁        //如果加锁失败,则调用AQS的addWaiter()方法将当前线程封装成Node结点,插入到等待队列尾部        //接着调用AQS的acquireQueued()方法,通过LockSupport的park操作挂起当前线程,让当前线程阻塞等待        if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) {            selfInterrupt();        }    }    ...}
复制代码


首先调用 AQS 子类的 tryAcquire()方法尝试获取锁(是否重入 + 是否释放锁)。如果获取成功,则说明是重入锁或 CAS 抢占释放的锁成功,于是退出返回。如果获取失败,则调用 AQS 的 addWaiter()方法将当前线程封装成 Node 结点,并通过 AQS 的 compareAndSetTail()方法将该 Node 结点添加到等待队列尾部。

 

然后将该 Node 结点传入 AQS 的 acquireQueued()方法,通过自旋尝试获取锁。在 AQS 的 acquireQueued()方法中,会判断该 Node 结点的前驱是否为头结点。如果不是,则挂起当前线程进行阻塞。如果是,则尝试获取锁。如果获取成功,则设置当前结点为头结点,然后退出返回。如果获取失败,则继续挂起当前线程进行阻塞。

 

当被阻塞线程,被其他线程中断唤醒或其对应结点的前驱结点释放了锁,那么就继续判断该线程对应结点的前驱结点是否成为头结点。


 

9.ReentractLock 如何释放锁


(1)ReentrantLock 释放锁的流程


ReentrantLock 释放锁是通过 AQS 的 release()方法来实现的。在 AQS 的 release()方法中,首先会执行 Sync 的 tryRelease()方法,而 Sync 的 tryRelease()方法会通过递减 state 变量的值来释放锁资源。如果 Sync 的 tryRelease()方法返回 true,也就是成功释放了锁资源,那么接下来就会调用 AQS 的 unparkSuccessor()方法唤醒头结点的后继结点所对应的线程。


public class ReentrantLock implements Lock, java.io.Serializable {    ...    //Synchronizer providing all implementation mechanics    private final Sync sync;
//Attempts to release this lock. //If the current thread is the holder of this lock then the hold count is decremented. //If the hold count is now zero then the lock is released. //If the current thread is not the holder of this lock then IllegalMonitorStateException is thrown. public void unlock() { //执行AQS的release()方法 sync.release(1); } ... abstract static class Sync extends AbstractQueuedSynchronizer { ... protected final boolean tryRelease(int releases) { //重入锁的情况:getState() == 2,releases == 1,getState() - releases = 1,c = 1 int c = getState() - releases; //如果当前线程不等于加锁的线程,说明不是当前线程加的锁,结果当前线程来释放锁,会抛异常 if (Thread.currentThread() != getExclusiveOwnerThread()) { throw new IllegalMonitorStateException(); } boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; } ... }}
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { ... //Releases in exclusive mode. //Implemented by unblocking one or more threads if tryRelease returns true. //This method can be used to implement method Lock#unlock. //@param arg the release argument. This value is conveyed to #ryRelease but is otherwise uninterpreted and can represent anything you like. public final boolean release(int arg) { //执行AQS的子类Sync的tryRelease()方法 if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) { //传入头结点 unparkSuccessor(h); } return true; } return false; }
//Wakes up node's successor, if one exists. private void unparkSuccessor(Node node) { //If status is negative (i.e., possibly needing signal) try to clear in anticipation of signalling. //It is OK if this fails or if status is changed by waiting thread. //Node结点的状态watiStatus可以分为如下几种: //默认(0)、CANCELED(1)、SIGNAL(-1)、CONDITION(-2)、PROPAGATE(-3) //默认情况下,watiStatus应该是0,或者是空 //获得头结点的状态 int ws = node.waitStatus; //需要设置头结点的状态为0 if (ws < 0) { compareAndSetWaitStatus(node, ws, 0); }
//Thread to unpark is held in successor, which is normally just the next node. //But if cancelled or apparently null, traverse backwards from tail to find the actual non-cancelled successor. //获取头结点的后继结点 Node s = node.next; //如果头结点的后继结点为null或其状态为CANCELED if (s == null || s.waitStatus > 0) { s = null; //那么就从尾结点开始扫描,找到距离头结点最近的 + 状态不是取消的结点 for (Node t = tail; t != null && t != node; t = t.prev) { if (t.waitStatus <= 0) { s = t; } } } if (s != null) { //唤醒传入的头结点的后继结点对应的线程 LockSupport.unpark(s.thread); } } ...}
复制代码


(2)AQS 的 unparkSuccessor()方法


该方法的主要工作是找出传入结点的下一个结点(状态不是取消),然后通过 LockSupport.unpark()方法唤醒该结点。如果发现传入结点的下一个结点无效,则从尾结点开始扫描,找到离头结点最近的 + 状态不是取消的结点。

 

(3)AQS 的 release()方法总结


AQS 的 release()方法主要做了两件事情:

一.通过 tryRelease()方法释放锁(递减 state 变量)

二.通过 unparkSuccessor()方法唤醒等待队列中的下一个线程

 

由于是独占锁,只有持有锁的线程才有资格释放锁,所以 tryRelease()方法修改 state 变量值时不需要使用 CAS 操作。

 

10.ReentractLock 的响应中断和超时获取


(1)ReentractLock 的响应中断


ReentractLock 的 lock()方法不能响应中断,但是 ReentractLock 的 lockInterruptibly()方法可以响应中断。

 

AQS 的 doAcquireInterruptibly()方法和 AQS 的 acquireQueued()方法不同的是,前者收到线程中断信号后,不再去重新竞争锁,而是直接抛出异常。


public class ReentrantLock implements Lock, java.io.Serializable {    private final Sync sync;      public ReentrantLock() {        sync = new NonfairSync();    }    ...        //Acquires the lock unless the current thread is Thread#interrupt interrupted.    public void lockInterruptibly() throws InterruptedException {        //执行AQS的acquireInterruptibly()方法        sync.acquireInterruptibly(1);    }    ...}
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { ... //Acquires in exclusive mode, aborting if interrupted. //Implemented by first checking interrupt status, then invoking at least once #tryAcquire, returning on success. //Otherwise the thread is queued, possibly repeatedly blocking and unblocking, invoking #tryAcquire until success or the thread is interrupted. //This method can be used to implement method Lock#lockInterruptibly. public final void acquireInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) { throw new InterruptedException(); } //首先还是执行AQS子类重写的tryAcquire()方法 if (!tryAcquire(arg)) { //执行AQS的doAcquireInterruptibly()方法 doAcquireInterruptibly(arg); } }
//Acquires in exclusive interruptible mode. private void doAcquireInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return; } //执行shouldParkAfterFailedAcquire()方法会设置node结点的前驱结点的状态为SIGNAL //执行parkAndCheckInterrupt()方法挂起当前线程 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) { //和acquireQueued()方法不同的是,这里收到线程中断信号后,不再去重新竞争锁,而是直接抛异常返回 throw new InterruptedException(); } } } finally { if (failed) { cancelAcquire(node); } } }
//Acquires in exclusive uninterruptible mode for thread already in queue. //Used by condition wait methods as well as acquire. final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } //执行shouldParkAfterFailedAcquire()方法设置node结点的前驱结点的状态为SIGNAL //执行parkAndCheckInterrupt()方法挂起当前线程 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) { interrupted = true; } } } finally { if (failed) { cancelAcquire(node); } } } ...}
复制代码


(2)ReentractLock 的超时获取


在超时时间非常短的情况下,AQS 不会挂起线程,而是让线程自旋去获取锁。


public class ReentrantLock implements Lock, java.io.Serializable {    private final Sync sync;      public ReentrantLock() {        sync = new NonfairSync();    }    ...        //Acquires the lock if it is not held by another thread within the given waiting time     //and the current thread has not been Thread#interrupt interrupted.    public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {        //执行AQS的tryAcquireNanos()方法        return sync.tryAcquireNanos(1, unit.toNanos(timeout));    }    ...}
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { ... //Attempts to acquire in exclusive mode, aborting if interrupted, and failing if the given timeout elapses. //Implemented by first checking interrupt status, then invoking at least once #tryAcquire, returning on success. //Otherwise, the thread is queued, possibly repeatedly blocking and unblocking, //invoking #tryAcquire until success or the thread is interrupted or the timeout elapses. //This method can be used to implement method Lock#tryLock(long, TimeUnit). public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) { throw new InterruptedException(); } //如果执行AQS子类重写的tryAcquire()方法失败,才执行AQS的doAcquireNanos()方法 return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout); }
//Acquires in exclusive timed mode. private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { if (nanosTimeout <= 0L) { return false; } final long deadline = System.nanoTime() + nanosTimeout; final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return true; } nanosTimeout = deadline - System.nanoTime(); if (nanosTimeout <= 0L) {//已超时 return false; } //在超时时间非常短的情况下,AQS不会挂起线程,而是让线程自旋去获取锁; if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold) { //最多挂起线程nanosTimeout LockSupport.parkNanos(this, nanosTimeout); } if (Thread.interrupted()) { throw new InterruptedException(); } } } finally { if (failed) { cancelAcquire(node); } } } ...}
复制代码


11.ReentractLock 的公平锁 FairSync


(1)ReentrantLock 的非公平加锁策略


ReentrantLock 默认使用的是非公平加锁的策略。即新来抢占锁的线程,不管有没有其他线程在排队,都先通过 CAS 抢占锁,也就是让等待队列的队头的后继结点线程和新来抢占锁的线程进行竞争。非公平加锁策略的效率会高些,因为可以让新来的线程也有机会抢占到锁。

 

(2)ReentrantLock 的公平加锁策略


如果希望每个线程过来都按照顺序去进行排队来获取锁,那么就是公平锁。ReentrantLock 如果要使用公平加锁的策略,只需要在构造函数传入 true。


public class ReentrantLock implements Lock, java.io.Serializable {    ...    //Creates an instance of ReentrantLock.    //This is equivalent to using ReentrantLock(false).    public ReentrantLock() {        sync = new NonfairSync();    }
//Creates an instance of ReentrantLock with the given fairness policy. //@param fair true if this lock should use a fair ordering policy public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); } ...}
复制代码


(3)ReentrantLock 的公平锁实现


公平锁 FairSync 的 tryAcquire()方法,会在使用 CAS 操作获取锁之前,增加一个判断条件,先判断等待队列中是否有线程在排队获取锁。如果没有其他线程排队获取锁(头结点==尾结点),则当前线程可以获取锁。如果有其他线程排队获取锁,则当前线程不能获取锁,需要阻塞等待。如果等待队列的头结点的后继结点的线程是当前线程,则当前线程重入锁。

 

所以公平锁的核心实现其实就是"!hasQueuedPredecessors()"这个判断条件,每次加锁时都会首先判断:等待队列中是否有线程在排队获取锁。


public class ReentrantLock implements Lock, java.io.Serializable {    ...    //Sync object for fair locks,公平锁    static final class FairSync extends Sync {        final void lock() {            acquire(1);        }        protected final boolean tryAcquire(int acquires) {            final Thread current = Thread.currentThread();            int c = getState();            if (c == 0) {                if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {                    setExclusiveOwnerThread(current);                    return true;                }            } else if (current == getExclusiveOwnerThread()) {                int nextc = c + acquires;                if (nextc < 0) {                    throw new Error("Maximum lock count exceeded");                }                setState(nextc);                return true;            }            return false;        }    }
//Sync object for non-fair locks,非公平锁 static final class NonfairSync extends Sync { //Performs lock. Try immediate barge, backing up to normal acquire on failure. final void lock() { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); } else { acquire(1); } } protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } }
abstract static class Sync extends AbstractQueuedSynchronizer { //Performs Lock#lock. The main reason for subclassing is to allow fast path for nonfair version. abstract void lock();
//Performs non-fair tryLock. //tryAcquire is implemented in subclasses, but both need nonfair try for trylock method. final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) { throw new Error("Maximum lock count exceeded"); } setState(nextc); return true; } return false; } ... } ...}
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { ... //Queries whether any threads have been waiting to acquire longer than the current thread. //判断当前队列中是否有线程排队 public final boolean hasQueuedPredecessors() { Node t = tail; // Read fields in reverse initialization order Node h = head; Node s; //所以!hasQueuedPredecessors() 等价于: //h == t || (h.next != null && h.next.thread == Thread.currentThread()) return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); } ...}
复制代码


12.AQS 的方法和使用总结

 

(1)访问或修改 state 的方法


getState():获取同步状态setState(int newState):设置当前同步状态compareAndSetState(int expect, int update):使用CAS设置同步状态setExclusiveOwnerThread(Thread thread):标识加锁的线程
复制代码


(2)需要子类重写的方法


tryAcquire():独占式获取同步状态,实现该方法需要查询当前同步状态并判断同步状态是否符合预期,然后再CAS设置同步状态tryRelease():独占式释放同步状态tryAcquireShared():共享式获取同步状态tryReleaseShared():共享式释放同步状态isHeldExclusively():在独占模式下是否被在线线程占有
复制代码


(3)AQS 提供的模版方法


一.独占式获取与释放同步状态acquire():独占式获取同步状态,该方法会调用子类重写的tryAcquire()方法;获取成功则返回,获取失败则当前线程会进入等待队列等待;acquireInterruptibly():与acquire()相同,会响应中断tryAcquireNanos():在acquireInterruptibly()基础上增加超时限制release():独占式释放同步状态

二.共享式获取与释放同步状态acquireShared():共享式获取同步状态,与独占式区别是同一时刻可以有多个线程获取同步状态acquireSharedInterruptibly():与acquireShared()相同,会响应中断tryAcquireSharedNanos():在acquireSharedInterruptibly()基础上增加了超时限制releaseShared():共享式释放同步状态

三.查询等待队列中等待的线程getQueuedThreads()
复制代码


(4)如何使用 AQS 自定义独占锁


独占锁就是同一时刻只能有一个线程获取到锁,而其他获取锁的线程只能处于等待队列中等待。只有获取锁的线程释放了锁,后继的线程才能尝试获取锁。

 

步骤一:定义一个静态内部类 Sync

在实现 Lock 接口的自定义 Mutex 类中定义一个静态内部类 Sync,并且初始化这个静态内部类(子类)的一个实例。

 

步骤二:实现 tryAcquire()和 tryRelease()方法

该内部类 Sync 继承 AQS,需要实现独占式获取锁和释放锁的方法。在获取锁的 tryAcquire()方法中,如果成功设置 state 为 1,则代表获取锁。在释放锁的 tryRelease()方法中,需要重置 state 为 0,持有锁线程为 null。

 

步骤三:实现自定义 Mutex 类的 lock()方法

在自定义 Mutex 类获取锁的 lock()方法中,只需要通过 Sync 实例,调用 AQS 的 acquire()方法即可。

 

(5)独占锁的获取和释放总结


一.获取锁时 AQS 会维护一个等待队列

获取锁失败的线程会被加入到 AQS 的等待队列中,然后进行自旋 + 阻塞等待。

 

二.线程移出队列 + 停止自旋的条件

这个条件就是线程对应的前驱结点为头结点并且该线程成功获取了锁。

 

三.释放锁时的处理

AQS 的 tryRelease()方法会释放锁,然后唤醒头结点的后继结点对应的线程。


文章转载自:东阳马生架构

原文链接:https://www.cnblogs.com/mjunz/p/18720681

体验地址:http://www.jnpfsoft.com/?from=001YH

用户头像

还未添加个人签名 2023-06-19 加入

还未添加个人简介

评论

发布
暂无评论
JUC并发—AQS源码分析一_JavaScript_不在线第一只蜗牛_InfoQ写作社区