一致性 hash
发布于: 2020 年 07 月 09 日
编写测试用例测试这个算法,测试 100 万 KV 数据,10 个服务器节点的情况下,计算这些 KV 数据在服务器上分布数量的标准差,以评估算法的存储负载不均衡性。
基类
public interface Cache { void set(String key, String value); String get(String key);}
Cache基类
用于通用实现cache操作
public interface ICacheService extends Cache { long getCacheSize();}
Cache实现类
public class CacheServiceImpl implements ICacheService{ private Map<String, String> cacheMap = new ConcurrentHashMap<>(); @Override public void set(String key, String value) { cacheMap.put(key, value); } @Override public String get(String key) { return cacheMap.get(key); } @Override public long getCacheSize() { return cacheMap.size(); }}
Hash环
TreeMap红黑树天然支持高效查找,ceilingEntry()函数查找当前key应该落在的cache节点
HashCode使用FNVHash算法,使Hash值更加均匀
支持指定具体Cache实现方式
/** * @Date : 2020/7/9 11:46 AM * @Description : 一致性hash环 */public final class ConsistencyHashRing { private TreeMap<Integer, Cache> map = new TreeMap<>(); private List<ICacheService> cacheList = new ArrayList<>(); private int virtualNodeNum; public void initConsistencyHashRing(List<String> serverIpList, int virtualNodeNum, Class<? extends ICacheService> clazz) { if (CollectionUtils.isEmpty(serverIpList)) { return; } this.virtualNodeNum = virtualNodeNum; serverIpList.forEach(ip -> { try { int ipHash = getHashCode(ip); ICacheService cache = clazz.newInstance(); map.put(ipHash, cache); cacheList.add(cache); for (int i = 0; i < virtualNodeNum; i++) { int virtualHash = getVirtualNodeHash(ip + i, virtualNodeNum); map.put(virtualHash, cache); } } catch (Exception e) { e.printStackTrace(); } }); } Cache getServer(String key) { assert StringUtils.isNotEmpty(key); int hashCode = getHashCode(key); Map.Entry<Integer, Cache> cacheEntry = map.ceilingEntry(hashCode); return cacheEntry == null ? map.firstEntry().getValue() : cacheEntry.getValue(); } private int getVirtualNodeHash(String ip, int virtualNum) { String virtualIp = ip + ":virtual:" + virtualNum; return getHashCode(virtualIp); } /** * FNVHash算法 * * @param data * @return */ private int getHashCode(String data) {// data = Md5Crypt.md5Crypt(data.getBytes()); final int p = 16777619; int hash = (int) 2166136261L; for (int i = 0; i < data.length(); i++) { hash = (hash ^ data.charAt(i)) * p; } hash += hash << 13; hash ^= hash >> 7; hash += hash << 3; hash ^= hash >> 17; hash += hash << 5; return Math.abs(hash); } private long getSumSize() { return cacheList.stream().mapToLong(ICacheService::getCacheSize).sum(); } @Override public String toString() { long sum = getSumSize(); long avg = sum / cacheList.size(); long sumDiff = cacheList.stream().mapToLong(cache -> { long cacheSize = cache.getCacheSize(); long diff = cacheSize - avg; return diff * diff; }).sum(); String template = "服务节点数:%d 虚拟节点数:%d\r\n"; String format = String.format(template, cacheList.size(), virtualNodeNum); StringBuilder sbu = new StringBuilder(format); for (int i = 0; i < cacheList.size(); i++) { long cacheSize = cacheList.get(i).getCacheSize(); sbu.append("第").append(i).append("个节点").append("cache数量:").append(cacheSize).append("\r\n"); } // 标准差公式: (每个样本 - 平均值)的平方之和 / (服务器数 - 1),结果再取平方根 double sqrt = Math.sqrt(sumDiff / (cacheList.size() - 1)); sbu.append("标准差:").append(sqrt).append("\r\n"); return sbu.toString(); }}
用户直接操作CacheServer,内部指定Cache的具体实现方式
public class CacheServer implements Cache{ private ConsistencyHashRing consistencyHashRing; public CacheServer(List<String> serverList, int virtualCount) { consistencyHashRing = new ConsistencyHashRing(); consistencyHashRing.initConsistencyHashRing(serverList, virtualCount, CacheServiceImpl.class); } @Override public void set(String key, String value) { Cache cache = consistencyHashRing.getServer(key); cache.set(key, value); } @Override public String get(String key) { Cache cache = consistencyHashRing.getServer(key); return cache.get(key); } @Override public String toString() { return consistencyHashRing.toString(); }}
main函数
/** * @Date : 2020/7/9 1:26 PM * @Description : */public class CacheMain { private static ExecutorService executorService = new ThreadPoolExecutor(10, 10, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1)); public static void main(String[] args) throws ExecutionException, InterruptedException { String serialResult = serialResult(0); String serialResult100 = serialResult(100); String serialResult300 = serialResult(300); String serialResult500 = serialResult(500); System.out.println(); System.out.println(serialResult); System.out.println(serialResult100); System.out.println(serialResult300); System.out.println(serialResult500); } private static String serialResult(int virtualCount) { String ipStr = "192.168.10.10"; List<String> ipList = IntStream.range(0, 10).boxed() .map(i -> ipStr + i).collect(toList()); CacheServer cacheServer = new CacheServer(ipList, virtualCount); Stopwatch started = Stopwatch.createStarted(); int maxCount = 1000000; for (int i = 0; i < maxCount; i++) { cacheServer.set("key:" + i, "value:" + i); System.out.println("key:" + i); } return cacheServer.toString() + " 耗时:" + started.stop(); } private static String parallelResult(int virtualCount) throws ExecutionException, InterruptedException { String ipStr = "192.168.10.10"; List<String> ipList = IntStream.range(0, 10).boxed() .map(i -> ipStr + i).collect(toList()); CacheServer cacheServer = new CacheServer(ipList, virtualCount); Stopwatch started = Stopwatch.createStarted(); CompletableFuture.allOf(IntStream.range(0, 10).boxed().map(num -> CompletableFuture.runAsync(() -> { int maxCount = 100000; for (int i = 0; i < maxCount; i++) { cacheServer.set(Thread.currentThread().getName() + ":key:" + i, "value:" + i); System.out.println(Thread.currentThread().getName() + ":key:" + i); } }, executorService)).toArray(CompletableFuture[]::new)).get(); return cacheServer.toString() + " 耗时:" + started.stop(); }}
测试结果
服务节点数:10 虚拟节点数:0第0个节点cache数量:43950第1个节点cache数量:17128第2个节点cache数量:2549第3个节点cache数量:200307第4个节点cache数量:439140第5个节点cache数量:54913第6个节点cache数量:38112第7个节点cache数量:165825第8个节点cache数量:27928第9个节点cache数量:10148标准差:136645.181221293服务节点数:10 虚拟节点数:100第0个节点cache数量:110642第1个节点cache数量:102568第2个节点cache数量:104212第3个节点cache数量:100112第4个节点cache数量:90182第5个节点cache数量:105007第6个节点cache数量:99653第7个节点cache数量:108661第8个节点cache数量:82271第9个节点cache数量:96692标准差:8559.965478902353服务节点数:10 虚拟节点数:300第0个节点cache数量:103301第1个节点cache数量:108235第2个节点cache数量:90724第3个节点cache数量:93514第4个节点cache数量:101656第5个节点cache数量:88390第6个节点cache数量:113624第7个节点cache数量:101026第8个节点cache数量:98185第9个节点cache数量:101345标准差:7718.145502645049服务节点数:10 虚拟节点数:500第0个节点cache数量:97527第1个节点cache数量:93576第2个节点cache数量:101842第3个节点cache数量:101476第4个节点cache数量:96864第5个节点cache数量:103557第6个节点cache数量:96292第7个节点cache数量:104967第8个节点cache数量:102286第9个节点cache数量:101613标准差:3677.101847923171服务节点数:10 虚拟节点数:1000第0个节点cache数量:96408第1个节点cache数量:103762第2个节点cache数量:100520第3个节点cache数量:99368第4个节点cache数量:96394第5个节点cache数量:100543第6个节点cache数量:96158第7个节点cache数量:100778第8个节点cache数量:107945第9个节点cache数量:98124标准差:3697.321327664124服务节点数:10 虚拟节点数:3000第0个节点cache数量:98855第1个节点cache数量:100362第2个节点cache数量:103029第3个节点cache数量:100546第4个节点cache数量:99497第5个节点cache数量:105667第6个节点cache数量:96533第7个节点cache数量:99691第8个节点cache数量:97276第9个节点cache数量:98544标准差:2686.149660759802服务节点数:10 虚拟节点数:5000第0个节点cache数量:99880第1个节点cache数量:99464第2个节点cache数量:100005第3个节点cache数量:99765第4个节点cache数量:100635第5个节点cache数量:101017第6个节点cache数量:98405第7个节点cache数量:101878第8个节点cache数量:101622第9个节点cache数量:97329标准差:1399.6224490911825服务节点数:10 虚拟节点数:10000第0个节点cache数量:100607第1个节点cache数量:99901第2个节点cache数量:99970第3个节点cache数量:101476第4个节点cache数量:100140第5个节点cache数量:99643第6个节点cache数量:98579第7个节点cache数量:101316第8个节点cache数量:98976第9个节点cache数量:99392标准差:935.3614274706863服务节点数:10 虚拟节点数:50000第0个节点cache数量:100249第1个节点cache数量:99739第2个节点cache数量:99587第3个节点cache数量:100525第4个节点cache数量:100537第5个节点cache数量:99728第6个节点cache数量:100600第7个节点cache数量:100033第8个节点cache数量:99835第9个节点cache数量:99167标准差:473.8575735387164服务节点数:10 虚拟节点数:100000第0个节点cache数量:99926第1个节点cache数量:99089第2个节点cache数量:99898第3个节点cache数量:99686第4个节点cache数量:100802第5个节点cache数量:100615第6个节点cache数量:100746第7个节点cache数量:99903第8个节点cache数量:99760第9个节点cache数量:99575标准差:554.7747290567587
划线
评论
复制
发布于: 2020 年 07 月 09 日阅读数: 50
石印掌纹
关注
还未添加个人签名 2018.11.22 加入
还未添加个人简介
评论