PHP中on回调的实现(十六节)
- 2020 年 2 月 19 日
- 笔记
各位好,我是老李。和老李一同完成《PHP网络编程》,虽然我知道实际上从头到尾可能只有我一个人在搞。我告诉你们一定要好好在家好好学习、远程工作,不要折腾地自己最后连班都没法上了,要好好学习、要不断学习、要终身学习。

上个章节我送了大家一篇番外:
今天这篇是和上篇番外紧密结合的,因为我答应大家了,要通过今天这一篇中的代码表演一波儿啥叫阻塞、啥叫非阻塞、啥叫异步非阻塞…这年月,听到的异步非阻塞次数太TM多了,似乎每个高IO的程序都离不开这个组合词!
这个词语,席卷八荒,说出去拉风又嚣张
所以呢,今天我们搞一个非常有意思的科研方向,那就是Workerman里的那种on是咋实现的。作为一个24k的泥腿子,php-fpm才是星光大道,复制粘贴是拿手兵器,composer install是撒手锏,CURD一把梭,PHP里的一大坨函数几乎都是[ 同步阻塞 ],复制粘贴起来毫无后顾之忧,上来就是干,最后在在业务里随手搞两个sleep( one ),以后优化响应速度就是这么轻松简单,So easy!哪里不会点哪里~
但是用Workerman或者Nodejs,on是一定避免不了的,天生丽质的[ 异步非阻塞 ]注定会让程序写法变成这样。因为调用方(研究僧)自己不会主动获取数据,靠的是被调用方(阿梅)的通知,所以调用方(研究僧)就只能靠on('某事件')这种方式来实现业务逻辑。
那么,大声的告诉我!!!如果我们基于select IO复用或者epoll IO复用搞一个[ 异步非阻塞 ]的程序,纯PHP的on该如何实现?老李手把手教你分两步基于Libevent-epoll搞坨代码整清楚[ 异步非阻塞 ](epoll按照定义严格意义上表达应该是同步非阻塞),第一步先整明白[ 阻塞 ]与[ 非阻塞 ],第二步去整[ 异步 ]
阻塞/非阻塞
来,之前select那一节的代码,复制粘贴过来:
<?php $host = '0.0.0.0'; $port = 6666; // 创建了一个listen-socket $listen_socket = socket_create( AF_INET, SOCK_STREAM, SOL_TCP ); socket_set_option( $listen_socket, SOL_SOCKET, SO_REUSEADDR, 1 ); socket_set_option( $listen_socket, SOL_SOCKET, SO_REUSEPORT, 1 ); socket_bind( $listen_socket, $host, $port ); socket_listen( $listen_socket ); while ( true ) { // socket_accept会阻塞,你可以理解为listen-socket为阻塞IO // 虽然被包在了while循环里,但是不会打空炮不断执行 $connection_socket = socket_accept( $listen_socket ); socket_recv( $connection_socket, $recv_content, 8, MSG_WAITALL ); echo $recv_content.PHP_EOL; socket_close( $connection_socket ); }
上面这个demo里的$listen_socket就是阻塞的,所以当socket_accept()执行的时候会被阻塞,如果你有兴趣想验证一下的话也很简单,你在socket_accept()后面随便echo个内容就行了,while不会打空炮的… …
然后,我们做一个骚操作:通过socket_set_nonblock()函数将$listen_socket变成非阻塞IO。变成非阻塞的意思就是当socket_accpet()调用的时候,如果没有新的客户端连接,程序不会等待而是继续往下执行!
<?php $host = '0.0.0.0'; $port = 6666; $listen_socket = socket_create( AF_INET, SOCK_STREAM, SOL_TCP ); socket_set_option( $listen_socket, SOL_SOCKET, SO_REUSEADDR, 1 ); socket_set_option( $listen_socket, SOL_SOCKET, SO_REUSEPORT, 1 ); // 变成非阻塞IO,就比上面的demo代码多了这一行 // 但是运行起来,结果大大的不同 socket_set_nonblock( $listen_socket ); socket_bind( $listen_socket, $host, $port ); socket_listen( $listen_socket ); while ( true ) { $connection_socket = socket_accept( $listen_socket ); socket_recv( $connection_socket, $recv_content, 8, MSG_WAITALL ); echo $recv_content.PHP_EOL; socket_close( $connection_socket ); }
此时程序运行起来结果却是大大的不同:因为你会发现while循环开始打空炮了而且还会报Notice级别的错误,你的电脑屏幕瞬间会被这种错误文案打满,如果你电脑配置足够低的话,顺便送一次免费重启也不是不可能…

这里无论你用为了规避这种非阻塞导致的错误,有一种馊主意就是在socket_accept()函数前面加上一个@符号,而我们作为高端人士怎么能够容忍这种沙雕写法,必须要要向优雅看齐!优雅如Lavarel(不知道有没有拼错…)。我们只需要需要对socket_accpet()的写法xue微做个小调整即可,这是一个小小的骚操作,然而一骚起来就无法无天:
while ( true ) { if ( false !== ( $connection_socket = socket_accept( $listen_socket ) ) ) { socket_recv( $connection_socket, $recv_content, 8, MSG_WAITALL ); echo $recv_content.PHP_EOL; socket_close( $connection_socket ); } }
怎么样?看起来复杂了一些牛逼了一些,而且确实不报那个Notice了…所以你以为这就是完美的非阻塞了么?真是年轻人…

这段程序跑起来几分钟后,你的笔记本是不是开始脸红浑身发烫了?你贴上去仔细偷听一下,是不是还能听到笔记本开始喘着低沉的粗气了?黝黑而又坚硬的笔记本那滚烫的肌肤,让你实在忍不住了,大手又猛又粗暴地掀开了键盘上那一层薄薄的本就可有可无的覆盖物,你的呼吸也开始低沉而急促了,大脑已经停止了正常理性的思考,有些人甚至已经停下了手里的针线活在浏览器里打开了一个新的标签页并依次输入:www.91********… …
呵,男人~~
你都不问问你本子为啥会发热扇风?看下CPU占用率感受一下?

造成这样的原因是什么,一是我前面文章里解释过,二是这篇文章前面也说过了,三是我提醒你结合下非阻塞我举的例子,如果这样你还想不清楚…
这个$listen_socket变成非阻塞IO本是好事,但是非阻塞导致while循环不断打空炮,如果有客户端请求连接还好,但是没有的时候TA就这么一直打空炮,你想想挂了空档猛踩油门,难受不?那能不毁车?所以非阻塞的最佳应用场景是什么,就是当真有客户端来连接的时候,再在这个非阻塞的$listen_socket上发生accept,说白了[ 非阻塞 ]要结合[ 异步事件 ],这样就避免了打空炮行为同时还能保证[ 非阻塞 ]。
异步
先说好了这里的异步并不是指符合APUE书中定义的那个[ 异步 ],而是指上层代码整体流程的异步,更不是指AIO。这里结合下Select IO复用来简单实现一下初步的流程:
<?php $host = '0.0.0.0'; $port = 6666; $listen_socket = socket_create( AF_INET, SOCK_STREAM, SOL_TCP ); socket_set_option( $listen_socket, SOL_SOCKET, SO_REUSEADDR, 1 ); socket_set_option( $listen_socket, SOL_SOCKET, SO_REUSEPORT, 1 ); socket_bind( $listen_socket, $host, $port ); socket_listen( $listen_socket ); socket_set_nonblock( $listen_socket ); $client = array( $listen_socket ); while ( true ) { $read = $client; $write = array(); $exception = array(); $ret = socket_select( $read, $write, $exception, NULL ); if ( $ret <= 0 ) { continue; } // 就是说,如果 listen-socket 中有事件,listen-socket能有啥事件:就是用新的客户端来了 if ( in_array( $listen_socket, $read ) ) { $connection_socket = socket_accept( $listen_socket ); if ( !$connection_socket ) { continue; } $client[] = $connection_socket; $key = array_search( $listen_socket, $read ); unset( $read[ $key ] ); } // 对于其他socket foreach( $read as $read_key => $read_fd ) { socket_recv( $read_fd, $recv_content, 1024, 0 ); if ( !$recv_content ) { unset( $client[ $read_key ] ); socket_close( $read_fd ); continue; } echo $recv_content; unset( $client[ $read_key ] ); socket_shutdown( $read_fd ); socket_close( $read_fd ); } }
至于Select IO的用法和详解不是本章重点,重点是:
- 首先你先看下你CPU还打空炮不?
- 其次是如何基于上面代码改造成比较优雅的on~
<?php class Server { private $host = '0.0.0.0'; private $port = 6666; private $listen_socket = null; private $client_array = array(); private $closure_array = array(); public function init() { $listen_socket = socket_create( AF_INET, SOCK_STREAM, SOL_TCP ); socket_set_option( $listen_socket, SOL_SOCKET, SO_REUSEADDR, 1 ); socket_set_option( $listen_socket, SOL_SOCKET, SO_REUSEPORT, 1 ); socket_bind( $listen_socket, $this->host, $this->port ); socket_listen( $listen_socket ); socket_set_nonblock( $listen_socket ); $this->client = array( $listen_socket ); $this->listen_socket = $listen_socket; } // 这个函数就相当于注册回调函数... public function on( $event_name, Closure $func ) { $this->closure_array[ $event_name ] = $func; } public function run() { while ( true ) { $read = $this->client; $write = array(); $exception = array(); $ret = socket_select( $read, $write, $exception, NULL ); if ( $ret <= 0 ) { continue; } // 就是说,如果 listen-socket 中有事件,listen-socket能有啥事件:就是用新的客户端来了 if ( in_array( $this->listen_socket, $read ) ) { $connection_socket = socket_accept( $this->listen_socket ); if ( !$connection_socket ) { continue; } $this->client[] = $connection_socket; $key = array_search( $this->listen_socket, $read ); unset( $read[ $key ] ); // connect时:触发 $func = isset( $this->closure_array['connect'] ) ? $this->closure_array['connect'] : false ; if ( false !== $func ) { call_user_func_array( $func, array() ); } } // 对于其他socket foreach( $read as $read_key => $read_fd ) { socket_recv( $read_fd, $recv_content, 1024, 0 ); if ( !$recv_content ) { unset( $this->client[ $read_key ] ); socket_close( $read_fd ); continue; } // 收到消息时:触发 $func = isset( $this->closure_array['message'] ) ? $this->closure_array['message'] : false ; if ( false !== $func ) { call_user_func_array( $func, array( $recv_content ) ); } unset( $this->client[ $read_key ] ); socket_shutdown( $read_fd ); socket_close( $read_fd ); } } } } $server = new Server(); $server->init(); // 这里通过on函数来注册 // 这里就是利用了PHP里的Closure,其实下面function就是Closure $server->on( 'connect', function() { echo "触发connect".PHP_EOL; } ); $server->on( 'message', function( $data ) { echo "触发message,收到数据:".$data.PHP_EOL; } ); $server->run();
用telnet客串客户端感受一下?

有些泥腿子们可能之前用过Workerman,Workerman的回调函数方式是$server->onConnect()这种风格的,而我们用的是和Swoole、NodeJS那种靠拢的$server->on( 'action' )风格的,无论用的哪种方式都不重要,因为这些都是上层的表现风格而已,重要的是什么:
- 一、你的PHP基础知识里是否给了Closure一席之地
- 二、你是否知道call_user_func()以及call_user_func_array()
上述两点是实现PHP版本异步回调用法的基石。