­

超大csv解析攻略

  • 2019 年 10 月 4 日
  • 筆記

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。

本文链接:https://blog.csdn.net/linzhiqiang0316/article/details/100864935

前段时间遇到这样一个需求,解析csv文件数据,将数据封装批量插入数据库中。

咋一看确实没什么问题,但是看到文件大小的时候,差点没吐一口老血,文件大小2.1g!!!如果用传统的方式直接将csv文件流按行解析,然后封装成po对象,结果很明显,优雅的OOM掉了。

所以为了能够成功解析这个超大文件,博主查阅了大量的博客,终于攻克这个问题了。因为这个坑相对比较大,所以这边给大家分享一下,博主的心路历程,希望大家以后可以不掉到这个坑里面。

方案研究:

万能的钱

其实基于这种超大文件解析,有很多方案,最实在的办法就是加钱,把自己服务器内存怼上去,但是很可惜,我们公司没钱,所以只能从代码层面解决了。

现有工具

一开始博主也是想着,有没有现成的工具可以直接拿来使用,后来很遗憾的发现没有这样的工具,所以只能自己来开发这个工具了。

当然有可能是有这样的工具,但是博主没有发现,如果大家有更好的方案可以在文章下方留言哦。

核心问题点

解析超大csv文件且不会内存溢出,最常见的方案就是按行解析。这样的好处就是不仅可以快速解析,而且不会有内存溢出的风险。

传统流解析

那我们该如何实现按行解析的功能呢?之前博主想过直接用字符流,然后readLine()方法进行解析,但是如果你只解析前半截还好,如果你想解析最后一行,那就必须将前面的所有数据也加载内存中,所以这种按行读取也没有解决根本问题。

随机读写

那我们应该怎么办呢?大家不要着急,我们可以使用RandomAccessFile工具类来实现真正的按行解析。通过RandomAccessFile工具,我们可以跳到任意位置进行解析,但是这边大家需要注意的是,RandomAccessFile工具的下标单位是字节,所以没有readLine()这边简便的方案,所以是否解析到行数据,需要我们自己去判断。

善用工具

因为是csv文件解析,这边我用的是CsvParser工具来进行csv解析(CsvParser据官网介绍,它的解析速度在同类工具中,也是数一数二的存在)。

方案设计

那原理介绍完毕之后,我们该如何来设计这个流程呢?因为就算可以按行解析,但是数据一多也会出现问题,这边博主想到两种方案,下面给大家详细介绍一下。

休眠模式解析

从上面流程图可以看出来,第一种解析方案主要通过Thread.sleep(),线程休眠的方式实现批量按行解析的。

这种方案的好处是不需要借助第三方工具就可以完成,实现起来简单省事。

但是缺点也异常的明显,这种方案虽然可以在线程休眠期间,通过jvm垃圾回收来保障内存不会OOM,但是这种方式不稳定性因素太多,还是有可能发生OOM的风险。而且因为是通过线程休眠,所以单次执行的时间会非常久,有可能会导致线程崩溃等不可控问题发生。

MQ异步解析

通过MQ异步解析方案流程如上所示,这种方案的好处非常明显, 每次消费消息只解析一部分的数据,如果消费完毕之后,发现不是最后一条数据,则接着发送MQ消息,等待下次解析。通过这种异步方式,我们完全不用担心会出现上述的内存OOM等问题,但是这种方案技术实现比较困难,没有线程休眠的方式简便。

代码展示:

说了这么多,我们来具体看看代码的实现吧,毕竟理论再完善,如果没有代码也是扯淡。核心代码如下所示:

  /**       * csv文件解析(文件部分解析)       *       * @param sourcePath       * @param charset       * @param queryRows       * @param position       * @param isFirst       * @throws IOException       */      public static CsvDateDto readFileForPart(String sourcePath, String charset, long position, long queryRows, boolean isFirst) throws Exception {          CsvDateDto csvDateDto = new CsvDateDto();          InputStream input = null;          BufferedInputStream bufferedInputStream = null;          BufferedReader reader = null;          InputStreamReader inputStreamReader = null;          // 全局csv数据          List<String[]> globalRows = new ArrayList<>();          try {              //源文件              File files = new File(sourcePath);              //得到映射读文件的通道              FileChannel channels = new RandomAccessFile(files, "r").getChannel();              // 声明读源文件对象              MappedByteBuffer mappedByteBuffer = null;              // 文件总大小              long size = files.length();              // 需要获取的行数              queryRows = position + queryRows;              if (queryRows > size) {                  throw CsvFileException.READ_FILE_SIZE_EXCEED_EXCEPTION;              } else if (queryRows <= 0) {                  throw CsvFileException.READ_FILE_SIZE_EXCEED_EXCEPTION;              } else {                  size = queryRows;              }              // 每次循环读取的固定个数              long pageSize = getPageSize(position, size);              //初始读、写大小              long readSize = pageSize;              // 最后一次读取位置              long lastPosition = 0;              boolean lastPositionFlag = false;              // 换行的次数,用来过滤头节点              long count = 0;              long brCount = 0;              // 文件的position开始位置(从第二行开始)              long startPosition = 0;              // 临时文件字节数组              byte[] tempReadDataForByte = null;              while (position < size) {                  input = null;                  count++;                  //每次读源文件都重新构造对象                  mappedByteBuffer = channels.map(FileChannel.MapMode.READ_ONLY, position, readSize);                  // 文件字节数组                  byte[] readDataForByte = new byte[(int) readSize];                  // 换行位置标志                  boolean lastBrFlag = false;                  // 标志的位置                  int lastBrIndex = 0;                  for (int i = 0; i < readSize; i++) {                      //从源文件读取字节                      readDataForByte[i] = mappedByteBuffer.get(i);                      // 最后一次循环                      if ((position + readSize) == size) {                          lastPositionFlag = true;                      }                      // byte的换行符号                      if (readDataForByte[i] == 10) {                          lastBrIndex = i;                          lastBrFlag = true;                          if (startPosition == 0) {                              // 将index坐标赋值给startPosition                              startPosition = lastBrIndex + 1;                          }                      }                  }                  if (startPosition != 0 && brCount == 0) {                      brCount = count;                  }                  // 如果count=1,代表找到首行位置已经确定                  if (isFirst && count == brCount && startPosition != 0) {                      readSize = lastBrIndex + 1;                      if (readSize > startPosition) {                          int newSize = (int) (lastBrIndex - startPosition);                          tempReadDataForByte = new byte[newSize];                          int j = 0;                          for (int i = (int) startPosition; i < lastBrIndex; i++) {                              tempReadDataForByte[j] = readDataForByte[i];                              j++;                          }                          input = new ByteArrayInputStream(tempReadDataForByte);                      }                      if (input == null) {                          //累加每次读写的字节                          position += readSize;                      }                  } else {                      // 读取到是数据不是最后一行,需要对byte进行过滤                      if (lastBrFlag && readSize != lastBrIndex) {                          readSize = lastBrIndex + 1;                          tempReadDataForByte = new byte[(int) lastBrIndex];                          for (int i = 0; i < lastBrIndex; i++) {                              tempReadDataForByte[i] = readDataForByte[i];                          }                          input = new ByteArrayInputStream(tempReadDataForByte);                      } else {                          // 如果lastBrFlag=true,说明本次读取到换行                          if (lastBrFlag) {                              input = new ByteArrayInputStream(readDataForByte);                          }                      }                  }                  if (lastBrFlag && input != null) {                      // bufferedInputStream读取数据                      bufferedInputStream = new BufferedInputStream(input);                      // 封装为字符流                      inputStreamReader = new InputStreamReader(bufferedInputStream, Charset.forName(charset));                      // 封装为字符缓存流                      reader = new BufferedReader(inputStreamReader, 1 * 1024 * 1024);                      // 从reader中获取解析的记录                      List<String[]> rows = getRowsData(reader, false).getRows();                      globalRows.addAll(rows);                      // 清空集合,防止OOM                      rows.clear();                      //累加每次读写的字节                      position += readSize;                  }                  // 最后一次循环标志为true                  if (lastPositionFlag) {                      lastPosition = position;                      break;                  }                  //获取下一页size                  readSize = getNextPageSize(size, position);              }              // 是否是最后一次调度数据              if (lastPosition == files.length()) {                  csvDateDto.setLast(true);              } else {                  csvDateDto.setLast(false);              }              csvDateDto.setLastPosition(lastPosition);              csvDateDto.setGlobalRows(globalRows);              return csvDateDto;          } catch (IOException e) {              logger.error("readFile--IO转化异常,错误信息为:{}", ExceptionUtil.formatException(e));              throw FileParseException.READ_FILE_EXCEPTION;          } finally {              // 释放流资源              if (input != null) {                  input.close();              }              if (bufferedInputStream != null) {                  bufferedInputStream.close();              }              if (reader != null) {                  reader.close();              }              if (inputStreamReader != null) {                  inputStreamReader.close();              }          }      }

数据测试:

CsvDateDto csvDateDto = CsvFileUtil.readFileForPart("E:/home/data/test-2.csv", "utf-8", 0, 1024, false);  System.out.println("下一行开始坐标:"+csvDateDto.getLastPosition());  List<String[]> rows =  csvDateDto.getGlobalRows();  for (String[] row : rows) {      System.out.println("解析数据:"+Arrays.toString(row));  }

测试结果:

下一行开始坐标:765  解析数据:[1436, 27, 33, 173, 3354.03, 14/3/2018 15:10:50, 5/6/2018 13:40:37, 14/3/2018, 199, us, null, 3354.03, 96100, 454, 165.96, 368.82, 0, 165.96, 368.82, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0, 3354.03, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, null, null]  解析数据:[1440, 27, 33, 203, 3887.90, 14/3/2018 16:15:38, 13/7/2018 19:33:19, 13/3/2018, 253, us, null, 3887.90, 71271, 367, 130.82, 379.77, 0, 130.82, 379.77, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0, 3887.90, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, null, null]

可以看到我们解析了两行的数据,第三行的下一条开始坐标为765。

总结:

博主还是比较推荐采用MQ异步的方案,毕竟系统安全稳定比什么都重要。

大家以为这样就结束了吗?不不不!!!不管是采用MQ异步,还是线程休眠的方式都有一个很大的缺陷,那就是解析完csv时间会很久。如果系统对这个时效性要求比较高,那这两种方案都会被pass掉,那我们要如何进行改造呢?哈哈哈,这个坑就由聪明的童鞋们来思考喽~

今天的内容就讲到这边了,谢谢大家的阅读。