Spark


弹性分布式数据集:基于内存集群计算的容错抽象

摘要(Abstract)

我们提出的弹性分布式数据集(RDDs),是一个让程序员在大型集群上以容错的方式执行基于内存计算的分布式内存抽象。RDDs受启发于两类使用当前计算框架处理不高效的应用:迭代算法和交互式数据挖掘工具。这二者在内存中保存数据性能能够提高一个数量级。为了有效容错,RDDs提供一种受限的共享内存,基于粗粒度转换(transformations)而非细粒度地更新共享状态。尽管如此,RDDs仍足以表达许多类型的计算,包括最近专门用于迭代作业的编程模型(如Pregel)以及这些模型无法表示的新应用。我们在一个名为Spark的系统中实现了RDDs,我们通过各种用户应用程序和基准测试来评估该系统。

1、介绍(Introduction)

像MapReduce和Dryad这样的集群计算框架已经广泛应用于大规模数据分析。这些系统让用户使用一系列高级算子编写并行计算,而不用担心工作分配和容错。

Cluster computing frameworks是一种分布式计算框架,它允许在大规模数据分析中使用并行计算。这些系统让用户使用一组高级操作符编写并行计算,而无需担心工作分配和容错

虽然现有框架为访问集群计算资源提供了大量抽象,但是他们缺少利用分布式内存的抽象。这使得它们对那些跨多个计算重用中间结果的新兴应用效率不高。数据重用普遍存在于很多迭代的机器学习和图像算法中,包括PageRank,K-means聚类和logistic回归。另一个引人注目的用例是交互式数据挖掘,即用户在同样的数据子集上运行多个即时查询。不幸的是,大多数现有框架中,在计算之间(比如在两个MapReduce作业之间)重用数据的唯一方式是把它写到外部稳定存储系统(如分布式文件系统)。这种方式会因为数据复制,磁盘I/O和序列化而带来巨大开销,这直接影响了应用的执行时间。

由于认识到这个问题,研究人员为一些需要数据重用的应用开发出了专门的框架。例如,Pregel是为迭代的图像计算而在内存中保存中间数据的系统;而HaLoop则提供一个迭代的MapReduce接口。但这些框架只提供了特定的计算模型,并对这些模式隐式地进行数据共享。它们没有为更普遍的重用提供抽象,如让用户把几个数据集加载到内存中,在它们上运行即时查询。

这篇论文中,我们提出一个新的抽象,叫弹性分布式数据集(RDDs),它让数据重用在广泛的应用上都是高效的。RDDs是容错的并行数据结构,它让用户显示地在内存中保存中间结果,控制它们的分区以优化数据布局,用丰富的算子去操作它们。

在设计RDDs时主要的挑战是:设计一个能有效提供容错性的编程接口。现有的对集群上基于内存存储的抽象(如分布式共享内存、key-value stores、数据库、Piccolo)提供了基于细粒度更新可变状态(比如表中的元素)的接口。这个接口提供容错的仅有的方式是跨机器复制数据或跨机器记录更新。这两个方法对数据密集型工作负载代价很大,因为它们需要在集群网络上拷贝大量数据,它的带宽远比RAM的要小,并且导致严重的存储开销。

和这些系统对比,RDDs提供基于粗粒度转换(map、filter和join等)的接口。这允许它们通过记录用于构建数据集(它的lineage)的转换(transformations)去有效地提供容错性。如果RDD的分区丢失了,那个RDD会有足够的信息从其他RDDs衍生去重新计算刚才那个分区。从而,丢失的数据通常可以很快被恢复,且不需要花销很大的复制。

粗粒度是应用于整个数据集,如上述map、filter;细粒度是指应用于单行,如get(index)和set(index)。

即便是基于粗粒度转换的接口在最初也会受限,RDDs很适合很多并行化应用,因为这些应用天然适合在多个数据项上应用相同的操作。RDDs能高效地表示很多已经提出的作为独立系统的集群编程模型,包括MapReduce、Haloop等,也包括这些系统不能很好地应用的新应用程序,如交互式数据挖掘。
我们已经在Spark的系统中实现了RDDs,Spark系统在UC Berkeley和个别公司被用于研究、生产应用。Spark提供一个方便语言集成的编程接口,接口是Scala语言编写的。另外,Spark可以用来从Scala解释器上交互式地查询大数据集。Spark允许通用编程语言以交互式速度用于集群上基于内存的数据挖掘,我们相信Spark是第一个这样的系统。
我们通过微基准和用户应用程序的测试评估RDDs。发现,对于迭代应用,Spark比Hadoop快20倍;实际数据分析类报表速度提升40倍;Spark用于交互式查询1TB的数据集只有5至7秒的延迟。为了说明RDDs的普适性,我们已经在Spark之上实现了Pregel和HaLoop编程模型,包括以相对较小的库的形式提供了它们使用的位置优化策略(每个200行代码)。
本文先分别在2和3节概述RDDs和Spark。然后,在第4节讨论RDDs的内部表示,第5节是RDDs内部的实现,第6节是实验结果。最后,在第7节讨论RDDs是怎样引起几个现有的集群编程模型兴趣的,第8节研究相关工作,最后是结论。

2、弹性分布式数据集(Resilient Distributed Datasets RDDs)

这节对RDDs进行概述。先在2.1节给RDDs下定义,2.2节阐述Spark中RDDs的编程接口,然后在2.3节比较RDDs和细粒度共享内存抽象,最后在2.4节讨论RDD模型的局限性。

2.1、RDD抽象(RDD Abstraction)

RDD是一个只读的、分区记录的集合。RDDs能通过通过以下两种方式创建:(1)稳定存储中的数据 (2)其他RDDs

我们称这些操作为转换(transformations),以把它们和其他RDDs上的操作(actions)区分开。map、filter、join都属于transformations。

RDDs任何时候都不需要物化(真正执行操作后存入稳定存储)。相反,一个RDD有足够多的关于它是怎样从其他数据集衍生出(它的Lineage)的信息,据此从稳定存储中的数据计算出它的分区。这是一个强有力的特性:从本质上说,如果故障后无法重建RDD,程序就不能引用该RDD。(有点绕,其实就是说RDD能够被重建是个很重要的特性,RDD有很强大的容错机制

最终,用户能控制RDDs的两个其他方面:持久化和分区。用户可以表明他们将重用那个RDDs并为它们选择存储策略(如in-memory 存储)。他们也可以让一个RDD的元素基于每个记录的key跨机器分区。这对位置优化是有用的,如确定两个将要join的数据集是以相同方式进行哈希分区的。

2.2、Spark编程接口(Spark Programming Interface)

Spark通过一个语言集成API暴露RDD,API中每个数据集被表示为一个对象,在这些对象上transformations被使用方法调用。
程序员编程的第一步是通过在稳定存储中的数据调用transformations来定义RDDs(如map和filter)。然后他们可以对这些RDDs调用actions,actions是给应用程序返回值或者把数据导出到存储系统的操作。count、collect、save都属于actions。Spark只有在第一次调用action时才会真正计算RDDs,在此之前进行的transformations都是惰性计算,这样能对transformations进行并行流水线化(pipeline)。
另外,程序员可以调用persist方法表明在以后的操作中他们想用哪个RDDs。Spark默认在内存中持久化RDDs,但是如果没有足够的RAM将会把多余的存入磁盘。用户也可以请求其它的持久化策略,如只在磁盘上存储RDD,或跨集群复制RDD,通过flags进行persist。最后,用户可以在每个RDD上设置持久化优先级去指定哪些内存的数据应该优先被存入磁盘。

2.2.1、示例:控制台日志挖掘(Example: Console Log Mining)

假设有一个web服务出错了,操作员想从保存在HDFS中的TB级的日志中找出原因。通过使用Spark,操作员只需从日志中把刚才那个错误的信息加载到一组节点的RAM,并交互式地查询它们。她将先写出下面的Scala代码:

lines =spark.textFile("hdfs://...")
errors = lines.filter(_.startWith("ERROR"))
errors.persist()

第一行通过HDFS文件定义了一个RDD(即是文本形式的lines的集合),第二行对lines进行过滤得到一个过滤后的RDD,第三行将errors存入内存以便查询中共享。值得注意的是,在Scala语法中filter的参数是一个闭包。
此时,集群上并没有执行任何工作。但是,用户可以对该RDD执行动作(actions),如统计信息条数:

errors.count()

用户也可以在该RDD上进一步执行transformations,并使用转换后的结果,如下:

//统计errors中涉及MySQL的行数:
errors.filter(_.contains("MySQL")).count()

//以数组的形式返回errors中涉及HDFS的时间字段
//(假设时间是'\t'分隔的number为3的字段)
errors.filter(_.contains("HDFS"))
      .map(_split('\t')(3))
      .collect()

在errors的第一个action运行后,Spark将在内存中保存errors的分区,极大地加速了后续计算。值得注意的是,最初的RDD(lines)没有被缓存。这很合理,因为错误信息可能只是数据的一小部分(小到足以存入内存)。

最后,为了说明我们的模型怎样容错,我们在图1中展示了第三次查询中的RDDs的血缘(lineage)图。这次查询,在lines上进行filter得到errors,然后在errors上进一步应用filter和map,之后是collect。Spark调度器将并行流水线化后两个transformations,给拥有errors的分区缓存的节点发送任务集去计算。另外,如果errors的一个分区丢失了,Spark可以仅在lines相应的分区上应用过滤器来重建该分区。

2.3、RDD模型的优势(Advantages of the RDD Model)

为了明白RDDs作为分布式内存抽象的好处,我们在表1中列出了RDDs与分布式共享内存(DSM)的对比。DSM系统中,应用在全局地址空间任意位置读写。值得注意的是,在这种定义下,DSM不仅包括传统的共享内存系统,还包括其他采用细粒度写共享状态的系统,提供共享的DHT的Piccolo和分布式数据库。DSM是非常通用的抽象,这种通用性使它难以在商业集群上实现高效率和容错性。

image-20230510101717901

方面 RDDs 分布式共享内存
粗/细粒度 细粒度
粗粒度 细粒度
一致性 不重要(不可变) 取决于app/运行时
故障恢复 细粒度,使用Lineage的的开销 需要检查点和程序回滚
落后任务降灾 可以使用任务备份 难以实现
任务安排 基于数据本地化自动分配 取决于app(运行时实现透明性)
内存不足时行为 类似于已有数据流系统 性能差(交换?)

表1:RDDs与DSM的对比

RDDs和DSM主要的区别在于RDDs只能通过粗粒度transformations创建(“written”),而DSM允许读写每个存储单元。这虽然限制了RDDs只读,但允许RDDs有更高效的容错。尤其,RDDs不需要检查点的开销,因为它们可以使用lineage恢复。此外,只有RDD丢失的分区才需要重新计算,并且它们可以在不同节点上并行计算,而不用回滚整个程序。
RDDs的第二个好处是它们的不可变性使系统能够运行类似MapReduce的备份任务来缓和慢节点。用DSM难以实现备份任务,因为一个任务的两个副本将访问相同的内存位置,相互干扰更新。
RDDs相比于DSM还提供了另外两个好处。第一, 在对RDDs的批量操作中,运行时会基于数据本地化去调度任务以提高性能。第二,仅仅当它们用于基于扫描的操作时,RDDs在内存不足以存储它们时会优雅降级,内存存不下的分区会存在磁盘中,此时与现有的数据并行系统性能相当。

2.4、不适合RDDs的应用(Applications Not Suitable for RDDs)

就像引言中讨论的,RDDs最适合批处理应用,批处理应用就是对一个数据集的所有元素运行相同的操作。这种情况下,RDDs能够有效地记住每个transformation,每个transformation是lineage图中的一个步骤,并且,不需要记录大量数据就能够恢复丢失分区。RDDs不太适合那些对共享状态进行异步的细粒度的更新,如web应用的存储系统或web爬虫增量抓取器。对于这些应用,执行传统的更新日志和数据检查点操作更加高效,例如数据库等。我们的目标是给批处理分析提供高效编程模型,把这些异步应用程序留给专门的系统。

3、Spark编程接口(Spark Programming Interface)

Spark给RDD抽象提供了一个用Scala编写了语言集成API。Scala是在JVM上的静态类型函数式编程语言。我们选择Scala是因为它简洁的组合(便于交互式使用)和效率(由于静态类型)。然而,RDD抽象并不是一定要用函数式语言。
为了使用Spark,开发者写一个驱动程序(driver program)连接wokers集群,如图2所示。driver定义一或多个RDDs,并在RDDs上调用action。在driver上的Spark代码还会追踪这些RDDs的lineage。wokers是长期运行的进程,能在内存中存储RDD分区。

image-20230510102301902

Figure2:Spark运行态。用户的驱动程序启动多个worker,worker从分布式文件系统读取数据块,并在内存中保存已计算的RDD分区

就像在2.2.1节日志挖掘实例中所展现的,用户给RDD操作(如map)传参是传递闭包(字面函数)。Scala用一个java对象代表每个闭包,这些对象可以被序列化,可以通过网络中传递闭包加载到另一个节点上。Scala也把闭包中的变量约束保存为Java对象中的字段。例如,一个人可以写下面的代码去对RDD中每个元素加5。

var x = 5
rdd.map(_ + x)

RDDs本身是元素类型参数化的静态类型对象。例如,RDD[Int]是整数的RDD。由于Scala支持类型推断,所以大多数例子中都省略了类型。
虽然用Scala实现RDDs在概念上很简单,但是我们必须使用反射解决Scala闭包对象的问题。通过Scala解释器去使用用Spark,我们还需要做很多的工作,我们将在5.2节讨论。但我们并不需要去修改Scala编译器。

3.1、Spark中的RDD操作(RDD Operations in Spark)

表2列出了Spark中主要的RDD的transformations和actions操作。每个操作都给出了表示,中括号表示类型参数。前面说过transformations是定义新RDD的惰性操作,而actions开始真正的计算并返回一个值给程序或是把数据写到外部存储。

请注意,一些操作(例如join)仅在键值对的 RDD 上可用。此外,我们选择我们的功能名称来匹配 Scala 和其他功能语言中的其他 API;例如,map 是一个一对一的映射,而 flatMap 将每个输入值映射到一个或多个输出(类似于 MapReduce 中的映射)。

除了这些运算符之外,用户可以要求 RDD 持久化存储。此外,用户可以获得 RDD 的分区顺序,由分区类表示,并根据其划分另一个数据集。groupByKey、reduceByKey等操作自动排序导致哈希或范围分区RDD。

3.2、应用实例(Example Applications)

我们对2.2.1节中的数据挖掘示例补充两个迭代应用:logistic回归和网页排名。之后说明怎样控制RDDs的分区能提高性能。

3.2.1、logistic回归

很多机器学习算法都具有迭代特性,因为他们需要运行迭代优化程序去最大化一个函数,如梯度下降法。它们的数据保存在内存中会让它们运行更快。
下面的程序实现了logistic回归。logistic回归是一个常见的用于寻找一个能最佳分割两组点(如垃圾邮件和非垃圾邮件)的超平面w的经典算法。算法使用梯度下降法:w开始时是随机值,每一次迭代,对w的函数求和,使w朝着优化的方向移动。

val points = spark.textFile(...)
                  .map(parsePoint).persist()
var w = //随机初始向量
for(i <- 1to ITERATIONS){
  val gradient = points.map{p =>
    p.x * (1/(1+exp(-p.y*(w dot p.x)))-1)*p.y
  }.reduce((a,b) => a+b)
  w -= gradient
}

我们一开始定义了一个缓存RDD——points,作为在一个文本文件上调用map转换的结果,即把文本的每行都解析为Point对象。然后在points上运行mapreduce来计算梯度,在每一步对当前w的函数求和。把points保存在内存中迭代能提高20倍的速度,我们将在6.1节可以看到。

3.2.2、网页排名(PageRank)

在网页排名中会出现更加复杂的数据共享模式。算法通过把链向每个页面的所有页面的贡献值(contributions)加起来,迭代地更新每个页面的rank。在每次迭代过程中,每个页面给周围页面发送r/n个contribution,这里r是它的排名,n是周围文件的个数。接下来,它把它的排名更新为$\alpha/N+(1-\alpha)\sum{c_{i}}$,其中求和是对它接收到的贡献值,N是页面个数。在Spark中我们可以把网页排名写成如下代码:

val links = spark.textFile(...).map(...).persist()
var ranks = //(URL, rank)对的RDD
for(i <- 1 to ITERATIONS){
//根据每个页面发送过来的贡献值创建(targetURL, float)对的RDD,
val contibs = links.join(ranks).flatMap{
  (url, (links, rank)) =>
  links.map(dest => (dest, rank/links.size))
  }
  //根据URL对贡献值求和,并获取新的排名
  ranks = contibs.reduceByKey((x,y) => x+y)
                 .mapValues(sum => a/N+(1-a)*sum)
}

这个程序会生成如图3中的RDD血缘图。每一步迭代过程,我们都会基于上一个迭代器的contribs、ranks和静态的links集合创建一个新排名集合(排名更新了)。这个图有一个特征:随着迭代次数的增加,图会变得越来越长。因此,在一个拥有很多迭代步骤的作业中,有必要可靠复制ranks的一些版本以减少故障恢复次数。用户可以调用带可靠标识的 persist来达到目的。值得注意的是,links集合不需要被复制,因为它的分区可以通过在输入文件块上重新运行 map操作来重建。这个数据集将比ranks大,因为每个文件有很多链接但只有一定数量会作为它的排名,以致于在系统上使用lineage比检查点检查程序整个内存状态来恢复要更快。

最后,我们可以通过控制RDDs的分区来优化网页排名中的通信。如果我们为links指定一个分区方式(如根据URL对link列表跨节点哈希分区),我们则可以以相同的方式对ranks分区,并确保links和ranks的join操作不需要通信(因为每个URL的排名将会与它的link列表在相同的机器上)。我们也可以写一个Partitioner类把相互连接的页面聚在一起(比如按域名对URL分区)。这两种优化可以表示为在定义links时调用partitionBy:

links = spark.textFile(...).map(...)
             .partitionBy(myPartFunc).persist()

该初始化调用后,links和ranks的join操作将自动把每个URL的贡献值聚合到它link列表所在的机器上,在那计算它的新排名,并join它和它的links。这类多次迭代的一致性分区是指定框架(Pregel)中主要优化方式之一。RDDs让用户直接表达他的目的。

4、表示RDDs(Representing RDDs)

把RDDs作为一个抽象会有一个问题:为它们选择一个代表在广泛的transformations中能追踪血缘。理想情况下,一个实现RDDs的系统应该提供尽可能丰富的转换算子,并让用户以任意方式组合它们。我们为RDDs提出一个简单的基于图的代表以达到这些目的。我们已经在Spark中使用这个代表去支持广泛的transformations,而且不会为任何transformation去给调度器添加特殊逻辑,这极大地简化了系统设计。
简而言之,我们了一个通用接口去代表每个RDD,该接口表达五种信息:

  • 一组分区(partitons),数据集的原子组成
  • 一组父RDDs上的依赖(dependencies
  • 一个基于父数据集计算的函数
  • 分区策略的元数据
  • 数据位置策略

例如,一个RDD表示一个HDFS文件的每个块都有一个分区,并且知道每个块在哪台机器上。同时,在这个RDD上执行map操作后的结果分区不变。我们把这个接口总结在表3中。

操作 含义
partitions() 返回Partiton对象的列表
preferredLocations(p) 列出p分区由于数据局部性可以被快速访问的节点
dependencies() 返回依赖列表
iterator(p, parentIters) 根据为父分区指定的迭代器,逐个计算p分区的元素
partitioner() 返回RDD是否是hash/range分区的元数据信息

表3:Spark中用于表示RDDs的接口

在设计这个接口时最有趣的问题是怎样在RDDs间表示依赖。我们发现把依赖分成两类足够了,并且很有用。

  • 一类是窄依赖。父RDD的每个分区被子RDD的至多一个分区使用。
  • 一类是宽依赖。多个子分区依赖于一个父分区。

例如,map操作会发生窄依赖,join操作发生宽依赖(除非父RDD是哈希分区)。图4显示了其他例子。

Figure4:宽窄依赖的例子。每个空心框代表一个RDD,里面的阴影矩阵代表分区。

这个区别是有用的,有两个原因。第一,窄依赖考虑到了一个节点上的流水线执行。例如,在一个个元素上先应用filter然后map。相反,宽依赖需要父分区的所有数据可用,并用一个类似于MapReduce的操作在节点间重排(shuffle)。第二,窄依赖使节点故障后恢复更有效,因为只有丢失的父分区需要被重新计算,并且可能在不同节点上并行计算。相反,在宽依赖的血缘图中,单一的故障节点可能导致一个RDD所有祖先的一些分区丢失,需要一个完全的重新执行。
这个RDDs通用接口在Spark中实现的大部分transformations都少于20行代码。甚至Spark新用户在不知道调度器的细节的情况下都能够实现新的transformations(如sampling和各种join)。下面写了一些RDD实现。

HDFS files:我们采样的输入RDDs都是来自于HDFS中的文件。对于这些RDDs,partitions返回文件每个块的分区(块的偏移量存在于每个Partition对象中)。preferredLocations给出块所在节点和读块的迭代器。

map:在任何RDD上调用map都会返回MappedRDD对象。这个对象和它的父RDD有相同的分区和首选位置。map的参数是一个函数,对于父RDD中的所有记录,将以iterator方法的方式执行这个函数。

union:对两个RDD调用union会返回一个分区为父RDD分区的联合的RDD。每个子分区都是在相应的父分区上进行窄依赖计算得到的。

sample:采样类似于映射,除了RDD会为每个分区保存一个随机数生成种子以确定性地采样父分区记录。

join:join两个RDD可能会产生两个窄依赖(如果他们具有相同的hash/range分区),可能是两个宽依赖,也可能都有(如果一个父RDD有分区,另一个没有)。

5、实现(Implementation)

我们用大约14000行Scala代码实现了Spark。系统运行在Mesos集群管理器上,使其能够与Hadoop、MPI和其他应用程序共享资源。每个Spark程序都作为独立的Mesos应用运行,它有自己驱动程序(master)和workers,并且在这些被Mesos处理的应用中共享资源。
Spark可以从任何Hadoop输入源读取数据(比如HDFS、HBase),只需要使用Hadoop已存在的插件的API,在Scala的未修改版本上运行。
我们大致说几个该系统技术上比较有趣的部分:任务调度(5.1节),Spark解释器允许交互式使用(5.2节),内存管理(5.3节),支持检查点(5.4节)。

5.1、任务调度(Job Scheduling)

Spark的调度器使用了RDDs的表示,在第4节已经描述过了。

​ 总而言之,我们的调度器与Dryad的类似,但它另外还考虑了持久化的RDDs的哪些分区在内存中可用。无论何时用户运行在RDD上运行action,调度器都会检查RDD的血缘图,建立由stages组成的DAG,然后执行,如图5的插图。每个阶段(stage)包含尽可能多的窄依赖流水线转换(transformations)。stages的边界是宽依赖shuffle操作,或者任何已经计算过的分区,它可以截断父RDD的计算。然后,调度器在每个stage中运行任务去计算丢失的分区,知道计算出想要的RDD。

DAG是指有向无环图(Directed Acyclic Graph),它是一种数据结构,用于表示计算任务之间的依赖关系。在RDD中,每个RDD都可以被看作是一个DAG中的一个节点。DAG的每个边表示一个转换操作,例如map或reduce。由于DAG是无环的,因此可以保证计算任务按正确的顺序执行,避免了循环依赖和死锁等问题。RDD使用DAG来跟踪数据集之间的依赖关系,并在需要时重新计算丢失的数据分区。这种机制使得RDD具有容错性和高效性。

​ 调度器基于数据存放位置使用延迟调度给机器指派任务。如果一个任务需要处理节点内存中可用的分区,我们就把它发送给那个节点。但是,如果处理的分区位于多个可能的位置(如HDFS文件),则把任务发送给这些节点。

​ 对于宽依赖(即shuffle依赖),我们一般把中间结果具体化在持有父分区的节点上,以简化故障恢复,很像MapReduce物化map的输出结果。

​ 如果一个任务失败,只要它的stage的父分区还可用,我们将在另一个节点上重新运行它。如果一些stages已经不可用(例如,由于一个shuffle的map输出结果丢失了),我们会重新提交任务去并行地计算丢失分区。我们还不能对调度器故障容错,但是复制RDD血缘图是直截了当的做法。

​ 最后,虽然Spark中所有当前运行的计算都对驱动程序中调用的actions响应,我们也会试验让集群上的任务(如maps)调用lookup操作,该操作允许按关键字随机访问哈希分区的RDDs的元素。在这种情况下,任务需要告诉调度器去计算哪些丢失的分区。

5.2、解释器的集成(Interpreter Integration)

Scala包含一个类似于Ruby和Python的交互式shell。考虑到实现内存中数据带来的低延迟,我们希望让用户在解释器上交互式地运行Spark,查询大数据集。
Scala解释器通常把用户输入的代码行编译成一个类,然后加载到JVM,之后调用类的函数。这个类包括一个单例对象(单例对象包含那行代码的变量或方法),并且在一个初始化函数中运行那行代码。例如,如果用户写入var x = 5,接下来println(x),解释器会定义包含x的Line1类,并让第二行编译成println(Line1.getInstance().x)
我们对Spark中的编译器做了两点改变:

  • 类传输:为了让工作节点获取每行代码上创建的类的字节码,我们使解释器基于HTTP传输这些类。
  • 改进的代码生成逻辑:一般地,每行代码生成的单例对象是通过相应的类上的一个静态方法去访问的。这意味着当我们序列化一个引用了前一行定义的变量的闭包时(如上面例子中的Line1.x),Java将不会通过对象图跟踪而传输包装xLine1实例。因此,工作节点将不接受x。我们把代码生成逻辑改成了直接引用每行对象的实例。

图6显示在做了上面两个改变后,解释器怎样把用户写的一系列代码行翻译成Java对象。

Figure6:显示解释器怎样把用户输入的两行代码翻译成Java对象

我们发现Spark解释器便于处理大量跟踪关系,也便于研究HDFS中保存的数据集。我们也计划交互式地运行更高级的查询语言,如SQL。

5.3、内存管理(Memory Management)

Spark对于持久化RDDs提供了三个选项:

  • 序列化成Java对象存储在内存中
  • 作为序列化数据存储在内存中
  • 存储在磁盘上

第一个选项性能最快,因为JVM能在本机访问每个RDD元素。第二个选项让用户在内存空间有限时,选择比Java对象图更加有效的存储方式,性能会差一点。第三个选项对于那些太大而无法存入内存的RDDs是有用的,但是每次使用都需要重新计算,这很耗时。
为了管理有限的可用内存,我们基于RDDs的级别使用LRU淘汰策略。当计算一个新的RDD分区,而又没有足够的空间存储它时,我们淘汰一个最近最少访问RDD的分区,除非这个RDD和新分区的一样。在这种情况下,我们把旧分区存入内存以防止相同RDD的分区循环读入和写出。这很重要,因为大多数操作将在整个RDD上运行任务,所以很可能以后会需要用到一直在内存中的分区。我们发现,目前为止这个默认的策略在我们的应用中很好地工作,我们通过每个RDD的“持久化优先级“,让用户进一步控制RDD。
目前,集群上Spark的每个实例都有自己独立的内存空间。未来,我们打算研究通过统一的内存管理实现在Spark实例间共享RDDs。

5.4、支持检查点(Checkpointing)

虽然血缘图总是被用于在故障后恢复RDDs,但这样的恢复在血lineage链很长的时候会很耗时。因此,把一些RDD执行检查点操作存入稳定内存十分有用。
一般地,检查点对于Lineage图长、宽依赖的RDDs很有用,如3.2.2节中PageRank例子中的rank集合。在这些情况下,集群中节点故障可能导致会每个父RDD的一些数据分片丢失,这就需要完全重新计算,检查点操作在这里就可以避免完全重新计算。相反,对于稳定存储数据上的窄依赖RDDs,检查点没什么价值。如果节点故障,这些RDDs丢失的分区可以并行地在其他节点上计算,成本比复制整个RDD要少得多。
Spark目前为检查点提供了一个API(给persist传入REPLICATE标识),把checkpoint哪个数据的决定权留给了用户。然而,我们也研究了怎样实现自动的检查点。因为我们的调度器知道每个数据集的大小,也知道第一次计算花费的时间,所以它应该能选择一个优化的RDDs集合执行检查点操作,最小化系统恢复时间。
最后,值得注意的是RDDs的只读属性使检查点操作比常见的共享内存更简单。因为不需要关注一致性,RDDs可以在不需要程序中断或分布式快照方案的情况下在后台写出。

6、测评(Evaluation)

我们再Amazon EC2上做了一系列实验,以此来评估Spark和RDDs,并与其他用户应用程序的基准做了对比。总之,我们的结果如下:

  • Spark在迭代机器学习和图像应用方面性能比Hadoop高20倍。速度的提升的原因是,把数据作为Java对象存入内存中避免了I/O和反序列化的成本。
  • 用户所写应用执行效果好。我们使用Spark对原来在Hadoop上运行的分析报告提升了40倍。
  • 当节点故障,Spark可以通过只重建丢失的RDD分区来快速恢复。
  • Spark用于交互式地查询1TB数据集,只有5-7s的延迟。

我们将在6.1节呈现与Hadoop对比的迭代机器学习应用的基准,6.2节呈现对比后的PageRank。然后,6.3节评估Spark中故障恢复,6.4节呈现数据集在内存不足时的行为。最后,6.5节讨论用户应用程序的结果,6.6节交互式数据挖掘。
除非另有说明,我们的测试使用m1.xlarge EC2节点,4核,15G内存。我们使用HDFS存储,块大小256M。在每个测试之前,为了准确地测量IO成本,我们清理了操作系统缓存。

6.1、迭代机器学习应用(Iterative Machine Learning Applications)

我们实现了两个迭代机器学习应用,逻辑回归和k-means,为了比较下面系统的性能:

  • Hadoop:Hadoop 0.20.0稳定版本
  • HadoopBinMem:一个Hadoop部署。第一次迭代时,把输入数据转换成低开销二进制格式,以消除在之后迭代过程的文本解析,并把它存入内存中的HDFS实例。
  • Spark:RDDs的实现。

我们用25-100台机器在100GB的数据集上对这两个算法进行了10次迭代。两个应用程序的主要区别在于他们执行数据每个字节的计算的数量。k-means的迭代时间主要取决于计算,但逻辑回归不是计算密集型的,因此时间更多地花费在反序列化和I/O上。
由于经典的学习算法需要几十次迭代才能收敛,所以我们在报告时间时,把首轮迭代的时间与后续迭代的时间分开。我们发现经RDDs共享数据极大地加快了后面的迭代。

Figure7:图表示了逻辑回归和k-means两种算法分别在Hadoop、HadoopBinMem和Spark三种集群中的首轮迭代和后续迭代的时长。实验是在有100个节点的集群上对100GB数据进行的。
Figure:Hadoop、HadoopBinMem和Spark上后续迭代的运行时间。

首轮迭代 三个系统在首轮迭代时都从HDFS中读取文本输入。如图7中的浅色长方形所示,实验中Spark比Hadoop更快。这个不同是因为在Hadoop的master和workers之间的心跳协议中的通信开销。HadoopBinMem是最慢的,因为它运行了一个额外的MapReduce工作去把数据转成二进制的,它必须通过网络把这个数据写向一个复制的内存HDFS实例。

后续迭代 图7也显示了后续迭代的平均运行时间,图8显示了随着集群大小的变化是运行时间的分布情况。对于logistic回归,在100台机器上,Spark比Hadoop快25.3倍,比HadoopBinMem快20.7倍。对于更加计算密集型的k-means应用,Spark有1.9到3.2倍的提速。

理解速度 我们惊讶地发现,Spark甚至都超过了基于内存存储二进制数据的Hadoop(HadoopBinMem)20多倍。在HadoopBinMem中,我们使用了Hadoop的标准二进制格式(SequenceFile)和256MB大的块,我们还强制HDFS的数据直接存放在内存文件系统。然而Hadoop仍然运行缓慢,有以下几个因素:

  1. Hadoop软件栈的最小开销
  2. 提供数据时HDFS的开销
  3. 把二进制记录转换成可用的内存Java对象的反序列化成本

我们来依次研究这几个因素。为了衡量(1),我们运行没有操作的Hadoop作业,然后发现,仅仅完成作业设置,启动任何和清理工作的最小需求就需要花费至少25s的开销。对于(2),我们发现HDFS为每个块提供多次内存拷贝和计算校验和的操作。

最后,为了测量(3),我们在单机上运行微基准程序,在256MB多种格式的输入上运行逻辑回归计算。尤其,我们比较了来源于HDFS(这里将体现出HDFS栈的开销)和内存本地文件(内核能够非常有效地把数据传递给程序)的文本输入和二进制输入的处理时间。

我们在图9中展示了这些实验的结果。内存中的HDFS和本地文件的区别显示,从HDFS读取数据会多花费2s的开销,甚至数据就在本地机器的内存中。文本输入和二进制输入的区别表明,解析的开销相差7s。甚至当从内存文件读取数据时,把预解析的二进制数据转成Java对象都要花费3s,这几乎和逻辑回归本身的成本一样。而Spark通过在内存中把RDD元素直接存成Java对象,避免了以上所有开销。

6.2、网页排名(PageRank)

我们使用了54GB维基百科数据,比较分别在Spark和Hadoop上实现PageRank的性能。我们运行10轮PageRank算法去处理大约4百万文章的链接图。图10显示在30台节点上,只基于内存存储时,Spark比Hadoop快2.4倍。另外,控制RDDs的分区方式使整个迭代过程保持一致,如3.2.2节所讨论的,提升速度至7.4倍。扩展到60台机器,结果也随之近于线性地减少。

Figure10: 在Hadoop和Spark上PageRank的性能

我们也评估了PageRank的两一个版本——用Pregel在Spark上实现,我们再7.1节进行描述。迭代时间与图10类似,但是长了4s,因为Pregel会在每次迭代额外运行一个操作,这个操作让顶点“投票”是否结束作业。

6.3、故障恢复(Fault Recovery)

我们对k-means应用评估了其在单点故障后,使用lineage重建RDD分区的成本。图11比较了在75个节点的集群上k-means10轮迭代在正常情况下和一个节点在第六轮迭代的开始时故障的情况下的运行时间。没有任何故障,每轮迭代会运行400各任务处理100GB数据。

Figure11:k-means存在故障时的迭代时间。在第六轮迭代开始时kill掉一台机器导致使用lineage部分重构RDD

直到第五轮迭代结束,迭代时间都是大约58s。在第六轮迭代,一台机器被kill掉,导致运行在该机器上的任务和存储该机器上的RDD分区的丢失。Spark在其他机器上并行地重新运行这些任务,他们在这些机器上重新读取相应的输入数据并通过lineage重构RDDs,这会让迭代时间增至80s。一旦丢失的RDD分区被重建,迭代时间将降回到58s。
值得注意的是,基于检查点的故障恢复机制,恢复将可能需要运行至少几轮迭代,取决于检查点操作的频率。更进一步说,系统将需要通过网络复制应用的100GB工作集(文本输入数据转成二进制),要么消费两次Spark内存去复制它到内存中,要么将不得不等到100GB写入磁盘。相反地,我们例子中RDDs的血缘图都是小于10KB的。

6.4、内存不足时的行为(Behavior with Insufficient Memory)

目前为止,我们确保每个集群中每台机器有足够内存存储迭代中所有的RDDs。一个自然的问题是,如果没有足够的内存去存储作业的数据时Spark怎样运行。在这个实验中,我们配置Spark在每台机器上不是用超过一定百分比的内存去存储RDDs。我们在图12中展示逻辑回归在多种百分比内存空间下的运行结果。我们看见随着空间变小性能缓慢下降。

6.5、基于Spark建立的用户应用(User Applications Built with Spark)

内存分析(In-Memory Analytics) Conviva Inc(一家视频发行公司)使用Spark加快了数据分析报告的数据,以前是基于Hadoop上运行的。举个例子,一个报告被作为一系列Hive查询运行为客户计算多种统计。这些查询全都是基于数据的相同子集(记录匹配用户提供的过滤器),但在不同分组的字段上执行聚合(averages, precentiles和COUNT DISTINCT)操作需要独立的MapReduce作业。在Spark中实现上述查询,并把数据子集一次加载到RDD中,该公司能够对报告提速40倍。一个基于200G压缩数据的报告在Hadoop集群上运行要花费20小时,现在仅仅只需要2台Spark机器就可以运行在30分钟以内。更进一步,Spark程序只需要96GB的RAM,因为它只把匹配用户过滤器的行和列存入RDD,而不是所有解压文件。
交通建模(Traffic Modeling) 在Berkeley的Mobile Millennium项目中,研究人员基于分散的汽车GPS测量,并行化一个学习算法去预测道路交通阻塞情况。源数据是城市的10000个互联的道路网,还有600,000由装备GPS的汽车采集到的点到点的行驶时间的样本(每条路线的形式时间可能包括多条互联的道路)。使用交通模型,系统可以估计跨交通网花费的时间。研究人员用一个期望最大化算法训练这个模型,这个算法迭代地重复两次mapreduceByKey步骤。这个应用近乎线性地从20个节点扩展到80个节点,每个节点4核,如图13(a)所示。

图13:用Spark实现的两个用户应用程序每次运行时间。错误条显示标准差。

Twitter垃圾分类(Twitter Spam Classification) 伯克利的Monarch项目用Spark识别Twitter信息中的垃圾链接。他们在Spark之上实现了一个逻辑回归分类器,和6.1节中例子类似,但他们使用分布式的 reduceByKey对并行的梯度向量求和。在图13(b)中,我们展示了在超过50GB数据上训练一个分类器的扩展结果,数据包括250000URLs和10^7与网络相关的特征/维度和在每个URL的页面的内容属性。缩放不是接近线性的原因是每次迭代都会有更高的固定通信成本。

6.6、交互式数据挖掘(Interactive Data Mining)

为了证明Spark在交互查询大数据集方面的能力,我们用它去分析1TB的维基百科页面访问日志(2年的数据)。这个实验,我们用8核、68GB内存的100m2.3xlarge EC2实例。我们运行查询以获得以下内容总访问次数(1)所有页面,(2)标题能精确匹配给定关键字的页面,(3)标题部分匹配关键字的页面。每个查询会扫描整个输入数据。
图14显示的是在整个数据集、一半数据和十分之一数据的查询响应时间。甚至在1TB的数据上,Spark上的查询只需要花费5-7s。这比查询磁盘上的数据的速度快一个数量级以上。例如,从磁盘上查询1TB的文件花费了170s。这证明了RDDs使Spark更适用于交互式数据挖掘。

Figure14:在Spark上交互式查询的响应时间,在100台机器上扫描持续增大的输入数据集

7、讨论(Discussion)

虽然,由于RDDs的不可变的性质和粗粒度转换,它们提供了一个限制的编程接口,但我们发现它们适用于广泛类别的应用。尤其,RDDs可以表达的集群编程模型数量惊人,这些集群编程模型目前为止都被作为独立框架提出,允许用户在一个项目(例如,运行一个MapReduce操作图建立一个图,然后在其上运行Pregel)中compose这些模型,并在他们之间分享数据。在这一节,我们将在7.1小节讨论RDDs可以表达哪些编程模型和为什么它们应用这么广泛。另外,我们在7.2小节讨论RDDs中lineage信息的另一个好处,即为了方便在这些模型上调试。

7.1、表达已有的编程模型(Expressing Existing Programming Models)

RDDs可以有效地表达一些目前已经独立提出的集群编程模型。这里说有效地,我们的意思是RDDs不仅可以产生和这些模型相同的结果,还可以捕获这些框架执行的优化,如把指定的数据保存在内存中,对它进行分区以最小化通信并且高效地从故障中恢复。可以使用RDDs表达的模型包括:

MapReduce:这个模型可以通过在Spark使用flatMapgroupByKey操作,或者在有结合器时使用reduceByKey操作来表达。

DryadLINQ:DryadLINQ系统提供了比在更普通的Dryad运行时的MapReduce更广泛的算子。但是这些是直接对应于Spark中可用的RDD转换的所有大型算子(如map, groupByKey,join,etc)。

SQL:类似于DryadLINQ表达式,SQL需要在记录集上执行数据并行化操作。

Pregel:Google 的 Pregel [22] 是专门用于迭代图像应用的模型,它最初与其他系统中的面向集合的编程模型有很大不同。在Pregel中,程序运行为一系列协调的“superstep”。在每个superstep中,图中的每个顶点运行一个用户函数,可以更新与顶点相关的状态,改变图拓扑,并将消息发送给其他顶点,以便在下一个superstep中使用。该模型可以表达许多图算法,包括最短路径、二部匹配和PageRank。

让我们用 RDD 实现这个模型的关键观察是 Pregel 在每次迭代中将相同的用户函数应用于所有顶点。因此,我们可以在 RDD 中存储每次迭代的顶点状态并执行批量转换 (flatMap) 来应用该函数并生成消息的 RDD。然后,我们可以将这个 RDD 与顶点状态连接起来以执行消息交换。同样重要的是,RDD 允许我们像 Pregel 那样在内存中保留顶点状态,通过控制它们的分区来最小化通信,并支持对故障的部分恢复。我们在Spark之上实现了Pregel作为200行库,并请读者参考[33]了解更多细节。

Iterative MapReduce:最近提出的几个系统,包括 HaLoop [7] 和 Twister [11],提供了一个迭代 MapReduce 模型,其中用户向系统提供一系列 MapReduce 作业以循环。系统保持数据在迭代之间始终分区,Twister 还可以将其保存在内存中。这两个优化都很容易用 RDD 表达,我们能够使用 Spark 将 HaLoop 实现为 200 行库。

批处理流处理(Batched Stream Processing):研究人员最近提出了几种增量处理系统,用于周期性地用新数据更新结果[21,15,4]。例如,每15分钟更新一次广告点击统计信息的应用程序应该能够将前15分钟窗口的中间状态与来自新日志的数据结合起来。这些系统执行与Dryad类似的批量操作,但将应用程序状态存储在分布式文件系统中。将中间状态放在RDD中可以加快它们的处理速度。

解释RDD的表达能力(Explaining the Expressivity of RDDs)

为什么 RDD 能够表达这些不同的编程模型?原因是 RDD 的限制在许多并行应用程序中几乎没有影响。特别是,虽然 RDD 只能通过批量转换创建,但许多并行程序自然会对许多记录应用相同的操作,这使得它们易于表达。类似地,RDD 的不变性不是障碍,因为人们可以创建多个 RDD 来表示相同数据集的版本。事实上,今天的许多MapReduce应用程序运行在不允许更新文件的文件系统上,如HDFS。

最后一个问题是为什么以前的框架没有提供相同的通用性水平。我们认为这是因为这些系统探索了 MapReduce 和 Dryad 不能很好地处理的特定问题,例如迭代,而不观察这些问题的共同原因是缺乏数据共享抽象。

7.2、利用RDDs进行调试(Leveraging RDDs for Debugging)

虽然我们最初将rdd设计为可确定性地重新计算以实现容错性,但该属性也有助于调试。特别是,通过记录作业期间创建的RDD的沿行,可以(1)稍后重建这些RDD,并让用户以交互方式查询它们;(2)通过重新计算作业所依赖的RDD分区,在单进程调试器中重新运行作业中的任何任务。与传统的用于一般分布式系统的重播调试器[13]不同,它必须捕获或推断跨多个节点的事件顺序,这种方法实际上增加了零记录开销,因为只需要记录RDD血缘图。我们目前正在基于这些想法开发一个Spark调试器[33]。

8、相关工作(Related Work)

集群编程模型:集群编程模型相关工作主要有几类。

第一,数据流模型,如MapReduce、Drayad和Ciel,支持丰富的算子去处理数据,但通过稳定外部系统共享数据。RDDs表示的是一个比稳定存储更高效的数据共享抽象,因为它们避免了数据复制、I/O和序列化的成本。
第二,数据流系统的高级别编程接口,包括DryadLINQ和FlumeJava,提供语言集成APIs,它们可以让用户通过像mapjoin的算子操作“并行的集合”。然而,在这些系统,并行的集合要么代表磁盘上的文件,要么代表用来表达查询计划的临时数据集。即使,系统将在相同的查询中的算子间流水线化数据(如一个map接着另一个map),它们不能在查询间高效地共享数据。我们把Spark的API基于并行集合模型是因为它的便利,并且不主张新奇的语言集成接口,但是通过这个接口的背后提供RDDs作为存储抽象,我们允许它支持更广泛类型的应用。
第三类系统为需要数据共享的特别类型的应用提供高级别接口。例如,Pregel支持迭代图像应用,而Twister和HaLoop支持迭代MapReduce的运行。然而,这些框架对它们支持的计算类型隐式地执行数据共享,也不提供一个普遍地抽象让用户可以在他选择的操作间去共享他们选择的数据。例如,一个用户不能用Pregel或者Twister去把数据集加载到内存中,然后决定在它上面运行去查询什么。RDDs显示地提供一个分布式存储抽象,并可以支持这些特定系统不包括的应用,如交互式数据挖掘。

最后,一些系统暴露出共享可变状态以允许用户执行内存中的计算。例如,Picco让用户运行变形的函数读和更新分布式哈希表中的单元分布式共享。分布式共享内存系统和键值存储提供相似的模型。RDDs以两种方式区别这些系统。第一:RDDs提供一个高级别编程接口,接口基于如map/sort这些算子。而Piccolo和DSM中的接口仅仅是读和更新到表的单元中。第二,Piccolo和DSM系统通过检查点和回滚实现恢复,这比RDDs基于lineage的策略更耗费性能。最终,如2.3节所讨论的,RDDs也提供其他优点。

缓存系统:Nectar能够通过识别带有程序分析的子表达式,在DryadLINQ作业间重用中间结果。把这种功能添加到基于RDD的系统将是引人入胜的。然而,Nectar不提供内存缓存(它把数据存放在分布是文件系统中),也不让用户显示地控制持久化哪些数据集和怎样对数据集分区。Ciel和FlumeJava同样可以缓存任务结果,但是却不提供内存缓存和显示控制缓存数据。
Ananthanarayanan et al.提出给分布式文件系统添加一个内存缓存,以利用数据访问的时空本地化。虽然这个解决方案能够更快地访问在文件系统中的数据,但是它不像RDDs那样可以高效地在应用中共享中间结果,因为在stage间它仍然需要应用去把这些结果写到文件系统。

Lineage :对于一些应用,如需要说明结果的、允许其他应用复制的、如果在工作流中发现bug或者数据及丢失而导致的重新计算数据的,获取数据的血缘或出处信息一直都是科学计算和数据库的研究主题。RDDs提供了并行化编程模型,它的细粒度血缘较容易获取,所以它可以被用于故障恢复。
我们的基于lineage恢复机制类似于MapReduce和Dryad中计算(作业)内使用的恢复机制,它在任务的DAG中追踪依赖。但在这些系统中,血缘信息在作业结束时被丢弃,需要使用备份存储系统跨计算分享数据。相反,RDDs把血缘应用于跨计算持久化内存中的数据,没有复制和磁盘I/O的开销。
关系型数据库:RDDs概念上类似于数据库中的试图,持久化RDDs类似于物化试图。但,像DSM系统,数据库一般允许对所有的记录进行细粒度的读写访问,需要记录操作和数据来容错,并需要保持一致性的额外开销。

9、结语

我们已经介绍了RDDs,一个用于在集群应用中共享数据的高效的、普遍用途的、容错的抽象。RDDs能表达广发的并行应用,包括很多特殊的编程模型,已经提出用于迭代计算和这些模型不包含的新应用。已存在的集群存储抽象需要为容错而复制数据,而RDDs与此不同,RDDs提供一个基于粗粒度转换的API,让它们用lineage高效地恢复数据。我们已经在Spark中实现RDDs,运行迭代应用速度提升20X,可以用于交互式查询几百GB数据。我们已经开源Spark。


文章作者: wck
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 wck !
评论
 本篇
Spark Spark
Resilient Distributed Datasets:A Fault-Tolerant Abstraction for In-Memory Cluster Computing
2023-05-10
下一篇 
FaRM FaRM
No compromises:distributed transactions withconsistency, availability, and performance
2023-05-10
  目录