PHP網路編程之Libevent-EventBuffer(十七節)
- 2020 年 2 月 26 日
- 筆記
大家好,我是已經白熱化的老李。
其實今天是接著上一篇文章尾巴的兩個問題的延續,一個是多進程(執行緒)中使用Libevent,另一個是關於觸發可寫在時候那個需要自己維護的發送數據緩衝區。
眾所周知,PHP里還是聊進程吧,執行緒沒法聊。這個問題很簡單,我們寫個多進程的Libevent伺服器一試就知道了,先看下下面這種寫法,這種寫法簡單說就是首先創建一個EventBase對象,再fork之後的每個子進程中創建一個Event對象,也就是說這些子進程會共享同一個EventBase對象但卻又分別持有自己的Event對象。
<?php $s_host = '0.0.0.0'; $i_port = 6666; $r_listen_socket = socket_create( AF_INET, SOCK_STREAM, SOL_TCP ); socket_set_option( $r_listen_socket, SOL_SOCKET, SO_REUSEADDR, 1 ); socket_set_option( $r_listen_socket, SOL_SOCKET, SO_REUSEPORT, 1 ); socket_bind( $r_listen_socket, $s_host, $i_port ); socket_listen( $r_listen_socket ); // 將$listen_socket設置為非阻塞IO socket_set_nonblock( $r_listen_socket ); $a_event_array = array(); $a_client_array = array(); function read_callback( $r_connection_socket, $i_event_flag, $o_event_base ) { $s_content = socket_read( $r_connection_socket, 1024 ); // 在這個客戶端連接socket上添加 讀事件 // 當這個客戶端連接socket一旦滿足可寫條件,我們就可以向socket中寫數據了 global $a_event_array; global $a_client_array; $o_write_event = new Event( $o_event_base, $r_connection_socket, Event::WRITE | Event::PERSIST, 'write_callback', array( 'content' => $s_content, ) ); $o_write_event->add(); $a_event_array[ intval( $r_connection_socket ) ]['write'] = $o_write_event; } function write_callback( $r_connection_socket, $i_event_flag, $a_data ) { global $a_event_array; global $a_client_array; $s_content = $a_data['content']; foreach( $a_client_array as $r_target_socket ) { if ( intval( $r_target_socket ) != intval( $r_connection_socket ) ) { socket_write( $r_target_socket, $s_content, strlen( $s_content ) ); } } $o_event = $a_event_array[ intval( $r_connection_socket ) ]['write']; $o_event->del(); unset( $a_event_array[ intval( $r_connection_socket ) ]['write'] ); } function accept_callback( $r_listen_socket, $i_event_flag, $o_event_base ) { echo posix_getpid().PHP_EOL; global $a_event_array; global $a_client_array; // socket_accept接受連接,生成一個新的socket,一個客戶端連接socket $r_connection_socket = socket_accept( $r_listen_socket ); $a_client_array[] = $r_connection_socket; // 在這個客戶端連接socket上添加 讀事件 // 也就說 要從客戶端連接上讀取消息 $o_read_event = new Event( $o_event_base, $r_connection_socket, Event::READ | Event::PERSIST, 'read_callback', $o_event_base ); $o_read_event->add(); $a_event_array[ intval( $r_connection_socket ) ]['read'] = $o_read_event; } // 創建event-base $o_event_base = new EventBase(); $s_method_name = $o_event_base->getMethod(); if ( 'epoll' != $s_method_name ) { exit( "not epoll" ); } for( $i = 1; $i <= 4; $i++ ) { $i_pid = pcntl_fork(); if ( 0 == $i_pid ) { $o_event = new Event( $o_event_base, $r_listen_socket, Event::READ | Event::PERSIST, 'accept_callback', $o_event_base ); $o_event->add(); $o_event_base->loop(); } } while( true ) { sleep( 1 ); }
用telnet連接上去隨便發個消息,不出意外應該會報下面這個錯:
(題外話:這個錯誤非常有名,有興趣的同學可以拿報錯資訊關鍵字去Google一下,你應該會得到不少有用的消息)
然後更改一下上面的程式碼,更正為每個子進程分別初始化自己的EventBase對象以及Event對象,看看還會不會有問題(下面程式碼僅僅是關鍵部分):
<?php for( $i = 1; $i <= 4; $i++ ) { $i_pid = pcntl_fork(); if ( 0 == $i_pid ) { // 創建event-base $o_event_base = new EventBase(); $s_method_name = $o_event_base->getMethod(); $o_event = new Event( $o_event_base, $r_listen_socket, Event::READ | Event::PERSIST, 'accept_callback', $o_event_base ); $o_event->add(); $o_event_base->loop(); } }
這次理論上是沒有問題的,你們複製粘貼走親自試一下。結論就是實際上Libevent並不是執行緒安全的,如果你要在多執行緒(進程)中使用時候,注意EventBase最好不要共享,如果一定要共享同一個EventBase對象,你就只能給這個EventBase對象加鎖…
從這個實踐中我們可以稍微升華思考一下:就是為什麼Redis考慮使用並看起來堅持使用單進程單執行緒來保證主流程(保證主流程的意思就是非核心業務邏輯中的fork()或者thread_create()都不算)。太方便了,無論是在事件方面還是數據操作方面,都不用再糾結加鎖的問題了,堅決不為自己找麻煩…
然後我們接著說下關於那個發送數據自定義緩衝區的問題,實際上這個問題是和EventBufferEvent類、EventBuffer類息息相關了,我個人認為這兩個類是僅次於EventBase和Event、EventConfig之外最重要的類了。其實EventBufferEvent就是Libevent替我們封裝好的緩衝區類,用起來就是複製粘貼、沒啥心智負擔,這五個類加起來就是Libevent的五大護教法王,掌握了這五個類(五大護教法王來自於《倚天屠龍記》中鼎盛時期的明教)剩餘其他的類就都是小雜碎了…
簡單說下EventBufferEvent的邏輯,這個類定義了兩種緩衝區:
- 讀緩衝區
- 寫緩衝區
而這兩種緩衝區又都分別設置了如下兩個水位:
- 低水位
- 高水位
所以根據兩兩自由組合,一共得到四種結果:
- 讀緩衝區的低水位
- 讀緩衝區的高水位
- 寫緩衝區的低水位
- 寫緩衝區的高水位
下面開始按照順序來依次說下四個結果分別代表什麼含義,整清楚了這四個結果,這節也基本上也就可以告一段落了,如果你想參考Workerman程式碼那麼我得告訴你:很可惜Workerman里並沒有相關資料,因為截止到目前為止Workeman里暫時沒有用到這些。
<?php $host = '0.0.0.0'; $port = 6666; $listen_socket = socket_create( AF_INET, SOCK_STREAM, SOL_TCP ); socket_set_option( $listen_socket, SOL_SOCKET, SO_REUSEADDR, 1 ); socket_set_option( $listen_socket, SOL_SOCKET, SO_REUSEPORT, 1 ); socket_bind( $listen_socket, $host, $port ); socket_listen( $listen_socket ); socket_set_nonblock( $listen_socket ); $client_arr = array(); $event_arr = array(); $client_arr[ intval( $listen_socket ) ] = $listen_socket; function ev_read( $event_buffer_event, $connect_socket ) { global $client_arr; global $event_arr; $recv_content = $event_buffer_event->read( 1024 ); echo "read cb : ".$recv_content; //$event_buffer_event->readBuffer( $recv_content ); } function ev_accept( $listen_socket, $what, $event_base ) { global $client_arr; global $event_arr; // accept後,listen-socket上的可讀事件將會被清空 $connect_socket = socket_accept( $listen_socket ); socket_set_nonblock( $connect_socket ); if ( !$connect_socket ) { return; } // 按照以往,我們在這裡註冊ev_read事件回調 // 然而現如今,我們不能再用Event了 // 而是需要用EventBufferEvent來代替傳統的Event類 // 也就是帶有EventBuffer功能的增強版的Event類 //$client_event = new Event( $event_base, $connect_socket, Event::READ | Event::PERSIST, 'ev_read', $event_base ); //$client_event->add(); // 這個是EventBufferEvent的初始化方法,大家可以參考文檔里原型 // 我就不再贅述原型了 $client_event = new EventBufferEvent( $event_base, $connect_socket, 0 ); // 設置回調,我們先研究讀事件 // 這裡只設置了讀事件回調 $client_event->setCallbacks( 'ev_read', NULL, NULL, $connect_socket ); $client_event->setTimeouts( 30, 30 ); // 給讀緩衝區設置 低水位 和 高水位 $client_event->setWatermark( Event::READ, 2, 1024 ); $client_event->setPriority( 10 ); $client_event->enable( Event::READ ); $client_arr[ intval( $connect_socket ) ] = $connect_socket; $event_arr[ intval( $connect_socket ) ] = $client_event; } $event_config = new EventConfig(); $event_base = new EventBase( $event_config ); $event = new Event( $event_base, $listen_socket, Event::READ | Event::PERSIST, 'ev_accept', $event_base ); $event->add(); $event_base->loop();
首先,你把程式碼複製粘貼走跑起來,然後用telnet連接上去注意一個字都別發,然後直接敲回車觀察一下伺服器;然後你把低水位的2設置為3,然後繼續重複上述telnet操作,注意觀察伺服器;然後你把低水位設置成4,不過這次就別乾巴巴地只敲回車了,你嘗試輸入一個字元「a」回車發送,然後再輸入「ab」回車發送,然後再輸入「abc」回車發送,然後再輸入「abcd」回車發送,然後再輸入「abcde」回車發送,分別觀察伺服器然後再思考下,讀緩衝區低水位的含義就很明顯擺在眼前了(友情提示:不要忘了考慮telnet發送命令時末尾不可見的字元):所以讀緩衝區低水位的含義就是讀取操作累計的輸入的數據大於等於低水位的時候,將會觸發讀回調函數,如果低水位設置為0表示每個讀取操作都會引發讀回調函數。
然後我們將注意力放到高水位的1024上去,我們將這個這個數字調整為6,然後我們輸入字元串「1234567」試試…你可以用不同長度的字元進行測試,再次提示:不要忽略忘記telnet命令結尾不可見的字元。所以讀緩衝區的高水位的含義也就擺在眼前了:輸入數據的大小累計超過高水位後,將會停止讀入一直到數據被讀出,一直到剩餘數據量再次低於高水位;如果該值被設置為0就表示沒有上限。
然後我們在上述demo中加入寫回調,開始逐漸研究一下輸出緩衝區的高低水位含義與表現:
<?php $host = '0.0.0.0'; $port = 6666; $listen_socket = socket_create( AF_INET, SOCK_STREAM, SOL_TCP ); socket_set_option( $listen_socket, SOL_SOCKET, SO_REUSEADDR, 1 ); socket_set_option( $listen_socket, SOL_SOCKET, SO_REUSEPORT, 1 ); socket_bind( $listen_socket, $host, $port ); socket_listen( $listen_socket ); socket_set_nonblock( $listen_socket ); $client_arr = array(); $event_arr = array(); $client_arr[ intval( $listen_socket ) ] = $listen_socket; function ev_write( $event_buffer_event, $connect_socket ) { global $client_arr; global $event_arr; echo "ev_write callback".PHP_EOL; // 下面這行程式碼先注釋掉... // $event_buffer_event->write( "hi".PHP_EOL ); // 首先是生產環境,一定不能加這句...不然會死的很慘烈 // 其次是測試研究階段,建議加上,不然配置低的也可能會死的很慘烈 sleep( 1 ); } function ev_accept( $listen_socket, $what, $event_base ) { global $client_arr; global $event_arr; // accept後,listen-socket上的可讀事件將會被清空 $connect_socket = socket_accept( $listen_socket ); socket_set_nonblock( $connect_socket ); if ( !$connect_socket ) { return; } $client_event = new EventBufferEvent( $event_base, $connect_socket, EventBufferEvent::OPT_DEFER_CALLBACKS ); $client_event->setCallbacks( NULL, 'ev_write', NULL, $connect_socket ); $client_event->setTimeouts( 30, 2 ); $client_event->setWatermark( Event::WRITE, 0, 4 ); $client_event->enable( Event::WRITE ); $client_arr[ intval( $connect_socket ) ] = $connect_socket; $event_arr[ intval( $connect_socket ) ] = $client_event; } $event_config = new EventConfig(); $event_base = new EventBase( $event_config ); $event = new Event( $event_base, $listen_socket, Event::READ | Event::PERSIST, 'ev_accept', $event_base ); $event->add(); $event_base->loop();
上述程式碼複製粘貼走跑起來,然後繼續用telnet客串一下我們的客戶端,如果不出意外你連接上去後,伺服器這裡只會顯示一次「ev_write callback」後就直接挺屍了,倒也不是說程式碼死機了,就是挺屍了,總之就是再也沒反應了,你就是在telnet里狂拍回車TA也沒任何反應…但是如果我們把第18行程式碼的注釋去掉,再次運行服務端和客戶端,你會看到新的世界…
會不會是輸出緩衝區低水位為0導致的呢?
然而即便你把低水位0修改成大於0的數字,你依然會發現你的修改就是無謂的垂死掙扎,沒有用的,你喊破嗓子都沒用的,寫回調依然會被不斷觸發,這是為啥呢?是這樣的,輸出緩衝區的低水位意義這樣shai兒的:當輸出緩衝區中的數據達到或者低於此水位時候,寫回調就會被觸發,默認是0時候表示只有當輸出緩衝區為空的時候才會觸發寫回調。當然了,當低水位為2時,當輸出緩衝區為空也一定會觸發寫回調。
我感覺這句話不太好理解…你仔細品
你們都知道,之前我一直重複說遇到實在理解不了的:就先背過,不定哪天突然就開竅了,前提是這個問題需要在你心中念念不忘。
然後是輸出緩衝區的高水平位,呃… …那個咋說呢,實際上EventBufferVent並不會直接使用高水平位,不太好演示,這個我先琢磨下怎麼說吧,後面文章里我會在恰好合適的時候補充說明;除此之外,還有EventBuffer類還沒有,留個下一章咯~