Architect Training Camp, Week 5 Assignment: Designing a Distributed Cache System
Other good solutions:
https://xie.infoq.cn/article/19de0be8fef3f66cd620ba1cb
https://xie.infoq.cn/article/9f07cb0af8c4341e5c8dbff2a
Design Approach
Use generics to make the cache general-purpose, supporting arbitrary key and value types.
Program to interfaces: callers use the cache through the ICache interface, and the distributed details are encapsulated inside.
CacheNode implements ICache and is the class that physically stores data, using a HashMap for key-value pairs. Given more time, I would use dependency injection to make the code more extensible, so that other physical stores, such as Redis, could back the cache.
DistributedCache implements ICache; it uses the IConsistentHashing interface to decide which cache a key should be read from or written to, and then delegates to that CacheNode.
ConsistentHashing implements the IConsistentHashing interface. The number of virtual nodes is fixed at construction time; physical nodes can be added and removed, and the class determines where a given piece of data should be stored. The ring in the algorithm can be implemented with a TreeMap, which gives the most efficient lookup of the next node on the ring. The algorithm also always ensures that the ring contains at least one physical node before looking up the next node.
From the group discussion I learned that a virtual node's hash value must not change; otherwise a restart causes a large number of cache misses, leading to a cache avalanche.
Whether the hash function is truly "random", i.e. whether its values avoid clustering, determines the performance of the whole system.
Below is the class diagram with methods and variables omitted; arrows denote implementation and plain lines denote association.
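The TreeMap ring lookup described above can be sketched in isolation (the class name, node names, and slot values below are made up for illustration): ceilingKey finds the first virtual node clockwise from the key's hash, and firstKey wraps around when the hash falls past the last slot. Because the slots are derived from node-id strings, they are deterministic across restarts — which is exactly why the virtual-node hashes stay stable.

```java
import java.util.TreeMap;

public class RingLookupSketch {
    // Ring: virtual-node hash -> physical node id (slot values are hypothetical)
    static final TreeMap<Integer, String> ring = new TreeMap<>();
    static {
        ring.put(100, "node-A");
        ring.put(200, "node-B");
        ring.put(300, "node-C");
    }

    // Find the owning node: first slot clockwise from keyHash, wrapping at the end
    static String lookup(int keyHash) {
        Integer slot = ring.ceilingKey(keyHash);
        if (slot == null) {
            slot = ring.firstKey(); // wrapped past the last slot on the ring
        }
        return ring.get(slot);
    }

    public static void main(String[] args) {
        System.out.println(lookup(150)); // falls between 100 and 200 -> node-B
        System.out.println(lookup(301)); // past 300 -> wraps around to node-A
    }
}
```

This is the same ceilingKey/firstKey pattern the ConsistentHashing class below uses in getNextNode.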
Code
The ICache interface
public interface ICache<K, V> {
    /* Interface for a cache that stores key-value pairs. */
    V get(K key);
    V put(K key, V val);
    V remove(K key);
}
The IConsistentHashing interface
interface IConsistentHashing<K, V> {
    /* One key corresponds to one value only. */
    void addNode(String nodeId);
    boolean removeNode(String nodeId);
    ICache<K, V> getCache(K key);
}
The DistributedCache class
public class DistributedCache<K, V> implements ICache<K, V> {
    /* Uses an IConsistentHashing implementation to determine which cache stores the data. */
    private final IConsistentHashing<K, V> consistentHashing;

    // Accept the interface rather than the concrete class, in line with the
    // program-to-interfaces design described above.
    public DistributedCache(IConsistentHashing<K, V> consistentHashing) {
        this.consistentHashing = consistentHashing;
    }

    @Override
    public V get(K key) {
        ICache<K, V> cache = consistentHashing.getCache(key);
        return cache.get(key);
    }

    @Override
    public V put(K key, V val) {
        ICache<K, V> cache = consistentHashing.getCache(key);
        return cache.put(key, val);
    }

    @Override
    public V remove(K key) {
        ICache<K, V> cache = consistentHashing.getCache(key);
        return cache.remove(key);
    }
}
The CacheNode class
import java.util.HashMap;

public class CacheNode<K, V> implements ICache<K, V> {
    /* A single cache node in a distributed cache. */
    private final String nodeId;
    private final HashMap<K, V> storage = new HashMap<>();

    public CacheNode(String nodeId) {
        this.nodeId = nodeId;
    }

    public int getLoad() {
        return storage.size();
    }

    public String getNodeId() {
        return nodeId;
    }

    @Override
    public V get(K key) {
        return storage.get(key);
    }

    @Override
    public V put(K key, V val) {
        return storage.put(key, val);
    }

    @Override
    public V remove(K key) {
        return storage.remove(key);
    }

    @Override
    public int hashCode() {
        return nodeId.hashCode();
    }

    @Override
    public boolean equals(Object o) {
        if (o == null) return false;
        if (o == this) return true;
        if (o.getClass() != getClass()) return false;
        CacheNode<?, ?> that = (CacheNode<?, ?>) o;
        return that.nodeId.equals(this.nodeId);
    }
}
The ConsistentHashing class
import java.util.*;

public class ConsistentHashing<K, V> implements IConsistentHashing<K, V> {
    /*
     * Allows dynamically adding and removing nodes.
     * Determines which cache stores a specific key-value pair.
     */
    private int nodesCount;
    private final int virtualNodeCount;
    private final TreeMap<Integer, String> index2Node = new TreeMap<>();
    private final HashMap<String, ICache<K, V>> caches = new HashMap<>();
    private final HashMap<String, Set<Integer>> node2VirtualIndices = new HashMap<>();

    public ConsistentHashing(int nodesCount, int virtualNodeCount, List<String> ids) {
        this.nodesCount = nodesCount;
        this.virtualNodeCount = virtualNodeCount;
        initNodes(ids);
    }

    private void initNodes(List<String> ids) {
        if (nodesCount <= 0) {
            throw new IllegalArgumentException("Nodes number should be positive.");
        }
        if (nodesCount != ids.size()) {
            throw new IllegalArgumentException("Nodes number should equal the number of ids provided.");
        }
        for (int j = 0; j < nodesCount; j++) {
            String id = ids.get(j);
            // TODO: the cache implementation should be injected; for generality it
            // should not be hard-wired to CacheNode
            ICache<K, V> cache = new CacheNode<>(id);
            Set<Integer> virtualNodeIndices = new HashSet<>();
            for (int i = 0; i < virtualNodeCount; i++) {
                int index = generateVirtualNodeIndex(id, i);
                virtualNodeIndices.add(index);
                index2Node.put(index, id);
            }
            node2VirtualIndices.put(id, virtualNodeIndices);
            caches.put(id, cache);
        }
    }

    private int generateVirtualNodeIndex(String nodeId, int virtualNodeIndex) {
        return hash(nodeId + "&&" + virtualNodeIndex);
    }

    /* FNV-1a hash with extra bit mixing to spread the values. */
    public static int hash(String key) {
        final int p = 16777619;
        int ret = (int) 2166136261L;
        for (int i = 0; i < key.length(); i++) {
            ret = (ret ^ key.charAt(i)) * p;
        }
        ret += ret << 13;
        ret ^= ret >> 7;
        ret += ret << 3;
        ret ^= ret >> 17;
        ret += ret << 5;
        return ret < 0 ? -ret : ret;
    }

    private int hash1(String key) {
        // Alternative hash, kept for comparison; unused.
        int hash = key.hashCode();
        return hash * 1354435761;
    }

    @Override
    public void addNode(String id) {
        ICache<K, V> cache = new CacheNode<>(id);
        Set<Integer> virtualNodeIndices = new HashSet<>();
        for (int i = 0; i < virtualNodeCount; i++) {
            int index = generateVirtualNodeIndex(id, i);
            virtualNodeIndices.add(index);
            index2Node.put(index, id);
        }
        node2VirtualIndices.put(id, virtualNodeIndices);
        caches.put(id, cache);
        ++nodesCount;
    }

    @Override
    public boolean removeNode(String id) {
        if (!caches.containsKey(id)) {
            return false;
        }
        for (int index : node2VirtualIndices.get(id)) {
            if (id.equals(index2Node.get(index))) {
                // remove the virtual node from the ring
                index2Node.remove(index);
            }
        }
        node2VirtualIndices.remove(id);
        caches.remove(id);
        --nodesCount;
        return true;
    }

    @Override
    public ICache<K, V> getCache(K key) {
        return getNextNode(key);
    }

    private ICache<K, V> getNextNode(K key) {
        if (nodesCount == 0) {
            throw new IllegalStateException("Cannot operate when there are no nodes available.");
        }
        int hashCode = Math.abs(hash(key.toString()));
        Integer nextKey = index2Node.ceilingKey(hashCode);
        if (nextKey == null) {
            nextKey = index2Node.firstKey();
        }
        return caches.get(index2Node.get(nextKey));
    }

    public void printAnalytics() {
        List<Integer> loadCounts = new LinkedList<>();
        for (Map.Entry<String, ICache<K, V>> entry : caches.entrySet()) {
            CacheNode<K, V> node = (CacheNode<K, V>) entry.getValue();
            int totalLoad = node.getLoad();
            System.out.println("Node " + node.getNodeId() + " has " + totalLoad + ".");
            loadCounts.add(totalLoad);
        }
        double avg = loadCounts.stream().mapToInt(Integer::intValue).average().orElse(0);
        double std = Math.sqrt(loadCounts.stream().map(count -> Math.pow(count - avg, 2))
                .mapToDouble(Double::doubleValue).average().orElse(0));
        System.out.println("The standard deviation is " + std);
    }
}
The ConsistentHashingAnalytics class
import java.util.List;
import java.util.Arrays;

public class ConsistentHashingAnalytics {
    public static void main(String[] args) {
        List<String> servers = Arrays.asList(
                "192.168.1.1:8080", "192.168.1.2:8080", "192.168.1.3:8080",
                "192.168.1.4:8080", "192.168.1.5:8080", "192.168.1.6:8080",
                "192.168.1.7:8080", "192.168.1.8:8080", "192.168.1.9:8080",
                "192.168.1.10:8080");
        ConsistentHashing<Integer, String> consistentHashing =
                new ConsistentHashing<>(10, 500, servers);
        ICache<Integer, String> distributedCache = new DistributedCache<>(consistentHashing);
        for (int i = 0; i < 1000000; i++) {
            Integer key = i;
            String val = "val" + key;
            distributedCache.put(key, val);
        }
        consistentHashing.printAnalytics();

        // Measure cache misses after a new node joins
        consistentHashing.addNode("192.168.1.16:8080");
        int count = 0;
        for (int i = 0; i < 1000000; i++) {
            Integer key = i;
            String val = distributedCache.get(key);
            if (val != null) {
                count++;
            }
        }
        double percentage = count / 1000000.0 * 100;
        System.out.println("The hitting rate is " + percentage + "%.");
    }
}
Result Analysis
1,000,000 cache entries in total, across 10 physical nodes.
Node 192.168.1.10:8080 has 97871.
Node 192.168.1.5:8080 has 101779.
Node 192.168.1.7:8080 has 99462.
Node 192.168.1.2:8080 has 108177.
Node 192.168.1.6:8080 has 100165.
Node 192.168.1.9:8080 has 95702.
Node 192.168.1.1:8080 has 96673.
Node 192.168.1.3:8080 has 105396.
Node 192.168.1.8:8080 has 98767.
Node 192.168.1.4:8080 has 96008.
The standard deviation is 3885.7738225480907
The hitting rate is 90.4693%.
With 500 virtual nodes per physical node, the standard deviation is about 3885.8: every node holds close to the expected 1,000,000 / 10 = 100,000 entries, so the distribution is fairly even.
After a new node was added, 90.4693% of lookups still hit the cache, which also benefits from the even distribution achieved earlier.
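The measured hit rate can be sanity-checked with a back-of-the-envelope estimate (a rough uniform-distribution model, not an exact prediction): when one node joins a cluster of 10, it takes over roughly 1/11 of the key space, and only those keys remap, so about 90.9% of keys should still hit — close to the observed 90.47%.

```java
public class HitRateEstimate {
    public static void main(String[] args) {
        int nodesAfter = 10 + 1; // cluster size after one node joins
        // Under uniform consistent hashing, the new node owns ~1/nodesAfter
        // of the ring; only keys in that slice remap and miss.
        double expectedMissRate = 1.0 / nodesAfter;
        double expectedHitRate = 100.0 * (1.0 - expectedMissRate); // ~90.9
        System.out.println("Expected hit rate ~ " + expectedHitRate + "%");
    }
}
```

The small gap between ~90.9% expected and 90.47% observed reflects the slight load imbalance visible in the standard deviation above.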
Melo