Wetts's blog

Stay Hungry, Stay Foolish.

0%

Spark.md

概述

Spark 简介

Spark 具有如下 4 个主要特点:

  • 运行速度快。Spark 使用先进的 DAG(Directed Acyelic Graph,有向无环图)执行引擎,以支持循环数据流与内存计算,基于内存的执行速度可比 Hadoop MapReduce 快上百倍,基于磁盘的执行速度也能快十倍。
  • 容易使用。Spark 支持使用 Scala、Java、Python 和 R 语言进行编程,简洁的 API 设计有助于用户轻松构建并行程序,并且可以通过 Spark Shell 进行交互式编程。
  • 通用性。Spark 提供了完整而强大的技术栈,包括 SQL 查询、流式计算、机器学习和图算法组件,这些组件可以无缝整合在同一个应用中,足以应对复杂的计算。
  • 运行模式多样。Spark 可运行于独立的集群模式中,或者运行于 Hadoop 中,也可运行于 Amazon EC2 等云环境中,并且可以访问 HDFS、Cassandra、HBase、Hive等多种数据源。

Spark 与 Hadoop 的对比

Hadoop 虽然已成为大数据技术的事实标准,但其本身还存在诸多缺陷,最主要的缺陷是其 MapReduce 计算模型延迟过高,无法胜任实时、快速计算的需求,因而只适用于离线批处理的应用场景。

Hadoop存在以下缺点:

  • 表达能力有限。计算都必须要转化成 Map 和 Reduce 两个操作,但这并不适合所有的情况,难以描述复杂的数据处理过程。
  • 磁盘 IO 开销大。每次执行时都需要从磁盘读取数据,并且在计算完成后需要将中间结果写人到磁盘中,IO 开销较大。
  • 延迟高。一次计算可能需要分解成一系列按顺序执行的 MapReduce 任务,任务之间的衔接由于涉及到 IO 开销,会产生较高延迟。而且,在前一个任务执行完成之前,其他任务无法开始,因此难以胜任复杂、多阶段的计算任务。

Spark 在借鉴 Hadoop MapReduce 优点的同时,很好地解决了 MapReduce 所面临的问题。相比于 MapReduce,Spark主要具有如下优点:

  • Spark 的计算模式也属于 MapReduce,但不局限于 Map 和 Reduce 操作,还提供了多种数据集操作类型,编程模型比 MapReduce 更灵活。
  • Spark 提供了内存计算,中间结果直接放到内存中,带来了更高的迭代运算效率。
  • Spark 基于 DAG 的任务调度执行机制,要优于 MapReduce 的迭代执行机制。

Hadoop与Spark的执行流程对比

Spark 最大的特点就是将计算数据、中间结果都存储在内存中,大大减少了 IO 开销,因而 Spark 更适合于迭代运算比较多的数据挖掘与机器学习运算。

使用 Hadoop 进行迭代计算非常耗资源,因为每次迭代都需要从磁盘中写人、读取中间数据,IO 开销大。而 Spark 将数据载人内存后,之后的迭代计算都可以直接使用内存中的中间结果作运算,避免了从磁盘中频繁读取数据。

Hadoop与Spark执行逻辑回归的时间对比

在实际进行开发时,使用 Hadoop 需要编写不少相对底层的代码,不够高效。相对而言,Spark 提供了多种高层次、简洁的 API。通常情况下,对于实现相同功能的应用程序,Hadoop 的代码量要比 Spark 多 2~5 倍。更重要的是,Spark 提供了实时交互式编程反馈,可以方便地验证、调整算法。

尽管 Spark 相对于 Hadoop 而言具有较大优势,但 Spark 并不能完全替代 Hadoop,主要用于替代 Hadoop 中的 MapReduce 计算模型。实际上,Spark 已经很好地融入了 Hadoop 生态圈,并成为其中的重要一员,它可以借助于 YARN 实现资源调度管理,借助于 HDFS 实现分布式存储。此外,Hadoop 可以使用廉价的、异构的机器来做分布式存储与计算,但是 Spark 对硬件的要求稍高一些,对内存与 CPU 有一定的要求。

Spark 生态系统

在实际应用中,大数据处理主要包括以下三个类型:

  • 复杂的批量数据处理:时间跨度通常在数十分钟到数小时之间。
  • 基于历史数据的交互式查询:时间跨度通常在数十秒到数分钟之间。
  • 基于实时数据流的数据处理:时间跨度通常在数百毫秒到数秒之间。

目前,已有很多相对成熟的开源软件用于处理以上三种情景。

  • 可以利用 Hadoop MapReduce 来进行批量数据处理;
  • 可以用 Impala 来进行交互式查询(Impala 与 Hive 相似,但底层引擎不同,提供了实时交互式 SQL 查询);
  • 对于流式数据处理可以采用开源流计算框架 Storm。

一些企业可能只会涉及其中部分应用场景,只需部署相应软件即可满足业务需求,但是对于互联网公司而言,通常会同时存在以上三种场景,就需要同时部署三种不同的软件,这样做难免会带来一些问题。

  • 不同场景之间输人输出数据无法做到无缝共享,通常需要进行数据格式的转换。
  • 不同的软件需要不同的开发和维护团队,带来了较高的使用成本。
  • 比较难以对同一个集群中的各个系统进行统一的资源协调和分配。

Spark 的设计遵循“一个软件栈满足不同应用场景”的理念,逐渐形成了一套完整的生态系统,既能够提供内存计算框架,也可以支持 SQL 即席查询、实时流式计算、机器学习和图计算等。Spark 可以部署在资源管理器 YARN 之上,提供一站式的大数据解决方案。因此,Spark 所提供的生态系统足以应对上述三种场景,即同时支持批处理、交互式查询和流数据处理。

Spark 生态系统主要包含了 Spark Core、Spark SQL、Spark Streaming、MLlib 和 GraphX 等组件,各个组件的具体功能如下:

  1. Spark Core

    Spark Core 包含 Spark 的基本功能,如内存计算、任务调度、部署模式、故障恢复、存储管理等,主要面向批数据处理。Spark 建立在统一的抽象 RDD 之上,使其可以以基本一致的方式应对不同的大数据处理场景。

  2. Spark SQL

    Spark SQL 允许开发人员直接处理 RDD,同时也可查询 Hive、HBase 等外部数据源。Spark SQL 的一个重要特点是其能够统一处理关系表和 RDD,使得开发人员不需要自己编写 Spark 应用程序,开发人员可以轻松地使用 SQL 命令进行查询,并进行更复杂的数据分析。

  3. Spark Streaming

    Spark Streaming 支持高吞吐量、可容错处理的实时流数据处理,其核心思路是将流数据分解成一系列短小的批处理作业,每个短小的批处理作业都可以使用 Spark Core 进行快速处理。Spark Streaming 支持多种数据输入源,如 Kafka、Flume 和 TCP 套接字等。

  4. MLlib(机器学习)

    MLlib 提供了常用机器学习算法的实现,包括聚类、分类、回归、协同过滤等,降低了机器学习的门槛,开发人员只要具备一定的理论知识就能进行机器学习的工作。

  5. GraphX(图计算)

    GraphX 是 Spark 中用于图计算的 API,可认为是 Pregel 在 Spark 上的重写及优化,GraphX 性能良好,拥有丰富的功能和运算符,能在海量数据上自如地运行复杂的图算法。

需要说明的是,无论是 Spark SQL、Spark Streaming、MLlib 还是 GraphX,都可以使用 Spark Core 的 API 处理问题,它们的方法几乎是通用的,处理的数据也可以共享,不同应用之间的数据可以无缝集成。

Spark的应用场景

Spark 运行架构

基本概念

  • RDD:是弹性分布式数据集(Resilient Distributed Dataset)的英文缩写,是分布式内存的一个抽象概念,提供了一种高度受限的共享内存模型。
  • DAG:是 Directed Acyclic Graph(有向无环图)的英文缩写,反映 RDD 之间的依赖关系。
  • Executor:是运行在工作节点(Worker Node)上的一个进程,负责运行任务,并为应用程
    序存储数据。
  • 应用:用户编写的 Spark 应用程序。
  • 任务:运行在 Executor 上的工作单元。
  • 作业:一个作业包含多个 RDD 及作用于相应 RDD 上的各种操作。
  • 阶段:是作业的基本调度单位,一个作业会分为多组任务,每组任务被称为“阶段”,或者
    也被称为“任务集”。

架构设计

Spark 运行架构包括集群资源管理器(Cluster Manager)、运行作业任务的工作节点(WorkerNode)、每个应用的任务控制节点(Driver)和每个工作节点上负责具体任务的执行进程(Executor)。其中,集群资源管理器可以是 Spark 自带的资源管理器,也可以是 YARN 或 Mesos 等资源管理框架。

Spark运行架构

与 Hadoop MapReduce 计算框架相比,Spark 所采用的 Executor 有两个优点:

  • 一是利用多线程来执行具体的任务(Hadoop MapReduce 采用的是进程模型),减少任务的启动开销;
  • 二是 Executor 中有一个 BlockManager 存储模块,会将内存和磁盘共同作为存储设备,当需要多轮迭代计算时,可以将中间结果存储到这个存储模块里,下次需要时就可以直接读该存储模块里的数据,而不需要读写到 HDFS 等文件系统里,因而有效减少了 IO 开销;或者在交互式查询场景下,预先将表缓存到该存储系统上,从而可以提高读写 IO 性能。

在 Spark 中,一个应用(Application)由一个任务控制节点(Driver)和若干个作业(Job)构成,一个作业由多个阶段(Stage)构成,一个阶段由多个任务(Task)组成。当执行一个应用时,任务控制节点会向集群管理器(Cluster Manager)申请资源,启动 Executor,并向 Executor 发送应用程序代码和文件,然后在 Executor 上执行任务,运行结束后执行结果会返回给任务控制节点,或者写到 HDFS 或者其他数据库中。

Spark中各种概念之间定的相互关系

Spark 运行基本流程

Spark 运行基本流程如下:

  1. 当一个 Spark 应用被提交时,首先需要为这个应用构建起基本的运行环境,即由任务控制节点(Driver)创建一个 SparkContext,由 SparkContext 负责和资源管理器(Cluster Manager)的通信以及进行资源的申请、任务的分配和监控等。SparkContext 会向资源管理器注册并申请运行 Executor 的资源。
  2. 资源管理器为 Executor 分配资源,并启动 Executor 进程,Executor 运行情况将随着“心跳”发送到资源管理器上。
  3. SparkContext 根据 RDD 的依赖关系构建 DAG 图,DAG 图提交给 DAG 调度器(DAGScheduler)进行解析,将 DAG 图分解成多个“阶段”(每个阶段都是一个任务集),并且计算出各个阶段之间的依赖关系,然后把一个个“任务集”提交给底层的任务调度器(TaskScheduler)进行处理;Executor 向 SparkContext 申请任务,任务调度器将任务分发给 Executor 运行,同时 SparkContext 将应用程序代码发放给 Executor。
  4. 任务在 Executor 上运行,把执行结果反馈给任务调度器,然后反馈给 DAG 调度器,运行完毕后写人数据并释放所有资源。

Spark运行基本流程图

Spark 运行架构具有以下特点:

  1. 每个应用都有自己专属的 Executor 进程,并且该进程在应用运行期间一直驻留。Executor 进程以多线程的方式运行任务,减少了多进程任务频繁的启动开销,使得任务执行变得非常高效和可靠。
  2. Spark 运行过程与资源管理器无关,只要能够获取 Executor 进程并保持通信即可。
  3. Executor 上有一个 BlockManager 存储模块,类似于键值存储系统(把内存和磁盘共同作为存储设备),在处理迭代计算任务时,不需要把中间结果写人到 HDFS 等文件系统,而是直接放在这个存储系统上,后续有需要时就可以直接读取;在交互式查询场景下,也可以把表提前缓存到这个存储系统上,提高读写 IO 性能。
  4. 任务采用了数据本地性和推测执行等优化机制。数据本地性是尽量将计算移到数据所在的节点上进行,即“计算向数据靠拢”,因为移动计算比移动数据所占的网络资源要少得多。而且,Spark 采用了延时调度机制,可以在更大的程度上实现执行过程优化。比如,拥有数据的节点当前正被其他的任务占用,那么在这种情况下是否需要将数据移动到其他的空闲节点上呢?答案是不一定。因为,如果经过预测发现当前节点结束当前任务的时间要比移动数据的时间还要少,那么调度就会等待,直到当前节点可用。

RDD 的设计与运行原理

RDD 设计背景

在实际应用中,存在许多迭代式算法(比如机器学习、图算法等)和交互式数据挖掘工具,这些应用场景的共同之处是,不同计算阶段之间会重用中间结果,即一个阶段的输出结果会作为下一个阶段的输入。但是,目前的 MapReduce 框架都是把中间结果写人到 HDFS 中,带来了大量的数据复制、磁盘 IO 和序列化开销。虽然类似 Pregel 等图计算框架也是将结果保存在内存当中,但是这些框架只能支持一些特定的计算模式,并没有提供种通用的数据抽象。RDD 就是为了满足这种需求而出现的,它提供了一个抽象的数据架构,我们不必担心底层数据的分布式特性,只需将具体的应用逻辑表达为一系列转换处理,不同 RDD 之间的转换操作形成依赖关系,可以实现管道化,从而避免了中间结果的存储,大大降低了数据复制、磁盘 IO 和序列化开销。

RDD 概念

一个 RDD 就是一个分布式对象集合,本质上是一个只读的分区记录集合,每个 RDD 可以分成多个分区,每个分区就是一个数据集片段,并且一个 RDD 的不同分区可以被保存到集群中不同的节点上,从而可以在集群中的不同节点上进行并行计算。RDD 提供了一种高度受限的共享内存模型,即 RDD 是只读的记录分区的集合,不能直接修改,只能基于稳定的物理存储中的数据集来创建 RDD,或者通过在其他 RDD 上执行确定的转换操作(如 map、join 和 groupBy)而创建得到新的 RDD。

RDD 提供了一组丰富的操作以支持常见的数据运算,分为“行动”(Action)和“转换”(Transformation)两种类型:

  • 前者用于执行计算并指定输出的形式
  • 后者指定 RDD 之间的相互依赖关系。

两类操作的主要区别是,转换操作(如map、filter、 groupBy、 join等)接受 RDD 并返回 RDD,而行动操作(如 count、collect 等)接受 RDD 但是返回非 RDD(即输出一个值或结果)。

RDD提供的转换接口都非常简单,都是类似 map、filter、 groupBy、 join 等粗粒度的数据转换操作,而不是针对某个数据项的细粒度修改。因此,RDD 比较适合对于数据集中元素执行相同操作的批处理式应用,而不适合用于需要异步、细粒度状态的应用,比如 Web 应用系统、增量式的网页爬虫等。正因为这样,这种粗粒度转换接口设计,会使人直觉上认为 RDD 的功能很受限、不够强大。但是,实际上 RDD 已经被实践证明可以很好地应用于许多并行计算应用中,可以具备很多现有计算框架(如 MapReduce、SQL、 Pregel 等)的表达能力,并且可以应用于这些框架处理不了的交互式数据挖掘应用。

Spark的转换和行动操作

RDD 特性

  1. 高效的容错性。现有的分布式共享内存、键值存储、内存数据库等,为了实现容错,必须在集群节点之间进行数据复制或者记录日志,也就是在节点之间会发生大量的数据传输,这对于数据密集型应用而言会带来很大的开销。在 RDD 的设计中,数据只读,不可修改,如果需要修改数据,必须从父 RDD 转换到子 RDD,由此在不同 RDD 之间建立了血缘关系。所以,RDD 是一种天生具有容错机制的特殊集合,不需要通过数据冗余的方式(比如检查点)实现容错,而只需通过 RDD 父子依赖(血缘)关系重新计算得到丢失的分区来实现容错,无需回滚整个系统,这样就避免了数据复制的高开销,而且重算过程可以在不同节点之间并行进行,实现了高效的容错。此外,RDD 提供的转换操作都是一些粗粒度的操作(比如 map、filter 和 join),RDD 依赖关系只需要记录这种粗粒度的转换操作,而不需要记录具体的数据和各种细粒度操作的日志(比如对哪个数据项进行了修改),这就大大降低了数据密集型应用中的容错开销。
  2. 中间结果持久化到内存。数据在内存中的多个 RDD 操作之间进行传递,不需要“落地”到磁盘上,避免了不必要的读写磁盘开销。
  3. 存放的数据可以是 Java 对象,避免了不必要的对象序列化和反序列化开销。

RDD 之间的依赖关系

RDD 中不同的操作会使得不同 RDD 中的分区产生不同的依赖。RDD 中的依赖关系分为窄依赖(Narrow Dependency)与宽依赖(Wide Dependency)。

RDD的宽依赖和窄依赖

窄依赖表现为一个父 RDD 的分区对应于一个子 RDD 的分区,或多个父 RDD 的分区对应于一个子 RDD 的分区。比如图中,RDD1 是 RDD2 的父 RDD,RDD2 是子 RDD,RDD1 的分区 1 对应于 RDD2 的一个分区(即分区 4);再比如,RDD6 和 RDD7 都是RDD8的父RDD, RDD6 中的分区(分区 15)和 RDD7 中的分区(分区18),两者都对应于 RDD8 中的一个分区(分区 21)。

宽依赖则表现为存在一个父 RDD 的一个分区对应一个子 RDD 的多个分区。比如中,RDD9 是 RDD12 的父 RDD,RDD9 中的分区 24 对应了 RDD12 中的两个分区(即分区 27 和分区 28)。

总体而言,如果父 RDD 的一个分区只被一个子 RDD 的一个分区所使用就是窄依赖,否则就
是宽依赖。窄依赖典型的操作包括 map、filter、 union 等,宽依赖典型的操作包括groupByKey、sortByKey 等。对于连接(Join)操作,可以分为两种情况。

  1. 对输人进行协同划分,属于窄依赖,如图所示。所谓协同划分(Co-partitioned)是指多个父 RDD 的某一分区的所有“键(Key)”落在子 RDD的同一个分区内,不会产生同一个父 RDD 的某一分区落在子 RDD 的两个分区的情况。
  2. 对输人做非协同划分,属于宽依赖,如图所示。

对于窄依赖的 RDD,可以以流水线的方式计算所有父分区,不会造成网络之间的数据混合。对于宽依赖的 RDD,则通常伴随着 Shuffle 操作,即首先需要计算好所有父分区数据,然后在节
点之间进行 Shuffle。

Spark 的这种依赖关系设计,使其具有了天生的容错性,大大加快了 Spark 的执行速度。因为,RDD 数据集通过“血缘关系”记住了它是如何从其他 RDD 中演变过来的,血缘关系记录的是粗颗粒度的转换操作行为,当这个 RDD 的部分分区数据丢失时,它可以通过血缘关系获取足够的信息来重新运算和恢复丢失的数据分区,由此带来了性能的提升。相对而言,在两种依赖关系中,窄依赖的失败恢复更为高效,它只需要根据父 RDD 分区重新计算丢失的分区即可(不需要重新计算所有分区),而且可以并行地在不同节点上进行重新计算。而对于宽依赖而言,单个节点失效通常意味着重新计算过程会涉及多个父 RDD 分区,开销较大。此外,Spark 还提供了数据检查点和记录日志,用于持久化中间 RDD,从而使得在进行失败恢复时不需要追溯到最开始的阶段。在进行故障恢复时,Spark 会对数据检查点开销和重新计算 RDD 分区的开销进行比较,从而自动选择最优的恢复策略。

阶段的划分

Spark 通过分析各个 RDD 的依赖关系生成了 DAG,再通过分析各个 RDD 中的分区之间的依赖关系来决定如何划分阶段,具体划分方法是:在DAG中进行反向解析,遇到宽依赖就断开,遇到窄依赖就把当前的 RDD 加入到当前的阶段中;将窄依赖尽量划分在同一个阶段中,可以实现流水线计算

RDD阶段划分

例如,根据 RDD 分区的依赖关系划分阶段,假设从 HDFS 中读入数据生成 3 个不同的 RDD(即 A、C 和 E),通过一系列转换操作后再将计算结果保存回 HDFS。对 DAG 进行解析时,在依赖图中进行反向解析,由于从 RDD A到 RDD B 的转换以及从 RDD B 和 RDD F 到 RDD G 的转换都属于宽依赖,因此在宽依赖处断开后可以得到 3 个阶段,即阶段 1、阶段 2和阶段 3。由图可以看出,在阶段 2 中,从 map 到 union 都是窄依赖,这两步操作可以形成一个流水线操作。比如,分区 7 通过 map 操作生成的分区 9,可以不用等待分区 8 到分区 10 这个转换操作的计算结束,而是继续进行 union 操作,转换得到分区 13,这样流水线执行大大提高了计算的效率。

由上述论述可知,把一个 DAG 图划分成多个阶段以后,每个阶段都代表了一组关联的、相互之间没有 Shuffle 依赖关系的任务组成的任务集合。每个任务集合会被提交给任务调度器(TaskScheduler)进行处理,由任务调度器将任务分发给 Executor 运行。

RDD 运行过程

  1. 创建 RDD 对象。
  2. SparkContext 负责计算 RDD 之间的依赖关系,构建 DAG。
  3. DAGScheduler 负责把 DAG 图分解成多个阶段,每个阶段中包含了多个任务,每个任务会被任务调度器分发给各个工作节点(Worker Node)上的 Executor 去执行。

RDD运行过程