Dubbo 预热
#以下内容针对 JAVA 开发环境 #
背景
大家有没有经历过下面的两种场景:
场景一:种草买了最新款的手机,刚开始用的时候性能非常好,越用越卡,越用越卡,终于你忍受不了换了个新的,开始了无限循环·········
场景二:潘家园淘了串金刚,刚上手的时候麻麻扎扎的,于是盘呀盘,盘呀盘,终于盘成了温润如玉的样子!
咱们的服务就像一串没有盘过的金刚,刚启动的时候会因为各种各样的“毛刺”,表现得比较“扎手”,为什么会“扎手”呢,下面列举了其中几个主要“元凶”:
JAVA 类加载的过程,刚启动的时候不会加载所有的类,同时也就意味着符号引用并不会在一开始就完全替换为直接引用。
JAVA 的 Solgen 是“Write Once,Run Anywhere”,他不会像 C 一样,在开始时候就进行编译,而是翻译成机器认识的字节码,同时进行解释执行,对于热点代码,使用 JIT 进行本地编译。
服务依赖的一些缓存资源,在启动初期还未加载完全,DB 和 RPC 的 IO 耗时,都会影响接口本身的性能。
·········
这些“元凶”在高并发,部署上线,容器扩缩等场景会高频的出现,导致服务的 P99 有比较明显的波动。怎么处理这些问题呢?就进入到了今天的正题,「预热(WarmUp)」。
关于预热,下面是我们常用的处理方式:
构建热点缓存,将一些热点内容提前缓存在 jvm 中。
进行线上流量回放,提前按照真实请求分布,让 jvm 做好资源的加载和编译。
通过负载均衡的组件,对线上流量做整体规划。
热点探测
,流量回放
,负载均衡
每一块都有很多可聊的内容,我们也都进行了针对性的优化,由于篇幅有限,本次我们主要讲述通过第三种方式,对服务进行预热。
源码分析
#注意代码中的注释 #
补充一下背景:
Dubbo 的负载均衡一般在Consumer
侧指定,Provider
和Consumer
相关信息会保存在 Zookeeper 的节点上,比如在Consumer
端我们可以看到如下信息,:
#对哪些参数做 Hash 运算
hash.arguments=0
#虚拟节点个数
hash.nodes=320
#使用的负载均衡方式
loadbalance=consistenthash2
#接口超时时间
timeout=300
同样我们在Provider
侧的 Url 中也可以找到一些信息,比如:
#服务注册时间
timestamp=1640453274830
#服务预热时间
warmup=120000
#服务权重
weight=10
OK,背景知识储备完毕~下面带大家开始正式熟悉 Dubbo 的负载均衡策略,Dubbo 一共默认提供五种负载均衡策略:
Abstract LoadBalance - 负载均衡策略抽象类
Random LoadBalance - 加权随机策略
RoundRobin LoadBanlance - 加权轮询策略
LeastActive LoadBalance - 最少活跃调用数策略
ConsistentHash LoadBalance - 一致性 Hash 策略
ShortestResponse LoadBalance - 最短响应时间策略 这种方式在 2.5.x 和 2.6.x 默认是不支持的
五种负载均衡方式的类图,结构比较清晰,他们有一个共同的父类AbstractLoadBalance
,这个抽象类定义了负载均衡算法的主流程,同时提供了默认基于权重的预热方法,这是 Dubbo 自身默认预热功能的基础,我们来看一下 Dubbo 默认基于权重的预热功能是如何实现的。getWeight
方法,从 provider 的注册 URL 中获取权重
,注册时间
,预热时间
参数,通过calculateWarmupWeight
方法计算出当前时刻当前 invoker 的权重:
protected int getWeight(Invoker<?> invoker, Invocation invocation) {
int weight;
URL url = invoker.getUrl();
// 注册中心不需要预热
if (REGISTRY_SERVICE_REFERENCE_PATH.equals(url.getServiceInterface())) {
weight = url.getParameter(REGISTRY_KEY + "." + WEIGHT_KEY, DEFAULT_WEIGHT);
} else {
// 在注册url中获取配置的权重参数
weight = url.getMethodParameter(invocation.getMethodName(), WEIGHT_KEY, DEFAULT_WEIGHT);
if (weight > 0) {
// 在注册url中获取服务启动的时间
long timestamp = invoker.getUrl().getParameter(TIMESTAMP_KEY, 0L);
if (timestamp > 0L) {
// 当前时间-启动时间=运行时间
long uptime = System.currentTimeMillis() - timestamp;
if (uptime < 0) {
return 1;
}
// 在注册url中获取配置的预热时间
int warmup = invoker.getUrl().getParameter(WARMUP_KEY, DEFAULT_WARMUP);
// 如果运行时间 < 配置的预热时间,计算当前invoker的权重
if (uptime > 0 && uptime < warmup) {
weight = calculateWarmupWeight((int)uptime, warmup, weight);
}
}
}
}
return Math.max(weight, 0);
}
复制代码
static int calculateWarmupWeight(int uptime, int warmup, int weight) {
// 此位置用除法不太好理解,其实就是uptime/warmup*weight,也就是根据启动时间在配置的整个预热时间段的占比,获取权重
int ww = (int) ( uptime / ((float) warmup / weight));
return ww < 1 ? 1 : (Math.min(ww, weight));
}
复制代码
RandomLoadBalance
权重随机策略,最常见的负载均衡策略,顾名思义,他就是根据上面提到的计算权重,在 invoker 集群间进行随机调用,但是由于随机的概率学特性,在 qps 较少的情况下,有可能出现流量倾斜。
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
// 集群invoker个数
int length = invokers.size();
// 是否需要根据权重进行负载均衡,方法内部就是看是不是配置了权重和启动时间
if (!needWeightLoadBalance(invokers,invocation)){
return invokers.get(ThreadLocalRandom.current().nextInt(length));
}
boolean sameWeight = true;
// 存下每个invoker配置的权重
int[] weights = new int[length];
// 配置的权重总和
int totalWeight = 0;
for (int i = 0; i < length; i++) {
// AbstractLoadBalance中获取invoker权重的逻辑
int weight = getWeight(invokers.get(i), invocation);
// 求和
totalWeight += weight;
// 依次存入当前invoker的权重上限,为什么这么使用,看到下面就明白啦
weights[i] = totalWeight;
// 如果权重总和,不是invoker数量*weight,说明不是统一的权重
if (sameWeight && totalWeight != weight * (i + 1)) {
sameWeight = false;
}
}
// 如果不是统一的权重,按照权重随机
if (totalWeight > 0 && !sameWeight) {
// 根据权重总和,获取一个随机数offset
int offset = ThreadLocalRandom.current().nextInt(totalWeight);
// 遍历weights,看这个offset存在于哪个,就使用哪个invoker,比如三个invoker分别是10,20,30,weights=[10,30,60],随机了一个29,那么就选择第二个invoker
for (int i = 0; i < length; i++) {
if (offset < weights[i]) {
return invokers.get(i);
}
}
}
// 如果是统一的权重,直接随机invoker数量
return invokers.get(ThreadLocalRandom.current().nextInt(length));
}
复制代码
RoundRobinLoadBalance
加权轮询策略,可以看做是对随机策略进行的优化,避免了流量倾斜的问题。同时RoundRobinLoadBalance
的计算逻辑也非常巧妙。直接看代码可能会一头雾水,先给大家举个代码执行的栗子,可以结合代码进行阅读,方便理解:
首先我们的服务有三个实例,初始权重为 A(1),B(2),C(3) ,下面的表格是每次调用时和调用后 invoker 的集群权重的变化过程,每次会选择权重最高的 invoker,同时对最高权重减去总权重,其余 invoker 增加自身配置的权重
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
// methodWeightMap是一个method-invoker-WeightedRoundRobin的缓存map
ConcurrentMap<String, WeightedRoundRobin> map = methodWeightMap.computeIfAbsent(key, k -> new ConcurrentHashMap<>());
// invoker总权重
int totalWeight = 0;
// 本次请求最大权重
long maxCurrent = Long.MIN_VALUE;
// 本次请求时间
long now = System.currentTimeMillis();
// 被选中的invoker
Invoker<T> selectedInvoker = null;
// 被选中invoker的WeightedRoundRobin缓存对象
WeightedRoundRobin selectedWRR = null;
// 遍历所有的invokers
for (Invoker<T> invoker : invokers) {
String identifyString = invoker.getUrl().toIdentityString();
// AbstractLoadBalance中获取invoker权重的逻辑
int weight = getWeight(invoker, invocation);
// 获取对应的WeightedRoundRobin缓存,如果没有,new
WeightedRoundRobin weightedRoundRobin = map.computeIfAbsent(identifyString, k -> {
WeightedRoundRobin wrr = new WeightedRoundRobin();
wrr.setWeight(weight);
return wrr;
});
// 如果计算的权重和缓存的不一致,说明已经变化,更新缓存权重
if (weight != weightedRoundRobin.getWeight()) {
weightedRoundRobin.setWeight(weight);
}
// 每次请求,invoker自增自己的weight数
long cur = weightedRoundRobin.increaseCurrent();
weightedRoundRobin.setLastUpdate(now);
// 比较权重,选择invoker
if (cur > maxCurrent) {
maxCurrent = cur;
selectedInvoker = invoker;
selectedWRR = weightedRoundRobin;
}
// 对总权重进行求和
totalWeight += weight;
}
// 如果invokers有变化,删除超过60S未使用的服务提供者
if (invokers.size() != map.size()) {
map.entrySet().removeIf(item -> now - item.getValue().getLastUpdate() > RECYCLE_PERIOD);
}
// 如果选中了invoker,该invoker减去总权重
if (selectedInvoker != null) {
selectedWRR.sel(totalWeight);
return selectedInvoker;
}
// should not happen here
return invokers.get(0);
}
复制代码
LeastActiveLoadBalance
最少活跃调用数策略,在加权随机的基础上,增加正在处理请求数的判断,获取每个 invoker 的正在处理请求数,仅对最小请求数的一个或多个 invoker 进行RandomLoadBalance
策略。
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
// 集群invoker个数
int length = invokers.size();
// 最少调用数
int leastActive = -1;
// 最少调用数的invoker数量
int leastCount = 0;
// 最少调用数的invoker数组
int[] leastIndexes = new int[length];
// 每个invoker的权重
int[] weights = new int[length];
// 权重总和,和之前不同的是,这里只求和最少调用数的invoker的权重
int totalWeight = 0;
// 最少调用数起始权重
int firstWeight = 0;
// 是否最少调用数的invoker都是一样的权重
boolean sameWeight = true;
// 遍历invokers,找出最少调用数的所有invoker
for (int i = 0; i < length; i++) {
Invoker<T> invoker = invokers.get(i);
// 获取方法的调用数
int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive();
// AbstractLoadBalance中获取invoker权重的逻辑
int afterWarmup = getWeight(invoker, invocation);
// 保存权重
weights[i] = afterWarmup;
// 首个invoker或者小于之前的调用数
if (leastActive == -1 || active < leastActive) {
// 因为是首个,或者发现了更小调用数,下面都是重置类型的操作
// 重置最少调用数
leastActive = active;
// 重置最少调用数的invoker数量
leastCount = 1;
// 把当前invoker放入最少调用数数组第一位
leastIndexes[0] = i;
// 重置最少调用数求和权重
totalWeight = afterWarmup;
// 重置起始权重
firstWeight = afterWarmup;
// 因为只有一个,所以重置为true
sameWeight = true;
// 下面是发现了相同调用数时候发生的事情
} else if (active == leastActive) {
// 在最少调用数数组中记录invoker
leastIndexes[leastCount++] = i;
// 求和最少调用数权重
totalWeight += afterWarmup;
// 对比之前的权重,如果不一样,sameWeight置为false,这里的sameWeight和RandomLoadBalance用处一样
if (sameWeight && afterWarmup != firstWeight) {
sameWeight = false;
}
}
}
// 如果最少调用数的invoker只有一个,那么选择这个
if (leastCount == 1) {
return invokers.get(leastIndexes[0]);
}
// 下面的逻辑和RandomLoadBalance基本一致
if (!sameWeight && totalWeight > 0) {
int offsetWeight = ThreadLocalRandom.current().nextInt(totalWeight);
for (int i = 0; i < leastCount; i++) {
int leastIndex = leastIndexes[i];
offsetWeight -= weights[leastIndex];
if (offsetWeight < 0) {
return invokers.get(leastIndex);
}
}
}
// If all invokers have the same weight value or totalWeight=0, return evenly.
return invokers.get(leastIndexes[ThreadLocalRandom.current().nextInt(leastCount)]);
}
复制代码
ShortestResponseLoadBalance
最短相应时间策略,和最少调用数策略类似,如果看懂了上面的LeastActiveLoadBalance
,其实ShortestResponseLoadBalance
就是把调用数的判断,替换为响应时间。
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
// 集群invoker个数
int length = invokers.size();
// 最短响应时间
long shortestResponse = Long.MAX_VALUE;
// 含有相同最短响应时间的invoker数量
int shortestCount = 0;
// 保存最短响应时间的invoker数组
int[] shortestIndexes = new int[length];
// 保存每个invoker的权重
int[] weights = new int[length];
// 含有相同最短响应时间的权重总和
int totalWeight = 0;
// 最短响应时间的其实权重
int firstWeight = 0;
// 是否包含最短响应时间的invoker权重的相同
boolean sameWeight = true;
// 遍历invokers,找出包含相同最短相应时间的所有invoker
for (int i = 0; i < length; i++) {
Invoker<T> invoker = invokers.get(i);
RpcStatus rpcStatus = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName());
// 这里是一个缓存,类似RoundRobinLoadBalance中缓存权重
SlideWindowData slideWindowData = methodMap.computeIfAbsent(rpcStatus, SlideWindowData::new);
// 获取对应方法的响应时间
long estimateResponse = slideWindowData.getEstimateResponse();
// AbstractLoadBalance中获取invoker权重的逻辑
int afterWarmup = getWeight(invoker, invocation);
weights[i] = afterWarmup;
// 这里和LeastActiveLoadBalance类似
if (estimateResponse < shortestResponse) {
shortestResponse = estimateResponse;
shortestCount = 1;
shortestIndexes[0] = i;
totalWeight = afterWarmup;
firstWeight = afterWarmup;
sameWeight = true;
} else if (estimateResponse == shortestResponse) {
shortestIndexes[shortestCount++] = i;
totalWeight += afterWarmup;
if (sameWeight && i > 0
&& afterWarmup != firstWeight) {
sameWeight = false;
}
}
}
if (System.currentTimeMillis() - lastUpdateTime > slidePeriod
&& onResetSlideWindow.compareAndSet(false, true)) {
//同步更新最短响应时间缓存
executorService.execute(() -> {
methodMap.values().forEach(SlideWindowData::reset);
lastUpdateTime = System.currentTimeMillis();
onResetSlideWindow.set(false);
});
}
// 这里和LeastActiveLoadBalance类似
if (shortestCount == 1) {
return invokers.get(shortestIndexes[0]);
}
if (!sameWeight && totalWeight > 0) {
int offsetWeight = ThreadLocalRandom.current().nextInt(totalWeight);
for (int i = 0; i < shortestCount; i++) {
int shortestIndex = shortestIndexes[i];
offsetWeight -= weights[shortestIndex];
if (offsetWeight < 0) {
return invokers.get(shortestIndex);
}
}
}
return invokers.get(shortestIndexes[ThreadLocalRandom.current().nextInt(shortestCount)]);
}
复制代码
ConsistentHashLoadBalance
一致性 Hash 策略,一致性 Hash 的好处就不过多解释了,说了这么多,不知道大家有没有仔细看源码中的注解,上面四种方式都有getWeight
的方法调用,也是 Dubbo 默认支持预热的基础,但是在这个策略中,是不支持预热的。原因从类名中一目了然,既然叫一致性 Hash,那么首要就是满足 Hash 特性。闲话少叙,我们来看代码~
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
String methodName = RpcUtils.getMethodName(invocation);
String key = invokers.get(0).getUrl().getServiceKey() + "." + methodName;
// 获取invoker列表的hashcode
int invokersHashCode = getCorrespondingHashCode(invokers);
// 获取方法对应的缓存hash选择器
ConsistentHashSelector<T> selector = (ConsistentHashSelector<T>) selectors.get(key);
// 如果没有生成过,或者invoker列表的hashcode已经发生了变化,重建hash选择器
if (selector == null || selector.identityHashCode != invokersHashCode) {
selectors.put(key, new ConsistentHashSelector<T>(invokers, methodName, invokersHashCode));
selector = (ConsistentHashSelector<T>) selectors.get(key);
}
return selector.select(invocation);
}
复制代码
ConsistentHashSelector(List<Invoker<T>> invokers, String methodName, int identityHashCode) {
// 为了可以hash的更均匀,这里会在原有invoker的基础上虚拟出一些节点,默认是160,可配置
this.virtualInvokers = new TreeMap<Long, Invoker<T>>();
this.identityHashCode = identityHashCode;
URL url = invokers.get(0).getUrl();
// 获取配置信息,两个配置,一个是虚拟节点数,一个是需要对那些参数进行hash
this.replicaNumber = url.getMethodParameter(methodName, HASH_NODES, 160);
String[] index = COMMA_SPLIT_PATTERN.split(url.getMethodParameter(methodName, HASH_ARGUMENTS, "0"));
argumentIndex = new int[index.length];
for (int i = 0; i < index.length; i++) {
argumentIndex[i] = Integer.parseInt(index[i]);
}
// 遍历invoker列表
for (Invoker<T> invoker : invokers) {
String address = invoker.getUrl().getAddress();
// 这里除4主要是为了减少MD5的次数,使得16位的MD5可以的到充分的利用
for (int i = 0; i < replicaNumber / 4; i++) {
// 获取MD5值
byte[] digest = Bytes.getMD5(address + i);
// 根据h对MD5值进行位移,计算出所有该invoker对应的hash值,放入虚拟节点
for (int h = 0; h < 4; h++) {
long m = hash(digest, h);
virtualInvokers.put(m, invoker);
}
}
}
totalRequestCount = new AtomicLong(0);
serverCount = invokers.size();
serverRequestCountMap.clear();
}
复制代码
上面提到,ConsistentHashLoadBalance
是不支持预热的,那么如果我们使用的就是ConsistentHashLoadBalance
或者对于另外LeastActiveLoadBalance
,RandomLoadBalance
,RoundRobinLoadBalance
,ShortestResponseLoadBalance
策略的预热功能不满意,就只能另辟蹊径了么?答案是 NO,我们可以把预热曲线设计成适用自身服务的样子,下面会用相对复杂一些的一致性 Hash 举例。
Tips:在这种策略下进行预热的前提是能够接受在服务启动初期的 Hash 错误。
这个图是ConsistentHashLoadBalance
这种策略启动初期的 QPS 分布,基本是一条直线,请求会瞬间全部打进来。对于 P99 的影响是不可控的。
这个时候我们在使用ConsistentHashSelector
重建 Hash 选择器之后,在select
方法中的selectForKey
方法中进行虚拟 Invoker 是否可用的判断,这里我们定义了一个叫做isInvokerAvailable
的方法处理可用性判断的逻辑。
private boolean isInvokerAvailable(String address) {
// invokerSelfNode标识Hash选择器中的虚拟节点,是否包含已经选取的address
if (!invokeSelfNode.containsKey(address)) {
return true;
}
if (null == virtualInvokers) {
return true;
}
Long key = invokeSelfNode.get(address).size() > 0 ? invokeSelfNode.get(address).get(0) : 0L;
Invoker<T> invoker = virtualInvokers.get(key);
if (null == invoker) {
return true;
}
long timestamp = invoker.getUrl().getParameter(Constants.TIMESTAMP_KEY, 0L);
// 获取预热功能需要的启动时间,预热时间
if (timestamp > 0L) {
int uptime = (int)(System.currentTimeMillis() - timestamp);
int warmup = 2 * 60 * 1000;
int totalRate = 100;
// 下面就是按照自己喜欢的曲线,根据启动时间操作
if (uptime > 0 && uptime < warmup) {
float rate;
int minAvailableRate;
// 在预热时间的前半部分,最多只允许10%的流量
if (uptime < warmup / 2) {
if (random.nextInt(totalRate) > 10) {
return false;
} else {
// 这10%的流量会按照一定的曲线,缓慢放入
rate = (float)uptime / ((float)warmup / 2);
int x = (int)(totalRate / 10 * rate);
minAvailableRate = (int)(x * x * 0.1);
return random.nextInt(totalRate / 10) < minAvailableRate;
}
} else {
// 在预热时间的后半部分,会按照线性函数进行流量放入
rate = (float)uptime / (float)warmup;
if (rate >= 1) {
return true;
} else {
minAvailableRate = (int)(rate * totalRate);
return random.nextInt(totalRate) < minAvailableRate;
}
}
}
}
return true;
}
复制代码
预热功能增加完成,我们来测试一下,很明显,QPS 曲线像我们预期的一样,P99 的波动也有了很明显的改善。
Dubbo 优雅停机
背景
2019-12-26 10:20:58.411 INFO 13080 --- [DubboShutdownHook] c.a.dubbo.config.DubboShutdownHook : [DUBBO] Run shutdown hook now., dubbo version: 2.6.6.kk, current host: 10.66.4.1082019-12-26 10:20:58.455 INFO 13080 --- [Thread-12] .d.c.e.AwaitingNonWebApplicationListener : [Dubbo] Current Spring Boot Application is about to shutdown...
通过上面两条日志,我们可以发现 Dubbo 和 Spring 的停机几乎是同时执行的,然后我们的服务就会 ERROR,ERROR,ERROR 开始闹钟模式~
为什么会出现那么多的 ERROR 呢,上面提到两个 ShutDownHook 几乎是同时执行的,那么正在处理的 Dubbo 请求会因为 Spring Bean 已经销毁导致找不到对应的 Bean,那么是不是让他们的 ShutDownHook 错开一些时间就可以了么?Yes!
源码分析
SpringExtensionFactory
这里我们可以看到 DubboShutDown 是监听的 ContextClosedEvent,这里就不再展开了,对 Spring 有兴趣的小伙伴,可以自行了解。
private static class ShutdownHookListener implements ApplicationListener {
@Override
public void onApplicationEvent(ApplicationEvent event) {
if (event instanceof ContextClosedEvent) {
DubboShutdownHook shutdownHook = DubboShutdownHook.getDubboShutdownHook();
shutdownHook.destroyAll();
}
}
}
复制代码
那么 Spring 又是在什么时候进行 ShutDown 的呢?
在SpringApplication
中可以发现,Spring 在refreshContext
进行了 ShutDownHook 的注册
private void refreshContext(ConfigurableApplicationContext context) {
refresh(context);
if (this.registerShutdownHook) {
try {
context.registerShutdownHook();
}
catch (AccessControlException ex) {
// Not allowed in some environments.
}
}
}
复制代码
同时当 JVM 被 Kill 的时候会指定钩子方法
public void registerShutdownHook() {
if (this.shutdownHook == null) {
// No shutdown hook registered yet.
this.shutdownHook = new Thread() {
@Override
public void run() {
synchronized (startupShutdownMonitor) {
doClose();
}
}
};
Runtime.getRuntime().addShutdownHook(this.shutdownHook);
}
}
复制代码
这也就导致了,我们上面描述的场景,当服务重启时,向 JVM 发送了 Kill 命令,这个时候 Spring 进行 ShutDown,同时 Dubbo 监听到了 Spring 的结束时间,同时进行 ShutDown。
解决方案
我们可以利用 Spring 高度可操作的特性,在服务启动的时候对 ShutDown 进行取消注册application.setRegisterShutdownHook(false);
同时注册我们自己的 ShutDownHook 方法
public void registerShutdownHook() {
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
try {
int dubboTimeout = ConfigUtils.getServerShutdownTimeout();
Thread.sleep(dubboTimeout);
this.configurableApplicationContext.close();
} catch (InterruptedException e) {
log.error(e.getMessage(), e);
Thread.currentThread().interrupt();
}
}));
}
复制代码
One more thing
欢迎加入 KKFamily,一起做有挑战、有意思的事情,简历可以投递至 hrzhaopin@kkworld.com
。
作者:临时工(花名),2019 年加入快看,负责快看推荐引擎开发工作。
评论