Airflow custom plugins: extracting data with DataX
- October 3, 2019
- Notes
Airflow custom plugins
One big reason Airflow is popular is its plugin mechanism: plugins are plain Python, so mature Python libraries are easy to pull in. In real work you will inevitably hit cases the official operators don't cover, and that is when you write your own plugin. You don't need to understand Airflow's internals, or even know Python particularly well; I wrote mine mostly by guesswork anyway.
Plugin types

Airflow plugins come in two flavors: Operators and Sensors. An Operator is a concrete task to execute; a Sensor is a condition probe, so when a task depends on some external condition, a sensor is used to detect whether that condition is satisfied.
How Airflow supports plugins
Plugins are just Python files, so Airflow has to load them before they can run. Airflow ships with a simple plugin manager that scans $AIRFLOW_HOME/plugins and loads whatever it finds there. So all we need to do is drop our plugin files into that directory.
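For reference, Airflow 1.10 also lets you register components explicitly by subclassing AirflowPlugin. This is optional for the direct-import approach used in this post (the plugins directory is added to the Python path), and the module and class names below are placeholders for the operators and hooks we build later:

from airflow.plugins_manager import AirflowPlugin

# placeholder imports: my_operator / my_hook stand in for the modules created later in this post
from operators.my_operator import MyOperator
from hooks.my_hook import MyHook


class MyPlugin(AirflowPlugin):
    # plugin name; registered operators become importable as airflow.operators.my_plugin.*
    name = "my_plugin"
    operators = [MyOperator]
    hooks = [MyHook]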
Plugin structure

Both Operators and Sensors declare the parameters they need; an Operator runs through its execute method, while a Sensor checks its condition through poke. Let's take an Operator as the example.
When a plugin is used, the call chain is:

dag -> operator -> hook

The hook is where the actual work of the task happens.
An Operator inherits from BaseOperator to get bound to DAG-level attributes, while a Hook inherits from BaseHook to wrap access to system configuration and external resources.
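This post builds Operators, but for completeness, a minimal custom Sensor for Airflow 1.10 could look like the sketch below; the marker-file check is just an illustration:

import os

from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults


class FileReadySensor(BaseSensorOperator):
    """Wait until a (hypothetical) marker file shows up."""

    @apply_defaults
    def __init__(self, filepath, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.filepath = filepath

    def poke(self, context):
        # the scheduler keeps calling poke() until it returns True (or the sensor times out)
        self.log.info('Checking for %s', self.filepath)
        return os.path.exists(self.filepath)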
A custom notification plugin: NotifyOperator

In an earlier post, https://www.cnblogs.com/woshimrf/p/airflow-dag.html, I mentioned that we implement versatile task alerting through custom notifications; here is a demo.

The file layout looks like this:
plugins
├── hooks
└── operators
NotifyOperator
First, create an Operator under the operators directory.
# -*- coding: utf-8 -*-
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults

from hooks.notify_hook import NotifyHook


class NotifyOperator(BaseOperator):
    """
    Send a notification through the notify service.

    :param message: message body
    :type message: str or dict
    :param receivers: comma-separated list of Compass accounts
    :type receivers: str
    :param subject: mail subject
    :type subject: str
    """
    template_fields = ('message', 'subject')

    @apply_defaults
    def __init__(self,
                 subject=None,
                 message=None,
                 receivers=None,
                 *args,
                 **kwargs):
        super().__init__(*args, **kwargs)
        self.message = message
        self.receivers = receivers
        self.subject = subject

    def execute(self, context):
        self.log.info('Sending notify message. receivers: {} message: {}'.format(self.receivers, self.message))
        hook = NotifyHook(
            subject=self.subject,
            message=self.message,
            receivers=self.receivers
        )
        hook.send()
- Inherit from BaseOperator.
- Import NotifyHook; we haven't created it yet, it comes next.
- template_fields: any field that should support template variables such as {{ ds }} must be declared in template_fields (a small usage sketch follows this list).
- When the operator runs, Airflow calls its execute method; that is where the actual work goes.

As you can see, the operator is essentially just the interface declaration.
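As an example of template_fields in action, a DAG can pass Jinja templates in subject and message and Airflow renders them when the task runs, while receivers is passed through literally. A sketch, assuming a dag object is already defined:

notification = NotifyOperator(
    task_id='notify_done',
    subject='Daily job finished {{ ds }}',                  # rendered at runtime, e.g. '... 2019-10-03'
    message='DAG {{ dag.dag_id }} finished for {{ ds }}',   # also rendered, because message is in template_fields
    receivers='ryanmiao',                                   # not in template_fields, passed as-is
    dag=dag
)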
NotifyHook
Create NotifyHook under the hooks directory.
# -*- coding: utf-8 -*-
import json

import requests

from airflow import AirflowException
from airflow.hooks.http_hook import HttpHook


class NotifyHook(HttpHook):
    """
    Send a notification through the notify service.

    :param send_type: notification type: MAIL, DINGDING or SMS; separate multiple types with commas
    :type send_type: str
    :param message: message body
    :type message: str or dict
    :param receivers: comma-separated list of accounts
    :type receivers: str
    :param subject: mail subject
    :type subject: str
    """

    def __init__(self,
                 notify_conn_id='notify_default',
                 send_type='MAIL',
                 subject=None,
                 message=None,
                 receivers=None,
                 *args,
                 **kwargs):
        super().__init__(http_conn_id=notify_conn_id, *args, **kwargs)
        self.send_type = send_type
        self.message = message
        self.subject = subject
        self.receivers = receivers

    def _build_message(self):
        """
        Build the request payload.
        """
        data = {
            "content": self.message,
            "contentType": "HTML",
            "receivers": self.receivers,
            "sendType": self.send_type,
            "sender": '【Airflow】',
            "subject": '【Airflow】' + self.subject
        }
        return json.dumps(data)

    def get_conn(self, headers=None):
        """
        Override HttpHook.get_conn: we only need base_url and headers,
        not the generic connection parameters.

        :param headers: additional headers to be passed through as a dictionary
        :type headers: dict
        """
        self.base_url = 'http://notify.ryan-miao.com'
        session = requests.Session()
        if headers:
            session.headers.update(headers)
        return session

    def send(self):
        """
        Send the notify message.
        """
        data = self._build_message()
        self.log.info('Sending message: %s', data)
        resp = self.run(endpoint='/api/v2/notify/send',
                        data=data,
                        headers={'Content-Type': 'application/json',
                                 'app-id': 'ryan',
                                 'app-key': '123456'})
        if int(resp.json().get('retCode')) != 0:
            raise AirflowException('Send notify message failed, received error message %s' % resp.text)
        self.log.info('Success Send notify message')
- This calls my own notification service API. Since it is just an HTTP request, the hook simply extends HttpHook to send it.
- http_conn_id normally points at a connection record whose host is read from the metadata database; here get_conn is overridden to hardcode our notify service address instead (a sketch of a non-hardcoded variant follows this list).
- Failures are signalled by raising an exception, which fails the task.
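If you would rather keep the service address in an Airflow connection instead of hardcoding it, the override could read the host from the connection configured under notify_conn_id. A sketch, assuming a notify_default connection whose host is set to the notify service URL:

def get_conn(self, headers=None):
    # look up the connection stored under self.http_conn_id ('notify_default' by default)
    conn = self.get_connection(self.http_conn_id)
    self.base_url = conn.host  # e.g. http://notify.ryan-miao.com, maintained in Admin -> Connections
    session = requests.Session()
    if headers:
        session.headers.update(headers)
    return session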
How to use it

Put the two files above into Airflow's plugins directory and Airflow loads them automatically. Then use the operator as a task type:
from operators.notify_operator import NotifyOperator

notification = NotifyOperator(
    task_id="we_are_done",
    subject='Send mail',
    message='content',
    receivers='ryanmiao'
)
It can also be executed directly. For example, as mentioned earlier, a failure callback can send a custom notification:
from datetime import datetime

from airflow import DAG
from operators.notify_operator import NotifyOperator


def mail_failure_callback(receivers):
    """
    Mail notification on failure.

    :param receivers: recipients, comma-separated
    """
    def mail_back(context):
        subject = "[FAILED] DAG {} TASK {} ds {}".format(
            context['task_instance'].dag_id,
            context['task_instance'].task_id,
            context['ds'])
        message = ("[FAILED] DAG: {};<br> TASK: {};<br> ds: {};<br> reason: {}.<br>"
                   "Details: http://airflow.ryan-miao.com/admin/airflow/tree?dag_id={}").format(
            context['task_instance'].dag_id,
            context['task_instance'].task_id,
            context['ds'],
            context['exception'],
            context['task_instance'].dag_id)
        return NotifyOperator(
            task_id="mail_failed_notify_callback",
            subject=subject,
            message=message,
            receivers=receivers
        ).execute(context)
    return mail_back


default_args = {
    'owner': 'ryanmiao',
    'depends_on_past': False,
    'start_date': datetime(2019, 5, 1, 9),
    'on_failure_callback': mail_failure_callback(receivers='ryanmiao'),
    'retries': 0
}

dag = DAG('example', default_args=default_args, schedule_interval=None)
A custom RDBMS2Hive plugin

A common job in our scheduler is extracting data into Hive, so let's build a plugin for it: read data from a relational database and store it in Hive. With it, a user only needs to configure the source database and table and the target Hive table in Airflow to get daily data loads.

There are plenty of tools for moving data between heterogeneous stores. The simplest is the database's native dump tool: dump the data out, then import it into the other store.
For example, with Postgres you can export the columns returned by ${sql} to the file ${export_data_file}:
psql -h$SRC_HOST_IP -U$SRC_USER_NAME -d$SRC_DB -p$SRC_HOST_PORT -c "copy (${sql}) to '${export_data_file}' WITH NULL AS ''"
and then load it into Hive:
LOAD DATA LOCAL INPATH '${export_data_file}' INTO TABLE $TAR_TABLENAME PARTITION (BIZDATE='$BIZ_DATE')
For Postgres, copy is the fastest option, but the data may contain escape characters such as \t and \n, which scramble the exported txt or csv file, so the corresponding characters have to be escaped.

Similarly, with MySQL you can query the data straight out:
cat search.sql | mysql -h"$SRC_HOST_IP" -u"$SRC_USER_NAME" -p"$SRC_USER_PWD" -P"$SRC_HOST_PORT" -D"$SRC_DB" --default-character-set=${mysql_charset} -N -s | sed "s/NULL/\\N/ig;s/\\\\n//ig" > result.txt
The advantage of these command-line approaches is speed; the downside is how brittle shell pipelines are and how poorly they handle errors. In the end we went with an integrated data transfer tool, DataX. DataX is Alibaba's open-source tool for syncing data between heterogeneous sources; it no longer looks actively maintained, but it is fine for simple use. https://github.com/alibaba/DataX

DataX is fairly simple to use: following the docs, configure the reader (source) and the writer (target), then run the job. It works as a plain command-line tool, as sketched below.
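A minimal sketch of driving DataX by hand, assuming it is unpacked under /data/opt/datax (the job skeleton only shows the shape of the config; real reader/writer parameters come from the DataX docs, and in the plugin below they are generated by the hook):

import json
import subprocess

# skeleton of a DataX job: global settings plus one reader/writer pair
job = {
    "job": {
        "setting": {"speed": {"byte": 104857600}},
        "content": [{
            "reader": {"name": "mysqlreader", "parameter": {}},   # fill in per the DataX docs
            "writer": {"name": "hdfswriter", "parameter": {}}
        }]
    }
}

with open('/tmp/datax_job.json', 'w') as f:
    json.dump(job, f)

# datax.py is the launcher shipped with DataX
subprocess.check_call(['python', '/data/opt/datax/bin/datax.py', '/tmp/datax_job.json'])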
Combined with Airflow, we can implement a DataX plugin ourselves: read the data source connection settings from Airflow connections, generate the DataX job JSON, and then invoke DataX. Below is a plugin that reads from Postgres or MySQL and loads the data into Hive.

The main flow is (a condensed sketch follows the list):

- Create a staging directory on HDFS.
- Generate the DataX job config file.
- Run DataX with that config to extract the data onto HDFS.
- Load the HDFS files into Hive with the hive command line.
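Boiled down, those four steps are roughly the sequence below (a simplified sketch; the actual hook that follows adds connection handling, config generation, logging and error handling):

import subprocess

def rdbms_to_hive(datax_json, hdfs_path, hive_db, hive_table, bizdate):
    # 1. create the staging directory on HDFS
    subprocess.check_call(['hadoop', 'fs', '-mkdir', '-p', hdfs_path])
    # 2/3. run DataX with the generated config; it writes the extracted rows under hdfs_path
    subprocess.check_call(['python', '/data/opt/datax/bin/datax.py', datax_json])
    # 4. load the staged files into the target Hive partition
    hql = "LOAD DATA INPATH '{}' OVERWRITE INTO TABLE {}.{} PARTITION (bizdate={})".format(
        hdfs_path, hive_db, hive_table, bizdate)
    subprocess.check_call(['hive', '-e', hql])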
RDBMS2HiveOperator
# -*- coding: utf-8 -*-
"""
Load data from Postgres or MySQL into HDFS, then into Hive.
"""
import os
import signal

from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults

from hooks.rdbms_to_hive_hook import RDBMS2HiveHook


class RDBMS2HiveOperator(BaseOperator):
    """
    Transfer data from Postgres/MySQL to Hive via DataX.
    https://github.com/alibaba/DataX

    :param conn_id: source database connection id
    :param query_sql: query to run against the source database
    :param split_pk: primary key DataX uses to split the query; None means no splitting,
                     setting it enables multi-threaded transfer and speeds things up
    :param hive_db: target Hive database
    :param hive_table: target Hive table
    :param hive_table_column: a list of column dicts, e.g. {name: a, type: int},
                              or a comma-separated string, e.g. a,b,c
    :param hive_table_partition: value of the bizdate partition
    """
    template_fields = ('query_sql', 'hive_db', 'hive_table', 'hive_table_partition')
    ui_color = '#edd5f1'

    @apply_defaults
    def __init__(self,
                 conn_id,
                 query_sql,
                 hive_db,
                 hive_table,
                 hive_table_column,
                 hive_table_partition,
                 split_pk=None,
                 *args,
                 **kwargs):
        super().__init__(*args, **kwargs)
        self.conn_id = conn_id
        self.query_sql = query_sql
        self.split_pk = split_pk
        self.hive_db = hive_db
        self.hive_table = hive_table
        self.hive_table_column = hive_table_column
        self.hive_table_partition = hive_table_partition

    def execute(self, context):
        """
        Execute
        """
        task_id = context['task_instance'].dag_id + "#" + context['task_instance'].task_id
        self.hook = RDBMS2HiveHook(
            task_id=task_id,
            conn_id=self.conn_id,
            query_sql=self.query_sql,
            split_pk=self.split_pk,
            hive_db=self.hive_db,
            hive_table=self.hive_table,
            hive_table_column=self.hive_table_column,
            hive_table_partition=self.hive_table_partition
        )
        self.hook.execute(context=context)

    def on_kill(self):
        self.log.info('Sending SIGTERM signal to bash process group')
        os.killpg(os.getpgid(self.hook.sp.pid), signal.SIGTERM)
RDBMS2HiveHook
# -*- coding: utf-8 -*-
"""
Load data into Hive via DataX.
"""
import json
import os
import subprocess
import uuid

from airflow.exceptions import AirflowException
from airflow.hooks.base_hook import BaseHook


class RDBMS2HiveHook(BaseHook):
    """
    DataX executor.
    """

    def __init__(self,
                 task_id,
                 conn_id,
                 query_sql,
                 hive_db,
                 hive_table,
                 hive_table_column,
                 hive_table_partition,
                 split_pk=None):
        self.task_id = task_id
        self.conn = self.get_connection(conn_id)
        self.query_sql = query_sql
        self.split_pk = split_pk
        self.hive_db = hive_db
        self.hive_table = hive_table
        self.hive_table_partition = hive_table_partition
        self.log.info("Using connection to: {}:{}/{}".format(self.conn.host, self.conn.port, self.conn.schema))

        # hive_table_column may be a list of {name, type} dicts or a comma-separated string
        self.hive_table_column = hive_table_column
        if isinstance(hive_table_column, str):
            self.hive_table_column = []
            cl = hive_table_column.split(',')
            for item in cl:
                hive_table_column_item = {
                    "name": item,
                    "type": "string"
                }
                self.hive_table_column.append(hive_table_column_item)

    def Popen(self, cmd, **kwargs):
        """
        Run a command and stream its output to the task log.

        :param cmd: command to execute
        :param kwargs: extra arguments to Popen (see subprocess.Popen)
        :return: handle to subprocess
        """
        self.sp = subprocess.Popen(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            **kwargs)

        for line in iter(self.sp.stdout):
            self.log.info(line.strip().decode('utf-8'))

        self.sp.wait()

        self.log.info("Command exited with return code %s", self.sp.returncode)

        if self.sp.returncode:
            raise AirflowException("Execute command failed")

    def generate_setting(self):
        """
        DataX speed and error-tolerance settings.
        """
        self.setting = {
            "speed": {
                "byte": 104857600
            },
            "errorLimit": {
                "record": 0,
                "percentage": 0.02
            }
        }
        return self.setting

    def generate_reader(self):
        """
        DataX reader.
        """
        conn_type = 'mysql'
        reader_name = 'mysqlreader'
        if self.conn.conn_type == 'postgres':
            conn_type = 'postgresql'
            reader_name = 'postgresqlreader'
        self.jdbcUrl = "jdbc:" + conn_type + "://" + self.conn.host.strip() + ":" + str(self.conn.port) + "/" + self.conn.schema.strip()
        self.reader = {
            "name": reader_name,
            "parameter": {
                "username": self.conn.login.strip(),
                "password": self.conn.password.strip(),
                "connection": [
                    {
                        "querySql": [
                            self.query_sql
                        ],
                        "jdbcUrl": [
                            self.jdbcUrl
                        ]
                    }
                ]
            }
        }
        return self.reader

    def generate_writer(self):
        """
        DataX hdfswriter.
        """
        self.file_type = "text"
        self.hdfs_path = "/datax/" + self.hive_db + "/" + self.hive_table + "/" + self.hive_table_partition
        self.log.info("Staging directory: {}".format(self.hdfs_path))
        self.writer = {
            "name": "hdfswriter",
            "parameter": {
                "defaultFS": "hdfs://nameservice1",
                "hadoopConfig": {
                    "dfs.nameservices": "nameservice1",
                    "dfs.ha.automatic-failover.enabled.nameservice1": True,
                    "ha.zookeeper.quorum": "bigdata2-prod-nn01.ryan-miao.com:2181,bigdata2-prod-nn02.ryan-miao.com:2181,bigdata2-prod-nn03.ryan-miao.com:2181",
                    "dfs.ha.namenodes.nameservice1": "namenode117,namenode124",
                    "dfs.namenode.rpc-address.nameservice1.namenode117": "bigdata2-prod-nn01.ryan-miao.com:8020",
                    "dfs.namenode.servicerpc-address.nameservice1.namenode117": "bigdata2-prod-nn01.ryan-miao.com:8022",
                    "dfs.namenode.http-address.nameservice1.namenode117": "bigdata2-prod-nn01.ryan-miao.com:50070",
                    "dfs.namenode.https-address.nameservice1.namenode117": "bigdata2-prod-nn01.ryan-miao.com:50470",
                    "dfs.namenode.rpc-address.nameservice1.namenode124": "bigdata2-prod-nn02.ryan-miao.com:8020",
                    "dfs.namenode.servicerpc-address.nameservice1.namenode124": "bigdata2-prod-nn02.ryan-miao.com:8022",
                    "dfs.namenode.http-address.nameservice1.namenode124": "bigdata2-prod-nn02.ryan-miao.com:50070",
                    "dfs.namenode.https-address.nameservice1.namenode124": "bigdata2-prod-nn02.ryan-miao.com:50470",
                    "dfs.replication": 3,
                    "dfs.client.failover.proxy.provider.nameservice1": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
                },
                "fileType": self.file_type,
                "path": self.hdfs_path,
                "fileName": self.task_id,
                "column": self.hive_table_column,
                "writeMode": "nonConflict",
                "fieldDelimiter": "\t"
            }
        }
        return self.writer

    def generate_config(self):
        content = [{
            "reader": self.generate_reader(),
            "writer": self.generate_writer()
        }]

        job = {
            "setting": self.generate_setting(),
            "content": content
        }

        config = {
            "job": job
        }

        self.target_json = json.dumps(config)

        # write the job json to a temp file
        self.json_file = '/tmp/datax_json_' + self.task_id + uuid.uuid1().hex
        with open(self.json_file, "w") as fo:
            fo.write(self.target_json)
        self.log.info("write config json {}".format(self.json_file))
        return self.json_file

    def execute(self, context):
        self.generate_config()

        # sanity-check the hdfs path
        hdfs_path = self.hdfs_path
        if not hdfs_path.startswith('/datax/'):
            raise AirflowException("Bad hdfs path: not under the /datax directory")

        # create the staging directory
        cmd = ['hadoop', 'fs', '-mkdir', '-p', hdfs_path]
        self.Popen(cmd)

        # remove any leftover files
        files_path = hdfs_path + "/*"
        try:
            cmd = ['hadoop', 'fs', '-rm', files_path]
            self.Popen(cmd)
        except Exception:
            self.log.info('ignore err, just make sure the dir is clean')

        # run DataX to extract the data onto HDFS
        datax_home = '/data/opt/datax/bin'
        cmd = ['python', datax_home + '/datax.py', self.json_file]
        self.Popen(cmd)

        # remove the config file
        os.remove(self.json_file)

        # load the staged HDFS files into Hive
        hql = "LOAD DATA INPATH '" + hdfs_path + "' OVERWRITE INTO TABLE " \
              + self.hive_db + "." + self.hive_table \
              + " PARTITION (bizdate=" + self.hive_table_partition + ")"
        cmd = ['hive', '-e', hql]
        self.Popen(cmd)
How to use it

- Log in to Airflow as admin.
- Configure a connection for the source Postgres or MySQL database.
- Adjust the HDFS cluster settings (the hadoopConfig block in the hook) for your environment.
- Create a DAG:
from datetime import datetime, timedelta

from airflow import DAG
from operators.rdbms_to_hive_operator import RDBMS2HiveOperator
from dag_utils import compass_utils

default_args = {
    'owner': 'ryanmiao',
    'depends_on_past': False,
    'start_date': datetime(2019, 5, 1, 9),
    'on_failure_callback': compass_utils.failure_callback(dingding_conn_id='dingding_bigdata', receivers='ryanmiao'),
    # 'on_success_callback': compass_utils.success_callback(dingding_conn_id='dingding_bigdata', receivers='ryanmiao'),
    'retries': 0
}

dag = DAG('example_pg2hive', default_args=default_args, schedule_interval=None)

# Target Hive table:
# CREATE TABLE test.pg2hive_test(
#     ftime int,
#     raw_cp_count int,
#     raw_to_delete_cp_count bigint,
#     create_time timestamp
# )
# COMMENT 'test table for datax'
# PARTITIONED BY (bizdate int)
# ROW FORMAT DELIMITED
# FIELDS TERMINATED BY '\t'
# LINES TERMINATED BY '\n'
# STORED AS TEXTFILE;

hive_table_column = "ftime,raw_cp_count,raw_to_delete_cp_count,create_time"

t1 = RDBMS2HiveOperator(
    task_id='pg2hive',
    conn_id='pg_rdb_poi',
    query_sql='select ftime, raw_cp_count, raw_to_delete_cp_count, create_time from tbl_poi_report limit 1000',
    hive_db='test',
    hive_table='pg2hive_test',
    hive_table_column=hive_table_column,
    hive_table_partition="{{ ds_nodash }}",
    dag=dag
)