7 JavaScala中启动Spark作业(Launching Spark jobs from Java Scala)

Spark官方文档 - 中文翻译

Spark版本:1.6.0
1 概述(Overview)

总体来讲,每一个Spark驱动程序应用都由一个驱动程序组成,该驱动程序包含一个由用户编写的main方法,该方法会在集群上并行执行一些列并行计算操作。Spark最重要的一个概念是弹性分布式数据集,简称RDD(resilient distributed dataset )。RDD是一个数据容器,它将分布在集群上各个节点上的数据抽象为一个数据集,并且RDD能够进行一系列的并行计算操作。可以将RDD理解为一个分布式的List,该List的数据为分布在各个节点上的数据。RDD通过读取Hadoop文件系统中的一个文件进行创建,也可以由一个RDD经过转换得到。用户也可以将RDD缓存至内存,从而高效的处理RDD,提高计算效率。另外,RDD有良好的容错机制。

Spark另外一个重要的概念是共享变量(shared variables)。在并行计算时,可以方便的使用共享变量。在默认情况下,执行Spark任务时会在多个节点上并行执行多个task,Spark将每个变量的副本分发给各个task。在一些场景下,需要一个能够在各个task间共享的变量。Spark支持两种类型的共享变量:

  • 广播变量(broadcast variables):将一个只读变量缓存到集群的每个节点上。例如,将一份数据的只读缓存分发到每个节点。

  • 累加变量(accumulators):只允许add操作,用于计数、求和。

2 引入Spark(Linking with Spark)

在Spark 1.6.0上编写应用程序,支持使用Scala 2.10.X、Java 7+、Python 2.6+、R 3.1+。如果使用Java 8,支持lambda表达式(lambda expressions)。

在编写Spark应用时,需要在Maven依赖中添加Spark,Spark的Maven Central为:

groupId = org.apache.spark
artifactId = spark-core2.10
version = 1.6.0

另外,如果Spark应用中需要访问HDFS集群,则需要在hadoop-client中添加对应版本的HDFS依赖:

groupId = org.apache.hadoop
artifactId = hadoop-client
version = <your-hdfs-version>

最后,需要在程序中添加Spark类。代码如下:

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext. 
3 初始化Spark(Initializing Spark)

使用Scala编写Spark程序的需要做的第一件事就是创建一个SparkContext对象(使用Java语言时创建JavaSparkContext)。SparkContext对象指定了Spark应用访问集群的方式。创建SparkContext需要先创建一个SparkConf对象,SparkConf对象包含了Spark应用的一些列信息。代码如下:

  • Scala
val conf = new SparkConf().setAppName(appName).setMaster(master)
new SparkContext(conf)
  • java
SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);
JavaSparkContext sc = new JavaSparkContext(conf);
spark-submit

3.1 使用Spark Shell(Using the Shell)

–master–jars–packages–repositories
  • 本地模式下,使用4个核运行Spark程序:
\( ./bin/spark-shell --master local[4]<br/>
</code></pre>
<ul><li>将code.jar包添加到classpath:</li></ul>
<pre><code>\) ./bin/spark-shell –master local[4] –jars code.jar
  • 使用Maven坐标添加一个依赖:
$ ./bin/spark-shell –master local[4] –packages “org.example:example:0.1”
spark-shell –help
4 弹性分布式数据集(RDDs)

Spark最重要的一个概念就是RDD,RDD是一个有容错机制的元素容器,它可以进行并行运算操作。得到RDD的方式有两个:

  • 通过并行化驱动程序中已有的一个集合而获得
  • 通过外部存储系统(例如共享的文件系统、HDFS、HBase等)的数据集进行创建

4.1 并行集合(Parallelized Collections)

在驱动程序中,在一个已经存在的集合上(例如一个Scala的Seq)调用SparkContext的parallelize方法可以创建一个并行集合。集合里的元素将被复制到一个可被并行操作的分布式数据集中。下面为并行化一个保存数字1到5的集合示例:

  • Scala
val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data)
  • Java
List&lt;Integer&gt; data = Arrays.asList(1, 2, 3, 4, 5);
JavaRDD&lt;Integer&gt; distData = sc.parallelize(data);
distData.reduce((a,b) =&gt; a + b)
sc.parallelize(data, 10)

4.2 外部数据库(External Datasets)

Spark可以通过Hadoop支持的外部数据源创建分布式数据集,Hadoop支持的数据源有本地文件系统、HDFS、Cassandra、HBase、Amazon S3、Spark支持的文本文件、SequenceFiles、Hadoop InputFormat。

SparkContext的testFile方法可以创建文本文件RDD。使用这个方法需要传递文本文件的URI,URI可以为本机文件路径、hdfs://、s3n://等。该方法读取文本文件的每一行至容器中。示例如下:

  • Scala
scala&gt; val distFile = sc.textFile(“data.txt”)
distFile: RDD[String] = MappedRDD@1d4cee08
  • Java
JavaRDD&lt;String&gt; distFile = sc.textFile(“data.txt”);
distFile.map(s =&gt; s.length).reduce((a, b) =&gt; a + b)
textFiletextFile(“/my/directory”) textFile(“/my/directory/.txt”)textFile(“/my/directory/.gz”)

除了文本文件之外,Spark还支持其它的数据格式:

SparkContext.wholeTextFilessequenceFile[K,V]sequenceFile[Int,String]SparkContext.hadoopRDDorg.apache.hadoop.mapreduceSparkContext.newAPIHadoopRDDSparkContext.newHadoopRDDRDD.saveAsObjectFileSparkContext.objectFile

4.3 RDD操作(RDD Operations)

RDD支持两种类型的操作:

  • transformation:从一个RDD转换为一个新的RDD。
  • action:基于一个数据集进行运算,并返回RDD。

例如,map是一个transformation操作,map将数据集的每一个元素按指定的函数转换为一个RDD返回。reduce是一个action操作,reduce将RDD的所有元素按指定的函数进行聚合并返回结果给驱动程序(还有一个并行的reduceByKey能够返回一个分布式的数据集)。

Spark的所有transformation操作都是懒执行,它们并不立马执行,而是先记录对数据集的一系列transformation操作。在执行一个需要执行一个action操作时,会执行该数据集上所有的transformation操作,然后返回结果。这种设计让Spark的运算更加高效,例如,对一个数据集map操作之后使用reduce只返回结果,而不返回庞大的map运算的结果集。

persistcache

4.3.1 基础(Basics)

参考下面的程序,了解RDD的基本轮廓:

  • Scala
val lines = sc.textFile(“data.txt”)
val lineLengths = lines.map(s =&gt; s.length)
val totalLength = lineLengths.reduce((a, b) =&gt; a + b)
  • Java
JavaRDD&lt;String&gt; lines = sc.textFile(“data.txt”);
JavaRDD&lt;Integer&gt; lineLengths = lines.map(s -&gt; s.length());
int totalLength = lineLengths.reduce((a, b) -&gt; a + b);

第一行通过读取一个文件创建了一个基本的RDD。这个数据集没有加载到内存,也没有进行其他的操作,变量lines仅仅是一个指向文件的指针。第二行为transformation操作map的结果。此时lineLengths也没有进行运算,因为map操作为懒执行。最后,执行action操作reduce。此时Spark将运算分隔成多个任务分发给多个机器,每个机器执行各自部分的map并进行本地reduce,最后返回运行结果给驱动程序。

如果在后面的运算中仍会用到lineLengths,可以将其缓存,在reduce操作之前添加如下代码,该persist操作将在lineLengths第一次被计算得到后将其缓存到内存:

  • Scala
lineLengths.persist()
  • Java
lineLengths.persist(StorageLevel.MEMORYONLY());

4.3.2 把函数传递到Spark(Passing Functions to Spark)

MyFunction.func1
object MyFunctions {
def func1(s: String): String = { … }
} myRdd.map(MyFunctions.func1)

注意:由于可能传递的是一个类实例方法的引用(而不是一个单例对象),在传递方法的时候,应该同时传递包含该方法的类对象。举个例子:

class MyClass {
def func1(s: String): String = { … }
def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(func1) }
}

上面示例中,如果我们创建了一个类实例new MyClass,并且调用了实例的doStuff方法,该方法中的map操作调用了这个MyClass实例的func1方法,所以需要将整个对象传递到集群中。类似于写成:rdd.map(x=&gt;this.func1(x))。

类似地,访问外部对象的字段时将引用整个对象:

class MyClass {
val field = “Hello”
def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(x =&gt; field + x) }
}

等同于写成rdd.map(x=&gt;this.field+x),引用了整个this。为了避免这种问题,最简单的方式是把field拷贝到本地变量,而不是去外部访问它:

def doStuff(rdd: RDD[String]): RDD[String] = {
val field
= this.field
rdd.map(x =&gt; field_ + x)
}
  • Java

    Spark的API,在很大程度上依赖于把驱动程序中的函数传递到集群上运行。在Java中,函数由那些实现了org.apache.spark.api.java.function包中的接口的类表示。有两种创建这样的函数的方式:
    • 在你自己的类中实现Function接口,可以是匿名内部类,或者命名类,并且传递类的一个实例到Spark。
    • 在Java8中,使用lambda表达式来简明地定义函数的实现。

为了保持简洁性,本指南中大量使用了lambda语法,这在长格式中很容易使用所有相同的APIs。比如,我们可以把上面的代码写成:

JavaRDD&lt;String&gt;  lines = sc.textFile(“data.txt”);
JavaRDD lineLengths = lines.map(new Function Integer&gt;() {
public Integer call(String s) { return s.length(); }
});
int totalLength = lineLengths.reduce(new Function2 Integer, Integer&gt;() {
public Integer call(Integer a, Integer b) { return a + b; }
});

同样的功能,使用内联式的实现显得更为笨重繁琐,代码如下:

class GetLength implements Function Integer&gt; {
public Integer call(String s) { return s.length(); }
}
class Sum implements Function2 Integer, Integer&gt; {
public Integer call(Integer a, Integer b) { return a + b; }
} JavaRDD lines = sc.textFile(“data.txt”);
JavaRDD lineLengths = lines.map(new GetLength());
int totalLength = lineLengths.reduce(new Sum());

注意,java中的内部匿名类,只要带有final关键字,就可以访问类范围内的变量。Spark也会把变量复制到每一个worker节点。

4.3.3 理解闭包(Understanding closures)

foreach()

4.3.3.1 示例(Example)

local–master = local[n]spark-submit
  • Scala
var counter = 0
var rdd = sc.parallelize(data) // Wrong: Don‘t do this!!
rdd.foreach(x =&gt; counter += x) println(“Counter value: ” + counter)
  • Java
int counter = 0;
JavaRDD&lt;Integer&gt; rdd = sc.parallelize(data); // Wrong: Don’t do this!!
rdd.foreach(x -&gt; counter += x); println(“Counter value: ” + counter);

4.3.3.2 本地模式 VS 集群模式(Local vs. cluster modes)

在本地模式下仅有一个JVM,上面的代码将直接计算RDD中元素和,并存储到counter中。此时RDD和变量counter都在driver节点的同一内存空间中。

然而,在集群模式下,情况会变得复杂,上面的代码并不会按照预期的方式执行。为了执行这个job,Spark把处理RDD的操作分割成多个任务,每个任务将被一个executor处理。在执行之前,Spark首先计算闭包(closure)。闭包是必须对executor可见的变量和方法,在对RDD进行运算时将会用到这些变量和方法(在本例子中指foreach())。这个闭包会被序列化,并发送给每个executor。在local模式下,只有一个executor,所以所有的变量和方法都使用同一个闭包。在其他模式下情况跟local模式不一样,每个executor在不同的worker节点上运行,每个executor都有一个单独的闭包。

在这里,发送给每个executor的闭包内的变量是当前变量的副本,因此当counter在foreach中被引用时,已经不是在driver节点上的counter了。在driver节点的内存中仍然有一个counter,但这个counter对executors不可见。executor只能操作序列化的闭包中的counter副本。因此,最终counter的值仍然是0,因为所有对counter的操作都是在序列化的闭包内的counter上进行的。

在类似这种场景下,为了保证良好的行为确保,应该使用累加器。Spark中的累加器专门为在集群中多个节点间更新变量提供了一种安全机制。在本手册的累加器部分将对累加器进行详细介绍。

一般情况下,像环或本地定义方法这样的闭包结构,不应该用于更改全局状态。Spark不定义也不保证来自闭包外引用导致的对象变化行为。有些情况下,在local模式下可以正常运行的代码,在分布式模式下也许并不会像预期那样执行。在分布式下运行时,建议使用累加器定义一些全局集合。

4.3.3.3 打印RDD的元素(Printing elements of an RDD)

rdd.foreach(println)rdd.map(println)collect()rdd.collect().foreach(println)collect()take()rdd.take(100).foreach(println)

4.3.4 操作键值对(Working with Key­Value Pairs)

Spark大部分的RDD操作都是对任意类型的对象的,但是,有部分特殊的操作仅支持对键值对的RDD进行操作。最常用的是分布式“shuffle”操作,比如按照key将RDD的元素进行分组或聚集操作。

(a,b)PairRDDFunctionsreduceByKey
val lines = sc.textFile(“data.txt”)
val pairs = lines.map(s =&gt; (s, 1))
val counts = pairs.reduceByKey((a, b) =&gt; a + b)
scala.Tuple2mapToPairflatMapToPairreduceByKey
JavaRDD&lt;String&gt; lines = sc.textFile(“data.txt”);
JavaPairRDD&lt;String, Integer&gt; pairs = lines.mapToPair(s -&gt; new Tuple2(s, 1));
JavaPairRDD&lt;String, Integer&gt; counts = pairs.reduceByKey((a, b) -&gt; a + b);
counts.sortByKey()counts.collect()
equals()hashCode()

4.3.5 Transformations

下面列出了Spark常用的transformation操作。详细的细节请参考RDD API文档(Scala、Java、Python、R)和键值对RDD方法文档(Scala、Java)。

Iterator =&gt; Iterator(Int, Iterator) =&gt; IteratorreduceByKeyaggregateByKey numTasksleftOuterJoinrightOuterJoinfullOuterJoingroupWith

4.3.6 Actions

下面列出了Spark支持的常用的action操作。详细请参考RDD API文档(Scala、Java、Python、R)和键值对RDD方法文档(Scala、Java)。

take(1)withReplacementseedtoStringWritable

4.3.7 Shuffle操作(Shuffle operations)

Spark内的一个操作将会触发shuffle事件。shuffle是Spark将多个分区的数据重新分组重新分布数据的机制。shuffle是一个复杂且代价较高的操作,它需要完成将数据在executor和机器节点之间进行复制的工作。

4.3.7.1 背景(Background)

reduceByKeyreduceByKey

在Spark中,通常一条数据不会垮分区分布,除非为了一个特殊的操作在必要的地方才会跨分区分布。在计算过程中,一个分区由一个task进行处理。因此,为了组织所有的数据让一个reduceByKey任务执行,Spark需要进行一个all-to-all操作。all-to-all操作需要读取所有分区上的数据的所有的key,以及key对应的所有的值,然后将多个分区上的数据进行汇总,并将每个key对应的多个分区的数据进行计算得出最终的结果,这个过程称为shuffle。

虽然每个分区中新shuffle后的数据元素是确定的,分区间的顺序也是确定的,但是所有的元素是无序的。如果想在shuffle操作后将数据按指定规则进行排序,可以使用下面的方法:

mapPartitions.sortedrepartitionAndSortWithinPartitionssortBy

会引起shuffle过程的操作有:

repartitionrepartitioncoalesceByKeygroupByKeyreduceByKeyjoincogroupjoin

4.3.7.2 性能影响(Performance Impact)

mapreduce

在内部,一个map任务的所有结果数据会保存在内存,直到内存不能全部存储为止。然后,这些数据将基于目标分区进行排序并写入一个单独的文件中。在reduce时,任务将读取相关的已排序的数据块。

reduceByKeyaggregateByKeyByKey
spark.local.dir

shuffle操作的行为可以通过调节多个参数进行设置。详细的说明请看Configuration Guide中的“Shuffle Behavior”部分。

4.4 RDD持久化(RDD Persistence)

Spark中一个很重要的能力是将数据持久化(或称为缓存),在多个操作间都可以访问这些持久化的数据。当持久化一个RDD时,每个节点会将本节点计算的数据块存储到内存,在该数据上的其他action操作将直接使用内存中的数据。这样会让以后的action操作计算速度加快(通常运行速度会加速10倍)。缓存是迭代算法和快速的交互式使用的重要工具。

persist()cache()
StorageLevelpersist()cache()StorageLevel.MEMORY_ONLY
  • MEMORY_ONLY:将RDD以反序列化Java对象的形式存储在JVM中。如果内存空间不够,部分数据分区将不再缓存,在每次需要用到这些数据时重新进行计算。这是默认的级别。
  • MEMORY_AND_DISK:将RDD以反序列化Java对象的形式存储在JVM中。如果内存空间不够,将未缓存的数据分区存储到磁盘,在需要使用这些分区时从磁盘读取。
  • MEMORY_ONLY_SER:将RDD以序列化的Java对象的形式进行存储(每个分区为一个byte数组)。这种方式会比反序列化对象的方式节省很多空间,尤其是在使用fast serializer时会节省更多的空间,但是在读取时会增加CPU的计算负担。
  • MEMORY_AND_DISK_SER:类似于MEMORY_ONLY_SER,但是溢出的分区会存储到磁盘,而不是在用到它们时重新计算。
  • DISK_ONLY:只在磁盘上缓存RDD。
  • MEMORY_ONLY_2,MEMORY_AND_DISK_2,等等:与上面的级别功能相同,只不过每个分区在集群中两个节点上建立副本。
  • OFF_HEAP (实验中):以序列化的格式 (serialized format) 将 RDD存储到 Tachyon。相比于MEMORY_ONLY_SER, OFF_HEAP 降低了垃圾收集(garbage collection)的开销,使得 executors变得更小,而且共享了内存池,在使用大堆(heaps)和多应用并行的环境下有更好的表现。此外,由于 RDD存储在Tachyon中, executor的崩溃不会导致内存中缓存数据的丢失。在这种模式下, Tachyon中的内存是可丢弃的。因此,Tachyon不会尝试重建一个在内存中被清除的分块。如果你打算使用Tachyon进行off heap级别的缓存,Spark与Tachyon当前可用的版本相兼容。详细的版本配对使用建议请参考Tachyon的说明。

注意,在Python中,缓存的对象总是使用Pickle进行序列化,所以在Python中不关心你选择的是哪一种序列化级别。

reduceByKeypersistpersist

4.4.1 如何选择存储级别(Which Storage Level to Choose?)

Spark的存储级别的选择,核心问题是在内存使用率和CPU效率之间进行权衡。建议按下面的过程进行存储级别的选择:

  • 如果使用默认的存储级别(MEMORY_ONLY),存储在内存中的RDD没有发生溢出,那么就选择默认的存储级别。默认存储级别可以最大程度的提高CPU的效率,可以使在RDD上的操作以最快的速度运行。
  • 如果内存不能全部存储RDD,那么使用MEMORY_ONLY_SER,并挑选一个快速序列化库将对象序列化,以节省内存空间。使用这种存储级别,计算速度仍然很快。
  • 除了在计算该数据集的代价特别高,或者在需要过滤大量数据的情况下,尽量不要将溢出的数据存储到磁盘。因为,重新计算这个数据分区的耗时与从磁盘读取这些数据的耗时差不多。
  • 如果想快速还原故障,建议使用多副本存储界别(例如,使用Spark作为web应用的后台服务,在服务出故障时需要快速恢复的场景下)。所有的存储级别都通过重新计算丢失的数据的方式,提供了完全容错机制。但是多副本级别在发生数据丢失时,不需要重新计算对应的数据库,可以让任务继续运行。
  • 在高内存消耗或者多任务的环境下,还处于实验性的OFF_HEAP模式有下列几个优势:
    • 它支持多个executor使用Tachyon中的同一个内存池。
    • 它显著减少了内存回收的代价。
    • 如果个别executor崩溃掉,缓存的数据不会丢失。

4.4.2 移除数据(Removing Data)

RDD.unpersist()
5 共享变量(Shared Variables)
mapreduce

5.1 广播变量(broadcast variables)

广播变量允许编程者将一个只读变量缓存到每台机器上,而不是给每个任务传递一个副本。例如,广播变量可以用一种高效的方式给每个节点传递一份比较大的数据集副本。在使用广播变量时,Spark也尝试使用高效广播算法分发变量,以降低通信成本。

Spark的action操作是通过一些列的阶段(stage)进行执行的,这些阶段(stage)是通过分布式的shuffle操作进行切分的。Spark自动广播在每个阶段内任务需要的公共数据。这种情况下广播的数据使用序列化的形式进行缓存,并在每个任务在运行前进行反序列化。这明确说明了,只有在跨越多个阶段的多个任务任务会使用相同的数据,或者在使用反序列化形式的数据特别重要的情况下,使用广播变量会有比较好的效果。

vSparkContext.broadcast(v)vvaluev
  • Scala
scala&gt; val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0) scala&gt; broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)
  • Java
Broadcast&lt;int[]&gt; broadcastVar = sc.broadcast(new int[] {1, 2, 3});
broadcastVar.value();
// returns [1, 2, 3]

广播变量创建之后,在集群上执行的所有的函数中,应该使用该广播变量代替原来的v值。所以,每个节点上的v最多分发一次。另外,对象v在广播后不应该再被修改,以保证分发到所有的节点上的广播变量有同样的值(例如,在分发广播变量之后,又对广播变量进行了修改,然后又需要将广播变量分发到新的节点)。

5.2 累加器(Accumulators)

累加器只允许关联操作进行“added”操作,因此在并行计算中可以支持特定的计算。累加器可以用于实现计数(类似在MapReduce中那样)或者求和。原生Spark支持数值型的累加器,编程者可以添加新的支持类型。创建累加器并命名之后,在Spark的UI界面上将会显示该累加器。这样可以帮助理解正在运行的阶段的运行情况(注意,在Python中还不支持)。

SparkContext.accumulator(v)add+=value
  • Scala
scala&gt; val accum = sc.accumulator(0, “My Accumulator”)
accum: spark.Accumulator[Int] = 0 scala&gt; sc.parallelize(Array(1, 2, 3, 4)).foreach(x =&gt; accum += x)

10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s scala&gt; accum.value
res2: Int = 10
  • Java
Accumulator&lt;Integer&gt; accum = sc.accumulator(0);
sc.parallelize(Arrays.asList(1, 2, 3, 4)).foreach(x -&gt; accum.add(x));
// …
// 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s accum.value();
// returns 10
zeroaddInPlacezeroaddInPlaceVector
  • Scala
object VectorAccumulatorParam extends AccumulatorParam[Vector] {
def zero(initialValue: Vector): Vector = {

Vector.zeros(initialValue.size)<br/>

}
def addInPlace(v1: Vector, v2: Vector): Vector = {

v1 += v2<br/>

}
} // Then, create an Accumulator of this type:
val vecAccum = sc.accumulator(new Vector(…))(VectorAccumulatorParam)

  • Java
class VectorAccumulatorParam implements AccumulatorParam&lt;Vector&gt; {
public Vector zero(Vector initialValue) {

return Vector.zeros(initialValue.size());<br/>

}
public Vector addInPlace(Vector v1, Vector v2) {

v1.addInPlace(v2); return v1;<br/>

}
} // Then, create an Accumulator of this type:
Accumulator&lt;Vector&gt; vecAccum = sc.accumulator(new Vector(…), new VectorAccumulatorParam());

SparkContext.accumulableCollection

累加器的更新只发生在action操作中,Spark保证每个任务只能更新累加器一次,例如重新启动一个任务,该重启的任务不允许更新累加器的值。在transformation用户需要注意的是,如果任务过job的阶段重新执行,每个任务的更新操作将会执行多次。

map()
  • Scala
val accum = sc.accumulator(0)
data.map { x =&gt; accum += x; f(x) }
// Here, accum is still 0 because no actions have caused the &lt;code&gt;map&lt;/code&gt; to be computed.
  • Java
Accumulator&lt;Integer&gt; accum = sc.accumulator(0);
data.map(x -&gt; { accum.add(x); return f(x); });
// Here, accum is still 0 because no actions have caused the map to be computed.
6 将应用提交到集群(Deploying to a Cluster)
.py.zipbin/spark-submit
7 Java/Scala中启动Spark作业(Launching Spark jobs from Java / Scala)

使用org.apache.spark.launcher包提供的简单的Java API,可以将Spark作业以该包中提供的类的子类的形式启动。

8 单元测试(Unit Testing)
SparkContextlocalSparkContext.stop()finallytearDown
9 从Spark1.0之前的版本迁移(Migrating from pre­1.0 Versions of Spark)

Spark 1.0冻结了1.X系列的Spark核的API,因此,当前没有标记为“experimental”或者“developer API”的API都将在未来的版本中进行支持。

  • Scala的变化
groupByKeycogroupjoin(Key,Seq[Value])(Key,Iterable[Value])
org.apache.spark.api.java.functionFunctionextends Functionimplement FunctionmapmapToPairmapToDoublegroupByKeycogroupjoin(Key,Seq[Value])(Key,Iterable[Value])

这些迁移指导对Spark Streaming、MLlib和GraphX同样有效。

10 下一步(Where to Go from Here)
examplesbin/run-example
./bin/run-example SparkPi
spark-submit
./bin/spark-submit examples/src/main/python/pi.py
spark-submit
./bin/spark-submit examples/src/main/r/dataframe.R

在configuration和tuning手册中,有许多优化程序的实践。这些优化建议,能够确保你的数据以高效的格式存储在内存中。对于部署的帮助信息,请阅读cluster mode overview,该文档描述了分布式操作和支持集群管理器的组件。

最后,完整的API文档请查阅Scala、Java、Python、R。