30G 上億數據的超大文件,如何快速導入生產環境?

Hello,大家好,我是樓下小黑哥~

如果給你一個包含一億行數據的超大文件,讓你在一周之內將數據轉化導入生產資料庫,你會如何操作?

上面的問題其實是小黑哥前段時間接到一個真實的業務需求,將一個老系統歷史數據通過線下文件的方式遷移到新的生產系統。

由於老闆們已經敲定了新系統上線時間,所以只留給小黑哥一周的時間將歷史數據導入生產系統。

由於時間緊,而數據量又超大,所以小黑哥設計的過程想到一下解決辦法:

  • 拆分文件
  • 多執行緒導入

歡迎關注我的公眾號:小黑十一點半,獲得日常乾貨推送。如果您對我的專題內容感興趣,也可以關注我的部落格:studyidea.cn

拆分文件

首先我們可以寫個小程式,或者使用拆分命令 split 將這個超大文件拆分一個個小文件。

-- 將一個大文件拆分成若干個小文件,每個文件 100000 行
split -l 100000 largeFile.txt -d -a 4 smallFile_

這裡之所以選擇先將大文件拆分,主要考慮到兩個原因:

第一如果程式直接讀取這個大文件,假設讀取一半的時候,程式突然宕機,這樣就會直接丟失文件讀取的進度,又需要重新開頭讀取。

而文件拆分之後,一旦小文件讀取結束,我們可以將小文件移動一個指定文件夾。

這樣即使應用程式宕機重啟,我們重新讀取時,只需要讀取剩餘的文件。

第二,一個文件,只能被一個應用程式讀取,這樣就限制了導入的速度。

而文件拆分之後,我們可以採用多節點部署的方式,水平擴展。每個節點讀取一部分文件,這樣就可以成倍的加快導入速度。

多執行緒導入

當我們拆分完文件,接著我們就需要讀取文件內容,進行導入。

之前拆分的時候,設置每個小文件包含 10w 行的數據。由於擔心一下子將 10w 數據讀取應用中,導致堆記憶體佔用過高,引起頻繁的 Full GC,所以下面採用流式讀取的方式,一行一行的讀取數據。

當然了,如果拆分之後文件很小,或者說應用的堆記憶體設置很大,我們可以直接將文件載入到應用記憶體中處理。這樣相對來說簡單一點。

逐行讀取的程式碼如下:

File file = ...
try (LineIterator iterator = IOUtils.lineIterator(new FileInputStream(file), "UTF-8")) {
    while (iterator.hasNext()) {
        String line=iterator.nextLine();
        convertToDB(line);
    }

}

上面程式碼使用 commons-io 中的 LineIterator類,這個類底層使用了 BufferedReader 讀取文件內容。它將其封裝成迭代器模式,這樣我們可以很方便的迭代讀取。

如果當前使用 JDK1.8 ,那麼上述操作更加簡單,我們可以直接使用 JDK 原生的類 Files將文件轉成 Stream 方式讀取,程式碼如下:

Files.lines(Paths.get("文件路徑"), Charset.defaultCharset()).forEach(line -> {
    convertToDB(line);
});

其實仔細看下 Files#lines底層源碼,其實原理跟上面的 LineIterator類似,同樣也是封裝成迭代器模式。

多執行緒的引入存在的問題

上述讀取的程式碼寫起來不難,但是存在效率問題,主要是因為只有單執行緒在導入,上一行數據導入完成之後,才能繼續操作下一行。

為了加快導入速度,那我們就多來幾個執行緒,並發導入。

多執行緒我們自然將會使用執行緒池的方式,相關程式碼改造如下:

File file = ...;
ExecutorService executorService = new ThreadPoolExecutor(
        5,
        10,
        60,
        TimeUnit.MINUTES,
  			// 文件數量,假設文件包含 10W 行
        new ArrayBlockingQueue<>(10*10000),
  			 // guava 提供
        new ThreadFactoryBuilder().setNameFormat("test-%d").build());
try (LineIterator iterator = IOUtils.lineIterator(new FileInputStream(file), "UTF-8")) {
    while (iterator.hasNext()) {
        String line = iterator.nextLine();
        executorService.submit(() -> {
            convertToDB(line);
        });
    }

}

上述程式碼中,每讀取到一行內容,就會直接交給執行緒池來執行。

我們知道執行緒池原理如下:

  1. 如果核心執行緒數未滿,將會直接創建執行緒執行任務。
  2. 如果核心執行緒數已滿,將會把任務放入到隊列中。
  3. 如果隊列已滿,將會再創建執行緒執行任務。
  4. 如果最大執行緒數已滿,隊列也已滿,那麼將會執行拒絕策略。

執行緒池執行流程圖

由於我們上述執行緒池設置的核心執行緒數為 5,很快就到達了最大核心執行緒數,後續任務只能被加入隊列。

為了後續任務不被執行緒池拒絕,我們可以採用如下方案:

  • 將隊列容量設置成很大,包含整個文件所有行數
  • 將最大執行緒數設置成很大,數量大於件所有行數

以上兩種方案都存在同樣的問題,第一種是相當於將文件所有內容載入到記憶體,將會佔用過多記憶體。

而第二種創建過多的執行緒,同樣也會佔用過多記憶體。

一旦記憶體佔用過多,GC 無法清理,就可能會引起頻繁的 Full GC,甚至導致 OOM,導致程式導入速度過慢。

解決這個問題,我們可以如下兩種解決方案:

  • CountDownLatch 批量執行
  • 擴展執行緒池

CountDownLatch 批量執行

JDK 提供的 CountDownLatch,可以讓主執行緒等待子執行緒都執行完成之後,再繼續往下執行。

利用這個特性,我們可以改造多執行緒導入的程式碼,主體邏輯如下:

try (LineIterator iterator = IOUtils.lineIterator(new FileInputStream(file), "UTF-8")) {
    // 存儲每個任務執行的行數
    List<String> lines = Lists.newArrayList();
    // 存儲非同步任務
    List<ConvertTask> tasks = Lists.newArrayList();
    while (iterator.hasNext()) {
        String line = iterator.nextLine();
        lines.add(line);
        // 設置每個執行緒執行的行數
        if (lines.size() == 1000) {
            // 新建非同步任務,注意這裡需要創建一個 List
            tasks.add(new ConvertTask(Lists.newArrayList(lines)));
            lines.clear();
        }
        if (tasks.size() == 10) {
            asyncBatchExecuteTask(tasks);
        }

    }
    // 文件讀取結束,但是可能還存在未被內容
    tasks.add(new ConvertTask(Lists.newArrayList(lines)));
    // 最後再執行一次
    asyncBatchExecuteTask(tasks);
}

這段程式碼中,每個非同步任務將會導入 1000 行數據,等積累了 10 個非同步任務,然後將會調用 asyncBatchExecuteTask 使用執行緒池非同步執行。

/**
 * 批量執行任務
 *
 * @param tasks
 */
private static void asyncBatchExecuteTask(List<ConvertTask> tasks) throws InterruptedException {
    CountDownLatch countDownLatch = new CountDownLatch(tasks.size());
    for (ConvertTask task : tasks) {
        task.setCountDownLatch(countDownLatch);
        executorService.submit(task);
    }
    // 主執行緒等待非同步執行緒 countDownLatch 執行結束
    countDownLatch.await();
    // 清空,重新添加任務
    tasks.clear();
}

asyncBatchExecuteTask 方法內將會創建 CountDownLatch,然後主執行緒內調用 await方法等待所有非同步執行緒執行結束。

ConvertTask 非同步任務邏輯如下:

/**
 * 非同步任務
 * 等數據導入完成之後,一定要調用 countDownLatch.countDown()
 * 不然,這個主執行緒將會被阻塞,
 */
private static class ConvertTask implements Runnable {

    private CountDownLatch countDownLatch;

    private List<String> lines;

    public ConvertTask(List<String> lines) {
        this.lines = lines;
    }

    public void setCountDownLatch(CountDownLatch countDownLatch) {
        this.countDownLatch = countDownLatch;
    }

    @Override
    public void run() {
        try {
            for (String line : lines) {
                convertToDB(line);
            }
        } finally {
            countDownLatch.countDown();
        }
    }
}

ConvertTask任務類邏輯就非常簡單,遍歷所有行,將其導入到資料庫中。所有數據導入結束,調用 countDownLatch#countDown

一旦所有非同步執行緒執行結束,調用 countDownLatch#countDown,主執行緒將會被喚醒,繼續執行文件讀取。

雖然這種方式解決上述問題,但是這種方式,每次都需要積累一定任務數才能開始非同步執行所有任務。

另外每次都需要等待所有任務執行結束之後,才能開始下一批任務,批量執行消耗的時間等於最慢的非同步任務消耗的時間。

這種方式執行緒池中執行緒存在一定的閑置時間,那有沒有辦法一直壓榨執行緒池,讓它一直在幹活呢?

擴展執行緒池

回到最開始的問題,文件讀取導入,其實就是一個生產者-消費者消費模型。

主執行緒作為生產者不斷讀取文件,然後將其放置到隊列中。

非同步執行緒作為消費者不斷從隊列中讀取內容,導入到資料庫中。

一旦隊列滿載,生產者應該阻塞,直到消費者消費任務。

其實我們使用執行緒池的也是一個生產者-消費者消費模型,其也使用阻塞隊列。

那為什麼執行緒池在隊列滿載的時候,不發生阻塞?

這是因為執行緒池內部使用 offer 方法,這個方法在隊列滿載的時候不會發生阻塞,而是直接返回 。

那我們有沒有辦法在執行緒池隊列滿載的時候,阻塞主執行緒添加任務?

其實是可以的,我們自定義執行緒池拒絕策略,當隊列滿時改為調用 BlockingQueue.put 來實現生產者的阻塞。

RejectedExecutionHandler rejectedExecutionHandler = new RejectedExecutionHandler() {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        if (!executor.isShutdown()) {
            try {
                executor.getQueue().put(r);
            } catch (InterruptedException e) {
                // should not be interrupted
            }
        }

    }
};

這樣一旦執行緒池滿載,主執行緒將會被阻塞。

使用這種方式之後,我們可以直接使用上面提到的多執行緒導入的程式碼。

ExecutorService executorService = new ThreadPoolExecutor(
        5,
        10,
        60,
        TimeUnit.MINUTES,
        new ArrayBlockingQueue<>(100),
        new ThreadFactoryBuilder().setNameFormat("test-%d").build(),
        (r, executor) -> {
            if (!executor.isShutdown()) {
                try {
                  	// 主執行緒將會被阻塞
                    executor.getQueue().put(r);
                } catch (InterruptedException e) {
                    // should not be interrupted
                }
            }

        });
File file = new File("文件路徑");

try (LineIterator iterator = IOUtils.lineIterator(new FileInputStream(file), "UTF-8")) {
    while (iterator.hasNext()) {
        String line = iterator.nextLine();
        executorService.submit(() -> convertToDB(line));
    }
}    

小結

一個超大的文件,我們可以採用拆分文件的方式,將其拆分成多份文件,然後部署多個應用程式提高讀取速度。

另外讀取過程我們還可以使用多執行緒的方式並發導入,不過我們需要注意執行緒池滿載之後,將會拒絕後續任務。

我們可以通過擴展執行緒池,自定義拒絕策略,使讀取主執行緒阻塞。

好了,今天文章內容就到這裡,不知道各位有沒有其他更好的解決辦法,歡迎留言討論。

歡迎關注我的公眾號:小黑十一點半,獲得日常乾貨推送。如果您對我的專題內容感興趣,也可以關注我的部落格:studyidea.cn

Tags: