超大csv解析攻略

  • 2019 年 10 月 4 日
  • 筆記

版權聲明:本文為部落客原創文章,遵循 CC 4.0 BY-SA 版權協議,轉載請附上原文出處鏈接和本聲明。

本文鏈接:https://blog.csdn.net/linzhiqiang0316/article/details/100864935

前段時間遇到這樣一個需求,解析csv文件數據,將數據封裝批量插入資料庫中。

咋一看確實沒什麼問題,但是看到文件大小的時候,差點沒吐一口老血,文件大小2.1g!!!如果用傳統的方式直接將csv文件流按行解析,然後封裝成po對象,結果很明顯,優雅的OOM掉了。

所以為了能夠成功解析這個超大文件,部落客查閱了大量的部落格,終於攻克這個問題了。因為這個坑相對比較大,所以這邊給大家分享一下,部落客的心路歷程,希望大家以後可以不掉到這個坑裡面。

方案研究:

萬能的錢

其實基於這種超大文件解析,有很多方案,最實在的辦法就是加錢,把自己伺服器記憶體懟上去,但是很可惜,我們公司沒錢,所以只能從程式碼層面解決了。

現有工具

一開始部落客也是想著,有沒有現成的工具可以直接拿來使用,後來很遺憾的發現沒有這樣的工具,所以只能自己來開發這個工具了。

當然有可能是有這樣的工具,但是部落客沒有發現,如果大家有更好的方案可以在文章下方留言哦。

核心問題點

解析超大csv文件且不會記憶體溢出,最常見的方案就是按行解析。這樣的好處就是不僅可以快速解析,而且不會有記憶體溢出的風險。

傳統流解析

那我們該如何實現按行解析的功能呢?之前部落客想過直接用字元流,然後readLine()方法進行解析,但是如果你只解析前半截還好,如果你想解析最後一行,那就必須將前面的所有數據也載入記憶體中,所以這種按行讀取也沒有解決根本問題。

隨機讀寫

那我們應該怎麼辦呢?大家不要著急,我們可以使用RandomAccessFile工具類來實現真正的按行解析。通過RandomAccessFile工具,我們可以跳到任意位置進行解析,但是這邊大家需要注意的是,RandomAccessFile工具的下標單位是位元組,所以沒有readLine()這邊簡便的方案,所以是否解析到行數據,需要我們自己去判斷。

善用工具

因為是csv文件解析,這邊我用的是CsvParser工具來進行csv解析(CsvParser據官網介紹,它的解析速度在同類工具中,也是數一數二的存在)。

方案設計

那原理介紹完畢之後,我們該如何來設計這個流程呢?因為就算可以按行解析,但是數據一多也會出現問題,這邊部落客想到兩種方案,下面給大家詳細介紹一下。

休眠模式解析

從上面流程圖可以看出來,第一種解析方案主要通過Thread.sleep(),執行緒休眠的方式實現批量按行解析的。

這種方案的好處是不需要藉助第三方工具就可以完成,實現起來簡單省事。

但是缺點也異常的明顯,這種方案雖然可以在執行緒休眠期間,通過jvm垃圾回收來保障記憶體不會OOM,但是這種方式不穩定性因素太多,還是有可能發生OOM的風險。而且因為是通過執行緒休眠,所以單次執行的時間會非常久,有可能會導致執行緒崩潰等不可控問題發生。

MQ非同步解析

通過MQ非同步解析方案流程如上所示,這種方案的好處非常明顯, 每次消費消息只解析一部分的數據,如果消費完畢之後,發現不是最後一條數據,則接著發送MQ消息,等待下次解析。通過這種非同步方式,我們完全不用擔心會出現上述的記憶體OOM等問題,但是這種方案技術實現比較困難,沒有執行緒休眠的方式簡便。

程式碼展示:

說了這麼多,我們來具體看看程式碼的實現吧,畢竟理論再完善,如果沒有程式碼也是扯淡。核心程式碼如下所示:

  /**       * csv文件解析(文件部分解析)       *       * @param sourcePath       * @param charset       * @param queryRows       * @param position       * @param isFirst       * @throws IOException       */      public static CsvDateDto readFileForPart(String sourcePath, String charset, long position, long queryRows, boolean isFirst) throws Exception {          CsvDateDto csvDateDto = new CsvDateDto();          InputStream input = null;          BufferedInputStream bufferedInputStream = null;          BufferedReader reader = null;          InputStreamReader inputStreamReader = null;          // 全局csv數據          List<String[]> globalRows = new ArrayList<>();          try {              //源文件              File files = new File(sourcePath);              //得到映射讀文件的通道              FileChannel channels = new RandomAccessFile(files, "r").getChannel();              // 聲明讀源文件對象              MappedByteBuffer mappedByteBuffer = null;              // 文件總大小              long size = files.length();              // 需要獲取的行數              queryRows = position + queryRows;              if (queryRows > size) {                  throw CsvFileException.READ_FILE_SIZE_EXCEED_EXCEPTION;              } else if (queryRows <= 0) {                  throw CsvFileException.READ_FILE_SIZE_EXCEED_EXCEPTION;              } else {                  size = queryRows;              }              // 每次循環讀取的固定個數              long pageSize = getPageSize(position, size);              //初始讀、寫大小              long readSize = pageSize;              // 最後一次讀取位置              long lastPosition = 0;              boolean lastPositionFlag = false;              // 換行的次數,用來過濾頭節點              long count = 0;              long brCount = 0;              // 文件的position開始位置(從第二行開始)              long startPosition = 0;              // 臨時文件位元組數組              byte[] tempReadDataForByte = null;              while (position < size) {                  input = null;                  count++;                  //每次讀源文件都重新構造對象                  mappedByteBuffer = channels.map(FileChannel.MapMode.READ_ONLY, position, readSize);                  // 文件位元組數組                  byte[] readDataForByte = new byte[(int) readSize];                  // 換行位置標誌                  boolean lastBrFlag = false;                  // 標誌的位置                  int lastBrIndex = 0;                  for (int i = 0; i < readSize; i++) {                      //從源文件讀取位元組                      readDataForByte[i] = mappedByteBuffer.get(i);                      // 最後一次循環                      if ((position + readSize) == size) {                          lastPositionFlag = true;                      }                      // byte的換行符號                      if (readDataForByte[i] == 10) {                          lastBrIndex = i;                          lastBrFlag = true;                          if (startPosition == 0) {                              // 將index坐標賦值給startPosition                              startPosition = lastBrIndex + 1;                          }                      }                  }                  if (startPosition != 0 && brCount == 0) {                      brCount = count;                  }                  // 如果count=1,代表找到首行位置已經確定                  if (isFirst && count == brCount && startPosition != 0) {                      readSize = lastBrIndex + 1;                      if (readSize > startPosition) {                          int newSize = (int) (lastBrIndex - startPosition);                          tempReadDataForByte = new byte[newSize];                          int j = 0;                          for (int i = (int) startPosition; i < lastBrIndex; i++) {                              tempReadDataForByte[j] = readDataForByte[i];                              j++;                          }                          input = new ByteArrayInputStream(tempReadDataForByte);                      }                      if (input == null) {                          //累加每次讀寫的位元組                          position += readSize;                      }                  } else {                      // 讀取到是數據不是最後一行,需要對byte進行過濾                      if (lastBrFlag && readSize != lastBrIndex) {                          readSize = lastBrIndex + 1;                          tempReadDataForByte = new byte[(int) lastBrIndex];                          for (int i = 0; i < lastBrIndex; i++) {                              tempReadDataForByte[i] = readDataForByte[i];                          }                          input = new ByteArrayInputStream(tempReadDataForByte);                      } else {                          // 如果lastBrFlag=true,說明本次讀取到換行                          if (lastBrFlag) {                              input = new ByteArrayInputStream(readDataForByte);                          }                      }                  }                  if (lastBrFlag && input != null) {                      // bufferedInputStream讀取數據                      bufferedInputStream = new BufferedInputStream(input);                      // 封裝為字元流                      inputStreamReader = new InputStreamReader(bufferedInputStream, Charset.forName(charset));                      // 封裝為字元快取流                      reader = new BufferedReader(inputStreamReader, 1 * 1024 * 1024);                      // 從reader中獲取解析的記錄                      List<String[]> rows = getRowsData(reader, false).getRows();                      globalRows.addAll(rows);                      // 清空集合,防止OOM                      rows.clear();                      //累加每次讀寫的位元組                      position += readSize;                  }                  // 最後一次循環標誌為true                  if (lastPositionFlag) {                      lastPosition = position;                      break;                  }                  //獲取下一頁size                  readSize = getNextPageSize(size, position);              }              // 是否是最後一次調度數據              if (lastPosition == files.length()) {                  csvDateDto.setLast(true);              } else {                  csvDateDto.setLast(false);              }              csvDateDto.setLastPosition(lastPosition);              csvDateDto.setGlobalRows(globalRows);              return csvDateDto;          } catch (IOException e) {              logger.error("readFile--IO轉化異常,錯誤資訊為:{}", ExceptionUtil.formatException(e));              throw FileParseException.READ_FILE_EXCEPTION;          } finally {              // 釋放流資源              if (input != null) {                  input.close();              }              if (bufferedInputStream != null) {                  bufferedInputStream.close();              }              if (reader != null) {                  reader.close();              }              if (inputStreamReader != null) {                  inputStreamReader.close();              }          }      }

數據測試:

CsvDateDto csvDateDto = CsvFileUtil.readFileForPart("E:/home/data/test-2.csv", "utf-8", 0, 1024, false);  System.out.println("下一行開始坐標:"+csvDateDto.getLastPosition());  List<String[]> rows =  csvDateDto.getGlobalRows();  for (String[] row : rows) {      System.out.println("解析數據:"+Arrays.toString(row));  }

測試結果:

下一行開始坐標:765  解析數據:[1436, 27, 33, 173, 3354.03, 14/3/2018 15:10:50, 5/6/2018 13:40:37, 14/3/2018, 199, us, null, 3354.03, 96100, 454, 165.96, 368.82, 0, 165.96, 368.82, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0, 3354.03, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, null, null]  解析數據:[1440, 27, 33, 203, 3887.90, 14/3/2018 16:15:38, 13/7/2018 19:33:19, 13/3/2018, 253, us, null, 3887.90, 71271, 367, 130.82, 379.77, 0, 130.82, 379.77, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0, 3887.90, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, null, null]

可以看到我們解析了兩行的數據,第三行的下一條開始坐標為765。

總結:

部落客還是比較推薦採用MQ非同步的方案,畢竟系統安全穩定比什麼都重要。

大家以為這樣就結束了嗎?不不不!!!不管是採用MQ非同步,還是執行緒休眠的方式都有一個很大的缺陷,那就是解析完csv時間會很久。如果系統對這個時效性要求比較高,那這兩種方案都會被pass掉,那我們要如何進行改造呢?哈哈哈,這個坑就由聰明的童鞋們來思考嘍~

今天的內容就講到這邊了,謝謝大家的閱讀。