Nacos 源码分析(一)之线程池的巧妙设计,可以薅到自己的项目里
- 2022 年 3 月 16 日
本文字数:6514 字
阅读完需:约 21 分钟
大家好,给大家先做个自我介绍我是码上代码,大家可以叫我码哥我也是一个普通本科毕业的最普通学生,我相信大部分程序员或者想从事程序员行业的都是普通家庭的孩子,所以我也是靠自己的努力,从毕业入职到一家传统企业,到跳槽未尝败绩,现在在一家某互联网行业巨头公司工作,希望可以通过自己的分享对大家有一些帮助跟随我的专栏学习,可以省去你很多去培训的费用或者网上找资料的时间,节省你的大部分时间成本,让你更加快速成为面试收割机,年度最佳员工
我为大家准备了 16 个技术专栏带领大家一起学习
Nacos 简介
Nacos 在阿里巴巴起源于 2008 年五彩石项目(完成微服务拆分和业务中台建设),成长于十年双十⼀的洪峰考验,沉淀了简单易用、稳定可靠、性能卓越的核心竞争力。 随着云计算兴起,2018 年我们深刻感受到开源软件行业的影响,因此决定将 Nacos(阿里内部 Configserver/Diamond/Vipserver 内核) 开源,输出阿里十年的沉淀,推动微服务行业发展,加速企业数字化转型!
nacos 实践
五分钟学会 Spring Cloud Alibaba:Nacos 作为注册中心和配置中心使用(小白必看,一看就会教程)
架构图
整体架构分为用户层、业务层、内核层和插件,用户层主要解决用户使用的易用性问题,业务层主要解决服务发现和配置管理的功能问题,内核层解决分布式系统⼀致性、存储、高可用等核心问题,插件解决扩展性问题。
本篇源码分析
nacos 中线程池的使用
nacos 中大量使用了线程池,比如协议管理,服务订阅等,nacos 的代码为了实现高可用和可扩展性,对于一些公共的功能做了统一处理,代码更加简洁
首先定义了一个线程池工厂,里面定义一些公有的线程方法
public final class ExecutorFactory {
public static ExecutorService newSingleExecutorService() {
return Executors.newFixedThreadPool(1);
}
public static ExecutorService newSingleExecutorService(final ThreadFactory threadFactory) {
return Executors.newFixedThreadPool(1, threadFactory);
}
public static ExecutorService newFixedExecutorService(final int nThreads) {
return Executors.newFixedThreadPool(nThreads);
}
public static ExecutorService newFixedExecutorService(final int nThreads, final ThreadFactory threadFactory) {
return Executors.newFixedThreadPool(nThreads, threadFactory);
}
public static ScheduledExecutorService newSingleScheduledExecutorService(final ThreadFactory threadFactory) {
return Executors.newScheduledThreadPool(1, threadFactory);
}
public static ScheduledExecutorService newScheduledExecutorService(final int nThreads,
final ThreadFactory threadFactory) {
return Executors.newScheduledThreadPool(nThreads, threadFactory);
}
public static ThreadPoolExecutor newCustomerThreadExecutor(final int coreThreads, final int maxThreads,
final long keepAliveTimeMs, final ThreadFactory threadFactory) {
return new ThreadPoolExecutor(coreThreads, maxThreads, keepAliveTimeMs, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(), threadFactory);
}
其次比较重要的一个类,ThreadPoolManager,这个类主要实现了线程池资源注册,销毁的方法供其他业务使用
public final class ThreadPoolManager {
private static final Logger LOGGER = LoggerFactory.getLogger(ThreadPoolManager.class);
private Map<String, Map<String, Set<ExecutorService>>> resourcesManager;
private Map<String, Object> lockers = new ConcurrentHashMap<String, Object>(8);
private static final ThreadPoolManager INSTANCE = new ThreadPoolManager();
private static final AtomicBoolean CLOSED = new AtomicBoolean(false);
/**
* 静态代码块初始化
*/
static {
//初始化存放线程池资源的Map,这里使用懒加载的思想
INSTANCE.init();
//这个是线程池工具类提供的一个钩子函数,用户结束线程池,addShutdownHook方法的作用是
//当JVM关闭时,需要完成线程池中的任务,再关闭线程池,防止任务丢失
ThreadUtils.addShutdownHook(new Thread(new Runnable() {
@Override
public void run() {
LOGGER.warn("[ThreadPoolManager] Start destroying ThreadPool");
shutdown();
LOGGER.warn("[ThreadPoolManager] Destruction of the end");
}
}));
}
//获取当前类
public static ThreadPoolManager getInstance() {
return INSTANCE;
}
//实现私有构造器,不能继承
private ThreadPoolManager() {
}
private void init() {
//用的时候在进行初始化
resourcesManager = new ConcurrentHashMap<String, Map<String, Set<ExecutorService>>>(8);
}
/**
* Register the thread pool resources with the resource manager.
*
* @param namespace namespace name
* @param group group name
* @param executor {@link ExecutorService}
*/
public void register(String namespace, String group, ExecutorService executor) {
if (!resourcesManager.containsKey(namespace)) {
//这里使用了synchronized,防止出现线程安全问题
synchronized (this) {
lockers.put(namespace, new Object());
}
}
final Object monitor = lockers.get(namespace);
synchronized (monitor) {
Map<String, Set<ExecutorService>> map = resourcesManager.get(namespace);
if (map == null) {
map = new HashMap<String, Set<ExecutorService>>(8);
map.put(group, new HashSet<ExecutorService>());
map.get(group).add(executor);
resourcesManager.put(namespace, map);
return;
}
if (!map.containsKey(group)) {
map.put(group, new HashSet<ExecutorService>());
}
map.get(group).add(executor);
}
}
/**
* Cancel the uniform lifecycle management for all threads under this resource.
*
* @param namespace namespace name
* @param group group name
*/
public void deregister(String namespace, String group) {
if (resourcesManager.containsKey(namespace)) {
final Object monitor = lockers.get(namespace);
synchronized (monitor) {
resourcesManager.get(namespace).remove(group);
}
}
}
/**
* Undoing the uniform lifecycle management of {@link ExecutorService} under this resource.
*
* @param namespace namespace name
* @param group group name
* @param executor {@link ExecutorService}
*/
public void deregister(String namespace, String group, ExecutorService executor) {
if (resourcesManager.containsKey(namespace)) {
final Object monitor = lockers.get(namespace);
synchronized (monitor) {
final Map<String, Set<ExecutorService>> subResourceMap = resourcesManager.get(namespace);
if (subResourceMap.containsKey(group)) {
subResourceMap.get(group).remove(executor);
}
}
}
}
/**
* Destroys all thread pool resources under this namespace.
*
* @param namespace namespace
*/
public void destroy(final String namespace) {
final Object monitor = lockers.get(namespace);
if (monitor == null) {
return;
}
synchronized (monitor) {
Map<String, Set<ExecutorService>> subResource = resourcesManager.get(namespace);
if (subResource == null) {
return;
}
for (Map.Entry<String, Set<ExecutorService>> entry : subResource.entrySet()) {
for (ExecutorService executor : entry.getValue()) {
ThreadUtils.shutdownThreadPool(executor);
}
}
resourcesManager.get(namespace).clear();
resourcesManager.remove(namespace);
}
}
/**
* This namespace destroys all thread pool resources under the grouping.
*
* @param namespace namespace
* @param group group
*/
public void destroy(final String namespace, final String group) {
final Object monitor = lockers.get(namespace);
if (monitor == null) {
return;
}
synchronized (monitor) {
Map<String, Set<ExecutorService>> subResource = resourcesManager.get(namespace);
if (subResource == null) {
return;
}
Set<ExecutorService> waitDestroy = subResource.get(group);
for (ExecutorService executor : waitDestroy) {
ThreadUtils.shutdownThreadPool(executor);
}
resourcesManager.get(namespace).remove(group);
}
}
/**
* Shutdown thread pool manager.
*/
public static void shutdown() {
//使用乐观锁判断是否可以终止
if (!CLOSED.compareAndSet(false, true)) {
return;
}
Set<String> namespaces = INSTANCE.resourcesManager.keySet();
for (String namespace : namespaces) {
INSTANCE.destroy(namespace);
}
}
}
下面是我们在源码里可以拿来即用的工具类 ThreadUtils
package com.alibaba.nacos.common.utils;
import org.slf4j.Logger;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
/**
* Thread utils.
*
* @author <a href="mailto:liaochuntao@live.com">liaochuntao</a>
*/
public final class ThreadUtils {
/**
* Wait.
*
* @param object load object
*/
public static void objectWait(Object object) {
try {
object.wait();
} catch (InterruptedException ignore) {
Thread.interrupted();
}
}
/**
* Sleep.
*
* @param millis sleep millisecond
*/
public static void sleep(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
public static void countDown(CountDownLatch latch) {
Objects.requireNonNull(latch, "latch");
latch.countDown();
}
/**
* Await count down latch.
*
* @param latch count down latch
*/
public static void latchAwait(CountDownLatch latch) {
try {
latch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
/**
* Await count down latch with timeout.
*
* @param latch count down latch
* @param time timeout time
* @param unit time unit
*/
public static void latchAwait(CountDownLatch latch, long time, TimeUnit unit) {
try {
latch.await(time, unit);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
/**
* Through the number of cores, calculate the appropriate number of threads; 1.5-2 times the number of CPU cores.
*
* @return thread count
*/
public static int getSuitableThreadCount() {
return getSuitableThreadCount(THREAD_MULTIPLER);
}
/**
* Through the number of cores, calculate the appropriate number of threads.
*
* @param threadMultiple multiple time of cores
* @return thread count
*/
public static int getSuitableThreadCount(int threadMultiple) {
final int coreCount = Runtime.getRuntime().availableProcessors();
int workerCount = 1;
while (workerCount < coreCount * threadMultiple) {
workerCount <<= 1;
}
return workerCount;
}
public static void shutdownThreadPool(ExecutorService executor) {
shutdownThreadPool(executor, null);
}
/**
* Shutdown thread pool.
*
* @param executor thread pool
* @param logger logger
*/
public static void shutdownThreadPool(ExecutorService executor, Logger logger) {
executor.shutdown();
//重试策略
int retry = 3;
while (retry > 0) {
retry--;
try {
if (executor.awaitTermination(1, TimeUnit.SECONDS)) {
return;
}
} catch (InterruptedException e) {
executor.shutdownNow();
Thread.interrupted();
} catch (Throwable ex) {
if (logger != null) {
logger.error("ThreadPoolManager shutdown executor has error : {}", ex);
}
}
}
executor.shutdownNow();
}
//钩子函数
public static void addShutdownHook(Runnable runnable) {
Runtime.getRuntime().addShutdownHook(new Thread(runnable));
}
private static final int THREAD_MULTIPLER = 2;
}
大家一定记得点赞,收藏,关注
防止下次找不到了
你们的支持是我持续创作的动力!!!
刘祥
个人公众号|码上代码 2020.03.06 加入
码上代码 |CSDNjava领域优质创作者分享
评论