開源數據品質解決方案——Apache Griffin入門寶典

提到格里芬—Griffin,大家想到更多的是籃球明星或者戰隊名,但在大數據領域Apache Griffin(以下簡稱Griffin)可是數據品質領域響噹噹的一哥。先說一句:Griffin是大數據品質監控領域唯一的Apache項目,懂了吧。

​ 在不重視數據品質的大數據發展時期,Griffin並不能引起重視,但是隨著數據治理在很多企業的全面開展與落地,數據品質的問題開始引起重視。

​ 還是那句話,商用版的解決方案暫時不在本文的討論範圍內,目前大數據流動公眾號對於數據治理工具的研究還是在開源方向,希望通過開源+二次開發結合的方式找到適合自己公司的數據治理工具箱。在未來有靠譜的商用方案,我們也會保持關注~

本文將從數據品質,Griffin簡介,Griffin架構,Griffin快速入門,Griffin批數據實戰,Griffin流數據實戰整合六個部分進行介紹,目的是帶大家快速的入門數據品質管理工具的使用。

本文檔版權屬於公眾號:大數據流動 所有。未經授權,請勿轉載與商用!

考慮到抄襲問題,Griffin後續的高階技術文章可能會付費,也希望大家能儘早加入數據治理、Griffin等相關技術群,我會將最新的文章與資料實時同步。

一、數據品質

​ 數據品質管理(Data Quality Management),是指對數據從計劃、獲取、存儲、共享、維護、應用、消亡生命周期的每個階段里可能引發的各類數據品質問題,進行識別、度量、監控、預警等一系列管理活動,並通過改善和提高組織的管理水平使得數據品質獲得進一步提高。

數據品質管理不是一時的數據治理手段,而是循環的管理過程。其終極目標是通過可靠的數據,提升數據在使用中的價值,並最終為企業贏得經濟效益。

​ 為什麼會有數據品質管理呢?

​ 大數據時代數據的核心不是「大」,而在於「有價值」,而有價值的關鍵在於「品質」。但現實是,數據往往存在很多問題:

  • 數據無法匹配
  • 數據不可識別
  • 時效性不強
  • 數據不一致
  • 。。。。

​ 那麼,解決數據品質要達到什麼目標呢?

總結來說就是可信和可用

可信就是讓數據具有實用性,準確性,及時性,完整性,有效性。

可用就是規範性和可讀性。

數據品質可能不是數據治理的最核心部分,但可能會成為數據治理落地的做大障礙。

提高數據品質有多種方式,比如建立統一的數據標準、提高人員的意識與能力等等。

而一個提高數據品質的高生產力方式就是使用數據品質管理工具

數據品質管理工具成熟的並不多,所以本文就不做無用的對比了,我們直接進入正題:Apache Griffin。

二、Griffin簡介

​ Griffin是一個開源的大數據數據品質解決方案,由eBay開源,它支援批處理和流模式兩種數據品質檢測方式,是一個基於Hadoop和Spark建立的數據品質服務平台 (DQSP)。它提供了一個全面的框架來處理不同的任務,例如定義數據品質模型、執行數據品質測量、自動化數據分析和驗證,以及跨多個數據系統的統一數據品質可視化。

Griffin於2016年12月進入Apache孵化器,Apache軟體基金會2018年12月12日正式宣布Apache Griffin畢業成為Apache頂級項目。

Griffin官網地址://griffin.apache.org/

Github地址://github.com/apache/griffin

在eBay的數據品質管理實踐中,需要花費很長時間去修複數據品質的問題,不管是批處理還是流處理,解決數據品質問題的時間都是巨大的,由此一個統一的數據品質系統就應運而生了。

在官網的定義中,Apache Griffin也早就更新為了批和流(Batch and Streaming)數據品質解決方案。Apache Griffin已經在朝著數據品質的統一管理平台而努力了。

Griffin主要有如下的功能特點:

  • 度量:精確度、完整性、及時性、唯一性、有效性、一致性。
  • 異常監測:利用預先設定的規則,檢測出不符合預期的數據,提供不符合規則數據的下載。
  • 異常告警:通過郵件或門戶報告數據品質問題。
  • 可視化監測:利用控制面板來展現數據品質的狀態。
  • 實時性:可以實時進行數據品質檢測,能夠及時發現問題。
  • 可擴展性:可用於多個數據系統倉庫的數據校驗。
  • 可伸縮性:工作在大數據量的環境中,目前運行的數據量約1.2PB(eBay環境)。
  • 自助服務:Griffin提供了一個簡潔易用的用戶介面,可以管理數據資產和數據品質規則;同時用戶可以通過控制面板查看數據品質結果和自定義顯示內容。

Apache Giffin目前的數據源包括HIVE, CUSTOM, AVRO, KAFKA。Mysql和其他關係型資料庫的擴展根據需要進行擴展。

當然Giffin也不是萬能的,目前Griffin還是有很多的問題的,選擇也要慎重:

Griffin的社區並不太活躍,可以共同討論的人不多。

目前最新版本還是0.6,可能會有一些問題。

網上技術文檔很少,當然這方面大數據流動也會不斷的輸出新的技術文檔幫助大家。

三、Griffin架構

​ 數據品質模組是大數據平台中必不可少的一個功能組件,以下Griffin作為一個開源的大數據數據品質解決方案,它支援批處理和流模式兩種數據品質檢測方式,可以從不同維度(比如離線任務執行完畢後檢查源端和目標端的數據數量是否一致、源表的數據空值數量等)度量數據資產,從而提升數據的準確度、可信度。

在Griffin的架構中,主要分為Define、Measure和Analyze三個部分,如下圖所示:

各部分的職責如下:

  • Define:主要負責定義數據品質統計的維度,比如數據品質統計的時間跨度、統計的目標(源端和目標端的數據數量是否一致,數據源里某一欄位的非空的數量、不重複值的數量、最大值、最小值、top5的值數量等)
  • Measure:主要負責執行統計任務,生成統計結果
  • Analyze:主要負責保存與展示統計結果

聽起來有些晦澀,我們來看一下一個完整的Griffin任務的執行流程。

  • 註冊數據,把想要檢測數據品質的數據源註冊到griffin。
  • 配置度量模型,可以從數據品質維度來定義模型,如:精確度、完整性、及時性、唯一性等。
  • 配置定時任務提交spark集群,定時檢查數據。
  • 在門戶介面上查看指標,分析數據品質校驗結果。

Griffin 系統主要分為:數據收集處理層(Data Collection&Processing Layer)、後端服務層(Backend Service Layer)和用戶介面(User Interface)

數據處理和存儲層:

對於批量分析,數據品質模型將根據 hadoop 中的數據源計算 Spark 集群中的數據品質指標。

對於近實時分析,使用來自消息傳遞系統的數據,然後數據品質模型將基於 Spark 集群計算實時數據品質指標。對於數據存儲,可以在後端使用Elasticsearch來滿足前端請求。

Apache Griffin 服務:

項目有提供Restful 服務來完成 Apache Griffin 的所有功能,例如探索數據集、創建數據品質度量、發布指標、檢索指標、添加訂閱等。因此,開發人員可以基於這些 Web 開發自己的用戶介面服務。

這種靈活性也讓Griffin 得到了越來越多的應用。

四、Griffin快速入門

Griffin的最新版本為0.6.0,本文的安裝部署也基於這個版本進行。

依賴準備

JDK (1.8 or later versions)
MySQL(version 5.6及以上)
Hadoop (2.6.0 or later)
Hive (version 2.x)
Spark (version 2.2.1)
Livy(livy-0.5.0-incubating)
ElasticSearch (5.0 or later versions)

大部分CDH已經自帶,這裡特別說一下Livy和ElasticSearch如何部署。

Livy是一個Spark的Rest伺服器。

//livy.apache.org/

準備livy安裝包。

  1. 將livy安裝包解壓到/opt/目錄下
  2. 創建livy用戶、log目錄並將livy的home目錄屬主修改為livy:hadoop
useradd livy -g hadoop
mkdir /var/log/livy
mkdir /var/run/livy
chown livy:hadoop /var/log/livy
chown livy:hadoop /var/run/livy
chown -R livy:hadoop /opt/cloudera/apache-livy-0.6.0-incubating-bin/

3.進入livy home目錄,在conf目錄下創建livy.conf、livy-env.sh、spark-blacklist.conf配置文件

livy.conf、livy-env.sh、spark-blacklist.conf

4.修改配置文件livy.conf,添加如下內容

livy.spark.master = yarn
livy.spark.deployMode = cluster
livy.environment = production
livy.impersonation.enabled = true
livy.server.csrf_protection.enabled false
livy.server.port = 8998
livy.server.session.timeout = 3600000
livy.server.recovery.mode = recovery
livy.server.recovery.state-store=filesystem
livy.server.recovery.state-store.url=/tmp/livy

5.修改配置文件livy-env.sh,增加hadoop和Spark的配置資訊,如下

export JAVA_HOME=/usr/java/jdk1.8.0_181
export HADOOP_HOME=/opt/cloudera/parcels/CDH/lib/hadoop
export SPARK_CONF_DIR=/etc/spark2/conf
export SPARK_HOME=/opt/cloudera/parcels/SPARK2-2.3.0.cloudera2-1.cdh6.3.2.p0.1041012/lib/spark2
export HADOOP_CONF_DIR=/etc/hadoop/conf
export LIVY_LOG_DIR=/var/log/livy
export LIVY_PID_DIR=/var/run/livy
export LIVY_SERVER_JAVA_OPTS="-Xmx2g"

6.修改配置文件spark-blacklist.conf

# Configuration override / blacklist. Defines a list of properties that users are not allowed
# to override when starting Spark sessions.
#
# This file takes a list of property names (one per line). Empty lines and lines starting with "#"
# are ignored.
#
# Disallow overriding the master and the deploy mode.
spark.master
spark.submit.deployMode
# Disallow overriding the location of Spark cached jars.
spark.yarn.jar
spark.yarn.jars
spark.yarn.archive
# Don't allow users to override the RSC timeout.
livy.rsc.server.idle-timeout
  1. core-site.xml 的群集範圍高級配置程式碼段(安全閥)」配置項增加如下內容

    <property>
        <name>hadoop.proxyuser.livy.groups</name>
        <value>*</value>
    </property>
    <property>
        <name>hadoop.proxyuser.livy.hosts</name>
        <value>*</value>
    </property>
    

    8.在HDFS上創建livy的home目錄

    sudo -u hdfs hadoop fs -mkdir /user/livy
    sudo -u hdfs hadoop fs -chown livy:supergroup /user/livy
    

    9、啟動livy服務

livy-server start

elasticsearch5安裝,安裝包也已下載在資料包中。

 wget //artifacts.elastic.co/downloads/elasticsearch/elasticsearch-5.6.15.tar.gz

# tar -zxvf elasticsearch-5.6.15

# cd elasticsearch-5.6.15

# sh ./bin/elasticsearch

配置準備

1、首先在mysql中初始化quartz資料庫,這裡需要用到腳本Init_quartz_mysql_innodb.sql。

腳本可以加griffin群,領取資料包下載。

mysql -u <username> -p <password> < Init_quartz_mysql_innodb.sql

2、Hadoop和Hive:

從Hadoop伺服器拷貝配置文件到Livy伺服器上,這裡假設將配置文件放在/usr/data/conf目錄下。

在Hadoop伺服器上創建/home/spark_conf目錄,並將Hive的配置文件hive-site.xml上傳到該目錄下:

#創建/home/spark_conf目錄
hadoop fs -mkdir -p /home/spark_conf
#上傳hive-site.xml
hadoop fs -put hive-site.xml /home/spark_conf/

3、設置環境變數:

#!/bin/bash
export JAVA_HOME=/data/jdk1.8.0_192

#spark目錄
export SPARK_HOME=/usr/data/spark-2.1.1-bin-2.6.3
#livy命令目錄
export LIVY_HOME=/usr/data/livy/bin
#hadoop配置文件目錄
export HADOOP_CONF_DIR=/usr/data/conf

4、配置啟動Livy

更新livy/conf下的livy.conf配置文件:

livy.server.host = 127.0.0.1
livy.spark.master = yarn
livy.spark.deployMode = cluster
livy.repl.enable-hive-context = true

啟動livy:

livy-server start

5、Elasticsearch配置:

在ES里創建griffin索引:

curl -XPUT //es:9200/griffin -d '
{
    "aliases": {},
    "mappings": {
        "accuracy": {
            "properties": {
                "name": {
                    "fields": {
                        "keyword": {
                            "ignore_above": 256,
                            "type": "keyword"
                        }
                    },
                    "type": "text"
                },
                "tmst": {
                    "type": "date"
                }
            }
        }
    },
    "settings": {
        "index": {
            "number_of_replicas": "2",
            "number_of_shards": "5"
        }
    }
}
'

接下來進行源碼編譯打包。

Griffin的源碼結構很清晰,主要包括griffin-doc、measure、service和ui四個模組,其中griffin-doc負責存放Griffin的文檔,measure負責與spark交互,執行統計任務,service使用spring boot作為服務實現,負責給ui模組提供交互所需的restful api,保存統計任務,展示統計結果。

源碼導入構建完畢後,需要修改配置文件,具體修改的配置文件如下:

application.properties:mysql,hive,es配置

quartz.properties

sparkProperties.json

配置文件修改好後,在idea里的terminal里執行如下maven命令進行編譯打包:

mvn -Dmaven.test.skip=true clean install

命令執行完成後,會在service和measure模組的target目錄下分別看到service-0.6.0.jar和measure-0.6.0.jar兩個jar,將這兩個jar分別拷貝到伺服器目錄下。

1、使用如下命令將measure-0.4.0.jar這個jar上傳到HDFS的/griffin文件目錄里:

#改變jar名稱
mv measure-0.6.0.jar griffin-measure.jar
#上傳griffin-measure.jar到HDFS文件目錄里
hadoop fs -put measure-0.6.0.jar /griffin/

2、運行service-0.6.0.jar,啟動Griffin管理後台:

nohup java -jar service-0.6.0.jar>service.out 2>&1 &

幾秒鐘後,我們可以訪問Apache Griffin的默認UI(默認情況下,spring boot的埠是8080)。

//IP:8080

部分結果展示介面如下:

五、Griffin批數據實戰

官網給出了批處理數據的例子。

1、在hive里創建表demo_src和demo_tgt:

--create hive tables here. hql script
--Note: replace hdfs location with your own path
CREATE EXTERNAL TABLE `demo_src`(
  `id` bigint,
  `age` int,
  `desc` string) 
PARTITIONED BY (
  `dt` string,
  `hour` string)
ROW FORMAT DELIMITED
  FIELDS TERMINATED BY '|'
LOCATION
  'hdfs:///griffin/data/batch/demo_src';

--Note: replace hdfs location with your own path
CREATE EXTERNAL TABLE `demo_tgt`(
  `id` bigint,
  `age` int,
  `desc` string) 
PARTITIONED BY (
  `dt` string,
  `hour` string)
ROW FORMAT DELIMITED
  FIELDS TERMINATED BY '|'
LOCATION
  'hdfs:///griffin/data/batch/demo_tgt';

2、生成測試數據:

從//griffin.apache.org/data/batch/地址下載所有文件到Hadoop伺服器上,然後使用如下命令執行gen-hive-data.sh腳本:

nohup ./gen-hive-data.sh>gen.out 2>&1 &

注意觀察gen.out日誌文件,如果有錯誤,視情況進行調整。這裡我的測試環境Hadoop和Hive安裝在同一台伺服器上,因此直接運行腳本。

3、通過UI介面創建統計任務

選擇DataAssets

在該頁面可以看到數據資產展示

點擊Measures,創建度量頁面

通過下面的步驟來一步步創建

選擇數據源

選擇目標

將兩者關聯

設置一些參數

配置好提交

新增定時任務

用cron表達式建立任務

點擊DQ Metrics,看到效果。

六、Griffin流數據實戰

還會參考官網的例子。

示例流數據如下:

{"id": 1, "name": "Apple", "color": "red", "time": "2018-09-12_06:00:00"}
{"id": 2, "name": "Banana", "color": "yellow", "time": "2018-09-12_06:01:00"}
...

官方也提供了測試數據的腳本//griffin.apache.org/data/streaming/(已存資料包)

通過腳本可以源源不斷將數據寫入Kafka

#!/bin/bash

#create topics
kafka-topics.sh --create --zookeeper hadoop101:2181 --replication-factor 1 --partitions 1 --topic source
kafka-topics.sh --create --zookeeper hadoop101:2181 --replication-factor 1 --partitions 1 --topic target

#every minute
set +e
while true
do
  /opt/module/data/gen-data.sh
  sleep 90
done
set -e

Flink部分就是簡單接收Kafka數據,然後再發向下游,部分程式碼片段如下:

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
FlinkKafkaConsumer010<String> kafkaconsumer =
                new FlinkKafkaConsumer010<String>(inputTopic, new SimpleStringSchema(), properties);
        DataStream<String> dataStream = env.addSource(kafkaconsumer);

DataStream<String> target = dataStream.add...//具體處理邏輯

target..addSink(new FlinkKafkaProducer010<String>(
                "hadoop101:9092",
                "target",
                new SimpleStringSchema()
        ));
        outMap.print();
        env.execute();

配合env.json

{
  "spark": {
    "log.level": "WARN",
    "checkpoint.dir": "hdfs:///griffin/checkpoint",
    "batch.interval": "20s",
    "process.interval": "1m",
    "init.clear": true,
    "config": {
      "spark.default.parallelism": 4,
      "spark.task.maxFailures": 5,
      "spark.streaming.kafkaMaxRatePerPartition": 1000,
      "spark.streaming.concurrentJobs": 4,
      "spark.yarn.maxAppAttempts": 5,
      "spark.yarn.am.attemptFailuresValidityInterval": "1h",
      "spark.yarn.max.executor.failures": 120,
      "spark.yarn.executor.failuresValidityInterval": "1h",
      "spark.hadoop.fs.hdfs.impl.disable.cache": true
    }
  },
  "sinks": [
    {
      "type": "console"
    },
    {
      "type": "hdfs",
      "config": {
        "path": "hdfs:///griffin/persist"
      }
    },
    {
      "type": "elasticsearch",
      "config": {
        "method": "post",
        "api": "//es:9200/griffin/accuracy"
      }
    }
  ],
  "griffin.checkpoint": [
    {
      "type": "zk",
      "config": {
        "hosts": "zk:2181",
        "namespace": "griffin/infocache",
        "lock.path": "lock",
        "mode": "persist",
        "init.clear": true,
        "close.clear": false
      }
    }
  ]
}

dq.json

{
  "name": "streaming_accu",
  "process.type": "streaming",
  "data.sources": [
    {
      "name": "src",
      "baseline": true,
      "connectors": [
        {
          "type": "kafka",
          "version": "0.8",
          "config": {
            "kafka.config": {
              "bootstrap.servers": "kafka:9092",
              "group.id": "griffin",
              "auto.offset.reset": "largest",
              "auto.commit.enable": "false"
            },
            "topics": "source",
            "key.type": "java.lang.String",
            "value.type": "java.lang.String"
          },
          "pre.proc": [
            {
              "dsl.type": "df-opr",
              "rule": "from_json"
            }
          ]
        }
      ],
      "checkpoint": {
        "type": "json",
        "file.path": "hdfs:///griffin/streaming/dump/source",
        "info.path": "source",
        "ready.time.interval": "10s",
        "ready.time.delay": "0",
        "time.range": ["-5m", "0"],
        "updatable": true
      }
    }, {
      "name": "tgt",
      "connectors": [
        {
          "type": "kafka",
          "version": "0.8",
          "config": {
            "kafka.config": {
              "bootstrap.servers": "kafka:9092",
              "group.id": "griffin",
              "auto.offset.reset": "largest",
              "auto.commit.enable": "false"
            },
            "topics": "target",
            "key.type": "java.lang.String",
            "value.type": "java.lang.String"
          },
          "pre.proc": [
            {
              "dsl.type": "df-opr",
              "rule": "from_json"
            }
          ]
        }
      ],
      "checkpoint": {
        "type": "json",
        "file.path": "hdfs:///griffin/streaming/dump/target",
        "info.path": "target",
        "ready.time.interval": "10s",
        "ready.time.delay": "0",
        "time.range": ["-1m", "0"]
      }
    }
  ],
  "evaluate.rule": {
    "rules": [
      {
        "dsl.type": "griffin-dsl",
        "dq.type": "accuracy",
        "out.dataframe.name": "accu",
        "rule": "src.id = tgt.id AND src.name = tgt.name AND src.color = tgt.color AND src.time = tgt.time",
        "details": {
          "source": "src",
          "target": "tgt",
          "miss": "miss_count",
          "total": "total_count",
          "matched": "matched_count"
        },
        "out":[
          {
            "type":"metric",
            "name": "accu"
          },
          {
            "type":"record",
            "name": "missRecords"
          }
        ]
      }
    ]
  },
  "sinks": ["CONSOLE", "HDFS"]
}

提交任務

spark-submit --class org.apache.griffin.measure.Application --master yarn --deploy-mode client --queue default \
--driver-memory 1g --executor-memory 1g --num-executors 3 \
<path>/griffin-measure.jar \
<path>/env.json <path>/dq.json

七、總結

數據管理工具目前來說還是非常匱乏的,Griffin提供的不僅僅是實現,還有數據品質管理的思路,這對於我們自研數據品質管理系統也是非常的寶貴的。

數據治理道路任重道遠,歡迎加入相關交流群,我們共同學習進步~

進入學習交流群領取學習資料包:

更專註效率才能更高,所以目前數據治理相關學習交流群按不同方向做了區分:

另外 數據治理工具箱 知識星球也已成立,這是一個數據治理落地實踐方向的知識星球。大數據流動發布的數據治理相關文章與資料(包括付費內容)都將在知識星球進行長期同步。星球的目標是收集數據治理實踐工具的相關資料,並定期組織實戰學習小組,讓數據治理的相關資料可以長久的保存,同時也解決文章被頻繁抄襲的問題,歡迎大家加入。

最後提醒,文檔版權為公眾號 大數據流動 所有,請勿商用。相關技術問題以及安裝包可以聯繫筆者獨孤風加入相關技術交流群討論獲取。