一致性 hash 算法

作者:changtai
发布于:2020 年 07 月 08 日
概述:

使用 Java 实现一致性 hash 算法,并比较取模 hash、无虚拟节点的一致性 hash 与带虚拟节点的一致性 hash 三者的优缺点。

作业一:
  1. 用你熟悉的编程语言实现一致性 hash 算法。

项目结构:(原文此处为截图,已缺失)



类图:(原文此处为截图,已缺失)



hash算法相关:

/**
 * Strategy that maps a string to an {@code int} hash code.
 *
 * <p>Used both for placing servers / virtual nodes on the ring and for locating keys.
 *
 * @author zhaoct
 * @date 2020-07-07 15:20
 */
public interface HashStrategy {

    /**
     * Computes the hash code of a string.
     *
     * @param str the string to hash
     * @return the hash code of {@code str}
     */
    int getHashCode(String str);
}



/**
 * Hash strategy that delegates to {@link String#hashCode()}.
 *
 * <p>Note: {@code String.hashCode()} may return a negative value.
 *
 * @author zhaoct
 * @date 2020-07-07 15:22
 */
public class JdkHash implements HashStrategy {

    @Override
    public int getHashCode(String str) {
        final int code = str.hashCode();
        return code;
    }
}



/**
 * FNV-style 32-bit hash with extra shift/add mixing.
 *
 * <p>The loop XORs each UTF-16 {@code char} into the hash and then multiplies by the FNV
 * prime, which is the FNV-1a order. It then applies several shift/xor/add rounds to spread
 * the bits. The result is always non-negative.
 *
 * @author zhaoct
 * @date 2020-07-07 15:26
 */
public class FnvHash implements HashStrategy {
    /** FNV 32-bit offset basis (0x811C9DC5). */
    private static final long FNV_32_INIT = 2166136261L;
    /** FNV 32-bit prime (0x01000193). */
    private static final int FNV_32_PRIME = 16777619;

    /**
     * Computes a non-negative hash code for {@code str}.
     *
     * @param str the string to hash; iterated per UTF-16 char, not per encoded byte
     * @return a hash code in {@code [0, Integer.MAX_VALUE]}
     */
    @Override
    public int getHashCode(String str) {
        int hash = (int) FNV_32_INIT;
        for (int i = 0; i < str.length(); i++) {
            hash = (hash ^ str.charAt(i)) * FNV_32_PRIME;
        }
        // additional avalanche mixing
        hash += hash << 13;
        hash ^= hash >> 7;
        hash += hash << 3;
        hash ^= hash >> 17;
        hash += hash << 5;
        // Math.abs(Integer.MIN_VALUE) is still Integer.MIN_VALUE (negative). Clearing the sign
        // bit maps that single value to 0 and leaves every other Math.abs result unchanged, so
        // callers that use the hash as a modulo index never see a negative number.
        return Math.abs(hash) & Integer.MAX_VALUE;
    }
}

存储相关:

/**
* 一致性hash环上面的虚拟节点
*
* @author zhaoct
* @date 2020-07-07 10:00
*/
public class Node {
/**
* 虚拟节点名称
*/
private String name;
/**
* 缓存服务器
*/
private Server server;
/**
* 虚拟节点上数据数量,用于统计
*/
private AtomicInteger count;
public Node(String name, Server server){
this.name = name;
this.server = server;
this.count = new AtomicInteger(0);
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public Server getServer() {
return server;
}
public void setServer(Server server) {
this.server = server;
}
public void incrementCount(){
count.incrementAndGet();
}
public Integer getCount() {
return count.get();
}



/**
* 缓存服务器
*
* @author zhaoct
* @date 2020-07-07 9:32
*/
public class Server {
/**
* 缓存服务器名称
*/
private String name;
/**
* 虚拟节点个数
*/
private Integer nodeNumber;
/**
* 缓存服务器的虚拟节点
*/
private List<Node> nodeList;
/**
* mock 数据存储
*/
private Map<String, String> dataMap = null;
/**
* 取模hash构造方法
* @param name
*/
public Server(String name){
this.name = name;
this.dataMap = new ConcurrentHashMap<>();
}
/**
* 一致性hash构造方法
* @param name
* @param nodeNumber
*/
public Server(String name, Integer nodeNumber){
this.name = name;
this.nodeNumber = nodeNumber;
this.nodeList = new ArrayList<>();
this.dataMap = new ConcurrentHashMap<>();
//初始化虚拟节点
for (Integer i = 0; i < nodeNumber; i++) {
Node node = new Node(name + " : " + i, this);
nodeList.add(node);
}
}
public void put(String key, String value){
this.dataMap.put(key, value);
}
public String get(String key){
return dataMap.get(key);
}
public Integer getSize(){
return dataMap.size();
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public Integer getNodeNumber() {
return nodeNumber;
}
public void setNodeNumber(Integer nodeNumber) {
this.nodeNumber = nodeNumber;
}
public List<Node> getNodeList() {
return nodeList;
}
public void setNodeList(List<Node> nodeList) {
this.nodeList = nodeList;
}



/**
 * Load-balancing strategy: decides which cache server owns a key and maintains the set of
 * servers it chooses from.
 *
 * @author zhaoct
 * @date 2020-07-07 10:38
 */
public interface LoadBalanceStrategy {

    /**
     * Selects the server responsible for a key.
     *
     * @param key cache key
     * @return the server that should hold {@code key}
     */
    Server getServer(String key);

    /**
     * Adds a server to the pool.
     *
     * @param server the server to add
     */
    void addServer(Server server);

    /**
     * Removes a server from the pool.
     *
     * @param server the server to remove
     */
    void removeServer(Server server);
}



/**
 * Modulo-hash load balancing: a key goes to {@code servers[hash(key) mod servers.size()]}.
 *
 * <p>Adding or removing a server changes the modulus, so most keys move to a different
 * server.
 *
 * @author zhaoct
 * @date 2020-07-07 9:25
 */
public class ModuloHashLoadBalance implements LoadBalanceStrategy {
    /** Hash function applied to keys. */
    private final HashStrategy hashStrategy;
    /** Server list; this is the caller's list, which addServer/removeServer also mutate. */
    private final List<Server> servers;

    /**
     * @param servers      server list (stored by reference, not copied)
     * @param hashStrategy hash function for keys
     */
    public ModuloHashLoadBalance(List<Server> servers, HashStrategy hashStrategy) {
        this.hashStrategy = hashStrategy;
        this.servers = servers;
    }

    /**
     * @throws ArithmeticException if there are no servers
     */
    @Override
    public Server getServer(String key) {
        int hash = hashStrategy.getHashCode(key);
        // floorMod keeps the index in [0, size) even for negative hashes (for example
        // String.hashCode() via JdkHash), where the % operator would give a negative index.
        return servers.get(Math.floorMod(hash, servers.size()));
    }

    @Override
    public void addServer(Server server) {
        servers.add(server);
    }

    @Override
    public void removeServer(Server server) {
        servers.remove(server);
    }
}



/**
 * Consistent hashing without virtual nodes: each server occupies a single ring position,
 * at the hash of its name.
 *
 * @author zhaoct
 * @date 2020-07-07 9:25
 */
public class NoVirtualNodeHashLoadBalance implements LoadBalanceStrategy {
    /**
     * Hash ring: position (hash of server name) to server. If two server names hash to the
     * same position, the server added later overwrites the earlier one.
     */
    TreeMap<Integer, Server> ring;
    /** Hash function used for both server names and keys. */
    private final HashStrategy hashStrategy;
    /** Server list; this is the caller's list, which addServer/removeServer also mutate. */
    private final List<Server> servers;

    /**
     * @param servers      initial servers (stored by reference, not copied)
     * @param hashStrategy hash function for server names and keys
     */
    public NoVirtualNodeHashLoadBalance(List<Server> servers, HashStrategy hashStrategy) {
        this.ring = new TreeMap<>();
        this.hashStrategy = hashStrategy;
        this.servers = servers;
        // build the consistent hash ring
        this.buildConsistentHashRing(this.servers);
    }

    /** Places every server of {@code servers} on the ring. */
    private void buildConsistentHashRing(List<Server> servers) {
        for (Server server : servers) {
            ring.put(positionOf(server), server);
        }
    }

    /** Ring position of a server: the hash of its name. */
    private int positionOf(Server server) {
        return hashStrategy.getHashCode(server.getName());
    }

    /**
     * Returns the first server clockwise from the key's hash, wrapping to the start of the
     * ring.
     *
     * @throws IllegalStateException if the ring has no servers
     */
    @Override
    public Server getServer(String key) {
        if (ring.isEmpty()) {
            throw new IllegalStateException("no server on the hash ring");
        }
        Map.Entry<Integer, Server> entry = ring.ceilingEntry(hashStrategy.getHashCode(key));
        if (entry == null) {
            // past the last position: wrap around to the first server
            entry = ring.firstEntry();
        }
        return entry.getValue();
    }

    @Override
    public void addServer(Server server) {
        servers.add(server);
        ring.put(positionOf(server), server);
    }

    @Override
    public void removeServer(Server server) {
        servers.remove(server);
        // Only drop the slot if it still belongs to this server. Another server that
        // collided onto the same position must not be removed from the ring.
        ring.remove(positionOf(server), server);
    }
}



/**
 * Consistent hashing with virtual nodes: each server's virtual nodes are placed on the ring
 * at the hash of their names, and a key belongs to the server of the first virtual node at
 * or after the key's hash.
 *
 * @author zhaoct
 * @date 2020-07-07 9:25
 */
public class ConsistentHashLoadBalance implements LoadBalanceStrategy {
    /**
     * Hash ring: virtual-node position to owning server. On a position collision, the
     * virtual node placed later overwrites the earlier one.
     */
    TreeMap<Integer, Server> ring;
    /** Hash function used for both virtual node names and keys. */
    private final HashStrategy hashStrategy;
    /** Server list; this is the caller's list, which addServer/removeServer also mutate. */
    private final List<Server> servers;

    /**
     * @param servers      initial servers (stored by reference, not copied)
     * @param hashStrategy hash function for virtual node names and keys
     */
    public ConsistentHashLoadBalance(List<Server> servers, HashStrategy hashStrategy) {
        this.ring = new TreeMap<>();
        this.hashStrategy = hashStrategy;
        this.servers = servers;
        // build the consistent hash ring
        this.buildConsistentHashRing(this.servers);
    }

    /** Places the virtual nodes of every server on the ring. */
    private void buildConsistentHashRing(List<Server> servers) {
        for (Server server : servers) {
            placeOnRing(server);
        }
    }

    /** Puts each virtual node of {@code server} on the ring, mapped to {@code server}. */
    private void placeOnRing(Server server) {
        for (Node node : server.getNodeList()) {
            ring.put(hashStrategy.getHashCode(node.getName()), server);
        }
    }

    /**
     * Returns the server of the first virtual node clockwise from the key's hash, wrapping
     * to the start of the ring.
     *
     * @throws IllegalStateException if the ring has no virtual nodes
     */
    @Override
    public Server getServer(String key) {
        if (ring.isEmpty()) {
            throw new IllegalStateException("no server on the hash ring");
        }
        Map.Entry<Integer, Server> entry = ring.ceilingEntry(hashStrategy.getHashCode(key));
        if (entry == null) {
            // past the last position: wrap around to the first virtual node
            entry = ring.firstEntry();
        }
        return entry.getValue();
    }

    @Override
    public void addServer(Server server) {
        servers.add(server);
        placeOnRing(server);
    }

    @Override
    public void removeServer(Server server) {
        servers.remove(server);
        for (Node node : server.getNodeList()) {
            // Only drop positions still owned by this server. A colliding virtual node of
            // another server that overwrote the slot must stay on the ring.
            ring.remove(hashStrategy.getHashCode(node.getName()), server);
        }
    }
}



Cache相关:

/**
 * A key/value cache backed by a pool of servers that can grow or shrink.
 *
 * @author zhaoct
 * @date 2020-07-07 15:13
 */
public interface Cache {

    /**
     * Stores a value under a key.
     *
     * @param key   cache key
     * @param value value to store
     */
    void put(String key, String value);

    /**
     * Reads the value stored under a key.
     *
     * @param key cache key
     * @return the cached value
     */
    String get(String key);

    /**
     * Adds a server to the pool.
     *
     * @param server the server to add
     */
    void addServer(Server server);

    /**
     * Removes a server from the pool.
     *
     * @param server the server to remove
     */
    void removeServer(Server server);
}



/**
* 分布式cache
*
* @author zhaoct
* @date 2020-07-07 15:15
*/
public class DistributedCache implements Cache{
private LoadBalanceStrategy loadBalanceStrategy;
public DistributedCache(LoadBalanceStrategy loadBalanceStrategy){
this.loadBalanceStrategy = loadBalanceStrategy;
}
@Override
public void put(String key, String value) {
Server server = loadBalanceStrategy.getServer(key);
server.put(key, value);
//System.out.println(" key: " + key + " 写入到缓存服务器 " + server.getName());
}
@Override
public String get(String key) {
Server server = loadBalanceStrategy.getServer(key);
//System.out.println(" key: " + key + " 从缓存服务器读取 " + server.getName());
return server.get(key);
}
@Override
public void addServer(Server server) {
loadBalanceStrategy.addServer(server);
}
@Override
public void removeServer(Server server) {
loadBalanceStrategy.removeServer(server);
}



测试用例相关:

/**
 * Cache test for modulo hashing: data distribution across servers and the hit rate after
 * one server is added.
 *
 * @author zhaoct
 * @date 2020-07-07 15:44
 */
public class ModuloHashCacheTest {
    private Cache cache;
    private List<Server> servers = new ArrayList<>();
    private List<String> keys = new ArrayList<>();

    /** Creates 10 servers, wires a modulo-hash cache over them and writes 1,000,000 keys. */
    @Before
    public void init() {
        // servers 服务器1 .. 服务器10, in order
        for (int no = 1; no <= 10; no++) {
            servers.add(new Server("服务器" + no));
        }
        cache = new DistributedCache(new ModuloHashLoadBalance(servers, new FnvHash()));
        // 1,000,000 random UUID keys
        final int total = 1000000;
        for (int i = 0; i < total; i++) {
            String key = UUID.randomUUID().toString();
            cache.put(key, "data" + i);
            keys.add(key);
        }
    }

    /** Prints per-server counts, their (integer) mean and population standard deviation. */
    @Test
    public void testCache() {
        int sum = 0;
        for (Server server : servers) {
            int size = server.getSize();
            System.out.println(server.getName() + " 有 " + size + " 个元素 ");
            sum += size;
        }
        int average = sum / servers.size();
        System.out.println(" 平均值是 " + average);
        double squares = 0;
        for (Server server : servers) {
            squares += Math.pow(server.getSize() - average, 2);
        }
        System.out.println(" 标准差是 " + Math.sqrt(squares / servers.size()));
    }

    /** Adds an 11th server and prints the percentage of keys still found. */
    @Test
    public void testHitRate() {
        cache.addServer(new Server("服务器11"));
        int hit = 0;
        for (String key : keys) {
            if (cache.get(key) != null) {
                hit++;
            }
        }
        int percent = hit * 100 / keys.size();
        System.out.println("添加一个节点缓存命中率:" + percent + "%");
    }
}



/**
 * Cache test for consistent hashing without virtual nodes: data distribution across servers
 * and the hit rate after one server is added.
 *
 * @author zhaoct
 * @date 2020-07-07 15:44
 */
public class NoVirtualNodeHashCacheTest {
    private Cache cache;
    private List<Server> servers = new ArrayList<>();
    private List<String> keys = new ArrayList<>();

    /**
     * Creates 10 servers without virtual nodes, wires a no-virtual-node consistent-hash cache
     * over them and writes 1,000,000 random keys.
     */
    @Before
    public void init() {
        // 10 servers, no virtual nodes (unused local nodeNumber removed)
        servers.addAll(
                Arrays.asList(
                        new Server("服务器1"),
                        new Server("服务器2"),
                        new Server("服务器3"),
                        new Server("服务器4"),
                        new Server("服务器5"),
                        new Server("服务器6"),
                        new Server("服务器7"),
                        new Server("服务器8"),
                        new Server("服务器9"),
                        new Server("服务器10")
                ));
        cache = new DistributedCache(new NoVirtualNodeHashLoadBalance(servers, new FnvHash()));
        // 1,000,000 entries with random UUID keys
        for (int i = 0; i < 1000000; i++) {
            String key = UUID.randomUUID().toString();
            cache.put(key, "data" + i);
            keys.add(key);
        }
    }

    /** Prints per-server counts, their (integer) mean and population standard deviation. */
    @Test
    public void testCache() {
        // element count per server
        for (Server server : servers) {
            System.out.println(server.getName() + " 有 " + server.getSize() + " 个元素 ");
        }
        // 1. mean
        int sum = 0;
        for (Server server : servers) {
            sum += server.getSize();
        }
        int average = sum / servers.size();
        System.out.println(" 平均值是 " + average);
        // 2. population standard deviation
        double temp = 0;
        for (Server server : servers) {
            temp += Math.pow(server.getSize() - average, 2);
        }
        double sd = Math.sqrt(temp / servers.size());
        System.out.println(" 标准差是 " + sd);
    }

    /** Adds an 11th server and prints the percentage of keys still found. */
    @Test
    public void testHitRate() {
        cache.addServer(new Server("服务器11"));
        int hit = 0;
        for (String key : keys) {
            if (cache.get(key) != null) {
                hit++;
            }
        }
        System.out.println("添加一个节点缓存命中率:" + hit * 100 / keys.size() + "%");
    }
}



/**
* 缓存测试用例
*
* @author zhaoct
* @date 2020-07-07 15:44
*/
public class ConsistentHashCacheTest {
private Cache cache;
private List<Server> servers = new ArrayList<>();
private List<String> keys = new ArrayList<>();
/**
 * Number of virtual nodes per server.
 */
private int nodeNumber = 200;

/**
 * Creates 10 servers with {@link #nodeNumber} virtual nodes each, wires a virtual-node
 * consistent-hash cache over them and writes 1,000,000 random keys.
 */
@Before
public void init() {
    // 10 servers, each with nodeNumber (currently 200) virtual nodes
    servers.addAll(
            Arrays.asList(
                    new Server("服务器1", nodeNumber),
                    new Server("服务器2", nodeNumber),
                    new Server("服务器3", nodeNumber),
                    new Server("服务器4", nodeNumber),
                    new Server("服务器5", nodeNumber),
                    new Server("服务器6", nodeNumber),
                    new Server("服务器7", nodeNumber),
                    new Server("服务器8", nodeNumber),
                    new Server("服务器9", nodeNumber),
                    new Server("服务器10", nodeNumber)
            ));
    cache = new DistributedCache(new ConsistentHashLoadBalance(servers, new FnvHash()));
    // 1,000,000 entries with random UUID keys
    for (int i = 0; i < 1000000; i++) {
        String key = UUID.randomUUID().toString();
        cache.put(key, "data" + i);
        keys.add(key);
    }
}
@Test
public void testCache(){
//打印元素个数
for(Server server : servers){
System.out.println(server.getName() + " 有 " + server.getSize() + " 个元素 ");
}
//统计标准差
//1. 平均值
int sum = 0;
for(Server server : servers){
sum += server.getSize();
}
int average = sum / servers.size();
System.out.println(" 平均值是 " + average);