如果能够将所有内存都分配到栈上无疑性能是最佳的,但不幸的是我们不可避免需要使用堆上分配的内存。我们可以优化使用堆内存时的性能损耗吗?答案是肯定的。Go 同步包中,sync.Pool提供了保存和访问一组临时对象并复用它们的能力。
对于一些创建成本昂贵、频繁使用的临时对象,使用sync.Pool可以减少内存分配,降低 GC 压力。因为Go的 gc 算法是根据标记清除改进的三色标记法,如果频繁创建大量临时对象,势必给 GC 标记带来负担,CPU 也很容易出现毛刺现象。当然需要注意的是:**存储在Pool中的对象随时都可能在不被通知的情况下被移除。 所以并不是所有频繁使用、创建昂贵的对象都适用,比如 DB 连接、线程池**。
Talk is cheap,Show me your code
因为Go1.13版本后对sync.Pool做了优化,放弃了利用sync.Mutex加锁的方式该用 CAS 加带环形数组的双向链表的方式来实现,本文基于Go1.15.8最新稳定版本分析。
基本使用
package main
import "sync"
type Person struct { Age int}
// 初始化poolvar personPool = sync.Pool{ New: func() interface{} { return new(Person) },}
func main() { // 获取一个实例 newPerson := personPool.Get().(*Person) // 回收对象 以备其他协程使用 defer personPool.Put(newPerson)
newPerson.Age = 25}
复制代码
使用起来比较简单大概分三步:
初始化 Pool
提供一个New函数,当 Pool 中未缓存该对象时调用
使用Get从缓存池中获取对象,接着进行业务逻辑处理即可
使用完毕 利用Put将对象交还给缓存池
需要注意的是:跟sync.Mutex一样sync.Pool第一次使用之后是不允许被拷贝的。
那sync.Pool对性能优化真的有这么大魔力吗?Benchmark 之
import ( "testing")
func BenchmarkWithoutPool(b *testing.B) { var p *Person b.ReportAllocs() b.ResetTimer() for i := 0; i < b.N; i++ { for j := 0; j < 10000; j++ { p = new(Person) p.Age = 30 } }}
func BenchmarkWithPool(b *testing.B) { var p *Person b.ReportAllocs() b.ResetTimer() for i := 0; i < b.N; i++ { for j := 0; j < 10000; j++ { p = personPool.Get().(*Person) p.Age = 30 personPool.Put(p) } }}
复制代码
基准测试结果:
BenchmarkWithoutPoolBenchmarkWithoutPool-8 7630 135523 ns/op 80000 B/op 10000 allocs/opBenchmarkWithPoolBenchmarkWithPool-8 9865 126072 ns/op 0 B/op 0 allocs/op
复制代码
工作原理
没有啥一张图搞不定的
如果不行 那就再来一张
sync.Pool 数据结构
type Pool struct { noCopy noCopy // 实际指向[]poolLocal 每个P对应一个poolLocal 数组大小取决于P的数量 runtime.GOMAXPROCS(0) local unsafe.Pointer localSize uintptr // []poolLocal的大小
victim unsafe.Pointer // local from previous cycle victimSize uintptr // size of victims array //当缓存池无对应对象时调用 New func() interface{}}
复制代码
相较于Go1.13之前版本,sync.Pool的结构体中新增了victim、victimSize字段
sync.Pool主要维护了一个sync.poolLocal的数组,数组大小由runtime.GOMAXPROCS(0)决定。
type poolLocal struct { poolLocalInternal // Prevents false sharing on widespread platforms with // 128 mod (cache line size) = 0 . pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte}
// Local per-P Pool appendix.type poolLocalInternal struct { private interface{} // 只能被对应的P使用 shared poolChain // 本地的P可以从Head 进行pushHead/popHead 其他的P可以popTail.}
复制代码
poolLocal内部又由 P 私有空间private和共享空间shared。共享空间是一个双端队列,双端队列每个节点又对应着一个环形数组,听着貌似有点儿绕,老规矩上图:
poolDequeue算是个逻辑上的环形数组,字段vals存储着实际的值,出于操作原子性的考虑,headTail字段将首尾索引融合在一起,高 32 位为 head 的索引下标,低 32 位为 tail 的索引下标,head 和 tail 指向同一位置则表示环形数组为空。
代码佐证:
func (d *poolDequeue) unpack(ptrs uint64) (head, tail uint32) { const mask = 1<<dequeueBits - 1 head = uint32((ptrs >> dequeueBits) & mask) tail = uint32(ptrs & mask) return}func (d *poolDequeue) pack(head, tail uint32) uint64 { const mask = 1<<dequeueBits - 1 return (uint64(head) << dequeueBits) | uint64(tail&mask)}
复制代码
sync.Pool实际使用过程中又将poolDequeue进行了包装,因为数组大小是固定,所以为了让他大小可变,将其包装成了poolChainElt双向链表。
操作方法
接下来我们来剖析一下sync.Pool几个核心流程
获取对象 p.Get
获取对象,大体流程:
将当前 goroutine 与 P 绑定并防止被抢占 具体是调用了runtime_procPin,返回poolLocal和 P 的 id
优先从私有空间获取对象
若私有空间没有,则尝试从共享区域获取
若共享区域也没拿到,则尝试从别人那边“偷”来一个
若偷都偷不到,那么自己手动 New 一个
func (p *Pool) Get() interface{} { // 将当前goroutine与P进行绑定 runtime_procPin禁用抢占 // 返回poolLocal与P的id l, pid := p.pin() x := l.private //尝试直接从私有空间拿 l.private = nil if x == nil { //从共享区域头部拿 x, _ = l.shared.popHead() if x == nil { //直接实在没有 尝试去别人那边看看能不能偷个 x = p.getSlow(pid) } } // 解除抢占禁用 runtime_procUnpin() // 都没有 那只好自己New一个 if x == nil && p.New != nil { x = p.New() } return x}
复制代码
那么我们来看看 goroutine 是怎么跟 P 绑定的
func (p *Pool) pin() (*poolLocal, int) { pid := runtime_procPin() // pinSlow中我们先存储local再存储localSize,这里我们以相反顺序加载 // 因为我们已经禁用了抢占 GC这期间不会发生 因此我们需要观察local的大小至少跟localSize一样 s := atomic.LoadUintptr(&p.localSize) // load-acquire l := p.local // load-consume if uintptr(pid) < s { return indexLocal(l, pid), pid } // 运行过程中可能会存在调整P的情况 或者GC了 return p.pinSlow()}
复制代码
这里我们先调用runtime_procPin(),为啥它这么牛逼,不仅让 P 不会被抢占,还让 GC 为之折腰?
番外:禁止抢占
func runtime_procPin() int//go:linkname sync_runtime_procPin sync.runtime_procPin//go:nosplitfunc sync_runtime_procPin() int { return procPin()}//go:nosplitfunc procPin() int { _g_ := getg() mp := _g_.m
mp.locks++ return int(mp.p.ptr().id)}
复制代码
正如所见,兜兜转转实际绑定 goroutine 和 P、禁用抢占交给了procPin。procPin首先从TLS或专用寄存器拿到当前的goroutine,然后获取当前gorountine绑定的物理线程,并对物理线程的locks属性自增操作。这意味什么呢?
这里可能涉及到一些goroutine调度的内容,Go runtime 调度是一个 GPM 模型。G 为调度的基本单元,P 可以理解为运行 G 的逻辑 CPU M 为系统线程。何为抢占?
即,将m绑定的P给占用,因为Go runtime中 99.9%的任务都需要P才能执行任务。Go 运行时调度主要存在两种抢占的情况:
抢占实现
Go 中的抢占是sysmon实现的。对 没错就是runtime.main里的那个sysmon也是唯一一个脱离GPM模型只需GM即可运行的特例。sysmon中包含了netpool、retake、forcegc、scavengeheap,这里抢占我们需要关注下retake。
//go:nowritebarrierrecfunc sysmon() { ... // retake P's blocked in syscalls // and preempt long running G's if retake(now) != 0 { idle = 0 } else { idle++ } ...}func retake(now int64) uint32 { ... if s == _Prunning || s == _Psyscall { // Preempt G if it's running for too long. t := int64(_p_.schedtick) if int64(pd.schedtick) != t { pd.schedtick = uint32(t) pd.schedwhen = now } else if pd.schedwhen+forcePreemptNS <= now {//G运行时间超过forcePreemptNS preemptone(_p_) // In case of syscall, preemptone() doesn't // work, because there is no M wired to P. sysretake = true } ...}
复制代码
P 处于运行中或系统调用,检查G运行时间是否超过forcePreemptNS(10ms),超过则调用preemptone(p)抢占这个P
func preemptone(_p_ *p) bool { mp := _p_.m.ptr() if mp == nil || mp == getg().m { return false } gp := mp.curg if gp == nil || gp == mp.g0 { return false }
gp.preempt = true
// Every call in a go routine checks for stack overflow by // comparing the current stack pointer to gp->stackguard0. // Setting gp->stackguard0 to StackPreempt folds // preemption into the normal stack overflow check. gp.stackguard0 = stackPreempt
// Request an async preemption of this P. if preemptMSupported && debug.asyncpreemptoff == 0 { _p_.preempt = true preemptM(mp) }
return true}
复制代码
主要是设置两个标志位gp.preempt和gp.stackguard0 主要起作用的是后者。通过将goroutine的stackguard0设置为 (1<<(8*sys.PtrSize) - 1)& -1314,导致P在执行G下一次的函数调用时,栈空间检查失败(stackguard0与 SP 寄存器比较),进而触发编译器安插的指令morestack。
//以asm_amd64.s为例TEXT runtime·morestack(SB),NOSPLIT,$0-0 ... ... // Call newstack on m->g0's stack. MOVQ m_g0(BX), BX MOVQ BX, g(CX) MOVQ (g_sched+gobuf_sp)(BX), SP CALL runtime·newstack(SB) CALL runtime·abort(SB) // crash if newstack returns RET
复制代码
morestack会调用newstack尝试栈扩容
//go:nowritebarrierrecfunc newstack() { ... ... if preempt { if !canPreemptM(thisg.m) { // Let the goroutine keep running for now. // gp->preempt is set, so it will be preempted next time. gp.stackguard0 = gp.stack.lo + _StackGuard gogo(&gp.sched) // never return } } ... ... }//go:nosplitfunc canPreemptM(mp *m) bool { return mp.locks == 0 && mp.mallocing == 0 && mp.preemptoff == "" && mp.p.ptr().status == _Prunning}
复制代码
newstack在栈扩容前会检查抢占标志位mp.locks!=0则不抢占。
如果抢占成功,则会继续调用gopreempt_m(gp)进而调用goschedImpl(gp)将P与当前m接触关联,设置goroutine状态casgstatus(gp, Grunning, Grunnable),然后将goroutine插入 Global runnable queue 等待下次调度。
至此,应该能彻底明白为啥runtime_procPin能够通过修改goroutine绑定的m的locks属性就能禁用抢占了。
但是还有个问题,为啥 GC 也拿它没办法?
关于Go的GC,大致有三种触发方式:
gcTriggerCycle 后台定时检查触发,如 runtime.sysmon
gcTriggerTimer 自上个 GC 周期超过 forcegcperiod 纳秒则触发 如runtime.forcegchelper
g cTriggerHeap 申请的堆内存大小达到触发阈值 如 runtime.mallocgc
最终都会调用gcStart(trigger gcTrigger) ,进而我们在 GC 的 STW 阶段执行中可以看到
func stopTheWorldWithSema() { _g_ := getg()
// If we hold a lock, then we won't be able to stop another M // that is blocked trying to acquire the lock. if _g_.m.locks > 0 { throw("stopTheWorld: holding locks") } lock(&sched.lock) sched.stopwait = gomaxprocs atomic.Store(&sched.gcwaiting, 1) preemptall() // stop current P _g_.m.p.ptr().status = _Pgcstop // Pgcstop is only diagnostic. sched.stopwait-- // try to retake all P's in Psyscall status for _, p := range allp { s := p.status if s == _Psyscall && atomic.Cas(&p.status, s, _Pgcstop) { if trace.enabled { traceGoSysBlock(p) traceProcStop(p) } p.syscalltick++ sched.stopwait-- } } // stop idle P's for { p := pidleget() if p == nil { break } p.status = _Pgcstop sched.stopwait-- } wait := sched.stopwait > 0 unlock(&sched.lock)
// wait for remaining P's to stop voluntarily if wait { for { // wait for 100us, then try to re-preempt in case of any races if notetsleep(&sched.stopnote, 100*1000) { noteclear(&sched.stopnote) break } preemptall() } }
// sanity checks bad := "" if sched.stopwait != 0 { bad = "stopTheWorld: not stopped (stopwait != 0)" } else { for _, p := range allp { if p.status != _Pgcstop { bad = "stopTheWorld: not stopped (status != _Pgcstop)" } } } if atomic.Load(&freezing) != 0 { // Some other thread is panicking. This can cause the // sanity checks above to fail if the panic happens in // the signal handler on a stopped thread. Either way, // we should halt this thread. lock(&deadlock) lock(&deadlock) } if bad != "" { throw(bad) }}
复制代码
大致逻辑先调用preemptall()尝试抢占所有的P,然后停掉当前P,遍历所有的P,如果P处于系统调用则直接stop掉;然后处理空闲的P;最后检查是否存在需要等待处理的P,如果有则循环等待,并尝试调用preemptall()
func preemptall() bool { res := false for _, _p_ := range allp { if _p_.status != _Prunning { continue } if preemptone(_p_) { res = true } } return res}
复制代码
到这里就很清晰了,我们又看到老朋友preemptone(_p_),显然GC会在STW阶段等下去,GC自然也无法执行下去。
好了 刚刚两个问题我们已经搞清楚了。书归正传 runtime_procPin能禁用P被抢占,那么runtime_procUnpin自然能解除禁用。完成goroutine与P的绑定,返回了当前P的 id,如果pid<p.localSize则说明当前 poolLocal 已经存在 直接利用地址偏移拿到poolLocal
func indexLocal(l unsafe.Pointer, i int) *poolLocal { lp := unsafe.Pointer(uintptr(l) + uintptr(i)*unsafe.Sizeof(poolLocal{})) return (*poolLocal)(lp)}
复制代码
如果运行时P被调整了呢?那么尝试下p.pinSlow(),正如其名这个过程会有点儿慢
func (p *Pool) pinSlow() (*poolLocal, int) { // Retry under the mutex. // Can not lock the mutex while pinned. runtime_procUnpin() allPoolsMu.Lock() defer allPoolsMu.Unlock() pid := runtime_procPin() // poolCleanup won't be called while we are pinned. s := p.localSize l := p.local if uintptr(pid) < s { return indexLocal(l, pid), pid } if p.local == nil { allPools = append(allPools, p) } // If GOMAXPROCS changes between GCs, we re-allocate the array and lose the old one. size := runtime.GOMAXPROCS(0) local := make([]poolLocal, size) atomic.StorePointer(&p.local, unsafe.Pointer(&local[0])) // store-release atomic.StoreUintptr(&p.localSize, uintptr(size)) // store-release return &local[pid], pid}
复制代码
pinSlow()上来第一件事儿 将我们之前设置的 P 禁用抢占给释放了。然后尝试获取全局排他锁allPoolsMu Mutex。这也能解释它为啥上来就释放掉之前的禁止占用,因为获取当前全局排他锁不一定能立马拿到啊。拿到锁之后又开启了禁止抢占 P,接着又判断了下uintptr(pid) < s 因为拿到锁之前P可能已经变化了。如果当前p.local=nil则将p放到全局的池子allPools []*Pool里,也是为啥刚才需要等待全局排他锁的原因。因为GC时会将原有的 pool 清理掉所以这里进行重建,原有 pool 真的没了吗?这个就跟之前提到的victim有点儿关系了 等会儿一起看。
至此,我们拿到了poolLocal,接着获取对象的顺序为
首先尝试从本地的 private 中获取
如果本地没拿到,则x, _ = l.shared.popHead()尝试从共享空间拿
func (c *poolChain) popHead() (interface{}, bool) { d := c.head for d != nil { if val, ok := d.popHead(); ok { return val, ok } // There may still be unconsumed elements in the // previous dequeue, so try backing up. d = loadPoolChainElt(&d.prev) } return nil, false}
复制代码
共享空间是以PoolChainElt为节点的双向链表,首先我们尝试沿着双向链表prev的方向依次调用d.popHead()尝试从头部拿数据
func (d *poolDequeue) popHead() (interface{}, bool) { var slot *eface for { ptrs := atomic.LoadUint64(&d.headTail) head, tail := d.unpack(ptrs) if tail == head { // Queue is empty. return nil, false }
// Confirm tail and decrement head. We do this before // reading the value to take back ownership of this // slot. head-- ptrs2 := d.pack(head, tail) if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) { // We successfully took back slot. slot = &d.vals[head&uint32(len(d.vals)-1)] break } }
val := *(*interface{})(unsafe.Pointer(slot)) if val == dequeueNil(nil) { val = nil } // Zero the slot. Unlike popTail, this isn't racing with // pushHead, so we don't need to be careful here. *slot = eface{} return val, true}
复制代码
逻辑也比较简单
2.1 将headTail拆封 如果 head==tail 表明当前环形数组为空,直接返回
2.2 接着将 head 索引减 1,然后将 head、tail 再打包回去,通过 CAS 判断当前没有并发修改就拿到数据 跳出循环 否则循环等待
2.3 将 slot 转为 interface{}类型
2.4 将 slot 赋值为 eface{}
如果共享空间依然没拿到,那么想办法从其他P那偷个吧p.getSlow(pid)
func (p *Pool) getSlow(pid int) interface{} { // See the comment in pin regarding ordering of the loads. size := atomic.LoadUintptr(&p.localSize) // load-acquire locals := p.local // load-consume // Try to steal one element from other procs. for i := 0; i < int(size); i++ { l := indexLocal(locals, (pid+i+1)%int(size)) if x, _ := l.shared.popTail(); x != nil { return x } }
// Try the victim cache. We do this after attempting to steal // from all primary caches because we want objects in the // victim cache to age out if at all possible. size = atomic.LoadUintptr(&p.victimSize) if uintptr(pid) >= size { return nil } locals = p.victim l := indexLocal(locals, pid) if x := l.private; x != nil { l.private = nil return x } for i := 0; i < int(size); i++ { l := indexLocal(locals, (pid+i)%int(size)) if x, _ := l.shared.popTail(); x != nil { return x } }
// Mark the victim cache as empty for future gets don't bother // with it. atomic.StoreUintptr(&p.victimSize, 0)
return nil}
复制代码
3.1 拿到[]poolLocal 数组,遍历每个 poolLocal,并调用l.shared.popTail() 从其共享空间的尾部拿数据
func (c *poolChain) popTail() (interface{}, bool) { d := loadPoolChainElt(&c.tail) if d == nil { return nil, false }
for { // It's important that we load the next pointer // *before* popping the tail. In general, d may be // transiently empty, but if next is non-nil before // the pop and the pop fails, then d is permanently // empty, which is the only condition under which it's // safe to drop d from the chain. d2 := loadPoolChainElt(&d.next)
if val, ok := d.popTail(); ok { return val, ok }
if d2 == nil { // This is the only dequeue. It's empty right // now, but could be pushed to in the future. return nil, false }
// The tail of the chain has been drained, so move on // to the next dequeue. Try to drop it from the chain // so the next pop doesn't have to look at the empty // dequeue again. if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&c.tail)), unsafe.Pointer(d), unsafe.Pointer(d2)) { // We won the race. Clear the prev pointer so // the garbage collector can collect the empty // dequeue and so popHead doesn't back up // further than necessary. storePoolChainElt(&d2.prev, nil) } d = d2 }}
复制代码
首先拿到尾节点,然后在死循环中沿着双向链表next的方向不断获取PoolChainElt节点,尝试调用d.popTail()获取数据
func (d *poolDequeue) popTail() (interface{}, bool) { var slot *eface for { ptrs := atomic.LoadUint64(&d.headTail) head, tail := d.unpack(ptrs) if tail == head { // Queue is empty. return nil, false } ptrs2 := d.pack(head, tail+1) if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) { slot = &d.vals[tail&uint32(len(d.vals)-1)] break } } val := *(*interface{})(unsafe.Pointer(slot)) if val == dequeueNil(nil) { val = nil } slot.val = nil atomic.StorePointer(&slot.typ, nil) return val, true}
复制代码
与popHead比较像,不同在于一个从头部拿数据一个从尾部拿。首先依然是在死循环中先将headTail拆封,如果 tai l==head 表示环形数组为空,直接返回。否则将 tail+1 再封装好,同 CAS 规避并发问题 拿到数据则跳出循环,否则循环等待。
这里有一个跟popHead不同的是 先将 value 置为 nil 然后利用 CAS 来将 typ 置空操作atomic.StorePointer(&slot.typ, nil),原因很简单,pushHead和popTail一个从头放一个从尾拿数据,一旦碰头就会出现竞争。
3.2 那如果偷都偷不到,会进行以下操作
size = atomic.LoadUintptr(&p.victimSize) if uintptr(pid) >= size { return nil } locals = p.victim l := indexLocal(locals, pid) if x := l.private; x != nil { l.private = nil return x } for i := 0; i < int(size); i++ { l := indexLocal(locals, (pid+i)%int(size)) if x, _ := l.shared.popTail(); x != nil { return x } }
// Mark the victim cache as empty for future gets don't bother // with it. atomic.StoreUintptr(&p.victimSize, 0)
复制代码
victim cache翻译过来叫“受害者缓存”
受害者缓存是由Norman Jouppi提出的一种提高缓存性能的硬件技术。如他的论文所述
>
Miss caching places a fully-associative cache between cache and its re-fill path. Misses in the cache that hit in the miss cache have a one cycle penalty, as opposed to a many cycle miss penalty without the miss cache. Victim Caching is an improvement to miss caching that loads the small fully-associative cache with victim of a miss and not the requested cache line.
大概意思就是在旧缓存和缓解重建的过程中,添加一个全关联的缓存(保存旧缓存数据)。也就是说当一级缓存踢出的数据,放到受害者缓存中。当我们在一级缓存未命中,则可以继续尝试从受害者缓存中查询。
如代码:
size = atomic.LoadUintptr(&p.victimSize) if uintptr(pid) >= size { return nil } locals = p.victim l := indexLocal(locals, pid) if x := l.private; x != nil { l.private = nil return x } for i := 0; i < int(size); i++ { l := indexLocal(locals, (pid+i)%int(size)) if x, _ := l.shared.popTail(); x != nil { return x } }
// Mark the victim cache as empty for future gets don't bother // with it. atomic.StoreUintptr(&p.victimSize, 0)
复制代码
如果能理解,其实还是挺简单的,也就是
local1 ->GC ->local2 victim->local1
>
Local2 ->GC ->local3 victim->local2
很遗憾 getSlow 也没拿到 那只好自己手动 new 一个了
if x == nil && p.New != nil { x = p.New() }
复制代码
用完返回 Pool p.Put
看完 Get,接着看下Put
func (p *Pool) Put(x interface{}) { if x == nil { return } // 将goroutine与P绑定 runtime_procPin禁用抢占 返回poolLocal l, _ := p.pin() if l.private == nil {//优先放到私有空间 l.private = x x = nil } if x != nil { //放回共享空间 l.shared.pushHead(x) } // 解除抢占禁用 runtime_procUnpin()}
复制代码
基本逻辑:
如果放入对象为空 直接返回
调用p.pin获取poolLocal之前分析过大体类似
优先放入私有空间
若私有空间已满 则尝试放入共享空间
释放 P 禁止占用
func (c *poolChain) pushHead(val interface{}) { d := c.head if d == nil { // Initialize the chain. const initSize = 8 // Must be a power of 2 d = new(poolChainElt) d.vals = make([]eface, initSize) c.head = d storePoolChainElt(&c.tail, d) } if d.pushHead(val) { return }
newSize := len(d.vals) * 2 if newSize >= dequeueLimit { // Can't make it any bigger. newSize = dequeueLimit }
d2 := &poolChainElt{prev: d} d2.vals = make([]eface, newSize) c.head = d2 storePoolChainElt(&d.next, d2) d2.pushHead(val)}
复制代码
putHead逻辑主要是将对象放到双向链表的对应节点的环形数组中。
先获取双向链表的 head 节点
若 head 节点为空 则初始化 head 节点 节点对应环形数组初始大小为 8
将对象放到环形数组中
func (d *poolDequeue) pushHead(val interface{}) bool { ptrs := atomic.LoadUint64(&d.headTail) head, tail := d.unpack(ptrs) if (tail+uint32(len(d.vals)))&(1<<dequeueBits-1) == head { // Queue is full. return false } slot := &d.vals[head&uint32(len(d.vals)-1)] typ := atomic.LoadPointer(&slot.typ) if typ != nil {// popTail可能还没处理完 return false }
// The head slot is free, so we own it. if val == nil { val = dequeueNil(nil) } *(*interface{})(unsafe.Pointer(slot)) = val atomic.AddUint64(&d.headTail, 1<<dequeueBits) return true}
复制代码
跟popHead是相反的操作,大体也比较简单。先判断环形数组是否满了,满了则直接返回。因为pushHead跟popTail存在竞争关系,slot.typ不为空可能是popTail还没处理完。
关于 GC 清除数据问题
pool.go中的init函数组册了 GC 发生时如何清理 Pool 的函数,调用链如下
gcTrigger->gcStart()->clearpools()->poolCleanup()
func init() { runtime_registerPoolCleanup(poolCleanup)}//go:linkname sync_runtime_registerPoolCleanup sync.runtime_registerPoolCleanupfunc sync_runtime_registerPoolCleanup(f func()) { poolcleanup = f}func poolCleanup() { for _, p := range oldPools { p.victim = nil p.victimSize = 0 }
for _, p := range allPools { p.victim = p.local p.victimSize = p.localSize p.local = nil p.localSize = 0 }
oldPools, allPools = allPools, nil}
复制代码
逻辑很简单 正如上面讲victim说的那样。
最后的最后,细心的你可能发现 还遗漏了两个细节
noCopy
sync.Pool结构体中noCopy其实是为了防止sync.Pool使用过程中被拷贝。至于原因应该不用多说,因为Go并没有提供原生的强制不能拷贝的方法。所以采用这种方式,让go vet检测报错来实现。
举个例子
type noCopy struct{}
// Lock is a no-op used by -copylocks checker from `go vet`.func (*noCopy) Lock() {}func (*noCopy) Unlock() {}type People struct { noCopy noCopy}
func say(p People) {
}
func main() { var p People say(p)}
复制代码
输出:
# command-line-arguments./demo.go:12:12: say passes lock by value: command-line-arguments.People contains command-line-arguments.noCopy./demo.go:18:6: call of say copies lock value: command-line-arguments.People contains command-line-arguments.noCopy
复制代码
当然直接执行不会报任何错
pad
type poolLocal struct { poolLocalInternal
// Prevents false sharing on widespread platforms with // 128 mod (cache line size) = 0 . pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte}
复制代码
pad字段在这里没有啥业务意思,目的就是为了避免伪共享问题。因为我们为了缓解计算机 CPU 计算速度和内存的读取速度不匹配的矛盾,在他们之间增加了 L1 L2 L3 高速缓存,他们比内存小很多但是速度却是内存无法比拟的。
缓存系统中我们是以缓存行(cache line)为单位,通常大小为 64 字节。上面这张图,我们可以看到 L1、L2、L3 三级缓存他们和内存的读取速度当然取决于他们与 CPU 紧密程度。L1>L2>L3>内存
但是!我们现在使用的都是多核 CPU 的计算机,如何保证多核看到的数据的一致性呢?这里我们需要谈到一个协议-MESI 协议,M、E、S、I 分别表示缓存行的 4 个状态
M(修改,Modified):本地处理器已经修改缓存行,即是脏行,它的内容与内存中的内容不一样,并且此 cache 只有本地一个拷贝(专有);
>
E(专有,Exclusive):缓存行内容和内存中的一样,而且其它处理器都没有这行数据;
>
S(共享,Shared):缓存行内容和内存中的一样, 有可能其它处理器也存在此缓存行的拷贝;
>
I(无效,Invalid):缓存行失效, 不能使用。
他们转换关系如下:
现在假设我们有以下场景
有两个变量 X、Y 共享在了一个cache line中。如果 core1 想要更新 X,core2 想要更新 Y,更新完他们的缓存行都变成了 I 状态,即 L1 L2 上的缓存均不可用,这时如果其他线程再要访问 X Y 就只能从 L3 甚至从内存拿数据,其性能可想而知。
怎么解决呢?
解决伪共享的问题 业界大多采用 pad 填充的方式来解决,让数据独占一个 cacheline 降低数据关联共享的影响。比如 Java8 还提供了语法糖,通过添加注解@Contended自动进行缓存行填充。
总结
sync.Pool实现总体比较小巧,具体思想其实其他语言也都有影子,比如 Java 中的ForkJoinPool。但是往往简单设计的细节往往很值得我们去考究学习一下的。总结下知识点还真不少:
work stealing 算法
CAS 如何做到 lock-free
设置抢占标志 禁止 P 被占用 并制止 GC
Victim cache 受害者缓存是怎么回事儿
noCopy 是干啥的 怎么实现禁止拷贝
伪共享(false share)
Pool GC 的机制
不过这也符合 Go“少即是多”的设计理念。
评论