一. 概述
在 SMP 体系结构下,往往出现执行并发执行等情况的出现;在编程时,考虑并发性问题;基于并发性问题,java 提供了锁机制来限制对竞争资源的操作。当抢到锁时,才能访问该资源。那么在 java 体系中,都有哪些锁呢,且是如何体现的呢?
二. java 锁以及原理
在 java 开发中,对某项资源或者某一项代码块进行加锁的形式有两种。一种是通过 synchronize 关键字进行加锁;另外一种是声明 Lock 对象进行加锁。
2.1 synchronize
2.1.1 使用场景
该关键字可以修饰类方法,锁的对象就是对应的类。如下代码:该锁的对象就是 Test 类
public class Test{ public static synchronized void compute(int i){ //..... } }
复制代码
也可以修饰类实例方法,锁的对象就是对应的类实例。如下代码:该锁的对象就是 Test 对象
public class Test{ public synchronized void compute(int i){ //..... } }
复制代码
修饰类实例方法,粒度有点大,还可以缩小范围,修饰代码块。该锁的对象可以指定。
public class Test{ public void compute(int i){ synchronized (this){ //..... } } }
复制代码
2.1.2 原理
通过 synchronized底层实现原理及锁优化 可以得知:修饰方法时,是在方法修饰符上打上 ACC_SYNCHRONIZED 标志,表明是原子操作;而修饰代码块,在对应代码开始前加上 monitorenter 和代码执行完后加上 monitorexit 指令。
指令的介绍如下:
为了理解虚拟机规范说讲述的,我将以例子的形式进行说明:
public class LockTest { public void com6() { int i = 9; synchronized (LockTest.class) { i = 12; System.out.println("hello word"); } }}
复制代码
查看该字节码:
Constant pool: ...... #7 = Class #33 // LockTest ...... #33 = Utf8 LockTest public void com6(); descriptor: ()V flags: ACC_PUBLIC Code: stack=2, locals=4, args_size=1 0: bipush 9 2: istore_1 3: ldc #7 // class LockTest 从常量池中推送至栈顶,注意这个是指针 5: dup //复制栈顶的数值并压入栈顶,之所以复制,是为将该指针存放到本地变量内,给monitorexit使用 6: astore_2 7: monitorenter 8: bipush 12 10: istore_1 11: getstatic #4 // Field java/lang/System.out:Ljava/io/PrintStream; 14: ldc #5 // String hello word 16: invokevirtual #6 // Method java/io/PrintStream.println:(Ljava/lang/String;)V 19: aload_2 //将第6步存放的值,压入栈顶,供给下一个指令使用 20: monitorexit 21: goto 29 24: astore_3 25: aload_2 26: monitorexit 27: aload_3 28: athrow 29: return
复制代码
结合字节码以及指令的介绍,monitor 对象就是 LockTest 对象。
自从 JDK1.6 后,JDK 团队对 synchronized 关键字锁对应的锁进行了优化;具体的细节,可以参考 synchronized底层实现原理及锁优化
PS. 一个概念 Mark Word 结构,说明一下;该结构存在对象头部。32 位架构的结构如下:
其中无锁的“对象的 HashCode”以及偏向锁的“线程 ID”都可以理解;而轻量级锁中的“指向栈中锁记录的指针”是什么意思呢?通过《深入理解 Java 虚拟机》第 2 版中第 13 章 线程安全与锁优化介绍,在当前线程的栈帧中建立一个名为锁记录的空间,用于存储对象目前的 Mark Work 的拷贝,然后通过 CAS 操作将对象的 Mark Word 更新为指向 Lock Record 的指针。
2.2 Lock
Lock 是 JDK 提供出来的一个接口,具体的锁的特性交给子类来实现。其暴露的接口如下:
lock 阻塞型的获取锁;获取该锁时,如果该锁已经被使用,那么就等待其他线程释放该锁。
tryLock 非阻塞型获取锁;获取该锁时,如果该锁已经被使用,那么即时的返回 false.
tryLock(time) 有一定的非阻塞型获取锁;获取该锁时,如果该锁已经被使用,那么就指定的时间内尝试去获取该锁。时间一过,依然没有获取锁则直接返回 false;
unlock 释放锁
newCondition 基于当前 Lock 对象创建 Condition 对象。负责释放当前线程 CPU 资源等待其他线程激活,当前线程激活 Condition 里面的等待线程;
基于 Condition 的使用,有如下例子。详细的原理会在 2.3.3 ConditionObject 里有讲解
import java.util.LinkedList;import java.util.concurrent.ConcurrentLinkedQueue;import java.util.concurrent.locks.Condition;import java.util.concurrent.locks.Lock;import java.util.concurrent.locks.ReentrantLock;
public class LockConditionTest { private ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<String>();
private Lock lock = new ReentrantLock();
private int maxSize = 5; //生产者 private Condition providerCondition = lock.newCondition(); // 消费者 private Condition consumerCondition = lock.newCondition();
public void provide(String value) { try { lock.lock(); while (queue.size() == maxSize) { //当队列满了以后,停止当前线程执行,并将该线程保存到providerCondition对象里面 System.out.println(System.currentTimeMillis()+"-"+Thread.currentThread().getName()+"-provide queue = " + queue); providerCondition.await(); } System.out.println("provide - value = " + value); queue.add(value); //激活消费者线程继续消费队列里面的消息,如果consumerCondition里没有阻塞线程,不用做任何处理 consumerCondition.signal(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } }
public String consume() { String result = null; try { lock.lock(); while (queue.size() == 0) { //当队列空了,停止当前线程,并将该线程保存到consumerCondition对象里面 System.out.println(System.currentTimeMillis()+"-"+Thread.currentThread().getName()+" -consume queue = " + queue); consumerCondition.await(); } result = queue.poll(); System.out.println("consume - result = " + result); //激活生产者线程继续生产消息 providerCondition.signal(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } return result; }
public static void main(String[] args) { LockConditionTest t = new LockConditionTest(); new Thread(() -> { int i = 0; while (true) { t.provide(i++ + ""); } }).start(); new Thread(() -> { while (true) { t.consume(); } }).start(); }}
复制代码
2.2.1 ReentrantLock
这个是可重入锁。具体的使用如下:
Lock lock = new ReentrantLock();
复制代码
里面的方法可以参考上一章节介绍。
需要说明的是 ReentrantLock 内部调用了 AbstractQueuedSynchronizer 类,也就是我们常说的 AQS,后面将重点解读该类。
2.2.2 **Lock 与**LockView
WriteLock 与 ReadLock 是 ReentrantReadWriteLock 类的内部类;
ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();ReentrantReadWriteLock.WriteLock writeLock = readWriteLock.writeLock();ReentrantReadWriteLock.ReadLock readLock = readWriteLock.readLock();
复制代码
具体的介绍,有 ReentrantReadWriteLock 详细解读
WriteLockView 与 ReadLockView 是 StampedLock 内部类:
StampedLock stampedLock = new StampedLock();Lock readLock = stampedLock.asReadLock();Lock writeLock = stampedLock.asWriteLock();
复制代码
具体的介绍,有 StampedLock 详细解读
2.3 AbstractQueuedSynchronizer
从类名可以看出来,它是个抽象类,同时也包含队列特性,还有更加关键的是提供原子操作功能;
2.3.1 属性
A. 队列 从下面的代码,可以看出它是链表结构;当线程去获取锁时,发现锁已经被使用,那么就会自动将当前线程设置休眠状态,同时封装成 Node 对象,插入到 AQS 队列中去。等待锁的释放再去竞争。
private transient volatile Node head;private transient volatile Node tail;static final class Node { volatile int waitStatus; //thread的状态 volatile Node prev; // 上一个节点指针 volatile Node next; //下一个节点指针 volatile Thread thread; //线程 Node nextWaiter; //当使用lock.newCondition时将记录下一个阻塞线程。而没有使用的情况下,将记录是排他模式还是共享模式 //.......}
复制代码
B. 原子操作 JDK 封装 Unsafe 类提供原子操作,其原理是通过 CAS 机制进行原子操作。
private static final Unsafe unsafe = Unsafe.getUnsafe();private static final long stateOffset;//.....static { try { stateOffset = unsafe.objectFieldOffsetAbstractQueuedSynchronizer.class.getDeclaredField("state")); //..... } catch (Exception ex) { throw new Error(ex); }}
复制代码
C state 是共享变量。当并发情况下,通过 CAS 操作达到线程间”通信“。这里的“通信”就是加锁释放锁等信息。
2.3.2 方法
下面将对主要的方法进行解读
1. acquire
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt();}
复制代码
tryAcquire 是没有实现的,具体交给各个子类去实现。其大体的逻辑是,如果能获取该“资源”,则返回 true, 否则返回 false。未能获取“资源”。则将当前阻塞,并插入到队尾。
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node;}
复制代码
当当前队列没有阻塞线程节点时,将创建一个空节点保存到 head, 将入参的节点保存到尾部 tail.
private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { // node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } }}
复制代码
插入到队尾后,代码如下。该逻辑是:判断刚插入的节点的前驱节点(上一个节点)是否是头部节点。如果是,则尝试再次获取资源。意味者:当当前队列没有阻塞线程时,当前线程会有“两次”尝试获取资源的动作。当尝试获取资源成功后,则将当前线程节点设置为头部节点。如果没有获取资源,则设置前驱节点的 waitStatus 为 Node.SIGNAL 状态。然而头部节点的状态是 0,则会再一次循环,再次尝试获取资源。所以,当当前队列没有阻塞线程时,当前线程会有三次尝试获取资源的动作。
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { //获取上一个节点的对象 final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); }}
复制代码
为什么会将该节点设置为头部节点,而不是将该节点删除掉呢,毕竟该节点已经获取资源了?因为 Head 节点永远都是无用节点,也就是一个空节点;这里稍微提前说一下,释放资源 release 方法,该方法将 head 的后驱节点,也就是下一个节点的线程激活。
private void setHead(Node node) { head = node; node.thread = null; node.prev = null;}
复制代码
依然获取不到资源,则将当前节点的当前线程失效,放弃 CPU 资源。
private final boolean parkAndCheckInterrupt() { //释放CPU资源,等待其他线程来激活 LockSupport.park(this); return Thread.interrupted();}
复制代码
当被其他线程激活时,则会重复之前的步骤,来进行尝试获取资源。
2. acquireInterruptibly
该方法支持中断,当尝试获取资源时,直接中断,会丢出一个中断异常。如果不是,则尝试获取资源。
public final void acquireInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (!tryAcquire(arg)) doAcquireInterruptibly(arg);}
复制代码
当获取不到资源,该逻辑跟 acquire 的逻辑差不多。只是稍微不同的是,当该线程被中断,则直接丢出异常,而不是再次尝试获取资源。
private void doAcquireInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); }}
复制代码
3. tryAcquireNanos
这里跟 acquireInterruptibly 方法逻辑差不多,只是多出两个逻辑:
在指定的时间内,尝试获取资源,并且是无阻塞的;当超过该时间,则直接返回 false.
只有当指定的时间大于自旋时间,则先释放 CPU 资源固定时间,然后抢占 CPU 资源再次尝试获取资源。
private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { if (nanosTimeout <= 0L) return false; //截至时间戳 final long deadline = System.nanoTime() + nanosTimeout; final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return true; } nanosTimeout = deadline - System.nanoTime(); //当过期,则说明已经超时,直接返回 if (nanosTimeout <= 0L) return false; //spinForTimeoutThreshold = 1000纳秒 //当间隔时间大于自旋时间,则直接休眠该间隔时间 if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); if (Thread.interrupted()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); }}
复制代码
4. acquireShared
tryAcquireShared 方法交给子类来实现。只有当返回大于等于 0 时,则说明获取该资源成功
public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg);}
复制代码
将当前线程封装成一个节点,该节点的 nextWaiter 是共享模式,插入队尾里面。
private void doAcquireShared(int arg) { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head) { //再次尝试获取资源 int r = tryAcquireShared(arg); if (r >= 0) { //说明获取到该资源,则会头节点以及设置“传递”机制 setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } //请参考acquire章节的介绍 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); }}
复制代码
里面跟 acquire 章节的介绍逻辑差不多。我们重点看一下 setHeadAndPropagate 方法。只有当有资源且资源是大于 0,或者是头节点的当前线程的状态是小于 0 时,才会获取 node 的下一个节点,去判断是否是共享模式的。如果是,则激活正在阻塞的共享模式下的线程。
private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // Record old head for check below //请参考acquire章节的介绍 setHead(node); if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); }}
复制代码
当头部节点的状态为 SIGNAL 时,说明有阻塞线程存在,则可以直接激活该线程;该线程没有抢占到 CPU 资源时,说明 16 行的代码判断条件为真,则直接跳出去。如果该线程抢占到 CPU 资源了,但是依然获取不到资源,也不会更改 head 节点,条件也是为真,也会直接跳出去;如果获取到资源了,则更改 head 节点指针,条件为假,则再一次循环做判断;所以,该方法会激活所有正在阻塞的所有线程。
当状态为 0 时,则说明无阻塞线程存在,说明当前线程是的上一个前驱节点(上一个节点)就是头结点。当执行到 16 行时,条件为真,则直接跳出循环;
private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } //当头结点没有发生变化,说明这个时间段没有线程释放 if (h == head) // loop if head changed break; }}
复制代码
5. release
public final boolean release(int arg) { //释放该资源 if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) //表明有阻塞线程在队列中,则去激活 unparkSuccessor(h); return true; } return false;}
复制代码
之所以要将该节点的 waitStatus 置为 0,是为了刚进来的线程有多一次的尝试获取资源机会;而不是直接让该线程直接释放其 CPU 资源;有利于提供性能;
private void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0) //将该节点的等待状态置为0 compareAndSetWaitStatus(node, ws, 0); Node s = node.next; if (s == null || s.waitStatus > 0) { //当下一个节点为空,说明没有阻塞线程;大于0说明,说该阻塞线程已经cancel了 //会从尾部往前遍历,最后一个阻塞线程的。 s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread);}
复制代码
6. releaseShared
doReleaseShared 可以参考“acquireShared”里面介绍。
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false;}
复制代码
2.3.3 ConditionObject
现在我们着重解读一下 condition,其实现类为 ConditionObject 的原理;
public class ConditionObject implements Condition, java.io.Serializable { private transient Node firstWaiter;//第一个节点 private transient Node lastWaiter;//第二个节点 //... }
复制代码
按照 2.2.1 ReentrantLock 的介绍,要先获取锁;根据上文的对该方法的介绍,会获取该资源;
final void lock() { acquire(1);}
复制代码
获取成功后,条件满足会调用 condition.await()方法,进行阻塞。
2.3.3.1 condition.await
public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); //将当前线程封装成Node,保存到队列里面 Node node = addConditionWaiter(); //释放所有资源 int savedState = fullyRelease(node); int interruptMode = 0; //是否是同步队列,这个地方很关键 while (!isOnSyncQueue(node)) { //释放当前线程CPU资源,阻塞状态 LockSupport.park(this); //检测是不是被中断了 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } //acquireQueued在上面有讲解过,这里就不用在一一说明了 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode);}
复制代码
private Node addConditionWaiter() { Node t = lastWaiter; // If lastWaiter is cancelled, clean out. if (t != null && t.waitStatus != Node.CONDITION) { //清理掉不是Condition的所有节点。 unlinkCancelledWaiters(); t = lastWaiter; } Node node = new Node(Thread.currentThread(), Node.CONDITION); if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node;}final boolean isOnSyncQueue(Node node) { //当condition.signal()时,会将该状态置为0;详细的讲解稍微给出 if (node.waitStatus == Node.CONDITION || node.prev == null) return false; if (node.next != null) // If has successor, it must be on queue return true; return findNodeFromTail(node);}private boolean findNodeFromTail(Node node) { Node t = tail; for (;;) { if (t == node) return true; if (t == null) return false; t = t.prev; }}
复制代码
2.3.3.2 condition.signal
public final void signal() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) //激活first的线程 doSignal(first);}private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null);}final boolean transferForSignal(Node node) { //这里将node的状态置为0 if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; //将node插入到队尾里 Node p = enq(node); int ws = p.waitStatus; if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true;}
复制代码
2.3.4 汇总
通过以上的几个关键方法的介绍,我们得知:
2.3.4.1 Node. waitStatus
static final int CANCELLED = 1; //表示当前的线程被取消static final int SIGNAL = -1;//表示当前节点的后继节点包含的线程需要运行,需要进行unpark操作static final int PROPAGATE = -3;//表示当前场景下后续的acquireShared能够得以执行。//值为0,表示当前节点在sync queue中,等待着获取锁
复制代码
2.3.4.2 Node.nextWaiter
//当没有使用lock.newCondition情况下,该属性的值只能为如下两中状态static final Node SHARED = new Node();//共享模式static final Node EXCLUSIVE = null;//排他模式//当在使用lock.newCondition的情况,阻塞的节点都会存放该属性上,以此为链路
复制代码
2.4 AbstractQueuedSynchronizer 实现类
上文有说过,其有几个方法没有实现,交给子类来实现;
protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException();}protected boolean tryRelease(int arg) { throw new UnsupportedOperationException();}protected int tryAcquireShared(int arg) { throw new UnsupportedOperationException();}protected boolean tryReleaseShared(int arg) { throw new UnsupportedOperationException();}
复制代码
为什么没有实现该方法呢,由于 AbstractQueuedSynchronizer 里有一个 state 属性,我们一直没有注重解释过。上文我们一直说获取该“资源”,那“资源”代表是什么,就是对 state 进行处理校验,判断符合指定条件下,才能获取资源成功。
private volatile int state;
复制代码
2.4.1 Semaphore
这是是信号锁,其内部有一个属性 sync,该对象实现了 AbstractQueuedSynchronizer 类。代码有点多,索性就不粘贴了,直接阐述就差不多了。
初始化,会将 state 设置指定的值。例如,state = 3,当第一次获取锁时,发现 state >= 0。 将 state 修改为 2. 并直接返回 2. 上游方法会判断是否大于等于 0.如果条件为真,则说明获取锁;否则获取锁失败。
所以,每次获取锁,state 都会-1.每次释放锁,state 都会+1。换句话说,state 在代表是同时允许 state 个线程去执行;使用这个类,可以有效的控制线程数量同时执行;
2.4.2 ReentrantReadWriteLock
读写锁,意味者:允许可以多个线程并发读操作,当有一个线程在写操作时,其他线程都会被阻塞。
这里的 state 分为两部分,高 16 位代表多少个线程在做读操作,而低 16 位代表当前写操作的线程做了多少 lock 操作。
例如,初始化读写锁后.
当一个读操作的线程 A,获取锁成功,state=65536,16 进制为 1,0000。
接着再有一个读操作的线程 B,获取锁成功,state=131072,其 16 进制为 2,0000。
接着有一个写操作的线程 C,state=131072,其 16 进制为 2,0000,并将 C 线程进行阻塞,插入到队列中去。
接着再有一个读操作线程 D,state=131072,阻塞 D 线程,也插入到队列中去。
A 线程释放锁,state=65536。
B 线程释放锁,state=0,激活 C 线程。
C 线程获取锁成功,state=1,
C 线程释放锁,state=0,激活 D 线程
C 线程获取锁成功,state=65536
2.4.3 CountDownLatch
这是一个或多个线程等待其他线程完成后,才会执行它自身的操作。举个例子:E、F 线程等待 A、B、C、D 线程完成后,E、F 才会被激活去完成自己的操作;如下例子:
CountDownLatch countDownLatch = new CountDownLatch(4);Thread A = new Thread(()->{ countDownLatch.countDown(); System.out.println("A thread run.... ");});Thread B = new Thread(()->{ countDownLatch.countDown(); System.out.println("B thread run.... ");});Thread C = new Thread(()->{ countDownLatch.countDown(); System.out.println("C thread run.... ");});Thread D = new Thread(()->{ countDownLatch.countDown(); System.out.println("D thread run.... ");});Thread E = new Thread(()->{ try { countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("E thread run.... ");});Thread F = new Thread(()->{ try { countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("F thread run.... ");});A.start();B.start();C.start();D.start();E.start();F.start();
复制代码
创建 CountDownLatch 对象,会初始化 state 的值,该值有构成函数传入;当调用 await 方法时,会判断 state 是否等于 0,如果条件为真,则说明无需等待其他线程完成,因为 state=0 代表其他线程已经完成了。
如果条件为假,则将当前线程插入到队列中去;等待其他线程来激活;调用 countDown 方法,会将 state-1,当 state=0 时,会去激活其他阻塞线程。
注意,CountDownLatch 只能用一次,不能复用;
2.4.4 ThreadPoolExecutor
在线程池中有一个内部类 Worker 实现了。这里逻辑比较简单,就不多阐述了。里面有一行代码,需要重新留意的,TheadPoolExecutor 中的Worker为什么要加锁?
其实代码也说明了,获取锁是为了避免线程中暴露的方法去暴力中断该线程。
2.4.5 公平锁与非公平锁
在 AbstractQueuedSynchronized 的实现字类中,都会有公平锁和非公平锁,之所以单独抽离来讲解,而不是在各个子类进行阐述,是因为他们的业务逻辑都是一样的。
公平锁,当队列有阻塞的线程时,先有新的线程 A 尝试获取锁,则就会将线程 A 直接插入队尾中去。
非公平锁,当队列有阻塞的线程时,先有新的线程 A 尝试获取锁,则会尝试获取锁,如果获取锁,则直接执行线程 A 后续的操作,否则将线程 A 插入到队尾中,等待其他线程被释放。
2.5 CyclicBarrier
这个类跟 CountDownLatch 类有点类似;允许多个线程之间相互等待,当大家都达到那个“点"后,多个线程才被激活,继续执行后续的操作。
CyclicBarrier cyclicBarrier = new CyclicBarrier(6);Thread A = new Thread(()->{ try { cyclicBarrier.await(); } catch (Exception e) { e.printStackTrace(); } System.out.println("A thread run.... ");});Thread B = new Thread(()->{ try { cyclicBarrier.await(); } catch (Exception e) { e.printStackTrace(); } System.out.println("B thread run.... ");});Thread C = new Thread(()->{ try { cyclicBarrier.await(); } catch (Exception e) { e.printStackTrace(); } System.out.println("C thread run.... ");});Thread D = new Thread(()->{ try { cyclicBarrier.await(); } catch (Exception e) { e.printStackTrace(); } System.out.println("D thread run.... ");});Thread E = new Thread(()->{ try { cyclicBarrier.await(); } catch (Exception e) { e.printStackTrace(); } System.out.println("E thread run.... ");});Thread F = new Thread(()->{ try { cyclicBarrier.await(); } catch (Exception e) { e.printStackTrace(); } System.out.println("F thread run.... "); });A.start();B.start();C.start();D.start();E.start();F.start();
复制代码
创建 CyclicBarrier 对象,会初始化 state 的值,该值有构成函数传入。当调用 await 方法时,会将 state-1,接着判断 state 是否等于 0,如果条件为真,则说明其他线程都已经达到了这个”点“,激活其他线程线程,并且重置 state 为原始值(可以复用)。如果条件为假,则阻塞当前线程,将当前线程插入到队列中去。
2.5.1 CyclicBarrier 与 CountDownLatch 的区别
2.6 StampedLock
这个里面的逻辑稍微复杂,稍后补上......
三. 并发设计模式
我们在平常写代码,或多或少会涉及设计模式,装饰模式,组合模式,工厂模式,策略模式,观察者模式等;那么在编写并发代码,也会有对应的设计模式可以借鉴,从而是我们的代码更加美观,可拓展。
具体的内容,可以仔细阅读《Java 多线程编程》设计模式篇
详细的就不多说了
四. 锁的底层原理
在操作层面,锁是如何实现的;其主要是依赖于硬件层面上的逻辑处理。在其他文章也有涉及到。
《 Intel® 64 and IA-32 Architectures Software Developer’s Manual, Volume 3A》第 8 章,可以详细的去阅读该章节的知识点。然而我没花太多时间,只看到关键的介绍以及参考其他文章进行总结。
The Intel 64 and IA-32 architectures provide mechanisms for managing and improving the performance of multiple processors connected to the same system bus. These include:
• Bus locking(总线锁) and/or cache coherency(缓存一致性) management for performing atomic operations on system memory.
• Serializing instructions.
.......
4.1 总线锁
重量级锁。当某个指令加上了 # lock,意味该指令访问的地址被加上一个锁,阻塞其他 CPU 访问该地址。
4.2 缓存锁
轻量级锁。采用缓存一致性(MES)协议,保证缓存一致性问题。及保证了各个 CPU 访问的缓存都是一致性的。
在第 2 章节,我们所介绍的各种锁基本都是采用该机制。synchronized 关键比较特殊,刚开始是采用缓存锁来处理,当一直获取不到锁时,会升级到重量级锁,也就是总线索。
有关总线索以及缓存锁更详细的介绍,可以参考 聊聊并发(五)——原子操作的实现原理 前半部分。
五. 参考资源
synchronized底层实现原理及锁优化
Java 并发编程的艺术. 方腾飞 魏鹏 程晓明 著
深入理解 Java 虚拟机
Java 虚拟机规范
聊聊并发(五)——原子操作的实现原理
评论