写点什么

commons-pool2 池化技术探究

发布于: 2021 年 04 月 27 日

一、前言


我们经常会接触各种池化的技术或者概念,包括对象池、连接池、线程池等,池化技术最大的好处就是实现对象的重复利用,尤其是创建和使用大对象或者宝贵资源(HTTP 连接对象,MySQL 连接对象)等方面的时候能够大大节省系统开销,对提升系统整体性能也至关重要。


在并发请求下,如果需要同时为几百个 query 操作创建/关闭 MySQL 的连接或者是为每一个 HTTP 请求创建一个处理线程或者是为每一个图片或者 XML 解析创建一个解析对象而不使用池化技术,将会给系统带来极大的负载挑战。


本文主要是分析 commons-pool2 池化技术的实现方案,希望通过本文能让读者对 commons-pool2 的实现原理一个更全面的了解。

二、commons-pool2 池化技术剖析


越来越多的框架在选择使用 apache commons-pool2 进行池化的管理,如 jedis-cluster,commons-pool2 工作的逻辑如下图所示:

2.1 核心三元素

2.1.1 ObjectPool


对象池,负责对对象进行生命周期的管理,并提供了对对象池中活跃对象和空闲对象统计的功能。

2.1.2 PooledObjectFactory


对象工厂类,负责具体对象的创建、初始化,对象状态的销毁和验证。commons-pool2 框架本身提供了默认的抽象实现 BasePooledObjectFactory ,业务方在使用的时候只需要继承该类,然后实现 warp 和 create 方法即可。

2.1.3 PooledObject


池化对象,是需要放到 ObjectPool 对象的一个包装类。添加了一些附加的信息,比如说状态信息,创建时间,激活时间等。commons-pool2 提供了 DefaultPooledObject 和 PoolSoftedObject 2 种实现。其中 PoolSoftedObject 继承自 DefaultPooledObject,不同点是使用 SoftReference 实现了对象的软引用。获取对象的时候使用也是通过 SoftReference 进行获取。

2.2 对象池逻辑分析

2.2.1 对象池接口说明


1)我们在使用 commons-pool2 的时候,应用程序获取或释放对象的操作都是基于对象池进行的,对象池核心接口主要包括如下:

/***向对象池中增加对象实例*/void addObject() throws Exception, IllegalStateException,      UnsupportedOperationException;/*** 从对象池中获取对象*/T borrowObject() throws Exception, NoSuchElementException,      IllegalStateException;/*** 失效非法的对象*/void invalidateObject(T obj) throws Exception;/*** 释放对象至对象池*/void returnObject(T obj) throws Exception;
复制代码

除了接口本身之外,对象池还支持对对象的最大数量,保留时间等等进行设置。对象池的核心参数项包括 maxTotal,maxIdle,minIdle,maxWaitMillis,testOnBorrow 等。

2.2.2 对象创建解耦


对象工厂是 commons-pool2 框架中用于生成对象的核心环节,业务方在使用过程中需要自己去实现对应的对象工厂实现类,通过工厂模式,实现了对象池与对象的生成与实现过程细节的解耦,每一个对象池应该都有对象工厂的成员变量,如此实现对象池本身和对象的生成逻辑解耦。


可以通过代码进一步验证我们的思路:

public GenericObjectPool(final PooledObjectFactory<T> factory) {      this(factory, new GenericObjectPoolConfig<T>());  }    public GenericObjectPool(final PooledObjectFactory<T> factory,                            final GenericObjectPoolConfig<T> config) {      super(config, ONAME_BASE, config.getJmxNamePrefix());      if (factory == null) {          jmxUnregister(); // tidy up          throw new IllegalArgumentException("factory may not be null");      }      this.factory = factory;      idleObjects = new LinkedBlockingDeque<>(config.getFairness());      setConfig(config);  }  public GenericObjectPool(final PooledObjectFactory<T> factory,                            final GenericObjectPoolConfig<T> config, final AbandonedConfig abandonedConfig) {      this(factory, config);      setAbandonedConfig(abandonedConfig);  }
复制代码

可以看到对象池的构造方法,都依赖于对象构造工厂 PooledObjectFactory,在生成对象的时候,基于对象池中定义的参数和对象构造工厂来生成。

/*** 向对象池中增加对象,一般在预加载的时候会使用该功能*/@Overridepublic void addObject() throws Exception {  assertOpen();  if (factory == null) {      throw new IllegalStateException(              "Cannot add objects without a factory.");  }  final PooledObject<T> p = create();  addIdleObject(p);}
复制代码

create() 方法基于对象工厂来生成的对象,继续往下跟进代码来确认逻辑;

final PooledObject<T> p;try {  p = factory.makeObject();  if (getTestOnCreate() && !factory.validateObject(p)) {      createCount.decrementAndGet();      return null;  }} catch (final Throwable e) {  createCount.decrementAndGet();  throw e;} finally {  synchronized (makeObjectCountLock) {      makeObjectCount--;      makeObjectCountLock.notifyAll();  }}
复制代码

此处确认了 factory.makeObject()的操作,也印证了上述的推测,基于对象工厂来生成对应的对象。


为了更好的能够实现对象池中对象的使用以及跟踪对象的状态,commons-pool2 框架中使用了池化对象 PooledObject 的概念,PooledObject 本身是泛型类,并提供了 getObject()获取实际对象的方法。

2.2.3 对象池源码分析


经过上述分析我们知道了对象池承载了对象的生命周期的管理,包括整个对象池中对象数量的控制等逻辑,接下来我们通过 GenericObjectPool 的源码来分析究竟是如何实现的。

对象池中使用了双端队列 LinkedBlockingDeque 来存储对象,LinkedBlockingDeque 对列支持 FIFO 和 FILO 两种策略,基于 AQS 来实现队列的操作的协同。


LinkedBlockingDeque 提供了队尾和队头的插入和移除元素的操作,相关操作都进行了加入重入锁的加锁操作队列中设置 notFull 和 notEmpty 两个状态变量,当对队列进行元素的操作的时候会触发对应的执行 await 和 notify 等操作。

/*** 第一个节点* Invariant: (first == null && last == null) ||*           (first.prev == null && first.item != null)*/private transient Node<E> first; // @GuardedBy("lock")/*** 最后一个节点* Invariant: (first == null && last == null) ||*           (last.next == null && last.item != null)*/private transient Node<E> last; // @GuardedBy("lock")/** 当前队列长度 */private transient int count; // @GuardedBy("lock")/** 队列最大容量 */private final int capacity;/** 主锁 */private final InterruptibleReentrantLock lock;/** 队列是否为空状态锁 */private final Condition notEmpty;/** 队列是否满状态锁 */private final Condition notFull;
复制代码

队列核心点为:

1.队列中所有的移入元素、移出、初始化构造元素都是基于主锁进行加锁操作。


2.队列的 offer 和 pull 支持设置超时时间参数,主要是通过两个状态 Condition 来进行协调操作。如在进行 offer 操作的时候,如果操作不成功,则基于 notFull 状态对象进行等待。

public boolean offerFirst(final E e, final long timeout, final TimeUnit unit)  throws InterruptedException {  Objects.requireNonNull(e, "e");  long nanos = unit.toNanos(timeout);  lock.lockInterruptibly();  try {      while (!linkFirst(e)) {          if (nanos <= 0) {              return false;          }          nanos = notFull.awaitNanos(nanos);      }      return true;  } finally {      lock.unlock();  }}
复制代码

如进行 pull 操作的时候,如果操作不成功,则对 notEmpty 进行等待操作。

public E takeFirst() throws InterruptedException {  lock.lock();  try {      E x;      while ( (x = unlinkFirst()) == null) {          notEmpty.await();      }      return x;  } finally {      lock.unlock();  }}
复制代码

反之当操作成功的时候,则进行唤醒操作,如下所示:

private boolean linkLast(final E e) {  // assert lock.isHeldByCurrentThread();  if (count >= capacity) {      return false;  }  final Node<E> l = last;  final Node<E> x = new Node<>(e, l, null);  last = x;  if (first == null) {      first = x;  } else {      l.next = x;  }  ++count;  notEmpty.signal();  return true;}
复制代码

2.3 核心业务流程

2.3.1 池化对象状态变更

上图是 PooledObject 的状态机图,蓝色表示状态,红色表示与 ObjectPool 相关的方法.PooledObject 的状态为:IDLE、ALLOCATED、RETURNING、ABANDONED、INVALID、EVICTION、EVICTION_RETURN_TO_HEAD


所有状态是在 PooledObjectState 类中定义的,其中一些是暂时未使用的,此处不再赘述。

2.3.2 对象池 browObject 过程


第一步、根据配置确定是否要为标签删除调用 removeAbandoned 方法。


第二步、尝试获取或创建一个对象,源码过程如下:

//1、尝试从双端队列中获取对象,pollFirst方法是非阻塞方法p = idleObjects.pollFirst();if (p == null) {    p = create();    if (p != null) {        create = true;    }}if (blockWhenExhausted) {    if (p == null) {        if (borrowMaxWaitMillis < 0) {            //2、没有设置最大阻塞等待时间,则无限等待            p = idleObjects.takeFirst();        } else {            //3、设置最大等待时间了,则阻塞等待指定的时间            p = idleObjects.pollFirst(borrowMaxWaitMillis,                    TimeUnit.MILLISECONDS);        }    }}
复制代码

示意图如下所示:

第三步、调用 allocate 使状态更改为 ALLOCATED 状态。


第四步、调用工厂的 activateObject 来初始化对象,如果发生错误,请调用 destroy 方法来销毁对象,例如源代码中的六个步骤。


第五步、调用 TestFactory 的 validateObject 进行基于 TestOnBorrow 配置的对象可用性分析,如果不可用,则调用 destroy 方法销毁对象。3-7 步骤的源码过程如下所示:

//修改对象状态if (!p.allocate()) {    p = null;}if (p != null) {    try {        //初始化对象        factory.activateObject(p);    } catch (final Exception e) {        try {            destroy(p, DestroyMode.NORMAL);        } catch (final Exception e1) {        } }    if (p != null && getTestOnBorrow()) {        boolean validate = false;        Throwable validationThrowable = null;        try {            //验证对象的可用性状态            validate = factory.validateObject(p);        } catch (final Throwable t) {            PoolUtils.checkRethrow(t);            validationThrowable = t;        }        //对象不可用,验证失败,则进行destroy        if (!validate) {            try {                destroy(p, DestroyMode.NORMAL);               destroyedByBorrowValidationCount.incrementAndGet();            } catch (final Exception e) {                // Ignore - validation failure is more important            }         }    }}
复制代码

2.3.3 对象池 returnObject 的过程执行逻辑


第一步、调用 markReturningState 方法将状态更改为 RETURNING。


第二步、基于 testOnReturn 配置调用 PooledObjectFactory 的 validateObject 方法以进行可用性检查。如果检查失败,则调用 destroy 消耗该对象,然后确保调用 idle 以确保池中有 IDLE 状态对象可用,如果没有,则调用 create 方法创建一个新对象。


第三步、调用 PooledObjectFactory 的 passivateObject 方法进行反初始化操作。


第四步、调用 deallocate 将状态更改为 IDLE。


第五步、检测是否已超过最大空闲对象数,如果超过,则销毁当前对象。


第六步、根据 LIFO(后进先出)配置将对象放置在队列的开头或结尾。


(还原操作队列示意图)

2.4 拓展和思考

2.4.1 关于 LinkedBlockingDeque 的另种实现


上文中分析到 commons-pool2 中使用了双端队列以及 java 中的 condition 来实现队列中对象的管理和不同线程对对象获取和释放对象操作之间的协调,那是否有其他方案可以实现类似效果呢?答案是肯定的。


使用双端队列进行操作,其实是想将空闲对象和活跃对象进行隔离,本质上将我们用两个队列来分别存储空闲队列和当前活跃对象,然后再统一使用一个对象锁,也是可以达成相同的目标的,大概的思路如下:


1.双端队列改为两个单向队列分别用于存储空闲的和活跃的对象,队列之间的同步和协调可以通过对象锁的 wait 和 notify 完成。

public  class PoolState { protected final List<PooledObject> idleObjects = new ArrayList<>();protected final List<PooledObject> activeObjects = new ArrayList<>();  //... }
复制代码
  1. 在获取对象时候,原本对双端队列的 LIFO 或者 FIFO 变成了从空闲队列 idleObjects 中获取对象,然后在获取成功并对象状态合法后,将对象添加到活跃对象集合 activeObjects 中,如果获取对象需要等待,则 PoolState 对象锁应该通过 wait 操作,进入等待状态。

  2. 在释放对象的时候,则首先从活跃对象集合 activeObjects 删除元素,删除完成后,将对象增加到空闲对象集合 idleObjects 中,需要注意的是,在释放对象过程中也需要去校验对象的状态。当对象状态不合法的时候,对象应该进行销毁,不应该添加到 idleObjects 中。释放成功后则 PoolState 通过 notify 或者 notifyAll 唤醒等待中的获取操作。

  3. 为保障对活跃队列和空闲队列的操作线程安全性,获取对象和释放对象需要进行加锁操作,和 commons2-pool 中的一致。

2.4.2 对象池的自我保护机制


我们在使用 commons-pool2 中获取对象的时候,会从双端队列中阻塞等待获取元素(或者是创建新对象),但是如果是应用程序的异常,一直未调用 returnObject 或者 invalidObject 的时候,那可能就会出现对象池中的对象一直上升,到达设置的上线之后再去调用 borrowObject 的时候就会出现一直等待或者是等待超时而无法获取对象的情况。


commons-pool2 为了避免上述分析的问题的出现,提供了两种自我保护机制:

2.4.2.1 基于阈值的检测


从对象池中获取对象的时候会校验当前对象池的活跃对象和空闲对象的数量占比,当空闲独享非常少,活跃对象非常多的时候,会触发空闲对象的回收,具体校验规则为:如果当前对象池中少于 2 个 idle 状态的对象或者 active 数量>最大对象数-3 的时候,在 borrow 对象的时候启动泄漏清理。通过 AbandonedConfig.setRemoveAbandonedOnBorrow 为 true 进行开启。

//根据配置确定是否要为标签删除调用removeAbandoned方法final AbandonedConfig ac = this.abandonedConfig;if (ac != null && ac.getRemoveAbandonedOnBorrow() && (getNumIdle() < 2) && (getNumActive() > getMaxTotal() - 3) ) {    removeAbandoned(ac);}
复制代码
2.4.2.2 异步调度线程检测


AbandonedConfig.setRemoveAbandonedOnMaintenance 设置为 true 以后,在维护任务运行的时候会进行泄漏对象的清理,通过设置 setTimeBetweenEvictionRunsMillis 来设置维护任务执行的时间间隔。

(异步检测线程 Evictor 时序图)


检测和回收实现逻辑分析:

在构造方法内部逻辑的最后调用了 startEvictor 方法。这个方法的作用是在构造完对象池后,启动回收器来监控回收空闲对象。startEvictor 定义在 GenericObjectPool 的父类 BaseGenericObjectPool(抽象)类中,我们先看一下这个方法的源码。


在构造器中会执行如下的设置参数;

public final void setTimeBetweenEvictionRunsMillis(      final long timeBetweenEvictionRunsMillis) {  this.timeBetweenEvictionRunsMillis = timeBetweenEvictionRunsMillis;  startEvictor(timeBetweenEvictionRunsMillis);}
复制代码

当且仅当设置了 timeBetweenEvictionRunsMillis 参数后才会开启定时清理任务。

final void startEvictor(final long delay) {  synchronized (evictionLock) {      EvictionTimer.cancel(evictor, evictorShutdownTimeoutMillis, TimeUnit.MILLISECONDS);      evictor = null;      evictionIterator = null;      //如果delay<=0则不会开启定时清理任务      if (delay > 0) {          evictor = new Evictor();          EvictionTimer.schedule(evictor, delay, delay);      }  }}
复制代码

继续跟进代码可以发现,调度器中设置的清理方法的实现逻辑实际在对象池中定义的,也就是由 GenericObjectPool 或者 GenericKeyedObjectPool 来实现,接下来我们继续探究对象池是如何进行对象回收的。


a)、核心参数:


minEvictableIdleTimeMillis:指定空闲对象最大保留时间,超过此时间的会被回收。不配置则不过期回收。


softMinEvictableIdleTimeMillis:一个毫秒数值,用来指定在空闲对象数量超过 minIdle 设置,且某个空闲对象超过这个空闲时间的才可以会被回收。


minIdle:对象池里要保留的最小空间对象数量。


b)、回收逻辑


以及一个对象回收策略接口 EvictionPolicy,可以预料到对象池的回收会和上述的参数项及接口 EvictionPolicy 发生关联,继续跟进代码会发现如下的内容,可以看到在判断对象池可以进行回收的时候,直接调用了 destroy 进行回收。

boolean evict;try {  evict = evictionPolicy.evict(evictionConfig, underTest,  idleObjects.size());} catch (final Throwable t) {  // Slightly convoluted as SwallowedExceptionListener  // uses Exception rather than Throwable    PoolUtils.checkRethrow(t);    swallowException(new Exception(t));    // Don't evict on error conditions    evict = false;}if (evict) {    // 如果可以被回收则直接调用destroy进行回收    destroy(underTest);    destroyedByEvictorCount.incrementAndGet();}
复制代码

为提升回收的效率,在回收策略判断对象的状态不是 evict 的时候,也会进行进一步的状态判断和处理,具体逻辑如下:


1.尝试激活对象,如果激活失败则认为对象已经不再存活,直接调用 destroy 进行销毁。


2.在激活对象成功的情况下,会通过 validateObject 方法取校验对象状态,如果校验失败,则说明对象不可用,需要进行销毁。

boolean active = false;try {  // 调用activateObject激活该空闲对象,本质上不是为了激活,  // 而是通过这个方法可以判定是否还存活,这一步里面可能会有一些资源的开辟行为。  factory.activateObject(underTest);  active = true;} catch (final Exception e) {  // 如果激活的时候,发生了异常,就说明该空闲对象已经失联了。  // 调用destroy方法销毁underTest  destroy(underTest);  destroyedByEvictorCount.incrementAndGet();}if (active) {  // 再通过进行validateObject校验有效性  if (!factory.validateObject(underTest)) {      // 如果校验失败,说明对象已经不可用了      destroy(underTest);      destroyedByEvictorCount.incrementAndGet();  } else {      try {          /*            *因为校验还激活了空闲对象,分配了额外的资源,那么就通过passivateObject把在activateObject中开辟的资源释放掉。          */          factory.passivateObject(underTest);      } catch (final Exception e) {          // 如果passivateObject失败,也可以说明underTest这个空闲对象不可用了          destroy(underTest);          destroyedByEvictorCount.incrementAndGet();      }  }}
复制代码

三、写在最后


连接池能够给程序开发者带来一些便利性,前言中我们分析了使用池化技术的好处和必要性,但是我们也可以看到 commons-pool2 框架在对象的创建和获取上都进行了加锁的操作,这会在并发场景下一定程度的影响应用程序的性能,其次池化对象的对象池中对象的数量也是需要进行合理的设置,否则也很难起到真正的使用对象池的目的,这给我们也带来了一定的挑战。


作者:vivo 互联网服务器团队-Huang Xiaoqun

发布于: 2021 年 04 月 27 日阅读数: 37
用户头像

官方公众号:vivo互联网技术,ID:vivoVMIC 2020.07.10 加入

分享 vivo 互联网技术干货与沙龙活动,推荐最新行业动态与热门会议。

评论

发布
暂无评论
commons-pool2 池化技术探究