Turn Your Spark SQL Model into an Online Service in Half an Hour

SparkSQL in Machine Learning Scenarios

4Paradigm has deployed tens of thousands of AI applications across many industries, such as anti-fraud in finance, news recommendation in media, and pipeline inspection in energy. In these AI applications, SparkSQL plays an important role in implementing feature transformations quickly.
The feature transformations done with SparkSQL mainly fall into the following categories:

  1. Multi-table scenarios: joining tables together, for example joining a transaction table with an account table
  2. Simple feature transformations with UDFs, for example applying an hour function to a timestamp
  3. Time-series features with time windows and UDAFs, for example computing the total amount a person spent over the last day (a sketch of all three follows this list)
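
To make these categories concrete, here is a minimal PySpark sketch of all three; the table and column names (transactions, accounts, user_id, amount, ts) are illustrative rather than taken from a real application:

```python
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.window import Window

spark = SparkSession.builder.appName("feature_demo").getOrCreate()
tx = spark.table("transactions")      # user_id, amount, ts
accounts = spark.table("accounts")    # user_id plus account attributes

# 1. multi-table: attach account attributes to each transaction
tx = tx.join(accounts, "user_id", "left")

# 2. simple UDF-style transformation: hour of the transaction timestamp
tx = tx.withColumn("tx_hour", F.hour("ts"))

# 3. time-series feature: total spend per user over the last 1 day
one_day = 24 * 3600
w = (Window.partitionBy("user_id")
           .orderBy(F.col("ts").cast("long"))
           .rangeBetween(-one_day, 0))
tx = tx.withColumn("amount_1d_sum", F.sum("amount").over(w))
```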

So far, SparkSQL has solved the feature-transformation problem for offline model training very well. But as AI applications mature, people no longer expect a model just to produce good offline research results; they expect it to deliver value in real business scenarios. Real business scenarios are model-serving scenarios, which demand high performance and real-time inference, and that is where we run into the following problems:

  1. How do multi-table data map from offline to online? Batch training takes many input tables; in what form should these tables exist in the online environment? This also shapes the whole system architecture: done well it improves efficiency, done poorly it greatly increases the cost of turning a model into business value.
  2. Converting SQL into real-time execution is expensive. Online inference requires high performance, and data scientists may produce thousands of features; converting each feature by hand greatly increases engineering cost.
  3. Keeping offline and online features consistent is hard. Manual conversion leads to inconsistencies, and in practice consistency is very difficult to achieve.
  4. Offline results look great, but the online behavior cannot meet business requirements.

In a concrete anti-fraud scenario, the model service must decide whether a transaction is fraudulent with a tp99 of 20 ms, so the performance requirements on model serving are extremely high.

How 4Paradigm's Feature Engineering Database Solves These Problems

The feature engineering database complements SparkSQL's capabilities:

  1. As a database, it solves the problem of mapping offline tables to the online environment. Our answer to the first question above: however the tables are distributed offline is exactly how they are distributed online.
  2. The same code executes both offline and online feature transformations, so the online model behavior matches the offline results.
  3. Data scientists and business development teams collaborate through SQL instead of hand-converting code, which greatly speeds up model iteration.
  4. LLVM-accelerated SQL runs 2-3x faster than the Scala-based Spark 2.x and 3.x on complex time-series feature workloads, and online, in-memory storage keeps SQL query latency very low.

Demo: Quickly Turning a Spark SQL Model into a Real-Time Service

The demo's training task is to predict how long a taxi trip will take from pickup to dropoff. We use fedb, pyspark, lightgbm, and a few other tools to build an HTTP model-inference service end to end; it is also a practical example of Spark in a machine learning scenario.

The whole demo is a little over 200 lines of code and took less than half an hour to put together:

  1. train_sql.py: feature computation and training, 80 lines of code
  2. predict_server.py: HTTP model-inference service, 129 lines of code
Scenario Data and Features

The training data looks like the sample below. The last column, trip_duration, is the label to predict: the trip length in seconds (first row: 16:15:16 - 16:01:00 = 856 seconds).
Sample data

```
id,vendor_id,pickup_datetime,dropoff_datetime,passenger_count,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude,store_and_fwd_flag,trip_duration
id3097625,1,2016-01-22 16:01:00,2016-01-22 16:15:16,2,-73.97746276855469,40.7613525390625,-73.95573425292969,40.772396087646484,N,856
id3196697,1,2016-01-28 07:20:18,2016-01-28 07:40:16,1,-73.98524475097656,40.75959777832031,-73.99615478515625,40.72945785522461,N,1198
id0224515,2,2016-01-31 00:48:27,2016-01-31 00:53:30,1,-73.98342895507812,40.7500114440918,-73.97383880615234,40.74980163574219,N,303
id3370903,1,2016-01-14 11:46:43,2016-01-14 12:25:33,2,-74.00027465820312,40.74786376953125,-73.86485290527344,40.77039337158203,N,2330
id2763851,2,2016-02-20 13:21:00,2016-02-20 13:45:56,1,-73.95218658447266,40.772220611572266,-73.9920425415039,40.74932098388672,N,1496
id0904926,1,2016-02-20 19:17:44,2016-02-20 19:33:19,4,-73.97344207763672,40.75189971923828,-73.98480224609375,40.76243209838867,N,935
id2026293,1,2016-02-25 01:16:23,2016-02-25 01:31:27,1,-73.9871597290039,40.68777847290039,-73.9115219116211,40.68180847167969,N,904
id1349988,1,2016-01-28 20:16:05,2016-01-28 20:21:36,1,-74.0028076171875,40.7338752746582,-73.9968032836914,40.743770599365234,N,331
id3218692,2,2016-02-17 16:43:27,2016-02-17 16:54:41,5,-73.98147583007812,40.77408218383789,-73.97216796875,40.76400375366211,N,674
```

Feature transformation SQL script for this scenario:

```sql
select trip_duration, passenger_count,
sum(pickup_latitude) over w as vendor_sum_pl,
max(pickup_latitude) over w as vendor_max_pl,
min(pickup_latitude) over w as vendor_min_pl,
avg(pickup_latitude) over w as vendor_avg_pl,
sum(pickup_latitude) over w2 as pc_sum_pl,
max(pickup_latitude) over w2 as pc_max_pl,
min(pickup_latitude) over w2 as pc_min_pl,
avg(pickup_latitude) over w2 as pc_avg_pl,
count(vendor_id) over w2 as pc_cnt,
count(vendor_id) over w as vendor_cnt
from {}
window w as (partition by vendor_id order by pickup_datetime ROWS_RANGE BETWEEN 1d PRECEDING AND CURRENT ROW),
w2 as (partition by passenger_count order by pickup_datetime ROWS_RANGE BETWEEN 1d PRECEDING AND CURRENT ROW)
```

We chose vendor_id and passenger_count as the two dimensions for the time-series features.
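
The training code below starts from spark.sql(train_sql), so some glue has to exist before it: loading the data into Spark, registering it under a table name, and filling the `{}` placeholder in the SQL above. Here is a plausible sketch; the CSV path, the view name t1, and the file fe.sql holding that SQL are assumptions, and note that the ROWS_RANGE ... 1d PRECEDING window syntax is fedb's SQL dialect, so this SQL is meant for 4Paradigm's Spark distribution rather than vanilla Spark:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("taxi_train").getOrCreate()

# load the raw trip records; depending on Spark's schema inference the
# datetime columns may still need an explicit cast to timestamp
df = spark.read.csv("data/taxi_tour_table_train_simple.csv",
                    header=True, inferSchema=True)
df.createOrReplaceTempView("t1")

# fe.sql is assumed to hold the feature-transformation SQL shown above;
# its `from {}` placeholder is filled with whichever table holds the data
with open("fe.sql") as f:
    train_sql = f.read().format("t1")
```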

```python
train_df = spark.sql(train_sql)

# specify your configurations as a dict
params = {
    'boosting_type': 'gbdt',
    'objective': 'regression',
    'metric': {'l2', 'l1'},
    'num_leaves': 31,
    'learning_rate': 0.05,
    'feature_fraction': 0.9,
    'bagging_fraction': 0.8,
    'bagging_freq': 5,
    'verbose': 0
}

print('Starting training...')
# lgb is lightgbm; lgb_train / lgb_eval are Datasets built from train_df
# (one way to build them is sketched after this block)
gbm = lgb.train(params,
                lgb_train,
                num_boost_round=20,
                valid_sets=lgb_eval,
                early_stopping_rounds=5)
gbm.save_model('model.txt')
```
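
The block above references lgb_train and lgb_eval without defining them. A minimal sketch of one way to build them from train_df, assuming the demo-sized data fits in pandas and using a simple 80/20 split (the split ratio is arbitrary):

```python
import lightgbm as lgb

# collect the feature table to pandas; fine for a demo-sized dataset
data = train_df.toPandas()
y = data["trip_duration"]
x = data.drop(columns=["trip_duration"])

# simple 80/20 train/validation split
cut = int(len(data) * 0.8)
lgb_train = lgb.Dataset(x.iloc[:cut], y.iloc[:cut])
lgb_eval = lgb.Dataset(x.iloc[cut:], y.iloc[cut:], reference=lgb_train)
```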

Run the model training process; it finally produces model.txt.

Model Inference

Data import code:

```python
import datetime

# driver is the fedb SQL client created elsewhere in the script
def insert_row(line):
    row = line.split(',')
    # convert the datetime strings to epoch milliseconds; the trailing 'l' marks a long literal
    row[2] = '%dl' % int(datetime.datetime.strptime(row[2], '%Y-%m-%d %H:%M:%S').timestamp() * 1000)
    row[3] = '%dl' % int(datetime.datetime.strptime(row[3], '%Y-%m-%d %H:%M:%S').timestamp() * 1000)
    insert = "insert into t1 values('%s', %s, %s, %s, %s, %s, %s, %s, %s, '%s', %s);" % tuple(row)
    driver.executeInsert('db_test', insert)

with open('data/taxi_tour_table_train_simple.csv', 'r') as fd:
    idx = 0
    for line in fd:
        if idx == 0:
            # skip the csv header
            idx = idx + 1
            continue
        insert_row(line.replace('\n', ''))
        idx = idx + 1
```

Note: train.csv is the CSV-format version of the training data.
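
The insert statements above assume a table t1 already exists in fedb, with columns matching the CSV and with indexes that match the two window definitions (partitioned by vendor_id and by passenger_count, ordered by pickup_datetime). The actual DDL lives in the demo repository; the sketch below is only a plausible reconstruction, and the index(key=..., ts=...) clause in particular is an assumption about fedb's dialect:

```python
# hypothetical reconstruction of the t1 schema used by the demo
ddl = """
create table t1(
    id string,
    vendor_id int,
    pickup_datetime timestamp,
    dropoff_datetime timestamp,
    passenger_count int,
    pickup_longitude double,
    pickup_latitude double,
    dropoff_longitude double,
    dropoff_latitude double,
    store_and_fwd_flag string,
    trip_duration int,
    index(key=vendor_id, ts=pickup_datetime),
    index(key=passenger_count, ts=pickup_datetime)
);
"""
# run this DDL through the fedb client before inserting rows
```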

Model inference logic:
predict_server.py

```python
# fedb_driver, sql, bst (the LightGBM Booster) and build_feature
# are defined elsewhere in predict_server.py
def post(self):
    row = json.loads(self.request.body)
    ok, req = fedb_driver.getRequestBuilder('db_test', sql)
    if not ok or not req:
        self.write("fail to get req")
        return
    input_schema = req.GetSchema()
    if not input_schema:
        self.write("no schema found")
        return
    # total length of all string fields, used to pre-allocate the request row
    str_length = 0
    for i in range(input_schema.GetColumnCnt()):
        if sql_router_sdk.DataTypeName(input_schema.GetColumnType(i)) == 'string':
            str_length = str_length + len(row.get(input_schema.GetColumnName(i), ''))
    req.Init(str_length)
    # append each field in schema order, falling back to NULL for unknown types
    for i in range(input_schema.GetColumnCnt()):
        tname = sql_router_sdk.DataTypeName(input_schema.GetColumnType(i))
        if tname == 'string':
            req.AppendString(row.get(input_schema.GetColumnName(i), ''))
        elif tname == 'int32':
            req.AppendInt32(int(row.get(input_schema.GetColumnName(i), 0)))
        elif tname == 'double':
            req.AppendDouble(float(row.get(input_schema.GetColumnName(i), 0)))
        elif tname == 'timestamp':
            req.AppendTimestamp(int(row.get(input_schema.GetColumnName(i), 0)))
        else:
            req.AppendNULL()
    if not req.Build():
        self.write("fail to build request")
        return

    # execute the same feature SQL in request mode against the online data
    ok, rs = fedb_driver.executeQuery('db_test', sql, req)
    if not ok:
        self.write("fail to execute sql")
        return
    rs.Next()
    ins = build_feature(rs)
    self.write("----------------ins---------------\n")
    self.write(str(ins) + "\n")
    duration = bst.predict(ins)
    self.write("---------------predict trip_duration -------------\n")
    self.write("%s s" % str(duration[0]))
```
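
On the request side, the client script (predict.py, invoked in the output below) only needs to POST one raw data row as JSON to the service. A minimal sketch using the requests library; the endpoint URL and port, the id, and the timestamp values are assumptions for illustration, and the fields mirror the online table schema without the trip_duration label:

```python
import requests

# one raw request row; timestamps are epoch milliseconds
row = {
    "id": "id0376262",                  # hypothetical trip id
    "vendor_id": 1,
    "pickup_datetime": 1467302350000,
    "dropoff_datetime": 1467304896000,
    "passenger_count": 2,
    "pickup_longitude": -73.973969,
    "pickup_latitude": 40.774097,
    "dropoff_longitude": -73.981430,
    "dropoff_latitude": 40.754528,
    "store_and_fwd_flag": "N",
}

# assumed endpoint; the real host, port, and path are defined in predict_server.py
resp = requests.post("http://127.0.0.1:8887/predict", json=row)
print(resp.text)
```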

Final result

```
# send an inference request; you will see output like the following
python3 predict.py
----------------ins---------------
[[ 2.       40.774097 40.774097 40.774097 40.774097 40.774097 40.774097
  40.774097 40.774097  1.        1.      ]]
---------------predict trip_duration -------------
859.3298781277192 s
```

To run the demo, see https://github.com/4paradigm/SparkSQLWithFeDB
Follow the WeChat official account for a video tutorial.
