架构师训练营第 1 期 - 第 5 周课后练习
用你熟悉的编程语言实现一致性 hash 算法。
编写测试用例测试这个算法,测试 100 万 KV 数据,10 个服务器节点的情况下,计算这些 KV 数据在服务器上分布数量的标准差,以评估算法的存储负载不均衡性。
答:
一致性 hash 算法是定义一个 2^32 长度的环,环的顺时针方向依次是 0 1 2 ... 2^32-1,服务器节点分布于环上,可以通过一个散列函数对每一个服务器节点求 hash 值,hash 值对应环上的位置,这些服务器节点组成集群,当需要把数据保存到集群时,可以用同一个散列函数对数据求 hash 值,如果 hash 值正好对应一台服务器节点,则数据就保存到那个节点上,如果 hash 值所在环上的位置没有服务器节点,则顺时针查找最近的节点,把数据保存到那个节点上。
定义接口 ConsistentHashRing 表示一致性 hash 环
package consistenthash;
public interface ConsistentHashRing {
/**
 * Registers a server node on the ring. The ring position is the hash of
 * {@code registeredName}; registration fails when that position is already
 * occupied by a previous registration.
 *
 * @param registeredName name whose hash value determines the ring position
 * @param node the server node instance to place at that position
 * @return {@code true} if the node was registered, {@code false} if the
 *         position was already taken
 */
boolean register(String registeredName, Node node);
/**
 * Hashes the key and selects the node at that ring position, or the nearest
 * node clockwise from it.
 *
 * @param key the data key to locate
 * @return the node responsible for the key
 */
Node selectNode(String key);
}
定义接口 ConsistentHashStrategy 表示一致性 hash 算法
package consistenthash;
public interface ConsistentHashStrategy {
/**
 * Consistent-hash function: maps a key to a hash value in the range
 * 0 to 2^32-1 (the size of the ring).
 *
 * @param key the key to hash
 * @return a hash value between 0 and 2^32-1, inclusive
 */
long hash(String key);
}
定义接口 Node 表示服务器节点
package consistenthash;
import java.util.Map;
public interface Node {
/**
 * Returns the server node's name.
 *
 * @return the node name
 */
String getName();
/**
 * Registers this node (including any virtual nodes it manages) onto the
 * given consistent-hash ring.
 *
 * @param ring the ring to register on
 */
void registerTo(ConsistentHashRing ring);
/**
 * Stores a KV pair on this node.
 *
 * @param key the key to store under
 * @param value the value to store
 */
void set(String key, String value);
/**
 * Returns the value stored for the given key.
 *
 * @param key the key to look up
 * @return the stored value, or null when the key is absent
 */
String get(String key);
/**
 * Returns all KV data held by this node.
 *
 * @return the node's key-value data
 */
Map<String, String> getAll();
}
定义接口 Cluster 表示集群
package consistenthash;
import java.util.Collection;
public interface Cluster {
/**
 * Adds a server node to the cluster.
 *
 * @param server the node to add
 */
void addServer(Node server);
/**
 * Returns all server nodes currently in the cluster.
 *
 * @return the cluster's server nodes
 */
Collection<Node> getServers();
}
定义接口 Cacheable 用来给客户端保存数据到集群中
package consistenthash;
import java.util.Map;
public interface Cacheable {
/**
 * Client-facing operation: stores a KV pair into the cluster.
 *
 * @param key the key to store under
 * @param value the value to store
 */
void set(String key, String value);
/**
 * Client-facing operation: retrieves the value for a key from the cluster.
 *
 * @param key the key to look up
 * @return the stored value
 */
String get(String key);
/**
 * Client-facing operation: retrieves all KV data stored in the cluster.
 *
 * @return every key-value pair across all nodes
 */
Map<String, String> getAll();
}
ConsistentHashRing 的实现类如下
package consistenthash.impl;
import consistenthash.ConsistentHashRing;
import consistenthash.ConsistentHashStrategy;
import consistenthash.Node;
import java.util.NavigableMap;
import java.util.TreeMap;
/**
 * Consistent-hash ring backed by a sorted map from ring position (hash value)
 * to the node registered there. Clockwise traversal corresponds to ascending
 * key order; lookups wrap around to the first entry.
 */
public class ConsistentHashRingImpl implements ConsistentHashRing {
    /** Hash strategy used to place both node names and data keys on the ring. */
    private final ConsistentHashStrategy strategy;
    /** Ring positions mapped to registered nodes, sorted clockwise (ascending). */
    private final NavigableMap<Long, Node> hash2NodeMapping;

    public ConsistentHashRingImpl(ConsistentHashStrategy strategy) {
        this.strategy = strategy;
        hash2NodeMapping = new TreeMap<>();
    }

    /**
     * Registers a node at the position given by the hash of {@code registeredName}.
     * Returns false on a position collision so callers can retry with another name.
     */
    @Override
    public boolean register(String registeredName, Node node) {
        long h = strategy.hash(registeredName);
        // putIfAbsent detects the collision and inserts in a single lookup
        // (previously containsKey + put performed two).
        return hash2NodeMapping.putIfAbsent(h, node) == null;
    }

    /**
     * Selects the node responsible for the key: the node at the key's hash
     * position or the nearest node clockwise, wrapping around past the end of
     * the ring. Returns null only when the ring is empty.
     */
    @Override
    public Node selectNode(String key) {
        long h = strategy.hash(key);
        Node node = getEqualOrNext(h);
        if (node != null) {
            return node;
        }
        // No node at or after h: wrap around and take the first node on the ring.
        return getEqualOrNext(0);
    }

    // Returns the node at position h or the next position clockwise, or null if
    // no position >= h exists. ceilingKey replaces the previous two-step
    // containsKey + higherKey scan with one lookup.
    private Node getEqualOrNext(long h) {
        Long position = hash2NodeMapping.ceilingKey(h);
        return position == null ? null : hash2NodeMapping.get(position);
    }
}
ConsistentHashStrategy 的实现类如下,该 hash 算法实现并不是很好,不够随机,会造成数据分布不均衡,后面会有测试结果
package consistenthash.impl;
import consistenthash.ConsistentHashStrategy;
import java.math.BigInteger;
/**
 * A simple hand-rolled hash strategy: each character of the key is weighted by
 * a prime factor chosen round-robin, the weighted values are summed, and the
 * sum is reduced modulo 2^32. (As the article demonstrates, this distributes
 * keys poorly compared to the MD5-based strategy.)
 */
public class DefaultConsistentHashStrategy implements ConsistentHashStrategy {
    /** Upper bound of the hash space: 2^32. */
    public static final BigInteger MAX = BigInteger.valueOf(2l).pow(32);
    /** Fixed prime multipliers applied to the key's characters in round-robin order. */
    private static final long[] FACTORS = new long[]{
            7l, 33333337l, 17l, 9999997l, 737l, 177773l, 5777l, 55553l
    };

    /**
     * Hashes the key into the range 0 to 2^32-1. A null key hashes to 0.
     */
    public long hash(String key) {
        if (key == null) {
            return 0l;
        }
        BigInteger sum = BigInteger.ZERO;
        // f cycles through the prime factors as pos walks the key.
        for (int pos = 0, f = 0; pos < key.length(); ++pos, f = (f + 1) % FACTORS.length) {
            sum = sum.add(BigInteger.valueOf(key.charAt(pos) * FACTORS[f]));
        }
        return sum.mod(MAX).longValue();
    }
}
Node 的实现类如下
package consistenthash.impl;
import consistenthash.ConsistentHashRing;
import consistenthash.Node;
import java.util.*;
/**
 * A server node that stores its share of the KV data in an in-memory map and
 * registers itself on the ring under a configurable number of randomly named
 * virtual nodes. Identity (equals/hashCode) is the node name.
 */
public class NodeImpl implements Node {
    /** Physical server name; also the identity used by equals/hashCode. */
    private final String name;
    /** Number of virtual nodes this server places on the ring (minimum 1). */
    private final int numOfVirtuals;
    /** Names under which this server's virtual nodes were successfully registered. */
    private final List<String> virtualNames;
    /** Local KV storage for the data routed to this server. */
    private final Map<String, String> cache;

    public NodeImpl(String name, int numOfVirtuals) {
        this.name = name;
        // Coerce counts below 1 up to the minimum of one virtual node
        // (same effect as the original "only accept values > 1" check).
        this.numOfVirtuals = Math.max(1, numOfVirtuals);
        virtualNames = new ArrayList<>(this.numOfVirtuals);
        cache = new HashMap<>();
    }

    @Override
    public String getName() {
        return name;
    }

    /** Registers this node's virtual nodes onto the given ring. */
    @Override
    public void registerTo(ConsistentHashRing ring) {
        registerVirtuals(ring);
    }

    // Keeps generating random virtual names until numOfVirtuals of them are
    // registered; a name rejected by the ring (position collision) is simply
    // discarded and a fresh random name is tried.
    private void registerVirtuals(ConsistentHashRing ring) {
        while (virtualNames.size() < numOfVirtuals) {
            String virtualName = name + "-" + Utils.random();
            if (ring.register(virtualName, this)) {
                virtualNames.add(virtualName);
            }
        }
    }

    @Override
    public void set(String key, String value) {
        cache.put(key, value);
    }

    @Override
    public String get(String key) {
        return cache.get(key);
    }

    /**
     * Returns a read-only view of this node's data. The internal map was
     * previously exposed directly, letting callers mutate node state behind
     * the node's back; all callers in this project only read from it.
     */
    @Override
    public Map<String, String> getAll() {
        return Collections.unmodifiableMap(cache);
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        NodeImpl node = (NodeImpl) o;
        return Objects.equals(name, node.name);
    }

    @Override
    public int hashCode() {
        return Objects.hash(name);
    }
}
Cluster 和 Cacheable 的实现类是 ClusterImpl,如下
package consistenthash.impl;
import consistenthash.Cacheable;
import consistenthash.Cluster;
import consistenthash.ConsistentHashRing;
import consistenthash.Node;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
/**
 * A cluster of server nodes that doubles as the client-facing cache: keys are
 * routed to nodes through the consistent-hash ring.
 */
public class ClusterImpl implements Cluster, Cacheable {
    /** Registered nodes; a HashSet so a node (identified by name) is added once. */
    private final Collection<Node> servers;
    /** Ring used to route each key to the node responsible for it. */
    private final ConsistentHashRing ring;

    public ClusterImpl(ConsistentHashRing ring) {
        servers = new HashSet<>();
        this.ring = ring;
    }

    @Override
    public void set(String key, String value) {
        Node node = ring.selectNode(key);
        if (node == null) {
            throw new NoSuchNodeException("Cannot find a node for key : " + key);
        }
        node.set(key, value);
    }

    /**
     * Retrieves the value for the key from the node responsible for it.
     *
     * @throws NoSuchNodeException if the ring has no nodes. (Previously this
     *         case escaped as a bare NullPointerException, inconsistent with
     *         the explicit exception thrown by {@link #set}.)
     */
    @Override
    public String get(String key) {
        Node node = ring.selectNode(key);
        if (node == null) {
            throw new NoSuchNodeException("Cannot find a node for key : " + key);
        }
        return node.get(key);
    }

    /** Merges every server's data into one map, presized to the total entry count. */
    @Override
    public Map<String, String> getAll() {
        int totalSize = 0;
        for (Node server : servers) {
            totalSize += server.getAll().size();
        }
        Map<String, String> all = new HashMap<>(totalSize);
        for (Node server : servers) {
            all.putAll(server.getAll());
        }
        return all;
    }

    /** Adds the server and registers it on the ring; null and duplicates are ignored. */
    @Override
    public void addServer(Node server) {
        if (server != null && servers.add(server)) {
            server.registerTo(ring);
        }
    }

    @Override
    public Collection<Node> getServers() {
        return servers;
    }
}
其他辅助类如下
package consistenthash.impl;
/**
 * Thrown when the consistent-hash ring cannot supply a node for a key,
 * e.g. when no server has been registered on the ring.
 */
public class NoSuchNodeException extends RuntimeException {
public NoSuchNodeException(String message) {
super(message);
}
}
Utils 用来生成随机数
package consistenthash.impl;
/**
 * Static helpers for generating random alphanumeric strings, used for both
 * virtual-node names and test key/value data.
 */
public class Utils {
    /** Alphabet for random strings: digits plus lower- and upper-case ASCII letters. */
    private static final char[] ALL_CHARS = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ".toCharArray();

    // Utility class: prevent instantiation.
    private Utils() {
    }

    /**
     * Returns a random alphanumeric string of exactly {@code digits} characters.
     *
     * @param digits the desired length
     * @return a random string of the requested length
     */
    public static String random(int digits) {
        // Presize the builder to avoid intermediate buffer growth.
        StringBuilder random = new StringBuilder(digits);
        for (int j = 0; j < digits; ++j) {
            random.append(ALL_CHARS[(int) (Math.random() * ALL_CHARS.length)]);
        }
        return random.toString();
    }

    /** Returns a random alphanumeric string of 10 characters. */
    public static String random() {
        return random(10);
    }

    /** Returns a random alphanumeric string whose length is uniform in 1..50. */
    public static String randomDigits() {
        return random((int) (Math.random() * 50) + 1);
    }
}
编写测试用例验证功能,是否可以正常保存数据
package consistenthash;
import consistenthash.impl.*;
import org.junit.Before;
import org.junit.Test;
import java.util.HashMap;
import java.util.Map;
/** Functional tests: data stored through the cluster must be retrievable. */
public class TestCacheable {
    private Cacheable cacheProvider;
    private Cluster cluster;

    /** Builds a fresh cluster backed by the default hash strategy before each test. */
    @Before
    public void init() {
        ConsistentHashStrategy strategy = new DefaultConsistentHashStrategy();
        ConsistentHashRing ring = new ConsistentHashRingImpl(strategy);
        cluster = new ClusterImpl(ring);
        cacheProvider = (Cacheable) cluster;
    }

    /** One server, two random KV pairs: both values must come back unchanged. */
    @Test
    public void test1Server2Keys_getCorrectly() {
        Node server = new NodeImpl("A", 1);
        cluster.addServer(server);
        String key = Utils.randomDigits();
        String value = Utils.randomDigits();
        cacheProvider.set(key, value);
        String key2 = Utils.randomDigits();
        String value2 = Utils.randomDigits();
        cacheProvider.set(key2, value2);
        String valueFromCache = cacheProvider.get(key);
        String value2FromCache = cacheProvider.get(key2);
        assert value.equals(valueFromCache);
        assert value2.equals(value2FromCache);
    }

    /** 10 servers with 100 virtual nodes each, 1,000,000 distinct keys: every value must be retrievable. */
    @Test
    public void test10Servers100wKeys_getCorrectly() {
        for (int i = 0; i < 10; ++i) {
            Node server = new NodeImpl(String.valueOf(i + 1), 100);
            cluster.addServer(server);
        }
        Map<String, String> keyValues = new HashMap<>(1000000);
        while (keyValues.size() < 1000000) {
            String key = Utils.randomDigits();
            if (keyValues.containsKey(key)) {
                continue;
            }
            String value = Utils.randomDigits();
            keyValues.put(key, value);
            cacheProvider.set(key, value);
        }
        Map<String, String> cachedKeyValues = cacheProvider.getAll();
        assert keyValues.size() == cachedKeyValues.size();
        // Typed Map.Entry (was a raw type) removes the need for the String casts.
        for (Map.Entry<String, String> ent : keyValues.entrySet()) {
            assert ent.getValue().equals(cacheProvider.get(ent.getKey()));
        }
    }
}
编写测试用例查看数据是否分布均衡,测试 10 台服务器,150 个虚拟节点,100 万 KV 数据,测试 10 次
package consistenthash;
import consistenthash.impl.*;
import org.junit.Test;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
 * Distribution check for the hand-rolled hash strategy: stores 1,000,000 KV
 * pairs on a 10-server cluster 10 times and prints the per-server counts for
 * manual inspection (no assertion — the article compares the printed numbers).
 */
public class TestDefaultConsistentHashStrategy {
    @Test
    public void testCheckDataIsSavedAvaragely() {
        for (int i = 0; i < 10; ++i) {
            save100wKeyValues();
        }
    }

    // Generates 1,000,000 distinct random KV pairs.
    private Map<String, String> generateKeyValues() {
        Map<String, String> keyValues = new HashMap<>(1000000);
        while (keyValues.size() < 1000000) {
            String key = Utils.randomDigits();
            if (keyValues.containsKey(key)) {
                continue;
            }
            String value = Utils.randomDigits();
            keyValues.put(key, value);
        }
        return keyValues;
    }

    // Builds a fresh 10-server cluster (150 virtual nodes each) using the
    // default strategy, stores the generated data, and prints how many entries
    // landed on each server.
    private void save100wKeyValues() {
        Map<String, String> keyValues = generateKeyValues();
        ConsistentHashStrategy strategy = new DefaultConsistentHashStrategy();
        ConsistentHashRing ring = new ConsistentHashRingImpl(strategy);
        Cluster cluster = new ClusterImpl(ring);
        Cacheable cacheProvider = (Cacheable) cluster;
        for (int i = 0; i < 10; ++i) {
            Node server = new NodeImpl(String.valueOf(i + 1), 150);
            cluster.addServer(server);
        }
        // Typed Map.Entry (was a raw type) removes the need for the String casts.
        for (Map.Entry<String, String> ent : keyValues.entrySet()) {
            cacheProvider.set(ent.getKey(), ent.getValue());
        }
        Collection<Node> servers = cluster.getServers();
        List<Integer> sizes = servers.stream().map(s -> s.getAll().size()).collect(Collectors.toList());
        System.out.println(sizes);
    }
}
输出结果如下,单台节点超过 22 万数据的情况经常发生,实现的一致性 hash 算法不够随机,数据分布严重不均衡
[287330, 88104, 72255, 83516, 99165, 79517, 69959, 66605, 68770, 84779]
[70911, 72771, 70810, 91113, 238796, 80842, 74977, 87004, 102186, 110590]
[93456, 213739, 87046, 79116, 104590, 84308, 84405, 86987, 79754, 86599]
[110293, 70246, 174517, 84289, 95834, 74562, 91138, 77457, 107824, 113840]
[108898, 64680, 107857, 218407, 81578, 96128, 84769, 64026, 82255, 91402]
[68869, 70524, 85821, 117791, 75754, 118709, 78305, 176414, 88124, 119689]
[69396, 111033, 76321, 118371, 75236, 87844, 81748, 229839, 67904, 82308]
[77754, 60745, 260744, 83517, 58209, 77669, 137146, 82400, 87142, 74674]
[108600, 88933, 72277, 59950, 93209, 80266, 212365, 78181, 97448, 108771]
[80300, 76555, 88550, 76072, 118870, 79298, 228305, 93768, 69529, 88753]
接下来会介绍改进的算法
之前编写的一致性 hash 算法不够随机,导致数据分布严重不均衡,单台节点会存储超过 20 万的数据,因此需要对 hash 算法提出改进。自己实现的 hash 算法是根据固定的素数做运算得出 hash 值,不够随机,想着是否可以借鉴已有的 hash 算法来实现。MD5 就是一种 hash 算法,它根据输入的内容确定性地生成 128 位的 hash 值,范围在 0 到 2^128-1 之间,hash 值取值范围大,分布也比较均匀随机。重新实现的一致性 hash 算法如下:
package consistenthash.impl;
import consistenthash.ConsistentHashStrategy;
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
/**
 * Hash strategy that reuses MD5 for good dispersion: the key's UTF-8 bytes are
 * digested to a 128-bit value, which is then reduced modulo 2^32 to fit the ring.
 */
public class Md5ConsistentHashStrategy implements ConsistentHashStrategy {
    /** Upper bound of the hash space: 2^32. */
    private static final BigInteger MAX = BigInteger.valueOf(2l).pow(32);

    /**
     * Hashes the key into the range 0 to 2^32-1. A null key hashes to 0.
     */
    public long hash(String key) {
        if (key == null) {
            return 0l;
        }
        byte[] digest = md5Of(key.getBytes(StandardCharsets.UTF_8));
        // BigInteger.mod always yields a non-negative result, even when the
        // signed interpretation of the digest is negative.
        return new BigInteger(digest).mod(MAX).longValue();
    }

    // Computes the MD5 digest of the input. MD5 is required to be present on
    // every JVM, so the checked NoSuchAlgorithmException is rewrapped unchecked.
    private static byte[] md5Of(byte[] input) {
        try {
            return MessageDigest.getInstance("MD5").digest(input);
        } catch (NoSuchAlgorithmException e) {
            throw new RuntimeException(e);
        }
    }
}
编写的测试用例如下
package consistenthash;
import consistenthash.impl.*;
import org.junit.Test;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
 * Distribution check for the MD5-based hash strategy: stores 1,000,000 KV
 * pairs on a 10-server cluster 10 times and prints the per-server counts for
 * manual inspection (no assertion — the article compares the printed numbers).
 */
public class TestMd5ConsistentHashStrategy {
    @Test
    public void testCheckDataIsSavedAvaragely() {
        for (int i = 0; i < 10; ++i) {
            save100wKeyValues();
        }
    }

    // Generates 1,000,000 distinct random KV pairs.
    private Map<String, String> generateKeyValues() {
        Map<String, String> keyValues = new HashMap<>(1000000);
        while (keyValues.size() < 1000000) {
            String key = Utils.randomDigits();
            if (keyValues.containsKey(key)) {
                continue;
            }
            String value = Utils.randomDigits();
            keyValues.put(key, value);
        }
        return keyValues;
    }

    // Builds a fresh 10-server cluster (150 virtual nodes each) using the MD5
    // strategy, stores the generated data, and prints how many entries landed
    // on each server.
    private void save100wKeyValues() {
        Map<String, String> keyValues = generateKeyValues();
        ConsistentHashStrategy strategy = new Md5ConsistentHashStrategy();
        ConsistentHashRing ring = new ConsistentHashRingImpl(strategy);
        Cluster cluster = new ClusterImpl(ring);
        Cacheable cacheProvider = (Cacheable) cluster;
        for (int i = 0; i < 10; ++i) {
            Node server = new NodeImpl(String.valueOf(i + 1), 150);
            cluster.addServer(server);
        }
        // Typed Map.Entry (was a raw type) removes the need for the String casts.
        for (Map.Entry<String, String> ent : keyValues.entrySet()) {
            cacheProvider.set(ent.getKey(), ent.getValue());
        }
        Collection<Node> servers = cluster.getServers();
        List<Integer> sizes = servers.stream().map(s -> s.getAll().size()).collect(Collectors.toList());
        System.out.println(sizes);
    }
}
输出结果如下,比 DefaultConsistentHashStrategy 的算法实现要好很多,数据分布明显均衡
[85036, 90579, 104895, 117825, 92461, 103194, 99862, 102620, 102007, 101521]
[96948, 88482, 96963, 109167, 109254, 86621, 103940, 109846, 94351, 104428]
[106554, 96678, 90942, 105560, 105197, 102851, 97988, 96473, 95182, 102575]
[91784, 111891, 105755, 88399, 92705, 113522, 97292, 91781, 105055, 101816]
[105047, 112721, 98489, 92381, 100562, 108465, 96748, 96063, 88134, 101390]
[103390, 99131, 98818, 93555, 89801, 102828, 105571, 90024, 105894, 110988]
[95961, 98861, 107550, 106023, 110996, 105163, 97110, 95399, 84950, 97987]
[113775, 91105, 104870, 99463, 94246, 90767, 107859, 100132, 100724, 97059]
[94853, 111725, 102316, 88900, 93076, 95625, 95076, 114884, 95779, 107766]
[102448, 94131, 87003, 110910, 101418, 97821, 99978, 110305, 93782, 102204]
借助于数学统计中的标准差,我们可以比较一下 2 种算法的数据均衡分布的好坏。标准差可以用来衡量数据收敛的程度,标准差越小,说明数据越收敛,数据分布越均衡。
辅助类 MathUtils
package consistenthash.impl;
import java.util.List;
/** Statistics helper for comparing how evenly data is distributed across nodes. */
public class MathUtils {
    /**
     * Computes the mean and the population standard deviation of the samples.
     *
     * @param sizes the sample values (per-node entry counts)
     * @return a two-element array: index 0 is the mean, index 1 the standard deviation
     */
    public static double[] averageAndstandardDeviation(List<Integer> sizes) {
        // An empty sample yields NaN for both values, matching the 0/0 of the
        // plain-loop formulation.
        double mean = sizes.stream().mapToInt(Integer::intValue).average().orElse(Double.NaN);
        double meanSquaredDeviation = sizes.stream()
                .mapToDouble(s -> (s - mean) * (s - mean))
                .average()
                .orElse(Double.NaN);
        return new double[]{mean, Math.sqrt(meanSquaredDeviation)};
    }
}
测试用例如下
package consistenthash;
import consistenthash.impl.*;
import org.junit.Test;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
 * Compares the two hash strategies quantitatively: for the same data set, the
 * MD5 strategy must produce a smaller standard deviation of per-server entry
 * counts than the hand-rolled default strategy.
 */
public class TestStandardDeviation {
    // Generates 1,000,000 distinct random KV pairs, shared by both strategies
    // within a round so the comparison is apples-to-apples.
    private Map<String, String> generateKeyValues() {
        Map<String, String> keyValues = new HashMap<>(1000000);
        while (keyValues.size() < 1000000) {
            String key = Utils.randomDigits();
            if (keyValues.containsKey(key)) {
                continue;
            }
            String value = Utils.randomDigits();
            keyValues.put(key, value);
        }
        return keyValues;
    }

    /** In each of 10 rounds the MD5 strategy's deviation must beat the default's. */
    @Test
    public void test10TimesOfDefaultAndMd5HashAlgorithm_Md5IsBetter() {
        ConsistentHashStrategy defaultStrategy = new DefaultConsistentHashStrategy();
        ConsistentHashStrategy md5Strategy = new Md5ConsistentHashStrategy();
        for (int i = 0; i < 10; ++i) {
            Map<String, String> keyValues = generateKeyValues();
            double sdForDefaultStrategy = getStandardDeviationFor10Servers100wKeys(keyValues, defaultStrategy);
            double sdForMd5Strategy = getStandardDeviationFor10Servers100wKeys(keyValues, md5Strategy);
            assert sdForMd5Strategy < sdForDefaultStrategy;
        }
    }

    // Stores the data on a fresh 10-server / 150-virtual-node cluster using the
    // given strategy and returns the standard deviation of per-server counts.
    private double getStandardDeviationFor10Servers100wKeys(Map<String, String> keyValues, ConsistentHashStrategy strategy) {
        ConsistentHashRing ring = new ConsistentHashRingImpl(strategy);
        Cluster cluster = new ClusterImpl(ring);
        Cacheable cacheProvider = (Cacheable) cluster;
        for (int i = 0; i < 10; ++i) {
            Node server = new NodeImpl(String.valueOf(i + 1), 150);
            cluster.addServer(server);
        }
        // Typed Map.Entry (was a raw type) removes the need for the String casts.
        for (Map.Entry<String, String> ent : keyValues.entrySet()) {
            cacheProvider.set(ent.getKey(), ent.getValue());
        }
        Collection<Node> servers = cluster.getServers();
        List<Integer> sizes = servers.stream().map(s -> s.getAll().size()).collect(Collectors.toList());
        double[] results = MathUtils.averageAndstandardDeviation(sizes);
        return results[1];
    }
}
100 万的 KV 数据,10 台服务器节点,150 个虚拟节点,测试 10 次的结果是每次 Md5 的 hash 算法的标准差都要小于 default 的 hash 算法的标准差。
Anyou Liu
还未添加个人签名 2019.05.24 加入
还未添加个人简介
评论