
Architect Training Camp, Week 5 Assignment: Designing a Distributed Cache System

Melo
Published on: July 8, 2020

Other excellent solutions:

https://xie.infoq.cn/article/19de0be8fef3f66cd620ba1cb

https://xie.infoq.cn/article/9f07cb0af8c4341e5c8dbff2a



Design Approach

  1. Use generics to make the cache general-purpose, supporting arbitrary key and value types.

  2. Program to interfaces: callers use the cache through the ICache interface, so the distributed internals stay encapsulated.

  3. CacheNode implements ICache and is the class that physically stores the data, using a HashMap for the key-value pairs. With more time, I would use dependency injection to make the code more extensible, so the cache could also be backed by another physical server such as Redis (a sketch follows this list).

  4. DistributedCache implements ICache. It uses the IConsistentHashing interface to decide which cache a key belongs to, then reads and writes through that CacheNode.

  5. ConsistentHashing implements the IConsistentHashing interface. The number of virtual nodes per physical node is fixed at construction time; physical nodes can be added and removed, and the class decides where a given piece of data should be stored. The hash ring is implemented with a TreeMap, which finds the next node on the ring efficiently (ceilingKey is O(log n)). The algorithm also checks that at least one physical node is on the ring before looking up the next node.

  6. One lesson from the group discussion: the hash values of the virtual nodes must not change, otherwise a restart would remap keys, cause massive cache misses, and could trigger a cache avalanche.

  7. Whether the hash function is effectively "random", i.e. its values do not cluster, determines how well the whole system performs.
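
To illustrate point 3, here is a minimal sketch of a Redis-backed node. It assumes the Jedis client and restricts keys and values to String; the class name RedisCacheNode is my own addition, not part of the assignment code.

import redis.clients.jedis.Jedis;

/**
 * Sketch of an alternative ICache implementation backed by a Redis server.
 * Assumes the Jedis client; only String keys and values are supported here.
 */
public class RedisCacheNode implements ICache<String, String> {

    private final String nodeId;
    private final Jedis jedis;

    public RedisCacheNode(String nodeId, String host, int port) {
        this.nodeId = nodeId;
        this.jedis = new Jedis(host, port);
    }

    public String getNodeId() {
        return nodeId;
    }

    @Override
    public String get(String key) {
        return jedis.get(key);
    }

    @Override
    public String put(String key, String val) {
        // GETSET returns the previous value, mirroring HashMap.put semantics.
        return jedis.getSet(key, val);
    }

    @Override
    public String remove(String key) {
        String old = jedis.get(key);
        jedis.del(key);
        return old;
    }
}

If ConsistentHashing received the ICache implementation (or a factory for it) through its constructor instead of hard-coding CacheNode, a node like this could be swapped in without touching the hashing logic.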



Below is the class diagram with methods and fields omitted; arrows denote implementation, plain lines denote association.

Code

ICache interface

/**
 * Interface for a cache that stores key-value pairs.
 */
public interface ICache<K, V> {
    V get(K key);
    V put(K key, V val);
    V remove(K key);
}



IConsistentHashing interface

/**
 * Consistent hashing ring: each key maps to exactly one cache node.
 */
interface IConsistentHashing<K, V> {
    void addNode(String nodeId);
    boolean removeNode(String nodeId);
    ICache<K, V> getCache(K key);
}



DistributedCache class

/**
 * Entry point for callers: uses consistent hashing to pick the cache node
 * responsible for a key, then delegates the operation to that node.
 */
public class DistributedCache<K, V> implements ICache<K, V> {

    private final IConsistentHashing<K, V> consistentHashing;

    public DistributedCache(IConsistentHashing<K, V> consistentHashing) {
        this.consistentHashing = consistentHashing;
    }

    @Override
    public V get(K key) {
        ICache<K, V> cache = consistentHashing.getCache(key);
        return cache.get(key);
    }

    @Override
    public V put(K key, V val) {
        ICache<K, V> cache = consistentHashing.getCache(key);
        return cache.put(key, val);
    }

    @Override
    public V remove(K key) {
        ICache<K, V> cache = consistentHashing.getCache(key);
        return cache.remove(key);
    }
}



CacheNode class

import java.util.HashMap;

/**
 * A single cache node in the distributed cache, backed by an in-memory HashMap.
 */
public class CacheNode<K, V> implements ICache<K, V> {

    private final String nodeId;
    private final HashMap<K, V> storage = new HashMap<>();

    public CacheNode(String nodeId) {
        this.nodeId = nodeId;
    }

    /** Number of entries currently stored on this node. */
    public int getLoad() {
        return storage.size();
    }

    public String getNodeId() {
        return nodeId;
    }

    @Override
    public V get(K key) {
        return storage.get(key);
    }

    @Override
    public V put(K key, V val) {
        return storage.put(key, val);
    }

    @Override
    public V remove(K key) {
        return storage.remove(key);
    }

    // Two nodes are equal if and only if they share the same nodeId.
    @Override
    public int hashCode() {
        return nodeId.hashCode();
    }

    @Override
    public boolean equals(Object o) {
        if (o == this)
            return true;
        if (o == null || o.getClass() != getClass())
            return false;
        CacheNode<?, ?> that = (CacheNode<?, ?>) o;
        return that.nodeId.equals(this.nodeId);
    }
}



ConsistentHashing class

import java.util.*;

/**
 * Consistent hashing ring with virtual nodes. Physical nodes can be added and
 * removed dynamically, and every key is mapped to the cache node that should
 * store it.
 */
public class ConsistentHashing<K, V> implements IConsistentHashing<K, V> {

    private int nodesCount;
    private final int virtualNodeCount;
    // Ring position -> physical node id.
    private final TreeMap<Integer, String> index2Node = new TreeMap<>();
    // Physical node id -> its cache instance.
    private final HashMap<String, ICache<K, V>> caches = new HashMap<>();
    // Physical node id -> ring positions of its virtual nodes.
    private final HashMap<String, Set<Integer>> node2VirtualIndices = new HashMap<>();

    public ConsistentHashing(int nodesCount, int virtualNodeCount, List<String> ids) {
        this.nodesCount = nodesCount;
        this.virtualNodeCount = virtualNodeCount;
        initNodes(ids);
    }

    private void initNodes(List<String> ids) {
        if (nodesCount <= 0) {
            throw new IllegalArgumentException("Nodes number should be positive.");
        }
        if (nodesCount != ids.size()) {
            throw new IllegalArgumentException("Nodes number should equal the number of ids provided.");
        }
        for (int j = 0; j < nodesCount; j++) {
            String id = ids.get(j);
            // TODO: the cache implementation should be injected rather than hard-coded
            // to CacheNode, so that other backends (e.g. Redis) can be plugged in.
            ICache<K, V> cache = new CacheNode<>(id);
            Set<Integer> virtualNodeIndices = new HashSet<>();
            for (int i = 0; i < virtualNodeCount; i++) {
                int index = generateVirtualNodeIndex(id, i);
                virtualNodeIndices.add(index);
                index2Node.put(index, id);
            }
            node2VirtualIndices.put(id, virtualNodeIndices);
            caches.put(id, cache);
        }
    }

    // The ring position of a virtual node depends only on the node id and the
    // virtual node number, so it stays stable across restarts.
    private int generateVirtualNodeIndex(String nodeId, int virtualNodeIndex) {
        return hash(nodeId + "&&" + virtualNodeIndex);
    }

    // FNV-1a hash with extra bit mixing, folded to a non-negative int.
    public static int hash(String key) {
        final int p = 16777619;
        int ret = (int) 2166136261L;
        for (int i = 0; i < key.length(); i++) {
            ret = (ret ^ key.charAt(i)) * p;
        }
        ret += ret << 13;
        ret ^= ret >> 7;
        ret += ret << 3;
        ret ^= ret >> 17;
        ret += ret << 5;
        return ret < 0 ? -ret : ret;
    }

    // Alternative hash kept for comparison; currently unused.
    private int hash1(String key) {
        int hash = key.hashCode();
        return hash * 1354435761;
    }

    @Override
    public void addNode(String id) {
        ICache<K, V> cache = new CacheNode<>(id);
        Set<Integer> virtualNodeIndices = new HashSet<>();
        for (int i = 0; i < virtualNodeCount; i++) {
            int index = generateVirtualNodeIndex(id, i);
            virtualNodeIndices.add(index);
            index2Node.put(index, id);
        }
        node2VirtualIndices.put(id, virtualNodeIndices);
        caches.put(id, cache);
        ++nodesCount;
    }

    @Override
    public boolean removeNode(String id) {
        if (!caches.containsKey(id)) {
            return false;
        }
        for (int index : node2VirtualIndices.get(id)) {
            if (id.equals(index2Node.get(index))) {
                // Remove this virtual node from the ring.
                index2Node.remove(index);
            }
        }
        node2VirtualIndices.remove(id);
        caches.remove(id);
        --nodesCount;
        return true;
    }

    @Override
    public ICache<K, V> getCache(K key) {
        return getNextNode(key);
    }

    // Walk clockwise from the key's hash to the next virtual node on the ring.
    private ICache<K, V> getNextNode(K key) {
        if (nodesCount == 0) {
            throw new IllegalStateException("Cannot operate when there are no nodes available.");
        }
        int hashCode = Math.abs(hash(key.toString()));
        Integer nextKey = index2Node.ceilingKey(hashCode);
        if (nextKey == null) {
            // Wrap around to the start of the ring.
            nextKey = index2Node.firstKey();
        }
        return caches.get(index2Node.get(nextKey));
    }

    public void printAnalytics() {
        List<Integer> loadCounts = new LinkedList<>();
        for (Map.Entry<String, ICache<K, V>> entry : caches.entrySet()) {
            CacheNode<K, V> node = (CacheNode<K, V>) entry.getValue();
            int totalLoad = node.getLoad();
            System.out.println("Node " + node.getNodeId() + " has " + totalLoad + ".");
            loadCounts.add(totalLoad);
        }
        double avg = loadCounts.stream().mapToInt(Integer::intValue).average().orElse(0);
        double std = Math.sqrt(loadCounts.stream().map(count -> Math.pow(count - avg, 2))
                .mapToDouble(Double::doubleValue).average().orElse(0));
        System.out.println("The standard deviation is " + std);
    }
}



ConsistentHashingAnalytics class

import java.util.List;
import java.util.Arrays;

public class ConsistentHashingAnalytics {
    public static void main(String[] args) {
        List<String> servers = Arrays.asList("192.168.1.1:8080", "192.168.1.2:8080", "192.168.1.3:8080",
                "192.168.1.4:8080", "192.168.1.5:8080", "192.168.1.6:8080", "192.168.1.7:8080",
                "192.168.1.8:8080", "192.168.1.9:8080", "192.168.1.10:8080");
        ConsistentHashing<Integer, String> consistentHashing = new ConsistentHashing<>(10, 500, servers);
        ICache<Integer, String> distributedCache = new DistributedCache<>(consistentHashing);

        // Populate the cluster with one million entries and show the load distribution.
        for (int i = 0; i < 1000000; i++) {
            Integer key = i;
            String val = "val" + key;
            distributedCache.put(key, val);
        }
        consistentHashing.printAnalytics();

        // Miss-rate check: add a node, then count how many of the original keys still hit.
        consistentHashing.addNode("192.168.1.16:8080");
        int count = 0;
        for (int i = 0; i < 1000000; i++) {
            Integer key = i;
            String val = distributedCache.get(key);
            if (val != null) {
                count++;
            }
        }
        double percentage = count / 1000000.0 * 100;
        System.out.println("The hitting rate is " + percentage + "%.");
    }
}



Results

1,000,000 cached entries in total, distributed over 10 physical nodes.

Node 192.168.1.10:8080 has 97871.
Node 192.168.1.5:8080 has 101779.
Node 192.168.1.7:8080 has 99462.
Node 192.168.1.2:8080 has 108177.
Node 192.168.1.6:8080 has 100165.
Node 192.168.1.9:8080 has 95702.
Node 192.168.1.1:8080 has 96673.
Node 192.168.1.3:8080 has 105396.
Node 192.168.1.8:8080 has 98767.
Node 192.168.1.4:8080 has 96008.
The standard deviation is 3885.7738225480907

The hitting rate is 90.4693%.

With 500 virtual nodes per physical node, the standard deviation is about 3885.8 (see the run above). Every node holds close to 100,000 entries, so the distribution is fairly even.



After adding a new node, 90.4693% of the previously cached entries are still hit. With 11 nodes now on the ring, only about 1/11 (roughly 9%) of the keys are remapped to the new node, which matches the observed miss rate; the even distribution achieved earlier also helps here.
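
The same check can be run for node removal instead of addition. A minimal sketch that would replace the addNode call in ConsistentHashingAnalytics (the node id chosen here is arbitrary):

// Hypothetical variant: remove one of the original ten nodes instead of adding one.
// About 1/10 of the keys should then miss, so the hit rate should stay near 90%.
consistentHashing.removeNode("192.168.1.1:8080");
int stillCached = 0;
for (int i = 0; i < 1000000; i++) {
    if (distributedCache.get(i) != null) {
        stillCached++;
    }
}
System.out.println("The hitting rate after removal is " + (stillCached / 1000000.0 * 100) + "%.");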


