写点什么

实现一致性哈希算法

用户头像
Aldaron
关注
发布于: 2020 年 07 月 08 日

题目



  1. 用你熟悉的编程语言实现一致性 hash 算法

  2. 编写测试用例测试这个算法。测试100万 KV 数据, 10个服务器节点的情况下,计算这些 KV 数据在服务器上分布数量的标准差,以评估算法的存储负载不均衡性。



作业

整体程序实现是由以下类图结构完成。

哈希函数主要尝试了 CRC32 和 FNV 两种哈希函数。

在10节点存储100W数据的情况下,标准差基本稳定在2.7W左右。

在哈希函数选择上还是有上升的空间,而且可以利用数据分片的形式可以更好的改善数据分布不均匀的问题。



代码

代码以Golang实现,结构如下:

ConsistenHash

├── README.md

├── consistenthashing

│   ├── consistent_hashing.go

│   ├── hash_ring.go

│   └── node.go

├── hash

│   └── hash.go

└── main.go



main.go

package main
import "strconv"
import (
hashing "./consistenthashing"
)
func main() {
hashRing, err := initHashRing()
if err != nil {
return
}
for i := 0; i < 100*10000; i++ {
node, err := hashRing.Get(strconv.Itoa(i))
if err != nil {
continue
}
err2 := node.Set(strconv.Itoa(i), i)
if err2 != nil {
continue
}
}
hashRing.Print()
}
func initHashRing() (hashRing *hashing.HashRing, err error) {
hashRing = hashing.NewHashRing()
for i := 0; i < 10; i++ {
node := hashing.NewLocalNodeWithName("cache" + strconv.Itoa(i))
err = hashRing.AddNode(node)
}
return
}



consistent_hashing.go

package consistenthashing
type ConsistentHashing struct {
HashRing
}
func (ch *ConsistentHashing) Get(key string) (value interface{}) {
node, err := ch.HashRing.Get(key)
if err != nil {
return nil
}
value, err = node.Get(key)
return
}
func (ch *ConsistentHashing) Set(key string, value interface{}) {
node, err := ch.HashRing.Get(key)
if err != nil {
return
}
err = node.Set(key, value)
return
}



hash_ring.go

package consistenthashing
import (
"../hash"
"errors"
"fmt"
"sort"
"strconv"
"sync"
)
var (
ErrorEmptyRing = errors.New("哈希环节点数为0")
)
/**
* HashRing
*/
// 一致性哈希环
type HashRing struct {
ring map[uint32]Node // 哈希环
nodes map[string]Node // 节点列表
numberOfReplicas int // 节点副本数
sortedHashes uint32Slice
sync.RWMutex // 一致性哈希读写锁
hashFunction hash.Hash
}
func NewHashRing() *HashRing {
ch := &HashRing{
ring: make(map[uint32]Node),
nodes: make(map[string]Node),
numberOfReplicas: 30,
hashFunction: &hash.Crc32Adapter{},
}
return ch
}
// 增加节点
func (hr *HashRing) AddNode(node Node) error {
hr.Lock()
defer hr.Unlock()
err := hr.addNode(node)
return err
}
func (hr *HashRing) addNode(node Node) error {
for i := 0; i < hr.numberOfReplicas; i++ {
hr.ring[hr.hash(hr.nodeKey(node.GetNodeName(), i))] = node
}
hr.nodes[node.GetNodeName()] = node
hr.updateSortedHashes()
return nil
}
// 移除节点
func (hr *HashRing) RemoveNode(node Node) error {
hr.Lock()
defer hr.Unlock()
err := hr.removeNode(node)
return err
}
func (hr *HashRing) removeNode(node Node) error {
for i := 0; i < hr.numberOfReplicas; i++ {
delete(hr.ring, hr.hash(hr.nodeKey(node.GetNodeName(), i)))
}
delete(hr.nodes, node.GetNodeName())
hr.updateSortedHashes()
return nil
}
func (hr *HashRing) GetNodeList() (nodes []string) {
hr.RLock()
defer hr.RUnlock()
var list []string
for nodeId, _ := range hr.nodes {
list = append(list, nodeId)
}
return list
}
func (hr *HashRing) hash(key string) uint32 {
return hr.hashFunction.Hash(key)
}
func (hr *HashRing) Get(key string) (node Node, err error) {
hr.RLock()
defer hr.RUnlock()
if len(hr.ring) == 0 {
return nil, ErrorEmptyRing
}
hashKey := hr.hash(key)
i := hr.search(hashKey)
return hr.ring[hr.sortedHashes[i]], nil
}
func (hr *HashRing) nodeKey(nodeHashId string, idx int) string {
return strconv.Itoa(idx) + nodeHashId
}
func (hr *HashRing) search(hashKey uint32) int {
f := func(x int) bool {
return hr.sortedHashes[x] > hashKey
}
i := sort.Search(len(hr.sortedHashes), f)
if i >= len(hr.sortedHashes) {
i = 0
}
return i
}
func (hr *HashRing) updateSortedHashes() {
hashes := hr.sortedHashes[:0]
//reallocate if we're holding on to too much (1/4th)
if cap(hr.sortedHashes)/(hr.numberOfReplicas*4) > len(hr.ring) {
hashes = nil
}
for k := range hr.ring {
hashes = append(hashes, k)
}
sort.Sort(hashes)
hr.sortedHashes = hashes
}
func (hr *HashRing) Print() {
for _, node := range hr.nodes {
fmt.Println(node.GetDesc())
}
}
// 可排序的int数组类型
type uint32Slice []uint32
func (u uint32Slice) Len() int {
return len(u)
}
func (u uint32Slice) Less(i, j int) bool {
return u[i] < u[j]
}
func (u uint32Slice) Swap(i, j int) {
u[i], u[j] = u[j], u[i]
}



node.go

package consistenthashing
import "strconv"
type Node interface {
Set(key string, value interface{}) (err error)
Get(key string) (value interface{}, err error)
GetNodeName() string
GetDesc() string
}
type LocalNode struct {
name string
metaData map[string]interface{}
}
func NewLocalNodeWithName(name string) *LocalNode {
node := &LocalNode{
name: name,
metaData: make(map[string]interface{}),
}
return node
}
func (node *LocalNode) Set(key string, value interface{}) (err error) {
node.metaData[key] = value
return nil
}
func (node *LocalNode) Get(key string) (value interface{}, err error) {
return node.metaData[key], nil
}
func (node *LocalNode) GetNodeName() (id string) {
return node.name
}
func (node *LocalNode) GetDesc() string {
//"Name :" + node.GetNodeName() + " Length :" +
return strconv.Itoa(len(node.metaData))
}



hash.go

package hash
import (
"hash/adler32"
"hash/crc32"
"hash/fnv"
)
type Hash interface {
Hash(key string) uint32
}
type Crc32Adapter struct {}
func (c *Crc32Adapter) Hash(key string) uint32 {
if len(key) < 64 {
var scratch [64]byte
copy(scratch[:], key)
return crc32.ChecksumIEEE(scratch[:len(key)])
}
return crc32.ChecksumIEEE([]byte(key))
}
type FnvAdapter struct {}
func (f FnvAdapter) Hash(key string) uint32 {
h := fnv.New32a()
_, _ = h.Write([]byte(key))
return h.Sum32()
}
type AdlerAdapter struct {
}
func (a AdlerAdapter) Hash(key string) uint32 {
return adler32.Checksum([]byte(key))
}



用户头像

Aldaron

关注

还未添加个人签名 2018.04.28 加入

还未添加个人简介

评论

发布
暂无评论
实现一致性哈希算法