一致性 hash 算法

用户头像
changtai
关注
发布于: 2020 年 07 月 08 日
概述:

Java实现一致性hash算法,比较 取模hash、没有虚拟节点一致性hash、有虚拟节点一致性hash的优缺点。

作业一:
  1. 用你熟悉的编程语言实现一致性 hash 算法。

项目结构:



类图:



hash算法相关:

/**
* hash方法接口
*
* @author zhaoct
* @date 2020-07-07 15:20
*/
public interface HashStrategy {
/**
* 计算 hash code
* @param str
* @return
*/
int getHashCode(String str);
}



/**
* JDK hash
*
* @author zhaoct
* @date 2020-07-07 15:22
*/
public class JdkHash implements HashStrategy{
@Override
public int getHashCode(String str) {
return str.hashCode();
}
}



/**
* FNV1_32_HASH hash
*
* @author zhaoct
* @date 2020-07-07 15:26
*/
public class FnvHash implements HashStrategy{
private static final long FNV_32_INIT = 2166136261L;
private static final int FNV_32_PRIME = 16777619;
@Override
public int getHashCode(String str) {
final int p = FNV_32_PRIME;
int hash = (int) FNV_32_INIT;
for (int i = 0; i < str.length(); i++){
hash = (hash ^ str.charAt(i)) * p;
}
hash += hash << 13;
hash ^= hash >> 7;
hash += hash << 3;
hash ^= hash >> 17;
hash += hash << 5;
hash = Math.abs(hash);
return hash;
}
}

存储相关:

/**
* 一致性hash环上面的虚拟节点
*
* @author zhaoct
* @date 2020-07-07 10:00
*/
public class Node {
/**
* 虚拟节点名称
*/
private String name;
/**
* 缓存服务器
*/
private Server server;
/**
* 虚拟节点上数据数量,用于统计
*/
private AtomicInteger count;
public Node(String name, Server server){
this.name = name;
this.server = server;
this.count = new AtomicInteger(0);
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public Server getServer() {
return server;
}
public void setServer(Server server) {
this.server = server;
}
public void incrementCount(){
count.incrementAndGet();
}
public Integer getCount() {
return count.get();
}



/**
* 缓存服务器
*
* @author zhaoct
* @date 2020-07-07 9:32
*/
public class Server {
/**
* 缓存服务器名称
*/
private String name;
/**
* 虚拟节点个数
*/
private Integer nodeNumber;
/**
* 缓存服务器的虚拟节点
*/
private List<Node> nodeList;
/**
* mock 数据存储
*/
private Map<String, String> dataMap = null;
/**
* 取模hash构造方法
* @param name
*/
public Server(String name){
this.name = name;
this.dataMap = new ConcurrentHashMap<>();
}
/**
* 一致性hash构造方法
* @param name
* @param nodeNumber
*/
public Server(String name, Integer nodeNumber){
this.name = name;
this.nodeNumber = nodeNumber;
this.nodeList = new ArrayList<>();
this.dataMap = new ConcurrentHashMap<>();
//初始化虚拟节点
for (Integer i = 0; i < nodeNumber; i++) {
Node node = new Node(name + " : " + i, this);
nodeList.add(node);
}
}
public void put(String key, String value){
this.dataMap.put(key, value);
}
public String get(String key){
return dataMap.get(key);
}
public Integer getSize(){
return dataMap.size();
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public Integer getNodeNumber() {
return nodeNumber;
}
public void setNodeNumber(Integer nodeNumber) {
this.nodeNumber = nodeNumber;
}
public List<Node> getNodeList() {
return nodeList;
}
public void setNodeList(List<Node> nodeList) {
this.nodeList = nodeList;
}



/**
*
* 负载均衡接口
* @author zhaoct
* @date 2020-07-07 10:38
*/
public interface LoadBalanceStrategy {
/**
* 从分布式缓存中获取服务器
* @param key
* @return
*/
Server getServer(String key);
/**
* 添加一台服务器
* @param server
*/
void addServer(Server server);
/**
* 删除一台服务器
* @param server
*/
void removeServer(Server server);
}



/**
* 取模 hash
*
* @author zhaoct
* @date 2020-07-07 9:25
*/
public class ModuloHashLoadBalance implements LoadBalanceStrategy{
/**
* hash 方法
*/
private HashStrategy hashStrategy;
/**
* 服务器列表
*/
private List<Server> servers;
public ModuloHashLoadBalance(List<Server> servers, HashStrategy hashStrategy){
this.hashStrategy = hashStrategy;
this.servers = servers;
}
@Override
public Server getServer(String key) {
return servers.get(hashStrategy.getHashCode(key) % servers.size());
}
@Override
public void addServer(Server server) {
servers.add(server);
}
@Override
public void removeServer(Server server) {
servers.remove(server);
}
}



/**
* 没有虚拟节点一致性hash
*
* @author zhaoct
* @date 2020-07-07 9:25
*/
public class NoVirtualNodeHashLoadBalance implements LoadBalanceStrategy {
/**
* hash 环
*/
TreeMap<Integer, Server> ring ;
/**
* hash 方法
*/
private HashStrategy hashStrategy;
/**
* 服务器列表
*/
private List<Server> servers;
public NoVirtualNodeHashLoadBalance(List<Server> servers, HashStrategy hashStrategy){
this.ring = new TreeMap<>();
this.hashStrategy = hashStrategy;
this.servers = servers;
// 创建一致性hash环
this.buildConsistentHashRing(this.servers);
}
private TreeMap<Integer, Server> buildConsistentHashRing(List<Server> servers) {
for (Server server : servers) {
ring.put(hashStrategy.getHashCode(server.getName()), server);
}
return ring;
}
@Override
public Server getServer(String key) {
Map.Entry<Integer, Server> entry = ring.ceilingEntry(hashStrategy.getHashCode(key));
if(entry == null){
//取第一个
entry = ring.firstEntry();
}
return entry.getValue();
}
@Override
public void addServer(Server server) {
servers.add(server);
//添加到环上
ring.put(hashStrategy.getHashCode(server.getName()), server);
}
@Override
public void removeServer(Server server) {
servers.remove(server);
//从环上删除
ring.remove(hashStrategy.getHashCode(server.getName()));
}
}



/**
* 一致性hash
*
* @author zhaoct
* @date 2020-07-07 9:25
*/
public class ConsistentHashLoadBalance implements LoadBalanceStrategy {
/**
* hash 环
*/
TreeMap<Integer, Server> ring ;
/**
* hash 方法
*/
private HashStrategy hashStrategy;
/**
* 服务器列表
*/
private List<Server> servers;
public ConsistentHashLoadBalance(List<Server> servers, HashStrategy hashStrategy){
this.ring = new TreeMap<>();
this.hashStrategy = hashStrategy;
this.servers = servers;
// 创建一致性hash环
this.buildConsistentHashRing(this.servers);
}
private TreeMap<Integer, Server> buildConsistentHashRing(List<Server> servers) {
//添加虚拟节点
for (Server server : servers) {
for (Node node : server.getNodeList()) {
ring.put(hashStrategy.getHashCode(node.getName()), server);
}
}
return ring;
}
@Override
public Server getServer(String key) {
Map.Entry<Integer, Server> entry = ring.ceilingEntry(hashStrategy.getHashCode(key));
if(entry == null){
//取第一个
entry = ring.firstEntry();
}
return entry.getValue();
}
@Override
public void addServer(Server server) {
servers.add(server);
//添加到环上
for (Node node : server.getNodeList()) {
ring.put(hashStrategy.getHashCode(node.getName()), server);
}
}
@Override
public void removeServer(Server server) {
servers.remove(server);
//从环上删除
for (Node node : server.getNodeList()) {
ring.remove(hashStrategy.getHashCode(node.getName()));
}
}
}



Cache相关:

/**
* Cache 接口
*
* @author zhaoct
* @date 2020-07-07 15:13
*/
public interface Cache {
/**
* 向缓存中放数据
* @param key
* @param value
*/
void put(String key, String value);
/**
* 从缓存中获取数据
* @param key
* @return
*/
String get(String key);
/**
* 添加一台服务器
* @param server
*/
void addServer(Server server);
/**
* 删除一台服务器
* @param server
*/
void removeServer(Server server);
}



/**
* 分布式cache
*
* @author zhaoct
* @date 2020-07-07 15:15
*/
public class DistributedCache implements Cache{
private LoadBalanceStrategy loadBalanceStrategy;
public DistributedCache(LoadBalanceStrategy loadBalanceStrategy){
this.loadBalanceStrategy = loadBalanceStrategy;
}
@Override
public void put(String key, String value) {
Server server = loadBalanceStrategy.getServer(key);
server.put(key, value);
//System.out.println(" key: " + key + " 写入到缓存服务器 " + server.getName());
}
@Override
public String get(String key) {
Server server = loadBalanceStrategy.getServer(key);
//System.out.println(" key: " + key + " 从缓存服务器读取 " + server.getName());
return server.get(key);
}
@Override
public void addServer(Server server) {
loadBalanceStrategy.addServer(server);
}
@Override
public void removeServer(Server server) {
loadBalanceStrategy.removeServer(server);
}



测试用例相关:

/**
* 缓存测试用例
*
* @author zhaoct
* @date 2020-07-07 15:44
*/
public class ModuloHashCacheTest {
private Cache cache;
private List<Server> servers = new ArrayList<>();
private List<String> keys = new ArrayList<>();
@Before
public void init(){
//10台服务器
servers.addAll(
Arrays.asList(
new Server("服务器1"),
new Server("服务器2"),
new Server("服务器3"),
new Server("服务器4"),
new Server("服务器5"),
new Server("服务器6"),
new Server("服务器7"),
new Server("服务器8"),
new Server("服务器9"),
new Server("服务器10")
));
cache = new DistributedCache(new ModuloHashLoadBalance(servers, new FnvHash()));
//100w数据测试
for(int i=0; i<1000000; i++){
String key = UUID.randomUUID().toString();
cache.put(key, "data" + i);
keys.add(key);
}
}
@Test
public void testCache(){
//打印元素个数
for(Server server : servers){
System.out.println(server.getName() + " 有 " + server.getSize() + " 个元素 ");
}
//统计标准差
//1. 平均值
int sum = 0;
for(Server server : servers){
sum += server.getSize();
}
int average = sum / servers.size();
System.out.println(" 平均值是 " + average);
//标准差
double temp = 0;
for(Server server : servers){
temp += Math.pow(server.getSize() - average , 2);
}
double sd = Math.sqrt(temp/ servers.size());
System.out.println(" 标准差是 " + sd);
}
@Test
public void testHitRate(){
//添加一台服务器,看命中率的变化
cache.addServer(new Server("服务器11"));
int hit = 0;
for(String key : keys){
if(cache.get(key) != null){
hit ++;
}
}
System.out.println("添加一个节点缓存命中率:" + hit*100/keys.size() + "%");
}
}



/**
* 缓存测试用例
*
* @author zhaoct
* @date 2020-07-07 15:44
*/
public class NoVirtualNodeHashCacheTest {
private Cache cache;
private List<Server> servers = new ArrayList<>();
private List<String> keys = new ArrayList<>();
@Before
public void init(){
int nodeNumber = 0;
//10台服务器,没有虚拟节点
servers.addAll(
Arrays.asList(
new Server("服务器1"),
new Server("服务器2"),
new Server("服务器3"),
new Server("服务器4"),
new Server("服务器5"),
new Server("服务器6"),
new Server("服务器7"),
new Server("服务器8"),
new Server("服务器9"),
new Server("服务器10")
));
cache = new DistributedCache(new NoVirtualNodeHashLoadBalance(servers, new FnvHash()));
//100w数据测试
for(int i=0; i<1000000; i++){
String key = UUID.randomUUID().toString();
cache.put(key, "data" + i);
keys.add(key);
}
}
@Test
public void testCache(){
//打印元素个数
for(Server server : servers){
System.out.println(server.getName() + " 有 " + server.getSize() + " 个元素 ");
}
//统计标准差
//1. 平均值
int sum = 0;
for(Server server : servers){
sum += server.getSize();
}
int average = sum / servers.size();
System.out.println(" 平均值是 " + average);
//标准差
double temp = 0;
for(Server server : servers){
temp += Math.pow(server.getSize() - average , 2);
}
double sd = Math.sqrt(temp/ servers.size());
System.out.println(" 标准差是 " + sd);
}
@Test
public void testHitRate(){
//添加一台服务器,看命中率的变化
cache.addServer(new Server("服务器11"));
int hit = 0;
for(String key : keys){
if(cache.get(key) != null){
hit ++;
}
}
System.out.println("添加一个节点缓存命中率:" + hit*100/keys.size() + "%");
}
}



/**
* 缓存测试用例
*
* @author zhaoct
* @date 2020-07-07 15:44
*/
public class ConsistentHashCacheTest {
private Cache cache;
private List<Server> servers = new ArrayList<>();
private List<String> keys = new ArrayList<>();
/**
* 每台服务器虚拟节点个数
*/
private int nodeNumber = 200;
@Before
public void init(){
//10台服务器,每个服务器1000个虚拟节点
servers.addAll(
Arrays.asList(
new Server("服务器1", nodeNumber),
new Server("服务器2", nodeNumber),
new Server("服务器3", nodeNumber),
new Server("服务器4", nodeNumber),
new Server("服务器5", nodeNumber),
new Server("服务器6", nodeNumber),
new Server("服务器7", nodeNumber),
new Server("服务器8", nodeNumber),
new Server("服务器9", nodeNumber),
new Server("服务器10", nodeNumber)
));
cache = new DistributedCache(new ConsistentHashLoadBalance(servers, new FnvHash()));
//100w数据测试
for(int i=0; i<1000000; i++){
String key = UUID.randomUUID().toString();
cache.put(key, "data" + i);
keys.add(key);
}
}
@Test
public void testCache(){
//打印元素个数
for(Server server : servers){
System.out.println(server.getName() + " 有 " + server.getSize() + " 个元素 ");
}
//统计标准差
//1. 平均值
int sum = 0;
for(Server server : servers){
sum += server.getSize();
}
int average = sum / servers.size();
System.out.println(" 平均值是 " + average);
//标准差
double temp = 0;
for(Server server : servers){
temp += Math.pow(server.getSize() - average , 2);
}
double sd = Math.sqrt(temp/ servers.size());
System.out.println(" 标准差是 " + sd);
}
@Test
public void testHitRate(){
//添加一台服务器,看命中率的变化
cache.addServer(new Server("服务器11", nodeNumber));
int hit = 0;
for(String key : keys){
if(cache.get(key) != null){
hit ++;
}
}
System.out.println("添加一个节点缓存命中率:" + hit*100/keys.size() + "%");
}
}



git 地址:

https://github.com/changtaizhao/consistent-hash



  1. 编写测试用例测试这个算法,测试 100 万 KV 数据,10 个服务器节点的情况下,计算这些 KV 数据在服务器上分布数量的标准差,以评估算法的存储负载不均衡性。

一致性hash测试结果:

每服务器虚拟节点数:1

服务器1 有 43227 个元素

服务器2 有 162560 个元素

服务器3 有 173999 个元素

服务器4 有 241283 个元素

服务器5 有 84944 个元素

服务器6 有 29856 个元素

服务器7 有 140652 个元素

服务器8 有 21711 个元素

服务器9 有 13409 个元素

服务器10 有 88359 个元素

平均值是 100000

标准差是 72894.30422056308

每服务器虚拟节点数:10

服务器1 有 77811 个元素

服务器2 有 78215 个元素

服务器3 有 89686 个元素

服务器4 有 122221 个元素

服务器5 有 119967 个元素

服务器6 有 75684 个元素

服务器7 有 97824 个元素

服务器8 有 84698 个元素

服务器9 有 181443 个元素

服务器10 有 72451 个元素

平均值是 100000

标准差是 31918.386798207706

每服务器虚拟节点数:100

服务器1 有 93366 个元素

服务器2 有 98423 个元素

服务器3 有 111819 个元素

服务器4 有 104389 个元素

服务器5 有 97374 个元素

服务器6 有 89372 个元素

服务器7 有 115807 个元素

服务器8 有 97820 个元素

服务器9 有 89860 个元素

服务器10 有 101770 个元素

平均值是 100000

标准差是 8281.69895613213

每服务器虚拟节点数:150

服务器1 有 101328 个元素

服务器2 有 99261 个元素

服务器3 有 107727 个元素

服务器4 有 99915 个元素

服务器5 有 83171 个元素

服务器6 有 89148 个元素

服务器7 有 111212 个元素

服务器8 有 103692 个元素

服务器9 有 107004 个元素

服务器10 有 97542 个元素

平均值是 100000

标准差是 8108.280902879475

每服务器虚拟节点数:200

服务器1 有 100015 个元素

服务器2 有 95075 个元素

服务器3 有 105460 个元素

服务器4 有 98936 个元素

服务器5 有 87417 个元素

服务器6 有 88452 个元素

服务器7 有 101578 个元素

服务器8 有 102327 个元素

服务器9 有 114410 个元素

服务器10 有 106330 个元素

平均值是 100000

标准差是 7762.150166030029

每服务器虚拟节点数:300

服务器1 有 98960 个元素

服务器2 有 98255 个元素

服务器3 有 110390 个元素

服务器4 有 91912 个元素

服务器5 有 99064 个元素

服务器6 有 96504 个元素

服务器7 有 106162 个元素

服务器8 有 95821 个元素

服务器9 有 100982 个元素

服务器10 有 101950 个元素

平均值是 100000

标准差是 5007.930610541644

每服务器虚拟节点数:500

服务器1 有 102749 个元素

服务器2 有 102107 个元素

服务器3 有 101832 个元素

服务器4 有 97176 个元素

服务器5 有 93783 个元素

服务器6 有 98392 个元素

服务器7 有 98563 个元素

服务器8 有 101065 个元素

服务器9 有 99840 个元素

服务器10 有 104493 个元素

平均值是 100000

标准差是 2966.0789942279016

每服务器虚拟节点数:1000

服务器1 有 96968 个元素

服务器2 有 106384 个元素

服务器3 有 99051 个元素

服务器4 有 98573 个元素

服务器5 有 92606 个元素

服务器6 有 100638 个元素

服务器7 有 103492 个元素

服务器8 有 101825 个元素

服务器9 有 101592 个元素

服务器10 有 98871 个元素

平均值是 100000

标准差是 3567.877296096378

每服务器虚拟节点数:2000

服务器1 有 98349 个元素

服务器2 有 97230 个元素

服务器3 有 99094 个元素

服务器4 有 101444 个元素

服务器5 有 96241 个元素

服务器6 有 102745 个元素

服务器7 有 102966 个元素

服务器8 有 100363 个元素

服务器9 有 101939 个元素

服务器10 有 99629 个元素

平均值是 100000

标准差是 2186.231140570457

每服务器虚拟节点数:5000

服务器1 有 99988 个元素

服务器2 有 98921 个元素

服务器3 有 97474 个元素

服务器4 有 101020 个元素

服务器5 有 99390 个元素

服务器6 有 100698 个元素

服务器7 有 100509 个元素

服务器8 有 100879 个元素

服务器9 有 101088 个元素

服务器10 有 100033 个元素

平均值是 100000

标准差是 1079.875918798081

每服务器虚拟节点数:10000

服务器1 有 100408 个元素

服务器2 有 99115 个元素

服务器3 有 100106 个元素

服务器4 有 100213 个元素

服务器5 有 98736 个元素

服务器6 有 100813 个元素

服务器7 有 100622 个元素

服务器8 有 100443 个元素

服务器9 有 100155 个元素

服务器10 有 99389 个元素

平均值是 100000

标准差是 651.5702571480684

每服务器虚拟节点数:15000

服务器1 有 100584 个元素

服务器2 有 99859 个元素

服务器3 有 98939 个元素

服务器4 有 100352 个元素

服务器5 有 100004 个元素

服务器6 有 99062 个元素

服务器7 有 100450 个元素

服务器8 有 100774 个元素

服务器9 有 99721 个元素

服务器10 有 100255 个元素

平均值是 100000

标准差是 586.0771280300913

每服务器虚拟节点数:20000

服务器1 有 100443 个元素

服务器2 有 100321 个元素

服务器3 有 99072 个元素

服务器4 有 100238 个元素

服务器5 有 99271 个元素

服务器6 有 100205 个元素

服务器7 有 100888 个元素

服务器8 有 99559 个元素

服务器9 有 99980 个元素

服务器10 有 100023 个元素

平均值是 100000

标准差是 526.7388347179274

每服务器虚拟节点数:100000

服务器1 有 99673 个元素

服务器2 有 99481 个元素

服务器3 有 100264 个元素

服务器4 有 100186 个元素

服务器5 有 100357 个元素

服务器6 有 99947 个元素

服务器7 有 99119 个元素

服务器8 有 100578 个元素

服务器9 有 100079 个元素

服务器10 有 100316 个元素

平均值是 100000

标准差是 427.455494759396

趋势图如下:



总结:

  1. 理论来说,虚拟节点越多,标准差越小,数据分布也是越均衡

  2. 实际上虚拟节点越多,系统的复杂性也越高,虚拟节点100-500是比较合理的范围



取模hash测试结果:

服务器1 有 99991 个元素

服务器2 有 99545 个元素

服务器3 有 99921 个元素

服务器4 有 99952 个元素

服务器5 有 100339 个元素

服务器6 有 99995 个元素

服务器7 有 100235 个元素

服务器8 有 100052 个元素

服务器9 有 99976 个元素

服务器10 有 99994 个元素

平均值是 100000

标准差是 197.26581051971476

添加一个节点缓存命中率:9%



没有虚拟节点一致性hash测试结果:

服务器1 有 110589 个元素

服务器2 有 24875 个元素

服务器3 有 149689 个元素

服务器4 有 78130 个元素

服务器5 有 30297 个元素

服务器6 有 139371 个元素

服务器7 有 71770 个元素

服务器8 有 265079 个元素

服务器9 有 56154 个元素

服务器10 有 74046 个元素

平均值是 100000

标准差是 67643.07872206882

添加一个节点缓存命中率:98%



有虚拟节点一致性hash测试结果(200个虚拟节点):

服务器1 有 100285 个元素

服务器2 有 94565 个元素

服务器3 有 105066 个元素

服务器4 有 99358 个元素

服务器5 有 86738 个元素

服务器6 有 88554 个元素

服务器7 有 101951 个元素

服务器8 有 102614 个元素

服务器9 有 114636 个元素

服务器10 有 106233 个元素

平均值是 100000

标准差是 7913.853119688285

添加一个节点缓存命中率:91%



总结:

  1. 取模hash是数据分布最均衡的,但是一旦有数据节点的变化,缓存命中率会急速下降

  2. 没有虚拟节点一致性hash数据分布非常不均衡

  3. 有虚拟节点一致性hash是比较折中的方案,数据分布相对平衡,添加、删除节点,命中率也不会有太大影响



用户头像

changtai

关注

还未添加个人签名 2018.04.30 加入

还未添加个人简介

评论

发布
暂无评论
一致性 hash 算法