聯童科技基於incubator-dolphinscheduler從0到1構建大數據調度平台之路
- 2021 年 2 月 20 日
- 筆記
- DolphinScheduler, 大數據, 調度系統
聯童科技是一家智慧化母嬰童產業平台,從事母嬰童行業以及互聯網技術多年,擁有豐富的母嬰門店運營和系統開發經驗,在會員經營和商品經營方面,能夠圍繞會員需求,深入場景,更貼近合作夥伴和消費者,提供最優服務產品,公司致力於以技術來驅動母嬰童產業的發展,公司也希望藉助於大數據為客戶提供更多智慧數據分析和決策分析,大數據是公司重點發展的一部分,公司從成立初期起就搭建了大數據團隊,有了大數據團隊後,大數據調度平台的構建自然是最基礎也是最重要的環節。
一、為什麼選擇incubator-dolphinscheduler
1、incubator-dolphinscheduler是一個由中國公司發起的開源項目,中國本土社區成員非常活躍,更加容易去進行社區溝通,同時聯童也希望能加入到這個社區中,一起把這個由本土成員為主成立的社區做的更好。
2、incubator-dolphinscheduler 能夠支撐非常多的應用場景
- 以DAG圖的方式將Task按照任務的依賴關係關聯起來,可實時可視化監控任務的運行狀態
- 支援豐富的任務類型:Shell、MR、Spark、SQL(mysql、postgresql、hive、sparksql),Python,Sub_Process、Procedure,flink,datax,sqoop,http等
- 支援工作流定時調度、依賴調度、手動調度、手動暫停/停止/恢復,同時支援失敗重試/告警、從指定節點恢復失敗、Kill任務等操作
- 支援工作流優先順序、任務優先順序及任務的故障轉移及任務超時告警/失敗
- 支援工作流全局參數及節點自定義參數設置
- 支援資源文件的在線上傳/下載,管理等,支援在線文件創建、編輯
- 支援任務日誌在線查看及滾動、在線下載日誌等
- 實現集群HA,通過Zookeeper實現Master集群和Worker集群去中心化
- 支援對
Master/Worker
cpu load,memory,cpu在線查看 - 支援工作流運行歷史樹形/甘特圖展示、支援任務狀態統計、流程狀態統計
- 支援補數
- 支援多租戶
- 支援國際化
其中DAG圖 借鑒自spark ,在dolphinscheduler 一個工作流可以對應多個工作任務,每一個工作任務對應一個DAG中的節點。
3、incubator-dolphinscheduler在保證了高並發和高可用的設計時,架構思路也相對簡單,技術架構中沒有引入非常多的複雜技術組件,降低了學習和維護的成本。
incubator-dolphinscheduler在設計時,除了zookeeper外,沒有引入太多複雜的技術組件。整個架構以zookeeper 作為集群管理,採用去中心化思想進行設計。
二、incubator-dolphinscheduler功能的不足
1、無法支援串列調度策略
incubator-dolphinscheduler 在一開始設計時,只支援並行調度,不支援串列調度,而在聯童中,大部分場景都是需要串列運行的,也就是每一個工作流任務都只能有一個實例在運行,同一個工作流任務中必須要等前一個實例執行結束,下一個實例才能開始執行,這種場景大多出現在准實時任務中。
2、任務依賴不夠強大,只能支援被動等待依賴執行成功,無法主動觸發下游工作流實例運行
如下圖所示,只能支援在創建任務時,被動去等待依賴執行成功,無法在當前任務執行成功後,主動去觸發別的工作流任務執行。
3、部分模組中用戶體驗不足,並且在數據量大時,部分模組數據查詢性能較慢
4、缺少比較完備的監控體系
在 incubator-dolphinscheduler 只提供了一些簡單的監控,當有多大幾千個任務在運行時,很難做到完備監控,更是缺少對每一個任務運行的性能分析。
三、我們對於incubator-dolphinscheduler的功能升級開發
1、增加串列調度的支援
如下圖所示,我們在原有並行執行的基礎上,增加了串列執行方式。
在串列執行時,我們還增加了串列執行的隊列功能,每一任務都可以指定隊列的長度大小。
2、增加主動觸發下游工作流實例運行
如下圖所示,我們在原有並行執行的基礎上,增加主動觸發下游一個或者多個工作流實例運行。
運行後效果如下:
3、一些較大的Bug修復
聯童在使用 incubator-dolphinscheduler時,同樣也踩過不少的坑,這裡我們舉其中一個例子,比如在內部使用時,同事回饋最多的問題就是調度任務的日誌刷新不及時,有時候很久才能刷新出日誌。後來經過源碼分析,發現是源碼中存在了一些不太健壯的處理導致了這個問題。
incubator-dolphinscheduler 中AbstractCommandExecutor.java 部分源碼
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * //www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.server.worker.task; import static org.apache.dolphinscheduler.common.Constants.EXIT_CODE_FAILURE; import static org.apache.dolphinscheduler.common.Constants.EXIT_CODE_KILL; import static org.apache.dolphinscheduler.common.Constants.EXIT_CODE_SUCCESS; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.common.utils.HadoopUtils; import org.apache.dolphinscheduler.common.utils.LoggerUtils; import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.utils.ProcessUtils; import org.apache.dolphinscheduler.server.worker.cache.TaskExecutionContextCacheManager; import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import java.io.BufferedReader; import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStreamReader; import java.lang.reflect.Field; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collections; import java.util.LinkedList; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import java.util.regex.Matcher; import java.util.regex.Pattern; import org.slf4j.Logger; /** * abstract command executor */ public abstract class AbstractCommandExecutor { /** * rules for extracting application ID */ protected static final Pattern APPLICATION_REGEX = Pattern.compile(Constants.APPLICATION_REGEX); protected StringBuilder varPool = new StringBuilder(); /** * process */ private Process process; /** * log handler */ protected Consumer<List<String>> logHandler; /** * logger */ protected Logger logger; /** * log list */ protected final List<String> logBuffer; /** * taskExecutionContext */ protected TaskExecutionContext taskExecutionContext; /** * taskExecutionContextCacheManager */ private TaskExecutionContextCacheManager taskExecutionContextCacheManager; public AbstractCommandExecutor(Consumer<List<String>> logHandler, TaskExecutionContext taskExecutionContext, Logger logger) { this.logHandler = logHandler; this.taskExecutionContext = taskExecutionContext; this.logger = logger; this.logBuffer = Collections.synchronizedList(new ArrayList<>()); this.taskExecutionContextCacheManager = SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class); } /** * build process * * @param commandFile command file * @throws IOException IO Exception */ private void buildProcess(String commandFile) throws IOException { // setting up user to run commands List<String> command = new LinkedList<>(); //init process builder ProcessBuilder processBuilder = new ProcessBuilder(); // setting up a working directory processBuilder.directory(new File(taskExecutionContext.getExecutePath())); // merge error information to standard output stream processBuilder.redirectErrorStream(true); // setting up user to run commands command.add("sudo"); command.add("-u"); command.add(taskExecutionContext.getTenantCode()); command.add(commandInterpreter()); command.addAll(commandOptions()); command.add(commandFile); // setting commands processBuilder.command(command); process = processBuilder.start(); // print command printCommand(command); } .......... /** * get the standard output of the process * * @param process process */ private void parseProcessOutput(Process process) { String threadLoggerInfoName = String.format(LoggerUtils.TASK_LOGGER_THREAD_NAME + "-%s", taskExecutionContext.getTaskAppId()); ExecutorService parseProcessOutputExecutorService = ThreadUtils.newDaemonSingleThreadExecutor(threadLoggerInfoName); parseProcessOutputExecutorService.submit(new Runnable() { @Override public void run() { BufferedReader inReader = null; try { inReader = new BufferedReader(new InputStreamReader(process.getInputStream())); String line; long lastFlushTime = System.currentTimeMillis(); while ((line = inReader.readLine()) != null) { if (line.startsWith("${setValue(")) { varPool.append(line.substring("${setValue(".length(), line.length() - 2)); varPool.append("$VarPool$"); } else { logBuffer.add(line); lastFlushTime = flush(lastFlushTime); } } } catch (Exception e) { logger.error(e.getMessage(), e); } finally { clear(); close(inReader); } } }); parseProcessOutputExecutorService.shutdown(); } ................ /** * when log buffer siz or flush time reach condition , then flush * * @param lastFlushTime last flush time * @return last flush time */ private long flush(long lastFlushTime) { long now = System.currentTimeMillis(); /** * when log buffer siz or flush time reach condition , then flush */ if (logBuffer.size() >= Constants.DEFAULT_LOG_ROWS_NUM || now - lastFlushTime > Constants.DEFAULT_LOG_FLUSH_INTERVAL) { lastFlushTime = now; /** log handle */ logHandler.accept(logBuffer); logBuffer.clear(); } return lastFlushTime; } /** * close buffer reader * * @param inReader in reader */ private void close(BufferedReader inReader) { if (inReader != null) { try { inReader.close(); } catch (IOException e) { logger.error(e.getMessage(), e); } } } protected List<String> commandOptions() { return Collections.emptyList(); } protected abstract String buildCommandFilePath(); protected abstract String commandInterpreter(); protected abstract void createCommandFileIfNotExists(String execCommand, String commandFile) throws IOException; }
在這段源碼中,parseProcessOutput(Process process) 方法是負責任務日誌的獲取以及Flush。 但是由於採用了BufferedReader 中的readLine() 方法來讀取任務進程的process.getInputStream()日誌,由於readLine() 是一個阻塞方法,
flush(long lastFlushTime) 方法在處理時有一個判斷條件if (logBuffer.size() >= Constants.DEFAULT_LOG_ROWS_NUM || now – lastFlushTime > Constants.DEFAULT_LOG_FLUSH_INTERVAL),只有當日誌條數達到64條或者間隔1s時才會
flush。按理說,程式碼其實是要實現至少每隔1s會flash 一次日誌,但是由於readLine() 是一個阻塞方法,所以並不會一直在執行,而是readLine()必須是讀取到新數據後,才會執行flush方法。 那麼在出現1s內產生的任務日誌不滿足64條,而任務又很久沒有新日誌出現時,就會觸發這個bug。例如執行如下一個shell 腳本任務,由於每個執行步驟產生的日誌少,而且每個步驟執行的時間又很久,時間間隔很大,就會出現很久都不會刷新上一次產生的日誌。
#!/bin/bash echo "hello world" exec 10m sleep 100000s echo "hello world2" exec 10m sleep 100000s echo "hello world3" exec 10m sleep 100000s
之後我們對這段源碼進行了重寫,採用了兩個執行緒進行處理,一個執行緒負責readline(),一個執行緒負責flush.做到在readline()方法的執行緒阻塞時,不影響flush執行緒的處理。
public abstract class AbstractCommandExecutor { /** * rules for extracting application ID */ protected static final Pattern APPLICATION_REGEX = Pattern.compile(Constants.APPLICATION_REGEX); /** * process */ private Process process; /** * log handler */ protected Consumer<List<String>> logHandler; /** * logger */ protected Logger logger; /** * log list */ protected final List<String> logBuffer; protected boolean logOutputIsScuccess = false; /** * taskExecutionContext */ protected TaskExecutionContext taskExecutionContext; /** * taskExecutionContextCacheManager */ private TaskExecutionContextCacheManager taskExecutionContextCacheManager; ......... /** * get the standard output of the process * * @param process process */ private void parseProcessOutput(Process process) { String threadLoggerInfoName = String.format(LoggerUtils.TASK_LOGGER_THREAD_NAME + "-%s", taskExecutionContext.getTaskAppId()); ExecutorService getOutputLogService = ThreadUtils.newDaemonSingleThreadExecutor(threadLoggerInfoName + "-" + "getOutputLogService"); getOutputLogService.submit(() -> { BufferedReader inReader = null; try { inReader = new BufferedReader(new InputStreamReader(process.getInputStream())); String line;while ((line = inReader.readLine()) != null) { logBuffer.add(line); } } catch (Exception e) { logger.error(e.getMessage(), e); } finally { logOutputIsScuccess = true; close(inReader); } }); getOutputLogService.shutdown(); ExecutorService parseProcessOutputExecutorService = ThreadUtils.newDaemonSingleThreadExecutor(threadLoggerInfoName); parseProcessOutputExecutorService.submit(() -> { try { long lastFlushTime = System.currentTimeMillis(); while (logBuffer.size() > 0 || !logOutputIsScuccess) { if (logBuffer.size() > 0) { lastFlushTime = flush(lastFlushTime); } else { Thread.sleep(Constants.DEFAULT_LOG_FLUSH_INTERVAL); } } } catch (Exception e) { logger.error(e.getMessage(), e); } finally { clear(); } }); parseProcessOutputExecutorService.shutdown(); } ....... /** * when log buffer siz or flush time reach condition , then flush * * @param lastFlushTime last flush time * @return last flush time */ private long flush(long lastFlushTime) throws InterruptedException { long now = System.currentTimeMillis(); /** * when log buffer siz or flush time reach condition , then flush */ if (logBuffer.size() >= Constants.DEFAULT_LOG_ROWS_NUM || now - lastFlushTime > Constants.DEFAULT_LOG_FLUSH_INTERVAL) { lastFlushTime = now; /** log handle */ logHandler.accept(logBuffer); logBuffer.clear(); } return lastFlushTime; } ....... }
4、將調度系統的監控接入到prometheus和grafana中
incubator-dolphinscheduler 只提供了一些如下的簡單實時監控,尤其缺少對任務的監控。
聯童在此基礎上,引入了prometheus和grafana。
使用prometheus和grafana 不但可以監控到調度系統任務的總體運行,也可以監控到單個任務的運行耗時曲線等。
5、對incubator-dolphinscheduler 的性能優化
待稍後晚點補充
四、聯童對於開源社區的擁抱和回饋
聯童雖然是一家新興起的母嬰童公司,但是在成立的初始,就秉承著以技術來驅動母嬰童產業的發展,公司擁有一個非常好的技術團隊,也一直在擁抱開源社區,目前已經引入了incubator-dolphinscheduler、prometheus、grafana 、hadoop、spark、flink、hive、presto……等很多開源項目來支撐公司的技術驅動。在未來,聯童也一定回不斷的去回饋開源社區,去提供更多的Pull requests,貢獻自己的一份力量。