12.基于 volatile 优化微服务的优雅关闭机制
//在微服务上被创建和启动,负责和register-server进行通信
public class RegisterClient {
...
//服务实例是否在运行
private volatile Boolean isRunning;
public RegisterClient() {
this.serviceInstanceId = UUID.randomUUID().toString().replace("-", "");
this.httpSender = new HttpSender();
this.heartbeatWorker = new HeartbeatWorker();
this.isRunning = true;
}
//启动RegisterClient组件
public void start() {
try {
RegisterWorker registerWorker = new RegisterWorker();
registerWorker.start();
registerWorker.join();
heartbeatWorker.start();
} catch (Exception e) {
e.printStackTrace();
}
}
//停止RegisterClient组件
public void shutdown() {
this.isRunning = false;
this.heartbeatWorker.interrupt();
}
//服务注册线程
private class RegisterWorker extends Thread {
...
}
//心跳线程
private class HeartbeatWorker extends Thread {
@Override
public void run() {
//如果注册成功,就进入while true死循环
HeartbeatRequest heartbeatRequest = new HeartbeatRequest();
heartbeatRequest.setServiceName(SERVICE_NAME);
heartbeatRequest.setServiceInstanceId(serviceInstanceId);
HeartbeatResponse heartbeatResponse = null;
while(isRunning) {
try {
heartbeatResponse = httpSender.heartbeat(heartbeatRequest);
System.out.println("心跳的结果为:" + heartbeatResponse.getStatus() + "......");
Thread.sleep(HEARTBEAT_INTERVAL);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
复制代码
13.基于 volatile 优化微服务存活状态检查机制
private class Lease {
//最近一次心跳的时间
private volatile Long latestHeartbeatTime = System.currentTimeMillis();
//续约,只要微服务发送一次心跳,就相当于维护了register-client和register-server之间的一个契约
//进行续约的意思就是表明还存活着,分布式系统中大都有契约机制
public void renew() {
this.latestHeartbeatTime = System.currentTimeMillis();
System.out.println("服务实例[" + serviceInstanceId + "],进行续约:" + latestHeartbeatTime);
}
//判断当前服务实例的契约是否还存活
public Boolean isAlive() {
Long currentTime = System.currentTimeMillis();
if (currentTime - latestHeartbeatTime > NOT_ALIVE_PERIOD) {
System.out.println("服务实例[" + serviceInstanceId + "],不再存活");
return false;
}
System.out.println("服务实例[" + serviceInstanceId + "],保持存活");
return true;
}
}
复制代码
14.i++的多线程安全问题演示
多个线程对一个共享数据并发写,可能会导致数据出错,这就是原子性问题。
public class ThreadUnsafeDemo {
private static int data = 0;
public static void main(String[] args) throws Exception {
Thread thread1 = new Thread() {
@Override
public void run() {
for (int i = 0; i < 10000; i++) {
ThreadUnsafeDemo.data++;
System.out.println("thread1:" + data);
}
}
};
Thread thread2 = new Thread() {
@Override
public void run() {
for (int i = 0; i < 10000; i++) {
ThreadUnsafeDemo.data++;
System.out.println("thread2:" + data);
}
}
};
thread1.start();
thread2.start();
thread1.join();
thread2.join();
//最后的结果未必是20000
System.out.println(data);
}
}
复制代码
15.JMM 是多线程并发安全问题的根本原因
多线程并发写一个共享变量会出现问题的根本原因是 Java 内存模型 JMM。
在 Java 内存模型下,多个线程并发执行时,每个线程(一般对应一个 CPU)都会有自己的工作内存,每个线程读写数据时,线程对应的 CPU 会从主内存获取数据到本地进行缓存。
volatile 是无法保证原子性的,因为 volatile 的底层机制是:Lock 前缀指令 + MESI 缓存一致性协议。某线程修改变量时会刷主内存,并使其他线程工作内存的该变量缓存过期。
16.synchronized 可解决多线程并发安全问题
public class ThreadUnsafeDemo {
private static int data = 0;
public static void main(String[] args) throws Exception {
Thread thread1 = new Thread() {
@Override
public void run() {
for (int i = 0; i < 10000; i++) {
increment();
}
}
};
Thread thread2 = new Thread() {
@Override
public void run() {
for (int i = 0; i < 10000; i++) {
increment();
}
}
};
thread1.start();
thread2.start();
thread1.join();
thread2.join();
//最后的结果未必是20000
System.out.println(data);
}
private synchronized static void increment() {
ThreadUnsafeDemo.data++;
System.out.println("thread1:" + data);
}
}
复制代码
17.synchronized 的常见使用方法总结
(1)加类锁
如果 synchronized 一个静态方法,就是对这个类的 Class 对象加锁。如果 synchronized(类.class),也是对这个类的 Class 对象加锁。同一时间只有一个线程可以访问同一个类的 synchronized 方法。注意:每个类都会对应一个 Class 对象。
public class ThreadUnsafeDemo {
private static int data = 0;
public static void main(String[] args) throws Exception {
Thread thread1 = new Thread() {
public void run() {
for (int i = 0; i < 10000; i++) {
increment();
}
}
};
Thread thread2 = new Thread() {
public void run() {
for (int i = 0; i < 10000; i++) {
increment();
}
}
};
thread1.start();
thread2.start();
thread1.join();
thread2.join();
//最后的结果未必是20000
System.out.println(data);
}
private synchronized static void increment() {
ThreadUnsafeDemo.data++;
System.out.println("thread1:" + data);
}
}
//synchronized一个类的静态方法,等价于synchronized(该类.class)
synchronized(MyObject.class) {
}
复制代码
(2)加对象锁
如果 synchronized 一个普通的方法,那么就是对当前的对象实例加锁。同一时间只有一个线程可以访问同一个对象实例的 synchronized 方法。注意:synchronized 一个代码片段的常见写法,就是 synchronized(this),意思就是基于当前这个对象实例来进行加锁。
public class ThreadUnsafeDemo {
private static int data = 0;
public static void main(String[] args) throws Exception {
final ThreadUnsafeDemo demo = new ThreadUnsafeDemo();
Thread thread1 = new Thread() {
public void run() {
for (int i = 0; i < 10000; i++) {
demo.increment();
}
}
};
Thread thread2 = new Thread() {
public void run() {
for (int i = 0; i < 10000; i++) {
demo.increment();
}
}
};
thread1.start();
thread2.start();
thread1.join();
thread2.join();
//最后的结果未必是20000
System.out.println(data);
}
private synchronized void increment() {
ThreadUnsafeDemo.data++;
System.out.println("thread1:" + data);
}
}
或者
public class ThreadUnsafeDemo {
private static int data = 0;
public static void main(String[] args) throws Exception {
final ThreadUnsafeDemo demo = new ThreadUnsafeDemo();
Thread thread1 = new Thread() {
public void run() {
for (int i = 0; i < 10000; i++) {
synchronized(demo) {
ThreadUnsafeDemo.data++;
System.out.println("thread1:" + data);
}
}
}
};
Thread thread2 = new Thread() {
public void run() {
for (int i = 0; i < 10000; i++) {
synchronized(demo) {
ThreadUnsafeDemo.data++;
System.out.println("thread1:" + data);
}
}
}
};
thread1.start();
thread2.start();
thread1.join();
thread2.join();
//最后的结果未必是20000
System.out.println(data);
}
}
//对对象实例的方法加锁
private synchronized void increment() {
ThreadUnsafeDemo.data++;
System.out.println("thread1:" + data);
}
//等价于synchronized(this)
private void increment() {
synchronized(this) {
ThreadUnsafeDemo.data++;
System.out.println("thread1:" + data);
}
}
//等价于synchronized(对象实例)
synchronized(myObject) {
}
复制代码
(3)类锁和对象锁的区别
synchronized 锁分两种:一是对某个对象加锁,二是对某个类加锁。对类加锁其实也是在对一个对象加锁,只不过是对类的 Class 对象加锁。
类是在 JVM 启动过程中加载的,每个.class 文件被装载后会产生一个 Class 对象,每个.class 文件产生的 Class 对象在 JVM 进程中是全局唯一的。static 修饰的成员变量和方法,它们的生命周期都是属于类级别的,它们随着类的定义被分配和装载到内存,随着类被卸载而回收。
实例对象的生命周期伴随着实例对象的创建而开始,同时伴随着实例对象的回收而结束。
因此,类锁和对象锁的最大区别就是:锁的生命周期不同。
18.synchronized 的底层原理
(1)锁信息的存储
一个 Java 对象的存储结构由三部分组成:对象头、实例数据、对齐填充。其中对象头也由三部分组成:Mark Word、Klass Pointer、Length,而 Mark Word 会记录该对象的 HashCode、分代年龄和锁标记位;
(2)锁的四种状态
为了减少获得锁和释放锁带来的性能损耗,JDK 1.6 引入了偏向锁和轻量级锁。锁一共有 4 种状态:无锁状态、偏向锁状态、轻量级锁状态、重量级锁状态,这几个状态会随着竞争情况逐渐升级。锁可以升级但不能降级,这意味着偏向锁升级为轻量级锁后不能降级回偏向锁。
(3)什么是偏向锁
一.偏向锁的介绍
大多数情况下,锁不仅不存在多线程竞争,而且总会由同一线程多次获得。所以,为了让线程获得锁的代价更低,便引入了偏向锁。
偏向锁可以认为是在没有多线程竞争的情况下,访问同步块的加锁场景。也就是在单线程执行同步块的情况下,就没有必要使用重量级锁了。为了提升性能,没必要基于操作系统级别的 Mutex Lock 来实现锁的抢占。
偏向锁的作用是:线程在没有线程竞争的情况下去访问同步块代码时,会先尝试通过偏向锁来抢占访问资格,这个抢占过程是基于 CAS 来完成的。如果抢占锁成功,则直接修改对象头中的 Mark Word 信息。也就是修改偏向锁标记为 1、锁标记为 01,以及存储当前获得锁的线程 ID。
偏向的意思是:如果线程 X 获得了偏向锁,当线程 X 后续再访问这个同步块时,就会判断出对象头中的线程 ID 和线程 X 相等,于是就不需要再次抢占锁了。
二.偏向锁的实现流程
三.偏向锁的实现原理
偏向锁的实现原理就是使用 CAS 来设置对象头中的线程 ID。如果成功则获得偏向锁,如果失败则升级到轻量级锁。
(4)什么是轻量级锁
一.轻量级锁的介绍
如果没有线程竞争,使用偏向锁能够在不影响性能的前提下获得锁。如果有多个线程并发访问同步块,那么没抢占到锁的线程只能进行阻塞等待。但在使用重量级锁阻塞等待前,还有更好的平衡方案,也就是使用轻量级锁。
所谓的轻量级锁,就是没有抢占到锁的线程,进行一定次数的自旋重试。如果线程在重试过程中抢占到了锁,那么这个线程就不需要阻塞了。
如果持有锁的线程占用锁的时间比较短,则自旋带来的性能提高会比较明显。如果持有锁的线程占用锁的时间比较长,则自旋就会浪费 CPU 资源。所以线程通过自旋来重试抢占锁的次数必须要有一个限制。
为了优化自旋,JDK 还引入了自适应自旋锁。如果在一个锁对象中,通过自旋获得锁很少成功,则 JVM 会缩短自旋次数。否则,JVM 可能会增加自旋次数。
二.轻量级锁的实现流程
三.轻量级锁的实现原理
如果偏向锁存在竞争或者偏向锁未开启,那么当线程访问同步块代码时就会通过轻量级锁来抢占锁资源。轻量级锁的原理就是,通过 CAS 来修改锁对象中指向 Lock Record 的指针。
其中偏向锁存在竞争指的是,还没有线程通过 CAS 设置对象头的线程 ID。此时多个线程会同时执行 CAS 尝试获取偏向锁,失败的就升级轻量级锁。
而偏向锁未开启指的是,已有线程成功通过 CAS 设置了对象头的线程 ID。此时多个线程同时访问同步块代码,就会直接升级轻量级锁。
(5)什么是重量级锁
轻量级锁能够通过一定次数的重试,让每一个没获得锁的线程有可能抢占到锁。但轻量级锁只有在获得锁的线程持有锁的时间比较短的情况下才能提升性能。如果持有锁的线程占用锁的时间较长,那么不能让没抢到锁的线程一直自旋。
如果没抢到锁的线程通过一定次数的自旋后,发现仍然没有获得锁。那么就只能升级到重量级锁,来进行阻塞等待了。
重量级锁的本质是:没有获得锁的线程会通过 park()方法挂起,接着被获得锁的线程通过 unpark()方法唤醒后再次抢占锁,直到抢占成功。
重量级锁依赖于底层操作系统的 Mutex Lock 来实现。使用 Mutex Lock 时需要从用户态切换到内核态,才能将当前线程挂起,所以性能开销比较大。
从偏向锁到轻量级锁再到重量级锁,整个优化过程其实使用了乐观锁的思想。
(6)锁升级的流程
一.锁升级的流程
当一个线程访问使用了 synchronized 修饰的代码块时,如果当前还没有线程获得偏向锁,则先通过 CAS 尝试获得偏向锁。如果当前已有线程获得偏向锁,则尝试升级到轻量级锁去抢占锁。轻量级锁就是通过多次 CAS(也就是自旋)来完成的。如果线程通过多次自旋仍然无法获得锁,那么就只能升级到重量级锁进行阻塞等待。
二.偏向锁和轻量级锁的区别
偏向锁只能保证偏向同一个线程,只要有线程获得过偏向锁,那么当其他线程抢占锁时,只能通过轻量级锁来实现,除非触发了重新偏向。如果获得轻量级锁的线程在后续的 20 次访问中,发现每次访问锁的线程都是同一个,那么就会触发重新偏向。
轻量级锁可以灵活释放。如果线程 1 抢占了轻量级锁,那么在锁用完并释放后,线程 2 可以继续通过轻量级锁来抢占锁资源。
偏向锁,就是在一段时间内只会由同一个线程来获得和释放锁,加锁的方式是把线程 ID 保存到锁对象的 Mark Word 中。
轻量级锁,就是在一段时间内可能会由多个线程来获得和释放锁。存在锁交替竞争的场景,但在同一时刻不会有多个线程同时获得锁。加锁的方式是首先在每个线程的栈帧中分配一个 Lock Record,然后把锁对象中的 Mark Word 拷贝到 Lock Record 中,最后把锁对象的 Mark Word 的指针指向 Lock Record。
(7)锁膨胀的流程
一.获取重量级锁之前的锁膨胀
如果线程在运行 synchronized 修饰的同步块代码时,发现锁状态是轻量级锁并且有其他线程抢占了锁资源,那么该线程就会触发锁膨胀升级到重量级锁。
在获取重量级锁之前会先实现锁膨胀,锁膨胀时首先会创建一个 ObjectMonitor 对象,然后把 ObjectMonitor 对象的指针保存到锁对象的 Mark Word 中。
重量级锁的实现是在 ObjectMonitor 中完成的,所以锁膨胀的意义就是构建一个 ObjectMonitor 对象。
二.ObjectMonitor 对象的重要字段
_owner:保存当前持有锁的线程
_cxq:没有获得锁的线程队列
_waitset:被 wait()方法阻塞的线程队列
_recursions:锁被重入的次数
三.重量级锁的获取流程
重量级锁的竞争都是在 ObjectMonitor 对象中完成的。首先判断当前线程是否是重入,如果是则重入次数 + 1。
然后通过 CAS 自旋来判断 ObjectMonitor 中的_owner 字段是否为空。如果为空,则表示重量级锁已经被释放,当前线程可以获得锁。如果不为空,就继续进行自适应自旋重试。
最后如果通过自旋竞争锁失败,那么就把当前线程构建成一个 ObjectWaiter 结点,插入到 ObjectMonitor 的_cxq 队列的队头,然后再调用 park()方法阻塞当前线程。
四.重量级锁的释放流程
首先把 ObjectMonitor 的_owner 字段设置为 null,然后从 ObjectMonitor 的_cxq 队列中调用 unpark()方法唤醒一个阻塞的线程。被唤醒的线程会重新竞争重量级锁,如果没抢到,则继续阻塞等待。因为 synchronized 是非公平锁,被唤醒的线程不一定能重新抢占到锁。
(8)synchronized 的 lock 和 unlock 操作规定保证原子性 + 可见性 + 有序性
synchronized 是由 monitorenter 和 monitorexit 这两条字节码指令来实现的。可以理解为 monitorenter 指令对应了重量级锁的获取流程,monitorexit 指令对应了重量级锁的释放流程。
这两个字节码指令最终又会在内存间的交互时,使用 lock 操作和 unlock 操作。通过 lock 操作和 unlock 操作的语义来实现 synchronized 的原子性。
lcok 操作前需要从主内存同步最新值到工作内存,unlock 操作前会将工作内存上的值刷新回主内存,这样 lock 操作和 unlock 操作就实现了 synchronized 的可见性。
此外还规定,同一时刻只有一条线程可以进行 lock 操作,这样就实现了 synchronized 的有序性。
(9)内核态和用户态说明
在重量级锁中,线程的阻塞和唤醒是通过 park()方法和 unpark()方法来完成的,park()方法和 unpark()方法需要通过系统调用来完成。
由于系统调用是在内核态中运行的,所以进行系统调用时,系统需要从用户态切换到内核态。系统从用户态切换到内核态的这个过程会产生性能损耗。在切换之前需要保存用户态的状态,包括寄存器、程序指令等。然后才能执行内核态的系统调用指令,最后还要恢复用户态。
用户态和内核态表示的是操作系统中的不同执行权限。两者最大的区别在于:运行在用户空间中的进程不能直接访问操作系统内核的指令和程序,运行在内核空间中的进程可以直接访问操作系统内核的指令和程序。进行权限划分是为了避免用户在进程中直接操作危险的系统指令,从而影响进程和系统的稳定。
19.微服务定时拉取注册表信息
(1)HTTP 请求组件增加拉取服务注册表信息
//负责发送各种HTTP请求的组件
public class HttpSender {
//发送注册请求
public RegisterResponse register(RegisterRequest request) {
//实际可能会基于类似HttpClient这种开源的网络包来进行发送
System.out.println("服务实例[" + request + "],发送请求进行注册......");
//收到register-server响应后,封装一个Response对象
RegisterResponse response = new RegisterResponse();
response.setStatus(RegisterResponse.SUCCESS);
return response;
}
//发送心跳请求
public HeartbeatResponse heartbeat(HeartbeatRequest request) {
System.out.println("服务实例[" + request + "],发送请求进行心跳......");
HeartbeatResponse response = new HeartbeatResponse();
response.setStatus(RegisterResponse.SUCCESS);
return response;
}
//模拟拉取服务注册表信息
public Map<String, Map<String, ServiceInstance>> fetchServiceRegistry() {
Map<String, Map<String, ServiceInstance>> registry = new HashMap<String, Map<String, ServiceInstance>>();
ServiceInstance serviceInstance = new ServiceInstance();
serviceInstance.setHostname("finance-service-01");
serviceInstance.setIp("192.168.31.1207");
serviceInstance.setPort(9000);
serviceInstance.setServiceInstanceId("FINANCE-SERVICE-192.168.31.207:9000");
serviceInstance.setServiceName("FINANCE-SERVICE");
Map<String, ServiceInstance> serviceInstances = new HashMap<String, ServiceInstance>();
serviceInstances.put("FINANCE-SERVICE-192.168.31.207:9000", serviceInstance);
registry.put("FINANCE-SERVICE", serviceInstances);
System.out.println("拉取注册表:" + registry);
return registry;
}
}
复制代码
(2)每隔 30 秒定时去服务端拉取注册表信息
//服务注册中心的客户端,缓存的一个服务注册表
public class ClientCachedServiceRegistry {
//拉取服务注册表的间隔时间:30s
private static final Long SERVICE_REGISTRY_FETCH_INTERVAL = 30 * 1000L;
//客户端缓存的服务注册表
private Map<String, Map<String, ServiceInstance>> registry = new HashMap<String, Map<String, ServiceInstance>>();
//负责定时拉取注册表到客户端进行缓存的后台线程
private Daemon daemon;
//RegisterClient
private RegisterClient registerClient;
//HTTP通信组件
private HttpSender httpSender;
//构造方法
public ClientCachedServiceRegistry(RegisterClient registerClient, HttpSender httpSender) {
this.daemon = new Daemon();
this.registerClient = registerClient;
this.httpSender = httpSender;
}
//初始化
public void initialize() {
this.daemon.start();
}
//销毁这个组件
public void destroy() {
this.daemon.interrupt();
}
//获取服务注册表
public Map<String, Map<String, ServiceInstance>> getRegistry() {
return registry;
}
//负责定时拉取注册表信息到本地缓存
private class Daemon extends Thread {
@Override
public void run() {
while(registerClient.isRunning()) {
try {
registry = httpSender.fetchServiceRegistry();
Thread.sleep(SERVICE_REGISTRY_FETCH_INTERVAL);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
复制代码
(3)添加客户端缓存的注册表到 register-client
//在服务上被创建和启动,负责跟register-server进行通信
public class RegisterClient {
public static final String SERVICE_NAME = "inventory-service";
public static final String IP = "192.168.31.207";
public static final String HOSTNAME = "inventory01";
public static final int PORT = 9000;
private static final Long HEARTBEAT_INTERVAL = 30 * 1000L;
//服务实例id
private String serviceInstanceId;
//HTTP通信组件
private HttpSender httpSender;
//心跳线程
private HeartbeatWorker heartbeatWorker;
//服务实例是否在运行
private volatile Boolean isRunning;
//客户端缓存的注册表
private ClientCachedServiceRegistry registry;
//构造方法
public RegisterClient() {
this.serviceInstanceId = UUID.randomUUID().toString().replace("-", "");
this.httpSender = new HttpSender();
this.heartbeatWorker = new HeartbeatWorker();
this.isRunning = true;
this.registry = new ClientCachedServiceRegistry(this, httpSender);
}
//启动RegisterClient组件
public void start() {
try {
//这个线程刚启动时,首先需要完成注册
//完成注册后,就会进入一个while true循环,每隔30秒发送一个心跳请求
RegisterWorker registerWorker = new RegisterWorker();
registerWorker.start();
registerWorker.join();
//启动心跳线程,定时发送心跳请求
heartbeatWorker.start();
//初始化客户端缓存的服务注册表组件
this.registry.initialize();
} catch (Exception e) {
e.printStackTrace();
}
}
//停止RegisterClient组件
public void shutdown() {
this.isRunning = false;
this.heartbeatWorker.interrupt();
this.registry.destroy();
}
//返回RegisterClient是否正在运行
public Boolean isRunning() {
return isRunning;
}
//服务注册线程
private class RegisterWorker extends Thread {
@Override
public void run() {
RegisterRequest registerRequest = new RegisterRequest();
registerRequest.setServiceName(SERVICE_NAME);
registerRequest.setIp(IP);
registerRequest.setHostname(HOSTNAME);
registerRequest.setPort(PORT);
registerRequest.setServiceInstanceId(serviceInstanceId);
RegisterResponse registerResponse = httpSender.register(registerRequest);
System.out.println("服务注册的结果是:" + registerResponse.getStatus() + "......");
}
}
//心跳线程
private class HeartbeatWorker extends Thread {
@Override
public void run() {
//如果注册成功,就进入while true循环
HeartbeatRequest heartbeatRequest = new HeartbeatRequest();
heartbeatRequest.setServiceName(SERVICE_NAME);
heartbeatRequest.setServiceInstanceId(serviceInstanceId);
HeartbeatResponse heartbeatResponse = null;
while(isRunning) {
try {
heartbeatResponse = httpSender.heartbeat(heartbeatRequest);
System.out.println("心跳的结果为:" + heartbeatResponse.getStatus() + "......");
Thread.sleep(HEARTBEAT_INTERVAL);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
复制代码
20.基于 synchronized 解决注册表并发问题
//服务注册表
public class ServiceRegistry {
//注册表是一个单例
private static ServiceRegistry instance = new ServiceRegistry();
private ServiceRegistry() {
}
//核心的内存数据结构:注册表
//Map<String, ServiceInstance>:key是服务名称,比如库存服务,value是这个服务的所有的服务实例,比如提供库存服务的2台机器
private Map<String, Map<String, ServiceInstance>> registry = new HashMap<String, Map<String, ServiceInstance>>();
//服务注册
public synchronized void register(ServiceInstance serviceInstance) {
Map<String, ServiceInstance> serviceInstanceMap = registry.get(serviceInstance.getServiceName());
if (serviceInstanceMap == null) {
serviceInstanceMap = new HashMap<String, ServiceInstance>();
registry.put(serviceInstance.getServiceName(), serviceInstanceMap);
}
serviceInstanceMap.put(serviceInstance.getServiceInstanceId(), serviceInstance);
System.out.println("服务实例[" + serviceInstance + "],完成注册......");
System.out.println("注册表:" + registry);
}
//获取服务实例
public synchronized ServiceInstance getServiceInstance(String serviceName, String serviceInstanceId) {
Map<String, ServiceInstance> serviceInstanceMap = registry.get(serviceName);
return serviceInstanceMap.get(serviceInstanceId);
}
//获取整个注册表
public synchronized Map<String, Map<String, ServiceInstance>> getRegistry() {
return registry;
}
//从注册表中删除一个服务实例
public synchronized void remove(String serviceName, String serviceInstanceId) {
System.out.println("服务实例[" + serviceInstanceId + "],从注册表中进行摘除");
Map<String, ServiceInstance> serviceInstanceMap = registry.get(serviceName);
serviceInstanceMap.remove(serviceInstanceId);
}
//获取服务注册表实例
public static ServiceRegistry getInstance() {
return instance;
}
}
复制代码
由于 ConcurrentHashMap 是线程安全的数据结构,所以可以使用 ConcurrentHashMap 来代替 synchronized 关键字。通过 ConcurrentHashMap 的分段加锁机制,能更好地支持并发。
21.微服务注册中心的自我保护机制
(1)为什么要引入自我保护机制
对于单点的微服务注册中心,一般需要有一个自我保护的机制。如果 register-server 出现网络故障,导致大量服务实例没办法发送心跳,这时候就不能直接把所有的服务实例都给摘除掉。因为这时候是服务端自己出现问题,客户端服务是正常的,所以就需要引入自我保护的机制了。
(2)register-server 什么情况会判断是自己问题导致无法接收心跳请求
可以设定一个比例,比如 25%。如果 ServiceAliveMonitor 发现超过 25%的服务实例的心跳没及时更新,那么就可以认为是 register-server 网络故障而导致接收不到心跳请求。于是就开始自动进入自我保护机制,不再摘除任何的服务实例。从而避免 register-server 在自己网络故障的情况下,一下子就摘除大量服务实例,导致注册表的数据出现严重丢失。
如果后续发现已有超过 75%的服务实例已恢复发送心跳请求,此时 ServiceAliveMonitor 可以退出自我保护的状态,并继续检查某个服务实例的心跳是否超过 90 秒还没更新,如果是则认为这个服务实例已宕机,从注册表中进行摘除。
(3)为了实现自我保护机制需要收集心跳总次数
比如有 10 个服务实例,每分钟应该有 20 次心跳,但某一分钟只收到 8 次心跳。此时发现 8 < 20 * 0.75,于是有超过 25%的服务实例的心跳没有正常发送。此时就可以认为是 register-server 自己网络故障,从而触发自我保护机制。此时不再摘除任何服务实例,避免注册表的数据出现问题。
如果某一分钟收到的心跳次数达到了 18 次,18 > 20 * 0.85,此时就可以认为网络恢复了正常,于是就退出自我保护机制,继续检查服务实例的心跳是否在 90 秒内更新过。如果没更新过,就摘除这个故障的服务实例。
总结:为了实现自我保护机制,register-server 需要记录每分钟接收多少心跳请求。所以 ServiceAliveMonitor 线程每次尝试摘除服务实例时,都会检查上一分钟的心跳次数是否满足超 75%的服务实例都正常。如果不满足,就进入自我保护机制,避免摘除大量的服务实例。
22.基于 synchronized 实现服务心跳计数器
(1)服务端接收到心跳请求时增加记录心跳次数
public class RegisterServerController {
private ServiceRegistry registry = ServiceRegistry.getInstance();
//处理发送过来的服务注册请求
public RegisterResponse register(RegisterRequest registerRequest) {
RegisterResponse registerResponse = new RegisterResponse();
try {
ServiceInstance serviceInstance = new ServiceInstance();
serviceInstance.setHostname(registerRequest.getHostname());
serviceInstance.setIp(registerRequest.getIp());
serviceInstance.setPort(registerRequest.getPort());
serviceInstance.setServiceInstanceId(registerRequest.getServiceInstanceId());
serviceInstance.setServiceName(registerRequest.getServiceName());
registry.register(serviceInstance);
registerResponse.setStatus(RegisterResponse.SUCCESS);
} catch (Exception e) {
e.printStackTrace();
registerResponse.setStatus(RegisterResponse.FAILURE);
}
return registerResponse;
}
//处理发送过来的心跳请求
public HeartbeatResponse heartbeat(HeartbeatRequest heartbeatRequest) {
HeartbeatResponse heartbeatResponse = new HeartbeatResponse();
try {
//对服务实例进行续约
ServiceInstance serviceInstance = registry.getServiceInstance(
heartbeatRequest.getServiceName(),
heartbeatRequest.getServiceInstanceId());
serviceInstance.renew();
//记录每分钟的心跳次数
HeartbeatMessuredRate heartbeatMeasuredRate = new HeartbeatMeasuredRate();
heartbeatMeasuredRate.increment();
heartbeatResponse.setStatus(HeartbeatResponse.SUCCESS);
} catch (Exception e) {
e.printStackTrace();
heartbeatResponse.setStatus(HeartbeatResponse.FAILURE);
}
return heartbeatResponse;
}
//拉取服务注册表信息
public Map<String, Map<String, ServiceInstance>> fetchServiceRegistry() {
return registry.getRegistry();
}
}
复制代码
(2)增加一个心跳请求计数器
//心跳请求计数器
public class HeartbeatMeasuredRate {
//单例实例
private static HeartbeatMeasuredRate instance = new HeartbeatMeasuredRate();
//最近一分钟的心跳次数
private long latestMinuteHeartbeatRate = 0L;
//最近一分钟的时间戳
private long latestMinuteTimestamp = System.currentTimeMillis();
//获取单例实例
public static HeartbeatMeasuredRate getInstance() {
return instance;
}
//增加最近一分钟的心跳次数
public synchronized void increment() {
long currentTime = System.currentTimeMillis();
if (currentTime - latestMinuteTimestamp > 60 * 1000) {
latestMinuteHeartbeatRate = 0L;
this.latestMinuteTimestamp = System.currentTimeMillis();
}
latestMinuteHeartbeatRate++;
}
//获取最近一分钟的心跳次数
public synchronized long get() {
return latestMinuteHeartbeatRate;
}
}
复制代码
可以使用 Atomic 来代替 synchronized 关键字来优化这个心跳计数器。
23.微服务关闭时的服务下线实现
(1)register-client 的 HttpSender 组件增加服务下线接口
服务关闭时,需要发送一个请求给 register-server,通知服务下线了。
//负责发送各种HTTP请求的组件
public class HttpSender {
//发送注册请求
public RegisterResponse register(RegisterRequest request) {
...
}
//发送心跳请求
public HeartbeatResponse heartbeat(HeartbeatRequest request) {
...
}
//拉取服务注册表
public Map<String, Map<String, ServiceInstance>> fetchServiceRegistry() {
...
}
//服务下线
public void cancel(String serviceName, String serviceInstanceId) {
System.out.println("服务实例下线[" + serviceName + ", " + serviceInstanceId + "]");
}
}
复制代码
(2)register-server 的 Controller 增加对服务下线请求的处理
public class RegisterServerController {
private ServiceRegistry registry = ServiceRegistry.getInstance();
//服务注册
public RegisterResponse register(RegisterRequest registerRequest) {
...
}
//发送心跳
public HeartbeatResponse heartbeat(HeartbeatRequest heartbeatRequest) {
...
}
//拉取服务注册表信息
public Map<String, Map<String, ServiceInstance>> fetchServiceRegistry() {
return registry.getRegistry();
}
//服务下线
public void cancel(String serviceName, String serviceInstanceId) {
registry.remove(serviceName, serviceInstanceId);
}
}
复制代码
24.基于 synchronized 修改触发自我保护阈值
目前已经可以记录 register-server 每分钟收到的心跳请求次数,但还需要知道每分钟收到多少次心跳才不会触发自我保护机制。假设每个服务实例每分钟会发送 2 个心跳请求给 register-server。那么当注册一个服务实例时,要修改触发自我保护机制的阈值,比如加 2。当摘除一个服务实例或某个服务实例下线时,也要修改该阈值,比如减 2。
(1)新增一个自我保护机制的类
//自我保护机制的类
public class SelfProtectionPolicy {
private static SelfProtectionPolicy instance = new SelfProtectionPolicy();
//期望的心跳次数,如果有10个服务实例,这个数值就是10 * 2 = 20
private long expectedHeartbeatRate = 0L;
//期望的心跳次数的阈值,10 * 2 * 0.75 = 15,每分钟至少有15次心跳才不用开启自我保护机制
private long expectedHeartbeatThreshold = 0L;
//返回实例
public static SelfProtectionPolicy getInstance() {
return instance;
}
public long getExpectedHeartbeatRate() {
return expectedHeartbeatRate;
}
public void setExpectedHeartbeatRate(long expectedHeartbeatRate) {
this.expectedHeartbeatRate = expectedHeartbeatRate;
}
public long getExpectedHeartbeatThreshold() {
return expectedHeartbeatThreshold;
}
public void setExpectedHeartbeatThreshold(long expectedHeartbeatThreshold) {
this.expectedHeartbeatThreshold = expectedHeartbeatThreshold;
}
}
复制代码
(2)服务注册和下线时更新触发自我保护阈值
public class RegisterServerController {
private ServiceRegistry registry = ServiceRegistry.getInstance();
//服务注册
public RegisterResponse register(RegisterRequest registerRequest) {
RegisterResponse registerResponse = new RegisterResponse();
try {
//在注册表中加入这个服务实例
...
registry.register(serviceInstance);
//更新自我保护机制的阈值
synchronized(SelfProtectionPolicy.class) {
SelfProtectionPolicy selfProtectionPolicy = SelfProtectionPolicy.getInstance();
selfProtectionPolicy.setExpectedHeartbeatRate(selfProtectionPolicy.getExpectedHeartbeatRate() + 2);
selfProtectionPolicy.setExpectedHeartbeatThreshold((long)(selfProtectionPolicy.getExpectedHeartbeatRate() * 0.85));
}
registerResponse.setStatus(RegisterResponse.SUCCESS);
} catch (Exception e) {
e.printStackTrace();
registerResponse.setStatus(RegisterResponse.FAILURE);
}
return registerResponse;
}
...
//服务下线
public void cancel(String serviceName, String serviceInstanceId) {
//从服务注册表中摘除服务实例
registry.remove(serviceName, serviceInstanceId);
//更新触发自我保护机制的阈值
synchronized(SelfProtectionPolicy.class) {
SelfProtectionPolicy selfProtectionPolicy = SelfProtectionPolicy.getInstance();
selfProtectionPolicy.setExpectedHeartbeatRate(selfProtectionPolicy.getExpectedHeartbeatRate() - 2);
selfProtectionPolicy.setExpectedHeartbeatThreshold((long)(selfProtectionPolicy.getExpectedHeartbeatRate() * 0.85));
}
}
}
复制代码
(3)监控到服务不存活时更新触发自我保护阈值
//微服务存活状态监控组件
public class ServiceAliveMonitor {
//检查服务实例是否存活的间隔
private static final Long CHECK_ALIVE_INTERVAL = 60 * 1000L;
//负责监控微服务存活状态的后台线程
private Daemon daemon;
public ServiceAliveMonitor() {
this.daemon = new Daemon();
daemon.setDaemon(true);
daemon.setName("ServiceAliveMonitor");
}
//启动后台线程
public void start() {
daemon.start();
}
//负责监控微服务存活状态的后台线程
private class Daemon extends Thread {
private ServiceRegistry registry = ServiceRegistry.getInstance();
@Override
public void run() {
Map<String, Map<String, ServiceInstance>> registryMap = null;
while(true) {
try {
registryMap = registry.getRegistry();
for (String serviceName : registryMap.keySet()) {
Map<String, ServiceInstance> serviceInstanceMap = registryMap.get(serviceName);
for (ServiceInstance serviceInstance : serviceInstanceMap.values()) {
//如果服务实例距离上一次发送心跳已经超过90秒,则认为这个服务不存活
//此时需要从注册表中摘除这个服务实例
if (!serviceInstance.isAlive()) {
registry.remove(serviceName, serviceInstance.getServiceInstanceId());
//更新自我保护机制的阈值
synchronized(SelfProtectionPolicy.class) {
SelfProtectionPolicy selfProtectionPolicy = SelfProtectionPolicy.getInstance();
selfProtectionPolicy.setExpectedHeartbeatRate(selfProtectionPolicy.getExpectedHeartbeatRate() - 2);
selfProtectionPolicy.setExpectedHeartbeatThreshold((long)(selfProtectionPolicy.getExpectedHeartbeatRate() * 0.85));
}
}
}
}
Thread.sleep(CHECK_ALIVE_INTERVAL);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
复制代码
25.基于 synchronized 开启自我保护机制
随着服务的注册、下线、故障,触发自我保护机制的阈值会不断变动。在 ServiceAliveMonitor 摘除故障服务前,可先判断是否要触发自我保护机制。也就是判断是否满足:上一分钟的心跳次数 < 期望的心跳次数 * 0.75。如果小于则认为 register-server 出现网络故障,无法接收客户端的心跳请求。此时 register-server 就启动自我保护机制,不再摘除任何服务实例。
(1)增加是否需要开启自我保护机制的方法
//自我保护机制
public class SelfProtectionPolicy {
private static SelfProtectionPolicy instance = new SelfProtectionPolicy();
//期望的心跳次数,如果有10个服务实例,这个数值就是10 * 2 = 20
private long expectedHeartbeatRate = 0L;
//期望的心跳次数的阈值,10 * 2 * 0.75 = 15,每分钟至少得有15次心跳才不用开启自我保护机制
private long expectedHeartbeatThreshold = 0L;
//返回实例
public static SelfProtectionPolicy getInstance() {
return instance;
}
//是否需要开启自我保护机制
public Boolean isEnable() {
HeartbeatMessuredRate heartbeatMessuredRate = HeartbeatMessuredRate.getInstance();
long latestMinuteHeartbeatRate = heartbeatMessuredRate.get();
if (latestMinuteHeartbeatRate < this.expectedHeartbeatThreshold) {
System.out.println("[开启自我保护机制]最近一分钟心跳次数=" + latestMinuteHeartbeatRate + ", 期望心跳次数=" + this.expectedHeartbeatThreshold);
return true;
}
System.out.println("[未开启自我保护机制]最近一分钟心跳次数=" + latestMinuteHeartbeatRate + ", 期望心跳次数=" + this.expectedHeartbeatThreshold);
return false;
}
...
}
复制代码
(2)心跳请求计数器定时刷新近一分钟心跳次数
//心跳请求计数器
public class HeartbeatMeasuredRate {
//单例实例
private static HeartbeatMeasuredRate instance = new HeartbeatMeasuredRate();
//最近一分钟的心跳次数
private long latestMinuteHeartbeatRate = 0L;
//最近一分钟的时间戳
private long latestMinuteTimestamp = System.currentTimeMillis();
private HeartbeatMeasuredRate() {
Daemon daemon = new Daemon();
daemon.setDaemon(true);
daemon.start();
}
//获取单例实例
public static HeartbeatMeasuredRate getInstance() {
return instance;
}
//增加一次最近一分钟的心跳次数
public void increment() {
synchronized(HeartbeatMeasuredRate.class) {
latestMinuteHeartbeatRate++;
}
}
//获取最近一分钟的心跳次数
public synchronized long get() {
return latestMinuteHeartbeatRate;
}
private class Daemon extends Thread {
@Override
public void run() {
while(true) {
try {
synchronized(HeartbeatMeasuredRate.class) {
long currentTime = System.currentTimeMillis();
if (currentTime - latestMinuteTimestamp > 60 * 1000) {
latestMinuteHeartbeatRate = 0L;
latestMinuteTimestamp = System.currentTimeMillis();
}
}
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
复制代码
(3)监控服务是否存活的后台线程增加判断是否开启自我保护机制
//微服务存活状态监控组件
public class ServiceAliveMonitor {
...
//负责监控微服务存活状态的后台线程
private class Daemon extends Thread {
private ServiceRegistry registry = ServiceRegistry.getInstance();
@Override
public void run() {
Map<String, Map<String, ServiceInstance>> registryMap = null;
while(true) {
try {
//判断是否要开启自我保护机制
SelfProtectionPolicy selfProtectionPolicy = SelfProtectionPolicy.getInstance();
if (selfProtectionPolicy.isEnable()) {
Thread.sleep(CHECK_ALIVE_INTERVAL);
continue;
}
registryMap = registry.getRegistry();
for (String serviceName : registryMap.keySet()) {
Map<String, ServiceInstance> serviceInstanceMap = registryMap.get(serviceName);
for (ServiceInstance serviceInstance : serviceInstanceMap.values()) {
//说明服务实例距离上一次发送心跳已经超过90秒了
if (!serviceInstance.isAlive()) {
registry.remove(serviceName, serviceInstance.getServiceInstanceId());
//更新自我保护机制的阈值
synchronized(SelfProtectionPolicy.class) {
selfProtectionPolicy.setExpectedHeartbeatRate(selfProtectionPolicy.getExpectedHeartbeatRate() - 2);
selfProtectionPolicy.setExpectedHeartbeatThreshold((long)(selfProtectionPolicy.getExpectedHeartbeatRate() * 0.85));
}
}
}
}
Thread.sleep(CHECK_ALIVE_INTERVAL);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
复制代码
文章转载自:东阳马生架构
原文链接:https://www.cnblogs.com/mjunz/p/18715765
体验地址:http://www.jnpfsoft.com/?from=001YH
评论