写点什么

并发编程之深入理解 CAS

用户头像
Fox
关注
发布于: 2021 年 11 月 08 日

什么是 CAS

CAS(Compare And Swap,比较并交换),通常指的是这样一种原子操作:针对一个变量,首先比较它的内存值与某个期望值是否相同,如果相同,就给它赋一个新值。

CAS 的逻辑用伪代码描述如下:

if (value == expectedValue) {    value = newValue;}
复制代码

以上伪代码描述了一个由比较和赋值两阶段组成的复合操作,CAS 可以看作是它们合并后的整体——一个不可分割的原子操作,并且其原子性是直接在硬件层面得到保障的。

CAS 可以看做是乐观锁(对比数据库的悲观、乐观锁)的一种实现方式,Java 原子类中的递增操作就通过 CAS 自旋实现的。

CAS 是一种无锁算法,在不使用锁(没有线程被阻塞)的情况下实现多线程之间的变量同步。


CAS 应用

在 Java 中,CAS 操作是由 Unsafe 类提供支持的,该类定义了三种针对不同类型变量的 CAS 操作,如图

它们都是 native 方法,由 Java 虚拟机提供具体实现,这意味着不同的 Java 虚拟机对它们的实现可能会略有不同。

以 compareAndSwapInt 为例,Unsafe 的 compareAndSwapInt 方法接收 4 个参数,分别是:对象实例、内存偏移量、字段期望值、字段新值。该方法会针对指定对象实例中的相应偏移量的字段执行 CAS 操作。

public class CASTest {
public static void main(String[] args) { Entity entity = new Entity();
Unsafe unsafe = UnsafeFactory.getUnsafe();
long offset = UnsafeFactory.getFieldOffset(unsafe, Entity.class, "x");
boolean successful;
// 4个参数分别是:对象实例、字段的内存偏移量、字段期望值、字段新值 successful = unsafe.compareAndSwapInt(entity, offset, 0, 3); System.out.println(successful + "\t" + entity.x);
successful = unsafe.compareAndSwapInt(entity, offset, 3, 5); System.out.println(successful + "\t" + entity.x);
successful = unsafe.compareAndSwapInt(entity, offset, 3, 8); System.out.println(successful + "\t" + entity.x); }}
public class UnsafeFactory {
/** * 获取 Unsafe 对象 * @return */ public static Unsafe getUnsafe() { try { Field field = Unsafe.class.getDeclaredField("theUnsafe"); field.setAccessible(true); return (Unsafe) field.get(null); } catch (Exception e) { e.printStackTrace(); } return null; }
/** * 获取字段的内存偏移量 * @param unsafe * @param clazz * @param fieldName * @return */ public static long getFieldOffset(Unsafe unsafe, Class clazz, String fieldName) { try { return unsafe.objectFieldOffset(clazz.getDeclaredField(fieldName)); } catch (NoSuchFieldException e) { throw new Error(e); } }}
复制代码

测试

针对 entity.x 的 3 次 CAS 操作,分别试图将它从 0 改成 3、从 3 改成 5、从 3 改成 8。执行结果如下:


CAS 源码分析

Hotspot 虚拟机对 compareAndSwapInt 方法的实现如下:

#unsafe.cppUNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x))  UnsafeWrapper("Unsafe_CompareAndSwapInt");  oop p = JNIHandles::resolve(obj);  // 根据偏移量,计算value的地址  jint* addr = (jint *) index_oop_from_field_offset_long(p, offset);  // Atomic::cmpxchg(x, addr, e) cas逻辑 x:要交换的值   e:要比较的值  //cas成功,返回期望值e,等于e,此方法返回true   //cas失败,返回内存中的value值,不等于e,此方法返回false  return (jint)(Atomic::cmpxchg(x, addr, e)) == e;UNSAFE_END
复制代码

核心逻辑在 Atomic::cmpxchg 方法中,这个根据不同操作系统和不同 CPU 会有不同的实现。这里我们以 linux_64x 的为例,查看 Atomic::cmpxchg 的实现

#atomic_linux_x86.inline.hppinline jint     Atomic::cmpxchg    (jint     exchange_value, volatile jint*     dest, jint     compare_value) {  //判断当前执行环境是否为多处理器环境  int mp = os::is_MP();  //LOCK_IF_MP(%4) 在多处理器环境下,为 cmpxchgl 指令添加 lock 前缀,以达到内存屏障的效果  //cmpxchgl 指令是包含在 x86 架构及 IA-64 架构中的一个原子条件指令,  //它会首先比较 dest 指针指向的内存值是否和 compare_value 的值相等,  //如果相等,则双向交换 dest 与 exchange_value,否则就单方面地将 dest 指向的内存值交给exchange_value。  //这条指令完成了整个 CAS 操作,因此它也被称为 CAS 指令。  __asm__ volatile (LOCK_IF_MP(%4) "cmpxchgl %1,(%3)"                    : "=a" (exchange_value)                    : "r" (exchange_value), "a" (compare_value), "r" (dest), "r" (mp)                    : "cc", "memory");  return exchange_value;}
复制代码

cmpxchgl 的详细执行过程:

首先,输入是"r" (exchange_value), “a” (compare_value), “r” (dest), “r” (mp),表示 compare_value 存入 eax 寄存器,而 exchange_value、dest、mp 的值存入任意的通用寄存器。嵌入式汇编规定把输出和输入寄存器按统一顺序编号,顺序是从输出寄存器序列从左到右从上到下以“%0”开始,分别记为 %0、%1···%9。也就是说,输出的 eax 是 %0,输入的 exchange_value、compare_value、dest、mp 分别是 %1、%2、%3、%4。

因此,cmpxchg %1,(%3)实际上表示 cmpxchg exchange_value,(dest)

需要注意的是 cmpxchg 有个隐含操作数 eax,其实际过程是先比较 eax 的值(也就是 compare_value)和 dest 地址所存的值是否相等,

输出是"=a" (exchange_value),表示把 eax 中存的值写入 exchange_value 变量中。

Atomic::cmpxchg 这个函数最终返回值是 exchange_value,也就是说,如果 cmpxchgl 执行时 compare_value 和 dest 指针指向内存值相等则会使得 dest 指针指向内存值变成 exchange_value,最终 eax 存的 compare_value 赋值给了 exchange_value 变量,即函数最终返回的值是原先的 compare_value。此时 Unsafe_CompareAndSwapInt 的返回值(jint)(Atomic::cmpxchg(x, addr, e)) == e 就是 true,表明 CAS 成功。如果 cmpxchgl 执行时 compare_value 和(dest)不等则会把当前 dest 指针指向内存的值写入 eax,最终输出时赋值给 exchange_value 变量作为返回值,导致(jint)(Atomic::cmpxchg(x, addr, e)) == e 得到 false,表明 CAS 失败。

现代处理器指令集架构基本上都会提供 CAS 指令,例如 x86 和 IA-64 架构中的 cmpxchgl 指令和 comxchgq 指令,sparc 架构中的 cas 指令和 casx 指令。

不管是 Hotspot 中的 Atomic::cmpxchg 方法,还是 Java 中的 compareAndSwapInt 方法,它们本质上都是对相应平台的 CAS 指令的一层简单封装。CAS 指令作为一种硬件原语,有着天然的原子性,这也正是 CAS 的价值所在。


CAS 缺陷

CAS 虽然高效地解决了原子操作,但是还是存在一些缺陷的,主要表现在三个方面:

  • 自旋 CAS 长时间不成功,则会给 CPU 带来非常大的开销

  • 只能保证一个共享变量原子操作

  • ABA 问题


ABA 问题及其解决方案

CAS 算法实现一个重要前提需要取出内存中某时刻的数据,而在下时刻比较并替换,那么在这个时间差类会导致数据的变化。

什么是 ABA 问题

当有多个线程对一个原子类进行操作的时候,某个线程在短时间内将原子类的值 A 修改为 B,又马上将其修改为 A,此时其他线程不感知,还是会修改成功。

测试

@Slf4jpublic class ABATest {
public static void main(String[] args) { AtomicInteger atomicInteger = new AtomicInteger(1);
new Thread(()->{ int value = atomicInteger.get(); log.debug("Thread1 read value: " + value);
// 阻塞1s LockSupport.parkNanos(1000000000L);
// Thread1通过CAS修改value值为3 if (atomicInteger.compareAndSet(value, 3)) { log.debug("Thread1 update from " + value + " to 3"); } else { log.debug("Thread1 update fail!"); } },"Thread1").start();
new Thread(()->{ int value = atomicInteger.get(); log.debug("Thread2 read value: " + value); // Thread2通过CAS修改value值为2 if (atomicInteger.compareAndSet(value, 2)) { log.debug("Thread2 update from " + value + " to 2");
// do something value = atomicInteger.get(); log.debug("Thread2 read value: " + value); // Thread2通过CAS修改value值为1 if (atomicInteger.compareAndSet(value, 1)) { log.debug("Thread2 update from " + value + " to 1"); } } },"Thread2").start(); }}
复制代码

Thread1 不清楚 Thread2 对 value 的操作,误以为 value=1 没有修改过

ABA 问题的解决方案

数据库有个锁称为乐观锁,是一种基于数据版本实现数据同步的机制,每次修改一次数据,版本就会进行累加。同样,Java 也提供了相应的原子引用类 AtomicStampedReference<V>

reference 即我们实际存储的变量,stamp 是版本,每次修改可以通过+1 保证版本唯一性。这样就可以保证每次修改后的版本也会往上递增。

@Slf4jpublic class AtomicStampedReferenceTest {
public static void main(String[] args) { // 定义AtomicStampedReference Pair.reference值为1, Pair.stamp为1 AtomicStampedReference atomicStampedReference = new AtomicStampedReference(1,1);
new Thread(()->{ int[] stampHolder = new int[1]; int value = (int) atomicStampedReference.get(stampHolder); int stamp = stampHolder[0]; log.debug("Thread1 read value: " + value + ", stamp: " + stamp);
// 阻塞1s LockSupport.parkNanos(1000000000L); // Thread1通过CAS修改value值为3 if (atomicStampedReference.compareAndSet(value, 3,stamp,stamp+1)) { log.debug("Thread1 update from " + value + " to 3"); } else { log.debug("Thread1 update fail!"); } },"Thread1").start();
new Thread(()->{ int[] stampHolder = new int[1]; int value = (int)atomicStampedReference.get(stampHolder); int stamp = stampHolder[0]; log.debug("Thread2 read value: " + value+ ", stamp: " + stamp); // Thread2通过CAS修改value值为2 if (atomicStampedReference.compareAndSet(value, 2,stamp,stamp+1)) { log.debug("Thread2 update from " + value + " to 2");
// do something
value = (int) atomicStampedReference.get(stampHolder); stamp = stampHolder[0]; log.debug("Thread2 read value: " + value+ ", stamp: " + stamp); // Thread2通过CAS修改value值为1 if (atomicStampedReference.compareAndSet(value, 1,stamp,stamp+1)) { log.debug("Thread2 update from " + value + " to 1"); } } },"Thread2").start(); }}
复制代码

Thread1 并没有成功修改 value

补充:AtomicMarkableReference 可以理解为上面 AtomicStampedReference 的简化版,就是不关心修改过几次,仅仅关心是否修改过。因此变量 mark 是 boolean 类型,仅记录值是否有过修改。


发布于: 2021 年 11 月 08 日阅读数: 7
用户头像

Fox

关注

有道无术,术尚可求也,有术无道,止于术。 2019.03.12 加入

多年中间件,高并发经验,擅长高并发,中间件,微服务架构,源码控,喜欢分享技术

评论

发布
暂无评论
并发编程之深入理解CAS