數倉建模—OneID

今天是我在上海租房的小區被封的第三天,由於我的大意,沒有屯吃的,外賣今天完全點不到了,中午的時候我找到了一包快過期的肉鬆餅,才補充了1000焦耳的能量。但是中午去做核酸的時候,我感覺走路有點不穩,我看到大白的棉簽深入我的嘴裡,我竟然以為是吃的,差點咬住了,還好我有僅存的一點意識。下午我收到女朋友給我點的外賣——麵包(我不知道她是怎麼點到的外賣,我很感動),很精緻的麵包,擱平時我基本不喜歡吃麵包,但是已經到了這個份上,我大口吃起來,竟然覺得這是世界上最好吃的食物了。明天早晨5:50的鬧鐘,去叮咚和美團買菜,看能不能搶幾桶泡麵吧。願神保佑,我暗暗下著決心並祈禱著,胸前畫著十字。。。

數據倉庫系列文章(持續更新)

  1. 數倉架構發展史
  2. 數倉建模方法論
  3. 數倉建模分層理論
  4. 數倉建模—寬表的設計
  5. 數倉建模—指標體系
  6. 數據倉庫之拉鏈表
  7. 數倉—數據集成
  8. 數倉—數據集市
  9. 數倉—商業智慧系統
  10. 數倉—埋點設計與管理
  11. 數倉—ID Mapping
  12. 數倉—OneID
  13. 數倉—AARRR海盜模型
  14. 數倉—匯流排矩陣
  15. 數倉—數據安全
  16. 數倉—數據品質
  17. 數倉—數倉建模和業務建模

關注公眾號:大數據技術派,回復: 資料,領取1024G資料。

OneID

前面我們學習了ID Mapping,包括ID Mapping 的背景介紹和業務場景,以及如何使用Spark 實現ID Mapping,這個過程中涉及到了很多東西,當然我們都通過文章的形式介紹給大家了,所以你再學習今天這一節之前,可以先看一下前面的文章

  1. Spark實戰—GraphX編程指南
  2. 數倉建模—ID Mapping

在上一節我們介紹ID Mapping 的時候我們就說過ID Mapping 是為了打通用戶各個維度的數據,從而消除數據孤島、避免數據歧義,從而更好的刻畫用戶,所以說ID Mapping是手段不是目的,目的是為了打通數據體系,ID Mapping最終的產出就是我們今天的主角OneID,也就是說數據收集過來之後通過ID Mapping 打通,從而產生OneID,這一步之後我們的整個數據體系就將使用OneID作為用戶的ID,這樣我們整個數據體系就得以打通

OneData

開始之前我們先看一下阿里的OneData 數據體系,從而更好認識一下OneID,前面我們說過ID Mapping 只是手段不是目的,目的是為了打通數據體系,ID Mapping最終的產出就是OneID

其實OneID在我們整個數據服務體系中,也只是起點不是終點或者說是手段,我們最終的目的是為了建設統一的數據資產體系。

沒有建設統一的數據資產體系之前,我們的數據體系建設存在下面諸多問題

  1. 數據孤島:各產品、業務的數據相互隔離,難以通過共性ID打通
  2. 重複建設:重複的開發、計算、存儲,帶來高昂的數據成本
  3. 數據歧義:指標定義口徑不一致,造成計算偏差,應用困難

在阿里巴巴 OneData 體系中,OneID 指統一數據萃取,是一套解決數據孤島問題的思想和方法。數據孤島是企業發展到一定階段後普遍遇到的問題。各個部門、業務、產品,各自定義和存儲其數據,使得這些數據間難以關聯,變成孤島一般的存在。

OneID的做法是通過統一的實體識別和連接,打破數據孤島,實現數據通融。簡單來說,用戶、設備等業務實體,在對應的業務數據中,會被映射為唯一識別(UID)上,其各個維度的數據通過這個UID進行關聯。

各個部門、業務、產品對業務實體的UID的定義和實現不一樣,使得數據間無法直接關聯,成為了數據孤島。基於手機號、身份證、郵箱、設備ID等資訊,結合業務規則、機器學習、圖演算法等演算法,進行 ID-Mapping,將各種 UID 都映射到統一ID上。通過這個統一ID,便可關聯起各個數據孤島的數據,實現數據通融,以確保業務分析、用戶畫像等數據應用的準確和全面。

OneModel 統一數據構建和管理

將指標定位細化為:

1. 原子指標
2. 時間周期
3. 修飾詞(統計粒度、業務限定, etc)

通過這些定義,設計出各類派生指標 基於數據分層,設計出維度表、明細事實表、匯總事實表,其實我們看到OneModel 其實沒有什麼新的內容,其實就是我們數倉建模的那一套東西

OneService 統一數據服務

OneService 基於復用而不是複製數據的思想,指得是我們的統一的數據服務,因為我們一直再提倡復用,包括我們數倉的建設,但是我們的數據服務這一塊卻是空白,所以OneService核心是服務的復用,能力包括:

  • 利用主題邏輯表屏蔽複雜物理表的主題式數據服務
  • 一般查詢+ OLAP 分析+在線服務的統一且多樣化數據服務
  • 屏蔽多種異構數據源的跨源數據服務

OneID 統一數據萃取

基於統一的實體識別、連接和標籤生產,實現數據通融,包括:

  • ID自動化識別與連接
  • 行為元素和行為規則
  • 標籤生產

OneID基於超強ID識別技術鏈接數據,高效生產標籤;業務驅動技術價值化,消除數據孤島,提升數據品質,提升數據價值。

而ID的打通,必須有ID-ID之間的兩兩映射打通關係通過ID映射關係表,才能將多種ID之間的關聯打通,完全孤立的兩種ID是無法打通的

打通整個ID體系,看似簡單,實則計算複雜,計算量非常大。假如某種對象有數億個個體,每個個體又有數十種不同的ID標識,任意兩種ID之間都有可能打通關係,想要完成這類對象的所有個體ID打通需要數億次計算,一般的機器甚至大數據集群都無法完成。

大數據領域中的ID-Mapping技術就是用機器學習演算法類來取代野蠻計算,解決對象數據打通的問題。基於輸入的ID關係對,利用機器學習演算法做穩定性和收斂性計算,輸出關係穩定的ID關係對,並生成一個UID作為唯一識別該對象的標識碼。

OneID實現過程中存在的問題

前面我們知道我們的ID Mapping 是通過圖計算實現,核心就是連通圖,其實實現OneID我們在打通ID 之後,我們就可以為一個個連通圖生成一個ID, 因為一個連通圖 就代表一個用戶,這樣我們生成的ID就是用戶的OneID,這裡的用戶指的是自然人,而不是某一個平台上的用戶。

OneID 的生成問題

首先我們需要一個ID 生成演算法,因為我們需要為大量用戶生成ID,我們的ID 要求是唯一的,所以在演算法設計的時候就需要考慮到這一點,我們並不推薦使用UUID,原因是UUID了可能會出現重複,而且UUID 沒有含義,所以我們不推薦使用UUID,我們這裡使用的是MD5 演算法,所以我們的MD5 演算法的參數是我們的圖的標示ID。

OneID 的更新問題

這裡的更新問題主要就是我們的數據每天都在更新,也就是說我們的圖關係在更新,也就是說我們要不要給這個自然人重新生成OneID ,因為他的圖關係可能發生了變化。

其實這裡我們不能為該自然人生成新的OneID ,否則我們數倉里的歷史數據可能無法關聯使用,所以我們的策略就是如果該自然人已經有OneID了,則不需要重新生成,其實這裡我們就是判斷該圖中的所有的頂點是否存在OneID,我們後面在程式碼中體現著一點。

OneID 的選擇問題

這個和上面的更新問題有點像,上面更新問題我們可以保證一個自然人的OneID不發生變化,但是選擇問題會導致發生變化,但是這個問題是圖計算中無法避免的,我們舉個例子,假設我們有用戶的兩個ID(A_ID,C_ID),但是這兩個ID 在當前是沒有辦法打通的,所以我們就會為這個兩個ID 生成兩個OneID,也就是(A_OneID,B_OneID),所以這個時候我們知道因為ID Mapping 不上,所以我們認為這兩個ID 是兩個人。

後面我們有了另外一個ID(B_ID),這個ID可以分別和其他的兩個ID 打通,也就是B_ID<——>A_ID , B_ID<——>C_ID 這樣我們就打通這個三個ID,這個時候我們知道

這個用戶存在三個ID,並且這個時候已經存在了兩個OneID,所以這個時候我們需要在這兩個OneID中選擇一個作為用戶的OneID,簡單粗暴點就可以選擇最小的或者是最大的。

我們選擇了之後,要將另外一個OneID對應的數據,對應到選擇的OneID 下,否則沒有被選擇的OneID的歷史數據就無法追溯了

OneID 程式碼實現

這個程式碼相比ID Mapping主要是多了OneID 的生成邏輯和更新邏輯 ,需要注意的是關於頂點集合的構造我們不是直接使用字元串的hashcode ,這是因為hashcode 很容易重複

object OneID  {
    val spark = SparkSession
      .builder()
      .appName("OneID")
      .getOrCreate()

  val sc = spark.sparkContext

  def main(args: Array[String]): Unit = {
    val bizdate=args(0)
    val c = Calendar.getInstance
    val format = new SimpleDateFormat("yyyyMMdd")
    c.setTime(format.parse(bizdate))

    c.add(Calendar.DATE, -1)
    val bizlastdate = format.format(c.getTime)

    println(s" 時間參數  ${bizdate}    ${bizlastdate}")
    // dwd_patient_identity_info_df 就是我們用戶的各個ID ,也就是我們的數據源
    // 獲取欄位,這樣我們就可以擴展新的ID 欄位,但是不用更新程式碼
    val columns = spark.sql(
      s"""
         |select
         |   *
         |from
         |   lezk_dw.dwd_patient_identity_info_df
         |where
         |   ds='${bizdate}'
         |limit
         |   1
         |""".stripMargin)
      .schema.fields.map(f => f.name).filterNot(e=>e.equals("ds")).toList

    // 獲取數據
    val dataFrame = spark.sql(
      s"""
        |select
        |   ${columns.mkString(",")}
        |from
        |   lezk_dw.dwd_patient_identity_info_df
        |where
        |   ds='${bizdate}'
        |""".stripMargin
    )

    // 數據準備
    val data = dataFrame.rdd.map(row => {
      val list = new ListBuffer[String]()
      for (column <- columns) {
        val value = row.getAs[String](column)
        list.append(value)
      }
      list.toList
    })
    import spark.implicits._
    // 頂點集合
    val veritx= data.flatMap(list => {
      for (i <- 0 until columns.length if StringUtil.isNotBlank(list(i)) && (!"null".equals(list(i))))
        yield (new BigInteger(DigestUtils.md5Hex(list(i)),16).longValue, list(i))

    }).distinct

    val veritxDF=veritx.toDF("id_hashcode","id")
    veritxDF.createOrReplaceTempView("veritx")

    // 生成邊的集合
    val edges = data.flatMap(list => {
      for (i <- 0 to list.length - 2 if StringUtil.isNotBlank(list(i)) && (!"null".equals(list(i)))
           ; j <- i + 1 to list.length - 1 if StringUtil.isNotBlank(list(j)) && (!"null".equals(list(j))))
      yield Edge(new BigInteger(DigestUtils.md5Hex(list(i)),16).longValue,new BigInteger(DigestUtils.md5Hex(list(j)),16).longValue, "")
    }).distinct


    // 開始使用點集合與邊集合進行圖計算訓練
    val graph = Graph(veritx, edges)
    val connectedGraph=graph.connectedComponents()

    // 連通節點
    val  vertices = connectedGraph.vertices.toDF("id_hashcode","guid_hashcode")
    vertices.createOrReplaceTempView("to_graph")

    // 載入昨日的oneid 數據 (oneid,id,id_hashcode) 
    val ye_oneid = spark.sql(
      s"""
        |select
        |   oneid,id,id_hashcode
        |from
        |   lezk_dw.dwd_patient_oneid_info_df
        |where
        |   ds='${bizlastdate}'
        |""".stripMargin
    )
    ye_oneid.createOrReplaceTempView("ye_oneid")

    // 關聯獲取 已經存在的 oneid,這裡的min 函數就是我們說的oneid 的選擇問題
    val exists_oneid=spark.sql(
      """
        |select
        |   a.guid_hashcode,min(b.oneid) as oneid
        |from
        |   to_graph a
        |inner join
        |   ye_oneid b
        |on
        |   a.id_hashcode=b.id_hashcode
        |group by
        |   a.guid_hashcode
        |""".stripMargin
    )
    exists_oneid.createOrReplaceTempView("exists_oneid")
    // 不存在則生成 存在則取已有的 這裡nvl 就是oneid  的更新邏輯,存在則獲取 不存在則生成
    val today_oneid=spark.sql(
      s"""
        |insert overwrite table dwd_patient_oneid_info_df partition(ds='${bizdate}')
        |select
        |   nvl(b.oneid,md5(cast(a.guid_hashcode as string))) as oneid,c.id,a.id_hashcode,d.id as guid,a.guid_hashcode
        |from
        |   to_graph a
        |left join
        |   exists_oneid b
        |on
        |   a.guid_hashcode=b.guid_hashcode
        |left join
        |   veritx c
        |on
        |   a.id_hashcode=c.id_hashcode
        |left join
        |   veritx d
        |on
        |   a.guid_hashcode=d.id_hashcode
        |""".stripMargin
    )
    sc.stop
  }

}

這個程式碼中我們使用了SparkSQL,其實你如果更加擅長RDD的API,也可以使用RDD 優化,需要注意的是網上的很多程式碼中使用了廣播變數,將vertices 變數廣播了出去,其實這個時候存在一個風險那就是如果你的vertices 變數非常大,你廣播的時候存在OOM 的風險,但是如果你使用了SparkSQL的話,Spark 就會根據實際的情況,幫你自動優化。

優化點 增量優化

我們看到我們每次都是全量的圖,其實我們可以將我們的OneID 表載入進來,然後將我們的增量數據和已有的圖數據進行合併,然後再去生成圖

val veritx = ye_veritx.union(to_veritx)
val edges = ye_edges.union(to_edges)

val graph = Graph(veritx, edges)

總結

  1. ID MappingOneID 的提前,OneIDID Mapping 的結果,所以要想做OneID必須先做ID Mapping;
  2. OneID 是為了打通整個數據體系的數據,所以OneID 需要以服務的方式對外提供服務,在數倉裡面就是作為基礎表使用,對外的話我們就需要提供介面對外提供服務