一致性哈希算法
定义
consistent hashing算法早在1997年就在论文Consistent hashing and random trees中被提出,目前在分布式缓存、负载均衡、RPC框架中应用很广泛。
consistent hashing是一种hash算法,在添加/移除 Node节点时,能尽可能小的改变已存在的映射关系,尽可能的满足单调性(由新的节点加入时,哈希结果应能保证原有已分配的内容可以被映射到新的节点中去)的要求。
基本思想是使用相同的散列函数对对象和服务节点进行散列。 这样做的原因是将服务节点映射到一个区间,该区间将包含许多对象散列。 如果服务节点被移除,那么它的间隔将被具有相邻间隔的服务节点接管。所有其他服务节点保持不变。
实现
使用Java语言实现一致性hash算法
public class ConsistentHash<T> { private final HashFunction hashFunction; private final int numberOfReplicas; private final SortedMap<Long, T> circle = new TreeMap<Long, T>(); public ConsistentHash(HashFunction hashFunction, int numberOfReplicas, Collection<T> nodes) { this.hashFunction = hashFunction; this.numberOfReplicas = numberOfReplicas; for (T node : nodes) { add(node); } } public void add(T node) { for (int i = 0; i < numberOfReplicas; i++) { circle.put(hashFunction.hash(node.toString() + i), node); } } public void remove(T node) { for (int i = 0; i < numberOfReplicas; i++) { circle.remove(hashFunction.hash(node.toString() + i)); } } public T get(String key) { if (circle.isEmpty()) { return null; } long hash = hashFunction.hash(key); if (!circle.containsKey(hash)) { SortedMap<Long, T> tailMap = circle.tailMap(hash); hash = tailMap.isEmpty() ? circle.firstKey() : tailMap.firstKey(); } return circle.get(hash); }}
//hash算法可扩展public interface HashFunction { long hash(String obj);}
public class MD5Hash implements HashFunction { @Override public long hash(String obj) { try { MessageDigest messageDigest = MessageDigest.getInstance("MD5"); byte[] md5 = messageDigest.digest(obj.getBytes()); return ((long) (md5[3] & 0xFF) << 24) | ((long) (md5[2] & 0xFF) << 16) | ((long) (md5[1] & 0xFF) << 8) | (long) (md5[0] & 0xFF); } catch (NoSuchAlgorithmException e) { e.printStackTrace(); } return 0; }}
测试
测试 100 万 KV 数据,10 个服务器节点的情况下,计算这些 KV 数据在服务器上分布数量的标准差,以评估算法的存储负载不均衡性。
public class ConsistentHashingTest { public static void main(String[] args) { long totalSize = 1000000; int nodeSize = 10; HashMap cacheMap = new HashMap<String, String>(1024); for (int i = 0; i < totalSize; i++) { cacheMap.put("key" + i, "value" + i); } ArrayList<String> nodeList = new ArrayList(10); for (int i = 0; i < nodeSize; i++) { nodeList.add("node" + i); } for (int replicaNum = 1; replicaNum <= 500; replicaNum+=5) { ConsistentHash consistentHash = new ConsistentHash(new MD5Hash(), replicaNum, nodeList); //调整前的分布数据的标准差 Map<String, Map> beforeNode = storeDataOfNode(cacheMap, consistentHash); Map<String, String> beforeDataStoreNode = beforeNode.get("dataStoreNode"); Map<String, Integer> beforeDataSizeNode = beforeNode.get("dataSizeOfNode"); int sd = (int) standardDeviation(beforeDataSizeNode, totalSize, nodeSize); String beforeSD = sd + "(" + String.format("%.2f", (float)sd/(totalSize/nodeSize) * 100) + ")"; //增加Node节点 consistentHash.add("node New"); //调整后的分布数据的标准差 Map<String, Map> afterNode = storeDataOfNode(cacheMap, consistentHash); Map<String, String> afterDataStoreNode = afterNode.get("dataStoreNode"); Map<String, Integer> afterDataSizeNode = afterNode.get("dataSizeOfNode"); int newSD = (int) standardDeviation(afterDataSizeNode, totalSize, nodeSize+1); String afterSD = newSD + "(" + String.format("%.2f", (float)newSD/(totalSize/(nodeSize+1)) * 100) + ")"; //计算映射变化的数据量 int rc = 0; for (String key : beforeDataStoreNode.keySet()) { if(!(beforeDataStoreNode.get(key).equals(afterDataStoreNode.get(key)))) { rc++; } } //打印数据 System.out.println("replicaNum:" + replicaNum + "," + "调整前标准差:" +beforeSD + "," + "调整后标准差:" +afterSD + "," + "增加node后映射变化数据量:" + rc + "(" + String.format("%.2f", (float)rc/totalSize * 100) + ")"); } } //服务器节点保存数据 private static Map<String, Map> storeDataOfNode(HashMap cacheMap, ConsistentHash consistentHash) { Map<String, Map> result = new HashMap<>(); Map<String, String> dataStoreNode = new TreeMap(); Map<String, Integer> dataSizeOfNode = new HashMap<>(); for (int i = 0; i < cacheMap.size(); i++) { String node = (String) consistentHash.get("key" + i); //.... dataStoreNode.put("key" + i, node); dataSizeOfNode.merge(node, 1, (a, b) -> a + b); } result.put("dataStoreNode", dataStoreNode); result.put("dataSizeOfNode", dataSizeOfNode); return result; } //计算标准差 private static double standardDeviation(Map<String, Integer> dataSizeOfNode, long totalSize, int nodeSize) { double average = (double) totalSize / nodeSize; return Math.sqrt(dataSizeOfNode.values().stream().mapToDouble(n -> (n - average) * (n - average)).summaryStatistics().getAverage()); }}
测试结果展示
#虚拟节点较少时(<15)replicaNum:1,调整前标准差:100931(100.93),调整后标准差:99458(109.40),增加node后映射变化数据量:16148(1.61)replicaNum:6,调整前标准差:41939(41.94),调整后标准差:44525(48.98),增加node后映射变化数据量:46081(4.61)replicaNum:11,调整前标准差:23821(23.82),调整后标准差:22199(24.42),增加node后映射变化数据量:103437(10.34)#虚拟节点130-160时replicaNum:131,调整前标准差:7321(7.32),调整后标准差:6392(7.03),增加node后映射变化数据量:85915(8.59)replicaNum:136,调整前标准差:7748(7.75),调整后标准差:6939(7.63),增加node后映射变化数据量:88440(8.84)replicaNum:141,调整前标准差:7323(7.32),调整后标准差:7069(7.78),增加node后映射变化数据量:86961(8.70)replicaNum:146,调整前标准差:6749(6.75),调整后标准差:6556(7.21),增加node后映射变化数据量:85178(8.52)replicaNum:151,调整前标准差:7378(7.38),调整后标准差:7215(7.94),增加node后映射变化数据量:84768(8.48)replicaNum:156,调整前标准差:8630(8.63),调整后标准差:8057(8.86),增加node后映射变化数据量:84124(8.41)replicaNum:161,调整前标准差:9884(9.88),调整后标准差:8833(9.72),增加node后映射变化数据量:85619(8.56)#虚拟节点较多时(>450)replicaNum:486,调整前标准差:3659(3.66),调整后标准差:3073(3.38),增加node后映射变化数据量:92185(9.22)replicaNum:491,调整前标准差:3291(3.29),调整后标准差:2657(2.92),增加node后映射变化数据量:92770(9.28)replicaNum:496,调整前标准差:3348(3.35),调整后标准差:2756(3.03),增加node后映射变化数据量:92502(9.25)
总结
i). 没有增加虚拟节点或增加较少虚拟节点时
服务器节点的数据分布标准差较大,分布严重不均衡
增加/删除节点时后,数据迁移率有时低有时高,不稳定,随机性很高
ii). 虚拟节点为150左右时
服务器节点的数据分布标准差较小,分布比较均衡
增加/删除节点时后,数据迁移率相对稳定,迁移量较小
iii). 虚拟节点较多时
服务器节点的数据分布标准差更小,分布更均衡
增加/删除节点时后,数据迁移率相对稳定,但迁移量有小幅提升
同时虚拟节点增多,会影响映射性能,增加资源的消耗
最后,通过分析,虚拟节点在150左右时,数据分布均匀、节点变动时迁移量小,收益较高。同时,我们可以调整和优化相应的hash算法,提升一致性hash算法的数据分布、迁移率、性能。
版权声明: 本文为 InfoQ 作者【dony.zhang】的原创文章。
原文链接:【http://xie.infoq.cn/article/ad8e098bcbd53cefedf17aaf0】。文章转载请联系作者。
dony.zhang
专注成就专业 2018.07.06 加入
程序员
评论