Swoole 实战:MySQL 查询器的实现(协程连接池版)

需求分析

本篇我们将通过 Swoole 实现一个自带连接池的 MySQL 查询器:

  1. 支持通过链式调用构造并执行 SQL 语句;
  2. 支持连接池技术;
  3. 支持多协程事务并发执行(协程安全性);
  4. 支持连接对象的健康检测;
  5. 支持连接对象断线重连;
  6. 程序需要可扩展,为未来的改造留好扩展点;

完整项目地址:[协程版 MySQL 查询器](//github.com/linvanda/mysql)

(注:该项目并非示例项目,而是生产可用的,已经在公司内部稳定使用。)

使用示例

  • 查询:
$query->select(['uid', 'name'])
  ->from('users u')
  ->join('auth_users au', "u.uid=au.uid")
  ->where(['uid' => $uid])
  ->groupBy("u.phone")
  ->having("count(u.phone)>1")
  ->orderBy("u.uid desc")
  ->limit(10, 0)
  ->list();
  • 插入:
$query->insert('users')
  ->values(
  [
    [
      'name' => 'linvanda',
      'phone' => '18687664562',
      'nickname' => '林子',
    ],
    [
      'name' => 'xiake',
      'phone' => '18989876543',
      'nickname' => '侠客',
    ],
  ]
)->execute();// 这里是批量插入,不需要批量插入的话,传入一维数组即可

// 延迟插入
$query->insert('users')
  ->delayed()
  ->values(
  [
    'name' => 'linvanda',
    'phone' => '18687664562',
    'nickname' => '林子',
  ]
)->execute();
  • 更新:
$query->update('users u')
  ->join('auth_users au', "u.uid=au.uid")
  ->set(['u.name' => '粽子'])
  ->where("u.uid=:uid", ['uid' => 123])
  ->execute();
  • 删除:
$query->delete('users')
  ->where("uid=:uid", ['uid' => 123])
  ->execute();
  • 事务:
$query->begin();
$query->update('users u')
  ->join('auth_users au', "u.uid=au.uid")
  ->set(['u.name' => '粽子'])
  ->where("u.uid=:uid", ['uid' => 123])
  ->execute();
...
$query->commit();

模块设计

  1. 查询模块:
    • 查询器(Query,入口)
    • SQL构造器(Builder)
  2. 事务模块:
    • 事务接口(ITransaction)
    • 协程版事务类(CoTransaction)
    • 协程上下文(TContext)
  3. 连接池模块:
    • 连接池接口(IPool)
    • 协程连接池类(CoPool)
  4. 数据库连接(驱动)模块:
    • 连接接口(IConnector)
    • 连接生成器接口(IConnectorBuilder)
    • 协程连接类(CoConnector)
    • 协程连接生成器(CoConnectorBuilder)
    • 数据库连接配置类(DBConfig)
    • 数据库连接(统计)信息类(ConnectorInfo)

我们希望通过统一的入口对外提供服务,将复杂性隐藏在内部。该统一入口由查询模块提供。该模块由查询器SQL 构造器构成,其中查询器作为外界唯一入口,而构造器是一个 Trait,因为这样可以让外界通过查询器入口直接使用构造器提供的 SQL 组装功能。

查询器通过事务模块执行 SQL。这里的事务有两个层面含义:数据库操作的事务性(显式或隐式事务,由 CoTransaction 类保障),以及多协程下的执行环境隔离性(由 TContext 类保障)。

事务模块需要通过数据库连接对象执行具体的 SQL。连接对象由连接池模块提供。

连接池模块维护(创建、回收、销毁)数据库连接对象,具体是通过数据库连接模块的连接生成器生成新数据库连接。

模块之间依赖于接口而非具体实现:查询模块依赖事务模块的 ITransaction 接口;事务模块依赖连接池模块的 IPool 接口和数据库连接模块的 IConnector 接口;连接池模块依赖数据库连接模块的 IConnectorBuilder 接口。

UML 类图

MySQL 查询器 UML 类图

下面,我们分模块具体讲解。

入口

由查询模块对外提供统一的使用入口。查询模块由两部分构成:查询器和 SQL 构造器。为了让调用方可以直接通过查询器来构造 SQL(而不用先实例化一个构造器构造 SQL 然后传给查询器),我将构造器设计成 Trait 供查询器 Query 使用。

我们先看看查询器类 Query:

class Query
{
    use Builder;

    public const MODEL_READ = 'read';
    public const MODEL_WRITE = 'write';

    private $transaction;
    
    public function __construct(ITransaction $transaction)
    {
        $this->transaction = $transaction;
    }

    /**
     * 开启事务
     */
    public function begin($model = 'write'): bool
    {
        return $this->transaction->begin($model);
    }

    /**
     * 提交事务
     */
    public function commit(): bool
    {
        return $this->transaction->commit();
    }

    /**
     * 回滚事务
     */
    public function rollback(): bool
    {
        return $this->transaction->rollback();
    }

    /**
     * 便捷方法:列表查询
     */
    public function list(): array
    {
        $list = $this->transaction->command(...$this->compile());
        if ($list === false) {
            throw new DBException($this->lastError(), $this->lastErrorNo());
        }

        return $list;
    }

    /**
     * 便捷方法:查询一行记录
     */
    public function one(): array
    {
        $list = $this->transaction->command(...$this->limit(1)->compile());

        if ($list === false) {
            throw new DBException($this->lastError(), $this->lastErrorNo());
        }

        if ($list) {
            return $list[0];
        }

        return [];
    }

    ...

    /**
     * 执行 SQL
     * 有两种方式:
     *  1. 调此方法时传入相关参数;
     *  2. 通过 Builder 提供的 Active Record 方法组装 SQL,调此方法(不传参数)执行并返回结果
     */
    public function execute(string $preSql = '', array $params = [])
    {
        if (!func_num_args()) {
            $result =  $this->transaction->command(...$this->compile());
        } else {
            $result = $this->transaction->command(...$this->prepareSQL($preSql, $params));
        }

        if ($result === false) {
            throw new DBException($this->lastError(), $this->lastErrorNo());
        }

        return $result;
    }

    public function lastInsertId()
    {
        return $this->transaction->lastInsertId();
    }
    
    public function affectedRows()
    {
        return $this->transaction->affectedRows();
    }
}

该入口类做了以下几件事情:

  • 提供 list()、one()、page()、execute() 等方法执行 SQL 语句,其内部是通过 transaction 实现的;
  • 通过 Builder 这个 Trait 对外提供 SQL 构造功能;
  • 委托 transaction 实现事务功能;

我们再简单看下 Builder 的实现:

Trait Builder
{
    ...
    public function select($fields = null)
    {
        if ($this->type) {
            return $this;
        }

        $this->type = 'select';
        $this->fields($fields);

        return $this;
    }

    /**
     * 预处理 SQL
     * @param string $sql 格式:select * from t_name where uid=:uid
     * @param array $params 格式:['uid' => $uid]
     * @return array 输出格式:sql: select * from t_name where uid=?,params: [$uid]
     * @throws \Exception
     */
    private function prepareSQL(string $sql, array $params)
    {
        $sql = trim($sql);

        if (!$params) {
            return [$sql, []];
        }

        preg_match_all('/:([^\s;]+)/', $sql, $matches);

        if (!($matches = $matches[1])) {
            return [$sql, []];
        }

        if (count($matches) !== count($params)) {
            throw new \Exception("SQL 占位数与参数个数不符。SQL:$sql,参数:" . print_r($params, true));
        }

        $p = [];
        foreach ($matches as $flag) {
            if (!array_key_exists($flag, $params)) {
                throw new \Exception("SQL 占位符与参数不符。SQL:$sql,参数:" . print_r($params, true));
            }

            $value = $params[$flag];

            if ($this->isExpression($value)) {
                $sql = preg_replace("/:$flag(?=\s|$)/", $value, $sql);
            } else {
                $p[] = $value;
            }
        }

        $sql = preg_replace('/:[-a-zA-Z0-9_]+/', '?', $sql);

        return [$sql, $p];
    }

    /**
     * 编译
     * 目前仅支持 select,update,insert,replace,delete
     * @param bool $reset 编译后是否重置构造器
     * @return array [$preSql, $params]
     */
    private function compile(bool $reset = true)
    {
        if (!$this->type) {
            return ['', []];
        }

        $method = 'compile' . ucfirst($this->type);
        if (method_exists($this, $method)) {
            $this->rawSqlInfo = $this->$method();
            if ($reset) {
                $this->reset();
            }
            return $this->rawSqlInfo;
        }

        return ['', []];
    }

    private function compileSelect()
    {
        $sql = "select $this->fields ";
        $params = [];

        if ($this->table) {
            $sql .= implode(
                ' ',
                array_filter([
                    'from',
                    $this->table,
                    $this->join,
                    $this->where,
                    $this->groupBy,
                    $this->having,
                    $this->orderBy,
                    $this->limitStr()
                ])
            );
            $params = array_merge($this->joinParams, $this->whereParams, $this->havingParams);
        }

        return [$this->trimSpace($sql), $params];
    }
    ...
    
    /**
     * 条件(where、on、having 等)
     * 为了记忆和使用方便,目前只提供了最基本的一些形式,复杂的条件请使用原生写法
     * $conditions 数组格式:
     * // 基本的 and 查询
     * [
     *      'uid' => 232,
     *      'name' => '里斯',
     *      'b.age' => 34,
     *      'level_id' => [1,2,3], // in
     *      'count' => new Expression('count + 1'),
     * ]
     *
     * [
     *      "(uid=:uid1 or uid=:uid2) and  count=:count", // 原生预处理 SQL
     *      ['uid1' => 12, 'uid2' => 13, 'count' => new Expression('count+1')]
     * ]
     * @param string|array $conditions
     * @return array [$preSql, $params],$preSql: 用 ? 占位的预处理 SQL
     * @throws \Exception
     */
    private function condition($conditions)
    {
        if (is_string($conditions)) {
            return [$conditions, []];
        }

        if (!$conditions || !is_array($conditions)) {
            return [];
        }

        if (is_int(key($conditions)) && count($conditions) <= 2) {
            if (count($conditions) == 1) {
                $conditions[1] = [];
            }

            return $this->prepareSQL($conditions[0], $conditions[1]);
        }

        $where = '1=1';
        $params = [];
        foreach ($conditions as $key => $condition) {
            $key = $this->plainText($key);
            if (is_array($condition)) {
                // in 查询
                $where .= " and $key in(" . implode(',', array_fill(0, count($condition), '?')) . ')';
                $params = array_merge($params, $condition);
            } else {
                // = 查询
                if ($this->isExpression($condition)) {
                    $where .= " and $key = $condition";
                } else {
                    $where .= " and $key = ?";
                    $params[] = $condition;
                }
            }
        }

        return [str_replace('1=1 and ', '', $where), $params];
    }

    ...
}

构造器主要提供和 SQL 子句对应的方法来构造和编译 SQL,并提供对原生 SQL 的支持。
该构造器并未对所有的 SQL 语句做方法上的实现(比如子查询),只对最常用的功能提供了支持,复杂的 SQL 建议直接写 SQL 语句(一些框架对复杂 SQL 构造也提供了方法级别的支持,但这其实会带来使用和维护上的复杂性,它导致 SQL 不够直观)。

完整的查询模块代码

事务

事务是集中管理 SQL 执行上下文的地方,所有的 SQL 都是在事务中执行的(没有调 begin() 则是隐式事务)。

我们的查询器是协程安全的,即一个 Query 实例可以在多个协程中并发执行事务而不会相互影响。协程安全性是通过事务模块保证的,这里需要处理两个维度的“事务”:数据库维度和协程维度。不但需要保证数据库事务的完整执行,还要保证多个协程间的 SQL 执行不会相互影响。

我们先看一个多协程并发执行事务的例子(在两个子协程中使用同一个 Query 实例执行事务:先从数据库查询用户信息,然后更新姓名):

$query = new Query(...);

for ($i = 0; $i < 2; $i++) {
    go(function () use ($query) {
        $query->begin();
        $user = $query->select("uid,name")->from("users")->where("phone=:phone", ["phone" => "13908987654"])->one();
        $query->update('users')->set(['name' => "李四"])->where("uid=:uid", ['uid' => $user['uid']])->execute();
        $query->commit();
    });
}

上面代码执行步骤如图:

在上图两个协程不断切换过程中,各自的事务是在独立执行的,互不影响。

现实中,我们会在仓储中使用查询器,每个仓储持有一个查询器实例,而仓储是单例模式,多协程共享的,因而查询器也是多协程共享的。如下:

/**
 * MySQL 仓储基类
 * 仓储是单例模式(通过容器实现单例),多协程会共享同一个仓储实例
 */
abstract class MySQLRepository extends Repository implements ITransactional
{
    /**
     * 查询器
     */
    protected $query;

    public function __construct()
    {
        if (!$this->dbAlias()) {
            throw new \Exception('dbName can not be null');
        }

        // 通过工厂创建查询器实例
        $this->query = MySQLFactory::build($this->dbAlias());
    }

    ...
}

事务模块是如何实现协程并发事务的隔离性呢?我们用协程上下文 TContext 类实现协程间数据的隔离,事务类 CoTransaction 持有 TContext 实例,事务中所有的状态信息都通过 TContext 存取,以实现协程间状态数据互不影响。

我们先看看协程上下文类:

class TContext implements \ArrayAccess
{
    private $container = [];

	...

    public function offsetGet($offset)
    {
        if (!isset($this->container[Co::getuid()])) {
            return null;
        }

        return $this->container[Co::getuid()][$offset] ?? null;
    }

    public function offsetSet($offset, $value)
    {
        $cuid = Co::getuid();
        if (!isset($this->container[$cuid])) {
            $this->init();
        }
        $this->container[$cuid][$offset] = $value;
    }

    private function init()
    {
        $this->container[Co::getuid()] = [];
        // 协程退出时需要清理当前协程上下文
        Co::defer(function () {
            unset($this->container[Co::getuid()]);
        });
    }
}

协程上下文内部通过 $container 数组维护每个协程的数据。该类实现了 ArrayAccess 接口,可以通过下标访问,如:

// 创建上下文实例
$context = new TContext();
// 设置当前协程的数据
$context["model"] = "write";
// 访问当前协程的数据
$context["model"];

再看看事务。

事务接口定义:

interface ITransaction
{
    public function begin(string $model = 'write', bool $isImplicit = false): bool;
    /**
     * 发送 SQL 指令
     */
    public function command(string $preSql, array $params = []);
    /**
     * 提交事务
     * @param bool $isImplicit 是否隐式事务,隐式事务不会向 MySQL 提交 commit (要求数据库服务器开启了自动提交的配置)
     * @return bool
     * @throws \Exception
     */
    public function commit(bool $isImplicit = false): bool;
    public function rollback(): bool;
    /**
     * 获取或设置当前事务执行模式
     * @param string 读/写模式 read/write
     * @return string 当前事务执行模式
     */
    public function model(?string $model = null): string;
	...
    /**
     * 获取一次事务中执行的 SQL 列表
     * @return array
     */
    public function sql():array;
}

上面接口定义了事务管理器的主要工作:开启事务、执行 SQL、提交/回滚事务以及和本次事务执行相关的信息。

我们再来看看它的实现类 CoTransaction,该类是整个查询器中最重要的类,我们把整个类的代码完整贴出来:

/**
 * 协程版事务管理器
 * 注意:事务开启直到提交/回滚的过程中会一直占用某个 IConnector 实例,如果有很多长事务,则会很快耗完连接池资源
 */
class CoTransaction implements ITransaction
{
    private $pool;
    // 事务的所有状态信息(运行状态、SQL、运行模式、运行结果等)都是存储在上下文中
    private $context;

	/**
	 * 创建事务实例时需要提供连接池,并在内部创建该事物的协程上下文实例
	 */
    public function __construct(IPool $pool)
    {
        $this->pool = $pool;
        $this->context = new TContext();
    }
    
    public function __destruct()
    {
        // 如果事务没有结束,则回滚
        if ($this->isRunning()) {
            $this->rollback();
        }
    }

    /**
     * 开启事务
     */
    public function begin(string $model = 'write', bool $isImplicit = false): bool
    {
        // 如果事务已经开启了,则直接返回
        if ($this->isRunning()) {
            return true;
        }

        // 事务模式(决定从读连接池还是写连接池拿连接对象)
        $this->model($model);
        // 设置事务运行状态
        $this->isRunning(true);

        // 获取数据库连接
        try {
            if (!($connector = $this->connector())) {
                throw new ConnectException("获取连接失败");
            }
        } catch (\Exception $exception) {
            $this->isRunning(false);
            throw new TransactionException($exception->getMessage(), $exception->getCode());
        }

		// 开启新事务前,需要清除上一次事务的数据
        $this->resetLastExecInfo();
        $this->clearSQL();

		// 调用数据库连接对象的 begin 方法开始事务(如果是隐式事务则不调用)
        return $isImplicit || $connector->begin();
    }

    /**
     * 执行 SQL 指令
     * 如果是隐式事务,则在该方法中自动调用 begin 和 commit 方法
     */
    public function command(string $preSql, array $params = [])
    {
        if (!$preSql) {
            return false;
        }

        // 是否隐式事务:外界没有调用 begin 而是直接调用 command 则为隐式事务
        $isImplicit = !$this->isRunning();

        // 如果是隐式事务,则需要自动开启事务
        if ($isImplicit && !$this->begin($this->calcModelFromSQL($preSql), true)) {
            return false;
        }

		// 执行 SQL
        $result = $this->exec([$preSql, $params]);

        // 隐式事务需要及时提交
        if ($isImplicit && !$this->commit($isImplicit)) {
            return false;
        }
        
        return $result;
    }

    /**
     * 提交事务
     */
    public function commit(bool $isImplicit = false): bool
    {
        if (!$this->isRunning()) {
            return true;
        }

        $result = true;
        if (!$isImplicit) {
        	// 显式事务才需要真正提交到 MySQL 服务器
            if ($conn = $this->connector(false)) {
                $result = $conn->commit();
                if ($result === false) {
                    // 执行失败,试图回滚
                    $this->rollback();
                    return false;
                }
            } else {
                return false;
            }
        }

        // 释放事务占用的资源
        $this->releaseTransResource();

        return $result;
    }

    /**
     * 回滚事务
     * 无论是提交还是回滚,都需要释放本次事务占用的资源
     */
    public function rollback(): bool
    {
        if (!$this->isRunning()) {
            return true;
        }

        if ($conn = $this->connector(false)) {
            $conn->rollback();
        }

        $this->releaseTransResource();
        return true;
    }

    /**
     * 获取或设置当前事务执行模式
     */
    public function model(?string $model = null): string
    {
        // 事务处于开启状态时不允许切换运行模式
        if (!isset($model) || $this->isRunning()) {
            return $this->context['model'];
        }

        $this->context['model'] = $model === 'read' ? 'read' : 'write';

        return $model;
    }

    public function lastInsertId()
    {
        return $this->getLastExecInfo('insert_id');
    }

    public function affectedRows()
    {
        return $this->getLastExecInfo('affected_rows');
    }

    public function lastError()
    {
        return $this->getLastExecInfo('error');
    }

    public function lastErrorNo()
    {
        return $this->getLastExecInfo('error_no');
    }

	/**
	 * 本次事务执行的所有 SQL
	 * 该版本并没有做记录
	 */
    public function sql(): array
    {
        return $this->context['sql'] ?? [];
    }

    /**
     * 释放当前事务占用的资源
     */
    private function releaseTransResource()
    {
        // 保存本次事务相关执行结果供外界查询使用
        $this->saveLastExecInfo();
        // 归还连接资源
        $this->giveBackConnector();

        unset($this->context['model']);

        $this->isRunning(false);
    }

    /**
     * 保存事务最终执行的一些信息
     */
    private function saveLastExecInfo()
    {
        if ($conn = $this->connector(false)) {
            $this->context['last_exec_info'] = [
                'insert_id' => $conn->insertId(),
                'error' => $conn->lastError(),
                'error_no' => $conn->lastErrorNo(),
                'affected_rows' => $conn->affectedRows(),
            ];
        } else {
            $this->context['last_exec_info'] = [];
        }
    }

    private function resetLastExecInfo()
    {
        unset($this->context['last_exec_info']);
    }

    private function getLastExecInfo(string $key)
    {
        return isset($this->context['last_exec_info']) ? $this->context['last_exec_info'][$key] : '';
    }

    /**
     * 执行指令池中的指令
     * @param $sqlInfo
     * @return mixed
     * @throws
     */
    private function exec(array $sqlInfo)
    {
        if (!$sqlInfo || !$this->isRunning()) {
            return true;
        }

        return $this->connector()->query($sqlInfo[0], $sqlInfo[1]);
    }

    private function clearSQL()
    {
        unset($this->context['sql']);
    }

    private function calcModelFromSQL(string $sql): string
    {
        if (preg_match('/^(update|replace|delete|insert|drop|grant|truncate|alter|create)\s/i', trim($sql))) {
            return 'write';
        }

        return 'read';
    }

    /**
     * 获取连接资源
     */
    private function connector(bool $usePool = true)
    {
        if ($connector = $this->context['connector']) {
            return $connector;
        }

        if (!$usePool) {
            return null;
        }

        $this->context['connector'] = $this->pool->getConnector($this->model());

        return $this->context['connector'];
    }

    /**
     * 归还连接资源
     */
    private function giveBackConnector()
    {
        if ($this->context['connector']) {
            $this->pool->pushConnector($this->context['connector']);
        }

        unset($this->context['connector']);
    }

    private function isRunning(?bool $val = null)
    {
        if (isset($val)) {
            $this->context['is_running'] = $val;
        } else {
            return $this->context['is_running'] ?? false;
        }
    }
}

该类中,一次 SQL 执行(无论是显式事务还是隐式事务)的步骤:

begin -> exec -> commit/rollback

  1. begin:
    • 判断是否可开启新事务(如果已有事务在运行,则不可开启);
    • 设置事务执行模式(read/write);
    • 将当前事务状态设置为 running;
    • 获取连接对象;
    • 清理本事务实例中上次事务的痕迹(上下文、SQL);
    • 调连接对象的 begin 启动数据库事务;
  2. exec:
    • 调用连接对象的 query 方法执行 SQL(prepare 模式);
  3. commit:
    • 判断当前状态是否可提交(running 状态才可以提交);
    • 调用连接对象的 commit 方法提交数据库事务(如果失败则走回滚);
    • 释放本次事务占用的资源(保存本次事务执行的相关信息、归还连接对象、清除上下文里面相关信息)
  4. rollback:
    • 判断当前状态是否可回滚;
    • 调用连接对象的 rollback 回滚数据库事务;
    • 释放本次事务占用的资源(同上);

优化:

CoTransaction 依赖 IPool 连接池,这种设计并不合理(违反了迪米特法则)。从逻辑上说,事务管理类真正依赖的是连接对象,而非连接池对象,因而事务模块应该依赖连接模块而不是连接池模块。让事务管理类依赖连接池,一方面向事务模块暴露了连接管理的细节, 另一方面意味着如果使用该事务管理类,就必须使用连接池技术。

一种优化方案是,在连接模块提供一个连接管理类供外部(事务模块)取还连接:

interface IConnectorManager
{
	public function getConnector() IConnector;
	public function giveBackConnector(IConnector $conn);
}

IConnectorManager 注入到 CoTransaction 中:

class CoTransaction implements ITransaction
{
    ...
    
	public function __construct(IConnectorManager $connMgr)
	{
		...
	}
}

连接管理器 IConnectorManager 承担了工厂方法角色,至此,事务模块仅依赖连接模块,而不用依赖连接池。

连接池

连接池模块由 IPool 接口和 CoPool 实现类组成。

连接池模块和连接模块之间的关系比较巧妙(上面优化后的方案)。从高层(接口层面)来说,连接池模块依赖连接模块:连接池操作(取还)IConnector 的实例;从实现上来说,连接模块同时又依赖连接池模块:PoolConnectorManager(使用连接池技术的连接管理器)依赖连接池模块来操作连接对象(由于该依赖是实现层面的而非接口层面,因而它不是必然的,如果连接管理器不使用连接池技术则不需要依赖连接池模块)。“连接管理器”这个角色很重要:它对外(事务模块)屏蔽了连接池模块的存在,代价是在内部引入了对连接池模块的依赖(也就是用内部依赖换外部依赖)。

经过上面的分析我们得出,连接池模块和连接模块具有较强的耦合性,连接模块可以对外屏蔽掉连接池模块的存在,因而在设计上我们可以将这两个模块看成一个大模块放在一个目录下面,在该目录下再细分成两个内部模块即可。

我们先看看连接池接口。

interface IPool
 {
     /**
      * 从连接池中获取连接对象
      */
    public function getConnector(string $type = 'write'): IConnector;
     /**
      * 归还连接
      */
     public function pushConnector(IConnector $connector): bool;

     /**
      * 连接池中连接数
      * @return array ['read' => 3, 'write' => 3]
      */
     public function count(): array;

     /**
      * 关闭连接池
      */
     public function close(): bool;
 }

从上面接口定义我们发现,该连接池并非通用连接池,而是针对数据库连接做的定制(count() 方法返回的数据里面有 read、write,它暴露了一个细节:该连接池内部维护了读写两种连接池)。此设计也透露出该模块和连接模块的强耦合性。

实现类 CoPool 稍显复杂,我们贴出代码:

class CoPool implements IPool
{
    ...
    protected static $container = [];
    protected $readPool;
    protected $writePool;
    // 数据库连接生成器,连接池使用此生成器创建连接对象
    protected $connectorBuilder;
    // 连接池大小
    protected $size;
    // 记录每个连接的相关信息
    protected $connectsInfo = [];
    // 当前存活的连接数(包括不在池中的)
    protected $connectNum;
    // 读连接数
    protected $readConnectNum;
    // 写连接数
    protected $writeConnectNum;
    protected $maxSleepTime;
    protected $maxExecCount;
    protected $status;
    // 连续等待连接对象失败次数(一般是长事务导致某次事务处理长时间占用连接资源)
    protected $waitTimeoutNum;

    protected function __construct(IConnectorBuilder $connectorBuilder, int $size = 25, int $maxSleepTime = 600, int $maxExecCount = 1000)
    {
        // 创建读写 Channel
        $this->readPool = new co\Channel($size);
        $this->writePool = new co\Channel($size);
        ...
    }

    /**
     * 单例
     * 实际是伪单例
     */
    public static function instance(IConnectorBuilder $connectorBuilder, int $size = 25, int $maxSleepTime = 600, int $maxExecCount = 1000): CoPool
    {
        // 同一个连接生成器创建的连接对象由同一个连接池管理
        if (!isset(static::$container[$connectorBuilder->getKey()])) {
            static::$container[$connectorBuilder->getKey()] = new static($connectorBuilder, $size, $maxSleepTime, $maxExecCount);
        }

        return static::$container[$connectorBuilder->getKey()];
    }

    /**
     * 从连接池中获取 MySQL 连接对象
     */
    public function getConnector(string $type = 'write'): IConnector
    {
        if (!$this->isOk()) {
            throw new PoolClosedException("连接池已经关闭,无法获取连接");
        }

        // 根据读写模式选择是使用读连接池还是写连接池
        $pool = $this->getPool($type);

        // 连接池是空的,试图创建连接
        if ($pool->isEmpty()) {
            // 超额,不能再创建,需等待
            // 此处试图创建的数量大于连接池真正大小,是为了应对高峰期等异常情况。归还的时候多创建出来的会直接被关闭掉(随建随销)
            if (($type == 'read' ? $this->readConnectNum : $this->writeConnectNum) > $this->size * 6) {
                // 多次等待失败,则直接返回
                // 优化点:这里面没有针对读写池分别计数
                if ($this->waitTimeoutNum > self::MAX_WAIT_TIMEOUT_NUM) {
                    // 超出了等待失败次数限制,直接抛异常
                    throw new ConnectFatalException("多次获取连接超时,请检查数据库服务器状态");
                }

                // 等待连接归还
                $conn = $pool->pop(4);

                // 等待失败
                if (!$conn) {
                    switch ($pool->errCode) {
                        case SWOOLE_CHANNEL_TIMEOUT:
                            // 优化:要区分读池子超时还是写池子超时
                            $this->waitTimeoutNum++;
                            $errMsg = "获取连接超时";
                            break;
                        case SWOOLE_CHANNEL_CLOSED:
                            $errMsg = "获取连接失败:连接池已关闭";
                            break;
                        default:
                            $errMsg = "获取连接失败";
                            break;
                    }

                    throw new ConnectException($errMsg);
                }
            } else {
                try {
                    // 创建新连接
                    $conn = $this->createConnector($type);
                } catch (ConnectException $exception) {
                    if ($exception->getCode() == 1040) {
                        // 连接数据库时失败:Too many connections,此时需要等待被占用的连接归还
                        // 这里可以优化下:先判断下该连接池有无在维护的连接,有的话才等待
                        $conn = $pool->pop(4);
                    }

                    if (!$conn) {
                        // 等待连接超时,记录超时次数
                        if ($pool->errCode == SWOOLE_CHANNEL_TIMEOUT) {
                            $this->waitTimeoutNum++;
                        }

                        throw new ConnectException($exception->getMessage(), $exception->getCode());
                    }
                }
            }
        } else {
            // 从连接池获取
            $conn = $pool->pop(1);
        }

        // 变更该连接的状态信息
        $connectInfo = $this->connectInfo($conn);
        $connectInfo->popTime = time();
        $connectInfo->status = ConnectorInfo::STATUS_BUSY;
        // 成功拿到连接,将等待次数清零
        $this->waitTimeoutNum = 0;

        return $conn;
    }

    /**
     * 归还连接
     * @param IConnector $connector
     * @return bool
     */
    public function pushConnector(IConnector $connector): bool
    {
        if (!$connector) {
            return true;
        }

        $connInfo = $this->connectInfo($connector);
        $pool = $this->getPool($connInfo->type);

        // 如果连接池有问题(如已关闭)、满了、连接有问题,则直接关闭
        if (!$this->isOk() || $pool->isFull() || !$this->isHealthy($connector)) {
            return $this->closeConnector($connector);
        }

        // 变更连接状态
        if ($connInfo) {
            $connInfo->status = ConnectorInfo::STATUS_IDLE;
            $connInfo->pushTime = time();
        }

        return $pool->push($connector);
    }

    /**
     * 关闭连接池
     * @return bool
     */
    public function close(): bool
    {
        $this->status = self::STATUS_CLOSED;

        // 关闭通道中所有的连接。等待5ms为的是防止还有等待push的排队协程
        while ($conn = $this->readPool->pop(0.005)) {
            $this->closeConnector($conn);
        }
        while ($conn = $this->writePool->pop(0.005)) {
            $this->closeConnector($conn);
        }
        // 关闭通道
        $this->readPool->close();
        $this->writePool->close();

        return true;
    }

    public function count(): array
    {
        return [
            'read' => $this->readPool->length(),
            'write' => $this->writePool->length()
        ];
    }

    protected function closeConnector(IConnector $connector)
    {
        if (!$connector) {
            return true;
        }

        $objId = $this->getObjectId($connector);
        // 关闭连接
        $connector->close();
        // 处理计数
        $this->untickConnectNum($this->connectsInfo[$objId]->type);
        // 删除统计信息
        unset($this->connectsInfo[$objId]);
        return true;
    }

    protected function isOk()
    {
        return $this->status == self::STATUS_OK;
    }

    protected function getPool($type = 'write'): co\Channel
    {
        if (!$type || !in_array($type, ['read', 'write'])) {
            $type = 'write';
        }

        return $type === 'write' ? $this->writePool : $this->readPool;
    }

    /**
     * 创建新连接对象
     * @param string $type
     * @return IConnector
     * @throws \Dev\MySQL\Exception\ConnectException
     */
    protected function createConnector($type = 'write'): IConnector
    {
        $conn = $this->connectorBuilder->build($type);

        if ($conn) {
            // 要在 connect 前 tick(计数),否则无法阻止高并发协程打入(因为 connect 会造成本协程控制权让出,此时本次计数还没有增加)
            $this->tickConnectNum($type);

            try {
                $conn->connect();
            } catch (ConnectException $exception) {
                // 撤销 tick
                $this->untickConnectNum($type);
                throw new ConnectException($exception->getMessage(), $exception->getCode());
            }

            // 创建本连接的统计信息实例
            $this->connectsInfo[$this->getObjectId($conn)] = new ConnectorInfo($conn, $type);
        }

        return $conn;
    }

    protected function tickConnectNum(string $type)
    {
        $this->changeConnectNum($type, 1);
    }

    protected function untickConnectNum(string $type)
    {
        $this->changeConnectNum($type, -1);
    }

    private function changeConnectNum(string $type, $num)
    {
        $this->connectNum = $this->connectNum + $num;
        if ($type == 'read') {
            $this->readConnectNum = $this->readConnectNum + $num;
        } else {
            $this->writeConnectNum = $this->writeConnectNum + $num;
        }
    }

    /**
     * 检查连接对象的健康情况,以下情况视为不健康:
     * 1. SQL 执行次数超过阈值;
     * 2. 连接对象距最后使用时间超过阈值;
     * 3. 连接对象不是连接池创建的
     * @param IConnector $connector
     * @return bool
     */
    protected function isHealthy(IConnector $connector): bool
    {
        $connectorInfo = $this->connectInfo($connector);
        if (!$connectorInfo) {
            return false;
        }

        // 如果连接处于忙态(一般是还处于事务未提交状态),则一律返回 ok
        if ($connectorInfo->status === ConnectorInfo::STATUS_BUSY) {
            return true;
        }

        if (
            $connectorInfo->execCount() >= $this->maxExecCount ||
            time() - $connectorInfo->lastExecTime() >= $this->maxSleepTime
        ) {
            return false;
        }

        return true;
    }

    protected function connectInfo(IConnector $connector): ConnectorInfo
    {
        return $this->connectsInfo[$this->getObjectId($connector)];
    }

    protected function getObjectId($object): string
    {
        return spl_object_hash($object);
    }
}

这里有几点需要注意:

  1. 连接池使用的是伪单例模式,同一个生成器对应的是同一个连接池实例;
  2. 连接池内部维护了读写两个池子,生成器生成的读写连接对象分别放入对应的池子里面;
  3. 从连接池取连接对象的时候,如果连接池为空,则根据情况决定是创建新连接还是等待。此处并非是在池子满了的情况下就等待,而是会超额创建,为的是应对峰值等异常情况。当然一个优化点是,将溢出比例做成可配置的,由具体的项目决定溢出多少。另外,如果创建新连接的时候数据库服务器报连接过多的错误,也需要转为等待连接归还;
  4. 如果多次等待连接失败(超时),则后面的请求会直接抛出异常(直到池子不为空)。这里有个优化点:目前的实现没有区分是读池子超时还是写池子超时;
  5. 归还连接时,如果池子满了,或者连接寿命到期了,则直接关闭连接;

后面在连接模块会讲解连接生成器,到时我们会知道一个连接池实例到底维护的是哪些连接对象。

连接

连接模块负责和数据库建立连接并发出 SQL 请求,其底层使用 Swoole 的 MySQL 驱动。连接模块由连接对象连接生成器构成,对外暴露 IConnectorIConnectorBuilder 接口。

(在我们的优化版本中,一方面引入了连接管理器 IConnectorManager,另一方面将连接模块连接池模块合并成一个大模块,因而整个连接模块对外暴露的是 IConnectorManagerIConnector 两个接口。)

连接对象的实现比较简单,我们重点看下 CoConnector 里面查询的处理:

class CoConnector implements IConnector
{
    ...

    public function __destruct()
    {
        $this->close();
    }

    public function connect(): bool
    {
        if ($this->mysql->connected) {
            return true;
        }

        $conn = $this->mysql->connect($this->config);

        if (!$conn) {
            throw new ConnectException($this->mysql->connect_error, $this->mysql->connect_errno);
        }

        return $conn;
    }

    /**
     * 执行 SQL 语句
     */
    public function query(string $sql, array $params = [], int $timeout = 180)
    {
        $prepare = $params ? true : false;

        $this->execCount++;
        $this->lastExecTime = time();

        // 是否要走 prepare 模式
        if ($prepare) {
            $statement = $this->mysql->prepare($sql, $timeout);

            // 失败,尝试重新连接数据库
            if ($statement === false && $this->tryReconnectForQueryFail()) {
                $statement = $this->mysql->prepare($sql, $timeout);
            }

            if ($statement === false) {
                $result = false;
                goto done;
            }

            // execute
            $result = $statement->execute($params, $timeout);

            // 失败,尝试重新连接数据库
            if ($result === false && $this->tryReconnectForQueryFail()) {
                $result = $statement->execute($params, $timeout);
            }
        } else {
            $result = $this->mysql->query($sql, $timeout);

            // 失败,尝试重新连接数据库
            if ($result === false && $this->tryReconnectForQueryFail()) {
                $result = $this->mysql->query($sql, $timeout);
            }
        }

        done:
        $this->lastExpendTime = time() - $this->lastExecTime;
        $this->peakExpendTime = max($this->lastExpendTime, $this->peakExpendTime);

        return $result;
    }

    ...

    /**
     * 失败重连
     */
    private function tryReconnectForQueryFail()
    {
        if ($this->mysql->connected || !in_array($this->mysql->errno, [2006, 2013])) {
            return false;
        }

        // 尝试重新连接
        $connRst = $this->connect();

        if ($connRst) {
            // 连接成功,需要重置以下错误(swoole 在重连成功后并没有重置这些属性)
            $this->mysql->error = '';
            $this->mysql->errno = 0;
            $this->mysql->connect_error = '';
            $this->mysql->connect_errno = 0;
        }

        return $connRst;
    }

    ...
}

这里重点关注下查询的时候断线重连机制:先尝试执行 SQL,如果返回连接失败,则尝试重新连接。

我们再看看连接生成器:

class CoConnectorBuilder implements IConnectorBuilder
{
    protected static $container = [];

    protected $writeConfig;
    protected $readConfigs;
    protected $key;

    /**
     * 创建生成器的时候需要提供读写连接配置,其中读配置是一个数组(可以有多个读连接)
     */
    protected function __construct(DBConfig $writeConfig = null, array $readConfigs = [])
    {
        $this->writeConfig = $writeConfig;
        $this->readConfigs = $readConfigs;
    }

    /**
     * 这里采用伪单例:同样的读写配置使用同一个生成器
     */
    public static function instance(DBConfig $writeConfig = null, array $readConfigs = []): CoConnectorBuilder
    {
        if ($writeConfig && !$readConfigs) {
            $readConfigs = [$writeConfig];
        }

        $key = self::calcKey($writeConfig, $readConfigs);
        if (!isset(self::$container[$key])) {
            $builder = new static($writeConfig, $readConfigs);
            $builder->key = $key;
            self::$container[$key] = $builder;
        }

        return self::$container[$key];
    }

    /**
     * 创建并返回 IConnector 对象
     * @param string $connType read/write
     * @return IConnector
     */
    public function build(string $connType = 'write'): IConnector
    {
        /** @var DBConfig */
        $config = $connType == 'read' ? $this->getReadConfig() : $this->writeConfig;

        if (!($config instanceof DBConfig)) {
            return null;
        }

        return new CoConnector($config->host, $config->user, $config->password, $config->database, $config->port, $config->timeout, $config->charset);
    }

    public function getKey(): string
    {
        return $this->key;
    }

    private function getReadConfig()
    {
        return $this->readConfigs[mt_rand(0, count($this->readConfigs) - 1)];
    }

    /**
     * 根据配置计算 key,完全相同的配置对应同样的 key
     */
    private static function calcKey(DBConfig $writeConfig = null, array $readConfigs = []): string
    {
        $joinStr = function ($conf)
        {
            $arr = [
                $conf->host,
                $conf->port,
                $conf->user,
                $conf->password,
                $conf->database,
                $conf->charset,
                $conf->timeout,
            ];
            sort($arr);
            return implode('-', $arr);
        };

        $readArr = [];
        foreach ($readConfigs as $readConfig) {
            $readArr[] = $joinStr($readConfig);
        }

        sort($readArr);

        return md5($joinStr($writeConfig) . implode('$', $readArr));
    }
}

该生成器是针对一主多从数据库架构的(包括未走读写分离的),如果使用是是其他数据库架构(如多主架构),则创建其他生成器即可。

同一套读写配置使用同一个生成器,对应的连接池也是同一个。

DBConfig 是一个 DTO 对象,不再阐述。

查询器的组装

使用工厂组装查询器实例:

class MySQLFactory
{
    /**
     * @param string $dbAlias 数据库配置别名,对应配置文件中数据库配置的 key
     */
    public static function build(string $dbAlias): Query
    {
        // 从配置文件获取数据库配置
        $dbConf = Config::getInstance()->getConf("mysql.$dbAlias");
        if (!$dbConf) {
            throw new ConfigNotFoundException("mysql." . $dbAlias);
        }

        if (!isset($dbConf['read']) && !isset($dbConf['write'])) {
            $writeConf = $dbConf;
            $readConfs = [$writeConf];
        } else {
            $writeConf = $dbConf['write'] ?? [];
            $readConfs = $dbConf['read'] ?? [$writeConf];
        }

        $writeConfObj = self::createConfObj($writeConf);
        $readConfObjs = [];

        foreach ($readConfs as $readConf) {
            $readConfObjs[] = self::createConfObj($readConf);
        }

        // 创建生成器、连接池、事务管理器
        // 在优化后版本中,用连接管理器代替连接池的位置即可
        $mySQLBuilder = CoConnectorBuilder::instance($writeConfObj, $readConfObjs);
        $pool = CoPool::instance($mySQLBuilder, $dbConf['pool']['size'] ?? 30);
        $transaction = new CoTransaction($pool);

        return new Query($transaction);
    }

    private static function createConfObj(array $config): DBConfig
    {
        if (!$config) {
            throw new Exception("config is null");
        }

        return new DBConfig(
            $config['host'],
            $config['user'],
            $config['password'],
            $config['database'],
            $config['port'] ?? 3306,
            $config['timeout'] ?? 3,
            $config['charset'] ?? 'utf8'
        );
    }
}

至此,整个查询器的编写、创建和使用就完成了。

总结

  1. 项目的开发需要划分模块,模块之间尽量减少耦合,通过接口通信(模块之间依赖接口而不是实现);
  2. 如果两个模块之间具有强耦合性,则往往意味着两者本身应该归并到同一个模块中,在其内部划分子模块,对外屏蔽内部细节,如本项目的连接模块和连接池模块;
  3. 如果模块之间存在不合常理的依赖关系,则意味着模块划分有问题,如本项目中的事务模块依赖连接池模块;
  4. 有问题的模块划分往往违反第一点(也就是迪米特法则),会造成模块暴露细节、过多的依赖关系,影响设计的灵活性、可扩展性,如本项目中事务模块依赖连接池模块(虽然是实现层面的依赖而非接口层面),造成要使用 CoTransaction 时必须同时使用连接池;
  5. 编写生产可用的项目时需要注意处理异常场景,如本项目中从连接池获取连接对象,以及在连接对象上执行 SQL 时的断线重连;
  6. 设计本身是迭代式的,并非一蹴而就、一次性设计即可完成的,本项目在开发过程中已经经历过几次小重构,在本次分析时仍然发现一些设计上的缺陷。重构属于项目开发的一部分;

优化版 UML 图: