我看 JAVA 之 并发编程【二】java.util.concurrent.locks
概述
说到 JAVA 的并发编程,就不得不说一说 java.util.concurrent.locks 包,下面分为如下几部分介绍下这个包的内容。
LockSupport
Lock 接口
AQS & Condition
LockSupport
基本概念
LockSupport 是用来创建 Lock 和 其他线程同步类的基础工具类。其所有的方法都是静态的,可以让线程在任意位置阻塞和唤醒。
通过方法 park 和 unpark 方法来阻塞和唤醒线程。
底层使用 Unsafe 对象实现
public class LockSupport {
private LockSupport() {}
// 通过 Unsafe 设置线程的 parkBlocker 属性值
private static void setBlocker(Thread t, Object arg) {
// Even though volatile, hotspot doesn't need a write barrier here.
U.putObject(t, PARKBLOCKER, arg);
}
// 唤醒指定的线程
// 注意:如果指定线程处于阻塞状态可以被唤醒,否则当其下次调用 park 时不会被阻塞,指定线程必须是 start状态。
public static void unpark(Thread thread) {
if (thread != null)
U.unpark(thread);
}
// 暂停当前线程,指定blocker
public static void park(Object blocker) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);
U.park(false, 0L);
setBlocker(t, null);
}
// 暂停当前线程,指定blocker,设置超时时间
public static void parkNanos(Object blocker, long nanos) {
if (nanos > 0) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);
U.park(false, nanos);
setBlocker(t, null);
}
}
// 暂停当前线程,指定blocker,指定 deadline 的时间戳
public static void parkUntil(Object blocker, long deadline) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);
U.park(true, deadline);
setBlocker(t, null);
}
public static Object getBlocker(Thread t) {
if (t == null)
throw new NullPointerException();
return U.getObjectVolatile(t, PARKBLOCKER);
}
// 暂停当前线程
public static void park() {
U.park(false, 0L);
}
// 暂停当前线程,设置超时时间
public static void parkNanos(long nanos) {
if (nanos > 0)
U.park(false, nanos);
}
// 暂停当前线程,指定 deadline 的时间戳
public static void parkUntil(long deadline) {
U.park(true, deadline);
}
static final int nextSecondarySeed() {
int r;
Thread t = Thread.currentThread();
if ((r = U.getInt(t, SECONDARY)) != 0) {
r ^= r << 13; // xorshift
r ^= r >>> 17;
r ^= r << 5;
}
else if ((r = java.util.concurrent.ThreadLocalRandom.current().nextInt()) == 0)
r = 1; // avoid zero
U.putInt(t, SECONDARY, r);
return r;
}
// 获取线程ID
static final long getThreadId(Thread thread) {
return U.getLong(thread, TID);
}
// Hotspot implementation via intrinsics API
private static final Unsafe U = Unsafe.getUnsafe();
private static final long PARKBLOCKER = U.objectFieldOffset
(Thread.class, "parkBlocker");
private static final long SECONDARY = U.objectFieldOffset
(Thread.class, "threadLocalRandomSecondarySeed");
private static final long TID = U.objectFieldOffset
(Thread.class, "tid");
}
复制代码
与 wait 和 notify 的区别
使用 wait 和 notify 的前提是,当前线程必须获取对象锁。
noftify 只能随机唤醒一个或多个线程,unpark 可以唤醒指定线程
底层原理
Unsafe 的 native 方法,有兴趣可以仔细看一下 native 的 C++源码:
Unsafe.park(isAbsolute, nanos);
Unsafe.unpark(thread);
Lock 接口
Lock 提供了相比 Synchronized 关键字更具扩展性的线程同步操作。
public interface Lock {
// 获取锁
void lock();
// 如果当前线程未被中断,则获取锁,可以响应中断
void lockInterruptibly() throws InterruptedException;
/**
* 仅在调用时锁为空闲状态才获取该锁,可以响应中断
*
* Lock lock = ...;
* if (lock.tryLock()) {
* try {
* // manipulate protected state
* } finally {
* lock.unlock();
* }
* } else {
* // perform alternative actions
* }}</pre>
*/
boolean tryLock();
// 仅在调用时间范围内锁为空闲状态才获取该锁,可以响应中断
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
// 释放锁
void unlock();
/**
* 返回一个绑定在此 Lock 对象上的 Condition 条件对象。
*/
Condition newCondition();
}
复制代码
ReentrantLock
ReentrantLock 顾名思义为可重入锁,它实现了 Lock 接口。下面通过一个 uml 图形表示出它的继承及依赖关系。
通过以上图可以发现如下:
ReentrantLock 实现了 Lock 接口
ReentrantLock 内部聚合了 名为 Sync 的 AQS 同步器,并且包含 Fair 和 NonFair 两种实现。
如何实现 Fair 和 NonFair 呢?
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860L;
@ReservedStackAccess
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 非公平方式,只要cas修改state成功即可获得锁
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
@ReservedStackAccess
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
}
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
@ReservedStackAccess
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 公平方式,需要检查是否有阻塞的predecessors
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
复制代码
通过如下例子可以快速实现一个线程安全的方法
class X {
private final ReentrantLock lock = new ReentrantLock();
// ...
public void m() {
lock.lock(); // block until condition holds
try {
// ... method body
} finally {
lock.unlock()
}
}
}
复制代码
ReadWriteLock
ReadWriteLock 顾名思义为读写锁, 它没有实现 Lock 接口,而是提供了如下两个方法,通过内部依赖 Lock 的方式达到了锁的目的。
Lock readLock();
Lock writeLock();
AQS
在前面讲解 Lock 的时候,提到了 AQS(一般指 AbstractQueuedSynchronizer) 这个词,从字面上解释是抽象队列同步器。而在 ReentrantLock 类中也发现了 Sync 这个 AbstractQueuedSynchronizer 接口的实现类,其对 AQS 的部分方法进行了具体实现。
什么是 AQS
AQS 提供了一个实现阻塞锁和基于 FIFO 的相关同步器(信号量、事件等),它具有如下几个特征来实现同步器。
使用 volatile int 类型的 state 来表示目前的锁的状态,0 表示未锁定,大于 0 表示已锁定。如果 state=2,表示这个锁被统一线程获取了 2 次,实现了线程可重入。另外,state 只有三种 final 的访问方式
getState()
setState()
compareAndSetState()
AQS 通过引入 Node 构成双向链表来维护等待获取锁的 FIFO 线程等待队列,入队和出队逻辑已经实现,具体自定义同步器不需要考虑实现。
AQS 在 Node 上加入 nextWaiter,构成的单向链表类维护等待 condition 的条件队列,入队(ConditionObject.await()\ConditionObject.addConditionWaiter())和出队(ConditionObject.signal()\ConditionObject.signalAll()\ConditionObject.doSignal(first)\ConditionObject.doSignalAll(first)\ConditionObject.transferForSignal(first))逻辑已经实现,具体自定义同步器不需要考虑实现。
AQS 定义两种资源共享方式:Exclusive 和 Share 。
Exclusive
ReentrantLock
Share
CountdownLatch
Semaphore
ReentrantReadWriteLock(读共享,写独占)
各个自定义同步器抢占共享资源的方式不同,在具体实现上只需要实现 state 的获取与释放方式即可,主要实现以下几种方法:
tryAcquire(int arg):独占方式尝试获取资源,成功为 true,失败为 false。
tryRelease(int arg):独占方式尝试释放资源,成功为 true,失败为 false。
tryAcquireShared(int arg):共享方式尝试获取资源。负数表示失败;0 表示成功,但没有剩余资源;正数表示成功,且有剩余资源。
tryReleaseShared(int arg):共享方式尝试释放资源,成功为 true,失败为 false。
isHeldExclusively():当前节点(线程)是否正在独占资源。
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
// 代表 等待获取 CLH 锁 的 FIFO 队首,
private transient volatile Node head;
// 代表 等待获取 CLH 锁 的 FIFO 队尾
private transient volatile Node tail;
// volatile 修饰的共享变量 state,代表共享资源的状态,0表示未锁定,1及大于1代表锁定的次数(可以被同一线程重复获取锁)
private volatile int state;
static final class Node {
/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;
/** waitStatus value to indicate thread has cancelled. */
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking. */
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition. */
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate.
*/
static final int PROPAGATE = -3;
/**
* 节点状态
* SIGNAL: 唤醒当前节点的后继节点,unpark
* CANCELLED: 由于超时或者终端导致当前节点取消了
* CONDITION: 当前节点在 condition 队列中,在被transferred前不会进入 sync 队列
* PROPAGATE: 共享模式下,head节点会唤醒后继节点并一直传递下去
*
*/
volatile int waitStatus;
// 指向 SyncQueue 中的前继节点,当前节点会一直check前继节点的 waitStatus
volatile Node prev;
// 指向 SyncQueue 中等待获取锁的后继节点
volatile Node next;
// 当前节点关联的线程
volatile Thread thread;
// 指向 condition队列中 等待的下一个节点,构成为单向链表
Node nextWaiter;
/**
* Returns true if node is waiting in shared mode.
*/
final boolean isShared() {
return nextWaiter == SHARED;
}
// 前继节点
final Node predecessor() {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
/** Establishes initial head or SHARED marker. */
Node() {}
/** Constructor used by addWaiter. */
Node(Node nextWaiter) {
this.nextWaiter = nextWaiter;
THREAD.set(this, Thread.currentThread());
}
/** Constructor used by addConditionWaiter. */
Node(int waitStatus) {
WAITSTATUS.set(this, waitStatus);
THREAD.set(this, Thread.currentThread());
}
/** CASes waitStatus field. */
final boolean compareAndSetWaitStatus(int expect, int update) {
return WAITSTATUS.compareAndSet(this, expect, update);
}
/** CASes next field. */
final boolean compareAndSetNext(Node expect, Node update) {
return NEXT.compareAndSet(this, expect, update);
}
final void setPrevRelaxed(Node p) {
PREV.set(this, p);
}
// VarHandle mechanics
private static final VarHandle NEXT;
private static final VarHandle PREV;
private static final VarHandle THREAD;
private static final VarHandle WAITSTATUS;
static {
try {
MethodHandles.Lookup l = MethodHandles.lookup();
NEXT = l.findVarHandle(Node.class, "next", Node.class);
PREV = l.findVarHandle(Node.class, "prev", Node.class);
THREAD = l.findVarHandle(Node.class, "thread", Thread.class);
WAITSTATUS = l.findVarHandle(Node.class, "waitStatus", int.class);
} catch (ReflectiveOperationException e) {
throw new ExceptionInInitializerError(e);
}
}
}
复制代码
Condition
Condition 是一个接口,主要包含 await()和 signal()方法,可以像 Object 的 wait()、notify() 一样实现线程间的协作。
ConditionObject 是 AbstractQueuedSynchronizer 的一个内部类。AQS 管理了一个 sync 队列和多个 condition 队列。sync 队列维护了等待获取锁的线程节点,头结点记录了当前正在运行的线程。condition 队列维护了由 Condition.await() 阻塞的线程,一个 Lock 可以 new 多个 Condition,每个 Condition 是一个队列。ConditionObject 对象中是可以访问 AQS 的内部属性。下面以一张图来展示 Condition,以及 Condition 与 AQS 的关系。
await()
public final void await() throws InterruptedException {
if (Thread.interrupted()) // 1.如果当前线程被中断, 那么 throw InterruptedException.
throw new InterruptedException();
Node node = addConditionWaiter(); // 加入条件队列
int savedState = fullyRelease(node);// 获取state,检查是否持有锁,有则释放,无则 throw IllegalMonitorStateException
int interruptMode = 0;
//自旋,阻塞
while (!isOnSyncQueue(node)) {// isOnSyncQueue 返回当前节点是否已经进入 Sync 队列,及是否符合 node.waitStatus == Node.CONDITION || node.prev == null 要求
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) // checkInterruptWhileWaiting会检查当前线程是否被中断或signal,否则一直自旋阻塞
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
复制代码
signal()
// 唤醒
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
//唤醒具体实现
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
// 将从 condition 队列唤醒的第一个线程节点,加入到 sync 等待队列
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
if (!node.compareAndSetWaitStatus(Node.CONDITION, 0))
return false;
/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
复制代码
评论