Apache Hudi在Hopworks機器學習的應用
- 2021 年 7 月 4 日
- 筆記
Hopsworks特徵存儲庫統一了在線和批處理應用程式的特徵訪問而屏蔽了雙資料庫系統的複雜性。我們構建了一個可靠且高性能的服務,以將特徵物化到在線特徵存儲庫,不僅僅保證低延遲訪問,而且還保證在服務時間可以訪問最新鮮的特徵值。
企業機器學習模型為指導產品用戶交互提供了價值價值。通常這些 ML 模型應用於整個實體資料庫,例如由唯一主鍵標識用戶。離線應用程式的一個示例是預測客戶終身價值(Customer Lifetime Value),其中可以定期(每晚、每周)分批預測,然後用於選擇營銷活動的目標受眾。然而更先進的人工智慧應用程式可以實時指導用戶交互,例如推薦系統。對於這些在線應用程式,模型輸入的某些部分(特徵向量)將在應用程式本身中可用,例如最後點擊的按鈕,而特徵向量的其他部分則依賴於歷史或上下文數據,必須檢索後端存儲,例如用戶在過去一小時內點擊按鈕的次數或按鈕是否為熱門按鈕。
在這篇部落格中,我們將深入探討在線應用程式的需求細節,以及 Hopsworks Feature Store 如何抽象並規避雙存儲系統的複雜性。
1. 生產中的機器學習模型
雖然具有(分析)模型的批處理應用程式在很大程度上類似於模型本身的訓練,需要有效訪問將要參與評分的大量數據,但在線應用程式需要低延遲訪問給定主鍵的最新特徵值,然後作為特徵向量發送到模型服務實例進行推理。
據我們所知沒有單一的資料庫能夠高性能滿足這兩個要求,因此數據團隊傾向於將用於訓練和批量推理的數據保留在數據湖中,而 ML工程師更傾向於構建微服務以將微服務中的特徵工程邏輯複製到在線應用程式中。
然而,這給數據科學家和機器學習工程師帶來了不必要的障礙,無法快速迭代並顯著增加機器學習模型的用於生產環境的時間
- 數據科學視角:數據和基礎設施通過微服務緊密耦合,導致數據科學家無法從開發轉向生產,也無法復用特徵。
- ML 工程視角:大量工程工作以保證對生產中數據的一致訪問,正如 ML 模型在訓練過程中所看到的那樣。
2. Hopsworks特徵存儲庫:透明的雙存儲系統
Hopsworks特徵存儲庫是一個雙存儲系統,由高頻寬(低成本)離線存儲和低延遲在線存儲組成。離線存儲是我們 HopsFS 文件系統上的 Apache Hudi 表(由 S3 或 Azure Blob 存儲支援)和外部表(例如 Snowflake、Redshift 等),提供對大量特徵數據的訪問以用於訓練或批量評分。相比在線存儲是一個低延遲的鍵值資料庫,它只存儲每個特徵的最新值及其主鍵。 因此在線特徵存儲充當這些特徵值的低延遲快取。
為了使該系統對數據科學家有價值並縮短生產時間,並為最終用戶提供良好的體驗,它需要滿足一些要求:
- 用於訓練和服務的一致特徵:在 ML 中,為生產中的特徵複製精確的特徵工程邏輯非常重要,因為它用於生成模型訓練的特徵。
- 特徵新鮮度:低延遲、高吞吐量的在線特徵存儲只有在存儲在其中的數據保持最新時才有益,特徵新鮮度被定義為觸發特徵重新計算的事件到達與重新計算的特徵在在線特徵庫中發布之間的端到端延遲。
- 延遲:在線特徵庫必須提供近乎實時的低延遲和高吞吐量,以便應用程式能夠使用儘可能多的特徵及其可用的SLA。
- 可訪問性:數據需要可通過直觀的 API 訪問,就像從離線特徵存儲中提取數據進行訓練一樣容易。
Hopsworks在線特徵庫圍繞四大支柱構建,以滿足需求,同時擴展以管理大量數據:
- HSFS API:Hopsworks 特徵存儲庫是開發人員特徵存儲的主要入口點,可用於 Python 和 Scala/Java。HSFS 將兩個存儲系統抽象出來,提供透明的 Dataframe API(Spark、Spark Structured Streaming、Pandas)用於在線和離線存儲的寫入和讀取。
- 元數據:Hopsworks 可以存儲大量自定義元數據,以便數據科學家發現、管理和重用特徵,而且還能夠在將模型移至生產時依賴模式和數據品質。
- 引擎:在線特徵存儲帶有可擴展的無狀態服務,可確保數據儘快寫入在線特徵存儲,而不會從數據流(Spark 結構化流)或靜態 Spark 或 Pandas DataFrame中進行寫入放大,即不必在攝取特徵之前先將特徵物化到存儲中 – 可以直接寫入特徵存儲。
- RonDB:在線存儲背後的資料庫是世界上最快的具有 SQL 功能的鍵值存儲。不僅為在線特徵數據構建基礎,而且還處理 Hopsworks 中生成的所有元數據。
我們將在以下部分詳細介紹其中的每一部分,並提供一些用於定量比較的基準。
3. RonDB:在線特徵存儲,文件系統和元數據的基礎
Hopsworks 是圍繞分散式橫向擴展元數據從頭開始構建的。這有助於確保 Hopsworks 內服務的一致性和可擴展性,以及數據和 ML 工件的注釋和可發現性。
自第一次發布以來,Hopsworks 一直使用 NDB Cluster(RonDB 的前身)作為在線特徵存儲。2020 年我們創建了 RonDB 作為 NDB Cluster 的託管版本,並針對用作在線特徵存儲進行了優化。
但是在 Hopsworks 中我們將 RonDB 用於不僅僅是在線特徵存儲。RonDB 還存儲整個特徵存儲庫的元數據,包括模式、統計資訊和提交。 RonDB 還存儲了文件系統 HopsFS 的元數據,其中存儲了離線 Hudi 表。使用 RonDB 作為單個元數據資料庫,我們使用事務和外鍵來保持 Feature Store 和 Hudi 元數據與目標文件和目錄(inode)一致。Hopsworks 可通過 REST API 或直觀的 UI(包括特徵目錄)訪問或通過 Hopsworks 特徵存儲 API (HSFS) 以編程方式訪問。
4. OnlineFS:可擴展的在線特徵物化引擎
有了底層的 RonDB 和所需的元數據,我們就能夠構建一個橫向擴展、高吞吐量的物化服務,以在在線特徵存儲上執行更新、刪除和寫入——我們簡單地將其命名為 OnlineFS。
OnlineFS 是一種使用 ClusterJ 直接訪問 RonDB 數據節點的無狀態服務。 ClusterJ 被實現為原生 C++ NDB API 之上的高性能 JNI 層,提供低延遲和高吞吐量。由於 RonDB 中元數據的可用性,例如 avro 模式和特徵類型,我們能夠使 OnlineFS 無狀態。 使服務無狀態允許我們通過簡單地添加或刪除服務的實例來向上和向下擴展對在線特徵存儲的寫入,從而隨著實例的數量線性地增加或減少吞吐量。
讓我們完成將數據寫入在線特徵存儲所需的步驟,這些步驟在下圖中編號。
- 特徵作為 Pandas 或 Spark DataFrame寫入特徵存儲
每個 Dataframe 更新一個稱為特徵組的表(離線存儲中有一個類似的表)。一個特徵組中的特徵共享同一個主鍵,可以是複合主鍵。 主鍵與元數據的其餘部分一起被跟蹤。 因此Hopsworks 特徵存儲庫有一個 Dataframe API,這意味著特徵工程的結果應該是將寫入到特徵存儲的常規 Spark、Spark Structured Streaming 或 Pandas Dataframe。對於所有三種類型的DataFrame,用於寫入特徵存儲的 API 幾乎相同。 通過對特徵組對象的引用可以插入DataFrame。 特徵組在創建時已配置為將 Dataframe 存儲到在線和離線庫或僅存儲到其中之一。
- 編碼和產生
Dataframe 的行使用 avro 進行編碼並寫入在 Hopsworks 上運行的 Kafka中。每個特性組都有自己的 Kafka 主題,具有可配置的分區數量,並按主鍵進行分區,這是保證寫入順序所必需的。
- 消費和解碼
我們使用 Kafka 來緩衝來自 Spark 特徵工程作業的寫入,因為直接寫入 RonDB 的大型 Spark 集群可能會使 RonDB 過載,因為現有 Spark JDBC 驅動程式中缺乏背壓。OnlineFS 從 Kafka 讀取緩衝的消息並對其進行解碼。 重要的是OnlineFS 僅解碼原始特徵類型,而嵌入等複雜特徵以二進位格式存儲在在線特徵存儲中。
- 基於主鍵的Upsert
OnlineFS 可以使用 ClusterJ API 將行實際更新插入到 RonDB。Upsert 分批執行(具有可配置的批量大小)以提高吞吐量。
由於管道步驟中的所有服務都可以訪問相同的元數據,因此我們能夠向用戶隱藏與編碼和模式相關的所有複雜性。 此外所有涉及的服務都是水平可擴展的(Spark、Kafka、OnlineFS),並且由於我們類似於流的設置,該過程不會創建不必要的數據副本,即沒有寫放大。 由於模式註冊表、X.509 證書管理器和 Hopsworks 中的 Kafka 等服務的可用性,這種高度可擴展的設置成為可能。 在任何時候X.509 證書都用於雙向身份驗證,而 TLS 用於加密網路流量。
5. 可訪問性意味著透明的 API
在分散式系統中,我們經常談論透明度。 如果分散式系統對開發人員隱藏網路訪問和實現特定知識,則它是透明的。 在 Hopsworks 特徵存儲庫中,寫入是通過相同的 API 透明地完成的,如前所述(1)無論是常規的 Spark、Spark Streaming 還是 Pandas 以及(2)系統負責一致地更新在線和離線存儲
插入
HSFS 庫中的核心抽象是表示特徵組、訓練數據集和特徵存儲中的特徵的元數據對象。 我們使用 HSFS 的目標是讓開發人員能夠使用他們喜歡的語言和框架來設計功能。 當我們在 Dataframe API 上對齊時,Dataframe 中包含的任何內容都可以寫入特徵存儲。 如果您有現有的 ETL 或 ELT 管道,它們生成包含特徵的數據幀,您可以通過簡單地獲取對其特徵組對象的引用並使用您的數據幀作為參數調用 .insert()
來將該數據幀寫入特徵存儲 . 這可以從定期安排的作業中調用(使用您選擇的任何編排器,或者,如果您想要開箱即用的編排器,則 Hopsworks 附帶 Airflow)。 但是也可以通過將批次寫入 Spark 結構化流應用程式中的數據幀來連續更新特徵組對象。
# populate feature group metadata object
store_fg_meta = fs.create_feature_group(name="store_fg",
version=1,
primary_key=["store"],
description="Store related features",
online_enabled=True)
# create feature group for the first time in feature store
fg.save(Dataframe)
# replace .save with .insert for scheduled batch job
fg.insert(Dataframe)
# if required, stream data only to the online feature store in long running Spark
# Structured Streaming application
fg.insert_stream(streaming_Dataframe)
讀取
許多現有的特徵存儲沒有模型的表示。 然而Hopsworks 引入了訓練數據集抽象來表示用於訓練模型的特徵集和特徵值。 也就是說,不可變的訓練數據集和模型之間存在一對一的映射關係,但可變特徵組與不可變的訓練數據集之間是一對多的關係。 您可以通過從特徵組中加入、選擇和過濾特徵來創建訓練數據集。 訓練數據集包括特徵的元數據,例如它們來自哪個特徵組、該特徵組的提交 ID 以及訓練數據集中特徵的順序。 所有這些資訊使 HSFS 能夠在稍後的時間點重新創建訓練數據集,並在服務時透明地構建特徵向量。
# create a query
feature_join = rain_fg.select_all() \
.join(temperature_fg.select_all(), on=["location_id"]) \
.join(location_fg.select_all())
td = fs.create_training_dataset("rain_dataset",
version=1,
label=」weekly_rain」,
data_format=」tfrecords」)
# materialize query in the specified file format
td.save(feature_join)
# we can also use the training dataset for serving
# this serving code typically runs in a Python environment
td = fs.get_training_dataset(「rain_dataset」, version=1)
# get serving vector
td.get_serving_vector({「location_id」: 「honolulu」})
在線特徵庫的使用方要麼是使用 ML 模型的應用程式,要麼是模型服務基礎設施,這些基礎設施通過缺失的特徵來豐富特徵向量。 Hopsworks 為在線庫提供了一個基於 JDBC 的 API。 JDBC 具有提供高性能協議、網路加密、客戶端身份驗證和訪問控制的優勢。 HSFS 為 Python 和 Scala/Java 提供語言級別的支援。 但是,如果您的服務應用程式在不同的程式語言或框架中運行,您總是可以直接使用 JDBC。
6. Benchmarks
Mikael Ronstrom(NDB 集群的發明者和邏輯時鐘的數據負責人,領導 RonDB 團隊)為 RonDB 提供了 sysbench 基準測試,並提供了針對 Redis 的比較性能評估。在本節中我們展示了 OnlineFS 服務的性能,能夠處理和維持寫入在線特徵存儲的高吞吐量,以及對 Hopsworks 中典型託管 RonDB 設置的特徵向量查找延遲和吞吐量的評估。
在此基準測試中,Hopsworks 設置了 3xAWS m5.2xlarge(8 個 vCPU,32 GB)實例(1 個頭,2 個工作器)。 Spark 使用 worker 將數據幀寫入在線庫。此外相同的工作人員被重新用作客戶端,在在線特徵存儲上執行讀取操作以進行讀取基準測試。
RonDB 設置了 1x AWS t3.medium(2 vCPU,4 GB)實例作為管理節點,2x r5.2xlarge(8 vCPU,64 GB)實例作為數據節點,3x AWS c5.2xlarge(8 vCPU,16 GB) ) MySQL 伺服器的實例。這種設置允許我們在具有 2 倍複製的在線特徵存儲中存儲 64GB 的記憶體數據。 MySQL 伺服器為在線特徵存儲提供 SQL 介面,在此基準測試中,我們沒有使 RonDB 數據節點完全飽和,因此可以潛在地添加更多 MySQL 伺服器和客戶端,以增加超出此處所示水平的吞吐量。
寫吞吐
我們對 OnlineFS 服務中寫入 RonDB 的吞吐量進行了基準測試。 此外,我們測量了從 Kafka 主題中獲取記錄到提交到 RonDB 之間處理記錄所需的時間。 對於這個基準測試,我們部署了兩個 OnlineFS 服務,一個在頭節點上,一個在 MySQL 伺服器節點之一上。
我們通過將 20M 行從 Spark 應用程式寫入在線特徵存儲來運行實驗。 經過短暫的預熱期後,兩個服務實例的吞吐量穩定在約 126K 行/秒(11 個特徵)、約 90K 行/秒(51 個特徵)和最大特徵向量約 60K 行/秒。 由於其設計,這可以通過添加更多服務實例輕鬆擴展。
其次,我們輸出了在 OnlineFS 服務中處理特徵向量所需的時間。 這個時間不包括一條記錄在 Kafka 中等待處理的時間,原因是等待時間在很大程度上取決於寫入 Kafka 的 Spark 執行程式的數量。 相反您應該依靠吞吐量數字將它們與您的要求進行比較。
處理時間是按行報告的,但 OnlineFS 中的部分管道是並行化的,例如,行以 1000 的批次提交給 RonDB。通過這種設置,我們實現了 11 個特徵的 p99 約為 250 毫秒,行大小為 948 位元組。
服務查找吞吐量和延遲
我們對與越來越多的並行執行請求的客戶端相關的不同特徵向量大小的吞吐量和延遲進行了基準測試。 請注意,客戶端被分成兩個工作節點(每個 8vCPU)。
每個請求的單個向量
在這個基準測試中,每個請求都包含一個主鍵值查找(一個特徵向量)。 吞吐量和延遲可線性擴展至 16 個客戶端,同時保持低延遲。 對於超過 16 個客戶端,我們觀察到運行客戶端的主機達到其最大 CPU 和網路利用率。 此外,我們沒有看到 RonDB 數據節點或 MySQL 伺服器的過度使用,這意味著我們可以通過從更大的工作實例運行客戶端或添加更多工作主機來運行客戶端來進一步線性擴展。
批處理,每個請求 100 個向量
為了證明 RonDB 每秒可擴展到更多的關鍵查找,我們運行了另一個基準測試,其中每個客戶端以 100 個批次請求特徵向量。正如我們所看到的查找數量仍然線性擴展,查找吞吐量增加了 15 倍,而 每個請求的延遲僅適度增加。
7. 結論
Hopsworks 附帶託管 RonDB,為 Hopsworks 和在線特徵提供統一的元數據存儲。 在這篇部落格中,我們展示了一個高度可用的雙節點 RonDB 集群(r5.2xlarge VM)線性擴展到 >250k ops/sec,特徵向量查找的 11 個特徵的大小約為 1KB,p99 延遲為 7.5 毫秒。 因此Hopsworks 提供了當今市場上性能最高的在線特徵庫。