写点什么

架构师训练营第 5 周作业——一致性 Hash 算法

用户头像
在野
关注
发布于: 2020 年 07 月 08 日

作业

  • 用你熟悉的编程语言实现一致性 hash 算法。

  • 编写测试用例测试这个算法,测试 100 万 KV 数据,10 个服务器节点的情况下,计算这些 KV 数据在服务器上分布数量的标准差,以评估算法的存储负载不均衡性。

一致性 hash 环

一致性 Hash 算法是将整个 Hash 空间组织成一个虚拟的圆环,圆环的值空间为0 ~ 2^32 - 1(一个32位无符号整型),整个圆环以顺时针方向组织,圆环正上方的点代表 0,0 点右侧的第一个点代表 1,以此类推,直到 2^32 - 1 构成一个闭环。


将服务器 hash 到环上

我们将各个服务器进行一次哈希,可以选择服务器的 IP 或主机名作为关键字进行哈希,这样每台服务器就确定在了哈希环的一个位置上,比如我们有 4 台机器,使用 IP 地址哈希后在环空间的位置如下图所示。

将数据存储到服务器上

下面我们使用以下算法来存储数据到相应的服务器:

  • 首先将要存储的数据 Key 通过 Hash 函数(与计算服务器的 hash 值是一个算法)计算出哈希值

  • 然后从数据 Key 的哈希值开始沿环顺时针查找,遇到的第一个服务器就是数据要存储的服务器

虚拟节点

在一致性 Hash 算法服务节点太少的情况下,容易因为节点分布不均匀面造成数据倾斜(被缓存的对象大部分缓存在某一台服务器上)问题。为了解决数据倾斜问题,一致性 Hash 算法引入了虚拟节点机制,即对每一个服务器节点计算多个哈希,每个计算结果位置都放置一个服务节点,称为虚拟节点。

加入虚拟节点之后,即使在服务节点很少的情况下,也能做到数据的均匀分布。

每台机器映射的虚拟节点越多,则分布得越均匀。

一致性 hash 算法实现

hash 算法

采用 Cassandra 实现的 MurmurHash 算法。

public class MurmurHash {
public static long hash3_x64_128(String data) { ByteBuffer key = ByteBuffer.wrap(data.getBytes()); int offset = key.position(); int length = key.remaining(); final int nblocks = length >> 4; // Process as 128-bit blocks.
long h1 = 0; long h2 = 0;
long c1 = 0x87c37b91114253d5L; long c2 = 0x4cf5ad432745937fL;
// ---------- // body
for (int i = 0; i < nblocks; i++) { long k1 = getblock(key, offset, i * 2 + 0); long k2 = getblock(key, offset, i * 2 + 1);
k1 *= c1; k1 = rotl64(k1, 31); k1 *= c2; h1 ^= k1;
h1 = rotl64(h1, 27); h1 += h2; h1 = h1 * 5 + 0x52dce729;
k2 *= c2; k2 = rotl64(k2, 33); k2 *= c1; h2 ^= k2;
h2 = rotl64(h2, 31); h2 += h1; h2 = h2 * 5 + 0x38495ab5; }
// ---------- // tail
// Advance offset to the unprocessed tail of the data. offset += nblocks * 16;
long k1 = 0; long k2 = 0;
switch (length & 15) { case 15: k2 ^= ((long) key.get(offset + 14)) << 48; case 14: k2 ^= ((long) key.get(offset + 13)) << 40; case 13: k2 ^= ((long) key.get(offset + 12)) << 32; case 12: k2 ^= ((long) key.get(offset + 11)) << 24; case 11: k2 ^= ((long) key.get(offset + 10)) << 16; case 10: k2 ^= ((long) key.get(offset + 9)) << 8; case 9: k2 ^= ((long) key.get(offset + 8)) << 0; k2 *= c2; k2 = rotl64(k2, 33); k2 *= c1; h2 ^= k2;
case 8: k1 ^= ((long) key.get(offset + 7)) << 56; case 7: k1 ^= ((long) key.get(offset + 6)) << 48; case 6: k1 ^= ((long) key.get(offset + 5)) << 40; case 5: k1 ^= ((long) key.get(offset + 4)) << 32; case 4: k1 ^= ((long) key.get(offset + 3)) << 24; case 3: k1 ^= ((long) key.get(offset + 2)) << 16; case 2: k1 ^= ((long) key.get(offset + 1)) << 8; case 1: k1 ^= ((long) key.get(offset)); k1 *= c1; k1 = rotl64(k1, 31); k1 *= c2; h1 ^= k1; } ;
h1 ^= length; h2 ^= length;
h1 += h2; h2 += h1;
h1 = fmix(h1); h2 = fmix(h2);
h1 += h2; h2 += h1;
return Math.abs(h1); }
protected static long fmix(long k) { k ^= k >>> 33; k *= 0xff51afd7ed558ccdL; k ^= k >>> 33; k *= 0xc4ceb9fe1a85ec53L; k ^= k >>> 33;
return k; }
protected static long getblock(ByteBuffer key, int offset, int index) { int i_8 = index << 3; int blockOffset = offset + i_8; return ((long) key.get(blockOffset + 0) & 0xff) + (((long) key.get(blockOffset + 1) & 0xff) << 8) + (((long) key.get(blockOffset + 2) & 0xff) << 16) + (((long) key.get(blockOffset + 3) & 0xff) << 24) + (((long) key.get(blockOffset + 4) & 0xff) << 32) + (((long) key.get(blockOffset + 5) & 0xff) << 40) + (((long) key.get(blockOffset + 6) & 0xff) << 48) + (((long) key.get(blockOffset + 7) & 0xff) << 56); }
protected static long rotl64(long v, int n) { return ((v << n) | (v >>> (64 - n))); } }
复制代码

服务器节点

public class Node {
private String hostname; private Map<Object, Object> store;
public Node(String hostname) { this.hostname = hostname; this.store = new ConcurrentHashMap<Object, Object>(); }
public boolean save(String key, Object value) { store.put(key, value); return true; }
public Object get(String key) { return store.get(key); } public int getSize() { return store.size(); }
public String getHostname() { return hostname; }
public void setHostname(String hostname) { this.hostname = hostname; }}
复制代码

一致性 hash 环

public class TokenRing {		private final SortedMap<Long, Node> ring = new TreeMap<Long, Node>();	private List<Node> nodeList;	// 虚拟节点个数	private int virtualNodes;
/** * @param nodes 节点(机器)个数 * @param virtualNodes 虚拟节点个数 */ public TokenRing(int nodes, int virtualNodes) { this.virtualNodes = virtualNodes; nodeList = new Vector<Node>(); for (int i = 0; i < nodes; i++) { Node node = new Node("host:::" + i); addNode(node); } } /** * 添加节点 * @param node * @return */ public boolean addNode(Node node) { nodeList.add(node); for (int j = 0; j < virtualNodes; j++) { ring.put(MurmurHash.hash3_x64_128(node.getHostname() + j), node); } return true; } /** * 删除节点 * @param node * @return */ public boolean removeNode(Node node) { if (nodeList.remove(node)) { ring.forEach((token, nd) -> { if (node == nd) { ring.remove(token); } }); return true; } return false; } /** * 取目标节点 * @param key * @return */ public Node getNode(String key) { long token = MurmurHash.hash3_x64_128(key); if (!ring.containsKey(token)) { SortedMap<Long, Node> tailMap = ring.tailMap(token); token = tailMap.isEmpty() ? ring.firstKey() : tailMap.firstKey(); } return ring.get(token); } public Set<Node> getNodes() { return new HashSet<>(ring.values()); }}
复制代码

测试结果

public class Client {
public static void main(String[] args) { int n = 10; TokenRing tokenRing = new TokenRing(n, 100); for (int i = 0; i < 1000000; i++) { String data = UUID.randomUUID().toString(); Node node = tokenRing.getNode(data); if (node != null) { node.save(data, data); } else { throw new RuntimeException("没有找到[" + data + "]存放的节点"); } } Collection<Node> nodes = tokenRing.getNodes(); int[] array = new int[n]; int i = 0; for (Node node : nodes) { System.out.println(node.getHostname() + "--->>>" + node.getSize()); array[i++] = node.getSize(); } System.out.println("标准差为:" + StandardDeviation.getSD(array)); }
}
复制代码

对 100 万数据,存储在 10 个节点上,虚拟节点分别是 100 个、300 个、500 个、1000 个、2000 个和 5000 个的数据分布如下表格所示。

从测试结果可知,随着虚拟节点的增加,数据分布得越均匀。


发布于: 2020 年 07 月 08 日阅读数: 141
用户头像

在野

关注

还未添加个人签名 2012.03.11 加入

还未添加个人简介

评论

发布
暂无评论
架构师训练营第 5 周作业——一致性Hash算法