PHP MySQL 快速导入10万条数据

项目背景

数据来源:所有数据均为外部导入,最大数据量在10w+

输出数据:导出经过业务处理之后的数据

使用框架:fastadmin

 

涉及的问题:

1、数据读取

2、数据保存

 

使用数据:10w+

解决方案:

方案一:直接利用框架提供的功能导入Excel数据

结果:一分钟之后超时,最终执行完成时间在3分钟左右

分析:其中数据读取和数据保存(使用模型批量保存拆分为100,1000,10000)都十分耗时,而且在超时之后,系统其它功能无法响应。

可行性:不可行

 

方案二:将Excel数据换成csv格式

结果:一分钟之后还是超时,最终执行时间大约90s左右。

分析:读取csv格式的时候需要转码(这里很耗时,取消之后很快但是数据乱码了)。数据保存(使用模型批量保存拆分为100,1000,10000)部分依然很耗时。

可行性:不可行

 

方案三:取消转码,使用MySQL原生方式保存数据

结果:偶尔会成功(这里是单条数据方式插入),大约执行时间在60s左右。

分析:MySQL在执行sql语句的时候多次解析语句导致消耗很多时间,可以将语句合并为一条语句执行。

可行性:勉强可以

 

方案四:由于方案三可以知道多条语句执行比较耗时,修改为合并执行

结果:成功,大约耗时10-20s

分析:MySQL的语句是有大小限制,避免后续数据量大的时候出现错误。这里将数据拆分多次执行(10000,20000,50000 耗时差距在1s左右,基本可以看做没区别),最后将合并之后的sql语句在进行转码,最终耗时也在10-20s之间。
可行性:可以。

 

 

数据处理基础代码如下(网上代码修改,如有侵权,联系必删)

<?php

namespace app\common\library;
use think\Db;

class CsvReader {
    private $csv_file;
    private $spl_object = null;
    private $error;
    
    public function __construct($csv_file = '') {
        if($csv_file && file_exists($csv_file)) {
            $this->csv_file = $csv_file;
        }
    }
    
    public function set_csv_file($csv_file) {
        if(!$csv_file || !file_exists($csv_file)) {
            $this->error = 'File invalid';
            return false;
        }
        $this->csv_file = $csv_file;
        $this->spl_object = null;
    }
    
    public function get_csv_file() {
        return $this->csv_file;
    }
    
    private function _file_valid($file = '') {
        $file = $file ? $file : $this->csv_file;
        if(!$file || !file_exists($file)) {
            return false;
        }
        if(!is_readable($file)) {
            return false;
        }
        return true;
    }
    
    private function _open_file() {
        if(!$this->_file_valid()) {
            $this->error = 'File invalid';
            return false;
        }
        if($this->spl_object == null) {
            $this->spl_object = new \SplFileObject($this->csv_file, 'rb');
        }
        return true;
    }

    /**
     * 读取时候直接转码
     * @* @param integrity $start 数据长度
     * @* @param integrity $length 数据长度
     * @* @param Callback $call 需要处理业务回调方法
     * @* @param Array $para 表头名称与数据库表字段对应关系
     */
    public function get_data($start = 0, $length = 0, $call=null, $para=null) {
        if(!$this->_open_file()) {
            return false;
        }
        $length = $length ? $length : $this->get_lines();
        $start = $start - 1;
        $start = ($start < 0) ? 0 : $start;
        
        $data = [];
        $this->spl_object->seek($start);
        $rowindex=[];
        $tpara = [];
        if (!empty($para)) {
            $thead = (array)$this->spl_object->fgetcsv();
            $index = 0;
            foreach ($thead as $key => $val) {
                $encoding = mb_detect_encoding($val, ['utf-8', 'gbk', 'latin1', 'big5']);
                if ($encoding != 'utf-8') {
                    $val = mb_convert_encoding($val, 'utf-8', $encoding);
                }
                if (isset($para[$val])) {
                    $rowindex[] = $index;
                    $tpara[$index] = $para[$val];
                }
                
                $index++;
            }
            $this->spl_object->next();
        }

        while ($length-- && !$this->spl_object->eof()) {
            $tdata =(array)$this->spl_object->fgetcsv();
            $db = [];

            foreach ($rowindex as $index) {
                if(!isset($tdata[$index]))continue;

                $value = $tdata[$index];
                $value = is_null($value) ? '' : trim($value);
                $encoding = mb_detect_encoding($value, ['utf-8', 'gbk', 'latin1', 'big5']);
                if ($encoding != 'utf-8') {
                    $db[$tpara[$index]] = mb_convert_encoding($value, 'utf-8', $encoding);
                }
                else{
                    $db[$tpara[$index]] = $value;
                }
            }

            if ($call) {
                $t=call_user_func_array( $call ,[$db]);    
                
                if ($t) {
                    $data[] = $t;
                }        
            }else {                
                $data[]=$db;
            }
            $this->spl_object->next();
        }
        return $data;
    }

    /**
     * 读取时候不转码,并且保存到数据库
     * @* @param Int $start 数据长度
     * @* @param Int $length 数据长度
     * @* @param Callback $call 需要处理业务回调方法
     * @* @param Array $para 表头名称与数据库表字段对应关系
     * @* @param String $table 需要出入的表名
     */
    public function get_data1($length = 0, $start = 0,$call=null,$para=null,$table=null) {
        if(!$this->_open_file()) {
            return false;
        }
        $length = $length ? $length : $this->get_lines();
        $start = $start - 1;
        $start = ($start < 0) ? 0 : $start;
        
        $data = [];
        $this->spl_object->seek($start);
        $rowindex=[];
        
        $tpara = []; //表头索引与数据库字段对应关系
        if (!empty($para)) {
            $thead = (array)$this->spl_object->fgetcsv();
            // var_dump($thead);
            $index = 0;
            foreach ($thead as $key => $val) {
                $encoding = mb_detect_encoding($val, ['utf-8', 'gbk', 'latin1', 'big5']);
                if ($encoding != 'utf-8') {
                    $val = mb_convert_encoding($val, 'utf-8', $encoding);
                }
                if (isset($para[$val])) {
                    $rowindex[] = $index;
                    $tpara[$index] = $para[$val];
                }
                
                $index++;
            }
            $this->spl_object->next();
        }

        while ($length-- && !$this->spl_object->eof()) {
            $tdata =(array)$this->spl_object->fgetcsv();
            $db = [];

            foreach ($rowindex as $idx) {
                if(!isset($tdata[$idx]))continue;

                $value = $tdata[$idx];
                $value = is_null($value) ? '' : trim($value);
                $db[$tpara[$idx]] = $value;
            }

            // 如需要处理业务,数字可以,其他文字不行(未转码)
            if ($call) {
                $t=call_user_func_array( $call ,[$db]);    
                
                if ($t) {
                    $data[] = $t;
                }        
            }else {                
                $data[]=$db;
            }
            $this->spl_object->next();

            if (count($data)==10000) {            
                $this->insertsdb($table,$para,$data);
                $data=[];
            }
        }

        if (count($data)>0) {            
            $this->insertsdb($table,$para,$data);
        }
    }
    /**
     * @* @param String $table 表名
     * @* @param Array $para 要插入表的字段
     * @* @param Array $data 要插入表的数据
     */
    public function insertsdb($table,$para,$data)
    {                
        $sql = 'insert into '.$table.' (';
        $p = implode(',',$para);
        $sql .= $p.') values';

        $tmpval='';
        foreach ($data as $key => $value) {
            $v = implode('","',array_values($value));
            
            $tmpval .='("'.$v.'"),';
        }

        $sql .= rtrim($tmpval,',');
        
        // 利用事务也可以减少执行时间
        Db::startTrans();
        try{ 
            $encoding = mb_detect_encoding($sql, ['utf-8', 'gbk', 'latin1', 'big5']);
            $sql = mb_convert_encoding($sql, 'utf-8', $encoding);
            $sql = str_replace("\\","",$sql);
            Db::execute($sql);
            // 提交事务
            Db::commit();    
        } catch (\Exception $e) {
            // 回滚事务
            Db::rollback();
            
        }
    
    }
    public function get_lines() {
        if(!$this->_open_file()) {
            return false;
        }
        $this->spl_object->seek(filesize($this->csv_file));
        return $this->spl_object->key();
    }
    
    public function get_error() {
        return $this->error;
    }
}

 

调用代码如下

    public function import()
    {
        $file = $this->request->request('file');
        if (!$file) {
            $this->error(__('Parameter %s can not be empty', 'file'));
        }
        $filePath = ROOT_PATH . DS . 'public' . DS . $file;
        if (!is_file($filePath)) {
            $this->error(__('No results were found'));
        }
        $csv = new CsvReader($filePath);
        // 此处公司业务不便公布(大约有20个参数),根据自己业务自行调整
        $para =[
            '单位详细名称'=>'company_name',
            '省份'=>'province',
            '地市'=>'city',
            '区县'=>'country',
            '是否关闭'=>'is_shutdown',
            '公司Id'=>'company_id'
        ];
        $data = $csv->get_data1(0,0,[$this,'handdbCallBack'],$para,'fa_bs_pollutant_discharge_info');
        if($data)
        {
            $this->error('失败');
        } 
        $this->success();
    }

    /**
     * 业务处理回调
     */
    public function handdbCallBack($row)
    {

        if ($row) {

            // 当前编码是CP936
            // 这里可以使用 $encoding = mb_detect_encoding($str, ['utf-8', 'gbk', 'latin1', 'big5']) 
            // 获取当前数据编码
            $encoding='CP936';
            $test = mb_convert_encoding('是', $encoding, 'utf-8');
            if (isset($row['is_shutdown'])) {
                $row['is_shutdown']=$test==$row['is_shutdown']?1:0;
            }
            // 此处为雪花ID生成,自行百度
            $row['company_id'] = $this->snow->generateID();
            
            return $row;
        }
        return null;
    }

 

关于导出部分,可以直接导出csv格式,速度会很快。这里就不上代码了,自行百度。

 

以上是所有内容,以此记录一下,便于以后查阅。如有问题或者更好的解决方案欢迎各位指出。谢谢!!!