package stress
import (
	"io"
	"log"
	"math"
	"net/http"
	"sort"
	"sync"
	"time"
)
// Result summarises one load-test run as filled in by Execute. All durations
// are whole milliseconds.
type Result struct {
	// Avg is the mean response time, rounded up to the next millisecond.
	Avg int64
	// P50, P95 and P100 are samples picked from the ascending Response list:
	// the ones at 50% and 95% of its length, and the last (slowest) one.
	P50, P95, P100 int64
	// Qps is the estimated throughput: (1000 / Avg) * concurrency.
	Qps int64
	// Response holds every recorded response time in ascending order.
	Response []int64
}
// usedChan carries one response time (in milliseconds) per request. Execute
// replaces it with a fresh buffered channel on every run, requestGet sends to
// it, request closes it once all requests have returned, and PrecessResponse
// drains it. Because it is package-level state, overlapping Execute calls
// would share or replace each other's channel.
var usedChan chan int64
// Execute load-tests url by issuing times GET requests with at most
// concurrence of them in flight at once, then summarises the recorded
// response times (all in milliseconds).
//
// Parameters:
//   - url: address requested on every iteration.
//   - concurrence: maximum number of simultaneous requests; values below 1
//     are treated as 1.
//   - times: total number of requests; values below 1 yield an empty Result.
//
// The returned Result holds the rounded-up mean (Avg), the P50/P95/P100
// samples, the estimated throughput (Qps) and the ascending list of samples.
// Qps is estimated as (1000 / Avg) * concurrence; for example an average of
// 77 ms with 10 concurrent requests gives (1000 / 77) * 10 ≈ 129.
//
// Execute stores its sample channel in the package-level usedChan, so it
// must not be called from several goroutines at the same time.
func Execute(url string, concurrence int64, times int64) *Result {
	if times < 1 {
		return &Result{}
	}
	if concurrence < 1 {
		// A limiter of size 0 would block forever and a negative size panics.
		concurrence = 1
	}

	usedChan = make(chan int64, times)
	request(url, concurrence, times)
	total, response := PrecessResponse(times)

	// Index by the number of samples actually collected rather than by the
	// number requested, so a short result set can never index out of range.
	samples := int64(len(response))
	if samples == 0 {
		return &Result{}
	}

	avg := math.Ceil(float64(total) / float64(samples))

	// avg is 0 only when every sample was below 1 ms. Dividing by it would
	// produce +Inf, whose conversion to int64 is not well defined, so the
	// throughput is reported as 0 (unknown) in that case.
	var qps float64
	if avg > 0 {
		qps = (1000 / avg) * float64(concurrence)
	}

	return &Result{
		Avg:      int64(avg),
		P50:      response[getIndex(samples, 0.5)],
		P95:      response[getIndex(samples, 0.95)],
		P100:     response[samples-1],
		Qps:      int64(qps),
		Response: response,
	}
}
// request issues times GET requests against url, each from its own
// goroutine, keeping at most concurrence of them in flight. It blocks until
// every request has returned and then closes usedChan so the recorded
// samples can be drained.
//
// A concurrence below 1 is treated as 1. A times value below 1 starts no
// requests; usedChan is still closed.
func request(url string, concurrence int64, times int64) {
	if concurrence < 1 {
		// make panics on a negative size, and with size 0 the first send
		// below would block forever because no goroutine exists yet to
		// receive from the limiter.
		concurrence = 1
	}

	// limit is a counting semaphore: one slot per in-flight request.
	limit := make(chan struct{}, concurrence)

	var wg sync.WaitGroup
	for i := int64(0); i < times; i++ {
		limit <- struct{}{} // blocks while concurrence requests are running
		wg.Add(1)
		go func() {
			// Deferred so the slot and the WaitGroup are always released.
			defer wg.Done()
			defer func() { <-limit }()
			requestGet(url)
		}()
	}

	wg.Wait()
	close(usedChan)
}
// PrecessResponse drains usedChan until it is closed and returns the sum of
// all received response times together with the individual values sorted in
// ascending order. times is only used as the initial capacity of the
// returned slice.
func PrecessResponse(times int64) (int64, []int64) {
	var sum int64
	samples := make([]int64, 0, times)

	// Ranging over the channel ends once it is closed and empty.
	for elapsed := range usedChan {
		sum += elapsed
		samples = append(samples, elapsed)
	}

	// Ascending order, so percentiles can be read by index.
	sort.Slice(samples, func(a, b int) bool {
		return samples[a] < samples[b]
	})

	return sum, samples
}
// 获取response的index
func getIndex(times int64, scale float64) int {
result := float64(times) * scale
index := math.Floor(result)
if times < int64(index) {
return int(times)
}
return int(index)
}
// requestGet performs a single GET against url with a 30 second client
// timeout and sends the elapsed time, in milliseconds, to usedChan. The time
// is measured up to the point where client.Get returns.
//
// Exactly one sample is sent per call. If the request fails, the error is
// logged and the time spent until the failure is recorded instead of
// panicking, since a panic in a worker goroutine would terminate the whole
// process.
func requestGet(url string) {
	start := time.Now()
	client := &http.Client{Timeout: 30 * time.Second}
	resp, err := client.Get(url)
	elapsed := time.Since(start).Milliseconds()
	if err != nil {
		log.Printf("stress: GET %q failed after %dms: %v", url, elapsed, err)
		usedChan <- elapsed
		return
	}
	defer resp.Body.Close()

	usedChan <- elapsed

	// Read the body to EOF so the transport can reuse the connection for the
	// next request. The sample is already recorded, so a read error here is
	// deliberately ignored.
	_, _ = io.Copy(io.Discard, resp.Body)
}
// Comments (stray trailing text, kept as a comment so the file still parses).