­

Apache DolphinScheduler新一代分佈式工作流任務調度平台實戰-中

@

架構設計

總體架構

image-20220801143454572

  • MasterServer:MasterServer採用分佈式無中心設計理念,MasterServer主要負責 DAG 任務切分、任務提交監控,並同時監聽其它MasterServer和WorkerServer的健康狀態。 MasterServer服務啟動時向Zookeeper註冊臨時節點,通過監聽Zookeeper臨時節點變化來進行容錯處理。 MasterServer基於netty提供監聽服務。該服務主要包含:
    • Distributed Quartz分佈式調度組件,主要負責定時任務的啟停操作,當quartz調起任務後,Master內部會有線程池具體負責處理任務的後續操作
    • MasterSchedulerThread是一個掃描線程,定時掃描數據庫中的 command 表,根據不同的命令類型進行不同的業務操作
    • MasterExecThread主要是負責DAG任務切分、任務提交監控、各種不同命令類型的邏輯處理
    • MasterTaskExecThread主要負責任務的持久化
  • WorkerServer:WorkerServer也採用分佈式無中心設計理念,WorkerServer主要負責任務的執行和提供日誌服務。 WorkerServer服務啟動時向Zookeeper註冊臨時節點,並維持心跳。 Server基於netty提供監聽服務。Worker該服務主要包含:
    • FetchTaskThread主要負責不斷從Task Queue中領取任務,並根據不同任務類型調用TaskScheduleThread對應執行器。
  • ZooKeeper:ZooKeeper服務,系統中的MasterServer和WorkerServer節點都通過ZooKeeper來進行集群管理和容錯。另外系統還基於ZooKeeper進行事件監聽和分佈式鎖。 我們也曾經基於Redis實現過隊列,不過我們希望DolphinScheduler依賴到的組件盡量地少,所以最後還是去掉了Redis實現。
  • Task Queue:提供任務隊列的操作,目前隊列也是基於Zookeeper來實現。由於隊列中存的信息較少,不必擔心隊列里數據過多的情況,實際上我們壓測過百萬級數據存隊列,對系統穩定性和性能沒影響。
  • Alert:提供告警相關接口,接口主要包括告警兩種類型的告警數據的存儲、查詢和通知功能。其中通知功能又有郵件通知SNMP(暫未實現)兩種。
  • API:API接口層,主要負責處理前端UI層的請求。該服務統一提供RESTful api向外部提供請求服務。 接口包括工作流的創建、定義、查詢、修改、發佈、下線、手工啟動、停止、暫停、恢復、從該節點開始執行等等。
  • UI:系統的前端頁面,提供系統的各種可視化操作界面。

啟動流程圖

image-20220801143556961

架構設計思想簡述

DolphinScheduler架構和實現中有非常多的優秀設計思想,詳細可以查閱官方說明,下面是核心點簡述:

  • 去中心化vs中心化
    • DolphinScheduler的去中心化是Master/Worker註冊到Zookeeper中,實現Master集群和Worker集群無中心,並使用Zookeeper分佈式鎖來選舉其中的一台Master或Worker為「管理者」來執行任務。
  • 分佈式鎖實踐
    • DolphinScheduler使用ZooKeeper分佈式鎖來實現同一時刻只有一台Master執行Scheduler,或者只有一台Worker執行任務的提交。
  • 線程不足循環等待問題
    • 如果一個DAG中沒有子流程,則如果Command中的數據條數大於線程池設置的閾值,則直接流程等待或失敗。
    • 如果一個大的DAG中嵌套了很多子流程,則會產生「死等」狀態。增加一種資源不足的Command類型,如果線程池不足,則將主流程掛起。這樣線程池就有了新的線程,可以讓資源不足掛起的流程重新喚醒執行。注意:Master Scheduler線程在獲取Command的時候是FIFO的方式執行的。
  • 容錯設計
    • 容錯分為服務宕機容錯和任務重試,服務宕機容錯又分為Master容錯和Worker容錯兩種情況。
  • 任務優先級設計
    • 按照不同流程實例優先級優先於同一個流程實例優先級優先於同一流程內任務優先級優先於同一流程內任務提交順序依次從高到低進行任務處理。
      • 流程定義的優先級是考慮到有些流程需要先於其他流程進行處理,這個可以在流程啟動或者定時啟動時配置,共有5級,依次為HIGHEST、HIGH、MEDIUM、LOW、LOWEST。
      • 任務的優先級也分為5級,依次為HIGHEST、HIGH、MEDIUM、LOW、LOWEST。
  • Logback和netty實現日誌訪問
    • DolphinScheduler的輕量級性,所以選擇了gRPC實現遠程訪問日誌信息。
    • 使用自定義Logback的FileAppender和Filter功能,實現每個任務實例生成一個日誌文件。

負載均衡

  • DolphinScheduler-Master 分配任務至 worker,默認配置為線性加權負載。由於路由是在客戶端做的,即 master 服務,可以更改 master.properties 中的 master.host.selector 來配置算法。eg:master.host.selector=random(不區分大小寫);提供了三種算法:

    • 加權隨機(random)
    • 平滑輪詢(roundrobin)
    • 線性負載(lowerweight)
  • Worker 負載均衡配置:配置文件 worker.properties,權重

    • 上述所有的負載算法都是基於權重來進行加權分配的,權重影響分流結果。你可以在 修改 worker.weight 的值來給不同的機器設置不同的權重。
    • 預熱:考慮到 JIT 優化,我們會讓 worker 在啟動後低功率的運行一段時間,使其逐漸達到最佳狀態,這段過程我們稱之為預熱。因此 worker 在啟動後,他的權重會隨着時間逐漸達到最大(默認十分鐘,我們沒有提供配置項,如果需要,你可以修改並提交相關的 PR)
  • 負載均衡算法細述

    • 隨機(加權):該算法比較簡單,即在符合的 worker 中隨機選取一台(權重會影響他的比重)。
    • 平滑輪詢(加權):加權輪詢算法一個明顯的缺陷。即在某些特殊的權重下,加權輪詢調度會生成不均勻的實例序列,這種不平滑的負載可能會使某些實例出現瞬時高負載的現象,導致系統存在宕機的風險。為了解決這個調度缺陷,我們提供了平滑加權輪詢算法。每台 worker 都有兩個權重,即 weight(預熱完成後保持不變),current_weight(動態變化),每次路由。都會遍歷所有的 worker,使其 current_weight+weight,同時累加所有 worker 的 weight,計為 total_weight,然後挑選 current_weight 最大的作為本次執行任務的 worker,與此同時,將這台 worker 的 current_weight-total_weight。
    • 線性加權(默認算法):該算法每隔一段時間會向註冊中心上報自己的負載信息。
      • 我們主要根據兩個信息來進行判斷
        • load 平均值(默認是 CPU 核數 *2)
        • 可用物理內存(默認是 0.3,單位是 G)
      • 如果兩者任何一個低於配置項,那麼這台 worker 將不參與負載。(即不分配流量),可以在 worker.properties 修改下面的屬性來自定義配置
        • worker.max.cpuload.avg=-1 (worker最大cpuload均值,只有高於系統cpuload均值時,worker服務才能被派發任務. 默認值為-1: cpu cores * 2)
        • worker.reserved.memory=0.3 (worker預留內存,只有低於系統可用內存時,worker服務才能被派發任務,單位為G)

    緩存

    由於在master-server調度過程中,會產生大量的數據庫讀取操作,如tenant,user,processDefinition等,一方面對DB產生很大的讀壓力,另一方面則會使整個核心調度流程變得緩慢;考慮到這部分業務數據是讀多寫少的場景,故引入了緩存模塊,以減少DB讀壓力,加快核心調度流程;緩存模塊採用spring-cache機制,可直接在spring配置文件中配置是否開啟緩存(默認none關閉), 緩存類型;

    • 目前採用caffeine進行緩存管理,可自由設置緩存相關配置,如緩存大小、過期時間等;
    • 緩存讀取:緩存採用spring-cache的註解,配置在相關的mapper層,可參考如:TenantMapper.
    • 緩存更新:業務數據的更新來自於api-server, 而緩存端在master-server, 故需要對api-server的數據更新做監聽(aspect切面攔截@CacheEvict),當需要進行緩存驅逐時會通知master-server,master-server接收到cacheEvictCommand後進行緩存驅逐;
    • 需要注意的是:緩存更新的兜底策略來自於用戶在caffeine中的過期策略配置,請結合業務進行配置;

    實戰使用

參數

參數優先級

DolphinScheduler 中所涉及的參數值的定義可能來自三種類型:

  • 全局參數:在工作流保存頁面定義時定義的變量
  • 上游任務傳遞的參數:上游任務傳遞過來的參數
  • 本地參數:節點的自有變量,用戶在「自定義參數」定義的變量,並且用戶可以在工作流定義時定義該部分變量的值

因為參數的值存在多個來源,當參數名相同時,就需要會存在參數優先級的問題。DolphinScheduler 參數的優先級從高到低為:本地參數 > 上游任務傳遞的參數 > 全局參數

在上游任務傳遞的參數中,由於上游可能存在多個任務向下游傳遞參數,當上游傳遞的參數名稱相同時:

  • 下游節點會優先使用值為非空的參數
  • 如果存在多個值為非空的參數,則按照上游任務的完成時間排序,選擇完成時間最早的上游任務對應的參數

內置參數

基礎內置參數

變量名 聲明方式 含義
system.biz.date ${system.biz.date} 日常調度實例定時的定時時間前一天,格式為 yyyyMMdd
system.biz.curdate ${system.biz.curdate} 日常調度實例定時的定時時間,格式為 yyyyMMdd
system.datetime ${system.datetime} 日常調度實例定時的定時時間,格式為 yyyyMMddHHmmss

衍生內置參數

  • 支持代碼中自定義變量名,聲明方式:${變量名}。可以是引用 “系統參數”

  • 我們定義這種基準變量為 \([…] 格式的,\)[yyyyMMddHHmmss] 是可以任意分解組合的,比如:$[yyyyMMdd], $[HHmmss], $[yyyy-MM-dd] 等

  • 也可以通過以下兩種方式:

    • 使用add_months()函數,該函數用於加減月份, 第一個入口參數為[yyyyMMdd],表示返回時間的格式 第二個入口參數為月份偏移量,表示加減多少個月

      • 後 N 年:$[add_months(yyyyMMdd,12*N)]
      • 前 N 年:$[add_months(yyyyMMdd,-12*N)]
      • 後 N 月:$[add_months(yyyyMMdd,N)]
      • 前 N 月:$[add_months(yyyyMMdd,-N)]
    • 直接加減數字 在自定義格式後直接「+/-」數字

      • 後 N 周:$[yyyyMMdd+7*N]
      • 前 N 周:$[yyyyMMdd-7*N]
      • 後 N 天:$[yyyyMMdd+N]
      • 前 N 天:$[yyyyMMdd-N]
      • 後 N 小時:$[HHmmss+N/24]
      • 前 N 小時:$[HHmmss-N/24]
      • 後 N 分鐘:$[HHmmss+N/24/60]
      • 前 N 分鐘:$[HHmmss-N/24/60]

本地參數和全局參數

本地參數的作用域:在任務定義頁面配置的參數,默認作用域僅限該任務,如果配置了參數傳遞則可將該參數作用到下游任務中。

使用前面shell演示工作流定義,在shell-nodeA添加本地參數:

  • dt:參數名
  • IN:IN 表示局部參數僅能在當前節點使用,OUT 表示局部參數可以向下游傳遞
  • DATE:數據類型,日期
  • $[yyyy-MM-dd]:自定義格式的衍生內置參數

image-20220802184914182

全局參數作用域:全局參數是指針對整個工作流的所有任務節點都有效的參數,在工作流定義頁面配置。本地任務引用全局參數的前提是已經定義了全局參數,使用方式和本地參數中的使用方式類似,但是參數的值需要配置成全局參數中的 key。

在shell-nodeB和shell-nodeC中的腳本輸出echo ${dt},然後點擊保存工作流,添加全局變量

image-20220802185556563

從任務實例點擊右邊查看日誌,可以看到shell-nodeA輸出的是當天日期2022-08-02,而shell-nodeB和shell-nodeC輸出的是前一天日期2022-08-01。

工作流傳參

DolphinScheduler 允許在任務間進行參數傳遞,目前傳遞方向僅支持上游單向傳遞給下游。目前支持這個特性的任務類型有Shell、SQL、Procedure。

當定義上游節點時,如果有需要將該節點的結果傳遞給有依賴關係的下游節點,需要在【當前節點設置】的【自定義參數】設置一個方向是 OUT 的變量。目前我們主要針對 SQL 和 SHELL 節點做了可以向下傳遞參數的功能。

  • 註:若節點之間沒有依賴關係,則局部參數無法通過上游傳遞。

下面通過 SHELL 任務來創建本地參數並賦值傳遞給下游,用戶需要傳遞參數,在定義 SHELL 腳本時,需要輸出格式為 ${setValue(key=value)} 的語句,key 為對應參數的 prop,value 為該參數的值。在shell-nodeA任務節點中的自定義參數中添加設置參數傳遞如下:

image-20220803135820606

在shell-nodeC中腳本輸出echo ${transfer},保存工作流定義-上線-運行,查看shell-nodeC任務示例的日誌,可以得到輸出了20220701達到工作流參數。

數據源管理

支持數據源

數據源中心支持MySQL、POSTGRESQL、HIVE/IMPALA、SPARK、CLICKHOUSE、ORACLE、SQLSERVER等數據源。

  • 點擊”數據源中心->創建數據源”,根據需求創建不同類型的數據源
  • 點擊”測試連接”,測試數據源是否可以連接成功(只有當數據源通過連接性測試後才能保存數據源)。

以 MySQL 為例,如果想要使用 MySQL 數據源,需要先在 mysql maven 倉庫 中下載對應版本的 JDBC 驅動,將其移入 api-server/libs 以及 worker-server/libs 文件夾中,最後重啟 api-serverworker-server 服務,即可使用 MySQL 數據源。如果你使用容器啟動 DolphinScheduler,同樣也是將 JDBC 驅動掛載放到以上兩個服務的對應路徑下後,重啟驅動即可。

創建MySQL數據源

由於前面部署DolphinScheduler集群已將MySQL的驅動複製到所有節點master和worker、api-server、alert-server上了,因此這裡可以開始創建MySQL數據源

  • 數據源:選擇 MYSQL
  • 數據源名稱:輸入數據源的名稱
  • 描述:輸入數據源的描述
  • IP 主機名:輸入連接 MySQL 的 IP
  • 端口:輸入連接 MySQL 的端口
  • 用戶名:設置連接 MySQL 的用戶名
  • 密碼:設置連接 MySQL 的密碼
  • 數據庫名:輸入連接 MySQL 的數據庫名稱
  • Jdbc 連接參數:用於 MySQL 連接的參數設置,以 JSON 形式填寫

image-20220802112921160

jdbc連接參數如下:

{
    "useSSL": "false",
    "useUnicode": "true",
    "characterEncoding": "utf-8",
    "allowMultiQueries": "true",
    "zeroDateTimeBehavior": "convertToNull",
    "allowPublicKeyRetrieval": "true"
}

點擊數據源記錄的數據源參數

image-20220802121308522

創建ClickHouse數據源

這裡我們使用前面部署好的ClickHouse,CLICKHOUSE數據源驅動原生已支持

image-20220802122100919

工作流實踐

SQL工作流

拖拉SQL引擎圖標,創建名稱為sql_node1任務定義

insert into table01 
values('1001',99.9,'2022-08-02 22:00:00');

CREATE TABLE table01
(
    `id` String,
    `price` Float64,
    `create_time` DateTime
)
ENGINE = MergeTree()
PARTITION BY toYYYYMM(create_time)
ORDER BY id;

image-20220802180852012

創建名稱為sql_node2任務定義

insert into table02 
values('1002',199.9,'2022-08-02 23:00:00');

CREATE TABLE table02
(
    `id` String,
    `price` Float64,
    `create_time` DateTime
)
ENGINE = MergeTree()
PARTITION BY toYYYYMM(create_time)
ORDER BY id;

image-20220802183954330
創建名稱為sql_node3任務定義

insert into table03 select * from table01 
union all select * from table02;

CREATE TABLE table03
(
    `id` String,
    `price` Float64,
    `create_time` DateTime
)
ENGINE = MergeTree()
PARTITION BY toYYYYMM(create_time)
ORDER BY id;

image-20220802182923983

編輯好工作流的依賴關係,sql_node1和sql_node2都完成後再執行sql_node3,保存名稱為sql演示工作流。

image-20220802182103074

上線sql演示工作流並點擊執行,查詢工作流實例執行結果

image-20220802183746437

查詢任務實例執行結果

image-20220802183645989

使用ClickHouse的客戶端登錄ClickHouse查詢數據,已經顯示正確的結果

image-20220802184058126

工作流定時

大部分任務都會有定時運行的需求,這就需要定時工作流,創建步驟:點擊項目管理->工作流->工作流定義,進入工作流定義頁面,上線工作流,點擊”定時”按鈕,彈出定時參數設置彈框:

  • 選擇起止時間。在起止時間範圍內,定時運行工作流;不在起止時間範圍內,不再產生定時工作流實例。

  • 添加一個每隔 5 分鐘執行一次的定時。

  • 失敗策略、通知策略、流程優先級、Worker 分組、通知組、收件人、抄送人同工作流運行參數。

    點擊”創建”按鈕,創建定時成功,此時定時狀態為”下線”,定時需上線才生效。

    定時上線:點擊”定時管理”按鈕,進入定時管理頁面,點擊”上線”按鈕,定時狀態變為”上線”,如下圖所示,工作流定時生效。

image-20220803163829335

告警

告警模塊支持場景

用戶需要創建告警實例,在創建告警實例時,需要選擇告警策略,有三個選項,成功發、失敗發,以及成功和失敗都發。在執行完工作流或任務時,如果觸發告警,調用告警實例發送方法會進行邏輯判斷,將告警實例與任務狀態進行匹配,匹配則執行該告警實例發送邏輯,不匹配則過濾。創建完告警實例後,需要同告警組進行關聯,一個告警組可以使用多個告警實例。 告警模塊支持場景如下:

image-20220803155642085

郵件告警示例

先準備一個郵箱,開啟POP3/SMTP服務,由於是要用來發送主要是SMTP,得到授權碼,保存修改。

image-20220803161649495

使用管理員用戶登錄,進入到安全中心,選擇告警實例管理,創建一個告警實例,然後選擇對應的告警插件EMAIL,填寫相關如下參數。

image-20220803162258355

然後選擇告警組管理,創建告警組,選擇相應的告警實例即可。

image-20220803162421446

工作流定義運行中配置通知策略為成功或失敗都發,告警組為前面創建測試告警組

image-20220803163335087

查看郵箱確認已收到郵件信息

image-20220803163503224

**本人博客網站 **IT小神 www.itxiaoshen.com