写点什么

Golang 中如何使用 Singleflight 库进行并发请求合并

作者:Jack
  • 2023-05-07
    广东
  • 本文字数:2927 字

    阅读完需:约 10 分钟

Golang中如何使用Singleflight库进行并发请求合并

SingleFlight 是 Go 开发组提供的一个扩展并发原语。它的作用是,在处理多个 goroutine 同时调用同一个函数的时候,只让一个 goroutine 去调用这个函数,等到这个 goroutine 返回结果的时候,再把结果返回给这几个同时调用的 goroutine,这样可以减少并发调用的数量。


Singleflight 是一个用于合并并发请求的库,它的主要思想是:如果有多个并发请求同时请求同一个资源,那么只需要执行一次实际的请求,然后将结果返回给所有的请求者。


SingleFlight 在面对秒杀等大并发请求的场景,而且这些请求都是读请求时,你就可以把这些请求合并为一个请求,这样,你就可以将后端服务的压力从 n 降到 1。尤其是在面对后端是数据库这样的服务的时候,采用 SingleFlight 可以极大地提高性能。

一、Singleflight 的实现原理

SingleFlight 使用互斥锁 Mutex 和 Map 来实现。Mutex 提供并发时的读写保护,Map 用来保存同一个 key 的正在处理(in flight)的请求。


SingleFlight 的数据结构是 Group,它提供了三个方法。


  • Do:这个方法执行一个函数,并返回函数执行的结果。你需要提供一个 key,对于同一个 key,在同一时间只有一个在执行,同一个 key 并发的请求会等待。第一个执行的请求返回的结果,就是它的返回结果。函数 fn 是一个无参的函数,返回一个结果或者 error,而 Do 方法会返回函数执行的结果或者是 error,shared 会指示 v 是否返回给多个请求。

  • DoChan:类似 Do 方法,只不过是返回一个 chan,等 fn 函数执行完,产生了结果以后,就能从这个 chan 中接收这个结果。

  • Forget:告诉 Group 忘记这个 key。这样一来,之后这个 key 请求会执行 f,而不是等待前一个未完成的 fn 函数的结果。


下面我们具体看看实现细节:


首先,SingleFlight 定义一个辅助对象 call,这个 call 就代表正在执行 fn 函数的请求或者是已经执行完的请求。Group 代表 SingleFlight。

// 代表一个正在处理的请求,或者已经处理完的请求
type call struct {
wg sync.WaitGroup
// 这个字段代表处理完的值,在waitgroup完成之前只会写一次
// waitgroup完成之后就读取这个值
val interface{}
err error
dups int
chans []chan<- Result
}
// group代表一个singleflight对象
type Group struct {
mu sync.Mutex // protects m
m map[string]*call // lazily initialized
}
复制代码


我们只需要查看一个 Do 方法,DoChan 的处理方法是类似的。

func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
g.mu.Lock()
if g.m == nil {
g.m = make(map[string]*call)
}
if c, ok := g.m[key]; ok {//如果已经存在相同的key
c.dups++
g.mu.Unlock()
c.wg.Wait() //等待这个key的第一个请求完成
return c.val, c.err, true //使用第一个key的请求结果
}
c := new(call) // 第一个请求,创建一个call
c.wg.Add(1)
g.m[key] = c //加入到key map中
g.mu.Unlock()
g.doCall(c, key, fn) // 调用方法
return c.val, c.err, c.dups > 0
}
复制代码


doCall 方法会实际调用函数 fn:

// doCall handles the single call for a key.
func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
normalReturn := false
recovered := false
// use double-defer to distinguish panic from runtime.Goexit,
// more details see https://golang.org/cl/134395
defer func() {
// the given function invoked runtime.Goexit
if !normalReturn && !recovered {
c.err = errGoexit
}
g.mu.Lock()
defer g.mu.Unlock()
c.wg.Done()
if g.m[key] == c {
delete(g.m, key)
}
if e, ok := c.err.(*panicError); ok {
// In order to prevent the waiting channels from being blocked forever,
// needs to ensure that this panic cannot be recovered.
if len(c.chans) > 0 {
go panic(e)
select {} // Keep this goroutine around so that it will appear in the crash dump.
} else {
panic(e)
}
} else if c.err == errGoexit {
// Already in the process of goexit, no need to call again
} else {
// Normal return
for _, ch := range c.chans {
ch <- Result{c.val, c.err, c.dups > 0}
}
}
}()
func() {
defer func() {
if !normalReturn {
// Ideally, we would wait to take a stack trace until we've determined
// whether this is a panic or a runtime.Goexit.
//
// Unfortunately, the only way we can distinguish the two is to see
// whether the recover stopped the goroutine from terminating, and by
// the time we know that, the part of the stack trace relevant to the
// panic has been discarded.
if r := recover(); r != nil {
c.err = newPanicError(r)
}
}
}()
c.val, c.err = fn()
normalReturn = true
}()
if !normalReturn {
recovered = true
}
}
复制代码

这段代码是单机版 Singleflight 的核心处理逻辑,它的作用是处理单个请求的调用,实现了请求的合并以及结果的返回。具体来说,这段代码的实现包括以下几个步骤:

  • 定义了一个叫做 normalReturn 和 recovered 的布尔变量,用于标记请求是否正常返回以及是否发生了 panic。

  • 使用 defer 语句来进行清理工作,包括标记请求是否正常返回、释放锁、删除请求和处理 panic 等操作。

  • 在一个匿名函数中执行实际的请求处理函数,并将结果存储到 c.val 和 c.err 中。

  • 在请求处理函数执行完毕后,判断请求是否正常返回,如果没有则标记请求发生了 panic,并将 panic 信息存储到 c.err 中。

  • 最后根据请求的返回状态,将结果返回给所有等待的请求者。


二、使用场景

下面我们来看一下 Singleflight 的使用场景和应用案例:

  1. 缓存穿透

在分布式系统中,我们通常使用缓存来提高系统的性能。但是,如果某个缓存键对应的值不存在,那么每次请求都会穿透到后端服务,这会导致后端服务的压力增大。Singleflight 就可以解决这个问题,它可以在第一次请求后将结果缓存下来,然后将结果返回给所有的请求者。


  1. 防止重复请求

在某些场景下,我们可能会出现重复请求的情况,这会导致系统的性能下降。Singleflight 可以防止重复请求,它会在第一次请求后将结果缓存下来,并将结果返回给所有的请求者。

下面是 Singleflight 的示例代码:

package main
import (
"fmt"
"sync"
"golang.org/x/sync/singleflight"
)
var group singleflight.Group
func main() {
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
v, err, _ := group.Do("key", func() (interface{}, error) {
fmt.Println("fetching data")
return i, nil
})
if err != nil {
fmt.Println(err)
} else {
fmt.Println(v)
}
}(i)
}
wg.Wait()
}
复制代码

在这个示例代码中,我们创建了一个 Singleflight Group,并使用 group.Do 方法来执行请求。在第一次请求时,会执行传入的函数,并将结果缓存下来。对于后续的请求,Singleflight 会直接从缓存中获取结果,并将结果返回给所有的请求者。


总之,Singleflight 是一个非常有用的库,它可以帮助我们解决并发请求的问题,提高系统的性能。

发布于: 刚刚阅读数: 3
用户头像

Jack

关注

还未添加个人签名 2019-05-12 加入

作为一名技术追求者,我对科技、编程和创新充满热情。我始终关注最新的商业趋势和技术发展,努力将其应用于实践中,从而推动技术创新和改进。我善于思考和分析,具备较强的解决问题的能力和团队合作精神。

评论

发布
暂无评论
Golang中如何使用Singleflight库进行并发请求合并_Jack_InfoQ写作社区