架构师训练营 第五周 作业

发布于: 2020 年 07 月 08 日

作业一:

用你熟悉的编程语言实现一致性 hash 算法。

编写测试用例测试这个算法,测试 100 万 KV 数据,10 个服务器节点的情况下,计算这些 KV 数据在服务器上分布数量的标准差,以评估算法的存储负载不均衡性。

哈希算法

首先,一致性哈希算法依赖于普通的哈希算法。大多数同学对哈希算法的理解可能都停留在 JDK 的 hashCode 函数上。其实哈希算法有很多种实现,它们在不同方面都各有优劣,针对不同的场景可以使用不同的哈希算法实现。

下面,我们会介绍一下几款比较常见的哈希算法,并且了解一下它们在分布均匀程度,哈希碰撞概率和性能等方面的优劣。

MD5 算法:

全称为 Message-Digest Algorithm 5,用于确保信息传输完整一致。是计算机广泛使用的杂凑算法之一,主流编程语言普遍已有 MD5 实现。MD5 的作用是把大容量信息压缩成一种保密的格式(就是把一个任意长度的字节串变换成定长的16进制数字串)。常见的文件完整性校验就是使用 MD5。

CRC 算法:

全称为 CyclicRedundancyCheck,中文名称为循环冗余校验。它是一类重要的,编码和解码方法简单,检错和纠错能力强的哈希算法,在通信领域广泛地用于实现差错控制。

MurmurHash 算法:

高运算性能,低碰撞率,由 Austin Appleby 创建于 2008 年,现已应用到 Hadoop、libstdc++、nginx、libmemcached 等开源系统。Java 界中 Redis,Memcached,Cassandra,HBase,Lucene和Guava 都在使用它。

FNV 算法:

全称为 Fowler-Noll-Vo 算法,是以三位发明人 Glenn Fowler,Landon Curt Noll,Phong Vo 的名字来命名的,最早在 1991 年提出。 FNV 能快速 hash 大量数据并保持较小的冲突率,它的高度分散使它适用于 hash 一些非常相近的字符串,比如 URL,hostname,文件名,text 和 IP 地址等。

Ketama 算法:

一致性哈希算法的实现之一,其他的哈希算法有通用的一致性哈希算法实现,只不过是替换了哈希映射函数而已,但 Ketama 是一整套的流程,我们将在后面介绍。

一致性哈希算法

首先将缓存服务器( ip + 端口号)进行哈希,映射成环上的一个节点,计算出缓存数据 key 值的 hash key,同样映射到环上,并顺时针选取最近的一个服务器节点作为该缓存应该存储的服务器。具体实现见后续的章节。

比如说,当存在 A,B,C,D 四个缓存服务器时,它们及其 key 值为1的缓存数据在一致性哈希环上的位置如下图所示,根据顺时针取最近一个服务器节点的规则,该缓存数据应该存储在服务器 B 上。

当要存储一个 key 值为4的缓存数据时,它在一致性哈希环上的位置如下所示,所以它应该存储在服务器 C 上。

类似的,key 值为5,6的数据应该存在服务 D 上,key 值为7,8的数据应该存储在服务 A 上。

此时,服务器 B 宕机下线,服务器 B 中存储的缓存数据要进行迁移,但由于一致性哈希环的存在,只需要迁移key 值为1的数据,其他的数据的存储服务器不会发生变化。这也是一致性哈希算法比取余映射算法出色的地方。

由于服务器 B 下线,key 值为1的数据顺时针最近的服务器是 C ,所以数据存迁移到服务器 C 上。

现实情况下,服务器在一致性哈希环上的位置不可能分布的这么均匀,导致了每个节点实际占据环上的区间大小不一。

这种情况下,可以增加虚节点来解决。通过增加虚节点,使得每个节点在环上所“管辖”的区域更加均匀。这样就既保证了在节点变化时,尽可能小的影响数据分布的变化,而同时又保证了数据分布的均匀。

实现方式如下:

StatisticsUtil

public class StatisticsUtil {
/**
* //方差s^2=[(x1-x)^2 +...(xn-x)^2]/n
* @return
* @exception
* @author lvq
* @date 2020/7/8 18:58
*/
public static double variance(Long[] x) {
int m = x.length;
double sum = 0;
for (int i = 0; i < m; i++) {//求和
sum += x[i];
}
double dAve = sum / m;//求平均值
double dVar = 0;
for (int i = 0; i < m; i++) {//求方差
dVar += (x[i] - dAve) * (x[i] - dAve);
}
return dVar / m;
}
/**
* //标准差σ=sqrt(s^2)
* @return
* @exception
* @author lvq
* @date 2020/7/8 18:58
*/
public static double standardDeviation(Long[] x) {
int m = x.length;
double sum = 0;
for (int i = 0; i < m; i++) {//求和
sum += x[i];
}
double dAve = sum / m;//求平均值
double dVar = 0;
for (int i = 0; i < m; i++) {//求方差
dVar += (x[i] - dAve) * (x[i] - dAve);
}
return Math.sqrt(dVar / m);
}
}

KeyUtil

public class KeyUtil
{
private KeyUtil() {
// Empty
}
/**
* @return
* @exception
* @author lvq
* @param k
* @date 2020/7/7 20:40
*/
public static byte[] getKeyBytes(String k) {
try {
return k.getBytes("UTF-8");
} catch (UnsupportedEncodingException e) {
throw new RuntimeException(e);
}
}
/**
* @return
* @exception
* @author lvq
* @param keys
* @date 2020/7/7 20:41
*/
public static Collection<byte[]> getKeyBytes(Collection<String> keys) {
Collection<byte[]> rv = new ArrayList<byte[]>(keys.size());
for (String s : keys) {
rv.add(getKeyBytes(s));
}
return rv;
}
}

HashAlgorithm.java

public interface HashAlgorithm {
long hash(final String k);
}

DefaultHashAlgorithm

public enum DefaultHashAlgorithm implements HashAlgorithm {
/**
* Native hash (String.hashCode()).
*/
NATIVE_HASH,
/**
* CRC_HASH as used by the perl API. This will be more consistent both
* across multiple API users as well as java versions, but is mostly likely
* significantly slower.
*/
CRC_HASH,
/**
* FNV hashes are designed to be fast while maintaining a low collision rate.
* The FNV speed allows one to quickly hash lots of data while maintaining a
* reasonable collision rate.
*
* @see <a href="http://www.isthe.com/chongo/tech/comp/fnv/">fnv
* comparisons</a>
* @see <a href="http://en.wikipedia.org/wiki/Fowler_Noll_Vo_hash">fnv at
* wikipedia</a>
*/
FNV1_64_HASH,
/**
* Variation of FNV.
*/
FNV1A_64_HASH,
/**
* 32-bit FNV1.
*/
FNV1_32_HASH,
/**
* 32-bit FNV1a.
*/
FNV1A_32_HASH,
/**
* MD5-based hash algorithm used by ketama.
*/
KETAMA_HASH,
MURMUR_HASH;
private static final long FNV_64_INIT = 0xcbf29ce484222325L;
private static final long FNV_64_PRIME = 0x100000001b3L;
private static final long FNV_32_INIT = 2166136261L;
private static final long FNV_32_PRIME = 16777619;
private static MessageDigest md5Digest = null;
static {
try {
md5Digest = MessageDigest.getInstance("MD5");
} catch (NoSuchAlgorithmException e) {
throw new RuntimeException("MD5 not supported", e);
}
}
/**
* @return
* @exception
* @author lvq
* @param k
* @date 2020/7/7 20:42
*/
@Override
public long hash(final String k) {
long rv = 0;
int len = k.length();
switch (this) {
case NATIVE_HASH:
rv = k.hashCode();
break;
case CRC_HASH:
// return (crc32(shift) >> 16) & 0x7fff;
CRC32 crc32 = new CRC32();
crc32.update(KeyUtil.getKeyBytes(k));
rv = (crc32.getValue() >> 16) & 0x7fff;
break;
case FNV1_64_HASH:
rv = FNV_64_INIT;
for (int i = 0; i < len; i++) {
rv *= FNV_64_PRIME;
rv ^= k.charAt(i);
}
break;
case FNV1A_64_HASH:
rv = FNV_64_INIT;
for (int i = 0; i < len; i++) {
rv ^= k.charAt(i);
rv *= FNV_64_PRIME;
}
break;
case FNV1_32_HASH:
rv = FNV_32_INIT;
for (int i = 0; i < len; i++) {
rv *= FNV_32_PRIME;
rv ^= k.charAt(i);
}
break;
case FNV1A_32_HASH:
rv = FNV_32_INIT;
for (int i = 0; i < len; i++) {
rv ^= k.charAt(i);
rv *= FNV_32_PRIME;
}
break;
case KETAMA_HASH:
byte[] bKey = computeMd5(k);
rv = ((long) (bKey[3] & 0xFF) << 24)
| ((long) (bKey[2] & 0xFF) << 16)
| ((long) (bKey[1] & 0xFF) << 8)
| (bKey[0] & 0xFF);
break;
case MURMUR_HASH:
rv = murmurHash(k);
break;
default:
assert false;
}
return rv & 0xffffffffL; /* Truncate to 32-bits */
}
private long murmurHash(String key) {
ByteBuffer buf = ByteBuffer.wrap(KeyUtil.getKeyBytes(key));
int seed = 0x1234ABCD;
ByteOrder byteOrder = buf.order();
buf.order(ByteOrder.LITTLE_ENDIAN);
long m = 0xc6a4a7935bd1e995L;
int r = 47;
long h = seed ^ (buf.remaining() * m);
long k;
while (buf.remaining() >= 8) {
k = buf.getLong();
k *= m;
k ^= k >>> r;
k *= m;
h ^= k;
h *= m;
}
if (buf.remaining() > 0) {
ByteBuffer finish = ByteBuffer.allocate(8).order(
ByteOrder.LITTLE_ENDIAN);
finish.put(buf).rewind();
h ^= finish.getLong();
h *= m;
}
h ^= h >>> r;
h *= m;
h ^= h >>> r;
buf.order(byteOrder);
return h;
}
public static byte[] computeMd5(String k) {
MessageDigest md5;
try {
md5 = (MessageDigest) md5Digest.clone();
} catch (CloneNotSupportedException e) {
throw new RuntimeException("clone of MD5 not supported", e);
}
md5.update(KeyUtil.getKeyBytes(k));
return md5.digest();
}
}

ServerNode

public class ServerNode {
private final SocketAddress socketAddress;
/**
* @return
* @exception
* @author lvq
* @param socketAddress
* @date 2020/7/7 20:20
*/
public ServerNode(SocketAddress socketAddress)
{
this.socketAddress=socketAddress;
}
/**
* @return
* @exception
* @author lvq
* @date 2020/7/7 20:20
*/
public SocketAddress getSocketAddress()
{
return socketAddress;
}
}

NodeLocator

public interface NodeLocator {
/**
* @return
* @exception
* @author lvq
* @param s
* @date 2020/7/7 20:17
*/
ServerNode getPrimary(String s);
}

ConsistentHashNodeLocator

public class ConsistentHashNodeLocator implements NodeLocator {
private final static int VIRTUAL_NODE_SIZE = 12;
private final static String VIRTUAL_NODE_SUFFIX = "-";
private volatile TreeMap<Long,ServerNode> hashRing;
private final HashAlgorithm hashAlg;
/**
* @return
* @exception
* @author lvq
* @param nodes
* @param hashAlg
* @date 2020/7/7 20:29
*/
public ConsistentHashNodeLocator(List<ServerNode> nodes,HashAlgorithm hashAlg)
{
this.hashRing=buildConsistentHashRing(hashAlg, nodes);
this.hashAlg = hashAlg;
}
/**
* @return
* @exception
* @author lvq
* @param hashAlg
* @param nodes
* @date 2020/7/7 20:28
*/
private TreeMap<Long, ServerNode> buildConsistentHashRing(HashAlgorithm hashAlg, List<ServerNode> nodes)
{
TreeMap<Long, ServerNode> virtualNodeRing = new TreeMap<Long, ServerNode>();
for (ServerNode node : nodes) {
for (int i = 0; i < VIRTUAL_NODE_SIZE; i++) {
// 新增虚拟节点的方式如果有影响,也可以抽象出一个由物理节点扩展虚拟节点的类
virtualNodeRing.put(hashAlg.hash(node.getSocketAddress().toString() + VIRTUAL_NODE_SUFFIX + i), node);
}
}
return virtualNodeRing;
}
/**
* @return
* @exception
* @author lvq
* @param * @param null
* @date 2020/7/7 20:31
*/
@Override
public ServerNode getPrimary(String s)
{
long hash=hashAlg.hash(s);
return getNodeForkey(hashRing,hash);
}
/**
* @return
* @exception
* @author lvq
* @date 2020/7/7 20:31
*/
public ServerNode getNodeForkey(TreeMap<Long, ServerNode> hashRing, long hash)
{
// 向右查找firsrt key
Map.Entry<Long,ServerNode> locatenode=hashRing.ceilingEntry(hash);
//
if (locatenode == null) {
locatenode = hashRing.firstEntry();
}
return locatenode.getValue();
}
}

main

public class Main {
public static void main(String[] args)
{
testDistribution();
}
public static void testDistribution()
{
List<ServerNode> servers = new ArrayList<ServerNode>();
for (String ip : ips) {
servers.add(new ServerNode(new InetSocketAddress(ip, 8080)));
}
NodeLocator nodeLocator = new ConsistentHashNodeLocator(servers, DefaultHashAlgorithm.KETAMA_HASH);
// 构造 1000000 随机请求
List<String> keys = new ArrayList<String>();
for (int i = 0; i < 1000000; i++) {
keys.add(UUID.randomUUID().toString());
}
// 统计分布
AtomicLongMap<ServerNode> atomicLongMap = AtomicLongMap.create();
for (ServerNode server : servers) {
atomicLongMap.put(server, 0);
}
for (String key : keys) {
ServerNode node = nodeLocator.getPrimary(key);
atomicLongMap.getAndIncrement(node);
}
System.out.println(StatisticsUtil.variance(atomicLongMap.asMap().values().toArray(new Long[]{})));
System.out.println(StatisticsUtil.standardDeviation(atomicLongMap.asMap().values().toArray(new Long[]{})));
}
static String[] ips = {
"10.238.172.215",
"10.238.176.96",
"10.238.65.34",
"10.238.64.205",
"10.238.65.67",
"10.238.247.206",
"10.238.173.47",
"10.238.65.117",
"10.238.69.32",
"10.238.173.46"
};
}

更改ConsistentHashNodeLocator虚拟节点大小

12个虚拟节点

方差:5.845460454E8

标准差:24177.387067257703

100个虚拟节点

方差:1.3559987E8

标准差:11644.735720487606

1000个虚拟节点

方差:2.02807308E7

标准差:4503.413238866716

理解了一致性哈希算法。下来继续看看增加节点及,删除节点后,命中率如何.

用户头像

极客

关注

还未添加个人签名 2018.03.29 加入

还未添加个人简介

评论

发布
暂无评论
架构师训练营 第五周 作业