Nacos 源码分析(一)之线程池的巧妙设计,可以薅到自己的项目里
- 2022 年 3 月 16 日
本文字数:6514 字
阅读完需:约 21 分钟
大家好,给大家先做个自我介绍我是码上代码,大家可以叫我码哥我也是一个普通本科毕业的最普通学生,我相信大部分程序员或者想从事程序员行业的都是普通家庭的孩子,所以我也是靠自己的努力,从毕业入职到一家传统企业,到跳槽未尝败绩,现在在一家某互联网行业巨头公司工作,希望可以通过自己的分享对大家有一些帮助跟随我的专栏学习,可以省去你很多去培训的费用或者网上找资料的时间,节省你的大部分时间成本,让你更加快速成为面试收割机,年度最佳员工
我为大家准备了 16 个技术专栏带领大家一起学习
Nacos 简介
Nacos 在阿里巴巴起源于 2008 年五彩石项目(完成微服务拆分和业务中台建设),成长于十年双十⼀的洪峰考验,沉淀了简单易用、稳定可靠、性能卓越的核心竞争力。 随着云计算兴起,2018 年我们深刻感受到开源软件行业的影响,因此决定将 Nacos(阿里内部 Configserver/Diamond/Vipserver 内核) 开源,输出阿里十年的沉淀,推动微服务行业发展,加速企业数字化转型!
nacos 实践
五分钟学会 Spring Cloud Alibaba:Nacos 作为注册中心和配置中心使用(小白必看,一看就会教程)
架构图
整体架构分为用户层、业务层、内核层和插件,用户层主要解决用户使用的易用性问题,业务层主要解决服务发现和配置管理的功能问题,内核层解决分布式系统⼀致性、存储、高可用等核心问题,插件解决扩展性问题。
本篇源码分析
nacos 中线程池的使用
nacos 中大量使用了线程池,比如协议管理,服务订阅等,nacos 的代码为了实现高可用和可扩展性,对于一些公共的功能做了统一处理,代码更加简洁
首先定义了一个线程池工厂,里面定义一些公有的线程方法
public final class ExecutorFactory { public static ExecutorService newSingleExecutorService() { return Executors.newFixedThreadPool(1); } public static ExecutorService newSingleExecutorService(final ThreadFactory threadFactory) { return Executors.newFixedThreadPool(1, threadFactory); } public static ExecutorService newFixedExecutorService(final int nThreads) { return Executors.newFixedThreadPool(nThreads); } public static ExecutorService newFixedExecutorService(final int nThreads, final ThreadFactory threadFactory) { return Executors.newFixedThreadPool(nThreads, threadFactory); } public static ScheduledExecutorService newSingleScheduledExecutorService(final ThreadFactory threadFactory) { return Executors.newScheduledThreadPool(1, threadFactory); } public static ScheduledExecutorService newScheduledExecutorService(final int nThreads, final ThreadFactory threadFactory) { return Executors.newScheduledThreadPool(nThreads, threadFactory); } public static ThreadPoolExecutor newCustomerThreadExecutor(final int coreThreads, final int maxThreads, final long keepAliveTimeMs, final ThreadFactory threadFactory) { return new ThreadPoolExecutor(coreThreads, maxThreads, keepAliveTimeMs, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory); }
其次比较重要的一个类,ThreadPoolManager,这个类主要实现了线程池资源注册,销毁的方法供其他业务使用
public final class ThreadPoolManager { private static final Logger LOGGER = LoggerFactory.getLogger(ThreadPoolManager.class); private Map<String, Map<String, Set<ExecutorService>>> resourcesManager; private Map<String, Object> lockers = new ConcurrentHashMap<String, Object>(8); private static final ThreadPoolManager INSTANCE = new ThreadPoolManager(); private static final AtomicBoolean CLOSED = new AtomicBoolean(false);
/** * 静态代码块初始化 */ static { //初始化存放线程池资源的Map,这里使用懒加载的思想 INSTANCE.init(); //这个是线程池工具类提供的一个钩子函数,用户结束线程池,addShutdownHook方法的作用是 //当JVM关闭时,需要完成线程池中的任务,再关闭线程池,防止任务丢失 ThreadUtils.addShutdownHook(new Thread(new Runnable() { @Override public void run() { LOGGER.warn("[ThreadPoolManager] Start destroying ThreadPool"); shutdown(); LOGGER.warn("[ThreadPoolManager] Destruction of the end"); } })); } //获取当前类 public static ThreadPoolManager getInstance() { return INSTANCE; } //实现私有构造器,不能继承 private ThreadPoolManager() { } private void init() { //用的时候在进行初始化 resourcesManager = new ConcurrentHashMap<String, Map<String, Set<ExecutorService>>>(8); } /** * Register the thread pool resources with the resource manager. * * @param namespace namespace name * @param group group name * @param executor {@link ExecutorService} */ public void register(String namespace, String group, ExecutorService executor) { if (!resourcesManager.containsKey(namespace)) { //这里使用了synchronized,防止出现线程安全问题 synchronized (this) { lockers.put(namespace, new Object()); } } final Object monitor = lockers.get(namespace); synchronized (monitor) { Map<String, Set<ExecutorService>> map = resourcesManager.get(namespace); if (map == null) { map = new HashMap<String, Set<ExecutorService>>(8); map.put(group, new HashSet<ExecutorService>()); map.get(group).add(executor); resourcesManager.put(namespace, map); return; } if (!map.containsKey(group)) { map.put(group, new HashSet<ExecutorService>()); } map.get(group).add(executor); } } /** * Cancel the uniform lifecycle management for all threads under this resource. * * @param namespace namespace name * @param group group name */ public void deregister(String namespace, String group) { if (resourcesManager.containsKey(namespace)) { final Object monitor = lockers.get(namespace); synchronized (monitor) { resourcesManager.get(namespace).remove(group); } } } /** * Undoing the uniform lifecycle management of {@link ExecutorService} under this resource. * * @param namespace namespace name * @param group group name * @param executor {@link ExecutorService} */ public void deregister(String namespace, String group, ExecutorService executor) { if (resourcesManager.containsKey(namespace)) { final Object monitor = lockers.get(namespace); synchronized (monitor) { final Map<String, Set<ExecutorService>> subResourceMap = resourcesManager.get(namespace); if (subResourceMap.containsKey(group)) { subResourceMap.get(group).remove(executor); } } } } /** * Destroys all thread pool resources under this namespace. * * @param namespace namespace */ public void destroy(final String namespace) { final Object monitor = lockers.get(namespace); if (monitor == null) { return; } synchronized (monitor) { Map<String, Set<ExecutorService>> subResource = resourcesManager.get(namespace); if (subResource == null) { return; } for (Map.Entry<String, Set<ExecutorService>> entry : subResource.entrySet()) { for (ExecutorService executor : entry.getValue()) { ThreadUtils.shutdownThreadPool(executor); } } resourcesManager.get(namespace).clear(); resourcesManager.remove(namespace); } } /** * This namespace destroys all thread pool resources under the grouping. * * @param namespace namespace * @param group group */ public void destroy(final String namespace, final String group) { final Object monitor = lockers.get(namespace); if (monitor == null) { return; } synchronized (monitor) { Map<String, Set<ExecutorService>> subResource = resourcesManager.get(namespace); if (subResource == null) { return; } Set<ExecutorService> waitDestroy = subResource.get(group); for (ExecutorService executor : waitDestroy) { ThreadUtils.shutdownThreadPool(executor); } resourcesManager.get(namespace).remove(group); } } /** * Shutdown thread pool manager. */ public static void shutdown() { //使用乐观锁判断是否可以终止 if (!CLOSED.compareAndSet(false, true)) { return; } Set<String> namespaces = INSTANCE.resourcesManager.keySet(); for (String namespace : namespaces) { INSTANCE.destroy(namespace); } } }
下面是我们在源码里可以拿来即用的工具类 ThreadUtils
package com.alibaba.nacos.common.utils;
import org.slf4j.Logger;
import java.util.concurrent.CountDownLatch;import java.util.concurrent.ExecutorService;import java.util.concurrent.TimeUnit;
/** * Thread utils. * * @author <a href="mailto:liaochuntao@live.com">liaochuntao</a> */public final class ThreadUtils { /** * Wait. * * @param object load object */ public static void objectWait(Object object) { try { object.wait(); } catch (InterruptedException ignore) { Thread.interrupted(); } } /** * Sleep. * * @param millis sleep millisecond */ public static void sleep(long millis) { try { Thread.sleep(millis); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } public static void countDown(CountDownLatch latch) { Objects.requireNonNull(latch, "latch"); latch.countDown(); } /** * Await count down latch. * * @param latch count down latch */ public static void latchAwait(CountDownLatch latch) { try { latch.await(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } /** * Await count down latch with timeout. * * @param latch count down latch * @param time timeout time * @param unit time unit */ public static void latchAwait(CountDownLatch latch, long time, TimeUnit unit) { try { latch.await(time, unit); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } /** * Through the number of cores, calculate the appropriate number of threads; 1.5-2 times the number of CPU cores. * * @return thread count */ public static int getSuitableThreadCount() { return getSuitableThreadCount(THREAD_MULTIPLER); } /** * Through the number of cores, calculate the appropriate number of threads. * * @param threadMultiple multiple time of cores * @return thread count */ public static int getSuitableThreadCount(int threadMultiple) { final int coreCount = Runtime.getRuntime().availableProcessors(); int workerCount = 1; while (workerCount < coreCount * threadMultiple) { workerCount <<= 1; } return workerCount; } public static void shutdownThreadPool(ExecutorService executor) { shutdownThreadPool(executor, null); } /** * Shutdown thread pool. * * @param executor thread pool * @param logger logger */ public static void shutdownThreadPool(ExecutorService executor, Logger logger) { executor.shutdown(); //重试策略 int retry = 3; while (retry > 0) { retry--; try { if (executor.awaitTermination(1, TimeUnit.SECONDS)) { return; } } catch (InterruptedException e) { executor.shutdownNow(); Thread.interrupted(); } catch (Throwable ex) { if (logger != null) { logger.error("ThreadPoolManager shutdown executor has error : {}", ex); } } } executor.shutdownNow(); } //钩子函数 public static void addShutdownHook(Runnable runnable) { Runtime.getRuntime().addShutdownHook(new Thread(runnable)); } private static final int THREAD_MULTIPLER = 2; }
大家一定记得点赞,收藏,关注
防止下次找不到了
你们的支持是我持续创作的动力!!!
刘祥
个人公众号|码上代码 2020.03.06 加入
码上代码 |CSDNjava领域优质创作者分享











评论