Week 5
Assignment & Summary:
Preface: why do we need consistent hashing?
In a distributed service cluster, we need a routing algorithm that, for each stored object, computes which server it should live on.
Suppose the servers form an array of length n, indexed 0..n-1 (n being the number of servers), and we route with a plain hash:
the array index of the server an object routes to is: index = hash(object) % n
When a node is added or removed, most objects get routed to a different server,
so lookups for those objects miss and the hit rate collapses.
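A quick back-of-the-envelope check of how bad the remapping is (a hypothetical standalone demo; the class name RemapDemo is made up): with the remainder rule, a key keeps its server when hash % 3 == hash % 4, which for a roughly uniform hash holds for only about a quarter of the keys, matching the 0.25 hit rate measured further below.
import java.util.stream.IntStream;

public class RemapDemo {
    public static void main(String[] args) {
        int total = 10_000;
        long unchanged = IntStream.range(0, total)
                .filter(i -> {
                    //same key naming scheme as the tests below
                    int hash = Math.abs(("remainderKey" + i).hashCode());
                    return hash % 3 == hash % 4; //same index before and after growing from 3 to 4 nodes
                })
                .count();
        //for a roughly uniform hash this prints about 0.25
        System.out.println("unchanged ratio: " + unchanged / (float) total);
    }
}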
Enough talk, let's look at the code:
The ServerCluster interface:
public interface ServerCluster {
void addServerNode(ServerNode serverNode);
void removeServerNode(ServerNode serverNode);
ServerNode routeServerNode(String key);
List<ServerNode> getAllServerNodes();
}
The remainder-based cluster RemainderCluster:
public class RemainderCluster implements ServerCluster {
private List<ServerNode> serverNodes;
public RemainderCluster() {
this.serverNodes = new ArrayList<>();
}
@Override
public List<ServerNode> getAllServerNodes() {
return this.serverNodes;
}
@Override
public void addServerNode(ServerNode serverNode) {
System.out.println("Add serverNode(" + serverNode.getAddress()+ ") to the cluster...");
this.serverNodes.add(serverNode);
}
@Override
public void removeServerNode(ServerNode serverNode) {
this.serverNodes.removeIf(sn -> serverNode.getAddress().equals(sn.getAddress()));
}
@Override
public ServerNode routeServerNode(String key) {
long hash = Math.abs(key.hashCode());
long serverIndex = hash % this.serverNodes.size();
return this.serverNodes.get((int) serverIndex);
}
}
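One caveat on routeServerNode above, as a hedge rather than a required change: Math.abs(Integer.MIN_VALUE) is still negative, so a key whose hashCode() happens to be Integer.MIN_VALUE would yield a negative index. Masking the sign bit (or using Math.floorMod) avoids that edge case; a minimal sketch:
@Override
public ServerNode routeServerNode(String key) {
    //hashCode() can be Integer.MIN_VALUE, for which Math.abs() is still negative;
    //masking the sign bit keeps the hash, and therefore the index, non-negative
    int hash = key.hashCode() & 0x7fffffff;
    return this.serverNodes.get(hash % this.serverNodes.size());
}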
The cluster node ServerNode:
@Data
@Builder
@AllArgsConstructor(staticName = "of")
public class ServerNode {
//Server's address ip+port : 192.168.0.100:11211
private String address;
//Server's data (k,v) -> map
private Map<String, Object> data;
public Object get(String key) {
return data.get(key);
}
public void put(String key, Object value) {
this.data.put(key, value);
}
public void remove(String key) {
this.data.remove(key);
}
}
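For readers unfamiliar with Lombok: @Data generates the getters the tools use, and @AllArgsConstructor(staticName = "of") generates a private all-args constructor plus the static ServerNode.of(address, data) factory that the test code calls. Roughly equivalent hand-written code would look like this (a sketch with a hypothetical class name, trimmed to what the tests rely on):
import java.util.Map;

public class PlainServerNode {
    private String address;           //server address ip+port, e.g. 192.168.0.100:11211
    private Map<String, Object> data; //the node's key-value store

    private PlainServerNode(String address, Map<String, Object> data) {
        this.address = address;
        this.data = data;
    }

    //what @AllArgsConstructor(staticName = "of") generates
    public static PlainServerNode of(String address, Map<String, Object> data) {
        return new PlainServerNode(address, data);
    }

    //a few of the accessors @Data generates
    public String getAddress() { return address; }
    public Map<String, Object> getData() { return data; }

    public Object get(String key) { return data.get(key); }
    public void put(String key, Object value) { this.data.put(key, value); }
    public void remove(String key) { this.data.remove(key); }
}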
Data initialization / statistics tool StatisticalTool:
public class StatisticalTool {
final static String keyPrefix = "remainderKey";
final static String dataPrefix = "RemainderData";
/**
* Initialize cluster server nodes
*
* @param cluster cluster object
* @param serverNodeCount serverNode's Count
*/
public static void initCluster(ServerCluster cluster, int serverNodeCount) {
IntStream.range(0, serverNodeCount).forEach(i -> {
cluster.addServerNode(ServerNode.of("192.168.0." + (100 + i) + ":11211", new HashMap<>()));
});
}
/**
* Initialize cluster server nodes data
*
* @param cluster cluster object
* @param dataTotalCount cluster data's total count
*/
public static void initClusterData(ServerCluster cluster, int dataTotalCount) {
IntStream.range(0, dataTotalCount).forEach(i -> {
String key = keyPrefix + i;
String value = dataPrefix + i;
ServerNode serverNode = cluster.routeServerNode(key);
serverNode.put(key, value);
});
}
/**
* If the query object is not empty, it is a hit
*
* @param cluster cluster object
* @param dataTotalCount cluster data's total count
* @return hit count
*/
public static float calculateHitRate(ServerCluster cluster, int dataTotalCount) {
return IntStream.range(0, dataTotalCount).filter(i -> {
String key = keyPrefix + i;
return cluster.routeServerNode(key).get(key) != null;
}).count() / 1f;
}
/**
* Print the node status of the cluster
*
* @param cluster cluster object
*/
public static void printClusterStat(ServerCluster cluster) {
System.out.println("Scan the cluster server nodes stat: ");
cluster.getAllServerNodes().forEach(sn -> {
System.out.println("ServerNode IP: " + sn.getAddress() + " -> data count: " + sn.getData().size());
});
}
}
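Note that calculateHitRate returns the raw number of hits as a float (the / 1f only converts the long count); the Runner divides by dataTotalCount afterwards to get the rate. A variant that returns the rate directly might look like this if added to StatisticalTool (just a sketch, not the assignment's API):
/**
 * Sketch: compute the hit rate directly instead of returning a raw hit count.
 */
public static float calculateHitRateDirect(ServerCluster cluster, int dataTotalCount) {
    long hits = IntStream.range(0, dataTotalCount).filter(i -> {
        String key = keyPrefix + i;
        return cluster.routeServerNode(key).get(key) != null; //a non-null value counts as a hit
    }).count();
    return hits / (float) dataTotalCount;
}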
The Runner test class:
public class Runner {
public static void main(String[] args) {
System.out.println("-----------------------Test remainder mode-----------------------");
int dataTotalCount = 10000;
ServerCluster cluster = new RemainderCluster();
System.out.println("Initialize cluster server nodes ...");
//Initialize cluster server nodes
StatisticalTool.initCluster(cluster, 3);
System.out.println("Initialize cluster server nodes data ...");
//Initialize cluster server nodes data
StatisticalTool.initClusterData(cluster, dataTotalCount);
//Print cluster stat
StatisticalTool.printClusterStat(cluster);
System.out.println("Statistics cache hit rate: ");
float hit = StatisticalTool.calculateHitRate(cluster, dataTotalCount);
System.out.println("Statistics is : " + hit / dataTotalCount);
System.out.println("-----------------------Test add server node-----------------------");
System.out.println("Try to add server nodes in the cluster");
cluster.addServerNode(ServerNode.of("192.168.0.103:11211", new HashMap<>()));
//re Print cluster stat
StatisticalTool.printClusterStat(cluster);
float addHit = StatisticalTool.calculateHitRate(cluster, dataTotalCount);
System.out.println("Statistics is : " + addHit / dataTotalCount);
System.out.println("-----------------------Test remove server node-----------------------");
//rebuild
cluster = new RemainderCluster();
StatisticalTool.initCluster(cluster, 3);
StatisticalTool.initClusterData(cluster, dataTotalCount);
System.out.println("rebuild end ...");
cluster.removeServerNode(ServerNode.of("192.168.0.102:11211", new HashMap<>()));
//re Print cluster stat
StatisticalTool.printClusterStat(cluster);
float removeHit = StatisticalTool.calculateHitRate(cluster, dataTotalCount);
System.out.println("Statistics is : " + removeHit / dataTotalCount);
}
}
Test results:
D:\Java\openjdk-11\bin\java.exe
-----------------------Test remainder mode-----------------------
Initialize cluster server nodes ...
Add serverNode(192.168.0.100:11211) to the cluster...
Add serverNode(192.168.0.101:11211) to the cluster...
Add serverNode(192.168.0.102:11211) to the cluster...
Initialize cluster server nodes data ...
Scan the cluster server nodes stat:
ServerNode IP: 192.168.0.100:11211 -> data count: 3333
ServerNode IP: 192.168.0.101:11211 -> data count: 3333
ServerNode IP: 192.168.0.102:11211 -> data count: 3334
Statistics cache hit rate:
Statistics is : 1.0
-----------------------Test add server node-----------------------
Try to add server nodes in the cluster
Add serverNode(192.168.0.103:11211) to the cluster...
Scan the cluster server nodes stat:
ServerNode IP: 192.168.0.100:11211 -> data count: 3333
ServerNode IP: 192.168.0.101:11211 -> data count: 3333
ServerNode IP: 192.168.0.102:11211 -> data count: 3334
ServerNode IP: 192.168.0.103:11211 -> data count: 0
Statistics is : 0.25
-----------------------Test remove server node-----------------------
Add serverNode(192.168.0.100:11211) to the cluster...
Add serverNode(192.168.0.101:11211) to the cluster...
Add serverNode(192.168.0.102:11211) to the cluster...
rebuild end ...
Scan the cluster server nodes stat:
ServerNode IP: 192.168.0.100:11211 -> data count: 3333
ServerNode IP: 192.168.0.101:11211 -> data count: 3333
Statistics is : 0.3331
Process finished with exit code 0
Test setup: 3 cluster nodes, 10,000 data entries in total.
With a fixed, unchanging set of nodes, the hit rate is 1.0.
When a node is added (scale-out), the hit rate drops to 0.25.
When a node is removed (failure), the hit rate drops to 0.3331.
With hit rates this low, the system cannot scale out or in gracefully, fault tolerance is poor, and the misses push far more concurrent load onto the backing data source.
This is why consistent hashing is needed as an improvement over the simple remainder algorithm:
At its core, consistent hashing is still hashing plus a modulus; the difference is that the modulus is fixed, so the hash space always covers the integers 0 through 2^32 - 1.
Consistent hashing introduces the notion of a hash ring: all the integers in [0, 2^32 - 1] joined end to end into a circle.
OK, let's first use a plain consistent hash to address the low hit rate:
The SimpleServerCluster interface:
public interface SimpleServerCluster {
void addServerNode(SimpleServer serverNode);
void removeServerNode(SimpleServer serverNode);
SimpleServer routeServerNode(String key);
List<SimpleServer> getAllServerNodes();
}
The simple consistent-hash cluster SimpleHashCluster:
public class SimpleHashCluster implements SimpleServerCluster {
private SortedMap<Integer, SimpleServer> serverNodes;
public SimpleHashCluster() {
this.serverNodes = new TreeMap<>();
}
@Override
public void addServerNode(SimpleServer serverNode) {
this.serverNodes.put(Math.abs(serverNode.getAddress().hashCode()), serverNode);
}
@Override
public void removeServerNode(SimpleServer serverNode) {
this.serverNodes.remove(Math.abs(serverNode.getAddress().hashCode()));
}
@Override
public SimpleServer routeServerNode(String key) {
int keyHash = Math.abs(key.hashCode());
SortedMap<Integer, SimpleServer> nextServerMap = this.serverNodes.tailMap(keyHash);
int serverKey = nextServerMap.isEmpty() ? serverNodes.firstKey() : nextServerMap.firstKey();
return serverNodes.get(serverKey);
}
@Override
public List<SimpleServer> getAllServerNodes() {
return new ArrayList<>(this.serverNodes.values());
}
}
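The tailMap/firstKey pair above is the clockwise ring lookup: tailMap collects every ring position at or after the key's hash, and an empty result means the search has walked past the largest position, so it wraps around to the first node. NavigableMap#ceilingKey expresses the same lookup slightly more directly; a sketch, assuming the serverNodes field is declared as NavigableMap<Integer, SimpleServer> (TreeMap already implements it):
@Override
public SimpleServer routeServerNode(String key) {
    int keyHash = Math.abs(key.hashCode());
    //ceilingKey: smallest ring position >= keyHash; null means wrap around to the start of the ring
    Integer serverKey = this.serverNodes.ceilingKey(keyHash);
    return this.serverNodes.get(serverKey != null ? serverKey : this.serverNodes.firstKey());
}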
The cluster node SimpleServer:
@Data
@Builder
@AllArgsConstructor(staticName = "of")
public class SimpleServer {
//Server's address ip+port : 192.168.0.100:11211
private String address;
//Server's data (k,v) -> map
private Map<String, Object> data;
public Object get(String key) {
return data.get(key);
}
void put(String key, Object value) {
this.data.put(key, value);
}
void remove(String key) {
this.data.remove(key);
}
}
Data initialization / statistics tool SimpleHashTool:
public class SimpleHashTool {
final static String keyPrefix = "remainderKey";
final static String dataPrefix = "RemainderData";
/**
* Initialize cluster server nodes
*
* @param cluster cluster object
* @param serverNodeCount serverNode's Count
*/
public static void initCluster(SimpleServerCluster cluster, int serverNodeCount) {
IntStream.range(0, serverNodeCount).forEach(i -> {
cluster.addServerNode(SimpleServer.of("192.168.0." + (100 + i) + ":11211", new HashMap<>()));
});
}
/**
* Initialize cluster server nodes data
*
* @param cluster cluster object
* @param dataTotalCount cluster data's total count
*/
public static void initClusterData(SimpleServerCluster cluster, int dataTotalCount) {
IntStream.range(0, dataTotalCount).forEach(i -> {
String key = keyPrefix + i;
String value = dataPrefix + i;
SimpleServer serverNode = cluster.routeServerNode(key);
serverNode.put(key, value);
});
}
/**
* If the query object is not empty, it is a hit
*
* @param cluster cluster object
* @param dataTotalCount cluster data's total count
* @return hit count
*/
public static float calculateHitRate(SimpleServerCluster cluster, int dataTotalCount) {
return IntStream.range(0, dataTotalCount).filter(i -> {
String key = keyPrefix + i;
return cluster.routeServerNode(key).get(key) != null;
}).count() / 1f;
}
/**
* Print the node status of the cluster
*
* @param cluster cluster object
*/
public static void printClusterStat(SimpleServerCluster cluster) {
System.out.println("Scan the cluster server nodes stat: ");
cluster.getAllServerNodes().forEach(sn -> {
System.out.println("ServerNode IP: " + sn.getAddress() + " -> data count: " + sn.getData().size());
});
}
}
The Runner test class:
public class Runner {
public static void main(String[] args) {
long startTime = System.currentTimeMillis();
System.out.println("-----------------------Test simple hash mode-----------------------");
int dataTotalCount = 10000;
int serverCount = 3;
SimpleServerCluster cluster = new SimpleHashCluster();
System.out.println("Initialize cluster server nodes ...");
//Initialize cluster server nodes
SimpleHashTool.initCluster(cluster, serverCount);
System.out.println("Initialize cluster server nodes data ...");
//Initialize cluster server nodes data
SimpleHashTool.initClusterData(cluster, dataTotalCount);
//Print cluster stat
SimpleHashTool.printClusterStat(cluster);
System.out.println("Statistics cache hit rate: ");
float hit = SimpleHashTool.calculateHitRate(cluster, dataTotalCount);
System.out.println("Statistics is : " + hit / dataTotalCount);
System.out.println("-----------------------Test add server node-----------------------");
System.out.println("Try to add server nodes in the cluster");
cluster.addServerNode(SimpleServer.of("192.168.0.103:11211", new HashMap<>()));
//re Print cluster stat
SimpleHashTool.printClusterStat(cluster);
float addHit = SimpleHashTool.calculateHitRate(cluster, dataTotalCount);
System.out.println("Statistics is : " + addHit / dataTotalCount);
System.out.println("-----------------------Test remove server node-----------------------");
//rebuild
cluster = new SimpleHashCluster();
SimpleHashTool.initCluster(cluster, serverCount);
SimpleHashTool.initClusterData(cluster, dataTotalCount);
System.out.println("rebuild end ...");
cluster.removeServerNode(SimpleServer.of("192.168.0.102:11211", new HashMap<>()));
//re Print cluster stat
SimpleHashTool.printClusterStat(cluster);
float removeHit = SimpleHashTool.calculateHitRate(cluster, dataTotalCount);
System.out.println("Statistics is : " + removeHit / dataTotalCount);
long endTime = System.currentTimeMillis();
System.out.println("测试总耗时:" + (endTime - startTime) + "ms");
}
}
Note that this run uses exactly the same setup and data scale as the remainder test (10,000 K-V entries, 3 server nodes). Results:
-----------------------Test simple hash mode-----------------------
Initialize cluster server nodes ...
Initialize cluster server nodes data ...
Scan the cluster server nodes stat:
ServerNode IP: 192.168.0.101:11211 -> data count: 990
ServerNode IP: 192.168.0.100:11211 -> data count: 9010
ServerNode IP: 192.168.0.102:11211 -> data count: 0
Statistics cache hit rate:
Statistics is : 1.0
-----------------------Test add server node-----------------------
Try to add server nodes in the cluster
Scan the cluster server nodes stat:
ServerNode IP: 192.168.0.101:11211 -> data count: 990
ServerNode IP: 192.168.0.100:11211 -> data count: 9010
ServerNode IP: 192.168.0.102:11211 -> data count: 0
ServerNode IP: 192.168.0.103:11211 -> data count: 0
Statistics is : 0.91
-----------------------Test remove server node-----------------------
rebuild end ...
Scan the cluster server nodes stat:
ServerNode IP: 192.168.0.101:11211 -> data count: 990
ServerNode IP: 192.168.0.100:11211 -> data count: 9010
Statistics is : 1.0
Total test time: 71ms
Process finished with exit code 0
With a fixed set of nodes, the hit rate is 1.0.
When a node is added (scale-out), the hit rate drops to 0.91.
When a node is removed (failure), the hit rate stays at 1.0, because the removed node (192.168.0.102) happened to hold no data.
Scaling the data up 100x, to 1 million K-V entries, gives the following results:
-----------------------Test simple hash mode-----------------------
Initialize cluster server nodes ...
Initialize cluster server nodes data ...
Scan the cluster server nodes stat:
ServerNode IP: 192.168.0.101:11211 -> data count: 90990
ServerNode IP: 192.168.0.100:11211 -> data count: 909010
ServerNode IP: 192.168.0.102:11211 -> data count: 0
Statistics cache hit rate:
Statistics is : 1.0
-----------------------Test add server node-----------------------
Try to add server nodes in the cluster
Scan the cluster server nodes stat:
ServerNode IP: 192.168.0.101:11211 -> data count: 90990
ServerNode IP: 192.168.0.100:11211 -> data count: 909010
ServerNode IP: 192.168.0.102:11211 -> data count: 0
ServerNode IP: 192.168.0.103:11211 -> data count: 0
Statistics is : 0.9091
-----------------------Test remove server node-----------------------
rebuild end ...
Scan the cluster server nodes stat:
ServerNode IP: 192.168.0.101:11211 -> data count: 90990
ServerNode IP: 192.168.0.100:11211 -> data count: 909010
Statistics is : 1.0
Total test time: 1474ms
Process finished with exit code 0
With a fixed set of nodes, the hit rate is 1.0.
When a node is added (scale-out), the hit rate drops to 0.9091.
When a node is removed (failure), the hit rate stays at 1.0, again because the removed node held no data.
Scaling up another 10x, to 10 million K-V entries:
-----------------------Test simple hash mode-----------------------
Initialize cluster server nodes ...
Initialize cluster server nodes data ...
Scan the cluster server nodes stat:
ServerNode IP: 192.168.0.101:11211 -> data count: 6090990
ServerNode IP: 192.168.0.100:11211 -> data count: 3509010
ServerNode IP: 192.168.0.102:11211 -> data count: 400000
Statistics cache hit rate:
Statistics is : 1.0
-----------------------Test add server node-----------------------
Try to add server nodes in the cluster
Scan the cluster server nodes stat:
ServerNode IP: 192.168.0.101:11211 -> data count: 6090990
ServerNode IP: 192.168.0.100:11211 -> data count: 3509010
ServerNode IP: 192.168.0.102:11211 -> data count: 400000
ServerNode IP: 192.168.0.103:11211 -> data count: 0
Statistics is : 0.59091
-----------------------Test remove server node-----------------------
rebuild end ...
Scan the cluster server nodes stat:
ServerNode IP: 192.168.0.101:11211 -> data count: 6090990
ServerNode IP: 192.168.0.100:11211 -> data count: 3509010
Statistics is : 0.96
Total test time: 13724ms
Process finished with exit code 0
With a fixed set of nodes, the hit rate is 1.0.
When a node is added (scale-out), the hit rate drops to 0.59091.
When a node is removed (failure), the hit rate drops to 0.96.
The removal above only hit the tail node (192.168.0.102). When the removed node sits at the front instead (192.168.0.100):
10 million K-V entries, 3 nodes:
-----------------------Test simple hash mode-----------------------
Initialize cluster server nodes ...
Initialize cluster server nodes data ...
Scan the cluster server nodes stat:
ServerNode IP: 192.168.0.101:11211 -> data count: 6090990
ServerNode IP: 192.168.0.100:11211 -> data count: 3509010
ServerNode IP: 192.168.0.102:11211 -> data count: 400000
Statistics cache hit rate:
Statistics is : 1.0
-----------------------Test add server node-----------------------
Try to add server nodes in the cluster
Scan the cluster server nodes stat:
ServerNode IP: 192.168.0.101:11211 -> data count: 6090990
ServerNode IP: 192.168.0.100:11211 -> data count: 3509010
ServerNode IP: 192.168.0.102:11211 -> data count: 400000
ServerNode IP: 192.168.0.103:11211 -> data count: 0
Statistics is : 0.59091
-----------------------Test remove server node-----------------------
rebuild end ...
Scan the cluster server nodes stat:
ServerNode IP: 192.168.0.101:11211 -> data count: 6090990
ServerNode IP: 192.168.0.102:11211 -> data count: 400000
Statistics is : 0.649099
Total test time: 13518ms
Process finished with exit code 0
When a node is added (scale-out), the hit rate drops to 0.59091.
When the front node is removed (failure), the hit rate drops to 0.649099.
Removing the middle node (192.168.0.101) instead:
10 million K-V entries, 3 nodes:
-----------------------Test simple hash mode-----------------------
Initialize cluster server nodes ...
Initialize cluster server nodes data ...
Scan the cluster server nodes stat:
ServerNode IP: 192.168.0.101:11211 -> data count: 6090990
ServerNode IP: 192.168.0.100:11211 -> data count: 3509010
ServerNode IP: 192.168.0.102:11211 -> data count: 400000
Statistics cache hit rate:
Statistics is : 1.0
-----------------------Test add server node-----------------------
Try to add server nodes in the cluster
Scan the cluster server nodes stat:
ServerNode IP: 192.168.0.101:11211 -> data count: 6090990
ServerNode IP: 192.168.0.100:11211 -> data count: 3509010
ServerNode IP: 192.168.0.102:11211 -> data count: 400000
ServerNode IP: 192.168.0.103:11211 -> data count: 0
Statistics is : 0.59091
-----------------------Test remove server node-----------------------
rebuild end ...
Scan the cluster server nodes stat:
ServerNode IP: 192.168.0.100:11211 -> data count: 3509010
ServerNode IP: 192.168.0.102:11211 -> data count: 400000
Statistics is : 0.390901
Total test time: 12728ms
Process finished with exit code 0
When a node is added (scale-out), the hit rate drops to 0.59091.
When the middle node is removed (failure), the hit rate drops to 0.390901.
To reduce this risk, add more servers so that any single failure affects a smaller share of the data: scale the cluster from 3 to 10 servers and repeat the tests with 1 million and 10 million entries. First, 1 million K-V entries:
-----------------------Test simple hash mode-----------------------
Initialize cluster server nodes ...
Initialize cluster server nodes data ...
Scan the cluster server nodes stat:
ServerNode IP: 192.168.0.101:11211 -> data count: 90
ServerNode IP: 192.168.0.106:11211 -> data count: 10
ServerNode IP: 192.168.0.105:11211 -> data count: 909000
ServerNode IP: 192.168.0.100:11211 -> data count: 0
ServerNode IP: 192.168.0.102:11211 -> data count: 0
ServerNode IP: 192.168.0.107:11211 -> data count: 0
ServerNode IP: 192.168.0.109:11211 -> data count: 0
ServerNode IP: 192.168.0.104:11211 -> data count: 0
ServerNode IP: 192.168.0.103:11211 -> data count: 90900
ServerNode IP: 192.168.0.108:11211 -> data count: 0
Statistics cache hit rate:
Statistics is : 1.0
-----------------------Test add server node-----------------------
Try to add server nodes in the cluster
Scan the cluster server nodes stat:
ServerNode IP: 192.168.0.101:11211 -> data count: 90
ServerNode IP: 192.168.0.106:11211 -> data count: 10
ServerNode IP: 192.168.0.105:11211 -> data count: 909000
ServerNode IP: 192.168.0.100:11211 -> data count: 0
ServerNode IP: 192.168.0.102:11211 -> data count: 0
ServerNode IP: 192.168.0.107:11211 -> data count: 0
ServerNode IP: 192.168.0.109:11211 -> data count: 0
ServerNode IP: 192.168.0.104:11211 -> data count: 0
ServerNode IP: 192.168.0.103:11211 -> data count: 0
ServerNode IP: 192.168.0.108:11211 -> data count: 0
Statistics is : 0.9091
-----------------------Test remove server node-----------------------
rebuild end ...
Scan the cluster server nodes stat:
ServerNode IP: 192.168.0.106:11211 -> data count: 10
ServerNode IP: 192.168.0.105:11211 -> data count: 909000
ServerNode IP: 192.168.0.100:11211 -> data count: 0
ServerNode IP: 192.168.0.102:11211 -> data count: 0
ServerNode IP: 192.168.0.107:11211 -> data count: 0
ServerNode IP: 192.168.0.109:11211 -> data count: 0
ServerNode IP: 192.168.0.104:11211 -> data count: 0
ServerNode IP: 192.168.0.103:11211 -> data count: 90900
ServerNode IP: 192.168.0.108:11211 -> data count: 0
Statistics is : 0.99991
Total test time: 1407ms
Process finished with exit code 0
10 million K-V entries:
-----------------------Test simple hash mode-----------------------
Initialize cluster server nodes ...
Initialize cluster server nodes data ...
Scan the cluster server nodes stat:
ServerNode IP: 192.168.0.101:11211 -> data count: 1100090
ServerNode IP: 192.168.0.106:11211 -> data count: 900010
ServerNode IP: 192.168.0.105:11211 -> data count: 2209000
ServerNode IP: 192.168.0.100:11211 -> data count: 400000
ServerNode IP: 192.168.0.102:11211 -> data count: 400000
ServerNode IP: 192.168.0.107:11211 -> data count: 900000
ServerNode IP: 192.168.0.109:11211 -> data count: 1400000
ServerNode IP: 192.168.0.104:11211 -> data count: 900000
ServerNode IP: 192.168.0.103:11211 -> data count: 890900
ServerNode IP: 192.168.0.108:11211 -> data count: 900000
Statistics cache hit rate:
Statistics is : 1.0
-----------------------Test add server node-----------------------
Try to add server nodes in the cluster
Scan the cluster server nodes stat:
ServerNode IP: 192.168.0.101:11211 -> data count: 1100090
ServerNode IP: 192.168.0.106:11211 -> data count: 900010
ServerNode IP: 192.168.0.105:11211 -> data count: 2209000
ServerNode IP: 192.168.0.100:11211 -> data count: 400000
ServerNode IP: 192.168.0.102:11211 -> data count: 400000
ServerNode IP: 192.168.0.107:11211 -> data count: 900000
ServerNode IP: 192.168.0.109:11211 -> data count: 1400000
ServerNode IP: 192.168.0.104:11211 -> data count: 900000
ServerNode IP: 192.168.0.103:11211 -> data count: 0
ServerNode IP: 192.168.0.108:11211 -> data count: 900000
Statistics is : 0.91091
-----------------------Test remove server node-----------------------
rebuild end ...
Scan the cluster server nodes stat:
ServerNode IP: 192.168.0.106:11211 -> data count: 900010
ServerNode IP: 192.168.0.105:11211 -> data count: 2209000
ServerNode IP: 192.168.0.100:11211 -> data count: 400000
ServerNode IP: 192.168.0.102:11211 -> data count: 400000
ServerNode IP: 192.168.0.107:11211 -> data count: 900000
ServerNode IP: 192.168.0.109:11211 -> data count: 1400000
ServerNode IP: 192.168.0.104:11211 -> data count: 900000
ServerNode IP: 192.168.0.103:11211 -> data count: 890900
ServerNode IP: 192.168.0.108:11211 -> data count: 900000
Statistics is : 0.889991
Total test time: 13892ms
Process finished with exit code 0
These runs show that data on the hash ring is still distributed very unevenly (Java's built-in hashCode performs poorly here, and even with other hash functions the spread over only a handful of node positions was not particularly even). If the failed node happens to be one carrying a large share of the data, the hit rate drops dramatically.
The next improvement on the simple hash is to add a layer of virtual nodes mapped to real nodes, and to use the standard deviation to measure how evenly data is distributed around the ring.
Standard deviation: a measure of how spread out a set of values is around its mean. A large standard deviation means most values differ substantially from the mean; a small one means the values cluster close to it.
Here, a small standard deviation means the data is spread evenly across the nodes.
Sample standard deviation: s = sqrt(((x1 - x̄)² + (x2 - x̄)² + … + (xn - x̄)²) / (n - 1))
Population standard deviation: σ = sqrt(((x1 - x̄)² + (x2 - x̄)² + … + (xn - x̄)²) / n)
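A direct Java translation of the population formula, as a standalone sketch (HashTool.getSd further below applies the same formula to the per-node data counts; the example input is copied from the 10-node hashCode run later in this post and should print roughly 180594.5, the Sd reported for that run):
import java.util.Arrays;

public class StdDevDemo {
    //population standard deviation of the per-node key counts
    static double populationStdDev(int[] counts) {
        double mean = Arrays.stream(counts).average().orElse(0);
        double meanSquaredDiff = Arrays.stream(counts)
                .mapToDouble(c -> Math.pow(c - mean, 2))
                .average().orElse(0);
        return Math.sqrt(meanSquaredDiff);
    }

    public static void main(String[] args) {
        //per-node data counts taken from the hashCode run with 10 nodes and 100 virtual nodes each
        int[] counts = {370000, 90, 0, 90900, 10, 9000, 0, 530000, 0, 0};
        System.out.println(populationStdDev(counts)); //expected: roughly 180594.5
    }
}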
OK, code first, starting with the built-in hashCode:
The ServerCluster interface (same as at the top):
public interface ServerCluster {
void addServerNode(ServerNode serverNode);
void removeServerNode(ServerNode serverNode);
ServerNode routeServerNode(String key);
List<ServerNode> getAllServerNodes();
}
The consistent-hash cluster ConsistentHashCluster:
public class ConsistentHashCluster implements ServerCluster {
private List<ServerNode> serverNodes;
private SortedMap<Integer, ServerNode> virtualNodes;
private Integer virtualCount;
public ConsistentHashCluster(Integer virtualCount) {
this.serverNodes = new ArrayList<>();
this.virtualNodes = new TreeMap<>();
this.virtualCount = virtualCount;
}
@Override
public void addServerNode(ServerNode serverNode) {
this.serverNodes.add(serverNode);
IntStream.range(0, this.virtualCount).forEach(i -> {
//String key = serverNode.getAddress() + HashTool.split + i;
//int hash = HashTool.getHashCode(key);
int hash = HashTool.getHashCode(serverNode, i);
this.virtualNodes.put(hash, serverNode);
});
}
@Override
public void removeServerNode(ServerNode serverNode) {
this.serverNodes.removeIf(sn -> sn.getAddress().equals(serverNode.getAddress()));
IntStream.range(0, this.virtualCount).forEach(i -> {
//String key = serverNode.getAddress() + HashTool.split + i;
//int hash = HashTool.getHashCode(key);
int hash = HashTool.getHashCode(serverNode, i);
this.virtualNodes.remove(hash);
});
}
@Override
public ServerNode routeServerNode(String key) {
int hash = key.hashCode();
//int hash = HashTool.getHashCode(key);
SortedMap<Integer, ServerNode> nextMap = this.virtualNodes.tailMap(hash);
int serverKey = nextMap.isEmpty() ? this.virtualNodes.firstKey() : nextMap.firstKey();
return this.virtualNodes.get(serverKey);
}
@Override
public List<ServerNode> getAllServerNodes() {
return this.serverNodes;
}
}
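The commented-out lines above are the hook for swapping in a different hash function. One thing to keep in mind for the "switch to murmur3_128" later in this post: the virtual-node positions and the key lookups must use the same hash function, otherwise keys and nodes end up on two different rings. A sketch of the murmur variant of two of the methods, mirroring the commented code (removeServerNode needs the matching change as well):
@Override
public void addServerNode(ServerNode serverNode) {
    this.serverNodes.add(serverNode);
    IntStream.range(0, this.virtualCount).forEach(i -> {
        //place every virtual node with the same hash function used for key routing
        int hash = HashTool.getHashCode(serverNode.getAddress() + HashTool.split + i);
        this.virtualNodes.put(hash, serverNode);
    });
}

@Override
public ServerNode routeServerNode(String key) {
    //hash the key with the same function as the virtual nodes
    int hash = HashTool.getHashCode(key);
    SortedMap<Integer, ServerNode> nextMap = this.virtualNodes.tailMap(hash);
    int serverKey = nextMap.isEmpty() ? this.virtualNodes.firstKey() : nextMap.firstKey();
    return this.virtualNodes.get(serverKey);
}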
The cluster node ServerNode (same class as before):
@Data
@Builder
@AllArgsConstructor(staticName = "of")
public class ServerNode {
//Server's address ip+port : 192.168.0.100:11211
private String address;
//Server's data (k,v) -> map
private Map<String, Object> data;
public Object get(String key) {
return data.get(key);
}
public void put(String key, Object value) {
this.data.put(key, value);
}
public void remove(String key) {
this.data.remove(key);
}
}
Data initialization / statistics tool HashTool:
public class HashTool {
final static String keyPrefix = "remainderKey";
final static String dataPrefix = "RemainderData";
public final static String split = "_";
public static Integer getHashCode(ServerNode serverNode, int index) {
return (serverNode.getAddress() + HashTool.split + index).hashCode();
}
public static Integer getHashCode(String s) {
HashFunction hashFunction = Hashing.murmur3_128(13);
return Math.abs(hashFunction.hashString(s, Charsets.UTF_8).hashCode());
}
/**
* Initialize cluster server nodes
*
* @param cluster cluster object
* @param serverNodeCount serverNode's Count
*/
public static void initCluster(ServerCluster cluster, int serverNodeCount) {
IntStream.range(0, serverNodeCount).forEach(i -> {
cluster.addServerNode(ServerNode.of("192.168.0." + (100 + i) + ":11211", new HashMap<>()));
});
}
/**
* Initialize cluster server nodes data
*
* @param cluster cluster object
* @param dataTotalCount cluster data's total count
*/
public static void initClusterData(ServerCluster cluster, int dataTotalCount) {
IntStream.range(0, dataTotalCount).forEach(i -> {
String key = keyPrefix + i;
String value = dataPrefix + i;
ServerNode serverNode = cluster.routeServerNode(key);
serverNode.put(key, value);
});
}
/**
* If the query object is not empty, it is a hit
*
* @param cluster cluster object
* @param dataTotalCount cluster data's total count
* @return hit count
*/
public static float calculateHitRate(ServerCluster cluster, int dataTotalCount) {
return IntStream.range(0, dataTotalCount).filter(i -> {
String key = keyPrefix + i;
return cluster.routeServerNode(key).get(key) != null;
}).count() / 1f;
}
/**
* Print the node status of the cluster
*
* @param cluster cluster object
*/
public static void printClusterStat(ServerCluster cluster) {
System.out.println("Scan the cluster server nodes stat: ");
cluster.getAllServerNodes().forEach(sn -> {
System.out.println("ServerNode IP: " + sn.getAddress() + " -> data count: " + sn.getData().size());
});
}
/**
* @param cluster cluster object
* @param dataTotalCount cluster data's total count
* @param serverCount serverNode's Count
* @return population standard deviation of the per-node data counts
*/
public static double getSd(ServerCluster cluster, int dataTotalCount, int serverCount) {
int avg = dataTotalCount / serverCount;
double powSum = cluster.getAllServerNodes().stream().mapToDouble(n -> Math.pow(n.getData().size() - avg, 2)).sum();
double ds = Math.sqrt(powSum / serverCount);
return ds;
}
}
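Two small notes on HashTool. Guava's HashCode exposes asInt() as the documented accessor for taking an int from the 128-bit murmur result, which reads a little clearer than relying on its hashCode(); and getSd divides by the serverCount the caller passes in, so after a node has been added the divisor lags the real node count slightly. A sketch of the first point (an alternative, not a required change):
public static Integer getHashCode(String s) {
    HashFunction hashFunction = Hashing.murmur3_128(13); //fixed seed, so runs are reproducible
    return Math.abs(hashFunction.hashString(s, Charsets.UTF_8).asInt()); //asInt(): first four bytes as an int
}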
The Runner test class:
public class Runner {
public static void main(String[] args) {
long startTime = System.currentTimeMillis();
int dataTotalCount = 1000000;
int serverCount = 10;
int replicas = 100;
System.out.println(String.format("data total count : %s, serverCount : %s, virtualCount each server : %s", dataTotalCount, serverCount , replicas));
System.out.println("-----------------------Test hash mode-----------------------");
ServerCluster cluster = new ConsistentHashCluster(replicas);
System.out.println("Initialize cluster server nodes ...");
//Initialize cluster server nodes
HashTool.initCluster(cluster, serverCount);
System.out.println("Initialize cluster server nodes data ...");
//Initialize cluster server nodes data
HashTool.initClusterData(cluster, dataTotalCount);
//Print cluster stat
HashTool.printClusterStat(cluster);
System.out.println("Statistics cache hit rate: ");
float hit = HashTool.calculateHitRate(cluster, dataTotalCount);
System.out.println("Statistics is : " + hit / dataTotalCount);
System.out.println("Sd is : " + HashTool.getSd(cluster, dataTotalCount, serverCount));
System.out.println("-----------------------Test add server node-----------------------");
System.out.println("Try to add server nodes in the cluster");
cluster.addServerNode(ServerNode.of("192.168.0.103:11211", new HashMap<>()));
//re Print cluster stat
HashTool.printClusterStat(cluster);
float addHit = HashTool.calculateHitRate(cluster, dataTotalCount);
System.out.println("Statistics is : " + addHit / dataTotalCount);
System.out.println("Sd is : " + HashTool.getSd(cluster, dataTotalCount, serverCount));
System.out.println("-----------------------Test remove server node-----------------------");
//rebuild
cluster = new ConsistentHashCluster(replicas);
HashTool.initCluster(cluster, serverCount);
HashTool.initClusterData(cluster, dataTotalCount);
System.out.println("rebuild end ...");
cluster.removeServerNode(ServerNode.of("192.168.0.101:11211", new HashMap<>()));
//re Print cluster stat
HashTool.printClusterStat(cluster);
float removeHit = HashTool.calculateHitRate(cluster, dataTotalCount);
System.out.println("Statistics is : " + removeHit / dataTotalCount);
System.out.println("Sd is : " + HashTool.getSd(cluster, dataTotalCount, serverCount));
long endTime = System.currentTimeMillis();
System.out.println("测试总耗时:" + (endTime - startTime) + "ms");
}
}
1 million K-V entries, 10 nodes, 100 virtual nodes per server. Results:
data total count : 1000000, serverCount : 10, virtualCount each server : 100
-----------------------Test hash mode-----------------------
Initialize cluster server nodes ...
Initialize cluster server nodes data ...
Scan the cluster server nodes stat:
ServerNode IP: 192.168.0.100:11211 -> data count: 370000
ServerNode IP: 192.168.0.101:11211 -> data count: 90
ServerNode IP: 192.168.0.102:11211 -> data count: 0
ServerNode IP: 192.168.0.103:11211 -> data count: 90900
ServerNode IP: 192.168.0.104:11211 -> data count: 10
ServerNode IP: 192.168.0.105:11211 -> data count: 9000
ServerNode IP: 192.168.0.106:11211 -> data count: 0
ServerNode IP: 192.168.0.107:11211 -> data count: 530000
ServerNode IP: 192.168.0.108:11211 -> data count: 0
ServerNode IP: 192.168.0.109:11211 -> data count: 0
Statistics cache hit rate:
Statistics is : 1.0
Sd is : 180594.5232281422
-----------------------Test add server node-----------------------
Try to add server nodes in the cluster
Scan the cluster server nodes stat:
ServerNode IP: 192.168.0.100:11211 -> data count: 370000
ServerNode IP: 192.168.0.101:11211 -> data count: 90
ServerNode IP: 192.168.0.102:11211 -> data count: 0
ServerNode IP: 192.168.0.103:11211 -> data count: 90900
ServerNode IP: 192.168.0.104:11211 -> data count: 10
ServerNode IP: 192.168.0.105:11211 -> data count: 9000
ServerNode IP: 192.168.0.106:11211 -> data count: 0
ServerNode IP: 192.168.0.107:11211 -> data count: 530000
ServerNode IP: 192.168.0.108:11211 -> data count: 0
ServerNode IP: 192.168.0.109:11211 -> data count: 0
ServerNode IP: 192.168.0.103:11211 -> data count: 0
Statistics is : 0.9091
Sd is : 183342.25323149053
-----------------------Test remove server node-----------------------
rebuild end ...
Scan the cluster server nodes stat:
ServerNode IP: 192.168.0.100:11211 -> data count: 370000
ServerNode IP: 192.168.0.102:11211 -> data count: 0
ServerNode IP: 192.168.0.103:11211 -> data count: 90900
ServerNode IP: 192.168.0.104:11211 -> data count: 10
ServerNode IP: 192.168.0.105:11211 -> data count: 9000
ServerNode IP: 192.168.0.106:11211 -> data count: 0
ServerNode IP: 192.168.0.107:11211 -> data count: 530000
ServerNode IP: 192.168.0.108:11211 -> data count: 0
ServerNode IP: 192.168.0.109:11211 -> data count: 0
Statistics is : 0.99991
Sd is : 177809.39516797193
Total test time: 1902ms
Process finished with exit code 0
As before, the built-in hashCode scatters the data extremely unevenly: the standard deviation is over 180,000, and the per-node data counts tell the same story. The result is poor.
Switching to murmur3_128:
data total count : 1000000, serverCount : 10, virtualCount each server : 100
-----------------------Test hash mode-----------------------
Initialize cluster server nodes ...
Initialize cluster server nodes data ...
Scan the cluster server nodes stat:
ServerNode IP: 192.168.0.100:11211 -> data count: 94914
ServerNode IP: 192.168.0.101:11211 -> data count: 112653
ServerNode IP: 192.168.0.102:11211 -> data count: 101994
ServerNode IP: 192.168.0.103:11211 -> data count: 107443
ServerNode IP: 192.168.0.104:11211 -> data count: 100143
ServerNode IP: 192.168.0.105:11211 -> data count: 106704
ServerNode IP: 192.168.0.106:11211 -> data count: 86169
ServerNode IP: 192.168.0.107:11211 -> data count: 87813
ServerNode IP: 192.168.0.108:11211 -> data count: 103563
ServerNode IP: 192.168.0.109:11211 -> data count: 98604
Statistics cache hit rate:
Statistics is : 1.0
Sd is : 8029.741403059005
-----------------------Test add server node-----------------------
Try to add server nodes in the cluster
Scan the cluster server nodes stat:
ServerNode IP: 192.168.0.100:11211 -> data count: 94914
ServerNode IP: 192.168.0.101:11211 -> data count: 112653
ServerNode IP: 192.168.0.102:11211 -> data count: 101994
ServerNode IP: 192.168.0.103:11211 -> data count: 107443
ServerNode IP: 192.168.0.104:11211 -> data count: 100143
ServerNode IP: 192.168.0.105:11211 -> data count: 106704
ServerNode IP: 192.168.0.106:11211 -> data count: 86169
ServerNode IP: 192.168.0.107:11211 -> data count: 87813
ServerNode IP: 192.168.0.108:11211 -> data count: 103563
ServerNode IP: 192.168.0.109:11211 -> data count: 98604
ServerNode IP: 192.168.0.103:11211 -> data count: 0
Statistics is : 0.892557
Sd is : 32626.31985069723
-----------------------Test remove server node-----------------------
rebuild end ...
Scan the cluster server nodes stat:
ServerNode IP: 192.168.0.100:11211 -> data count: 94914
ServerNode IP: 192.168.0.102:11211 -> data count: 101994
ServerNode IP: 192.168.0.103:11211 -> data count: 107443
ServerNode IP: 192.168.0.104:11211 -> data count: 100143
ServerNode IP: 192.168.0.105:11211 -> data count: 106704
ServerNode IP: 192.168.0.106:11211 -> data count: 86169
ServerNode IP: 192.168.0.107:11211 -> data count: 87813
ServerNode IP: 192.168.0.108:11211 -> data count: 103563
ServerNode IP: 192.168.0.109:11211 -> data count: 98604
Statistics is : 0.887347
Sd is : 6961.817729587582
Total test time: 4848ms
Process finished with exit code 0
With murmur3_128 the standard deviation drops to about 8,029, and the per-node data counts already look far better. Increasing the number of virtual nodes per server to 500 makes the distribution even more uniform:
data total count : 1000000, serverCount : 10, virtualCount each server : 500
-----------------------Test hash mode-----------------------
Initialize cluster server nodes ...
Initialize cluster server nodes data ...
Scan the cluster server nodes stat:
ServerNode IP: 192.168.0.100:11211 -> data count: 91295
ServerNode IP: 192.168.0.101:11211 -> data count: 100265
ServerNode IP: 192.168.0.102:11211 -> data count: 103216
ServerNode IP: 192.168.0.103:11211 -> data count: 102963
ServerNode IP: 192.168.0.104:11211 -> data count: 101132
ServerNode IP: 192.168.0.105:11211 -> data count: 107092
ServerNode IP: 192.168.0.106:11211 -> data count: 96877
ServerNode IP: 192.168.0.107:11211 -> data count: 96541
ServerNode IP: 192.168.0.108:11211 -> data count: 93644
ServerNode IP: 192.168.0.109:11211 -> data count: 106975
Statistics cache hit rate:
Statistics is : 1.0
Sd is : 5072.616031201258
-----------------------Test add server node-----------------------
Try to add server nodes in the cluster
Scan the cluster server nodes stat:
ServerNode IP: 192.168.0.100:11211 -> data count: 91295
ServerNode IP: 192.168.0.101:11211 -> data count: 100265
ServerNode IP: 192.168.0.102:11211 -> data count: 103216
ServerNode IP: 192.168.0.103:11211 -> data count: 102963
ServerNode IP: 192.168.0.104:11211 -> data count: 101132
ServerNode IP: 192.168.0.105:11211 -> data count: 107092
ServerNode IP: 192.168.0.106:11211 -> data count: 96877
ServerNode IP: 192.168.0.107:11211 -> data count: 96541
ServerNode IP: 192.168.0.108:11211 -> data count: 93644
ServerNode IP: 192.168.0.109:11211 -> data count: 106975
ServerNode IP: 192.168.0.103:11211 -> data count: 0
Statistics is : 0.897037
Sd is : 32027.042220598516
-----------------------Test remove server node-----------------------
rebuild end ...
Scan the cluster server nodes stat:
ServerNode IP: 192.168.0.100:11211 -> data count: 91295
ServerNode IP: 192.168.0.102:11211 -> data count: 103216
ServerNode IP: 192.168.0.103:11211 -> data count: 102963
ServerNode IP: 192.168.0.104:11211 -> data count: 101132
ServerNode IP: 192.168.0.105:11211 -> data count: 107092
ServerNode IP: 192.168.0.106:11211 -> data count: 96877
ServerNode IP: 192.168.0.107:11211 -> data count: 96541
ServerNode IP: 192.168.0.108:11211 -> data count: 93644
ServerNode IP: 192.168.0.109:11211 -> data count: 106975
Statistics is : 0.899735
Sd is : 5071.923786887969
Total test time: 5384ms
Process finished with exit code 0
With 1,000 virtual nodes per server:
data total count : 1000000, serverCount : 10, virtualCount each server : 1000
-----------------------Test hash mode-----------------------
Initialize cluster server nodes ...
Initialize cluster server nodes data ...
Scan the cluster server nodes stat:
ServerNode IP: 192.168.0.100:11211 -> data count: 100710
ServerNode IP: 192.168.0.101:11211 -> data count: 96919
ServerNode IP: 192.168.0.102:11211 -> data count: 103927
ServerNode IP: 192.168.0.103:11211 -> data count: 99827
ServerNode IP: 192.168.0.104:11211 -> data count: 96885
ServerNode IP: 192.168.0.105:11211 -> data count: 103520
ServerNode IP: 192.168.0.106:11211 -> data count: 96862
ServerNode IP: 192.168.0.107:11211 -> data count: 103488
ServerNode IP: 192.168.0.108:11211 -> data count: 97655
ServerNode IP: 192.168.0.109:11211 -> data count: 100207
Statistics cache hit rate:
Statistics is : 1.0
Sd is : 2740.3759960998054
-----------------------Test add server node-----------------------
Try to add server nodes in the cluster
Scan the cluster server nodes stat:
ServerNode IP: 192.168.0.100:11211 -> data count: 100710
ServerNode IP: 192.168.0.101:11211 -> data count: 96919
ServerNode IP: 192.168.0.102:11211 -> data count: 103927
ServerNode IP: 192.168.0.103:11211 -> data count: 99827
ServerNode IP: 192.168.0.104:11211 -> data count: 96885
ServerNode IP: 192.168.0.105:11211 -> data count: 103520
ServerNode IP: 192.168.0.106:11211 -> data count: 96862
ServerNode IP: 192.168.0.107:11211 -> data count: 103488
ServerNode IP: 192.168.0.108:11211 -> data count: 97655
ServerNode IP: 192.168.0.109:11211 -> data count: 100207
ServerNode IP: 192.168.0.103:11211 -> data count: 0
Statistics is : 0.900173
Sd is : 31741.292673739677
-----------------------Test remove server node-----------------------
rebuild end ...
Scan the cluster server nodes stat:
ServerNode IP: 192.168.0.100:11211 -> data count: 100710
ServerNode IP: 192.168.0.102:11211 -> data count: 103927
ServerNode IP: 192.168.0.103:11211 -> data count: 99827
ServerNode IP: 192.168.0.104:11211 -> data count: 96885
ServerNode IP: 192.168.0.105:11211 -> data count: 103520
ServerNode IP: 192.168.0.106:11211 -> data count: 96862
ServerNode IP: 192.168.0.107:11211 -> data count: 103488
ServerNode IP: 192.168.0.108:11211 -> data count: 97655
ServerNode IP: 192.168.0.109:11211 -> data count: 100207
Statistics is : 0.903081
Sd is : 2561.3286591142496
Total test time: 5644ms
Process finished with exit code 0
Conclusions:
Virtual nodes balance the load, and therefore the hits, across the nodes on the hash ring.
The more virtual nodes, the more uniform the distribution; a good hash function also helps spread the data evenly.
The trade-off is that when a real node is added or removed, a large number of virtual-node positions change at once, so the resulting data reshuffle can touch more real nodes.
In addition, a large number of virtual nodes needs its own data structure, which costs memory and lookup time, so the virtual-node count cannot grow without bound.
The more virtual nodes there are, the more real nodes are involved in data migration when a server is added or recovers; keep this in mind if your scenario needs to migrate data.
Copyright notice: this is an original article by InfoQ author Geek_2b3614.
Original link: http://xie.infoq.cn/article/7bd289c4447a734762c17c82b. Reproduction without the author's permission is prohibited.