写点什么

Go 语言协程池实现第二弹

作者:FunTester
  • 2023-08-24
    北京
  • 本文字数:4064 字

    阅读完需:约 13 分钟

之前写了 Go 语言协程池的实践以及动态 QPS 的实现,本来计划就是开始做一些测试了,但是发现协程池的实现有些问题也有一些 BUG,所以连夜修改了部分功能。


为了不咋不明真相的读者造成困扰,赶紧写篇文章报告一下。

缺陷 &BUG

这里先把测试中遇到的问题和 BUG 梳理一下:


  1. 活跃协程数计算错误

  2. 执行数量和收到计数错误

  3. QPS 陡增和陡降的时候,无法及时增加压力和回收协程

  4. 协程回收存在问题不够优雅,效率太低

BUG 分析

活跃协程数

这里计数错误的原因是因为在原来的实现中多次使用了ReduceWorker()AddWorker()方法,导致没有将添加和减少的功能收拢到某一个时机统一处理,有两处重复使用的问题,导致Active计算错误。

执行数量和收到计数错误

这里问题出现在这个方法,本意是设计一个执行固定次数任务的方法。通过将一部分任务合并到同一个任务丢给协程池。但是在实现过程中并没有将原始的任务单独计数。同时在记录到所有收到的任务ReceiveTotal中也存在这个问题。


// ExecuteQps//  @Description: 执行任务固定次数//  @receiver pool//  @param t//  @param qps//func (pool *GorotinesPool) ExecuteQps(t func(), qps int) {  mutiple := qps / pool.SingleTimes  remainder := qps % pool.SingleTimes  for i := 0; i < pool.SingleTimes; i++ {    pool.Execute(func() {      for i := 0; i < mutiple; i++ {        t()      }    })  }  pool.Execute(func() {    for i := 0; i < remainder; i++ {      t()    }  })}
复制代码

新建回收协程不及时

在 QPS 陡增陡降的场景测试中,存在 2 个,需要增加协程数的时候增加较慢,因为每 1 秒扫描一次的等待channel,增多增加 1 个,同样的也减少 1 个。这个效率非常低,设计太蠢了。

线程回收不优雅

在旧实现中用的是扫描等待channel判断是否增加还是减少协程。除了效率低以外,还存在无法及时回收和过度回收(正在运行的协程也会收到一个终止信号)。


旧的方法内容如下(现在看简直不忍直视):



// balance // @Description: 平衡活跃协程数 // @receiver pool // func (pool *GorotinesPool) balance() { if pool.status { if len(pool.tasks) > 0 && pool.active < int32(pool.Max) { pool.AddWorker() } if len(pool.tasks) == 0 && pool.active > int32(pool.Min) { pool.ReduceWorker() } } }
复制代码

解决方法

重写回收方法

这里参照了 Java 线程池的实现,通过设置一个最大空闲时间,通过这个设置来回收协程,这样即方便又能照顾到协程运行状态,避免过度回收。这里改造了一下worker()方法。


实现内容如下:


  // worker  //  @Description: 开始执行协程  //  @receiver pool  //  func (pool *GorotinesPool) worker() {     defer func() {        if p := recover(); p != nil {           log.Printf("execute task fail: %v", p)        }     }()  Fun:     for {        select {        case t := <-pool.tasks:           atomic.AddInt32(&pool.ExecuteTotal, 1)           t()        case <-time.After(pool.MaxIdle):           if pool.Active > int32(pool.Min) {              atomic.AddInt32(&pool.Active, -1)              break Fun           }        }     }  }
复制代码


这里顺道解决了协程回收效率不高的问题。

协程增加效率

这里做了 2 项修改:


  1. 增加扫描后单次增加协程数量,增加等待channel数量等量的协程数

  2. 调整func (pool *GorotinesPool) Execute(t func()) error合并任务的算法,采取固定的数量合并方案


增加协程数量:


// balance  //  @Description: 平衡活跃协程数  //  @receiver pool  //  func (pool *GorotinesPool) balance() {     if pool.status {        if len(pool.tasks) > 0 && pool.Active < int32(pool.Max) {           for i := 0; i < len(pool.tasks); i++ {              if int(pool.Active) < pool.Max {                 pool.AddWorker()              }           }        }     }  }
复制代码


调整合并任务方案:


// ExecuteQps//  @Description: 执行任务固定次数//  @receiver pool//  @param t//  @param qps//func (pool *GorotinesPool) ExecuteQps(t func(), qps int) {  mutiple := qps / pool.SingleTimes  remainder := qps % pool.SingleTimes  for i := 0; i < mutiple; i++ {    pool.Execute(func() {      atomic.AddInt32(&pool.ExecuteTotal, -1)      for i := 0; i < pool.SingleTimes; i++ {        atomic.AddInt32(&pool.ExecuteTotal, 1)        t()      }    })  }  pool.Execute(func() {    atomic.AddInt32(&pool.ExecuteTotal, -1)    for i := 0; i < remainder; i++ {      atomic.AddInt32(&pool.ExecuteTotal, 1)      t()    }  })}
复制代码

计数不准确

这个纯属 BUG,改掉计数错误的地方,修复 BUG,代码已经在上述修复代码中有所体现了。

完整代码

package execute    import (     "errors"     "funtester/ftool"   "log"   "sync/atomic"   "time")    type GorotinesPool struct {     Max          int     Min          int     tasks        chan func()     status       bool     Active       int32     ExecuteTotal int32     SingleTimes  int     addTimeout   time.Duration     MaxIdle      time.Duration  }    type taskType int    const (     normal taskType = 0     reduce taskType = 1  )    // GetPool  //  @Description: 创建线程池  //  @param max 最大协程数  //  @param min 最小协程数  //  @param maxWaitTask 最大任务等待长度  //  @param timeout 添加任务超时时间,单位s  //  @return *GorotinesPool  //  func GetPool(max, min, maxWaitTask, timeout, maxIdle int) *GorotinesPool {     p := &GorotinesPool{        Max:          max,        Min:          min,        tasks:        make(chan func(), maxWaitTask),        status:       true,        Active:       0,        ExecuteTotal: 0,        SingleTimes:  10,        addTimeout:   time.Duration(timeout) * time.Second,        MaxIdle:      time.Duration(maxIdle) * time.Second,     }     for i := 0; i < min; i++ {        p.AddWorker()     }     go func() {        for {           if !p.status {              break           }           ftool.Sleep(1000)           p.balance()        }     }()     return p  }    // worker  //  @Description: 开始执行协程  //  @receiver pool  //  func (pool *GorotinesPool) worker() {     defer func() {        if p := recover(); p != nil {           log.Printf("execute task fail: %v", p)        }     }()  Fun:     for {        select {        case t := <-pool.tasks:           atomic.AddInt32(&pool.ExecuteTotal, 1)           t()        case <-time.After(pool.MaxIdle):           if pool.Active > int32(pool.Min) {              atomic.AddInt32(&pool.Active, -1)              break Fun           }        }     }  }    // Execute  //  @Description: 执行任务  //  @receiver pool  //  @param t  //  @return error  //  func (pool *GorotinesPool) Execute(t func()) error {     if pool.status {        select {        case pool.tasks <- func() {           t()        }:           return nil        case <-time.After(pool.addTimeout):           return errors.New("add tasks timeout")        }     } else {        return errors.New("pools is down")     }  }    // Wait  //  @Description: 结束等待任务完成  //  @receiver pool  //  func (pool *GorotinesPool) Wait() {     pool.status = false  Fun:     for {        if len(pool.tasks) == 0 || pool.Active == 0 {           break Fun        }        ftool.Sleep(1000)     }     defer close(pool.tasks)     log.Printf("execute: %d", pool.ExecuteTotal)  }    // AddWorker  //  @Description: 添加worker,协程数加1  //  @receiver pool  //  func (pool *GorotinesPool) AddWorker() {     atomic.AddInt32(&pool.Active, 1)     go pool.worker()  }    // balance  //  @Description: 平衡活跃协程数  //  @receiver pool  //  func (pool *GorotinesPool) balance() {     if pool.status {        if len(pool.tasks) > 0 && pool.Active < int32(pool.Max) {           for i := 0; i < len(pool.tasks); i++ {              if int(pool.Active) < pool.Max {                 pool.AddWorker()              }           }        }     }  }    // ExecuteQps  //  @Description: 执行任务固定次数  //  @receiver pool  //  @param t  //  @param qps  //  func (pool *GorotinesPool) ExecuteQps(t func(), qps int) {     mutiple := qps / pool.SingleTimes     remainder := qps % pool.SingleTimes     for i := 0; i < mutiple; i++ {        pool.Execute(func() {           atomic.AddInt32(&pool.ExecuteTotal, -1)           for i := 0; i < pool.SingleTimes; i++ {              atomic.AddInt32(&pool.ExecuteTotal, 1)              t()           }        })     }     pool.Execute(func() {        atomic.AddInt32(&pool.ExecuteTotal, -1)        for i := 0; i < remainder; i++ {           atomic.AddInt32(&pool.ExecuteTotal, 1)           t()        }     })  }
复制代码


发布于: 刚刚阅读数: 3
用户头像

FunTester

关注

公众号:FunTester,800篇原创,欢迎关注 2020-10-20 加入

Fun·BUG挖掘机·性能征服者·头顶锅盖·Tester

评论

发布
暂无评论
Go语言协程池实现第二弹_FunTester_InfoQ写作社区