写点什么

仿照 Hystrix,手写一个限流组件

用户头像
码农参上
关注
发布于: 1 小时前

这周工作的时候,碰见了这样一个问题,在我们的业务系统中,当用户访问自己的订单列表时,如果订单已经被添加了物流单号,但是后台还没有刷新到它的物流状态时,会去调用第三方物流的接口来刷新自己的物流状态。在这个过程中发现,一旦访问频率太过频繁的话就会被第三方限制,在一段时间内所有再发过去的请求都会被 ban 掉。


当一旦出现限流的情况,那么所有用户的物流状态都将无法被查询及刷新,将会给用户带来很不好的用户体验。所以我们的业务系统就需要实现这样的功能:


  • 用户第一次访问自己的订单列表时,直接调用第三方物流接口获取一次状态

  • 在接下来的一段时间,在访问订单列表时,不调用第三方接口刷新状态。做出此判断的依据是,对于用户来说,订单列表的访问功能是必须的,但是物流状态可能并非是刚需,因此此段时间绕过调用第三方接口

  • 当用户在一段时间内,访问频率到达一定量时,例如在 60 秒内访问了 5 次,那么判断用户获取物流状态的需求非常急迫,放行一次调用第三方接口,之后再次恢复之前的规则


对以上需求进行了一下评估后,发现无论是HystrixSentinel的限流规则,还是网关的漏桶和令牌桶,对我们来说都不是很适用,因此决定自己写一个组件,来实现这个限流规则。我们知道,Hystrix 是基于时间窗口内的失败统计,以及线程池或信号量隔离实现的快速失败机制,那么我们就仿照这个模式来实现自己的限流功能。


先从最基础的功能部分开始实现,实现一个滑动时间窗口,来统计一段时间内接口的调用次数:


@Slf4jpublic class MethodAccessWindow {    @AllArgsConstructor    @Data    class Node{        long time;    }
Queue<Node> queue; ScheduledExecutorService scheduledExecutorService;
private int windowTime; private int size;
public MethodAccessWindow(int windowTime, int size){ queue=new ArrayBlockingQueue<>(size); this.windowTime = windowTime; this.size=size; init(); }
private void init(){ System.out.println("初始化定时任务"); scheduledExecutorService = Executors.newScheduledThreadPool(5); scheduledExecutorService.scheduleWithFixedDelay(()->{ clean(); },windowTime*1000,1000, TimeUnit.MILLISECONDS); }
public boolean canReceive(){ if(queue.size()>=size){ return false; }else { queue.add(new Node(System.currentTimeMillis())); return true; } }
public void clean(){ for (Node node:queue){ if (System.currentTimeMillis()-node.getTime()> (windowTime *1000)){ queue.poll(); } } }
public void reset(){ queue.clear(); }
public void destroy(){ log.info("destroy"); try { scheduledExecutorService.shutdown(); if (!scheduledExecutorService.awaitTermination(60, TimeUnit.SECONDS)) { scheduledExecutorService.shutdownNow(); } } catch (Exception e) { e.printStackTrace(); System.out.println("awaitTermination interrupted: " + e); scheduledExecutorService.shutdownNow(); } log.info("end destroy"); }}
复制代码


在上面的类中:


  • windowTime表示滑动窗口的时间长度,size表示能够接受的任务数,使用队列来接收任务,队列长度为size

  • 在构造函数中启动线程池执行一个定时任务,会遍历队列中的节点,当节点的存活时间大于窗口时间时,删除过期节点。如果对实时要求比较高,可以修改定时任务的执行间隔

  • canReceive方法用于接收任务,当队列长度已满时,返回 false

  • reset方法用于清空队列,即重置窗口值


进行测试,设置为 20 秒的时间窗口可以接受 5 次请求:


public class Test {    public static void main(String[] args) throws InterruptedException {        MethodAccessWindow methodAccessWindow =new MethodAccessWindow(20,5);        for (int i = 0; i < 30; i++) {            System.out.println(i+"  "+ methodAccessWindow.canReceive());            TimeUnit.SECONDS.sleep(1);        }        methodAccessWindow.destroy();    }}
复制代码


在队列满后拒绝请求:



当到达第 20 秒时,前 19 个时间窗口内只有 4 个任务存在,因此可以接受任务,之后 4 秒同理:



完成了时间窗口,我们要把它应用在需要被限流的方法上,因此仿照Hystrix的格式,定义一个注解:


@Target(ElementType.METHOD)@Retention(RetentionPolicy.RUNTIME)@Documentedpublic @interface NeedPassLimit {    int timeWindow() default 100;    int frequency() default 10;    String fallBackMethod() default "";}
复制代码


按照之前的规则,在timeWindow的时间窗口,当访问次数超过frequency时,进行一次放行,否则执行fallBackMethod指定的降级方法。将注解写在Service的方法上:


@Service@Slf4jpublic class MyService {
@NeedPassLimit(timeWindow = 20, frequency = 5, fallBackMethod = "fallback") public String getInfo(Long userId){ log.info("放行:"+userId.toString()); return "success"; }
public String fallback(Long userId){ log.info("拦截:"+userId.toString()); return "restriction"; }}
复制代码


这里在方法中,传入了一个userId字段,用以表明是哪个用户调用的方法。这是因为,按照仓壁模式,需要对用户以及访问的方法创建滑动窗口的隔离,这里简单使用类名加方法名加userId的方式,来区分不用的滑动窗口实例。给MethodAccessWindow添加一个字段windowKey,并修改构造方法:


private String windowKey;public MethodAccessWindow(int windowTime, int size, String windowKey){    queue=new ArrayBlockingQueue<>(size);    this.windowTime = windowTime;    this.size=size;    this.windowKey = windowKey;    init();}
复制代码


接下来,实现重要的切面方法:


@Component@Aspectpublic class MethodPassAspect {
//缓存各个用户的 Window @Getter private ConcurrentHashMap<String, MethodAccessWindow> passerMap =new ConcurrentHashMap<>();
@Pointcut("@annotation(com.cn.hydra.aspectdemo.rule.annotation.NeedPassLimit)") public void freshPointCut() { }
@Around("freshPointCut()") public Object doAround(ProceedingJoinPoint point) throws Throwable { MethodSignature signature = (MethodSignature) point.getSignature(); Method method = signature.getMethod();
if (method.isAnnotationPresent(NeedPassLimit.class)) { Object[] args = point.getArgs(); String[] parameterNames = signature.getParameterNames(); List<String> paramNameList = Arrays.asList(parameterNames);
String passerKey=null; if (paramNameList.contains("userId")) { passerKey=point.getTarget().getClass().getName() +"#"+method.getName()+"#"+args[paramNameList.indexOf("userId")]; }
NeedPassLimit annotation = method.getAnnotation(NeedPassLimit.class); int timeWindow = annotation.timeWindow(); int frequency = annotation.frequency(); String fallBackMethodName = annotation.fallBackMethod();
MethodAccessWindow methodAccessWindow; if (passerMap.keySet().contains(passerKey)) { methodAccessWindow = passerMap.get(passerKey); }else { //第一次,放过请求 methodAccessWindow= new MethodAccessWindow(timeWindow,frequency,passerKey); passerMap.put(passerKey,methodAccessWindow);
Object object = point.proceed(); return object; }
if (methodAccessWindow.canReceive()){ Object fallbackObject = invokeFallbackMethod(method, point.getTarget(), fallBackMethodName, args); return fallbackObject; }else{ Object object = point.proceed(); methodAccessWindow.reset(); return object; } } return null; }
private Object invokeFallbackMethod(Method method, Object bean, String fallbackMethodName, Object[] arguments) throws Exception { Class beanClass = bean.getClass(); Method fallbackMethod = beanClass.getMethod(fallbackMethodName, method.getParameterTypes()); Object fallbackObject = fallbackMethod.invoke(bean, arguments); return fallbackObject; }}
复制代码


在上面方法中:


  • passerMap 缓存了各个用户访问的接口的滑动窗口,用以实现仓壁模式

  • 当用户第一次访问时,执行原请求方法,执行后创建滑动窗口,放进passerMap 中缓存

  • 当用户之后访问时,调用canReceive方法,如果返回为 true,执行降级方法

  • canReceive返回为 false 时,执行原方法,并重置滑动窗口


在实现了上面的主要功能后,需要注意滑动窗口是一直存在的,为了保护系统资源,我们有必要销毁不需要的滑动窗口。主要需要实现将滑动窗口对象实例从切面的 passerMap 中移除,之后交给 jvm 垃圾回收器进行回收即可。


实现方式也很简单,当我们判断该滑动窗口已经很久没有使用时,发送一个自定义事件给我们自定义的 spring 事件监听器,由监听器负责移除该滑动窗口实例。先定义窗口关闭事件:


public class WindowCloseEvent extends ApplicationEvent {    @Getter    private String windowKey;
public WindowCloseEvent(Object source, String windowKey) { super(source); this.windowKey = windowKey; }}
复制代码


然后定义事件监听器,监听上面的WindowCloseEvent事件:


@Component@Slf4jpublic class WindowCloseEventListener implements ApplicationListener<WindowCloseEvent> {    @Autowired    MethodPassAspect methodPassAspect;
@Override public void onApplicationEvent(WindowCloseEvent windowCloseEvent) { log.info("close:"+windowCloseEvent.getWindowKey()); ConcurrentHashMap<String, MethodAccessWindow> passerMap = methodPassAspect.getPasserMap(); System.out.println(passerMap.toString()); passerMap.remove(windowCloseEvent.getWindowKey()); System.out.println(passerMap.toString()); }}
复制代码


再定义一个事件发布方法的EventPublisher,用来发送事件:


@Componentpublic class EventPublisher {    @Autowired    private ApplicationEventPublisher applicationEventPublisher;
public void publish(ApplicationEvent applicationEvent){ applicationEventPublisher.publishEvent(applicationEvent); }}
复制代码


那么,究竟该在什么时机去发送这个事件呢?我们可以在滑动窗口中记录一下最后使用时间,当超过约定的最大未使用时间时,将其从切面的passerMap中移除。在滑动窗口类中添加两个变量,修改构造方法,初始化这两个变量:


private long lastCallTime;//最后调用时间private long shutDownTime;//最长未调用时间
public MethodAccessWindow(int windowTime, int size, String windowKey){ queue=new ArrayBlockingQueue<>(size); this.windowTime = windowTime; this.size=size; this.windowKey = windowKey; lastCallTime=System.currentTimeMillis(); shutDownTime =windowTime*1000*3;//可自由进行长短额定义 init();}
复制代码


在每次调用方法时,先刷新lastCallTime


public boolean canReceive(){  this.lastCallTime=System.currentTimeMillis();  ......}
复制代码


回头看一下,滑动窗口实例对象在后台存在一个定时任务,用于清除超过时间窗口的任务,那么可以在这后面可以再添加一个任务,用于判断当前时间减去最后调用时间,是否超过定义个最长不使用时间。但是有一个问题,MethodAccessWindow并不是一个注册到 spring 环境的Bean,不能使用自动注入来注入EventPublisher对象,这里可以通过静态方法来获取 spring 容器,之后再使用容器的getBean方法拿到EventPublisher的对象。添加事件发布对象及其 set 方法:


private static EventPublisher eventPublisher;public static void setEventPubisher(ApplicationContext applicationContext ){    eventPublisher=applicationContext.getBean(EventPublisher.class);}
复制代码


spring容器完成初始化后,从启动类直接给滑动窗口注入:


@SpringBootApplicationpublic class AspectdemoApplication {    public static void main(String[] args) {        ApplicationContext applicationContext = SpringApplication.run(AspectdemoApplication.class, args);        MethodAccessWindow.setEventPubisher(applicationContext);    }}
复制代码


修改定时任务,在定时任务中发送关闭滑动窗口事件,并发送关闭线程池请求:


public void clean(){         for (Node node:queue){        if (System.currentTimeMillis()-node.getTime()> (windowTime *1000)){            queue.poll();        }    }
//超过时间不用则自动销毁 if (System.currentTimeMillis()-lastCallTime >= shutDownTime){ log.info("发送event"); WindowCloseEvent windowCloseEvent=new WindowCloseEvent(this, windowKey); eventPublisher.publish(windowCloseEvent); log.info("发送event end");
destroy(); }}
复制代码


调用Service接口进行测试:



在滑动窗口时间 20 秒,最大空闲时间设置为窗口事件 3 倍的情况下,在最后一次请求调用的后 1 分钟,发送了窗口关闭事件并被监听,从passerMap中移除,并且在之后销毁了线程池。这样,一个能够自定义规则的限流组件就完成了。


如果文章对您有所帮助,欢迎关注公众号 码农参上

加号主好友,来围观朋友圈啊~

发布于: 1 小时前阅读数: 2
用户头像

码农参上

关注

公众号:码农参上 2021.03.30 加入

还未添加个人简介

评论

发布
暂无评论
仿照Hystrix,手写一个限流组件