­

MapReduce (MIT 6.824: Lec 1: Preparation)

  • 2020 年 5 月 2 日
  • AI

从本篇文章开始,专栏将以《MIT 6.824:Distributes Systems》的课程逻辑出发,逐步更新课程内的全部内容,敬请关注,谢谢。

如果想要跟方便的查看课程的更新内容,也欢迎关注微信公众号:《油麻酥爱学习爱健身》,微信号:youmasu

除了MIT分布式课程的学习以外,公众号还会不定期分享自己的健身经验,包括,家庭自重健身,健身房增肌减脂,日常饮食营养等健身内容,再次谢谢大家。

一,摘要

MapReduce是一种编程模型,也是一种处理和生成大规模数据集的具体实现。

它使用map函数处理一个key-value源数据,生成若干个key-value的中间数据集合。再使用reduce函数将所有中间数据中拥有相同key的value合并起来生成目标结果。

使用MapReduce方式实现的程序将自动获得并发的能力,并能在大规模集群上运行。谷歌使用的MapReduce实现拥有高度的扩展性,它每天在数千台机器上处理TB级的数据。

二,导论

Google每天都要处理海量数据,但大多数情况下,程序员都需要设计和编写极其复杂的程序来处理简单的数据运算,将大量的时间都浪费在对输入数据的拆分,程序在集群上的调度,机器失败处理,集群间的通信等通用功能上。

因此,受到Lisp中map,reduce原语的启发,我们设计了一个只需要关注数据核心处理逻辑,隐藏复杂系统设计的处理框架MapReduce。该框架用户只需自定义map和reduce操作,就能执行分布式并发处理,并在处理失败时默认重新执行作为容错的基本原则。

这项篇文章会提供一个可以用于在大规模集群上自动并行的,简单而又强大的编程接口及其高性能实现。

三,编程模型

任务的输入和输出都是kv数据集,使用Map和Reduce函数来表示具体运算。

Map函数,由用户编写,输入一个kv数据,生成一系列中间的kv数据集。MapReduce会将中间数据集的values按相同的key(比如I)分组,作为Reduce函数的输入。

Reduce函数,由用户编写,接收一个中间数据的key(比如I)和它拥有的values集合,并将它们合并成一个更小规模的values集合。通常,MapReduce通过迭代器向用户定义的Reduce函数分批提供输入数据,以避免全部数据不能载入内存的情况,最后Reduce函数在每次被调用后,计算得到0个和1个结果作为输出。

当然,针对不同的任务,甚至是相同的任务,不同的用户对kv的定义,都各有不同。比如,词频统计中,k1是文章id,v1是文章text,k2是单词word,v2是词频1。

但是,针对一次map-reduce中原始输入,中间结果和最终输出的k-v类型,却有一个默认的限定。即,map输入的k-v类型和输出的k-v类型不同,reduce输入的k-v类型和输出的k-v相同

四,MapReduce实现

对同一个map-reduce的基本思想,不同的人在不同的系统环境下会有不同的实现。一个建议的依赖准则是系统环境,比如,针对共享内存的多进程map-reduce,针对NUMA多CPU的map-reduce或者针对网络通信的集群map-reduce等等。

谷歌MapReduce的具体实现依赖于普通商用机集群,主要的配置如下:(1)Linux系统,双核x86机器,2-4GB内存;(2)理论网络速度是100M/s或1GB/s,但实际场景下要小得多;(3)成百上千台普通机器组成的集群,机器错误时有发生;(4)基于GFS的分布式文件系统,系统内基于replication来实现软件层面的高可用和高可靠;(5)用户向调度系统提交Job(每个Job包含一组tasks),调度器负责在集群上寻找可用的机器执行Job。

1,实现概述

全部的输入数据被自动并行拆分成M份后,多台机器分别读取M份数据后并行调用Map过程,输出中间数据。Reduce函数将M份中间数据的key值再拆分成R份进行处理。其中,拆分方法和R值由用户定义,通常是按hash取模的实现。具体流程如下:

(1),用户程序端的MapReduce框架首先将输入文件拆分成16M-64M左右大小的M份文件后,fork出多个MapReduce Worker进程,执行指定的MapReduce操作。

(2),被Fork出的进程中,有一个是Master进程,所有的Map或Reduce任务,将由它指派给其他空闲的Worker进程。其中,Map任务有M个,Reduce任务有R个。

(3),如果一个Worker进程被分派为Map任务。它将首先读取一份被拆分的数据,并从数据中解析出原始的key-value值,并将其输入到用户定义的Map函数中。Map函数输出的key-value中间数据被缓存在内存里。

(4),内存中缓存的中间数据会被周期性调用拆分函数,拆分成R份刷入磁盘。写入磁盘的位置会通知给master进程,master会将这些位置信息转交给Reduce Worker。

(5),当Reduce Worker被通知到后,它会使用RPC调用从硬盘内读取Map Worker输出的中间数据结果,并对所有中间数据按key值排序分组。因为要将所有的中间数据分配给若干个Reduce Worker并发执行,排序是必要的。当内存无法满足其数据规模时,可以使用借助文件系统的外部排序。

(6),Reduce Worker将排序后的,按key值分组后的中间数据转交给用户定义的Reduce函数迭代执行。Reduce函数的输出结果会被依次附加到对应这R份中间数据中的特定输出文件中。

(7),当所有Map和Reduce Worker完成后,Master进程唤醒用户程序。此时,MapReduce框架会将控制器交还给用户进程。

(8),该执行过程会成功输出R个输出文件(每个 Reduce任务生成一个,文件名由用户定义)。通常情况下,用户不需要将这R份文件合并成一个,因为它们会被作为下一个MapReduce任务或另一个分布式应用的输入数据,它们会很好的处理这个问题。

2,Master节点数据结构

Master节点有一些特定的数据结构来管理每一个map任务和reduce任务。首先,它会存储3种状态(空闲,就绪,完成),并且会记录每一个非空闲Worker进程的ID。

Master节点就好像map任务和reduce任务中间的导管一样,传递中间数据文件的区域位置。所以,对每一个已完成map任务生成的R个中间数据文件,Master节点都会存储它们的位置和大小,当map任务有更新时也会通知更新Master内的对应数据,Master也会同步增量通知给就绪状态的reduce任务。

3,容错

MapReduce设计上就是在大规模集群上处理TB级数据的框架,容错处理显得尤其重要。

  1. ,Worker错误

Master节点会定期ping Worker节点。如果ping不通,表示worker节点错误。当前节点上的所有map已完成的任务都要被回滚到空闲状态,并对所有其他worker可见。同理,错误Worker上的所有就绪状态的map和reduce任务,也要被回滚到空闲状态,供Master重新调度。

当一个map任务首先被Worker A执行,出错后被Worker B执行。此时所有正在执行对应reduce任务的Worker都会被通知需要重新执行。所有还没有从Worker A读到数据的reduce任务会从Worker B中去读。

MapReduece在设计上就可应对大规模的Worker错误。比如,因集群维护导致的大规模网络不可达就是最常见的错误之一,这时,MapReduce只需要简单地重新调度执行所有不可达节点上已完成的任务,然后继续推进,直到全部完成 。

  1. ,Master错误

Master节点会周期性的checkpoints它所包含的数据。当Master节点失败后,新节点可以非常简单的从最新的一个checkpoints恢复集群。但是,Google当前实现的Master是单点的,如果它发生错误,整个集群的MapReduce计算就会失败。用户可以定义失败后的处理,比如依赖特定条件的重试操作等。

(3),保持确定性的输出结果

当用户定义的map/reduce任务可以对输入数据产生确定性输出时,MapReduce能产生一条确定的执行序列输出一样的最终结果。当某些map/reduce任务的输出不确定时(错误发生),MapReduce可能会生成若干条不同的输出序列,产生不同的结果,但对特定的一条路径,只会输出特定的结果。

为了实现这一点,map和reduce任务的原子提交就最为重要。每一个就绪任务都会将它的输出写入到私有的临时文件,比如,一个reduce任务生成一个文件,一个map任务生成R个文件(每个对应一个reduce任务)。当map任务完成的时候,它会通知master节点,并将R份临时文件的位置也同时送到。如果这是一条全新的消息,Master会记下这些文件位置,否则,直接丢弃。

当reduce任务完成后,它会原子性的重命名它输出的临时文件至最终的文件名。如果在其他机器上也执行完全相同的reduce任务,多个相同的重命名操作会被并发执行,因此,MapReduce所依赖的分布式文件系统GFS必须要能保证原子性的重命名操作,确保只有一个重名操作能被执行。

(4),本地化

网络带宽是稀有资源,所以MapReduce尽量使用集群的本地硬盘来完成数据输入。本地数据存储在GFS上,它将数据按64M分块并在不同的机器上存储多份(通常是3份)。MapReduce的Master节点会优先调度那些本地硬盘上有对应输入数据备份的map任务,当map任务失败,它会尝试调度和对应输入数据处于一个交换机子网内的Map Worker,来节约带宽。当MapReduce集群非常大时,输入数据基本都是从本地读取,基本不消耗网络带宽。

(5),任务粒度

通常Map阶段会并发M份,Reduce阶段并发R份,并且M和R也远大于集群中的Worker数量。当一个Worker可以同时处理多个map或reduce任务时,可有效提高系统的整体负载并加速Worker失败后的恢复效率:因为失败Worker上已完成的任务,可以快速”蔓延“到其他Worker机器上,重新执行。

那么,如果估算M和R的上限呢?对Master节点而言,它必须做出O(M+R)次调度决策并在内存中保存O(M*R)个任务状态,不过O(M*R)的常数项非常小,一对map/reduce任务,大约也就1byte的数据而已。

通常情况下,因为reduce任务都已一个单独的输出文件结束,R通常由用户来指定。在实践中,我们倾向于去选择M的大小,以至于使每个map任务的输入文件都在16M到64M的范围内,来提高”本地化“的效率。而R的选择,通常只是Worker机器数的一个小倍数而已 。比如,谷歌在使用2000台Worker机器的前提下,通常设置M=200000和R=5000。

(6),执行备份任务

MapReduce框架执行时间超长的原因,通常是因为某台机器在执行最后一些map或者reduce任务时,花费了异常超长的计算时间。这可能有很多因素产生,比如,一台机器上磁盘有损坏,一直在做错误校正,导致它的读取性能从30M/s下降至1M/s;或者,混合部署的集群在执行其他分布式系统的任务,导致它执行更消耗CPU,内存,磁盘和网络带宽的MapReduce任务更慢;再或者,一台机器系统初始化的bug,导致CPU缓存被非预期的禁用,导致计算效率下降了100倍。

MapReduce框架有一个通用策略来缓解这个问题。当一个MapReduce操作要完成之前,Master的调度器会将剩余的就绪任务起一个备份同时执行,所以,一个MapReduce的操作完成只需要主任务或备份任务有一个完成即可。谷歌将这一机制优化到只需要额外增加几个百分点资源的消耗,却能将任务的执行时长平均缩减几十个百分点。

4,细节

虽然MapReduce提供的Map和Reduce函数已经足够业务使用了。但还有一些小的函数扩展可以考虑使用。

(1),拆分函数(Partitioning Function)

在map结束后,内存中的数据要被拆分成R份写入磁盘,供R个reduce任务使用,并输出R个文件。“拆分成R份”的工作默认都使用 hash(Key) mod R来实现,虽然它简单有效,但有时候需要更定制化的需求,比如,Key是URL的时候,或许希望将同一个站点的URL都放到一个reduce任务中执行,此时,MapReduce框架提供hash(Hostname(urlkey))来实现目标。

(2),排序函数(Ordering Guarantees)

在给定拆分函数后,拆分后的中间数据会按照key/value升序依次处理,排序函数实现有序的输出文件也非常简单,便于后期查找和检索。

(3),合并函数(Combiner Function)

在某些场景下,map任务中生成中间数据的部分可能和reduce任务很像,比如,词频统计中,map任务不断的生成<word,1>,并通过大量的网络传输给到reduce任务,由reduce汇总得到word的汇总结果。其实大可不必这样,用户可以自定义合并函数,在map任务中就先将<word, 1>汇总一次,再给reduce任务使用,减少网络传输,提高效率。

合并函数可以在每台执行map任务的机器上运行,代码上通常和reduce任务相似,两者的区别是MapReduce框架对两者输出结果的不同处理。合并函数输出到中间文件作为reduce的输入,reduce函数则是直接输出最后的结果文件。

(4),输入输出类型

MapReduce框架支持不同格式的文件输入,比如,txt文件输入,在map任务中通常以文件偏移为key,偏移对应那一行的内容为value。即,用户可以自定义对不同输入类型数据的处理,只需要定义对key-value的拆分,以便输入给map任务即可。并且,如果用户实现reader接口,他甚至可以自定义实现从任何地方读任何格式数据作为输入。

MapReduce框架对输出格式的支持也是一样。

(5),副作用(Side-effects)

用户可以使用MapReduce在输出最终文件的时候,顺便输出一些说明文件。但MapReduce仅支持对写单个输出文件和重命名的原子操作。如果一个map/reduce任务想要生成多个输出文件的话,就需要自己保证文件一致性操作,因为MapReduce框架不支持对多个输出文件的原子化“两阶段提交”。

(6),忽略“坏”数据

通常用户程序的bug可能会导致MapReduce框架崩溃,虽然最常见的解决方法是修复bug,但可能bug来源于三方库,那怎么办呢?MapReduce提供一种机制,可以忽略“坏”数据,这在一些大数据统计场景下非常有用。

当Worker进程内置的信号处理器捕获到用户程序因为段错误或者其他bug产生信号时,会发送一个包含一个序列号的“last gasp”UDP包到Master节点。当Master节点发现某一条数据存在多个失败时,它会在下一次重新调度Map/Reduce任务的时候会通知跳过这条数据。

(7),本地扩展

因为所有计算都发生在分布式系统上,任务又是靠Master节点动态调度完成,当机器数量很多时,map任务和reduce任务的debug非常麻烦。MapReduce框架提供一种可选的实现,可以在单机顺序执行所有map/reduce的工作,帮助用户在本地debug,profile和small-scale testing。用户甚至可以只执行特定的map任务并使用gdb调试它。

(8),状态信息

Master节点内部还开启了一个HTTP服务,提供一系列的状态信息供用户阅读。比如,当前计算的进度,就绪任务数,输出文件链接,输入输出文件大小,标准错误等等。用户可以评估当前计算任务的状态是否符合预期。同时,在顶层的状态页,Master还会展示哪一个 Worker在处理哪一个具体的map/reduce任务的时候,发生了错误,供用户调试。

(9),计数器

MapReduce框架提供一个计数器包来统计各类事件的出现次数。用户可以在代码内创建一个计数器对象,并在指定的map/reduce任务中递增即可。每一个Worker中统计的结果会定期发给Master,由Master汇总后再返回给用户代码。Master保证计数正确,比如,忽略重复执行的map/reduce任务等等。重复执行通常发生在“执行备份任务”或“Worker失败重试”中。框架会维护某些特定的计数器,比如,输入key-value数和输出key-value数等等。用户可以通过自定义的计数器在代码内assert逻辑。

5,性能

MapReduce将演示2个任务的执行效率:一个是从TB级数据中匹配特定模式(Grep);另一个是对TB级数据排序(Sort)。这2个任务基本包含了:在计算机内载入数据,并筛选特定数据的所有场景。

(1),集群配置

1800台机器。实验在周末的下午完成,计算机资源比较空闲。

每台机器的配置: 2个Intel 2GHz CPU,开启超线程;4G内存,大概有1-1.15G是供集群中的其他业务使用;2个160GB的IDE硬盘和千兆网卡。

所有机器位于一个2层树状的交换机网络,其中汇聚节点拥有100-200Gbps的网络带宽,并且放置在同一个机房,机房内机器间的消息往返时间小于1ms。

(2),Grep

在TB级数据内匹配某个稀有模式(3个字符,大约92337条匹配)。输入数据被分成15000份,每份64M(M=15000),最终输出到一个文件中(R=1)。

图片中纵轴表示输入数据的载入速度。当数据被分配到更多机器执行MapReduce计算时,载入速度逐渐提升至最高30GB/s,此时有1764个Worker在工作。当map任务完成后,速度开始下降,直到第80s的时候回到0。整个计算过程花费150秒,包含了一分钟左右的预热时间,包括MapReduce程序在不同Worker中传递和GFS打开超过1000个输入文件读取信息以期进行“本地化”优化的延迟等。

(3),Sort

对TB级数据排序,代码不到50行。其中3行是Map函数,负责从每行文本中提取10个字节的key用于排序,并以key-value的形式保存为中间数据。Reduce函数使用内置的Identity功能,直接输出中间输出作为最终的结果,在GFS上保存2份。

同样,MapReduce任务中,M=15000,R设置为4000,即将排序结果输出到4000个文件中。拆分函数以key值的第一个字节作为依据,并按不同的分布采样排序结果的分割点。

图中第一行表示输入数据的读取速度。不同于Grep任务,Sort任务读取输入数据的峰值是13GB/s,200秒左右排序完成。读取峰值较低的原因是排序任务要花将近一半的时间在中间数据读取的IO上,而Grep则没有这个问题。

图中第二行表示Map任务完成后,中间数据通过网络传递给Reduce任务的速度。一旦第一个Map任务完成,就开始有数据传输。第一个峰值发生在1700台机器上的Reduce任务开始执行,大概300秒后,第一批Reduce任务完成,开始执行剩下的Reduce任务,最终花费600秒时间。

图中第三行表示排序后将结果输出到文件的速度。输出发生在Shuffle第一个驼峰后的几秒,因为机器正忙于对中间数据进行排序,数据的写入速率保持在2-4GB/s,最终在第850秒的时候全部完成。包含启动的预热时间在内,总计891秒的排序时间。与当前最高效的排序框架TeraSort的1057秒的速度接近。

注意,输入数据的速度最快,是因为我们有“本地化”的优化,大多数数据都是从本地磁盘读取。shuffle速度第二,是因为输出的结果文件要在GFS上写2份,来保证数据的可靠性和可用性。如果文件系统使用擦除编码(erasure coding)的话,相比replication,对网络带宽的要求会进一步降低。

(4),备份任务的优化

在不开启备份任务的执行下,图形与普通类似,最终完成的时间显著增长。直到第960秒,还有5个Reduce任务尚未完成。直到300秒后,最后一个Reduce任务才彻底结束。整个过程花费1283秒,比开启备份任务的情况下,延长44%。

(5),任务出错的优化

在1746个Worker机器执行一段时间后,特意kill掉200台机器上的处理任务,来模拟在排序过程中机器出错的情况。Master调度器会立刻在这些机器上重启任务开始执行。

之所以存在负的输入速度,是因为之前已经完成的Map任务被杀掉了,需要重新执行。最终的执行时间是933秒,比正常情况只延长5%。

6,经验

MapReduce最大的好处使用简单的程序,就可以在一个大的分布式集群中执行计算任务。在Google中,MapReduce重构了生产环境中的索引系统,为搜索提供可靠服务。相比重构前的系统,MapReduce的优势如下:

(1),索引代码更简单,小巧和易于理解。因为容错,分布式和并行处理都丢给MapReduce,业务代码从3800行C++减少为700行。

(2),性能更好,逻辑更解耦,使得扩展和优化更简单。老系统上可能需要几个月的优化工作在MapReduce上只需几天就能搞定。

(3),运维更简单。大量的机器出错,机器太慢和网络冲突都被MapReduce接管后自动解决,同时,只需扩展机器就能提升整个索引系统的性能。

7,类似的框架

(1),很多系统提供一套受限的编程框架来解决并行问题,比如,Bulk同步编程和某些MPI原语对并行编程提供高层抽象。但MapReduce简化和提炼了它们。更重要的是,MapReduce还在大规模集群上原生提供一套容错机制,这是拥有并行能力的框架所不具备的。

(2),MapReduce“本地化”的灵感来自于active disks技术,它们会将计算推送至距离本地硬盘更近的计算单元,减少对网络带宽的消耗。MapReduce则更进一步,直接在拥有数据的机器上执行计算。实现不同,思想类似。

(3),MapReduce“备份任务”的灵感与在Charlotte系统中的eager调度机制类似,但eager调度机器可能会重复执行出错的任务,致使整个任务失败。MapReduce则默认会跳过一些错误数据。

(4),MapReduce所依赖的集群管理系统的实现与Condor的集群管理思想类似。

(5),MapReduce排序功能的实现参考了NOW-Sort。即,Map任务将数据拆分成能排序的分组,并发送给其中一个Reduce任务在内存中执行排序。但NOW-Sort不能自定义Map和Reduce函数,使用场景更有限。

(6),River提供的编程模型是通过消息队列来完成任务的调度,为了在异构的硬件环境和系统扰动的前提下,提供更短的平均计算时间,River会小心的在磁盘和网络传输中平衡完成时间。MapReduce则通过受限的编程模型提供另一种解决方案,它将一个大问题拆分成更多的小问题,并在所有可利用的机器上动态调度小问题的任务,使得更快的机器可以执行更多的任务,甚至当错误发生时,也可以在更接近任务所需数据的地方重新调度,减少完成时间。

(7),BAD-FS则使用一种完全不同的编程模型,它的任务执行发生在广域网网络中。不过,MapReduce和它有2个相似点:使用重复调度来恢复机器失败的问题;使用“本地化”减少在拥塞网络环境下的网络传输。

(8),最后,使用重复调度(re-execution)来容错的方案也被TACC所采用。

8,结论

MapReduce在Google内部获得成功有3个原因:

(1),受限制的编程模型,使得在大规模集群运算中易于使用,因为它隐藏了几乎所有分布式细节(并行,容错,本地化和负载均衡)。

(2),有限的网络带宽,使得MapReduce着重优化网络传输,比如,使用本地磁盘读取数据,完成Map任务后在本地磁盘写入中间数据等。

(3),重复调度(re-execution,redundant execution)减少慢机器的影响(备份任务)和解决机器错误或数据丢失。

9,参考文章

(1),pdos.csail.mit.edu/6.82