写点什么

JUC 浅析(一)

作者:andy
  • 2022-10-28
    北京
  • 本文字数:7753 字

    阅读完需:约 25 分钟

1-JUC 介绍


支持多线程操作的各种语言,尤其是 Java,一定会使用到多线程。在访问资源时,多线程非常重要,对资源的安全起到很好的作用。要想实现多线程处理操作,需使用 Object 类中的 wait()、notify()、notifyAll()方法以及关键字 synchronized,这在生产者与消费者设计模式中尤为常见。但是,使用以上方法,处理不好,很容易产生死锁。同时,多线程在操作各种数据类型时,为了保证数据的完整性,要大量使用关键字 volatile 进行原子性操作

多线程模型经历阶段:

  • 传统方式以 Thread、Runnable 为主,但是无法返回数据,并且操作复杂;

  • JDK1.5 之后提供的 Future 和 Callable 模型,可以实现数据异步处理的结果返回;

  • JDK1.5 之后提供 java.util.concurrent 开发框架。

传统线程编程模型之中为防止死锁等现象的出现(wait()、notify()、notifyAll()、synchronized)时,需要考虑性能(性能也就是处理速度)、公平性(锁资源后,其他线程无法访问)、资源管理(尤其是数据库连接池,进行连接的分配控制)等问题,这样加重了程序开发者的负担。

谈到并发访问处理的实际场景,如同玖富技术中心针对某一节日给予广大员工抽奖活动,但是人事没有处理好公平性、资源管理的问题,造成现场非常混乱。

JDK1.5 之后提供的 JUC 实现的多线程编程将有效的减少竞争条件(race conditions)和死锁线程。

JUC(java.util.concurrent)是 Java5.0 开始提供的一组专门实现多线程并发处理的开发架构,利用 JUC 开发架构可以有效的解决实际线程项目开发之中出现的死锁、阻塞、资源访问与公平机制。JUC 提供的锁的这些方法就是对于原始的线程模型的实现机制不断完善,这些类的好处就是可以轻松实现线程同步锁,避免死锁带来的问题。

当下时代强调的高并发机制讲的就是线程的处理和 JVM 合理调优


2-volatile 关键字


volatile 变量

volatile 直接进行数据写入具有可见性不负责数据同步不具有原子性,没有副本数据。

volatile 变量可以被看作是一种 “程度较轻的 synchronized”,具有 synchronized 的可见性特性,但是不具备原子特性。这就是说线程能够自动发现 volatile 变量的最新值。

volatile 变量可用于提供线程安全,但是只能应用于非常有限的一组用例:多个变量之间或者某个变量的当前值与修改后值之间没有约束。因此,单独使用 volatile 还不足以实现计数器、互斥锁或任何具有与多个变量相关的不变式(Invariants)的类(例如 “start <=end”)。

出于简易性或可伸缩性的考虑,您可能倾向于使用 volatile 变量而不是锁。当使用 volatile 变量而非锁时,某些习惯用法(idiom)更加易于编码和阅读。此外,volatile 变量不会像锁那样造成线程阻塞,因此也很少造成可伸缩性问题。在某些情况下,如果读操作远远大于写操作,volatile 变量还可以提供优于锁的性能优势。

正确使用 volatile 变量的条件

您只能在有限的一些情形下使用 volatile 变量替代锁。要使 volatile 变量提供理想的线程安全,必须同时满足下面两个条件:

对变量的写操作不依赖于当前值;

该变量没有包含在具有其他变量的不变式中。

第一个条件的限制使 volatile 变量不能用作线程安全计数器。虽然增量操作(x++)看上去类似一个单独操作,实际上它是一个由读取-修改-写入操作序列组成的组合操作,必须以原子方式执行,而 volatile 不能提供必须的原子特性。实现正确的操作需要使 x 的值在操作期间保持不变,而 volatile 变量无法实现这点。(然而,如果将值调整为只从单个线程写入,那么可以忽略第一个条件。)

综上所述,被写入 volatile 变量的这些有效值独立于任何程序,包括变量的当前状态。

状态标志

volatile 变量的规范使用仅仅是使用一个布尔状态标志,用于指示发生了一个重要的一次性事件,例如完成初始化或请求停机。

很多应用程序包含了一种控制结构,形式为 “在还没有准备好停止程序时再执行一些工作”。


volatile boolean shutdownRequested; public void shutdown() { shutdownRequested = true; } public void doWork() {     while (!shutdownRequested) {         // do stuff    }}
复制代码


3-原子操作类


原子操作类

对于并发访问,需要考虑操作系统位数的问题。对于 long 和 double 数据类型而言,是 64 位存储,如果项目运行在 32 位系统之上,则 long 和 double 就会占据两个 32 位空间进行数据的保存。原子操作的关键在于 volatile。



volatile 直接进行数据写入具有可见性,没有副本数据。

volatile 不负责数据同步的问题,而是直接进行数据写入。正如 volatile 的特性,其具有可见性,但并不具备原子性。为了解决原子性问题。因此,在 JUC 中提供了一个 java.util.concurrent.atomic 子包,这个子包保存的都是原子性的操作数据,同时,这个包里面所包含的属性都是用 volatile 声明的。包中的原子性操作实质上通过 sun.misc.Unsafe 类实现。


原子操作类分类

基本类型

AtomicInteger、AtomicLong、AtomicBoolean

数组类型

AtomicIntegerArray、AtomicLongArray、AtomicReferenceArray

引用类型

AtomicReference、AtomicStampedReference、AtomicMarkableReference

对象属性修改类型

AtomicIntegerFieldUpdater、AtomicLongFieldUpdater、AtomicReferenceFieldUpdater

对于原子操作类,需要时刻保持对数据在多线程访问下的并发访问安全性,故在运算功能上,并不是十分充足。


AtomicLong 部分结构如下


public class AtomicLong extends Number implements java.io.Serializable {    private volatile long value;    /**     * Creates a new AtomicLong with the given initial value     * @param initialValue the initial value     */    public AtomicLong(long initialValue) {        value = initialValue;    }}
复制代码


对于原子操作类,有一个非常重要的 CAS 方法。

CAS 方法(AtomicLong 为例):public final boolean compareAndSet(long expect,long update)

对于该方法,也就是进行比较,如果与原始值相同,则进行设置替换。故一定要设置有原始值,为了线程安全考虑。

除了对基本数据类型进行原子操作,也可以对数组和引用数据类型进行原子操作。

使用以下类进行引用数据类型的原子操作,但其中比较的方法只能是==,也即地址判断。这也是符合原子操作原理的。

java.util.concurrent.atomic.AtomicReference

部分代码示例:


// AtomicReference	AtomicModel am1 = new AtomicModel(100, "demo");	AtomicModel am2 = new AtomicModel(200, "change");	AtomicReference<AtomicModel> ar = new AtomicReference<AtomicModel>(am1);	System.out.println("init reference --> " + ar);	ar.compareAndSet(am2, am2);	System.out.println("first change reference --> " + ar.get());	ar.compareAndSet(am1, am2);	System.out.println("second change reference --> " + ar.get());
复制代码


说明:当进行原子性赋值时,在原子操作类的处理上,保证数据安全,然后,运行结果,再从原子操作类获取,从而整个过程都是原子性的,而不是从外部引用去获取。

在 Java 中,类中定义的数据类型不是 AtomicLong 原子操作类型。那么,可以利用原子成员更新器 AtomicLongFieldUpdater 实现处理,这里只是以 long 类型作为举例。

获得对象(由于 AtomicLongFieldUpdater 为抽象类,并没有提供子类,故通过静态方法获取对象):

public static AtomicLongFieldUpdater newUpdater(Class tclass,String fieldName)

注意:对于数据类型,考虑到原子性操作,故需要追加 volatile,考虑高并发访问,数据安全性问题,对于数据的控制需要使用 volatile 修饰。

代码部分示例如下:


class AtomicModel {	private volatile long id;	@SuppressWarnings({ "unchecked", "rawtypes" })	public void changeId(long newId){		// 使用AtomicLongFieldUpdater类		AtomicLongFieldUpdater alfu = AtomicLongFieldUpdater.newUpdater(this.getClass(), "id");		alfu.compareAndSet(this, this.id, newId);	}}
复制代码


并发访问中为了保证多位数据类型的完整性,一定要使用 volatile 关键字。同时,在整个 JUC 开发包中会大量使用原子操作类。

原子操作类特点:

1,保证存储的数据进行原子性操作,从而实现高并发的情况下线程安全;

2,为了增加浮点型数据的操作,提供了 DoubleAdder 类。


4-锁机制


锁机制

JUC 开发框架解决的核心问题是并发访问与数据安全操作问题。当进行并发访问,对于锁的控制不当,会造成死锁的阻塞问题。为了解决这一问题,JUC 重新对锁进行了设计。

锁在 JUC 中,是最重要的组成概念,必须加倍重视。对于资源的安全操作,可以使用以下两种方式:锁、原子操作类。





锁的原始控制,指的就是锁的暂停与恢复。Condition 表示锁的精准控制,是在外部锁的机制下,再进行细小划分。

多线程最重要的两个基础模型:数据共享卖票、生产者与消费者。ReadWriteLock 可以理解为生产者与消费者。

AQS 类是真正实现负责同步锁处理的。锁队列线程“有序”,分为强秩序和顺序,通俗理解就是加塞和不加塞。


5-同步锁


同步锁

通过 synchronized 关键字来进行同步,实现对竞争资源的互斥访问的锁。Java 1.0 版本中就已经支持同步锁了。

  同步锁的原理是,对于每一个对象,有且仅有一个同步锁;不同的线程能共同访问该同步锁。但是,在同一个时间点,该同步锁能且只能被一个线程获取到。这样,获取到同步锁的线程就能进行 CPU 调度,从而在 CPU 上执行;而没有获取到同步锁的线程,必须进行等待,直到获取到同步锁之后才能继续运行。这就是,多线程通过同步锁进行同步的原理!

JUC 提供了一系列的锁的处理工具类,其根本原因在于,Java 原始的锁机制(synchronized)虽然可以提供数据的安全访问,但是所有线程只能享受一把锁。


6-独占锁


Lock 接口

Lock 接口定义了语义不同的锁规则。所谓语义不同,是指锁有"公平机制的锁"、"非公平机制的锁"、"可重入的锁"等等。"公平机制"是指"不同线程获取锁的机制是公平的",而"非公平机制"则是指"不同线程获取锁的机制是非公平的","可重入的锁"是指同一个锁能够被一个线程多次获取。

核心方法:


public void lock​():获取锁

public boolean tryLock​(long time,TimeUnit unit)throws InterruptedException:

给定时间内获取空闲的锁

public Condition newCondition​():获取精确控制

public void unlock​():释放锁


ReentrantLock

ReentrantLock 是一个独占锁也叫排他锁,亦或者互斥锁,获取锁之后,所有操作线程独享,其他线程没有获取锁的时候就进入等待状态。

ReentrantLock 锁包括公平锁和非公平锁。"公平的 ReentrantLock"是指"不同线程获取锁的机制是公平的",而"非公平的 ReentrantLock"则是指"不同线程获取锁的机制是非公平的"。它们的区别体现在获取锁的机制上是否公平。

ReentrantLock 是可重入的锁,即锁可以被单个线程多次获取。

ReentrantLock 是通过一个 FIFO 的 CLH 等待锁队列来管理“获取该锁”的所有线程。在“公平锁”的机制下,线程依次排队获取锁;而“非公平锁”在锁是可获取状态时,不管自己是不是在队列的开头都会获取锁。

在使用完锁之后,一定要释放锁,避免当前没有释放锁,而造成其他线程一直处于等待状态。方法定义格式如下:

lock.lock();

try {

// ... method body

}finally{

lock.unlock()

}



内部结构


public class ReentrantLockextends Objectimplements Lock, Serializable{
private final Sync sync;
abstract static class Sync extends AbstractQueuedSynchronizer{
}
static final class NonfairSync extends Sync {
}
static final class FairSync extends Sync {
}}
复制代码


构造方法

ReentrantLock 分为公平锁和非公平锁,这两种锁的启用也是非常容易控制的,通过该类提供的构造方法实现。

public ReentrantLock​():无参构造(默认非公平锁,NonfairSync)

public ReentrantLock​(boolean fair):有参构造

fair=true,表示公平锁,FairSync

fair=false,表示非公平锁,NonfairSync

使用互斥锁比直接使用 synchronized 要方便很多。互斥锁使用 lock()进行锁定的时候会考虑两种情况:公平和非公平。

ReentrantLock 中有一个成员变量 sync,sync 是 Sync 类型;Sync 是一个抽象类,而且它继承于 AQS。

ReentrantLock 中有"公平锁类"FairSync 和"非公平锁类"NonfairSync,它们都是 Sync 的子类。ReentrantLock 中 sync 对象,是 FairSync 与 NonfairSync 中的一种,这也意味着 ReentrantLock 是"公平锁"或"非公平锁"中的一种,ReentrantLock 默认是非公平锁。


常用方法

public void lock​():获取锁

public final boolean isFair​():锁是否公平

public boolean tryLock​(long timeout,TimeUnit unit)throws InterruptedException:

在给定时间内获取空闲的锁

public void unlock​():解锁

部分代码示例:


class Ticket {	private int count;	private Lock lock = new ReentrantLock();	public void sale() {		lock.lock();		try {			if (this.count > 0) {				System.out.println(Thread.currentThread().getName() + " --> " + this.count--);			}		} finally {			lock.unlock();		}	}}
复制代码


7-公平锁


AQS

AQS,AbstractQueuedSynchronizer 类,是 java 中管理“锁”的抽象类,锁的许多公共方法都是在这个类中实现。

它是一个非常有用的超类,锁以及依赖于 CLH 阻塞线程队列的其他同步器的实现。ReentrantLock,ReentrantReadWriteLock,CountDownLatch,CyclicBarrier 和 Semaphore 等这些类都是基于 AQS 类实现锁的功能的,实质上就是锁中的 Sync 类,该类继承 AQS 抽象类。

AQS 锁的类别

(01) 独占锁,锁在一个时间点只能被一个线程锁占有。根据锁的获取机制,它又划分为“公平锁”和“非公平锁”。公平锁,是按照通过 CLH 等待线程按照 FIFO 先来先得的规则,公平的获取锁;而非公平锁,则当线程要获取锁时,它会无视 CLH 等待队列而直接获取锁。独占锁的典型实例子是 ReentrantLock,此外,ReentrantReadWriteLock.WriteLock 也是独占锁。

(02) 共享锁,能被多个线程同时拥有,能被共享的锁。JUC 包中的 ReentrantReadWriteLock.ReadLock,CyclicBarrier, CountDownLatch 和 Semaphore 都是共享锁。

CLH 队列

CLH(Craig, Landin, and Hagersten lock queue)队列是 AQS 中处于“等待锁”状态下的线程队列。在多线程中,为了保护竞争资源不被多个线程同时操作而造成错误,我们常常需要通过锁来保护这些资源。在独占锁中,竞争资源在一个时间点只能被一个线程锁访问;而其它线程则需要等待。CLH 就是管理这些“等待锁”的线程的队列。管理资源的是锁,管理“等待锁”的线程是 CLH

CLH 是一个非阻塞的 FIFO 队列。也就是说往里面插入或移除一个节点的时候,在并发条件下不会阻塞,而是通过自旋锁和 CAS 保证节点插入和移除的原子性。

CLH 锁,也是一种基于链表的可扩展、高性能、公平的自旋锁,申请线程只在本地变量上自旋它不断轮询前驱的状态,如果发现前驱释放了锁就结束自旋。原有的 Thread 中的 resume 方法只能判断一次,就无法再次判断,所以,容易死锁。



CAS 函数

CAS(Compare And Swap)函数,是比较并交换函数,它是原子操作函数;即,数据通过 CAS 方法实现原子方式操作。例如,compareAndSetHead(), compareAndSetTail(), compareAndSetNext()等函数。它们共同的特点是,这些函数所执行的动作是以原子的方式进行的。

AQS 内部结构


public abstract class AbstractQueuedSynchronizerextends AbstractOwnableSynchronizerimplements Serializable{
private transient volatile Node head;
private transient volatile Node tail;
private volatile int state;
static final class Node{volatile int waitStatus;volatile Node prev;volatile Node next;volatile Thread thread;
Node nextWaiter;
static final Node SHARED = new Node();static final Node EXCLUSIVE = null;static final int CANCELLED = 1;static final int SIGNAL = -1;static final int CONDITION = -2;static final int PROPAGATE = -3;}}
复制代码


获取公平锁

1. lock()


lock()在 ReentrantLock.java 的 FairSync 类中实现,它的源码如下:

final void lock() { acquire(1);}


说明:

“当前线程”实际上是通过 acquire(1)获取锁的。

这里说明一下“1”的含义,它是设置“锁的状态”的参数。对于“独占锁”而言,锁处于可获取状态时,它的状态值是 0;锁被线程初次获取到了,它的状态值就变成了 1。

由于 ReentrantLock 是可重入锁,所以“独占锁”可以被单个线程多此获取,每获取 1 次就将锁的状态+1。也就是说,初次获取锁时,通过 acquire(1)将锁的状态值设为 1;再次获取锁时,将锁的状态值设为 2;依次类推...。这就是为什么获取锁时,传入的参数是 1 的原因了。


2. acquire()

acquire()在 AQS 中实现的,它的源码如下:

public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt();}


说明:

(01) “当前线程”首先通过 tryAcquire()尝试获取锁。获取成功的话,直接返回;尝试失败的话,进入到等待队列排序等待(前面还有可能有需要线程在等待该锁)。

(02) “当前线程”尝试失败的情况下,先通过 addWaiter(Node.EXCLUSIVE)来将“当前线程”加入到"CLH 队列"末尾。

(03) 再执行完 addWaiter(Node.EXCLUSIVE)之后,会调用 acquireQueued()来获取锁。由于此时 ReentrantLock 是公平锁,它会根据公平性原则来获取锁。

(04) “当前线程”在执行 acquireQueued()时,会进入到 CLH 队列中休眠等待,直到获取锁了才返回。如果“当前线程”在休眠等待过程中被中断过,acquireQueued 会返回 true,此时"当前线程"会调用 selfInterrupt()来自己给自己产生一个中断。

(05)我们已经将当前线程添加到 CLH 队列中了。而 acquireQueued()的作用就是逐步的去执行 CLH 队列的线程,如果当前线程获取到了锁,则返回;否则,当前线程进行休眠,直到唤醒并重新获取锁了才返回。

(06)“当前线程”可以通过 Thread.currentThread()获得。


selfInterrupt()是 AQS 中实现,源码如下:

private static void selfInterrupt() {

Thread.currentThread().interrupt();

}


selfInterrupt()的代码很简单,就是“当前线程”自己产生一个中断,也就是中断此时的等待状态,然后抢占 CPU 资源。基于公平锁机制,故继续让其中断,处于等待状态。但是,为什么需要这么做呢?

这必须结合 acquireQueued()进行分析。如果在 acquireQueued()中,当前线程被中断过,则执行 selfInterrupt();否则不会执行。


释放公平锁

1. unlock()

unlock()在 ReentrantLock.java 中实现的,源码如下:

public void unlock() { sync.release(1);}


说明:

unlock()是解锁函数,它是通过 AQS 的 release()函数来实现的。

在这里,“1”的含义和“获取锁的函数 acquire(1)的含义”一样,它是设置“释放锁的状态”的参数。由于“公平锁”是可重入的,所以对于同一个线程,每释放锁一次,锁的状态-1

“释放锁”的过程相对“获取锁”的过程比较简单。释放锁时,主要进行的操作,是更新当前线程对应的锁的状态。如果当前线程对锁已经彻底释放,则设置“锁”的持有线程为 null,设置当前线程的状态为空,然后唤醒后继线程。


AbstractQueuedLongSynchronizer

AbstractQueuedLongSynchronizer 类提供了与 AQS 相同的功能,但扩展了对同步状态的 64 位的支持,与 AQS 都扩展了 AbstractOwnableSynchronizer 抽象类。

AbstractOwnableSynchronizer

AbstractOwnableSynchronizer,一个帮助记录当前保持独占同步的线程的简单抽象类。


用户头像

andy

关注

还未添加个人签名 2019-11-21 加入

还未添加个人简介

评论

发布
暂无评论
JUC浅析(一)_andy_InfoQ写作社区