Spark Programming Guide

At a high level, every Spark application consists of a driver program that runs the user's main function and executes various parallel operations on a cluster. The main abstraction Spark provides is the resilient distributed dataset (RDD), a collection of elements partitioned across the nodes of the cluster that can be operated on in parallel. RDDs can be created from a file in HDFS (or any other Hadoop-supported file system), or by transforming an existing Scala collection in the driver program. Users may also ask Spark to persist an RDD in memory, allowing it to be reused efficiently across parallel operations. Finally, RDDs automatically recover from node failures.

A second abstraction in Spark is shared variables, which can be used in parallel operations. By default, when Spark runs a function as a set of tasks on different nodes, it ships a copy of each variable used in the function to each task. Sometimes, a variable needs to be shared across tasks, or between tasks and the driver program. Spark supports two types of shared variables: broadcast variables, which can be used to cache a value in memory on all nodes, and accumulators, which are variables that are only "added" to, such as counters and sums.

This guide shows each of these features and walks through some examples. It helps if the reader is reasonably familiar with Scala, especially the syntax for closures. Note that you can also run Spark interactively using the spark-shell script; we recommend doing so while following the steps below.

2 Linking with Spark

Spark 1.6.1 uses Scala 2.10. To write applications in Scala, you will need a compatible Scala version (2.10.x). To write a Spark application, you need to add a dependency on Spark. If you use SBT or Maven, Spark is available through Maven Central:

groupId = org.apache.spark
artifactId = spark-core_2.10
version = 1.6.1
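
If you build with sbt instead, a rough equivalent for build.sbt looks like the sketch below (the Scala patch version is an assumption; sbt's %% operator appends the _2.10 suffix automatically):

// build.sbt (sketch): sbt derives the _2.10 artifact suffix from scalaVersion
scalaVersion := "2.10.6"
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.1"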

In addition, if you wish to access an HDFS cluster, you need to add a dependency on hadoop-client for your version of HDFS. Version tags for common HDFS distributions can be found on the third party distributions page:

groupId = org.apache.hadoop
artifactId = hadoop-client
version = <your-hdfs-version>

Finally, you need to import some Spark classes and implicit conversions into your program. Add the following lines:

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf

Spark 1.6.1 works with Java 7 and higher. If you are using Java 8, Spark supports lambda expressions for concisely writing functions; otherwise you can use the classes in the org.apache.spark.api.java.function package. To write a Spark application in Java, you need to add a dependency on Spark, which is available through Maven Central:

groupId = org.apache.spark
artifactId = spark-core_2.10
version = 1.6.1

3 Initializing Spark

The first thing a Spark program must do is create a SparkContext object, which tells Spark how to access a cluster. To create a SparkContext you first need to build a SparkConf object that contains information about your application. In Java, the SparkConf is then passed to a JavaSparkContext:

SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);
JavaSparkContext sc = new JavaSparkContext(conf);

The appName parameter is a name for your application to show on the cluster UI. master is a Spark, Mesos or YARN cluster URL, or the special string "local" to run in local mode. In practice, when running on a cluster, you will not want to hardcode master in the program, but rather launch the application with spark-submit and receive it there. However, for local testing and unit tests, you can pass "local" to run Spark in-process.
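
For reference, a minimal Scala counterpart of the Java snippet above; the application name is a placeholder, and "local[4]" is only meant for local testing:

import org.apache.spark.{SparkConf, SparkContext}

// The app name appears in the cluster UI; "local[4]" runs Spark locally
// with 4 worker threads instead of connecting to a cluster manager.
val conf = new SparkConf().setAppName("MyTestApp").setMaster("local[4]")
val sc = new SparkContext(conf)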

3.1 Using the Shell

In the Spark shell, a special interpreter-aware SparkContext is already created for you, in the variable called sc. Making your own SparkContext will not work. You can set which master the context connects to using the --master argument, and you can add JARs to the classpath with the --jars argument. For example, to run bin/spark-shell on exactly four cores, use:

$ ./bin/spark-shell --master local[4]

Or, to also add code.jar to its classpath:

$ ./bin/spark-shell --master local[4] --jars code.jar

To include a dependency using maven coordinates:

$ ./bin/spark-shell --master local[4] --packages "org.example:example:0.1"

For a complete list of options, run spark-shell --help. Behind the scenes, spark-shell invokes the more general spark-submit script.

4 Resilient Distributed Datasets (RDDs)

Spark revolves around the concept of a resilient distributed dataset (RDD), a fault-tolerant collection of elements that can be operated on in parallel. There are two ways to create RDDs: parallelizing an existing Scala collection in your driver program, or referencing a dataset in an external storage system, such as a shared filesystem, HDFS, HBase, or any storage system supported by Hadoop.

4.1 Parallelized Collections

Parallelized collections are created by calling SparkContext's parallelize method on an existing Scala collection. The elements of the collection are copied to form a distributed dataset that can be operated on in parallel. For example, the interpreter output below shows how to create a parallelized collection holding the numbers 1 to 5:

val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data)

Once created, the distributed dataset (distData) can be operated on in parallel. For example, we can call distData.reduce((a, b) => a + b) to add up the elements of the array. We describe operations on distributed datasets later on.

One important parameter for parallel collections is slices, the number of pieces to cut the dataset into. Spark will run one task for each slice of the cluster. Typically you want 2-4 slices for each CPU in your cluster. Normally, Spark tries to set the number of slices automatically based on your cluster. However, you can also set it manually by passing a second parameter to parallelize (e.g. sc.parallelize(data, 10)).

4.2 External Datasets

Spark can create distributed datasets from any storage source supported by Hadoop, including your local file system, HDFS, Cassandra, HBase, Amazon S3, and so on. Spark supports text files, SequenceFiles, and any other Hadoop InputFormat.

Text file RDDs can be created using SparkContext's textFile method. This method takes a URI for the file (either a local path on the machine, or a hdfs://, s3n://, etc. URI). Here is an example invocation:

scala> val distFile = sc.textFile("data.txt")
distFile: RDD[String] = MappedRDD@1d4cee08

Once created, distFile can be acted on by dataset operations. For example, we can add up the sizes of all the lines using the map and reduce operations as follows: distFile.map(s => s.length).reduce((a, b) => a + b). Some notes on reading files with Spark:

  1. If using a path on the local filesystem, the file must also be accessible at the same path on every worker node. Either copy the file to all workers or use a network-mounted shared file system.
  2. All of Spark's file-based input methods, including textFile, support running on directories, compressed files, and wildcards as well. For example, you can use textFile("/my/directory"), textFile("/my/directory/*.txt") and textFile("/my/directory/*.gz").
  3. The textFile method also takes an optional second argument for controlling the number of partitions of the file. By default, Spark creates one partition for each block of the file (blocks being 64MB by default in HDFS), but you can also ask for a higher number of partitions by passing a larger value. Note that you cannot have fewer partitions than blocks. (Points 2 and 3 are sketched below.)
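
A small sketch of points 2 and 3 above (the paths are hypothetical):

// Point 2: read every .txt file in a directory via a wildcard.
val txtFiles = sc.textFile("/my/directory/*.txt")

// Point 3: ask for at least 10 partitions when reading a single file.
val distFile10 = sc.textFile("data.txt", 10)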

Besides text files, Spark's Scala API also supports several other data formats:

  1. SparkContext.wholeTextFiles lets you read a directory containing multiple small text files, and returns each of them as (filename, content) pairs.
  2. For SequenceFiles, use SparkContext's sequenceFile[K, V] method, where K and V are the types of the keys and values in the file. These should be subclasses of Hadoop's Writable interface, like IntWritable and Text. In addition, Spark allows you to specify native types for a few common Writables; for example, sequenceFile[Int, String] will automatically read IntWritables and Texts.
  3. For other Hadoop InputFormats, you can use the SparkContext.hadoopRDD method, which takes an arbitrary JobConf and input format class, key class and value class. Set these the same way you would for a Hadoop job with your input source. You can also use SparkContext.newAPIHadoopRDD for InputFormats based on the new MapReduce API (org.apache.hadoop.mapreduce).
  4. RDD.saveAsObjectFile and SparkContext.objectFile support saving an RDD in a simple format consisting of serialized Java objects. While this is not as efficient as specialized formats like Avro, it offers an easy way to save any RDD. (Items 2 and 4 are sketched below.)
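
A brief sketch of items 2 and 4 above; the paths are hypothetical, and the SequenceFile example assumes the implicit conversions mentioned later (import org.apache.spark.SparkContext._) are in scope:

// Item 2: read a SequenceFile of (IntWritable, Text) records using native types.
val seqData = sc.sequenceFile[Int, String]("/path/to/seqfile")

// Item 4: save an RDD as serialized Java objects and load it back.
sc.parallelize(1 to 100).saveAsObjectFile("/tmp/ints")
val restored = sc.objectFile[Int]("/tmp/ints")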

4.3 RDD Operations

RDDs support two types of operations: transformations, which create a new dataset from an existing one, and actions, which return a value to the driver program after running a computation on the dataset. For example, map is a transformation that passes each dataset element through a function and returns a new distributed dataset representing the results. On the other hand, reduce is an action that aggregates all the elements of the dataset using some function and returns the final result to the driver program (although there is also a parallel reduceByKey that returns a distributed dataset).

All transformations in Spark are lazy, in that they do not compute their results right away. Instead, they just remember the transformations applied to some base dataset (e.g. a file). The transformations are only computed when an action requires a result to be returned to the driver program. This design lets Spark run more efficiently; for example, it can realize that a dataset created through map will only be used in a reduce, and return just the result of the reduce to the driver rather than the entire mapped dataset.

By default, each transformed RDD may be recomputed each time you run an action on it. However, you may also persist an RDD in memory using the persist (or cache) method, in which case Spark will keep the elements around on the cluster for much faster access the next time you query it. Persisting datasets on disk, or replicating them across the cluster, is also supported.

4.3.1 Basics

The code below illustrates the basic operations on an RDD:

val lines = sc.textFile("data.txt")
val lineLengths = lines.map(s => s.length)
val totalLength = lineLengths.reduce((a, b) => a + b)

The first line defines a base RDD from an external file. This dataset is not loaded in memory; lines is merely a pointer to the file. The second line defines lineLengths as the result of a map transformation; again, lineLengths is not immediately computed, due to laziness. Finally, we run reduce, which is an action. At this point Spark breaks the computation into tasks to run on separate nodes; each node runs its part of the map plus a local reduction, and returns only its answer to the driver program.

If we also wanted to use lineLengths again later, we could add, before the reduce:

lineLengths.persist()

This would cause lineLengths to be saved in memory after the first time it is computed.

4.3.2 Passing Functions to Spark

Spark's API relies heavily on passing functions in the driver program to run on the cluster. In Scala, there are two recommended ways to do this:

  1. Anonymous function syntax, which can be used for short pieces of code (a sketch follows the example below).
  2. Static methods in a global singleton object. For example, you can define object MyFunctions and then pass MyFunctions.func1, as follows:
object MyFunctions {
  def func1(s: String): String = { ... }
}

myRdd.map(MyFunctions.func1)
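
For comparison, the same call written with the anonymous function syntax of item 1 (the transformation itself is only an illustration):

// Item 1: pass an anonymous function directly to map.
myRdd.map((s: String) => s.trim)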

4.3.3 Working with Key-Value Pairs

While most Spark operations work on RDDs containing any type of objects, a few special operations are only available on RDDs of key-value pairs. The most common ones are distributed "shuffle" operations, such as grouping or aggregating the elements by a key. In Scala, these operations are available on RDDs containing Tuple2 objects (the language's built-in tuples, created simply by writing (a, b)), provided you import org.apache.spark.SparkContext._ to enable Spark's implicit conversions. The key-value pair operations are available in the PairRDDFunctions class.

For example, the following code uses the reduceByKey operation on key-value pairs to count how many times each line of text occurs in a file:

val lines = sc.textFile("data.txt")
val pairs = lines.map(s => (s, 1))
val counts = pairs.reduceByKey((a, b) => a + b)

We could also use counts.sortByKey(), for example, to sort the pairs alphabetically, and finally counts.collect() to bring them back to the driver program as an array of objects.
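
Continuing the example, a minimal sketch of those two calls:

// Sort the (line, count) pairs by key, then fetch them to the driver.
val sortedCounts = counts.sortByKey().collect()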

Note: when using custom objects as the key in key-value pair operations, you must make sure that their equals() method is accompanied by a matching hashCode() method.

4.3.4 Transformations

The following table lists some of the common transformations supported by Spark. Refer to the RDD API doc (Scala, Java, Python) and pair RDD functions doc (Scala, Java) for details.

Transformation

Meaning

map(func)

Return a new distributed dataset formed by passing each element of the source through the function func.

filter(func)

Return a new dataset formed by selecting those elements of the source on which func returns true.

flatMap(func)

Similar to map, but each input item can be mapped to 0 or more output items (so func should return a sequence rather than a single item).

mapPartitions(func)

Similar to map, but runs separately on each partition (block) of the RDD, so func must be of type Iterator[T] => Iterator[U] when running on an RDD of type T.

mapPartitionsWithIndex(func)

Similar to mapPartitions, but also provides func with an integer value representing the index of the partition, so func must be of type (Int, Iterator[T]) => Iterator[U] when running on an RDD of type T.

sample(withReplacement, fraction, seed)

Sample a fraction of the data, as specified by fraction, with or without replacement, using the given random number generator seed.

union(otherDataset)

Return a new dataset that contains the union of the elements in the source dataset and the argument.

intersection(otherDataset)

Return a new RDD that contains the intersection of elements in the source dataset and the argument.

distinct([numTasks]))

Return a new dataset that contains the distinct elements of the source dataset.

groupByKey([numTasks])

When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable<V>) pairs. Note: If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using reduceByKey or aggregateByKey will yield much better performance. Note: By default, the level of parallelism in the output depends on the number of partitions of the parent RDD. You can pass an optional numTasks argument to set a different number of tasks.

reduceByKey(func, [numTasks])

When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function func, which must be of type (V,V) => V. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.

aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])

When called on a dataset of (K, V) pairs, returns a dataset of (K, U) pairs where the values for each key are aggregated using the given combine functions and a neutral "zero" value. Allows an aggregated value type that is different than the input value type, while avoiding unnecessary allocations. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.

sortByKey([ascending], [numTasks])

When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean ascending argument.

join(otherDataset, [numTasks])

When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key. Outer joins are supported through leftOuterJoin, rightOuterJoin, and fullOuterJoin.

cogroup(otherDataset, [numTasks])

When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (Iterable<V>, Iterable<W>)) tuples. This operation is also called groupWith.

cartesian(otherDataset)

When called on datasets of types T and U, returns a dataset of (T, U) pairs (all pairs of elements).

pipe(command, [envVars])

Pipe each partition of the RDD through a shell command, e.g. a Perl or bash script. RDD elements are written to the process's stdin and lines output to its stdout are returned as an RDD of strings.

coalesce(numPartitions)

Decrease the number of partitions in the RDD to numPartitions. Useful for running operations more efficiently after filtering down a large dataset.

repartition(numPartitions)

Reshuffle the data in the RDD randomly to create either more or fewer partitions and balance it across them. This always shuffles all data over the network.

repartitionAndSortWithinPartitions(partitioner)

Repartition the RDD according to the given partitioner and, within each resulting partition, sort records by their keys. This is more efficient than calling repartition and then sorting within each partition because it can push the sorting down into the shuffle machinery.
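
As a quick illustration, the sketch below chains several of the transformations above into a word count; the input file and the empty-string filter are assumptions:

val wordPairs = sc.textFile("data.txt")
  .flatMap(line => line.split(" "))   // flatMap: one line -> many words
  .filter(word => word.nonEmpty)      // filter: drop empty strings
  .map(word => (word, 1))             // map: build key-value pairs
val wordCounts = wordPairs.reduceByKey(_ + _)  // reduceByKey: sum counts per key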

Actions

The following table lists some of the common actions supported by Spark. Refer to the RDD API doc (Scala, Java, Python, R) and pair RDD functions doc (Scala, Java) for details.

Action

Meaning

reduce(func)

Aggregate the elements of the dataset using a function func (which takes two arguments and returns one). The function should be commutative and associative so that it can be computed correctly in parallel.

collect()

Return all the elements of the dataset as an array at the driver program. This is usually useful after a filter or other operation that returns a sufficiently small subset of the data.

count()

Return the number of elements in the dataset.

first()

Return the first element of the dataset (similar to take(1)).

take(n)

Return an array with the first n elements of the dataset.

takeSample(withReplacement, num, [seed])

Return an array with a random sample of num elements of the dataset, with or without replacement, optionally pre-specifying a random number generator seed.

takeOrdered(n, [ordering])

Return the first n elements of the RDD using either their natural order or a custom comparator.

saveAsTextFile(path)

Write the elements of the dataset as a text file (or set of text files) in a given directory in the local filesystem, HDFS or any other Hadoop-supported file system. Spark will call toString on each element to convert it to a line of text in the file.

saveAsSequenceFile(path) (Java and Scala)

Write the elements of the dataset as a Hadoop SequenceFile in a given path in the local filesystem, HDFS or any other Hadoop-supported file system. This is available on RDDs of key-value pairs that implement Hadoop's Writable interface. In Scala, it is also available on types that are implicitly convertible to Writable (Spark includes conversions for basic types like Int, Double, String, etc).

saveAsObjectFile(path) (Java and Scala)

Write the elements of the dataset in a simple format using Java serialization, which can then be loaded using SparkContext.objectFile().

countByKey()

Only available on RDDs of type (K, V). Returns a hashmap of (K, Int) pairs with the count of each key.

foreach(func)

Run a function func on each element of the dataset. This is usually done for side effects such as updating an Accumulator or interacting with external storage systems. Note: modifying variables other than Accumulators outside of the foreach() may result in undefined behavior. See Understanding closures for more details.
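
A short sketch exercising a few of these actions on the counts RDD from the earlier key-value example (the output directory is hypothetical):

counts.count()                             // number of distinct lines
counts.take(5)                             // an array with the first 5 (line, count) pairs
counts.saveAsTextFile("/tmp/line-counts")  // write the results as text files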

Shuffle operations

Certain operations within Spark trigger an event known as the shuffle. The shuffle is Spark』s mechanism for re-distributing data so that it』s grouped differently across partitions. This typically involves copying data across executors and machines, making the shuffle a complex and costly operation.

Background

To understand what happens during the shuffle we can consider the example of the reduceByKey operation. The reduceByKey operation generates a new RDD where all values for a single key are combined into a tuple – the key and the result of executing a reduce function against all values associated with that key. The challenge is that not all values for a single key necessarily reside on the same partition, or even the same machine, but they must be co-located to compute the result.

In Spark, data is generally not distributed across partitions to be in the necessary place for a specific operation. During computations, a single task will operate on a single partition – thus, to organize all the data for a single reduceByKey reduce task to execute, Spark needs to perform an all-to-all operation. It must read from all partitions to find all the values for all keys, and then bring together values across partitions to compute the final result for each key – this is called the shuffle.

Although the set of elements in each partition of newly shuffled data will be deterministic, and so is the ordering of partitions themselves, the ordering of these elements is not. If one desires predictably ordered data following shuffle then it』s possible to use:

  • mapPartitions to sort each partition using, for example, .sorted
  • repartitionAndSortWithinPartitions to efficiently sort partitions while simultaneously repartitioning
  • sortBy to make a globally ordered RDD

Operations which can cause a shuffle include repartition operations like repartition and coalesce, 『ByKey operations (except for counting) like groupByKey and reduceByKey, and join operations like cogroup and join.

Performance Impact

The Shuffle is an expensive operation since it involves disk I/O, data serialization, and network I/O. To organize data for the shuffle, Spark generates sets of tasks – map tasks to organize the data, and a set of reduce tasks to aggregate it. This nomenclature comes from MapReduce and does not directly relate to Spark』s map and reduce operations.

Internally, results from individual map tasks are kept in memory until they can』t fit. Then, these are sorted based on the target partition and written to a single file. On the reduce side, tasks read the relevant sorted blocks.

Certain shuffle operations can consume significant amounts of heap memory since they employ in-memory data structures to organize records before or after transferring them. Specifically, reduceByKey and aggregateByKey create these structures on the map side, and 'ByKey operations generate these on the reduce side. When data does not fit in memory Spark will spill these tables to disk, incurring the additional overhead of disk I/O and increased garbage collection.

Shuffle also generates a large number of intermediate files on disk. As of Spark 1.3, these files are preserved until the corresponding RDDs are no longer used and are garbage collected. This is done so the shuffle files don』t need to be re-created if the lineage is re-computed. Garbage collection may happen only after a long period of time, if the application retains references to these RDDs or if GC does not kick in frequently. This means that long-running Spark jobs may consume a large amount of disk space. The temporary storage directory is specified by the spark.local.dir configuration parameter when configuring the Spark context.

Shuffle behavior can be tuned by adjusting a variety of configuration parameters. See the 『Shuffle Behavior』 section within the Spark Configuration Guide.

RDD Persistence

One of the most important capabilities in Spark is persisting (or caching) a dataset in memory across operations. When you persist an RDD, each node stores any partitions of it that it computes in memory and reuses them in other actions on that dataset (or datasets derived from it). This allows future actions to be much faster (often by more than 10x). Caching is a key tool for iterative algorithms and fast interactive use.

You can mark an RDD to be persisted using the persist() or cache() methods on it. The first time it is computed in an action, it will be kept in memory on the nodes. Spark』s cache is fault-tolerant – if any partition of an RDD is lost, it will automatically be recomputed using the transformations that originally created it.

In addition, each persisted RDD can be stored using a different storage level, allowing you, for example, to persist the dataset on disk, persist it in memory but as serialized Java objects (to save space), replicate it across nodes, or store it off-heap in Tachyon. These levels are set by passing a StorageLevel object (Scala, Java, Python) to persist(). The cache() method is a shorthand for using the default storage level, which is StorageLevel.MEMORY_ONLY (store deserialized objects in memory). The full set of storage levels is:

Storage Level

Meaning

MEMORY_ONLY

Store RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, some partitions will not be cached and will be recomputed on the fly each time they're needed. This is the default level.

MEMORY_AND_DISK

Store RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, store the partitions that don't fit on disk, and read them from there when they're needed.

MEMORY_ONLY_SER

Store RDD as serialized Java objects (one byte array per partition). This is generally more space-efficient than deserialized objects, especially when using a fast serializer, but more CPU-intensive to read.

MEMORY_AND_DISK_SER

Similar to MEMORY_ONLY_SER, but spill partitions that don't fit in memory to disk instead of recomputing them on the fly each time they're needed.

DISK_ONLY

Store the RDD partitions only on disk.

MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc.

Same as the levels above, but replicate each partition on two cluster nodes.

OFF_HEAP (experimental)

Store RDD in serialized format in Tachyon. Compared to MEMORY_ONLY_SER, OFF_HEAP reduces garbage collection overhead and allows executors to be smaller and to share a pool of memory, making it attractive in environments with large heaps or multiple concurrent applications. Furthermore, as the RDDs reside in Tachyon, the crash of an executor does not lead to losing the in-memory cache. In this mode, the memory in Tachyon is discardable. Thus, Tachyon does not attempt to reconstruct a block that it evicts from memory. If you plan to use Tachyon as the off heap store, Spark is compatible with Tachyon out-of-the-box. Please refer to this page for the suggested version pairings.

Note: In Python, stored objects will always be serialized with the Pickle library, so it does not matter whether you choose a serialized level.

Spark also automatically persists some intermediate data in shuffle operations (e.g. reduceByKey), even without users calling persist. This is done to avoid recomputing the entire input if a node fails during the shuffle. We still recommend users call persist on the resulting RDD if they plan to reuse it.
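
For example, a minimal sketch of persisting the lineLengths RDD from the earlier example with an explicit storage level, rather than the MEMORY_ONLY default used by cache():

import org.apache.spark.storage.StorageLevel

// Keep partitions in memory, spilling those that do not fit to disk.
lineLengths.persist(StorageLevel.MEMORY_AND_DISK)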

Which Storage Level to Choose?

Spark』s storage levels are meant to provide different trade-offs between memory usage and CPU efficiency. We recommend going through the following process to select one:

  • If your RDDs fit comfortably with the default storage level (MEMORY_ONLY), leave them that way. This is the most CPU-efficient option, allowing operations on the RDDs to run as fast as possible.
  • If not, try using MEMORY_ONLY_SER and selecting a fast serialization library to make the objects much more space-efficient, but still reasonably fast to access.
  • Don』t spill to disk unless the functions that computed your datasets are expensive, or they filter a large amount of the data. Otherwise, recomputing a partition may be as fast as reading it from disk.
  • Use the replicated storage levels if you want fast fault recovery (e.g. if using Spark to serve requests from a web application). All the storage levels provide full fault tolerance by recomputing lost data, but the replicated ones let you continue running tasks on the RDD without waiting to recompute a lost partition.
  • In environments with high amounts of memory or multiple applications, the experimental OFF_HEAP mode has several advantages:
    • It allows multiple executors to share the same pool of memory in Tachyon.
    • It significantly reduces garbage collection costs.
    • Cached data is not lost if individual executors crash.

Removing Data

Spark automatically monitors cache usage on each node and drops out old data partitions in a least-recently-used (LRU) fashion. If you would like to manually remove an RDD instead of waiting for it to fall out of the cache, use the RDD.unpersist() method.

Shared Variables

Normally, when a function passed to a Spark operation (such as map or reduce) is executed on a remote cluster node, it works on separate copies of all the variables used in the function. These variables are copied to each machine, and no updates to the variables on the remote machine are propagated back to the driver program. Supporting general, read-write shared variables across tasks would be inefficient. However, Spark does provide two limited types of shared variables for two common usage patterns: broadcast variables and accumulators.

Broadcast Variables

Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks. They can be used, for example, to give every node a copy of a large input dataset in an efficient manner. Spark also attempts to distribute broadcast variables using efficient broadcast algorithms to reduce communication cost.

Spark actions are executed through a set of stages, separated by distributed 「shuffle」 operations. Spark automatically broadcasts the common data needed by tasks within each stage. The data broadcasted this way is cached in serialized form and deserialized before running each task. This means that explicitly creating broadcast variables is only useful when tasks across multiple stages need the same data or when caching the data in deserialized form is important.

Broadcast variables are created from a variable v by calling SparkContext.broadcast(v). The broadcast variable is a wrapper around v, and its value can be accessed by calling the value method. The code below shows this:

scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)

scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)

Broadcast<int[]> broadcastVar = sc.broadcast(new int[] {1, 2, 3});

broadcastVar.value();
// returns [1, 2, 3]

>>> broadcastVar = sc.broadcast([1, 2, 3])

>>> broadcastVar.value
[1, 2, 3]

After the broadcast variable is created, it should be used instead of the value v in any functions run on the cluster so that v is not shipped to the nodes more than once. In addition, the object v should not be modified after it is broadcast in order to ensure that all nodes get the same value of the broadcast variable (e.g. if the variable is shipped to a new node later).
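
For instance, a sketch of reading the broadcast value inside a closure that runs on the cluster (the surrounding RDD is hypothetical):

// Each task reads the cached copy through .value instead of having the
// array shipped with the closure.
val indices = sc.parallelize(Seq(0, 1, 2))
val picked = indices.map(i => broadcastVar.value(i)).collect()  // Array(1, 2, 3)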

Accumulators

Accumulators are variables that are only 「added」 to through an associative operation and can therefore be efficiently supported in parallel. They can be used to implement counters (as in MapReduce) or sums. Spark natively supports accumulators of numeric types, and programmers can add support for new types. If accumulators are created with a name, they will be displayed in Spark』s UI. This can be useful for understanding the progress of running stages (NOTE: this is not yet supported in Python).

An accumulator is created from an initial value v by calling SparkContext.accumulator(v). Tasks running on the cluster can then add to it using the add method or the += operator (in Scala and Python). However, they cannot read its value. Only the driver program can read the accumulator』s value, using its value method.

The code below shows an accumulator being used to add up the elements of an array:

scala> val accum = sc.accumulator(0, "My Accumulator")
accum: spark.Accumulator[Int] = 0

scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

scala> accum.value
res2: Int = 10

While this code used the built-in support for accumulators of type Int, programmers can also create their own types by subclassing AccumulatorParam. The AccumulatorParam interface has two methods: zero for providing a 「zero value」 for your data type, and addInPlace for adding two values together. For example, supposing we had a Vector class representing mathematical vectors, we could write:

object VectorAccumulatorParam extends AccumulatorParam[Vector] {
  def zero(initialValue: Vector): Vector = {
    Vector.zeros(initialValue.size)
  }
  def addInPlace(v1: Vector, v2: Vector): Vector = {
    v1 += v2
  }
}

// Then, create an Accumulator of this type:
val vecAccum = sc.accumulator(new Vector(...))(VectorAccumulatorParam)

In Scala, Spark also supports the more general Accumulable interface to accumulate data where the resulting type is not the same as the elements added (e.g. build a list by collecting together elements), and the SparkContext.accumulableCollection method for accumulating common Scala collection types.

Accumulator<Integer> accum = sc.accumulator(0);

sc.parallelize(Arrays.asList(1, 2, 3, 4)).foreach(x -> accum.add(x));
// ...
// 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

accum.value();
// returns 10

While this code used the built-in support for accumulators of type Integer, programmers can also create their own types by subclassing AccumulatorParam. The AccumulatorParam interface has two methods: zero for providing a 「zero value」 for your data type, and addInPlace for adding two values together. For example, supposing we had a Vector class representing mathematical vectors, we could write:

class VectorAccumulatorParam implements AccumulatorParam<Vector> {
  public Vector zero(Vector initialValue) {
    return Vector.zeros(initialValue.size());
  }
  public Vector addInPlace(Vector v1, Vector v2) {
    v1.addInPlace(v2); return v1;
  }
}

// Then, create an Accumulator of this type:
Accumulator<Vector> vecAccum = sc.accumulator(new Vector(...), new VectorAccumulatorParam());

In Java, Spark also supports the more general Accumulable interface to accumulate data where the resulting type is not the same as the elements added (e.g. build a list by collecting together elements).

>>> accum = sc.accumulator(0)
Accumulator<id=0, value=0>

>>> sc.parallelize([1, 2, 3, 4]).foreach(lambda x: accum.add(x))
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

>>> accum.value
10

While this code used the built-in support for accumulators of type Int, programmers can also create their own types by subclassing AccumulatorParam. The AccumulatorParam interface has two methods: zero for providing a 「zero value」 for your data type, and addInPlace for adding two values together. For example, supposing we had a Vector class representing mathematical vectors, we could write:

class VectorAccumulatorParam(AccumulatorParam):
    def zero(self, initialValue):
        return Vector.zeros(initialValue.size)

    def addInPlace(self, v1, v2):
        v1 += v2
        return v1

# Then, create an Accumulator of this type:
vecAccum = sc.accumulator(Vector(...), VectorAccumulatorParam())

For accumulator updates performed inside actions only, Spark guarantees that each task』s update to the accumulator will only be applied once, i.e. restarted tasks will not update the value. In transformations, users should be aware that each task』s update may be applied more than once if tasks or job stages are re-executed.

Accumulators do not change the lazy evaluation model of Spark. If they are being updated within an operation on an RDD, their value is only updated once that RDD is computed as part of an action. Consequently, accumulator updates are not guaranteed to be executed when made within a lazy transformation like map(). The below code fragment demonstrates this property:

val accum = sc.accumulator(0)
data.map { x => accum += x; f(x) }
// Here, accum is still 0 because no actions have caused the map to be computed.

Accumulator<Integer> accum = sc.accumulator(0);
data.map(x -> { accum.add(x); return f(x); });
// Here, accum is still 0 because no actions have caused the `map` to be computed.

accum = sc.accumulator(0)
def g(x):
    accum.add(x)
    return f(x)
data.map(g)
# Here, accum is still 0 because no actions have caused the `map` to be computed.

Deploying to a Cluster

The application submission guide describes how to submit applications to a cluster. In short, once you package your application into a JAR (for Java/Scala) or a set of .py or .zip files (for Python), the bin/spark-submit script lets you submit it to any supported cluster manager.

Launching Spark jobs from Java / Scala

The org.apache.spark.launcher package provides classes for launching Spark jobs as child processes using a simple Java API.
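
A rough Scala sketch of launching an application through this API; the jar path and main class are placeholders:

import org.apache.spark.launcher.SparkLauncher

// Starts spark-submit as a child process and waits for it to finish.
val process = new SparkLauncher()
  .setAppResource("/path/to/my-app.jar")   // placeholder application jar
  .setMainClass("com.example.MyApp")       // placeholder main class
  .setMaster("local[2]")
  .launch()
process.waitFor()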

Unit Testing

Spark is friendly to unit testing with any popular unit test framework. Simply create a SparkContext in your test with the master URL set to local, run your operations, and then call SparkContext.stop() to tear it down. Make sure you stop the context within a finally block or the test framework』s tearDown method, as Spark does not support two contexts running concurrently in the same program.
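
A minimal sketch of this pattern, independent of any particular test framework:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().setAppName("unit-test").setMaster("local[2]")
val sc = new SparkContext(conf)
try {
  // Run the operations under test against a small in-memory dataset.
  assert(sc.parallelize(1 to 10).filter(_ % 2 == 0).count() == 5)
} finally {
  sc.stop()  // tear down so later tests can create their own context
}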

Migrating from pre-1.0 Versions of Spark

Spark 1.0 freezes the API of Spark Core for the 1.X series, in that any API available today that is not marked 「experimental」 or 「developer API」 will be supported in future versions.

For Scala users, the only change is that the grouping operations, e.g. groupByKey, cogroup and join, have changed from returning (Key, Seq[Value]) pairs to (Key, Iterable[Value]).

Several changes were made to the Java API:

  • The Function classes in org.apache.spark.api.java.function became interfaces in 1.0, meaning that old code that extends Function should implement Function instead.
  • New variants of the map transformations, like mapToPair and mapToDouble, were added to create RDDs of special data types.
  • Grouping operations like groupByKey, cogroup and join have changed from returning (Key, List<V>) pairs to (Key, Iterable<V>).

For Python users, the only change is that the grouping operations, e.g. groupByKey, cogroup and join, have changed from returning (key, list of values) pairs to (key, iterable of values).

Migration guides are also available for Spark Streaming, MLlib and GraphX.

Where to Go from Here

You can see some example Spark programs on the Spark website. In addition, Spark includes several samples in the examples directory (Scala, Java, Python, R). You can run Java and Scala examples by passing the class name to Spark』s bin/run-example script; for instance:

./bin/run-example SparkPi

For Python examples, use spark-submit instead:

./bin/spark-submit examples/src/main/python/pi.py

For R examples, use spark-submit instead:

./bin/spark-submit examples/src/main/r/dataframe.R

For help on optimizing your programs, the configuration and tuning guides provide information on best practices. They are especially important for making sure that your data is stored in memory in an efficient format. For help on deploying, the cluster mode overview describes the components involved in distributed operation and supported cluster managers.

Finally, full API documentation is available in Scala, Java, Python and R.

Reference: http://colobu.com/2014/12/08/spark-programming-guide/