
Week 5

Geek_2b3614 · Published on July 8, 2020

Assignment & summary:



Preface: why do we need consistent hashing?

In a distributed service cluster, we need a routing algorithm for stored objects that computes which server each object should live on.

Suppose the cluster is an array int[n] (n is the number of servers, indexed 0..n-1) and we use a hash algorithm like this:

Array index of the routed server: index = hash(object) % n;

When a node is added or removed, the routed server changes for a large portion of the objects, so requests for those objects miss the cache and the hit rate drops.
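To make the problem concrete, here is a small standalone sketch (not part of the cluster code below; the key names are made up) that counts how many of 10,000 keys land on a different server when the server count changes from 3 to 4:

import java.util.stream.IntStream;

public class ModRemapDemo {

    public static void main(String[] args) {
        int total = 10000;
        long moved = IntStream.range(0, total)
                .filter(i -> {
                    int hash = Math.abs(("key" + i).hashCode());
                    //the routed index changes whenever hash % 3 != hash % 4
                    return hash % 3 != hash % 4;
                })
                .count();
        //roughly three quarters of the keys are expected to move
        System.out.println("moved: " + moved + " / " + total);
    }
}

With consistent hashing, by contrast, only about 1/(n+1) of the keys need to move when one node is added to a balanced ring of n nodes.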



Enough talk, on to the code:

The cluster interface ServerCluster:

public interface ServerCluster {

    void addServerNode(ServerNode serverNode);

    void removeServerNode(ServerNode serverNode);

    ServerNode routeServerNode(String key);

    List<ServerNode> getAllServerNodes();
}



The remainder-based cluster RemainderCluster:

public class RemainderCluster implements ServerCluster {

    private List<ServerNode> serverNodes;

    public RemainderCluster() {
        this.serverNodes = new ArrayList<>();
    }

    @Override
    public List<ServerNode> getAllServerNodes() {
        return this.serverNodes;
    }

    @Override
    public void addServerNode(ServerNode serverNode) {
        System.out.println("Add serverNode(" + serverNode.getAddress() + ") to the cluster...");
        this.serverNodes.add(serverNode);
    }

    @Override
    public void removeServerNode(ServerNode serverNode) {
        this.serverNodes.removeIf(sn -> serverNode.getAddress().equals(sn.getAddress()));
    }

    @Override
    public ServerNode routeServerNode(String key) {
        //simple remainder routing: the index changes for most keys when the node count changes
        long hash = Math.abs(key.hashCode());
        long serverIndex = hash % this.serverNodes.size();
        return this.serverNodes.get((int) serverIndex);
    }
}



The cluster server node ServerNode:

@Data
@Builder
@AllArgsConstructor(staticName = "of")
public class ServerNode {

    //Server's address ip+port : 192.168.0.100:11211
    private String address;

    //Server's data (k,v) -> map
    private Map<String, Object> data;

    public Object get(String key) {
        return data.get(key);
    }

    public void put(String key, Object value) {
        this.data.put(key, value);
    }

    public void remove(String key) {
        this.data.remove(key);
    }
}



The data initialization / statistics utility StatisticalTool:

public class StatisticalTool {

    final static String keyPrefix = "remainderKey";
    final static String dataPrefix = "RemainderData";

    /**
     * Initialize cluster server nodes
     *
     * @param cluster         cluster object
     * @param serverNodeCount serverNode's Count
     */
    public static void initCluster(ServerCluster cluster, int serverNodeCount) {
        IntStream.range(0, serverNodeCount).forEach(i -> {
            cluster.addServerNode(ServerNode.of("192.168.0." + (100 + i) + ":11211", new HashMap<>()));
        });
    }

    /**
     * Initialize cluster server nodes data
     *
     * @param cluster        cluster object
     * @param dataTotalCount cluster data's total count
     */
    public static void initClusterData(ServerCluster cluster, int dataTotalCount) {
        IntStream.range(0, dataTotalCount).forEach(i -> {
            String key = keyPrefix + i;
            String value = dataPrefix + i;
            ServerNode serverNode = cluster.routeServerNode(key);
            serverNode.put(key, value);
        });
    }

    /**
     * If the query object is not empty, it is a hit
     *
     * @param cluster        cluster object
     * @param dataTotalCount cluster data's total count
     * @return hit count
     */
    public static float calculateHitRate(ServerCluster cluster, int dataTotalCount) {
        //returns the hit count as a float; the caller divides by dataTotalCount to get the rate
        return IntStream.range(0, dataTotalCount).filter(i -> {
            String key = keyPrefix + i;
            return cluster.routeServerNode(key).get(key) != null;
        }).count() / 1f;
    }

    /**
     * Print the node status of the cluster
     *
     * @param cluster cluster object
     */
    public static void printClusterStat(ServerCluster cluster) {
        System.out.println("Scan the cluster server nodes stat: ");
        cluster.getAllServerNodes().forEach(sn -> {
            System.out.println("ServerNode IP: " + sn.getAddress() + " -> data count: " + sn.getData().size());
        });
    }
}



The test class Runner:

public class Runner {

    public static void main(String[] args) {

        System.out.println("-----------------------Test remainder mode-----------------------");

        int dataTotalCount = 10000;

        ServerCluster cluster = new RemainderCluster();

        System.out.println("Initialize cluster server nodes ...");

        //Initialize cluster server nodes
        StatisticalTool.initCluster(cluster, 3);

        System.out.println("Initialize cluster server nodes data ...");

        //Initialize cluster server nodes data
        StatisticalTool.initClusterData(cluster, dataTotalCount);

        //Print cluster stat
        StatisticalTool.printClusterStat(cluster);

        System.out.println("Statistics cache hit rate: ");

        float hit = StatisticalTool.calculateHitRate(cluster, dataTotalCount);

        System.out.println("Statistics is : " + hit / dataTotalCount);

        System.out.println("-----------------------Test add server node-----------------------");
        System.out.println("Try to add server nodes in the cluster");

        cluster.addServerNode(ServerNode.of("192.168.0.103:11211", new HashMap<>()));
        //re Print cluster stat
        StatisticalTool.printClusterStat(cluster);

        float addHit = StatisticalTool.calculateHitRate(cluster, dataTotalCount);
        System.out.println("Statistics is : " + addHit / dataTotalCount);

        System.out.println("-----------------------Test remove server node-----------------------");
        //rebuild
        cluster = new RemainderCluster();
        StatisticalTool.initCluster(cluster, 3);
        StatisticalTool.initClusterData(cluster, dataTotalCount);
        System.out.println("rebuild end ...");

        cluster.removeServerNode(ServerNode.of("192.168.0.102:11211", new HashMap<>()));
        //re Print cluster stat
        StatisticalTool.printClusterStat(cluster);

        float removeHit = StatisticalTool.calculateHitRate(cluster, dataTotalCount);
        System.out.println("Statistics is : " + removeHit / dataTotalCount);
    }
}



Test results:

-----------------------Test remainder mode-----------------------
Initialize cluster server nodes ...
Add serverNode(192.168.0.100:11211) to the cluster...
Add serverNode(192.168.0.101:11211) to the cluster...
Add serverNode(192.168.0.102:11211) to the cluster...
Initialize cluster server nodes data ...
Scan the cluster server nodes stat:
ServerNode IP: 192.168.0.100:11211 -> data count: 3333
ServerNode IP: 192.168.0.101:11211 -> data count: 3333
ServerNode IP: 192.168.0.102:11211 -> data count: 3334
Statistics cache hit rate:
Statistics is : 1.0
-----------------------Test add server node-----------------------
Try to add server nodes in the cluster
Add serverNode(192.168.0.103:11211) to the cluster...
Scan the cluster server nodes stat:
ServerNode IP: 192.168.0.100:11211 -> data count: 3333
ServerNode IP: 192.168.0.101:11211 -> data count: 3333
ServerNode IP: 192.168.0.102:11211 -> data count: 3334
ServerNode IP: 192.168.0.103:11211 -> data count: 0
Statistics is : 0.25
-----------------------Test remove server node-----------------------
Add serverNode(192.168.0.100:11211) to the cluster...
Add serverNode(192.168.0.101:11211) to the cluster...
Add serverNode(192.168.0.102:11211) to the cluster...
rebuild end ...
Scan the cluster server nodes stat:
ServerNode IP: 192.168.0.100:11211 -> data count: 3333
ServerNode IP: 192.168.0.101:11211 -> data count: 3333
Statistics is : 0.3331

Process finished with exit code 0



Test setup: 3 cluster nodes, 10,000 data items in total.

With a fixed, unchanged set of nodes, the hit rate is 1.0;

after adding (scaling out) one node, the hit rate drops to 0.25;

after removing (failing) one node, the hit rate drops to 0.3331.



With such a low hit rate, the system cannot scale out or in gracefully, fault tolerance is poor, and the missed requests fall through to the backing data source, driving its concurrent load up.



So we need the consistent hashing algorithm as an improvement over the simple remainder scheme:

In essence, consistent hashing is still hash-and-modulo, except the modulus no longer depends on the server count: values are always mapped into the fixed range 0 to 2^32 - 1.

It introduces the concept of a consistent hash ring: all integers in [0, 2^32 - 1] are joined end to end to form a ring, and both servers and keys are placed on it.
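As a minimal sketch of the ring idea (an illustrative helper, not the implementation below): keep the node positions in a sorted map and, for each key, walk clockwise to the first node at or after the key's hash, wrapping around to the first entry when there is none.

import java.util.Map;
import java.util.TreeMap;

class HashRingSketch {

    private final TreeMap<Integer, String> ring = new TreeMap<>();

    void addNode(String address) {
        //one position per node for now; virtual nodes are introduced later in this post
        ring.put(Math.abs(address.hashCode()), address);
    }

    String route(String key) {
        int hash = Math.abs(key.hashCode());
        //ceilingEntry = the first node clockwise from the key; null means we passed the end of the ring
        Map.Entry<Integer, String> entry = ring.ceilingEntry(hash);
        return (entry != null ? entry : ring.firstEntry()).getValue();
    }
}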



OK, let's first use a simple consistent hash to address the low hit rate:

The SimpleServerCluster interface:

public interface SimpleServerCluster {

    void addServerNode(SimpleServer serverNode);

    void removeServerNode(SimpleServer serverNode);

    SimpleServer routeServerNode(String key);

    List<SimpleServer> getAllServerNodes();
}



The simple hash cluster SimpleHashCluster:

public class SimpleHashCluster implements SimpleServerCluster {

    private SortedMap<Integer, SimpleServer> serverNodes;

    public SimpleHashCluster() {
        this.serverNodes = new TreeMap<>();
    }

    @Override
    public void addServerNode(SimpleServer serverNode) {
        this.serverNodes.put(Math.abs(serverNode.getAddress().hashCode()), serverNode);
    }

    @Override
    public void removeServerNode(SimpleServer serverNode) {
        this.serverNodes.remove(Math.abs(serverNode.getAddress().hashCode()));
    }

    @Override
    public SimpleServer routeServerNode(String key) {
        int keyHash = Math.abs(key.hashCode());
        //find the first node clockwise from the key; wrap around to firstKey() when none remains
        SortedMap<Integer, SimpleServer> nextServerMap = this.serverNodes.tailMap(keyHash);
        int serverKey = nextServerMap.isEmpty() ? serverNodes.firstKey() : nextServerMap.firstKey();
        return serverNodes.get(serverKey);
    }

    @Override
    public List<SimpleServer> getAllServerNodes() {
        return new ArrayList<>(this.serverNodes.values());
    }
}
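The routing above relies on tailMap for the clockwise search and falls back to firstKey() to close the ring, so a key whose hash is larger than every node hash wraps around to the first node. A minimal usage sketch (the key and addresses are only examples):

import java.util.HashMap;

public class WrapAroundDemo {

    public static void main(String[] args) {
        SimpleServerCluster cluster = new SimpleHashCluster();
        cluster.addServerNode(SimpleServer.of("192.168.0.100:11211", new HashMap<>()));
        cluster.addServerNode(SimpleServer.of("192.168.0.101:11211", new HashMap<>()));

        //tailMap(keyHash) holds the nodes clockwise from the key's hash; when it is
        //empty the lookup falls back to firstKey(), which is what closes the ring
        SimpleServer target = cluster.routeServerNode("someKey");
        System.out.println("someKey -> " + target.getAddress());
    }
}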



The cluster server node SimpleServer:

@Data
@Builder
@AllArgsConstructor(staticName = "of")
public class SimpleServer {

    //Server's address ip+port : 192.168.0.100:11211
    private String address;

    //Server's data (k,v) -> map
    private Map<String, Object> data;

    public Object get(String key) {
        return data.get(key);
    }

    void put(String key, Object value) {
        this.data.put(key, value);
    }

    void remove(String key) {
        this.data.remove(key);
    }
}



The data initialization / statistics utility SimpleHashTool:

public class SimpleHashTool {

    final static String keyPrefix = "remainderKey";
    final static String dataPrefix = "RemainderData";

    /**
     * Initialize cluster server nodes
     *
     * @param cluster         cluster object
     * @param serverNodeCount serverNode's Count
     */
    public static void initCluster(SimpleServerCluster cluster, int serverNodeCount) {
        IntStream.range(0, serverNodeCount).forEach(i -> {
            cluster.addServerNode(SimpleServer.of("192.168.0." + (100 + i) + ":11211", new HashMap<>()));
        });
    }

    /**
     * Initialize cluster server nodes data
     *
     * @param cluster        cluster object
     * @param dataTotalCount cluster data's total count
     */
    public static void initClusterData(SimpleServerCluster cluster, int dataTotalCount) {
        IntStream.range(0, dataTotalCount).forEach(i -> {
            String key = keyPrefix + i;
            String value = dataPrefix + i;
            SimpleServer serverNode = cluster.routeServerNode(key);
            serverNode.put(key, value);
        });
    }

    /**
     * If the query object is not empty, it is a hit
     *
     * @param cluster        cluster object
     * @param dataTotalCount cluster data's total count
     * @return hit count
     */
    public static float calculateHitRate(SimpleServerCluster cluster, int dataTotalCount) {
        //returns the hit count as a float; the caller divides by dataTotalCount to get the rate
        return IntStream.range(0, dataTotalCount).filter(i -> {
            String key = keyPrefix + i;
            return cluster.routeServerNode(key).get(key) != null;
        }).count() / 1f;
    }

    /**
     * Print the node status of the cluster
     *
     * @param cluster cluster object
     */
    public static void printClusterStat(SimpleServerCluster cluster) {
        System.out.println("Scan the cluster server nodes stat: ");
        cluster.getAllServerNodes().forEach(sn -> {
            System.out.println("ServerNode IP: " + sn.getAddress() + " -> data count: " + sn.getData().size());
        });
    }
}



The test class Runner:

public class Runner {

    public static void main(String[] args) {

        long startTime = System.currentTimeMillis();

        System.out.println("-----------------------Test simple hash mode-----------------------");

        int dataTotalCount = 10000;
        int serverCount = 3;

        SimpleServerCluster cluster = new SimpleHashCluster();

        System.out.println("Initialize cluster server nodes ...");

        //Initialize cluster server nodes
        SimpleHashTool.initCluster(cluster, serverCount);

        System.out.println("Initialize cluster server nodes data ...");

        //Initialize cluster server nodes data
        SimpleHashTool.initClusterData(cluster, dataTotalCount);

        //Print cluster stat
        SimpleHashTool.printClusterStat(cluster);

        System.out.println("Statistics cache hit rate: ");

        float hit = SimpleHashTool.calculateHitRate(cluster, dataTotalCount);

        System.out.println("Statistics is : " + hit / dataTotalCount);

        System.out.println("-----------------------Test add server node-----------------------");
        System.out.println("Try to add server nodes in the cluster");

        cluster.addServerNode(SimpleServer.of("192.168.0.103:11211", new HashMap<>()));
        //re Print cluster stat
        SimpleHashTool.printClusterStat(cluster);

        float addHit = SimpleHashTool.calculateHitRate(cluster, dataTotalCount);
        System.out.println("Statistics is : " + addHit / dataTotalCount);

        System.out.println("-----------------------Test remove server node-----------------------");
        //rebuild
        cluster = new SimpleHashCluster();
        SimpleHashTool.initCluster(cluster, serverCount);
        SimpleHashTool.initClusterData(cluster, dataTotalCount);
        System.out.println("rebuild end ...");

        cluster.removeServerNode(SimpleServer.of("192.168.0.102:11211", new HashMap<>()));
        //re Print cluster stat
        SimpleHashTool.printClusterStat(cluster);

        float removeHit = SimpleHashTool.calculateHitRate(cluster, dataTotalCount);
        System.out.println("Statistics is : " + removeHit / dataTotalCount);

        long endTime = System.currentTimeMillis();
        System.out.println("测试总耗时:" + (endTime - startTime) + "ms");
    }
}



Note that this run uses exactly the same setup and scale as the remainder test (10,000 K-V entries, 3 server nodes). Results:

-----------------------Test simple hash mode-----------------------
Initialize cluster server nodes ...
Initialize cluster server nodes data ...
Scan the cluster server nodes stat:
ServerNode IP: 192.168.0.101:11211 -> data count: 990
ServerNode IP: 192.168.0.100:11211 -> data count: 9010
ServerNode IP: 192.168.0.102:11211 -> data count: 0
Statistics cache hit rate:
Statistics is : 1.0
-----------------------Test add server node-----------------------
Try to add server nodes in the cluster
Scan the cluster server nodes stat:
ServerNode IP: 192.168.0.101:11211 -> data count: 990
ServerNode IP: 192.168.0.100:11211 -> data count: 9010
ServerNode IP: 192.168.0.102:11211 -> data count: 0
ServerNode IP: 192.168.0.103:11211 -> data count: 0
Statistics is : 0.91
-----------------------Test remove server node-----------------------
rebuild end ...
Scan the cluster server nodes stat:
ServerNode IP: 192.168.0.101:11211 -> data count: 990
ServerNode IP: 192.168.0.100:11211 -> data count: 9010
Statistics is : 1.0
测试总耗时:71ms

Process finished with exit code 0

With a fixed, unchanged set of nodes, the hit rate is 1.0;

after adding (scaling out) one node, the hit rate drops to 0.91;

after removing (failing) one node, the hit rate stays at 1.0 (the removed node 192.168.0.102 held no data in this run).



Scaling the data up 100x to 1,000,000 K-V entries, the results are:

-----------------------Test simple hash mode-----------------------
Initialize cluster server nodes ...
Initialize cluster server nodes data ...
Scan the cluster server nodes stat:
ServerNode IP: 192.168.0.101:11211 -> data count: 90990
ServerNode IP: 192.168.0.100:11211 -> data count: 909010
ServerNode IP: 192.168.0.102:11211 -> data count: 0
Statistics cache hit rate:
Statistics is : 1.0
-----------------------Test add server node-----------------------
Try to add server nodes in the cluster
Scan the cluster server nodes stat:
ServerNode IP: 192.168.0.101:11211 -> data count: 90990
ServerNode IP: 192.168.0.100:11211 -> data count: 909010
ServerNode IP: 192.168.0.102:11211 -> data count: 0
ServerNode IP: 192.168.0.103:11211 -> data count: 0
Statistics is : 0.9091
-----------------------Test remove server node-----------------------
rebuild end ...
Scan the cluster server nodes stat:
ServerNode IP: 192.168.0.101:11211 -> data count: 90990
ServerNode IP: 192.168.0.100:11211 -> data count: 909010
Statistics is : 1.0
测试总耗时:1474ms

Process finished with exit code 0

With a fixed, unchanged set of nodes, the hit rate is 1.0;

after adding (scaling out) one node, the hit rate drops to 0.9091;

after removing (failing) one node, the hit rate stays at 1.0 (again, the removed node held no data).



Scaling up another 10x to 10,000,000 K-V entries, the results are:

-----------------------Test simple hash mode-----------------------
Initialize cluster server nodes ...
Initialize cluster server nodes data ...
Scan the cluster server nodes stat:
ServerNode IP: 192.168.0.101:11211 -> data count: 6090990
ServerNode IP: 192.168.0.100:11211 -> data count: 3509010
ServerNode IP: 192.168.0.102:11211 -> data count: 400000
Statistics cache hit rate:
Statistics is : 1.0
-----------------------Test add server node-----------------------
Try to add server nodes in the cluster
Scan the cluster server nodes stat:
ServerNode IP: 192.168.0.101:11211 -> data count: 6090990
ServerNode IP: 192.168.0.100:11211 -> data count: 3509010
ServerNode IP: 192.168.0.102:11211 -> data count: 400000
ServerNode IP: 192.168.0.103:11211 -> data count: 0
Statistics is : 0.59091
-----------------------Test remove server node-----------------------
rebuild end ...
Scan the cluster server nodes stat:
ServerNode IP: 192.168.0.101:11211 -> data count: 6090990
ServerNode IP: 192.168.0.100:11211 -> data count: 3509010
Statistics is : 0.96
测试总耗时:13724ms

Process finished with exit code 0

With a fixed, unchanged set of nodes, the hit rate is 1.0;

after adding (scaling out) one node, the hit rate drops to 0.59091;

after removing (failing) one node, the hit rate drops to 0.96.



The removals above only removed the tail node (192.168.0.102). When the removed node is at the front (192.168.0.100):

10,000,000 K-V entries, 3 nodes:

-----------------------Test simple hash mode-----------------------
Initialize cluster server nodes ...
Initialize cluster server nodes data ...
Scan the cluster server nodes stat:
ServerNode IP: 192.168.0.101:11211 -> data count: 6090990
ServerNode IP: 192.168.0.100:11211 -> data count: 3509010
ServerNode IP: 192.168.0.102:11211 -> data count: 400000
Statistics cache hit rate:
Statistics is : 1.0
-----------------------Test add server node-----------------------
Try to add server nodes in the cluster
Scan the cluster server nodes stat:
ServerNode IP: 192.168.0.101:11211 -> data count: 6090990
ServerNode IP: 192.168.0.100:11211 -> data count: 3509010
ServerNode IP: 192.168.0.102:11211 -> data count: 400000
ServerNode IP: 192.168.0.103:11211 -> data count: 0
Statistics is : 0.59091
-----------------------Test remove server node-----------------------
rebuild end ...
Scan the cluster server nodes stat:
ServerNode IP: 192.168.0.101:11211 -> data count: 6090990
ServerNode IP: 192.168.0.102:11211 -> data count: 400000
Statistics is : 0.649099
测试总耗时:13518ms

Process finished with exit code 0

After adding (scaling out) one node, the hit rate drops to 0.59091;

after removing (failing) the front node, the hit rate drops to 0.649099.



When the removed node is in the middle (192.168.0.101):

10,000,000 K-V entries, 3 nodes:

-----------------------Test simple hash mode-----------------------
Initialize cluster server nodes ...
Initialize cluster server nodes data ...
Scan the cluster server nodes stat:
ServerNode IP: 192.168.0.101:11211 -> data count: 6090990
ServerNode IP: 192.168.0.100:11211 -> data count: 3509010
ServerNode IP: 192.168.0.102:11211 -> data count: 400000
Statistics cache hit rate:
Statistics is : 1.0
-----------------------Test add server node-----------------------
Try to add server nodes in the cluster
Scan the cluster server nodes stat:
ServerNode IP: 192.168.0.101:11211 -> data count: 6090990
ServerNode IP: 192.168.0.100:11211 -> data count: 3509010
ServerNode IP: 192.168.0.102:11211 -> data count: 400000
ServerNode IP: 192.168.0.103:11211 -> data count: 0
Statistics is : 0.59091
-----------------------Test remove server node-----------------------
rebuild end ...
Scan the cluster server nodes stat:
ServerNode IP: 192.168.0.100:11211 -> data count: 3509010
ServerNode IP: 192.168.0.102:11211 -> data count: 400000
Statistics is : 0.390901
测试总耗时:12728ms

Process finished with exit code 0

After adding (scaling out) one node, the hit rate drops to 0.59091;

after removing (failing) the middle node, the hit rate drops to 0.390901.



To reduce the risk, add more servers to the cluster so that a failure is amortized across them: scale from 3 to 10 servers and test with 1,000,000 and then 10,000,000 entries. First, 1,000,000 K-V entries:

-----------------------Test simple hash mode-----------------------
Initialize cluster server nodes ...
Initialize cluster server nodes data ...
Scan the cluster server nodes stat:
ServerNode IP: 192.168.0.101:11211 -> data count: 90
ServerNode IP: 192.168.0.106:11211 -> data count: 10
ServerNode IP: 192.168.0.105:11211 -> data count: 909000
ServerNode IP: 192.168.0.100:11211 -> data count: 0
ServerNode IP: 192.168.0.102:11211 -> data count: 0
ServerNode IP: 192.168.0.107:11211 -> data count: 0
ServerNode IP: 192.168.0.109:11211 -> data count: 0
ServerNode IP: 192.168.0.104:11211 -> data count: 0
ServerNode IP: 192.168.0.103:11211 -> data count: 90900
ServerNode IP: 192.168.0.108:11211 -> data count: 0
Statistics cache hit rate:
Statistics is : 1.0
-----------------------Test add server node-----------------------
Try to add server nodes in the cluster
Scan the cluster server nodes stat:
ServerNode IP: 192.168.0.101:11211 -> data count: 90
ServerNode IP: 192.168.0.106:11211 -> data count: 10
ServerNode IP: 192.168.0.105:11211 -> data count: 909000
ServerNode IP: 192.168.0.100:11211 -> data count: 0
ServerNode IP: 192.168.0.102:11211 -> data count: 0
ServerNode IP: 192.168.0.107:11211 -> data count: 0
ServerNode IP: 192.168.0.109:11211 -> data count: 0
ServerNode IP: 192.168.0.104:11211 -> data count: 0
ServerNode IP: 192.168.0.103:11211 -> data count: 0
ServerNode IP: 192.168.0.108:11211 -> data count: 0
Statistics is : 0.9091
-----------------------Test remove server node-----------------------
rebuild end ...
Scan the cluster server nodes stat:
ServerNode IP: 192.168.0.106:11211 -> data count: 10
ServerNode IP: 192.168.0.105:11211 -> data count: 909000
ServerNode IP: 192.168.0.100:11211 -> data count: 0
ServerNode IP: 192.168.0.102:11211 -> data count: 0
ServerNode IP: 192.168.0.107:11211 -> data count: 0
ServerNode IP: 192.168.0.109:11211 -> data count: 0
ServerNode IP: 192.168.0.104:11211 -> data count: 0
ServerNode IP: 192.168.0.103:11211 -> data count: 90900
ServerNode IP: 192.168.0.108:11211 -> data count: 0
Statistics is : 0.99991
测试总耗时:1407ms

Process finished with exit code 0

10,000,000 K-V entries:

-----------------------Test simple hash mode-----------------------
Initialize cluster server nodes ...
Initialize cluster server nodes data ...
Scan the cluster server nodes stat:
ServerNode IP: 192.168.0.101:11211 -> data count: 1100090
ServerNode IP: 192.168.0.106:11211 -> data count: 900010
ServerNode IP: 192.168.0.105:11211 -> data count: 2209000
ServerNode IP: 192.168.0.100:11211 -> data count: 400000
ServerNode IP: 192.168.0.102:11211 -> data count: 400000
ServerNode IP: 192.168.0.107:11211 -> data count: 900000
ServerNode IP: 192.168.0.109:11211 -> data count: 1400000
ServerNode IP: 192.168.0.104:11211 -> data count: 900000
ServerNode IP: 192.168.0.103:11211 -> data count: 890900
ServerNode IP: 192.168.0.108:11211 -> data count: 900000
Statistics cache hit rate:
Statistics is : 1.0
-----------------------Test add server node-----------------------
Try to add server nodes in the cluster
Scan the cluster server nodes stat:
ServerNode IP: 192.168.0.101:11211 -> data count: 1100090
ServerNode IP: 192.168.0.106:11211 -> data count: 900010
ServerNode IP: 192.168.0.105:11211 -> data count: 2209000
ServerNode IP: 192.168.0.100:11211 -> data count: 400000
ServerNode IP: 192.168.0.102:11211 -> data count: 400000
ServerNode IP: 192.168.0.107:11211 -> data count: 900000
ServerNode IP: 192.168.0.109:11211 -> data count: 1400000
ServerNode IP: 192.168.0.104:11211 -> data count: 900000
ServerNode IP: 192.168.0.103:11211 -> data count: 0
ServerNode IP: 192.168.0.108:11211 -> data count: 900000
Statistics is : 0.91091
-----------------------Test remove server node-----------------------
rebuild end ...
Scan the cluster server nodes stat:
ServerNode IP: 192.168.0.106:11211 -> data count: 900010
ServerNode IP: 192.168.0.105:11211 -> data count: 2209000
ServerNode IP: 192.168.0.100:11211 -> data count: 400000
ServerNode IP: 192.168.0.102:11211 -> data count: 400000
ServerNode IP: 192.168.0.107:11211 -> data count: 900000
ServerNode IP: 192.168.0.109:11211 -> data count: 1400000
ServerNode IP: 192.168.0.104:11211 -> data count: 900000
ServerNode IP: 192.168.0.103:11211 -> data count: 890900
ServerNode IP: 192.168.0.108:11211 -> data count: 900000
Statistics is : 0.889991
测试总耗时:13892ms

Process finished with exit code 0



As you can see, the data on the hash ring is far from evenly distributed (Java's hashCode performs poorly here; I tried other hash functions and the distribution still wasn't especially even either). If the failed node happens to be one that holds a large share of the data, the hit rate plummets.



Next, improve the simple hash by adding a mapping from virtual nodes to physical nodes, and switch to standard deviation to measure how evenly the data is distributed on the ring.



Standard deviation: a measure of how spread out a set of values is around its mean. A large standard deviation means most values differ substantially from the mean; a small one means the values sit close to the mean.

Here, a small standard deviation means the data is spread more evenly across the nodes.



Sample standard deviation (square root of the variance): s = sqrt(((x1 - x̄)² + (x2 - x̄)² + … + (xn - x̄)²) / (n - 1))

Population standard deviation: σ = sqrt(((x1 - x̄)² + (x2 - x̄)² + … + (xn - x̄)²) / n)
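A small sketch of both formulas in Java (the three counts below are made-up numbers, purely to show the arithmetic; the getSd helper further down uses the population form, with the expected average dataTotalCount / serverCount as the mean):

import java.util.stream.IntStream;

public class StdDevSketch {

    public static void main(String[] args) {
        int[] counts = {94914, 112653, 101994};   //hypothetical per-node data counts
        double mean = IntStream.of(counts).average().orElse(0);
        double sumSq = IntStream.of(counts)
                .mapToDouble(c -> Math.pow(c - mean, 2))
                .sum();
        double population = Math.sqrt(sumSq / counts.length);     //sigma: divide by n
        double sample = Math.sqrt(sumSq / (counts.length - 1));   //s: divide by n - 1
        System.out.println("population = " + population + ", sample = " + sample);
    }
}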



OK, code first, starting with hashCode:

The cluster interface ServerCluster:

public interface ServerCluster {

    void addServerNode(ServerNode serverNode);

    void removeServerNode(ServerNode serverNode);

    ServerNode routeServerNode(String key);

    List<ServerNode> getAllServerNodes();
}



The consistent hash cluster ConsistentHashCluster:

public class ConsistentHashCluster implements ServerCluster {

    private List<ServerNode> serverNodes;
    private SortedMap<Integer, ServerNode> virtualNodes;
    private Integer virtualCount;

    public ConsistentHashCluster(Integer virtualCount) {
        this.serverNodes = new ArrayList<>();
        this.virtualNodes = new TreeMap<>();
        this.virtualCount = virtualCount;
    }

    @Override
    public void addServerNode(ServerNode serverNode) {
        this.serverNodes.add(serverNode);
        //each physical node is hashed to virtualCount positions on the ring
        IntStream.range(0, this.virtualCount).forEach(i -> {
            //String key = serverNode.getAddress() + HashTool.split + i;
            //int hash = HashTool.getHashCode(key);
            int hash = HashTool.getHashCode(serverNode, i);
            this.virtualNodes.put(hash, serverNode);
        });
    }

    @Override
    public void removeServerNode(ServerNode serverNode) {
        this.serverNodes.removeIf(sn -> sn.getAddress().equals(serverNode.getAddress()));
        IntStream.range(0, this.virtualCount).forEach(i -> {
            //String key = serverNode.getAddress() + HashTool.split + i;
            //int hash = HashTool.getHashCode(key);
            int hash = HashTool.getHashCode(serverNode, i);
            this.virtualNodes.remove(hash);
        });
    }

    @Override
    public ServerNode routeServerNode(String key) {
        int hash = key.hashCode();
        //int hash = HashTool.getHashCode(key);
        SortedMap<Integer, ServerNode> nextMap = this.virtualNodes.tailMap(hash);
        int serverKey = nextMap.isEmpty() ? this.virtualNodes.firstKey() : nextMap.firstKey();
        return this.virtualNodes.get(serverKey);
    }

    @Override
    public List<ServerNode> getAllServerNodes() {
        return this.serverNodes;
    }
}



The cluster server node ServerNode:

@Data
@Builder
@AllArgsConstructor(staticName = "of")
public class ServerNode {

    //Server's address ip+port : 192.168.0.100:11211
    private String address;

    //Server's data (k,v) -> map
    private Map<String, Object> data;

    public Object get(String key) {
        return data.get(key);
    }

    public void put(String key, Object value) {
        this.data.put(key, value);
    }

    public void remove(String key) {
        this.data.remove(key);
    }
}



The data initialization / statistics utility HashTool:

public class HashTool {

    final static String keyPrefix = "remainderKey";
    final static String dataPrefix = "RemainderData";

    public final static String split = "_";

    public static Integer getHashCode(ServerNode serverNode, int index) {
        //hash of "address_index" marks one virtual node position on the ring
        return (serverNode.getAddress() + HashTool.split + index).hashCode();
    }

    //uses Guava's murmur3_128 (com.google.common.hash.Hashing); hashCode() of the
    //resulting HashCode object yields the int used as the ring position
    public static Integer getHashCode(String s) {
        HashFunction hashFunction = Hashing.murmur3_128(13);
        return Math.abs(hashFunction.hashString(s, Charsets.UTF_8).hashCode());
    }

    /**
     * Initialize cluster server nodes
     *
     * @param cluster         cluster object
     * @param serverNodeCount serverNode's Count
     */
    public static void initCluster(ServerCluster cluster, int serverNodeCount) {
        IntStream.range(0, serverNodeCount).forEach(i -> {
            cluster.addServerNode(ServerNode.of("192.168.0." + (100 + i) + ":11211", new HashMap<>()));
        });
    }

    /**
     * Initialize cluster server nodes data
     *
     * @param cluster        cluster object
     * @param dataTotalCount cluster data's total count
     */
    public static void initClusterData(ServerCluster cluster, int dataTotalCount) {
        IntStream.range(0, dataTotalCount).forEach(i -> {
            String key = keyPrefix + i;
            String value = dataPrefix + i;
            ServerNode serverNode = cluster.routeServerNode(key);
            serverNode.put(key, value);
        });
    }

    /**
     * If the query object is not empty, it is a hit
     *
     * @param cluster        cluster object
     * @param dataTotalCount cluster data's total count
     * @return hit count
     */
    public static float calculateHitRate(ServerCluster cluster, int dataTotalCount) {
        //returns the hit count as a float; the caller divides by dataTotalCount to get the rate
        return IntStream.range(0, dataTotalCount).filter(i -> {
            String key = keyPrefix + i;
            return cluster.routeServerNode(key).get(key) != null;
        }).count() / 1f;
    }

    /**
     * Print the node status of the cluster
     *
     * @param cluster cluster object
     */
    public static void printClusterStat(ServerCluster cluster) {
        System.out.println("Scan the cluster server nodes stat: ");
        cluster.getAllServerNodes().forEach(sn -> {
            System.out.println("ServerNode IP: " + sn.getAddress() + " -> data count: " + sn.getData().size());
        });
    }

    /**
     * Standard deviation (population form) of the per-node data counts,
     * using the expected average dataTotalCount / serverCount as the mean
     *
     * @param cluster        cluster object
     * @param dataTotalCount cluster data's total count
     * @param serverCount    serverNode's Count
     * @return standard deviation
     */
    public static double getSd(ServerCluster cluster, int dataTotalCount, int serverCount) {
        int avg = dataTotalCount / serverCount;
        double powSum = cluster.getAllServerNodes().stream().mapToDouble(n -> Math.pow(n.getData().size() - avg, 2)).sum();
        return Math.sqrt(powSum / serverCount);
    }
}



The test class Runner:

public class Runner {

    public static void main(String[] args) {

        long startTime = System.currentTimeMillis();

        int dataTotalCount = 1000000;
        int serverCount = 10;
        int replicas = 100;

        System.out.println(String.format("data total count : %s, serverCount : %s, virtualCount each server : %s", dataTotalCount, serverCount, replicas));
        System.out.println("-----------------------Test hash mode-----------------------");

        ServerCluster cluster = new ConsistentHashCluster(replicas);

        System.out.println("Initialize cluster server nodes ...");

        //Initialize cluster server nodes
        HashTool.initCluster(cluster, serverCount);

        System.out.println("Initialize cluster server nodes data ...");

        //Initialize cluster server nodes data
        HashTool.initClusterData(cluster, dataTotalCount);

        //Print cluster stat
        HashTool.printClusterStat(cluster);

        System.out.println("Statistics cache hit rate: ");

        float hit = HashTool.calculateHitRate(cluster, dataTotalCount);

        System.out.println("Statistics is : " + hit / dataTotalCount);
        System.out.println("Sd is : " + HashTool.getSd(cluster, dataTotalCount, serverCount));

        System.out.println("-----------------------Test add server node-----------------------");
        System.out.println("Try to add server nodes in the cluster");

        cluster.addServerNode(ServerNode.of("192.168.0.103:11211", new HashMap<>()));
        //re Print cluster stat
        HashTool.printClusterStat(cluster);

        float addHit = HashTool.calculateHitRate(cluster, dataTotalCount);
        System.out.println("Statistics is : " + addHit / dataTotalCount);
        System.out.println("Sd is : " + HashTool.getSd(cluster, dataTotalCount, serverCount));

        System.out.println("-----------------------Test remove server node-----------------------");
        //rebuild
        cluster = new ConsistentHashCluster(replicas);
        HashTool.initCluster(cluster, serverCount);
        HashTool.initClusterData(cluster, dataTotalCount);
        System.out.println("rebuild end ...");

        cluster.removeServerNode(ServerNode.of("192.168.0.101:11211", new HashMap<>()));
        //re Print cluster stat
        HashTool.printClusterStat(cluster);

        float removeHit = HashTool.calculateHitRate(cluster, dataTotalCount);
        System.out.println("Statistics is : " + removeHit / dataTotalCount);

        System.out.println("Sd is : " + HashTool.getSd(cluster, dataTotalCount, serverCount));

        long endTime = System.currentTimeMillis();
        System.out.println("测试总耗时:" + (endTime - startTime) + "ms");
    }
}



1,000,000 K-V entries, 10 nodes, 100 virtual nodes per server. Results:

data total count : 1000000, serverCount : 10, virtualCount each server : 100
-----------------------Test hash mode-----------------------
Initialize cluster server nodes ...
Initialize cluster server nodes data ...
Scan the cluster server nodes stat:
ServerNode IP: 192.168.0.100:11211 -> data count: 370000
ServerNode IP: 192.168.0.101:11211 -> data count: 90
ServerNode IP: 192.168.0.102:11211 -> data count: 0
ServerNode IP: 192.168.0.103:11211 -> data count: 90900
ServerNode IP: 192.168.0.104:11211 -> data count: 10
ServerNode IP: 192.168.0.105:11211 -> data count: 9000
ServerNode IP: 192.168.0.106:11211 -> data count: 0
ServerNode IP: 192.168.0.107:11211 -> data count: 530000
ServerNode IP: 192.168.0.108:11211 -> data count: 0
ServerNode IP: 192.168.0.109:11211 -> data count: 0
Statistics cache hit rate:
Statistics is : 1.0
Sd is : 180594.5232281422
-----------------------Test add server node-----------------------
Try to add server nodes in the cluster
Scan the cluster server nodes stat:
ServerNode IP: 192.168.0.100:11211 -> data count: 370000
ServerNode IP: 192.168.0.101:11211 -> data count: 90
ServerNode IP: 192.168.0.102:11211 -> data count: 0
ServerNode IP: 192.168.0.103:11211 -> data count: 90900
ServerNode IP: 192.168.0.104:11211 -> data count: 10
ServerNode IP: 192.168.0.105:11211 -> data count: 9000
ServerNode IP: 192.168.0.106:11211 -> data count: 0
ServerNode IP: 192.168.0.107:11211 -> data count: 530000
ServerNode IP: 192.168.0.108:11211 -> data count: 0
ServerNode IP: 192.168.0.109:11211 -> data count: 0
ServerNode IP: 192.168.0.103:11211 -> data count: 0
Statistics is : 0.9091
Sd is : 183342.25323149053
-----------------------Test remove server node-----------------------
rebuild end ...
Scan the cluster server nodes stat:
ServerNode IP: 192.168.0.100:11211 -> data count: 370000
ServerNode IP: 192.168.0.102:11211 -> data count: 0
ServerNode IP: 192.168.0.103:11211 -> data count: 90900
ServerNode IP: 192.168.0.104:11211 -> data count: 10
ServerNode IP: 192.168.0.105:11211 -> data count: 9000
ServerNode IP: 192.168.0.106:11211 -> data count: 0
ServerNode IP: 192.168.0.107:11211 -> data count: 530000
ServerNode IP: 192.168.0.108:11211 -> data count: 0
ServerNode IP: 192.168.0.109:11211 -> data count: 0
Statistics is : 0.99991
Sd is : 177809.39516797193
测试总耗时:1902ms

Process finished with exit code 0



As before, the built-in hashCode hashes very unevenly: the standard deviation is over 180,000, and the per-node data counts make it obvious how bad the spread is.
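One way to see why hashCode behaves so badly here (a standalone sketch): the test keys only differ in their trailing digits, and String.hashCode is a positional polynomial, so consecutive keys produce nearly consecutive hash values that pile onto one small arc of the ring instead of spreading across it.

public class HashCodeClusterDemo {

    public static void main(String[] args) {
        //consecutive keys differ only in the last character, so their hash codes
        //form an almost unbroken run of integers (n, n + 1, n + 2, ...)
        for (int i = 0; i < 5; i++) {
            String key = "remainderKey" + i;
            System.out.println(key + " -> " + key.hashCode());
        }
    }
}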



Switching to murmur3_128:

data total count : 1000000, serverCount : 10, virtualCount each server : 100
-----------------------Test hash mode-----------------------
Initialize cluster server nodes ...
Initialize cluster server nodes data ...
Scan the cluster server nodes stat:
ServerNode IP: 192.168.0.100:11211 -> data count: 94914
ServerNode IP: 192.168.0.101:11211 -> data count: 112653
ServerNode IP: 192.168.0.102:11211 -> data count: 101994
ServerNode IP: 192.168.0.103:11211 -> data count: 107443
ServerNode IP: 192.168.0.104:11211 -> data count: 100143
ServerNode IP: 192.168.0.105:11211 -> data count: 106704
ServerNode IP: 192.168.0.106:11211 -> data count: 86169
ServerNode IP: 192.168.0.107:11211 -> data count: 87813
ServerNode IP: 192.168.0.108:11211 -> data count: 103563
ServerNode IP: 192.168.0.109:11211 -> data count: 98604
Statistics cache hit rate:
Statistics is : 1.0
Sd is : 8029.741403059005
-----------------------Test add server node-----------------------
Try to add server nodes in the cluster
Scan the cluster server nodes stat:
ServerNode IP: 192.168.0.100:11211 -> data count: 94914
ServerNode IP: 192.168.0.101:11211 -> data count: 112653
ServerNode IP: 192.168.0.102:11211 -> data count: 101994
ServerNode IP: 192.168.0.103:11211 -> data count: 107443
ServerNode IP: 192.168.0.104:11211 -> data count: 100143
ServerNode IP: 192.168.0.105:11211 -> data count: 106704
ServerNode IP: 192.168.0.106:11211 -> data count: 86169
ServerNode IP: 192.168.0.107:11211 -> data count: 87813
ServerNode IP: 192.168.0.108:11211 -> data count: 103563
ServerNode IP: 192.168.0.109:11211 -> data count: 98604
ServerNode IP: 192.168.0.103:11211 -> data count: 0
Statistics is : 0.892557
Sd is : 32626.31985069723
-----------------------Test remove server node-----------------------
rebuild end ...
Scan the cluster server nodes stat:
ServerNode IP: 192.168.0.100:11211 -> data count: 94914
ServerNode IP: 192.168.0.102:11211 -> data count: 101994
ServerNode IP: 192.168.0.103:11211 -> data count: 107443
ServerNode IP: 192.168.0.104:11211 -> data count: 100143
ServerNode IP: 192.168.0.105:11211 -> data count: 106704
ServerNode IP: 192.168.0.106:11211 -> data count: 86169
ServerNode IP: 192.168.0.107:11211 -> data count: 87813
ServerNode IP: 192.168.0.108:11211 -> data count: 103563
ServerNode IP: 192.168.0.109:11211 -> data count: 98604
Statistics is : 0.887347
Sd is : 6961.817729587582
测试总耗时:4848ms

Process finished with exit code 0



The standard deviation drops to about 8,029, and judging from the data counts this is already far better than before. Increasing the number of virtual nodes to 500 makes the distribution even more uniform:

data total count : 1000000, serverCount : 10, virtualCount each server : 500
-----------------------Test hash mode-----------------------
Initialize cluster server nodes ...
Initialize cluster server nodes data ...
Scan the cluster server nodes stat:
ServerNode IP: 192.168.0.100:11211 -> data count: 91295
ServerNode IP: 192.168.0.101:11211 -> data count: 100265
ServerNode IP: 192.168.0.102:11211 -> data count: 103216
ServerNode IP: 192.168.0.103:11211 -> data count: 102963
ServerNode IP: 192.168.0.104:11211 -> data count: 101132
ServerNode IP: 192.168.0.105:11211 -> data count: 107092
ServerNode IP: 192.168.0.106:11211 -> data count: 96877
ServerNode IP: 192.168.0.107:11211 -> data count: 96541
ServerNode IP: 192.168.0.108:11211 -> data count: 93644
ServerNode IP: 192.168.0.109:11211 -> data count: 106975
Statistics cache hit rate:
Statistics is : 1.0
Sd is : 5072.616031201258
-----------------------Test add server node-----------------------
Try to add server nodes in the cluster
Scan the cluster server nodes stat:
ServerNode IP: 192.168.0.100:11211 -> data count: 91295
ServerNode IP: 192.168.0.101:11211 -> data count: 100265
ServerNode IP: 192.168.0.102:11211 -> data count: 103216
ServerNode IP: 192.168.0.103:11211 -> data count: 102963
ServerNode IP: 192.168.0.104:11211 -> data count: 101132
ServerNode IP: 192.168.0.105:11211 -> data count: 107092
ServerNode IP: 192.168.0.106:11211 -> data count: 96877
ServerNode IP: 192.168.0.107:11211 -> data count: 96541
ServerNode IP: 192.168.0.108:11211 -> data count: 93644
ServerNode IP: 192.168.0.109:11211 -> data count: 106975
ServerNode IP: 192.168.0.103:11211 -> data count: 0
Statistics is : 0.897037
Sd is : 32027.042220598516
-----------------------Test remove server node-----------------------
rebuild end ...
Scan the cluster server nodes stat:
ServerNode IP: 192.168.0.100:11211 -> data count: 91295
ServerNode IP: 192.168.0.102:11211 -> data count: 103216
ServerNode IP: 192.168.0.103:11211 -> data count: 102963
ServerNode IP: 192.168.0.104:11211 -> data count: 101132
ServerNode IP: 192.168.0.105:11211 -> data count: 107092
ServerNode IP: 192.168.0.106:11211 -> data count: 96877
ServerNode IP: 192.168.0.107:11211 -> data count: 96541
ServerNode IP: 192.168.0.108:11211 -> data count: 93644
ServerNode IP: 192.168.0.109:11211 -> data count: 106975
Statistics is : 0.899735
Sd is : 5071.923786887969
测试总耗时:5384ms

Process finished with exit code 0



With 1,000 virtual nodes:

data total count : 1000000, serverCount : 10, virtualCount each server : 1000
-----------------------Test hash mode-----------------------
Initialize cluster server nodes ...
Initialize cluster server nodes data ...
Scan the cluster server nodes stat:
ServerNode IP: 192.168.0.100:11211 -> data count: 100710
ServerNode IP: 192.168.0.101:11211 -> data count: 96919
ServerNode IP: 192.168.0.102:11211 -> data count: 103927
ServerNode IP: 192.168.0.103:11211 -> data count: 99827
ServerNode IP: 192.168.0.104:11211 -> data count: 96885
ServerNode IP: 192.168.0.105:11211 -> data count: 103520
ServerNode IP: 192.168.0.106:11211 -> data count: 96862
ServerNode IP: 192.168.0.107:11211 -> data count: 103488
ServerNode IP: 192.168.0.108:11211 -> data count: 97655
ServerNode IP: 192.168.0.109:11211 -> data count: 100207
Statistics cache hit rate:
Statistics is : 1.0
Sd is : 2740.3759960998054
-----------------------Test add server node-----------------------
Try to add server nodes in the cluster
Scan the cluster server nodes stat:
ServerNode IP: 192.168.0.100:11211 -> data count: 100710
ServerNode IP: 192.168.0.101:11211 -> data count: 96919
ServerNode IP: 192.168.0.102:11211 -> data count: 103927
ServerNode IP: 192.168.0.103:11211 -> data count: 99827
ServerNode IP: 192.168.0.104:11211 -> data count: 96885
ServerNode IP: 192.168.0.105:11211 -> data count: 103520
ServerNode IP: 192.168.0.106:11211 -> data count: 96862
ServerNode IP: 192.168.0.107:11211 -> data count: 103488
ServerNode IP: 192.168.0.108:11211 -> data count: 97655
ServerNode IP: 192.168.0.109:11211 -> data count: 100207
ServerNode IP: 192.168.0.103:11211 -> data count: 0
Statistics is : 0.900173
Sd is : 31741.292673739677
-----------------------Test remove server node-----------------------
rebuild end ...
Scan the cluster server nodes stat:
ServerNode IP: 192.168.0.100:11211 -> data count: 100710
ServerNode IP: 192.168.0.102:11211 -> data count: 103927
ServerNode IP: 192.168.0.103:11211 -> data count: 99827
ServerNode IP: 192.168.0.104:11211 -> data count: 96885
ServerNode IP: 192.168.0.105:11211 -> data count: 103520
ServerNode IP: 192.168.0.106:11211 -> data count: 96862
ServerNode IP: 192.168.0.107:11211 -> data count: 103488
ServerNode IP: 192.168.0.108:11211 -> data count: 97655
ServerNode IP: 192.168.0.109:11211 -> data count: 100207
Statistics is : 0.903081
Sd is : 2561.3286591142496
测试总耗时:5644ms

Process finished with exit code 0



Conclusions:

Virtual nodes balance the data distribution across the nodes on the hash ring.

The more virtual nodes, the more even the distribution; using a good hash function also makes the data spread more evenly.

But there is a trade-off: when a physical node is added or removed, a large number of virtual nodes changes with it, so the resulting data reshuffling can touch more physical nodes.

In addition, a large number of virtual nodes needs its own data structure to store, which costs memory and lookup time, so virtual nodes cannot be added without limit.

The more virtual nodes there are, the more physical nodes are involved in data migration when a server is added or recovers; keep this in mind if your scenario requires data migration.


