写点什么

第五周课后作业

用户头像
iHai
关注
发布于: 2020 年 07 月 09 日



  • 用你熟悉的编程语言实现一致性 hash 算法。

  • 编写测试用例测试这个算法,测试 100 万 KV 数据,10 个服务器节点的情况下,计算这些 KV 数据在服务器上分布数量的标准差,以评估算法的存储负载不均衡性。



一、编写一个一致性hash算法



定义一个路由接口

public interface NodeRoute {

/**
* 根据key获取对应的服务节点
* @param key
* @return
*/
ServiceNode get(String key);

void addNode(ServiceNode node);

void removeNode(ServiceNode node);

Collection<ServiceNode> getAllNodes();
}




实现一个有虚拟节点的一致性hash算法

public class VirtualNodeConsistentHashRoute implements NodeRoute {

private final ConcurrentNavigableMap<Integer, ServiceNode> skipListMap = new ConcurrentSkipListMap<>();
private final CopyOnWriteArrayList<ServiceNode> copyOnWriteArrayList = new CopyOnWriteArrayList<>();
private final int virtualNodeNumber;
private final int virtualNodeLength;

/**
* 构造函数
* @param nodeNumber 每个节点的虚拟节点数
*/
public VirtualNodeConsistentHashRoute(int nodeNumber) {
virtualNodeNumber = Math.max(nodeNumber, 1);
virtualNodeLength = Integer.MAX_VALUE / virtualNodeNumber;
}

/**
* 添加一个虚拟节点
* 虚拟节点的计算比较简单,以node的hash值加上(Integer.MAX_VALUE / virtualNodeNumber)的值,确保能均匀分布在环上
* @param node
*/
@Override
public void addNode(ServiceNode node) {
if (Objects.isNull(node)) {
return;
}

if (copyOnWriteArrayList.contains(node)) {
return;
}

copyOnWriteArrayList.add(node);
int hashCode = node.hashCode();
long virtualNodeHashCode;
for (int i = 0; i < virtualNodeNumber; i++) {
virtualNodeHashCode = hashCode + (long)i * virtualNodeLength;
virtualNodeHashCode = virtualNodeHashCode % Integer.MAX_VALUE;
skipListMap.put((int) virtualNodeHashCode, node);
}
}

/**
* 删除服务节点
* 需要移除对应的虚拟节点
* @param node
*/
@Override
public void removeNode(ServiceNode node) {
if (Objects.isNull(node)) {
return;
}

if (!copyOnWriteArrayList.contains(node)) {
return;
}

copyOnWriteArrayList.remove(node);
int hashCode = node.hashCode();
long virtualNodeHashCode;
for (int i = 0; i < virtualNodeNumber; i++) {
virtualNodeHashCode = hashCode + (long)i * virtualNodeLength;
virtualNodeHashCode = virtualNodeHashCode % Integer.MAX_VALUE;
skipListMap.remove((int)virtualNodeHashCode);
}
}

/**
* 获取所有服务节点
* @return
*/
@Override
public Collection<ServiceNode> getAllNodes() {
return Collections.unmodifiableCollection(copyOnWriteArrayList);
}

/**
* 根据key获取对应的服务节点
* @param key
* @return
*/
@Override
public ServiceNode get(String key) {
if (Objects.isNull(key) || CollectionUtils.isEmpty(skipListMap)) {
return null;
}

int keyHashValue = key.hashCode();
Map.Entry entry = skipListMap.ceilingEntry(keyHashValue);
return Objects.nonNull(entry) && Objects.nonNull(entry.getValue())
? (ServiceNode) entry.getValue()
: skipListMap.firstEntry().getValue();

}
}

服务节点简单定义如下

public class ServiceNode {

private final AtomicInteger atomicInteger = new AtomicInteger();

public ServiceNode() {
}

public void add(String key, Object value) {
atomicInteger.incrementAndGet();
}

public int count() {
return atomicInteger.get();
}
}



二、测试

使用100w个kv值测试

public class LoadBalancingTest {

public static void main(String[] args) {
//10个服务器节点,每个服务器有150个虚拟节点
NodeRoute nodeRoute = getNodeRouteHaveServices(10, 150);
//100w个随机kv存取
testServiceNodeHit(nodeRoute, 1_000_000);
//计算标准差
printStandardDeviation(1_000_000, nodeRoute.getAllNodes());
}

private static NodeRoute getNodeRouteHaveServices(int serviceNumber, int virtualNodeNumber) {
NodeRoute nodeRoute = new VirtualNodeConsistentHashRoute(virtualNodeNumber);
for (int i = 0; i < serviceNumber; i++) {
nodeRoute.addNode(new ServiceNode());
}
return nodeRoute;
}

private static void testServiceNodeHit(NodeRoute nodeRoute, int number) {
Random random = new SecureRandom();
String keyString;
Object value = new Object();
for (int i = 0; i < number; i++) {
keyString = "" + random.nextInt(Integer.MAX_VALUE);
ServiceNode serviceNode = nodeRoute.get(keyString);
serviceNode.add(keyString, value);
}
}

private static void printStandardDeviation(int testNumber,
Collection<ServiceNode> serviceNodes) {
int nodeNumber = serviceNodes.size();
int average = testNumber / nodeNumber;
double variance = serviceNodes.stream()
.mapToDouble(serviceNode -> {
int hitCount = serviceNode.count();
double oneVariance = Math.pow((hitCount - average), 2) / nodeNumber;
System.out.println("节点数据量: " + hitCount);
return oneVariance;
}).sum();
double standardDeviation = Math.sqrt(variance);
System.out.println("标准差:" + standardDeviation);
}
}

某次打印结果:

节点数据量: 51240

节点数据量: 115350

节点数据量: 125391

节点数据量: 214859

节点数据量: 88105

节点数据量: 49171

节点数据量: 121402

节点数据量: 70641

节点数据量: 125535

节点数据量: 38306

标准差:49953.69202971888



三、感想

在编写带虚拟节点的一致性hash算法过程中,遇到了不少问题,主要有两点。

第一个问题是虚拟节点如何才能均匀地分布在环上。目前采用的是服务节点的hash值加上一定的值(2的32次方除以虚拟节点数量),来确保某个服务节点的虚拟节点能覆盖整个环。但是不同服务节点之间,由于hash计算带来的随机性,无法控制所有虚拟节点都均匀分布(所谓均匀分布是指理想情况下虚拟节点的间隔都一样)。特别要考虑到服务节点的增删,那复杂度就更大了。

针对分布均匀,曾经想过虚拟节点的计算分布不一定要和服务节点有关,自己做一套位置控制逻辑即可。但是一有节点增删,在不移动虚拟节点情况下,该逻辑也略有复杂。有点研究。

第二个问题是即使能实现虚拟节点之间的间隔一样,但是调用时入参的不确定性或冷热情况不一,还是会导致服务器压力不一。所以单靠一套静态的配置就想打天下,略有不足。觉得还是要结合状态监控,动态伸缩更稳妥些。

发布于: 2020 年 07 月 09 日阅读数: 49
用户头像

iHai

关注

还未添加个人签名 2018.07.26 加入

还未添加个人简介

评论

发布
暂无评论
第五周课后作业