写点什么

第 5 周作业 - 一致性 hash 算法实现

用户头像
Rocky·Chen
关注
发布于: 2020 年 11 月 22 日

一、作业描述

作业一(2 选 1):

  1. 用你熟悉的编程语言实现一致性 hash 算法。

  2. 编写测试用例测试这个算法,测试 100 万 KV 数据,10 个服务器节点的情况下,计算这些 KV 数据在服务器上分布数量的标准差,以评估算法的存储负载不均衡性。



二、作业实现

package online.chenkai.demon.consistent.hash;
import java.io.Serializable;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
/**
* 缓存节点
*
* @author chenkai 2020-11-22 21:16:00
* @version 1.0.0
*/
@Data
@AllArgsConstructor
@Builder
public class Node implements Serializable {
private static final long serialVersionUID = 2230751659258524805L;
/**
* 节点ip
*/
private String ip;
/**
* 节点名称
*/
private String name;
}
package online.chenkai.demon.consistent.hash.strategy;
/**
* Hash算法
*
* @author chenkai 2020-11-22 21:22:00
* @version 1.0.0
*/
public interface HashStrategy {
/**
* hashCode计算
*
* @param key String
*
* @return Long hash值
*/
Long hashCode(String key);
}
package online.chenkai.demon.consistent.hash.strategy;
import org.apache.commons.lang3.StringUtils;
/**
* 缺省Hash算法
*
* @author chenkai 2020-11-22 21:22:00
* @version 1.0.0
*/
public class DefaultHashStrategy implements HashStrategy {
/**
* hashCode计算
*
* @param key String
*
* @return Long hash值
*/
@Override
public Long hashCode(String key) {
if (StringUtils.isBlank(key)) {
throw new IllegalArgumentException();
}
return (long)key.hashCode();
}
}
package online.chenkai.demon.consistent.hash.strategy;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
/**
* MurMurHash算法
*
* @author chenkai 2020-11-22 21:26:00
* @version 1.0.0
*/
public class MurMurHashStrategy implements HashStrategy {
/**
* hashCode计算
*
* @param key String
*
* @return Long hash值
*/
@Override
public Long hashCode(String key) {
ByteBuffer buf = ByteBuffer.wrap(key.getBytes());
int seed = 0x1234ABCD;
ByteOrder byteOrder = buf.order();
buf.order(ByteOrder.LITTLE_ENDIAN);
long m = 0xc6a4a7935bd1e995L;
int r = 47;
long h = seed ^ (buf.remaining() * m);
long k;
while (buf.remaining() >= 8) {
k = buf.getLong();
k *= m;
k ^= k >>> r;
k *= m;
h ^= k;
h *= m;
}
if (buf.remaining() > 0) {
ByteBuffer finish = ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN);
finish.put(buf).rewind();
h ^= finish.getLong();
h *= m;
}
h ^= h >>> r;
h *= m;
h ^= h >>> r;
buf.order(byteOrder);
return h;
}
}
package online.chenkai.demon.consistent.hash;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.UUID;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import lombok.extern.slf4j.Slf4j;
import online.chenkai.demon.consistent.hash.strategy.DefaultHashStrategy;
import online.chenkai.demon.consistent.hash.strategy.HashStrategy;
import online.chenkai.demon.consistent.hash.strategy.MurMurHashStrategy;
/**
* 一致性hash实现
*
* @author chenkai 2020-11-22 21:17:00
* @version 1.0.0
*/
@Slf4j
public class ConsistentHash {
/**
* 真实节点引入虚拟接点数
*/
private int replicas;
/**
* 环形结构
*/
private final SortedMap<Long, Node> ringBuffer = new TreeMap<>();
/**
* hash工具类
*/
private HashStrategy hashUtils;
public ConsistentHash(int replicas, Collection<Node> nodes) {
new ConsistentHash(replicas, nodes, new DefaultHashStrategy());
}
public ConsistentHash(int replicas, Collection<Node> nodes, HashStrategy hashUtils) {
if (replicas < 1 || CollectionUtils.isEmpty(nodes) || Objects.isNull(hashUtils)) {
throw new IllegalArgumentException();
}
this.replicas = replicas;
this.hashUtils = hashUtils;
for (Node node : nodes) {
add(node);
}
}
/**
* 增加真实节点
*
* @param node 节点
*/
public void add(Node node) {
for (int i = 1; i <= this.replicas; i++) {
ringBuffer.put(hashUtils.hashCode(node.getIp() + "#" + i), node);
}
}
/**
* 删除真实节点
*
* @param node 节点
*/
public void remove(Node node) {
for (int i = 1; i <= this.replicas; i++) {
ringBuffer.remove(hashUtils.hashCode(node.getIp() + "#" + i));
}
}
/**
* 最近真实节点查询
*
* @param key 缓存key
*
* @return Node 最近节点
*/
public Node get(String key) {
if (MapUtils.isEmpty(ringBuffer)) {
return null;
}
// hash值
long hash = hashUtils.hashCode(key);
// 沿环的顺时针找到一个虚拟节点
if (!ringBuffer.containsKey(hash)) {
SortedMap<Long, Node> tailMap = ringBuffer.tailMap(hash);
hash = tailMap.isEmpty() ? ringBuffer.firstKey() : tailMap.firstKey();
}
return ringBuffer.get(hash);
}
/**
* 测试main方法
*
* @param args 入参
*/
public static void main(String[] args) {
// 节点IP前缀
String ipPrefix = "192.168.1.";
// 真实节点数量
int nodeNum = 10;
// 虚拟节点数量
int replicasNode = 1000;
// 缓存记录数
int keyNum = 100000;
// 记录每个真实节点上保存记录数
Map<String, Integer> map = new HashMap<>(nodeNum);
// 增加10个真实节点
List<Node> nodes = new ArrayList<>();
for (int i = 1; i <= nodeNum; i++) {
map.put(ipPrefix + i, 0);
Node node = new Node(ipPrefix + i, "node#" + i);
nodes.add(node);
}
// 为每个节点引入1000个虚拟节点
ConsistentHash consistentHash = new ConsistentHash(replicasNode, nodes, new MurMurHashStrategy());
// 数据写入缓存
for (int i = 0; i < keyNum; i++) {
// 生成缓存value
String data = UUID.randomUUID().toString() + i;
// 寻找真实机器节点
Node node = consistentHash.get(data);
if (Objects.isNull(node)) {
continue;
}
// 每台真实节点上保存的记录数++
map.put(node.getIp(), map.get(node.getIp()) + 1);
}
/* **********************结果输出********************************* */
// 平均值
int averageValue = keyNum / nodeNum;
double sum = 0;
for (int i = 1; i <= nodeNum; i++) {
log.debug("{}节点记录数:{},平均值:{}", ipPrefix + i, map.get(ipPrefix + i), averageValue);
sum += Math.pow((map.get(ipPrefix + i) - averageValue), 2);
}
// 方差
double variance = sum / nodeNum;
// 保留2位小数
DecimalFormat decimalFormat = new DecimalFormat("######0.00");
log.debug("10W键值对一致性hash算法数据分布,方差:{},标准差:{}", decimalFormat.format(variance),
decimalFormat.format(Math.sqrt(variance / nodeNum)));
}
}



// 结果输出



用户头像

Rocky·Chen

关注

还未添加个人签名 2018.03.03 加入

还未添加个人简介

评论 (1 条评论)

发布
用户头像
可以调整虚拟节点数,并增加读取耗时,看看变化
2020 年 11 月 29 日 11:27
回复
没有更多了
第5周作业-一致性hash算法实现