k8s 上运行我们的 springboot 服务之——热点数据

用户头像
柠檬
关注
发布于: 2020 年 05 月 21 日



综合描述

在我们微服务系统中总会用到redis,我们把某些在一个时间段内不断重复访问的数据叫着热点数据。例如:某个明星出轨的新闻,某个爆款的商品等。

这部分数据我个人觉得大概有以下几个特点:

1、在一段时间内不会变化

2、在某个时间段会有大量的用户去不断的查看

3、我们知道用户查看数据是我们系统的哪个服务接口,但是不能随时动态的确定是接口里面哪条数据



基于如上三个特点,我们会把这些接口的数据到放到redis中进行存储,用来降低mysql数据库的负载(在我看来系统的性能瓶颈总是在关系型数据库)。



问题

当我们redis达到了负载极限怎么办?也就是热点数据的访问量很大的时候,我们的单台redis扛不住了。

有的小伙伴会说redis可以上集群啊,不就解决了吗?

其实不然,我简单的说一下我的理解,我们redis做集群,master来负责数据的读写,由于redis hash槽的的设计,我们同一个key值的数据会放到一个固定的master节点。也就是在一台redis master节点上了,其他master节点没有存这个key的值。如果每个master做了主从,该master对应的从节点有这个key的数据,但是从节点做备份不做读写(遇到读写命中会转发给master)。如果我们做主从读写分离,它核心还是只有一台从节点在读还是存在单台扛不住的问题。



我们如何知道哪条数据是热点数据



网上的解决方案

读写分离

在操作读请求时,可以先加上readonly命令,这样从redis就可以提供读请求服务了,不需要转发到主redis

即达到了多台从redis去扛大量请求了,减少了主redis压力。这个方案需要对客户端进行改造,而且redis官方推荐没有必要使用读写分离



在系统和redis之间做一层代理

我的理解是:在存redis的时候我们改造key值(比如加入一些参数),使得数据在多个master都有备份。在读的时候根据我们在存的时候保存的规则,通过某种机制去不同的redis master获得数据



如何均匀的或者保证我们同一个key的数据更好的同步到不同master是一个问题点



加入一级缓存

当判断某个key是热点数据时,我们在每个微服务直接把数据存到缓存,不再到redis获得数据,由于我们服务本身就是集群,每台对外提供服务的节点都有一份热点数据,从而通过多台服务器去抗热点数据



什么时候把数据保存到缓存是一个问题点



通过redis4+版本获得热点key

在redis4+版本中,redis提供了–hotkeys命令获得热点数据



我们在业务系统(springboot架构)中如何使用–hotkeys获得热点数据,获得了热点数据业务系统该如何做是一个问题点



通过计数来确定热点key

在redis 读之前 做一个计数来找到热点key



计数(springboot架构),计数完了之后该如何做是一个问题点



等等等等。。。

很少有现成的方案或者落地代码供我们落地实现



我的实现

说明

1、在设计系统的时候我们大概能确定是哪个服务会导致热点数据,这个我们在开发的时候就要标记出来接口是哪个(用注解的方式),也可以动态加拦截器去统计,但是由于我们系统的接口太多,热点接口我觉得不会有太多,每个接口都去统计浪费资源



2、被标记的接口我不管你具体获得数据是通过mysql,redis,还是有混合都有。在没有达到我们预设的阀值我保存该接口的请求参数和返回值到redis并对该请求计数



3、当没有达到阀值时,我们的请求数据还是会走到我们的控制器并返回,同时会保存到redis



4、阀值判断,当我们的请求在一段时间内,请求总数没有达到计数里面的阀值,我们会继续发请求到控制器并返回数据。当达到阀值我们把数据保存的缓存,后面的请求来我们先判断缓存是否有数据,如果有直接返回,没有就把请求发到控制器并保存到缓存



5、当计数达到阀值后,由于我们的请求直接通过缓存获得数据,我们的阀值计数将不再进行。我们系统的这个计数redis和缓存的数据都有失效性,所以我们在配置的时候需要进行合理的配置,如果失效了,如果还是热点数据,无非就是把我们前面的事情再做了一遍(系统在短时间内可能有一个小的波动)



具体实现

技术罗列

拦截器 aop redisson 自定义注解等



代码片段

自定义注解



@Documented

@Retention(RetentionPolicy.RUNTIME)

@Target({ElementType.METHOD, ElementType.TYPE})

public @interface HotRequest {

boolean required() default true;



/**

* 默认频率10000次/s

*

* @throws

* @return: int

* @author: lvmoney /XXXXXX科技有限公司

* @date: 2020/5/18 16:37

*/

long threshold() default LockConstant.HOTREQUESTTHRESHOLD;



/**

* 默认60秒

*

* @throws

* @return: int

* @author: lvmoney /XXXXXX科技有限公司

* @date: 2020/5/18 21:04

*/

int section() default LockConstant.HOTREQUESTSECTION;



/**

* 默认 有效时间1800s 30分钟

*

* @throws

* @return: long

* @author: lvmoney /XXXXXX科技有限公司

* @date: 2020/5/19 18:14

*/

long expired() default LockConstant.HOTREQUESTEXPIRED;

}



aop



@Aspect

@Component

public class HotRequestAspect {

private static final Logger LOGGER = LoggerFactory.getLogger(HotRequestAspect.class);



@Autowired

HotRequestService hotRequestService;

@Value("${frame.redis.hotRequest.support:false}")

private boolean hotRequestSupport;

@Autowired

CaffeineService caffeineService;

@Value("${spring.application.name:lvmoney}")

private String serverName;

@Autowired

AbstractHandlerMethodMapping abstractHandlerMethodMapping;

@Autowired

DistributedLockerService distributedLockerService;



/***

* 定义controller切入点拦截规则,拦截SystemControllerLog注解的方法

*/

@Pointcut("@annotation(com.zhy.frame.cache.lock.annotion.HotRequest)")

private void controllerAspect() {

}



/**

* @describe: 获得请求的参数和返回结果

* 如果没有token,一般在登录的时候没有token的,那么需要记录参数中username的值

* @param: [joinPoint]

* @return: ResultData

@author: lvmoney /四川*****科技有限公司

* 2019/2/1

*/

@Around("controllerAspect()")

public Object recordHotRequest(ProceedingJoinPoint joinPoint) throws Throwable {

HttpServletRequest request = ((ServletRequestAttributes) RequestContextHolder.getRequestAttributes()).getRequest();

Object object = abstractHandlerMethodMapping.getHandler(request).getHandler();

HandlerMethod handlerMethod = (HandlerMethod) object;

Method method = handlerMethod.getMethod();

long threshold = LockConstant.HOTREQUESTTHRESHOLD;

int section = LockConstant.HOTREQUESTSECTION;

long expired = LockConstant.HOTREQUESTEXPIRED;

//返回值

Object rep = joinPoint.proceed();

if (method.isAnnotationPresent(HotRequest.class)) {

HotRequest hotRequest = method.getAnnotation(HotRequest.class);

if (!hotRequest.required()) {

return rep;

}

threshold = hotRequest.threshold();

section = hotRequest.section();

expired = hotRequest.expired();

}

if (!SupportUtil.support(hotRequestSupport)) {

throw new BusinessException(CacheException.Proxy.REDISHOTREQUESTSUPPORTERROR);

} else if (BaseConstant.SUPPORTFALSEBOOL == hotRequestSupport) {

return rep;

}

String servletPath = request.getServletPath();

//请求值

Object req = joinPoint.getArgs();

HotRequestRo hotRequestRo = hotRequestService.getHotRequestRo(servletPath, ParamUtil.buildRequestMap(req));

if (ObjectUtils.isEmpty(hotRequestRo)) {

hotRequestRo = new HotRequestRo();

hotRequestRo.setCounter(1L);

hotRequestRo.setStart(System.currentTimeMillis());

hotRequestRo.setUrl(servletPath);

hotRequestRo.setQ(req);

hotRequestRo.setR(rep);

hotRequestRo.setExpired(expired);

hotRequestService.save(hotRequestRo);

} else {

hotRequestRo.setQ(req);

hotRequestRo.setR(rep);

hotRequestRo.setExpired(expired);

hotRequestService.update(hotRequestRo, threshold, section);

}

return rep;

}

}



拦截器



```java

public class HotRequestInterceptor extends HandlerInterceptorAdapter {

/**

* 服务名

*/

@Value("${spring.application.name:lvmoney}")

private String serverName;

private static final Logger LOGGER = LoggerFactory.getLogger(HotRequestInterceptor.class);



@Value("${frame.redis.hotRequest.support:false}")

private boolean hotRequestSupport;

@Autowired

HotRequestService hotRequestService;



@Autowired

CaffeineService caffeineService;



@Autowired

DistributedLockerService distributedLockerService;



/**

* 获得热点服务的数据

* 1、如果能够通过 caffeine获得数据直接返回数据不再进行后续流程

* 2、同一请求(url和参数一样的请求)计数器的变更加了分布式锁,这里会当caffeine里面没有数据同时马上就达到阀值的某些请求会到控制器获得数据

* 3、2中的考虑是一个极端考虑,在高热点访问就算在caffeine没有数据某些请求发到控制器也是可以接受的。

*

* @param httpServletRequest:

* @param httpServletResponse:

* @param object:

* @throws

* @return: boolean

* @author: lvmoney /XXXXXX科技有限公司

* @date: 2020/5/19 17:34

*/

@Override

public boolean preHandle(HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse,

Object object) throws Exception {

if (!SupportUtil.support(hotRequestSupport)) {

throw new BusinessException(CacheException.Proxy.REDISHOTREQUESTSUPPORTERROR);

} else if (BaseConstant.SUPPORTFALSEBOOL == hotRequestSupport) {

return super.preHandle(httpServletRequest, httpServletResponse, object);

}

HandlerMethod handlerMethod;

try {

handlerMethod = (HandlerMethod) object;

} catch (Exception e) {

return super.preHandle(httpServletRequest, httpServletResponse, object);

}

Method method = handlerMethod.getMethod();

if (method.isAnnotationPresent(HotRequest.class)) {

HotRequest hotRequest = method.getAnnotation(HotRequest.class);

if (!hotRequest.required()) {

return super.preHandle(httpServletRequest, httpServletResponse, object);

} else {

if (!hotRequest.required()) {

return super.preHandle(httpServletRequest, httpServletResponse, object);

}

HttpServletRequest request = ((ServletRequestAttributes) RequestContextHolder.getRequestAttributes()).getRequest();

String servletPath = request.getServletPath();

Object obj = caffeineService.get(LockConstant.HOTREQUESTCAFFEINECACHENAME, RedisConstant.HOTREQUESTPREFIX + BaseConstant.CONNECTORUNDERLINE + serverName + BaseConstant.CONNECTORUNDERLINE + servletPath + BaseConstant.CONNECTOR_UNDERLINE + ParamUtil.buildParam(getReqVo(request)));

if (obj != null) {

HttpServletResponse httpResponse = ((ServletRequestAttributes) RequestContextHolder.getRequestAttributes()).getResponse();

httpResponse.setContentType("application/json;charset=utf-8");

httpResponse.setHeader("Access-Control-Allow-Credentials", "true");

String json = JsonUtil.t2JsonString(obj);

try {

httpResponse.getWriter().print(json);

return false;

} catch (IOException e) {

LOGGER.error("其他错误处理response返回处理报错:{}", e.getMessage());

}

} else {

return super.preHandle(httpServletRequest, httpServletResponse, object);

}



}

} else {

return super.preHandle(httpServletRequest, httpServletResponse, object);

}

return super.preHandle(httpServletRequest, httpServletResponse, object);

}



@Override

public void postHandle(HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse, Object o,

ModelAndView modelAndView) throws Exception {

}



@Override

public void afterCompletion(HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse,

Object o, Exception e) throws Exception {

}



/**

* 获得请求参数k-v

*

* @param request:

* @throws

* @return: java.util.Map

* @author: lvmoney /XXXXXX科技有限公司

* @date: 2020/5/19 17:32

*/

private Map getReqVo(HttpServletRequest request) {

Map reqVo = new HashMap(BaseConstant.MAPDEFAULTSIZE);

Enumeration<String> enumeration = request.getParameterNames();

while (enumeration.hasMoreElements()) {

String value = enumeration.nextElement();

String v = request.getParameter(value);

if (!LockConstant.JSONEMPTYVALUE.equals(v)) {

reqVo.put(value, request.getParameter(value));

}

}

return reqVo;

}



/**

* 获得每个请求统一分的布式锁key

*

* @param request:

* @throws

* @return: java.lang.String

* @author: lvmoney /XXXXXX科技有限公司

* @date: 2020/5/19 17:32

*/

private String getLockKey(HttpServletRequest request) {

String servletPath = request.getServletPath();

return RedisConstant.HOTREQUESTINTERCEPTORLOCKPREFIX + BaseConstant.CONNECTORUNDERLINE + serverName + BaseConstant.CONNECTORUNDERLINE + servletPath + BaseConstant.CONNECTOR_UNDERLINE + ParamUtil.buildParam(getReqVo(request));

}



}



redis 实体



@Data

@AllArgsConstructor

@NoArgsConstructor

public class HotRequestRo<Q, R> implements Serializable {

private static final long serialVersionUID = 5284336712963666131L;

/**

* 请求地址

*/

private String url;

/**

* 开始时间

*/

private long start;

/**

* 访问计数

*/

private long counter;



/**

* 返回数据

*/

private R r;



/**

* 请求参数

*/

private Q q;

/**

* 失效时间

*/

private Long expired;

}



操作接口



@Service

public class HotRequestServiceImpl implements HotRequestService {

private static final Logger LOGGER = LoggerFactory.getLogger(HotRequestServiceImpl.class);



@Autowired

BaseRedisService baseRedisService;



@Autowired

DistributedLockerService distributedLockerService;

/**

* 服务名

*/

@Value("${spring.application.name:lvmoney}")

private String serverName;

/**

* 秒和毫秒的转化

*/

private static final Integer SEC_CONVERT = 1000;

@Autowired

CaffeineService caffeineService;



@Override

public void save(HotRequestRo hotRequestRo) {

Map<String, String> reqVo = ParamUtil.buildRequestMap(hotRequestRo.getQ());

baseRedisService.setString(RedisConstant.HOTREQUESTPREFIX + BaseConstant.CONNECTORUNDERLINE + serverName + BaseConstant.CONNECTORUNDERLINE + hotRequestRo.getUrl() + BaseConstant.CONNECTOR_UNDERLINE + ParamUtil.buildParam(reqVo), hotRequestRo, hotRequestRo.getExpired());

}



@Override

public HotRequestRo getHotRequestRo(String url, Map<String, String> reqVo) {

try {

Object obj = baseRedisService.getByKey(RedisConstant.HOTREQUESTPREFIX + BaseConstant.CONNECTORUNDERLINE + serverName + BaseConstant.CONNECTORUNDERLINE + url + BaseConstant.CONNECTOR_UNDERLINE + ParamUtil.buildParam(reqVo));

HotRequestRo hotRequestRo = JSON.parseObject(obj.toString(), new TypeReference<HotRequestRo>() {

});

return hotRequestRo;

} catch (Exception e) {

return null;

}



}



@Override

public void update(HotRequestRo hotRequestRo, long threshold, int section) {

distributedLockerService.lock(RedisConstant.HOTREQUESTUPDATELOCKPREFIX + BaseConstant.CONNECTORUNDERLINE + serverName + BaseConstant.CONNECTORUNDERLINE + hotRequestRo.getUrl(), TimeUnit.SECONDS, LockConstant.LOCK_TIME);

long counter = hotRequestRo.getCounter();

long now = System.currentTimeMillis();

int seq = (int) ((now - hotRequestRo.getStart()) / SEC_CONVERT);

if (seq > section && counter < threshold) {

//如果间隔时间超过了检查区间且阀值没有达到,则重置数据

hotRequestRo.setCounter(1L);

hotRequestRo.setStart(now);

} else if (counter >= threshold) {

//如果达到阀值,保存数据到caffeine

Map<String, String> reqVo = ParamUtil.buildRequestMap(hotRequestRo.getQ());

caffeineService.save(LockConstant.HOTREQUESTCAFFEINECACHENAME, RedisConstant.HOTREQUESTPREFIX + BaseConstant.CONNECTORUNDERLINE + serverName + BaseConstant.CONNECTORUNDERLINE + hotRequestRo.getUrl() + BaseConstant.CONNECTOR_UNDERLINE + ParamUtil.buildParam(reqVo), hotRequestRo.getR());

} else {

//其他情况增加次数

hotRequestRo.setCounter(counter + 1);

}

save(hotRequestRo);

distributedLockerService.unlock(RedisConstant.HOTREQUESTUPDATELOCKPREFIX + BaseConstant.CONNECTORUNDERLINE + serverName + BaseConstant.CONNECTORUNDERLINE + hotRequestRo.getUrl());

}



}



测试用例



这里的入参一般只有一个实体(强制)



@HotRequest(required = true, threshold = 5, section = 1800)

@PostMapping(value = “frame/cache/get”)

public ApiResult get(HotRequestVo hotRequestVo) {

System.out.println(“controller”);

return ApiResult.success(baseRedisService.getByKey(“cacheDemo”));

}



使用配置



application.yml



frame:

redis:

hotRequest:

support: true



测试结果

通过修改下面的配置true or false来就行并发验证



frame:

redis:

hotRequest:

support: true



注意我这里的控制器获得数据就是直接获得redis数据(模拟redis的热点key),当其为true(支持我们上面的改造)时其并发性能大大优于其为false(单台热点数据)时。



发布于: 2020 年 05 月 21 日 阅读数: 60
用户头像

柠檬

关注

人生尚未成功,朋友仍需努力 2020.05.21 加入

长期从事微服务,中台等后台开发和架构设计。一些见解和实现可查看https://gitee.com/lvmoney/zhy-frame-parent

评论

发布
暂无评论
k8s上运行我们的springboot服务之——热点数据