超大csv解析攻略
- 2019 年 10 月 4 日
- 筆記
版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/linzhiqiang0316/article/details/100864935
前段时间遇到这样一个需求,解析csv文件数据,将数据封装批量插入数据库中。
咋一看确实没什么问题,但是看到文件大小的时候,差点没吐一口老血,文件大小2.1g!!!如果用传统的方式直接将csv文件流按行解析,然后封装成po对象,结果很明显,优雅的OOM掉了。
所以为了能够成功解析这个超大文件,博主查阅了大量的博客,终于攻克这个问题了。因为这个坑相对比较大,所以这边给大家分享一下,博主的心路历程,希望大家以后可以不掉到这个坑里面。
方案研究:
万能的钱
其实基于这种超大文件解析,有很多方案,最实在的办法就是加钱,把自己服务器内存怼上去,但是很可惜,我们公司没钱,所以只能从代码层面解决了。
现有工具
一开始博主也是想着,有没有现成的工具可以直接拿来使用,后来很遗憾的发现没有这样的工具,所以只能自己来开发这个工具了。
当然有可能是有这样的工具,但是博主没有发现,如果大家有更好的方案可以在文章下方留言哦。
核心问题点
解析超大csv文件且不会内存溢出,最常见的方案就是按行解析。这样的好处就是不仅可以快速解析,而且不会有内存溢出的风险。
传统流解析
那我们该如何实现按行解析的功能呢?之前博主想过直接用字符流,然后readLine()方法进行解析,但是如果你只解析前半截还好,如果你想解析最后一行,那就必须将前面的所有数据也加载内存中,所以这种按行读取也没有解决根本问题。
随机读写
那我们应该怎么办呢?大家不要着急,我们可以使用RandomAccessFile工具类来实现真正的按行解析。通过RandomAccessFile工具,我们可以跳到任意位置进行解析,但是这边大家需要注意的是,RandomAccessFile工具的下标单位是字节,所以没有readLine()这边简便的方案,所以是否解析到行数据,需要我们自己去判断。
善用工具
因为是csv文件解析,这边我用的是CsvParser工具来进行csv解析(CsvParser据官网介绍,它的解析速度在同类工具中,也是数一数二的存在)。
方案设计
那原理介绍完毕之后,我们该如何来设计这个流程呢?因为就算可以按行解析,但是数据一多也会出现问题,这边博主想到两种方案,下面给大家详细介绍一下。
休眠模式解析

从上面流程图可以看出来,第一种解析方案主要通过Thread.sleep(),线程休眠的方式实现批量按行解析的。
这种方案的好处是不需要借助第三方工具就可以完成,实现起来简单省事。
但是缺点也异常的明显,这种方案虽然可以在线程休眠期间,通过jvm垃圾回收来保障内存不会OOM,但是这种方式不稳定性因素太多,还是有可能发生OOM的风险。而且因为是通过线程休眠,所以单次执行的时间会非常久,有可能会导致线程崩溃等不可控问题发生。
MQ异步解析

通过MQ异步解析方案流程如上所示,这种方案的好处非常明显, 每次消费消息只解析一部分的数据,如果消费完毕之后,发现不是最后一条数据,则接着发送MQ消息,等待下次解析。通过这种异步方式,我们完全不用担心会出现上述的内存OOM等问题,但是这种方案技术实现比较困难,没有线程休眠的方式简便。
代码展示:
说了这么多,我们来具体看看代码的实现吧,毕竟理论再完善,如果没有代码也是扯淡。核心代码如下所示:
/** * csv文件解析(文件部分解析) * * @param sourcePath * @param charset * @param queryRows * @param position * @param isFirst * @throws IOException */ public static CsvDateDto readFileForPart(String sourcePath, String charset, long position, long queryRows, boolean isFirst) throws Exception { CsvDateDto csvDateDto = new CsvDateDto(); InputStream input = null; BufferedInputStream bufferedInputStream = null; BufferedReader reader = null; InputStreamReader inputStreamReader = null; // 全局csv数据 List<String[]> globalRows = new ArrayList<>(); try { //源文件 File files = new File(sourcePath); //得到映射读文件的通道 FileChannel channels = new RandomAccessFile(files, "r").getChannel(); // 声明读源文件对象 MappedByteBuffer mappedByteBuffer = null; // 文件总大小 long size = files.length(); // 需要获取的行数 queryRows = position + queryRows; if (queryRows > size) { throw CsvFileException.READ_FILE_SIZE_EXCEED_EXCEPTION; } else if (queryRows <= 0) { throw CsvFileException.READ_FILE_SIZE_EXCEED_EXCEPTION; } else { size = queryRows; } // 每次循环读取的固定个数 long pageSize = getPageSize(position, size); //初始读、写大小 long readSize = pageSize; // 最后一次读取位置 long lastPosition = 0; boolean lastPositionFlag = false; // 换行的次数,用来过滤头节点 long count = 0; long brCount = 0; // 文件的position开始位置(从第二行开始) long startPosition = 0; // 临时文件字节数组 byte[] tempReadDataForByte = null; while (position < size) { input = null; count++; //每次读源文件都重新构造对象 mappedByteBuffer = channels.map(FileChannel.MapMode.READ_ONLY, position, readSize); // 文件字节数组 byte[] readDataForByte = new byte[(int) readSize]; // 换行位置标志 boolean lastBrFlag = false; // 标志的位置 int lastBrIndex = 0; for (int i = 0; i < readSize; i++) { //从源文件读取字节 readDataForByte[i] = mappedByteBuffer.get(i); // 最后一次循环 if ((position + readSize) == size) { lastPositionFlag = true; } // byte的换行符号 if (readDataForByte[i] == 10) { lastBrIndex = i; lastBrFlag = true; if (startPosition == 0) { // 将index坐标赋值给startPosition startPosition = lastBrIndex + 1; } } } if (startPosition != 0 && brCount == 0) { brCount = count; } // 如果count=1,代表找到首行位置已经确定 if (isFirst && count == brCount && startPosition != 0) { readSize = lastBrIndex + 1; if (readSize > startPosition) { int newSize = (int) (lastBrIndex - startPosition); tempReadDataForByte = new byte[newSize]; int j = 0; for (int i = (int) startPosition; i < lastBrIndex; i++) { tempReadDataForByte[j] = readDataForByte[i]; j++; } input = new ByteArrayInputStream(tempReadDataForByte); } if (input == null) { //累加每次读写的字节 position += readSize; } } else { // 读取到是数据不是最后一行,需要对byte进行过滤 if (lastBrFlag && readSize != lastBrIndex) { readSize = lastBrIndex + 1; tempReadDataForByte = new byte[(int) lastBrIndex]; for (int i = 0; i < lastBrIndex; i++) { tempReadDataForByte[i] = readDataForByte[i]; } input = new ByteArrayInputStream(tempReadDataForByte); } else { // 如果lastBrFlag=true,说明本次读取到换行 if (lastBrFlag) { input = new ByteArrayInputStream(readDataForByte); } } } if (lastBrFlag && input != null) { // bufferedInputStream读取数据 bufferedInputStream = new BufferedInputStream(input); // 封装为字符流 inputStreamReader = new InputStreamReader(bufferedInputStream, Charset.forName(charset)); // 封装为字符缓存流 reader = new BufferedReader(inputStreamReader, 1 * 1024 * 1024); // 从reader中获取解析的记录 List<String[]> rows = getRowsData(reader, false).getRows(); globalRows.addAll(rows); // 清空集合,防止OOM rows.clear(); //累加每次读写的字节 position += readSize; } // 最后一次循环标志为true if (lastPositionFlag) { lastPosition = position; break; } //获取下一页size readSize = getNextPageSize(size, position); } // 是否是最后一次调度数据 if (lastPosition == files.length()) { csvDateDto.setLast(true); } else { csvDateDto.setLast(false); } csvDateDto.setLastPosition(lastPosition); csvDateDto.setGlobalRows(globalRows); return csvDateDto; } catch (IOException e) { logger.error("readFile--IO转化异常,错误信息为:{}", ExceptionUtil.formatException(e)); throw FileParseException.READ_FILE_EXCEPTION; } finally { // 释放流资源 if (input != null) { input.close(); } if (bufferedInputStream != null) { bufferedInputStream.close(); } if (reader != null) { reader.close(); } if (inputStreamReader != null) { inputStreamReader.close(); } } }
数据测试:
CsvDateDto csvDateDto = CsvFileUtil.readFileForPart("E:/home/data/test-2.csv", "utf-8", 0, 1024, false); System.out.println("下一行开始坐标:"+csvDateDto.getLastPosition()); List<String[]> rows = csvDateDto.getGlobalRows(); for (String[] row : rows) { System.out.println("解析数据:"+Arrays.toString(row)); }
测试结果:
下一行开始坐标:765 解析数据:[1436, 27, 33, 173, 3354.03, 14/3/2018 15:10:50, 5/6/2018 13:40:37, 14/3/2018, 199, us, null, 3354.03, 96100, 454, 165.96, 368.82, 0, 165.96, 368.82, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0, 3354.03, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, null, null] 解析数据:[1440, 27, 33, 203, 3887.90, 14/3/2018 16:15:38, 13/7/2018 19:33:19, 13/3/2018, 253, us, null, 3887.90, 71271, 367, 130.82, 379.77, 0, 130.82, 379.77, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0, 3887.90, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, null, null]
可以看到我们解析了两行的数据,第三行的下一条开始坐标为765。
总结:
博主还是比较推荐采用MQ异步的方案,毕竟系统安全稳定比什么都重要。
大家以为这样就结束了吗?不不不!!!不管是采用MQ异步,还是线程休眠的方式都有一个很大的缺陷,那就是解析完csv时间会很久。如果系统对这个时效性要求比较高,那这两种方案都会被pass掉,那我们要如何进行改造呢?哈哈哈,这个坑就由聪明的童鞋们来思考喽~
今天的内容就讲到这边了,谢谢大家的阅读。