1.wait()与 notify()实现一个简易的内存队列
在多线程开发中,wait()和 notify()/notifyAll()还是挺常见的。在分布式系统里经常会使用 wait()和 notifyAll()来进行线程通信,当某个线程处于阻塞等待状态时,其他线程可以进行通知并唤醒它。
如下代码向内存队列添加元素和获取元素时,都使用了 MyQueue 对象锁。当内存队列满或者空时,需要释放锁,才能让添加或者获取继续下去。其中 wait()方法会释放锁,并让当前线程进入等待状态,而 notify()方法和 notifyAll()方法会唤醒等待获取锁的线程。所以 wait()和 notify()主要是用来控制线程的,当然也可认为用于线程通信。
public class MyQueue {
private final static int MAX_SIZE = 100;
private LinkedList<String> queue = new LinkedList<String>();
public synchronized void offer(String element) {
try {
if (queue.size() == MAX_SIZE) {
} catch (Exception e) {
public synchronized String take() {
String element = null;
try {
if (queue.size() == 0) {
element = queue.removeFirst();
} catch (Exception e) {
return element;
2.wait()与 notify()的底层原理
如果线程在运行 synchronized 修饰的同步块代码时,发现锁状态是轻量级锁并且有其他线程抢占了锁资源,那么该线程就会触发锁膨胀升级到重量级锁。
在获取重量级锁之前会先实现锁膨胀,锁膨胀时首先会创建一个 ObjectMonitor 对象,然后把 ObjectMonitor 对象的指针保存到锁对象的 Mark Word 中。
重量级锁的实现是在 ObjectMonitor 中完成的,所以锁膨胀的意义就是构建一个 ObjectMonitor 对象。
(2)ObjectMonitor 对象的重要字段
_waitset:被 wait()方法阻塞的线程队列
重量级锁的竞争都是在 ObjectMonitor 对象中完成的。首先判断当前线程是否是重入,如果是则重入次数+1。然后通过 CAS 自旋来判断 ObjectMonitor 中的_owner 字段是否为空。如果为空,则表示重量级锁已经被释放,当前线程可以获得锁。如果不为空,就继续进行自适应自旋重试。最后如果通过自旋竞争锁失败,则把当前线程构建成一个 ObjectWaiter 结点,插入到 ObjectMonitor 的_cxq 队列的队头,再调用 park()方法阻塞当前线程。
首先把 ObjectMonitor 的_owner 字段设置为 null,然后从 ObjectMonitor 的_cxq 队列中调用 unpark()方法唤醒一个阻塞的线程。被唤醒的线程会重新竞争重量级锁,如果没抢到,则继续阻塞等待。因为 synchronized 是非公平锁,被唤醒的线程不一定能重新抢占到锁。
(5)wait()与 notify()的底层原理
这与 synchronized 的原理(ObjectMonitor 对象)相关,ObjectMonitor 对象有一个_waitset 队列和重入计数器。使用 wait()和 notify()时必须对同一个对象实例进行加 synchronized 锁。如果对象实例加锁,那么重入计数器 + 1。如果对象实例释放锁,那么重入计数器 - 1。
执行 wait()方法时会释放锁 + 阻塞当前线程 + 把当前线程放入_waitset 队列,执行 notify()方法时会唤醒_waitset 队列里的被阻塞的线程。
(6)wait()与 notify()在使用时的注意事项
wait()与 sleep()的区别:两者都会等待,前者释放锁,后者不释放锁。wait()必须要有其他线程调用 notify()来唤醒它。wait(timeout)会阻塞一段时间,然后自己唤醒自己,继续争抢锁。wait()与 notify()必须与 synchornized 一起,对同一个对象进行使用。notify()会唤醒阻塞状态的一个线程,notifyall()会唤醒阻塞状态的所有线程。
3.分布式存储系统 NameNode 机制介绍
(1)HDFS 的 DataNode 和 NameNode
HDFS 是 Hadoop 的分布式文件系统,它由很多机器组成。每台机器上运行一个 DataNode 进程,存储一部分数据。然后会有一台机器上运行一个 NameNode 进程,NameNode 可以认为是负责管理整个 HDFS 集群的进程,NameNode 里存储了 HDFS 集群的所有元数据。
(2)HDFS 的 NameNode 架构简介
一.每次修改元数据都顺序追加 edits log
NameNode 的核心功能是管理整个 HDFS 集群的元数据,比如文件目录树、权限设置、副本数设置等。
HDFS 客户端每次上传文件时,都要维护 NameNode 的文件目录树。但是 NameNode 的文件目录树是在内存里的,万一 NameNode 宕机,内存里的文件目录树可能就会丢失。
所以每次修改内存,就顺序追加一条 edits log(元数据操作日志)到磁盘文件。每次 NameNode 重启,就把 edits log(元数据操作日志)读到内存恢复数据。
二.如何避免 edits log 过大导致恢复过慢
为了避免 edits log(元数据操作日志)越来越大每次重启恢复过慢,于是引入了一个新的磁盘文件 fsimage、一个 JournalNodes 集群、一个 Active NameNode(主节点)、一个 Standby NameNode(备节点)。
主节点每修改一条元数据都会生成一条 edits log。每条 edits log 除了写到主节点外,还会写到 JournalNodes 集群。然后备节点会从 JournalNodes 集群拉取 edits log 到自己内存的文件目录树里,这样备节点的数据就可以跟主节点的数据保持一致了。
每隔一段时间备节点会把自己内存的文件目录树写一份到 fsimage 磁盘文件,这个也就是所谓的 checkpoint 检查点操作。然后备节点再把这个 fsimage 磁盘文件上传到到主节点,接着清空掉主节点上的旧的 edits log 文件(可能几十万行)。之后主节点继续处理修改元数据请求,那么可能只有几十行 edits log 日志了。
如果此时主节点重启,首先把备节点传过来的 fsimage 文件读到内存里,然后把新的 edits log 里少量的几十行操作日志重新恢复到内存中即可。
三.NameNode 主备高可用故障转移机制
整个过程有两个 NameNode:一是对外提供服务接收请求的主节点 NameNode,二是同步主节点 edits log + 定期执行 checkpoint 的备节点 NameNode。
这两个 NameNode 内存里的元数据几乎一模一样。所以如果主节点挂了,可以马上切换到备节点对外提供服务,而这就是所谓的 NameNode 主备高可用故障转移机制了。
4.分布式存储系统的 edits log 机制介绍
(1)高并发请求下 NameNode 会遇到的问题
NameNode 每修改一条元数据都要写一条 edits log,这包括两个步骤:写入本地磁盘和通过网络传输给 JournalNodes 集群。
NameNode 必须保证写入的每条 edits log 都有一个全局顺序递增的 txid,这样才可以标识出一条 edits log 的先后顺序。
如果要保证每条 edits log 的 txid 都是递增的,那么就必须要加锁。每个线程修改完元数据准备写一条 edits log 时,按顺序排队获取锁,获取到锁之后才能生成一个递增的 txid 给要准备写的 edits log。
但是如果每次在加锁的代码块里生成 txid,然后写磁盘文件 edits log,接着通过网络传输写入 JournalNodes,那么性能就一定很低。所以每个线程写 edits log 时最好不要串行化排队来执行这 3 个操作:生成 txid + 写磁盘 + 写 JournalNode。
(2)通过双缓冲机制来提升写 edits log 的性能
为了避免线程写 edits log 时串行化排队去生成 txid + 写磁盘 + 写 JournalNode,可以考虑增加内存缓冲。首先将 edits log 写入到内存缓冲里,然后通过后台线程将内存中的 edits log 刷入磁盘 + 写入 JournalNode。而且将 edits log 刷盘的过程中,其他线程依然可以将 edits log 写入内存缓冲。
所以 HDFS 采取了双缓冲机制来处理,也就是将一块内存缓冲分成两部分。其中一部分只用来写入,另一部分只用来读取进行刷盘。
5.分布式存储系统的 NameNode 实现
(1)NameNode 的基本功能
如果 NameNode 执行命令创建一个目录,那么会做两件事情:一是在内存里的文件目录树中加入目录节点,二是在磁盘里写入一条 edits log 日志来记录本次元数据修改。
所以接下来要实现两个功能:一是在内存文件目录树中加入目录节点,二是写 edits log 到磁盘文件。
如下是 NameNode 的核心组件说明:
FSNamesystem 类:作为 NameNode 里元数据操作的核心入口,负责管理所有的元数据的操作,会调用其他组件完成相关事情。
FSDirectory 类:管理内存中的文件目录树。
FSEditLog 类:写入 edits log 到磁盘文件里。
(2)NameNode 的核心启动类
public class NameNode {
private volatile Boolean shouldRun;
private FSNamesystem namesystem;
private NameNodeRpcServer rpcServer;
public NameNode() {
this.shouldRun = true;
private void initialize() {
this.namesystem = new FSNamesystem();
this.rpcServer = new NameNodeRpcServer(this.namesystem);
private void run() {
try {
while(shouldRun) {
} catch (Exception e) {
public static void main(String[] args) throws Exception {
NameNode namenode = new NameNode();
(3)NameNode 的 RPC 服务接口
public class NameNodeRpcServer {
private FSNamesystem namesystem;
public NameNodeRpcServer(FSNamesystem namesystem) {
this.namesystem = namesystem;
public Boolean mkdir(String path) throws Exception {
return this.namesystem.mkdir(path);
//启动这个rpc server
public void start() {
System.out.println("开始监听指定的rpc server的端口号,来接收请求");
(4)负责管理元数据的 FSNamesystem
public class FSNamesystem {
private FSDirectory directory;
//负责管理edits log写入磁盘的组件
private FSEditlog editlog;
public FSNamesystem() {
this.directory = new FSDirectory();
this.editlog = new FSEditlog();
public Boolean mkdir(String path) throws Exception {
this.editlog.logEdit("创建了一个目录:" + path);
return true;
(5)负责管理文件目录树的 FSDirectory
public class FSDirectory {
public void mkdir(String path) {
(6)负责管理 edits log 日志的 FSEditlog
//负责管理edits log日志的核心组件
public class FSEditlog {
//记录edits log日志
public void logEdit(String log) {
在内存的文件目录树中创建一个目录节点的代码如下。内存里的文件目录树是会被多线程并发写的资源,所以创建目录的代码块必须要用 synchronized 保护起来。
public class FSDirectory {
private INodeDirectory dirTree;
public FSDirectory() {
this.dirTree = new INodeDirectory("/");
public void mkdir(String path) {
//path = /usr/warehouse/hive
synchronized(dirTree) {
String[] pathes = path.split("/");
INodeDirectory parent = dirTree;
for (String splitedPath : pathes) {
if (splitedPath.trim().equals("")) {
INodeDirectory dir = findDirectory(parent, splitedPath);
if (dir != null) {
parent = dir;
INodeDirectory child = new INodeDirectory(splitedPath);
private INodeDirectory findDirectory(INodeDirectory dir, String path) {
if (dir.getChildren().size() == 0) {
return null;
INodeDirectory resultDir = null;
for (INode child : dir.getChildren()) {
if (child instanceof INodeDirectory) {
INodeDirectory childDir = (INodeDirectory) child;
if ((childDir.getPath().equals(path))) {
return childDir;
resultDir = findDirectory(childDir, path);
if (resultDir != null) {
return resultDir;
return null;
private interface INode {
public static class INodeDirectory implements INode {
private String path;
private List<INode> children;
public INodeDirectory(String path) {
this.path = path;
this.children = new LinkedList<INode>();
public void addChild(INode inode) {
public String getPath() {
return path;
public void setPath(String path) {
this.path = path;
public List<INode> getChildren() {
return children;
public void setChildren(List<INode> children) {
this.children = children;
public static class INodeFile implements INode {
private String name;
public String getName() {
return name;
public void setName(String name) {
this.name = name;
7.edits log 的全局 txid 机制和双缓冲机制实现
全局 txid 机制 + 双缓冲机制的代码如下:
//负责管理edits log日志的核心组件
public class FSEditlog {
private long txidSeq = 0L;
private DoubleBuffer editLogBuffer = new DoubleBuffer();
//记录edits log日志
public void logEdit(String content) {
synchronized(this) {
//获取全局唯一递增的txid,代表了edits log的序号
long txid = txidSeq;
//构造一条edits log对象
EditLog log = new EditLog(txid, content);
//将edits log写入内存缓冲中,不是直接刷入磁盘文件
//代表了一条edits log,内部类
private class EditLog {
long txid;
String content;
public EditLog(long txid, String content) {
this.txid = txid;
this.content = content;
private class DoubleBuffer {
//专门用来承载线程写入edits log
LinkedList<EditLog> currentBuffer = new LinkedList<EditLog>();
LinkedList<EditLog> syncBuffer = new LinkedList<EditLog>();
//将edits log写到内存缓冲里去
public void write(EditLog log) {
public void setReadyToSync() {
LinkedList<EditLog> tmp = currentBuffer;
currentBuffer = syncBuffer;
syncBuffer = tmp;
//获取sync buffer缓冲区里的最大的一个txid
public Long getSyncMaxTxid() {
return syncBuffer.getLast().txid;
public void flush() {
for (EditLog log : syncBuffer) {
System.out.println("将edit log写入磁盘文件中:" + log);
8.synchronized 实现 edits log 分段加锁机制
logSync()方法通过分两段加 synchronized 锁,将耗时的刷盘操作放在锁外,然后通过更改标志位以及使用 wait()和 notify()来控制线程的等待和锁的释放,从而保证高并发下的刷盘性能。
//负责管理edits log日志的核心组件
public class FSEditlog {
private long txidSeq = 0L;
private DoubleBuffer editLogBuffer = new DoubleBuffer();
private volatile Boolean isSyncRunning = false;
//当前是否有线程在等待刷新下一批edits log到磁盘里去
private volatile Boolean isWaitSync = false;
private volatile Long syncMaxTxid = 0L;
private ThreadLocal<Long> localTxid = new ThreadLocal<Long>();
//记录edits log日志
public void logEdit(String content) {
synchronized(this) {
//获取全局唯一递增的txid,代表了edits log的序号
long txid = txidSeq;
//构造一条edits log对象
EditLog log = new EditLog(txid, content);
//将edits log写入内存缓冲中,不是直接刷入磁盘文件
private void logSync() {
synchronized(this) {
if (isSyncRunning) {
//而且里面的edits log的txid一定是从小到大的,此时要同步的txid = 6,7,8,9,10,11,12,所以syncMaxTxid = 12
syncMaxTxid = editLogBuffer.getSyncMaxTxid();
isSyncRunning = true;
synchronized(this) {
isSyncRunning = false;
9.wait()与 notify()实现 edits log 批量刷磁盘
//负责管理edits log日志的核心组件
public class FSEditlog {
private long txidSeq = 0L;
private DoubleBuffer editLogBuffer = new DoubleBuffer();
private volatile Boolean isSyncRunning = false;
//当前是否有线程在等待刷新下一批edits log到磁盘里去
private volatile Boolean isWaitSync = false;
private volatile Long syncMaxTxid = 0L;
private ThreadLocal<Long> localTxid = new ThreadLocal<Long>();
//记录edits log日志
public void logEdit(String content) {
synchronized(this) {
//获取全局唯一递增的txid,代表了edits log的序号
long txid = txidSeq;
//构造一条edits log对象
EditLog log = new EditLog(txid, content);
//将edits log写入内存缓冲中,不是直接刷入磁盘文件
private void logSync() {
synchronized(this) {
if (isSyncRunning) {
//假如某个线程正在把txid = 6,7,8,9,10,11,12的edits log从syncBuffer刷入磁盘
//此时syncMaxTxid = 12,代表的是正在刷入磁盘的最大txid
//那么刷盘的线程释放锁进行刷盘后,这时来一个线程对应的txid = 10,此时它可以直接返回
//因为它对应的edits log被刷盘的线程正在刷入或者已经刷入磁盘了,这时txid = 12的线程就不需要等待
long txid = localTxid.get();
if (txid <= syncMaxTxid) {
//此时如果来的是一个txid = 13的线程,那么就会发现已经有线程在等待刷下一批数据到磁盘,此时会直接返回
if (isWaitSync) {
//此时如果来的是一个txid = 14的线程,并且刷盘还没刷完,
isWaitSync = true;
while (isSyncRunning) {
try {
} catch (Exception e) {
isWaitSync = false;
//而且里面的edits log的txid一定是从小到大的,此时要同步的txid = 6,7,8,9,10,11,12,所以syncMaxTxid = 12
syncMaxTxid = editLogBuffer.getSyncMaxTxid();
isSyncRunning = true;
synchronized(this) {
isSyncRunning = false;
10.i++和 AtomicInteger 分别实现并发安全
public class AtomicIntegerDemo {
static Integer i = 0;
static AtomicInteger j = new AtomicInteger(0);
public static void main(String[] args) {
private static void synchronizedAdd() {
for (int i = 0; i < 10; i++) {
new Thread() {
public void run() {
synchronized(AtomicIntegerDemo.class) {
private static void atomicAdd() {
for (int i = 0; i < 10; i++) {
new Thread() {
public void run() {
11.AtomicInteger 中的 CAS 无锁化原理
(1)CAS 简介
CAS 就是 Compare and Set,首先判断此时内存中是否是某个值。如果是则修改,如果不是则重新查询最新的值,再执行判断。
(2)Atomic 原子类简介
Atomic 的原子类分别有:
AtomicInteger、AtomicLong、AtomicBoolean、AtomicReference、LongAdder 等。
Atomic 原子类底层的核心原理就是 CAS,属于一种乐观锁。每次修改时就先对比原值,看看有没有其他线程修改过原值。如果没有修改过就可以修改,如果有修改就重新查出最新值来重复这个过程。
12.Atomic 源码之仅限 JDK 使用的 Unsafe 类
(1)Atomic 原子类通过 Unsafe 执行 CAS 操作
public class AtomicInteger extends Number implements java.io.Serializable {
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long valueOffset;
private volatile int value;
//Creates a new AtomicInteger with the given initial value.
public AtomicInteger(int initialValue) {
value = initialValue;
//Creates a new AtomicInteger with initial value {@code 0}.
public AtomicInteger() {
//Gets the current value.
public final int get() {
return value;
//Sets to the given value.
public final void set(int newValue) {
value = newValue;
//Eventually sets to the given value.
public final void lazySet(int newValue) {
unsafe.putOrderedInt(this, valueOffset, newValue);
//Atomically sets to the given value and returns the old value.
public final int getAndSet(int newValue) {
return unsafe.getAndSetInt(this, valueOffset, newValue);
//Atomically sets the value to the given updated value if the current value == the expected value.
//@param expect the expected value
//@param update the new value
//@return true if successful. False return indicates that the actual value was not equal to the expected value.
public final boolean compareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
//Atomically increments by one the current value.
//@return the previous value
public final int getAndIncrement() {
return unsafe.getAndAddInt(this, valueOffset, 1);
//Atomically decrements by one the current value.
//@return the previous value
public final int getAndDecrement() {
return unsafe.getAndAddInt(this, valueOffset, -1);
//Atomically adds the given value to the current value.
//@param delta the value to add
//@return the previous value
public final int getAndAdd(int delta) {
return unsafe.getAndAddInt(this, valueOffset, delta);
//Atomically increments by one the current value.
//@return the updated value
public final int incrementAndGet() {
return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
//Atomically decrements by one the current value.
//@return the updated value
public final int decrementAndGet() {
return unsafe.getAndAddInt(this, valueOffset, -1) - 1;
//Atomically adds the given value to the current value.
//@param delta the value to add
//@return the updated value
public final int addAndGet(int delta) {
return unsafe.getAndAddInt(this, valueOffset, delta) + delta;
//Atomically updates the current value with the results of applying the given function, returning the previous value.
//The function should be side-effect-free, since it may be re-applied when attempted updates fail due to contention among threads.
//@param updateFunction a side-effect-free function
//@return the previous value
public final int getAndUpdate(IntUnaryOperator updateFunction) {
int prev, next;
do {
prev = get();
next = updateFunction.applyAsInt(prev);
} while (!compareAndSet(prev, next));
return prev;
//Atomically updates the current value with the results of applying the given function, returning the updated value.
//The function should be side-effect-free, since it may be re-applied when attempted updates fail due to contention among threads.
//@param updateFunction a side-effect-free function
//@return the updated value
public final int updateAndGet(IntUnaryOperator updateFunction) {
int prev, next;
do {
prev = get();
next = updateFunction.applyAsInt(prev);
} while (!compareAndSet(prev, next));
return next;
//Atomically updates the current value with the results of applying the given function to the current and given values, returning the previous value.
//The function should be side-effect-free, since it may be re-applied when attempted updates fail due to contention among threads.
//The function is applied with the current value as its first argument, and the given update as the second argument.
//@param x the update value
//@param accumulatorFunction a side-effect-free function of two arguments
//@return the previous value
public final int getAndAccumulate(int x, IntBinaryOperator accumulatorFunction) {
int prev, next;
do {
prev = get();
next = accumulatorFunction.applyAsInt(prev, x);
} while (!compareAndSet(prev, next));
return prev;
//Atomically updates the current value with the results of applying the given function to the current and given values, returning the updated value.
//The function should be side-effect-free, since it may be re-applied when attempted updates fail due to contention among threads.
//The function is applied with the current value as its first argument, and the given update as the second argument.
//@param x the update value
//@param accumulatorFunction a side-effect-free function of two arguments
//@return the updated value
public final int accumulateAndGet(int x, IntBinaryOperator accumulatorFunction) {
int prev, next;
do {
prev = get();
next = accumulatorFunction.applyAsInt(prev, x);
} while (!compareAndSet(prev, next));
return next;
(2)Unsafe 类仅限 JDK 内部使用
Unsafe 类是 JDK 底层的一个类,不允许被实例化和使用它里面的方法。因为 Unsafe 类的构造函数是私有的,所以不能手动进行实例化。其次调用 Unsafe.getUnsafe()方法来获取一个 UnSafe 实例也不被允许。
public final class Unsafe {
private Unsafe() {
public static Unsafe getUnsafe() {
Class var0 = Reflection.getCallerClass();
if (!VM.isSystemDomainLoader(var0.getClassLoader())) {
throw new SecurityException("Unsafe");
} else {
return theUnsafe;
13.Atomic 源码之无限重复循环以及 CAS 操作
(1)AtomicInteger 类的初始化
AtomicInteger 类初始化时,会执行静态代码块,即初始化 valueOffset 变量为 value 变量在 AtomicInteger 类中的偏移量。
这个 valueOffset 偏移量可以理解为:value 变量在 AtomicInteger 类的位置。由于 valueOffset 偏移量是 final 的,所以一旦初始化完毕就不会再改变。
public class AtomicInteger extends Number implements java.io.Serializable {
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long valueOffset;
private volatile int value;
static {
try {
valueOffset = unsafe.objectFieldOffset(AtomicInteger.class.getDeclaredField("value"));
} catch (Exception ex) {
throw new Error(ex);
//Atomically increments by one the current value.
//@return the updated value
public final int incrementAndGet() {
return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
//Atomically sets the value to the given updated value if the current value == the expected value.
//@param expect the expected value
//@param update the new value
//@return true if successful. False return indicates that the actual value was not equal to the expected value.
public final boolean compareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
(2)Unsafe 类的 getAndAddInt()方法
public final class Unsafe {
//如果一样,就将value的值给设置为:l(之前拿到的值) + delta(递增的值)
public final int getAndAddInt(Object paramObject, long valueOffset, int delta) {
int l;
do {
l = this.getIntVolatile(paramObject, valueOffset);
} while(!this.compareAndSwapInt(paramObject, valueOffset, l, l + delta));
return l;
public native int getIntVolatile(Object paramObject, long valueOffset);
public final native boolean compareAndSwapInt(Object paramObject, long valueOffset, int expect, int update);
(3)CAS 的底层工作原理
下图表示通过 CAS 对变量 V 进行原子更新操作,底层的 CAS 方法中会传递三个参数。第一个参数 V 表示要更新的变量,第二个参数 E 表示期望值,第三个参数 U 表示更新后的值。
更新的方式是:如果 V == E,表示预期值和实际值相等,则将 V 修改成 U 并返回 true;否则修改失败,然后返回 false。
策略一:放弃 CPU 进入阻塞状态,等待后续被唤醒,再重新被操作系统调度。
策略二:不放弃 CPU,而是进入空转进行不断重试,也就是自旋。
单核 CPU 只能使用策略一,AtomicInteger 就是使用了自旋策略。synchronized 则是先自旋几圈,自旋后还获取不到锁再阻塞。
14.Atomic 原子类基于 CAS 操作的三大问题
(1)ABA 问题
ABA 问题就是:如果某个值一开始是 A,后来变成了 B,然后又变成了 A。AtomicStampedReference 能原子更新带有版本号的引用类型,解决 ABA 问题。此外一般用 AtomicInteger 进行的是不断累加计数,所以 ABA 问题比较少。
Atomic 原子类设置值的时候会进入一个无限循环,只要不成功就不停循环再次尝试,在高并发修改值时是挺常见的。
比如用 AtomicInteger 定义一个原子变量,高并发下修改时,可能会导致 compareAndSet()要循环很多次才设置成功。所以引入了 LongAdder 来解决,通过分段 CAS 的思路来解决无限循环问题。
一般的 AtomicInteger,只能保证一个变量的原子性,但是如果多个变量呢?
要保证多个变量的原子性,可以使用 AtomicReference 来封装自定义对象。将多个变量放在一个对象里,通过对象的引用来实现多个变量的原子性。
15.AtomicLong 优化服务注册中心心跳计数器
可以使用 AtomicLong 来优化服务注册中心内部的心跳计数器。
public class HeartbeatCounter {
private static HeartbeatCounter instance = new HeartbeatCounter();
private AtomicLong latestMinuteHeartbeatRate = new AtomicLong(0L);
private long latestMinuteTimestamp = System.currentTimeMillis();
private HeartbeatCounter() {
Daemon daemon = new Daemon();
public static HeartbeatCounter getInstance() {
return instance;
public /**synchronized*/ void increment() {
public /**synchronized*/ long get() {
return latestMinuteHeartbeatRate.get();
private class Daemon extends Thread {
public void run() {
while(true) {
try {
long currentTime = System.currentTimeMillis();
if (currentTime - latestMinuteTimestamp > 60 * 1000) {
latestMinuteHeartbeatRate = new AtomicLong(0L);
latestMinuteTimestamp = System.currentTimeMillis();
} catch (Exception e) {
16.LongAdder 的分段 CAS 优化多线程自旋
(1)采用分段 CAS 降低重试频率
这种分段的做法类似于 JDK7 中的 ConcurrentHashMap 的分段锁。
高并发场景下,value 变量其实就是一个热点数据,大量线程竞争一个热点。LongAdder 基本思路就是分散热点,将 value 分散到一个 Cell 数组中。不同线程会命中数组的不同槽位,各线程只对自己槽位的 value 进行 CAS 操作。这样热点就被分散了,冲突概率就变小了。
LongAdder 内部有一个 base 变量和一个 Cell[ ]数组。当并发不高的时候都是通过 CAS 来直接操作 base 变量的值。如果对 base 变量的 CAS 失败,则再针对 Cell[ ]数组中的 Cell 进行 CAS 操作。如果对 Cell[ ]数组中的 Cell 进行 CAS 失败,则换一个 Cell 进行 CAS 操作。
LongAdder 在无竞争情况下,跟 AtomicLong 是一样的, 对同一个 base 进行操作。当出现竞争的时候,则采用化整为零分散热点的做法,用空间换时间。通过使用一个 Cell[ ]数组,将一个 value 拆分进这个 Cell[ ]数组中。
只有在使用 longValue()方法获取当前累加值时才会真正去结算计数的数据。LongAdder.longValue()方法其实就是调用 LongAdder.sum()方法,LongAdder.sum()方法会将 Cell 数组中的各元素 value 和 base 累加作为返回值。
AtomicLong.incrementAndGet()方法每次都会返回 long 类型的计数值,每次递增后还会伴随着数据返回,增加了额外的开销。
(3)LongAdder 和 AtomicLong 对比
一.AtomicLong 总结
AtomicLong 的实现原理是:
基于 CAS + 自旋操作,CAS 是基于硬件来实现原子性的,可以保障线程安全。
AtomicLong 的使用场景:
AtomicLong 的优势是:
AtomicLong 的缺点是:
高并发下性能急剧下降,N 个线程同时进行自旋,N-1 个线程会自旋失败、不断重试。
二.LongAdder 总结
LongAdder 设计思想是:
空间换时间,分散热点数据 value 的值。
LongAdder 的实现原理是:
高并发时通过 Cell[ ]数组进行分段 CAS。
LongAdder 的使用场景是:
LongAdder 的优势是:
减少 CAS 重试次数、防止伪共享、惰性求值。
LongAdder 的缺点是:
如果使用它的 sum()方法时有并发更新,可能数据结果存在误差。
(4)LongAdder 的源码分析
public class LongAdder extends Striped64 implements Serializable {
public void add(long x) {
Cell[] as;
long b, v;
int m;
Cell a;
if ((as = cells) != null || !casBase(b = base, b + x)) {
boolean uncontended = true;
//as数组为空(null或者size为0) 或者 当前线程取模as数组大小为空 或者 cas更新Cell失败
if (as == null || (m = as.length - 1) < 0 || (a = as[getProbe() & m]) == null || !(uncontended = a.cas(v = a.value, v + x))) {
longAccumulate(x, null, uncontended);
public long sum() {
Cell[] as = cells; Cell a;
long sum = base;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null) {
sum += a.value;
return sum;
abstract class Striped64 extends Number {
@sun.misc.Contended static final class Cell {
volatile long value;
Cell(long x) {
value = x;
final boolean cas(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
private static final sun.misc.Unsafe UNSAFE;
private static final long valueOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> ak = Cell.class;
valueOffset = UNSAFE.objectFieldOffset(ak.getDeclaredField("value"));
} catch (Exception e) {
throw new Error(e);
private static final sun.misc.Unsafe UNSAFE;
private static final long BASE;
private static final long CELLSBUSY;
private static final long PROBE;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> sk = Striped64.class;
BASE = UNSAFE.objectFieldOffset(sk.getDeclaredField("base"));
CELLSBUSY = UNSAFE.objectFieldOffset(sk.getDeclaredField("cellsBusy"));
Class<?> tk = Thread.class;
PROBE = UNSAFE.objectFieldOffset(tk.getDeclaredField("threadLocalRandomProbe"));
} catch (Exception e) {
throw new Error(e);
static final int NCPU = Runtime.getRuntime().availableProcessors();
transient volatile Cell[] cells;
transient volatile long base;
transient volatile int cellsBusy;
final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) {
int h;
if ((h = getProbe()) == 0) {
ThreadLocalRandom.current(); // force initialization
h = getProbe();
wasUncontended = true;
boolean collide = false;
for (;;) {
Cell[] as; Cell a; int n; long v;
if ((as = cells) != null && (n = as.length) > 0) {
//若as数组已经初始化,(n-1) & h 即为取模操作,相对 % 效率要更高
if ((a = as[(n - 1) & h]) == null) {
if (cellsBusy == 0) {
Cell r = new Cell(x);
//可能会有多个线程执行了"new Cell(x)",因此需要进行CAS操作,避免线程安全的问题
//双重检查cellsBusy == 0,避免并发场景下重复赋值
if (cellsBusy == 0 && casCellsBusy()) {
boolean created = false;
try {
Cell[] rs; int m, j;
if ((rs = cells) != null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) {
rs[j] = r;//赋值到空槽位
created = true;
} finally {
cellsBusy = 0;//解锁
if (created) {
collide = false;
} else if (!wasUncontended) {
wasUncontended = true;
} else if (a.cas(v = a.value, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) {
} else if (n >= NCPU || cells != as) {
collide = false;
} else if (!collide) {
collide = true;
} else if (cellsBusy == 0 && casCellsBusy()) {
try {
if (cells == as) {
Cell[] rs = new Cell[n << 1];
for (int i = 0; i < n; ++i) {
rs[i] = as[i];
cells = rs;
} finally {
cellsBusy = 0;//解锁
collide = false;
h = advanceProbe(h);
} else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
boolean init = false;
try {
if (cells == as) {
Cell[] rs = new Cell[2];//创建长度为2的cells数组
rs[h & 1] = new Cell(x);//将累加值x随机存放到cell数组对应的索引下标位置
cells = rs;//再将创建的cell数组引用赋值到cells
init = true;
} finally {
cellsBusy = 0;//创建完cells数组后,解锁
if (init) {
} else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) {//尝试CAS累加
final boolean casCellsBusy() {
return UNSAFE.compareAndSwapInt(this, CELLSBUSY, 0, 1);
static final int getProbe() {
return UNSAFE.getInt(Thread.currentThread(), PROBE);
(5)LongAdder 的设计总结
一.分段 CAS 机制
把一个变量拆成多份变成多个变量,类似 JDK 1.7 的 ConcurrentHashMap 的分段锁。具体来说就是把一个 Long 型变量拆成一个 base 变量外加多个 Cell 变量,每个 Cell 变量包装了一个 Long 型变量。当多个线程并发累加时,如果并发度低就直接加到 base 变量上,如果并发度高就分散到 Cell 变量上。在最后取值时,再把 base 变量和这些 Cell 变量进行累加求和运算。
LongAddr 只能进行累加操作,并且初始值默认为 0。LongAccumulator 可以自定义一个二元操作符,而且可以传入一个初始值。
LongAddr 的 sum()方法并没有对 cell[ ]数组加锁,所以存在一边有线程对 cell[ ]数组求和、一边有线程修改数组的情况。类似于 ConcurrentHashMap 的 clear()方法,一边清空数据一边放入数据。
LongAddr 在定义 Cell 时,使用了注解 @Contended。这个注解可以用来进行缓存行填充,从而解决伪共享问题。
Cell[ ]数组的大小始终是 2 的整数次方,每次扩容都变为原来的 2 倍。
每个 CPU 都有自己的缓存,也就是高速缓存。CPU 缓存与主内存进行数据交换的基本单位叫缓存行。CPU 缓存是由若干个缓存行组成的,缓存行是 CPU 缓存的最小存储单位。在 64 位的 x86 架构中,每个缓存行是 64 字节,也就是 8 个 Long 型的大小。当 CPU 的缓存失效了需要从主内存刷新数据时,至少需要刷新 64 字节。
假设主内存的 Long 型变量 X、Y 已被 CPU1 和 CPU2 分别读入自己的缓存,且 Long 型变量 X、Y 在 CPU 缓存和主内存中都是放在同一行缓存行中的。这样当 CPU1 修改了变量 X,需要失效整个缓存行时,就会往总线发送消息,通知 CPU2 对应的缓存行失效。所以虽然 CPU2 并没有修改变量 Y,但也需要刷新变量 Y 所在的缓存行。这就是伪共享问题,缓存行上的不同变量,读 CPU 受到写 CPU 的影响。
17.LongAdder 的分段 CAS 优化心跳计数器
使用 LongAdder 替代 AtomicLong:
//private AtomicLong latestMinuteHeartbeatRate = new AtomicLong(0L);
private LongAdder latestMinuteHeartbeatRate = new LongAdder();
//return latestMinuteHeartbeatRate.get();
return latestMinuteHeartbeatRate.longValue();
//latestMinuteHeartbeatRate = new AtomicLong(0L);
latestMinuteHeartbeatRate = new LongAdder();
由于服务注册表的每一条数据并不是都会变化的,每隔 30 秒可能只有少数几个服务实例的数据会出现变化,所以并不需要每隔 30 秒就全量拉取服务注册表的所有数据。
否则,如果服务实例有几万个,那么服务注册表里对应有几万条数据。每 30 秒拉取几万条数据,将对网络开销、注册中心的性能产生巨大压力。
注册中心启动时,会先全量拉取一次服务注册表,然后每隔 30 秒增量拉取一次服务注册表。所以每隔 30 秒,拉取最近 30 秒变化的少量服务实例信息即可。
可以使用一个队列,队列里存放的就是最近 3 分钟有变化的服务实例。
public class ServiceRegistry {
public static final Long RECENTLY_CHANGED_ITEM_CHECK_INTERVAL = 3000L;
public static final Long RECENTLY_CHANGED_ITEM_EXPIRED = 3 * 60 * 1000L;
private static ServiceRegistry instance = new ServiceRegistry();
private Map<String, Map<String, ServiceInstance>> registry = new HashMap<String, Map<String, ServiceInstance>>();
private LinkedList<RecentlyChangedServiceInstance> recentlyChangedQueue = new LinkedList<RecentlyChangedServiceInstance>();
public static ServiceRegistry getInstance() {
return instance;
private ServiceRegistry() {
RecentlyChangedQueueMonitor recentlyChangedQueueMonitor = new RecentlyChangedQueueMonitor();
public synchronized void register(ServiceInstance serviceInstance) {
RecentlyChangedServiceInstance recentlyChangedItem = new RecentlyChangedServiceInstance(serviceInstance, System.currentTimeMillis(), ServiceInstanceOperation.REGISTER);
Map<String, ServiceInstance> serviceInstanceMap = registry.get(serviceInstance.getServiceName());
if (serviceInstanceMap == null) {
serviceInstanceMap = new HashMap<String, ServiceInstance>();
registry.put(serviceInstance.getServiceName(), serviceInstanceMap);
serviceInstanceMap.put(serviceInstance.getServiceInstanceId(), serviceInstance);
System.out.println("服务实例,完成注册......【" + serviceInstance + "】");
System.out.println("注册表:" + registry);
public synchronized ServiceInstance getServiceInstance(String serviceName, String serviceInstanceId) {
Map<String, ServiceInstance> serviceInstanceMap = registry.get(serviceName);
return serviceInstanceMap.get(serviceInstanceId);
public synchronized Map<String, Map<String, ServiceInstance>> getRegistry() {
return registry;
public synchronized void remove(String serviceName, String serviceInstanceId) {
System.out.println("服务实例从注册表中摘除[" + serviceName + ", " + serviceInstanceId + "]");
Map<String, ServiceInstance> serviceInstanceMap = registry.get(serviceName);
ServiceInstance serviceInstance = serviceInstanceMap.get(serviceInstanceId);
RecentlyChangedServiceInstance recentlyChangedItem = new RecentlyChangedServiceInstance(serviceInstance, System.currentTimeMillis(), ServiceInstanceOperation.REMOVE);
class RecentlyChangedServiceInstance {
ServiceInstance serviceInstance;
Long changedTimestamp;
String serviceInstanceOperation;
public RecentlyChangedServiceInstance(ServiceInstance serviceInstance, Long changedTimestamp, String serviceInstanceOperation) {
this.serviceInstance = serviceInstance;
this.changedTimestamp = changedTimestamp;
this.serviceInstanceOperation = serviceInstanceOperation;
class ServiceInstanceOperation {
public static final String REGISTER = "register";//注册
public static final String REMOVE = "REMOVE";//删除
class RecentlyChangedQueueMonitor extends Thread {
public void run() {
while(true) {
try {
synchronized(instance) {
RecentlyChangedServiceInstance recentlyChangedItem = null;
Long currentTimestamp = System.currentTimeMillis();
while ((recentlyChangedItem = recentlyChangedQueue.peek()) != null) {
if (currentTimestamp - recentlyChangedItem.changedTimestamp > RECENTLY_CHANGED_ITEM_EXPIRED) {
} catch (Exception e) {
public class RegisterServerController {
private ServiceRegistry registry = ServiceRegistry.getInstance();
public RegisterResponse register(RegisterRequest registerRequest) {
public HeartbeatResponse heartbeat(HeartbeatRequest heartbeatRequest) {
public Map<String, Map<String, ServiceInstance>> fetchFullServiceRegistry() {
return registry.getRegistry();
public LinkedList<RecentlyChangedServiceInstance> fetchDeltaServiceRegistry() {
return registry.getRecentlyChangedQueue();
public void cancel(String serviceName, String serviceInstanceId) {
public class ServiceRegistry {
public synchronized Map<String, Map<String, ServiceInstance>> getRegistry() {
return registry;
public synchronized LinkedList<RecentlyChangedServiceInstance> getRecentlyChangedQueue() {
return recentlyChangedQueue;
public class CachedServiceRegistry {
private FetchDeltaRegistryWorker fetchDeltaRegistryWorker;
public CachedServiceRegistry(RegisterClient registerClient, HttpSender httpSender) {
this.fetchDeltaRegistryWorker = new FetchDeltaRegistryWorker();
this.registerClient = registerClient;
this.httpSender = httpSender;
public void initialize() {
FetchFullRegistryWorker fetchFullRegistryWorker = new FetchFullRegistryWorker();
public void destroy() {
private class FetchFullRegistryWorker extends Thread {
public void run() {
registry = httpSender.fetchServiceRegistry();
private class FetchDeltaRegistryWorker extends Thread {
public void run() {
while(registerClient.isRunning()) {
try {
} catch (Exception e) {
public class HttpSender {
public Map<String, Map<String, ServiceInstance>> fetchServiceRegistry() {
Map<String, Map<String, ServiceInstance>> registry = new HashMap<String, Map<String, ServiceInstance>>();
ServiceInstance serviceInstance = new ServiceInstance();
Map<String, ServiceInstance> serviceInstances = new HashMap<String, ServiceInstance>();
serviceInstances.put("FINANCE-SERVICE-", serviceInstance);
registry.put("FINANCE-SERVICE", serviceInstances);
System.out.println("拉取注册表:" + registry);
return registry;
public LinkedList<RecentlyChangedServiceInstance> fetchDeltaServiceRegistry() {
LinkedList<RecentlyChangedServiceInstance> recentlyChangedQueue = new LinkedList<RecentlyChangedServiceInstance>();
ServiceInstance serviceInstance = new ServiceInstance();
RecentlyChangedServiceInstance recentlyChangedItem = new RecentlyChangedServiceInstance(serviceInstance, System.currentTimeMillis(), "register");
System.out.println("拉取增量注册表:" + recentlyChangedQueue);
return recentlyChangedQueue;
public class CachedServiceRegistry {
private class FetchDeltaRegistryWorker extends Thread {
public void run() {
while(registerClient.isRunning()) {
try {
LinkedList<RecentlyChangedServiceInstance> deltaRegistry = httpSender.fetchDeltaServiceRegistry();
synchronized(registry) {
} catch (Exception e) {
private void mergeDeltaRegistry(LinkedList<RecentlyChangedServiceInstance> deltaRegistry) {
for (RecentlyChangedServiceInstance recentlyChangedItem : deltaRegistry) {
if (ServiceInstanceOperation.REGISTER.equals(recentlyChangedItem.serviceInstanceOperation)) {
Map<String, ServiceInstance> serviceInstanceMap = registry.get(recentlyChangedItem.serviceInstance.getServiceName());
if (serviceInstanceMap == null) {
serviceInstanceMap = new HashMap<String, ServiceInstance>();
registry.put(recentlyChangedItem.serviceInstance.getServiceName(), serviceInstanceMap);
ServiceInstance serviceInstance = serviceInstanceMap.get(recentlyChangedItem.serviceInstance.getServiceInstanceId());
if (serviceInstance == null) {
serviceInstanceMap.put(recentlyChangedItem.serviceInstance.getServiceInstanceId(), recentlyChangedItem.serviceInstance);
else if (ServiceInstanceOperation.REMOVE.equals(recentlyChangedItem.serviceInstanceOperation)) {
Map<String, ServiceInstance> serviceInstanceMap = registry.get(recentlyChangedItem.serviceInstance.getServiceName());
if (serviceInstanceMap != null) {
private class FetchDeltaRegistryWorker extends Thread {
public void run() {
while(registerClient.isRunning()) {
try {
DeltaRegistry deltaRegistry = httpSender.fetchDeltaRegistry();
synchronized(registry) {
Long serverSideTotalCount = deltaRegistry.getServiceInstanceTotalCount();
Long clientSideTotalCount = 0L;
for (Map<String, ServiceInstance> serviceInstanceMap : registry.values()) {
clientSideTotalCount += serviceInstanceMap.size();
if (serverSideTotalCount != clientSideTotalCount) {
registry = httpSender.fetchFullRegistry();
} catch (Exception e) {
19.AtomicReference 优化客户端缓存注册表
多个线程同时对缓存的注册表信息进行修改时,必然存在并发冲突问题,此时可用 AtomicReference 的 CAS 操作来替代使用加重量级锁。
public class Applications {
private Map<String, Map<String, ServiceInstance>> registry = new HashMap<String, Map<String, ServiceInstance>>();
public Applications() {
public Applications(Map<String, Map<String, ServiceInstance>> registry) {
this.registry = registry;
public Map<String, Map<String, ServiceInstance>> getRegistry() {
return registry;
public void setRegistry(Map<String, Map<String, ServiceInstance>> registry) {
this.registry = registry;
public class CachedServiceRegistry {
private static final Long SERVICE_REGISTRY_FETCH_INTERVAL = 30 * 1000L;
private AtomicReference<Applications> applications = new AtomicReference<Applications>(new Applications());
private FetchDeltaRegistryWorker fetchDeltaRegistryWorker;
private RegisterClient registerClient;
private HttpSender httpSender;
public CachedServiceRegistry(RegisterClient registerClient, HttpSender httpSender) {
this.fetchDeltaRegistryWorker = new FetchDeltaRegistryWorker();
this.registerClient = registerClient;
this.httpSender = httpSender;
public void initialize() {
FetchFullRegistryWorker fetchFullRegistryWorker = new FetchFullRegistryWorker();
public void destroy() {
private class FetchFullRegistryWorker extends Thread {
public void run() {
Applications fetchedApplications = httpSender.fetchFullRegistry();
while (true) {
Applications expectedApplications = applications.get();
if (applications.compareAndSet(expectedApplications, fetchedApplications)) {
private class FetchDeltaRegistryWorker extends Thread {
public void run() {
while (registerClient.isRunning()) {
try {
DeltaRegistry deltaRegistry = httpSender.fetchDeltaRegistry();
} catch (Exception e) {
private void mergeDeltaRegistry(DeltaRegistry deltaRegistry) {
synchronized(applications) {
Map<String, Map<String, ServiceInstance>> registry = applications.get().getRegistry();
LinkedList<RecentlyChangedServiceInstance> recentlyChangedQueue = deltaRegistry.getRecentlyChangedQueue();
for (RecentlyChangedServiceInstance recentlyChangedItem : recentlyChangedQueue) {
String serviceName = recentlyChangedItem.serviceInstance.getServiceName();
String serviceInstanceId = recentlyChangedItem.serviceInstance.getServiceInstanceId();
if (ServiceInstanceOperation.REGISTER.equals(recentlyChangedItem.serviceInstanceOperation)) {
Map<String, ServiceInstance> serviceInstanceMap = registry.get(serviceName);
if (serviceInstanceMap == null) {
serviceInstanceMap = new HashMap<String, ServiceInstance>();
registry.put(serviceName, serviceInstanceMap);
ServiceInstance serviceInstance = serviceInstanceMap.get(serviceInstanceId);
if (serviceInstance == null) {
serviceInstanceMap.put(serviceInstanceId, recentlyChangedItem.serviceInstance);
else if (ServiceInstanceOperation.REMOVE.equals(recentlyChangedItem.serviceInstanceOperation)) {
Map<String, ServiceInstance> serviceInstanceMap = registry.get(serviceName);
if (serviceInstanceMap != null) {
private void reconcileRegistry(DeltaRegistry deltaRegistry) {
Map<String, Map<String, ServiceInstance>> registry = applications.get().getRegistry();
Long serverSideTotalCount = deltaRegistry.getServiceInstanceTotalCount();
Long clientSideTotalCount = 0L;
for (Map<String, ServiceInstance> serviceInstanceMap : registry.values()) {
clientSideTotalCount += serviceInstanceMap.size();
if (serverSideTotalCount != clientSideTotalCount) {
Applications fetchedApplications = httpSender.fetchFullRegistry();
while(true) {
Applications expectedApplications = applications.get();
if (applications.compareAndSet(expectedApplications, fetchedApplications)) {
class ServiceInstanceOperation {
public static final String REGISTER = "register";//注册
public static final String REMOVE = "REMOVE";//删除
public Map<String, Map<String, ServiceInstance>> getRegistry() {
return applications.get().getRegistry();
static class RecentlyChangedServiceInstance {
ServiceInstance serviceInstance;
Long changedTimestamp;
String serviceInstanceOperation;
public RecentlyChangedServiceInstance(ServiceInstance serviceInstance, Long changedTimestamp, String serviceInstanceOperation) {
this.serviceInstance = serviceInstance;
this.changedTimestamp = changedTimestamp;
this.serviceInstanceOperation = serviceInstanceOperation;
public String toString() {
return"RecentlyChangedServiceInstance [serviceInstance=" + serviceInstance + ", changedTimestamp=" + changedTimestamp + ", serviceInstanceOperation=" + serviceInstanceOperation + "]";
20.AtomicStampedReference 解决 ABA 问题
public class CachedServiceRegistry {
private AtomicStampedReference<Applications> applications;
public CachedServiceRegistry(RegisterClient registerClient, HttpSender httpSender) {
this.fetchDeltaRegistryWorker = new FetchDeltaRegistryWorker();
this.registerClient = registerClient;
this.httpSender = httpSender;
this.applications = new AtomicStampedReference<Applications>(new Applications(), 0);
private class FetchFullRegistryWorker extends Thread {
public void run() {
Applications fetchedApplications = httpSender.fetchFullRegistry();
while(true) {
Applications expectedApplications = applications.getReference();
int expectedStamp = applications.getStamp();
if (applications.compareAndSet(expectedApplications, fetchedApplications, expectedStamp, expectedStamp + 1)) {
21.AtomicLong 多线程拉取注册表版本不错乱
前面使用 AtomicReference 来解决多线程并发赋值时的原子性问题,下面使用 AtomicLong 来解决多线程并发拉注册表时可能的版本混乱问题。
public class CachedServiceRegistry {
private class FetchFullRegistryWorker extends Thread {
public void run() {
(2)AtomicLong 解决注册表版本错乱问题
public class CachedServiceRegistry {
private AtomicLong applicationsVersion = new AtomicLong(0L);
private void fetchFullRegistry() {
Long expectedVersion = applicationsVersion.get(); // version = 0
Applications fetchedApplications = httpSender.fetchFullRegistry();
if (applicationsVersion.compareAndSet(expectedVersion, expectedVersion + 1)) {
while (true) {
Applications expectedApplications = applications.getReference();
int expectedStamp = applications.getStamp();
if (applications.compareAndSet(expectedApplications, fetchedApplications, expectedStamp, expectedStamp + 1)) {
private class FetchDeltaRegistryWorker extends Thread {
public void run() {
while(registerClient.isRunning()) {
try {
Long expectedVersion = applicationsVersion.get();
if (applicationsVersion.compareAndSet(expectedVersion, expectedVersion + 1)) {
DeltaRegistry deltaRegistry = httpSender.fetchDeltaRegistry();
} catch (Exception e) {