數倉建模—OneID
今天是我在上海租房的小區被封的第三天,由於我的大意,沒有屯吃的,外賣今天完全點不到了,中午的時候我找到了一包快過期的肉鬆餅,才補充了1000
焦耳的能量。但是中午去做核酸的時候,我感覺走路有點不穩,我看到大白的棉簽深入我的嘴裡,我竟然以為是吃的,差點咬住了,還好我有僅存的一點意識。下午我收到女朋友給我點的外賣——麵包(我不知道她是怎麼點到的外賣,我很感動),很精緻的麵包,擱平時我基本不喜歡吃麵包,但是已經到了這個份上,我大口吃起來,竟然覺得這是世界上最好吃的食物了。明天早晨5:50的鬧鐘,去叮咚和美團買菜,看能不能搶幾桶泡麵吧。願神保佑,我暗暗下著決心並祈禱著,胸前畫著十字。。。
數據倉庫系列文章(持續更新)
- 數倉架構發展史
- 數倉建模方法論
- 數倉建模分層理論
- 數倉建模—寬表的設計
- 數倉建模—指標體系
- 數據倉庫之拉鏈表
- 數倉—數據集成
- 數倉—數據集市
- 數倉—商業智慧系統
- 數倉—埋點設計與管理
- 數倉—ID Mapping
- 數倉—OneID
- 數倉—AARRR海盜模型
- 數倉—匯流排矩陣
- 數倉—數據安全
- 數倉—數據品質
- 數倉—數倉建模和業務建模
關注公眾號:
大數據技術派
,回復:資料
,領取1024G
資料。
OneID
前面我們學習了ID Mapping
,包括ID Mapping
的背景介紹和業務場景,以及如何使用Spark
實現ID Mapping
,這個過程中涉及到了很多東西,當然我們都通過文章的形式介紹給大家了,所以你再學習今天這一節之前,可以先看一下前面的文章
在上一節我們介紹ID Mapping 的時候我們就說過ID Mapping 是為了打通用戶各個維度的數據,從而消除數據孤島、避免數據歧義,從而更好的刻畫用戶,所以說ID Mapping是手段不是目的,目的是為了打通數據體系,ID Mapping最終的產出就是我們今天的主角OneID,也就是說數據收集過來之後通過ID Mapping 打通,從而產生OneID,這一步之後我們的整個數據體系就將使用OneID作為用戶的ID,這樣我們整個數據體系就得以打通
OneData
開始之前我們先看一下阿里的OneData 數據體系,從而更好認識一下OneID,前面我們說過ID Mapping 只是手段不是目的,目的是為了打通數據體系,ID Mapping最終的產出就是OneID
其實OneID在我們整個數據服務體系中,也只是起點不是終點或者說是手段,我們最終的目的是為了建設統一的數據資產體系。
沒有建設統一的數據資產體系之前,我們的數據體系建設存在下面諸多問題
- 數據孤島:各產品、業務的數據相互隔離,難以通過共性ID打通
- 重複建設:重複的開發、計算、存儲,帶來高昂的數據成本
- 數據歧義:指標定義口徑不一致,造成計算偏差,應用困難
在阿里巴巴 OneData 體系中,OneID 指統一數據萃取,是一套解決數據孤島問題的思想和方法。數據孤島是企業發展到一定階段後普遍遇到的問題。各個部門、業務、產品,各自定義和存儲其數據,使得這些數據間難以關聯,變成孤島一般的存在。
OneID的做法是通過統一的實體識別和連接,打破數據孤島,實現數據通融。簡單來說,用戶、設備等業務實體,在對應的業務數據中,會被映射為唯一識別(UID)上,其各個維度的數據通過這個UID進行關聯。
各個部門、業務、產品對業務實體的UID的定義和實現不一樣,使得數據間無法直接關聯,成為了數據孤島。基於手機號、身份證、郵箱、設備ID等資訊,結合業務規則、機器學習、圖演算法等演算法,進行 ID-Mapping,將各種 UID 都映射到統一ID上。通過這個統一ID,便可關聯起各個數據孤島的數據,實現數據通融,以確保業務分析、用戶畫像等數據應用的準確和全面。
OneModel 統一數據構建和管理
將指標定位細化為:
1. 原子指標
2. 時間周期
3. 修飾詞(統計粒度、業務限定, etc)
通過這些定義,設計出各類派生指標 基於數據分層,設計出維度表、明細事實表、匯總事實表,其實我們看到OneModel 其實沒有什麼新的內容,其實就是我們數倉建模的那一套東西
OneService 統一數據服務
OneService 基於復用而不是複製數據的思想,指得是我們的統一的數據服務,因為我們一直再提倡復用,包括我們數倉的建設,但是我們的數據服務這一塊卻是空白,所以OneService核心是服務的復用,能力包括:
- 利用主題邏輯表屏蔽複雜物理表的主題式數據服務
- 一般查詢+ OLAP 分析+在線服務的統一且多樣化數據服務
- 屏蔽多種異構數據源的跨源數據服務
OneID 統一數據萃取
基於統一的實體識別、連接和標籤生產,實現數據通融,包括:
- ID自動化識別與連接
- 行為元素和行為規則
- 標籤生產
OneID基於超強ID識別技術鏈接數據,高效生產標籤;業務驅動技術價值化,消除數據孤島,提升數據品質,提升數據價值。
而ID的打通,必須有ID-ID之間的兩兩映射打通關係,通過ID映射關係表,才能將多種ID之間的關聯打通,完全孤立的兩種ID是無法打通的。
打通整個ID體系,看似簡單,實則計算複雜,計算量非常大。假如某種對象有數億個個體,每個個體又有數十種不同的ID標識,任意兩種ID之間都有可能打通關係,想要完成這類對象的所有個體ID打通需要數億次計算,一般的機器甚至大數據集群都無法完成。
大數據領域中的ID-Mapping技術就是用機器學習演算法類來取代野蠻計算,解決對象數據打通的問題。基於輸入的ID關係對,利用機器學習演算法做穩定性和收斂性計算,輸出關係穩定的ID關係對,並生成一個UID作為唯一識別該對象的標識碼。
OneID實現過程中存在的問題
前面我們知道我們的ID Mapping 是通過圖計算實現,核心就是連通圖,其實實現OneID我們在打通ID 之後,我們就可以為一個個連通圖生成一個ID, 因為一個連通圖 就代表一個用戶,這樣我們生成的ID就是用戶的OneID,這裡的用戶指的是自然人,而不是某一個平台上的用戶。
OneID 的生成問題
首先我們需要一個ID 生成演算法,因為我們需要為大量用戶生成ID,我們的ID 要求是唯一的,所以在演算法設計的時候就需要考慮到這一點,我們並不推薦使用UUID,原因是UUID了可能會出現重複,而且UUID 沒有含義,所以我們不推薦使用UUID,我們這裡使用的是MD5 演算法,所以我們的MD5 演算法的參數是我們的圖的標示ID。
OneID 的更新問題
這裡的更新問題主要就是我們的數據每天都在更新,也就是說我們的圖關係在更新,也就是說我們要不要給這個自然人重新生成OneID ,因為他的圖關係可能發生了變化。
其實這裡我們不能為該自然人生成新的OneID ,否則我們數倉里的歷史數據可能無法關聯使用,所以我們的策略就是如果該自然人已經有OneID了,則不需要重新生成,其實這裡我們就是判斷該圖中的所有的頂點是否存在OneID,我們後面在程式碼中體現著一點。
OneID 的選擇問題
這個和上面的更新問題有點像,上面更新問題我們可以保證一個自然人的OneID不發生變化,但是選擇問題會導致發生變化,但是這個問題是圖計算中無法避免的,我們舉個例子,假設我們有用戶的兩個ID(A_ID,C_ID),但是這兩個ID 在當前是沒有辦法打通的,所以我們就會為這個兩個ID 生成兩個OneID,也就是(A_OneID,B_OneID),所以這個時候我們知道因為ID Mapping 不上,所以我們認為這兩個ID 是兩個人。
後面我們有了另外一個ID(B_ID),這個ID可以分別和其他的兩個ID 打通,也就是B_ID<——>A_ID , B_ID<——>C_ID 這樣我們就打通這個三個ID,這個時候我們知道
這個用戶存在三個ID,並且這個時候已經存在了兩個OneID,所以這個時候我們需要在這兩個OneID中選擇一個作為用戶的OneID,簡單粗暴點就可以選擇最小的或者是最大的。
我們選擇了之後,要將另外一個OneID對應的數據,對應到選擇的OneID 下,否則沒有被選擇的OneID的歷史數據就無法追溯了
OneID 程式碼實現
這個程式碼相比ID Mapping主要是多了OneID 的生成邏輯和更新邏輯 ,需要注意的是關於頂點集合的構造我們不是直接使用字元串的hashcode ,這是因為hashcode 很容易重複
object OneID {
val spark = SparkSession
.builder()
.appName("OneID")
.getOrCreate()
val sc = spark.sparkContext
def main(args: Array[String]): Unit = {
val bizdate=args(0)
val c = Calendar.getInstance
val format = new SimpleDateFormat("yyyyMMdd")
c.setTime(format.parse(bizdate))
c.add(Calendar.DATE, -1)
val bizlastdate = format.format(c.getTime)
println(s" 時間參數 ${bizdate} ${bizlastdate}")
// dwd_patient_identity_info_df 就是我們用戶的各個ID ,也就是我們的數據源
// 獲取欄位,這樣我們就可以擴展新的ID 欄位,但是不用更新程式碼
val columns = spark.sql(
s"""
|select
| *
|from
| lezk_dw.dwd_patient_identity_info_df
|where
| ds='${bizdate}'
|limit
| 1
|""".stripMargin)
.schema.fields.map(f => f.name).filterNot(e=>e.equals("ds")).toList
// 獲取數據
val dataFrame = spark.sql(
s"""
|select
| ${columns.mkString(",")}
|from
| lezk_dw.dwd_patient_identity_info_df
|where
| ds='${bizdate}'
|""".stripMargin
)
// 數據準備
val data = dataFrame.rdd.map(row => {
val list = new ListBuffer[String]()
for (column <- columns) {
val value = row.getAs[String](column)
list.append(value)
}
list.toList
})
import spark.implicits._
// 頂點集合
val veritx= data.flatMap(list => {
for (i <- 0 until columns.length if StringUtil.isNotBlank(list(i)) && (!"null".equals(list(i))))
yield (new BigInteger(DigestUtils.md5Hex(list(i)),16).longValue, list(i))
}).distinct
val veritxDF=veritx.toDF("id_hashcode","id")
veritxDF.createOrReplaceTempView("veritx")
// 生成邊的集合
val edges = data.flatMap(list => {
for (i <- 0 to list.length - 2 if StringUtil.isNotBlank(list(i)) && (!"null".equals(list(i)))
; j <- i + 1 to list.length - 1 if StringUtil.isNotBlank(list(j)) && (!"null".equals(list(j))))
yield Edge(new BigInteger(DigestUtils.md5Hex(list(i)),16).longValue,new BigInteger(DigestUtils.md5Hex(list(j)),16).longValue, "")
}).distinct
// 開始使用點集合與邊集合進行圖計算訓練
val graph = Graph(veritx, edges)
val connectedGraph=graph.connectedComponents()
// 連通節點
val vertices = connectedGraph.vertices.toDF("id_hashcode","guid_hashcode")
vertices.createOrReplaceTempView("to_graph")
// 載入昨日的oneid 數據 (oneid,id,id_hashcode)
val ye_oneid = spark.sql(
s"""
|select
| oneid,id,id_hashcode
|from
| lezk_dw.dwd_patient_oneid_info_df
|where
| ds='${bizlastdate}'
|""".stripMargin
)
ye_oneid.createOrReplaceTempView("ye_oneid")
// 關聯獲取 已經存在的 oneid,這裡的min 函數就是我們說的oneid 的選擇問題
val exists_oneid=spark.sql(
"""
|select
| a.guid_hashcode,min(b.oneid) as oneid
|from
| to_graph a
|inner join
| ye_oneid b
|on
| a.id_hashcode=b.id_hashcode
|group by
| a.guid_hashcode
|""".stripMargin
)
exists_oneid.createOrReplaceTempView("exists_oneid")
// 不存在則生成 存在則取已有的 這裡nvl 就是oneid 的更新邏輯,存在則獲取 不存在則生成
val today_oneid=spark.sql(
s"""
|insert overwrite table dwd_patient_oneid_info_df partition(ds='${bizdate}')
|select
| nvl(b.oneid,md5(cast(a.guid_hashcode as string))) as oneid,c.id,a.id_hashcode,d.id as guid,a.guid_hashcode
|from
| to_graph a
|left join
| exists_oneid b
|on
| a.guid_hashcode=b.guid_hashcode
|left join
| veritx c
|on
| a.id_hashcode=c.id_hashcode
|left join
| veritx d
|on
| a.guid_hashcode=d.id_hashcode
|""".stripMargin
)
sc.stop
}
}
這個程式碼中我們使用了SparkSQL,其實你如果更加擅長RDD的API,也可以使用RDD 優化,需要注意的是網上的很多程式碼中使用了廣播變數,將vertices
變數廣播了出去,其實這個時候存在一個風險那就是如果你的vertices 變數非常大,你廣播的時候存在OOM 的風險,但是如果你使用了SparkSQL的話,Spark 就會根據實際的情況,幫你自動優化。
優化點 增量優化
我們看到我們每次都是全量的圖,其實我們可以將我們的OneID 表載入進來,然後將我們的增量數據和已有的圖數據進行合併,然後再去生成圖
val veritx = ye_veritx.union(to_veritx)
val edges = ye_edges.union(to_edges)
val graph = Graph(veritx, edges)
總結
ID Mapping
是OneID
的提前,OneID
是ID Mapping
的結果,所以要想做OneID
必須先做ID Mapping
;OneID
是為了打通整個數據體系的數據,所以OneID
需要以服務的方式對外提供服務,在數倉裡面就是作為基礎表使用,對外的話我們就需要提供介面對外提供服務