架构课第 5 课作业
发布于: 2020 年 07 月 08 日
作业
用你熟悉的编程语言实现一致性 hash 算法。
编写测试用例测试这个算法,测试 100 万 KV 数据,10 个服务器节点的情况下,计算这些 KV 数据在服务器上分布数量的标准差,以评估算法的存储负载不均衡性。
一致性hash算法实现
hash算法
采用Cassandra实现的MurmurHash算法。
/**
 * MurmurHash3 (x64, 128-bit variant) reduced to a single non-negative {@code long}.
 *
 * <p>Only the first 64 bits of the 128-bit digest are used, and the sign is stripped so the
 * result can be used directly as a position on a hash ring.
 */
public class MurmurHash {

    /**
     * Hashes the UTF-8 bytes of {@code data}.
     *
     * @param data the key to hash; must not be {@code null}
     * @return a value in {@code [0, Long.MAX_VALUE]}
     * @throws NullPointerException if {@code data} is {@code null}
     */
    public static long hash3_x64_128(String data) {
        // Pin the charset: the no-arg getBytes() depends on the platform default, which would
        // make the same key land on different tokens on differently configured JVMs.
        ByteBuffer key = ByteBuffer.wrap(data.getBytes(java.nio.charset.StandardCharsets.UTF_8));
        int offset = key.position();
        int length = key.remaining();
        final int nblocks = length >> 4; // number of complete 16-byte (128-bit) blocks

        long h1 = 0;
        long h2 = 0;

        long c1 = 0x87c37b91114253d5L;
        long c2 = 0x4cf5ad432745937fL;

        // ---- body: mix every complete 16-byte block into h1/h2 ----
        for (int i = 0; i < nblocks; i++) {
            long k1 = getblock(key, offset, i * 2 + 0);
            long k2 = getblock(key, offset, i * 2 + 1);

            k1 *= c1;
            k1 = rotl64(k1, 31);
            k1 *= c2;
            h1 ^= k1;

            h1 = rotl64(h1, 27);
            h1 += h2;
            h1 = h1 * 5 + 0x52dce729;

            k2 *= c2;
            k2 = rotl64(k2, 33);
            k2 *= c1;
            h2 ^= k2;

            h2 = rotl64(h2, 31);
            h2 += h1;
            h2 = h2 * 5 + 0x38495ab5;
        }

        // ---- tail: the remaining 0..15 bytes ----
        // Advance offset to the unprocessed tail of the data.
        offset += nblocks * 16;

        long k1 = 0;
        long k2 = 0;

        // Every case intentionally falls through to the one below it.
        // NOTE: unlike getblock(), tail bytes are widened WITHOUT an "& 0xff" mask, so bytes
        // >= 0x80 are sign-extended. This is kept as-is on purpose: masking would change the
        // token of every key whose tail contains a non-ASCII byte.
        switch (length & 15) {
            case 15:
                k2 ^= ((long) key.get(offset + 14)) << 48;
            case 14:
                k2 ^= ((long) key.get(offset + 13)) << 40;
            case 13:
                k2 ^= ((long) key.get(offset + 12)) << 32;
            case 12:
                k2 ^= ((long) key.get(offset + 11)) << 24;
            case 11:
                k2 ^= ((long) key.get(offset + 10)) << 16;
            case 10:
                k2 ^= ((long) key.get(offset + 9)) << 8;
            case 9:
                k2 ^= ((long) key.get(offset + 8)) << 0;
                k2 *= c2;
                k2 = rotl64(k2, 33);
                k2 *= c1;
                h2 ^= k2;
            case 8:
                k1 ^= ((long) key.get(offset + 7)) << 56;
            case 7:
                k1 ^= ((long) key.get(offset + 6)) << 48;
            case 6:
                k1 ^= ((long) key.get(offset + 5)) << 40;
            case 5:
                k1 ^= ((long) key.get(offset + 4)) << 32;
            case 4:
                k1 ^= ((long) key.get(offset + 3)) << 24;
            case 3:
                k1 ^= ((long) key.get(offset + 2)) << 16;
            case 2:
                k1 ^= ((long) key.get(offset + 1)) << 8;
            case 1:
                k1 ^= ((long) key.get(offset));
                k1 *= c1;
                k1 = rotl64(k1, 31);
                k1 *= c2;
                h1 ^= k1;
        }

        // ---- finalization ----
        h1 ^= length;
        h2 ^= length;

        h1 += h2;
        h2 += h1;

        h1 = fmix(h1);
        h2 = fmix(h2);

        // Only the first 64-bit half is returned, so the second half's last mix-in is skipped.
        h1 += h2;

        // Math.abs(Long.MIN_VALUE) is still Long.MIN_VALUE (negative); clamp that single value
        // so the result is always non-negative. All other values keep their previous result.
        return h1 == Long.MIN_VALUE ? Long.MAX_VALUE : Math.abs(h1);
    }

    /** Final avalanche mix applied to each 64-bit half of the state. */
    protected static long fmix(long k) {
        k ^= k >>> 33;
        k *= 0xff51afd7ed558ccdL;
        k ^= k >>> 33;
        k *= 0xc4ceb9fe1a85ec53L;
        k ^= k >>> 33;
        return k;
    }

    /**
     * Reads the {@code index}-th little-endian 64-bit word starting at {@code offset}.
     *
     * @param key    buffer holding the input bytes
     * @param offset byte position of word 0
     * @param index  zero-based word index (each word is 8 bytes)
     * @return the 8 bytes assembled least-significant byte first
     */
    protected static long getblock(ByteBuffer key, int offset, int index) {
        int i_8 = index << 3;
        int blockOffset = offset + i_8;
        return ((long) key.get(blockOffset + 0) & 0xff)
                + (((long) key.get(blockOffset + 1) & 0xff) << 8)
                + (((long) key.get(blockOffset + 2) & 0xff) << 16)
                + (((long) key.get(blockOffset + 3) & 0xff) << 24)
                + (((long) key.get(blockOffset + 4) & 0xff) << 32)
                + (((long) key.get(blockOffset + 5) & 0xff) << 40)
                + (((long) key.get(blockOffset + 6) & 0xff) << 48)
                + (((long) key.get(blockOffset + 7) & 0xff) << 56);
    }

    /** Rotates {@code v} left by {@code n} bits. */
    protected static long rotl64(long v, int n) {
        return ((v << n) | (v >>> (64 - n)));
    }
}
服务器节点
/**
 * A storage server identified by its hostname, holding key/value pairs in an in-memory map.
 */
public class Node {

    /** Key/value pairs stored on this node. */
    private final Map<Object, Object> store = new ConcurrentHashMap<>();

    /** Name identifying this node. */
    private String hostname;

    /**
     * @param hostname name identifying this node
     */
    public Node(String hostname) {
        this.hostname = hostname;
    }

    /**
     * Stores {@code value} under {@code key}, replacing any previous value for that key.
     *
     * @return always {@code true}
     */
    public boolean save(String key, Object value) {
        store.put(key, value);
        return true;
    }

    /**
     * @return the value stored under {@code key}, or {@code null} if there is none
     */
    public Object get(String key) {
        return store.get(key);
    }

    /**
     * @return the number of keys currently stored on this node
     */
    public int getSize() {
        return store.size();
    }

    public String getHostname() {
        return hostname;
    }

    public void setHostname(String hostname) {
        this.hostname = hostname;
    }
}
一致性hash环
public class TokenRing { private final SortedMap<Long, Node> ring = new TreeMap<Long, Node>(); private List<Node> nodeList; // 虚拟节点个数 private int virtualNodes; /** * @param nodes 节点(机器)个数 * @param virtualNodes 虚拟节点个数 */ public TokenRing(int nodes, int virtualNodes) { this.virtualNodes = virtualNodes; nodeList = new Vector<Node>(); for (int i = 0; i < nodes; i++) { Node node = new Node("host:::" + i); addNode(node); } } /** * 添加节点 * @param node * @return */ public boolean addNode(Node node) { nodeList.add(node); for (int j = 0; j < virtualNodes; j++) { ring.put(MurmurHash.hash3_x64_128(node.getHostname() + j), node); } return true; } /** * 删除节点 * @param node * @return */ public boolean removeNode(Node node) { if (nodeList.remove(node)) { ring.forEach((token, nd) -> { if (node == nd) { ring.remove(token); } }); return true; } return false; } /** * 取目标节点 * @param key * @return */ public Node getNode(String key) { long token = MurmurHash.hash3_x64_128(key); if (!ring.containsKey(token)) { SortedMap<Long, Node> tailMap = ring.tailMap(token); token = tailMap.isEmpty() ? ring.firstKey() : tailMap.firstKey(); } return ring.get(token); } public Set<Node> getNodes() { return new HashSet<>(ring.values()); }}
测试结果
/**
 * Test driver: spreads one million random keys over a 10-node ring (100 virtual nodes per
 * node), then prints each node's key count and the standard deviation of those counts.
 */
public class Client {

    public static void main(String[] args) {
        int n = 10;
        TokenRing tokenRing = new TokenRing(n, 100);

        // Store 1,000,000 random UUID strings, each on the node the ring selects for it.
        for (int stored = 0; stored < 1000000; stored++) {
            String data = UUID.randomUUID().toString();
            Node target = tokenRing.getNode(data);
            if (node(target) == null) {
                throw new RuntimeException("没有找到[" + data + "]存放的节点");
            }
            target.save(data, data);
        }

        // Report the per-node key counts and collect them for the deviation calculation.
        Collection<Node> nodes = tokenRing.getNodes();
        int[] sizes = new int[n];
        int next = 0;
        for (Node current : nodes) {
            int size = current.getSize();
            System.out.println(current.getHostname() + "--->>>" + size);
            sizes[next] = size;
            next++;
        }
        System.out.println("标准差为:" + StandardDeviation.getSD(sizes));
    }

    /** Identity helper that keeps the null check in {@code main} on a single expression. */
    private static Node node(Node candidate) {
        return candidate;
    }
}
将100万条数据存储在10个节点上,每个节点的虚拟节点数分别为100、300、500、1000、2000和5000时,数据分布如下表所示。
从测试结果可知,随着虚拟节点数量的增加,数据分布越来越均匀(标准差越来越小)。
划线
评论
复制
发布于: 2020 年 07 月 08 日 阅读数: 47
张瑞浩
关注
还未添加个人签名 2018.09.18 加入
还未添加个人简介
评论