架构师训练营第五周命题作业

用户头像
whiter
关注
发布于: 2020 年 07 月 08 日

用你熟悉的编程语言实现一致性 hash 算法。

编写测试用例测试这个算法,测试 100 万 KV 数据,10 个服务器节点的情况下,计算这些 KV 数据在服务器上分布数量的标准差,以评估算法的存储负载不均衡性。

一致性哈希算法实现

定义Cache接口

template<typename K, typename V>
class Cache
{
public:
virtual void put(K const& key, V const& value) = 0;
virtual V get(K const& key) = 0;
virtual bool contains(K const& key) = 0;
virtual void remove(K const& key) = 0;
};

单一节点SingleNodeCache类定义,实现Cache接口

template<typename K, typename V>
class SingleNodeCache : public Cache<K, V>
{
public:
virtual void put(K const& key, V const& value) override
{
m_map[key] = value;
}
virtual V get(K const& key) override
{
if (m_map.find(key) != m_map.end())
{
return m_map[key];
}
return {};
}
virtual bool contains(K const& key) override
{
return m_map.find(key) != m_map.end();
}
virtual void remove(K const& key) override
{
if (m_map.find(key) != m_map.end())
{
m_map.erase(key);
}
}
private:
std::map<K, V> m_map;
};

分布式DistributedCache类,实现Cache接口,包含一致性哈希算法

template<typename K, typename V>
class DistributedCache : public Cache<K, V>
{
public:
virtual void put(K const& key, V const& value);
virtual V get(K const& key);
virtual bool contains(K const& key);
virtual void remove(K const& key);
void addMachine(Cache* cache);
private:
int getMachineIndex(K const& key);
std::vector<std::shared_ptr<Cache>> m_machines;
std::map<size_t, int> m_hashIndexRing;
static constexpr int c_virtualNodeNum = 200;
};

其中 addMachine 方法实现如下,传入参数为添加机器的Cache指针。并根据定义好的虚拟节点个数计算虚拟节点的哈希值并插入一致性哈希环中。

template<typename K, typename V>
void DistributedCache::addMachine(Cache* cache)
{
m_machines.emplace_back(cache);
const int machineIndex = m_machines.size() - 1;
for (int i = 0; i < c_virtualNodeNum; i++)
{
std::hash<int> hashFunction;
size_t virtualNodeHash = hashFunction(machineIndex * c_virtualNodeNum + i);
m_hashIndexRing.emplace(virtualNodeHash, machineIndex);
}
}

getMachineIndex 方法实现如下,根据传入的键计算键的哈希值。然后使用哈希值和一致性哈希环进行二分查找,得到下一个和该值最接近的节点。

template<typename K, typename V>
int DistributedCache::getMachineIndex(K const& key)
{
std::hash<K> hashFunction;
auto target = std::make_pair(hashFunction(key), -1);
auto upper = std::upper_bound(m_hashIndexRing.begin(), m_hashIndexRing.end(),
target, [](auto x, auto y) -> bool
{
return x.first < y.first;
});
auto machineIndex = (upper != m_hashIndexRing.end()) ?
upper->second :
m_hashIndexRing.begin()->second;
return machineIndex;
}

一致性哈希分析

显然,如果虚拟节点的数量太少则会有可能导致各节点分配不均的情况,但是一味增加虚拟节点数量也会增加不必要的开销。为了得到合理的虚拟节点数量,可以通过计算在给定虚拟节点个数的情况下,各个节点分配的键值对(key value pair)数量的标准差来衡量。



分析中采用10个服务器结节点,1000000随机键值对,选取10为步长,计算节点分配标准差。当虚拟节点数量在100以下时,标准差均大于50000。当虚拟节点大于300时,标准差趋于稳定,如图。





根据此图可以看出,开始时,随着虚拟节点增多,标准差下降显著。当虚拟节点大于200时,增加虚拟节点数量标准差下降开始不明显。当虚拟节点大于300时,增加虚拟节点已经和标准差无明显关系。



最后附上完整代码,包含测试部分。

#include <algorithm>
#include <ctime>
#include <iostream>
#include <map>
#include <string>
#include <vector>
template<typename K, typename V>
class Cache
{
public:
virtual void put(K const& key, V const& value) = 0;
virtual V get(K const& key) = 0;
virtual bool contains(K const& key) = 0;
virtual void remove(K const& key) = 0;
};
template<typename K, typename V>
class SingleNodeCache : public Cache<K, V>
{
public:
virtual void put(K const& key, V const& value) override
{
m_map[key] = value;
}
virtual V get(K const& key) override
{
if (m_map.find(key) != m_map.end())
{
return m_map[key];
}
return {};
}
virtual bool contains(K const& key) override
{
return m_map.find(key) != m_map.end();
}
virtual void remove(K const& key) override
{
if (m_map.find(key) != m_map.end())
{
m_map.erase(key);
}
}
private:
std::map<K, V> m_map;
template<typename K, typename V>
friend class Statistics;
};
int c_virtualNodeNum = -1;
template<typename K, typename V>
class DistributedCache : public Cache<K, V>
{
public:
virtual void put(K const& key, V const& value) override
{
auto index = getMachineIndex(key);
m_machines[index]->put(key, value);
}
virtual V get(K const& key) override
{
auto index = getMachineIndex(key);
return m_machines[index]->get(key);
}
virtual bool contains(K const& key) override
{
auto index = getMachineIndex(key);
return m_machines[index]->contains(key);
}
virtual void remove(K const& key) override
{
auto index = getMachineIndex(key);
m_machines[index]->remove(key);
}
void addMachine(Cache* cache)
{
m_machines.emplace_back(cache);
const int machineIndex = m_machines.size() - 1;
for (int i = 0; i < c_virtualNodeNum; i++)
{
std::hash<int> hashFunction;
size_t virtualNodeHash = hashFunction(machineIndex * c_virtualNodeNum + i);
m_hashIndexRing.emplace(virtualNodeHash, machineIndex);
}
}
private:
int getMachineIndex(K const& key)
{
std::hash<K> hashFunction;
auto target = std::make_pair(hashFunction(key), -1);
auto upper = std::upper_bound(m_hashIndexRing.begin(), m_hashIndexRing.end(), target, [](auto x, auto y) -> bool
{
return x.first < y.first;
});
auto machineIndex = -1;
if (upper != m_hashIndexRing.end())
{
machineIndex = upper->second;
}
else
{
machineIndex = m_hashIndexRing.begin()->second;
}
return machineIndex;
}
std::vector<std::shared_ptr<Cache>> m_machines;
std::map<size_t, int> m_hashIndexRing;
//static constexpr int c_virtualNodeNum = 200;
template<typename K, typename V>
friend class Statistics;
};
template <typename K, typename V>
class Statistics
{
public:
void initialize(int machineNum)
{
for (int i = 0; i < machineNum; i++)
{
m_cache.addMachine(new SingleNodeCache<K, V>());
}
}
void insertTestData(int n)
{
for (int i = 0; i < n; i++)
{
auto key = std::rand() * std::rand();
auto value = std::rand() * std::rand();
m_cache.put(key, value);
}
}
void basicInfo(bool isDetailed = false)
{
std::cout << "Virtual node #: " << m_cache.m_hashIndexRing.size() / m_cache.m_machines.size() << std::endl;
if (isDetailed)
{
std::cout << "Virtual node hash value" << std::endl;
for (const auto& x : m_cache.m_hashIndexRing)
{
std::cout << x.first << " ";
}
std::cout << std::endl;
}
if (isDetailed)
{
std::cout << "Actual machine index:" << std::endl;
for (const auto& x : m_cache.m_hashIndexRing)
{
std::cout << x.second << " ";
}
std::cout << std::endl;
}
std::cout << "Machine #: " << m_cache.m_machines.size() << std::endl;
for (int i = 0; i < m_cache.m_machines.size(); i++)
{
const auto& machine = reinterpret_cast<SingleNodeCache<K, V>*>(m_cache.m_machines[i].get());
std::cout << "Machine " << i << " contains " << machine->m_map.size() << " pairs" << std::endl;
if (isDetailed)
{
for (const auto& x : machine->m_map)
{
std::cout << "(" << x.first << ", " << x.second << ") ";
}
std::cout << std::endl;
}
}
}
void standardDeviation()
{
std::vector<size_t> machinePairs;
size_t sum = 0;
const auto machineNum = m_cache.m_machines.size();
for (int i = 0; i < machineNum; i++)
{
const auto& machine = reinterpret_cast<SingleNodeCache<K, V>*>(m_cache.m_machines[i].get());
auto pairNum = machine->m_map.size();
machinePairs.emplace_back(pairNum);
sum += pairNum;
}
double average = sum / 1.0 / machineNum;
double deviationSum = 0;
for (auto x : machinePairs)
{
deviationSum += (x - average) * (x - average);
}
double standardDeviation = std::sqrt(deviationSum / machineNum);
std::cout << "Standard deviation: " << standardDeviation << std::endl;
}
private:
DistributedCache<K, V> m_cache;
};
int main()
{
std::srand(std::time(0));
for (int i = 10; i < 1000; i += 10)
{
c_virtualNodeNum = i;
Statistics<size_t, int> sta;
sta.initialize(10);
sta.insertTestData(1'000'000);
sta.basicInfo(false /*isDetailed*/);
sta.standardDeviation();
}
return 0;
}



发布于: 2020 年 07 月 08 日 阅读数: 36
用户头像

whiter

关注

还未添加个人签名 2020.05.29 加入

还未添加个人简介

评论

发布
暂无评论
架构师训练营第五周命题作业