写点什么

限流系列文章——漏斗限流

作者:李子捌
  • 2021 年 11 月 27 日
  • 本文字数:3978 字

    阅读完需:约 13 分钟

限流系列文章——漏斗限流

1、需求

限定用户的某个行为在指定时间 T 内,只允许发生 N 次。假设 T 为 1 秒钟,N 为 1000 次。

2、常见的错误设计

程序员设计了一个在每分钟内只允许访问 1000 次的限流方案,如下图 01:00s-02:00s 之间只允许访问 1000 次,这种设计最大的问题在于,请求可能在 01:59s-02:00s 之间被请求 1000 次,02:00s-02:01s 之间被请求了 1000 次,这种情况下 01:59s-02:01s 间隔 0.02s 之间被请求 2000 次,很显然这种设计是错误的。



3、漏斗限流

3.1 解决方案

漏斗容量有限,当流水的的速度小于灌水的速度,漏斗就会水满溢出,利用这个原理我们可以设计限流代码!漏斗的剩余的空间就代表着当前行为(请求)可以持续进行的数量,漏斗的流水速率代表系统允许行为(请求)发生的最大频率,通常安装系统的处理能力权衡后进行设值。



3.2 Java 代码实现

package com.lizba.redis.limit;
import java.util.Map;import java.util.concurrent.ConcurrentHashMap;
/** * <p> * 漏斗限流 * </p> * * @Author: Liziba */public class FunnelRateLimiter {
/** map用于存储多个漏斗 */ private Map<String, Funnel> funnels = new ConcurrentHashMap<>();
/** * 请求(行为)是否被允许 * * @param userId 用户id * @param actionKey 行为key * @param capacity 漏斗容量 * @param leakingRate 剩余容量 * @param quota 请求次数 * @return */ public boolean isActionAllowed(String userId, String actionKey, int capacity, float leakingRate, int quota) {
String key = String.format("%s:%s", userId, actionKey); Funnel funnel = funnels.get(key); if (funnel == null) { funnel = new Funnel(capacity, leakingRate); funnels.put(key, funnel); } return funnel.waterLeaking(quota); }
/** * 漏斗类 */ class Funnel { /** 漏斗容量 */ int capacity; /** 漏斗流速,每毫秒允许的流速(请求) */ float leakingRate; /** 漏斗剩余空间 */ int leftCapacity; /** 上次漏水时间 */ long leakingTs;
public Funnel(int capacity, float leakingRate) { this.capacity = this.leftCapacity = capacity; this.leakingRate = leakingRate; leakingTs = System.currentTimeMillis(); }
/** * 计算剩余空间 */ void makeSpace() { long nowTs = System.currentTimeMillis(); long intervalTs = nowTs - leakingTs; int intervalCapacity = (int) (intervalTs * leakingRate); // int 溢出 if (intervalCapacity < 0) { this.leftCapacity = this.capacity; this.leakingTs = nowTs; return; } // 腾出空间 >= 1 if (intervalCapacity < 1) { return; } // 增加漏斗剩余容量 this.leftCapacity += intervalCapacity; this.leakingTs = nowTs; // 容量不允许超出漏斗容量 if (this.leftCapacity > this.capacity) { this.leftCapacity = this.capacity; } }
/** * 漏斗流水 * * @param quota 流水量 * @return */ boolean waterLeaking(int quota) { // 触发漏斗流水 this.makeSpace(); if (this.leftCapacity >= quota) { leftCapacity -= quota; return true; } return false; } }
}
复制代码

测试代码:

计算机运行如下的代码速度会非常的块,我通过 TimeUnit.SECONDS.sleep(2);模拟客户端过一段时间后再请求。

设置漏斗容量为 10,每毫秒允许 0.002 次请求(2 次/秒),每次请求数量为 1;

package com.lizba.redis.limit;
import java.util.concurrent.TimeUnit;
/** * @Author: Liziba */public class TestFunnelRateLimit {
public static void main(String[] args) throws InterruptedException {
FunnelRateLimiter limiter = new FunnelRateLimiter(); for (int i = 1; i <= 20; i++) { if (i == 15) { TimeUnit.SECONDS.sleep(2); } // 设置漏斗容量为10,每毫秒允许0.002次请求(2 次/秒),每次请求数量为1 boolean success = limiter.isActionAllowed("Liziba", "commit", 10, 0.002f, 1); System.out.println("第" + i + "请求" + (success ? "成功" : "失败")); }
}
}
复制代码

测试结果:

  1. 01-10 次请求成功,初始漏斗大小为 10,因此前 10 次请求成功

  2. 11-14 次请求失败,由于漏斗已满,并且漏斗的流速在这四次请求之间未能释放 1

  3. 15-18 次请求成功,因为 i == 15 时主线程睡眠 2 秒,2 秒时间漏斗流出 0.002*1000*2 = 4,因此这四次请求成功

  4. 19-20 次请求失败,与 11-14 次请求失败的原因一致



3.3 结合 Redis 实现

我们采用 hash 结构,将 Funnel 的属性字段,放入 hash 中,并且在代码中进行运算即可

package com.lizba.redis.limit;
import redis.clients.jedis.Jedis;
import java.util.HashMap;import java.util.Map;
/** * <p> * redis hash 漏斗限流 * </p> * * @Author: Liziba * @Date: 2021/9/7 23:46 */public class FunnelRateLimiterByHash {
private Jedis client;
public FunnelRateLimiterByHash(Jedis client) { this.client = client; }
/** * 请求是否成功 * * @param userId * @param actionKey * @param capacity * @param leakingRate * @param quota * @return */ public boolean isActionAllowed(String userId, String actionKey, int capacity, float leakingRate, int quota) { String key = this.key(userId, actionKey); long nowTs = System.currentTimeMillis(); Map<String, String> funnelMap = client.hgetAll(key); if (funnelMap == null || funnelMap.isEmpty()) { return initFunnel(key, nowTs, capacity, quota); } long intervalTs = nowTs - Long.parseLong(funnelMap.get("leakingTs")); int intervalCapacity = (int) (intervalTs * leakingRate); // 时间过长, int可能溢出 if (intervalCapacity < 0) { intervalCapacity = 0; initFunnel(key, nowTs, capacity, quota); } // 腾出空间必须 >= 1 if (intervalCapacity < 1) { intervalCapacity = 0; } int leftCapacity = Integer.parseInt(funnelMap.get("leftCapacity")) + intervalCapacity; if (leftCapacity > capacity) { leftCapacity = capacity; } return initFunnel(key, nowTs, leftCapacity, quota); }


/** * 存入redis,初始funnel * * @param key * @param nowTs * @param capacity * @param quota * @return */ private boolean initFunnel(String key,long nowTs, int capacity, int quota) { Map<String, String> funnelMap = new HashMap<>(); funnelMap.put("leftCapacity", String.valueOf((capacity > quota) ? (capacity - quota) : 0)); funnelMap.put("leakingTs", String.valueOf(nowTs)); client.hset(key, funnelMap); return capacity >= quota; }
/** * 限流key * * @param userId * @param actionKey * @return */ private String key(String userId, String actionKey) { return String.format("limit:%s:%s", userId, actionKey); } }
复制代码

测试代码:

package com.lizba.redis.limit;
import redis.clients.jedis.Jedis;
import java.util.concurrent.TimeUnit;
/** * @Author: Liziba */public class TestFunnelRateLimiterByHash {
public static void main(String[] args) throws InterruptedException { Jedis jedis = new Jedis("192.168.211.108", 6379);
FunnelRateLimiterByHash limiter = new FunnelRateLimiterByHash(jedis); for (int i = 1; i <= 20; i++) { if (i == 15) { TimeUnit.SECONDS.sleep(2); } boolean success = limiter.isActionAllowed("liziba", "view", 10, 0.002f, 1); System.out.println("第" + i + "请求" + (success ? "成功" : "失败")); }
jedis.close(); }
}
复制代码

测试结果:

与上面的 java 代码结构一致



4、总结

上述说了两种实现漏斗限流的方式,其实思想都是一样的,但是这两者都无法在分布式环境中使用,即便是在单机环境中也是不准确的,存在线程安全问题/原子性问题,因此我们一般使用 Redis 提供的限流模块 Redis-Cell 来限流,Redis-Cell 提供了原子的限流指令 cl.throttle,这个留到后续在详细说吧,我要睡觉去了!

发布于: 2021 年 11 月 27 日阅读数: 10
用户头像

李子捌

关注

华为云享专家 2020.07.20 加入

公众号【李子捌】

评论

发布
暂无评论
限流系列文章——漏斗限流