微博数仓数据延时优化方案

前言

本文以离线数据仓库为背景,重点介绍因传输链路数据延时的不确定性,可能导致部分延迟文件无法参与正常的调度计算或同步,导致数据缺失的问题成因、业务影响及相应的解决方案。

关于这类问题的处理,有这么一种论调:我们认为正常情况下,缺失 数据的比例是很小的,可以大致认为数据是可用的的;或者我们可以推后一下计算的时间,让数据尽可能的传输完整;诸如此类…。

如果认可这种论调,可以直接忽略本文的内容。

我们是一个有 态度 的数据团队,旨在精确评估用户(整体/个体)的性能或行为情况,以优质的数据驱动业务优化,数据必须做到客观条件下最大限度地精准。

数仓架构

数据仓库使用 Hive 构建,日志或数据以文件形式(Text/ORCFile)存储于HDFS。数仓整体划分以下3层:

  • ODS(面向原始日志的数据表)
  • DW(面向业务主题的数据表)
  • DM(面向业务应用的数据表)

日志来源

日志(原始日志)来源可以是多样的:

  • 实时数据流(Kafka/Flume/Scribe)
  • 离线数据推送(Rsync)
  • 日志接口(Http/Wget)
  • 其它

无论使用哪一种方式,都会使用统一的目录规范存储于HDFS,如下:

${BASE_DIR}/业务日志名称/日期(yyyy_MM_dd)/小时(HH)/日志文件名称(带有时间戳)

假设业务日志名称为 www_spoollxrsaansnq8tjw0_aliyunXweibo,存储目录结构示例:

业务日志目录:
${BASE_DIR}/www_spoollxrsaansnq8tjw0_aliyunXweibo

日期目录:
${BASE_DIR}/www_spoollxrsaansnq8tjw0_aliyunXweibo/2020_06_22
${BASE_DIR}/www_spoollxrsaansnq8tjw0_aliyunXweibo/2020_06_23
${BASE_DIR}/www_spoollxrsaansnq8tjw0_aliyunXweibo/2020_06_24

小时目录:
${BASE_DIR}/www_spoollxrsaansnq8tjw0_aliyunXweibo/2020_06_24/09
${BASE_DIR}/www_spoollxrsaansnq8tjw0_aliyunXweibo/2020_06_24/10
${BASE_DIR}/www_spoollxrsaansnq8tjw0_aliyunXweibo/2020_06_24/11

日志文件:
${BASE_DIR}/www_spoollxrsaansnq8tjw0_aliyunXweibo/2020_06_24/11/202006241100-node-aliyun-al01.xiaoka.tv-www_spoollxrsaansnq8tjw0_aliyunXweibo.log
${BASE_DIR}/www_spoollxrsaansnq8tjw0_aliyunXweibo/2020_06_24/11/202006241100-node-aliyun-static.yizhibo.com-www_spoollxrsaansnq8tjw0_aliyunXweibo.log

可见,目录规范设计的存储最小时间粒度为 小时。实时数据流使用定时(如:5分钟)迭代新文件的方式存储数据,存储的具体日期/小时目录由创建新文件时的 当前时间 决定;离线数据推送或日志接口会从日志文件名称中提取时间戳,存储到该时间戳对应的日期/小时目录。

:需要留心不同来源的日志存储于HDFS的方式,后续会再次涉及到这部分内容。

数据表创建及分区动态挂载

为业务日志 www_spoollxrsaansnq8tjw0_aliyunXweibo 创建相应的原始日志数据表(ODS),假设数据表名称 aliyunXweibo :

CREATE EXTERNAL TABLE IF NOT EXISTS aliyunXweibo (
    col1 STRING,
    col2 STRING,
    col3 STRING
    ...
)
PARTITIONED BY (dt STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
;

数据表 aliyunXweibo 类型为外表(EXTERNAL),使用 dt 为分区(Partition)字段,格式:yyyyMMddHH,与HDFS存储目录中的日期/小时相互对应,如:

dt = '2020062400'

对应着

${BASE_DIR}/www_spoollxrsaansnq8tjw0_aliyunXweibo/2020_06_24/00

分区动态挂载就是建立数据表分区与HDFS日期/小时目录的 关系。我们部署有一套专用的服务,定时执行以下2个操作:

  1. 如果某个业务日志的日期/小时目录存在,但业务数据表中相应的分区不存在,则执行 挂载

    ALTER TABLE aliyunXweibo ADD PARTITION(dt = '2020062400') LOCATION '${BASE_DIR}/www_spoollxrsaansnq8tjw0_aliyunXweibo/2020_06_24/00';
    
  2. 如果某个业务数据表中的分区存在,但业务日志中相应的日期/小时目录不存在(过期被删除),则执行 移除挂载

    ALTER TABLE aliyunXweibo DROP IF EXISTS PARTITION(dt = '2020062400');
    

数据计算

原始日志数据表创建及分区挂载完成之后,便可以根据业务分析需求,开发部署相应的业务计算应用(Hive SQL/Spark SQL/Spark Application,DW/DM),计算结果以数据表(DW/DM)的形式存储于数据仓库中,这些数据表使用 dt 为分区字段,时间粒度为 小时

数据表分区挂载、DW数据计算应用及DM数据计算应用可以简单理解为使用 小时 为时间周期定时(如:每小时第5分钟)运行,调度及相互之间的依赖关系均由调度系统负责。

假设调度时间为 2020-06-24 01:05:00,调度系统中的应用会挂载和计算 上一小时 的数据,数据表分区挂载及DW/DM数据计算应用会运行完成之后,各个数据表新生成的分区如下:

aliyunXweibo(ODS)
dt = '2020062400'

aliyunXweibo_DW(DW)
dt = '2020062400'

aliyunXweibo_DM(DM)
dt = '2020062400'

:实际场景中,这一步会比较复杂(也请留心),这里仅用于理解流程,详情见后文。

 
综上所述,数据仓库经过日志存储、数据表创建及分区挂载、数据计算等一系列相关的流程之后,最后会形成典型的 ODS/DW/DM 3层业务数据表,用以支撑业务不同场景下的应用。

数据同步

业务场景中,有一个很重要的应用部分就是数据可视化或数据接口,其显著特点是要求 查询响应耗时低 (如:1-3 s),数据来源通常是数据仓库中的DW/DM数据表。如前文所述,数据仓库使用 Hive 构建,鉴于 Hive 自身的设计及特点,无法满足 查询响应耗时低 这个要求。通常的做法是将需要查询的数据表数据 同步 到合适的查询引擎(如:ElasticSearch、ClickHouse、MySQL等),然后使用这些查询引擎服务于数据可视化或数据接口。

数据同步 就是指将数据仓库中数据表(DW/DM)的数据同步至适用于 即席查询(OLAP)场景的查询引擎中的过程;每次追加同步一个分区的数据。

也就是说,DW/DM数据计算应用运行完成之后,还有会相应的数据同步应用,负责同步DW/DM数据表中相应时间区分中数据至外部的查询引擎。

继续使用 数据计算 中的示例,假设需要同步的DW/DM数据表分区:

aliyunXweibo_DW(DW)
dt = '2020062400'

aliyunXweibo_DM(DM)
dt = '2020062400'

实际同步时会使用这样的SQL:

SELECT
    *
FROM
    aliyunXweibo_DW
WHERE
    dt = '2020062400'

数据延时

前文中描述的数据仓库/数据计算/数据同步,应该是目前业界普通采用的一种数据方案,实际工作场景中以此为指导也可以运行的不错。那么,问题在哪里?会影响什么?

细节,细节,还是细节 !!!

以微博客户端为例,用户使用过程中会产生大量的性能或行为日志,这些日志因为业务场景的差异,日志上报的时机也是不同的。考虑到用户体验的问题,有一种常用的策略是 客户端切换至后台,且网络连接Wifi,这时会将客户端中的日志上传至服务端。服务端接收到日志之后会中转至 Kafka,供有需要的业务方消费使用。

:日志中通常会包含时间戳,表示日志记录时间或用户行为时间。后文统一称之为 日志时间

简单思考就可以看出,客户端切换后台网络连接Wifi 都是很不可控的因素,受用户使用行为或使用场景影响较大。比如:用户于Wifi环境下连续刷微博1小时,我们可以认为这1小时之内是没有任何日志上传,只有等用户使用完微博,将客户端切换至后台且连接Wifi的条件下日志才会上传。也说是说,用户这部分日志被服务端接收到至少会延迟1小时。

客户端记录的 日志时间 与服务端接收到日志的 当前时间 之间的 时间差,就是 数据延时

数据延时 是绝对存在且不可避免的。这里的 延时 特指超出正常传输本身所需时间之外的部分。

对于存储的影响?

对于实时数据流,即使我们可以实时消费 (Kafka)中的数据,但数据本身可能是延时的,即日志时间与消费的当前时间相比滞后。消费 数据存储至HDFS时,可以有2种时间策略:

  1. 当前时间

    使用 当前时间 创建相应日期/小时目录的文件,将之后 5分钟 (假设文件迭代周期为5分钟)之内消费的数据写入到该文件中;下一个 5分钟 到来时,继续使用 当前时间 创建新文件及写入数据。

    影响:可能会导致当前的日期/小时目录的文件被写入 日志。这里的 ,是说从这些日志的 日志时间 来看,并不属于当前的日期/小时目录,可能属于过去某个日期/小时目录(数据延时),也可能属于下一个日期/小时目录(文件迭代不及时)。

  2. 日志时间

    使用 日志时间 创建相应日期/小时目录的文件,将消费的数据按 日志时间 写入到特定文件中;已创建的文件如果 5分钟 (假设文件迭代周期为5分钟)之内没有新的数据写入则关闭。

    影响:可能会导致过去的某个日期/小时目录的文件被不定时的写入日志;也就是说,使用某个日期/小时目录中的文件时,可能还有部分数据没有完全传输完成。

 
对于离线数据推送,因为使用日志文件名称中的时间戳决定写入哪个日期/小时目录,仅仅可能会导致类似于实时数据流使用 日志时间 的影响,后续不再赘述。
 
存储层的影响会 扩散 到计算层。

对于计算或同步的影响?

当我们想计算某个数据表 前1小时 的数据时,因为数据时延及存储层选用时间策略的关系,可能会遇到以下问题:

  1. 前1小时目录中,可能包含不确定比例的 数据(当前时间);

  2. 前1小时目录中,可能缺失不确定比例的数据(日志时间);

  3. 存储层使用 当前时间;

    计算ODS数据表 前1小时 分区数据时,因为这个小时分区目录中可能包含不属于该小时的数据,保存(Insert)计算结果到DW数据表时实际会写出多个小时分区,如:

    aliyunXweibo(ODS)
    dt = '2020062400'
    
    aliyunXweibo_DW(DW)
    dt = '2020062322'
    dt = '2020062323'
    dt = '2020062400'
    dt = '2020062401'
    

    其中,aliyunXweibo_DW(2020062322) 与 aliyunXweibo_DW(2020062323) 是因为 aliyunXweibo(2020062400) 中包含有因数据延时导致的 日志导致的;aliyunXweibo_DW(2020062401) 是因为 aliyunXweibo(2020062400) 中包含有因文件迭代不及时的 超前 日志导致的。注意,aliyunXweibo_DW(2020062322) 与 aliyunXweibo_DW(2020062323) 这2个分区在前2个调度的周期中已计算且同步完成,这次相当于是在已有分区中 追加 (Append) 部分数据。

    如前所述,这次同步 aliyunXweibo_DW(2020062400) 会使用SQL:

    SELECT
        *
    FROM
        aliyunXweibo_DW
    WHERE
        dt = '2020062400'
    

    很明显,aliyunXweibo_DW(2020062311) 与 aliyunXweibo_DW(2020062312) 中新 追加 的数据是无法参与同步的,对于查询引擎而言,这部分 追加 的数据是永远不可见的,相当于 丢失。aliyunXweibo_DW(2020062401) 中的数据会被即将发生的 下一次 调度所同步。

  4. 存储层使用 日志时间

    计算ODS数据表 前1小时 分区数据时,保存(Insert)计算结果到DW数据表时仅会写出1个小时分区,如:

    aliyunXweibo(ODS)
    dt = '2020062400'
    
    aliyunXweibo_DW(DW)
    dt = '2020062400'
    

    同步 aliyunXweibo_DW(2020062400) 会使用SQL:

    SELECT
        *
    FROM
        aliyunXweibo_DW
    WHERE
        dt = '2020062400'
    

    因为计算时属于这个小时分区目录中的文件可能还没有被全部传输完成,这部分文件永远也不会参与后续的调度计算或同步,相当于 丢失

解决方案

最简单的解决方案就是定时重新计算且更新(覆盖)一定时间范围内的历史数据,考数据量级、资源成本及数据服务影响情况,相信大多数公司不会选择这种方案。

思路

我们解决方案的核心是 增量增量计算增量同步

  1. 增量计算

    每一次调度计算 上一小时 的数据时,不是计算 上一小时分区目录的文件,而是以增量的方式计算 上一小时新写入的文件

  2. 增量同步

    每一次调度同步 上一小时 的数据时,不是同步 上一小时分区目录的文件,而是以增量的方式同步 上一小时新写入的文件

Hive 原生支持以全表或分区的方式读取数据,我们需要进行相应地扩展使其支持:

读取数据表指定时间围内写入的文件数据

:文件写入时间使用文件最后修改时间(mtime)表示,可通过Hadoop FileSystem API获取。

Hive SQL 不支持表述这样的逻辑,假设 存在如下属性:

  • dip.mapreduce.input.fileinputformat.sync:用于表示开启自定义特性,默认不开启。
  • dip.mapreduce.input.fileinputformat.starttime:用于表示文件最后修改时间的起始时间点,毫秒,闭区间。
  • dip.mapreduce.input.fileinputformat.endtime:用于表示文件最后修改时间的截止时间点,毫秒,闭区间。

如果需要读取数据表 aliyunXweibo 于 [2020-06-24 00:00:00, 2020-06-24 00:59:59] 内写入的文件数据,SQL如下:

set dip.mapreduce.input.fileinputformat.sync=true;

set dip.mapreduce.input.fileinputformat.starttime=1592928000000;
set dip.mapreduce.input.fileinputformat.endtime=1592931599000;

SELECT
    *
FROM
    aliyunXweibo

假设 成立的场景下,Hive 执行逻辑:

  1. 加载数据表 aliyunXweibo 目录下的所有文件列表;
  2. 遍历文件列表中的每一个文件,根据文件最后修改时间(mtime)过滤掉不在指定时间范围内([1592928000000, 1592931599000])的文件;
  3. 使用过滤之后的文件列表,完成后续的流程;

上述方式会扫描全表的文件,如果考虑文件数目较多,可以结合业务情况约束时间分区范围。

如果需要读取数据表 aliyunXweibo,时间分区 [2020061701, 2020062401],时间范围 [2020-06-24 00:00:00, 2020-06-24 00:59:59]内写入的文件数据:

set dip.mapreduce.input.fileinputformat.sync=true;

set dip.mapreduce.input.fileinputformat.starttime=1592928000000;
set dip.mapreduce.input.fileinputformat.endtime=1592931599000;

SELECT
    *
FROM
    aliyunXweibo
WHERE
    dt >= '2020061701'
    AND dt <= '2020062401'

此时,Hive 执行逻辑:

  1. 加载数据表 aliyunXweibo 指定分区目录下的文件列表;
  2. 遍历文件列表中的每一个文件,根据文件最后修改时间(mtime)过滤掉不在指定时间范围([1592928000000, 1592931599000])内的文件;
  3. 使用过滤之后的文件列表,完成后续的流程;

 
调度计算或同步需要读取 上一小时 的数据时,只需要通过调度时间转换为 [starttime, endtime] 即可。

 
如上所述,我们是通过自定义Hive配置属性的方式,使其支持按指定时间范围以增量的方式读取文件的。那么,具体如何实现呢

Hive读取文件是通过 Hadoop MapReduce InputFormat 实现的,以 TEXTFILE (默认) 为例,实际使用的是 TextInputFormat(org.apache.hadoop.mapred.TextInputFormat),继承自 FileInputFormat (org.apache.hadoop.mapred.FileInputFormat),重点介绍以下 3 个方法:

  • getInputPaths

    通过属性 mapreduce.input.fileinputformat.inputdir 获取输入目录,谁来设置这个属性的值呢?
    Hive会解析SQL语句,从中提取出数据表名称及分区过滤条件,动态计算需要读取的HDFS目录,然后将目录值(多个)设置到属性 mapreduce.input.fileinputformat.inputdir 中。
     

  • listStatus

    通过 getInputPaths() 获取输入目录,获取这些目录中所有文件对应的文件状态信息(FileStatus)。其中,文件状态信息中包括文件最后修改时间(modification_time)。

    listStatus() 也支持自定义过滤器(PathFilter),用于根据文件路径(Path)过滤文件,可以通过属性 mapreduce.input.pathFilter.class 指定。
     

  • getSplits

    通过 listStatus() 获取输入文件,形成切片(Split)用于后续处理逻辑使用。

 
受 listStatus() 启发,既然可以通过自定义过滤器过滤文件,我们也可以扩展代码,获取自定义属性,然后根据文件最后修改时间过滤文件。

源码扩展

以 Hadoop 2.8.2 为例,在源文件 hadoop-2.8.2-src/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java listStatus() 方法末尾处加入代码(260 ~ 279):

254     sw.stop();
255     if (LOG.isDebugEnabled()) {
256       LOG.debug("Time taken to get FileStatuses: "
257           + sw.now(TimeUnit.MILLISECONDS));
258     }
259
260     boolean sync = job.getBoolean("dip.mapreduce.input.fileinputformat.sync", false);
261     if (sync && ArrayUtils.isNotEmpty(result)) {
262       long startTime = job.getLong("dip.mapreduce.input.fileinputformat.starttime", Long.MIN_VALUE);
263       long endTime = job.getLong("dip.mapreduce.input.fileinputformat.endtime", Long.MAX_VALUE);
264
265       result =
266           Arrays.stream(result)
267               .filter(
268                   file -> {
269                     boolean meet =
270                         startTime <= file.getModificationTime()
271                             && file.getModificationTime() <= endTime;
272                     if (meet) {
273                       LOG.info("Input meet path: " + file.getPath().toString());
274                     }
275
276                     return meet;
277                   })
278               .toArray(FileStatus[]::new);
279     }
280
281     LOG.info("Total input files to process : " + result.length);
282     return result;
283   }

代码说明

  1. 获取是否开启自定义特性;

    boolean sync = job.getBoolean("dip.mapreduce.input.fileinputformat.sync", false);
    
  2. 如果开启自定义特性(sync 为 true)且输入文件不为空,则获取文件写入时间范围,并执行过滤;

    获取文件写入起始时间(startTime)、截止时间(endTime);

    long startTime = job.getLong("dip.mapreduce.input.fileinputformat.starttime", Long.MIN_VALUE);
    long endTime = job.getLong("dip.mapreduce.input.fileinputformat.endtime", Long.MAX_VALUE);
    

    遍历文件列表(result)中的每一个文件(file),判断文件最后修改时间(file.getModificationTime())是否位于指定的文件写入时间范围([startTime, endTime]);

    startTime <= file.getModificationTime()
        && file.getModificationTime() <= endTime;
    

编译部署

编译

mvn clean package -Dmaven.test.skip=true -Djavac.version=1.8

使用编译生成的Jar文件

hadoop-2.8.2-src/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/target/hadoop-mapreduce-client-core-2.8.2.jar

替换Hadoop部署目录的Jar文件

hadoop-2.8.2/share/hadoop/mapreduce/hadoop-mapreduce-client-core-2.8.2.jar

:如果使用的是HiveServer2,需要重启实例。

使用示例

假设我们需要统计数据表 mytable 指定文件写入时间范围 [1592915400000, 1592922600000] 内的行数。

Hive

set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;

set dip.mapreduce.input.fileinputformat.sync=true;

set dip.mapreduce.input.fileinputformat.starttime=1592915400000;
set dip.mapreduce.input.fileinputformat.endtime=1592922600000;

SELECT
    COUNT(1)
FROM
    mytable

:Hive 对于 InputFormat 有一些自定义的封装,默认使用org.apache.hadoop.hive.ql.io.CombineHiveInputFormat,详情参考相关文档。

Spark

from pyspark.sql import SparkSession

if __name__ == '__main__':
    sql = '''
    SELECT
        COUNT(1)
    FROM
        mytable
    '''
    sql = sql.strip()

    session = SparkSession.builder \
        .enableHiveSupport() \
        .getOrCreate()

    session.sql('set dip.mapreduce.input.fileinputformat.sync=true')
    
    session.sql('set dip.mapreduce.input.fileinputformat.starttime=1592917200000')
    session.sql('set dip.mapreduce.input.fileinputformat.endtime=1592928000000')

    session.sql(sql).show()

    session.stop()

:测试时可能在数据表 mytable 位于HDFS的存储目录中放入不同时间(文件最后修改时间)的文件进行模拟。

特别说明

Hive 使用的 InputFormat 实现类有很多,且不是所有的实现类都继承自 FileInputFormat,本文提供的只是一种通用的解决思案,实际使用时需要根据具体使用的 InputFormat 进行扩展。

结语

本文中讨论的 数据缺失 问题,之前与同事的讨论中曾一度被称之为 业界难题 进而被一度搁置。同时也可以看出,解决方案的实现过程虽然会涉及少量Hadoop源代码的扩展,但本质是十分简单的。问题的解决还是取决于工程师对技术及业务的掌控程度,仅供大家参考。

 
 
 
avatar