Airflow自定義插件, 使用datax抽數

  • 2019 年 10 月 3 日
  • 筆記

Airflow自定義插件

Airflow之所以受歡迎的一個重要因素就是它的插件機制。Python成熟類庫可以很方便的引入各種插件。在我們實際工作中,必然會遇到官方的一些插件不足夠滿足需求的時候。這時候,我們可以編寫自己的插件。不需要你了解內部原理,甚至不需要很熟悉Python, 反正我連蒙帶猜寫的。

插件分類

Airflow的插件分為Operator和Sensor兩種。Operator是具體要執行的任務插件, Sensor則是條件傳感器,當我需要設定某些依賴的時候可以通過不同的sensor來感知條件是否滿足。

Airflow對插件提供的支持

插件肯定是Python文件了,系統必然需要加載才能執行。Airflow提供了一個簡單插件管理器,會掃描$AIRFLOW_HOME/plugins加載我們的插件。

所以,我們只需要將寫好的插件放入這個目錄下就可以了。

插件語法

Operator和Sensor都聲明了需要的參數,Operator通過調用execute來執行, sensor通過poke來確認。以Operator為例子。

插件的使用過程為:

dag  -> operator -> hook

Hook就是任務執行的具體操作了。

Operator通過繼承BaseOperator實現對dag相關屬性的綁定, Hook通過繼承BaseHook實現對系統配置和資源獲取的一些封裝。

自定義一個通知插件NotifyOperator

前文https://www.cnblogs.com/woshimrf/p/airflow-dag.html 提到我們通過自定義通知實現多功能任務告警,以下就是一個demo。

文件結構如下:

plugins  │   ├── hooks  │   └── operators

NotifyOperator

首先,在operators目錄下創建一個Operator.

# -*- coding: utf-8 -*-  #    from hooks.notify_hook import NotifyHook  from airflow.operators.bash_operator import BaseOperator    class NotifyOperator(BaseOperator):      """      使用通知服務發送通知        :param message: 內容      :type message: str or dict      :param receivers: 英文逗號分割的羅盤賬號      :type receivers: str      :param subject: 郵件主題      :type subject: str      """      template_fields = ('message', 'subject')        @apply_defaults      def __init__(self,                   subject=None,                   message=None,                   receivers=None,                   *args,                   **kwargs):          super().__init__(*args, **kwargs)          self.message = message          self.receivers = receivers          self.subject = subject        def execute(self, context):          self.log.info('Sending notify message. receivers:{}  message:{}'.format(self.receivers, self.message))          hook = NotifyHook(              subject=self.subject,              message=self.message,              receivers=self.receivers          )          hook.send()
  • 繼承BaseOperator
  • 引入NotifyHook, 這個還沒創建,等下創建
  • template_fields, 想要使用模板變量替換,比如{{ds}}, 字段必須聲明到template_fields
  • Operator執行的時候會調用execute方法, 這個就是執行的內容

上面可以看出,operator就是接口聲明。

NotifyHook

在hooks目錄下創建NotifyHook

# -*- coding: utf-8 -*-  #    import json  import requests  from airflow import AirflowException  from airflow.hooks.http_hook import HttpHook    class NotifyHook(HttpHook):      """      使用通知服務發送通知        :param send_type: 通知類型選填 MAIL,DINGDING,SMS,選填多個時中間用英文逗號隔開      :type send_type: str      :param message: 內容      :type message: str or dict      :param receivers: 英文逗號分割的賬號      :type receivers: str      :param subject: 郵件主題      :type subject: str      """        def __init__(self,                   notify_conn_id='notify_default',                   send_type='MAIL',                   subject=None,                   message=None,                   receivers=None,                   *args,                   **kwargs                   ):          super().__init__(http_conn_id=notify_conn_id, *args, **kwargs)          self.send_type = send_type          self.message = message          self.subject = subject          self.receivers = receivers          def _build_message(self):          """          構建data          """          data = {                  "content": self.message,                  "contentType": "HTML",                  "receivers": self.receivers,                  "sendType": self.send_type,                  "sender": '【Airflow】',                  "subject": '【Airflow】' + self.subject              }          return json.dumps(data)        def get_conn(self, headers=None):          """          Overwrite HttpHook get_conn because just need base_url and headers and          not don't need generic params            :param headers: additional headers to be passed through as a dictionary          :type headers: dict          """          self.base_url = 'http://notify.ryan-miao.com'          session = requests.Session()          if headers:              session.headers.update(headers)          return session        def send(self):          """          Send Notify message          """          data = self._build_message()          self.log.info('Sending message: %s',  data)          resp = self.run(endpoint='/api/v2/notify/send',                          data=data,                          headers={'Content-Type': 'application/json',                                   'app-id': 'ryan',                                   'app-key': '123456'})          if int(resp.json().get('retCode')) != 0:              raise AirflowException('Send notify message failed, receive error '                                     'message %s', resp.text)          self.log.info('Success Send notify message')
  • 這裡使用的我自己的通知服務api調用。因為是http請求,所以直接繼承HttpHook來發送請求就可以了。
  • http_conn_id是用來讀取數據庫中connection里配置的host的,這裡直接覆蓋,固定我們通知服務的地址。
  • 通過拋出異常的方式來終止服務

如何使用

將上面兩個文件放到airflow對應的plugins目錄下, airflow就自動加載了。然後,當做任務類型使用

from operators.notify_operator import NotifyOperator    notification = NotifyOperator(                      task_id="we_are_done",                      subject='發送郵件',                      message='content',                      receivers='ryanmiao'                  )  

也可以直接執行。比如,我們前面提到任務失敗告警可以自定義通知。

from operators.notify_operator import NotifyOperator    def mail_failure_callback(receivers):      """      失敗後郵件通知      :receivers  接收人,多個接收人用英文逗號分開      """        def mail_back(context):          subject="【執行失敗】DAG {} TASK {} ds {}".format(                                                  context['task_instance'].dag_id,                                                  context['task_instance'].task_id,                                                  context['ds'])          message="【執行失敗】DAG:  {};<br> TASK:  {} <br>; ds {} <br>;   原因: {} .<br>"                   "查看地址: http://airflow.ryan-miao.com/admin/airflow/tree?dag_id={}"                   .format(                  context['task_instance'].dag_id,                  context['task_instance'].task_id,                  context['ds'],                  context['exception'],                  context['task_instance'].dag_id)            return NotifyOperator(                      task_id="mail_failed_notify_callback",                      subject=subject,                      message=message,                      receivers=receivers                  ).execute(context)        return mail_back    default_args = {      'owner': 'ryanmiao',      'depends_on_past': False,      'start_date': datetime(2019, 5, 1, 9),      'on_failure_callback': mail_failure_callback(receivers='ryanmiao'),      'retries': 0  }    dag = DAG(      'example', default_args=default_args, schedule_interval=None)

自定義一個RDBMS2Hive插件

我們任務調度有個常見的服務是數據抽取到Hive,現在來製作這個插件,可以從關係數據庫中讀取數據,然後存儲到hive。這樣,用戶只要在airflow配置一下要抽數的database, table和目標hive table就可以實現每天數據入庫了。

異構數據傳輸轉換工具很多, 最簡單的就是使用原生的dump工具,將數據dump下來,然後import到另一個數據庫里。

比如postgres dump

${sql}查詢的列導出到文件${export_data_file}

psql -h$SRC_HOST_IP -U$SRC_USER_NAME -d$SRC_DB -p$SRC_HOST_PORT -c  "copy (${sql}) to '${export_data_file}' WITH NULL AS ''"

然後導入hive

LOAD DATA LOCAL INPATH '${export_data_file}' INTO TABLE $TAR_TABLENAME PARTITION (BIZDATE='$BIZ_DATE')

對postgres來說,copy是最快的方案了, 但可能會遇到t,n等各種轉義符號,導出的txt文件或者cvs文件格式就會混亂,需要做對應符號轉義處理。

同樣, mysql 可以直接把數據查詢出來

cat search.sql | mysql -h"$SRC_HOST_IP" -u"$SRC_USER_NAME" -p"$SRC_USER_PWD" -P"$SRC_HOST_PORT" -D"$SRC_DB" --default-character-set=${mysql_charset} -N -s | sed "s/NULL/\\N/ig;s/\\\\n//ig" > result.txt

上述這些命令行的好處就是快,不好的地方在於shell命令的脆弱性和錯誤處理。最終,選擇了集成化的數據轉換工具datax. datax是阿里巴巴開源的一款異構數據源同步工具, 雖然看起來不怎麼更新了,但簡單使用還是可以的。https://github.com/alibaba/DataX

datax的用法相對簡單,按照文檔配置一下讀取數據源和目標數據源,然後執行調用就可以了。可以當做命令行工具來使用。

結合airflow,可以自己實現datax插件。通過讀取connections拿到數據源鏈接配置,然後生成datax的配置文件json,最後調用datax執行。下面是一個從pg或者mysql讀取數據,導入hive的插件實現。

主要思路是:

  1. hdfs創建一個目錄
  2. 生成datax配置文件
  3. datax執行配置文件,將數據抽取到hdfs
  4. hive命令行load hdfs

RDBMS2HiveOperator

# -*- coding: utf-8 -*-  #  #    """  postgres或者mysql 入庫到hdfs  """  import os  import signal    from hooks.rdbms_to_hive_hook import RDBMS2HiveHook  from airflow.exceptions import AirflowException  from airflow.models import BaseOperator      class RDBMS2HiveOperator(BaseOperator):      """      傳輸pg到hive      https://github.com/alibaba/DataX        :param conn_id: pg連接id      :param query_sql : pg查詢語句      :param split_pk  : pg分割主鍵, NONE表示不分割,指定後可以多線程分割,加快傳輸      :param hive_db   : hive的db      :param hive_table: hive的table      :param hive_table_column  column數組, column={name:a, type: int} 或者逗號分割的字符串, column=a,b,c      :param hive_table_partition 分區bizdate值      """      template_fields = ('query_sql',  'hive_db', 'hive_table','hive_table_partition')      ui_color = '#edd5f1'        @apply_defaults      def __init__(self,                   conn_id,                   query_sql,                   hive_db,                   hive_table,                   hive_table_column,                   hive_table_partition,                   split_pk=None,                   *args,                   **kwargs):          super().__init__(*args, **kwargs)          self.conn_id = conn_id          self.query_sql = query_sql          self.split_pk = split_pk          self.hive_db = hive_db          self.hive_table = hive_table          self.hive_table_column = hive_table_column          self.hive_table_partition = hive_table_partition          def execute(self, context):          """          Execute          """          task_id = context['task_instance'].dag_id + "#" + context['task_instance'].task_id            self.hook = RDBMS2HiveHook(                          task_id = task_id,                          conn_id = self.conn_id,                          query_sql = self.query_sql,                          split_pk=self.split_pk,                          hive_db=self.hive_db,                          hive_table=self.hive_table,                          hive_table_column=self.hive_table_column,                          hive_table_partition=self.hive_table_partition                          )          self.hook.execute(context=context)          def on_kill(self):          self.log.info('Sending SIGTERM signal to bash process group')          os.killpg(os.getpgid(self.hook.sp.pid), signal.SIGTERM)  

RDBMS2HiveHook

# -*- coding: utf-8 -*-  #    """  datax入庫hive  """  import subprocess  import uuid  import json  import os    from airflow.exceptions import AirflowException  from airflow.hooks.base_hook import BaseHook      class RDBMS2HiveHook(BaseHook):      """      Datax執行器      """        def __init__(self,                   task_id,                   conn_id,                   query_sql,                   hive_db,                   hive_table,                   hive_table_column,                   hive_table_partition,                   split_pk=None):          self.task_id = task_id          self.conn = self.get_connection(conn_id)          self.query_sql = query_sql          self.split_pk = split_pk          self.hive_db = hive_db          self.hive_table = hive_table          self.hive_table_partition = hive_table_partition          self.log.info("Using connection to: {}:{}/{}".format(self.conn.host, self.conn.port, self.conn.schema))            self.hive_table_column = hive_table_column          if isinstance(hive_table_column, str):              self.hive_table_column = []              cl = hive_table_column.split(',')              for item in cl:                  hive_table_column_item = {                      "name": item,                      "type": "string"                  }                  self.hive_table_column.append(hive_table_column_item)          def Popen(self, cmd, **kwargs):          """          Remote Popen            :param cmd: command to remotely execute          :param kwargs: extra arguments to Popen (see subprocess.Popen)          :return: handle to subprocess          """          self.sp = subprocess.Popen(              cmd,              stdout=subprocess.PIPE,              stderr=subprocess.STDOUT,              **kwargs)            for line in iter(self.sp.stdout):              self.log.info(line.strip().decode('utf-8'))            self.sp.wait()            self.log.info("Command exited with return code %s", self.sp.returncode)            if self.sp.returncode:              raise AirflowException("Execute command failed")            def generate_setting(self):          """           datax速度等設置          """          self.setting= {              "speed": {                   "byte": 104857600              },              "errorLimit": {                  "record": 0,                  "percentage": 0.02              }          }          return self.setting        def generate_reader(self):          """          datax reader          """          conn_type = 'mysql'          reader_name = 'mysqlreader'          if(self.conn.conn_type == 'postgres'):              conn_type = 'postgresql'              reader_name = 'postgresqlreader'            self.jdbcUrl =  "jdbc:"+conn_type+"://"+self.conn.host.strip()+":"+str(self.conn.port)+"/"+ self.conn.schema.strip()          self.reader =  {              "name": reader_name,              "parameter": {                  "username": self.conn.login.strip(),                  "password": self.conn.password.strip(),                  "connection": [                      {                          "querySql": [                              self.query_sql                          ],                          "jdbcUrl": [                              self.jdbcUrl                          ]                      }                  ]              }          }            return self.reader        def generate_writer(self):          """          datax hdafs writer          """          self.file_type = "text"          self.hdfs_path = "/datax/"+self.hive_db+"/"+self.hive_table+"/"+self.hive_table_partition          self.log.info("臨時存儲目錄:{}".format(self.hdfs_path))          self.writer = {                      "name": "hdfswriter",                      "parameter": {                          "defaultFS": "hdfs://nameservice1",                          "hadoopConfig": {                              "dfs.nameservices": "nameservice1",                              "dfs.ha.automatic-failover.enabled.nameservice1": True,                              "ha.zookeeper.quorum": "bigdata2-prod-nn01.ryan-miao.com:2181,bigdata2-prod-nn02.ryan-miao.com:2181,bigdata2-prod-nn03.ryan-miao.com:2181",                              "dfs.ha.namenodes.nameservice1": "namenode117,namenode124",                              "dfs.namenode.rpc-address.nameservice1.namenode117": "bigdata2-prod-nn01.ryan-miao.com:8020",                              "dfs.namenode.servicerpc-address.nameservice1.namenode117": "bigdata2-prod-nn01.ryan-miao.com:8022",                              "dfs.namenode.http-address.nameservice1.namenode117": "bigdata2-prod-nn01.ryan-miao.com:50070",                              "dfs.namenode.https-address.nameservice1.namenode117": "bigdata2-prod-nn01.ryan-miao.com:50470",                              "dfs.namenode.rpc-address.nameservice1.namenode124": "bigdata2-prod-nn02.ryan-miao.com:8020",                              "dfs.namenode.servicerpc-address.nameservice1.namenode124": "bigdata2-prod-nn02.ryan-miao.com:8022",                              "dfs.namenode.http-address.nameservice1.namenode124": "bigdata2-prod-nn02.ryan-miao.com:50070",                              "dfs.namenode.https-address.nameservice1.namenode124": "bigdata2-prod-nn02.ryan-miao.com:50470",                              "dfs.replication": 3,                              "dfs.client.failover.proxy.provider.nameservice1": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"                          },                          "fileType": self.file_type,                          "path": self.hdfs_path,                          "fileName": self.task_id,                          "column": self.hive_table_column,                          "writeMode": "nonConflict",                          "fieldDelimiter": "t"                      }              }          return self.writer        def generate_config(self):          content = [{              "reader": self.generate_reader(),              "writer": self.generate_writer()          }]            job = {              "setting": self.generate_setting(),              "content": content          }            config = {              "job": job          }            self.target_json = json.dumps(config)            # write json to file          self.json_file= '/tmp/datax_json_'+self.task_id+ uuid.uuid1().hex          # 打開一個文件          fo = open(self.json_file, "w")          fo.write(self.target_json)          fo.close()          self.log.info("write config json {}".format(self.json_file))          return self.json_file            def execute(self, context):          self.generate_config()          # check hdfs_path          hdfs_path = self.hdfs_path          if(not hdfs_path.startswith('/datax/')):              raise AirflowException("hdfs路徑填寫錯誤,不在/datax目錄下")            # 創建目錄          cmd = ['hadoop', 'fs', '-mkdir', '-p', hdfs_path]          self.Popen(cmd)            # 刪除文件          if(not hdfs_path.startswith('/datax/')):              raise AirflowException("hdfs路徑填寫錯誤,不在/datax目錄下")            files_path = hdfs_path+"/*";          try:              cmd = ['hadoop', 'fs', '-rm', files_path]              self.Popen(cmd)          except Exception:              self.log.info('ignore err, just make sure the dir is clean')              pass              # 上傳文件          datax_home = '/data/opt/datax/bin'          cmd = [ 'python', datax_home + '/datax.py', self.json_file]          self.Popen(cmd)          # 刪除配置文件          os.remove(self.json_file)            # hive加載          #hive load data from hdfs          hql = "LOAD DATA INPATH '"+ hdfs_path + "' OVERWRITE  INTO TABLE "                 + self.hive_db+"."+self.hive_table + " PARTITION (bizdate="+ self.hive_table_partition  +")"          cmd = ['hive', '-e', """ + hql + """]          self.Popen(cmd)

如何使用

  1. admin登錄airflow
  2. 配置connection, 配置pg或者mysql的數據庫
  3. 修改hdfs集群配置信息
  4. 創建一個DAG
from airflow import DAG    from operators.rdbms_to_hive_operator import RDBMS2HiveOperator  from datetime import datetime, timedelta  from dag_utils import compass_utils      default_args = {      'owner': 'ryanmiao',      'depends_on_past': False,      'start_date': datetime(2019, 5, 1, 9),      'on_failure_callback': compass_utils.failure_callback(dingding_conn_id='dingding_bigdata', receivers='ryanmiao'),      # 'on_success_callback': compass_utils.success_callback(dingding_conn_id='dingding_bigdata', receivers='ryanmiao'),      'retries': 0  }    dag = DAG(      'example_pg2hive', default_args=default_args, schedule_interval=None)    # CREATE TABLE test.pg2hive_test(  #      ftime int,  #      raw_cp_count int,  #      raw_to_delete_cp_count bigint,  #      create_time timestamp  #      )  #  COMMENT '這個是測試datax表'  #  PARTITIONED BY (bizdate int)  # ROW FORMAT DELIMITED  # FIELDS TERMINATED BY 't'  # LINES TERMINATED BY 'n'  #  STORED AS TEXTFILE;    hive_table_column = "ftime,raw_cp_count,raw_to_delete_cp_count,create_time"    t1 = RDBMS2HiveOperator(      task_id='pg2hive',      conn_id='pg_rdb_poi',      query_sql='select ftime, raw_cp_count, raw_to_delete_cp_count, create_time from tbl_poi_report limit 1000',      hive_db='test',      hive_table='pg2hive_test',      hive_table_column=hive_table_column,      hive_table_partition="{{ ds_nodash }}",      dag=dag  )