写点什么

week5-homework- 一致性哈希

用户头像
J
关注
发布于: 2020 年 12 月 24 日

作业一(2 选 1):



  1. 用你熟悉的编程语言实现一致性 hash 算法。

  2. 编写测试用例测试这个算法,测试 100 万 KV 数据,10 个服务器节点的情况下,计算这些 KV 数据在服务器上分布数量的标准差,以评估算法的存储负载不均衡性。



作业一:算法实现



Interface



package com.app.hash;
/**
* Contract of a key-value container whose entries are assigned to {@link Node}s
* by consistent hashing. Nodes can be added and removed at runtime.
*
* @param <K> the key type
* @param <V> the value type
* @author lvlin
* @date 2020-12-21 9:14 AM
*/
public interface ConsistentHashing<K, V> {
/**
* Adds a physical node to the consistent hashing system.
*
* @param node the physical node to add
*/
void addNode(Node node);
/**
* Removes a physical node from the consistent hashing system.
*
* @param node the physical node to remove
*/
void removeNode(Node node);
/**
* Builds a human-readable report of the allocation and slot information of this
* consistent hashing system.
*
* @param detail whether to include the detailed section of the report
* @return the generated report
*/
String info(final boolean detail);
/**
* Returns the number of k-v pairs.
*
* @return the number of k-v pairs
*/
int size();
/**
* Tells whether the system holds no k-v pairs.
*
* @return {@code true} if empty, {@code false} otherwise
*/
boolean isEmpty();
/**
* Retrieves the value stored under the given key.
*
* @param key the key to look up
* @return the value if found, else {@code null}
*/
V get(final K key);
/**
* Stores a k-v pair.
*
* @param key the key
* @param value the value
* @return the old value stored under the given key
*/
V put(final K key, final V value);
/**
* Removes the item stored under the given key.
*
* @param key the key
* @return the value of the removed item, else {@code null}
*/
V remove(final K key);
}



验证代码



package com.app.hash;
import java.io.*;
/**
 * Runs the load-distribution experiment: for every combination of replication factor,
 * node count and record count it fills a {@link CycleConsistentHashing} instance and
 * writes the resulting report to {@code statics.txt}.
 *
 * @author lvlin
 * @date 2020-12-22 8:43 AM
 */
public final class Main {
    /** File that receives one report per experiment. */
    private static final String REPORT_FILE = "statics.txt";

    private Main() {
        // entry-point holder, not meant to be instantiated
    }

    /**
     * Runs all experiments and prints {@code Done} on the console afterwards.
     *
     * @param args ignored
     * @throws IOException if the report file cannot be opened
     */
    public static void main(String[] args) throws IOException {
        final int[] factors = {1, 3, 5, 7, 10, 12, 15};
        // final int[] numNodes = {2, 5, 10, 15, 20};
        final int[] numNodes = {10};
        final int[] numRecords = {1_000_000};
        // The report stream is handed to the experiment explicitly instead of swapping
        // System.out, so stdout stays usable even if an experiment throws, and
        // try-with-resources closes the file in every case.
        try (PrintStream report = new PrintStream(new FileOutputStream(new File(REPORT_FILE)))) {
            for (final int factor : factors) {
                for (final int numNode : numNodes) {
                    for (final int numRecord : numRecords) {
                        testConsistentHashing(report, factor, numNode, numRecord);
                    }
                }
            }
        }
        System.out.println("Done");
    }

    /**
     * Builds a ring with {@code numNodes} physical nodes, inserts {@code numRecords}
     * generated k-v pairs and prints the summary report.
     *
     * @param out        the stream that receives the report
     * @param factor     the replication factor passed to {@link CycleConsistentHashing}
     * @param numNodes   the number of physical nodes to create
     * @param numRecords the number of k-v pairs to insert
     */
    private static void testConsistentHashing(final PrintStream out, final int factor,
                                              final int numNodes, final int numRecords) {
        ConsistentHashing<String, String> consistentHashing = new CycleConsistentHashing<>(factor);
        // create the physical nodes, identified by their index
        for (int i = 0; i < numNodes; i++) {
            consistentHashing.addNode(new Node(String.valueOf(i)));
        }
        // insert numRecords generated k-v pairs
        for (int i = 0; i < numRecords; i++) {
            String key = String.format("Key%05d", i);
            String value = String.format("Value%05d", i);
            consistentHashing.put(key, value);
        }
        // println adds a line break after the report text
        out.println(consistentHashing.info(false));
    }
}



物理和虚拟节点的定义



package com.app.hash;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
/**
* @author lvlin
* @date 2020-12-22 8:44 AM
*/
public final class Node {
/**
* the node id
*/
private final String id;
/**
* the physical node points to
*/
private final Node physicalNode;
private AtomicInteger count;
/**
* create a physical node
*
* @param id the node id for this physical node
*/
public Node(final String id) {
Objects.requireNonNull(id);
this.id = "Physical_" + id;
physicalNode = null;
count = new AtomicInteger(0);
}
/**
* create a virtual node
*
* @param id the node id for this virtual node
* @param physicalNode the physical node points to
*/
public Node(final String id, final Node physicalNode) {
Objects.requireNonNull(id);
Objects.requireNonNull(physicalNode);
this.id = id;
this.physicalNode = physicalNode;
this.count = null;
}
public String getId() {
return id;
}
public Node getPhysicalNode() {
return isPhysicalNode() ? this : physicalNode;
}
@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final Node node = (Node) o;
return getId().equals(node.getId());
}
@Override
public int hashCode() {
HashFunction hashFunction = Hashing.murmur3_32();
return hashFunction.hashString(getId(), Charset.defaultCharset()).asInt();
// return Objects.hash(getId());
}
public String info() {
return "Node id: " + getId() + "," +
"Node type: " + nodeType() + "," +
"Size: " + count();
}
public String nodeType() {
return isPhysicalNode() ? "Physical" : "Virtual";
}
public boolean isPhysicalNode() {
return null == physicalNode;
}
@Override
public String toString() {
return "Node{" +
"id='" + id + '\'' +
", node type =" + nodeType() +
", count=" + count() +
'}';
}
public int count() {
return isPhysicalNode() ? count.get() : getPhysicalNode().count();
}
public <V, K> V get(final K key) {
return null;
}
public <V, K> V put(final K key, final V value) {
if (!isPhysicalNode()) {
return getPhysicalNode().put(key, value);
}
count.incrementAndGet();
return null;
}
public <V, K> V remove(final K key) {
if (!isPhysicalNode()) {
return getPhysicalNode().remove(key);
}
count.getAndDecrement();
return null;
}
}



环形一致性hash的实现



package com.app.hash;
import com.google.common.hash.Hashing;
import java.nio.charset.Charset;
import java.util.*;
import java.util.stream.Collectors;
/**
 * Ring-based implementation of {@link ConsistentHashing}.
 *
 * <p>Every physical node is placed on the ring at its own {@code hashCode()}. For a
 * replication factor {@code f}, it additionally gets {@code f} virtual nodes placed at
 * fixed offsets {@code distance, 2 * distance, ...} from that position, where
 * {@code distance = Integer.MAX_VALUE / f}. A key is served by the node at the first ring
 * position that is greater than or equal to the key's hash, wrapping around to the
 * smallest position.
 *
 * <p>NOTE(review): all physical nodes use the same offsets for their virtual nodes, so
 * the virtual nodes repeat the relative layout of the physical nodes instead of being
 * scattered independently. This is presumably why the measured distribution stays uneven.
 * Hashing each virtual node id would be the usual alternative; it is not done here
 * because it changes every measured result.
 *
 * <p>This class is not synchronized.
 *
 * @param <K> the key type
 * @param <V> the value type
 * @author lvlin
 * @date 2020-12-22 8:41 AM
 */
public final class CycleConsistentHashing<K, V> implements ConsistentHashing<K, V> {
    /**
     * seed of the murmur3 function used to hash keys
     */
    private static final int SEED = 43;
    /**
     * ring position to the nodes placed on it (several nodes may share one position)
     */
    private final ConflictedMap<Integer, Node> position2Node;
    /**
     * node (physical or virtual) to its ring positions
     */
    private final ConflictedMap<Node, Integer> node2Position;
    /**
     * occupied positions on the cycle, sorted ascending; empty while there is no node
     */
    private int[] positions = new int[0];
    /**
     * a mapping from the physical node to its virtual nodes.
     */
    private final Map<Node, List<Node>> physicalNodes2VirtualNodes;
    /**
     * Replicate factor for virtual nodes.
     */
    private final int factor;
    /**
     * distance between two adjacent physical node and virtual node.
     */
    private final int distance;

    /**
     * create a cycling consistent hashing system with replicate factor
     *
     * @param factor the replicate factor for virtual nodes
     * @throws IllegalArgumentException if {@code factor} is not positive
     */
    public CycleConsistentHashing(final int factor) {
        if (factor <= 0) {
            throw new IllegalArgumentException("factor should > 0");
        }
        this.factor = factor;
        this.distance = Integer.MAX_VALUE / factor;
        position2Node = new ConflictedMap<>();
        node2Position = new ConflictedMap<>();
        physicalNodes2VirtualNodes = new HashMap<>();
    }

    /**
     * Places the physical node and its virtual nodes on the ring. Adding a node that is
     * already present is a no-op, so its ring entries are never duplicated.
     *
     * @param node node information
     * @throws NullPointerException if {@code node} is {@code null}
     */
    @Override
    public void addNode(final Node node) {
        Objects.requireNonNull(node, "node");
        if (physicalNodes2VirtualNodes.containsKey(node)) {
            return;
        }
        final int hashCode = node.hashCode();
        position2Node.put(hashCode, node);
        node2Position.put(node, hashCode);
        final List<Node> vNodes = new ArrayList<>(factor);
        // accumulate in a long: hashCode + k * distance can exceed the int range
        long offset = hashCode;
        for (int i = 0; i < factor; i++) {
            offset += distance;
            final Node vNode = buildVirtualNode(node, i);
            final int vPosition = position(offset);
            position2Node.put(vPosition, vNode);
            node2Position.put(vNode, vPosition);
            vNodes.add(vNode);
        }
        physicalNodes2VirtualNodes.put(node, vNodes);
        rebuildPositionArray();
    }

    /**
     * Recomputes the sorted array of occupied ring positions.
     */
    private void rebuildPositionArray() {
        positions = position2Node.map.keySet().stream()
                .mapToInt(Integer::intValue)
                .sorted()
                .toArray();
    }

    /**
     * Removes the physical node and all of its virtual nodes from the ring. Unknown nodes
     * are ignored. The entry counter of the removed node is not redistributed.
     *
     * @param node the physical node to remove
     */
    @Override
    public void removeNode(final Node node) {
        final List<Node> vNodes = physicalNodes2VirtualNodes.remove(node);
        if (vNodes == null) {
            return;
        }
        // remove the virtual nodes firstly
        for (final Node vNode : vNodes) {
            final List<Integer> vPositions = node2Position.get(vNode);
            if (vPositions != null) {
                for (final Integer vPosition : vPositions) {
                    position2Node.remove(vPosition, vNode);
                }
            }
            node2Position.remove(vNode);
        }
        // remove the physical node
        position2Node.remove(node.hashCode(), node);
        node2Position.remove(node);
        rebuildPositionArray();
    }

    /**
     * Creates the virtual node number {@code virtualNodeId} of the given physical node,
     * with id {@code V<virtualNodeId>_<physical id>}.
     */
    private Node buildVirtualNode(final Node physicalNode, final int virtualNodeId) {
        return new Node(String.format("V%d_%s",
                virtualNodeId, physicalNode.getId()), physicalNode);
    }

    /**
     * Folds a value back into the int range by taking the remainder of a division by
     * {@code Integer.MAX_VALUE}; the sign of the value is kept.
     */
    private static int position(long value) {
        return (int) (value % Integer.MAX_VALUE);
    }

    /**
     * Builds the report. The summary lists node counts, occupied positions, the number of
     * k-v pairs and one line per physical node; the detail section adds one line per
     * physical/virtual node pair with both hash codes.
     *
     * @param detail show details or not
     * @return generated information
     */
    @Override
    public String info(final boolean detail) {
        StringBuilder sb = new StringBuilder();
        sb.append("Physical Nodes: ").append(physicalNodes2VirtualNodes.size()).append("\n");
        sb.append("Replicated factor: ").append(factor).append("\n");
        sb.append("Virtual Nodes: ").append(node2Position.size() - physicalNodes2VirtualNodes.size()).append("\n");
        sb.append("positions: ").append(position2Node.size()).append("\n");
        sb.append("duplicated positions: ").append(node2Position.size() - positions.length).append("\n");
        sb.append("Size of K-V pairs: ").append(size()).append("\n");
        sb.append("Physical Nodes Information: \n");
        physicalNodes2VirtualNodes.keySet().forEach(node -> sb.append(node.info()).append("\n"));
        if (detail) {
            sb.append("Physical --> Virtual").append("\n");
            physicalNodes2VirtualNodes.forEach((node, nodes) -> {
                for (final Node vNode : nodes) {
                    sb.append(node.getId()).append("/").append(node.hashCode())
                            .append(":")
                            .append(vNode.getId()).append("/").append(vNode.hashCode())
                            .append("\n");
                }
            });
        }
        return sb.toString();
    }

    /**
     * @return the sum of the entry counters of all physical nodes; {@code 0} when there
     *         is no node
     */
    @Override
    public int size() {
        return physicalNodes2VirtualNodes.keySet().stream().mapToInt(Node::count).sum();
    }

    @Override
    public boolean isEmpty() {
        return size() == 0;
    }

    /**
     * @throws IllegalStateException if no node has been added
     */
    @Override
    public V get(final K key) {
        return findNode(key).get(key);
    }

    /**
     * @throws IllegalStateException if no node has been added
     */
    @Override
    public V put(final K key, final V value) {
        return findNode(key).put(key, value);
    }

    /**
     * @throws IllegalStateException if no node has been added
     */
    @Override
    public V remove(final K key) {
        return findNode(key).remove(key);
    }

    /**
     * Finds the physical node responsible for the key: the owner of the first ring
     * position that is {@code >=} the key's murmur3 hash, or of the smallest position when
     * the hash is beyond the last one.
     *
     * @throws NullPointerException  if {@code key} is {@code null}
     * @throws IllegalStateException if the ring holds no node
     */
    private Node findNode(final K key) {
        Objects.requireNonNull(key, "key");
        if (positions.length == 0) {
            throw new IllegalStateException("no node available, add a node first");
        }
        // UTF-8 is fixed so that key placement does not depend on the platform charset
        final int hashCode = Hashing.murmur3_32(SEED)
                .hashString(key.toString(), java.nio.charset.StandardCharsets.UTF_8)
                .asInt();
        // positions is sorted and free of duplicates, so a binary search finds the same
        // slot as a left-to-right scan for the first position >= hashCode
        int index = Arrays.binarySearch(positions, hashCode);
        if (index < 0) {
            index = -index - 1;
        }
        if (index == positions.length) {
            index = 0;
        }
        return findPhysicalNode(position2Node.get(positions[index]));
    }

    /**
     * @return the physical node behind the first node of the list, or {@code null} if the
     *         list is empty
     */
    private Node findPhysicalNode(List<Node> nodes) {
        return nodes.isEmpty() ? null : nodes.get(0).getPhysicalNode();
    }

    /**
     * A hashmap can handle conflicts: every key maps to the list of all values put under
     * it. A key is dropped as soon as its list becomes empty.
     *
     * @param <K> the key type
     * @param <V> the value type
     */
    public static class ConflictedMap<K, V> {
        private final Map<K, List<V>> map = new HashMap<>();

        /**
         * Appends the value to the list stored under the key.
         */
        public void put(final K key, final V value) {
            map.computeIfAbsent(key, ignored -> new ArrayList<>()).add(value);
        }

        /**
         * @return the values stored under the key, or {@code null} if there are none
         */
        public List<V> get(final K key) {
            return map.get(key);
        }

        /**
         * Removes every value equal to {@code value} from the list stored under the key.
         */
        public void remove(final K key, final V value) {
            final List<V> vList = map.get(key);
            if (vList == null) {
                return;
            }
            vList.removeIf(v -> v.equals(value));
            if (vList.isEmpty()) {
                map.remove(key);
            }
        }

        /**
         * Removes the key together with all of its values.
         */
        public void remove(final K key) {
            map.remove(key);
        }

        /**
         * @return the number of distinct keys
         */
        public int size() {
            return map.size();
        }
    }
}



作业二:数据分布分析



基于 Main.java 的测试,可以得到 10 个物理节点、100 万条数据在不同虚拟节点数量下的分布情况(以下为 Replicated factor = 3 时的输出示例):



Physical Nodes: 10
Replicated factor: 3
Virtual Nodes: 30
positions: 40
duplicated positions: 0
Size of K-V pairs: 1000000
Physical Nodes Information:
Node id: Physical_6,Node type: Physical,Size: 144006
Node id: Physical_9,Node type: Physical,Size: 55599
Node id: Physical_0,Node type: Physical,Size: 145714
Node id: Physical_5,Node type: Physical,Size: 239999
Node id: Physical_3,Node type: Physical,Size: 21760
Node id: Physical_7,Node type: Physical,Size: 72346
Node id: Physical_2,Node type: Physical,Size: 25752
Node id: Physical_8,Node type: Physical,Size: 98867
Node id: Physical_4,Node type: Physical,Size: 8047
Node id: Physical_1,Node type: Physical,Size: 187910



利用extract.py脚本分析得到





从标准差上可以看到,本文实现的一致性哈希算法,在分布上不够均匀,不同物理节点上存在着1个数量级的差异,标准差占总样本数量的10%左右。



# coding: utf-8
# extract.py
import numpy as np
def block():
    """Yield the reports found in ``statics.txt``, one list of lines per report.

    Reports are separated by blank lines. Every yielded list holds the raw
    lines of one report, each still ending with its line break. Empty reports
    (consecutive blank lines) are skipped, and a last report that is not
    followed by a blank line is yielded as well.
    """
    b = []
    # the with-statement closes the file once the generator is exhausted
    with open("statics.txt") as report:
        for line in report:
            if line == '\n':
                if b:
                    yield b
                b = []
            else:
                b.append(line)
    if b:
        yield b
def extract(bl):
    """Turn one report into the fields of a CSV row.

    ``bl`` is the list of lines of one report. Line 0 carries the number of
    physical nodes, line 2 the number of virtual nodes, line 5 the number of
    records, and every line from index 7 on describes one physical node whose
    entry count is the text after the third ``:``.

    Returns a tuple of strings: physical nodes, virtual nodes, records, the
    population standard deviation of the per-node counts (``np.std``), and the
    counts joined with ``|``.
    """
    p = int(bl[0].split(":")[1])
    v = int(bl[2].split(":")[1])
    r = int(bl[5].split(":")[1])
    # int() tolerates the surrounding blank and line break of each field
    a = [int(ns.split(":")[3]) for ns in bl[7:]]
    details = "|".join(str(count) for count in a)
    return str(p), str(v), str(r), str(np.std(a)), details
# Print a CSV header, then one row per report read from statics.txt.
print("Physical Nodes, Virtual Nodes, Records, std, details")
for x in block():
    print(",".join(extract(x)))



文中源码可以在 GitHub 获取。



用户头像

J

关注

还未添加个人签名 2015.06.24 加入

还未添加个人简介

评论

发布
暂无评论
week5-homework-一致性哈希