一文讀懂,硬核 Apache DolphinScheduler3.0 源碼解析
- 2022 年 9 月 15 日
- 筆記
- Apache DolphinScheduler
點亮 ⭐️ Star · 照亮開源之路
//github.com/apache/dolphinscheduler
本文目錄
-
1 DolphinScheduler的設計與策略
-
1.1 分散式設計
-
1.1.1 中心化
-
1.1.2 去中心化
-
1.2 DophinScheduler架構設計
-
1.3 容錯問題
-
1.3.1 宕機容錯
-
1.3.2 失敗重試
-
1.4 遠程日誌訪問
-
2 DolphinScheduler源碼分析
-
2.1 工程模組介紹與配置文件
-
2.1.1 工程模組介紹
-
2.1.2 配置文件
-
2.2 Api主要任務操作介面
-
2.3 Quaterz架構與運行流程
-
2.3.1 概念與架構
-
2.3.2 初始化與執行流程
-
2.3.3 集群運轉
-
2.4 Master啟動與執行流程
-
2.4.1 概念與執行邏輯
-
2.4.2 集群與槽(slot)
-
2.4.3 程式碼執行流程
-
2.5 Work啟動與執行流程
-
2.5.1 概念與執行邏輯
-
2.5.2 程式碼執行流程
-
2.6 rpc交互
-
2.6.1 Master與Worker交互
-
2.6.2 其他服務與Master交互
-
2.7 負載均衡演算法
-
2.7.1 加權隨機
-
2.7.2 線性負載
-
2.7.3 平滑輪詢
-
2.8 日誌服務
-
2.9 報警
-
3 後記
-
3.1 Make friends
-
3.2 參考文獻
前言
研究Apache Dolphinscheduler也是機緣巧合,平時負責基於xxl-job二次開發出來的調度平台,因為遇到了並發性能瓶頸,到了不得不優化重構的地步,所以搜索市面上應用較廣的調度平台以借鑒優化思路。
在閱讀完DolphinScheduler程式碼之後,便生出了將其設計與思考記錄下來的念頭,這便是此篇文章的來源。因為沒有正式生產使用,業務理解不一定透徹,理解可能有偏差,歡迎大家交流討論。
1 DolphinScheduler的設計與策略
大家能關注DolphinScheduler那麼一定對調度系統有了一定的了解,對於調度所涉及的到一些專有名詞在這裡就不做過多的介紹,重點介紹一下流程定義,流程實例,任務定義,任務實例。(沒有作業這個概念確實也很新奇,可能是不想和Quartz的JobDetail重疊)。
-
任務定義:各種類型的任務,是流程定義的關鍵組成,如sql,shell,spark,mr,python等;
-
任務實例:任務的實例化,標識著具體的任務執行狀態;
-
流程定義:一組任務節點通過依賴關係建立的起來的有向無環圖(DAG);
-
流程實例:通過手動或者定時調度生成的流程實例;
-
定時調度:系統採用Quartz 分散式調度器,並同時支援cron表達式可視化的生成;
1.1 分散式設計
分散式系統的架構設計基本分為中心化和去中心化兩種,各有優劣,憑藉各自的業務選擇。
1.1.1 中心化
中心化設計比較簡單,集群中的節點安裝角色可以分為Master和Slave兩種,如下圖:
Master: Master的角色主要負責任務分發並監督Slave的健康狀態,可以動態的將任務均衡到Slave上,以致Slave節點不至於「忙死」或」閑死」的狀態。
中心化設計存在一些問題。
第一點,一旦Master出現了問題,則群龍無首,整個集群就會崩潰。
為了解決這個問題,大多數Master/Slave架構模式都採用了主備Master的設計方案,可以是熱備或者冷備,也可以是自動切換或手動切換,而且越來越多的新系統都開始具備自動選舉切換Master的能力,以提升系統的可用性。
第二點,如果Scheduler在Master上,雖然可以支援一個DAG中不同的任務運行在不同的機器上,但是會產生Master的過負載。如果Scheduler在Slave上,一個DAG中所有的任務都只能在某一台機器上進行作業提交,在並行任務比較多的時候,Slave的壓力可能會比較大。
xxl-job就是採用這種設計方式,但是存在相應的問題。管理器(admin)宕機集群會崩潰,Scheduler在管理器上,管理器負責所有任務的校驗和分發,管理器存在過載的風險,需要開發者想方案解決。
1.1.2 去中心化
在去中心化設計里,通常沒有Master/Slave的概念,所有的角色都是一樣的,地位是平等的,去中心化設計的核心設計在於整個分散式系統中不存在一個區別於其他節點的「管理者」,因此不存在單點故障問題。
但由於不存在「管理者」節點所以每個節點都需要跟其他節點通訊才得到必須要的機器資訊,而分散式系統通訊的不可靠性,則大大增加了上述功能的實現難度。實際上,真正去中心化的分散式系統並不多見。
反而動態中心化分散式系統正在不斷湧出。在這種架構下,集群中的管理者是被動態選擇出來的,而不是預置的,並且集群在發生故障的時候,集群的節點會自發的舉行會議來選舉新的管理者去主持工作。
一般都是基於Raft演算法實現的選舉策略。Raft演算法,目前社區也有相應的PR,還沒合併。
DolphinScheduler的去中心化是Master/Worker註冊到註冊中心,實現Master集群和Worker集群無中心。
1.2 DophinScheduler架構設計
隨手盜用一張官網的系統架構圖,可以看到調度系統採用去中心化設計,由UI,API,MasterServer,Zookeeper,WorkServer,Alert等幾部分組成。
API: API介面層,主要負責處理前端UI層的請求。該服務統一提供RESTful api向外部提供請求服務。介面包括工作流的創建、定義、查詢、修改、發布、下線、手工啟動、停止、暫停、恢復、從該節點開始執行等等。
MasterServer: MasterServer採用分散式無中心設計理念,MasterServer集成了Quartz,主要負責 DAG 任務切分、任務提交監控,並同時監聽其它MasterServer和WorkerServer的健康狀態。MasterServer服務啟動時向Zookeeper註冊臨時節點,通過監聽Zookeeper臨時節點變化來進行容錯處理。WorkServer:WorkerServer也採用分散式無中心設計理念,WorkerServer主要負責任務的執行和提供日誌服務。WorkerServer服務啟動時向Zookeeper註冊臨時節點,並維持心跳。
ZooKeeper: ZooKeeper服務,系統中的MasterServer和WorkerServer節點都通過ZooKeeper來進行集群管理和容錯。另外系統還基於ZooKeeper進行事件監聽和分散式鎖。
**Alert:**提供告警相關介面,介面主要包括兩種類型的告警數據的存儲、查詢和通知功能,支援豐富的告警插件自由拓展配置。
1.3 容錯問題
容錯分為服務宕機容錯和任務重試,服務宕機容錯又分為Master容錯和Worker容錯兩種情況;
1.3.1 宕機容錯
服務容錯設計依賴於ZooKeeper的Watcher機制,實現原理如圖:
其中Master監控其他Master和Worker的目錄,如果監聽到remove事件,則會根據具體的業務邏輯進行流程實例容錯或者任務實例容錯,容錯流程圖相對官方文檔裡面的流程圖,人性化了些,大家可以參考一下,具體如下所示。
ZooKeeper Master容錯完成之後則重新由DolphinScheduler中Scheduler執行緒調度,遍歷 DAG 找到「正在運行」和「提交成功」的任務,對「正在運行」的任務監控其任務實例的狀態,對「提交成功」的任務需要判斷Task Queue中是否已經存在,如果存在則同樣監控任務實例的狀態,如果不存在則重新提交任務實例。
Master Scheduler執行緒一旦發現任務實例為」 需要容錯」狀態,則接管任務並進行重新提交。注意由於」 網路抖動」可能會使得節點短時間內失去和ZooKeeper的心跳,從而發生節點的remove事件。
對於這種情況,我們使用最簡單的方式,那就是節點一旦和ZooKeeper發生超時連接,則直接將Master或Worker服務停掉。
1.3.2 失敗重試
這裡首先要區分任務失敗重試、流程失敗恢復、流程失敗重跑的概念:
-
任務失敗重試是任務級別的,是調度系統自動進行的,比如一個Shell任務設置重試次數為3次,那麼在Shell任務運行失敗後會自己再最多嘗試運行3次。
-
流程失敗恢復是流程級別的,是手動進行的,恢復是從只能從失敗的節點開始執行或從當前節點開始執行。流程失敗重跑也是流程級別的,是手動進行的,重跑是從開始節點進行。
接下來說正題,我們將工作流中的任務節點分了兩種類型。
-
一種是業務節點,這種節點都對應一個實際的腳本或者處理語句,比如Shell節點、MR節點、Spark節點、依賴節點等。
-
還有一種是邏輯節點,這種節點不做實際的腳本或語句處理,只是整個流程流轉的邏輯處理,比如子流程節等。
每一個業務節點都可以配置失敗重試的次數,當該任務節點失敗,會自動重試,直到成功或者超過配置的重試次數。邏輯節點不支援失敗重試。但是邏輯節點裡的任務支援重試。
如果工作流中有任務失敗達到最大重試次數,工作流就會失敗停止,失敗的工作流可以手動進行重跑操作或者流程恢復操作。
1.4 遠程日誌訪問
由於Web(UI)和Worker不一定在同一台機器上,所以查看日誌不能像查詢本地文件那樣。
有兩種方案:
-
將日誌放到ES搜索引擎上;
-
通過netty通訊獲取遠程日誌資訊;
介於考慮到儘可能的DolphinScheduler的輕量級性,所以選擇了RPC實現遠程訪問日誌資訊,具體程式碼的實踐見2.8章節。
2 DolphinScheduler源碼分析
上一章的講解可能初步看起來還不是很清晰,本章的主要目的是從程式碼層面一一介紹第一張講解的功能。關於系統的安裝在這裡並不會涉及,安裝運行請大家自行探索。
2.1 工程模組介紹與配置文件
2.1.1 工程模組介紹
-
dolphinscheduler-alert 告警模組,提供告警服務;
-
dolphinscheduler-api web應用模組,提供 Rest Api 服務,供 UI 進行調用;
-
dolphinscheduler-common 通用的常量枚舉、工具類、數據結構或者基類 dolphinscheduler-dao 提供資料庫訪問等操作;
-
dolphinscheduler-remote 基於netty的客戶端、服務端 ;
-
dolphinscheduler-server 日誌與心跳服務 ;
-
dolphinscheduler-log-server LoggerServer 用於Rest Api通過RPC查看日誌;
-
dolphinscheduler-master MasterServer服務,主要負責 DAG 的切分和任務狀態的監控 ;
-
dolphinscheduler-worker WorkerServer服務,主要負責任務的提交、執行和任務狀態的更新;
-
dolphinscheduler-service service模組,包含Quartz、Zookeeper、日誌客戶端訪問服務,便於server模組和api模組調用 ;
-
dolphinscheduler-ui 前端模組;
2.1.2 配置文件
dolphinscheduler-common common.properties
dolphinscheduler-api application.yaml
dolphinscheduler-master application.yaml
dolphinscheduler-worker application.yaml
主要關注資料庫,quartz, zookeeper, masker, worker配置。
2.2 API主要任務操作介面
其他業務介面可以不用關注,只需要關注最最主要的流程上線功能介面,此介面可以發散出所有的任務調度相關的程式碼。
介面:/dolphinscheduler/projects/{projectCode}/schedules/{id}/online;此介面會將定義的流程提交到Quartz調度框架;程式碼如下:
public Map<String, Object> setScheduleState(User loginUser, long projectCode, Integer id, ReleaseState scheduleStatus) { Map<String, Object> result = new HashMap<>();
Project project = projectMapper.queryByCode(projectCode); // check project auth boolean hasProjectAndPerm = projectService.hasProjectAndPerm(loginUser, project, result); if (!hasProjectAndPerm) { return result; }
// check schedule exists Schedule scheduleObj = scheduleMapper.selectById(id);
if (scheduleObj == null) { putMsg(result, Status.SCHEDULE_CRON_NOT_EXISTS, id); return result; } // check schedule release state if (scheduleObj.getReleaseState() == scheduleStatus) { logger.info(“schedule release is already {},needn’t to change schedule id: {} from {} to {}”, scheduleObj.getReleaseState(), scheduleObj.getId(), scheduleObj.getReleaseState(), scheduleStatus); putMsg(result, Status.SCHEDULE_CRON_REALEASE_NEED_NOT_CHANGE, scheduleStatus); return result; } ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(scheduleObj.getProcessDefinitionCode()); if (processDefinition == null || projectCode != processDefinition.getProjectCode()) { putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(scheduleObj.getProcessDefinitionCode())); return result; } List<ProcessTaskRelation> processTaskRelations = processTaskRelationMapper.queryByProcessCode(projectCode, scheduleObj.getProcessDefinitionCode()); if (processTaskRelations.isEmpty()) { putMsg(result, Status.PROCESS_DAG_IS_EMPTY); return result; } if (scheduleStatus == ReleaseState.ONLINE) { // check process definition release state if (processDefinition.getReleaseState() != ReleaseState.ONLINE) { logger.info(“not release process definition id: {} , name : {}”, processDefinition.getId(), processDefinition.getName()); putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE, processDefinition.getName()); return result; } // check sub process definition release state List<Long> subProcessDefineCodes = new ArrayList<>(); processService.recurseFindSubProcess(processDefinition.getCode(), subProcessDefineCodes); if (!subProcessDefineCodes.isEmpty()) { List<ProcessDefinition> subProcessDefinitionList = processDefinitionMapper.queryByCodes(subProcessDefineCodes); if (subProcessDefinitionList != null && !subProcessDefinitionList.isEmpty()) { for (ProcessDefinition subProcessDefinition : subProcessDefinitionList) { /** * if there is no online process, exit directly */ if (subProcessDefinition.getReleaseState() != ReleaseState.ONLINE) { logger.info(“not release process definition id: {} , name : {}”, subProcessDefinition.getId(), subProcessDefinition.getName()); putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE, String.valueOf(subProcessDefinition.getId())); return result; } } } } }
// check master server exists List<Server> masterServers = monitorService.getServerListFromRegistry(true);
if (masterServers.isEmpty()) { putMsg(result, Status.MASTER_NOT_EXISTS); return result; }
// set status scheduleObj.setReleaseState(scheduleStatus);
scheduleMapper.updateById(scheduleObj);
try { switch (scheduleStatus) { case ONLINE: logger.info(“Call master client set schedule online, project id: {}, flow id: {},host: {}”, project.getId(), processDefinition.getId(), masterServers); setSchedule(project.getId(), scheduleObj); break; case OFFLINE: logger.info(“Call master client set schedule offline, project id: {}, flow id: {},host: {}”, project.getId(), processDefinition.getId(), masterServers); deleteSchedule(project.getId(), id); break; default: putMsg(result, Status.SCHEDULE_STATUS_UNKNOWN, scheduleStatus.toString()); return result; } } catch (Exception e) { result.put(Constants.MSG, scheduleStatus == ReleaseState.ONLINE ? “set online failure” : “set offline failure”); throw new ServiceException(result.get(Constants.MSG).toString(), e); }
putMsg(result, Status.SUCCESS); return result; }
2.3 Quaterz架構與運行流程
2.3.1 概念與架構
Quartz 框架主要包括如下幾個部分:
-
SchedulerFactory:任務調度工廠,主要負責管理任務調度器;
-
Scheduler :任務調度器,主要負責任務調度,以及操作任務的相關介面;
-
Job :任務介面,實現類包含具體任務業務程式碼;
-
JobDetail:用於定義作業的實例;
-
Trigger:任務觸發器,主要存放 Job 執行的時間策略。例如多久執行一次,什麼時候執行,以什麼頻率執行等等;
-
JobBuilder :用於定義/構建 JobDetail 實例,用於定義作業的實例。
-
TriggerBuilder :用於定義/構建觸發器實例;
-
Calendar:Trigger 擴展對象,可以排除或者包含某個指定的時間點(如排除法定節假日);
-
JobStore:存儲作業和任務調度期間的狀態Scheduler的生命期,從 SchedulerFactory 創建它時開始,到 Scheduler 調用Shutdown() 方法時結束;
Scheduler 被創建後,可以增加、刪除和列舉 Job 和 Trigger,以及執行其它與調度相關的操作(如暫停 Trigger)。但Scheduler 只有在調用 start() 方法後,才會真正地觸發 trigger(即執行 job)
2.3.2 初始化與執行流程
Quartz的基本原理就是通過Scheduler來調度被JobDetail和Trigger定義的安裝Job介面規範實現的自定義任務業務對象,來完成任務的調度。基本邏輯如下圖:
程式碼時序圖如下:
基本內容就是初始化任務調度容器Scheduler,以及容器所需的執行緒池,數據交互對象JobStore,任務處理執行緒QuartzSchedulerThread用來處理Job介面的具體業務實現類。
DolphinScheduler的業務類是ProcessScheduleJob,主要功能就是根據調度資訊往commond表中寫數據。
2.3.3 集群運轉
需要注意的事:
-
當Quartz採用集群形式部署的時候,存儲介質不能使用記憶體的形式,也就是不能使用JobStoreRAM。
-
Quartz集群對於對於需要被調度的Triggers實例的掃描是使用資料庫鎖TRIGGER_ACCESS來完成的,保障此掃描過程只能被一個Quartz實例獲取到。程式碼如下:
public List<OperableTrigger> acquireNextTriggers(final long noLaterThan, final int maxCount, final long timeWindow) throws JobPersistenceException { String lockName; if(isAcquireTriggersWithinLock() || maxCount > 1) { lockName = LOCK_TRIGGER_ACCESS; } else { lockName = null; } return executeInNonManagedTXLock(lockName, new TransactionCallback<List<OperableTrigger>>() { public List<OperableTrigger> execute(Connection conn) throws JobPersistenceException { return acquireNextTrigger(conn, noLaterThan, maxCount, timeWindow); } }, new TransactionValidator<List<OperableTrigger>>() { public Boolean validate(Connection conn, List<OperableTrigger> result) throws JobPersistenceException { try { List<FiredTriggerRecord> acquired = getDelegate().selectInstancesFiredTriggerRecords(conn, getInstanceId()); Set<String> fireInstanceIds = new HashSet<String>(); for (FiredTriggerRecord ft : acquired) { fireInstanceIds.add(ft.getFireInstanceId()); } for (OperableTrigger tr : result) { if (fireInstanceIds.contains(tr.getFireInstanceId())) { return true; } } return false; } catch (SQLException e) { throw new JobPersistenceException(“error validating trigger acquisition”, e); } } }); }
3.集群失敗實例恢復需要注意的是各個實例恢復各自實例對應的異常實例,因為資料庫有調度容器的instanceId資訊。程式碼如下:
2.4 Master啟動與執行流程
2.4.1 概念與執行邏輯
關鍵概念:
Quartz相關:
-
Scheduler(任務調度容器,一般都是StdScheduler實例)。
-
ProcessScheduleJob:(實現Quarts調度框架的Job介面的業務類,專門生成DolphinScheduler資料庫業務表t_ds_commond數據);
DolphinScheduler相關:
-
NettyRemotingServer(netty服務端,包含netty服務端serverBootstrap對象與netty服務端業務處理對象serverHandler), NettyServerHandler:(netty服務端業務處理類:包含各類處理器以及處理器對應的執行執行緒池);
-
TaskPluginManager(任務插件管理器,不同類型的任務以插件的形式管理,在應用服務啟動的時候,通過@AutoService載入實現了TaskChannelFactory介面的工廠資訊到資料庫,通過工廠對象來載入各類TaskChannel實現類到快取);
-
MasterRegistryClient(master操作zk的客戶端,封裝了master對於zk的所有操作,註冊,查詢,刪除等);
-
MasterSchedulerService(掃描服務,包含業務執行執行緒和work包含的nettyhe護短,負責任務調度業務,slot來控制集群模式下任務不被重複調度,底層實現是zookeeper分散式鎖);
-
WorkflowExecuteThread(真正的業務處理執行緒,通過插槽獲取命令commond,執行之前會校驗slot的變化,如果變化不執行,關鍵功能就是構建任務相關的參數,定義,優先順序等,然後發送到隊列,供隊列處理執行緒消費);
-
CommonTaskProcessor(普通任務處理器,實現ITaskProcessor介面,根據業務分為普通,依賴,子任務,阻塞,條件任務類型,包含了任務的提交,運行,分發,殺死等業務,通過@AutoService載入的類,根本就是封裝了對);
-
TaskPriorityQueueImpl(任務隊列,負責任務隊列的存儲控制);
-
TaskPriorityQueueConsumer(任務隊列消費執行緒,負責任務的根據負載均衡策略在worker之間分發與執行);
-
ServerNodeManager (節點資訊控制器,負責節點註冊資訊更新與槽位(slot)變更,底層實現是zookeeper分散式鎖的應用);
-
EventExecuteService(事件處理執行緒,通過快取起來的任務處理執行緒,處理每個任務在處理過程中註冊在執行緒事件隊列中的事件);
-
FailoverExecuteThread(故障轉移執行緒,包含Master和worker的);
-
MasterRegistryDataListener(託管在zk管理框架cautor的故障監聽器,負責對worker和master註冊在zk上的節點的新增和刪除)。
主節點容錯程式碼如下,業務解釋見1.5.1Master容錯解釋:
2.4.2 集群與槽(slot)
其實這裡的採用Zookeer分散式鎖準確也不準確,為什麼這麼說,因為Slot是CommondId對Master列表長度取模來計算的,而Master列表長度的刷新是Zookeeper分散式鎖來控制,Master節點的調度數據掃描是通過Slot來控制的。
具體程式碼如下:
Slot刷新
Slot應用
2.4.3 程式碼執行流程
程式碼過於繁瑣,此處不再一一粘貼程式碼解釋各個類的功能,自行看程式碼更加清晰。
2.5Worker啟動與執行流程
2.5.1 概念與執行邏輯
-
NettyRemotingServer(worker包含的netty服務端) WorkerRegistryClient(zk客戶端,封裝了worker與zk相關的操作,註冊,查詢,刪除等) ;
-
TaskPluginManager(任務插件管理器,封裝了插件載入邏輯和任務實際執行業務的抽象) ;
-
WorkerManagerThread(任務工作執行緒生成器,消費netty處理器推進隊列的任務資訊,並生成任務執行執行緒提交執行緒池管理) ;
-
TaskExecuteProcessor(Netty任務執行處理器,生成master分發到work的任務資訊,並推送到隊列) ;
-
TaskExecuteThread(任務執行執行緒) ;
-
TaskCallbackService(任務回調執行緒,與master包含的netty client通訊);
-
AbstractTask(任務實際業務的抽象類,子類包含實際的任務執行業務,SqlTask,DataXTask等) ;
-
RetryReportTaskStatusThread(不關注)
2.5.2 程式碼執行流程
Worker節點程式碼時序圖如下:
程式碼過於繁瑣,此處不再一一粘貼程式碼解釋各個類的功能,自行看程式碼更加清晰。
2.6 RPC交互
因為節點和應用服務之間的RPC通訊都是基於Netty實現的,Netty相關知識不在這裡過多的講解,當前章節只涉及Master與Worker之間的交互模式的設計與實現。
整體設計如下
2.6.1 Master與Worker交互
Master與worker之間的業務邏輯的交互是基於Netty服務端與客戶端來實現Rpc通訊的,Master和Worker啟動的時候會將自己的Netty服務端資訊註冊到ZK相應的節點上,Master的任務分發執行緒和任務殺死等業務運行時,拉取ZK上的Worker節點資訊,根據負載均衡策略選擇一個節點(下章介紹負載均衡),構建Netty客戶端與Worker的Netty服務端通訊,Worker收到Master的RPC請求之後會快取Channel資訊並處理對應業務,同時Callback回調執行緒會獲取快取的通道來執行回調操作,這樣就形成的閉環。
任務的執行殺死,以及回調狀態處理等操作都是通過Netty客戶端與服務端綁定的Processer處理器來進行的。
Master部分具體程式碼如下:
Master啟動的時候會初始化Nettyserver,註冊對應的請求處理器到NettyHandler並啟動:
Master的NettyExecutorManager初始化的時候會將NettyRemotingClient也初始化,並且會註冊處理Worker回調請求的處理器,真正的埠綁定是在獲取到執行器埠之後:
任務分發程式碼如下:
Worker部分具體程式碼如下:
同理Woker在啟動的時候會初始化NettyServer,註冊對應處理器並啟動:
回調執行緒對象初始化的時候,會將包含的Nettyremotingclient一起初始化,並註冊好對應的業務處理器:
回調執行緒會通過其他執行器中快取下來的Chanel與Master的客戶端進行通訊:
2.6.2 其他服務與Master交互
以日誌服務為例,前端觸發請求日誌的介面,通過參數與資料庫交互獲取到Master的NettyServer資訊,然後構建Netty客戶端與Master進行通訊獲取日誌並返回。具體程式碼如下
Nettyclient隨著日誌業務對象初始化而初始化:
2.7 負載均衡演算法
Master在選擇執行器的時候DolphinScheduler提供了三種負載均衡演算法,且所有的演算法都用到了節點權重:加權隨機(random),平滑輪詢(roundrobin),線性負載(lowerweight)。通過配置文件來控制到底使用哪一個負載均衡策略,默認配置是權重策略:host-selector: lower_weight。
2.7.1 加權隨機
看程式碼更好理解:按照全部權重值求和,然後取匯總結果的隨機整數,隨機整數對原先所有host的權重累差,返回小於零的時候的host,沒有就隨機返回一個。
2.7.2 線性負載
權重計算邏輯:利用註冊的Cpu佔用、記憶體佔用以及載入因子還有啟動時間消耗做計算。
獲取權重最小的節點,並把節點權重置為最大。
2.7.3 平滑輪詢
這個演算法不是很好的能夠理解,所以我不知道我的理解是否正確,它有一個預熱的過程,之前都是取第一個,等到累計的權重超過最大就整數就開始按權重輪詢。
2.8 日誌服務
2.6.2已經介紹不在做過多的說明。
2.9 報警
暫未研究,目測基本就是根據規則篩選數據,然後調用指定類型的報警服務介面做報警操作,比如郵件,微信,簡訊通知等。
3 後記
3.1 Make friends
因為沒有正式生產使用,業務理解不一定透徹,理解可能有偏差,歡迎大家一起進入社區交流討論。
Apache DolphinScheduler Slack群鏈接://join.slack.com/t/asf-dolphinscheduler/shared_invite/zt-1e36toy4n-5n9U2R__FDM05R~MJFFVBg
3.2 參考文獻
最後,感謝社區蔡順峰、鍾嘉傑和阮文俊對本文整理和修改提出建設性意見,以及對本文發布提供的幫助。
非常歡迎大家加入 DolphinScheduler 大家庭,融入開源世界!
我們鼓勵任何形式的參與社區,最終成為 Committer 或 PPMC,如:
-
將遇到的問題通過 GitHub 上 issue 的形式回饋出來。
-
回答別人遇到的 issue 問題。
-
幫助完善文檔。
-
幫助項目增加測試用例。
-
為程式碼添加註釋。
-
提交修復 Bug 或者 Feature 的 PR。
-
發表應用案例實踐、調度流程分析或者與調度相關的技術文章。
-
幫助推廣 DolphinScheduler,參與技術大會或者 meetup 的分享等。
歡迎加入貢獻的隊伍,加入開源從提交第一個 PR 開始。
- 比如添加程式碼注釋或找到帶有 」easy to fix」 標記或一些非常簡單的 issue(拼寫錯誤等) 等等,先通過第一個簡單的 PR 熟悉提交流程。
註:貢獻不僅僅限於 PR 哈,對促進項目發展的都是貢獻。
相信參與 DolphinScheduler,一定會讓您從開源中受益!
參與貢獻
隨著中國開源的迅猛崛起,Apache DolphinScheduler 社區迎來蓬勃發展,為了做更好用、易用的調度,真誠歡迎熱愛開源的夥伴加入到開源社區中來,為中國開源崛起獻上一份自己的力量,讓本土開源走向全球。
參與 DolphinScheduler 社區有非常多的參與貢獻的方式,包括:
貢獻第一個PR(文檔、程式碼) 我們也希望是簡單的,第一個PR用於熟悉提交的流程和社區協作以及感受社區的友好度。
社區匯總了以下適合新手的問題列表://github.com/apache/dolphinscheduler/issues/5689
非新手問題列表://github.com/apache/dolphinscheduler/issues?q=is%3Aopen+is%3Aissue+label%3A”volunteer+wanted”
如何參與貢獻鏈接://dolphinscheduler.apache.org/zh-cn/community/development/contribute.html
來吧,DolphinScheduler開源社區需要您的參與,為中國開源崛起添磚加瓦吧,哪怕只是小小的一塊瓦,匯聚起來的力量也是巨大的。
參與開源可以近距離與各路高手切磋,迅速提升自己的技能,如果您想參與貢獻,我們有個貢獻者種子孵化群,可以添加社區小助手,手把手教會您( 貢獻者不分水平高低,有問必答,關鍵是有一顆願意貢獻的心 )。
添加小助手微信時請說明想參與貢獻。來吧,開源社區非常期待您的參與。