写点什么

Nacos 源码分析(一)之线程池的巧妙设计,可以薅到自己的项目里

作者:刘祥
  • 2022 年 3 月 16 日
  • 本文字数:6514 字

    阅读完需:约 21 分钟

大家好,给大家先做个自我介绍我是码上代码,大家可以叫我码哥我也是一个普通本科毕业的最普通学生,我相信大部分程序员或者想从事程序员行业的都是普通家庭的孩子,所以我也是靠自己的努力,从毕业入职到一家传统企业,到跳槽未尝败绩,现在在一家某互联网行业巨头公司工作,希望可以通过自己的分享对大家有一些帮助跟随我的专栏学习,可以省去你很多去培训的费用或者网上找资料的时间,节省你的大部分时间成本,让你更加快速成为面试收割机,年度最佳员工

我为大家准备了 16 个技术专栏带领大家一起学习

《亿级流量分布式系统实战》

《BAT大厂面试必问系列》

《技术杂谈》

《零基础带你学java教程专栏》

《带你学springCloud专栏》

《带你学SpringCloud源码专栏》

《带你学分布式系统专栏》

《带你学云原生专栏》

《带你学springboot源码》

《带你学netty原理与实战专栏》

《带你学Elasticsearch专栏》

《带你学mysql专栏》

《带你学JVM原理专栏》

《带你学Redis原理专栏》

《带你学java进阶专栏》

《带你学大数据专栏》

Nacos 简介

Nacos 在阿里巴巴起源于 2008 年五彩石项目(完成微服务拆分和业务中台建设),成长于十年双十⼀的洪峰考验,沉淀了简单易用、稳定可靠、性能卓越的核心竞争力。 随着云计算兴起,2018 年我们深刻感受到开源软件行业的影响,因此决定将 Nacos(阿里内部 Configserver/Diamond/Vipserver 内核) 开源,输出阿里十年的沉淀,推动微服务行业发展,加速企业数字化转型!


nacos 实践

五分钟学会 Spring Cloud Alibaba:Nacos 作为注册中心和配置中心使用(小白必看,一看就会教程)

架构图

整体架构分为用户层、业务层、内核层和插件,用户层主要解决用户使用的易用性问题,业务层主要解决服务发现和配置管理的功能问题,内核层解决分布式系统⼀致性、存储、高可用等核心问题,插件解决扩展性问题。


本篇源码分析

nacos 中线程池的使用

nacos 中大量使用了线程池,比如协议管理,服务订阅等,nacos 的代码为了实现高可用和可扩展性,对于一些公共的功能做了统一处理,代码更加简洁


首先定义了一个线程池工厂,里面定义一些公有的线程方法


public final class ExecutorFactory {        public static ExecutorService newSingleExecutorService() {        return Executors.newFixedThreadPool(1);    }        public static ExecutorService newSingleExecutorService(final ThreadFactory threadFactory) {        return Executors.newFixedThreadPool(1, threadFactory);    }        public static ExecutorService newFixedExecutorService(final int nThreads) {        return Executors.newFixedThreadPool(nThreads);    }        public static ExecutorService newFixedExecutorService(final int nThreads, final ThreadFactory threadFactory) {        return Executors.newFixedThreadPool(nThreads, threadFactory);    }        public static ScheduledExecutorService newSingleScheduledExecutorService(final ThreadFactory threadFactory) {        return Executors.newScheduledThreadPool(1, threadFactory);    }        public static ScheduledExecutorService newScheduledExecutorService(final int nThreads,            final ThreadFactory threadFactory) {        return Executors.newScheduledThreadPool(nThreads, threadFactory);    }        public static ThreadPoolExecutor newCustomerThreadExecutor(final int coreThreads, final int maxThreads,            final long keepAliveTimeMs, final ThreadFactory threadFactory) {        return new ThreadPoolExecutor(coreThreads, maxThreads, keepAliveTimeMs, TimeUnit.MILLISECONDS,                new LinkedBlockingQueue<Runnable>(), threadFactory);    }
复制代码


其次比较重要的一个类,ThreadPoolManager,这个类主要实现了线程池资源注册,销毁的方法供其他业务使用


public final class ThreadPoolManager {        private static final Logger LOGGER = LoggerFactory.getLogger(ThreadPoolManager.class);        private Map<String, Map<String, Set<ExecutorService>>> resourcesManager;        private Map<String, Object> lockers = new ConcurrentHashMap<String, Object>(8);        private static final ThreadPoolManager INSTANCE = new ThreadPoolManager();        private static final AtomicBoolean CLOSED = new AtomicBoolean(false);
/** * 静态代码块初始化 */ static { //初始化存放线程池资源的Map,这里使用懒加载的思想 INSTANCE.init(); //这个是线程池工具类提供的一个钩子函数,用户结束线程池,addShutdownHook方法的作用是 //当JVM关闭时,需要完成线程池中的任务,再关闭线程池,防止任务丢失 ThreadUtils.addShutdownHook(new Thread(new Runnable() { @Override public void run() { LOGGER.warn("[ThreadPoolManager] Start destroying ThreadPool"); shutdown(); LOGGER.warn("[ThreadPoolManager] Destruction of the end"); } })); } //获取当前类 public static ThreadPoolManager getInstance() { return INSTANCE; } //实现私有构造器,不能继承 private ThreadPoolManager() { } private void init() { //用的时候在进行初始化 resourcesManager = new ConcurrentHashMap<String, Map<String, Set<ExecutorService>>>(8); } /** * Register the thread pool resources with the resource manager. * * @param namespace namespace name * @param group group name * @param executor {@link ExecutorService} */ public void register(String namespace, String group, ExecutorService executor) { if (!resourcesManager.containsKey(namespace)) { //这里使用了synchronized,防止出现线程安全问题 synchronized (this) { lockers.put(namespace, new Object()); } } final Object monitor = lockers.get(namespace); synchronized (monitor) { Map<String, Set<ExecutorService>> map = resourcesManager.get(namespace); if (map == null) { map = new HashMap<String, Set<ExecutorService>>(8); map.put(group, new HashSet<ExecutorService>()); map.get(group).add(executor); resourcesManager.put(namespace, map); return; } if (!map.containsKey(group)) { map.put(group, new HashSet<ExecutorService>()); } map.get(group).add(executor); } } /** * Cancel the uniform lifecycle management for all threads under this resource. * * @param namespace namespace name * @param group group name */ public void deregister(String namespace, String group) { if (resourcesManager.containsKey(namespace)) { final Object monitor = lockers.get(namespace); synchronized (monitor) { resourcesManager.get(namespace).remove(group); } } } /** * Undoing the uniform lifecycle management of {@link ExecutorService} under this resource. * * @param namespace namespace name * @param group group name * @param executor {@link ExecutorService} */ public void deregister(String namespace, String group, ExecutorService executor) { if (resourcesManager.containsKey(namespace)) { final Object monitor = lockers.get(namespace); synchronized (monitor) { final Map<String, Set<ExecutorService>> subResourceMap = resourcesManager.get(namespace); if (subResourceMap.containsKey(group)) { subResourceMap.get(group).remove(executor); } } } } /** * Destroys all thread pool resources under this namespace. * * @param namespace namespace */ public void destroy(final String namespace) { final Object monitor = lockers.get(namespace); if (monitor == null) { return; } synchronized (monitor) { Map<String, Set<ExecutorService>> subResource = resourcesManager.get(namespace); if (subResource == null) { return; } for (Map.Entry<String, Set<ExecutorService>> entry : subResource.entrySet()) { for (ExecutorService executor : entry.getValue()) { ThreadUtils.shutdownThreadPool(executor); } } resourcesManager.get(namespace).clear(); resourcesManager.remove(namespace); } } /** * This namespace destroys all thread pool resources under the grouping. * * @param namespace namespace * @param group group */ public void destroy(final String namespace, final String group) { final Object monitor = lockers.get(namespace); if (monitor == null) { return; } synchronized (monitor) { Map<String, Set<ExecutorService>> subResource = resourcesManager.get(namespace); if (subResource == null) { return; } Set<ExecutorService> waitDestroy = subResource.get(group); for (ExecutorService executor : waitDestroy) { ThreadUtils.shutdownThreadPool(executor); } resourcesManager.get(namespace).remove(group); } } /** * Shutdown thread pool manager. */ public static void shutdown() { //使用乐观锁判断是否可以终止 if (!CLOSED.compareAndSet(false, true)) { return; } Set<String> namespaces = INSTANCE.resourcesManager.keySet(); for (String namespace : namespaces) { INSTANCE.destroy(namespace); } } }
复制代码


下面是我们在源码里可以拿来即用的工具类 ThreadUtils


package com.alibaba.nacos.common.utils;
import org.slf4j.Logger;
import java.util.concurrent.CountDownLatch;import java.util.concurrent.ExecutorService;import java.util.concurrent.TimeUnit;
/** * Thread utils. * * @author <a href="mailto:liaochuntao@live.com">liaochuntao</a> */public final class ThreadUtils { /** * Wait. * * @param object load object */ public static void objectWait(Object object) { try { object.wait(); } catch (InterruptedException ignore) { Thread.interrupted(); } } /** * Sleep. * * @param millis sleep millisecond */ public static void sleep(long millis) { try { Thread.sleep(millis); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } public static void countDown(CountDownLatch latch) { Objects.requireNonNull(latch, "latch"); latch.countDown(); } /** * Await count down latch. * * @param latch count down latch */ public static void latchAwait(CountDownLatch latch) { try { latch.await(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } /** * Await count down latch with timeout. * * @param latch count down latch * @param time timeout time * @param unit time unit */ public static void latchAwait(CountDownLatch latch, long time, TimeUnit unit) { try { latch.await(time, unit); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } /** * Through the number of cores, calculate the appropriate number of threads; 1.5-2 times the number of CPU cores. * * @return thread count */ public static int getSuitableThreadCount() { return getSuitableThreadCount(THREAD_MULTIPLER); } /** * Through the number of cores, calculate the appropriate number of threads. * * @param threadMultiple multiple time of cores * @return thread count */ public static int getSuitableThreadCount(int threadMultiple) { final int coreCount = Runtime.getRuntime().availableProcessors(); int workerCount = 1; while (workerCount < coreCount * threadMultiple) { workerCount <<= 1; } return workerCount; } public static void shutdownThreadPool(ExecutorService executor) { shutdownThreadPool(executor, null); } /** * Shutdown thread pool. * * @param executor thread pool * @param logger logger */ public static void shutdownThreadPool(ExecutorService executor, Logger logger) { executor.shutdown(); //重试策略 int retry = 3; while (retry > 0) { retry--; try { if (executor.awaitTermination(1, TimeUnit.SECONDS)) { return; } } catch (InterruptedException e) { executor.shutdownNow(); Thread.interrupted(); } catch (Throwable ex) { if (logger != null) { logger.error("ThreadPoolManager shutdown executor has error : {}", ex); } } } executor.shutdownNow(); } //钩子函数 public static void addShutdownHook(Runnable runnable) { Runtime.getRuntime().addShutdownHook(new Thread(runnable)); } private static final int THREAD_MULTIPLER = 2; }
复制代码


大家一定记得点赞,收藏,关注


防止下次找不到了


你们的支持是我持续创作的动力!!!

用户头像

刘祥

关注

个人公众号|码上代码 2020.03.06 加入

码上代码 |CSDNjava领域优质创作者分享

评论

发布
暂无评论
Nacos源码分析(一)之线程池的巧妙设计,可以薅到自己的项目里_SpringCloud Alibaba_刘祥_InfoQ写作平台