第五周课后作业
用你熟悉的编程语言实现一致性 hash 算法。
编写测试用例测试这个算法,测试 100 万 KV 数据,10 个服务器节点的情况下,计算这些 KV 数据在服务器上分布数量的标准差,以评估算法的存储负载不均衡性。
一、编写一个一致性hash算法
定义一个路由接口
public interface NodeRoute { /** * 根据key获取对应的服务节点 * @param key * @return */ ServiceNode get(String key); void addNode(ServiceNode node); void removeNode(ServiceNode node); Collection<ServiceNode> getAllNodes();}
实现一个有虚拟节点的一致性hash算法
public class VirtualNodeConsistentHashRoute implements NodeRoute { private final ConcurrentNavigableMap<Integer, ServiceNode> skipListMap = new ConcurrentSkipListMap<>(); private final CopyOnWriteArrayList<ServiceNode> copyOnWriteArrayList = new CopyOnWriteArrayList<>(); private final int virtualNodeNumber; private final int virtualNodeLength; /** * 构造函数 * @param nodeNumber 每个节点的虚拟节点数 */ public VirtualNodeConsistentHashRoute(int nodeNumber) { virtualNodeNumber = Math.max(nodeNumber, 1); virtualNodeLength = Integer.MAX_VALUE / virtualNodeNumber; } /** * 添加一个虚拟节点 * 虚拟节点的计算比较简单,以node的hash值加上(Integer.MAX_VALUE / virtualNodeNumber)的值,确保能均匀分布在环上 * @param node */ @Override public void addNode(ServiceNode node) { if (Objects.isNull(node)) { return; } if (copyOnWriteArrayList.contains(node)) { return; } copyOnWriteArrayList.add(node); int hashCode = node.hashCode(); long virtualNodeHashCode; for (int i = 0; i < virtualNodeNumber; i++) { virtualNodeHashCode = hashCode + (long)i * virtualNodeLength; virtualNodeHashCode = virtualNodeHashCode % Integer.MAX_VALUE; skipListMap.put((int) virtualNodeHashCode, node); } } /** * 删除服务节点 * 需要移除对应的虚拟节点 * @param node */ @Override public void removeNode(ServiceNode node) { if (Objects.isNull(node)) { return; } if (!copyOnWriteArrayList.contains(node)) { return; } copyOnWriteArrayList.remove(node); int hashCode = node.hashCode(); long virtualNodeHashCode; for (int i = 0; i < virtualNodeNumber; i++) { virtualNodeHashCode = hashCode + (long)i * virtualNodeLength; virtualNodeHashCode = virtualNodeHashCode % Integer.MAX_VALUE; skipListMap.remove((int)virtualNodeHashCode); } } /** * 获取所有服务节点 * @return */ @Override public Collection<ServiceNode> getAllNodes() { return Collections.unmodifiableCollection(copyOnWriteArrayList); } /** * 根据key获取对应的服务节点 * @param key * @return */ @Override public ServiceNode get(String key) { if (Objects.isNull(key) || CollectionUtils.isEmpty(skipListMap)) { return null; } int keyHashValue = key.hashCode(); Map.Entry entry = skipListMap.ceilingEntry(keyHashValue); return Objects.nonNull(entry) && Objects.nonNull(entry.getValue()) ? (ServiceNode) entry.getValue() : skipListMap.firstEntry().getValue(); }}
服务节点简单定义如下
public class ServiceNode { private final AtomicInteger atomicInteger = new AtomicInteger(); public ServiceNode() { } public void add(String key, Object value) { atomicInteger.incrementAndGet(); } public int count() { return atomicInteger.get(); }}
二、测试
使用100w个kv值测试
public class LoadBalancingTest { public static void main(String[] args) { //10个服务器节点,每个服务器有150个虚拟节点 NodeRoute nodeRoute = getNodeRouteHaveServices(10, 150); //100w个随机kv存取 testServiceNodeHit(nodeRoute, 1_000_000); //计算标准差 printStandardDeviation(1_000_000, nodeRoute.getAllNodes()); } private static NodeRoute getNodeRouteHaveServices(int serviceNumber, int virtualNodeNumber) { NodeRoute nodeRoute = new VirtualNodeConsistentHashRoute(virtualNodeNumber); for (int i = 0; i < serviceNumber; i++) { nodeRoute.addNode(new ServiceNode()); } return nodeRoute; } private static void testServiceNodeHit(NodeRoute nodeRoute, int number) { Random random = new SecureRandom(); String keyString; Object value = new Object(); for (int i = 0; i < number; i++) { keyString = "" + random.nextInt(Integer.MAX_VALUE); ServiceNode serviceNode = nodeRoute.get(keyString); serviceNode.add(keyString, value); } } private static void printStandardDeviation(int testNumber, Collection<ServiceNode> serviceNodes) { int nodeNumber = serviceNodes.size(); int average = testNumber / nodeNumber; double variance = serviceNodes.stream() .mapToDouble(serviceNode -> { int hitCount = serviceNode.count(); double oneVariance = Math.pow((hitCount - average), 2) / nodeNumber; System.out.println("节点数据量: " + hitCount); return oneVariance; }).sum(); double standardDeviation = Math.sqrt(variance); System.out.println("标准差:" + standardDeviation); }}
某次打印结果:
节点数据量: 51240
节点数据量: 115350
节点数据量: 125391
节点数据量: 214859
节点数据量: 88105
节点数据量: 49171
节点数据量: 121402
节点数据量: 70641
节点数据量: 125535
节点数据量: 38306
标准差:49953.69202971888
三、感想
在编写带虚拟节点的一致性hash算法过程中,遇到了不少问题,主要有两点。
第一个问题是虚拟节点如何才能均匀地分布在环上。目前采用的是服务节点的hash值加上一定的值(2的32次方除以虚拟节点数量),来确保某个服务节点的虚拟节点能覆盖整个环。但是不同服务节点之间,由于hash计算带来的随机性,无法控制所有虚拟节点都均匀分布(所谓均匀分布是指理想情况下虚拟节点的间隔都一样)。特别要考虑到服务节点的增删,那复杂度就更大了。
针对分布均匀,曾经想过虚拟节点的计算分布不一定要和服务节点有关,自己做一套位置控制逻辑即可。但是一有节点增删,在不移动虚拟节点情况下,该逻辑也略有复杂。有点研究。
第二个问题是即使能实现虚拟节点之间的间隔一样,但是调用时入参的不确定性或冷热情况不一,还是会导致服务器压力不一。所以单靠一套静态的配置就想打天下,略有不足。觉得还是要结合状态监控,动态伸缩更稳妥些。
版权声明: 本文为 InfoQ 作者【iHai】的原创文章。
原文链接:【http://xie.infoq.cn/article/c43c33d77385ece8753e11ea4】。文章转载请联系作者。
iHai
还未添加个人签名 2018.07.26 加入
还未添加个人简介
评论