| 导语 同等性Hash算法是办理分布式缓存等问题的一种算法; 本文先容了同等性Hash算法的事理,并给出了一种实现和实际利用的案例;
同等性Hash算法背景考虑这么一种场景:
我们有三台缓存做事器编号node0、node1、node2,现在有3000万个key,希望可以将这些个key均匀的缓存到三台机器上,你会想到什么方案呢?
我们可能首先想到的方案是:取模算法hash(key)% N,即:对key进行hash运算后取模,N是机器的数量;
这样,对key进行hash后的结果对3取模,得到的结果一定是0、1或者2,恰好对应做事器node0、node1、node2,存取数据直接找对应的做事器即可,大略粗暴,完备可以办理上述的问题;
取模算法虽然利用大略,但对机器数量取模,在集群扩容和紧缩时却有一定的局限性:由于在生产环境中根据业务量的大小,调度做事器数量是常有的事;
而做事器数量N发生变革后hash(key)% N打算的结果也会随之变革!
比如:一个做事器节点挂了,打算公式从hash(key)% 3变成了hash(key)% 2,结果会发生变革,此时想要访问一个key,这个key的缓存位置大概率会发生改变,那么之前缓存key的数据也会失落去浸染与意义;
大量缓存在同一韶光失落效,造成缓存的雪崩,进而导致全体缓存系统的不可用,这基本上是不能接管的;
为理解决优化上述情形,同等性hash算法应运而生~
同等性Hash算法详述算法事理
同等性哈希算法在 1997 年由麻省理工学院提出,是一种分外的哈希算法,在移除或者添加一个做事器时,能够尽可能小地改变已存在的做事要求与处理要求做事器之间的映射关系;
同等性哈希办理了大略哈希算法在分布式哈希表(Distributed Hash Table,DHT)中存在的动态伸缩等问题;
同等性hash算法实质上也是一种取模算法;
不过,不同于上边按做事器数量取模,同等性hash是对固定值2^32取模;
IPv4的地址是4组8位2进制数组成,以是用2^32可以担保每个IP地址会有唯一的映射;
① hash环我们可以将这2^32个值抽象成一个圆环⭕️,圆环的正上方的点代表0,顺时针排列,以此类推:1、2、3…直到2^32-1,而这个由2的32次方个点组成的圆环统称为hash环;
② 做事器映射到hash环
在对做事器进行映射时,利用hash(做事器ip)% 2^32,即:
利用做事器IP地址进行hash打算,用哈希后的结果对2^32取模,结果一定是一个0到2^32-1之间的整数;
而这个整数映射在hash环上的位置代表了一个做事器,依次将node0、node1、node2三个缓存做事器映射到hash环上;
③ 工具key映射到做事器
在对对应的Key映射到详细的做事器时,须要首先打算Key的Hash值:hash(key)% 2^32;
注:此处的Hash函数可以和之前打算做事器映射至Hash环的函数不同,只要担保取值范围和Hash环的范围相同即可(即:2^32);
将Key映射至做事器遵照下面的逻辑:
从缓存工具key的位置开始,沿顺时针方向碰着的第一个做事器,便是当前工具将要缓存到的做事器;
假设我们有 "semlinker"、"kakuqo"、"lolo"、"fer" 四个工具,分别简写为 o1、o2、o3 和 o4;
首先,利用哈希函数打算这个工具的 hash 值,值的范围是 [0, 2^32-1]:
图中工具的映射关系如下:
hash(o1) = k1; hash(o2) = k2;hash(o3) = k3; hash(o4) = k4;
同时 3 台缓存做事器,分别为 CS1、CS2 和 CS3:
则可知,各工具和做事器的映射关系如下:
K1 => CS1K4 => CS3K2 => CS2K3 => CS1
即:
以上便是同等性Hash的事情事理;
可以看到,同等性Hash便是:将原来单个点的Hash映射,转变为了在一个环上的某个片段上的映射!
下面我们来看几种做事器扩缩容的场景;
做事器扩缩容场景① 做事器减少
假设 CS3 做事器涌现故障导致做事下线,这时原来存储于 CS3 做事器的工具 o4,须要被重新分配至 CS2 做事器,其它工具仍存储在原有的机器上:
此时受影响的数据只有 CS2 和 CS3 做事器之间的部分数据!
② 做事器增加
如果业务量激增,我们须要增加一台做事器 CS4,经由同样的 hash 运算,该做事器终极落于 t1 和 t2 做事器之间,详细如下图所示:
此时,只有 t1 和 t2 做事器之间的部分工具须要重新分配;
在以上示例中只有 o3 工具须要重新分配,即它被重新到 CS4 做事器;
在前面我们已经说过:如果利用大略的取模方法,当新添加做事器时可能会导致大部分缓存失落效,而利用同等性哈希算法后,这种情形得到了较大的改进,由于只有少部分工具须要重新分配!
数据偏斜&做事器性能平衡问题引出问题
在上面给出的例子中,各个做事器险些是均匀被均摊到Hash环上;
但是在实际场景中很难选取到一个Hash函数这么完美的将各个做事器散列到Hash环上;
此时,在做事器节点数量太少的情形下,很随意马虎由于节点分布不屈均而造成数据倾斜问题;
如下图被缓存的工具大部分缓存在node-4做事器上,导致其他节点资源摧残浪费蹂躏,系统压力大部分集中在node-4节点上,这样的集群是非常不康健的:
同时,还有另一个问题:
在上面新增做事器 CS4 时,CS4 只分担了 CS1 做事器的负载,做事器 CS2 和 CS3 并没有由于 CS4 做事器的加入而减少负载压力;如果 CS4 做事器的性能与原有做事器的性能同等乃至可能更高,那么这种结果并不是我们所期望的;
虚拟节点
针对上面的问题,我们可以通过:引入虚拟节点来办理负载不屈衡的问题:
即将每台物理做事器虚拟为一组虚拟做事器,将虚拟做事器放置到哈希环上,如果要确定工具的做事器,需先确定工具的虚拟做事器,再由虚拟做事器确定物理做事器;
如下图所示:
在图中:o1 和 o2 表示工具,v1 ~ v6 表示虚拟做事器,s1 ~ s3 表示实际的物理做事器;
虚拟节点的打算
虚拟节点的hash打算常日可以采取:对应节点的IP地址加数字编号后缀 hash(10.24.23.227#1) 的办法;
举个例子,node-1节点IP为10.24.23.227,正常打算node-1的hash值:
hash(10.24.23.227#1)% 2^32假设我们给node-1设置三个虚拟节点,node-1#1、node-1#2、node-1#3,对它们进行hash后取模:
hash(10.24.23.227#1)% 2^32hash(10.24.23.227#2)% 2^32hash(10.24.23.227#3)% 2^32把稳:
分配的虚拟节点个数越多,映射在hash环上才会越趋于均匀,节点太少的话很丢脸出效果;
引入虚拟节点的同时也增加了新的问题,要做虚拟节点和真实节点间的映射,工具key->虚拟节点->实际节点之间的转换;
利用场景同等性hash在分布式系统中该当是实现负载均衡的首选算法,它的实现比较灵巧,既可以在客户端实现,也可以在中间件上实现,比如日常利用较多的缓存中间件memcached和redis集群都有用到它;
memcached的集群比较分外,严格来说它只能算是伪集群,由于它的做事器之间不能通信,要求的分发路由完备靠客户端来的打算出缓存工具该当落在哪个做事器上,而它的路由算法用的便是同等性hash;
还有redis集群中hash槽的观点,虽然实现不尽相同,但思想万变不离其宗,看完本篇的同等性hash,你再去理解redis槽位就轻松多了;
其它的运用处景还有很多:
RPC框架Dubbo用来选择做事供应者分布式关系数据库分库分表:数据与节点的映射关系LVS负载均衡调度器……同等性Hash算法实现下面我们根据上面的讲述,利用Golang实现一个同等性Hash算法,这个算法具有一些下面的功能特性:
同等性Hash核心算法;支持自定义Hash算法;支持自定义虚拟节点个数;详细源代码见:
https://github.com/JasonkayZK/consistent-hashing-demo
下面开始实现吧!
构造体、缺点以及常量定义① 构造体定义
首先定义每一台缓存做事器的数据构造:
core/host.go
type Host struct { // the host id: ip:port Name string // the load bound of the host LoadBound int64}
个中:
Name:缓存做事器的Ip地址 + 端口,如:127.0.0.1:8000LoadBound:缓存做事器当前处理的“要求”缓存数,这个字段在后文含有负载边界值的同等性Hash中会用到;其次,定义同等性Hash的构造:
core/algorithm.go
// Consistent is an implementation of consistent-hashing-algorithmtype Consistent struct { // the number of replicas replicaNum int // the total loads of all replicas totalLoad int64 // the hash function for keys hashFunc func(key string) uint64 // the map of virtual nodes to hosts hostMap map[string]Host // the map of hashed virtual nodes to host name replicaHostMap map[uint64]string // the hash ring sortedHostsHashSet []uint64 // the hash ring lock sync.RWMutex}
个中:
replicaNum:表示每个真实的缓存做事器在Hash环中存在的虚拟节点数;totalLoad:所有物理做事器对应的总缓存“要求”数(这个字段在后文含有负载边界值的同等性Hash中会用到);hashFunc:打算Hash环映射以及Key映射的散列函数;hostMap:物理做事器名称对应的Host构造体映射;replicaHostMap:Hash环中虚拟节点对应真实缓存做事器名称的映射;sortedHostsHashSet:Hash环;sync.RWMutex:操作Hash环时用到的读写锁;大概的构造如上所示,下面我们来看一些常量和缺点的定义;
② 常量和缺点定义
常量的定义如下:
core/algorithm.go
const ( // The format of the host replica name hostReplicaFormat = `%s%d`)var ( // the default number of replicas defaultReplicaNum = 10 // the load bound factor // ref: https://research.googleblog.com/2017/04/consistent-hashing-with-bounded-loads.html loadBoundFactor = 0.25 // the default Hash function for keys defaultHashFunc = func(key string) uint64 { out := sha512.Sum512([]byte(key)) return binary.LittleEndian.Uint64(out[:]) })
分别表示:
defaultReplicaNum:默认情形下,每个真实的物理做事器在Hash环中虚拟节点的个数;loadBoundFactor:负载边界因数(这个字段在后文含有负载边界值的同等性Hash中会用到);defaultHashFunc:默认的散列函数,这里用到的是SHA512算法,并取的是unsigned int64,这一点和上面先容的0~2^32-1有所差异!
hostReplicaFormat:虚拟节点名称格式,这里的虚拟节点的格式为:%s%d,和上文提到的10.24.23.227#1的格式有所差异,但是道理是一样的!还有一些缺点的定义:
core/error.go
var ( ErrHostAlreadyExists = errors.New("host already exists") ErrHostNotFound = errors.New("host not found"))
分别表示做事器已经注册,以及缓存做事器未找到;
下面来看详细的方法实现!
注册/注销缓存做事器① 注册缓存做事器
注册缓存做事器的代码如下:
core/algorithm.go
func (c Consistent) RegisterHost(hostName string) error { c.Lock() defer c.Unlock() if _, ok := c.hostMap[hostName]; ok { return ErrHostAlreadyExists } c.hostMap[hostName] = &Host{ Name: hostName, LoadBound: 0, } for i := 0; i < c.replicaNum; i++ { hashedIdx := c.hashFunc(fmt.Sprintf(hostReplicaFormat, hostName, i)) c.replicaHostMap[hashedIdx] = hostName c.sortedHostsHashSet = append(c.sortedHostsHashSet, hashedIdx) } // sort hashes in ascending order sort.Slice(c.sortedHostsHashSet, func(i int, j int) bool { if c.sortedHostsHashSet[i] < c.sortedHostsHashSet[j] { return true } return false }) return nil}
代码比较大略,大略说一下;
首先,检讨做事器是否已经注册,如果已经注册,则直接返回已经注册的缺点;
随后,创建一个Host工具,并且在 for 循环中创建多个虚拟节点:
根据 hashFunc 打算做事器散列值【注:此处打算的散列值可能和之前的值存在冲突,本实现中暂不考虑这种场景】;将散列值加入 replicaHostMap 中;将散列值加入 sortedHostsHashSet 中;末了,对Hash环进行排序;
这里利用数组作为Hash环只是为了便于解释,在实际实现中建议选用其他数据构造进行实现,以获取更好的性能;
当缓存做事器信息写入 replicaHostMap 映射以及 Hash 环后,即完成了缓存做事器的注册;
② 注销缓存做事器
注销缓存做事器的代码如下:
core/algorithm.go
func (c Consistent) UnregisterHost(hostName string) error { c.Lock() defer c.Unlock() if _, ok := c.hostMap[hostName]; !ok { return ErrHostNotFound } delete(c.hostMap, hostName) for i := 0; i < c.replicaNum; i++ { hashedIdx := c.hashFunc(fmt.Sprintf(hostReplicaFormat, hostName, i)) delete(c.replicaHostMap, hashedIdx) c.delHashIndex(hashedIdx) } return nil}// Remove hashed host index from the hash ringfunc (c Consistent) delHashIndex(val uint64) { idx := -1 l := 0 r := len(c.sortedHostsHashSet) - 1 for l <= r { m := (l + r) / 2 if c.sortedHostsHashSet[m] == val { idx = m break } else if c.sortedHostsHashSet[m] < val { l = m + 1 } else if c.sortedHostsHashSet[m] > val { r = m - 1 } } if idx != -1 { c.sortedHostsHashSet = append(c.sortedHostsHashSet[:idx], c.sortedHostsHashSet[idx+1:]...) }}
和注册缓存做事器相反,将做事器在 Map 映射以及 Hash 环中去除即完成了注销;
这里的逻辑和上面注册的逻辑极为类似,这里不再赘述!
查询Key(核心)
查询 Key 是全体同等性 Hash 算法的核心,但是实现起来也并不繁芜;
代码如下:
core/algorithm.go
func (c Consistent) GetKey(key string) (string, error) { hashedKey := c.hashFunc(key) idx := c.searchKey(hashedKey) return c.replicaHostMap[c.sortedHostsHashSet[idx]], nil}func (c Consistent) searchKey(key uint64) int { idx := sort.Search(len(c.sortedHostsHashSet), func(i int) bool { return c.sortedHostsHashSet[i] >= key }) if idx >= len(c.sortedHostsHashSet) { // make search as a ring idx = 0 } return idx}
代码首先打算 key 的散列值;
随后,在Hash环上“顺时针”探求可以缓存的第一台缓存做事器:
idx := sort.Search(len(c.sortedHostsHashSet), func(i int) bool { return c.sortedHostsHashSet[i] >= key})
把稳到,如果 key 比当前Hash环中最大的虚拟节点的 hash 值还大,则选择当前 Hash环 中 hash 值最小的一个节点(即“环形”的逻辑):
if idx >= len(c.sortedHostsHashSet) { // make search as a ring idx = 0}
searchKey 返回了虚拟节点在 Hash 环数组中的 index;
随后,我们利用 map 返回 index 对应的缓存做事器的名称即可;
至此,同等性 Hash 算法基本实现,接下来我们来验证一下;
同等性Hash算法实践与考验算法验证前准备① 缓存做事器准备
在验证算法之前,我们还须要准备几台缓存做事器;
为了大略起见,这里利用了 HTTP 做事器作为缓存做事器,详细代码如下所示:
server/main.go
package mainimport ( "flag" "fmt" "net/http" "sync" "time")type CachedMap struct { KvMap sync.Map Lock sync.RWMutex}var ( cache = CachedMap{KvMap: sync.Map{}} port = flag.String("p", "8080", "port") regHost = "http://localhost:18888" expireTime = 10)func main() { flag.Parse() stopChan := make(chan interface{}) startServer(port) <-stopChan}func startServer(port string) { hostName := fmt.Sprintf("localhost:%s", port) fmt.Printf("start server: %s\n", port) err := registerHost(hostName) if err != nil { panic(err) } http.HandleFunc("/", kvHandle) err = http.ListenAndServe(":"+port, nil) if err != nil { err = unregisterHost(hostName) if err != nil { panic(err) } panic(err) }}func kvHandle(w http.ResponseWriter, r http.Request) { _ = r.ParseForm() if _, ok := cache.KvMap.Load(r.Form["key"][0]); !ok { val := fmt.Sprintf("hello: %s", r.Form["key"][0]) cache.KvMap.Store(r.Form["key"][0], val) fmt.Printf("cached key: {%s: %s}\n", r.Form["key"][0], val) time.AfterFunc(time.Duration(expireTime)time.Second, func() { cache.KvMap.Delete(r.Form["key"][0]) fmt.Printf("removed cached key after 3s: {%s: %s}\n", r.Form["key"][0], val) }) } val, _ := cache.KvMap.Load(r.Form["key"][0]) _, err := fmt.Fprintf(w, val.(string)) if err != nil { panic(err) }}func registerHost(host string) error { resp, err := http.Get(fmt.Sprintf("%s/register?host=%s", regHost, host)) if err != nil { return err } defer resp.Body.Close() return nil}func unregisterHost(host string) error { resp, err := http.Get(fmt.Sprintf("%s/unregister?host=%s", regHost, host)) if err != nil { return err } defer resp.Body.Close() return nil}
代码接管由命令行指定的 -p 参数指定做事器端口号;
代码实行后,会调用 startServer 函数启动一个http做事器;
在 startServer 函数中,首先调用 registerHost 在代理做事器上进行注册(下文会讲),并监听 /路径,详细代码如下:
func startServer(port string) { hostName := fmt.Sprintf("localhost:%s", port) fmt.Printf("start server: %s\n", port) err := registerHost(hostName) if err != nil { panic(err) } http.HandleFunc("/", kvHandle) err = http.ListenAndServe(":"+port, nil) if err != nil { err = unregisterHost(hostName) if err != nil { panic(err) } panic(err) }}
kvHandle 函数对要求进行处理:
func kvHandle(w http.ResponseWriter, r http.Request) { _ = r.ParseForm() if _, ok := cache.KvMap.Load(r.Form["key"][0]); !ok { val := fmt.Sprintf("hello: %s", r.Form["key"][0]) cache.KvMap.Store(r.Form["key"][0], val) fmt.Printf("cached key: {%s: %s}\n", r.Form["key"][0], val) time.AfterFunc(time.Duration(expireTime)time.Second, func() { cache.KvMap.Delete(r.Form["key"][0]) fmt.Printf("removed cached key after 3s: {%s: %s}\n", r.Form["key"][0], val) }) } val, _ := cache.KvMap.Load(r.Form["key"][0]) _, err := fmt.Fprintf(w, val.(string)) if err != nil { panic(err) }}
首先,解析来自路径的参数:?key=xxx;
随后,查询做事器中的缓存(为了大略起见,这里利用 sync.Map 来仿照缓存):
如果缓存不存在,则写入缓存,并通过 time.AfterFunc 设置缓存过期韶光(expireTime);末了,返回缓存;
② 缓存代理做事器准备
有了缓存做事器之后,我们还须要一个代理做事器来选择详细选择哪个缓存做事器来要求;
代码如下:
proxy/proxy.go
package proxyimport ( "fmt" "github.com/jasonkayzk/consistent-hashing-demo/core" "io/ioutil" "net/http" "time")type Proxy struct { consistent core.Consistent}// NewProxy creates a new Proxyfunc NewProxy(consistent core.Consistent) Proxy { proxy := &Proxy{ consistent: consistent, } return proxy}func (p Proxy) GetKey(key string) (string, error) { host, err := p.consistent.GetKey(key) if err != nil { return "", err } resp, err := http.Get(fmt.Sprintf("http://%s?key=%s", host, key)) if err != nil { return "", err } defer resp.Body.Close() body, _ := ioutil.ReadAll(resp.Body) fmt.Printf("Response from host %s: %s\n", host, string(body)) return string(body), nil}func (p Proxy) RegisterHost(host string) error { err := p.consistent.RegisterHost(host) if err != nil { return err } fmt.Println(fmt.Sprintf("register host: %s success", host)) return nil}func (p Proxy) UnregisterHost(host string) error { err := p.consistent.UnregisterHost(host) if err != nil { return err } fmt.Println(fmt.Sprintf("unregister host: %s success", host)) return nil}
代理做事器的逻辑很大略,便是创建一个同等性Hash构造: Consistent,把 Consistent 和要求缓存做事器的逻辑进行了一层封装;
算法验证启动代理做事器
启动代理做事器的代码如下:
package mainimport ( "fmt" "github.com/jasonkayzk/consistent-hashing-demo/core" "github.com/jasonkayzk/consistent-hashing-demo/proxy" "net/http")var ( port = "18888" p = proxy.NewProxy(core.NewConsistent(10, nil)))func main() { stopChan := make(chan interface{}) startServer(port) <-stopChan}func startServer(port string) { http.HandleFunc("/register", registerHost) http.HandleFunc("/unregister", unregisterHost) http.HandleFunc("/key", getKey) fmt.Printf("start proxy server: %s\n", port) err := http.ListenAndServe(":"+port, nil) if err != nil { panic(err) }}func registerHost(w http.ResponseWriter, r http.Request) { _ = r.ParseForm() err := p.RegisterHost(r.Form["host"][0]) if err != nil { w.WriteHeader(http.StatusInternalServerError) _, _ = fmt.Fprintf(w, err.Error()) return } _, _ = fmt.Fprintf(w, fmt.Sprintf("register host: %s success", r.Form["host"][0]))}func unregisterHost(w http.ResponseWriter, r http.Request) { _ = r.ParseForm() err := p.UnregisterHost(r.Form["host"][0]) if err != nil { w.WriteHeader(http.StatusInternalServerError) _, _ = fmt.Fprintf(w, err.Error()) return } _, _ = fmt.Fprintf(w, fmt.Sprintf("unregister host: %s success", r.Form["host"][0]))}func getKey(w http.ResponseWriter, r http.Request) { _ = r.ParseForm() val, err := p.GetKey(r.Form["key"][0]) if err != nil { w.WriteHeader(http.StatusInternalServerError) _, _ = fmt.Fprintf(w, err.Error()) return } _, _ = fmt.Fprintf(w, fmt.Sprintf("key: %s, val: %s", r.Form["key"][0], val))}
和缓存做事器类似,这里采取 HTTP 做事器来仿照;
代理做事器监听 18888 端口的几个路由:
/register:注册缓存做事器;/unregister:注销缓存做事器;/key:查询缓存Key;这里为了大略起见,利用了这种办法进行做事注册,实际利用时请利用其他组件进行实现!
接下来启动缓存做事器:
启动缓存做事器
start proxy server: 18888
分别启动三个缓存做事器:
$ go run server/main.go -p 8080start server: 8080$ go run server/main.go -p 8081start server: 8081$ go run server/main.go -p 8082start server: 8082
同时,代理做事器输出:
register host: localhost:8080 successregister host: localhost:8081 successregister host: localhost:8082 success
可以看到缓存做事器已经成功注册;
要求代理做事器获取Key
可以利用 curl 命令要求代理做事器获取缓存 key:
$ curl localhost:18888/key?key=123key: 123, val: hello: 123
此时,代理做事器输出:
Response from host localhost:8080: hello: 123
同时,8000端口的缓存做事器输出:
cached key: {123: hello: 123}removed cached key after 10s: {123: hello: 123}
可以看到,8000端口的做事器对key值进行了缓存,并在10秒后打消了缓存;
考试测验多次获取Key
考试测验获取多个Key:
Response from host localhost:8082: hello: 45363456Response from host localhost:8080: hello: 4Response from host localhost:8082: hello: 1Response from host localhost:8080: hello: 2Response from host localhost:8082: hello: 3Response from host localhost:8080: hello: 4Response from host localhost:8082: hello: 5Response from host localhost:8080: hello: 6Response from host localhost:8082: hello: sdkbnfoerwtnbreResponse from host localhost:8082: hello: sd45555254tg423i5gvj4v5Response from host localhost:8081: hello: 0Response from host localhost:8082: hello: 032452345
可以看到不同的key被散列到了不同的缓存做事器;
接下来我们通过debug查看详细的变量来一探究竟;
通过Debug查看注册和Hash环
开启debug,并注册单个缓存做事器后,查看 Consistent 中的值:
注册三个缓存做事器后,查看 Consistent 中的值:
从debug中的变量,我们就可以很清楚的看到注册不同数量的做事器时,同等性Hash上做事器的动态变革!
以上便是基本的同等性Hash算法的实现了!
但是很多时候,我们的缓存做事器须要同时处理大量的缓存要求,而通过上面的算法,我们总是会去同一台缓存做事器去获取缓存数据;
如果很多的热点数据都落在了同一台缓存做事器上,则可能会涌现性能瓶颈;
Google 在2017年提出了: 含有负载边界值的同等性Hash算法;
下面我们在基本的同等性Hash算法的根本上,实现含有负载边界值的同等性Hash!
含有负载边界值的同等性Hash算法描述
17年时,Google 提出了含有负载边界值的同等性Hash算法,此算法紧张运用于在实现同等性的同时,实现负载的均匀性;
此算法最初由 Vimeo 的 Andrew Rodland 在 haproxy 中实现并开源;
参考:
https://ai.googleblog.com/2017/04/consistent-hashing-with-bounded-loads.html
arvix论文地址:
https://arxiv.org/abs/1608.01350
这个算法将缓存做事器视为一个含有一定容量的桶(可以大略理解为Hash桶),将客户端视为球,则均匀性目标表示为:所有约即是均匀密度(球的数量除以桶的数量):
实际利用时,可以设定一个均匀密度的参数 ε,将每个桶的容量设置为均匀加载韶光的 下上限 (1+ε);
详细的打算过程如下:
首先,打算 key 的 Hash 值;随后,沿着 Hash 环顺时针探求第一台知足条件(均匀容量限定)的做事器;获取缓存;例如下面的图:
利用哈希函数将 6 个球和 3 个桶分配给 Hash环 上的随机位置,假设每个桶的容量设置为 2,按 ID 值的递增顺序分配球;
1号球顺时针移动,进入C桶;2号球进入A桶;3号和4号球进入B桶;5号球进入C桶;然后6号球顺时针移动,首先击中B桶;但是桶 B 的容量为 2,并且已经包含球 3 和 4,以是球 6 连续移动到达桶 C,但该桶也已满;末了,球 6 终极进入具有备用插槽的桶 A;算法实现在上面基本同等性 Hash 算法实现的根本上,我们连续实现含有负载边界值的同等性Hash算法;
在核心算法中添加根据负载情形查询Key的函数,以及增加/开释负载值的函数;
根据负载情形查询 Key 的函数:
core/algorithm.go
func (c Consistent) GetKeyLeast(key string) (string, error) { c.RLock() defer c.RUnlock() if len(c.replicaHostMap) == 0 { return "", ErrHostNotFound } hashedKey := c.hashFunc(key) idx := c.searchKey(hashedKey) // Find the first host that may serve the key i := idx for { host := c.replicaHostMap[c.sortedHostsHashSet[i]] loadChecked, err := c.checkLoadCapacity(host) if err != nil { return "", err } if loadChecked { return host, nil } i++ // if idx goes to the end of the ring, start from the beginning if i >= len(c.replicaHostMap) { i = 0 } }}func (c Consistent) checkLoadCapacity(host string) (bool, error) { // a safety check if someone performed c.Done more than needed if c.totalLoad < 0 { c.totalLoad = 0 } var avgLoadPerNode float64 avgLoadPerNode = float64((c.totalLoad + 1) / int64(len(c.hostMap))) if avgLoadPerNode == 0 { avgLoadPerNode = 1 } avgLoadPerNode = math.Ceil(avgLoadPerNode (1 + loadBoundFactor)) candidateHost, ok := c.hostMap[host] if !ok { return false, ErrHostNotFound } if float64(candidateHost.LoadBound)+1 <= avgLoadPerNode { return true, nil } return false, nil}
在 GetKeyLeast 函数中,首先根据 searchKey 函数,顺时针获取可能知足条件的第一个虚拟节点;
随后调用 checkLoadCapacity 校验当前缓存做事器的负载数是否知足条件:
candidateHost.LoadBound+1 <= (c.totalLoad + 1) / len(hosts) (1 + loadBoundFactor)如果不知足条件,则沿着 Hash 环走到下一个虚拟节点,连续判断是否知足条件,直到知足条件;
这里利用的是无条件的 for 循环,由于一定存在低于 均匀负载(1 + loadBoundFactor) 的虚拟节点!
增加/开释负载值的函数:
core/algorithm.go
func (c Consistent) Inc(hostName string) { c.Lock() defer c.Unlock() atomic.AddInt64(&c.hostMap[hostName].LoadBound, 1) atomic.AddInt64(&c.totalLoad, 1)}func (c Consistent) Done(host string) { c.Lock() defer c.Unlock() if _, ok := c.hostMap[host]; !ok { return } atomic.AddInt64(&c.hostMap[host].LoadBound, -1) atomic.AddInt64(&c.totalLoad, -1)}
逻辑比较大略,便是原子的对对应缓存做事器进行负载加减一操作;
算法测试修正代理做事器代码
在代理做事器中增加路由:
proxy/proxy.go
func (p Proxy) GetKeyLeast(key string) (string, error) { host, err := p.consistent.GetKeyLeast(key) if err != nil { return "", err } p.consistent.Inc(host) time.AfterFunc(time.Second10, func() { // drop the host after 10 seconds(for testing)! fmt.Printf("dropping host: %s after 10 second\n", host) p.consistent.Done(host) }) resp, err := http.Get(fmt.Sprintf("http://%s?key=%s", host, key)) if err != nil { return "", err } defer resp.Body.Close() body, _ := ioutil.ReadAll(resp.Body) fmt.Printf("Response from host %s: %s\n", host, string(body)) return string(body), nil}
把稳:这里仿照的是单个key要求可能会持续10s钟;
启动代理做事器时增加路由:
main.go
测试
func startServer(port string) { // ...... http.HandleFunc("/key_least", getKeyLeast) // ......}func getKeyLeast(w http.ResponseWriter, r http.Request) { _ = r.ParseForm() val, err := p.GetKeyLeast(r.Form["key"][0]) if err != nil { w.WriteHeader(http.StatusInternalServerError) _, _ = fmt.Fprintf(w, err.Error()) return } _, _ = fmt.Fprintf(w, fmt.Sprintf("key: %s, val: %s", r.Form["key"][0], val))}
启动代理做事器,并开启三台缓存做事器;
通过下面的命令获取含有负载边界的Key:
$ curl localhost:18888/key_least?key=123key: 123, val: hello: 123
多次要求后的结果如下:
```start proxy server: 18888register host: localhost:8080 successregister host: localhost:8081 successregister host: localhost:8082 successResponse from host localhost:8080: hello: 123Response from host localhost:8080: hello: 123Response from host localhost:8082: hello: 123Response from host localhost:8082: hello: 123Response from host localhost:8081: hello: 123Response from host localhost:8080: hello: 123Response from host localhost:8082: hello: 123Response from host localhost:8081: hello: 123Response from host localhost:8080: hello: 123Response from host localhost:8082: hello: 123Response from host localhost:8081: hello: 123Response from host localhost:8080: hello: 123Response from host localhost:8082: hello: 123Response from host localhost:8081: hello: 123Response from host localhost:8080: hello: 123Response from host localhost:8080: hello: 123Response from host localhost:8082: hello: 123Response from host localhost:8080: hello: 123Response from host localhost:8082: hello: 123Response from host localhost:8082: hello: 123```
可以看到,缓存被均摊到了其他做事器(这是由于一个缓存要求会持续10s导致的)!
总结
本文抛砖引玉的讲解了同等性Hash算法的事理,并供应了Go的实现;
在此根本之上,根据 Google 的论文实现了带有负载边界的同等性Hash算法;
当然上面的代码在实际生产环境下仍旧须要部分改进,如:
做事注册;缓存做事器实现;心跳检测;……大家在实际利用时,可以根据须要,搭配实际的组件!