package execute
import (
"errors"
"funtester/ftool" "log" "sync/atomic" "time")
type GorotinesPool struct {
Max int
Min int
tasks chan func()
status bool
Active int32
ExecuteTotal int32
SingleTimes int
addTimeout time.Duration
MaxIdle time.Duration
}
type taskType int
const (
normal taskType = 0
reduce taskType = 1
)
// GetPool
// @Description: 创建线程池
// @param max 最大协程数
// @param min 最小协程数
// @param maxWaitTask 最大任务等待长度
// @param timeout 添加任务超时时间,单位s
// @return *GorotinesPool
//
func GetPool(max, min, maxWaitTask, timeout, maxIdle int) *GorotinesPool {
p := &GorotinesPool{
Max: max,
Min: min,
tasks: make(chan func(), maxWaitTask),
status: true,
Active: 0,
ExecuteTotal: 0,
SingleTimes: 10,
addTimeout: time.Duration(timeout) * time.Second,
MaxIdle: time.Duration(maxIdle) * time.Second,
}
for i := 0; i < min; i++ {
p.AddWorker()
}
go func() {
for {
if !p.status {
break
}
ftool.Sleep(1000)
p.balance()
}
}()
return p
}
// worker
// @Description: 开始执行协程
// @receiver pool
//
func (pool *GorotinesPool) worker() {
defer func() {
if p := recover(); p != nil {
log.Printf("execute task fail: %v", p)
}
}()
Fun:
for {
select {
case t := <-pool.tasks:
atomic.AddInt32(&pool.ExecuteTotal, 1)
t()
case <-time.After(pool.MaxIdle):
if pool.Active > int32(pool.Min) {
atomic.AddInt32(&pool.Active, -1)
break Fun
}
}
}
}
// Execute
// @Description: 执行任务
// @receiver pool
// @param t
// @return error
//
func (pool *GorotinesPool) Execute(t func()) error {
if pool.status {
select {
case pool.tasks <- func() {
t()
}:
return nil
case <-time.After(pool.addTimeout):
return errors.New("add tasks timeout")
}
} else {
return errors.New("pools is down")
}
}
// Wait
// @Description: 结束等待任务完成
// @receiver pool
//
func (pool *GorotinesPool) Wait() {
pool.status = false
Fun:
for {
if len(pool.tasks) == 0 || pool.Active == 0 {
break Fun
}
ftool.Sleep(1000)
}
defer close(pool.tasks)
log.Printf("execute: %d", pool.ExecuteTotal)
}
// AddWorker
// @Description: 添加worker,协程数加1
// @receiver pool
//
func (pool *GorotinesPool) AddWorker() {
atomic.AddInt32(&pool.Active, 1)
go pool.worker()
}
// balance
// @Description: 平衡活跃协程数
// @receiver pool
//
func (pool *GorotinesPool) balance() {
if pool.status {
if len(pool.tasks) > 0 && pool.Active < int32(pool.Max) {
for i := 0; i < len(pool.tasks); i++ {
if int(pool.Active) < pool.Max {
pool.AddWorker()
}
}
}
}
}
// ExecuteQps
// @Description: 执行任务固定次数
// @receiver pool
// @param t
// @param qps
//
func (pool *GorotinesPool) ExecuteQps(t func(), qps int) {
mutiple := qps / pool.SingleTimes
remainder := qps % pool.SingleTimes
for i := 0; i < mutiple; i++ {
pool.Execute(func() {
atomic.AddInt32(&pool.ExecuteTotal, -1)
for i := 0; i < pool.SingleTimes; i++ {
atomic.AddInt32(&pool.ExecuteTotal, 1)
t()
}
})
}
pool.Execute(func() {
atomic.AddInt32(&pool.ExecuteTotal, -1)
for i := 0; i < remainder; i++ {
atomic.AddInt32(&pool.ExecuteTotal, 1)
t()
}
})
}
评论