Balancer是一个由 Golang 开发的反向代理 7 层负载均衡,是一个适合初学者学习的 Golang 项目,今天我们就来看看这个项目是如何实现的。
前言
在开始了解具体的项目前需要了解一些基础的概念。
反向代理
反向代理指的是当用户访问接口或者是服务器资源时并不直接访问具体服务器,而是通过访问代理服务器,然后代理服务器根据具体的用户请求去具体的内网中的服务器获取所需的数据。
反向代理在互联网中被大量应用,通常反向代理由 web 服务器来实现,例如nginx,openresty等。
负载均衡
单点应用会面临可用性和性能的问题,因此往往需要进行多台服务部署。此时为了使得流量均匀分布到不同的服务器上,需要启用负载均衡机制。
一般来说负载均衡的能力是反向代理服务器自带的能力,负载均衡会有不少的算法,轮询加权等等,这个后续会介绍。
代码实现
Balancer作为一个反向代理的负载均衡器,其包含了不同负载均衡算法实现,以及一些心跳保持,健康检查的基础能力。
项目结构
Balancer的项目结构极为简单,主要就是服务器的基本代理实现proxy和负载均衡算法实现balancer
服务器基本结构
Balancer基于"net/http/httputil"包来实现自身基础能力,通过其ReverseProxy来实现自身的反向代理能力。
服务所需配置样例如下:
schema: httpport: 8089ssl_certificate:ssl_certificate_key:tcp_health_check: truehealth_check_interval: 3 max_allowed: 100location: - pattern: / proxy_pass: - "http://192.168.1.1" - "http://192.168.1.2:1015" - "https://192.168.1.2" - "http://my-server.com" balance_mode: round-robin
复制代码
服务器启动的入口在main中,启动方法很简单:
func main() { config, err := ReadConfig("config.yaml") if err != nil { log.Fatalf("read config error: %s", err) }
err = config.Validation() if err != nil { log.Fatalf("verify config error: %s", err) }
router := mux.NewRouter() for _, l := range config.Location { httpProxy, err := proxy.NewHTTPProxy(l.ProxyPass, l.BalanceMode) if err != nil { log.Fatalf("create proxy error: %s", err) } // start health check if config.HealthCheck { httpProxy.HealthCheck(config.HealthCheckInterval) } router.Handle(l.Pattern, httpProxy) } if config.MaxAllowed > 0 { router.Use(maxAllowedMiddleware(config.MaxAllowed)) } svr := http.Server{ Addr: ":" + strconv.Itoa(config.Port), Handler: router, }
// print config detail config.Print()
// listen and serve if config.Schema == "http" { err := svr.ListenAndServe() if err != nil { log.Fatalf("listen and serve error: %s", err) } } else if config.Schema == "https" { err := svr.ListenAndServeTLS(config.SSLCertificate, config.SSLCertificateKey) if err != nil { log.Fatalf("listen and serve error: %s", err) } }}
复制代码
在上述的启动方法中做了如下几件事:
获取到服务器的配置并进行解析
使用配置中配置的反向代理地址和负载均衡算法来初始化服务器并开启健康检查
启动服务
上述流程很简单,其重点在于服务器的构建:
type HTTPProxy struct { hostMap map[string]*httputil.ReverseProxy lb balancer.Balancer
sync.RWMutex // protect alive alive map[string]bool}
复制代码
上面是Balancer服务器的基本结构体,其中包含了负载均衡器lb,hostMap用来记录主机和反向代理之间的映射关系,alive用来记录反向代理服务器的健康状态。
func NewHTTPProxy(targetHosts []string, algorithm string) ( *HTTPProxy, error) {
hosts := make([]string, 0) hostMap := make(map[string]*httputil.ReverseProxy) alive := make(map[string]bool) for _, targetHost := range targetHosts { url, err := url.Parse(targetHost) if err != nil { return nil, err } proxy := httputil.NewSingleHostReverseProxy(url)
originDirector := proxy.Director proxy.Director = func(req *http.Request) { originDirector(req) req.Header.Set(XProxy, ReverseProxy) req.Header.Set(XRealIP, GetIP(req)) }
host := GetHost(url) alive[host] = true // initial mark alive hostMap[host] = proxy hosts = append(hosts, host) }
lb, err := balancer.Build(algorithm, hosts) if err != nil { return nil, err }
return &HTTPProxy{ hostMap: hostMap, lb: lb, alive: alive, }, nil}
复制代码
NewHTTPProxy适用于构建服务器反向代理的。他接收目标 host 的数组和负载均衡算法,之后将数据进行整合以此构建完整的hostMap和alive中的 map 数据。在上述的处理过程中反向代理额外添加了两个请求头将之传递到下游。
负载均衡算法
Balancer整个项目的核心是他实现的多种不同负载均衡算法,包括:bounded,random,consistent-hash,ip-hash,p2c,least-load,round-robin
我们先来看下负载均衡器是如何设计的:首先对负载均衡器进行抽象,抽象出Balancer接口:
type Balancer interface { Add(string) Remove(string) Balance(string) (string, error) Inc(string) Done(string)}
复制代码
其中Add和Remove用于增删负载均衡集群中的具体机器,Inc和Done则用于控制请求数目(如果有配置最大请求数),Balance用于选择最后负载均衡算法计算出的目标服务器。
并且其提供了各个算法的 map 映射以及创建的工厂方法:
type Factory func([]string) Balancer
var factories = make(map[string]Factory)
func Build(algorithm string, hosts []string) (Balancer, error) { factory, ok := factories[algorithm] if !ok { return nil, AlgorithmNotSupportedError } return factory(hosts), nil}
复制代码
由于大部分算法的Add和Remove逻辑相同,因此构建了BaseBalancer将这部分代码抽象出来:
type BaseBalancer struct { sync.RWMutex hosts []string}
// Add new host to the balancerfunc (b *BaseBalancer) Add(host string) { b.Lock() defer b.Unlock() for _, h := range b.hosts { if h == host { return } } b.hosts = append(b.hosts, host)}
// Remove new host from the balancerfunc (b *BaseBalancer) Remove(host string) { b.Lock() defer b.Unlock() for i, h := range b.hosts { if h == host { b.hosts = append(b.hosts[:i], b.hosts[i+1:]...) return } }}
// Balance selects a suitable host accordingfunc (b *BaseBalancer) Balance(key string) (string, error) { return "", nil}
// Inc .func (b *BaseBalancer) Inc(_ string) {}
// Done .func (b *BaseBalancer) Done(_ string) {}
复制代码
大体逻辑是通过数组存储 host 信息,Add和Remove方法就是在数组中进行增删具体的 host 信息。
round robin
round robin一般称之为轮询,是最经典,用途最广泛的负载均衡算法之一。
type RoundRobin struct { BaseBalancer i uint64}
func (r *RoundRobin) Balance(_ string) (string, error) { r.RLock() defer r.RUnlock() if len(r.hosts) == 0 { return "", NoHostError } host := r.hosts[r.i%uint64(len(r.hosts))] r.i++ return host, nil}
复制代码
它的代码实现原理是在RoundRobin的结构体中定义好一个uint值,每次请求时使用这个值和服务器数据进行取模操作以决定最后的目标服务器,并且在请求之后对于这个值进行累加,以确保下次请求使用不同服务器。
ip hash
ip hash是在服务器场合上使用较多的一种负载均衡策略,其策略宗旨是相同用户 ip 的请求总是会落在相同的服务器上。
func (r *IPHash) Balance(key string) (string, error) { r.RLock() defer r.RUnlock() if len(r.hosts) == 0 { return "", NoHostError } value := crc32.ChecksumIEEE([]byte(key)) % uint32(len(r.hosts)) return r.hosts[value], nil}
复制代码
代码中的实现是将用户 ip 利用ChecksumIEEE转换成 uint 之后和服务器数目进行取模操作。
random
顾名思义random是随机选取算法。
代码很简单:
func (r *Random) Balance(_ string) (string, error) { r.RLock() defer r.RUnlock() if len(r.hosts) == 0 { return "", NoHostError } return r.hosts[r.rnd.Intn(len(r.hosts))], nil}
复制代码
consistent hash
consistent hash即为一致性哈希,如果不知道什么是一致性 hash 可以去网上找资料,或者参考我之前的文章分布式系统中的哈希算法
func (c *Consistent) Add(host string) { c.ch.Add(host)}
func (c *Consistent) Remove(host string) { c.ch.Remove(host)}
func (c *Consistent) Balance(key string) (string, error) { if len(c.ch.Hosts()) == 0 { return "", NoHostError } return c.ch.Get(key)}
复制代码
代码中的一致性哈希并未自己实现,借助的是开源库"github.com/lafikl/consistent"
bounded
bounded指的是Consistent Hashing with Bounded Loads,即有界负载的一致性哈希。此处就不多赘述,大家可以参考此篇文章。后续有人有兴趣的话我可能单独开文讲一讲。
实现方式和普通一致性哈希一样,都是借助的开源库:
func (b *Bounded) Add(host string) { b.ch.Add(host)}
func (b *Bounded) Remove(host string) { b.ch.Remove(host)}
func (b *Bounded) Balance(key string) (string, error) { if len(b.ch.Hosts()) == 0 { return "", NoHostError } return b.ch.GetLeast(key)}
func (b *Bounded) Inc(host string) { b.ch.Inc(host)}
func (b *Bounded) Done(host string) { b.ch.Done(host)}
复制代码
least load
least load是最小负载算法,使用此算法,请求会选择集群中负载最小的机器。
func (h *host) Tag() interface{} { return h.name }
func (h *host) Key() float64 { return float64(h.load) }
type LeastLoad struct { sync.RWMutex heap *fibHeap.FibHeap}
func NewLeastLoad(hosts []string) Balancer { ll := &LeastLoad{heap: fibHeap.NewFibHeap()} for _, h := range hosts { ll.Add(h) } return ll}
func (l *LeastLoad) Add(hostName string) { l.Lock() defer l.Unlock() if ok := l.heap.GetValue(hostName); ok != nil { return } _ = l.heap.InsertValue(&host{hostName, 0})}
func (l *LeastLoad) Remove(hostName string) { l.Lock() defer l.Unlock() if ok := l.heap.GetValue(hostName); ok == nil { return } _ = l.heap.Delete(hostName)}
func (l *LeastLoad) Balance(_ string) (string, error) { l.RLock() defer l.RUnlock() if l.heap.Num() == 0 { return "", NoHostError } return l.heap.MinimumValue().Tag().(string), nil}
func (l *LeastLoad) Inc(hostName string) { l.Lock() defer l.Unlock() if ok := l.heap.GetValue(hostName); ok == nil { return } h := l.heap.GetValue(hostName) h.(*host).load++ _ = l.heap.IncreaseKeyValue(h)}
func (l *LeastLoad) Done(hostName string) { l.Lock() defer l.Unlock() if ok := l.heap.GetValue(hostName); ok == nil { return } h := l.heap.GetValue(hostName) h.(*host).load-- _ = l.heap.DecreaseKeyValue(h)}
复制代码
代码中比较简单的使用了当前正在处理的请求数目来作为服务器的负载水平,并且借助了开源库"github.com/starwander/GoFibonacciHeap"来维护集群中服务器的负载值。
p2c
p2c 全称 Power of Two Random Choices,一般翻译为两次随机选择算法,出自论文The Power of Two Random Choices: A Survey ofTechniques and Results
大题的思路是从服务器列表中进行两次随机选择获取两个节点,然后进行比较选出最终的目标服务器节点。
const Salt = "%#!"
type host struct { name string load uint64}
type P2C struct { sync.RWMutex hosts []*host rnd *rand.Rand loadMap map[string]*host}
func (p *P2C) Balance(key string) (string, error) { p.RLock() defer p.RUnlock()
if len(p.hosts) == 0 { return "", NoHostError }
n1, n2 := p.hash(key) host := n2 if p.loadMap[n1].load <= p.loadMap[n2].load { host = n1 } return host, nil}
func (p *P2C) hash(key string) (string, string) { var n1, n2 string if len(key) > 0 { saltKey := key + Salt n1 = p.hosts[crc32.ChecksumIEEE([]byte(key))%uint32(len(p.hosts))].name n2 = p.hosts[crc32.ChecksumIEEE([]byte(saltKey))%uint32(len(p.hosts))].name return n1, n2 } n1 = p.hosts[p.rnd.Intn(len(p.hosts))].name n2 = p.hosts[p.rnd.Intn(len(p.hosts))].name return n1, n2}
func (p *P2C) Inc(host string) { p.Lock() defer p.Unlock()
h, ok := p.loadMap[host]
if !ok { return } h.load++}
func (p *P2C) Done(host string) { p.Lock() defer p.Unlock()
h, ok := p.loadMap[host]
if !ok { return }
if h.load > 0 { h.load-- }}
复制代码
代码的实现思路是这样的,根据用户的 ip 作为 key 来进行 hash 操作,如果 ip 为空随机选取两个服务器,如果不为空,则分别使用 ip 和 ip 加盐后ChecksumIEEE计算出的值来选取服务器,选出两个服务器后比较两者的负载状况,选择负载更小的那个。负载的计算方式和least load保持一致。
总结
我们可以看到Balancer实际上完成了简单的方向代理能力以及实现了多种的负载均衡算法,不仅可以作为服务器单独运行,还可以作为 sdk 提供负载均衡算法的使用。其代码实现较为简单,但是十分清晰,非常适合刚接触 golang 的开发者。
评论