使用CoroutineChannel实现一个简单的MySQL连接池
- 2019 年 12 月 9 日
- 笔记
Channel 实现原理
- 通道与 PHP 的 Array 类似,仅占用内存,没有其他额外的资源申请,所有操作均为内存操作,无 IO 消耗
- 底层使用 PHP 引用计数实现,无内存拷贝。即使是传递巨大字符串或数组也不会产生额外性能消耗
方法
- Channel->push :当队列中有其他协程正在等待 pop 数据时,自动按顺序唤醒一个消费者协程。当队列已满时自动 yield 让出控制器,等待其他协程消费数据
- Channel->pop:当队列为空时自动 yield,等待其他协程生产数据。消费数据后,队列可写入新的数据,自动按顺序唤醒一个生产者协程
连接池
使用CoroutineChannel
来实现 MySQL 连接池可以使用 defer 特性来实现资源的回收,同时可以被协程调度,而且使用channel->pop
方法的时候,可以设置超时,减少一系列的心智负担
代码实现
use SwExampleMysqlPool; class MysqlPool { private static $instance; private $pool; //连接池容器,一个 channel private $config; /** * @param null $config * @return MysqlPool * @desc 获取连接池实例 */ public static function getInstance($config = null) { if (empty(self::$instance)) { if (empty($config)) { throw new RuntimeException("mysql config empty"); } self::$instance = new static($config); } return self::$instance; } /** * MysqlPool constructor. * * @param $config * @desc 初始化,自动创建实例,需要放在 workerstart 中执行 */ public function __construct($config) { if (empty($this->pool)) { $this->config = $config; $this->pool = new chan($config['pool_size']); for ($i = 0; $i < $config['pool_size']; $i++) { $mysql = new MySQL(); $res = $mysql->connect($config); if ($res == false) { //连接失败,抛异常 throw new RuntimeException("failed to connect mysql server."); } else { //mysql 连接存入 channel $this->put($mysql); } } } } /** * @param $mysql * @desc 放入一个 mysql 连接入池 */ public function put($mysql) { $this->pool->push($mysql); } /** * @return mixed * @desc 获取一个连接,当超时,返回一个异常 */ public function get() { $mysql = $this->pool->pop($this->config['pool_get_timeout']); if (false === $mysql) { throw new RuntimeException("get mysql timeout, all mysql connection is used"); } return $mysql; } /** * @return mixed * @desc 获取当时连接池可用对象 */ public function getLength() { return $this->pool->length(); } }
使用
<?php require '../vendor/autoload.php'; use SwExampleMysqlPool; $config = [ 'host' => '127.0.0.1', //数据库 ip 'port' => 3306, //数据库端口 'user' => 'root', //数据库用户名 'password' => 'root', //数据库密码 'database' => 'wordpress', //默认数据库名 'timeout' => 0.5, //数据库连接超时时间 'charset' => 'utf8mb4', //默认字符集 'strict_type' => true, //ture,会自动表数字转为 int 类型 'pool_size' => '3', //连接池大小 'pool_get_timeout' => 0.5, //当在此时间内未获得到一个连接,会立即返回。(表示所有的连接都已在使用中) ]; //创建 http server $http = new SwooleHttpServer("0.0.0.0", 9501); $http->set([ //"daemonize" => true, // 常驻进程模式运行 "worker_num" => 1, "log_level" => SWOOLE_LOG_ERROR, ]); $http->on('WorkerStart', function ($serv, $worker_id) use ($config) { //worker 启动时,每个进程都初始化连接池,在 onRequest 中可以直接使用 try { MysqlPool::getInstance($config); } catch (Exception $e) { //初始化异常,关闭服务 echo $e->getMessage() . PHP_EOL; $serv->shutdown(); } catch (Throwable $throwable) { //初始化异常,关闭服务 echo $throwable->getMessage() . PHP_EOL; $serv->shutdown(); } }); $http->on('request', function ($request, $response) { //浏览器会自动发起这个请求 避免占用请求 if ($request->server['path_info'] == '/favicon.ico') { $response->end(''); return; } //获取数据库 if ($request->server['path_info'] == '/list') { go(function () use ($request, $response) { //从池子中获取一个实例 try { $pool = MysqlPool::getInstance(); $mysql = $pool->get(); defer(function () use ($mysql) { //利用 defer 特性,可以达到协程执行完成,归还$mysql 到连接池 //好处是 可能因为业务代码很长,导致乱用或者忘记把资源归还 MysqlPool::getInstance()->put($mysql); echo "当前可用连接数:" . MysqlPool::getInstance()->getLength() . PHP_EOL; }); $result = $mysql->query("select * from wp_users"); $response->end(json_encode($result)); } catch (Exception $e) { $response->end($e->getMessage()); } }); return; } echo "get request:".date('Y-m-d H:i:s').PHP_EOL; if ($request->server['path_info'] == '/timeout') { go(function () use ($request, $response) { //从池子中获取一个实例 try { $pool = MysqlPool::getInstance(); $mysql = $pool->get(); defer(function () use ($mysql) { //协程执行完成,归还$mysql 到连接池 MysqlPool::getInstance()->put($mysql); echo "当前可用连接数:" . MysqlPool::getInstance()->getLength() . PHP_EOL; }); $result = $mysql->query("select * from wp_users"); SwooleCoroutine::sleep(10); //sleep 10 秒,模拟耗时操作 $response->end(date('Y-m-d H:i:s').PHP_EOL.json_encode($result)); } catch (Exception $e) { $response->end($e->getMessage()); } }); return; } }); $http->start();
访问http://127.0.0.1:9501/list
可以看到正常的结果输出
访问http://127.0.0.1:9501/timeout
演示连接池取和存的过程
模拟 timeout, 需要浏览器打开 4 个 tab 页面,都请求http://127.0.0.1:9501/timeout
,前三个应该是等 10 秒出结果,第四个 500ms 后出超时结果
如果是 chrome 浏览器,会对完全一样的 url 做并发请求限制需要加一个随机数,例如http://127.0.0.1:9501/timeout?n=0
、http://127.0.0.1:9501/timeout?n=1
沈唁志,一个PHPer的成长之路!
任何个人或团体,未经允许禁止转载本文:《使用CoroutineChannel实现一个简单的MySQL连接池》,谢谢合作!