写点什么

巧用 GenericObjectPool 创建自定义对象池

  • 2023-03-13
    北京
  • 本文字数:7773 字

    阅读完需:约 26 分钟

巧用GenericObjectPool创建自定义对象池

作者:京东物流 高圆庆

1 前言

通常一个对象创建、销毁非常耗时的时候,我们不会频繁的创建和销毁它,而是考虑复用。复用对象的一种做法就是对象池,将创建好的对象放入池中维护起来,下次再用的时候直接拿池中已经创建好的对象继续用,这就是池化的思想。在 java 中,有很多池管理的概念,典型的如线程池,数据库连接池,socket 连接池。本文章讲介绍 apache 提供的通用对象池框架 GenericObjectPool,以及基于 GenericObjectPool 实现的 sftp 连接池在国际物流调度履约系统中的应用。

2 GenericObjectPool 剖析

Apache Commons Pool 是一个对象池的框架,他提供了一整套用于实现对象池化的 API。它提供了三种对象池:GenericKeyedObjectPool,SoftReferenceObjectPool 和 GenericObjectPool,其中 GenericObjectPool 是我们最常用的对象池,内部实现也最复杂。GenericObjectPool 的 UML 图如下所示:


2.1 核心接口 ObjectPool

从图中可以看出,GenericObjectPool 实现了 ObjectPool 接口,而 ObjectPool 就是对象池的核心接口,它定义了一个对象池应该实现的行为。


  • addObject 方法:往池中添加一个对象

  • borrowObject 方法:从池中借走到一个对象

  • returnObject 方法:把对象归还给对象池

  • invalidateObject:验证对象的有效性

  • getNumIdle:返回对象池中有多少对象是空闲的,也就是能够被借走的对象的数量。

  • getNumActive:返回对象池中有对象对象是活跃的,也就是已经被借走的,在使用中的对象的数量。

  • clear:清理对象池。注意是清理不是清空,该方法要求的是,清理所有空闲对象,释放相关资源。

  • close:关闭对象池。这个方法可以达到清空的效果,清理所有对象以及相关资源。

2.2 对象工厂 BasePooledObjectFactory

对象的创建需要通过对象工厂来创建,对象工厂需要实现 BasePooledObjectFactory 接口。ObjectPool 接口中往池中添加一个对象,就需要使用对象工厂来创建一个对象。该接口说明如下:


  1. public interface PooledObjectFactory<T> {

  2. /**

  3. * 创建一个可由池提供服务的实例,并将其封装在由池管理的PooledObject中。

  4. */

  5. PooledObject<T> makeObject() throws Exception;

  6. /**

  7. * 销毁池不再需要的实例

  8. */

  9. void destroyObject(PooledObject<T> p) throws Exception;

  10. /**

  11. * 确保实例可以安全地由池返回

  12. */

  13. boolean validateObject(PooledObject<T> p);

  14. /**

  15. * 重新初始化池返回的实例

  16. */

  17. void activateObject(PooledObject<T> p) throws Exception;

  18. /**

  19. * 取消初始化要返回到空闲对象池的实例

  20. */

  21. void passivateObject(PooledObject<T> p) throws Exception;

  22. }

2.3 配置类 GenericObjectPoolConfig

GenericObjectPoolConfig 是封装 GenericObject 池配置的简单“结构”,此类不是线程安全的;它仅用于提供创建池时使用的属性。大多数情况,可以使用 GenericObjectPoolConfig 提供的默认参数就可以满足日常的需求,GenericObjectPoolConfig 是一个抽象类,实际应用中需要新建配置类,然后继承它。

2.4 工作原理流程

  1. 构造方法

  2. 当我们执行构造方法时,主要工作就是创建了一个存储对象的 LinkedList 类型容器,也就是概念意义上的“池”

  3. 从对象池中获取对象

  4. 获取池中的对象是通过 borrowObject()命令,源码比较复杂,简单而言就是去 LinkedList 中获取一个对象,如果不存在的话,要调用构造方法中第一个参数 Factory 工厂类的 makeObject()方法去创建一个对象再获取,获取到对象后要调用 validateObject 方法判断该对象是否是可用的,如果是可用的才拿去使用。LinkedList 容器减一

  5. 归还对象到线程池

  6. 简单而言就是先调用 validateObject 方法判断该对象是否是可用的,如果可用则归还到池中,LinkedList 容器加一,如果是不可以的则则调用 destroyObject 方法进行销毁


上面三步就是最简单的流程,由于取和还的流程步骤都在 borrowObject 和 returnObject 方法中固定的,所以我们只要重写 Factory 工厂类的 makeObject()和 validateObject 以及 destroyObject 方法即可实现最简单的池的管理控制,通过构造方法传入该 Factory 工厂类对象则可以创建最简单的对象池管理类。这算是比较好的解耦设计模式,借和还的流程如下图所示:


3 开源框架如何使用 GenericObjectPool

redis 的 java 客户端 jedis 就是基于 Apache Commons Pool 对象池的框架来实现的。

3.1 对象工厂类 JedisFactory

对象工厂类只需实现 activateObject、destroyObject、makeObject、validateObject 方法即可,源码如下:


  1. class JedisFactory implements PooledObjectFactory<Jedis> {

  2. private final String host;

  3. private final int port;

  4. private final int timeout;

  5. private final int newTimeout;

  6. private final String password;

  7. private final int database;

  8. private final String clientName;

  9. public JedisFactory(String host, int port, int timeout, String password, int database) {

  10. this(host, port, timeout, password, database, (String)null);

  11. }

  12. public JedisFactory(String host, int port, int timeout, String password, int database, String clientName) {

  13. this(host, port, timeout, timeout, password, database, clientName);

  14. }

  15. public JedisFactory(String host, int port, int timeout, int newTimeout, String password, int database, String clientName) {

  16. this.host = host;

  17. this.port = port;

  18. this.timeout = timeout;

  19. this.newTimeout = newTimeout;

  20. this.password = password;

  21. this.database = database;

  22. this.clientName = clientName;

  23. }

  24. public void activateObject(PooledObject<Jedis> pooledJedis) throws Exception {

  25. BinaryJedis jedis = (BinaryJedis)pooledJedis.getObject();

  26. if (jedis.getDB() != (long)this.database) {

  27. jedis.select(this.database);

  28. }

  29. }

  30. public void destroyObject(PooledObject<Jedis> pooledJedis) throws Exception {

  31. BinaryJedis jedis = (BinaryJedis)pooledJedis.getObject();

  32. if (jedis.isConnected()) {

  33. try {

  34. try {

  35. jedis.quit();

  36. } catch (Exception var4) {

  37. }

  38. jedis.disconnect();

  39. } catch (Exception var5) {

  40. }

  41. }

  42. }

  43. public PooledObject<Jedis> makeObject() throws Exception {

  44. Jedis jedis = new Jedis(this.host, this.port, this.timeout, this.newTimeout);

  45. jedis.connect();

  46. if (null != this.password) {

  47. jedis.auth(this.password);

  48. }

  49. if (this.database != 0) {

  50. jedis.select(this.database);

  51. }

  52. if (this.clientName != null) {

  53. jedis.clientSetname(this.clientName);

  54. }

  55. return new DefaultPooledObject(jedis);

  56. }

  57. public void passivateObject(PooledObject<Jedis> pooledJedis) throws Exception {

  58. }

  59. public boolean validateObject(PooledObject<Jedis> pooledJedis) {

  60. BinaryJedis jedis = (BinaryJedis)pooledJedis.getObject();

  61. try {

  62. return jedis.isConnected() && jedis.ping().equals("PONG");

  63. } catch (Exception var4) {

  64. return false;

  65. }

  66. }

  67. }

3.2 配置类 JedisPoolConfig

  1. public class JedisPoolConfig extends GenericObjectPoolConfig {

  2. public JedisPoolConfig() {

  3. this.setTestWhileIdle(true);

  4. this.setMinEvictableIdleTimeMillis(60000L);

  5. this.setTimeBetweenEvictionRunsMillis(30000L);

  6. this.setNumTestsPerEvictionRun(-1);

  7. }

  8. }

4 国际物流履约系统中的应用

在国际物流履约系统中,我们和客户交互文件经常使用 sftp 服务器,因为创建 sftp 服务器的连接比较耗时,所以基于 Apache Commons Pool 对象池的框架来实现的我们自己的 sftp 链接池。

4.1 sftp 对象池

SftpPool 比较简单,直接继承 GenericObjectPool。


  1. public class SftpPool extends GenericObjectPool<Sftp> {

  2. public SftpPool(SftpFactory factory, SftpPoolConfig config, SftpAbandonedConfig abandonedConfig) {

  3. super(factory, config, abandonedConfig);

  4. }

  5. }

4.2 对象工厂 SftpFactory

这是基于 Apache Commons Pool 框架实现自定义对象池的核心类,代码如下:


  1. public class SftpFactory extends BasePooledObjectFactory<Sftp> {

  2. private static final String CHANNEL_TYPE = "sftp";

  3. private static Properties sshConfig = new Properties();

  4. private String host;

  5. private int port;

  6. private String username;

  7. private String password;

  8. static {

  9. sshConfig.put("StrictHostKeyChecking", "no");

  10. }

  11. @Override

  12. public Sftp create() {

  13. try {

  14. JSch jsch = new JSch();

  15. Session sshSession = jsch.getSession(username, host, port);

  16. sshSession.setPassword(password);

  17. sshSession.setConfig(sshConfig);

  18. sshSession.connect();

  19. ChannelSftp channel = (ChannelSftp) sshSession.openChannel(CHANNEL_TYPE);

  20. channel.connect();

  21. log.info("sftpFactory创建sftp");

  22. return new Sftp(channel);

  23. } catch (JSchException e) {

  24. log.error("连接sftp失败:", e);

  25. throw new BizException(ResultCodeEnum.SFTP_EXCEPTION);

  26. }

  27. }

  28. /**

  29. * @param sftp 被包装的对象

  30. * @return 对象包装器

  31. */

  32. @Override

  33. public PooledObject<Sftp> wrap(Sftp sftp) {

  34. return new DefaultPooledObject<>(sftp);

  35. }

  36. /**

  37. * 销毁对象

  38. * @param p 对象包装器

  39. */

  40. @Override

  41. public void destroyObject(PooledObject<Sftp> p) {

  42. log.info("开始销毁channelSftp");

  43. if (p!=null) {

  44. Sftp sftp = p.getObject();

  45. if (sftp!=null) {

  46. ChannelSftp channelSftp = sftp.getChannelSftp();

  47. if (channelSftp!=null) {

  48. channelSftp.disconnect();

  49. log.info("销毁channelSftp成功");

  50. }

  51. }

  52. }

  53. }

  54. /**

  55. * 检查连接是否可用

  56. *

  57. * @param p 对象包装器

  58. * @return {@code true} 可用,{@code false} 不可用

  59. */

  60. @Override

  61. public boolean validateObject(PooledObject<Sftp> p) {

  62. if (p!=null) {

  63. Sftp sftp = p.getObject();

  64. if (sftp!=null) {

  65. try {

  66. sftp.getChannelSftp().cd("./");

  67. log.info("验证连接是否可用,结果为true");

  68. return true;

  69. } catch (SftpException e) {

  70. log.info("验证连接是否可用,结果为false",e);

  71. return false;

  72. }

  73. }

  74. }

  75. log.info("验证连接是否可用,结果为false");

  76. return false;

  77. }

  78. public static class Builder {

  79. private String host;

  80. private int port;

  81. private String username;

  82. private String password;

  83. public SftpFactory build() {

  84. return new SftpFactory(host, port, username, password);

  85. }

  86. public Builder host(String host) {

  87. this.host = host;

  88. return this;

  89. }

  90. public Builder port(int port) {

  91. this.port = port;

  92. return this;

  93. }

  94. public Builder username(String username) {

  95. this.username = username;

  96. return this;

  97. }

  98. public Builder password(String password) {

  99. this.password = password;

  100. return this;

  101. }

  102. }

  103. }

4.3 配置类 SftpPoolConfig

配置类继承了 GenericObjectPoolConfig,可继承该类的默认属性,也可自定义配置参数。


  1. public class SftpPoolConfig extends GenericObjectPoolConfig<Sftp> {

  2. public static class Builder {

  3. private int maxTotal;

  4. private int maxIdle;

  5. private int minIdle;

  6. private boolean lifo;

  7. private boolean fairness;

  8. private long maxWaitMillis;

  9. private long minEvictableIdleTimeMillis;

  10. private long evictorShutdownTimeoutMillis;

  11. private long softMinEvictableIdleTimeMillis;

  12. private int numTestsPerEvictionRun;

  13. private EvictionPolicy<Sftp> evictionPolicy; // 仅2.6.0版本commons-pool2需要设置

  14. private String evictionPolicyClassName;

  15. private boolean testOnCreate;

  16. private boolean testOnBorrow;

  17. private boolean testOnReturn;

  18. private boolean testWhileIdle;

  19. private long timeBetweenEvictionRunsMillis;

  20. private boolean blockWhenExhausted;

  21. private boolean jmxEnabled;

  22. private String jmxNamePrefix;

  23. private String jmxNameBase;

  24. public SftpPoolConfig build() {

  25. SftpPoolConfig config = new SftpPoolConfig();

  26. config.setMaxTotal(maxTotal);

  27. config.setMaxIdle(maxIdle);

  28. config.setMinIdle(minIdle);

  29. config.setLifo(lifo);

  30. config.setFairness(fairness);

  31. config.setMaxWaitMillis(maxWaitMillis);

  32. config.setMinEvictableIdleTimeMillis(minEvictableIdleTimeMillis);

  33. config.setEvictorShutdownTimeoutMillis(evictorShutdownTimeoutMillis);

  34. config.setSoftMinEvictableIdleTimeMillis(softMinEvictableIdleTimeMillis);

  35. config.setNumTestsPerEvictionRun(numTestsPerEvictionRun);

  36. config.setEvictionPolicy(evictionPolicy);

  37. config.setEvictionPolicyClassName(evictionPolicyClassName);

  38. config.setTestOnCreate(testOnCreate);

  39. config.setTestOnBorrow(testOnBorrow);

  40. config.setTestOnReturn(testOnReturn);

  41. config.setTestWhileIdle(testWhileIdle);

  42. config.setTimeBetweenEvictionRunsMillis(timeBetweenEvictionRunsMillis);

  43. config.setBlockWhenExhausted(blockWhenExhausted);

  44. config.setJmxEnabled(jmxEnabled);

  45. config.setJmxNamePrefix(jmxNamePrefix);

  46. config.setJmxNameBase(jmxNameBase);

  47. return config;

  48. }

  49. }

4.4 SftpClient 配置类

读取配置文件,创建 SftpFactory、SftpPoolConfig、SftpPool,代码如下:


  1. @Configuration

  2. @ConditionalOnClass(SftpPool.class)

  3. @EnableConfigurationProperties(SftpClientProperties.class)

  4. public class SftpClientAutoConfiguration {

  5. @Bean

  6. @ConditionalOnMissingBean

  7. public ISftpClient sftpClient(SftpClientProperties sftpClientProperties) {

  8. if (sftpClientProperties.isMultiple()) {

  9. MultipleSftpClient multipleSftpClient = new MultipleSftpClient();

  10. sftpClientProperties.getClients().forEach((name, properties) -> {

  11. SftpFactory sftpFactory = createSftpFactory(properties);

  12. SftpPoolConfig sftpPoolConfig = createSftpPoolConfig(properties);

  13. SftpAbandonedConfig sftpAbandonedConfig = createSftpAbandonedConfig(properties);

  14. SftpPool sftpPool = new SftpPool(sftpFactory, sftpPoolConfig, sftpAbandonedConfig);

  15. ISftpClient sftpClient = new SftpClient(sftpPool);

  16. multipleSftpClient.put(name, sftpClient);

  17. });

  18. return multipleSftpClient;

  19. }

  20. SftpFactory sftpFactory = createSftpFactory(sftpClientProperties);

  21. SftpPoolConfig sftpPoolConfig = createSftpPoolConfig(sftpClientProperties);

  22. SftpAbandonedConfig sftpAbandonedConfig = createSftpAbandonedConfig(sftpClientProperties);

  23. SftpPool sftpPool = new SftpPool(sftpFactory, sftpPoolConfig, sftpAbandonedConfig);

  24. return new SftpClient(sftpPool);

  25. }

  26. public SftpFactory createSftpFactory(SftpClientProperties properties) {

  27. return new SftpFactory.Builder()

  28. .host(properties.getHost())

  29. .port(properties.getPort())

  30. .username(properties.getUsername())

  31. .password(properties.getPassword())

  32. .build();

  33. }

  34. public SftpPoolConfig createSftpPoolConfig(SftpClientProperties properties) {

  35. SftpClientProperties.Pool pool = properties.getPool();

  36. return new SftpPoolConfig.Builder()

  37. .maxTotal(pool.getMaxTotal())

  38. .maxIdle(pool.getMaxIdle())

  39. .minIdle(pool.getMinIdle())

  40. .lifo(pool.isLifo())

  41. .fairness(pool.isFairness())

  42. .maxWaitMillis(pool.getMaxWaitMillis())

  43. .minEvictableIdleTimeMillis(pool.getMinEvictableIdleTimeMillis())

  44. .evictorShutdownTimeoutMillis(pool.getEvictorShutdownTimeoutMillis())

  45. .softMinEvictableIdleTimeMillis(pool.getSoftMinEvictableIdleTimeMillis())

  46. .numTestsPerEvictionRun(pool.getNumTestsPerEvictionRun())

  47. .evictionPolicy(null)

  48. .evictionPolicyClassName(DefaultEvictionPolicy.class.getName())

  49. .testOnCreate(pool.isTestOnCreate())

  50. .testOnBorrow(pool.isTestOnBorrow())

  51. .testOnReturn(pool.isTestOnReturn())

  52. .testWhileIdle(pool.isTestWhileIdle())

  53. .timeBetweenEvictionRunsMillis(pool.getTimeBetweenEvictionRunsMillis())

  54. .blockWhenExhausted(pool.isBlockWhenExhausted())

  55. .jmxEnabled(pool.isJmxEnabled())

  56. .jmxNamePrefix(pool.getJmxNamePrefix())

  57. .jmxNameBase(pool.getJmxNameBase())

  58. .build();

  59. }

  60. public SftpAbandonedConfig createSftpAbandonedConfig(SftpClientProperties properties) {

  61. SftpClientProperties.Abandoned abandoned = properties.getAbandoned();

  62. return new SftpAbandonedConfig.Builder()

  63. .removeAbandonedOnBorrow(abandoned.isRemoveAbandonedOnBorrow())

  64. .removeAbandonedOnMaintenance(abandoned.isRemoveAbandonedOnMaintenance())

  65. .removeAbandonedTimeout(abandoned.getRemoveAbandonedTimeout())

  66. .logAbandoned(abandoned.isLogAbandoned())

  67. .requireFullStackTrace(abandoned.isRequireFullStackTrace())

  68. .logWriter(new PrintWriter(System.out))

  69. .useUsageTracking(abandoned.isUseUsageTracking())

  70. .build();

  71. }

  72. }

4.5 对象 SftpClient

SftpClient 是实际工作的类,从 SftpClient 中可获取到一个 sftp 链接,使用完成后,归还给 sftpPool。SftpClient 代码如下:


  1. public class SftpClient implements ISftpClient {

  2. private SftpPool sftpPool;

  3. /**

  4. * 从sftp连接池获取连接并执行操作

  5. *

  6. * @param handler sftp操作

  7. */

  8. @Override

  9. public void open(ISftpClient.Handler handler) {

  10. Sftp sftp = null;

  11. try {

  12. sftp = sftpPool.borrowObject();

  13. ISftpClient.Handler policyHandler = new DelegateHandler(handler);

  14. policyHandler.doHandle(sftp);

  15. } catch (Exception e) {

  16. log.error("sftp异常:", e);

  17. throw new BizException(ResultCodeEnum.SFTP_EXCEPTION);

  18. } finally {

  19. if (sftp != null) {

  20. sftpPool.returnObject(sftp);

  21. }

  22. }

  23. }

  24. @AllArgsConstructor

  25. static class DelegateHandler implements ISftpClient.Handler {

  26. private ISftpClient.Handler target;

  27. @Override

  28. public void doHandle(Sftp sftp) {

  29. try {

  30. target.doHandle(sftp);

  31. } catch (Exception e) {

  32. log.error("sftp异常:", e);

  33. throw new BizException(ResultCodeEnum.SFTP_EXCEPTION);

  34. }

  35. }

  36. }

  37. }

4.6 实战代码示例

通过 sftp 上传文件到 XX 服务器


  1. //通过SFTP上传到XX

  2. ((MultipleSftpClient) sftpClient).choose("XX");

  3. sftpClient.open(sftp -> {

  4. boolean exist = sftp.isExist(inventoryPath);

  5. if(!exist){

  6. sftp.mkdirs(inventoryPath);

  7. }

  8. // 执行sftp操作

  9. InputStream is = new FileInputStream(oneColumnCSVFile);

  10. sftp.upload(inventoryPath, titleName, is);

  11. log.info("inventory upload over");

  12. });

5 总结

通过本文的介绍可以知道,Apache Commons Pool 定义了一个对象池的行为,提供了可扩展的配置类和对象工厂,封装了对象创建、从池中获取对象、归还对象的核心流程。还介绍了开源框架 Jedis 是如何基于 GenericObjectPool 来实现的连接池。最后介绍了国际物流履约系统中是如何基于 GenericObjectPool 来管理 Sftp 连接的。


掌握了 GenericObjectPool 的核心原理,我们就可以通过实现几个关键的接口,创建一个对象池管理工具,在项目中避免了对象的频繁创建和销毁,从而显著提升程序的性能。

发布于: 刚刚阅读数: 3
用户头像

拥抱技术,与开发者携手创造未来! 2018-11-20 加入

我们将持续为人工智能、大数据、云计算、物联网等相关领域的开发者,提供技术干货、行业技术内容、技术落地实践等文章内容。京东云开发者社区官方网站【https://developer.jdcloud.com/】,欢迎大家来玩

评论

发布
暂无评论
巧用GenericObjectPool创建自定义对象池_京东云_京东科技开发者_InfoQ写作社区