写点什么

多优先级线程池实践

作者:FunTester
  • 2024-05-06
    河北
  • 本文字数:1802 字

    阅读完需:约 6 分钟

在之前的 Java 线程池实践当中,我遇到了任务优先级的问题。最终采取的方案是新增一个线程池作为执行高优任务,然后将普通线程池的在执行任务执行,先去判断高优线程池是否有等待任务,如果有就先执行高优线程池等待队列中的任务。


虽然后期给普通异步线程池增加了双向链表,直接采取插队模式执行,但是也让异步任务更加复杂了。所以经过一些 java.util.concurrent 包的重新学习,得到了最优的答案,就是 java.util.concurrent.PriorityBlockingQueue

PriorityBlockingQueue 简介

java.util.concurrent.PriorityBlockingQueue 是 Java 并发包中的一个线程安全的优先级阻塞队列。它是基于优先级的元素顺序,具有以下特点:


  1. 线程安全: PriorityBlockingQueue 是线程安全的,可以在多线程环境下安全地进行操作,而不需要额外的同步手段。

  2. 无界队列: PriorityBlockingQueue 是一个无界队列,可以无限制地添加元素,因此不会因为队列满而阻塞生产者线程。

  3. 基于优先级的元素顺序: PriorityBlockingQueue 中的元素按照优先级顺序进行排序,具有较高优先级的元素会被优先出队。默认情况下,元素需要实现 Comparable 接口来定义它们的优先级,也可以在构造函数中提供一个 Comparator 来自定义优先级顺序。

  4. 阻塞操作: 当队列为空时,从 PriorityBlockingQueue 中获取元素的操作会阻塞线程,直到队列中有元素可用;当队列满时,向 PriorityBlockingQueue 中添加元素的操作也会阻塞线程,直到队列有足够的空间。

  5. 不支持空元素: PriorityBlockingQueue 不支持添加空元素,即元素不能为 null。


PriorityBlockingQueue 可以用于实现基于优先级的任务调度、事件处理等场景,其中优先级高的任务或事件会优先被处理。它提供了一种高效的方式来管理和处理具有不同优先级的元素。

多优先级线程池

下面是我自己的实现逻辑:


  1. 首先创建一个功能类,实现 java.lang.Comparablejava.lang.Runnable

  2. 制定优先级规则,通常定义一个属性 int 类型,代表优点等级。

  3. 封装 execute()方法,用来向线程池提交任务。


具体代码如下:


/**   * 多优先级线程池   */  static ThreadPoolExecutor levelPool = createPool(POOL_SIZE, POOL_SIZE, ALIVE_TIME, new PriorityBlockingQueue<PriorityTask>(), getFactory("L"), new ThreadPoolExecutor.DiscardPolicy())      /**   * 执行优先级任务   * @param task   * @return   */  static def executeLevel(PriorityTask task) {      levelPool.execute(task)  }    /**   * 执行优先级任务,设定优先级   * @param priority   * @param closure   * @return   */  static def executeLevel(int priority, Closure closure) {      levelPool.execute(new PriorityTask(priority) {            @Override          void run() {              closure()          }      })  }    /**   * 执行优先级任务,使用默认优先级   * @param closure   * @return   */  static def executeLevel(Closure closure) {      levelPool.execute(new PriorityTask(PRIORITY_LEVEL_DEFAULT) {            @Override          void run() {              closure()          }      })  }    /**   * 优先级任务,用于优先级线程池   */  static abstract class PriorityTask implements Runnable, Comparable<PriorityTask> {        int priority        PriorityTask(int priority) {          this.priority = priority      }          /**       * 比较方法,用于优先级队列,优先级越高,越先执行       * @param o       * @return       */      @Override      int compareTo(PriorityTask o) {          return this.priority - o.priority      }  }
复制代码

测试

我们来写一个测试用例,往线程池提交优先级不断提升的任务,打印任务执行时间。


ThreadPoolUtil.getLevelPool().setCorePoolSize(1);  ThreadPoolUtil.getLevelPool().setMaximumPoolSize(1);  for (int i = 0; i < 10; i++) {      ThreadPoolUtil.executeLevel(new ThreadPoolUtil.PriorityTask(10 - i) {          @Override          public void run() {              SourceCode.sleep(0.01);              System.out.println(this.getPriority());          }      });  }
复制代码


控制台打印:


10123456789
复制代码


看起来是比较符合预期的。

发布于: 刚刚阅读数: 3
用户头像

FunTester

关注

公众号:FunTester,800篇原创,欢迎关注 2020-10-20 加入

Fun·BUG挖掘机·性能征服者·头顶锅盖·Tester

评论

发布
暂无评论
多优先级线程池实践_FunTester_InfoQ写作社区