k8s 上运行我们的 springboot 服务之——热点数据
综合描述
在我们微服务系统中总会用到redis,我们把某些在一个时间段内不断重复访问的数据叫着热点数据。例如:某个明星出轨的新闻,某个爆款的商品等。
这部分数据我个人觉得大概有以下几个特点:
1、在一段时间内不会变化
2、在某个时间段会有大量的用户去不断的查看
3、我们知道用户查看数据是我们系统的哪个服务接口,但是不能随时动态的确定是接口里面哪条数据
基于如上三个特点,我们会把这些接口的数据到放到redis中进行存储,用来降低mysql数据库的负载(在我看来系统的性能瓶颈总是在关系型数据库)。
问题
当我们redis达到了负载极限怎么办?也就是热点数据的访问量很大的时候,我们的单台redis扛不住了。
有的小伙伴会说redis可以上集群啊,不就解决了吗?
其实不然,我简单的说一下我的理解,我们redis做集群,master来负责数据的读写,由于redis hash槽的的设计,我们同一个key值的数据会放到一个固定的master节点。也就是在一台redis master节点上了,其他master节点没有存这个key的值。如果每个master做了主从,该master对应的从节点有这个key的数据,但是从节点做备份不做读写(遇到读写命中会转发给master)。如果我们做主从读写分离,它核心还是只有一台从节点在读还是存在单台扛不住的问题。
我们如何知道哪条数据是热点数据
网上的解决方案
读写分离
在操作读请求时,可以先加上readonly命令,这样从redis就可以提供读请求服务了,不需要转发到主redis
即达到了多台从redis去扛大量请求了,减少了主redis压力。这个方案需要对客户端进行改造,而且redis官方推荐没有必要使用读写分离
在系统和redis之间做一层代理
我的理解是:在存redis的时候我们改造key值(比如加入一些参数),使得数据在多个master都有备份。在读的时候根据我们在存的时候保存的规则,通过某种机制去不同的redis master获得数据
如何均匀的或者保证我们同一个key的数据更好的同步到不同master是一个问题点
加入一级缓存
当判断某个key是热点数据时,我们在每个微服务直接把数据存到缓存,不再到redis获得数据,由于我们服务本身就是集群,每台对外提供服务的节点都有一份热点数据,从而通过多台服务器去抗热点数据
什么时候把数据保存到缓存是一个问题点
通过redis4+版本获得热点key
在redis4+版本中,redis提供了–hotkeys命令获得热点数据
我们在业务系统(springboot架构)中如何使用–hotkeys获得热点数据,获得了热点数据业务系统该如何做是一个问题点
通过计数来确定热点key
在redis 读之前 做一个计数来找到热点key
计数(springboot架构),计数完了之后该如何做是一个问题点
等等等等。。。
很少有现成的方案或者落地代码供我们落地实现
我的实现
说明
1、在设计系统的时候我们大概能确定是哪个服务会导致热点数据,这个我们在开发的时候就要标记出来接口是哪个(用注解的方式),也可以动态加拦截器去统计,但是由于我们系统的接口太多,热点接口我觉得不会有太多,每个接口都去统计浪费资源
2、被标记的接口我不管你具体获得数据是通过mysql,redis,还是有混合都有。在没有达到我们预设的阀值我保存该接口的请求参数和返回值到redis并对该请求计数
3、当没有达到阀值时,我们的请求数据还是会走到我们的控制器并返回,同时会保存到redis
4、阀值判断,当我们的请求在一段时间内,请求总数没有达到计数里面的阀值,我们会继续发请求到控制器并返回数据。当达到阀值我们把数据保存的缓存,后面的请求来我们先判断缓存是否有数据,如果有直接返回,没有就把请求发到控制器并保存到缓存
5、当计数达到阀值后,由于我们的请求直接通过缓存获得数据,我们的阀值计数将不再进行。我们系统的这个计数redis和缓存的数据都有失效性,所以我们在配置的时候需要进行合理的配置,如果失效了,如果还是热点数据,无非就是把我们前面的事情再做了一遍(系统在短时间内可能有一个小的波动)
具体实现
技术罗列
拦截器 aop redisson 自定义注解等
代码片段
自定义注解
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD, ElementType.TYPE})
public @interface HotRequest {
boolean required() default true;
/**
* 默认频率10000次/s
*
* @throws
* @return: int
* @author: lvmoney /XXXXXX科技有限公司
* @date: 2020/5/18 16:37
*/
long threshold() default LockConstant.HOTREQUESTTHRESHOLD;
/**
* 默认60秒
*
* @throws
* @return: int
* @author: lvmoney /XXXXXX科技有限公司
* @date: 2020/5/18 21:04
*/
int section() default LockConstant.HOTREQUESTSECTION;
/**
* 默认 有效时间1800s 30分钟
*
* @throws
* @return: long
* @author: lvmoney /XXXXXX科技有限公司
* @date: 2020/5/19 18:14
*/
long expired() default LockConstant.HOTREQUESTEXPIRED;
}
aop
@Aspect
@Component
public class HotRequestAspect {
private static final Logger LOGGER = LoggerFactory.getLogger(HotRequestAspect.class);
@Autowired
HotRequestService hotRequestService;
@Value("${frame.redis.hotRequest.support:false}")
private boolean hotRequestSupport;
@Autowired
CaffeineService caffeineService;
@Value("${spring.application.name:lvmoney}")
private String serverName;
@Autowired
AbstractHandlerMethodMapping abstractHandlerMethodMapping;
@Autowired
DistributedLockerService distributedLockerService;
/***
* 定义controller切入点拦截规则,拦截SystemControllerLog注解的方法
*/
@Pointcut("@annotation(com.zhy.frame.cache.lock.annotion.HotRequest)")
private void controllerAspect() {
}
/**
* @describe: 获得请求的参数和返回结果
* 如果没有token,一般在登录的时候没有token的,那么需要记录参数中username的值
* @param: [joinPoint]
* @return: ResultData
@author: lvmoney /四川*****科技有限公司
* 2019/2/1
*/
@Around("controllerAspect()")
public Object recordHotRequest(ProceedingJoinPoint joinPoint) throws Throwable {
HttpServletRequest request = ((ServletRequestAttributes) RequestContextHolder.getRequestAttributes()).getRequest();
Object object = abstractHandlerMethodMapping.getHandler(request).getHandler();
HandlerMethod handlerMethod = (HandlerMethod) object;
Method method = handlerMethod.getMethod();
long threshold = LockConstant.HOTREQUESTTHRESHOLD;
int section = LockConstant.HOTREQUESTSECTION;
long expired = LockConstant.HOTREQUESTEXPIRED;
//返回值
Object rep = joinPoint.proceed();
if (method.isAnnotationPresent(HotRequest.class)) {
HotRequest hotRequest = method.getAnnotation(HotRequest.class);
if (!hotRequest.required()) {
return rep;
}
threshold = hotRequest.threshold();
section = hotRequest.section();
expired = hotRequest.expired();
}
if (!SupportUtil.support(hotRequestSupport)) {
throw new BusinessException(CacheException.Proxy.REDISHOTREQUESTSUPPORTERROR);
} else if (BaseConstant.SUPPORTFALSEBOOL == hotRequestSupport) {
return rep;
}
String servletPath = request.getServletPath();
//请求值
Object req = joinPoint.getArgs();
HotRequestRo hotRequestRo = hotRequestService.getHotRequestRo(servletPath, ParamUtil.buildRequestMap(req));
if (ObjectUtils.isEmpty(hotRequestRo)) {
hotRequestRo = new HotRequestRo();
hotRequestRo.setCounter(1L);
hotRequestRo.setStart(System.currentTimeMillis());
hotRequestRo.setUrl(servletPath);
hotRequestRo.setQ(req);
hotRequestRo.setR(rep);
hotRequestRo.setExpired(expired);
hotRequestService.save(hotRequestRo);
} else {
hotRequestRo.setQ(req);
hotRequestRo.setR(rep);
hotRequestRo.setExpired(expired);
hotRequestService.update(hotRequestRo, threshold, section);
}
return rep;
}
}
拦截器
```java
public class HotRequestInterceptor extends HandlerInterceptorAdapter {
/**
* 服务名
*/
@Value("${spring.application.name:lvmoney}")
private String serverName;
private static final Logger LOGGER = LoggerFactory.getLogger(HotRequestInterceptor.class);
@Value("${frame.redis.hotRequest.support:false}")
private boolean hotRequestSupport;
@Autowired
HotRequestService hotRequestService;
@Autowired
CaffeineService caffeineService;
@Autowired
DistributedLockerService distributedLockerService;
/**
* 获得热点服务的数据
* 1、如果能够通过 caffeine获得数据直接返回数据不再进行后续流程
* 2、同一请求(url和参数一样的请求)计数器的变更加了分布式锁,这里会当caffeine里面没有数据同时马上就达到阀值的某些请求会到控制器获得数据
* 3、2中的考虑是一个极端考虑,在高热点访问就算在caffeine没有数据某些请求发到控制器也是可以接受的。
*
* @param httpServletRequest:
* @param httpServletResponse:
* @param object:
* @throws
* @return: boolean
* @author: lvmoney /XXXXXX科技有限公司
* @date: 2020/5/19 17:34
*/
@Override
public boolean preHandle(HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse,
Object object) throws Exception {
if (!SupportUtil.support(hotRequestSupport)) {
throw new BusinessException(CacheException.Proxy.REDISHOTREQUESTSUPPORTERROR);
} else if (BaseConstant.SUPPORTFALSEBOOL == hotRequestSupport) {
return super.preHandle(httpServletRequest, httpServletResponse, object);
}
HandlerMethod handlerMethod;
try {
handlerMethod = (HandlerMethod) object;
} catch (Exception e) {
return super.preHandle(httpServletRequest, httpServletResponse, object);
}
Method method = handlerMethod.getMethod();
if (method.isAnnotationPresent(HotRequest.class)) {
HotRequest hotRequest = method.getAnnotation(HotRequest.class);
if (!hotRequest.required()) {
return super.preHandle(httpServletRequest, httpServletResponse, object);
} else {
if (!hotRequest.required()) {
return super.preHandle(httpServletRequest, httpServletResponse, object);
}
HttpServletRequest request = ((ServletRequestAttributes) RequestContextHolder.getRequestAttributes()).getRequest();
String servletPath = request.getServletPath();
Object obj = caffeineService.get(LockConstant.HOTREQUESTCAFFEINECACHENAME, RedisConstant.HOTREQUESTPREFIX + BaseConstant.CONNECTORUNDERLINE + serverName + BaseConstant.CONNECTORUNDERLINE + servletPath + BaseConstant.CONNECTOR_UNDERLINE + ParamUtil.buildParam(getReqVo(request)));
if (obj != null) {
HttpServletResponse httpResponse = ((ServletRequestAttributes) RequestContextHolder.getRequestAttributes()).getResponse();
httpResponse.setContentType("application/json;charset=utf-8");
httpResponse.setHeader("Access-Control-Allow-Credentials", "true");
String json = JsonUtil.t2JsonString(obj);
try {
httpResponse.getWriter().print(json);
return false;
} catch (IOException e) {
LOGGER.error("其他错误处理response返回处理报错:{}", e.getMessage());
}
} else {
return super.preHandle(httpServletRequest, httpServletResponse, object);
}
}
} else {
return super.preHandle(httpServletRequest, httpServletResponse, object);
}
return super.preHandle(httpServletRequest, httpServletResponse, object);
}
@Override
public void postHandle(HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse, Object o,
ModelAndView modelAndView) throws Exception {
}
@Override
public void afterCompletion(HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse,
Object o, Exception e) throws Exception {
}
/**
* 获得请求参数k-v
*
* @param request:
* @throws
* @return: java.util.Map
* @author: lvmoney /XXXXXX科技有限公司
* @date: 2020/5/19 17:32
*/
private Map getReqVo(HttpServletRequest request) {
Map reqVo = new HashMap(BaseConstant.MAPDEFAULTSIZE);
Enumeration<String> enumeration = request.getParameterNames();
while (enumeration.hasMoreElements()) {
String value = enumeration.nextElement();
String v = request.getParameter(value);
if (!LockConstant.JSONEMPTYVALUE.equals(v)) {
reqVo.put(value, request.getParameter(value));
}
}
return reqVo;
}
/**
* 获得每个请求统一分的布式锁key
*
* @param request:
* @throws
* @return: java.lang.String
* @author: lvmoney /XXXXXX科技有限公司
* @date: 2020/5/19 17:32
*/
private String getLockKey(HttpServletRequest request) {
String servletPath = request.getServletPath();
return RedisConstant.HOTREQUESTINTERCEPTORLOCKPREFIX + BaseConstant.CONNECTORUNDERLINE + serverName + BaseConstant.CONNECTORUNDERLINE + servletPath + BaseConstant.CONNECTOR_UNDERLINE + ParamUtil.buildParam(getReqVo(request));
}
}
redis 实体
@Data
@AllArgsConstructor
@NoArgsConstructor
public class HotRequestRo<Q, R> implements Serializable {
private static final long serialVersionUID = 5284336712963666131L;
/**
* 请求地址
*/
private String url;
/**
* 开始时间
*/
private long start;
/**
* 访问计数
*/
private long counter;
/**
* 返回数据
*/
private R r;
/**
* 请求参数
*/
private Q q;
/**
* 失效时间
*/
private Long expired;
}
操作接口
@Service
public class HotRequestServiceImpl implements HotRequestService {
private static final Logger LOGGER = LoggerFactory.getLogger(HotRequestServiceImpl.class);
@Autowired
BaseRedisService baseRedisService;
@Autowired
DistributedLockerService distributedLockerService;
/**
* 服务名
*/
@Value("${spring.application.name:lvmoney}")
private String serverName;
/**
* 秒和毫秒的转化
*/
private static final Integer SEC_CONVERT = 1000;
@Autowired
CaffeineService caffeineService;
@Override
public void save(HotRequestRo hotRequestRo) {
Map<String, String> reqVo = ParamUtil.buildRequestMap(hotRequestRo.getQ());
baseRedisService.setString(RedisConstant.HOTREQUESTPREFIX + BaseConstant.CONNECTORUNDERLINE + serverName + BaseConstant.CONNECTORUNDERLINE + hotRequestRo.getUrl() + BaseConstant.CONNECTOR_UNDERLINE + ParamUtil.buildParam(reqVo), hotRequestRo, hotRequestRo.getExpired());
}
@Override
public HotRequestRo getHotRequestRo(String url, Map<String, String> reqVo) {
try {
Object obj = baseRedisService.getByKey(RedisConstant.HOTREQUESTPREFIX + BaseConstant.CONNECTORUNDERLINE + serverName + BaseConstant.CONNECTORUNDERLINE + url + BaseConstant.CONNECTOR_UNDERLINE + ParamUtil.buildParam(reqVo));
HotRequestRo hotRequestRo = JSON.parseObject(obj.toString(), new TypeReference<HotRequestRo>() {
});
return hotRequestRo;
} catch (Exception e) {
return null;
}
}
@Override
public void update(HotRequestRo hotRequestRo, long threshold, int section) {
distributedLockerService.lock(RedisConstant.HOTREQUESTUPDATELOCKPREFIX + BaseConstant.CONNECTORUNDERLINE + serverName + BaseConstant.CONNECTORUNDERLINE + hotRequestRo.getUrl(), TimeUnit.SECONDS, LockConstant.LOCK_TIME);
long counter = hotRequestRo.getCounter();
long now = System.currentTimeMillis();
int seq = (int) ((now - hotRequestRo.getStart()) / SEC_CONVERT);
if (seq > section && counter < threshold) {
//如果间隔时间超过了检查区间且阀值没有达到,则重置数据
hotRequestRo.setCounter(1L);
hotRequestRo.setStart(now);
} else if (counter >= threshold) {
//如果达到阀值,保存数据到caffeine
Map<String, String> reqVo = ParamUtil.buildRequestMap(hotRequestRo.getQ());
caffeineService.save(LockConstant.HOTREQUESTCAFFEINECACHENAME, RedisConstant.HOTREQUESTPREFIX + BaseConstant.CONNECTORUNDERLINE + serverName + BaseConstant.CONNECTORUNDERLINE + hotRequestRo.getUrl() + BaseConstant.CONNECTOR_UNDERLINE + ParamUtil.buildParam(reqVo), hotRequestRo.getR());
} else {
//其他情况增加次数
hotRequestRo.setCounter(counter + 1);
}
save(hotRequestRo);
distributedLockerService.unlock(RedisConstant.HOTREQUESTUPDATELOCKPREFIX + BaseConstant.CONNECTORUNDERLINE + serverName + BaseConstant.CONNECTORUNDERLINE + hotRequestRo.getUrl());
}
}
测试用例
这里的入参一般只有一个实体(强制)
@HotRequest(required = true, threshold = 5, section = 1800)
@PostMapping(value = “frame/cache/get”)
public ApiResult get(HotRequestVo hotRequestVo) {
System.out.println(“controller”);
return ApiResult.success(baseRedisService.getByKey(“cacheDemo”));
}
使用配置
application.yml
frame:
redis:
hotRequest:
support: true
测试结果
通过修改下面的配置true or false来就行并发验证
frame:
redis:
hotRequest:
support: true
注意我这里的控制器获得数据就是直接获得redis数据(模拟redis的热点key),当其为true(支持我们上面的改造)时其并发性能大大优于其为false(单台热点数据)时。
版权声明: 本文为 InfoQ 作者【柠檬】的原创文章。
原文链接:【http://xie.infoq.cn/article/ea9993ef203eb41ff338cf6d2】。
本文遵守【CC BY-SA】协议,转载请保留原文出处及本版权声明。
评论