PowerJob 應對龐大任務的錦囊妙計:MapReduce

本文適合有 Java 基礎知識的人群

作者:HelloGitHub-Salieri

HelloGitHub 推出的《講解開源項目》系列。講解 PowerJob 系列即將接近尾聲,本系列的乾貨你還喜歡嗎?歡迎留言說下你的感受和後面想看的內容。

項目地址:

//github.com/KFCFans/PowerJob

一、MapReduce 概念介紹

MapReduce 是一種編程模型,用於大規模數據集(大於1TB)的並行運算。概念 Map(映射)和 Reduce(歸約),是它們的主要思想,都是從函數式編程語言里借來的,還有從矢量編程語言里借來的特性。它極大地方便了編程人員在不會分佈式並行編程的情況下,將自己的程序運行在分佈式系統上。當前的軟件實現是指定一個 Map(映射)函數,用來把一組鍵值對映射成一組新的鍵值對,指定並發的 Reduce(歸約)函數,用來保證所有映射的鍵值對中的每一個共享相同的鍵組。

以上這一大段不算難懂的文字就是 MapReduce 的官方概念,從「大規模數據集」這個關鍵字可以看出,MapReduce 是面向大數據處理領域設計的,是分治思想的一種經典實現,簡單概括下就是把一大坨數據通過 Map 方法切分為較小的、單機能夠處理的數據塊進行處理(shuffle),處理完成後通過 Reduce 方法匯總結果,具體流程如下圖所示。

二、需求背景

PowerJob 作為任務調度中間件,核心職責是負責任務的調度。而 MapReduce 作為一個大數據處理模型,核心功能是大規模數據的並行處理。從表象看,PowerJob 和 MapReduce 純屬八杆子打不着的關係~相信很多人第一眼看到 PowerJob 和 MapReduce 這兩個關鍵詞一起出現時,都會有以下心理活動:

「你一個任務調度框架咋就硬要扯上 MapReduce 那麼高端的概念呢?就硬蹭唄?」

其實這個問題,換個角度來思考,就能找到答案。

一般來講,需要定時調度執行的都是離線數據同步任務,對於一些有一定體量的業務來說,這個離線數據規模可能很大,單機無法很好的完成計算。為了解決這個問題,目前市面上的調度框架普遍支持靜態分片這種相對比較簡陋的方式來完成分佈式計算,即通過指定分片數量來調動固定數量的機器執行固定區間的任務。但很顯然,這種方式非常不靈活,局限性也非常大。

那麼如何實現複雜且龐大任務的分佈式計算呢?阿里巴巴的 SchedulerX 團隊給出了 MapReduce 這樣的答案。通過自己編程的形式,實現 Map 方法,完成任務的切分,再通過 Reduce 匯總子任務結果,即可完成高度可定製的分佈式計算。

PowerJob 的 MapReduce 實現便是借鑒了這一先進的思想,這裡再次感謝 SchedulerX 團隊~

三、示例用法

在 PowerJob 中,MapReduce 不再是高高在上、難以觸碰的概念。得益於強大的底層實現和優雅的 API 設計,開發者僅需要寥寥數行代碼便可完成大型任務的分佈式計算,具體示例如下。

對於有分佈式計算需求的任務,我們需要繼承特定的抽象類 MapReduceProcessor 來開啟分佈式計算能力,該接口要求開發者實現兩個方法,分別是 processreduce。前者負責任務的具體執行,後者負責彙集所有子任務得出具體的結果。同時,該抽象類默認提供兩個可用方法:isRootTaskmap。通過調用 isRootTask 方法可以判斷出當前 Task 是否為根任務,如果是根任務,則進行任務的切分(PowerJob 支持任意級 map,並不只有在根任務才能切分任務),然後調用 map 方法分發子任務。

下面放一段簡單的代碼示例幫助大家理解。下面這段代碼模擬了目前市面上主流的「靜態分片」式分佈式處理,即通過控制台指定分片數量和參數(比如分3片,分片參數為:1=a&2=b&3=c)來控制參與計算的機器數量和起始參數。雖然是「殺雞焉用牛刀」的示例,不過還是能幫助大家很好理解 PowerJob MapReduce 處理器的強大之處!

首先,我們通過 context 的 getJobParams 方法獲取控制台配置的參數,即分片參數 1=a&2=b&3=c。這個分片參數代表現在需要有 3 台機器參與執行,每台機器上子任務的起始參數分別為 a、b、c。因此,我們可以根據該規則創建子任務對象 SubTask,傳入分片索引 index 和 分片參數 params。

完成子任務的切分後,即可調用 map 方法完成任務的分發。

分發後該子任務會再次進入 process 方法,只不過本次是以 SubTask 而不是 RootTask 的身份進入。我們可以通過 context.getSubTask() 方法獲取之前 map 出去的對象,該方法的返回值是 Object,因此我們需要使用 Java instaneof 關鍵字判斷類型(當然,如果沒有多級 map,那麼該對象只可能是 SubTask 類型,直接強轉即可),如果該對象為 SubTask 類型,即進行了子任務處理階段,開始編寫子任務處理邏輯即可。

當所有子任務執行完畢後,PowerJob 會調用 reduce 方法,傳入所有子任務的運行結果,便於開發者構建該任務的最終結果。

@Component
public class StaticSliceProcessor extends MapReduceProcessor {

    @Override
    public ProcessResult process(TaskContext context) throws Exception {
        OmsLogger omsLogger = context.getOmsLogger();
        
        // root task 負責分發任務
        if (isRootTask()) {
            // 從控制台傳遞分片參數,假設格式為KV:1=a&2=b&3=c
            String jobParams = context.getJobParams();
            Map<String, String> paramsMap = Splitter.on("&").withKeyValueSeparator("=").split(jobParams);

            List<SubTask> subTasks = Lists.newLinkedList();
            paramsMap.forEach((k, v) -> subTasks.add(new SubTask(Integer.parseInt(k), v)));
            return map(subTasks, "SLICE_TASK");
        }

        Object subTask = context.getSubTask();
        if (subTask instanceof SubTask) {
            // 實際處理
            // 當然,如果覺得 subTask 還是很大,也可以繼續分發哦
            
            return new ProcessResult(true, "subTask:" + ((SubTask) subTask).getIndex() + " process successfully");
        }
        return new ProcessResult(false, "UNKNOWN BUG");
    }

    @Override
    public ProcessResult reduce(TaskContext context, List<TaskResult> taskResults) {
        // 按需求做一些統計工作... 不需要的話,直接使用 Map 處理器即可
        return new ProcessResult(true, "xxxx");
    }

    @Getter
    @NoArgsConstructor
    @AllArgsConstructor
    private static class SubTask {
        private int index;
        private String params;
    }
}

四、原理實現

PowerJob 的 MapReduce 思想主要來源於《Schedulerx2.0 分佈式計算原理&最佳實踐》這篇文章。

出於功能職責的劃分(powerjob-server 僅負責任務的調度和運維),整個 MapReduce 任務的計算由執行器 powerjob-worker 自主完成。

為了便於模型的設計和功能的劃分,PowerJob 為執行器節點分配了 3 種角色,分別是 TaskTracker、ProcessorTracker 和 Processor。

  • TaskTracker 是每一個任務的主節點,充當集群中的 master 角色,因此每個任務每次只會產生一個 TaskTracker。它負責子任務的分發、狀態監控和集群中各執行節點的健康檢查,並定期將任務的運行時信息上報給 server。
  • ProcessorTracker 是每一個執行器節點中負責執行器管理的角色,每個任務在每個執行器節點(JVM 實例)上都會產生一個 ProcessorTracker。它負責管理執行器節點任務的執行,包括接受來自 TaskTracker 的任務、上報本機任務執行情況和執行狀態等功能。
  • Processor 是每一個執行器節點中負責具體執行任務的角色,也就是真正的執行單元,每個任務在每個執行器節點都會生成若干個 Processor(沒錯!就是控制台「實例並發數」所決定的數量)。它接受來自 ProcessorTracker 派發的執行任務並完成計算。

當需要執行分佈式任務時,powerjob-server 會根據集群中各個 worker 節點的內存佔用、CPU 使用率和磁盤使用率進行健康度計算,得分最高的節點將作為本次任務的 master 節點,即承擔 TaskTracker 的職責。TaskTracker 在接收到來自 server 的任務執行請求時被創建,並完成三個階段的初始化:

  • 首先需要初始化內嵌的 H2 數據庫,用於存儲所有子任務的派發情況和執行情況。
  • 存儲就位後,TaskTracker 會根據 server 派發下來的任務內容,構建根任務,並將其持久化到內嵌數據庫。
  • 最後 TaskTracker 會創建一系列定時任務,包括子任務定時派發、子任務執行狀態檢查、worker 健康度檢查和任務整體執行狀態上報。

ProcessorTracker 在接收到來自 TaskTracker 的子任務執行請求時被創建,並根據請求中攜帶的任務信息構建出執行所需要的線程池和對應的處理器。當子任務的運行狀態發生變更後,ProcessorTracker 需要及時將最新狀態反饋給 TaskTracker。

至於 Processor,本質上就是封裝了每個子任務上下文信息的線程,由 ProcessorTracker 提交到執行線程池進行執行,並向上級彙報自己的執行狀態。

上圖清晰地展示了 PowerJob MapReduce 的工作原理,由於 MapReduce 確實算得上是非常複雜和精妙的實現,一篇文章的篇幅肯定是無法將細節說的一清二楚的。因此本文偏向於整體上的介紹,為大家講述核心組件的劃分依據和主要功能。如果對具體的細節有興趣,那麼源碼是最好的資料~在本文的指導下,我個人認為花不了一天時間就能差不多看懂~

五、最後

好了,以上就是本文的全部內容了~也是 PowerJob 技術專欄的收官之作。本來打算在這裡寫點「離別感言」,講一下自己這一路的心路歷程,不過越寫越長,都快趕上正文的篇幅了…so~就偷偷放到下期吧~


關注 HelloGitHub 公眾號