写点什么

固定 QPS 异步任务功能初探

作者:FunTester
  • 2022-10-19
    北京
  • 本文字数:2370 字

    阅读完需:约 1 分钟

在之前文章Java自定义异步功能实践中,我仿造 Go 语言中的go定义了fun作为 Groovy/Java 异步执行的关键字。通过一个定长的线程池执行异步任务。

经过一段时间的使用,效果非常好,既能满足当下的需求,在实现的过程中也锻炼了自己对线程池的理解。通常的使用场景分为:异步上报数据、大量任务需要多线程执行、做简单的并发测试。

但是来了一个新活儿,需要控制请求的 QPS,而非通过线程池的大小控制并发的压力。这样第一能够更加准确,第二能适配不同的耗时的任务。

说来就来,经过查询资料,很多限流框架或者组件都比较好地实现了这个功能。但是功能设计相对我的需求来说,太复杂,太强大了。有点大材小用的感觉。所以查阅了之前文章如何mock固定QPS的接口中用到的java.util.concurrent.Semaphore流量控制类。

看这个类的包路径就知道这是 Java 并发用到的,实际上在 JDK 自带并发相关功能类中,java.util.concurrent.Semaphore使用的范围还是挺广的。这里就不多说了。这个类就是在线程安全的场景中,实现流量控制的。主要的 API 也没几个。

API 简介

  • 首先是构造方法:java.util.concurrent.Semaphore#Semaphore(int)这里参数可以理解为许可的总量。

  • 获取许可:java.util.concurrent.Semaphore#acquire()由这个方法引申出来还有几个增加时间参数的 API,这里也不多说了。

  • 释放许可:java.util.concurrent.Semaphore#release()默认释放一个许可,最常用的也是这个。

  • 获取和释放都支持多个许可,使用场景不多,不过多介绍了。

实现

思路,我还是通过异步线程池来实现,然后每一次获取一个许可,我就把任务当做一个简单的异步任务去执行,然后休眠 1s 之后再释放许可。

由于工具类com.funtester.frame.SourceCode是个 Java 类,基本了不少工具函数,改成Groovy的话面临兼容性的风险。所以功能实现还是用的 Java 语言,这里借助了Groovy closure 与Java function转换提供的兼容性方案解决 Groovy 和 Java 闭包的不兼容问题,语法上看着有点乱。


    /**     * 已固定QPS,异步执行,默认16,调整方法{@link SourceCode#setMaxQps(int)}     *     * @param f     */    public static void funner(Closure f) {        if (!ThreadPoolUtil.acquire()) return;        fun(JToG.toClosure(() -> {            sleep(1.0);            fun(JToG.toClosure(() -> {                f.call();                ThreadPoolUtil.release();                return null;            }));            return null;        }));    }        /**     * 设置异步执行任务最大QPS     *     * @param i     */    public static void setMaxQps(int i) {        ThreadPoolUtil.setSemaphore(new Semaphore(i));    }
复制代码

流量设置:

    /**     * 全局流量控制     */    static Semaphore semaphore = new Semaphore(16)
    static long AcquireTimeout = 8888
    /**     * 获取许可     * @return     */    static boolean acquire() {        semaphore.tryAcquire(AcquireTimeout, TimeUnit.MILLISECONDS)    }
    /**     * 释放许可     * @return     */    static def release() {        semaphore.release()    }


复制代码

测试用例

思路就是设置一个小的 QPS,然后循环添加异步任务,异步任务完成打印时间。

    static void main(String[] args) {        setMaxQps(2)        10.times {            funner {                output(Time.getDate())            }        }
    }
复制代码

控制台输出:

  ###### #     #  #    # ####### ######  #####  ####### ######  #####  #      #     #  ##   #    #    #       #         #    #       #    #  ####   #     #  # #  #    #    ####    #####     #    ####    #####  #      #     #  #  # #    #    #            #    #    #       #   #  #       #####   #    #    #    ######  #####     #    ######  #    #
17:50:07.675 Deamon 守护线程开启!17:50:08.695 F-3  2022-10-08 17:50:0817:50:08.695 F-4  2022-10-08 17:50:0817:50:09.700 F-7  2022-10-08 17:50:0917:50:09.700 F-7  2022-10-08 17:50:0917:50:10.706 F-3  2022-10-08 17:50:1017:50:10.706 F-1  2022-10-08 17:50:1017:50:11.711 F-6  2022-10-08 17:50:1117:50:11.711 F-5  2022-10-08 17:50:1117:50:12.715 F-4  2022-10-08 17:50:1217:50:12.716 F-2  2022-10-08 17:50:1217:50:12.757 Deamon 异步线程池等待执行1次耗时:1,028 ms17:50:12.759 Deamon 异步线程池关闭!
进程已结束,退出代码0
复制代码

这里有个问题就是线程池满了之后会导致任务积压,这里我考虑了一下现实情况,暂时没有处理,后续遇到再说。

FunTester 原创专题推荐~

-- By FunTester

发布于: 刚刚阅读数: 5
用户头像

FunTester

关注

公众号:FunTester,800篇原创,欢迎关注 2020-10-20 加入

Fun·BUG挖掘机·性能征服者·头顶锅盖·Tester

评论

发布
暂无评论
固定QPS异步任务功能初探_FunTester_InfoQ写作社区