在《Java Core 「11」AQS-AbstractQueuedSynchronizer》中介绍过 AQS 之后,再回过头来看一下 ReentrantLock 中的实现细节。
ReentrantLock 中有三个内部类,Sync 继承自 AQS,FairSync 实现公平锁机制,NonfairSync 实现非公平锁机制。
01-公平锁 & 非公平锁中的 tryAcquire
非公平锁中获取锁的过程:
final Thread current = Thread.currentThread();int c = getState();if (c == 0) { /** 锁空闲,则当前线程获取成功 */ if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; }}else if (current == getExclusiveOwnerThread()) { /** 锁被当前线程占用,重入 */ int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true;}return false;j
复制代码
公平锁中获取锁的过程:
final Thread current = Thread.currentThread();int c = getState();if (c == 0) { /** 锁空闲时 */ if (!hasQueuedPredecessors() && /** 检查 sync 队列总是否有其他线程在等待 */ compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; }}else if (current == getExclusiveOwnerThread()) { /** 锁被当前线程占用,重入 */ int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true;}return false;
复制代码
通过上面的代码可以发现,在锁空闲时,当前线程能否获得锁的条件是不一样的:
02-Sync 中的 tryRelease
公平锁 & 非公平锁释放锁时使用的 tryRelease 都是同一份,即 Sync 中的 tryRelease 方法:
int c = getState() - releases;if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException();boolean free = false;/** 先判断释放后,锁状态是否归零,若归零说明锁空闲,允许其他线程获取锁 */if (c == 0) { free = true; setExclusiveOwnerThread(null);}setState(c);return free;
复制代码
03-公平锁示例分析
我们结合一个实例分析一下多个线程获得公平锁的执行情况,我们创建三个线程 t1/t2/t3,执行同一份代码runnable,使用同一公平锁lock。
ReentrantLock lock = new ReentrantLock(true);Runnable runnable = () -> { lock.lock(); try { System.out.println(Thread.currentThread() + " running"); try { TimeUnit.MILLISECONDS.sleep(500); } catch (InterruptedException ignored) {} } finally { lock.unlock(); }};
new Thread(runnable, "t1").start();new Thread(runnable, "t2").start();new Thread(runnable, "t3").start();
复制代码
不妨假设线程被调度的顺序是 t1 → t2 → t3,所以存在如下一种时序:t1 执行lock.lock();获得锁,开始执行并 sleep 500ms,此时 t2 执行lock.lock();被阻塞,t3 执行lock.lock();同样被阻塞。
注:以下的阅读顺序,可以按照先 Lock 流程,再 Unlock 流程。
03.1-t1 的执行流程
Lock 流程
ReentrantLock#lock → AbstractQueuedSynchronizer#acquire → ReentrantLock.FairSync#tryAcquire (true, state 是 0,sync 队列为空)
Unlock 流程
ReentrantLock#unlock → AbstractQueuedSynchronizer#release
public final boolean release(int arg) { /** state 由1变0,返回结果 true */ if (tryRelease(arg)) { Node h = head; /** h 不为 null,而且 waitStatus 为 SIGNAL(-1),由 t2 自旋时 park 之前设置,具体看下节 03.2 */ if (h != null && h.waitStatus != 0) /** 会将 h (head) 的 waitStatus 设置为 0, * h.next != null,会 LockSupport.unpark(h.next.thread) * t2 会在 LockSupport.park 处被唤醒 */ unparkSuccessor(h); return true; } return false;}
复制代码
03.2-t2 的执行流程
Lock 流程
ReentrantLock#lock → AbstractQueuedSynchronizer#acquire → ReentrantLock.FairSync#tryAcquire (false,state 是 1,为 t1 tryAcquire 时设置) → AbstractQueuedSynchronizer#addWaiter (会初始化 sync 队列,并将 t2 包装到 Node 中加入队列尾部)→ AbstractQueuedSynchronizer#acquireQueued
final boolean acquireQueued(final Node node, int arg) { boolean interrupted = false; try { for (;;) { /** t2 执行时,p 为 head */ final Node p = node.predecessor(); { /** ReentrantLock.FairSync#tryAcquire 仍会失败*/ if (p == head && tryAcquire(arg)) setHead(node); p.next = null; // help GC return interrupted; } /** * 这段代码在无限循环中(线程 t2 在自旋) * 第1次,p (head) 的 waitStatus 为0,然后被设置为 SIGNAL(-1) * 第2次,线程 t2 会在 parkAndCheckInterrupt 中 LockSupport.park(this); * 当 t1 在 unlock 中唤醒 t2 后, * 根据 t2.isInterrupted(true) 判断是否被中断过更新 interrupted 的值 * 无限循环继续,再次执行到 ttryAcquire 时会获得锁 * 此时,setHead(node) 会把 head 指向 t2 所在节点 */ if (shouldParkAfterFailedAcquire(p, node)) interrupted |= parkAndCheckInterrupt(); } } catch (Throwable t) { cancelAcquire(node); if (interrupted) selfInterrupt(); throw t; }}
复制代码
Unlock 流程
与 03.1 中 unlock 流程一致
03.3-t3 的执行流程
Lock 流程
ReentrantLock#lock → AbstractQueuedSynchronizer#acquire → ReentrantLock.FairSync#tryAcquire (false,state 是 1,为 t1 tryAcquire 时设置) → AbstractQueuedSynchronizer#addWaiter (将 t3 包装到 Node 中加入 sync 队列尾部) → AbstractQueuedSynchronizer#acquireQueued
final boolean acquireQueued(final Node node, int arg) { boolean interrupted = false; try { for (;;) { /** t3 执行时,p 为 t2 所在的节点 */ final Node p = node.predecessor(); { /** 这里不会执行 */ if (p == head && tryAcquire(arg)) setHead(node); p.next = null; // help GC return interrupted; } /** * 这段代码在无限循环中(线程 t3 在自旋) * 第1次,p (t2 节点) 的 waitStatus 为0,然后被设置为 SIGNAL(-1) * 第2次,线程 t3 会在 parkAndCheckInterrupt 中 LockSupport.park(this); */ if (shouldParkAfterFailedAcquire(p, node)) interrupted |= parkAndCheckInterrupt(); } } catch (Throwable t) { cancelAcquire(node); if (interrupted) selfInterrupt(); throw t; }}
复制代码
Unlock 流程
与 03.1 中 unlock 流程一致
历史文章推荐
Java Core 「11」AQS-AbstractQueuedSynchronizer
Java Core 「10」J.U.C 同步工具类 -2
Java Core 「9」J.U.C 同步工具类 -1
Java Core 「8」字节码增强技术
Java Core 「7」各种不同类型的锁
Java Core「6」反射与 SPI 机制
Java Core「5」自定义注解编程
Java Core「4」java.util.concurrent 包简介
Java Core「3」volatile 关键字
Java Core「2」synchronized 关键字
Java Core「1」JUC- 线程基础
评论