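week5-homework - Consistent Hashing
Published: December 24, 2020
Assignment 1 (choose 1 of 2):
Implement the consistent hashing algorithm in a programming language you are familiar with.
Write test cases for the algorithm. With 1,000,000 KV pairs and 10 server nodes, compute the standard deviation of the number of KV pairs stored on each server to evaluate how unevenly the algorithm balances storage load.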
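The metric the assignment asks for is the standard deviation of the number of records each server holds. A minimal sketch of that calculation in Java, assuming the population standard deviation (divide by n), which is also what NumPy's `np.std` computes by default. The class name `LoadStats` is hypothetical; the sample counts are the per-node sizes from the factor-3 run reported later in this post.

```java
import java.util.Arrays;

/** Hypothetical helper: load-imbalance statistics for per-node record counts. */
public final class LoadStats {

    /** Population standard deviation (divides by n, like NumPy's default np.std). */
    public static double stdDev(final long[] counts) {
        if (counts.length == 0) {
            throw new IllegalArgumentException("counts must not be empty");
        }
        final double mean = Arrays.stream(counts).average().orElse(0.0);
        double sumSq = 0.0;
        for (final long c : counts) {
            final double d = c - mean;
            sumSq += d * d;
        }
        return Math.sqrt(sumSq / counts.length);
    }

    public static void main(String[] args) {
        // Per-node counts from the factor-3 run (10 physical nodes, 1,000,000 records).
        final long[] counts = {144006, 55599, 145714, 239999, 21760,
                               72346, 25752, 98867, 8047, 187910};
        final double std = stdDev(counts);
        // prints: std = 73607.9 (7.4% of all records)
        System.out.printf("std = %.1f (%.1f%% of all records)%n",
                std, 100.0 * std / Arrays.stream(counts).sum());
    }
}
```

Dividing by the total record count, as in the printout, expresses the imbalance independently of the data set size.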
Assignment 1: Algorithm implementation
Interface
```java
package com.app.hash;

/**
 * @author lvlin
 * @date 2020-12-21 9:14 AM
 */
public interface ConsistentHashing<K, V> {

    /**
     * Add a physical node to the consistent hashing ring.
     *
     * @param node node information
     */
    void addNode(Node node);

    /**
     * Remove a physical node (and its virtual nodes) from the ring.
     *
     * @param node the physical node to remove
     */
    void removeNode(Node node);

    /**
     * Print out the allocation and slot information for this consistent hashing system.
     *
     * @param detail show details or not
     * @return generated information
     */
    String info(final boolean detail);

    /**
     * @return the number of k-v pairs
     */
    int size();

    /**
     * @return true if there are no k-v pairs
     */
    boolean isEmpty();

    /**
     * Retrieve the value for a key.
     *
     * @param key key
     * @return value if found, else null
     */
    V get(final K key);

    /**
     * Put a k-v pair.
     *
     * @param key   key
     * @param value value
     * @return the old value for the given key, or null
     */
    V put(final K key, final V value);

    /**
     * Remove the item with the given key.
     *
     * @param key the key
     * @return value of the removed item, else null
     */
    V remove(final K key);
}
```
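Whatever the implementation, `get`, `put`, and `remove` all rely on one lookup: hash the key, then walk clockwise to the first node position at or after that hash, wrapping around to the smallest position. A minimal sketch of that lookup using `java.util.TreeMap.ceilingEntry`; the class `RingSketch` is hypothetical, and it takes positions as plain ints instead of hashing node ids so the example stays deterministic.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical minimal hash ring: ring positions map to node names. */
public final class RingSketch {
    private final TreeMap<Integer, String> ring = new TreeMap<>();

    public void addNode(final int position, final String node) {
        ring.put(position, node);
    }

    public void removeNode(final int position) {
        ring.remove(position);
    }

    /** First node at or after keyHash (clockwise); wraps to the lowest position. */
    public String nodeFor(final int keyHash) {
        if (ring.isEmpty()) {
            throw new IllegalStateException("ring has no nodes");
        }
        Map.Entry<Integer, String> e = ring.ceilingEntry(keyHash);
        if (e == null) {
            e = ring.firstEntry(); // past the largest position: wrap around
        }
        return e.getValue();
    }

    public static void main(String[] args) {
        final RingSketch r = new RingSketch();
        r.addNode(-1000, "A");
        r.addNode(0, "B");
        r.addNode(5000, "C");
        System.out.println(r.nodeFor(-5000)); // A
        System.out.println(r.nodeFor(1));     // C
        System.out.println(r.nodeFor(9000));  // A (wrapped around)
    }
}
```

`ceilingEntry` makes each lookup O(log n). Unlike the `ConflictedMap` used in this post, a plain `TreeMap` keeps only one node per position, so a second node at the same position replaces the first.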
Verification code
```java
package com.app.hash;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.PrintStream;

/**
 * @author lvlin
 * @date 2020-12-22 8:43 AM
 */
public final class Main {

    public static void main(String[] args) throws IOException {
        // replication factors (virtual nodes per physical node) to compare
        final int[] factors = new int[]{1, 3, 5, 7, 10, 12, 15};
        // final int[] numNodes = new int[]{2, 5, 10, 15, 20};
        final int[] numNodes = new int[]{10};
        final int[] numRecords = new int[]{1_000_000};

        final PrintStream console = System.out;
        final File file = new File("statics.txt");
        // redirect System.out to statics.txt, which extract.py parses
        try (PrintStream ps = new PrintStream(new FileOutputStream(file))) {
            System.setOut(ps);
            for (final int factor : factors) {
                for (final int numNode : numNodes) {
                    for (final int numRecord : numRecords) {
                        testConsistentHashing(factor, numNode, numRecord);
                    }
                }
            }
        } finally {
            // restore the console even if a run throws
            System.setOut(console);
        }
        System.out.println("Done");
    }

    private static void testConsistentHashing(final int factor, final int numNodes, final int numRecords) {
        // create numNodes physical nodes
        final ConsistentHashing<String, String> consistentHashing = new CycleConsistentHashing<>(factor);
        final Node[] nodes = new Node[numNodes];
        for (int i = 0; i < nodes.length; i++) {
            nodes[i] = new Node(String.valueOf(i));
        }
        for (final Node node : nodes) {
            consistentHashing.addNode(node);
        }
        // create numRecords K-V pairs
        for (int i = 0; i < numRecords; i++) {
            final String key = String.format("Key%05d", i);
            final String value = String.format("Value%05d", i);
            consistentHashing.put(key, value);
        }
        // info() ends with "\n", so println leaves a blank line between runs (extract.py relies on it)
        System.out.println(consistentHashing.info(false));
    }
}
```
Definitions of physical and virtual nodes
```java
package com.app.hash;

import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;

import java.nio.charset.Charset;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @author lvlin
 * @date 2020-12-22 8:44 AM
 */
public final class Node {
    /** the node id */
    private final String id;
    /** the physical node this virtual node points to (null for a physical node) */
    private final Node physicalNode;
    /** number of keys assigned to this physical node (null for a virtual node) */
    private final AtomicInteger count;

    /**
     * Create a physical node.
     *
     * @param id the node id for this physical node
     */
    public Node(final String id) {
        Objects.requireNonNull(id);
        this.id = "Physical_" + id;
        this.physicalNode = null;
        this.count = new AtomicInteger(0);
    }

    /**
     * Create a virtual node.
     *
     * @param id           the node id for this virtual node
     * @param physicalNode the physical node it points to
     */
    public Node(final String id, final Node physicalNode) {
        Objects.requireNonNull(id);
        Objects.requireNonNull(physicalNode);
        this.id = id;
        this.physicalNode = physicalNode;
        this.count = null;
    }

    public String getId() {
        return id;
    }

    public Node getPhysicalNode() {
        return isPhysicalNode() ? this : physicalNode;
    }

    @Override
    public boolean equals(final Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        final Node node = (Node) o;
        return getId().equals(node.getId());
    }

    /** The hash code also serves as the node's position on the ring. */
    @Override
    public int hashCode() {
        HashFunction hashFunction = Hashing.murmur3_32();
        return hashFunction.hashString(getId(), Charset.defaultCharset()).asInt();
        // return Objects.hash(getId());
    }

    public String info() {
        return "Node id: " + getId() + "," + "Node type: " + nodeType() + "," + "Size: " + count();
    }

    public String nodeType() {
        return isPhysicalNode() ? "Physical" : "Virtual";
    }

    public boolean isPhysicalNode() {
        return null == physicalNode;
    }

    @Override
    public String toString() {
        return "Node{" + "id='" + id + '\'' + ", node type =" + nodeType() + ", count=" + count() + '}';
    }

    public int count() {
        return isPhysicalNode() ? count.get() : getPhysicalNode().count();
    }

    // The node only counts keys for the distribution test and stores no values,
    // so get/put/remove always return null. Virtual nodes delegate to their physical node.
    public <K, V> V get(final K key) {
        return null;
    }

    public <K, V> V put(final K key, final V value) {
        if (!isPhysicalNode()) {
            return getPhysicalNode().put(key, value);
        }
        count.incrementAndGet();
        return null;
    }

    public <K, V> V remove(final K key) {
        if (!isPhysicalNode()) {
            return getPhysicalNode().remove(key);
        }
        count.getAndDecrement();
        return null;
    }
}
```
Implementation of ring-based consistent hashing
```java
package com.app.hash;

import com.google.common.hash.Hashing;

import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
 * @author lvlin
 * @date 2020-12-22 8:41 AM
 */
public final class CycleConsistentHashing<K, V> implements ConsistentHashing<K, V> {

    private final ConflictedMap<Integer, Node> position2Node;
    private final ConflictedMap<Node, Integer> node2Position;
    /** sorted, distinct positions on the ring */
    private int[] positions = new int[0];
    /** a mapping from each physical node to its virtual nodes */
    private final Map<Node, List<Node>> physicalNodes2VirtualNodes;
    /** replication factor: number of virtual nodes per physical node */
    private final int factor;
    /** distance between two adjacent nodes (physical or virtual) of the same physical node */
    private final int distance;
    /** seed for hashing keys */
    private final int seed = 43;

    /**
     * Create a ring-based consistent hashing system with the given replication factor.
     *
     * @param factor the replication factor for virtual nodes
     */
    public CycleConsistentHashing(final int factor) {
        if (factor <= 0) {
            throw new IllegalArgumentException("factor should be > 0");
        }
        this.factor = factor;
        this.distance = Integer.MAX_VALUE / factor;
        position2Node = new ConflictedMap<>();
        node2Position = new ConflictedMap<>();
        physicalNodes2VirtualNodes = new HashMap<>();
    }

    @Override
    public void addNode(final Node node) {
        final int hashCode = node.hashCode();
        long temp = hashCode;
        position2Node.put(hashCode, node);
        node2Position.put(node, hashCode);
        final List<Node> vNodes = physicalNodes2VirtualNodes.computeIfAbsent(node, n -> new LinkedList<>());
        // virtual node i sits (i + 1) * distance after its physical node
        for (int i = 0; i < factor; i++) {
            temp += distance;
            final Node vNode = buildVirtualNode(node, i);
            final int vPosition = position(temp);
            position2Node.put(vPosition, vNode);
            node2Position.put(vNode, vPosition);
            vNodes.add(vNode);
        }
        rebuildPositionArray();
    }

    private void rebuildPositionArray() {
        positions = position2Node.map.keySet().stream().mapToInt(Integer::intValue).sorted().toArray();
    }

    @Override
    public void removeNode(final Node node) {
        final List<Node> vNodes = physicalNodes2VirtualNodes.get(node);
        if (vNodes == null) {
            return;
        }
        // remove the virtual nodes first
        for (final Node vNode : vNodes) {
            for (final Integer vPosition : node2Position.get(vNode)) {
                position2Node.remove(vPosition, vNode);
            }
            node2Position.remove(vNode);
        }
        // then remove the physical node
        position2Node.remove(node.hashCode(), node);
        node2Position.remove(node);
        physicalNodes2VirtualNodes.remove(node);
        rebuildPositionArray();
    }

    private Node buildVirtualNode(final Node physicalNode, final int virtualNodeId) {
        return new Node(String.format("V%d_%s", virtualNodeId, physicalNode.getId()), physicalNode);
    }

    // Note: wraps modulo Integer.MAX_VALUE, not over the full 32-bit range that key hashes use.
    private static int position(final long value) {
        return (int) (value % Integer.MAX_VALUE);
    }

    @Override
    public String info(final boolean detail) {
        final StringBuilder sb = new StringBuilder();
        sb.append("Physical Nodes: ").append(physicalNodes2VirtualNodes.size()).append("\n");
        sb.append("Replicated factor: ").append(factor).append("\n");
        sb.append("Virtual Nodes: ").append(node2Position.size() - physicalNodes2VirtualNodes.size()).append("\n");
        sb.append("positions: ").append(position2Node.size()).append("\n");
        sb.append("duplicated positions: ").append(node2Position.size() - positions.length).append("\n");
        sb.append("Size of K-V pairs: ").append(size()).append("\n");
        sb.append("Physical Nodes Information: \n");
        physicalNodes2VirtualNodes.keySet().forEach(node -> sb.append(node.info()).append("\n"));
        if (detail) {
            sb.append("Physical --> Virtual").append("\n");
            physicalNodes2VirtualNodes.forEach((node, nodes) -> {
                for (final Node vNode : nodes) {
                    sb.append(node.getId()).append("/").append(node.hashCode())
                            .append(":")
                            .append(vNode.getId()).append("/").append(vNode.hashCode())
                            .append("\n");
                }
            });
        }
        return sb.toString();
    }

    @Override
    public int size() {
        // sum() also handles an empty ring, unlike reduce(...).get()
        return physicalNodes2VirtualNodes.keySet().stream().mapToInt(Node::count).sum();
    }

    @Override
    public boolean isEmpty() {
        return size() == 0;
    }

    @Override
    public V get(final K key) {
        return findNode(key).get(key);
    }

    @Override
    public V put(final K key, final V value) {
        return findNode(key).put(key, value);
    }

    @Override
    public V remove(final K key) {
        return findNode(key).remove(key);
    }

    /** Walk clockwise from the key's hash to the first position >= it, wrapping to the start. */
    private Node findNode(final K key) {
        if (positions.length == 0) {
            throw new IllegalStateException("no nodes on the ring");
        }
        final int hashCode = Hashing.murmur3_32(seed).hashString(key.toString(), Charset.defaultCharset()).asInt();
        int i = Arrays.binarySearch(positions, hashCode);
        if (i < 0) {
            i = -i - 1; // insertion point: first position greater than hashCode
        }
        if (i == positions.length) {
            i = 0; // past the last position: wrap around the ring
        }
        return findPhysicalNode(position2Node.get(positions[i]));
    }

    private Node findPhysicalNode(final List<Node> nodes) {
        // on a position conflict, the first node placed there wins
        return nodes.get(0).getPhysicalNode();
    }

    /**
     * A hash map that keeps every value for a conflicting key.
     *
     * @param <K> the key type
     * @param <V> the value type
     */
    public static class ConflictedMap<K, V> {
        private final Map<K, List<V>> map = new HashMap<>();

        public void put(final K key, final V value) {
            map.computeIfAbsent(key, k -> new LinkedList<>()).add(value);
        }

        public List<V> get(final K key) {
            return map.get(key);
        }

        public void remove(final K key, final V value) {
            final List<V> vList = map.get(key);
            if (vList != null) {
                final List<V> remaining = vList.stream().filter(v -> !v.equals(value)).collect(Collectors.toList());
                if (!remaining.isEmpty()) {
                    map.put(key, remaining);
                } else {
                    map.remove(key);
                }
            }
        }

        public void remove(final K key) {
            map.remove(key);
        }

        public int size() {
            return map.size();
        }
    }
}
```
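The placement in `addNode` can be checked by hand. The sketch below copies its arithmetic (`distance = Integer.MAX_VALUE / factor`, then `position(hash + k * distance)` for k = 1..factor) for a single physical node; the class name `PlacementDemo` and the sample hashes are hypothetical. With factor 3 and a positive hash, the last virtual node lands at `hash - 1`, right next to its physical node, and with factor 1 the only virtual node lands exactly on it, which the `duplicated positions` line of `info()` would count. Because the offsets add up to about `Integer.MAX_VALUE`, a physical node and its virtual nodes cover only about half of the 32-bit range that key hashes come from.

```java
import java.util.Arrays;

/** Hypothetical re-run of the virtual-node placement arithmetic in CycleConsistentHashing.addNode. */
public final class PlacementDemo {

    // Same as CycleConsistentHashing.position(long): the modulus is Integer.MAX_VALUE.
    private static int position(final long value) {
        return (int) (value % Integer.MAX_VALUE);
    }

    /** Ring positions of the factor virtual nodes of a physical node at hashCode. */
    public static int[] virtualPositions(final int hashCode, final int factor) {
        final int distance = Integer.MAX_VALUE / factor;
        final int[] result = new int[factor];
        long temp = hashCode;
        for (int i = 0; i < factor; i++) {
            temp += distance;
            result[i] = position(temp);
        }
        return result;
    }

    public static void main(String[] args) {
        // prints [1715827882, 284172117, 999999999]: the last one sits at hash - 1
        System.out.println(Arrays.toString(virtualPositions(1_000_000_000, 3)));
        // prints [-284172118, 431655764, 1147483646]: no wrap-around for this negative hash
        System.out.println(Arrays.toString(virtualPositions(-1_000_000_000, 3)));
        // prints [123]: with factor 1 the virtual node coincides with the physical node
        System.out.println(Arrays.toString(virtualPositions(123, 1)));
    }
}
```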
Assignment 2: Data distribution analysis
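Running the tests in Main.java gives the distribution of 1,000,000 records over 10 physical nodes for different numbers of virtual nodes. The output for a replication factor of 3 (3 virtual nodes per physical node) is: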
```text
Physical Nodes: 10
Replicated factor: 3
Virtual Nodes: 30
positions: 40
duplicated positions: 0
Size of K-V pairs: 1000000
Physical Nodes Information: 
Node id: Physical_6,Node type: Physical,Size: 144006
Node id: Physical_9,Node type: Physical,Size: 55599
Node id: Physical_0,Node type: Physical,Size: 145714
Node id: Physical_5,Node type: Physical,Size: 239999
Node id: Physical_3,Node type: Physical,Size: 21760
Node id: Physical_7,Node type: Physical,Size: 72346
Node id: Physical_2,Node type: Physical,Size: 25752
Node id: Physical_8,Node type: Physical,Size: 98867
Node id: Physical_4,Node type: Physical,Size: 8047
Node id: Physical_1,Node type: Physical,Size: 187910
```
Running the extract.py script (listed below) over statics.txt gives the standard deviation for each run.
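The standard deviation shows that this consistent hashing implementation does not distribute data evenly: the load on different physical nodes differs by up to an order of magnitude, and the standard deviation is roughly 10% of the total number of records. In the factor-3 run above, for example, the counts range from 8,047 (Physical_4) to 239,999 (Physical_5), and the population standard deviation is about 73,600, or about 7.4% of the 1,000,000 records.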
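```python
# coding: utf-8
# extract.py: parse statics.txt (written by Main.java) and print one CSV row per run
import numpy as np


def block(path="statics.txt"):
    """Yield each run's info() output as a list of lines; runs are separated by blank lines."""
    b = []
    with open(path) as f:
        for line in f:
            if line == '\n':
                if b:
                    yield b
                b = []
            else:
                b.append(line)
    if b:  # last run, in case the file does not end with a blank line
        yield b


def extract(bl):
    p = int(bl[0].split(":")[1])  # "Physical Nodes: N"
    v = int(bl[2].split(":")[1])  # "Virtual Nodes: N"
    r = int(bl[5].split(":")[1])  # "Size of K-V pairs: N"
    # lines from index 7 on look like "Node id: ...,Node type: ...,Size: N"
    a = [int(ns.split(":")[3]) for ns in bl[7:]]
    details = "|".join(str(x) for x in a)
    # np.std defaults to the population standard deviation (ddof=0)
    return str(p), str(v), str(r), str(np.std(a)), details


print("Physical Nodes, Virtual Nodes, Records, std, details")
for x in block():
    print(",".join(extract(x)))
```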
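One likely contributor to the imbalance is the placement in `addNode`: each virtual node sits at a fixed multiple of `Integer.MAX_VALUE / factor` after its physical node, so all physical nodes share the same replica pattern, shifted by their own hash, instead of landing at independent points on the ring. A common alternative is to hash each virtual node's name separately. The sketch below tries that as an experiment, not as the author's implementation: the class and method names are hypothetical, MD5 from `java.security.MessageDigest` replaces Guava's murmur3 so it needs only the JDK, and the nodes only count records. Its numbers are therefore not directly comparable with the results above.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical experiment: place every virtual node by hashing its own name
 * (instead of at fixed offsets), then count the records per physical node.
 */
public final class IndependentPlacementDemo {

    /** First 4 bytes of the MD5 digest, read as a big-endian int. */
    private static int hash(final MessageDigest md5, final String s) {
        final byte[] d = md5.digest(s.getBytes(StandardCharsets.UTF_8));
        return ((d[0] & 0xFF) << 24) | ((d[1] & 0xFF) << 16) | ((d[2] & 0xFF) << 8) | (d[3] & 0xFF);
    }

    /** Returns the number of records that land on each physical node. */
    public static long[] distribute(final int numNodes, final int vnodesPerNode, final int numRecords) {
        final MessageDigest md5;
        try {
            md5 = MessageDigest.getInstance("MD5"); // required on every Java platform
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
        final TreeMap<Integer, Integer> ring = new TreeMap<>(); // position -> physical node index
        for (int n = 0; n < numNodes; n++) {
            for (int v = 0; v < vnodesPerNode; v++) {
                // names mirror buildVirtualNode(); on a collision the later node wins
                ring.put(hash(md5, String.format("V%d_Physical_%d", v, n)), n);
            }
        }
        if (ring.isEmpty()) {
            throw new IllegalArgumentException("need at least one virtual node");
        }
        final long[] counts = new long[numNodes];
        for (int i = 0; i < numRecords; i++) {
            final int h = hash(md5, String.format("Key%05d", i)); // same keys as Main.java
            Map.Entry<Integer, Integer> e = ring.ceilingEntry(h);
            if (e == null) {
                e = ring.firstEntry(); // wrap around the ring
            }
            counts[e.getValue()]++;
        }
        return counts;
    }

    /** Population standard deviation, matching np.std's default. */
    public static double stdDev(final long[] counts) {
        double mean = 0;
        for (final long c : counts) {
            mean += c;
        }
        mean /= counts.length;
        double sumSq = 0;
        for (final long c : counts) {
            sumSq += (c - mean) * (c - mean);
        }
        return Math.sqrt(sumSq / counts.length);
    }

    public static void main(String[] args) {
        for (final int vnodes : new int[]{3, 50, 150}) {
            final long[] counts = distribute(10, vnodes, 1_000_000);
            System.out.printf("virtual nodes per node = %d, std = %.0f%n", vnodes, stdDev(counts));
        }
    }
}
```

With independent random positions, each node's share of the ring fluctuates by roughly 1/sqrt(v) of the mean for v virtual nodes per physical node, so the printed standard deviation should shrink as v grows.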
The source code in this post is available on GitHub.