airflow的安裝和使用 – 完全版

之前試用了azkaban一小段時間,雖然上手快速方便,但是功能還是太簡單,不夠靈活。
Airflow使用代碼來管理任務,這樣應該是最靈活的,決定試一下。

我是python零基礎,在使用airflow的過程中可謂吃盡了苦頭。。好歹最後實現所有要求,兩三周的時間沒有白費
情緒穩定

看完這篇文章,可以達到如下目標:

  1. 安裝airflow
  2. 如何修改界面右上角顯示的時間到當前時區
  3. 如何添加任務
  4. 調試任務python代碼
  5. 如何啟動spark任務
  6. 如何限定任務同時執行的個數
  7. 如何手動觸發任務時傳入參數
  8. 如何在airflow界面上重新運行任務
  9. 如何查看任務log及所有任務的運行記錄
  10. 如何在任務失敗時發郵件(騰訊企業郵箱)
  11. 如何在任務失敗時發消息到企業微信

以下過程已經過去了有一段時間,當時記錄的也不一定很全面,如果有的不能執行,請留言告知。

安裝airflow

系統:Ubuntu 16
python: 3.7
airflow版本:1.10.10

保持pip3到最新版本

pip3 install –upgrade pip

安裝使用pip3

切換到root用戶執行: pip3 install apache-airflow
你以為敲完這條命令就可以去把個妹或者撩個漢再回來就裝好了,請坐下。

我碰到的錯誤:
Python.h not found
運行
sudo apt-get install python3.7-dev

某些依賴版本不對:
ERROR: pendulum 1.4.4 has requirement python-dateutil<3.0.0.0,>=2.6.0.0, but you'll have python-dateutil 2.4.2 which is incompatible.
ERROR: pandas 0.25.3 has requirement python-dateutil>=2.6.1, but you'll have python-dateutil 2.4.2 which is incompatible.
運行
pip install python-dateutil --upgrade
哪個包版本不對,更新哪個

數據庫使用mysql

相信你看這個文章的時候應該不會還沒有嘗試裝過airflow,所以airflow.cfg這個文件已經有了,在哪也很清楚

修改airflow.cfg:
sql_alchemy_conn = mysql://airflow:[email protected]:3306/airflow

使用root用戶連接到mysql:

create user 'airflow'@'%' identified by '123';
grant all privileges on airflow.* to 'airflow'@'%';
flush privileges;
set explicit_defaults_for_timestamp = 1; --這一行至關重要

再使用airflow用戶登錄mysql:
create database airflow CHARACTER SET = utf8mb4;

初始化數據庫

airflow initdb

這時候會報mysql依賴問題,如:
No module named '_mysql'

安裝python的mysql依賴:
No module named MySQLdb
python3: mysql錯誤:ModuleNotFoundError: No module named ‘ConfigParser’

這個時候終於可以啟動airflow了:
** 啟的時候不要使用root用戶,回到普通用戶 **

airflow webserver -p 8080
airflow scheduler

如何修改界面右上角顯示的時間到當前時區

相信應該所有人都會幹這個事情:
喲?airflow里有個時區的配置,改了應該就好了
default_timezone = Asia/Shanghai

然後去刷一下頁面
怎麼回事

還是UTC嘛,這配置騙人的嗎?
那麼看這一篇文章吧:
Airflow 修改時區

** 改的時候注意:** python的代碼是根據縮進來區別代碼塊的,所以拷代碼的時候一定要注意縮進沒有問題

如何添加任務

在~/airflow下創建dags文件夾,把.py文件放入即可
airflow啟動了一個叫 DagFileProcessorManager 的進程來掃描dags目錄,一但有文件個數變更,或者內容變更都會很快被檢測到
這個進程有相應的log文件,可以提供一些文件處理錯誤信息

調試任務python代碼

關閉schedule

這個時候已經開始寫任務的python代碼了,對於python小白與剛開始接觸airflow的小哥或老哥來說,簡直就是痛不欲生
有一個配置在調試的時候比較實用,就是關掉任務的schudle,只有手動觸發才會執行。
把dag的schedule_interval設置為None
schedule_interval=None

python小白實用技巧

還有python代碼里單引號和雙引號是等價的,如果有引號嵌套可以分開使用,避免麻煩的轉義,如:
hour = '{{ dag_run.conf["hour"] }}'

Jinja template

反正我第一眼看到這個東西,特別是官方教程里那一大塊的模板文本的時候,心裏只有一個字: WTF?!

templated_command = """
{% for i in range(5) %}
    echo "{{ ds }}"
    echo "{{ macros.ds_add(ds, 7)}}"
    echo "{{ params.my_param }}"
{% endfor %}
"""

其實也不是很複雜,這個玩意理解了以後還是比較方便的。除了在代碼中使用普通的python變量或者airflow內置變量之外,很多時候在字符串中也需要使用airflow的內置變量會比較靈活和方便,Jinja template提供的就是這個功能。

內置變量說明見:Macros reference

如何啟動spark任務

airflow是很強大很靈活的,官方提供了功能豐富的各種插件,各種JobOperator。來,簡單好用童叟無欺的SparkSubmitOperator了解一下?

我的需求很簡單,可以提交任務到不同的spark集群。這樣就要求不能使用機器默認的hadoop和spark環境變量,必須為每個任務指定獨立的配置文件。不知道是不是有大牛一次性成功的,反正我是試了無數次,一句話在心裏不停的重複:「這什麼吊東西!」
生氣

小可愚鈍,google能搜出來的都看過了,怎麼都不行,死活都不行,主要是環境變量不對。

調用linux腳本執行spark-submit是最靈活方便的辦法

轉念一想,還是傳統的spark提交方式最好用啊,就是執行sh腳本利用spark-submit進行提交,這樣spark就與airflow無關了,而且不管是環境變量還是參數配置都最靈活,傳入參數也很方便。
這樣只要使用普通的BashOperator就可以了,而且airflow應該專註如何調度任務,而不是還要兼顧任務的配置,就算SparkSubmitOperator可以工作,也是使用sh腳本的方式更好。

如何限定任務同時執行的個數

像spark任務通常使用的資源都會比較多,如果dag執行開始時間到當前時間間隔很長,或是暫停很長時間再開啟,那麼一開啟的時候schedule會瞬間創建大量任務,提交到默認的pool,這個pool默認的大小是128。這樣肯定是大家不希望看到的。

一個解決辦法,為每個spark任務創建單獨的pool,大小設置為1,這樣一個spark任務一次就只能有一個在運行狀態,後面都排隊。

界面上操作:[Admin] -> [Pools],slots設為1。
然後在spark task的operator里添加參數:pool='PoolName'

如何手動觸發任務時傳入參數

假設任務是每小時觸發一次,處理24小時前的數據,也就是今天8點處理昨天8點這一個小時的數據。除了schedule自動執行的邏輯,我們還希望可以手動觸發任務,並且指定某個小時重新處理。

** 註: ** 這個功能只有1.10.10才支持,就是在界面上點擊 [Trigger DAG] 的時候可以填入參數(固定為Json格式)。

先來看一下最終的結果
hour='{{ dag_run.conf["hour"] if dag_run.conf["hour"] else (execution_date - macros.timedelta(hours=24)).strftime("%Y%m%d%H") }}'
這裡使用了Jinja template,通過dag_run對象可以獲取界面上傳入的參數,如果沒有這個參數(也就是通過schedule觸發),那麼執行默認的邏輯(這裡是24之前),並且格式化時間與界面輸入保持一致。

如何在airflow界面上重新運行任務

這個功能默認的Executor是不支持的,需要安裝CeleryExecutor,而CeleryExecutor一個存放消息的框架,這裡我們選擇rabbitmq。

假定rabbitmq已經裝好。
安裝請看官方文檔:Celery Executor

配置

executor = CeleryExecutor
borker_url = amqp://user:[email protected]:port

** 註:** 如果rabbitmq是集群模式,這裡也是挑一台出來使用。指定所有節點我還沒有配置成功,如果有會配置的,請留言告知。

如何在界面上重跑任務呢?
界面上點擊dag進入dag管理界面,點擊[Tree View]。
Task每次運行都會用一個小方塊來表示,點擊小方塊,再點擊 [Run] 按鈕就可以了。

** 註:** Tree View 這裡最多只顯示固定數量的歷史記錄,如果再早的時間只能通過點擊 [Trigger DAG] 再指定參數運行。

任務運行時間的問題

這裡有一個關鍵的問題,在界面上點擊8個小時以前任務執行,那麼任務觸發的時候,運行的是8個小時之前的時間,還是當前時間呢?

如果我們是通過之前的hour變量的來指定時間的,那任務運行的時間就是8個小時之前,任務當時觸發的時間。為什麼呢?
我們在Jinja template里使用的變量 dag_run, execute_date這個並不是運行時變量,每次task觸發,相關的上下文信息都會存到數據庫里。所以8個小時之後我們再重新運行task的時候,是從數據庫中讀取當時的上下文信息,而不是現在的信息。

如何查看任務log及所有任務的運行記錄

查看所有任務的運行記錄

  1. DAG界面里的 [Graph View] -> 點擊任務 -> [Task Instances]
  2. 主菜單里的 [Browser] -> [Task Instances]

查看log

這就比較簡單了

  1. 點擊 [Tree View] 里的小方塊,可以查看log
  2. Task Intances 列表最後一列,也可以查看log

如何在任務失敗時發郵件(騰訊企業郵箱)

首先DAG的default_args需要配置

'email':['[email protected]'],
'email_on_failure': True

修改airflow.cfg

smtp_host = smtp.exmail.qq.com
smtp_starttls = False
smtp_ssl = True
smtp_port = 465
smtp_mail_from = [email protected]
smtp_user = [email protected]
smtp_password = password

實話說,這些配置都搞了好久才試出來,這種體驗簡直讓人慾哭無淚。當然,身為一個碼畜哭個什麼,到這裡已經被python和airflow的種問題折磨很多天了,素質三連走起來。

首先 smtp_ssl = True, smtp_port = 465 是一個重點。再次smtp_mail_from和smtp_user都使用同一個有效的郵箱地址。

如何在任務失敗時發消息到企業微信

有時候覺得發郵件可能還不夠,想把失敗消息發到企業微信,這樣更能及時的發現問題。

添加企業微信依賴

airflow官方支持釘釘的通知,企業微信通知就是根據這個改動而來,代碼有兩個py文件:airflow企業微信支持
把這兩個py文件放到 dags 目錄,也就是和dag的py文件放在一起。

使用方法:
3. 在企業微信群中創建機械人

  1. 右鍵點擊群

  2. 選擇 [Add Group Robot],並創建

  3. 獲取機械人的key:右鍵 [View Information],可以得到一個URL
    //qyapi.weixin.qq.com/cgi-bin/webhook/send?key=xxxxxx-xx-xx
    這個key的值就是機械人的ID

  4. 在airflow中創建企業微信的連接:[主菜單] -> [Admin] -> [Connections],配置填寫:

Conn Id: wechat_robot
Conn Type: HTTP
Host: //qyapi.weixin.qq.com
Password: 前面得到的key值,也就是機械人的ID

在代碼中使用

  1. 代碼中import WechatOperator
    from wechat_operator import WechatOperator

  2. 創建 failure call 方法:

def failure_callback(context):
    dagConf = context['dag_run'].conf
    taskInst = context['task_instance']

    hour = None
    if 'hour' in dagConf:
        hour = dagConf['hour']
    else:
        hour = (taskInst.execution_date - timedelta(hours=24)).strftime('%Y%m%d%H')

    message = 'Airflow任務失敗:\n' \
              'DAG: {}\n' \
              'Task: {}\n' \
              '時間: {}\n' \
        .format(taskInst.dag_id,
                taskInst.task_id,
                hour)
    return WechatOperator(
        task_id='wechat_task',
        wechat_conn_id='wechat_robot',
        message_type='text',
        message=message,
        at_all=True,
    ).execute(context)

這個代碼應該還是很好懂的,主要是為了創建 WechatOperator 對象。
有個邏輯來重新獲取執行時間(這裡必須使用代碼,而不能直接使用Jinja template),為的是在通知裏面可以直接看到是哪個時間出錯了。

  1. default_args添加 failure callback配置
    'on_failure_callbak': failure_callback

結束語

到這裡,總算是搭建好一個可以正式投入生產使用的環境了。

Airflow雖然很靈活,但是想真正滿足生產需求,還是經歷了不少痛苦。特別是要求會使用python,加上airflow官方文檔也不是很詳細,這兩點導致入門曲線太陡峭了。