
Week 5

Geek_2b3614
Published: July 8, 2020

Assignment & summary:


Preface: why do we need consistent hashing?

In a distributed server cluster, we need a routing algorithm that computes which server each stored object should live on.

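Suppose the server cluster is an array of n servers, indexed 0 to n - 1, and we route with the following hash algorithm:

Index of the target server in the array: index = hash(object) % n;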

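When a node is added or removed, n changes, so the target server of a large fraction of objects changes as well.

Requests for those objects then miss, and the hit rate drops.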
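How large is that fraction? The sketch below is a hypothetical helper, not part of the original post. It counts how many hash values keep the same server index when n changes. It assumes hash values are spread evenly over the residues and models them as the consecutive integers 0 to samples - 1.

```java
/**
 * Hypothetical helper (not from the original post): with index = hash % n routing,
 * count how many hash values keep the same index when the cluster size changes
 * from oldN to newN. Evenly spread hash values are modelled as 0 .. samples - 1.
 */
final class ModuloRemapDemo {

    static double keptFraction(int oldN, int newN, int samples) {
        int kept = 0;
        for (int h = 0; h < samples; h++) {
            // the object stays on the same server only if its index does not change
            if (h % oldN == h % newN) {
                kept++;
            }
        }
        return (double) kept / samples;
    }

    public static void main(String[] args) {
        System.out.println("3 -> 4 servers, fraction kept: " + keptFraction(3, 4, 12_000));
        System.out.println("3 -> 2 servers, fraction kept: " + keptFraction(3, 2, 12_000));
    }
}
```

The condition h % 3 == h % 4 holds only when h mod 12 is 0, 1 or 2. Scaling from 3 to 4 servers therefore keeps exactly 1/4 of the objects in place. Shrinking to 2 servers by dropping the last one keeps 1/3. These values line up with the hit rates measured later in this post, 0.25 and 0.3331.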


Enough talk, here's the code:

Cluster interface ServerCluster:

```java
import java.util.List;

public interface ServerCluster {
    void addServerNode(ServerNode serverNode);

    void removeServerNode(ServerNode serverNode);

    ServerNode routeServerNode(String key);

    List<ServerNode> getAllServerNodes();
}
```


Modulo cluster RemainderCluster:

```java
import java.util.ArrayList;
import java.util.List;

public class RemainderCluster implements ServerCluster {
    private final List<ServerNode> serverNodes;

    public RemainderCluster() { this.serverNodes = new ArrayList<>(); }

    @Override
    public List<ServerNode> getAllServerNodes() { return this.serverNodes; }

    @Override
    public void addServerNode(ServerNode serverNode) {
        System.out.println("Add serverNode(" + serverNode.getAddress() + ") to the cluster...");
        this.serverNodes.add(serverNode);
    }

    @Override
    public void removeServerNode(ServerNode serverNode) {
        this.serverNodes.removeIf(sn -> serverNode.getAddress().equals(sn.getAddress()));
    }

    @Override
    public ServerNode routeServerNode(String key) {
        // index = |hash| % n; widen to long before Math.abs so that
        // Integer.MIN_VALUE does not stay negative and produce a negative index
        long hash = Math.abs((long) key.hashCode());
        int serverIndex = (int) (hash % this.serverNodes.size());
        return this.serverNodes.get(serverIndex);
    }
}
```


Cluster server node ServerNode:

```java
import java.util.Map;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;

@Data
@Builder
@AllArgsConstructor(staticName = "of")
public class ServerNode {
    // Server's address, ip:port, e.g. 192.168.0.100:11211
    private String address;
    // Server's data, (k, v) -> map
    private Map<String, Object> data;

    public Object get(String key) { return data.get(key); }

    public void put(String key, Object value) { this.data.put(key, value); }

    public void remove(String key) { this.data.remove(key); }
}
```


Data initialization / statistics utility StatisticalTool:

```java
import java.util.HashMap;
import java.util.stream.IntStream;

public class StatisticalTool {
    static final String keyPrefix = "remainderKey";
    static final String dataPrefix = "RemainderData";

    /** Initialize cluster server nodes 192.168.0.100:11211, 192.168.0.101:11211, ... */
    public static void initCluster(ServerCluster cluster, int serverNodeCount) {
        IntStream.range(0, serverNodeCount).forEach(i ->
                cluster.addServerNode(ServerNode.of("192.168.0." + (100 + i) + ":11211", new HashMap<>())));
    }

    /** Route dataTotalCount keys through the cluster and store each one on its node. */
    public static void initClusterData(ServerCluster cluster, int dataTotalCount) {
        IntStream.range(0, dataTotalCount).forEach(i -> {
            String key = keyPrefix + i;
            String value = dataPrefix + i;
            ServerNode serverNode = cluster.routeServerNode(key);
            serverNode.put(key, value);
        });
    }

    /**
     * A lookup is a hit if the routed node holds a non-null value for the key.
     * Returns the hit COUNT as a float; divide by dataTotalCount to get the rate.
     */
    public static float calculateHitRate(ServerCluster cluster, int dataTotalCount) {
        return (float) IntStream.range(0, dataTotalCount)
                .filter(i -> {
                    String key = keyPrefix + i;
                    return cluster.routeServerNode(key).get(key) != null;
                })
                .count();
    }

    /** Print how many entries each node of the cluster holds. */
    public static void printClusterStat(ServerCluster cluster) {
        System.out.println("Scan the cluster server nodes stat: ");
        cluster.getAllServerNodes().forEach(sn ->
                System.out.println("ServerNode IP: " + sn.getAddress() + " -> data count: " + sn.getData().size()));
    }
}
```


Test runner Runner:

```java
import java.util.HashMap;

public class Runner {
    public static void main(String[] args) {
        System.out.println("-----------------------Test remainder mode-----------------------");
        int dataTotalCount = 10000;
        ServerCluster cluster = new RemainderCluster();
        System.out.println("Initialize cluster server nodes ...");
        // Initialize cluster server nodes
        StatisticalTool.initCluster(cluster, 3);
        System.out.println("Initialize cluster server nodes data ...");
        // Initialize cluster server nodes data
        StatisticalTool.initClusterData(cluster, dataTotalCount);
        // Print cluster stat
        StatisticalTool.printClusterStat(cluster);
        System.out.println("Statistics cache hit rate: ");
        float hit = StatisticalTool.calculateHitRate(cluster, dataTotalCount);
        System.out.println("Statistics is : " + hit / dataTotalCount);

        System.out.println("-----------------------Test add server node-----------------------");
        System.out.println("Try to add server nodes in the cluster");
        cluster.addServerNode(ServerNode.of("192.168.0.103:11211", new HashMap<>()));
        // Re-print cluster stat
        StatisticalTool.printClusterStat(cluster);
        float addHit = StatisticalTool.calculateHitRate(cluster, dataTotalCount);
        System.out.println("Statistics is : " + addHit / dataTotalCount);

        System.out.println("-----------------------Test remove server node-----------------------");
        // Rebuild the cluster from scratch before testing removal
        cluster = new RemainderCluster();
        StatisticalTool.initCluster(cluster, 3);
        StatisticalTool.initClusterData(cluster, dataTotalCount);
        System.out.println("rebuild end ...");
        cluster.removeServerNode(ServerNode.of("192.168.0.102:11211", new HashMap<>()));
        // Re-print cluster stat
        StatisticalTool.printClusterStat(cluster);
        float removeHit = StatisticalTool.calculateHitRate(cluster, dataTotalCount);
        System.out.println("Statistics is : " + removeHit / dataTotalCount);
    }
}
```


Test results:

```text
D:\Java\openjdk-11\bin\java.exe
-----------------------Test remainder mode-----------------------
Initialize cluster server nodes ...
Add serverNode(192.168.0.100:11211) to the cluster...
Add serverNode(192.168.0.101:11211) to the cluster...
Add serverNode(192.168.0.102:11211) to the cluster...
Initialize cluster server nodes data ...
Scan the cluster server nodes stat:
ServerNode IP: 192.168.0.100:11211 -> data count: 3333
ServerNode IP: 192.168.0.101:11211 -> data count: 3333
ServerNode IP: 192.168.0.102:11211 -> data count: 3334
Statistics cache hit rate:
Statistics is : 1.0
-----------------------Test add server node-----------------------
Try to add server nodes in the cluster
Add serverNode(192.168.0.103:11211) to the cluster...
Scan the cluster server nodes stat:
ServerNode IP: 192.168.0.100:11211 -> data count: 3333
ServerNode IP: 192.168.0.101:11211 -> data count: 3333
ServerNode IP: 192.168.0.102:11211 -> data count: 3334
ServerNode IP: 192.168.0.103:11211 -> data count: 0
Statistics is : 0.25
-----------------------Test remove server node-----------------------
Add serverNode(192.168.0.100:11211) to the cluster...
Add serverNode(192.168.0.101:11211) to the cluster...
Add serverNode(192.168.0.102:11211) to the cluster...
rebuild end ...
Scan the cluster server nodes stat:
ServerNode IP: 192.168.0.100:11211 -> data count: 3333
ServerNode IP: 192.168.0.101:11211 -> data count: 3333
Statistics is : 0.3331
Process finished with exit code 0
```


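Test setup: 3 cluster nodes, 10,000 data items in total.

With a fixed number of nodes, the hit rate is 1.0;

After adding a node (scale-out), the hit rate drops to 0.25;

After removing a node (failure), the hit rate drops to 0.3331.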


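With hit rates this low, the system cannot scale out or in gracefully and tolerates node failures poorly. Every miss falls through to the backing data source, so the concurrent load on that source rises sharply.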


This is why consistent hashing is used as an improvement over plain modulo hashing:

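In essence, consistent hashing is still "hash, then take the modulus". The difference is that the modulus is always 2^32, regardless of the number of servers, so every hash value falls in the range 0 to 2^32 - 1.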

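Consistent hashing introduces the concept of a hash ring: the integers from 0 to 2^32 - 1 are joined end to end to form a circle. Each node is placed on the ring at the hash of its identity. A key is stored on the first node found by walking clockwise from the key's own position, wrapping around to the start of the ring if necessary.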
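The following minimal sketch makes the ring concrete. It is a hypothetical class, separate from the post's own code. It places nodes at a 32-bit hash reinterpreted as unsigned, which is exactly hash mod 2^32. It then routes a key to the first node clockwise from it, wrapping around past 2^32 - 1. The sketch uses `Integer.toUnsignedLong` and `TreeMap.ceilingEntry`. The post's SimpleHashCluster performs the same lookup with `Math.abs` and `tailMap` instead.

```java
import java.util.Map;
import java.util.TreeMap;

/** Minimal hash-ring sketch (hypothetical, not the post's code). */
final class HashRingSketch {
    // ring position -> node name
    private final TreeMap<Long, String> ring = new TreeMap<>();

    /** Map a 32-bit hash onto [0, 2^32 - 1]; this equals hash mod 2^32. */
    static long position(int hash) {
        return Integer.toUnsignedLong(hash);
    }

    void addNode(String node, int hash) {
        ring.put(position(hash), node);
    }

    /** Walk clockwise: first node at or after the key's position, else wrap to the first node. */
    String route(int keyHash) {
        if (ring.isEmpty()) {
            throw new IllegalStateException("ring is empty");
        }
        Map.Entry<Long, String> next = ring.ceilingEntry(position(keyHash));
        return (next != null ? next : ring.firstEntry()).getValue();
    }
}
```

For example, take two nodes placed at 100 and 1000. A key at 500 goes to the node at 1000, while a key at 2000, or a negative hash (which maps near 2^32), wraps around to the node at 100.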


OK, let's first use a simple consistent hash (no virtual nodes yet) to fix the low hit rate:

Interface SimpleServerCluster:

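```java
import java.util.List;

// Named SimpleServerCluster to match how SimpleHashTool and Runner use it
public interface SimpleServerCluster {
    void addServerNode(SimpleServer serverNode);

    void removeServerNode(SimpleServer serverNode);

    SimpleServer routeServerNode(String key);

    List<SimpleServer> getAllServerNodes();
}
```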


Simple hash cluster SimpleHashCluster:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;

public class SimpleHashCluster implements SimpleServerCluster {
    // Hash ring: node position (hash of the node address) -> node
    private final SortedMap<Integer, SimpleServer> serverNodes;

    public SimpleHashCluster() { this.serverNodes = new TreeMap<>(); }

    @Override
    public void addServerNode(SimpleServer serverNode) {
        this.serverNodes.put(Math.abs(serverNode.getAddress().hashCode()), serverNode);
    }

    @Override
    public void removeServerNode(SimpleServer serverNode) {
        this.serverNodes.remove(Math.abs(serverNode.getAddress().hashCode()));
    }

    @Override
    public SimpleServer routeServerNode(String key) {
        int keyHash = Math.abs(key.hashCode());
        // Walk clockwise: first node whose position >= keyHash, wrapping to the first node
        SortedMap<Integer, SimpleServer> nextServerMap = this.serverNodes.tailMap(keyHash);
        int serverKey = nextServerMap.isEmpty() ? serverNodes.firstKey() : nextServerMap.firstKey();
        return serverNodes.get(serverKey);
    }

    @Override
    public List<SimpleServer> getAllServerNodes() {
        // Nodes come back in ring order (sorted by hash), not in insertion order
        return new ArrayList<>(this.serverNodes.values());
    }
}
```


Cluster server node SimpleServer:

```java
import java.util.Map;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;

@Data
@Builder
@AllArgsConstructor(staticName = "of")
public class SimpleServer {
    // Server's address, ip:port, e.g. 192.168.0.100:11211
    private String address;
    // Server's data, (k, v) -> map
    private Map<String, Object> data;

    public Object get(String key) { return data.get(key); }

    public void put(String key, Object value) { this.data.put(key, value); }

    public void remove(String key) { this.data.remove(key); }
}
```


Data initialization / statistics utility SimpleHashTool:

```java
import java.util.HashMap;
import java.util.stream.IntStream;

public class SimpleHashTool {
    static final String keyPrefix = "remainderKey";
    static final String dataPrefix = "RemainderData";

    /** Initialize cluster server nodes 192.168.0.100:11211, 192.168.0.101:11211, ... */
    public static void initCluster(SimpleServerCluster cluster, int serverNodeCount) {
        IntStream.range(0, serverNodeCount).forEach(i ->
                cluster.addServerNode(SimpleServer.of("192.168.0." + (100 + i) + ":11211", new HashMap<>())));
    }

    /** Route dataTotalCount keys through the cluster and store each one on its node. */
    public static void initClusterData(SimpleServerCluster cluster, int dataTotalCount) {
        IntStream.range(0, dataTotalCount).forEach(i -> {
            String key = keyPrefix + i;
            String value = dataPrefix + i;
            SimpleServer serverNode = cluster.routeServerNode(key);
            serverNode.put(key, value);
        });
    }

    /**
     * A lookup is a hit if the routed node holds a non-null value for the key.
     * Returns the hit COUNT as a float; divide by dataTotalCount to get the rate.
     */
    public static float calculateHitRate(SimpleServerCluster cluster, int dataTotalCount) {
        return (float) IntStream.range(0, dataTotalCount)
                .filter(i -> {
                    String key = keyPrefix + i;
                    return cluster.routeServerNode(key).get(key) != null;
                })
                .count();
    }

    /** Print how many entries each node of the cluster holds. */
    public static void printClusterStat(SimpleServerCluster cluster) {
        System.out.println("Scan the cluster server nodes stat: ");
        cluster.getAllServerNodes().forEach(sn ->
                System.out.println("ServerNode IP: " + sn.getAddress() + " -> data count: " + sn.getData().size()));
    }
}
```


Test runner Runner:

```java
import java.util.HashMap;

public class Runner {
    public static void main(String[] args) {
        long startTime = System.currentTimeMillis();
        System.out.println("-----------------------Test simple hash mode-----------------------");
        int dataTotalCount = 10000;
        int serverCount = 3;
        SimpleServerCluster cluster = new SimpleHashCluster();
        System.out.println("Initialize cluster server nodes ...");
        // Initialize cluster server nodes
        SimpleHashTool.initCluster(cluster, serverCount);
        System.out.println("Initialize cluster server nodes data ...");
        // Initialize cluster server nodes data
        SimpleHashTool.initClusterData(cluster, dataTotalCount);
        // Print cluster stat
        SimpleHashTool.printClusterStat(cluster);
        System.out.println("Statistics cache hit rate: ");
        float hit = SimpleHashTool.calculateHitRate(cluster, dataTotalCount);
        System.out.println("Statistics is : " + hit / dataTotalCount);

        System.out.println("-----------------------Test add server node-----------------------");
        System.out.println("Try to add server nodes in the cluster");
        // NOTE: if serverCount > 3, 192.168.0.103 already exists and this replaces it with an empty node
        cluster.addServerNode(SimpleServer.of("192.168.0.103:11211", new HashMap<>()));
        // Re-print cluster stat
        SimpleHashTool.printClusterStat(cluster);
        float addHit = SimpleHashTool.calculateHitRate(cluster, dataTotalCount);
        System.out.println("Statistics is : " + addHit / dataTotalCount);

        System.out.println("-----------------------Test remove server node-----------------------");
        // Rebuild the cluster from scratch before testing removal
        cluster = new SimpleHashCluster();
        SimpleHashTool.initCluster(cluster, serverCount);
        SimpleHashTool.initClusterData(cluster, dataTotalCount);
        System.out.println("rebuild end ...");
        cluster.removeServerNode(SimpleServer.of("192.168.0.102:11211", new HashMap<>()));
        // Re-print cluster stat
        SimpleHashTool.printClusterStat(cluster);
        float removeHit = SimpleHashTool.calculateHitRate(cluster, dataTotalCount);
        System.out.println("Statistics is : " + removeHit / dataTotalCount);

        long endTime = System.currentTimeMillis();
        System.out.println("Total test time: " + (endTime - startTime) + "ms");
    }
}
```


Note: this test uses exactly the same setup and data scale as the modulo test (10,000 K-V items, 3 server nodes). The results are:

```text
-----------------------Test simple hash mode-----------------------
Initialize cluster server nodes ...
Initialize cluster server nodes data ...
Scan the cluster server nodes stat:
ServerNode IP: 192.168.0.101:11211 -> data count: 990
ServerNode IP: 192.168.0.100:11211 -> data count: 9010
ServerNode IP: 192.168.0.102:11211 -> data count: 0
Statistics cache hit rate:
Statistics is : 1.0
-----------------------Test add server node-----------------------
Try to add server nodes in the cluster
Scan the cluster server nodes stat:
ServerNode IP: 192.168.0.101:11211 -> data count: 990
ServerNode IP: 192.168.0.100:11211 -> data count: 9010
ServerNode IP: 192.168.0.102:11211 -> data count: 0
ServerNode IP: 192.168.0.103:11211 -> data count: 0
Statistics is : 0.91
-----------------------Test remove server node-----------------------
rebuild end ...
Scan the cluster server nodes stat:
ServerNode IP: 192.168.0.101:11211 -> data count: 990
ServerNode IP: 192.168.0.100:11211 -> data count: 9010
Statistics is : 1.0
Total test time: 71ms
Process finished with exit code 0
```

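With a fixed number of nodes, the hit rate is 1.0;

After adding a node (scale-out), the hit rate drops to 0.91;

After removing a node (failure), the hit rate stays at 1.0, because the removed node (192.168.0.102) held no data.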


Scaling the data up 100 times, to 1,000,000 K-V items, gives:

```text
-----------------------Test simple hash mode-----------------------
Initialize cluster server nodes ...
Initialize cluster server nodes data ...
Scan the cluster server nodes stat:
ServerNode IP: 192.168.0.101:11211 -> data count: 90990
ServerNode IP: 192.168.0.100:11211 -> data count: 909010
ServerNode IP: 192.168.0.102:11211 -> data count: 0
Statistics cache hit rate:
Statistics is : 1.0
-----------------------Test add server node-----------------------
Try to add server nodes in the cluster
Scan the cluster server nodes stat:
ServerNode IP: 192.168.0.101:11211 -> data count: 90990
ServerNode IP: 192.168.0.100:11211 -> data count: 909010
ServerNode IP: 192.168.0.102:11211 -> data count: 0
ServerNode IP: 192.168.0.103:11211 -> data count: 0
Statistics is : 0.9091
-----------------------Test remove server node-----------------------
rebuild end ...
Scan the cluster server nodes stat:
ServerNode IP: 192.168.0.101:11211 -> data count: 90990
ServerNode IP: 192.168.0.100:11211 -> data count: 909010
Statistics is : 1.0
Total test time: 1474ms
Process finished with exit code 0
```

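With a fixed number of nodes, the hit rate is 1.0;

After adding a node (scale-out), the hit rate drops to 0.9091;

After removing a node (failure), the hit rate stays at 1.0, again because 192.168.0.102 held no data.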


Scaling up another 10 times, to 10,000,000 K-V items, gives:

```text
-----------------------Test simple hash mode-----------------------
Initialize cluster server nodes ...
Initialize cluster server nodes data ...
Scan the cluster server nodes stat:
ServerNode IP: 192.168.0.101:11211 -> data count: 6090990
ServerNode IP: 192.168.0.100:11211 -> data count: 3509010
ServerNode IP: 192.168.0.102:11211 -> data count: 400000
Statistics cache hit rate:
Statistics is : 1.0
-----------------------Test add server node-----------------------
Try to add server nodes in the cluster
Scan the cluster server nodes stat:
ServerNode IP: 192.168.0.101:11211 -> data count: 6090990
ServerNode IP: 192.168.0.100:11211 -> data count: 3509010
ServerNode IP: 192.168.0.102:11211 -> data count: 400000
ServerNode IP: 192.168.0.103:11211 -> data count: 0
Statistics is : 0.59091
-----------------------Test remove server node-----------------------
rebuild end ...
Scan the cluster server nodes stat:
ServerNode IP: 192.168.0.101:11211 -> data count: 6090990
ServerNode IP: 192.168.0.100:11211 -> data count: 3509010
Statistics is : 0.96
Total test time: 13724ms
Process finished with exit code 0
```

With a fixed number of nodes, the hit rate is 1.0;

After adding a node (scale-out), the hit rate drops to 0.59091;

After removing a node (failure), the hit rate drops to 0.96.


In the runs above, the removed node is the last one (192.168.0.102). When the removed node is the first one (192.168.0.100):

10,000,000 K-V, 3 nodes:

```text
-----------------------Test simple hash mode-----------------------
Initialize cluster server nodes ...
Initialize cluster server nodes data ...
Scan the cluster server nodes stat:
ServerNode IP: 192.168.0.101:11211 -> data count: 6090990
ServerNode IP: 192.168.0.100:11211 -> data count: 3509010
ServerNode IP: 192.168.0.102:11211 -> data count: 400000
Statistics cache hit rate:
Statistics is : 1.0
-----------------------Test add server node-----------------------
Try to add server nodes in the cluster
Scan the cluster server nodes stat:
ServerNode IP: 192.168.0.101:11211 -> data count: 6090990
ServerNode IP: 192.168.0.100:11211 -> data count: 3509010
ServerNode IP: 192.168.0.102:11211 -> data count: 400000
ServerNode IP: 192.168.0.103:11211 -> data count: 0
Statistics is : 0.59091
-----------------------Test remove server node-----------------------
rebuild end ...
Scan the cluster server nodes stat:
ServerNode IP: 192.168.0.101:11211 -> data count: 6090990
ServerNode IP: 192.168.0.102:11211 -> data count: 400000
Statistics is : 0.649099
Total test time: 13518ms
Process finished with exit code 0
```

After adding a node (scale-out), the hit rate drops to 0.59091;

After removing the first node (failure), the hit rate drops to 0.649099.


With the middle node (192.168.0.101) removed:

10,000,000 K-V, 3 nodes:

```text
-----------------------Test simple hash mode-----------------------
Initialize cluster server nodes ...
Initialize cluster server nodes data ...
Scan the cluster server nodes stat:
ServerNode IP: 192.168.0.101:11211 -> data count: 6090990
ServerNode IP: 192.168.0.100:11211 -> data count: 3509010
ServerNode IP: 192.168.0.102:11211 -> data count: 400000
Statistics cache hit rate:
Statistics is : 1.0
-----------------------Test add server node-----------------------
Try to add server nodes in the cluster
Scan the cluster server nodes stat:
ServerNode IP: 192.168.0.101:11211 -> data count: 6090990
ServerNode IP: 192.168.0.100:11211 -> data count: 3509010
ServerNode IP: 192.168.0.102:11211 -> data count: 400000
ServerNode IP: 192.168.0.103:11211 -> data count: 0
Statistics is : 0.59091
-----------------------Test remove server node-----------------------
rebuild end ...
Scan the cluster server nodes stat:
ServerNode IP: 192.168.0.100:11211 -> data count: 3509010
ServerNode IP: 192.168.0.102:11211 -> data count: 400000
Statistics is : 0.390901
Total test time: 12728ms
Process finished with exit code 0
```

After adding a node (scale-out), the hit rate drops to 0.59091;

After removing the middle node (failure), the hit rate drops to 0.390901.
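The removal results follow a simple rule. On a hash ring, removing a node only affects the keys that node was holding. Those keys now route to its clockwise successor, which does not have them. The hit rate after removal is therefore 1 minus the removed node's share of the data. The small hypothetical helper below (not from the post) checks this against the 10,000,000-item per-node counts reported above.

```java
/**
 * Hypothetical helper: on a consistent-hash ring, removing one node invalidates
 * only the keys stored on it, so the expected hit rate is 1 - removedCount / total.
 */
final class RemovalHitRate {

    static double expectedHitRate(long removedNodeCount, long totalCount) {
        return 1.0 - (double) removedNodeCount / totalCount;
    }

    public static void main(String[] args) {
        long total = 10_000_000L;
        // Per-node counts from the 3-node, 10,000,000 K-V runs
        System.out.println("remove 192.168.0.100: " + expectedHitRate(3_509_010L, total));
        System.out.println("remove 192.168.0.101: " + expectedHitRate(6_090_990L, total));
        System.out.println("remove 192.168.0.102: " + expectedHitRate(400_000L, total));
    }
}
```

The three values match the measured 0.649099, 0.390901 and 0.96. The same rule explains why removing 192.168.0.102 in the smaller runs left the hit rate at 1.0: that node held no data.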


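To reduce the risk, add more servers so that the impact of a failure is spread across them. Grow the cluster from 3 to 10 servers and test with 1,000,000 and 10,000,000 items respectively.

1,000,000 K-V: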

```text
-----------------------Test simple hash mode-----------------------
Initialize cluster server nodes ...
Initialize cluster server nodes data ...
Scan the cluster server nodes stat:
ServerNode IP: 192.168.0.101:11211 -> data count: 90
ServerNode IP: 192.168.0.106:11211 -> data count: 10
ServerNode IP: 192.168.0.105:11211 -> data count: 909000
ServerNode IP: 192.168.0.100:11211 -> data count: 0
ServerNode IP: 192.168.0.102:11211 -> data count: 0
ServerNode IP: 192.168.0.107:11211 -> data count: 0
ServerNode IP: 192.168.0.109:11211 -> data count: 0
ServerNode IP: 192.168.0.104:11211 -> data count: 0
ServerNode IP: 192.168.0.103:11211 -> data count: 90900
ServerNode IP: 192.168.0.108:11211 -> data count: 0
Statistics cache hit rate:
Statistics is : 1.0
-----------------------Test add server node-----------------------
Try to add server nodes in the cluster
Scan the cluster server nodes stat:
ServerNode IP: 192.168.0.101:11211 -> data count: 90
ServerNode IP: 192.168.0.106:11211 -> data count: 10
ServerNode IP: 192.168.0.105:11211 -> data count: 909000
ServerNode IP: 192.168.0.100:11211 -> data count: 0
ServerNode IP: 192.168.0.102:11211 -> data count: 0
ServerNode IP: 192.168.0.107:11211 -> data count: 0
ServerNode IP: 192.168.0.109:11211 -> data count: 0
ServerNode IP: 192.168.0.104:11211 -> data count: 0
ServerNode IP: 192.168.0.103:11211 -> data count: 0
ServerNode IP: 192.168.0.108:11211 -> data count: 0
Statistics is : 0.9091
-----------------------Test remove server node-----------------------
rebuild end ...
Scan the cluster server nodes stat:
ServerNode IP: 192.168.0.106:11211 -> data count: 10
ServerNode IP: 192.168.0.105:11211 -> data count: 909000
ServerNode IP: 192.168.0.100:11211 -> data count: 0
ServerNode IP: 192.168.0.102:11211 -> data count: 0
ServerNode IP: 192.168.0.107:11211 -> data count: 0
ServerNode IP: 192.168.0.109:11211 -> data count: 0
ServerNode IP: 192.168.0.104:11211 -> data count: 0
ServerNode IP: 192.168.0.103:11211 -> data count: 90900
ServerNode IP: 192.168.0.108:11211 -> data count: 0
Statistics is : 0.99991
Total test time: 1407ms
Process finished with exit code 0
```

10,000,000 K-V:

```text
-----------------------Test simple hash mode-----------------------
Initialize cluster server nodes ...
Initialize cluster server nodes data ...
Scan the cluster server nodes stat:
ServerNode IP: 192.168.0.101:11211 -> data count: 1100090
ServerNode IP: 192.168.0.106:11211 -> data count: 900010
ServerNode IP: 192.168.0.105:11211 -> data count: 2209000
ServerNode IP: 192.168.0.100:11211 -> data count: 400000
ServerNode IP: 192.168.0.102:11211 -> data count: 400000
ServerNode IP: 192.168.0.107:11211 -> data count: 900000
ServerNode IP: 192.168.0.109:11211 -> data count: 1400000
ServerNode IP: 192.168.0.104:11211 -> data count: 900000
ServerNode IP: 192.168.0.103:11211 -> data count: 890900
ServerNode IP: 192.168.0.108:11211 -> data count: 900000
Statistics cache hit rate:
Statistics is : 1.0
-----------------------Test add server node-----------------------
Try to add server nodes in the cluster
Scan the cluster server nodes stat:
ServerNode IP: 192.168.0.101:11211 -> data count: 1100090
ServerNode IP: 192.168.0.106:11211 -> data count: 900010
ServerNode IP: 192.168.0.105:11211 -> data count: 2209000
ServerNode IP: 192.168.0.100:11211 -> data count: 400000
ServerNode IP: 192.168.0.102:11211 -> data count: 400000
ServerNode IP: 192.168.0.107:11211 -> data count: 900000
ServerNode IP: 192.168.0.109:11211 -> data count: 1400000
ServerNode IP: 192.168.0.104:11211 -> data count: 900000
ServerNode IP: 192.168.0.103:11211 -> data count: 0
ServerNode IP: 192.168.0.108:11211 -> data count: 900000
Statistics is : 0.91091
-----------------------Test remove server node-----------------------
rebuild end ...
Scan the cluster server nodes stat:
ServerNode IP: 192.168.0.106:11211 -> data count: 900010
ServerNode IP: 192.168.0.105:11211 -> data count: 2209000
ServerNode IP: 192.168.0.100:11211 -> data count: 400000
ServerNode IP: 192.168.0.102:11211 -> data count: 400000
ServerNode IP: 192.168.0.107:11211 -> data count: 900000
ServerNode IP: 192.168.0.109:11211 -> data count: 1400000
ServerNode IP: 192.168.0.104:11211 -> data count: 900000
ServerNode IP: 192.168.0.103:11211 -> data count: 890900
ServerNode IP: 192.168.0.108:11211 -> data count: 900000
Statistics is : 0.889991
Total test time: 13892ms
Process finished with exit code 0
```


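As the results show, the data is not spread evenly around the hash ring. Java's built-in hashCode performs poorly here, and the other hash algorithms I tried did not distribute particularly evenly either. If the failed node happens to hold a large share of the data, the hit rate becomes very low.

Note also a quirk of the 10-node runs. The "add" step adds 192.168.0.103, which is already in the cluster, so it actually replaces node 103 with an empty node instead of adding a new server. The drops to 0.9091 and 0.91091 are exactly node 103's share of the data: 90,900 of 1,000,000 and 890,900 of 10,000,000.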
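One reason Java's hashCode does so badly here: `String.hashCode()` is defined as s[0]*31^(n-1) + ... + s[n-1]. The test keys "remainderKey0", "remainderKey1", ... differ only in their trailing digits. They therefore land in a few tight clusters of nearly consecutive values on a ring of 2^32 positions, and most arcs of the ring (and most nodes) receive nothing. The illustration below uses a hypothetical class that is not part of the post.

```java
/**
 * Illustration (not from the original post): for a string s followed by one char c,
 * hashCode = 31 * s.hashCode() + c, so keys that differ only in the last digit
 * get hash values that differ by exactly the digit difference.
 */
final class HashCodeClustering {

    /** b.hashCode() - a.hashCode(), using int arithmetic like the JDK does. */
    static int hashDiff(String a, String b) {
        return b.hashCode() - a.hashCode();
    }

    public static void main(String[] args) {
        for (int i = 0; i < 5; i++) {
            String key = "remainderKey" + i;
            System.out.println(key + " -> " + key.hashCode());
        }
        // Same prefix, last digit differs by 9 -> hash values differ by 9
        System.out.println(hashDiff("remainderKey120", "remainderKey129"));
    }
}
```

Consecutive single-digit keys are exactly 1 apart, and changing the second-to-last digit by one only moves the hash by 31. Against a ring of about 4.3 billion positions, that is essentially a single point.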


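Next, improve the simple hash further. Add virtual nodes that map onto the real nodes, and use the standard deviation to measure how evenly the data is distributed around the ring.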


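Standard deviation measures how widely a set of values is dispersed around its mean. A large standard deviation means most values differ considerably from the mean; a small one means the values are close to the mean.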

Here, a smaller standard deviation means the data is spread more evenly across the nodes.


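Sample standard deviation (the square root of the sample variance):

$$s = \sqrt{\frac{(x_1-\bar{x})^2 + (x_2-\bar{x})^2 + \cdots + (x_n-\bar{x})^2}{n-1}}$$

Population standard deviation:

$$\sigma = \sqrt{\frac{(x_1-\bar{x})^2 + (x_2-\bar{x})^2 + \cdots + (x_n-\bar{x})^2}{n}}$$

where $\bar{x}$ is the mean of $x_1, \dots, x_n$.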
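As a worked check of the two formulas, here is a small sketch (a hypothetical helper, not from the post) that computes both for a list of per-node data counts. Unlike `HashTool.getSd` later in the post, it uses the exact (non-integer) mean and the actual number of values passed in.

```java
/**
 * Hypothetical helper: population (sigma, divide by n) and sample (s, divide by n - 1)
 * standard deviation of per-node data counts.
 */
final class StdDev {

    private static double mean(long[] xs) {
        double sum = 0;
        for (long x : xs) {
            sum += x;
        }
        return sum / xs.length;
    }

    private static double sumOfSquaredDeviations(long[] xs) {
        double m = mean(xs);
        double acc = 0;
        for (long x : xs) {
            double d = x - m;
            acc += d * d;
        }
        return acc;
    }

    /** sigma: divide the squared deviations by n. */
    static double population(long[] xs) {
        return Math.sqrt(sumOfSquaredDeviations(xs) / xs.length);
    }

    /** s: divide the squared deviations by n - 1 (needs at least two values). */
    static double sample(long[] xs) {
        if (xs.length < 2) {
            throw new IllegalArgumentException("sample standard deviation needs at least two values");
        }
        return Math.sqrt(sumOfSquaredDeviations(xs) / (xs.length - 1));
    }
}
```

For the counts {2, 4, 4, 4, 5, 5, 7, 9}, the mean is 5 and the squared deviations sum to 32. The population standard deviation is therefore sqrt(32 / 8) = 2, and the sample standard deviation is sqrt(32 / 7). A perfectly even distribution, such as {100, 100, 100}, gives 0.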


OK, code first, starting with Java's built-in hashCode:

Cluster interface ServerCluster:

```java
import java.util.List;

public interface ServerCluster {
    void addServerNode(ServerNode serverNode);

    void removeServerNode(ServerNode serverNode);

    ServerNode routeServerNode(String key);

    List<ServerNode> getAllServerNodes();
}
```


Consistent hash cluster ConsistentHashCluster:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.stream.IntStream;

public class ConsistentHashCluster implements ServerCluster {
    private final List<ServerNode> serverNodes;
    // Hash ring of virtual nodes: virtual node hash -> real server node
    private final SortedMap<Integer, ServerNode> virtualNodes;
    // Number of virtual nodes per real server
    private final Integer virtualCount;

    public ConsistentHashCluster(Integer virtualCount) {
        this.serverNodes = new ArrayList<>();
        this.virtualNodes = new TreeMap<>();
        this.virtualCount = virtualCount;
    }

    @Override
    public void addServerNode(ServerNode serverNode) {
        this.serverNodes.add(serverNode);
        IntStream.range(0, this.virtualCount).forEach(i -> {
            // murmur3 variant:
            // String key = serverNode.getAddress() + HashTool.split + i;
            // int hash = HashTool.getHashCode(key);
            int hash = HashTool.getHashCode(serverNode, i);
            this.virtualNodes.put(hash, serverNode);
        });
    }

    @Override
    public void removeServerNode(ServerNode serverNode) {
        this.serverNodes.removeIf(sn -> sn.getAddress().equals(serverNode.getAddress()));
        IntStream.range(0, this.virtualCount).forEach(i -> {
            // murmur3 variant:
            // String key = serverNode.getAddress() + HashTool.split + i;
            // int hash = HashTool.getHashCode(key);
            int hash = HashTool.getHashCode(serverNode, i);
            this.virtualNodes.remove(hash);
        });
    }

    @Override
    public ServerNode routeServerNode(String key) {
        int hash = key.hashCode();
        // murmur3 variant: int hash = HashTool.getHashCode(key);
        // Walk clockwise to the first virtual node at or after the key, wrapping to the first one
        SortedMap<Integer, ServerNode> nextMap = this.virtualNodes.tailMap(hash);
        int serverKey = nextMap.isEmpty() ? this.virtualNodes.firstKey() : nextMap.firstKey();
        return this.virtualNodes.get(serverKey);
    }

    @Override
    public List<ServerNode> getAllServerNodes() { return this.serverNodes; }
}
```


Cluster server node ServerNode:

```java
import java.util.Map;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;

@Data
@Builder
@AllArgsConstructor(staticName = "of")
public class ServerNode {
    // Server's address, ip:port, e.g. 192.168.0.100:11211
    private String address;
    // Server's data, (k, v) -> map
    private Map<String, Object> data;

    public Object get(String key) { return data.get(key); }

    public void put(String key, Object value) { this.data.put(key, value); }

    public void remove(String key) { this.data.remove(key); }
}
```


Data initialization / statistics utility HashTool:

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.stream.IntStream;

import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;

public class HashTool {
    static final String keyPrefix = "remainderKey";
    static final String dataPrefix = "RemainderData";
    public static final String split = "_";
    // Guava murmur3_128 with seed 13; HashFunction is immutable, so one shared instance suffices
    private static final HashFunction MURMUR3 = Hashing.murmur3_128(13);

    /** Virtual node hash using String.hashCode(): hash("address_index"). */
    public static Integer getHashCode(ServerNode serverNode, int index) {
        return (serverNode.getAddress() + HashTool.split + index).hashCode();
    }

    /** murmur3_128 hash: absolute value of its first 32 bits. */
    public static Integer getHashCode(String s) {
        return Math.abs(MURMUR3.hashString(s, StandardCharsets.UTF_8).asInt());
    }

    /** Initialize cluster server nodes 192.168.0.100:11211, 192.168.0.101:11211, ... */
    public static void initCluster(ServerCluster cluster, int serverNodeCount) {
        IntStream.range(0, serverNodeCount).forEach(i ->
                cluster.addServerNode(ServerNode.of("192.168.0." + (100 + i) + ":11211", new HashMap<>())));
    }

    /** Route dataTotalCount keys through the cluster and store each one on its node. */
    public static void initClusterData(ServerCluster cluster, int dataTotalCount) {
        IntStream.range(0, dataTotalCount).forEach(i -> {
            String key = keyPrefix + i;
            String value = dataPrefix + i;
            cluster.routeServerNode(key).put(key, value);
        });
    }

    /** Hit COUNT (as a float): lookups whose routed node holds the key. Divide by dataTotalCount for the rate. */
    public static float calculateHitRate(ServerCluster cluster, int dataTotalCount) {
        return (float) IntStream.range(0, dataTotalCount)
                .filter(i -> {
                    String key = keyPrefix + i;
                    return cluster.routeServerNode(key).get(key) != null;
                })
                .count();
    }

    /** Print how many entries each node of the cluster holds. */
    public static void printClusterStat(ServerCluster cluster) {
        System.out.println("Scan the cluster server nodes stat: ");
        cluster.getAllServerNodes().forEach(sn ->
                System.out.println("ServerNode IP: " + sn.getAddress() + " -> data count: " + sn.getData().size()));
    }

    /**
     * Population standard deviation of per-node data counts.
     * Note: the mean uses integer division, and both the mean and the divisor use the
     * serverCount argument rather than the current number of nodes in the cluster.
     */
    public static double getSd(ServerCluster cluster, int dataTotalCount, int serverCount) {
        int avg = dataTotalCount / serverCount;
        double powSum = cluster.getAllServerNodes().stream()
                .mapToDouble(n -> Math.pow(n.getData().size() - avg, 2))
                .sum();
        return Math.sqrt(powSum / serverCount);
    }
}
```


Test runner Runner:

```java
import java.util.HashMap;

public class Runner {
    public static void main(String[] args) {
        long startTime = System.currentTimeMillis();
        int dataTotalCount = 1000000;
        int serverCount = 10;
        int replicas = 100; // virtual nodes per real server
        System.out.println(String.format("data total count : %s, serverCount : %s, virtualCount each server : %s",
                dataTotalCount, serverCount, replicas));
        System.out.println("-----------------------Test hash mode-----------------------");
        ServerCluster cluster = new ConsistentHashCluster(replicas);
        System.out.println("Initialize cluster server nodes ...");
        // Initialize cluster server nodes
        HashTool.initCluster(cluster, serverCount);
        System.out.println("Initialize cluster server nodes data ...");
        // Initialize cluster server nodes data
        HashTool.initClusterData(cluster, dataTotalCount);
        // Print cluster stat
        HashTool.printClusterStat(cluster);
        System.out.println("Statistics cache hit rate: ");
        float hit = HashTool.calculateHitRate(cluster, dataTotalCount);
        System.out.println("Statistics is : " + hit / dataTotalCount);
        System.out.println("Sd is : " + HashTool.getSd(cluster, dataTotalCount, serverCount));

        System.out.println("-----------------------Test add server node-----------------------");
        System.out.println("Try to add server nodes in the cluster");
        // NOTE: with serverCount = 10, 192.168.0.103 already exists; this call lists it a second
        // time and re-points its virtual nodes to a new, empty node instead of adding a new server
        cluster.addServerNode(ServerNode.of("192.168.0.103:11211", new HashMap<>()));
        // Re-print cluster stat
        HashTool.printClusterStat(cluster);
        float addHit = HashTool.calculateHitRate(cluster, dataTotalCount);
        System.out.println("Statistics is : " + addHit / dataTotalCount);
        System.out.println("Sd is : " + HashTool.getSd(cluster, dataTotalCount, serverCount));

        System.out.println("-----------------------Test remove server node-----------------------");
        // Rebuild the cluster from scratch before testing removal
        cluster = new ConsistentHashCluster(replicas);
        HashTool.initCluster(cluster, serverCount);
        HashTool.initClusterData(cluster, dataTotalCount);
        System.out.println("rebuild end ...");
        cluster.removeServerNode(ServerNode.of("192.168.0.101:11211", new HashMap<>()));
        // Re-print cluster stat
        HashTool.printClusterStat(cluster);
        float removeHit = HashTool.calculateHitRate(cluster, dataTotalCount);
        System.out.println("Statistics is : " + removeHit / dataTotalCount);
        System.out.println("Sd is : " + HashTool.getSd(cluster, dataTotalCount, serverCount));

        long endTime = System.currentTimeMillis();
        System.out.println("Total test time: " + (endTime - startTime) + "ms");
    }
}
```


1,000,000 K-V, 10 nodes, 100 virtual nodes per server. Results:

data total count : 1000000, serverCount : 10, virtualCount each server : 100
-----------------------Test hash mode-----------------------
Initialize cluster server nodes ...
Initialize cluster server nodes data ...
Scan the cluster server nodes stat:
ServerNode IP: 192.168.0.100:11211 -> data count: 370000
ServerNode IP: 192.168.0.101:11211 -> data count: 90
ServerNode IP: 192.168.0.102:11211 -> data count: 0
ServerNode IP: 192.168.0.103:11211 -> data count: 90900
ServerNode IP: 192.168.0.104:11211 -> data count: 10
ServerNode IP: 192.168.0.105:11211 -> data count: 9000
ServerNode IP: 192.168.0.106:11211 -> data count: 0
ServerNode IP: 192.168.0.107:11211 -> data count: 530000
ServerNode IP: 192.168.0.108:11211 -> data count: 0
ServerNode IP: 192.168.0.109:11211 -> data count: 0
Statistics cache hit rate:
Statistics is : 1.0
Sd is : 180594.5232281422
-----------------------Test add server node-----------------------
Try to add server nodes in the cluster
Scan the cluster server nodes stat:
ServerNode IP: 192.168.0.100:11211 -> data count: 370000
ServerNode IP: 192.168.0.101:11211 -> data count: 90
ServerNode IP: 192.168.0.102:11211 -> data count: 0
ServerNode IP: 192.168.0.103:11211 -> data count: 90900
ServerNode IP: 192.168.0.104:11211 -> data count: 10
ServerNode IP: 192.168.0.105:11211 -> data count: 9000
ServerNode IP: 192.168.0.106:11211 -> data count: 0
ServerNode IP: 192.168.0.107:11211 -> data count: 530000
ServerNode IP: 192.168.0.108:11211 -> data count: 0
ServerNode IP: 192.168.0.109:11211 -> data count: 0
ServerNode IP: 192.168.0.103:11211 -> data count: 0
Statistics is : 0.9091
Sd is : 183342.25323149053
-----------------------Test remove server node-----------------------
rebuild end ...
Scan the cluster server nodes stat:
ServerNode IP: 192.168.0.100:11211 -> data count: 370000
ServerNode IP: 192.168.0.102:11211 -> data count: 0
ServerNode IP: 192.168.0.103:11211 -> data count: 90900
ServerNode IP: 192.168.0.104:11211 -> data count: 10
ServerNode IP: 192.168.0.105:11211 -> data count: 9000
ServerNode IP: 192.168.0.106:11211 -> data count: 0
ServerNode IP: 192.168.0.107:11211 -> data count: 530000
ServerNode IP: 192.168.0.108:11211 -> data count: 0
ServerNode IP: 192.168.0.109:11211 -> data count: 0
Statistics is : 0.99991
Sd is : 177809.39516797193
Total test time: 1902ms
Process finished with exit code 0


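As before, the built-in hashCode() spreads the keys very unevenly: the standard deviation exceeds 180,000, 192.168.0.107 alone holds 530,000 of the 1,000,000 keys, and four of the ten nodes hold none at all.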
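The "Sd is" figures in these logs appear to be the population standard deviation of the per-node data counts (dividing by n, not n - 1). The minimal sketch below uses a hypothetical populationStdDev helper, not the author's StatisticalTool code; it reproduces 180594.52 from the ten counts above and 8029.74 from the 100-virtual-node murmur3_128 run that follows.

```java
import java.util.Arrays;

public class StdDevDemo {
    // Population standard deviation (divide by n, not n - 1); assumed to match how "Sd is" is computed.
    public static double populationStdDev(long[] counts) {
        double mean = Arrays.stream(counts).average().orElse(0.0);
        double sumSq = 0.0;
        for (long c : counts) {
            double d = c - mean;
            sumSq += d * d;
        }
        return Math.sqrt(sumSq / counts.length);
    }

    public static void main(String[] args) {
        // Per-node data counts copied from the logs (192.168.0.100 .. 109, 100 virtual nodes each).
        long[] hashCodeRun = {370000, 90, 0, 90900, 10, 9000, 0, 530000, 0, 0};
        long[] murmurRun = {94914, 112653, 101994, 107443, 100143, 106704, 86169, 87813, 103563, 98604};
        System.out.println("hashCode Sd:    " + populationStdDev(hashCodeRun)); // ~180594.52
        System.out.println("murmur3_128 Sd: " + populationStdDev(murmurRun));   // ~8029.74
    }
}
```

With a sample standard deviation (n - 1) the numbers would come out about 5% higher, so the match suggests the logs divide by n.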


Switching the hash function to murmur3_128:

data total count : 1000000, serverCount : 10, virtualCount each server : 100
-----------------------Test hash mode-----------------------
Initialize cluster server nodes ...
Initialize cluster server nodes data ...
Scan the cluster server nodes stat:
ServerNode IP: 192.168.0.100:11211 -> data count: 94914
ServerNode IP: 192.168.0.101:11211 -> data count: 112653
ServerNode IP: 192.168.0.102:11211 -> data count: 101994
ServerNode IP: 192.168.0.103:11211 -> data count: 107443
ServerNode IP: 192.168.0.104:11211 -> data count: 100143
ServerNode IP: 192.168.0.105:11211 -> data count: 106704
ServerNode IP: 192.168.0.106:11211 -> data count: 86169
ServerNode IP: 192.168.0.107:11211 -> data count: 87813
ServerNode IP: 192.168.0.108:11211 -> data count: 103563
ServerNode IP: 192.168.0.109:11211 -> data count: 98604
Statistics cache hit rate:
Statistics is : 1.0
Sd is : 8029.741403059005
-----------------------Test add server node-----------------------
Try to add server nodes in the cluster
Scan the cluster server nodes stat:
ServerNode IP: 192.168.0.100:11211 -> data count: 94914
ServerNode IP: 192.168.0.101:11211 -> data count: 112653
ServerNode IP: 192.168.0.102:11211 -> data count: 101994
ServerNode IP: 192.168.0.103:11211 -> data count: 107443
ServerNode IP: 192.168.0.104:11211 -> data count: 100143
ServerNode IP: 192.168.0.105:11211 -> data count: 106704
ServerNode IP: 192.168.0.106:11211 -> data count: 86169
ServerNode IP: 192.168.0.107:11211 -> data count: 87813
ServerNode IP: 192.168.0.108:11211 -> data count: 103563
ServerNode IP: 192.168.0.109:11211 -> data count: 98604
ServerNode IP: 192.168.0.103:11211 -> data count: 0
Statistics is : 0.892557
Sd is : 32626.31985069723
-----------------------Test remove server node-----------------------
rebuild end ...
Scan the cluster server nodes stat:
ServerNode IP: 192.168.0.100:11211 -> data count: 94914
ServerNode IP: 192.168.0.102:11211 -> data count: 101994
ServerNode IP: 192.168.0.103:11211 -> data count: 107443
ServerNode IP: 192.168.0.104:11211 -> data count: 100143
ServerNode IP: 192.168.0.105:11211 -> data count: 106704
ServerNode IP: 192.168.0.106:11211 -> data count: 86169
ServerNode IP: 192.168.0.107:11211 -> data count: 87813
ServerNode IP: 192.168.0.108:11211 -> data count: 103563
ServerNode IP: 192.168.0.109:11211 -> data count: 98604
Statistics is : 0.887347
Sd is : 6961.817729587582
Total test time: 4848ms
Process finished with exit code 0
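The only thing that changed between the two runs is the function that places keys and virtual nodes on the ring. Below is a minimal sketch of such a ring, a hypothetical HashRing (a TreeMap from virtual-node hash to real-node address, routed clockwise with ceilingEntry), with the hash function passed in so hashCode() and a well-mixed hash can be compared side by side. To stay JDK-only it substitutes the first 8 bytes of MD5 for murmur3_128 (presumably Guava's Hashing.murmur3_128(); the post does not say). The virtual-node naming address#VNi and the keys key0, key1, and so on are also assumptions. One likely reason for the hashCode() skew: keys that share a prefix and differ only in trailing digits get hash values packed into a few narrow bands of the int range, so each band falls between the same pair of virtual nodes.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.ToLongFunction;

public class RingHashDemo {
    // Hypothetical minimal ring: virtual-node hash -> real node address.
    static final class HashRing {
        private final TreeMap<Long, String> ring = new TreeMap<>();
        private final ToLongFunction<String> hash;

        HashRing(ToLongFunction<String> hash) {
            this.hash = hash;
        }

        void addNode(String address, int virtualCount) {
            // Assumed virtual-node naming: "<address>#VN<i>".
            for (int i = 0; i < virtualCount; i++) {
                ring.put(hash.applyAsLong(address + "#VN" + i), address);
            }
        }

        String route(String key) {
            // First virtual node clockwise from the key's hash, wrapping to the start of the ring.
            Map.Entry<Long, String> e = ring.ceilingEntry(hash.applyAsLong(key));
            return (e != null ? e : ring.firstEntry()).getValue();
        }
    }

    // Stand-in for murmur3_128 (JDK only): the first 8 bytes of MD5 packed into a long.
    public static long md5Hash(String s) {
        try {
            byte[] d = MessageDigest.getInstance("MD5").digest(s.getBytes(StandardCharsets.UTF_8));
            long h = 0;
            for (int i = 0; i < 8; i++) {
                h = (h << 8) | (d[i] & 0xFF);
            }
            return h;
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    // Population Sd of per-node key counts for keys "key0" .. "key<keys-1>".
    public static double sdOfLoad(ToLongFunction<String> hash, int servers, int virtualCount, int keys) {
        HashRing ring = new HashRing(hash);
        for (int i = 0; i < servers; i++) {
            ring.addNode("192.168.0." + (100 + i) + ":11211", virtualCount);
        }
        Map<String, Integer> load = new HashMap<>();
        for (int k = 0; k < keys; k++) {
            load.merge(ring.route("key" + k), 1, Integer::sum);
        }
        double mean = (double) keys / servers;
        double sumSq = 0;
        for (int i = 0; i < servers; i++) {
            // A node that received no keys counts as 0.
            double d = load.getOrDefault("192.168.0." + (100 + i) + ":11211", 0) - mean;
            sumSq += d * d;
        }
        return Math.sqrt(sumSq / servers);
    }

    public static void main(String[] args) {
        System.out.println("String.hashCode Sd: " + sdOfLoad(String::hashCode, 10, 100, 100_000));
        System.out.println("MD5 Sd:             " + sdOfLoad(RingHashDemo::md5Hash, 10, 100, 100_000));
    }
}
```

The exact numbers will differ from the post's, since the hash, key names and virtual-node names are all stand-ins; the point is the gap between the two rows.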


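The standard deviation is now 8029, and the per-node data counts are visibly far more even than before. Increasing the number of virtual nodes further, to 500 per server, makes the distribution more even still: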

data total count : 1000000, serverCount : 10, virtualCount each server : 500
-----------------------Test hash mode-----------------------
Initialize cluster server nodes ...
Initialize cluster server nodes data ...
Scan the cluster server nodes stat:
ServerNode IP: 192.168.0.100:11211 -> data count: 91295
ServerNode IP: 192.168.0.101:11211 -> data count: 100265
ServerNode IP: 192.168.0.102:11211 -> data count: 103216
ServerNode IP: 192.168.0.103:11211 -> data count: 102963
ServerNode IP: 192.168.0.104:11211 -> data count: 101132
ServerNode IP: 192.168.0.105:11211 -> data count: 107092
ServerNode IP: 192.168.0.106:11211 -> data count: 96877
ServerNode IP: 192.168.0.107:11211 -> data count: 96541
ServerNode IP: 192.168.0.108:11211 -> data count: 93644
ServerNode IP: 192.168.0.109:11211 -> data count: 106975
Statistics cache hit rate:
Statistics is : 1.0
Sd is : 5072.616031201258
-----------------------Test add server node-----------------------
Try to add server nodes in the cluster
Scan the cluster server nodes stat:
ServerNode IP: 192.168.0.100:11211 -> data count: 91295
ServerNode IP: 192.168.0.101:11211 -> data count: 100265
ServerNode IP: 192.168.0.102:11211 -> data count: 103216
ServerNode IP: 192.168.0.103:11211 -> data count: 102963
ServerNode IP: 192.168.0.104:11211 -> data count: 101132
ServerNode IP: 192.168.0.105:11211 -> data count: 107092
ServerNode IP: 192.168.0.106:11211 -> data count: 96877
ServerNode IP: 192.168.0.107:11211 -> data count: 96541
ServerNode IP: 192.168.0.108:11211 -> data count: 93644
ServerNode IP: 192.168.0.109:11211 -> data count: 106975
ServerNode IP: 192.168.0.103:11211 -> data count: 0
Statistics is : 0.897037
Sd is : 32027.042220598516
-----------------------Test remove server node-----------------------
rebuild end ...
Scan the cluster server nodes stat:
ServerNode IP: 192.168.0.100:11211 -> data count: 91295
ServerNode IP: 192.168.0.102:11211 -> data count: 103216
ServerNode IP: 192.168.0.103:11211 -> data count: 102963
ServerNode IP: 192.168.0.104:11211 -> data count: 101132
ServerNode IP: 192.168.0.105:11211 -> data count: 107092
ServerNode IP: 192.168.0.106:11211 -> data count: 96877
ServerNode IP: 192.168.0.107:11211 -> data count: 96541
ServerNode IP: 192.168.0.108:11211 -> data count: 93644
ServerNode IP: 192.168.0.109:11211 -> data count: 106975
Statistics is : 0.899735
Sd is : 5071.923786887969
Total test time: 5384ms
Process finished with exit code 0


With 1000 virtual nodes per server:

data total count : 1000000, serverCount : 10, virtualCount each server : 1000
-----------------------Test hash mode-----------------------
Initialize cluster server nodes ...
Initialize cluster server nodes data ...
Scan the cluster server nodes stat:
ServerNode IP: 192.168.0.100:11211 -> data count: 100710
ServerNode IP: 192.168.0.101:11211 -> data count: 96919
ServerNode IP: 192.168.0.102:11211 -> data count: 103927
ServerNode IP: 192.168.0.103:11211 -> data count: 99827
ServerNode IP: 192.168.0.104:11211 -> data count: 96885
ServerNode IP: 192.168.0.105:11211 -> data count: 103520
ServerNode IP: 192.168.0.106:11211 -> data count: 96862
ServerNode IP: 192.168.0.107:11211 -> data count: 103488
ServerNode IP: 192.168.0.108:11211 -> data count: 97655
ServerNode IP: 192.168.0.109:11211 -> data count: 100207
Statistics cache hit rate:
Statistics is : 1.0
Sd is : 2740.3759960998054
-----------------------Test add server node-----------------------
Try to add server nodes in the cluster
Scan the cluster server nodes stat:
ServerNode IP: 192.168.0.100:11211 -> data count: 100710
ServerNode IP: 192.168.0.101:11211 -> data count: 96919
ServerNode IP: 192.168.0.102:11211 -> data count: 103927
ServerNode IP: 192.168.0.103:11211 -> data count: 99827
ServerNode IP: 192.168.0.104:11211 -> data count: 96885
ServerNode IP: 192.168.0.105:11211 -> data count: 103520
ServerNode IP: 192.168.0.106:11211 -> data count: 96862
ServerNode IP: 192.168.0.107:11211 -> data count: 103488
ServerNode IP: 192.168.0.108:11211 -> data count: 97655
ServerNode IP: 192.168.0.109:11211 -> data count: 100207
ServerNode IP: 192.168.0.103:11211 -> data count: 0
Statistics is : 0.900173
Sd is : 31741.292673739677
-----------------------Test remove server node-----------------------
rebuild end ...
Scan the cluster server nodes stat:
ServerNode IP: 192.168.0.100:11211 -> data count: 100710
ServerNode IP: 192.168.0.102:11211 -> data count: 103927
ServerNode IP: 192.168.0.103:11211 -> data count: 99827
ServerNode IP: 192.168.0.104:11211 -> data count: 96885
ServerNode IP: 192.168.0.105:11211 -> data count: 103520
ServerNode IP: 192.168.0.106:11211 -> data count: 96862
ServerNode IP: 192.168.0.107:11211 -> data count: 103488
ServerNode IP: 192.168.0.108:11211 -> data count: 97655
ServerNode IP: 192.168.0.109:11211 -> data count: 100207
Statistics is : 0.903081
Sd is : 2561.3286591142496
Total test time: 5644ms
Process finished with exit code 0


Conclusions:

Virtual nodes balance the hit rate of the nodes on the hash ring, so each real node receives a more even share of the keys.

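The more virtual nodes, the more even the distribution: with murmur3_128 the standard deviation falls from 8029 to 5072 to 2740 as the virtual nodes per server grow from 100 to 500 to 1000. A good hash function also makes the distribution more even: at 100 virtual nodes per server, switching from hashCode() to murmur3_128 cut the standard deviation from about 180,595 to 8029.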
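Why more virtual nodes help can be seen without routing any keys: measure how much of the hash space each real node owns. The sketch below (hypothetical names; the first 4 bytes of MD5 stand in for murmur3_128 as an unsigned 32-bit ring position) builds the ring as a TreeMap with servers × virtualCount entries and adds up the arc each virtual node covers under clockwise routing. Each real node's share is the sum of virtualCount roughly independent arcs, so, as a back-of-the-envelope estimate rather than anything measured in the post, its spread shrinks roughly as 1/√virtualCount. The price is a bigger ring: 10 × 1000 = 10,000 TreeMap entries here, each lookup costing O(log n).

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Map;
import java.util.TreeMap;

public class VirtualNodeShareDemo {
    static final long RING_SIZE = 1L << 32;

    // Stand-in for murmur3_128 (JDK only): first 4 bytes of MD5 as an unsigned 32-bit ring position.
    static long hash32(String s) {
        try {
            byte[] d = MessageDigest.getInstance("MD5").digest(s.getBytes(StandardCharsets.UTF_8));
            return ((d[0] & 0xFFL) << 24) | ((d[1] & 0xFFL) << 16) | ((d[2] & 0xFFL) << 8) | (d[3] & 0xFFL);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    // Fraction of the ring each real node owns when keys route clockwise (ceiling) to the next virtual node.
    public static double[] ringShares(int servers, int virtualCount) {
        TreeMap<Long, Integer> ring = new TreeMap<>();
        for (int s = 0; s < servers; s++) {
            for (int v = 0; v < virtualCount; v++) {
                // Assumed virtual-node naming: "<address>#VN<v>".
                ring.put(hash32("192.168.0." + (100 + s) + ":11211#VN" + v), s);
            }
        }
        double[] share = new double[servers];
        // Start one lap behind the last virtual node, so the first entry also owns the wrap-around arc.
        long prev = ring.lastKey() - RING_SIZE;
        for (Map.Entry<Long, Integer> e : ring.entrySet()) {
            share[e.getValue()] += (double) (e.getKey() - prev) / RING_SIZE;
            prev = e.getKey();
        }
        return share;
    }

    // Population Sd of the shares around the ideal 1/servers.
    public static double shareSd(double[] share) {
        double ideal = 1.0 / share.length;
        double sumSq = 0.0;
        for (double x : share) {
            sumSq += (x - ideal) * (x - ideal);
        }
        return Math.sqrt(sumSq / share.length);
    }

    public static void main(String[] args) {
        for (int v : new int[] {1, 10, 100, 500, 1000}) {
            System.out.printf("virtualCount=%4d  virtual nodes=%5d  share Sd=%.5f%n",
                    v, 10 * v, shareSd(ringShares(10, v)));
        }
    }
}
```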

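The cost is data movement: when a real node is added or removed, a large number of virtual nodes appear or disappear at once, so the redistribution of data may affect more real nodes.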

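In addition, virtual nodes need their own data structure, so too many of them waste memory and lookup time; virtual nodes cannot be added without limit.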

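The more virtual nodes there are, the more real nodes are involved in data migration when a server is added or recovered. Keep this in mind if your scenario requires data migration.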
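The logs already show that removing a node loses only that node's keys: in every run the post-removal hit rate equals 1 - (data count of 192.168.0.101)/1,000,000, for example 1 - 112653/1000000 = 0.887347 with murmur3_128 and 100 virtual nodes. What the virtual-node count changes is where those orphaned keys go. The sketch below (a hypothetical MigrationDemo; MD5 again stands in for murmur3_128, and the keys key0, key1, and so on are assumptions) removes 192.168.0.101:11211 and counts the moved keys and the distinct real nodes that receive them. With one virtual node per server every moved key lands on a single neighbor; with 100 they spread over several of the remaining servers, which is the migration fan-out described above.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

public class MigrationDemo {
    // Hypothetical ring: virtual-node hash -> real node address.
    private final TreeMap<Long, String> ring = new TreeMap<>();
    private final int virtualCount;

    MigrationDemo(int virtualCount) {
        this.virtualCount = virtualCount;
    }

    // Stand-in for murmur3_128 (JDK only): the first 8 bytes of MD5 packed into a long.
    static long hash(String s) {
        try {
            byte[] d = MessageDigest.getInstance("MD5").digest(s.getBytes(StandardCharsets.UTF_8));
            long h = 0;
            for (int i = 0; i < 8; i++) {
                h = (h << 8) | (d[i] & 0xFF);
            }
            return h;
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    void addNode(String address) {
        for (int i = 0; i < virtualCount; i++) {
            ring.put(hash(address + "#VN" + i), address);
        }
    }

    void removeNode(String address) {
        // Remove only entries still mapped to this node (guards against hash collisions).
        for (int i = 0; i < virtualCount; i++) {
            ring.remove(hash(address + "#VN" + i), address);
        }
    }

    String route(String key) {
        Map.Entry<Long, String> e = ring.ceilingEntry(hash(key));
        return (e != null ? e : ring.firstEntry()).getValue();
    }

    /** Returns {keys moved, keys the removed node owned before, moved keys it did NOT own, distinct receiving nodes}. */
    public static int[] removalStats(int servers, int virtualCount, int keys, String removed) {
        MigrationDemo cluster = new MigrationDemo(virtualCount);
        for (int i = 0; i < servers; i++) {
            cluster.addNode("192.168.0." + (100 + i) + ":11211");
        }
        Map<String, String> before = new HashMap<>();
        for (int k = 0; k < keys; k++) {
            before.put("key" + k, cluster.route("key" + k));
        }
        cluster.removeNode(removed);
        int moved = 0, ownedBefore = 0, strayMoves = 0;
        Set<String> receivers = new HashSet<>();
        for (Map.Entry<String, String> e : before.entrySet()) {
            String old = e.getValue();
            String now = cluster.route(e.getKey());
            if (old.equals(removed)) {
                ownedBefore++;
            }
            if (!now.equals(old)) {
                moved++;
                receivers.add(now);
                if (!old.equals(removed)) {
                    strayMoves++;
                }
            }
        }
        return new int[] {moved, ownedBefore, strayMoves, receivers.size()};
    }

    public static void main(String[] args) {
        int keys = 100_000;
        for (int v : new int[] {1, 100}) {
            int[] r = removalStats(10, v, keys, "192.168.0.101:11211");
            System.out.printf("virtualCount=%d moved=%d hit rate=%.6f stray moves=%d receiving nodes=%d%n",
                    v, r[0], 1.0 - (double) r[0] / keys, r[2], r[3]);
        }
    }
}
```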

