一. 概述
在 SMP 体系结构下,往往出现执行并发执行等情况的出现;在编程时,考虑并发性问题;基于并发性问题,java 提供了锁机制来限制对竞争资源的操作。当抢到锁时,才能访问该资源。那么在 java 体系中,都有哪些锁呢,且是如何体现的呢?
二. java 锁以及原理
在 java 开发中,对某项资源或者某一项代码块进行加锁的形式有两种。一种是通过 synchronize 关键字进行加锁;另外一种是声明 Lock 对象进行加锁。
2.1 synchronize
2.1.1 使用场景
该关键字可以修饰类方法,锁的对象就是对应的类。如下代码:该锁的对象就是 Test 类
public class Test{
public static synchronized void compute(int i){
//.....
}
}
复制代码
也可以修饰类实例方法,锁的对象就是对应的类实例。如下代码:该锁的对象就是 Test 对象
public class Test{
public synchronized void compute(int i){
//.....
}
}
复制代码
修饰类实例方法,粒度有点大,还可以缩小范围,修饰代码块。该锁的对象可以指定。
public class Test{
public void compute(int i){
synchronized (this){
//.....
}
}
}
复制代码
2.1.2 原理
通过 synchronized底层实现原理及锁优化 可以得知:修饰方法时,是在方法修饰符上打上 ACC_SYNCHRONIZED 标志,表明是原子操作;而修饰代码块,在对应代码开始前加上 monitorenter 和代码执行完后加上 monitorexit 指令。
指令的介绍如下:
为了理解虚拟机规范说讲述的,我将以例子的形式进行说明:
public class LockTest {
public void com6() {
int i = 9;
synchronized (LockTest.class) {
i = 12;
System.out.println("hello word");
}
}
}
复制代码
查看该字节码:
Constant pool:
......
#7 = Class #33 // LockTest
......
#33 = Utf8 LockTest
public void com6();
descriptor: ()V
flags: ACC_PUBLIC
Code:
stack=2, locals=4, args_size=1
0: bipush 9
2: istore_1
3: ldc #7 // class LockTest 从常量池中推送至栈顶,注意这个是指针
5: dup //复制栈顶的数值并压入栈顶,之所以复制,是为将该指针存放到本地变量内,给monitorexit使用
6: astore_2
7: monitorenter
8: bipush 12
10: istore_1
11: getstatic #4 // Field java/lang/System.out:Ljava/io/PrintStream;
14: ldc #5 // String hello word
16: invokevirtual #6 // Method java/io/PrintStream.println:(Ljava/lang/String;)V
19: aload_2 //将第6步存放的值,压入栈顶,供给下一个指令使用
20: monitorexit
21: goto 29
24: astore_3
25: aload_2
26: monitorexit
27: aload_3
28: athrow
29: return
复制代码
结合字节码以及指令的介绍,monitor 对象就是 LockTest 对象。
自从 JDK1.6 后,JDK 团队对 synchronized 关键字锁对应的锁进行了优化;具体的细节,可以参考 synchronized底层实现原理及锁优化
PS. 一个概念 Mark Word 结构,说明一下;该结构存在对象头部。32 位架构的结构如下:
其中无锁的“对象的 HashCode”以及偏向锁的“线程 ID”都可以理解;而轻量级锁中的“指向栈中锁记录的指针”是什么意思呢?通过《深入理解 Java 虚拟机》第 2 版中第 13 章 线程安全与锁优化介绍,在当前线程的栈帧中建立一个名为锁记录的空间,用于存储对象目前的 Mark Work 的拷贝,然后通过 CAS 操作将对象的 Mark Word 更新为指向 Lock Record 的指针。
2.2 Lock
Lock 是 JDK 提供出来的一个接口,具体的锁的特性交给子类来实现。其暴露的接口如下:
lock 阻塞型的获取锁;获取该锁时,如果该锁已经被使用,那么就等待其他线程释放该锁。
tryLock 非阻塞型获取锁;获取该锁时,如果该锁已经被使用,那么即时的返回 false.
tryLock(time) 有一定的非阻塞型获取锁;获取该锁时,如果该锁已经被使用,那么就指定的时间内尝试去获取该锁。时间一过,依然没有获取锁则直接返回 false;
unlock 释放锁
newCondition 基于当前 Lock 对象创建 Condition 对象。负责释放当前线程 CPU 资源等待其他线程激活,当前线程激活 Condition 里面的等待线程;
基于 Condition 的使用,有如下例子。详细的原理会在 2.3.3 ConditionObject 里有讲解
import java.util.LinkedList;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class LockConditionTest {
private ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<String>();
private Lock lock = new ReentrantLock();
private int maxSize = 5;
//生产者
private Condition providerCondition = lock.newCondition();
// 消费者
private Condition consumerCondition = lock.newCondition();
public void provide(String value) {
try {
lock.lock();
while (queue.size() == maxSize) {
//当队列满了以后,停止当前线程执行,并将该线程保存到providerCondition对象里面
System.out.println(System.currentTimeMillis()+"-"+Thread.currentThread().getName()+"-provide queue = " + queue);
providerCondition.await();
}
System.out.println("provide - value = " + value);
queue.add(value);
//激活消费者线程继续消费队列里面的消息,如果consumerCondition里没有阻塞线程,不用做任何处理
consumerCondition.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public String consume() {
String result = null;
try {
lock.lock();
while (queue.size() == 0) {
//当队列空了,停止当前线程,并将该线程保存到consumerCondition对象里面
System.out.println(System.currentTimeMillis()+"-"+Thread.currentThread().getName()+" -consume queue = " + queue);
consumerCondition.await();
}
result = queue.poll();
System.out.println("consume - result = " + result);
//激活生产者线程继续生产消息
providerCondition.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
return result;
}
public static void main(String[] args) {
LockConditionTest t = new LockConditionTest();
new Thread(() -> {
int i = 0;
while (true) {
t.provide(i++ + "");
}
}).start();
new Thread(() -> {
while (true) {
t.consume();
}
}).start();
}
}
复制代码
2.2.1 ReentrantLock
这个是可重入锁。具体的使用如下:
Lock lock = new ReentrantLock();
复制代码
里面的方法可以参考上一章节介绍。
需要说明的是 ReentrantLock 内部调用了 AbstractQueuedSynchronizer 类,也就是我们常说的 AQS,后面将重点解读该类。
2.2.2 **Lock 与**LockView
WriteLock 与 ReadLock 是 ReentrantReadWriteLock 类的内部类;
ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
ReentrantReadWriteLock.WriteLock writeLock = readWriteLock.writeLock();
ReentrantReadWriteLock.ReadLock readLock = readWriteLock.readLock();
复制代码
具体的介绍,有 ReentrantReadWriteLock 详细解读
WriteLockView 与 ReadLockView 是 StampedLock 内部类:
StampedLock stampedLock = new StampedLock();
Lock readLock = stampedLock.asReadLock();
Lock writeLock = stampedLock.asWriteLock();
复制代码
具体的介绍,有 StampedLock 详细解读
2.3 AbstractQueuedSynchronizer
从类名可以看出来,它是个抽象类,同时也包含队列特性,还有更加关键的是提供原子操作功能;
2.3.1 属性
A. 队列 从下面的代码,可以看出它是链表结构;当线程去获取锁时,发现锁已经被使用,那么就会自动将当前线程设置休眠状态,同时封装成 Node 对象,插入到 AQS 队列中去。等待锁的释放再去竞争。
private transient volatile Node head;
private transient volatile Node tail;
static final class Node {
volatile int waitStatus; //thread的状态
volatile Node prev; // 上一个节点指针
volatile Node next; //下一个节点指针
volatile Thread thread; //线程
Node nextWaiter; //当使用lock.newCondition时将记录下一个阻塞线程。而没有使用的情况下,将记录是排他模式还是共享模式
//.......
}
复制代码
B. 原子操作 JDK 封装 Unsafe 类提供原子操作,其原理是通过 CAS 机制进行原子操作。
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long stateOffset;
//.....
static {
try {
stateOffset = unsafe.objectFieldOffsetAbstractQueuedSynchronizer.class.getDeclaredField("state"));
//.....
} catch (Exception ex) { throw new Error(ex); }
}
复制代码
C state 是共享变量。当并发情况下,通过 CAS 操作达到线程间”通信“。这里的“通信”就是加锁释放锁等信息。
2.3.2 方法
下面将对主要的方法进行解读
1. acquire
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
复制代码
tryAcquire 是没有实现的,具体交给各个子类去实现。其大体的逻辑是,如果能获取该“资源”,则返回 true, 否则返回 false。未能获取“资源”。则将当前阻塞,并插入到队尾。
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
复制代码
当当前队列没有阻塞线程节点时,将创建一个空节点保存到 head, 将入参的节点保存到尾部 tail.
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
//
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
复制代码
插入到队尾后,代码如下。该逻辑是:判断刚插入的节点的前驱节点(上一个节点)是否是头部节点。如果是,则尝试再次获取资源。意味者:当当前队列没有阻塞线程时,当前线程会有“两次”尝试获取资源的动作。当尝试获取资源成功后,则将当前线程节点设置为头部节点。如果没有获取资源,则设置前驱节点的 waitStatus 为 Node.SIGNAL 状态。然而头部节点的状态是 0,则会再一次循环,再次尝试获取资源。所以,当当前队列没有阻塞线程时,当前线程会有三次尝试获取资源的动作。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//获取上一个节点的对象
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
复制代码
为什么会将该节点设置为头部节点,而不是将该节点删除掉呢,毕竟该节点已经获取资源了?因为 Head 节点永远都是无用节点,也就是一个空节点;这里稍微提前说一下,释放资源 release 方法,该方法将 head 的后驱节点,也就是下一个节点的线程激活。
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
复制代码
依然获取不到资源,则将当前节点的当前线程失效,放弃 CPU 资源。
private final boolean parkAndCheckInterrupt() {
//释放CPU资源,等待其他线程来激活
LockSupport.park(this);
return Thread.interrupted();
}
复制代码
当被其他线程激活时,则会重复之前的步骤,来进行尝试获取资源。
2. acquireInterruptibly
该方法支持中断,当尝试获取资源时,直接中断,会丢出一个中断异常。如果不是,则尝试获取资源。
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
复制代码
当获取不到资源,该逻辑跟 acquire 的逻辑差不多。只是稍微不同的是,当该线程被中断,则直接丢出异常,而不是再次尝试获取资源。
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
复制代码
3. tryAcquireNanos
这里跟 acquireInterruptibly 方法逻辑差不多,只是多出两个逻辑:
在指定的时间内,尝试获取资源,并且是无阻塞的;当超过该时间,则直接返回 false.
只有当指定的时间大于自旋时间,则先释放 CPU 资源固定时间,然后抢占 CPU 资源再次尝试获取资源。
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
//截至时间戳
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
nanosTimeout = deadline - System.nanoTime();
//当过期,则说明已经超时,直接返回
if (nanosTimeout <= 0L)
return false;
//spinForTimeoutThreshold = 1000纳秒
//当间隔时间大于自旋时间,则直接休眠该间隔时间
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
复制代码
4. acquireShared
tryAcquireShared 方法交给子类来实现。只有当返回大于等于 0 时,则说明获取该资源成功
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
复制代码
将当前线程封装成一个节点,该节点的 nextWaiter 是共享模式,插入队尾里面。
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
//再次尝试获取资源
int r = tryAcquireShared(arg);
if (r >= 0) {
//说明获取到该资源,则会头节点以及设置“传递”机制
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
//请参考acquire章节的介绍
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
复制代码
里面跟 acquire 章节的介绍逻辑差不多。我们重点看一下 setHeadAndPropagate 方法。只有当有资源且资源是大于 0,或者是头节点的当前线程的状态是小于 0 时,才会获取 node 的下一个节点,去判断是否是共享模式的。如果是,则激活正在阻塞的共享模式下的线程。
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
//请参考acquire章节的介绍
setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
复制代码
当头部节点的状态为 SIGNAL 时,说明有阻塞线程存在,则可以直接激活该线程;该线程没有抢占到 CPU 资源时,说明 16 行的代码判断条件为真,则直接跳出去。如果该线程抢占到 CPU 资源了,但是依然获取不到资源,也不会更改 head 节点,条件也是为真,也会直接跳出去;如果获取到资源了,则更改 head 节点指针,条件为假,则再一次循环做判断;所以,该方法会激活所有正在阻塞的所有线程。
当状态为 0 时,则说明无阻塞线程存在,说明当前线程是的上一个前驱节点(上一个节点)就是头结点。当执行到 16 行时,条件为真,则直接跳出循环;
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
//当头结点没有发生变化,说明这个时间段没有线程释放
if (h == head) // loop if head changed
break;
}
}
复制代码
5. release
public final boolean release(int arg) {
//释放该资源
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
//表明有阻塞线程在队列中,则去激活
unparkSuccessor(h);
return true;
}
return false;
}
复制代码
之所以要将该节点的 waitStatus 置为 0,是为了刚进来的线程有多一次的尝试获取资源机会;而不是直接让该线程直接释放其 CPU 资源;有利于提供性能;
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
//将该节点的等待状态置为0
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
//当下一个节点为空,说明没有阻塞线程;大于0说明,说该阻塞线程已经cancel了
//会从尾部往前遍历,最后一个阻塞线程的。
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
复制代码
6. releaseShared
doReleaseShared 可以参考“acquireShared”里面介绍。
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
复制代码
2.3.3 ConditionObject
现在我们着重解读一下 condition,其实现类为 ConditionObject 的原理;
public class ConditionObject implements Condition, java.io.Serializable {
private transient Node firstWaiter;//第一个节点
private transient Node lastWaiter;//第二个节点
//...
}
复制代码
按照 2.2.1 ReentrantLock 的介绍,要先获取锁;根据上文的对该方法的介绍,会获取该资源;
final void lock() {
acquire(1);
}
复制代码
获取成功后,条件满足会调用 condition.await()方法,进行阻塞。
2.3.3.1 condition.await
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//将当前线程封装成Node,保存到队列里面
Node node = addConditionWaiter();
//释放所有资源
int savedState = fullyRelease(node);
int interruptMode = 0;
//是否是同步队列,这个地方很关键
while (!isOnSyncQueue(node)) {
//释放当前线程CPU资源,阻塞状态
LockSupport.park(this);
//检测是不是被中断了
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//acquireQueued在上面有讲解过,这里就不用在一一说明了
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
复制代码
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
//清理掉不是Condition的所有节点。
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
final boolean isOnSyncQueue(Node node) {
//当condition.signal()时,会将该状态置为0;详细的讲解稍微给出
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
if (node.next != null) // If has successor, it must be on queue
return true;
return findNodeFromTail(node);
}
private boolean findNodeFromTail(Node node) {
Node t = tail;
for (;;) {
if (t == node)
return true;
if (t == null)
return false;
t = t.prev;
}
}
复制代码
2.3.3.2 condition.signal
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
//激活first的线程
doSignal(first);
}
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
//这里将node的状态置为0
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
//将node插入到队尾里
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
复制代码
2.3.4 汇总
通过以上的几个关键方法的介绍,我们得知:
2.3.4.1 Node. waitStatus
static final int CANCELLED = 1; //表示当前的线程被取消
static final int SIGNAL = -1;//表示当前节点的后继节点包含的线程需要运行,需要进行unpark操作
static final int PROPAGATE = -3;//表示当前场景下后续的acquireShared能够得以执行。
//值为0,表示当前节点在sync queue中,等待着获取锁
复制代码
2.3.4.2 Node.nextWaiter
//当没有使用lock.newCondition情况下,该属性的值只能为如下两中状态
static final Node SHARED = new Node();//共享模式
static final Node EXCLUSIVE = null;//排他模式
//当在使用lock.newCondition的情况,阻塞的节点都会存放该属性上,以此为链路
复制代码
2.4 AbstractQueuedSynchronizer 实现类
上文有说过,其有几个方法没有实现,交给子类来实现;
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
复制代码
为什么没有实现该方法呢,由于 AbstractQueuedSynchronizer 里有一个 state 属性,我们一直没有注重解释过。上文我们一直说获取该“资源”,那“资源”代表是什么,就是对 state 进行处理校验,判断符合指定条件下,才能获取资源成功。
private volatile int state;
复制代码
2.4.1 Semaphore
这是是信号锁,其内部有一个属性 sync,该对象实现了 AbstractQueuedSynchronizer 类。代码有点多,索性就不粘贴了,直接阐述就差不多了。
初始化,会将 state 设置指定的值。例如,state = 3,当第一次获取锁时,发现 state >= 0。 将 state 修改为 2. 并直接返回 2. 上游方法会判断是否大于等于 0.如果条件为真,则说明获取锁;否则获取锁失败。
所以,每次获取锁,state 都会-1.每次释放锁,state 都会+1。换句话说,state 在代表是同时允许 state 个线程去执行;使用这个类,可以有效的控制线程数量同时执行;
2.4.2 ReentrantReadWriteLock
读写锁,意味者:允许可以多个线程并发读操作,当有一个线程在写操作时,其他线程都会被阻塞。
这里的 state 分为两部分,高 16 位代表多少个线程在做读操作,而低 16 位代表当前写操作的线程做了多少 lock 操作。
例如,初始化读写锁后.
当一个读操作的线程 A,获取锁成功,state=65536,16 进制为 1,0000。
接着再有一个读操作的线程 B,获取锁成功,state=131072,其 16 进制为 2,0000。
接着有一个写操作的线程 C,state=131072,其 16 进制为 2,0000,并将 C 线程进行阻塞,插入到队列中去。
接着再有一个读操作线程 D,state=131072,阻塞 D 线程,也插入到队列中去。
A 线程释放锁,state=65536。
B 线程释放锁,state=0,激活 C 线程。
C 线程获取锁成功,state=1,
C 线程释放锁,state=0,激活 D 线程
C 线程获取锁成功,state=65536
2.4.3 CountDownLatch
这是一个或多个线程等待其他线程完成后,才会执行它自身的操作。举个例子:E、F 线程等待 A、B、C、D 线程完成后,E、F 才会被激活去完成自己的操作;如下例子:
CountDownLatch countDownLatch = new CountDownLatch(4);
Thread A = new Thread(()->{
countDownLatch.countDown();
System.out.println("A thread run.... ");
});
Thread B = new Thread(()->{
countDownLatch.countDown();
System.out.println("B thread run.... ");
});
Thread C = new Thread(()->{
countDownLatch.countDown();
System.out.println("C thread run.... ");
});
Thread D = new Thread(()->{
countDownLatch.countDown();
System.out.println("D thread run.... ");
});
Thread E = new Thread(()->{
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("E thread run.... ");
});
Thread F = new Thread(()->{
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("F thread run.... ");
});
A.start();
B.start();
C.start();
D.start();
E.start();
F.start();
复制代码
创建 CountDownLatch 对象,会初始化 state 的值,该值有构成函数传入;当调用 await 方法时,会判断 state 是否等于 0,如果条件为真,则说明无需等待其他线程完成,因为 state=0 代表其他线程已经完成了。
如果条件为假,则将当前线程插入到队列中去;等待其他线程来激活;调用 countDown 方法,会将 state-1,当 state=0 时,会去激活其他阻塞线程。
注意,CountDownLatch 只能用一次,不能复用;
2.4.4 ThreadPoolExecutor
在线程池中有一个内部类 Worker 实现了。这里逻辑比较简单,就不多阐述了。里面有一行代码,需要重新留意的,TheadPoolExecutor 中的Worker为什么要加锁?
其实代码也说明了,获取锁是为了避免线程中暴露的方法去暴力中断该线程。
2.4.5 公平锁与非公平锁
在 AbstractQueuedSynchronized 的实现字类中,都会有公平锁和非公平锁,之所以单独抽离来讲解,而不是在各个子类进行阐述,是因为他们的业务逻辑都是一样的。
公平锁,当队列有阻塞的线程时,先有新的线程 A 尝试获取锁,则就会将线程 A 直接插入队尾中去。
非公平锁,当队列有阻塞的线程时,先有新的线程 A 尝试获取锁,则会尝试获取锁,如果获取锁,则直接执行线程 A 后续的操作,否则将线程 A 插入到队尾中,等待其他线程被释放。
2.5 CyclicBarrier
这个类跟 CountDownLatch 类有点类似;允许多个线程之间相互等待,当大家都达到那个“点"后,多个线程才被激活,继续执行后续的操作。
CyclicBarrier cyclicBarrier = new CyclicBarrier(6);
Thread A = new Thread(()->{
try {
cyclicBarrier.await();
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("A thread run.... ");
});
Thread B = new Thread(()->{
try {
cyclicBarrier.await();
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("B thread run.... ");
});
Thread C = new Thread(()->{
try {
cyclicBarrier.await();
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("C thread run.... ");
});
Thread D = new Thread(()->{
try {
cyclicBarrier.await();
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("D thread run.... ");
});
Thread E = new Thread(()->{
try {
cyclicBarrier.await();
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("E thread run.... ");
});
Thread F = new Thread(()->{
try {
cyclicBarrier.await();
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("F thread run.... ");
});
A.start();
B.start();
C.start();
D.start();
E.start();
F.start();
复制代码
创建 CyclicBarrier 对象,会初始化 state 的值,该值有构成函数传入。当调用 await 方法时,会将 state-1,接着判断 state 是否等于 0,如果条件为真,则说明其他线程都已经达到了这个”点“,激活其他线程线程,并且重置 state 为原始值(可以复用)。如果条件为假,则阻塞当前线程,将当前线程插入到队列中去。
2.5.1 CyclicBarrier 与 CountDownLatch 的区别
2.6 StampedLock
这个里面的逻辑稍微复杂,稍后补上......
三. 并发设计模式
我们在平常写代码,或多或少会涉及设计模式,装饰模式,组合模式,工厂模式,策略模式,观察者模式等;那么在编写并发代码,也会有对应的设计模式可以借鉴,从而是我们的代码更加美观,可拓展。
具体的内容,可以仔细阅读《Java 多线程编程》设计模式篇
详细的就不多说了
四. 锁的底层原理
在操作层面,锁是如何实现的;其主要是依赖于硬件层面上的逻辑处理。在其他文章也有涉及到。
《 Intel® 64 and IA-32 Architectures Software Developer’s Manual, Volume 3A》第 8 章,可以详细的去阅读该章节的知识点。然而我没花太多时间,只看到关键的介绍以及参考其他文章进行总结。
The Intel 64 and IA-32 architectures provide mechanisms for managing and improving the performance of multiple processors connected to the same system bus. These include:
• Bus locking(总线锁) and/or cache coherency(缓存一致性) management for performing atomic operations on system memory.
• Serializing instructions.
.......
4.1 总线锁
重量级锁。当某个指令加上了 # lock,意味该指令访问的地址被加上一个锁,阻塞其他 CPU 访问该地址。
4.2 缓存锁
轻量级锁。采用缓存一致性(MES)协议,保证缓存一致性问题。及保证了各个 CPU 访问的缓存都是一致性的。
在第 2 章节,我们所介绍的各种锁基本都是采用该机制。synchronized 关键比较特殊,刚开始是采用缓存锁来处理,当一直获取不到锁时,会升级到重量级锁,也就是总线索。
有关总线索以及缓存锁更详细的介绍,可以参考 聊聊并发(五)——原子操作的实现原理 前半部分。
五. 参考资源
synchronized底层实现原理及锁优化
Java 并发编程的艺术. 方腾飞 魏鹏 程晓明 著
深入理解 Java 虚拟机
Java 虚拟机规范
聊聊并发(五)——原子操作的实现原理
评论