服務編排–Conductor 文檔翻譯 (介紹與基本概念)
- 2019 年 11 月 21 日
- 筆記
本文是對 Conductor 文檔的簡單翻譯,建議你認真閱讀,如果閱讀後你仍然不知道如何使用,可以繼續關注本部落格,我會在後續的部落格中更新 Conductor 實戰
介紹
Conductor是一個微服務的編排引擎
Conductor 優點
Conductor,幫助我們協調基於微服務的流程,具有以下功能:
- 允許創建複雜的流程/業務流,其中由微服務實現單個任務。
- 基於JSON DSL的定義執行流程。
- 為這些流程提供可見性和可追溯性。
- 在暫停,恢復,重啟等周圍公開控制語義,以獲得更好的devops體驗。
- 允許更多地重用現有的微服務,為管理提供更容易的途徑。
- 用戶介面可視化流程。
- 能夠在需要時同步處理所有任務。
- 能夠擴展數百萬個並發運行的流程。
- 由客戶端提取的排隊服務支援。
- 能夠在HTTP或其他傳輸上運行,例如gRPC。
為什麼不進行點對點編排?
通過點對點任務編排,我們發現隨著業務需求和複雜性的增長難以擴展。發布/訂閱模型適用於最簡單的流程, 但很快就突出了與該方法相關的一些問題:
- 流程「嵌入」在多個應用程式的程式碼中。
- 通常,圍繞輸入/輸出,SLA等存在緊密耦合和假設,使得更難以適應不斷變化的需求。
- 幾乎沒有辦法系統地回答「我們用過程X做了多少」?
基本概念
工作流定義
工作流是使用基於JSON的DSL定義的,包括一組作為工作流的一部分執行的任務。任務是在遠程機器上執行的控制任務(fork,條件等)或應用程式任務(例如編碼文件)。
任務定義
- 所有任務都需要在活動工作流程使用之前進行註冊。
- 任務可以在多個工作流程中重複使用。工人任務分為兩類:
- 系統任務
- 工人任務
系統任務
系統任務在Conductor伺服器的JVM內執行,並由Conductor管理,以實現其可執行性和可擴展性。
名稱 |
目的 |
---|---|
DYNAMIC |
基於任務的輸入表達式派生的工作任務,而不是靜態定義為計劃的一部分 |
DECIDE |
決策任務 – 實現案例……開關樣式分叉 |
FORK |
分叉一組並行的任務。計劃每個集合併行執行 |
FORK_JOIN_DYNAMIC |
與FORK類似,但FORK_JOIN_DYNAMIC不是在並行執行計劃中定義的任務集,而是根據此任務的輸入表達式生成並行任務 |
JOIN |
補充FORK和FORK_JOIN_DYNAMIC。用於合併一個或多個並行分支* |
SUB_WORKFLOW |
將另一個工作流嵌套為子工作流任務。在執行時,它實例化子工作流並等待它完成 |
EVENT |
在支援的事件系統中生成事件(例如,Conductor,SQS) |
Conductor提供了一個API來創建在與引擎相同的JVM中執行的用戶定義任務。有關詳細資訊,請參閱WorkflowSystemTask介面。
工人任務
工作人員任務由應用程式實現,並在與Conductor不同的環境中運行。工作人員任務可以用任何語言實現。這些任務通過REST API端點與Conductor伺服器通訊,以輪詢任務發現並執行,並在執行後更新其狀態。
工作人員任務由計劃中的任務類型SIMPLE標識。
工作流任務的生命周期

元數據定義
任務定義
Conductor維護著一個工作人員任務類型的註冊表。在工作流程中使用之前必須註冊任務類型。
例
{ "name": "encode_task", "retryCount": 3, "timeoutSeconds": 1200, "inputKeys": [ "sourceRequestId", "qcElementType" ], "outputKeys": [ "state", "skipped", "result" ], "timeoutPolicy": "TIME_OUT_WF", "retryLogic": "FIXED", "retryDelaySeconds": 600, "responseTimeoutSeconds": 3600 }
領域 |
描述 |
筆記 |
---|---|---|
name |
任務類型 |
唯一 |
retryCount |
任務標記為失敗時嘗試重試的次數 |
|
retryLogic |
重試機制 |
看下面的可能值 |
timeoutSeconds |
以毫秒為單位的時間,在此之後,如果在轉換到IN_PROGRESS狀態後未完成任務,則將任務標記為TIMED_OUT |
如果設置為0,則不會超時 |
timeoutPolicy |
任務的超時策略 |
看下面的可能值 |
responseTimeoutSeconds |
如果大於0,則在此時間之後未更新狀態時,將重新安排任務。當工作人員輪詢任務但由於錯誤/網路故障而無法完成時很有用。 |
|
outputKeys |
任務輸出的鍵集。用於記錄任務的輸出 |
|
重試邏輯
- FIXED :重新安排任務後的任務 retryDelaySeconds
- EXPONENTIAL_BACKOFF:重新安排之後 retryDelaySeconds * attempNo
超時政策
- RETRY :再次重試該任務
- TIME_OUT_WF:工作流程標記為TIMED_OUT並終止
- ALERT_ONLY:註冊計數器(task_timeout)
工作流定義
使用基於JSON的DSL定義工作流。
例
{ "name": "encode_and_deploy", "description": "Encodes a file and deploys to CDN", "version": 1, "tasks": [ { "name": "encode", "taskReferenceName": "encode", "type": "SIMPLE", "inputParameters": { "fileLocation": "${workflow.input.fileLocation}" } }, { "name": "deploy", "taskReferenceName": "d1", "type": "SIMPLE", "inputParameters": { "fileLocation": "${encode.output.encodeLocation}" } } ], "outputParameters": { "cdn_url": "${d1.output.location}" }, "schemaVersion": 2 }
領域 |
描述 |
筆記 |
---|---|---|
name |
工作流程的名稱 |
|
description |
工作流程的描述性名稱 |
|
version |
用於標識架構版本的數字欄位。使用遞增數字 |
啟動工作流程執行時,如果未指定,則使用具有最高版本的定義 |
tasks |
一系列任務定義,如下所述。 |
|
outputParameters |
用於生成工作流輸出的JSON模板 |
如果未指定,則將輸出定義為上次執行的任務的輸出 |
inputParameters |
輸入參數列表。用於記錄工作流程所需的輸入 |
可選的 |
工作流程中的任務
tasks工作流中的屬性定義要按該順序執行的任務數組。以下是每項任務所需的強制性最低參數:
領域 |
描述 |
筆記 |
---|---|---|
name |
任務名稱。在開始工作流程之前,必須使用Conductor註冊為任務類型 |
|
taskReferenceName |
別名用於在工作流程中引用任務。必須是獨一無二的。 |
|
type |
任務類型。SIMPLE用於遠程工作人員或其中一個系統任務類型執行的任務 |
|
description |
任務描述 |
可選的 |
optional |
對或錯。設置為true時 – 即使任務失敗,工作流也會繼續。任務的狀態反映為COMPLETED_WITH_ERRORS |
默認為 false |
inputParameters |
JSON模板,用於定義給予任務的輸入 |
有關詳細資訊,請參見「接線輸入和輸出」 |
除了這些參數,需要進行具體的任務類型附加參數記錄在這裡
連接輸入和輸出
當觸發新的執行時,客戶端會為工作流提供輸入。工作流輸入是通過${workflow.input…}表達式提供的JSON有效負載。
基於inputParameters工作流定義中配置的模板,為工作流中的每個任務提供輸入。 inputParameters是一個JSON片段,其值包含用於在執行期間映射工作流的輸入或輸出或其他任務的值的參數。
映射值的語法遵循以下模式:
$ {SOURCE.input / output.JSONPath}
– |
– |
---|---|
SOURCE |
可以是任何任務的「工作流程」或引用名稱 |
input/output |
指源的輸入或輸出 |
JSONPath |
JSON路徑表達式從源的輸入/輸出中提取JSON片段 |
JSON路徑支援
Conductor支援JSONPath規範並從此處使用Java實現。
例
考慮一個任務,其輸入配置為使用來自工作流的輸入/輸出參數和名為loc_task的任務。
{ "inputParameters": { "movieId": "${workflow.input.movieId}", "url": "${workflow.input.fileLocation}", "lang": "${loc_task.output.languages[0]}", "http_request": { "method": "POST", "url": "http://example.com/${loc_task.output.fileId}/encode", "body": { "recipe": "${workflow.input.recipe}", "params": { "width": 100, "height": 100 } }, "headers": { "Accept": "application/json", "Content-Type": "application/json" } } } }
請將以下內容視為工作流輸入
{ "movieId": "movie_123", "fileLocation":"s3://moviebucket/file123", "recipe":"png" }
並且loc_task的輸出如下;
{ "fileId": "file_xxx_yyy_zzz", "languages": ["en","ja","es"] }
在安排任務時,Conductor將合併工作流輸入和loc_task輸出中的值,並按如下方式創建任務輸入:
{ "movieId": "movie_123", "url": "s3://moviebucket/file123", "lang": "en", "http_request": { "method": "POST", "url": "http://example.com/file_xxx_yyy_zzz/encode", "body": { "recipe": "png", "params": { "width": 100, "height": 100 } }, "headers": { "Accept": "application/json", "Content-Type": "application/json" } } }
系統任務的創建
(DYNAMIC) 動態任務定義
參數:
名稱 |
描述 |
---|---|
dynamicTaskNameParam |
任務輸入中用於計劃任務的值的參數名稱。例如,如果參數的值是ABC,則調度的下一個任務是「ABC」類型。 |
例
{ "name": "user_task", "taskReferenceName": "t1", "inputParameters": { "files": "${workflow.input.files}", "taskToExecute": "${workflow.input.user_supplied_task}" }, "type": "DYNAMIC", "dynamicTaskNameParam": "taskToExecute" }
如果使用輸入參數user_supplied_task的值作為user_task_2啟動工作流,則Conductor將在計劃此動態任務時調度user_task_2。
(DECIDE)決策任務定義
決策任務類似於case…switch程式語言中的語句。該任務需要3個參數:
參數:
名稱 |
描述 |
---|---|
caseValueParam |
任務輸入中參數的名稱,其值將用作開關。 |
decisionCases |
可以鍵入值的映射值的caseValueParam值是要執行的任務列表。 |
defaultCase |
在判定案例中找不到匹配值時要執行的任務列表(默認條件) |
例
{ "name": "decide_task", "taskReferenceName": "decide1", "inputParameters": { "case_value_param": "${workflow.input.movieType}" }, "type": "DECISION", "caseValueParam": "case_value_param", "decisionCases": { "Show": [ { "name": "setup_episodes", "taskReferenceName": "se1", "inputParameters": { "movieId": "${workflow.input.movieId}" }, "type": "SIMPLE" }, { "name": "generate_episode_artwork", "taskReferenceName": "ga", "inputParameters": { "movieId": "${workflow.input.movieId}" }, "type": "SIMPLE" } ], "Movie": [ { "name": "setup_movie", "taskReferenceName": "sm", "inputParameters": { "movieId": "${workflow.input.movieId}" }, "type": "SIMPLE" }, { "name": "generate_movie_artwork", "taskReferenceName": "gma", "inputParameters": { "movieId": "${workflow.input.movieId}" }, "type": "SIMPLE" } ] } }
Fork 並行任務定義
Fork用於調度並行任務集。
參數:
名稱 |
描述 |
---|---|
forkTasks |
任務列表列表。每個子列表計劃並行執行。但是,子列表中的任務是以串列方式安排的。 |
例
{ "forkTasks": [ [ { "name": "task11", "taskReferenceName": "t11" }, { "name": "task12", "taskReferenceName": "t12" } ], [ { "name": "task21", "taskReferenceName": "t21" }, { "name": "task22", "taskReferenceName": "t22" } ] ] }
執行時,task11和task21被安排在同一時間執行。
Dynamic Fork (動態分支)
Dynamic fork與FORK_JOIN任務相同。除了在運行時使用任務的輸入提供要並行的任務列表。當並行的任務數量不固定並根據輸入而變化時很有用。
名稱 |
描述 |
---|---|
dynamicForkTasksParam |
包含要並行執行的工作流任務配置列表的參數的名稱 |
dynamicForkTasksInputParamName |
參數的名稱,其值應為帶有鍵的映射,作為分叉任務的引用名稱和值作為分叉任務的輸入 |
例
{ "inputParameters": { "dynamicTasks": "${taskA.output.dynamicTasksJSON}", "dynamicTasksInput": "${taskA.output.dynamicTasksInputJSON}" } "type": "FORK_JOIN_DYNAMIC", "dynamicForkTasksParam": "dynamicTasks", "dynamicForkTasksInputParamName": "dynamicTasksInput" }
將taskA的輸出視為:
{ "dynamicTasksInputJSON": { "forkedTask1": { "width": 100, "height": 100, "params": { "recipe": "jpg" } }, "forkedTask2": { "width": 200, "height": 200, "params": { "recipe": "jpg" } } }, "dynamicTasksJSON": [ { "name": "encode_task", "taskReferenceName": "forkedTask1", "type": "SIMPLE" }, { "name": "encode_task", "taskReferenceName": "forkedTask2", "type": "SIMPLE" } ] }
執行時,Dynamic fork 任務將調度兩個類型為「encode_task」的並行任務,引用名稱為「forkedTask1」和「forkedTask2」,輸入由_ dynamicTasksInputJSON_指定
Dynamic Fork and Join
Join任務必須遵循FORK_JOIN_DYNAMIC
工作流定義必須包含一個Join任務定義,後跟FORK_JOIN_DYNAMIC任務。但是,考慮到任務的動態特性,此Join不需要joinOn參數。在完成之前,連接將等待所有並行分支任務完成。
與FORK不同,FORK可以執行並行流,每個fork按順序執行一系列任務,FORK_JOIN_DYNAMIC僅限於每個fork一個任務。但是,並行任務可以是子工作流,允許更複雜的執行流。
Join
Join任務用於等待fork任務生成的一個或多個任務的完成。
參數
名稱 |
描述 |
---|---|
joinOn |
任務引用名稱列表,JOIN將等待完成。 |
例
{ "joinOn": ["taskRef1", "taskRef3"] }
Join 任務輸出
Fork任務的輸出將是一個JSON對象,其中key是任務引用名稱,value是fork任務的輸出。
子工作流程
子工作流任務允許在另一個工作流中嵌套工作流。
參數
名稱 |
描述 |
---|---|
subWorkflowParam |
任務引用名稱列表,JOIN將等待完成。 |
例
{ "name": "sub_workflow_task", "taskReferenceName": "sub1", "inputParameters": { "requestId": "${workflow.input.requestId}", "file": "${encode.output.location}" }, "type": "SUB_WORKFLOW", "subWorkflowParam": { "name": "deployment_workflow", "version": 1 } }
執行時,deployment_workflow使用兩個輸入參數requestId和file執行a 。生成的工作流程完成後,任務標記為已完成。如果子工作流終止或失敗,則任務被標記為失敗並在配置時重試。
Wait
Wait 任務被實現為保持在IN_PROGRESS狀態的門,除非標記為外部觸發器COMPLETED或FAILED由外部觸發器標記。要使用Wait任務,請將任務類型設置為WAIT
參數 沒有要求
Wait 任務的外部觸發器
任務資源端點可用於將任務的狀態更新為終止狀態。
Contrib模組提供SQS集成,外部系統可以將消息放入伺服器偵聽的預配置隊列中。當消息到達時,它們被標記為COMPLETED或FAILED。
SQS隊列
- 可以使用以下API檢索伺服器用於更新任務狀態的SQS隊列:
GET /queue
- 更新任務狀態時,消息需要符合以下規範:
- 消息必須是有效的JSON字元串。
- 消息JSON應包含一個名為key的鍵externalId,該值是一個包含以下鍵的JSONified字元串:
- workflowId:工作流程的ID
- taskRefName:應更新的任務引用名稱。
- 每個隊列代表一個特定的任務狀態,並相應地標記任務。例如,發送到COMPLETED隊列的消息將任務狀態標記為COMPLETED。
- 任務的輸出隨消息更新。
示例SQS有效負載:
{ "some_key": "valuex", "externalId": "{"taskRefName":"TASK_REFERENCE_NAME","workflowId":"WORKFLOW_ID"}" }
HTTP
HTTP任務用於通過HTTP調用另一個微服務。
參數
該任務需要一個輸入參數http_request,該參數作為任務輸入的一部分,具有以下詳細資訊:
名稱 |
描述 |
---|---|
URI |
服務的URI。使用vipAddress或包含伺服器地址時可以是部分的。 |
method |
HTTP方法。其中一個GET,PUT,POST,DELETE,OPTIONS,HEAD |
accept |
根據伺服器的要求接受標頭。 |
contentType |
內容類型 – 支援的類型是text / plain,text / html和application / json |
headers |
要與請求一起發送的其他http標頭的映射。 |
body |
請求正文 |
vipAddress |
使用基於發現的服務URL時。 |
HTTP任務輸出
名稱 |
描述 |
---|---|
response |
JSON主體包含響應(如果存在) |
headers |
響應標題 |
statusCode |
整數狀態程式碼 |
例
任務使用vipAddress輸入有效負載
{ "http_request": { "vipAddress": "examplevip-prod", "uri": "/", "method": "GET", "accept": "text/plain" } }
任務使用絕對URL輸入
{ "http_request": { "uri": "http://example.com/", "method": "GET", "accept": "text/plain" } }
該任務被標記為FAILED無法完成請求或遠程伺服器返回非成功的狀態程式碼。
注意
HTTP任務當前僅支援Content-Type作為application / json,並且能夠解析文本以及JSON響應。目前不支援XML輸入/輸出。但是,如果無法將響應解析為JSON或Text,則將字元串表示形式存儲為文本值。
Event (事件)
事件任務提供將事件(消息)發布到Conductor或外部事件系統(如SQS)的功能。事件任務對於為工作流和任務創建基於事件的依賴項非常有用。
參數
名稱 |
描述 |
---|---|
sink |
生成的事件的合格名稱。例如,導體或sqs:sqs_queue_name |
例
{ "sink": 'sqs:example_sqs_queue_name' }
使用Conductor作為接收器生成事件時,事件名稱遵循以下結構: conductor::
對於SQS,請使用隊列的名稱而不是URI。Conductor根據名稱查找URI。
警告
使用SQS時,將ContribsModule添加到部署中。需要使用AWSCredentialsProvider為Conductor配置模組,以便能夠使用AWS API。
支援的接收器
- Conductor
- SQS
事件任務輸入
給予事件任務的輸入可作為有效負載用於已發布的消息。例如,如果消息被放入SQS隊列(接收器是sqs),則消息有效負載將是任務的輸入。
事件任務輸出
event_produced 生成的事件的名稱。