第五周作业
一致性 hash 的简单实现
package common
import (
"errors"
"hash/crc32" "sort" "strconv" "sync")
//声明新切片类型
type units []uint32
//返回切片长度 因为是切片类型内部就是指针指向 所以不用 func (x *units) len() {}func (x units) Len() int {
return len(x)
}
//比较两个数大小
func (x units) Less(i,j int) bool {
return x[i] < x[j]
}
//切片中两个值的交换
func (x units) Swap(i,j int) {
x[i],x[j] = x[j],x[i]
}
//当 hash 环上没有数据时提示错误
var errEmpty = errors.New("Hash 环没有数据")
//创建结构体保存一致性 hash 信息
type Consistent struct {
//hash 环 key 为哈希值 值为存放节点的信息
circle map[uint32]string
//已经排序的节点 hash 切片
sortedHashes units
//虚拟节点个数,用来增加 hash 的平衡性
VirtualNode int
//map 读写锁 稍后解释 可以保证在高并发下读取的数据不会错误
sync.RWMutex
}
//创建一致性 hash 算法结构体 设置默认节点数量
func NewConsistent() *Consistent {
return &Consistent{
//初始化变量
circle: make(map[uint32]string),
//设置虚拟节点个数
VirtualNode: 20,
}
}
//自动生成 key 值
//根据服务器相关信息加上 index 生成服务器相关的 key
func (c *Consistent) generateKey(element string,index int) string {
//副本 key 生成逻辑
return element+strconv.Itoa(index)
}
//获取 hash 位置 根据传入的 key 获取对应的 hash 值
func (c *Consistent) hashKey(key string) uint32 {
if len(key) < 64 {
//声明一个数组长度为 64
var srcatch [64]byte
//拷贝数据到数组当中
copy(srcatch[:],key)
//使用 IEEE 多项式返回数据的 CRC-32 校验和 是一个标准 能帮助我们通过算法算出 key 对应的 hash 值
return crc32.ChecksumIEEE(srcatch[:len(key)])
}
return crc32.ChecksumIEEE([]byte(key))
}
//更新排序 方便查找
func (c *Consistent) updateSortedHashes() {
//相当于清空切片里面的内容
hashes := c.sortedHashes[:0]
//判断切片容量,是否过大,如果过大则重置
//这个用于排序的切片的容量除以虚拟节点个数的 4 倍 大于 hash 环上的节点数 则清空里面的值
if cap(c.sortedHashes)/(c.VirtualNode*4) > len(c.circle) {
hashes = nil
}
//添加 hashs
for k := range c.circle {
hashes = append(hashes,k)
}
//对所有节点 hash 值进行排序
//方便之后进行二分法查找
sort.Sort(hashes)
//重新赋值
c.sortedHashes = hashes
}
//动态添加节点 需要加锁
//向 hash 环中添加节点
func (c *Consistent) Add(element string){
//加锁 分布式存储当中这个锁是非常关键的 后边会讲到
c.Lock()
//解锁
defer c.Unlock()
c.add(element)
}
//添加节点
//向 hash 环里面添加服务器信息
func (c *Consistent) add(element string) {
//循环虚拟节点 设置副本
//虚拟节点加上 generateKey()里面的规则 生成虚拟节点的信息 根据虚拟节点的信息进行 hash
for i:=0;i<c.VirtualNode;i++ {
//根据生成的虚拟节点添加到 hash 环中
c.circle[c.hashKey(c.generateKey(element,i))] = element
}
//下面对生成以后的虚拟节点进行排序
c.updateSortedHashes()
}
//动态删除节点
func (c *Consistent) Remove(element string){
c.Lock()
defer c.Unlock()
c.remove(element)
}
//删除节点
//需要传入服务器的信息比如是 ip
func (c *Consistent) remove(element string){
//根据传入的服务器的信息删除所欲的副节点
for i:=0;i<c.VirtualNode;i++{
delete(c.circle,c.hashKey(c.generateKey(element,i)))
}
//然后更新排序 方便二分法查找 hash 环上的节点
c.updateSortedHashes()
}
//顺时针查找最近的节点
func (c *Consistent) search(key uint32) int {
//查找算法
f := func(x int) bool {
return c.sortedHashes[x] > key
}
//使用“二分查找”算法来搜索指定切片满足条件的最小值
i := sort.Search(len(c.sortedHashes), f)
//如果超出范围则设置 i=0
if i >= len(c.sortedHashes) {
i = 0
}
return i
}
//根据数据标识获取最近服务器节点信息 其实是返回的 ip 地址
func (c *Consistent) Get(name string)(string,error){
//添加锁
c.RLock()
//解锁
defer c.RUnlock()
//如果为零则返回错误
if len(c.circle) == 0 {
return "",errEmpty
}
//计算 hash 值
key := c.hashKey(name)
//在 hash 环上查找我们最近的节点
i := c.search(key)
return c.circle[c.sortedHashes[i]],nil
}
评论