PHP网络编程之Libevent-EventBuffer(十七节)
- 2020 年 2 月 26 日
- 筆記
大家好,我是已经白热化的老李。
其实今天是接着上一篇文章尾巴的两个问题的延续,一个是多进程(线程)中使用Libevent,另一个是关于触发可写在时候那个需要自己维护的发送数据缓冲区。
众所周知,PHP里还是聊进程吧,线程没法聊。这个问题很简单,我们写个多进程的Libevent服务器一试就知道了,先看下下面这种写法,这种写法简单说就是首先创建一个EventBase对象,再fork之后的每个子进程中创建一个Event对象,也就是说这些子进程会共享同一个EventBase对象但却又分别持有自己的Event对象。
<?php $s_host = '0.0.0.0'; $i_port = 6666; $r_listen_socket = socket_create( AF_INET, SOCK_STREAM, SOL_TCP ); socket_set_option( $r_listen_socket, SOL_SOCKET, SO_REUSEADDR, 1 ); socket_set_option( $r_listen_socket, SOL_SOCKET, SO_REUSEPORT, 1 ); socket_bind( $r_listen_socket, $s_host, $i_port ); socket_listen( $r_listen_socket ); // 将$listen_socket设置为非阻塞IO socket_set_nonblock( $r_listen_socket ); $a_event_array = array(); $a_client_array = array(); function read_callback( $r_connection_socket, $i_event_flag, $o_event_base ) { $s_content = socket_read( $r_connection_socket, 1024 ); // 在这个客户端连接socket上添加 读事件 // 当这个客户端连接socket一旦满足可写条件,我们就可以向socket中写数据了 global $a_event_array; global $a_client_array; $o_write_event = new Event( $o_event_base, $r_connection_socket, Event::WRITE | Event::PERSIST, 'write_callback', array( 'content' => $s_content, ) ); $o_write_event->add(); $a_event_array[ intval( $r_connection_socket ) ]['write'] = $o_write_event; } function write_callback( $r_connection_socket, $i_event_flag, $a_data ) { global $a_event_array; global $a_client_array; $s_content = $a_data['content']; foreach( $a_client_array as $r_target_socket ) { if ( intval( $r_target_socket ) != intval( $r_connection_socket ) ) { socket_write( $r_target_socket, $s_content, strlen( $s_content ) ); } } $o_event = $a_event_array[ intval( $r_connection_socket ) ]['write']; $o_event->del(); unset( $a_event_array[ intval( $r_connection_socket ) ]['write'] ); } function accept_callback( $r_listen_socket, $i_event_flag, $o_event_base ) { echo posix_getpid().PHP_EOL; global $a_event_array; global $a_client_array; // socket_accept接受连接,生成一个新的socket,一个客户端连接socket $r_connection_socket = socket_accept( $r_listen_socket ); $a_client_array[] = $r_connection_socket; // 在这个客户端连接socket上添加 读事件 // 也就说 要从客户端连接上读取消息 $o_read_event = new Event( $o_event_base, $r_connection_socket, Event::READ | Event::PERSIST, 'read_callback', $o_event_base ); $o_read_event->add(); $a_event_array[ intval( $r_connection_socket ) ]['read'] = $o_read_event; } // 创建event-base $o_event_base = new EventBase(); $s_method_name = $o_event_base->getMethod(); if ( 'epoll' != $s_method_name ) { exit( "not epoll" ); } for( $i = 1; $i <= 4; $i++ ) { $i_pid = pcntl_fork(); if ( 0 == $i_pid ) { $o_event = new Event( $o_event_base, $r_listen_socket, Event::READ | Event::PERSIST, 'accept_callback', $o_event_base ); $o_event->add(); $o_event_base->loop(); } } while( true ) { sleep( 1 ); }
用telnet连接上去随便发个消息,不出意外应该会报下面这个错:
(题外话:这个错误非常有名,有兴趣的同学可以拿报错信息关键字去Google一下,你应该会得到不少有用的消息)
然后更改一下上面的代码,更正为每个子进程分别初始化自己的EventBase对象以及Event对象,看看还会不会有问题(下面代码仅仅是关键部分):
<?php for( $i = 1; $i <= 4; $i++ ) { $i_pid = pcntl_fork(); if ( 0 == $i_pid ) { // 创建event-base $o_event_base = new EventBase(); $s_method_name = $o_event_base->getMethod(); $o_event = new Event( $o_event_base, $r_listen_socket, Event::READ | Event::PERSIST, 'accept_callback', $o_event_base ); $o_event->add(); $o_event_base->loop(); } }
这次理论上是没有问题的,你们复制粘贴走亲自试一下。结论就是实际上Libevent并不是线程安全的,如果你要在多线程(进程)中使用时候,注意EventBase最好不要共享,如果一定要共享同一个EventBase对象,你就只能给这个EventBase对象加锁…
从这个实践中我们可以稍微升华思考一下:就是为什么Redis考虑使用并看起来坚持使用单进程单线程来保证主流程(保证主流程的意思就是非核心业务逻辑中的fork()或者thread_create()都不算)。太方便了,无论是在事件方面还是数据操作方面,都不用再纠结加锁的问题了,坚决不为自己找麻烦…
然后我们接着说下关于那个发送数据自定义缓冲区的问题,实际上这个问题是和EventBufferEvent类、EventBuffer类息息相关了,我个人认为这两个类是仅次于EventBase和Event、EventConfig之外最重要的类了。其实EventBufferEvent就是Libevent替我们封装好的缓冲区类,用起来就是复制粘贴、没啥心智负担,这五个类加起来就是Libevent的五大护教法王,掌握了这五个类(五大护教法王来自于《倚天屠龙记》中鼎盛时期的明教)剩余其他的类就都是小杂碎了…
简单说下EventBufferEvent的逻辑,这个类定义了两种缓冲区:
- 读缓冲区
- 写缓冲区
而这两种缓冲区又都分别设置了如下两个水位:
- 低水位
- 高水位
所以根据两两自由组合,一共得到四种结果:
- 读缓冲区的低水位
- 读缓冲区的高水位
- 写缓冲区的低水位
- 写缓冲区的高水位
下面开始按照顺序来依次说下四个结果分别代表什么含义,整清楚了这四个结果,这节也基本上也就可以告一段落了,如果你想参考Workerman代码那么我得告诉你:很可惜Workerman里并没有相关资料,因为截止到目前为止Workeman里暂时没有用到这些。
<?php $host = '0.0.0.0'; $port = 6666; $listen_socket = socket_create( AF_INET, SOCK_STREAM, SOL_TCP ); socket_set_option( $listen_socket, SOL_SOCKET, SO_REUSEADDR, 1 ); socket_set_option( $listen_socket, SOL_SOCKET, SO_REUSEPORT, 1 ); socket_bind( $listen_socket, $host, $port ); socket_listen( $listen_socket ); socket_set_nonblock( $listen_socket ); $client_arr = array(); $event_arr = array(); $client_arr[ intval( $listen_socket ) ] = $listen_socket; function ev_read( $event_buffer_event, $connect_socket ) { global $client_arr; global $event_arr; $recv_content = $event_buffer_event->read( 1024 ); echo "read cb : ".$recv_content; //$event_buffer_event->readBuffer( $recv_content ); } function ev_accept( $listen_socket, $what, $event_base ) { global $client_arr; global $event_arr; // accept后,listen-socket上的可读事件将会被清空 $connect_socket = socket_accept( $listen_socket ); socket_set_nonblock( $connect_socket ); if ( !$connect_socket ) { return; } // 按照以往,我们在这里注册ev_read事件回调 // 然而现如今,我们不能再用Event了 // 而是需要用EventBufferEvent来代替传统的Event类 // 也就是带有EventBuffer功能的增强版的Event类 //$client_event = new Event( $event_base, $connect_socket, Event::READ | Event::PERSIST, 'ev_read', $event_base ); //$client_event->add(); // 这个是EventBufferEvent的初始化方法,大家可以参考文档里原型 // 我就不再赘述原型了 $client_event = new EventBufferEvent( $event_base, $connect_socket, 0 ); // 设置回调,我们先研究读事件 // 这里只设置了读事件回调 $client_event->setCallbacks( 'ev_read', NULL, NULL, $connect_socket ); $client_event->setTimeouts( 30, 30 ); // 给读缓冲区设置 低水位 和 高水位 $client_event->setWatermark( Event::READ, 2, 1024 ); $client_event->setPriority( 10 ); $client_event->enable( Event::READ ); $client_arr[ intval( $connect_socket ) ] = $connect_socket; $event_arr[ intval( $connect_socket ) ] = $client_event; } $event_config = new EventConfig(); $event_base = new EventBase( $event_config ); $event = new Event( $event_base, $listen_socket, Event::READ | Event::PERSIST, 'ev_accept', $event_base ); $event->add(); $event_base->loop();
首先,你把代码复制粘贴走跑起来,然后用telnet连接上去注意一个字都别发,然后直接敲回车观察一下服务器;然后你把低水位的2设置为3,然后继续重复上述telnet操作,注意观察服务器;然后你把低水位设置成4,不过这次就别干巴巴地只敲回车了,你尝试输入一个字符「a」回车发送,然后再输入「ab」回车发送,然后再输入「abc」回车发送,然后再输入「abcd」回车发送,然后再输入「abcde」回车发送,分别观察服务器然后再思考下,读缓冲区低水位的含义就很明显摆在眼前了(友情提示:不要忘了考虑telnet发送命令时末尾不可见的字符):所以读缓冲区低水位的含义就是读取操作累计的输入的数据大于等于低水位的时候,将会触发读回调函数,如果低水位设置为0表示每个读取操作都会引发读回调函数。
然后我们将注意力放到高水位的1024上去,我们将这个这个数字调整为6,然后我们输入字符串「1234567」试试…你可以用不同长度的字符进行测试,再次提示:不要忽略忘记telnet命令结尾不可见的字符。所以读缓冲区的高水位的含义也就摆在眼前了:输入数据的大小累计超过高水位后,将会停止读入一直到数据被读出,一直到剩余数据量再次低于高水位;如果该值被设置为0就表示没有上限。
然后我们在上述demo中加入写回调,开始逐渐研究一下输出缓冲区的高低水位含义与表现:
<?php $host = '0.0.0.0'; $port = 6666; $listen_socket = socket_create( AF_INET, SOCK_STREAM, SOL_TCP ); socket_set_option( $listen_socket, SOL_SOCKET, SO_REUSEADDR, 1 ); socket_set_option( $listen_socket, SOL_SOCKET, SO_REUSEPORT, 1 ); socket_bind( $listen_socket, $host, $port ); socket_listen( $listen_socket ); socket_set_nonblock( $listen_socket ); $client_arr = array(); $event_arr = array(); $client_arr[ intval( $listen_socket ) ] = $listen_socket; function ev_write( $event_buffer_event, $connect_socket ) { global $client_arr; global $event_arr; echo "ev_write callback".PHP_EOL; // 下面这行代码先注释掉... // $event_buffer_event->write( "hi".PHP_EOL ); // 首先是生产环境,一定不能加这句...不然会死的很惨烈 // 其次是测试研究阶段,建议加上,不然配置低的也可能会死的很惨烈 sleep( 1 ); } function ev_accept( $listen_socket, $what, $event_base ) { global $client_arr; global $event_arr; // accept后,listen-socket上的可读事件将会被清空 $connect_socket = socket_accept( $listen_socket ); socket_set_nonblock( $connect_socket ); if ( !$connect_socket ) { return; } $client_event = new EventBufferEvent( $event_base, $connect_socket, EventBufferEvent::OPT_DEFER_CALLBACKS ); $client_event->setCallbacks( NULL, 'ev_write', NULL, $connect_socket ); $client_event->setTimeouts( 30, 2 ); $client_event->setWatermark( Event::WRITE, 0, 4 ); $client_event->enable( Event::WRITE ); $client_arr[ intval( $connect_socket ) ] = $connect_socket; $event_arr[ intval( $connect_socket ) ] = $client_event; } $event_config = new EventConfig(); $event_base = new EventBase( $event_config ); $event = new Event( $event_base, $listen_socket, Event::READ | Event::PERSIST, 'ev_accept', $event_base ); $event->add(); $event_base->loop();
上述代码复制粘贴走跑起来,然后继续用telnet客串一下我们的客户端,如果不出意外你连接上去后,服务器这里只会显示一次「ev_write callback」后就直接挺尸了,倒也不是说代码死机了,就是挺尸了,总之就是再也没反应了,你就是在telnet里狂拍回车TA也没任何反应…但是如果我们把第18行代码的注释去掉,再次运行服务端和客户端,你会看到新的世界…
会不会是输出缓冲区低水位为0导致的呢?
然而即便你把低水位0修改成大于0的数字,你依然会发现你的修改就是无谓的垂死挣扎,没有用的,你喊破嗓子都没用的,写回调依然会被不断触发,这是为啥呢?是这样的,输出缓冲区的低水位意义这样shai儿的:当输出缓冲区中的数据达到或者低于此水位时候,写回调就会被触发,默认是0时候表示只有当输出缓冲区为空的时候才会触发写回调。当然了,当低水位为2时,当输出缓冲区为空也一定会触发写回调。
我感觉这句话不太好理解…你仔细品
你们都知道,之前我一直重复说遇到实在理解不了的:就先背过,不定哪天突然就开窍了,前提是这个问题需要在你心中念念不忘。
然后是输出缓冲区的高水平位,呃… …那个咋说呢,实际上EventBufferVent并不会直接使用高水平位,不太好演示,这个我先琢磨下怎么说吧,后面文章里我会在恰好合适的时候补充说明;除此之外,还有EventBuffer类还没有,留个下一章咯~