swoole(6)Task非同步任務

一:什麼是task進程?

task進程是獨立與worker進程的一組進程  ,他主要處理耗時較長的業務邏輯,並且不影響worker進程處理客戶端的請求.worker進程通過task()函數把數據投遞到Task進程去處理

開啟task功能(task功能是默認關閉的 ,開啟task功能需要滿足2個條件) 

  1. 配置task進程的數量(task_worker_num)
  2. 註冊task的回掉函數onTask和onfinish

二:task設計流程

  1. worker進程中,我們調用對應的task()方法發送數據通知到task worker進程
  2. task worker進程會在onTask()回調中接收到這些數據,並進行處理
  3. 處理完會可以通過finish函數或直接return消息給worker進程
  4. worker進程在onFinish()進程收到消息並進行處理

程式碼:

<?php  /**   * Created by PhpStorm.   * User: huahua   * Date: 2020/3/6   * Time: 上午10:09   */    //基於第一個 TCP 伺服器,只需要增加 onTask 和 onFinish2 個事件回調函數即可。  //另外需要設置 task 進程數量,可以根據任務的耗時和任務量配置適量的 task 進程。    $serv = new SwooleServer('127.0.0.1',9800);    $serv->set([      'worker_num'=>2, //設置進程      //配置此參數後將會啟用 task 功能。所以 Server 務必要註冊 onTask、onFinish 2 個事件回調函數。如果沒有註冊,伺服器程式將無法啟動。      'task_worker_num' => 4, //配置 Task 進程的數量。      'task_ipc_mode'=>2,//設置 Task 進程與 Worker 進程之間通訊的方式  ]);  //監聽連接進入事件  $serv->on('Connect', function ($serv, $fd) {      echo "Client: Connect.n";  });    //監聽數據接收事件  $serv->on('Receive', function ($serv, $fd, $from_id, $data) {      $data=['tid'=>time()];      $task_id = $serv->task($data);////投遞到taskWorker進程組      echo "task id = {$task_id},from id = $from_id".PHP_EOL;  });    $serv->on('task',function ($serv, $task_id, $from_id, $data){      echo "task id = {$task_id},from id = $from_id,進程id =".posix_getpid().PHP_EOL;      echo '我是task任務';      $serv->finish("data -> ok");  });    $serv->on('finish',function ($serv, $task_id, $data){      echo "task id = {$task_id}".PHP_EOL;  });  //監聽連接關閉事件  $serv->on('Close', function ($serv, $fd) {      echo "Client: Close.n";  });    //啟動伺服器  $serv->start();

cli下運行結果:

$task_id不等於workerStart中的worker_id,而是swoole維護的任務自增長id  $from_id表示來自於哪個woker投遞而來的任務,id等於worker_id值  $serv->worker_id等於workerStart中的worker_id, 此處是task_woker的id  $serv->worker_pid 此處是task_woker的進程pid

 三:task任務切分

場景:假設有一台伺服器專門處理前台投遞的數據,利用簡單的任務拆分,分配到相應的進程去處理

思路:

  • 1.將一個大的任務拆分成相應份數(是由$task_worker_num數量來確定)
  • 2.通過foreach循環將數據投遞到指定的task進程,範圍是(0-(task_worker_num-1))區間之內
  • 3.執行失敗的任務,需要保留,重新投遞執行(進程間通訊管道方式)
$server->on('receive',function (swoole_server $server, int $fd, int $reactor_id, string $data){      for ($i=0;$i<100;$i++){          $tasks[] =['id'=>$i,'msg'=>time()];      }      $count=count($tasks);      $data=array_chunk($tasks,ceil($count/3));      foreach ($data as $k=>$v){          $server->task($v,$k);  //(0-task_woker_num-1)      }    });