分布式机器学习:如何快速从Python栈过渡到Scala栈

首先介绍下我的情况和需求,如果你跟我类似,那么这篇文章将很有帮助;

我之前的技术栈主要是JavaPython,机器学习方面主要用到是pandasnumpysklearnscipymatplotlib等等,因为工作需要使用spark,所以理所应当的开始学习pyspark

之后一方面团队其他成员基本都是用scala,同时在Spark API更新上,pyspark也要慢于scala的,而且对于集群维护的同事来说,也不想再维护一套python环境,基于此,开始将技术栈转到scala+spark

如果你的情况也大致如上,那么这篇文章可以作为一个很实用的参考,快速的将一个之前用pyspark完成的项目转移到scala上;

正文开始。。。。

项目介绍

基于300w用户的上亿出行数据的聚类分析项目,最早使用Python栈完成,主要是pandas+sklearn+seaborn等库的使用,后需要使用spark集群,因此转移到pyspark;

现在的需求是功能等不动的前提下转移到Scala下实现,也就是通过Scala+SparkAPI实现整个机器学习流程以及结果解释分析;

根据需求分解任务如下:

  1. 学习scala基本语法、数据结构、IO等;
  2. 搭建Idea+scala+spark的本地开发环境;
  3. 以上述两点为基础移植前Python项目;

Scala基础学习

Scala是一门多范式语言,函数式编程语言,这一点主要体现在于针对问题的处理方式上于面向对象的语言略有不同,函数式+惰性求值+多线程支持等方面的优势使得它被Spark选择为开发语言;

上述是一些概念性的东西,属于熟悉了Scala之后需要去多多了解的,不过对于工作中现学的同学来说更重要的是如何快速上手,那就从最最紧迫的部分开始吧;

一般来说一门语言最快掌握的方法是与其他语言对比进行学习,因此下面每一部分都与java、python两门最多人用的语言做对比者进行,指出一些明显差异的点和类似的点;

参考链接://www.cnblogs.com/qingyunzong/p/8858234.html

Hello World

Scala语言是运行于JVM的,没错,就是java虚拟机,因此它的编译、运行过程与java非常类似,或者说我们就认为是一样也是可以的,java通过javac编译得到字节码文件,通过java运行,Scala则是通过scalac编译,通过scala运行,而且由于二者底层是一致的,因此Scala中可以直接导入java的库来使用,这有助于利用java中很多久经考验的第三方库;

开发工具选择Idea,Idea支持Scala插件,下载后可以直接新建Scala项目,还是很方便的,Idea搭建Scala环境参考链接://www.cnblogs.com/skyell/p/9939535.html

Hello World:

object Demo {

  def hello(): Unit = {
    println("Hello Scala!!!")
  }

  def main(args: Array[String]): Unit = {
    hello()
  }
}

对比Java、Python等,看看它的特点:

  1. 首先整体代码组成结构上与java一致,需要一个类为运行主体,main函数为入口;
  2. 在方法定义上使用def关键字,同时是先指定入参,再指定出参,注意Unit表示函数没有返回值;
  3. 每行代码末尾的;可有可无,这与Python一致;

语言基础

语言基础主要由基本数据类型IF-ELSE循环函数组成,这也是每个语言的基础,基本上这部分统一了大部分代码都能看懂;

基本数据类型

val byte:Byte = 127 // -128 ~ 127
val short:Short = 32767 // -32768 ~ 32767
val int:Int = 123456
val long:Long = 1234567890
val float:Float = 123.45f
val double:Double = 123.45d
val char:Char = 'a'
val string:String = "abc"
val bool:Boolean = true
val unit:Unit = () // unit一般用于函数不返回值时,也就是java的void
val nil:Null = null // 空值
// Nothing是所有其他类的子类 Any是所有其他类的超类 AnyRef是所有引用类的基类
var name = "helong"
name = "nemo" // var才能赋值,val赋值会报错,可以不指定类型,会自动推断
println(byte,short,int,long,float,double,char,string,bool,unit,nil,name)

数据类型上看Scala的特点有:

  1. 与java类似支持的数据类型比较多,比如单、双精度浮点型都支持,不像Python只有双精度;
  2. 区分CharString,意味着单引号、双引号不能随便混用;
  3. Unit类型用于函数没有返回值时;
  4. Null表示空值;
  5. val定义的变量为常量,其值不能改变,而var定义的则是变量,值可以随便改,这里主要关注类型为集合时,可变与不可变如何理解,这点放到后面集合时再细说;

IF-ELSE

val x = 1
println(if(x>0) x else 0) // 条件表达式类似三元运算符
println(if(x>1) x) // 缺省else就等价于else ()
println(if(x>1) x else if(x==1) "x:1" else ()) // 支持if、else if、else

// 块表达式类似把条件表达式拉直
// 注意到当我们不指定类型时,就可以返回多种格式让编译器做运行时处理
val y = {
    if(x==0)
    	"x=0"
    else if(x==1)
    	x
    else
    	0
}
println(y)

条件判断语句上看差异:

  1. 独特的三目运算符格式:if(条件) 满足返回A else 不满足返回B
  2. Scala的三目运算符其实是条件表达式的一种特定格式;
  3. 条件表达式的各个条件下返回值类型可以不一致;
  4. 可以通过写成块状来提高可读性,外层用{}包住;

循环

while:

// while循环
var n = 10
while(n>0) {
    println(n)
    n-=1
}

while循环看起来没什么特别的,实际使用也确实比较少。。。。

for:

println(1 to 10)
println(1 until 10)
for (i <- 1 to 10)
	print(i+"\t")
println()
for (i <- 1 until 10)
	print(i+"\t")
println()
// 遍历数组中的元素,类似java的增强for
// 可以看到数组中元素可以不同类型
for (arr <- Array('n',1,3.45,true,"nemo"))
	print(arr+"\t")
println()
// for循环高级技巧:单个for中有多个变量,每个生成器都带过滤条件
// 效果就是嵌套for循环
for (i <- 1 to 10 if i%2==0; j <- Array("n","e","m","o") if j!="e")
	print(i+":"+j+"\t")
println()

// for推导式,循环体以yield开始会构建一个集合返回
val vec = for (i <- 1 to 10) yield i*10
println(vec)
for (i <- vec)
	print(i+"\t")
println()

for循环时Scala比较有特点的一部分:

  1. 支持n to mn until m两种方式,区别是使用until时循环不包含m,算是很贴心的小改动,可读性比java和python都强一些;
  2. for循环支持生成器、集合、range等类型中遍历,类似java的普通循环和增强for循环的结合,for (item <- 1 to 10)for (item <- Array('a','b','c'))
  3. 高级for循环技巧:每层循环带过滤条件,嵌套循环写在一个for内;
  4. Scala的for循环也支持类似python列表推导式的方法:for (1 <- 1 to 10) yield i*10

函数

准确的说,在Scala中函数方法不完全等价,所谓的方法是类的一部分,而函数则是一个对象,可以赋值给一个变量,这里就不纠结啦;

// 函数:一行函数,返回值类型可以不写,此时自动推断
def func(x:Int, y:Int): Int = x+y
println(func(2,3))
// 但是递归函数的返回值类型必须手动指定
def fib(f:Int): Int = if(f==0) 0 else if(f==1) 1 else if(f==2) 2 else fib(f-2)+fib(f-1)
println(fib(1),fib(2),fib(3),fib(4),fib(5))
// 在scala中,函数也是一种变量类型,因此也同样可以赋值为某个常量或者当作另一个函数的参数
val f = (x:Int) => x*10 // 简易函数就是lambda表达式
println(f)
def ff(k:(Int) => Int,x:Int,y:Int): Int = k(x)+k(y)
println(ff(f,3,5))
// def的方法转函数
println(fib _) // fib本身是def定义的方法,甚至不能直接print

上面介绍的其实都是函数而不是方法:

  1. 定义一个变量,将一个函数赋值给它;
  2. 将一个函数变量作为入参传入到另一个函数中;

这里对于函数的理解可以想象数学中的函数,数学中的函数嵌套、组合的过程就是Scala中的函数互相作为参数传递的过程;

基本集合类型

一般高级语言中支持的集合类型都是类似的:数组列表字典元组等,Scala也不例外,这些基本上也满足日常需求;

一个需要注意的点:Scala中的这些集合类型基本都分为定长变长这两种,默认情况下都是定长的,可以通过scala.collection.mutable.xxx来导入对应的变长版本,主要区别在于当集合长度改变时是否需要重新创建一个新的集合对象;

数组

val arr = new Array[Int](8) // 长度为8,全是0的不可变数组
println(arr) // 直接打印数组看不到其内部元素,只能看到它的地址
println(arr.toBuffer) // 通过toBuffer方法转为数组缓冲区
val arr2 = Array[Int](8) // 注意这里没用new
println(arr2)
println(arr2.toBuffer)
val arr3 = Array(0,1.2f,true,'h',"nemo") // 指定内容的定长数组
println(arr3(0),arr3(1),arr3(3)) // 通过(n)访问数组元素,下标从0开始
// 变长数组,不改变变量的前提下依然可以通过+=,++=来扩展数组
import scala.collection.mutable.ArrayBuffer
val marr = ArrayBuffer[Int]()
marr += 1
marr += (2,3,4)
marr ++= Array(5,6,7)
marr ++= ArrayBuffer(8,9)
marr.insert(0,0)
marr.remove(0)
println(marr)
// 使用for遍历
for (item <- marr)
	print(item+"\t")
println()
for (idx <- (0 until marr.length).reverse) // reverse可以反转Range内的元素
	print(idx+":"+marr(idx)+"\t")
println()
// 对于数组,取出其全部偶数,再乘以10返回新数组
// 写法1:也是一般的程序写法,这个过程中其实是将需求转换为程序思想
var marr2 = Array(1,2,3,4,5,6,7,8,9,10)
marr2 = for (i <- marr2 if i%2==0) yield i*10
println(marr2.toBuffer)
// 写法2:更加scala,面向函数,可读性更强,不需要维护那个i,循环会执行两次,先过滤,再映射
marr2 = Array(1,2,3,4,5,6,7,8,9,10)
marr2 = marr2.filter(_%2==0).map(_*10)
println(marr2.toBuffer)
// 数组的一些常用方法,注意scala中函数调用没有参数时可以不写()
println(marr2.sum,marr2.max,marr2.sorted.toBuffer)

从数组上看差异:

  1. 首先一个小特点在于直接打印数组对象只能看到内存地址,要看到内容需要打印arr.toBuffer
  2. 数组内的元素可以是不同类型的;
  3. 通过arr(n)访问元素,下标从0开始;
  4. ArrayBuffer是Array的变长版本;

列表

val list_x = List(1,2,3)
println(0::list_x) // 0插入到list_x的第一个位置
println(0+:list_x) // 等价于::
println(list_x.::(0))
println(list_x.+:(0)) // 注意以上四种不管0在前还是后,都是插入到第一个位置
println(list_x :+ 4)
println(list_x.:+(4)) // 所以区别是到底是+:还是:+
val list_y = List(4,5,6)
println(list_x++list_y) // ++连接两个List
println(list_x++:list_y) // ++:看起来跟++也是一样的嘛
// 可变List
import scala.collection.mutable.ListBuffer
val blist = ListBuffer(1,2,3)
val blist2 = new ListBuffer[Int]
blist += 4
blist.append(5)
println(blist)
blist2 ++= blist
println(blist2)

列表的骚操作就比较多了:

  1. 比如各种:::++:,看的头大。。。。大家还是朴实一点吧,不然过两天自己看代码都想死了。。。。
  2. 同样的ListBuffer是List的可变版本;

字典

字典在Scala中叫做映射;

val map1 = Map("k1"->10, 2->1.5, 3.3->"abc")
println(map1)
val map2 = Map((1,1),(2,2),(3,3))
println(map2)
// 获取值的方式类似数组用下标
println(map1("k1"),map1(3.3),map1.get(2),map1.getOrElse(5,"default"))
// Map默认是不可变的Map,也可以引入mutable包中的可变的Map
import scala.collection.mutable.{Map=>MMap}
val mmap = MMap((1,1),(2,2))
// map1(1) = 1 报错,Map不可变指的是其长度、元素都不能变
mmap(1)=mmap(1)+1
// map1 += (3 -> 3) 报错,因为原始Map不可变,+=会创建一个新的,但是map1又是常量
mmap += (3->3,4->4)
println(mmap)

字典的差异:

  1. 有两种创建语法,个人喜欢第二种,更短一些;
  2. Map不可变指的是它的元素个数、元素内容都不能变;
  3. 如果用var来定义一个不可变Map,那么看似可变,实际上是创建了一个新的Map对象;

元组

val tuple = (1,1.2,"abc")
println(tuple._1,tuple._2,tuple._3)
// 下面这种方式可以同时给元组中各个元素赋值到一个变量上
val tuple2,(name,age,score) = ("nemo",22,88.5)
println(tuple2,name,age,score)
// toMap+zip实现多个单Array到Map的映射
val names = Array("张三","李四","王五")
val ages = Array(17,16,18)
val scores = Array(80.5,77,90,100) // zip的结果长度为输入两个数组中短的那个
val infoes = names.zip(ages.zip(scores)).toMap
println(infoes)

元组的特点不要过于明显:

  1. 可以通过tuple._n的方式来访问第n个元素,注意是从1开始的,说实话看呆我了,这到底有啥用。。。。
  2. 集合类都有类似toArraytoListtoMapzipunzip等方法可以进行互相转换;

Set

val set = Set(1,1,2)
println(set)
val arr_set = Array(1,1,2,2)
println(arr_set.toSet)

说实话我看到Set眼睛里只有去重两个字。。。

IO

首先很多时候我们用到也是java.io._库里的读写方法,比如PrintWriterFile等,不过Scala自己也有一套scala.io,看情况使用吧;

val path = "D:\\Project\\Data\\Scala\\hello.txt"
val file_scala = Source.fromFile(path)
file_scala.foreach {
    println
}

val file_java = new File(path)
val writer = new PrintWriter(file_java)
writer.write("Hello Scala")
writer.close()

Scala读、Java写:

  1. 可以看到Scala中用java库基本一摸一样;
  2. Scala的读文件结合foreach可以简化代码;

以上

Scala语言基础部分到底结束,以上内容不包含Scala的高级用法、代码优化、函数式编程、多线程等等,这些都是后续再去慢慢掌握的;

Spark本地开发环境搭建

这里主要分为以下几个步骤:

  1. windows本地hadoop+spark环境搭建;
  2. Idea基于Maven搭建Spark环境;

基本上都上网上找的资料,也算是踩过不少坑吧,环境问题有时候是比较烦人的,我也被一个Scala与Spark版本不对应问题浪费了几个小时。。。。大家要有耐心哈,有问题的讨论区留言,这里留下几个比较靠谱的参考链接:

我感觉吧,Windows搞环境确实问题更多,以前搞Python、Anaconda那些也是,老是缺这个少那个的,报错信息还看不出来。。。。这方面还是Ubuntu更靠谱一点。。。。

pyspark到Scala Spark

代码移植的过程相信大家都有很多经验,关键在于小步前进,千万别为了图快从头到尾搞完再运行,后面调起来更要命,把项目按功能模块划分,机器学习的项目基本还是比较简单的线性结构,我这里主要划分为以下几部分分别进行:

  1. Spark初始化以及数据加载;
  2. 数据预处理;
  3. 外部数据处理与链接;
  4. 特征工程;
  5. 建模;

可以看到基本以机器学习的各个环节为划分依据,方便出行问题进行debug,以我的经验主要工作在特征工程部份,这部分两边的差异会比较大,而且处理起来要格外小心,避免因为逻辑bug导致最终结果不一致;

Spark初始化以及数据加载

这部分最简单,因为除了语法差异,可以说是完全一致,注意点如下:

  1. 由于Idea开发Spark默认没有启动Hadoop的,因此对应数据都在本地;
  2. 字符串如果用的是单引号需要全部替换为双引号;
  3. 两边的API名基本都没变,Scala更常用的是链式调用,Python用的更多是显式指定参数的函数调用;

外部数据

这里有一个很大的问题,如果你的数据中的列名有中文,那么建议全部重命名为英文,否在在构建SQL表达式等地方会报错,奇怪的是这部分在Python中倒是正常的,这个坑也填了好久。。。。

对于udf的使用上,区别主要在于Scala与Python的函数定义以及Python中对Lambda的使用,官方建议是少用udf,最好在functions包里找找先;

特征工程

我在这部分花的时间比较多,主要是它涉及很多udf、列表推导式、SQL表达式、特征复杂处理等,需要注意:

  1. 对于udf部分,Scala中的入参指定类型这一点花了我不少时间,Python用多了就是惯坏了。。。
  2. 列表推导式可以由Scala的for (....) yield ....来替换,注意tountil的区别;
  3. 表达式部分千万千万不要用中文,都是泪啊,我是因为之前数据集中有一部分列是外部数据,用的中文,天坑。。。。

建模

这部分本身倒是没什么问题,但是我这部分最后会将结果写入到本地的parquet文件,以及保存模型文件,结果一直报错,错误信息也看不出具体原因,按常理来说我首先考虑是权限问题,折腾半天不行,又考虑是API用法问题,各种参数、用法都试了还是不行,最后发现又是windows的问题,缺了一些dll,哎哎,不说了,如果大家也遇到了ExitCodeException exitCode=-1073741515,那么不用怀疑,直接到这里下载程序安装dll即可;

最后

以上就是全部过渡过程,中间有一些波折,但是整体还算顺利,总时间大概是2.5个工作日,希望这篇文章可以加快有同样需求的小伙伴的速度哈,目前感觉上Python要更简洁,Scala更像Java,不过它多了一些可以简化代码的语法糖、高级特性等,期待后面更深入的学习;

也要特别感谢网上大佬们的总结分享,有些问题真心难搞。。。。