超大csv解析攻略
- 2019 年 10 月 4 日
- 筆記
版權聲明:本文為部落客原創文章,遵循 CC 4.0 BY-SA 版權協議,轉載請附上原文出處鏈接和本聲明。
本文鏈接:https://blog.csdn.net/linzhiqiang0316/article/details/100864935
前段時間遇到這樣一個需求,解析csv文件數據,將數據封裝批量插入資料庫中。
咋一看確實沒什麼問題,但是看到文件大小的時候,差點沒吐一口老血,文件大小2.1g!!!如果用傳統的方式直接將csv文件流按行解析,然後封裝成po對象,結果很明顯,優雅的OOM掉了。
所以為了能夠成功解析這個超大文件,部落客查閱了大量的部落格,終於攻克這個問題了。因為這個坑相對比較大,所以這邊給大家分享一下,部落客的心路歷程,希望大家以後可以不掉到這個坑裡面。
方案研究:
萬能的錢
其實基於這種超大文件解析,有很多方案,最實在的辦法就是加錢,把自己伺服器記憶體懟上去,但是很可惜,我們公司沒錢,所以只能從程式碼層面解決了。
現有工具
一開始部落客也是想著,有沒有現成的工具可以直接拿來使用,後來很遺憾的發現沒有這樣的工具,所以只能自己來開發這個工具了。
當然有可能是有這樣的工具,但是部落客沒有發現,如果大家有更好的方案可以在文章下方留言哦。
核心問題點
解析超大csv文件且不會記憶體溢出,最常見的方案就是按行解析。這樣的好處就是不僅可以快速解析,而且不會有記憶體溢出的風險。
傳統流解析
那我們該如何實現按行解析的功能呢?之前部落客想過直接用字元流,然後readLine()方法進行解析,但是如果你只解析前半截還好,如果你想解析最後一行,那就必須將前面的所有數據也載入記憶體中,所以這種按行讀取也沒有解決根本問題。
隨機讀寫
那我們應該怎麼辦呢?大家不要著急,我們可以使用RandomAccessFile工具類來實現真正的按行解析。通過RandomAccessFile工具,我們可以跳到任意位置進行解析,但是這邊大家需要注意的是,RandomAccessFile工具的下標單位是位元組,所以沒有readLine()這邊簡便的方案,所以是否解析到行數據,需要我們自己去判斷。
善用工具
因為是csv文件解析,這邊我用的是CsvParser工具來進行csv解析(CsvParser據官網介紹,它的解析速度在同類工具中,也是數一數二的存在)。
方案設計
那原理介紹完畢之後,我們該如何來設計這個流程呢?因為就算可以按行解析,但是數據一多也會出現問題,這邊部落客想到兩種方案,下面給大家詳細介紹一下。
休眠模式解析

從上面流程圖可以看出來,第一種解析方案主要通過Thread.sleep(),執行緒休眠的方式實現批量按行解析的。
這種方案的好處是不需要藉助第三方工具就可以完成,實現起來簡單省事。
但是缺點也異常的明顯,這種方案雖然可以在執行緒休眠期間,通過jvm垃圾回收來保障記憶體不會OOM,但是這種方式不穩定性因素太多,還是有可能發生OOM的風險。而且因為是通過執行緒休眠,所以單次執行的時間會非常久,有可能會導致執行緒崩潰等不可控問題發生。
MQ非同步解析

通過MQ非同步解析方案流程如上所示,這種方案的好處非常明顯, 每次消費消息只解析一部分的數據,如果消費完畢之後,發現不是最後一條數據,則接著發送MQ消息,等待下次解析。通過這種非同步方式,我們完全不用擔心會出現上述的記憶體OOM等問題,但是這種方案技術實現比較困難,沒有執行緒休眠的方式簡便。
程式碼展示:
說了這麼多,我們來具體看看程式碼的實現吧,畢竟理論再完善,如果沒有程式碼也是扯淡。核心程式碼如下所示:
/** * csv文件解析(文件部分解析) * * @param sourcePath * @param charset * @param queryRows * @param position * @param isFirst * @throws IOException */ public static CsvDateDto readFileForPart(String sourcePath, String charset, long position, long queryRows, boolean isFirst) throws Exception { CsvDateDto csvDateDto = new CsvDateDto(); InputStream input = null; BufferedInputStream bufferedInputStream = null; BufferedReader reader = null; InputStreamReader inputStreamReader = null; // 全局csv數據 List<String[]> globalRows = new ArrayList<>(); try { //源文件 File files = new File(sourcePath); //得到映射讀文件的通道 FileChannel channels = new RandomAccessFile(files, "r").getChannel(); // 聲明讀源文件對象 MappedByteBuffer mappedByteBuffer = null; // 文件總大小 long size = files.length(); // 需要獲取的行數 queryRows = position + queryRows; if (queryRows > size) { throw CsvFileException.READ_FILE_SIZE_EXCEED_EXCEPTION; } else if (queryRows <= 0) { throw CsvFileException.READ_FILE_SIZE_EXCEED_EXCEPTION; } else { size = queryRows; } // 每次循環讀取的固定個數 long pageSize = getPageSize(position, size); //初始讀、寫大小 long readSize = pageSize; // 最後一次讀取位置 long lastPosition = 0; boolean lastPositionFlag = false; // 換行的次數,用來過濾頭節點 long count = 0; long brCount = 0; // 文件的position開始位置(從第二行開始) long startPosition = 0; // 臨時文件位元組數組 byte[] tempReadDataForByte = null; while (position < size) { input = null; count++; //每次讀源文件都重新構造對象 mappedByteBuffer = channels.map(FileChannel.MapMode.READ_ONLY, position, readSize); // 文件位元組數組 byte[] readDataForByte = new byte[(int) readSize]; // 換行位置標誌 boolean lastBrFlag = false; // 標誌的位置 int lastBrIndex = 0; for (int i = 0; i < readSize; i++) { //從源文件讀取位元組 readDataForByte[i] = mappedByteBuffer.get(i); // 最後一次循環 if ((position + readSize) == size) { lastPositionFlag = true; } // byte的換行符號 if (readDataForByte[i] == 10) { lastBrIndex = i; lastBrFlag = true; if (startPosition == 0) { // 將index坐標賦值給startPosition startPosition = lastBrIndex + 1; } } } if (startPosition != 0 && brCount == 0) { brCount = count; } // 如果count=1,代表找到首行位置已經確定 if (isFirst && count == brCount && startPosition != 0) { readSize = lastBrIndex + 1; if (readSize > startPosition) { int newSize = (int) (lastBrIndex - startPosition); tempReadDataForByte = new byte[newSize]; int j = 0; for (int i = (int) startPosition; i < lastBrIndex; i++) { tempReadDataForByte[j] = readDataForByte[i]; j++; } input = new ByteArrayInputStream(tempReadDataForByte); } if (input == null) { //累加每次讀寫的位元組 position += readSize; } } else { // 讀取到是數據不是最後一行,需要對byte進行過濾 if (lastBrFlag && readSize != lastBrIndex) { readSize = lastBrIndex + 1; tempReadDataForByte = new byte[(int) lastBrIndex]; for (int i = 0; i < lastBrIndex; i++) { tempReadDataForByte[i] = readDataForByte[i]; } input = new ByteArrayInputStream(tempReadDataForByte); } else { // 如果lastBrFlag=true,說明本次讀取到換行 if (lastBrFlag) { input = new ByteArrayInputStream(readDataForByte); } } } if (lastBrFlag && input != null) { // bufferedInputStream讀取數據 bufferedInputStream = new BufferedInputStream(input); // 封裝為字元流 inputStreamReader = new InputStreamReader(bufferedInputStream, Charset.forName(charset)); // 封裝為字元快取流 reader = new BufferedReader(inputStreamReader, 1 * 1024 * 1024); // 從reader中獲取解析的記錄 List<String[]> rows = getRowsData(reader, false).getRows(); globalRows.addAll(rows); // 清空集合,防止OOM rows.clear(); //累加每次讀寫的位元組 position += readSize; } // 最後一次循環標誌為true if (lastPositionFlag) { lastPosition = position; break; } //獲取下一頁size readSize = getNextPageSize(size, position); } // 是否是最後一次調度數據 if (lastPosition == files.length()) { csvDateDto.setLast(true); } else { csvDateDto.setLast(false); } csvDateDto.setLastPosition(lastPosition); csvDateDto.setGlobalRows(globalRows); return csvDateDto; } catch (IOException e) { logger.error("readFile--IO轉化異常,錯誤資訊為:{}", ExceptionUtil.formatException(e)); throw FileParseException.READ_FILE_EXCEPTION; } finally { // 釋放流資源 if (input != null) { input.close(); } if (bufferedInputStream != null) { bufferedInputStream.close(); } if (reader != null) { reader.close(); } if (inputStreamReader != null) { inputStreamReader.close(); } } }
數據測試:
CsvDateDto csvDateDto = CsvFileUtil.readFileForPart("E:/home/data/test-2.csv", "utf-8", 0, 1024, false); System.out.println("下一行開始坐標:"+csvDateDto.getLastPosition()); List<String[]> rows = csvDateDto.getGlobalRows(); for (String[] row : rows) { System.out.println("解析數據:"+Arrays.toString(row)); }
測試結果:
下一行開始坐標:765 解析數據:[1436, 27, 33, 173, 3354.03, 14/3/2018 15:10:50, 5/6/2018 13:40:37, 14/3/2018, 199, us, null, 3354.03, 96100, 454, 165.96, 368.82, 0, 165.96, 368.82, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0, 3354.03, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, null, null] 解析數據:[1440, 27, 33, 203, 3887.90, 14/3/2018 16:15:38, 13/7/2018 19:33:19, 13/3/2018, 253, us, null, 3887.90, 71271, 367, 130.82, 379.77, 0, 130.82, 379.77, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0, 3887.90, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, null, null]
可以看到我們解析了兩行的數據,第三行的下一條開始坐標為765。
總結:
部落客還是比較推薦採用MQ非同步的方案,畢竟系統安全穩定比什麼都重要。
大家以為這樣就結束了嗎?不不不!!!不管是採用MQ非同步,還是執行緒休眠的方式都有一個很大的缺陷,那就是解析完csv時間會很久。如果系統對這個時效性要求比較高,那這兩種方案都會被pass掉,那我們要如何進行改造呢?哈哈哈,這個坑就由聰明的童鞋們來思考嘍~
今天的內容就講到這邊了,謝謝大家的閱讀。
