架构师训练营第 5 周作业——一致性 Hash 算法

用户头像
在野
关注
发布于: 2020 年 07 月 08 日

作业

  • 用你熟悉的编程语言实现一致性 hash 算法。

  • 编写测试用例测试这个算法,测试 100 万 KV 数据,10 个服务器节点的情况下,计算这些 KV 数据在服务器上分布数量的标准差,以评估算法的存储负载不均衡性。

一致性hash环

一致性Hash算法是将整个Hash空间组织成一个虚拟的圆环,圆环的值空间为0 ~ 2^32 - 1(一个32位无符号整型),整个圆环以顺时针方向组织,圆环正上方的点代表0,0点右侧的第一个点代表1,以此类推,直到 2^32 - 1 构成一个闭环。



将服务器hash到环上

我们将各个服务器进行一次哈希,可以选择服务器的IP或主机名作为关键字进行哈希,这样每台服务器就确定在了哈希环的一个位置上,比如我们有4台机器,使用IP地址哈希后在环空间的位置如下图所示。

将数据存储到服务器上

下面我们使用以下算法来存储数据到相应的服务器:

  • 首先将要存储的数据Key通过Hash函数(与计算服务器的hash值是一个算法)计算出哈希值

  • 然后从数据Key的哈希值开始沿环顺时针查找,遇到的第一个服务器就是数据要存储的服务器

虚拟节点

在一致性Hash算法服务节点太少的情况下,容易因为节点分布不均匀面造成数据倾斜(被缓存的对象大部分缓存在某一台服务器上)问题。为了解决数据倾斜问题,一致性Hash算法引入了虚拟节点机制,即对每一个服务器节点计算多个哈希,每个计算结果位置都放置一个服务节点,称为虚拟节点。

加入虚拟节点之后,即使在服务节点很少的情况下,也能做到数据的均匀分布。

每台机器映射的虚拟节点越多,则分布得越均匀。

一致性hash算法实现

hash算法

采用Cassandra实现的MurmurHash算法。

public class MurmurHash {
public static long hash3_x64_128(String data) {
ByteBuffer key = ByteBuffer.wrap(data.getBytes());
int offset = key.position();
int length = key.remaining();
final int nblocks = length >> 4; // Process as 128-bit blocks.
long h1 = 0;
long h2 = 0;
long c1 = 0x87c37b91114253d5L;
long c2 = 0x4cf5ad432745937fL;
// ----------
// body
for (int i = 0; i < nblocks; i++) {
long k1 = getblock(key, offset, i * 2 + 0);
long k2 = getblock(key, offset, i * 2 + 1);
k1 *= c1;
k1 = rotl64(k1, 31);
k1 *= c2;
h1 ^= k1;
h1 = rotl64(h1, 27);
h1 += h2;
h1 = h1 * 5 + 0x52dce729;
k2 *= c2;
k2 = rotl64(k2, 33);
k2 *= c1;
h2 ^= k2;
h2 = rotl64(h2, 31);
h2 += h1;
h2 = h2 * 5 + 0x38495ab5;
}
// ----------
// tail
// Advance offset to the unprocessed tail of the data.
offset += nblocks * 16;
long k1 = 0;
long k2 = 0;
switch (length & 15) {
case 15:
k2 ^= ((long) key.get(offset + 14)) << 48;
case 14:
k2 ^= ((long) key.get(offset + 13)) << 40;
case 13:
k2 ^= ((long) key.get(offset + 12)) << 32;
case 12:
k2 ^= ((long) key.get(offset + 11)) << 24;
case 11:
k2 ^= ((long) key.get(offset + 10)) << 16;
case 10:
k2 ^= ((long) key.get(offset + 9)) << 8;
case 9:
k2 ^= ((long) key.get(offset + 8)) << 0;
k2 *= c2;
k2 = rotl64(k2, 33);
k2 *= c1;
h2 ^= k2;
case 8:
k1 ^= ((long) key.get(offset + 7)) << 56;
case 7:
k1 ^= ((long) key.get(offset + 6)) << 48;
case 6:
k1 ^= ((long) key.get(offset + 5)) << 40;
case 5:
k1 ^= ((long) key.get(offset + 4)) << 32;
case 4:
k1 ^= ((long) key.get(offset + 3)) << 24;
case 3:
k1 ^= ((long) key.get(offset + 2)) << 16;
case 2:
k1 ^= ((long) key.get(offset + 1)) << 8;
case 1:
k1 ^= ((long) key.get(offset));
k1 *= c1;
k1 = rotl64(k1, 31);
k1 *= c2;
h1 ^= k1;
}
;
h1 ^= length;
h2 ^= length;
h1 += h2;
h2 += h1;
h1 = fmix(h1);
h2 = fmix(h2);
h1 += h2;
h2 += h1;
return Math.abs(h1);
}
protected static long fmix(long k) {
k ^= k >>> 33;
k *= 0xff51afd7ed558ccdL;
k ^= k >>> 33;
k *= 0xc4ceb9fe1a85ec53L;
k ^= k >>> 33;
return k;
}
protected static long getblock(ByteBuffer key, int offset, int index) {
int i_8 = index << 3;
int blockOffset = offset + i_8;
return ((long) key.get(blockOffset + 0) & 0xff) + (((long) key.get(blockOffset + 1) & 0xff) << 8)
+ (((long) key.get(blockOffset + 2) & 0xff) << 16) + (((long) key.get(blockOffset + 3) & 0xff) << 24)
+ (((long) key.get(blockOffset + 4) & 0xff) << 32) + (((long) key.get(blockOffset + 5) & 0xff) << 40)
+ (((long) key.get(blockOffset + 6) & 0xff) << 48) + (((long) key.get(blockOffset + 7) & 0xff) << 56);
}
protected static long rotl64(long v, int n) {
return ((v << n) | (v >>> (64 - n)));
}
}

服务器节点

public class Node {
private String hostname;
private Map<Object, Object> store;
public Node(String hostname) {
this.hostname = hostname;
this.store = new ConcurrentHashMap<Object, Object>();
}
public boolean save(String key, Object value) {
store.put(key, value);
return true;
}
public Object get(String key) {
return store.get(key);
}
public int getSize() {
return store.size();
}
public String getHostname() {
return hostname;
}
public void setHostname(String hostname) {
this.hostname = hostname;
}
}

一致性hash环

public class TokenRing {
private final SortedMap<Long, Node> ring = new TreeMap<Long, Node>();
private List<Node> nodeList;
// 虚拟节点个数
private int virtualNodes;
/**
* @param nodes 节点(机器)个数
* @param virtualNodes 虚拟节点个数
*/
public TokenRing(int nodes, int virtualNodes) {
this.virtualNodes = virtualNodes;
nodeList = new Vector<Node>();
for (int i = 0; i < nodes; i++) {
Node node = new Node("host:::" + i);
addNode(node);
}
}
/**
* 添加节点
* @param node
* @return
*/
public boolean addNode(Node node) {
nodeList.add(node);
for (int j = 0; j < virtualNodes; j++) {
ring.put(MurmurHash.hash3_x64_128(node.getHostname() + j), node);
}
return true;
}
/**
* 删除节点
* @param node
* @return
*/
public boolean removeNode(Node node) {
if (nodeList.remove(node)) {
ring.forEach((token, nd) -> {
if (node == nd) {
ring.remove(token);
}
});
return true;
}
return false;
}
/**
* 取目标节点
* @param key
* @return
*/
public Node getNode(String key) {
long token = MurmurHash.hash3_x64_128(key);
if (!ring.containsKey(token)) {
SortedMap<Long, Node> tailMap = ring.tailMap(token);
token = tailMap.isEmpty() ? ring.firstKey() : tailMap.firstKey();
}
return ring.get(token);
}
public Set<Node> getNodes() {
return new HashSet<>(ring.values());
}
}

测试结果

public class Client {
public static void main(String[] args) {
int n = 10;
TokenRing tokenRing = new TokenRing(n, 100);
for (int i = 0; i < 1000000; i++) {
String data = UUID.randomUUID().toString();
Node node = tokenRing.getNode(data);
if (node != null) {
node.save(data, data);
} else {
throw new RuntimeException("没有找到[" + data + "]存放的节点");
}
}
Collection<Node> nodes = tokenRing.getNodes();
int[] array = new int[n];
int i = 0;
for (Node node : nodes) {
System.out.println(node.getHostname() + "--->>>" + node.getSize());
array[i++] = node.getSize();
}
System.out.println("标准差为:" + StandardDeviation.getSD(array));
}
}

对100万数据,存储在10个节点上,虚拟节点分别是100个、300个、500个、1000个、2000个和5000个的数据分布如下表格所示。

从测试结果可知,随着虚拟节点的增加,数据分布得越均匀。



发布于: 2020 年 07 月 08 日 阅读数: 98
用户头像

在野

关注

还未添加个人签名 2012.03.11 加入

还未添加个人简介

评论

发布
暂无评论
架构师训练营第 5 周作业——一致性Hash算法