写点什么

架构师训练营第 1 期 - 第 5 周课后练习

用户头像
Anyou Liu
关注
发布于: 2020 年 10 月 25 日
  1. 用你熟悉的编程语言实现一致性 hash 算法。

  2. 编写测试用例测试这个算法,测试 100 万 KV 数据,10 个服务器节点的情况下,计算这些 KV 数据在服务器上分布数量的标准差,以评估算法的存储负载不均衡性。


答:

  1. 一致性 hash 算法是定义一个 2^32 长度的环,环的顺时针方向依次是 0 1 2 ... 2^32-1,服务器节点分布于环上,可以通过一个散列函数对每一个服务器节点求 hash 值,hash 值对应环上的位置,这些服务器节点组成集群,当需要把数据保存到集群时,可以用同一个散列函数对数据求 hash 值,如果 hash 值正好对于一台服务器节点,则数据就保存到那个节点上,如果 hash 值所在环上的位置没有服务器节点,则顺时针查找最近的节点,把数据保存到那个节点上。

定义接口 ConsistentHashRing 表示一致性 hash 环

package consistenthash;
public interface ConsistentHashRing {
/** * 用于把服务器节点注册到环上,registeredName是注册的名称,node是服务器节点实例, * 返回值true表示注册成功,false表示失败,当节点的hash值所在位置已经被注册时,会注册失败。 * * @param registeredName * @param node * @return */ boolean register(String registeredName, Node node);
/** * 根据key请求hash值,然后选择hash值所在的节点或者最近的节点。 * * @param key * @return */ Node selectNode(String key);}
复制代码

定义接口 ConsistentHashStrategy 表示一致性 hash 算法

package consistenthash;
public interface ConsistentHashStrategy {
/** * 一致性hash算法实现,根据key求出hash值,范围在0到2^32-1之间。 * * @param key * @return */ long hash(String key);}
复制代码

定义接口 Node 表示服务器节点

package consistenthash;
import java.util.Map;
public interface Node {
/** * 服务器节点名称 * * @return */ String getName();
/** * 用于把当前节点注册到一致性hash环上 * * @param ring */ void registerTo(ConsistentHashRing ring);
/** * 保存KV数据 * * @param key * @param value */ void set(String key, String value);
/** * 获取键对应的数据 * * @param key * @return */ String get(String key);
/** * 获取所有的KV数据 * * @return */ Map<String, String> getAll();}
复制代码

定义接口 Cluster 表示集群

package consistenthash;
import java.util.Collection;
public interface Cluster {
/** * 添加服务器节点到集群中 * * @param server */ void addServer(Node server);
/** * 获取集群中的所有服务器节点 * * @return */ Collection<Node> getServers();}
复制代码

定义接口 Cacheable 用来给客户端保存数据到集群中

package consistenthash;
import java.util.Map;
public interface Cacheable {
/** * 客户端用来保存KV数据到集群中 * * @param key * @param value */ void set(String key, String value);
/** * 客户端获取键对应的值 * * @param key * @return */ String get(String key);
/** * 客户端获取所有的KV数据 * * @return */ Map<String, String> getAll();}
复制代码

ConsistentHashRing 的实现类如下

package consistenthash.impl;
import consistenthash.ConsistentHashRing;import consistenthash.ConsistentHashStrategy;import consistenthash.Node;
import java.util.NavigableMap;import java.util.TreeMap;
public class ConsistentHashRingImpl implements ConsistentHashRing {
private ConsistentHashStrategy strategy; private NavigableMap<Long, Node> hash2NodeMapping;

public ConsistentHashRingImpl(ConsistentHashStrategy strategy) { this.strategy = strategy; hash2NodeMapping = new TreeMap<>(); }
@Override public boolean register(String registeredName, Node node) { long h = strategy.hash(registeredName); if (hash2NodeMapping.containsKey(h)) { return false; } hash2NodeMapping.put(h, node); return true; }
@Override public Node selectNode(String key) { long h = strategy.hash(key); Node equalOrNext = getEqualOrNext(h); if (equalOrNext != null) { return equalOrNext; } equalOrNext = getEqualOrNext(0); if (equalOrNext != null) { return equalOrNext; } return null; }
private Node getEqualOrNext(long h) { if (hash2NodeMapping.containsKey(h)) { return hash2NodeMapping.get(h); } Long higherHash = hash2NodeMapping.higherKey(h); if (higherHash != null) { return hash2NodeMapping.get(higherHash); } return null; }}
复制代码

ConsistentHashStrategy 的实现类如下,该 hash 算法实现并不是很好,不够随机,会造成数据分布不均衡,后面会有测试结果

package consistenthash.impl;
import consistenthash.ConsistentHashStrategy;
import java.math.BigInteger;
public class DefaultConsistentHashStrategy implements ConsistentHashStrategy {
public static final BigInteger MAX = BigInteger.valueOf(2l).pow(32);
private static final long[] FACTORS = new long[]{ 7l, 33333337l, 17l, 9999997l, 737l, 177773l, 5777l, 55553l };
public long hash(String key) { if (key == null) { return 0l; } int nextFactor = 0; BigInteger bigInteger = BigInteger.ZERO;
for (int i = 0; i < key.length(); ++i) { bigInteger = bigInteger.add(BigInteger.valueOf(key.charAt(i) * FACTORS[nextFactor])); nextFactor = (nextFactor + 1) % FACTORS.length; }
return bigInteger.mod(MAX).longValue(); }}
复制代码

Node 的实现类如下

package consistenthash.impl;
import consistenthash.ConsistentHashRing;import consistenthash.Node;
import java.util.*;
public class NodeImpl implements Node {
private String name; private int numOfVirtuals = 1; private List<String> virtualNames; private Map<String, String> cache;
public NodeImpl(String name, int numOfVirtuals) { this.name = name; if (numOfVirtuals > 1) { this.numOfVirtuals = numOfVirtuals; } virtualNames = new ArrayList<>(this.numOfVirtuals); cache = new HashMap<>(); }
@Override public String getName() { return name; }
@Override public void registerTo(ConsistentHashRing ring) { registerVirtuals(ring); }
private void registerVirtuals(ConsistentHashRing ring) { while (virtualNames.size() < numOfVirtuals) { String virtualName = name + "-" + Utils.random(); if (ring.register(virtualName, this)) { virtualNames.add(virtualName); } } }
@Override public void set(String key, String value) { cache.put(key, value); }
@Override public String get(String key) { return cache.get(key); }
@Override public Map<String, String> getAll() { return cache; }
@Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; NodeImpl node = (NodeImpl) o; return Objects.equals(name, node.name); }
@Override public int hashCode() { return Objects.hash(name); }}
复制代码

Cluster 和 Cacheable 的实现类是 ClusterImpl,如下

package consistenthash.impl;
import consistenthash.Cacheable;import consistenthash.Cluster;import consistenthash.ConsistentHashRing;import consistenthash.Node;
import java.util.Collection;import java.util.HashMap;import java.util.HashSet;import java.util.Map;
public class ClusterImpl implements Cluster, Cacheable {
private Collection<Node> servers; private ConsistentHashRing ring;
public ClusterImpl(ConsistentHashRing ring) { servers = new HashSet<>(); this.ring = ring; }
@Override public void set(String key, String value) { Node node = ring.selectNode(key); if (node == null) { throw new NoSuchNodeException("Cannot find a node for key : " + key); } node.set(key, value); }
@Override public String get(String key) { Node node = ring.selectNode(key); return node.get(key); }
@Override public Map<String, String> getAll() { int totalSize = 0; for (Node server : servers) { totalSize += server.getAll().size(); } Map<String, String> all = new HashMap<>(totalSize); for (Node server : servers) { all.putAll(server.getAll()); } return all; }
@Override public void addServer(Node server) { if (server != null && servers.add(server)) { server.registerTo(ring); } }
@Override public Collection<Node> getServers() { return servers; }}
复制代码

其他辅助类如下

package consistenthash.impl;
public class NoSuchNodeException extends RuntimeException {
public NoSuchNodeException(String message) { super(message); }}
复制代码

Utils 用来生成随机数

package consistenthash.impl;
public class Utils {
private static final char[] ALL_CHARS = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ".toCharArray();
public static String random(int digits) { StringBuilder random = new StringBuilder(); for (int j = 0; j < digits; ++j) { random.append(ALL_CHARS[(int) (Math.random() * ALL_CHARS.length)]); } return random.toString(); }
public static String random() { return random(10); }
public static String randomDigits() { return random((int) (Math.random() * 50) + 1); }}
复制代码

编写测试用例验证功能,是否可以正常保存数据

package consistenthash;
import consistenthash.impl.*;import org.junit.Before;import org.junit.Test;
import java.util.HashMap;import java.util.Map;
public class TestCacheable {
private Cacheable cacheProvider; private Cluster cluster;
@Before public void init() { ConsistentHashStrategy strategy = new DefaultConsistentHashStrategy(); ConsistentHashRing ring = new ConsistentHashRingImpl(strategy); cluster = new ClusterImpl(ring); cacheProvider = (Cacheable) cluster; }
@Test public void test1Server2Keys_getCorrectly() { Node server = new NodeImpl("A", 1); cluster.addServer(server);
String key = Utils.randomDigits(); String value = Utils.randomDigits(); cacheProvider.set(key, value);
String key2 = Utils.randomDigits(); String value2 = Utils.randomDigits(); cacheProvider.set(key2, value2);
String valueFromCache = cacheProvider.get(key); String value2FromCache = cacheProvider.get(key2);
assert value.equals(valueFromCache); assert value2.equals(value2FromCache); }
@Test public void test10Servers100wKeys_getCorrectly() { for (int i = 0; i < 10; ++i) { Node server = new NodeImpl(String.valueOf(i + 1), 100); cluster.addServer(server); }
Map<String, String> keyValues = new HashMap<>(1000000); while (keyValues.size() < 1000000) { String key = Utils.randomDigits(); if (keyValues.containsKey(key)) { continue; } String value = Utils.randomDigits(); keyValues.put(key, value); cacheProvider.set(key, value); }
Map<String, String> cachedKeyValues = cacheProvider.getAll(); assert keyValues.size() == cachedKeyValues.size();
for (Map.Entry ent : keyValues.entrySet()) { assert ent.getValue().equals(cacheProvider.get((String) ent.getKey())); } }}
复制代码

编写测试用例查看数据是否分布均衡,测试 10 台服务器,150 个虚拟节点,100 万 KV 数据,测试 10 次

package consistenthash;
import consistenthash.impl.*;import org.junit.Test;
import java.util.Collection;import java.util.HashMap;import java.util.List;import java.util.Map;import java.util.stream.Collectors;
public class TestDefaultConsistentHashStrategy {
@Test public void testCheckDataIsSavedAvaragely() { for (int i = 0; i < 10; ++i) { save100wKeyValues(); } }
private Map<String, String> generateKeyValues() { Map<String, String> keyValues = new HashMap<>(1000000); while (keyValues.size() < 1000000) { String key = Utils.randomDigits(); if (keyValues.containsKey(key)) { continue; } String value = Utils.randomDigits(); keyValues.put(key, value); } return keyValues; }
private void save100wKeyValues() { Map<String, String> keyValues = generateKeyValues(); ConsistentHashStrategy strategy = new DefaultConsistentHashStrategy(); ConsistentHashRing ring = new ConsistentHashRingImpl(strategy); Cluster cluster = new ClusterImpl(ring); Cacheable cacheProvider = (Cacheable) cluster;
for (int i = 0; i < 10; ++i) { Node server = new NodeImpl(String.valueOf(i + 1), 150); cluster.addServer(server); }
for (Map.Entry ent : keyValues.entrySet()) { cacheProvider.set((String) ent.getKey(), (String) ent.getValue()); }
Collection<Node> servers = cluster.getServers(); List<Integer> sizes = servers.stream().map(s -> s.getAll().size()).collect(Collectors.toList()); System.out.println(sizes); }}
复制代码

输出结果如下,单台节点超过 22 万数据的情况经常发生,实现的一致性 hash 算法不够随机,数据分布严重不均衡

[287330, 88104, 72255, 83516, 99165, 79517, 69959, 66605, 68770, 84779][70911, 72771, 70810, 91113, 238796, 80842, 74977, 87004, 102186, 110590][93456, 213739, 87046, 79116, 104590, 84308, 84405, 86987, 79754, 86599][110293, 70246, 174517, 84289, 95834, 74562, 91138, 77457, 107824, 113840][108898, 64680, 107857, 218407, 81578, 96128, 84769, 64026, 82255, 91402][68869, 70524, 85821, 117791, 75754, 118709, 78305, 176414, 88124, 119689][69396, 111033, 76321, 118371, 75236, 87844, 81748, 229839, 67904, 82308][77754, 60745, 260744, 83517, 58209, 77669, 137146, 82400, 87142, 74674][108600, 88933, 72277, 59950, 93209, 80266, 212365, 78181, 97448, 108771][80300, 76555, 88550, 76072, 118870, 79298, 228305, 93768, 69529, 88753]
复制代码

接下来会介绍改进的算法


  1. 之前编写的一致性 hash 算法不够随机,导致数据分布严重不均衡,单台节点会存储超过 20 万的数据,因此需要对 hash 算法提出改进。自己实现的 hash 算法是根据固定的素数做运算得出 hash 值,不够随机,想着是否可以借鉴已有的 hash 算法来实现。MD5 就是一种 hash 算法,它是根据请求的内容随机生成 128 位的 hash 值,范围在 0 到 2^128 之间,hash 值取值范围大,也比较随机。重新实现的一致性 hash 算法如下:

package consistenthash.impl;
import consistenthash.ConsistentHashStrategy;
import java.math.BigInteger;import java.nio.charset.StandardCharsets;import java.security.MessageDigest;import java.security.NoSuchAlgorithmException;
public class Md5ConsistentHashStrategy implements ConsistentHashStrategy {
private static final BigInteger MAX = BigInteger.valueOf(2l).pow(32);
public long hash(String key) { if (key == null) { return 0l; }
MessageDigest messageDigest; try { messageDigest = MessageDigest.getInstance("MD5"); } catch (NoSuchAlgorithmException e) { throw new RuntimeException(e); } messageDigest.update(key.getBytes(StandardCharsets.UTF_8)); BigInteger bigInteger = new BigInteger(messageDigest.digest());
return bigInteger.mod(MAX).longValue(); }}
复制代码

编写的测试用例如下

package consistenthash;
import consistenthash.impl.*;import org.junit.Test;
import java.util.Collection;import java.util.HashMap;import java.util.List;import java.util.Map;import java.util.stream.Collectors;
public class TestMd5ConsistentHashStrategy {
@Test public void testCheckDataIsSavedAvaragely() { for (int i = 0; i < 10; ++i) { save100wKeyValues(); } }
private Map<String, String> generateKeyValues() { Map<String, String> keyValues = new HashMap<>(1000000); while (keyValues.size() < 1000000) { String key = Utils.randomDigits(); if (keyValues.containsKey(key)) { continue; } String value = Utils.randomDigits(); keyValues.put(key, value); } return keyValues; }
private void save100wKeyValues() { Map<String, String> keyValues = generateKeyValues(); ConsistentHashStrategy strategy = new Md5ConsistentHashStrategy(); ConsistentHashRing ring = new ConsistentHashRingImpl(strategy); Cluster cluster = new ClusterImpl(ring); Cacheable cacheProvider = (Cacheable) cluster;
for (int i = 0; i < 10; ++i) { Node server = new NodeImpl(String.valueOf(i + 1), 150); cluster.addServer(server); }
for (Map.Entry ent : keyValues.entrySet()) { cacheProvider.set((String) ent.getKey(), (String) ent.getValue()); }
Collection<Node> servers = cluster.getServers(); List<Integer> sizes = servers.stream().map(s -> s.getAll().size()).collect(Collectors.toList()); System.out.println(sizes); }}
复制代码

输出结果如下,比 DefaultConsistentHashStrategy 的算法实现要好很多,数据分布明显均衡

[85036, 90579, 104895, 117825, 92461, 103194, 99862, 102620, 102007, 101521][96948, 88482, 96963, 109167, 109254, 86621, 103940, 109846, 94351, 104428][106554, 96678, 90942, 105560, 105197, 102851, 97988, 96473, 95182, 102575][91784, 111891, 105755, 88399, 92705, 113522, 97292, 91781, 105055, 101816][105047, 112721, 98489, 92381, 100562, 108465, 96748, 96063, 88134, 101390][103390, 99131, 98818, 93555, 89801, 102828, 105571, 90024, 105894, 110988][95961, 98861, 107550, 106023, 110996, 105163, 97110, 95399, 84950, 97987][113775, 91105, 104870, 99463, 94246, 90767, 107859, 100132, 100724, 97059][94853, 111725, 102316, 88900, 93076, 95625, 95076, 114884, 95779, 107766][102448, 94131, 87003, 110910, 101418, 97821, 99978, 110305, 93782, 102204]
复制代码

借助于数学统计中的标准差,我们可以比较一下 2 种算法的数据均衡分布的好坏。标准差可以用来衡量是数据收敛的大小,标准差越小,说明数据越收敛,数据分布越均衡。

辅助类 MathUtils

package consistenthash.impl;
import java.util.List;
public class MathUtils {
public static double[] averageAndstandardDeviation(List<Integer> sizes) { double average; double sum = 0d; for (Integer s : sizes) { sum += s; } average = sum / sizes.size();
double standardDeviation; double sum2 = 0d; for (Integer s : sizes) { sum2 += Math.pow(s - average, 2); } standardDeviation = Math.sqrt(sum2 / sizes.size()); return new double[]{average, standardDeviation}; }}
复制代码

测试用例如下

package consistenthash;
import consistenthash.impl.*;import org.junit.Test;
import java.util.Collection;import java.util.HashMap;import java.util.List;import java.util.Map;import java.util.stream.Collectors;
public class TestStandardDeviation {
private Map<String, String> generateKeyValues() { Map<String, String> keyValues = new HashMap<>(1000000); while (keyValues.size() < 1000000) { String key = Utils.randomDigits(); if (keyValues.containsKey(key)) { continue; } String value = Utils.randomDigits(); keyValues.put(key, value); } return keyValues; }
@Test public void test10TimesOfDefaultAndMd5HashAlgorithm_Md5IsBetter() { ConsistentHashStrategy defaultStrategy = new DefaultConsistentHashStrategy(); ConsistentHashStrategy md5Strategy = new Md5ConsistentHashStrategy(); for (int i = 0; i < 10; ++i) { Map<String, String> keyValues = generateKeyValues(); double sdForDefaultStrategy = getStandardDeviationFor10Servers100wKeys(keyValues, defaultStrategy); double sdForMd5Strategy = getStandardDeviationFor10Servers100wKeys(keyValues, md5Strategy); assert sdForMd5Strategy < sdForDefaultStrategy; }
}
private double getStandardDeviationFor10Servers100wKeys(Map<String, String> keyValues, ConsistentHashStrategy strategy) { ConsistentHashRing ring = new ConsistentHashRingImpl(strategy); Cluster cluster = new ClusterImpl(ring); Cacheable cacheProvider = (Cacheable) cluster;
for (int i = 0; i < 10; ++i) { Node server = new NodeImpl(String.valueOf(i + 1), 150); cluster.addServer(server); }
for (Map.Entry ent : keyValues.entrySet()) { cacheProvider.set((String) ent.getKey(), (String) ent.getValue()); }
Collection<Node> servers = cluster.getServers(); List<Integer> sizes = servers.stream().map(s -> s.getAll().size()).collect(Collectors.toList()); double[] results = MathUtils.averageAndstandardDeviation(sizes); return results[1]; }}
复制代码

100 万的 KV 数据,10 台服务器节点,150 个虚拟节点,测试 10 次的结果是每次 Md5 的 hash 算法的标准差都要小于 default 的 hash 算法的标准差。


用户头像

Anyou Liu

关注

还未添加个人签名 2019.05.24 加入

还未添加个人简介

评论

发布
暂无评论
架构师训练营第 1 期 - 第5周课后练习