分散式任務調度ScheduleMaster
1.什麼是ScheduleMaster
ScheduleMaster是分散式任務調度
系統。簡稱:集中任務調度系統,最簡單的理解ScheduleMaster,就是對不同的系統裡面的調度任務做統一管理的框架。
例如我們現在有多個系統,每個系統針對自己處理不同的業務場景。衍生出自己的調度任務,想像一下,如果每個系統人為去維護,那隨著調度任務越來越多,人是崩潰的吧,可見維護和技術成本是巨大的,這時我們需要選擇分散式任務系統框架做統一的管理
當然有目前有很多相對優秀分散式任務系統框架,我們主要學習 ScheduleMaster
2.使用ScheduleMaster
1.首先我們需要使用NET Core web Api創建幾個模擬的微服務,分別為 考勤、算薪、郵件、簡訊
2.下載開源的ScheduleMaster,並且使用ScheduleMaster調度我們的微服務介面
- sqlserver:"Persist Security Info = False; User ID =sa; Password =123456; Initial Catalog =schedule_master; Server =."
- postgresql:"Server=localhost;Port=5432;Database=schedule_master;User Id=postgres;Password=123456;Pooling=true;MaxPoolSize=20;"
- mysql:"Data Source=localhost;Database=schedule_master;User ID=root;Password=123456;pooling=true;CharSet=utf8mb4;port=3306;sslmode=none;TreatTinyAsBoolean=true"
修改Host的配置文件和支援的資料庫,框架默認使用Mysql
修改Web的配置文件和支援的資料庫,框架默認使用Mysql
3.進入Hos.ScheduleMaster.Web項目的發布目錄,dotnet Hos.ScheduleMaster.Web.dll
啟動項目 ,此時會生成資料庫
登錄帳號:admin
密碼:111111
4.進入Hos.ScheduleMaster.QuartzHost項目的發布目錄,執行命令,啟動項目
dotnet Hos.ScheduleMaster.QuartzHost.dll --urls //*:30003
1.配置Http調度任務
5.準備就緒後,使用後台查看節點管理,可以看到web主節點30000和任務調度的介面30002已經在運行
6.使用任務列表菜單,創建定時調度任務,配置基礎資訊和元數據配置,然後點擊保存就開始執行任務
2.配置程式集調度任務
1.創建一個類庫,安裝ScheduleMaster庫, 創建一個類繼承TaskBase,實現抽象方法,然後編譯程式集
namespace TaskExcuteService
{
class AssemblyTask : TaskBase
{
public override void Run(TaskContext context)
{
context.WriteLog("程式集任務");
}
}
}
2.找到debug目錄,去掉Base程式集,然後添加為壓縮文件
3.在web介面找到任務配置,選擇程式集進行基本配置,配置元數據,輸入程式集名稱,然後輸入類,並將程式集上傳,保存就可以運行了
3.使用Api接入任務
為了方便業務系統更好的接入調度系統,ScheduleMaster創建任務不僅可以在控制台中實現,系統也提供了WebAPI供業務系統使用程式碼接入,這種方式對延時任務來說尤其重要。
1.API Server 對接流程
-
在控制台中創建好專用的API對接用戶帳號。
-
使用對接帳號的用戶名設置為http header中的
ms_auth_user
值。 -
使用經過哈希運算過的秘鑰設置為http header中的
ms_auth_secret值
,計算規則:按{用戶名}{hash(密碼)}{用戶名}的格式拼接得到字元串str,然後再對str做一次hash運算即得到最終秘鑰,hash函數是小寫的32位MD5演算法。 -
使用form格式發起http調用,如果非法用戶會返回401-Unauthorized。
HttpClient client = new HttpClient();
client.DefaultRequestHeaders.Add("ms_auth_user", "admin");
client.DefaultRequestHeaders.Add("ms_auth_secret", SecurityHelper.MD5($"admin{SecurityHelper.MD5("111111")}}admin"));
所有介面採用統一的返回格式,欄位如下:
參數名稱 | 參數類型 | 說明 |
---|---|---|
Success | bool | 是否成功 |
Status | int | 結果狀態,0-請求失敗 1-請求成功 2-登錄失敗 3-參數異常 4-數據異常 |
Message | string | 返回的消息 |
Data | object | 返回的數據 |
2.創建程式集任務
使用API創建任務的方式不支援上傳程式包,所以在任務需要啟動時要確保程式包已通過其他方式上傳,否則會啟動失敗。
-
介面地址:
//yourip:30000/api/task/create
-
請求類型:
POST
-
參數格式:
application/x-www-form-urlencoded
-
返回結果:創建成功返回任務id
-
參數列表:
參數名稱 | 參數類型 | 是否必填 | 說明 |
---|---|---|---|
MetaType | int | 是 | 任務類型,這裡固定是1 |
Title | string | 是 | 任務名稱 |
RunLoop | bool | 是 | 是否按周期執行 |
CronExpression | string | 否 | cron表達式,如果RunLoop為true則必填 |
AssemblyName | string | 是 | 程式集名稱 |
ClassName | string | 是 | 執行類名稱,包含完整命名空間 |
StartDate | DateTime | 是 | 任務開始時間 |
EndDate | DateTime | 否 | 任務停止時間,為空表示不限停止時間 |
Remark | string | 否 | 任務描述說明 |
Keepers | List<int> | 否 | 監護人id |
Nexts | List<guid> | 否 | 子級任務id |
Executors | List<string> | 否 | 執行節點名稱 |
RunNow | bool | 否 | 創建成功是否立即啟動 |
Params | List<ScheduleParam> | 否 | 自定義參數列表,也可以通過CustomParamsJson欄位直接傳json格式字元串 |
ScheduleParam:
參數名稱 | 參數類型 | 是否必填 | 說明 |
---|---|---|---|
ParamKey | string | 是 | 參數名稱 |
ParamValue | string | 是 | 參數值 |
ParamRemark | string | 否 | 參數說明 |
HttpClient client = new HttpClient();
List<KeyValuePair<string, string>> args = new List<KeyValuePair<string, string>>();
args.Add(new KeyValuePair<string, string>("MetaType", "1"));
args.Add(new KeyValuePair<string, string>("RunLoop", "true"));
args.Add(new KeyValuePair<string, string>("CronExpression", "33 0/8 * * * ?"));
args.Add(new KeyValuePair<string, string>("Remark", "By Xunit Tester Created"));
args.Add(new KeyValuePair<string, string>("StartDate", DateTime.Today.ToString("yyyy-MM-dd HH:mm:ss")));
args.Add(new KeyValuePair<string, string>("Title", "程式集介面測試任務"));
args.Add(new KeyValuePair<string, string>("AssemblyName", "Hos.ScheduleMaster.Demo"));
args.Add(new KeyValuePair<string, string>("ClassName", "Hos.ScheduleMaster.Demo.Simple"));
args.Add(new KeyValuePair<string, string>("CustomParamsJson", "[{\"ParamKey\":\"k1\",\"ParamValue\":\"1111\",\"ParamRemark\":\"r1\"},{\"ParamKey\":\"k2\",\"ParamValue\":\"2222\",\"ParamRemark\":\"r2\"}]"));
args.Add(new KeyValuePair<string, string>("Keepers", "1"));
args.Add(new KeyValuePair<string, string>("Keepers", "2"));
//args.Add(new KeyValuePair<string, string>("Nexts", ""));
//args.Add(new KeyValuePair<string, string>("Executors", ""));
HttpContent reqContent = new FormUrlEncodedContent(args);
var response = await client.PostAsync("//localhost:30000/api/Task/Create", reqContent);
var content = await response.Content.ReadAsStringAsync();
Debug.WriteLine(content);
3.創建HTTP任務
-
介面地址:
//yourip:30000/api/task/create
-
請求類型:
POST
-
參數格式:
application/x-www-form-urlencoded
-
返回結果:創建成功返回任務id
-
參數列表:
參數名稱 | 參數類型 | 是否必填 | 說明 |
---|---|---|---|
MetaType | int | 是 | 任務類型,這裡固定是2 |
Title | string | 是 | 任務名稱 |
RunLoop | bool | 是 | 是否按周期執行 |
CronExpression | string | 否 | cron表達式,如果RunLoop為true則必填 |
StartDate | DateTime | 是 | 任務開始時間 |
EndDate | DateTime | 否 | 任務停止時間,為空表示不限停止時間 |
Remark | string | 否 | 任務描述說明 |
HttpRequestUrl | string | 是 | 請求地址 |
HttpMethod | string | 是 | 請求方式,僅支援GET\POST\PUT\DELETE |
HttpContentType | string | 是 | 參數格式,僅支援application/json和application/x-www-form-urlencoded |
HttpHeaders | string | 否 | 自定義請求頭,ScheduleParam列表的json字元串 |
HttpBody | string | 是 | 如果是json格式參數,則是對應參數的json字元串;如果是form格式參數,則是對應ScheduleParam列表的json字元串。 |
Keepers | List<int> | 否 | 監護人id |
Nexts | List<guid> | 否 | 子級任務id |
Executors | List<string> | 否 | 執行節點名稱 |
RunNow | bool | 否 | 創建成功是否立即啟動 |
HttpClient client = new HttpClient();
List<KeyValuePair<string, string>> args = new List<KeyValuePair<string, string>>();
args.Add(new KeyValuePair<string, string>("MetaType", "2"));
args.Add(new KeyValuePair<string, string>("RunLoop", "true"));
args.Add(new KeyValuePair<string, string>("CronExpression", "22 0/8 * * * ?"));
args.Add(new KeyValuePair<string, string>("Remark", "By Xunit Tester Created"));
args.Add(new KeyValuePair<string, string>("StartDate", DateTime.Today.ToString("yyyy-MM-dd HH:mm:ss")));
args.Add(new KeyValuePair<string, string>("Title", "Http介面測試任務"));
args.Add(new KeyValuePair<string, string>("HttpRequestUrl", "//localhost:56655/api/1.0/value/jsonpost"));
args.Add(new KeyValuePair<string, string>("HttpMethod", "POST"));
args.Add(new KeyValuePair<string, string>("HttpContentType", "application/json"));
args.Add(new KeyValuePair<string, string>("HttpHeaders", "[]"));
args.Add(new KeyValuePair<string, string>("HttpBody", "{ \"Posts\": [{ \"PostId\": 666, \"Title\": \"tester\", \"Content\":\"testtesttest\" }], \"BlogId\": 111, \"Url\":\"qweqrrttryrtyrtrtrt\" }"));
HttpContent reqContent = new FormUrlEncodedContent(args);
var response = await client.PostAsync("//localhost:30000/api/Task/Create", reqContent);
var content = await response.Content.ReadAsStringAsync();
Debug.WriteLine(content);
4.創建延時任務
-
介面地址:
//yourip:30000/api/delaytask/create
-
請求類型:
POST
-
參數格式:
application/x-www-form-urlencoded
-
返回結果:創建成功返回任務id
-
參數列表:
參數名稱 | 參數類型 | 是否必填 | 說明 |
---|---|---|---|
SourceApp | string | 是 | 來源 |
Topic | string | 是 | 主題 |
ContentKey | string | 是 | 業務關鍵字 |
DelayTimeSpan | int | 是 | 延遲相對時間 |
DelayAbsoluteTime | DateTime | 是 | 延遲絕對時間 |
NotifyUrl | string | 是 | 回調地址 |
NotifyDataType | string | 是 | 回調參數格式,僅支援application/json和application/x-www-form-urlencoded |
NotifyBody | string | 是 | 回調參數,json格式字元串 |
for (int i = 0; i < 5; i++)
{
int rndNum = new Random().Next(20, 500);
List<KeyValuePair<string, string>> args = new List<KeyValuePair<string, string>>();
args.Add(new KeyValuePair<string, string>("SourceApp", "TestApp"));
args.Add(new KeyValuePair<string, string>("Topic", "TestApp.Trade.TimeoutCancel"));
args.Add(new KeyValuePair<string, string>("ContentKey", i.ToString()));
args.Add(new KeyValuePair<string, string>("DelayTimeSpan", rndNum.ToString()));
args.Add(new KeyValuePair<string, string>("DelayAbsoluteTime", DateTime.Now.AddSeconds(rndNum).ToString("yyyy-MM-dd HH:mm:ss")));
args.Add(new KeyValuePair<string, string>("NotifyUrl", "//localhost:56655/api/1.0/value/delaypost"));
args.Add(new KeyValuePair<string, string>("NotifyDataType", "application/json"));
args.Add(new KeyValuePair<string, string>("NotifyBody", "{ \"Posts\": [{ \"PostId\": 666, \"Title\": \"tester\", \"Content\":\"testtesttest\" }], \"BlogId\": 111, \"Url\":\"qweqrrttryrtyrtrtrt\" }"));
HttpContent reqContent = new FormUrlEncodedContent(args);
var response = await client.PostAsync("//localhost:30000/api/DelayTask/Create", reqContent);
var content = await response.Content.ReadAsStringAsync();
Debug.WriteLine(content);
}
4.框架簡單分析
1.全局設計
根據官網的設計圖,以及操作的流程,簡單總結一下任務全局流程為客戶端—–>master——>work—–>執行任務。從大的架構方向的思想就是,將原有單個服務中業務和任務調度混合的方式做一些改變,使業務和調度分離,專門把任務調度部分剝離出來,作為一個獨立的進程,來統一調用和管理任務,而原有服務只做業務處理。
####### 2.Master和Work分析
我們主要從主節點,從節點,數據表這幾個方面來簡單分析,整個業務的時序流程,從本質來看並不複雜master和Work之間的關係是相互配合的,可能乍一看下面整個圖有點混亂,下面來解釋一下,其實Master節點將任務添加到數據中,然後work節點,去從對應的數據表中取出任務數據,然後根據任務對應的配置,生成配置資訊,調用Quartz執行任務,這就是我們整個框架中最核心的業務。
當然master和work除了主要承載了整個管理系統的UI可視化、後台業務操作、任務執行之外,如果從細節以及實現方式來說主要做了以下事情:
Master
- 1.分配任務執行和選擇節點
- 2.對work節點進行健康檢查,對任務進行故障轉移
Work
- 1.取出任務配置資訊
- 2.使用Quartz根據配置運行任務
- 3.使用反射調用程式集
- 4.使用httpclient調用http 介面