分散式機器學習:如何快速從Python棧過渡到Scala棧

首先介紹下我的情況和需求,如果你跟我類似,那麼這篇文章將很有幫助;

我之前的技術棧主要是JavaPython,機器學習方面主要用到是pandasnumpysklearnscipymatplotlib等等,因為工作需要使用spark,所以理所應當的開始學習pyspark

之後一方面團隊其他成員基本都是用scala,同時在Spark API更新上,pyspark也要慢於scala的,而且對於集群維護的同事來說,也不想再維護一套python環境,基於此,開始將技術棧轉到scala+spark

如果你的情況也大致如上,那麼這篇文章可以作為一個很實用的參考,快速的將一個之前用pyspark完成的項目轉移到scala上;

正文開始。。。。

項目介紹

基於300w用戶的上億出行數據的聚類分析項目,最早使用Python棧完成,主要是pandas+sklearn+seaborn等庫的使用,後需要使用spark集群,因此轉移到pyspark;

現在的需求是功能等不動的前提下轉移到Scala下實現,也就是通過Scala+SparkAPI實現整個機器學習流程以及結果解釋分析;

根據需求分解任務如下:

  1. 學習scala基本語法、數據結構、IO等;
  2. 搭建Idea+scala+spark的本地開發環境;
  3. 以上述兩點為基礎移植前Python項目;

Scala基礎學習

Scala是一門多範式語言,函數式程式語言,這一點主要體現在於針對問題的處理方式上於面向對象的語言略有不同,函數式+惰性求值+多執行緒支援等方面的優勢使得它被Spark選擇為開發語言;

上述是一些概念性的東西,屬於熟悉了Scala之後需要去多多了解的,不過對於工作中現學的同學來說更重要的是如何快速上手,那就從最最緊迫的部分開始吧;

一般來說一門語言最快掌握的方法是與其他語言對比進行學習,因此下面每一部分都與java、python兩門最多人用的語言做對比者進行,指出一些明顯差異的點和類似的點;

參考鏈接://www.cnblogs.com/qingyunzong/p/8858234.html

Hello World

Scala語言是運行於JVM的,沒錯,就是java虛擬機,因此它的編譯、運行過程與java非常類似,或者說我們就認為是一樣也是可以的,java通過javac編譯得到位元組碼文件,通過java運行,Scala則是通過scalac編譯,通過scala運行,而且由於二者底層是一致的,因此Scala中可以直接導入java的庫來使用,這有助於利用java中很多久經考驗的第三方庫;

開發工具選擇Idea,Idea支援Scala插件,下載後可以直接新建Scala項目,還是很方便的,Idea搭建Scala環境參考鏈接://www.cnblogs.com/skyell/p/9939535.html

Hello World:

object Demo {

  def hello(): Unit = {
    println("Hello Scala!!!")
  }

  def main(args: Array[String]): Unit = {
    hello()
  }
}

對比Java、Python等,看看它的特點:

  1. 首先整體程式碼組成結構上與java一致,需要一個類為運行主體,main函數為入口;
  2. 在方法定義上使用def關鍵字,同時是先指定入參,再指定出參,注意Unit表示函數沒有返回值;
  3. 每行程式碼末尾的;可有可無,這與Python一致;

語言基礎

語言基礎主要由基本數據類型IF-ELSE循環函數組成,這也是每個語言的基礎,基本上這部分統一了大部分程式碼都能看懂;

基本數據類型

val byte:Byte = 127 // -128 ~ 127
val short:Short = 32767 // -32768 ~ 32767
val int:Int = 123456
val long:Long = 1234567890
val float:Float = 123.45f
val double:Double = 123.45d
val char:Char = 'a'
val string:String = "abc"
val bool:Boolean = true
val unit:Unit = () // unit一般用於函數不返回值時,也就是java的void
val nil:Null = null // 空值
// Nothing是所有其他類的子類 Any是所有其他類的超類 AnyRef是所有引用類的基類
var name = "helong"
name = "nemo" // var才能賦值,val賦值會報錯,可以不指定類型,會自動推斷
println(byte,short,int,long,float,double,char,string,bool,unit,nil,name)

數據類型上看Scala的特點有:

  1. 與java類似支援的數據類型比較多,比如單、雙精度浮點型都支援,不像Python只有雙精度;
  2. 區分CharString,意味著單引號、雙引號不能隨便混用;
  3. Unit類型用於函數沒有返回值時;
  4. Null表示空值;
  5. val定義的變數為常量,其值不能改變,而var定義的則是變數,值可以隨便改,這裡主要關注類型為集合時,可變與不可變如何理解,這點放到後面集合時再細說;

IF-ELSE

val x = 1
println(if(x>0) x else 0) // 條件表達式類似三元運算符
println(if(x>1) x) // 預設else就等價於else ()
println(if(x>1) x else if(x==1) "x:1" else ()) // 支援if、else if、else

// 塊表達式類似把條件表達式拉直
// 注意到當我們不指定類型時,就可以返回多種格式讓編譯器做運行時處理
val y = {
    if(x==0)
    	"x=0"
    else if(x==1)
    	x
    else
    	0
}
println(y)

條件判斷語句上看差異:

  1. 獨特的三目運算符格式:if(條件) 滿足返回A else 不滿足返回B
  2. Scala的三目運算符其實是條件表達式的一種特定格式;
  3. 條件表達式的各個條件下返回值類型可以不一致;
  4. 可以通過寫成塊狀來提高可讀性,外層用{}包住;

循環

while:

// while循環
var n = 10
while(n>0) {
    println(n)
    n-=1
}

while循環看起來沒什麼特別的,實際使用也確實比較少。。。。

for:

println(1 to 10)
println(1 until 10)
for (i <- 1 to 10)
	print(i+"\t")
println()
for (i <- 1 until 10)
	print(i+"\t")
println()
// 遍曆數組中的元素,類似java的增強for
// 可以看到數組中元素可以不同類型
for (arr <- Array('n',1,3.45,true,"nemo"))
	print(arr+"\t")
println()
// for循環高級技巧:單個for中有多個變數,每個生成器都帶過濾條件
// 效果就是嵌套for循環
for (i <- 1 to 10 if i%2==0; j <- Array("n","e","m","o") if j!="e")
	print(i+":"+j+"\t")
println()

// for推導式,循環體以yield開始會構建一個集合返回
val vec = for (i <- 1 to 10) yield i*10
println(vec)
for (i <- vec)
	print(i+"\t")
println()

for循環時Scala比較有特點的一部分:

  1. 支援n to mn until m兩種方式,區別是使用until時循環不包含m,算是很貼心的小改動,可讀性比java和python都強一些;
  2. for循環支援生成器、集合、range等類型中遍歷,類似java的普通循環和增強for循環的結合,for (item <- 1 to 10)for (item <- Array('a','b','c'))
  3. 高級for循環技巧:每層循環帶過濾條件,嵌套循環寫在一個for內;
  4. Scala的for循環也支援類似python列表推導式的方法:for (1 <- 1 to 10) yield i*10

函數

準確的說,在Scala中函數方法不完全等價,所謂的方法是類的一部分,而函數則是一個對象,可以賦值給一個變數,這裡就不糾結啦;

// 函數:一行函數,返回值類型可以不寫,此時自動推斷
def func(x:Int, y:Int): Int = x+y
println(func(2,3))
// 但是遞歸函數的返回值類型必須手動指定
def fib(f:Int): Int = if(f==0) 0 else if(f==1) 1 else if(f==2) 2 else fib(f-2)+fib(f-1)
println(fib(1),fib(2),fib(3),fib(4),fib(5))
// 在scala中,函數也是一種變數類型,因此也同樣可以賦值為某個常量或者當作另一個函數的參數
val f = (x:Int) => x*10 // 簡易函數就是lambda表達式
println(f)
def ff(k:(Int) => Int,x:Int,y:Int): Int = k(x)+k(y)
println(ff(f,3,5))
// def的方法轉函數
println(fib _) // fib本身是def定義的方法,甚至不能直接print

上面介紹的其實都是函數而不是方法:

  1. 定義一個變數,將一個函數賦值給它;
  2. 將一個函數變數作為入參傳入到另一個函數中;

這裡對於函數的理解可以想像數學中的函數,數學中的函數嵌套、組合的過程就是Scala中的函數互相作為參數傳遞的過程;

基本集合類型

一般高級語言中支援的集合類型都是類似的:數組列表字典元組等,Scala也不例外,這些基本上也滿足日常需求;

一個需要注意的點:Scala中的這些集合類型基本都分為定長變長這兩種,默認情況下都是定長的,可以通過scala.collection.mutable.xxx來導入對應的變長版本,主要區別在於當集合長度改變時是否需要重新創建一個新的集合對象;

數組

val arr = new Array[Int](8) // 長度為8,全是0的不可變數組
println(arr) // 直接列印數組看不到其內部元素,只能看到它的地址
println(arr.toBuffer) // 通過toBuffer方法轉為數組緩衝區
val arr2 = Array[Int](8) // 注意這裡沒用new
println(arr2)
println(arr2.toBuffer)
val arr3 = Array(0,1.2f,true,'h',"nemo") // 指定內容的定長數組
println(arr3(0),arr3(1),arr3(3)) // 通過(n)訪問數組元素,下標從0開始
// 變長數組,不改變變數的前提下依然可以通過+=,++=來擴展數組
import scala.collection.mutable.ArrayBuffer
val marr = ArrayBuffer[Int]()
marr += 1
marr += (2,3,4)
marr ++= Array(5,6,7)
marr ++= ArrayBuffer(8,9)
marr.insert(0,0)
marr.remove(0)
println(marr)
// 使用for遍歷
for (item <- marr)
	print(item+"\t")
println()
for (idx <- (0 until marr.length).reverse) // reverse可以反轉Range內的元素
	print(idx+":"+marr(idx)+"\t")
println()
// 對於數組,取出其全部偶數,再乘以10返回新數組
// 寫法1:也是一般的程式寫法,這個過程中其實是將需求轉換為程式思想
var marr2 = Array(1,2,3,4,5,6,7,8,9,10)
marr2 = for (i <- marr2 if i%2==0) yield i*10
println(marr2.toBuffer)
// 寫法2:更加scala,面向函數,可讀性更強,不需要維護那個i,循環會執行兩次,先過濾,再映射
marr2 = Array(1,2,3,4,5,6,7,8,9,10)
marr2 = marr2.filter(_%2==0).map(_*10)
println(marr2.toBuffer)
// 數組的一些常用方法,注意scala中函數調用沒有參數時可以不寫()
println(marr2.sum,marr2.max,marr2.sorted.toBuffer)

從數組上看差異:

  1. 首先一個小特點在於直接列印數組對象只能看到記憶體地址,要看到內容需要列印arr.toBuffer
  2. 數組內的元素可以是不同類型的;
  3. 通過arr(n)訪問元素,下標從0開始;
  4. ArrayBuffer是Array的變長版本;

列表

val list_x = List(1,2,3)
println(0::list_x) // 0插入到list_x的第一個位置
println(0+:list_x) // 等價於::
println(list_x.::(0))
println(list_x.+:(0)) // 注意以上四種不管0在前還是後,都是插入到第一個位置
println(list_x :+ 4)
println(list_x.:+(4)) // 所以區別是到底是+:還是:+
val list_y = List(4,5,6)
println(list_x++list_y) // ++連接兩個List
println(list_x++:list_y) // ++:看起來跟++也是一樣的嘛
// 可變List
import scala.collection.mutable.ListBuffer
val blist = ListBuffer(1,2,3)
val blist2 = new ListBuffer[Int]
blist += 4
blist.append(5)
println(blist)
blist2 ++= blist
println(blist2)

列表的騷操作就比較多了:

  1. 比如各種:::++:,看的頭大。。。。大家還是樸實一點吧,不然過兩天自己看程式碼都想死了。。。。
  2. 同樣的ListBuffer是List的可變版本;

字典

字典在Scala中叫做映射;

val map1 = Map("k1"->10, 2->1.5, 3.3->"abc")
println(map1)
val map2 = Map((1,1),(2,2),(3,3))
println(map2)
// 獲取值的方式類似數組用下標
println(map1("k1"),map1(3.3),map1.get(2),map1.getOrElse(5,"default"))
// Map默認是不可變的Map,也可以引入mutable包中的可變的Map
import scala.collection.mutable.{Map=>MMap}
val mmap = MMap((1,1),(2,2))
// map1(1) = 1 報錯,Map不可變指的是其長度、元素都不能變
mmap(1)=mmap(1)+1
// map1 += (3 -> 3) 報錯,因為原始Map不可變,+=會創建一個新的,但是map1又是常量
mmap += (3->3,4->4)
println(mmap)

字典的差異:

  1. 有兩種創建語法,個人喜歡第二種,更短一些;
  2. Map不可變指的是它的元素個數、元素內容都不能變;
  3. 如果用var來定義一個不可變Map,那麼看似可變,實際上是創建了一個新的Map對象;

元組

val tuple = (1,1.2,"abc")
println(tuple._1,tuple._2,tuple._3)
// 下面這種方式可以同時給元組中各個元素賦值到一個變數上
val tuple2,(name,age,score) = ("nemo",22,88.5)
println(tuple2,name,age,score)
// toMap+zip實現多個單Array到Map的映射
val names = Array("張三","李四","王五")
val ages = Array(17,16,18)
val scores = Array(80.5,77,90,100) // zip的結果長度為輸入兩個數組中短的那個
val infoes = names.zip(ages.zip(scores)).toMap
println(infoes)

元組的特點不要過於明顯:

  1. 可以通過tuple._n的方式來訪問第n個元素,注意是從1開始的,說實話看呆我了,這到底有啥用。。。。
  2. 集合類都有類似toArraytoListtoMapzipunzip等方法可以進行互相轉換;

Set

val set = Set(1,1,2)
println(set)
val arr_set = Array(1,1,2,2)
println(arr_set.toSet)

說實話我看到Set眼睛裡只有去重兩個字。。。

IO

首先很多時候我們用到也是java.io._庫里的讀寫方法,比如PrintWriterFile等,不過Scala自己也有一套scala.io,看情況使用吧;

val path = "D:\\Project\\Data\\Scala\\hello.txt"
val file_scala = Source.fromFile(path)
file_scala.foreach {
    println
}

val file_java = new File(path)
val writer = new PrintWriter(file_java)
writer.write("Hello Scala")
writer.close()

Scala讀、Java寫:

  1. 可以看到Scala中用java庫基本一摸一樣;
  2. Scala的讀文件結合foreach可以簡化程式碼;

以上

Scala語言基礎部分到底結束,以上內容不包含Scala的高級用法、程式碼優化、函數式編程、多執行緒等等,這些都是後續再去慢慢掌握的;

Spark本地開發環境搭建

這裡主要分為以下幾個步驟:

  1. windows本地hadoop+spark環境搭建;
  2. Idea基於Maven搭建Spark環境;

基本上都上網上找的資料,也算是踩過不少坑吧,環境問題有時候是比較煩人的,我也被一個Scala與Spark版本不對應問題浪費了幾個小時。。。。大家要有耐心哈,有問題的討論區留言,這裡留下幾個比較靠譜的參考鏈接:

我感覺吧,Windows搞環境確實問題更多,以前搞Python、Anaconda那些也是,老是缺這個少那個的,報錯資訊還看不出來。。。。這方面還是Ubuntu更靠譜一點。。。。

pyspark到Scala Spark

程式碼移植的過程相信大家都有很多經驗,關鍵在於小步前進,千萬別為了圖快從頭到尾搞完再運行,後面調起來更要命,把項目按功能模組劃分,機器學習的項目基本還是比較簡單的線性結構,我這裡主要劃分為以下幾部分分別進行:

  1. Spark初始化以及數據載入;
  2. 數據預處理;
  3. 外部數據處理與鏈接;
  4. 特徵工程;
  5. 建模;

可以看到基本以機器學習的各個環節為劃分依據,方便出行問題進行debug,以我的經驗主要工作在特徵工程部份,這部分兩邊的差異會比較大,而且處理起來要格外小心,避免因為邏輯bug導致最終結果不一致;

Spark初始化以及數據載入

這部分最簡單,因為除了語法差異,可以說是完全一致,注意點如下:

  1. 由於Idea開發Spark默認沒有啟動Hadoop的,因此對應數據都在本地;
  2. 字元串如果用的是單引號需要全部替換為雙引號;
  3. 兩邊的API名基本都沒變,Scala更常用的是鏈式調用,Python用的更多是顯式指定參數的函數調用;

外部數據

這裡有一個很大的問題,如果你的數據中的列名有中文,那麼建議全部重命名為英文,否在在構建SQL表達式等地方會報錯,奇怪的是這部分在Python中倒是正常的,這個坑也填了好久。。。。

對於udf的使用上,區別主要在於Scala與Python的函數定義以及Python中對Lambda的使用,官方建議是少用udf,最好在functions包里找找先;

特徵工程

我在這部分花的時間比較多,主要是它涉及很多udf、列表推導式、SQL表達式、特徵複雜處理等,需要注意:

  1. 對於udf部分,Scala中的入參指定類型這一點花了我不少時間,Python用多了就是慣壞了。。。
  2. 列表推導式可以由Scala的for (....) yield ....來替換,注意tountil的區別;
  3. 表達式部分千萬千萬不要用中文,都是淚啊,我是因為之前數據集中有一部分列是外部數據,用的中文,天坑。。。。

建模

這部分本身倒是沒什麼問題,但是我這部分最後會將結果寫入到本地的parquet文件,以及保存模型文件,結果一直報錯,錯誤資訊也看不出具體原因,按常理來說我首先考慮是許可權問題,折騰半天不行,又考慮是API用法問題,各種參數、用法都試了還是不行,最後發現又是windows的問題,缺了一些dll,哎哎,不說了,如果大家也遇到了ExitCodeException exitCode=-1073741515,那麼不用懷疑,直接到這裡下載程式安裝dll即可;

最後

以上就是全部過渡過程,中間有一些波折,但是整體還算順利,總時間大概是2.5個工作日,希望這篇文章可以加快有同樣需求的小夥伴的速度哈,目前感覺上Python要更簡潔,Scala更像Java,不過它多了一些可以簡化程式碼的語法糖、高級特性等,期待後面更深入的學習;

也要特別感謝網上大佬們的總結分享,有些問題真心難搞。。。。