作业
一致性 hash 环
一致性 Hash 算法是将整个 Hash 空间组织成一个虚拟的圆环,圆环的值空间为0 ~ 2^32 - 1(一个32位无符号整型)
,整个圆环以顺时针方向组织,圆环正上方的点代表 0,0 点右侧的第一个点代表 1,以此类推,直到 2^32 - 1
构成一个闭环。
将服务器 hash 到环上
我们将各个服务器进行一次哈希,可以选择服务器的 IP 或主机名作为关键字进行哈希,这样每台服务器就确定在了哈希环的一个位置上,比如我们有 4 台机器,使用 IP 地址哈希后在环空间的位置如下图所示。
将数据存储到服务器上
下面我们使用以下算法来存储数据到相应的服务器:
虚拟节点
在一致性 Hash 算法服务节点太少的情况下,容易因为节点分布不均匀面造成数据倾斜(被缓存的对象大部分缓存在某一台服务器上)问题。为了解决数据倾斜问题,一致性 Hash 算法引入了虚拟节点机制
,即对每一个服务器节点计算多个哈希,每个计算结果位置都放置一个服务节点,称为虚拟节点。
加入虚拟节点之后,即使在服务节点很少的情况下,也能做到数据的均匀分布。
每台机器映射的虚拟节点越多,则分布得越均匀。
一致性 hash 算法实现
hash 算法
采用 Cassandra 实现的 MurmurHash 算法。
public class MurmurHash {
public static long hash3_x64_128(String data) {
ByteBuffer key = ByteBuffer.wrap(data.getBytes());
int offset = key.position();
int length = key.remaining();
final int nblocks = length >> 4; // Process as 128-bit blocks.
long h1 = 0;
long h2 = 0;
long c1 = 0x87c37b91114253d5L;
long c2 = 0x4cf5ad432745937fL;
// ----------
// body
for (int i = 0; i < nblocks; i++) {
long k1 = getblock(key, offset, i * 2 + 0);
long k2 = getblock(key, offset, i * 2 + 1);
k1 *= c1;
k1 = rotl64(k1, 31);
k1 *= c2;
h1 ^= k1;
h1 = rotl64(h1, 27);
h1 += h2;
h1 = h1 * 5 + 0x52dce729;
k2 *= c2;
k2 = rotl64(k2, 33);
k2 *= c1;
h2 ^= k2;
h2 = rotl64(h2, 31);
h2 += h1;
h2 = h2 * 5 + 0x38495ab5;
}
// ----------
// tail
// Advance offset to the unprocessed tail of the data.
offset += nblocks * 16;
long k1 = 0;
long k2 = 0;
switch (length & 15) {
case 15:
k2 ^= ((long) key.get(offset + 14)) << 48;
case 14:
k2 ^= ((long) key.get(offset + 13)) << 40;
case 13:
k2 ^= ((long) key.get(offset + 12)) << 32;
case 12:
k2 ^= ((long) key.get(offset + 11)) << 24;
case 11:
k2 ^= ((long) key.get(offset + 10)) << 16;
case 10:
k2 ^= ((long) key.get(offset + 9)) << 8;
case 9:
k2 ^= ((long) key.get(offset + 8)) << 0;
k2 *= c2;
k2 = rotl64(k2, 33);
k2 *= c1;
h2 ^= k2;
case 8:
k1 ^= ((long) key.get(offset + 7)) << 56;
case 7:
k1 ^= ((long) key.get(offset + 6)) << 48;
case 6:
k1 ^= ((long) key.get(offset + 5)) << 40;
case 5:
k1 ^= ((long) key.get(offset + 4)) << 32;
case 4:
k1 ^= ((long) key.get(offset + 3)) << 24;
case 3:
k1 ^= ((long) key.get(offset + 2)) << 16;
case 2:
k1 ^= ((long) key.get(offset + 1)) << 8;
case 1:
k1 ^= ((long) key.get(offset));
k1 *= c1;
k1 = rotl64(k1, 31);
k1 *= c2;
h1 ^= k1;
}
;
h1 ^= length;
h2 ^= length;
h1 += h2;
h2 += h1;
h1 = fmix(h1);
h2 = fmix(h2);
h1 += h2;
h2 += h1;
return Math.abs(h1);
}
protected static long fmix(long k) {
k ^= k >>> 33;
k *= 0xff51afd7ed558ccdL;
k ^= k >>> 33;
k *= 0xc4ceb9fe1a85ec53L;
k ^= k >>> 33;
return k;
}
protected static long getblock(ByteBuffer key, int offset, int index) {
int i_8 = index << 3;
int blockOffset = offset + i_8;
return ((long) key.get(blockOffset + 0) & 0xff) + (((long) key.get(blockOffset + 1) & 0xff) << 8)
+ (((long) key.get(blockOffset + 2) & 0xff) << 16) + (((long) key.get(blockOffset + 3) & 0xff) << 24)
+ (((long) key.get(blockOffset + 4) & 0xff) << 32) + (((long) key.get(blockOffset + 5) & 0xff) << 40)
+ (((long) key.get(blockOffset + 6) & 0xff) << 48) + (((long) key.get(blockOffset + 7) & 0xff) << 56);
}
protected static long rotl64(long v, int n) {
return ((v << n) | (v >>> (64 - n)));
}
}
复制代码
服务器节点
public class Node {
private String hostname;
private Map<Object, Object> store;
public Node(String hostname) {
this.hostname = hostname;
this.store = new ConcurrentHashMap<Object, Object>();
}
public boolean save(String key, Object value) {
store.put(key, value);
return true;
}
public Object get(String key) {
return store.get(key);
}
public int getSize() {
return store.size();
}
public String getHostname() {
return hostname;
}
public void setHostname(String hostname) {
this.hostname = hostname;
}
}
复制代码
一致性 hash 环
public class TokenRing {
private final SortedMap<Long, Node> ring = new TreeMap<Long, Node>();
private List<Node> nodeList;
// 虚拟节点个数
private int virtualNodes;
/**
* @param nodes 节点(机器)个数
* @param virtualNodes 虚拟节点个数
*/
public TokenRing(int nodes, int virtualNodes) {
this.virtualNodes = virtualNodes;
nodeList = new Vector<Node>();
for (int i = 0; i < nodes; i++) {
Node node = new Node("host:::" + i);
addNode(node);
}
}
/**
* 添加节点
* @param node
* @return
*/
public boolean addNode(Node node) {
nodeList.add(node);
for (int j = 0; j < virtualNodes; j++) {
ring.put(MurmurHash.hash3_x64_128(node.getHostname() + j), node);
}
return true;
}
/**
* 删除节点
* @param node
* @return
*/
public boolean removeNode(Node node) {
if (nodeList.remove(node)) {
ring.forEach((token, nd) -> {
if (node == nd) {
ring.remove(token);
}
});
return true;
}
return false;
}
/**
* 取目标节点
* @param key
* @return
*/
public Node getNode(String key) {
long token = MurmurHash.hash3_x64_128(key);
if (!ring.containsKey(token)) {
SortedMap<Long, Node> tailMap = ring.tailMap(token);
token = tailMap.isEmpty() ? ring.firstKey() : tailMap.firstKey();
}
return ring.get(token);
}
public Set<Node> getNodes() {
return new HashSet<>(ring.values());
}
}
复制代码
测试结果
public class Client {
public static void main(String[] args) {
int n = 10;
TokenRing tokenRing = new TokenRing(n, 100);
for (int i = 0; i < 1000000; i++) {
String data = UUID.randomUUID().toString();
Node node = tokenRing.getNode(data);
if (node != null) {
node.save(data, data);
} else {
throw new RuntimeException("没有找到[" + data + "]存放的节点");
}
}
Collection<Node> nodes = tokenRing.getNodes();
int[] array = new int[n];
int i = 0;
for (Node node : nodes) {
System.out.println(node.getHostname() + "--->>>" + node.getSize());
array[i++] = node.getSize();
}
System.out.println("标准差为:" + StandardDeviation.getSD(array));
}
}
复制代码
对 100 万数据,存储在 10 个节点上,虚拟节点分别是 100 个、300 个、500 个、1000 个、2000 个和 5000 个的数据分布如下表格所示。
从测试结果可知,随着虚拟节点的增加,数据分布得越均匀。
评论