巧用 GenericObjectPool 创建自定义对象池

作者:京东物流 高圆庆
1 前言
通常一个对象创建、销毁非常耗时的时候,我们不会频繁的创建和销毁它,而是考虑复用。复用对象的一种做法就是对象池,将创建好的对象放入池中维护起来,下次再用的时候直接拿池中已经创建好的对象继续用,这就是池化的思想。在 java 中,有很多池管理的概念,典型的如线程池,数据库连接池,socket 连接池。本文章讲介绍 apache 提供的通用对象池框架 GenericObjectPool,以及基于 GenericObjectPool 实现的 sftp 连接池在国际物流调度履约系统中的应用。
2 GenericObjectPool 剖析
Apache Commons Pool 是一个对象池的框架,他提供了一整套用于实现对象池化的 API。它提供了三种对象池:GenericKeyedObjectPool,SoftReferenceObjectPool 和 GenericObjectPool,其中 GenericObjectPool 是我们最常用的对象池,内部实现也最复杂。GenericObjectPool 的 UML 图如下所示:
2.1 核心接口 ObjectPool
从图中可以看出,GenericObjectPool 实现了 ObjectPool 接口,而 ObjectPool 就是对象池的核心接口,它定义了一个对象池应该实现的行为。
addObject 方法:往池中添加一个对象
borrowObject 方法:从池中借走到一个对象
returnObject 方法:把对象归还给对象池
invalidateObject:验证对象的有效性
getNumIdle:返回对象池中有多少对象是空闲的,也就是能够被借走的对象的数量。
getNumActive:返回对象池中有对象对象是活跃的,也就是已经被借走的,在使用中的对象的数量。
clear:清理对象池。注意是清理不是清空,该方法要求的是,清理所有空闲对象,释放相关资源。
close:关闭对象池。这个方法可以达到清空的效果,清理所有对象以及相关资源。
2.2 对象工厂 BasePooledObjectFactory
对象的创建需要通过对象工厂来创建,对象工厂需要实现 BasePooledObjectFactory 接口。ObjectPool 接口中往池中添加一个对象,就需要使用对象工厂来创建一个对象。该接口说明如下:
public interface PooledObjectFactory<T> {/*** 创建一个可由池提供服务的实例,并将其封装在由池管理的PooledObject中。*/PooledObject<T> makeObject() throws Exception;/*** 销毁池不再需要的实例*/void destroyObject(PooledObject<T> p) throws Exception;/*** 确保实例可以安全地由池返回*/boolean validateObject(PooledObject<T> p);/*** 重新初始化池返回的实例*/void activateObject(PooledObject<T> p) throws Exception;/*** 取消初始化要返回到空闲对象池的实例*/void passivateObject(PooledObject<T> p) throws Exception;}
2.3 配置类 GenericObjectPoolConfig
GenericObjectPoolConfig 是封装 GenericObject 池配置的简单“结构”,此类不是线程安全的;它仅用于提供创建池时使用的属性。大多数情况,可以使用 GenericObjectPoolConfig 提供的默认参数就可以满足日常的需求,GenericObjectPoolConfig 是一个抽象类,实际应用中需要新建配置类,然后继承它。
2.4 工作原理流程
构造方法
当我们执行构造方法时,主要工作就是创建了一个存储对象的 LinkedList 类型容器,也就是概念意义上的“池”
从对象池中获取对象
获取池中的对象是通过 borrowObject()命令,源码比较复杂,简单而言就是去 LinkedList 中获取一个对象,如果不存在的话,要调用构造方法中第一个参数 Factory 工厂类的 makeObject()方法去创建一个对象再获取,获取到对象后要调用 validateObject 方法判断该对象是否是可用的,如果是可用的才拿去使用。LinkedList 容器减一
归还对象到线程池
简单而言就是先调用 validateObject 方法判断该对象是否是可用的,如果可用则归还到池中,LinkedList 容器加一,如果是不可以的则则调用 destroyObject 方法进行销毁
上面三步就是最简单的流程,由于取和还的流程步骤都在 borrowObject 和 returnObject 方法中固定的,所以我们只要重写 Factory 工厂类的 makeObject()和 validateObject 以及 destroyObject 方法即可实现最简单的池的管理控制,通过构造方法传入该 Factory 工厂类对象则可以创建最简单的对象池管理类。这算是比较好的解耦设计模式,借和还的流程如下图所示:
3 开源框架如何使用 GenericObjectPool
redis 的 java 客户端 jedis 就是基于 Apache Commons Pool 对象池的框架来实现的。
3.1 对象工厂类 JedisFactory
对象工厂类只需实现 activateObject、destroyObject、makeObject、validateObject 方法即可,源码如下:
class JedisFactory implements PooledObjectFactory<Jedis> {private final String host;private final int port;private final int timeout;private final int newTimeout;private final String password;private final int database;private final String clientName;public JedisFactory(String host, int port, int timeout, String password, int database) {this(host, port, timeout, password, database, (String)null);}public JedisFactory(String host, int port, int timeout, String password, int database, String clientName) {this(host, port, timeout, timeout, password, database, clientName);}public JedisFactory(String host, int port, int timeout, int newTimeout, String password, int database, String clientName) {this.host = host;this.port = port;this.timeout = timeout;this.newTimeout = newTimeout;this.password = password;this.database = database;this.clientName = clientName;}public void activateObject(PooledObject<Jedis> pooledJedis) throws Exception {BinaryJedis jedis = (BinaryJedis)pooledJedis.getObject();if (jedis.getDB() != (long)this.database) {jedis.select(this.database);}}public void destroyObject(PooledObject<Jedis> pooledJedis) throws Exception {BinaryJedis jedis = (BinaryJedis)pooledJedis.getObject();if (jedis.isConnected()) {try {try {jedis.quit();} catch (Exception var4) {}jedis.disconnect();} catch (Exception var5) {}}}public PooledObject<Jedis> makeObject() throws Exception {Jedis jedis = new Jedis(this.host, this.port, this.timeout, this.newTimeout);jedis.connect();if (null != this.password) {jedis.auth(this.password);}if (this.database != 0) {jedis.select(this.database);}if (this.clientName != null) {jedis.clientSetname(this.clientName);}return new DefaultPooledObject(jedis);}public void passivateObject(PooledObject<Jedis> pooledJedis) throws Exception {}public boolean validateObject(PooledObject<Jedis> pooledJedis) {BinaryJedis jedis = (BinaryJedis)pooledJedis.getObject();try {return jedis.isConnected() && jedis.ping().equals("PONG");} catch (Exception var4) {return false;}}}
3.2 配置类 JedisPoolConfig
public class JedisPoolConfig extends GenericObjectPoolConfig {public JedisPoolConfig() {this.setTestWhileIdle(true);this.setMinEvictableIdleTimeMillis(60000L);this.setTimeBetweenEvictionRunsMillis(30000L);this.setNumTestsPerEvictionRun(-1);}}
4 国际物流履约系统中的应用
在国际物流履约系统中,我们和客户交互文件经常使用 sftp 服务器,因为创建 sftp 服务器的连接比较耗时,所以基于 Apache Commons Pool 对象池的框架来实现的我们自己的 sftp 链接池。
4.1 sftp 对象池
SftpPool 比较简单,直接继承 GenericObjectPool。
public class SftpPool extends GenericObjectPool<Sftp> {public SftpPool(SftpFactory factory, SftpPoolConfig config, SftpAbandonedConfig abandonedConfig) {super(factory, config, abandonedConfig);}}
4.2 对象工厂 SftpFactory
这是基于 Apache Commons Pool 框架实现自定义对象池的核心类,代码如下:
public class SftpFactory extends BasePooledObjectFactory<Sftp> {private static final String CHANNEL_TYPE = "sftp";private static Properties sshConfig = new Properties();private String host;private int port;private String username;private String password;static {sshConfig.put("StrictHostKeyChecking", "no");}@Overridepublic Sftp create() {try {JSch jsch = new JSch();Session sshSession = jsch.getSession(username, host, port);sshSession.setPassword(password);sshSession.setConfig(sshConfig);sshSession.connect();ChannelSftp channel = (ChannelSftp) sshSession.openChannel(CHANNEL_TYPE);channel.connect();log.info("sftpFactory创建sftp");return new Sftp(channel);} catch (JSchException e) {log.error("连接sftp失败:", e);throw new BizException(ResultCodeEnum.SFTP_EXCEPTION);}}/*** @param sftp 被包装的对象* @return 对象包装器*/@Overridepublic PooledObject<Sftp> wrap(Sftp sftp) {return new DefaultPooledObject<>(sftp);}/*** 销毁对象* @param p 对象包装器*/@Overridepublic void destroyObject(PooledObject<Sftp> p) {log.info("开始销毁channelSftp");if (p!=null) {Sftp sftp = p.getObject();if (sftp!=null) {ChannelSftp channelSftp = sftp.getChannelSftp();if (channelSftp!=null) {channelSftp.disconnect();log.info("销毁channelSftp成功");}}}}/*** 检查连接是否可用** @param p 对象包装器* @return {@code true} 可用,{@code false} 不可用*/@Overridepublic boolean validateObject(PooledObject<Sftp> p) {if (p!=null) {Sftp sftp = p.getObject();if (sftp!=null) {try {sftp.getChannelSftp().cd("./");log.info("验证连接是否可用,结果为true");return true;} catch (SftpException e) {log.info("验证连接是否可用,结果为false",e);return false;}}}log.info("验证连接是否可用,结果为false");return false;}public static class Builder {private String host;private int port;private String username;private String password;public SftpFactory build() {return new SftpFactory(host, port, username, password);}public Builder host(String host) {this.host = host;return this;}public Builder port(int port) {this.port = port;return this;}public Builder username(String username) {this.username = username;return this;}public Builder password(String password) {this.password = password;return this;}}}
4.3 配置类 SftpPoolConfig
配置类继承了 GenericObjectPoolConfig,可继承该类的默认属性,也可自定义配置参数。
public class SftpPoolConfig extends GenericObjectPoolConfig<Sftp> {public static class Builder {private int maxTotal;private int maxIdle;private int minIdle;private boolean lifo;private boolean fairness;private long maxWaitMillis;private long minEvictableIdleTimeMillis;private long evictorShutdownTimeoutMillis;private long softMinEvictableIdleTimeMillis;private int numTestsPerEvictionRun;private EvictionPolicy<Sftp> evictionPolicy; // 仅2.6.0版本commons-pool2需要设置private String evictionPolicyClassName;private boolean testOnCreate;private boolean testOnBorrow;private boolean testOnReturn;private boolean testWhileIdle;private long timeBetweenEvictionRunsMillis;private boolean blockWhenExhausted;private boolean jmxEnabled;private String jmxNamePrefix;private String jmxNameBase;public SftpPoolConfig build() {SftpPoolConfig config = new SftpPoolConfig();config.setMaxTotal(maxTotal);config.setMaxIdle(maxIdle);config.setMinIdle(minIdle);config.setLifo(lifo);config.setFairness(fairness);config.setMaxWaitMillis(maxWaitMillis);config.setMinEvictableIdleTimeMillis(minEvictableIdleTimeMillis);config.setEvictorShutdownTimeoutMillis(evictorShutdownTimeoutMillis);config.setSoftMinEvictableIdleTimeMillis(softMinEvictableIdleTimeMillis);config.setNumTestsPerEvictionRun(numTestsPerEvictionRun);config.setEvictionPolicy(evictionPolicy);config.setEvictionPolicyClassName(evictionPolicyClassName);config.setTestOnCreate(testOnCreate);config.setTestOnBorrow(testOnBorrow);config.setTestOnReturn(testOnReturn);config.setTestWhileIdle(testWhileIdle);config.setTimeBetweenEvictionRunsMillis(timeBetweenEvictionRunsMillis);config.setBlockWhenExhausted(blockWhenExhausted);config.setJmxEnabled(jmxEnabled);config.setJmxNamePrefix(jmxNamePrefix);config.setJmxNameBase(jmxNameBase);return config;}}
4.4 SftpClient 配置类
读取配置文件,创建 SftpFactory、SftpPoolConfig、SftpPool,代码如下:
@Configuration@ConditionalOnClass(SftpPool.class)@EnableConfigurationProperties(SftpClientProperties.class)public class SftpClientAutoConfiguration {@Bean@ConditionalOnMissingBeanpublic ISftpClient sftpClient(SftpClientProperties sftpClientProperties) {if (sftpClientProperties.isMultiple()) {MultipleSftpClient multipleSftpClient = new MultipleSftpClient();sftpClientProperties.getClients().forEach((name, properties) -> {SftpFactory sftpFactory = createSftpFactory(properties);SftpPoolConfig sftpPoolConfig = createSftpPoolConfig(properties);SftpAbandonedConfig sftpAbandonedConfig = createSftpAbandonedConfig(properties);SftpPool sftpPool = new SftpPool(sftpFactory, sftpPoolConfig, sftpAbandonedConfig);ISftpClient sftpClient = new SftpClient(sftpPool);multipleSftpClient.put(name, sftpClient);});return multipleSftpClient;}SftpFactory sftpFactory = createSftpFactory(sftpClientProperties);SftpPoolConfig sftpPoolConfig = createSftpPoolConfig(sftpClientProperties);SftpAbandonedConfig sftpAbandonedConfig = createSftpAbandonedConfig(sftpClientProperties);SftpPool sftpPool = new SftpPool(sftpFactory, sftpPoolConfig, sftpAbandonedConfig);return new SftpClient(sftpPool);}public SftpFactory createSftpFactory(SftpClientProperties properties) {return new SftpFactory.Builder().host(properties.getHost()).port(properties.getPort()).username(properties.getUsername()).password(properties.getPassword()).build();}public SftpPoolConfig createSftpPoolConfig(SftpClientProperties properties) {SftpClientProperties.Pool pool = properties.getPool();return new SftpPoolConfig.Builder().maxTotal(pool.getMaxTotal()).maxIdle(pool.getMaxIdle()).minIdle(pool.getMinIdle()).lifo(pool.isLifo()).fairness(pool.isFairness()).maxWaitMillis(pool.getMaxWaitMillis()).minEvictableIdleTimeMillis(pool.getMinEvictableIdleTimeMillis()).evictorShutdownTimeoutMillis(pool.getEvictorShutdownTimeoutMillis()).softMinEvictableIdleTimeMillis(pool.getSoftMinEvictableIdleTimeMillis()).numTestsPerEvictionRun(pool.getNumTestsPerEvictionRun()).evictionPolicy(null).evictionPolicyClassName(DefaultEvictionPolicy.class.getName()).testOnCreate(pool.isTestOnCreate()).testOnBorrow(pool.isTestOnBorrow()).testOnReturn(pool.isTestOnReturn()).testWhileIdle(pool.isTestWhileIdle()).timeBetweenEvictionRunsMillis(pool.getTimeBetweenEvictionRunsMillis()).blockWhenExhausted(pool.isBlockWhenExhausted()).jmxEnabled(pool.isJmxEnabled()).jmxNamePrefix(pool.getJmxNamePrefix()).jmxNameBase(pool.getJmxNameBase()).build();}public SftpAbandonedConfig createSftpAbandonedConfig(SftpClientProperties properties) {SftpClientProperties.Abandoned abandoned = properties.getAbandoned();return new SftpAbandonedConfig.Builder().removeAbandonedOnBorrow(abandoned.isRemoveAbandonedOnBorrow()).removeAbandonedOnMaintenance(abandoned.isRemoveAbandonedOnMaintenance()).removeAbandonedTimeout(abandoned.getRemoveAbandonedTimeout()).logAbandoned(abandoned.isLogAbandoned()).requireFullStackTrace(abandoned.isRequireFullStackTrace()).logWriter(new PrintWriter(System.out)).useUsageTracking(abandoned.isUseUsageTracking()).build();}}
4.5 对象 SftpClient
SftpClient 是实际工作的类,从 SftpClient 中可获取到一个 sftp 链接,使用完成后,归还给 sftpPool。SftpClient 代码如下:
public class SftpClient implements ISftpClient {private SftpPool sftpPool;/*** 从sftp连接池获取连接并执行操作** @param handler sftp操作*/@Overridepublic void open(ISftpClient.Handler handler) {Sftp sftp = null;try {sftp = sftpPool.borrowObject();ISftpClient.Handler policyHandler = new DelegateHandler(handler);policyHandler.doHandle(sftp);} catch (Exception e) {log.error("sftp异常:", e);throw new BizException(ResultCodeEnum.SFTP_EXCEPTION);} finally {if (sftp != null) {sftpPool.returnObject(sftp);}}}@AllArgsConstructorstatic class DelegateHandler implements ISftpClient.Handler {private ISftpClient.Handler target;@Overridepublic void doHandle(Sftp sftp) {try {target.doHandle(sftp);} catch (Exception e) {log.error("sftp异常:", e);throw new BizException(ResultCodeEnum.SFTP_EXCEPTION);}}}}
4.6 实战代码示例
通过 sftp 上传文件到 XX 服务器
//通过SFTP上传到XX((MultipleSftpClient) sftpClient).choose("XX");sftpClient.open(sftp -> {boolean exist = sftp.isExist(inventoryPath);if(!exist){sftp.mkdirs(inventoryPath);}// 执行sftp操作InputStream is = new FileInputStream(oneColumnCSVFile);sftp.upload(inventoryPath, titleName, is);log.info("inventory upload over");});
5 总结
通过本文的介绍可以知道,Apache Commons Pool 定义了一个对象池的行为,提供了可扩展的配置类和对象工厂,封装了对象创建、从池中获取对象、归还对象的核心流程。还介绍了开源框架 Jedis 是如何基于 GenericObjectPool 来实现的连接池。最后介绍了国际物流履约系统中是如何基于 GenericObjectPool 来管理 Sftp 连接的。
掌握了 GenericObjectPool 的核心原理,我们就可以通过实现几个关键的接口,创建一个对象池管理工具,在项目中避免了对象的频繁创建和销毁,从而显著提升程序的性能。
版权声明: 本文为 InfoQ 作者【京东科技开发者】的原创文章。
原文链接:【http://xie.infoq.cn/article/2a0c57bdc404750d7d6118e74】。文章转载请联系作者。










评论