P8 大牛带你细谈架构中的限流与计数器的实现方式
}
http {
upstream enjoy{server 127.0.0.1:8080;
server 127.0.0.1:8081;
}server {listen 80;location / {proxy_pass http://enjoy;}}
}
在上面案例中我配置了个代理,当高并发的请求过来的时候,会把请求分发给 8080 与 8081 两个端口的服务器。
但现在我们假设一个极端的情况,这个时候由于业务有个秒杀要求,请求过大一下就让这两个服务器爆了,这个时候在我继续增加服务器之前这整个秒杀业务几乎都处在瘫痪状态,而这突然的访问量是你始料未及的,也就是说你根本就没法事先准备好足够的服务器来解决这种情况。
这个时候我相信你已经看出了如果仅仅做分流依然会出现很严重的问题,怎么办呢?这个时候你就需要限流了。
2. 再说下限流
限流依然是再高并发的前提下,如果某个服务器承受不了数目过多的请求量的一种限制机制,不同的是分流是让请求分发的其他服务器,而限流是达到某个阈值后直接不让你访问了
如果想完成限流的功能其实是有一些解决方案的(算法),比如来说,基于令牌桶的,程序计数器以及漏桶算法;
今天挑一个最简单的计数器方式来讲讲限流
3. 解决方案
计数器的解决方式是最简单最容易实现的一种解决方案,假设有一个接口,要求 1 分钟的访问量不能超过 10 次
这样当有任何请求过来,我可以让计数器+1;如果这个计数器的值大于 10,而且和第一次的请求相比,时间间隔在 1 分钟以内,那么久能说明该请求访问过多。
如果这个请求与第一次请求的访问时间之间的间隔超过了 1 分钟,那么该计数器的值久还是限流范围之内,接下来久只要重置计数器就好;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
public class EnjoyCountLimit {
private int limtCount = 60;// 限制最大访问的容量
AtomicInteger atomicInteger = new AtomicInteger(0); // 每秒钟 实际请求的数量
private long start = System.currentTimeMillis();// 获取当前系统时间
private int interval = 60*1000;// 间隔时间 60 秒
public boolean acquire() {
long newTime = System.currentTimeMillis();
if (newTime > (start + interval)) {
// 判断是否是一个周期
start = newTime;
atomicInteger.set(0); // 清理为 0
return true;
}
atomicInteger.incrementAndGet();// i++;
return atomicInteger.get() <= limtCount;
}
static EnjoyCountLimit limitService = new EnjoyCountLimit();
public static void main(String[] args) {
ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
for (int i = 1; i < 100; i++) {
final int tempI = i;
newCachedThreadPool.execute(new Runnable() {
public void run() {
if (limitService.acquire()) {
System.out.println("你没有被限流,可以正常访问逻辑 i:" + tempI);
} else {
System.out.println("你已经被限流呢 i:" + tempI);
}
}
});
}
}
}
这个计数器的限流方式很简单吧,但这样问题吗?好好想想……
还是以 60 运行访问 10 次请求为例,在第一次 0-58 秒之内,没有访问请求,在 59 秒之内突然来了 10 次请求,这个时候会做什么,由于已经到了 1 分钟计数器会重置。
这个时候第二次的 1 秒内(1 分 0 秒)又了 10 请求,这个时候是不是就在 2 秒之内有 20 个请求被放行了呢?(59 秒,1 分 0 秒),如果某个服务器的访问量只能是 10 次请求,那这种限流方式已经导致服务器挂了;
4. 滑动窗口计数器
前面已经知道简单的计数器的实现方式,也知道他会出现的一些问题,虽然这些问题举得有些极端,但还是有更好得解决方案,这方案就是使用滑动窗口计数器
滑动窗口计数器得原理是在没错请求过来得时候,先判断前面 N 个单位内得总访问量是否操过得阈值,并且在当前得时间单位得请求数上+1
举例来说,要求 1 分钟的访问量不能超过 10 次
可以把 1 分钟看成是 6 个 10 秒钟的时间,0-9 秒的访问数记录到第一个格子,10-19 秒的访问数记录数记录到第二个格子以此内推,每次统计将 6 个格子里面的数据求和,如果超过了 10 次就不允许访问。
import java.util.concurrent.atomic.AtomicInteger;
public class EnjoySlidingWindow {
private AtomicInteger[] timeSlices;
/* 队列的总长度 */
private final int timeSliceSize;
/* 每个时间片的时长 */
private final long timeMillisPerSlice;
/* 窗口长度 */
private final int windowSize;
/* 当前所使用的时间片位置 */
private AtomicInteger cursor = new AtomicIn
teger(0);
public static enum Time {
MILLISECONDS(1),
SECONDS(1000),
MINUTES(SECONDS.getMillis() * 60),
HOURS(MINUTES.getMillis() * 60),
DAYS(HOURS.getMillis() * 24),
WEEKS(DAYS.getMillis() * 7);
private long millis;
Time(long millis) {
this.millis = millis;
}
public long getMillis() {
return millis;
}
}
public EnjoySlidingWindow(int windowSize, Time timeSlice) {
this.timeMillisPerSlice = timeSlice.millis;
this.windowSize = windowSize;
// 保证存储在至少两个 window
this.timeSliceSize = windowSize * 2 + 1;
init();
}
/**
初始化
*/
private void init() {
AtomicInteger[] localTimeSlices = new AtomicInteger[timeSliceSize];
for (int i = 0; i < timeSliceSize; i++) {
localTimeSlices[i] = new AtomicInteger(0);
}
评论