molyeo 阅读(18) 评论(0)

本篇主要讲解spark运行架构,包含如下内容:

Spark运行架构

基本概念

  • Application
    Spark的应用程序,包含一个Driver program和若干Executor
  • SparkContext
    Spark应用程序的入口,负责调度各个运算资源,协调各个Worker Node的Executor
  • Driver Program
    运行Application的main()函数并且创建SparkContext
  • Executor
  • 为Application运行在Worker node上的一个进程,该进程负责运行Task,并且负责将数据存在内存或者磁盘上,每个Application都会申请各自的Executor来处理任务
  • Cluster Manager
    在集群上获取资源的外部服务(例如Standalone,Mesos,Yarn)
  • Worker Node
    集群中任何可以运行Application代码的节点,运行一个或者多个Executor进程
  • Task
    运行在Executor上的工作单元
  • Job
    SparkContext提交的具体Action操作,常和Action对应
  • Stage
    每个Job会被拆分很多组task,每组任务被称为Stage,也称TaskSet
  • RDD
    是Resilient distributed datasets的简称,中文为弹性分布式数据集,是Spark最核心的模块和类
  • DAGScheduler
    根据Job构建基本Stage的DAG,并提交Stage给TaskScheduler
  • TaskScheduler
    将TaskSet提交给Worker node集群运行并返回结果
  • Transformations
    是Spark API的一种类型,Transformation返回值还是一个RDD,所有的Transformation采用的都是懒策略,如果只是将Transformation的提交时不会执行计算的
  • Action
    是Spark API的一种类型,Action返回值不是一个RDD,而是一个scala集合,计算只有在Action被提交的时候计算才被触发

    架构设计

    Standalone模式

    Spark Standalone模式的集群由Master与Worker节点组成,程序通过与Master节点交互申请资源,Worker节点启动Executor运行

  • Driver部分
    Driver部分主要是对SparkContext进行配置、初始化以及关闭。初始化SparkContext是为了构建Spark应用程序的运行环境,在初始化SparkContext,要先导入一些Spark的类和隐式转换;在Executor部分运行完毕后,需要将SparkContext关闭。
  • Executor部分
    Spark应用程序的Executor部分是对数据的处理,包含原生数据,RDD,以及共享变量

    spark on Mesos模式

    Mesos是Apache下的开源分布式资源管理框架。由于血缘关系,Spark官方推荐这种模式,很多公司也采用该模式。Spark On Mesos模式参考:http://dongxicheng.org/apache-mesos/meso-architecture/

    Spark on YARN模式

    Spark on YARN框架解析:

  • 基于YARN的Spark作业首先由Spark客户端生成作业信息,提交给ResourceManager,ResourceManager在某一个NodeManager汇报时把AppMaster分配给该NodeManager;
  • 该NodeManager启动Spark AppMaster,Spark AppMaster 启动后初始化作业,然后向ResourceManager申请资源;
  • 申请到相应资源后,Spark AppMaster通过RPC让对应若干个NodeManager启动相应的 Spark Executor,Spark Executor向Spark AppMaster汇报完成相应的任务。
  • Spark客户端会通过Spark AppMaster获取作业运行状态。

    Client模式

    在Spark on YARN/Mesos模式中,根据Spark Application的Driver是否在集群中运行,Spark on YARN/Mesos运行模式又可以分为Client模式和Cluster模式。

Cluster模式

Spark运行基本流程

Spark集群由若干节点构成,而每个节点都是可以接受和发送消息的RPC服务端点(Endpoint),根据责任的不同可以分为三类端点:Client、Master、Worker,三端调用关系:

  • Client:Spark任务节点,负责发布、查询,以及终止任务的相关进程的统称,Client是运行时有效进程,如果任务处理完成,对应的进程结束
  • Master:Spark管理节点,负责Worker进程,协调调度Client发布的任务处理,Master节点可以为多个,有且仅有一个为Active状态,其他为Standby状态
  • Worker节点:Spark工作节点,负责具体任务的执行工作
    基于三端模型,用户通过spark-submit提交或者运行spark-shell REPL,集群创建Driver,Driver加载Application,最后Application根据用户代码转化成RDD,RDD分解为Tasks,Executor执行Task等,整体交互蓝图如下:

1.Client运行时向Master发送启动驱动请求(ResquestSubmitDriver指令)
2.Master调度可用Worker资源进行驱动安装(发送LaunchDriver指令)
3.Worker运行DriverRunner进行驱动加载,并向Master发送应用注册请求(发送RegisterApplication指令)
4.Master调度可用Worker资源进行应用的Executor安装(发送LaunchExecutor指令)
5.Executor安装完毕后向Driver注册驱动可用的Executor资源(发送RegisterExecutor指令)
6.运行用户代码时,通过DAGScheduler,TaskScheduler封装成可以执行的TaskSetManager对象
7.TaskSetManager对象与Driver中的Executor资源进行匹配,在队形的Executor中发布任务(发送LaunchTask指令)
8.TaskRunner执行完毕后,调用DriverRunner提交给DAGScheduler,循环7,执行任务完成

Spark核心概念

RDD

弹性分布式数据集(Resilient Distributed Data,RDD)作为Spark的编程模型,相对于MapReduce模型有更好的扩展和延伸:

  • 提供了抽象层次更高的API
  • 高效的数据共享
  • 高效的容错性

RDD的操作类型

RDD大致可以包括如下四种操作类型:

  • 创建操作(Creation):从内存集合和外部存储系统创建RDD,或者通过转换操作生成RDD
  • 转换操作(Transformation):转换操作是惰性操作,只是定义一个RDD并记录依赖关系,没有立即执行
  • 控制操作(Control):进行RDD的持久化,通过设定不同级别对RDD进行缓存
  • 行动操作(Action):触发任务提交、Spark运行的操作,操作的结果是获取到结果集或者保存到外部存储系统

    RDD的实现

    RDD 的分区

    RDD的分区是一个逻辑概念,转换操作前后的分区在物理上可能是同一块内存或者存储。在RDD操作中用户可以设定和获取分区数目,默认分区数目为该程序所分配到的cpu核数,如果是从HDFS文件创建,默认为文件的

分片数。

RDD 的“血统”和依赖关系

“血统”和依赖关系:RDD 的容错机制是通过记录更新来实现的,且记录的是粗粒度的转换操作。我们将记录的信息称为血统(Lineage)关系,而到了源码级别,Apache Spark 记录的则是 RDD 之间的依赖(Dependency)关系。如上所示,每次转换操作产生一个新的RDD(子RDD),子RDD会记录其父RDD的信息以及相关的依赖关系。

依赖关系

依赖关系划分为两种:窄依赖(Narrow Dependency)和 宽依赖(源码中为Shuffle Dependency)。

窄依赖指的是父 RDD 中的一个分区最多只会被子 RDD 中的一个分区使用,意味着父RDD的一个分区内的数据是不能被分割的,子RDD的任务可以跟父RDD在同一个Executor一起执行,不需要经过 Shuffle 阶段去重组数据。
窄依赖包括两种:一对一依赖(OneToOneDependency)和范围依赖(RangeDependency)
一对一依赖:

范围依赖(仅union方法):

宽依赖指的是父 RDD 中的分区可能会被多个子 RDD 分区使用。因为父 RDD 中一个分区内的数据会被分割,发送给子 RDD 的所有分区,因此宽依赖也意味着父 RDD 与子 RDD 之间存在着 Shuffle 过程。
宽依赖只有一种:Shuffle依赖(ShuffleDependency)

Transformations / Actions

RDDs support 两种类型的操作: transformations(转换), 它会在一个已存在的 dataset 上创建一个新的 dataset, 和 actions(动作), 将在 dataset 上运行的计算后返回到 driver 程序. 例如, map 是一个通过让每个数据集元素都执行一个函数,并返回的新 RDD 结果的 transformation, reduce reduce 通过执行一些函数,聚合 RDD 中所有元素,并将最终结果给返回驱动程序(虽然也有一个并行 reduceByKey 返回一个分布式数据集)的 action.

Spark 中所有的 transformations 都是 lazy(懒加载的), 因此它不会立刻计算出结果. 相反, 他们只记得应用于一些基本数据集的转换 (例如. 文件). 只有当需要返回结果给驱动程序时,transformations 才开始计算. 这种设计使 Spark 的运行更高效. 例如, 我们可以了解到,map 所创建的数据集将被用在 reduce 中,并且只有 reduce 的计算结果返回给驱动程序,而不是映射一个更大的数据集.

默认情况下,每次你在 RDD 运行一个 action 的时, 每个 transformed RDD 都会被重新计算。但是,您也可用 persist (或 cache) 方法将 RDD persist(持久化)到内存中;在这种情况下,Spark 为了下次查询时可以更快地访问,会把数据保存在集群上。此外,还支持持续持久化 RDDs 到磁盘,或复制到多个结点。

Jobs / Stage

作业执行原理

作业(Job):RDD每一个行动操作都会生成一个或者多个调度阶段。
调度阶段(Stage):每个Job都会根据依赖关系,以Shuffle过程作为划分,分为Shuffle Map Stage和Result Stage。每个Stage包含多个任务集(TaskSet),TaskSet的数量与分区数相同。
任务(Task):分发到Executor上的工作任务,是Spark的最小执行单元。
DAGScheduler:DAGScheduler是面向调度阶段的任务调度器,负责划分调度阶段并提交给TaskScheduler。
TaskScheduler:TaskScheduler是面向任务的调度器,它负责将任务分发到Woker节点,由Executor进行执行。

作业提交和作业调度策略

每一次行动操作都会触发SparkContext的runJob方法进行作业的提交。
这些作业之间可以没有任何依赖关系,对于多个作业之间的调度,共有两种:一种是默认的FIFO模式,另一种则是FAIR模式,该模式的调度可以通过设定minShare(最小任务数)和weight(任务的权重)来决定Job执行的优先级。
FIFO调度策略:优先比较作业优先级(作业编号越小优先级越高),再比较调度阶段优先级(调度阶段编号越小优先级越高)。
FAIR调度策略:先获取两个调度的饥饿程度,是否处于饥饿状态由当前正在运行的任务是否小于最小任务决定,获取后进行如下比较:

  • 优先满足处于饥饿状态的调度
  • 同处于饥饿状态,优先满足资源比小的调度
  • 同处于非饥饿状态,优先满足权重比小的调度
  • 以上情况均相同的情况下,根据调度名称进行排序 

    划分调度阶段(DAG构建)

DAG的构建:主要是通过对最后一个RDD进行递归,使用广度优先遍历每个RDD跟父RDD的依赖关系(前面提到子RDD会记录依赖关系),碰到ShuffleDependency的则进行切割。切割后形成TaskSet传递给TaskScheduler进行执行。
DAG的作用:让窄依赖的RDD操作合并为同一个TaskSet,将多个任务进行合并,有利于任务执行效率的提高。
TaskSet结构图:假设数据有两个Partition时,TaskSet是一组关联的,但相互之间没有Shuffle依赖关系的Task集合,TaskSet的ShuffleMapStage数量跟Partition个数相关,主要包含task的集合,stage中的rdd信息等等。Task会被序列化和压缩 

Shuffle

概念

Spark里的某些操作会触发shuffle。shuffle是Spark重新分配数据的一种机制,使得这些数据可以跨不同的区域进行分组,通常涉及在executors和机器之间拷贝数据,使得shuffle成为一个复杂的、代价高的操作。

实例说明

为了明白 reduceByKey 操作的过程,我们以 reduceByKey 为例。

  1. 数据处理:文件在hdfs中以多个切片形式存储,读取时每一个切片会被分配给一个Excutor进行处理;

  2. map端操作:map端对文件数据进行处理,格式化为(key,value)键值对,每个map都可能包含a,b,c,d等多个字母,如果在map端使用了combiner,则数据会被压缩,value值会被合并;(注意:这个过程的使用需要保证对最终结果没有影响,有利于减少shuffle过程的数据传输);
  3. reduce端操作:reduce过程中,假设a和b,c和d在同一个reduce端,需要将map端被分配在同一个reduce端的数据进行洗牌合并,这个过程被称之为shuffle。

reduceByKey(func, [numTasks]) :在 (K, V) pairs 的 dataset 上调用时, 返回 dataset of (K, V) pairs 的 dataset, 其中的 values 是针对每个 key 使用给定的函数 func 来进行聚合的, 它必须是 type (V,V) => V 的类型. 像 groupByKey 一样, reduce tasks 的数量是可以通过第二个可选的参数来配置的。
reduceBykey 操作产生一个新的 RDD,其中 key 所有相同的的值组合成为一个 tuple - key 以及与 key 相关联的所有值在 reduce 函数上的执行结果。面临的挑战是,一个 key 的所有值不一定都在一个同一个 paritition 分区里,甚至是不一定在同一台机器里,但是它们必须共同被计算。
在 spark 里,特定的操作需要数据不跨分区分布。在计算期间,一个任务在一个分区上执行,为了所有数据都在单个 reduceByKey 的 reduce 任务上运行,我们需要执行一个 all-to-all 操作。它必须从所有分区读取所有的 key 和 key对应的所有的值,并且跨分区聚集去计算每个 key 的结果 - 这个过程就叫做 shuffle。
尽管每个分区新 shuffle 的数据集将是确定的,分区本身的顺序也是这样,但是这些数据的顺序是不确定的。如果希望 shuffle 后的数据是有序的,可以使用:

  • mapPartitions 对每个 partition 分区进行排序,例如sorted
  • repartitionAndSortWithinPartitions 在分区的同时对分区进行高效的排序.
  • sortBy 对 RDD 进行全局的排序

触发的 shuffle 操作包括 repartition 操作,如 repartition 和 coalesce, ‘ByKey 操作 (除了 counting 之外) 像 groupByKey 和 reduceByKey, 和 join 操作, 像 cogroup 和 join.

性能影响

shuffle 是一个代价比较高的操作,它涉及磁盘 I/O、数据序列化、网络 I/O。为了准备 shuffle 操作的数据,Spark 启动了一系列的任务,map 任务组织数据,reduce 完成数据的聚合。
在内部,一个 map 任务的所有结果数据会保存在内存,直到内存不能全部存储为止。然后,这些数据将基于目标分区进行排序并写入一个单独的文件中。在 reduce 时,任务将读取相关的已排序的数据块。
某些 shuffle 操作会大量消耗堆内存空间,因为 shuffle 操作在数据转换前后,需要在使用内存中的数据结构对数据进行组织。需要特别说明的是,reduceByKey 和 aggregateByKey 在 map 时会创建这些数据结构,'ByKey 操作在 reduce 时创建这些数据结构。当内存满的时候,Spark 会把溢出的数据存到磁盘上,这将导致额外的磁盘 I/O 开销和垃圾回收开销的增加。
shuffle 操作还会在磁盘上生成大量的中间文件。在 Spark 1.3 中,这些文件将会保留至对应的 RDD 不在使用并被垃圾回收为止。这么做的好处是,如果在 Spark 重新计算 RDD 的血统关系(lineage)时,shuffle 操作产生的这些中间文件不需要重新创建。如果 Spark 应用长期保持对 RDD 的引用,或者垃圾回收不频繁,这将导致垃圾回收的周期比较长。这意味着,长期运行 Spark 任务可能会消耗大量的磁盘空间。临时数据存储路径可以通过 SparkContext 中设置参数 spark.local.dir 进行配置。

Cache

Spark 中一个很重要的能力是将数据 persisting 持久化(或称为 caching 缓存),在多个操作间都可以访问这些持久化的数据。当持久化一个 RDD 时,每个节点的其它分区都可以使用 RDD 在内存中进行计算,在该数据上的其他 action 操作将直接使用内存中的数据。这样会让以后的 action 操作计算速度加快(通常运行速度会加速 10 倍)。缓存是迭代算法和快速的交互式使用的重要工具。

RDD 可以使用 persist() 方法或 cache() 方法进行持久化。数据将会在第一次 action 操作时进行计算,并缓存在节点的内存中。Spark 的缓存具有容错机制,如果一个缓存的 RDD 的某个分区丢失了,Spark 将按照原来的计算过程,自动重新计算并进行缓存。

另外,每个持久化的 RDD 可以使用不同的 storage level 存储级别进行缓存,例如,持久化到磁盘、已序列化的 Java 对象形式持久化到内存(可以节省空间)、跨节点间复制、以 off-heap 的方式存储在 Tachyon。这些存储级别通过传递一个 StorageLevel 对象 (Scala, Java, Python) 给 persist() 方法进行设置。cache() 方法是使用默认存储级别的快捷设置方法,默认的存储级别是 StorageLevel.MEMORY_ONLY(将反序列化的对象存储到内存中)。详细的存储级别介绍如下:

在 shuffle 操作中(例如 reduceByKey),即便是用户没有调用 persist 方法,Spark 也会自动缓存部分中间数据.这么做的目的是,在 shuffle 的过程中某个节点运行失败时,不需要重新计算所有的输入数据。如果用户想多次使用某个 RDD,强烈推荐在该 RDD 上调用 persist 方法.

存储级别选择

Spark 的存储级别的选择,核心问题是在 memory 内存使用率和 CPU 效率之间进行权衡。建议按下面的过程进行存储级别的选择:

  • 如果您的 RDD 适合于默认存储级别 (MEMORY_ONLY),那就这样. 这是CPU效率最高的选项,允许RDD上的操作尽可能快地运行.
  • 如果不是, 试着使用 MEMORY_ONLY_SER 和选择快速的序列化库以使对象更加节省空间,但仍然能够快速访问。 (Java和Scala)
  • 不要溢出到磁盘,除非计算您的数据集的函数是昂贵的, 或者它们过滤大量的数据. 否则, 重新计算分区可能与从磁盘读取分区一样快.
  • 如果需要快速故障恢复,请使用复制的存储级别 (e.g. 如果使用Spark来服务 来自网络应用程序的请求). All存储级别通过重新计算丢失的数据来提供完整的容错能力,但复制的数据可让您继续在 RDD 上运行任务,而无需等待重新计算一个丢失的分区.