ADF 第七篇:控制流

Azure Data Factory 系列博客:

 

控制流的作用就是在管道的Activity之間設置約束條件,只有滿足一定的約束條件時,才會執行相應的Activity。控制流主要分為優先約束和Activity。

一,控制流的優先約束

從直觀上來看,控制流的優先約束使得Activity在Pipeline中按照一定的條件進行分支和連接。利用控制流可以實現如下場景:

 

如果Copy data活動執行成功,那麼管道將沿着OnSuccess的控制流,執行「Send Success Email」這一分支,發送成功Copy操作的詳細信息; 如果Copy data活動執行失敗,那麼管道將沿着OnFailure的控制流,執行「Send Fail Email」這一分支,發送Copy操作失敗的詳細信息。

1,OnSuccess分支

每一個Activity都會默認創建一個OnSuccess控制流,該控制流線的顏色是綠色的,拖動Activity右側的綠色方塊,UI會自動出現一個綠色的箭頭,把箭頭拖到下一個Web Activity上,就完成了OnSuccess控制流的設置。

2,添加OnFail分支

點擊任意一個Activity右下方的+號,彈出”Add activity on”列表,選擇 Failure,就會在Activity的右方彈出一個紅色的矩形,跟OnSuccess控制流一樣,拖放到下一個Activity上,就完成了OnFail控制流的設置。

3,利用控制流約束控制郵件的發送

如下圖所示,利用控制流來控制郵件的發送 ,當Copy data Activity執行成功時,發送copy成功執行的消息;當Copy data Activity失敗時,發送Copy 執行失敗的消息。

二,控制流Activity

控制流的Activity主要用於變量、循環和條件:

  • Append variable:向 Array類型的變量中追加變量值
  • Set variable:設置變量的值
  • Filter:在管道中使用 Filter 活動,把篩選器表達式應用到輸入數組。
  • Lookup:用於從數據源中檢索數據集,返回執行查詢或存儲過程的結果,如果查找Activity的輸出是單行的,那麼該Activity的輸出可以用於ForEach活動。
  • ForEach:循環執行內部的活動,循環的次數由items指定的集合決定,依次訪問集合中每一個元素。
  • If condition:If-Else 分支,條件為True時,執行Activity1,條件為False時,執行另一個Activity。
  • Switch:分支切換,根據條件,執行不同分支的Activity
  • Validation:驗證活動,它會阻止Pipeline的執行,直到應用的數據集存在或超時為止。
  • UntilUntil 活動提供的功能與 do-until 循環結構以編程語言提供的功能相同。 它在循環中將執行一組活動,直到與活動相關聯的條件的計算結果為 true,你可以在數據工廠中為 Until 活動指定超時值。

1,ForEach活動

ForEach 活動在管道中定義重複的控制流。 此活動用於循環訪問集合,並在循環中執行指定的活動。 此活動的循環實現類似於採用編程語言的 Foreach 循環結構。點擊ForEach圖標內部的「筆」,向內部添加活動。

ForEach活動的核心配置是Items屬性,該屬性需要通過「Add dynamic content」來配置,用戶可以通過系統變量(System variables)、Functions、Variables、和Activity outputs來作為Iteration。

要把Activity outpus作為ForEach活動的Iteration,首先要建立Activity和ForEach活動的優先約束,把上游Activity的輸出作為Iteration,執行ForEach活動內的Activity,直到窮盡數據集的所有item。

2,Lookup活動

Lookup活動的作用就是從Table、Query或Stored procedure中查找出數據行,為了保證Lookup活動的查詢性能,Lookup 活動的限制:最多可以返回 5000 行;如果結果集包含的記錄超過此範圍,將返回前 5000 行。Lookup活動的輸出最多支持 4 MB 左右。如果大小超過此限制,則活動會失敗。目前,Lookup活動在超時前的最長持續時間為 24 小時。

如果勾選”First row only”,那麼表示Lookup活動只返回第一行,如果不勾選,那麼返回所有行,但是要滿足Lookup活動的限制。

Looup活動輸出的結果可以作為ForEach的迭代器。

 

參考文檔:

Branching and chaining activities in an Azure Data Factory pipeline using the Azure portal