使用 Airflow 幫助提升數據品質

  • 2020 年 2 月 19 日
  • 筆記

Airflow 是屬於 Apache 基金會的開源項目,可以幫助數據工程師構建完整的數據 ETL(數據抽取、轉換和載入)流程。Airflow 使用 DAG(有向無環圖)去描述整個工作流的運行流程,使用 Operator 決定工作流中的某個環節該如何執行。

一個 DAG 會包含四五個不同類型的 Operator ,有著兩三種文件系統或者資料庫的數據轉換,例如某個對接方每天會定時把處理好的數據根據省份的數量拆分成 31 個文件傳輸到 FTP 伺服器上,有時候因為對接方資源不足或者是其他什麼原因,導致 FTP 伺服器上文件出現了缺失,此時運行中的工作流不對文件數量進行檢查的話,在後續的 ETL 過程中就會導致數據不準確。因此在整個數據 ETL 過程中,如何證明數據在工作流中是否準確、數據品質能否夠高就很重要了。

在實踐中,我們認為這個判斷或者是證明過程是保證數據品質很重要的部分。因為應用的正確運行需要正確的數據,並且優秀的數據品質可以提高數據使用人員使用數據的信心。我們在思考如何保證數據品質的過程中,發現到糾正結果數據是非常困難的,因此需要儘可能在結果數據的產生過程中確認中間流程沒有問題,一旦中間流程的處理的數據出現問題應該及時把任務停止或者是數據沒有到齊的話,等待數據到齊再進行下一步流程。

藉助於 Airflow 的 Operator 良好的拓展能力,我們基於 BaseOperator 方法擴展了一系列與數據檢查相關的 Operator (以下簡稱 Check Operator),例如檢測 FTP 伺服器符合給定正則表達式的文件是否存在,數量是否正確。在 Airflow 運行工作流的過程中 Check Operator 會檢查數據是否正確,如果數據不正確則會停止整個工作流,反之亦然。

check_sftp_fs = SftpFSDQOperator(task_id='check_sftp_fs',                                   ssh_conn_id='ftp_conn_id',                                   filepath='/path/to/file',                                   file_count=31,                                   dag=dag)  

目前為止,我們使用的檢查方式歸納為一句話就是:使用某一種類型的查詢條件判斷返回的結果是否相等。雖然這種簡單粗暴的方式可以避免很多問題,但是我們希望能做到更多,比如針對數據數量的突增突減現象能有一個恰當的回饋,再比如對數據的完整性、規範性、一致性、準確性、唯一性、關聯性等方向有一個更好的判斷方式。