写点什么

我看 JAVA 之 并发编程【二】java.util.concurrent.locks

用户头像
awen
关注
发布于: 1 小时前
我看 JAVA 之 并发编程【二】java.util.concurrent.locks

我看 JAVA 之 并发编程【二】java.util.concurrent.locks

概述

说到 JAVA 的并发编程,就不得不说一说 java.util.concurrent.locks 包,下面分为如下几部分介绍下这个包的内容。

  1. ‍LockSupport

  2. Lock 接口

  3. AQS & Condition

LockSupport

基本概念

LockSupport 是用来创建 Lock 和 其他线程同步类的基础工具类。其所有的方法都是静态的,可以让线程在任意位置阻塞和唤醒。

通过方法 park 和 unpark 方法来阻塞和唤醒线程。

底层使用 Unsafe 对象实现

public class LockSupport {    private LockSupport() {} 
// 通过 Unsafe 设置线程的 parkBlocker 属性值 private static void setBlocker(Thread t, Object arg) { // Even though volatile, hotspot doesn't need a write barrier here. U.putObject(t, PARKBLOCKER, arg); }
// 唤醒指定的线程 // 注意:如果指定线程处于阻塞状态可以被唤醒,否则当其下次调用 park 时不会被阻塞,指定线程必须是 start状态。 public static void unpark(Thread thread) { if (thread != null) U.unpark(thread); }
// 暂停当前线程,指定blocker public static void park(Object blocker) { Thread t = Thread.currentThread(); setBlocker(t, blocker); U.park(false, 0L); setBlocker(t, null); }
// 暂停当前线程,指定blocker,设置超时时间 public static void parkNanos(Object blocker, long nanos) { if (nanos > 0) { Thread t = Thread.currentThread(); setBlocker(t, blocker); U.park(false, nanos); setBlocker(t, null); } }
// 暂停当前线程,指定blocker,指定 deadline 的时间戳 public static void parkUntil(Object blocker, long deadline) { Thread t = Thread.currentThread(); setBlocker(t, blocker); U.park(true, deadline); setBlocker(t, null); }
public static Object getBlocker(Thread t) { if (t == null) throw new NullPointerException(); return U.getObjectVolatile(t, PARKBLOCKER); }
// 暂停当前线程 public static void park() { U.park(false, 0L); }

// 暂停当前线程,设置超时时间 public static void parkNanos(long nanos) { if (nanos > 0) U.park(false, nanos); }
// 暂停当前线程,指定 deadline 的时间戳 public static void parkUntil(long deadline) { U.park(true, deadline); }
static final int nextSecondarySeed() { int r; Thread t = Thread.currentThread(); if ((r = U.getInt(t, SECONDARY)) != 0) { r ^= r << 13; // xorshift r ^= r >>> 17; r ^= r << 5; } else if ((r = java.util.concurrent.ThreadLocalRandom.current().nextInt()) == 0) r = 1; // avoid zero U.putInt(t, SECONDARY, r); return r; }
// 获取线程ID static final long getThreadId(Thread thread) { return U.getLong(thread, TID); }
// Hotspot implementation via intrinsics API private static final Unsafe U = Unsafe.getUnsafe(); private static final long PARKBLOCKER = U.objectFieldOffset (Thread.class, "parkBlocker"); private static final long SECONDARY = U.objectFieldOffset (Thread.class, "threadLocalRandomSecondarySeed"); private static final long TID = U.objectFieldOffset (Thread.class, "tid");
}
复制代码

与 wait 和 notify 的区别

  1. 使用 wait 和 notify 的前提是,当前线程必须获取对象锁。

  2. noftify 只能随机唤醒一个或多个线程,unpark 可以唤醒指定线程

底层原理

Unsafe 的 native 方法,有兴趣可以仔细看一下 native 的 C++源码:

  1. Unsafe.park(isAbsolute, nanos);

  2. Unsafe.unpark(thread);

Lock 接口

Lock 提供了相比 Synchronized 关键字更具扩展性的线程同步操作。

  • ReentrantLock

public interface Lock {  // 获取锁    void lock();
// 如果当前线程未被中断,则获取锁,可以响应中断 void lockInterruptibly() throws InterruptedException;
/** * 仅在调用时锁为空闲状态才获取该锁,可以响应中断 * * Lock lock = ...; * if (lock.tryLock()) { * try { * // manipulate protected state * } finally { * lock.unlock(); * } * } else { * // perform alternative actions * }}</pre>
*/ boolean tryLock();
// 仅在调用时间范围内锁为空闲状态才获取该锁,可以响应中断 boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
// 释放锁 void unlock();
/** * 返回一个绑定在此 Lock 对象上的 Condition 条件对象。 */ Condition newCondition();}
复制代码


ReentrantLock

ReentrantLock 顾名思义为可重入锁,它实现了 Lock 接口。下面通过一个 uml 图形表示出它的继承及依赖关系。

通过以上图可以发现如下:

  1. ReentrantLock 实现了 Lock 接口

  2. ReentrantLock 内部聚合了 名为 Sync 的 AQS 同步器,并且包含 Fair 和 NonFair 两种实现。


如何实现 Fair 和 NonFair 呢?

abstract static class Sync extends AbstractQueuedSynchronizer {        private static final long serialVersionUID = -5179523762034025860L;        @ReservedStackAccess        final boolean nonfairTryAcquire(int acquires) {            final Thread current = Thread.currentThread();            int c = getState();            if (c == 0) {                // 非公平方式,只要cas修改state成功即可获得锁                if (compareAndSetState(0, acquires)) {                    setExclusiveOwnerThread(current);                    return true;                }            }            else if (current == getExclusiveOwnerThread()) {                int nextc = c + acquires;                if (nextc < 0) // overflow                    throw new Error("Maximum lock count exceeded");                setState(nextc);                return true;            }            return false;        }
@ReservedStackAccess protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; }

}

static final class NonfairSync extends Sync { private static final long serialVersionUID = 7316153563782823691L; protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } }
static final class FairSync extends Sync { private static final long serialVersionUID = -3000897897090466540L; /** * Fair version of tryAcquire. Don't grant access unless * recursive call or no waiters or is first. */ @ReservedStackAccess protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { // 公平方式,需要检查是否有阻塞的predecessors if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } }
复制代码


通过如下例子可以快速实现一个线程安全的方法

class X {   private final ReentrantLock lock = new ReentrantLock();   // ...
public void m() { lock.lock(); // block until condition holds try { // ... method body } finally { lock.unlock() } } }
复制代码

ReadWriteLock

ReadWriteLock 顾名思义为读写锁, 它没有实现 Lock 接口,而是提供了如下两个方法,通过内部依赖 Lock 的方式达到了锁的目的。

  1. Lock readLock();

  2. Lock writeLock();


AQS

在前面讲解 Lock 的时候,提到了 AQS(一般指 AbstractQueuedSynchronizer) 这个词,从字面上解释是抽象队列同步器。而在 ReentrantLock 类中也发现了 Sync 这个 AbstractQueuedSynchronizer 接口的实现类,其对 AQS 的部分方法进行了具体实现。


什么是 AQS

AQS 提供了一个实现阻塞锁和基于 FIFO 的相关同步器(信号量、事件等),它具有如下几个特征来实现同步器。

  1. 使用 volatile int 类型的 state 来表示目前的锁的状态,0 表示未锁定,大于 0 表示已锁定。如果 state=2,表示这个锁被统一线程获取了 2 次,实现了线程可重入。另外,state 只有三种 final 的访问方式

  2. getState()

  3. setState()

  4. compareAndSetState()

  5. AQS 通过引入 Node 构成双向链表来维护等待获取锁的 FIFO 线程等待队列,入队和出队逻辑已经实现,具体自定义同步器不需要考虑实现。

  6. AQS 在 Node 上加入 nextWaiter,构成的单向链表类维护等待 condition 的条件队列,入队(ConditionObject.await()\ConditionObject.addConditionWaiter())和出队(ConditionObject.signal()\ConditionObject.signalAll()\ConditionObject.doSignal(first)\ConditionObject.doSignalAll(first)\ConditionObject.transferForSignal(first))逻辑已经实现,具体自定义同步器不需要考虑实现。

  7. AQS 定义两种资源共享方式:Exclusive 和 Share 。

  8. Exclusive

  9. ReentrantLock

  10. Share

  11. CountdownLatch

  12. Semaphore

  13. ReentrantReadWriteLock(读共享,写独占)

  14. 各个自定义同步器抢占共享资源的方式不同,在具体实现上只需要实现 state 的获取与释放方式即可,主要实现以下几种方法:

  15. tryAcquire(int arg):独占方式尝试获取资源,成功为 true,失败为 false。

  16. tryRelease(int arg):独占方式尝试释放资源,成功为 true,失败为 false。

  17. tryAcquireShared(int arg):共享方式尝试获取资源。负数表示失败;0 表示成功,但没有剩余资源;正数表示成功,且有剩余资源。

  18. tryReleaseShared(int arg):共享方式尝试释放资源,成功为 true,失败为 false。

  19. isHeldExclusively():当前节点(线程)是否正在独占资源。

public abstract class AbstractQueuedSynchronizer    extends AbstractOwnableSynchronizer    implements java.io.Serializable {        // 代表 等待获取 CLH 锁 的 FIFO 队首,    private transient volatile Node head;
// 代表 等待获取 CLH 锁 的 FIFO 队尾 private transient volatile Node tail;
// volatile 修饰的共享变量 state,代表共享资源的状态,0表示未锁定,1及大于1代表锁定的次数(可以被同一线程重复获取锁) private volatile int state; static final class Node { /** Marker to indicate a node is waiting in shared mode */ static final Node SHARED = new Node(); /** Marker to indicate a node is waiting in exclusive mode */ static final Node EXCLUSIVE = null;
/** waitStatus value to indicate thread has cancelled. */ static final int CANCELLED = 1; /** waitStatus value to indicate successor's thread needs unparking. */ static final int SIGNAL = -1; /** waitStatus value to indicate thread is waiting on condition. */ static final int CONDITION = -2; /** * waitStatus value to indicate the next acquireShared should * unconditionally propagate. */ static final int PROPAGATE = -3;
/** * 节点状态 * SIGNAL: 唤醒当前节点的后继节点,unpark * CANCELLED: 由于超时或者终端导致当前节点取消了 * CONDITION: 当前节点在 condition 队列中,在被transferred前不会进入 sync 队列 * PROPAGATE: 共享模式下,head节点会唤醒后继节点并一直传递下去 * */ volatile int waitStatus;
// 指向 SyncQueue 中的前继节点,当前节点会一直check前继节点的 waitStatus volatile Node prev;
// 指向 SyncQueue 中等待获取锁的后继节点 volatile Node next;
// 当前节点关联的线程 volatile Thread thread;
// 指向 condition队列中 等待的下一个节点,构成为单向链表 Node nextWaiter;
/** * Returns true if node is waiting in shared mode. */ final boolean isShared() { return nextWaiter == SHARED; }
// 前继节点 final Node predecessor() { Node p = prev; if (p == null) throw new NullPointerException(); else return p; }
/** Establishes initial head or SHARED marker. */ Node() {}
/** Constructor used by addWaiter. */ Node(Node nextWaiter) { this.nextWaiter = nextWaiter; THREAD.set(this, Thread.currentThread()); }
/** Constructor used by addConditionWaiter. */ Node(int waitStatus) { WAITSTATUS.set(this, waitStatus); THREAD.set(this, Thread.currentThread()); }
/** CASes waitStatus field. */ final boolean compareAndSetWaitStatus(int expect, int update) { return WAITSTATUS.compareAndSet(this, expect, update); }
/** CASes next field. */ final boolean compareAndSetNext(Node expect, Node update) { return NEXT.compareAndSet(this, expect, update); }
final void setPrevRelaxed(Node p) { PREV.set(this, p); }
// VarHandle mechanics private static final VarHandle NEXT; private static final VarHandle PREV; private static final VarHandle THREAD; private static final VarHandle WAITSTATUS; static { try { MethodHandles.Lookup l = MethodHandles.lookup(); NEXT = l.findVarHandle(Node.class, "next", Node.class); PREV = l.findVarHandle(Node.class, "prev", Node.class); THREAD = l.findVarHandle(Node.class, "thread", Thread.class); WAITSTATUS = l.findVarHandle(Node.class, "waitStatus", int.class); } catch (ReflectiveOperationException e) { throw new ExceptionInInitializerError(e); } } }
复制代码


Condition


Condition 是一个接口,主要包含 await()和 signal()方法,可以像 Object 的 wait()、notify() 一样实现线程间的协作。

ConditionObject 是 AbstractQueuedSynchronizer 的一个内部类。AQS 管理了一个 sync 队列和多个 condition 队列。sync 队列维护了等待获取锁的线程节点,头结点记录了当前正在运行的线程。condition 队列维护了由 Condition.await() 阻塞的线程,一个 Lock 可以 new 多个 Condition,每个 Condition 是一个队列。ConditionObject 对象中是可以访问 AQS 的内部属性。下面以一张图来展示 Condition,以及 Condition 与 AQS 的关系。


await()


public final void await() throws InterruptedException {
if (Thread.interrupted()) // 1.如果当前线程被中断, 那么 throw InterruptedException.
throw new InterruptedException();
Node node = addConditionWaiter(); // 加入条件队列
int savedState = fullyRelease(node);// 获取state,检查是否持有锁,有则释放,无则 throw IllegalMonitorStateException
int interruptMode = 0;
//自旋,阻塞 while (!isOnSyncQueue(node)) {// isOnSyncQueue 返回当前节点是否已经进入 Sync 队列,及是否符合 node.waitStatus == Node.CONDITION || node.prev == null 要求
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) // checkInterruptWhileWaiting会检查当前线程是否被中断或signal,否则一直自旋阻塞
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
复制代码


signal()


// 唤醒public final void signal() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first); } //唤醒具体实现 private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); } // 将从 condition 队列唤醒的第一个线程节点,加入到 sync 等待队列final boolean transferForSignal(Node node) { /* * If cannot change waitStatus, the node has been cancelled. */ if (!node.compareAndSetWaitStatus(Node.CONDITION, 0)) return false;
/* * Splice onto queue and try to set waitStatus of predecessor to * indicate that thread is (probably) waiting. If cancelled or * attempt to set waitStatus fails, wake up to resync (in which * case the waitStatus can be transiently and harmlessly wrong). */ Node p = enq(node); int ws = p.waitStatus; if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; }
复制代码


用户头像

awen

关注

Things happen for a reason. 2019.11.15 加入

还未添加个人简介

评论

发布
暂无评论
我看 JAVA 之 并发编程【二】java.util.concurrent.locks