一致性哈希算法简单实现

用户头像
Jerry Tse
关注
发布于: 2020 年 07 月 08 日

0. 前言

在分布式系统中,如何确定访问系统中的哪台机器节点大致有两种方法:

  • 余数哈希

  • 一致性哈希



余数哈希算法相对简单:hash(key)%节点总数,但是在系统伸缩即节点总数变化的时候,公式的计算结果会变化。对于有状态服务节点,例如缓存节点选择上影响比较大,会造成部分节点服务器缓存失效。



定义

一致性哈希解决了余数哈希系统伸缩影响整体节点数据问题。

(图1)

图1就是一致性哈希的示意图,首先有一个哈希环的概念,其实就是0到2^31-1,范围和无符号int的相同。

第一步:我们对服务器节点取哈希值,我们将计算好的哈希值对应放在哈希环相同值的位置上。

第二步:我们也对要确定访问某台服务器的key(key是可以代表一起请求的参数,例如URL)进行哈希计算。同样将计算好的值放入哈希环对应值的位置。

第三步:最后我们从我们站在key坐在哈希环的位置顺时针查找,找到第一个服务器所在位置上对应的服务器就是我们要访问的服务器。



带来的好处

新增或删除服务器节点,影响相对小。



(图2)

图2是扩容和缩容的对访问影响的示意图。

我们新增一台服务器或去掉一台服务器都只影响key3的后续访问。其他key不受影响。所以相对于余数哈希,服务器伸缩对访问的影响范围较小。



问题

如图1所示,哈希函数生成的值是随机的,我们无法控制各服务器节点在哈希环上的位置,这样就会造成服务器所承接的数据访问量有多有少,无法达到均衡访问的目的。余数哈希就相对均衡多了。



如何解决

既然服务器少的情况下有不均衡的情况,那我们就添加服务器,环上的服务器越多分布均衡的概率就越大。但是服务器也是资源,我们不能随意添加,这样虽然解决了一致性哈希访问均衡问题,但是成本过高,得不偿失。

那怎么办,我们可以换一种思路,既然不能添加实际的服务器,我们可以添加虚拟的服务器,虚拟服务器是一个逻辑概念,只是用来在哈希环上占位,他的行为都交由实际服务器来处理。这样一来,我们可以通过任意添加虚拟节点的数量来达到使服务器在哈希环上分布均匀的目的。





(图3)

如图3所示,新增虚拟节点之后,服务器在哈希环上分布更加均匀。

这里要注意,图3只是一个示意图,因为哈希函数的随机性,虚拟节点的位置也有很大的随意性,但是相对而言没有虚拟节点而言均匀分布趋势是向好。



这里还有一个问题,是不是虚拟节点添加的越多,数据分布的越均衡,下面我们通过简单实现一致性哈希算法的程序来验证一下,多少个虚拟节点可以达到数据的均衡。



1. 代码示例



(图4)

我们结合类图说明。

  • 首先我们定义了两个接口,HashCircle和Cache。一个代表哈希环,一个代表缓存。我们模拟缓存服务器的实际使用情况,来验证虚拟节点对访问均衡性的影响。

/**
* 哈希环
*/
public interface HashCircle {
/**
* 插入节点
* @param cache
*/
void insertNode(Cache cache);
/**
* 获取节点
* @param key
* @return
*/
Cache getNode(String key);
/**
* 删除节点
* @param cache
*/
void removeNode(Cache cache);
}



/**
* 缓存接口
*/
public interface Cache {
/**
* 查询
* @param key
*/
String get(String key);
/**
* 添加
* @param key
* @param value
*/
void add(String key, String value);
/**
* 添加(覆盖)
* @param key
* @param value
*/
void set(String key, String value);
/**
* 删除
* @param key
*/
void remove(String key);
/**
* 总数
*/
int size();
}



  • 接下来用SortMapCircle实现了HashCircle接口,SortMapCircle类使用TreeMap容器,因为TreeMap中key有序。我们Map中的可以存储节点的哈希值,value存储节点的引用。用这种方式表示节点的哈希环占位。



import net.jcip.annotations.NotThreadSafe;
import java.util.SortedMap;
import java.util.TreeMap;
/**
* 使用sortedMap实现哈希环
*/
@NotThreadSafe
public class SortedMapCircle implements HashCircle {
private static final SortedMap<Integer, Cache> CIRCLE = new TreeMap<>();
private Integer[] keys = null;
@Override
public void insertNode(Cache cache) {
CIRCLE.put(Math.abs(cache.hashCode()), cache);
keys = null;
}
/**
* 二分查找实现根据key找到对应节点。
* @param key
* @return
*/
@Override
public Cache getNode(String key) {
//避免每次都进行耗时的toArray操作
if (keys == null) {
keys = CIRCLE.keySet().toArray(new Integer[0]);
}
//二分查找
int left = 0;
int right = keys.length - 1;
int keyHash = Math.abs(key.hashCode());
while (left <= right) {
int mid = (left + right) / 2;
if (keyHash <= keys[mid]) {
if (mid - 1 >= 0 && keys[mid - 1] <= keyHash) {
return CIRCLE.get(keys[mid]);
}
right = mid - 1;
} else {
left = mid + 1;
}
}
return CIRCLE.get(CIRCLE.firstKey());
}
@Override
public void removeNode(Cache cache) {
CIRCLE.remove(Math.abs(cache.hashCode()));
keys = null;
}
}



  • 之后是Cache的具体实现部分,AbstractCache类实现了Cache接口,实现了一下公共的部分,新增了IP和port属性,重写了hashCode和equals方法。模拟我们日常对缓存服务器的配置使用场景。后面是Node和VNode的实现类,他们才是具体实现缓存逻辑的地方,一个代表物理节点,一个代表虚拟节点。Node中使用HashMap模拟缓存容器(我们简单点实现,不用真的连接一个memcached)。VNode比较有意思,他将他的所有行为都委托给了他的Node类型的成员变量(典型的代理模式),所谓的虚拟节点真是名不虚传。

import java.util.Objects;
/**
* Cache的抽象类
*/
public abstract class AbstractCache implements Cache {
private String ip;
private String port;
//增加节点hash值的散列性
private String md5;
protected AbstractCache(String ip, String port) {
this.ip = ip;
this.port = port;
this.md5 = MD5Utils.encode(this.ip+":"+this.port);
}
@Override
public abstract String get(String key);
@Override
public abstract void add(String key, String value) ;
@Override
public abstract void set(String key, String value) ;
@Override
public abstract void remove(String key) ;
@Override
public abstract int size();
public String getIp() {
return ip;
}
public void setIp(String ip) {
this.ip = ip;
}
public String getPort() {
return port;
}
public void setPort(String port) {
this.port = port;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
AbstractCache that = (AbstractCache) o;
return md5 == that.md5 &&
ip.equals(that.ip) &&
port.equals(that.port);
}
@Override
public int hashCode() {
return Objects.hash(ip, port, md5);
}
}



import net.jcip.annotations.NotThreadSafe;
import java.util.HashMap;
import java.util.Map;
/**
* 缓存节点
*/
@NotThreadSafe
public class Node extends AbstractCache {
//使用Map实现缓存功能
private Map<String, String> cache;
public Node(String ip, String port) {
super(ip,port);
cache = new HashMap<>();
}
public String get(String key) {
return cache.get(key);
}
public void add(String key, String value) {
if(!cache.containsKey(key)){
cache.put(key, value);
}
}
public void set(String key, String value) {
cache.put(key, value);
}
public void remove(String key) {
cache.remove(key);
}
@Override
public int size() {
return cache.size();
}
}



import net.jcip.annotations.NotThreadSafe;
/**
* 缓存虚拟节点
*/
@NotThreadSafe
public class VNode extends AbstractCache{
private Node node;
public VNode(Node node, String ip, String port) {
super(ip,port);
this.node = node;
}
@Override
public String get(String key) {
return node.get(key);
}
@Override
public void add(String key, String value) {
node.add(key, value);
}
@Override
public void set(String key, String value) {
node.set(key,value);
}
@Override
public void remove(String key) {
node.remove(key);
}
@Override
public int size() {
return node.size();
}
}



  • 最后我们完成了Client程序作为测试程序。

import java.util.ArrayList;
import java.util.List;
public class Client {
public static void main(String[] args) {
//虚拟节点数量
int vnodeCount = 400;
//物理节点数量
int nodeCount = 10;
//插入数据总量
int dataCount = 1000000;
List<Node> nodes = new ArrayList<>();
HashCircle hashCircle = new SortedMapCircle();
long begin1 = System.currentTimeMillis();
for (int i = 0; i < nodeCount; i++) {
Node node = new Node(String.format("10.0.45.%s", 190 + i), "12001");
hashCircle.insertNode(node);
insertVNode(node, vnodeCount, hashCircle);
nodes.add(node);
}
long end1 = System.currentTimeMillis();
long begin = System.currentTimeMillis();
for (int i = 0; i < dataCount; i++) {
//保证数据内容哈希值每次试验一致且尽量均衡。
String key = MD5Utils.encode(i + "");
hashCircle.getNode(key).add(key, key);
}
long end = System.currentTimeMillis();
int count = 0;
List<Integer> sizeList = new ArrayList<>();
int offset = 1;
for (Node node : nodes) {
System.out.println(String.format("物理节点%s的缓存数据总数:%s", offset++, node.size()));
sizeList.add(node.size());
count += node.size();
}
System.out.println(String.format("%s个物理节点缓存数据总数:%s", nodeCount, count));
System.out.println(String.format("初始化节点总耗时(物理节点:%s;虚拟节点:%s):%sms", nodeCount, vnodeCount * nodeCount, end1 - begin1));
System.out.println(String.format("插入数据规模为%s数据总耗时:%sms", dataCount, (end - begin)));
System.out.println(String.format("数据规模%s下%s个物理节点(每个包含%s个节点虚拟节点)标准差:%s", dataCount, nodeCount, vnodeCount, standardDeviation(sizeList)));
}
private static void insertVNode(Node node, int vnodeCount, HashCircle hashCircle) {
for (int i = 0; i < vnodeCount; i++) {
Cache tmp = new VNode(node, node.getIp(), String.valueOf(Integer.valueOf(node.getPort()) + i + 1));
hashCircle.insertNode(tmp);
}
}
/**
* 标准差
* @param input
* @return
*/
private static double standardDeviation(List<Integer> input) {
int sum = 0;
for (Integer tmp : input) {
sum += tmp; //求出数组的总和
}
double average = sum / input.size(); //求出数组的平均数
int total = 0;
for (Integer tmp : input) {
total += (tmp - average) * (tmp - average); //求出方差,如果要计算方差的话这一步就可以了
}
return Math.sqrt(total / input.size()); //求出标准差
}
}



源码地址:下载



2. 运行分析



我们模拟十个物理缓存节点,若干虚拟节点,插入1百万数据。通过统计不同虚拟节点数下十个物理节点中缓存数据的总数的标准差,来衡量多少个虚拟节点能达到节点数据分布均衡的效果。



因为每次模拟输入的一百万数据均一致,所以在每个虚拟节点情况下得到的标准差均是固定值,每次执行都一样,避免随机数据造成实验结果偏移。



下面是一部分运行数据

  • 没有虚拟节点的情况

物理节点1的缓存数据总数:2187

物理节点2的缓存数据总数:85280

物理节点3的缓存数据总数:232267

物理节点4的缓存数据总数:33781

物理节点5的缓存数据总数:19570

物理节点6的缓存数据总数:167067

物理节点7的缓存数据总数:335361

物理节点8的缓存数据总数:30845

物理节点9的缓存数据总数:31026

物理节点10的缓存数据总数:62616

10个物理节点缓存数据总数:1000000

初始化节点总耗时(物理节点:10;虚拟节点:0):26ms

插入数据规模为1000000数据总耗时:1690ms

数据规模1000000下10个物理节点(每个包含0个节点虚拟节点)标准差:14654.29507004687



  • 50个虚拟节点的情况

物理节点1的缓存数据总数:99086

物理节点2的缓存数据总数:119359

物理节点3的缓存数据总数:87000

物理节点4的缓存数据总数:119459

物理节点5的缓存数据总数:96023

物理节点6的缓存数据总数:108174

物理节点7的缓存数据总数:90886

物理节点8的缓存数据总数:96276

物理节点9的缓存数据总数:86627

物理节点10的缓存数据总数:97110

10个物理节点缓存数据总数:1000000

初始化节点总耗时(物理节点:10;虚拟节点:500):44ms

插入数据规模为1000000数据总耗时:1811ms

数据规模1000000下10个物理节点(每个包含50个节点虚拟节点)标准差:11357.870222889502



。。。。



我将数据统计成表格:



  • 横轴表示10个物理节点存储数据的标准差

  • 纵轴表示每个物理节点拥有的虚拟节点数量



根据实验数据我们可见,节点数在100后开始标准差有大幅度降低,说明数据分布相对均衡。200节点时标准差已经达到了4000左右仅是最高值的27%,400节点时也是一个低点大概为最高值的14%。我们可见随着节点的数量增多,标准差是趋势下降而不是绝对下降。



因为随着虚拟节点数增加,getNode()方法的数据规模增加,执行效率会下降。所以我们需要综合考虑节点数目的选择。



综上所述,虚拟节点数在200和400时,数据分布较均衡。



3. 后语

我们通过一段模拟日常缓存使用场景的代码实现了一致性哈希算法并估算出虚拟节点数量的最佳实践值。

一致性哈希算法是分布式系统中比较重要的算法,虽然实践中应用的已经不多了,但是理论基础值得我们掌握。

希望本文对你有帮助,如果你有问题也欢迎给我留言。



发布于: 2020 年 07 月 08 日 阅读数: 43
用户头像

Jerry Tse

关注

还未添加个人签名 2018.11.02 加入

还未添加个人简介

评论

发布
暂无评论
一致性哈希算法简单实现