写点什么

只用了半个 Redisson 的 Semaphore 实现并发控制

  • 2023-02-14
    湖南
  • 本文字数:2300 字

    阅读完需:约 8 分钟

做过企业微信开发的同学应该知道,企业微信有一个很讨厌的报错--接口并发超过限制(45033)。报错的原因呢就是因为有多个线程在同时调用企业微信的接口,为了不让接口调用一直报错,我的服务就也要有一个接口并发控制体系。


首先想到的就是用线程池来实现

刚开始看起来似乎没什么问题,但是企业微信的接口之间并发限制是互相隔离的,那就意味着,我需要创建几十个线程池。。。你以为这就完了?其实不同企业用户调用的时候又是互相隔离的,所以,我需要为每个企业。。。。

线程池肯定是用不了了,我猛然想起前天晚上做梦,大师似乎跟我讲了信号量可以用来控制并发,考虑到线上环境有多个节点,同时 Redisson 也提供了 Semaphore,打算用一下试试。想想距离上次使用信号量已经过去了∞天(毕竟我从来都没有用过),我直接打开 IDEA,先用 java 的试试水,public static void main 一气呵成

public static void main(String[] args) {    //创建一个信号量    Semaphore semaphore = new Semaphore(1);    //如果没获取到就抛出异常    if (!semaphore.tryAcquire()) {        throw new RuntimeException();    }    //try不能括到上面,获取不到的时候是不需要释放的    try{        //做一些业务        System.out.println("do something");    }finally {        //释放掉        semaphore.release();    }}
复制代码

又检查了一下逻辑,看起来似乎没什么问题,然后我突发奇想,如果我直接 release()会怎么样

Semaphore semaphore = new Semaphore(1);semaphore.release();System.out.println(semaphore.availablePermits());
复制代码


。。。0.0


此时我的心态出现了亿点变化,我意识到,其实初始化的 new Semaphore(1)只是设置了一个初始值,并不是一个最大值,直接调用 release(),可获取信号量的值会直接+1,这时我赶紧用 Redisson 的 api redissonClient.getSemaphore("test"); (其他代码和上面的一样)试了一下,万一呢


果然效果和 Java 的是一模一样。


不过,问题似乎也不是很大?如果我保证业务在拿到信号后一定会在 finally{..}中释放掉,并且 semaphore.trySetPermits(1);这个方法只初始化一次,并且一定是单线程,那其实就没有问题。

企业微信的这个并发限制,没有明文规定,那上线之后,大概是要在违法的边缘多试探试探的,所以就涉及到更新信号量的问题了

可以看到,如果在多线程的情况下,贸然更新信号量,由于模型没有最大值的概念,更新后的信号量总量比起期望值是只多不少的。


问题主要还是出在 release()这个方法没有上限概念上,那我来弄一个有上限的 release 方法就好了。此时第一个思路就是继承 RedissonSemaphore 这个类重写一下 release(),但是后来发现自己的子类无法用 redissonClient 初始化出来,方案就 pass 掉了。所谓工作时间长了,什么都敢干,我直接打开源码,我只要仿照着写个一样的是不就行了!


public RFuture<Void> releaseAsync(int permits) {    ...    RFuture<Void> future = commandExecutor.evalWriteAsync(getRawName(), StringCodec.INSTANCE, RedisCommands.EVAL_VOID,            "local value = redis.call('incrby', KEYS[1], ARGV[1]); " +                    "redis.call('publish', KEYS[2], value); ",            Arrays.asList(getRawName(), getChannelName()), permits);    ...}
复制代码


好家伙,核心逻辑其实就一行 lua 脚本 redis.call('incrby', KEYS[1], ARGV[1]);,我只要一个判断上限的就好了。虽然之前也没写过 lua 脚本,但是好在逻辑不复杂,只需要一个 if 判断就行了,redisson 源码中有很多这样的判断,现学现卖就可以了。


另外,鉴于面向对象的思想,如果我自己工具类写一个 release()方法,并让这个方法直接出现在业务中多少有一些不妥。所以新建了一个对象,封装一下获取和释放两个方法,之后项目中所有信号量的相关业务,都用这个对象,redisson 的信号量概念就不会再出现在具体的业务中了,可以提高业务代码的可读性,也会信号量相关操作的修改控制在这个类中。


经过了一番折腾,总算是把功能实现了。部分代码如下,tryAcquire()方法还是使用 redisson 提供的,但是 release()方法用了自己实现的~

public class SemaphoreModel {    /**     * 信号量本体     */    private RedissonSemaphore semaphore;
private RedissonClient redissonClient;
/** * 获取到信号的mark */ private boolean mark = false;
public void release() { //获取信号量的值 String script = "local value = redis.call('get', '" + semaphore.getRawName() + "'); " + //如果没有设置过(false意味着undefind)或者当前值小于5(我设置的最大值)就做release的逻辑 "if (value ~= false and tonumber(value) < 5) then " + //信号量加一 "local val = redis.call('incrby', '" + semaphore.getRawName() + "', 1); " + "redis.call('publish', '" + RedissonSemaphore.getChannelName(semaphore.getRawName()) + "', val); " + "return 1; " + "end; " + "return 0;"; redissonClient.getScript().eval(RScript.Mode.READ_WRITE, script, RScript.ReturnType.VALUE); }
/** * 获取一个信号量 */ public boolean tryAcquire() { return semaphore.tryAcquire(); }
}
复制代码

经过了这样的设计,就造出了一个有上限的信号量,问题也都解决了。


作者:笨天才

链接:https://juejin.cn/post/7197782794675585080

来源:稀土掘金

用户头像

还未添加个人签名 2021-07-28 加入

公众号:该用户快成仙了

评论

发布
暂无评论
只用了半个Redisson的Semaphore实现并发控制_Java_做梦都在改BUG_InfoQ写作社区