使用CoroutineChannel實現一個簡單的MySQL連接池

  • 2019 年 12 月 9 日
  • 筆記

Channel 通道,類似於 go 語言的 chan,支援多生產者協程和多消費者協程,Swoole 底層自動實現了協程的切換和調度

Channel 實現原理

  • 通道與 PHP 的 Array 類似,僅佔用記憶體,沒有其他額外的資源申請,所有操作均為記憶體操作,無 IO 消耗
  • 底層使用 PHP 引用計數實現,無記憶體拷貝。即使是傳遞巨大字元串或數組也不會產生額外性能消耗

方法

  1. Channel->push :當隊列中有其他協程正在等待 pop 數據時,自動按順序喚醒一個消費者協程。當隊列已滿時自動 yield 讓出控制器,等待其他協程消費數據
  2. Channel->pop:當隊列為空時自動 yield,等待其他協程生產數據。消費數據後,隊列可寫入新的數據,自動按順序喚醒一個生產者協程

連接池

使用CoroutineChannel來實現 MySQL 連接池可以使用 defer 特性來實現資源的回收,同時可以被協程調度,而且使用channel->pop方法的時候,可以設置超時,減少一系列的心智負擔

程式碼實現

use SwExampleMysqlPool;    class MysqlPool  {      private static $instance;      private $pool;  //連接池容器,一個 channel      private $config;        /**       * @param  null $config       * @return MysqlPool       * @desc   獲取連接池實例       */      public static function getInstance($config = null)      {          if (empty(self::$instance)) {              if (empty($config)) {                  throw new RuntimeException("mysql config empty");              }              self::$instance = new static($config);          }            return self::$instance;      }       /**      * MysqlPool constructor.      *      * @param $config      * @desc  初始化,自動創建實例,需要放在 workerstart 中執行      */     public function __construct($config)     {        if (empty($this->pool)) {           $this->config = $config;           $this->pool = new chan($config['pool_size']);           for ($i = 0; $i < $config['pool_size']; $i++) {              $mysql = new MySQL();              $res = $mysql->connect($config);              if ($res == false) {                 //連接失敗,拋異常                 throw new RuntimeException("failed to connect mysql server.");              } else {                 //mysql 連接存入 channel                 $this->put($mysql);              }           }        }     }       /**       * @param $mysql       * @desc  放入一個 mysql 連接入池       */      public function put($mysql)      {          $this->pool->push($mysql);      }        /**       * @return mixed       * @desc   獲取一個連接,當超時,返回一個異常       */      public function get()      {          $mysql = $this->pool->pop($this->config['pool_get_timeout']);          if (false === $mysql) {              throw new RuntimeException("get mysql timeout, all mysql connection is used");          }          return $mysql;      }        /**       * @return mixed       * @desc   獲取當時連接池可用對象       */      public function getLength()      {          return $this->pool->length();      }    }

使用

<?php    require '../vendor/autoload.php';    use SwExampleMysqlPool;    $config = [      'host' => '127.0.0.1',   //資料庫 ip      'port' => 3306,          //資料庫埠      'user' => 'root',        //資料庫用戶名      'password' => 'root', //資料庫密碼      'database' => 'wordpress',   //默認資料庫名      'timeout' => 0.5,       //資料庫連接超時時間      'charset' => 'utf8mb4', //默認字符集      'strict_type' => true,  //ture,會自動表數字轉為 int 類型      'pool_size' => '3',     //連接池大小      'pool_get_timeout' => 0.5, //當在此時間內未獲得到一個連接,會立即返回。(表示所有的連接都已在使用中)  ];    //創建 http server  $http = new SwooleHttpServer("0.0.0.0", 9501);  $http->set([        //"daemonize" => true, // 常駐進程模式運行        "worker_num" => 1,        "log_level" => SWOOLE_LOG_ERROR,      ]);    $http->on('WorkerStart', function ($serv, $worker_id) use ($config) {          //worker 啟動時,每個進程都初始化連接池,在 onRequest 中可以直接使用          try {              MysqlPool::getInstance($config);          } catch (Exception $e) {              //初始化異常,關閉服務              echo $e->getMessage() . PHP_EOL;              $serv->shutdown();          } catch (Throwable $throwable) {              //初始化異常,關閉服務              echo $throwable->getMessage() . PHP_EOL;              $serv->shutdown();          }      });    $http->on('request', function ($request, $response) {            //瀏覽器會自動發起這個請求 避免佔用請求          if ($request->server['path_info'] == '/favicon.ico') {              $response->end('');              return;          }            //獲取資料庫          if ($request->server['path_info'] == '/list') {              go(function () use ($request, $response) {                      //從池子中獲取一個實例                      try {                          $pool = MysqlPool::getInstance();                          $mysql = $pool->get();                          defer(function () use ($mysql) {                                  //利用 defer 特性,可以達到協程執行完成,歸還$mysql 到連接池                                  //好處是 可能因為業務程式碼很長,導致亂用或者忘記把資源歸還                                  MysqlPool::getInstance()->put($mysql);                                  echo "當前可用連接數:" . MysqlPool::getInstance()->getLength() . PHP_EOL;                              });                          $result = $mysql->query("select * from wp_users");                          $response->end(json_encode($result));                      } catch (Exception $e) {                         $response->end($e->getMessage());                      }              });              return;          }            echo "get request:".date('Y-m-d H:i:s').PHP_EOL;          if ($request->server['path_info'] == '/timeout') {              go(function () use ($request, $response) {                      //從池子中獲取一個實例                      try {                          $pool = MysqlPool::getInstance();                          $mysql = $pool->get();                          defer(function () use ($mysql) {                                  //協程執行完成,歸還$mysql 到連接池                                  MysqlPool::getInstance()->put($mysql);                                  echo "當前可用連接數:" . MysqlPool::getInstance()->getLength() . PHP_EOL;                          });                          $result = $mysql->query("select * from wp_users");                          SwooleCoroutine::sleep(10); //sleep 10 秒,模擬耗時操作                          $response->end(date('Y-m-d H:i:s').PHP_EOL.json_encode($result));                      } catch (Exception $e) {                         $response->end($e->getMessage());                      }              });              return;          }      });    $http->start();

訪問http://127.0.0.1:9501/list可以看到正常的結果輸出

訪問http://127.0.0.1:9501/timeout演示連接池取和存的過程

模擬 timeout, 需要瀏覽器打開 4 個 tab 頁面,都請求http://127.0.0.1:9501/timeout,前三個應該是等 10 秒出結果,第四個 500ms 後出超時結果

如果是 chrome 瀏覽器,會對完全一樣的 url 做並發請求限制需要加一個隨機數,例如http://127.0.0.1:9501/timeout?n=0http://127.0.0.1:9501/timeout?n=1

沈唁志,一個PHPer的成長之路! 任何個人或團體,未經允許禁止轉載本文:《使用CoroutineChannel實現一個簡單的MySQL連接池》,謝謝合作!