基于 swoole 的压测工具

用户头像
而立
关注
发布于: 2020 年 07 月 22 日

背景:

使用php语言,开发一个压测工具。

要求能接收请求地址、请求次数、请求并发数,执行请求,得到并输出结果,结果包括:平均响应时间,95%响应时间。

使用该工具压测:www.baidu.com

实现思路:

为了模拟并发请求,借助swoole的多进程和协程编程。

根据请求并发数,超过一定值(默认给了30,可以修改),计算应拉起的进程数,进程数最大不超过当前机器的cpu数量。

每个进程内使用协程模拟并发请求。

得到结果通过socket通信,发送给主进程计算并展示。

运行环境要求:

php >= 7.2

swoole >= 4.5

基本的流程:

代码:

<?php
require_once(__DIR__ . '/BenchmarkApplicationImplV1.php');
use Swoole\Coroutine;
//swoole hook curl
Swoole\Coroutine::set(['hook_flags' => SWOOLE_HOOK_ALL | SWOOLE_HOOK_CURL]);
$totalRequest = 30000;
$concurrent = 500;
$startTime = microtime(true);
var_dump("起始时间:" . $startTime . "; 并发数:" . $concurrent . "; 请求数:" . $totalRequest);
$application = new BenchmarkApplicationImplV1('http://www.baidu.com', $totalRequest, $concurrent);
$application->start();
$endTime = microtime(true);
var_dump("结束时间:" . $endTime . ",耗时:" . ($endTime - $startTime));



<?php
/**
* 主调度类接口
*/
interface BenchmarkApplication {
/**
* 启动请求。
*/
public function start();
}



<?php
require_once(__DIR__ . '/BenchmarkApplication.php');
require_once(__DIR__ . '/Request.php');
require_once(__DIR__ . '/TaskUtilImplV1.php');
use Swoole\Process;
use Swoole\Coroutine;
use Swoole\Coroutine\WaitGroup;
/**
* 主进程实现类
*/
class BenchmarkApplicationImplV1 implements BenchmarkApplication
{
private $request;
private $totalRequest;
private $concurrent;
private $cpuNum;
/**
* 结果数组
*/
private $resultArr;
private $resultEnd = false;
/**
* 运行时进程数
*
* [
* [
* 'total_request' => 100,
* 'concurrent' => 10,
* ],
* [
* 'total_request' => 100,
* 'concurrent' => 11,
* ]
* ]
*/
private $runParam;
/**
* 默认单个进程可以处理的并发数
*/
private $defaultConcurentOneProcess = 30;
/**
* 初始化启动。
* 接收url,请求总数,并发数。
* 返回一个当前类的对象。
* 根据cpu个数,当前请求数,计算应启动的进程数,以及每个进程需要处理的请求个数
*/
public function __construct(string $url, int $totalRequest, int $concurrent)
{
$this->request = new Request();
$this->request->setUrl($url);
$this->cpuNum = $this->getCpuNum();
$this->totalRequest = $totalRequest;
$this->concurrent = $concurrent;
$this->resultArr = [];
}
/**
* 获取当前机器的cpu数量
* @return
*/
protected function getCpuNum(): int
{
return swoole_cpu_num();
}
/**
* 启动请求。
*/
public function start()
{
$this->init();
$this->runProcess();
}
public function getResult()
{
return $this->resultArr;
}
/**
* 根据并发数、总请求数、cpu数
* 计算应启动进程数、每个进程分配的请求数、每个进程的并发数
*/
protected function init()
{
$this->runParam = [];
//如果并发数超过 cpu数量 * 默认进程处理的并发数
//那么进程数 = cpu数量
//否则 单进程并发数 = 总并发数 / 进程数
if (ceil($this->concurrent / $this->defaultConcurentOneProcess) >= $this->cpuNum) {
$processNum = $this->cpuNum;
} else {
$processNum = ceil($this->concurrent / $this->defaultConcurentOneProcess);
}
//根据进程数计算平均请求数和平均并发数
//除不尽的部分归到最后一个进程上
$aveRequest = floor($this->totalRequest / $processNum);
$aveConcurremt = floor($this->concurrent / $processNum);
while ($processNum--) {
if ($this->totalRequest <= 0) {
break;
}
$tmp = [];
$tmp['total_request'] = $aveRequest;
$tmp['concurrent'] = $aveConcurremt;
$this->totalRequest -= $aveRequest;
$this->concurrent -= $aveConcurremt;
if ($this->totalRequest < $aveRequest) {
$tmp['total_request'] += $this->totalRequest;
$this->totalRequest = 0;
}
if ($this->concurrent < $aveConcurremt) {
$tmp['concurrent'] += $this->concurrent;
$this->concurrent = 0;
}
$this->runParam[] = $tmp;
}
}
/**
* 执行进程
*/
protected function runProcess()
{
$process = [];
//创建进程
foreach ($this->runParam as $oneProcessParam) {
//延迟1S启动,对齐启动时间
$timeStart = time() + 1;
$process[] = new Process(function (Process $proc) use ($oneProcessParam, $timeStart) {
$socket = $proc->exportSocket();
$taskUtil = new TaskUtilImplV1($this->request, $oneProcessParam['total_request'], $oneProcessParam['concurrent']);
while (time() < $timeStart) {
//阻塞等待,时间对齐
}
$taskUtil->doTask();
$result = $taskUtil->getResult();
foreach ($result as $one) {
$socket->send($one);
}
$socket->close();
$proc->exit();
}, false, 2, 1);
}
//启动进程
$sockets = [];
foreach ($process as $oneProcess) {
$oneProcess->start();
$sockets[] = $oneProcess->exportSocket();
}
//主进程开多个协程,等待子进程返回数据
$this->resultEnd = false;
Coroutine::create(function () use ($sockets) {
$wg = new WaitGroup();
foreach ($sockets as $oneSocket) {
$wg->add();
Coroutine::create(function () use ($wg, $oneSocket) {
$countResult = 0;
while ($oneSocket->checkLiveness()) {
$oneResult = $oneSocket->recv(65535, 1);
if (!empty($oneResult)) {
$countResult++;
$this->resultArr[] = $oneResult;
}
}
$wg->done();
});
}
$wg->wait();
//打印结果
$this->dumpResult();
});
foreach ($process as $oneProcess) {
$oneProcess->wait(true);
}
Process::wait(true);
}
/**
* 输出结果
*/
private function dumpResult() {
//输出结果
$totalCost = array_sum($this->resultArr);
var_dump("结果总数:". count($this->resultArr));
var_dump("总耗时:". $totalCost. " s");
var_dump("平均耗时:". bcdiv($totalCost, count($this->resultArr), 4). " s");
sort($this->resultArr);
//截取其中的90%
$this->resultArr = array_slice($this->resultArr, 0, floor(count($this->resultArr) * 0.95) );
var_dump("95% 数据");
$totalCost = array_sum($this->resultArr);
var_dump("结果总数:". count($this->resultArr));
var_dump("总耗时:". $totalCost. " s");
var_dump("平均耗时:". bcdiv($totalCost, count($this->resultArr), 4). " s");
}
}



<?php
/**
* 子进程处理单元接口类
*/
interface TaskUtil {
/**
* 执行任务
*/
public function doTask();
}



<?php
require_once(__DIR__ . '/TaskUtil.php');
require_once(__DIR__ . '/Request.php');
use Swoole\Coroutine;
use Swoole\Coroutine\WaitGroup;
/**
* 子进程处理单元
* 请求任务执行单元
* 根据并发数启动对应的协程执行请求
*/
class TaskUtilImplV1 implements TaskUtil
{
/**
* @var Request
*/
private $request;
/**
* 请求总数
*/
private $totalRequest;
/**
* 并发数
*/
private $concurrent;
private $result;
/**
* 初始化。
* 接收请求任务对象,总请求数。
* 返回当前类的实例。
*/
public function __construct(Request $request, int $totalRequest, int $concurrent)
{
$this->request = $request;
$this->totalRequest = $totalRequest;
$this->concurrent = $concurrent;
$this->resultRequest = [];
}
/**
* 执行任务
*/
public function doTask()
{
$totalRequest = $this->totalRequest;
$concurrent = $this->concurrent;
//启动 $concurrent 个协程处理请求,平均每个处理为 $totalRequest / $concurrent
$aveRuest = floor($totalRequest / $concurrent);
$wg = new WaitGroup();
while ($concurrent--) {
//计算当前协程应请求个数,除不尽的个数归到最后一个协程
$requestNum = $aveRuest;
$totalRequest -= $aveRuest;
$totalRequest < $aveRuest && $requestNum += $totalRequest;
//启动协程执行请求,每个协程启动
$wg->add();
Coroutine::create(function () use ($wg, $requestNum) {
while ($requestNum--) {
$timeStart = microtime(true);
Request::request($this->request->getUrl());
//保存时间
$this->resultRequest[] = microtime(true) - $timeStart;
}
$wg->done();
});
}
$wg->wait();
}
public function getResult()
{
return $this->resultRequest;
}
}



<?php
/**
* 请求url记录
* 结合本次作业,做了简单处理
*/
class Request {
private $url;
/**
* Default constructor
*/
public function __construct() {
}
/**
* 设置请求的参数
*/
public function setUrl(string $url) {
$this->url = $url;
}
public function getUrl() {
return $this->url;
}
/**
* 请求执行单元
*结合本次作业,简单做了一个请求单元,如果实际使用,当前类需要重构
*/
public static function request($url) {
$headerArray =array("Content-type:application/json;","Accept:application/json");
$ch = curl_init();
curl_setopt($ch, CURLOPT_URL, $url);
curl_setopt($ch, CURLOPT_SSL_VERIFYPEER, FALSE);
curl_setopt($ch, CURLOPT_SSL_VERIFYHOST, FALSE);
curl_setopt($ch, CURLOPT_RETURNTRANSFER, 1);
curl_setopt($ch,CURLOPT_HTTPHEADER,$headerArray);
$output = curl_exec($ch);
curl_close($ch);
$output = json_decode($output,true);
return $output;
}
}

结果展示



用户头像

而立

关注

还未添加个人签名 2018.01.25 加入

还未添加个人简介

评论

发布
暂无评论
基于swoole的压测工具