Easyswoole的WaitGroup和Csp组件的分析和使用
Easyswoole的WaitGroup和Csp组件的分析和使用
easyswoole可真是个好名字,只是提供了恰到好处的封装,即使是源码也保持了这样的风格。这种风格不论好坏可能都需要各位适应下,哈哈。下面一起来感受下es中的实现吧。
分析
<?php
namespace EasySwoole\Component;
use Swoole\Coroutine\Channel;
class WaitGroup
{
private $count = 0;
/** @var Channel */
private $channel;
private $success = 0;
private $size;
public function __construct(int $size = 128)
{
$this->size = $size;
// 主要的初始化功能在reset中 提供了一个传递任务执行情况的通道
// reset方法 还允许在当前协程多次使用同一个waitgroup变量
$this->reset();
}
public function add()
{
// wg中子任务的计数器
$this->count++;
}
function successNum(): int
{
return $this->success;
}
public function done()
{
// 当子协程任务执行完毕时 向管道中push了一个int 1
$this->channel->push(1);
}
// 最主要的逻辑在这里
// 其中的while循环 相当于在主协程中阻塞 直到全部的任务执行完成或者超时
public function wait(?float $timeout = 15)
{
if ($timeout <= 0) {
$timeout = PHP_INT_MAX;
}
$this->success = 0;
$left = $timeout;
// $this->count是还要等待完成的子协程数量
// $left是超时时间 即主协程继续向下执行还需要等待的时间(其实并不存在主子协程的概念 这里只是为了好区分)
// 当add的任务全部执行完成 或者 超过了设置的timeout 此wait函数执行完毕 程序继续向下执行
while (($this->count > 0) && ($left > 0)) {
$start = round(microtime(true), 3);
if ($this->channel->pop($left) === 1) {
// 每从通道pop一次就给还要执行的任务数量-1
$this->count--;
// 每从通道pop一次就给成功数量+1
$this->success++;
}
// 每循环一次更新一次还要wait的时间
$left = $left - (round(microtime(true), 3) - $start);
}
}
function reset()
{
$this->close();
$this->count = 0;
$this->success = 0;
$this->channel = new Channel($this->size);
}
function close()
{
if ($this->channel) {
$this->channel->close();
$this->channel = null;
}
}
function __destruct()
{
$this->close();
}
}
使用
# 确保开起了swoole的协程hook
$wg = new WaitGroup();
$wg->add();
go(function () use ($wg) {
defer(function () use ($wg) {
$wg->done();
});
sleep(1); # 子协程使用sleep模拟阻塞
var_dump("task1 done");
});
$wg->add();
go(function () use ($wg) {
defer(function () use ($wg) {
$wg->done();
});
sleep(5); # 子协程使用sleep模拟阻塞
var_dump('task2 done');
});
# 调用了wait方法 主协程就阻塞在wait方法中的while循环上 直到全部的子协程执行完毕或者超时才执行后面的代码
$wg->wait(); # 可以尝试在wait中传递较小的超时时间 检查效果
var_dump('main task done');
# 给出swoole协程切换的条件
1 协程中的代码遇到阻塞 如上的sleep
2 协程主动yield让出执行权
3 cpu计算超时 主动让出执行权(详细配置请查看swoole官方文档)
其他的切换条件请大家补充
分析
<?php
namespace EasySwoole\Component;
use Swoole\Coroutine\Channel;
use Swoole\Coroutine;
class Csp
{
private $chan;
private $count = 0;
private $success = 0;
private $task = [];
// 初始化channel
function __construct(int $size = 8)
{
$this->chan = new Channel($size);
}
// 添加要并发执行的任务
function add($itemName, callable $call): Csp
{
$this->count = 0;
$this->success = 0;
$this->task[$itemName] = $call;
return $this;
}
function successNum(): int
{
return $this->success;
}
// 阻塞执行添加的任务 直到全部执行完毕 或者 超时
function exec(?float $timeout = 5)
{
if ($timeout <= 0) {
$timeout = PHP_INT_MAX;
}
$this->count = count($this->task);
// 创建协程并发执行add进来的任务
foreach ($this->task as $key => $call) {
Coroutine::create(function () use ($key, $call) {
// 调用add进来的闭包
$data = call_user_func($call);
// 将执行结果送入通道
$this->chan->push([
'key' => $key,
'result' => $data
]);
});
}
$result = [];
$start = microtime(true);
// 同样是通过while循环 将代码阻塞在exec方法上
while ($this->count > 0) {
// 超时返回false
$temp = $this->chan->pop(1);
if (is_array($temp)) {
$key = $temp['key'];
$result[$key] = $temp['result'];
$this->count--;
$this->success++;
}
// 超时就结束循环 代码向下执行
if (microtime(true) - $start > $timeout) {
break;
}
}
return $result;
}
}
使用
# 执行发现并没有先打印主协程中的 main task done,这是因为Csp的exec方法会阻塞当前协程
$csp = new Csp();
$csp->add('task1', function () {
return file_get_contents("//www.easyswoole.com/Cn/Components/Component/csp.html");
});
$csp->add('task2', function () {
return file_get_contents("//www.easyswoole.com/Cn/Components/Component/waitGroup.html");
});
$res = $csp->exec();
var_dump($res);
go(function () {
var_dump("main task done");
});
# 如此使用触发了协程调度 导致先输出了主协程的main task done
go(function () {
$csp = new Csp();
$csp->add('task1', function () {
return file_get_contents("//www.easyswoole.com/Cn/Components/Component/csp.html");
});
$csp->add('task2', function () {
return file_get_contents("//wiki.swoole.com/#/start/start_tcp_servers");
});
$res = $csp->exec();
var_dump($res);
});
var_dump('main task done');
# 有时候可能需要将csp放到自行创建的协程容器中执行 可以尝试下面配合WaitGroup的方式
$wg = new WaitGroup();
$wg->add();
go(function () use ($wg) {
defer(function () use ($wg) {
$wg->done();
});
$csp = new Csp();
$csp->add('task1', function () {
return file_get_contents("//www.easyswoole.com/Cn/Components/Component/csp.html");
});
$csp->add('task2', function () {
return file_get_contents("//www.easyswoole.com/Cn/Components/Component/waitGroup.html");
});
$res = $csp->exec();
var_dump($res);
});
$wg->wait();
// go(function(){
// var_dump('main task done');
// });
var_dump('main task done');
请勿将以上的代码搬运到go中,因为golang在语言层面完全支持了协程调度,而swoole的协程不仅需要在协程容器中使用,并且实现协程的切换也需要相应的条件。
今天就说到这吧,发现错误欢迎指正,感谢!!!