任務調度神器 airflow 之初體驗
- 2020 年 2 月 19 日
- 筆記
airflow 是 apache下孵化項目,是純 Python 編寫的一款非常優雅的開源調度平台。github 上有 8971 個星(貓註:現在是 15.4K),是非常受歡迎的調度工具。airflow 使用 DAG (有向無環圖) 來定義工作流,配置作業依賴關係非常方便,豪不誇張地說:方便程度簡直甩其他任務調度工具一條街。
airflow 有着以下天然優勢:
1. 靈活易用,airflow 本身是 Python 編寫的,且工作流的定義也是 Python 編寫,有了 Python 膠水的特性,沒有什麼任務是調度不了的,有了開源的代碼,沒有什麼問題是無法解決的,你完全可以修改源碼來滿足個性化的需求,而且更重要的是代碼都是 —human-readable 。
2. 功能強大,自帶的 Operators 都有15+,也就是說本身已經支持 15 種不同類型的作業,而且還是可自定義 Operators,什麼 shell 腳本,python,mysql,oracle,hive等等,無論不傳統數據庫平台還是大數據平台,統統不在話下,對官方提供的不滿足,完全可以自己編寫 Operators。
3. 優雅,作業的定義很簡單明了, 基於 jinja 模板引擎很容易做到腳本命令參數化,web 界面更是也非常 —human-readable ,誰用誰知道。
4. 極易擴展,提供各種基類供擴展, 還有多種執行器可供選擇,其中 CeleryExcutor 使用了消息隊列來編排多個工作節點(worker), 可分佈式部署多個 worker ,airflow 可以做到無限擴展。
5. 豐富的命令工具,你甚至都不用打開瀏覽器,直接在終端敲命令就能完成測試,部署,運行,清理,重跑,追數等任務,想想那些靠着在界面上不知道點擊多少次才能部署一個小小的作業時,真覺得 airflow 真的太友好了。
airflow 是免費的,我們可以將一些常做的巡檢任務,定時腳本(如 crontab ),ETL處理,監控等任務放在 airflow 上集中管理,甚至都不用再寫監控腳本,作業出錯會自動發送日誌到指定人員郵箱,低成本高效率地解決生產問題。但是由於中文文檔太少,大多不夠全全,因此想快速上手並不十分容易。首先要具備一定的 Python 知識,反覆閱讀官方文檔,理解調度原理。本系列分享由淺入深,逐步細化,嘗試為你揭開 airflow 的面紗。
組成部分
從一個使用者的角度來看,調度工作都有以下功能:
1. 系統配置($AIRFLOW_HOME/airflow.cfg)
2. 作業管理($AIRFLOW_HOME/dags/xxxx.py)
3. 運行監控(webserver)
4. 報警(郵件或短訊)
5. 日誌查看(webserver 或 $AIRFLOW_HOME/logs/*)
6. 跑批耗時分析(webserver)
7. 後台調度服務(scheduler)
除了短訊需要自己實現,其他功能 airflow 都有,而且在 airflow 的 webserver 上我們可以直接配置數據庫連接來寫 sql 查詢,做更加靈活的統計分析。
除了以上的組成部分,我們還需要知道一些概念
一些概念:
DAG
Linux 的 crontab 和 windows 的任務計劃,他們可以配置定時任務或間隔任務,但不能配置作業之前的依賴關係。airflow 中 DAG 就是管理作業依賴關係的。DAG 的英文 directed acyclic graphs 即有向無環圖,下圖 1 便是一個簡單的 DAG

圖 1:DAG 示例
在 airflow 中這種 DAG 是通過編寫 Python 代碼來實現的,DAG 的編寫非常簡單,官方提供了很多的例子,在安裝完成後,啟動 webserver 即可看到 DAG 樣例的源碼(其實定義了 DAG 對象的 python 程序),稍做修改即可成為自己的 DAG 。上圖 1 中 DAG 中的依賴關係通過下述三行代碼即可完成:

是不是非常簡潔,並且是 —human-readable。
操作符-Operators
DAG 定義一個作業流,Operators 則定義了實際需要執行的作業。airflow 提供了許多 Operators 來指定我們需要執行的作業:
- BashOperator – 執行 bash 命令或腳本。
- SSHOperator – 執行遠程 bash 命令或腳本(原理同 paramiko 模塊)。
- PythonOperator – 執行 Python 函數。
- EmailOperator – 發送 Email。
- HTTPOperator – 發送一個 HTTP 請求。
- MySqlOperator, SqliteOperator, PostgresOperator, MsSqlOperator, OracleOperator, JdbcOperator, 等. – 執行 SQL 任務。
- DockerOperator, HiveOperator, S3FileTransferOperator, PrestoToMysqlOperator, SlackOperator 你懂得。 除了以上這些 Operators 還可以方便的自定義 Operators 滿足個性化的任務需求。 後續會介紹如何使用這些 Operators 敬請關注。
時區-timezone
airflow 1.9 之前的版本使用本地時區來定義任務開始日期,scheduler_interval 中 crontab 表達式中的定時也是依據本地時區為準,但 airflow 1.9 及後續新版本將默認使用 UTC 時區來確保 airflow 調度的獨立性,以避免不同機器使用不同時區導致運行錯亂。如果調度的任務集中在一個時區上,或不同機器,但使用同一時區時,需要對任務的開始時間及 cron 表達式進行時區轉換,或直接使用本地時區。目前 1.9 的穩定版本還不支持時區配置,後續版本會加入時區配置,以滿足使用本地時區的需求。
web服務器-webserver
webserver 是 airflow 的界面展示,可顯示 DAG 視圖,控制作業的啟停,清除作業狀態重跑,數據統計,查看日誌,管理用戶及數據連接等。不運行 webserver 並不影響 airflow 作業的調度。
調度器-schduler
調度器 schduler 負責讀取 DAG 文件,計算其調度時間,當滿足觸發條件時則開啟一個執行器的實例來運行相應的作業,必須持續運行,不運行則作業不會跑批。
工作節點-worker
當執行器為 CeleryExecutor 時,需要開啟一個 worker。
執行器-Executor
執行器有 SequentialExecutor, LocalExecutor, CeleryExecutor
- SequentialExecutor 為順序執行器,默認使用 sqlite 作為知識庫,由於 sqlite 數據庫的原因,任務之間不支持並發執行,常用於測試環境,無需要額外配置。
- LocalExecutor 為本執行器,不能使用 sqlite 作為知識庫,可以使用 mysql,postgress,db2,oracle 等各種主流數據庫,任務之間支持並發執行,常用於生產環境,需要配置數據庫連接 url。
- CeleryExecutor 為 Celery 執行器,需要安裝 Celery ,Celery 是基於消息隊列的分佈式異步任務調度工具。需要額外啟動工作節點-worker。使用 CeleryExecutor 可將作業運行在遠程節點上。
以一張思維導圖總結今天的內容:

更多詳細信息請可訪問官方文檔