1. 结构
分区,只是将数据分区,一个分区对应一个task。
spark是标准的主从结构,在yarn模式下,是由resourceManager负责调度,当一个任务提交的时候,会开启一个Driver,Driver会分配资源,划分任务,再向rm申请节点,节点过来后,在节点中开辟Executor,执行每一块任务。
- Driver
Spark 驱动器节点,用于执行 Spark 任务中的 main 方法,负责实际代码的执行工作。
Driver 在 Spark 作业执行时主要负责:
➢ 将用户程序转化为作业(job)
➢ 在 Executor 之间调度任务(task)
➢ 跟踪 Executor 的执行情况
➢ 通过 UI 展示查询运行情况
实际上,我们无法准确地描述 Driver 的定义,因为在整个的编程过程中没有看到任何有关
Driver 的字眼。所以简单理解,所谓的 Driver 就是驱使整个应用运行起来的程序,也称之为Driver 类 - Executor 是集群中工作节点(Worker)中的一个 JVM 进程,负责在 Spark 作业中运行具体任务(Task),任务彼此之间相互独立。Spark 应用(Application)启动时,Executor 节点被同时启动,并且始终伴随着整个 Spark 应用(Application)的生命周期而存在。如果有 Executor 节点发生了故障或崩溃,Spark 应用也可以继续执行,会将出错节点上的任务调度到其他 Executor 节点上继续运行。
Executor 有两个核心功能:
➢ 负责运行组成 Spark 应用(Application)的任务,并将结果返回给驱动器进程(Driver)
➢ 它们通过自身的块管理器(Block Manager)为用户程序中要求缓存的 RDD 提供内存式存储。RDD 是直接缓存在 Executor 进程内的,因此任务可以在运行时充分利用缓存
数据加速运算
2. 四种部署模式
- 1)本地模式
Spark不一定非要跑在hadoop集群,可以在本地,起多个线程的方式来指定。将Spark应用以多线程的方式直接运行在本地,一般都是为了方便调试,本地模式分三类
local:只启动一个executor
local[k]:启动k个executor
local[*]:启动跟cpu数目相同的 executor - 2)standalone模式
分布式部署集群,自带完整的服务,资源管理和任务监控是Spark自己监控,这个模式也是其
他模式的基础。 - 3)Spark on yarn模式
分布式部署集群,资源和任务监控交给yarn管理,但是目前仅支持粗粒度资源分配方式,包含cluster和client运行模式,cluster适合生产,driver运行在集群子节点,具有容错功能,client适合调试,dirver运行在客户端。 - 4)Spark On Mesos模式。
官方推荐这种模式(当然,原因之一是血缘关系)。正是由于Spark开发之初就考虑到支持Mesos,因此,目前而言,Spark运行在Mesos上会比运行YARN上更加灵活,更加自然。用户可选择两种调度模式之一运行自己的应用程序:
(1)粗粒度模式(Coarse-grained Mode):每个应用程序的运行环境由一个Dirver和若干个Executor组成,其中,每个Executor占用若干资源,内部可运行多个Task(对应多少个“slot”)。应用程序的各个任务正式运行之前,需要将运行环境中的资源全部申请好,且运行过程中要一直占用这些资源,即使不用,最后程序运行结束后,回收这些资源。
(2)细粒度模式(Fine-grained Mode):鉴于粗粒度模式会造成大量资源浪费,Spark OnMesos还提供了另外一种调度模式:细粒度模式,这种模式类似于现在的云计算,思想是按需分配。
3. RDD(重要)
3.1. RDD概念
rdd是一个弹性的分布式数据集(在代码中,是一个抽象类)
弹性的分布式的数据集
-
弹性:在一定范围内进行变化不影响整体的情况
-
分布式:RDD本身没有分布式概念,里面的数据是分布式存储的
-
数据集:数据的集合
rdd是spark计算的核心,也是计算的瞬时结果。
特点
a. rdd是一个抽象的概念,partition是具体的概念
b. rdd里的数据是不可变的
c. 每个rdd经过一个函数的转换把结果赋给下一个rdd
d. rdd可以并行计算
3.2. RDD五大核心属性
- 分区列表
在创建rdd的时候,可以设置分区(切片),有多少个分区,就产生多少个task,但是并行度最终是由分区数和cpu核数决定的。在通常情况下,并行度是等于分区数的,但是若 cpu核数<分区数 那么并行度就由cpu核数决定。
若不设置分区,则用默认值,若默认值不存在,则用cpu最大核数为分区数 - 分区计算函数
Spark 在计算时,是使用分区函数对每一个分区进行计算 - RDD之间的依赖关系
多个RDD之间可以形成依赖关系 - 分区器(分区的规则,可能有可能没有)
- 首选为位置
3.3. 分区规则
textFile的底层调用的就是mr的TextInputFormat
这里是按照偏移量进行分区的,比如word.txt的大小为18个字节,那么18/2 = 9,每一个分区的偏移量就是9.
分区1:[0,9]
分区2:[9,18]
3.4. RDD算子
一个元素走完所有的算子,再开始第二个元素,这样的效率很低,每一次都要从磁盘读取,所以需要一个类似缓冲区的操作,mapPartions,会把一个分区的元素全部读到内存后,再进行操作
转换算子:对RDD中的数据进行操作
行动算子:触发任务执行,底层调用的是runJob方法,运行有向无环图
3.5 RDD序列化
3.6. RDD持久化策略
存磁盘不一定比重算好,
按照spark的思想,
计算速度很快,
即使重新计算也比
存入磁盘要快。
-
持久化
在数据库中,把数据写入表里的过程。
在spark中,把内存中的RDD临时永久存储起来的过程 -
容错机制
如果数据在内存中丢失,向父级依赖查找数据,直到找到数据为止,最坏的情况是找到hdfs重新获取数据重新计算,自动把结果补到原来持久化的位置。 -
用法:
创建持久化:
就是转换算子,直接调用就可以
cache() 完全等价于 persist(),使用默认策略持久化
如果想改变策略,使用persist(策略)指定持久化策略 -
删除持久化:
a.spark本身根据最近最少使用原则,自动取消持久化
b.手动调用unpersit算子取消持久化
从持久化的rdd中移除当前rdd,
并把它的策略置为None -
策略
两组对比- MEMORY_ONLY 和 MEMORY_ONLY_SER
带SER的节省一定的空间 - MEMORY_ONLY 和 MEMORY_AND_DISK
MEMORY_ONLY:先存内存,如果存不下就不存了。
读取的时候没存的部分重新读数据重新计算
MEMORY_AND_DISK:先存内存,如果存不下存磁盘。
读取的时候先读内存,再读磁盘中的数据
即使重新读取数据重新计算,
速度也比多做一次磁盘IO速度要快。
持久化策略的选择
a.没有特殊情况,就使用默认的存储等级
发挥出最大的CPU性能且计算速度尽可能快
b.如果内存不太够用,可以使用SER进行使用
节省一定的内存空间,速度也会很快。
c.如果rdd的计算量非常复杂,可以选用Disk
d.为了快速容错,可以选用带副本的策略
可以避免容错的时候重新计算的时间耗费
- MEMORY_ONLY 和 MEMORY_ONLY_SER
-
checkPoint
检查点
也是一个很普通的算子,直接调用就可以
首先sc.setCheckPointDir()设置检查点路径
对某个RDD设置检查点,把所有父级依赖全部删除
把当前状态存在这个检查点路径下,一般为hdfs。
4. spark执行流程以及原理
-
窄依赖(narrow dependency)
相当于map操作
父级RDD里的每个partition都对应子级RDD里的唯一一个partition的依赖关系 -
宽依赖(shuffle dependency)
相当于reduce操作
父级RDD里的每个partition都对应子级RDD里的多个partition的依赖关系
宽依赖分区会有所变化,所以一定执行shuffle操作,必执行磁盘IO操作
任务切割是由SparkContext完成的
一个分区对应的一个task,一条线就是一个task
分解task的工作是由sc来执行的,不过sc存在于driver中,所以我们也可以理解为是由driver来执行的 -
standlone模式运行
-
on yarn模式运行
container是一个虚拟的资源,是用来执行任务的。
1个Executor可以有多个线程,同一时间内,一个线程只能执行一个task -
spark任务执行大致可以分为三个阶段
- 任务切割(分配)
- 资源分配
- 任务执行
5. shuffle机制
减少数据落盘,就会提高效率,算子若存在预聚合功能,那么就可以提高shuffle的性能
在Spark 1.2以前,默认的shuffle计算引擎是HashShuffleManager。该ShuffleManager而HashShuffleManager有着一个非常严重的弊端,就是会产生大量的中间磁盘文件,进而由大量的磁盘IO操作影响了性能。因此在Spark 1.2以后的版本中,默认的ShuffleManager改成了SortShuffleManager。
5.1. Hash shuffle
5.1.1. 普通机制
每一个task都会产生reduce端分区数量的小文件,影响性能。
小文件数量 = task数量 * reduce分区数量
5.1.2. 合并机制
一个Executor中的所有task共用文件
小文件数量 = Excutor数量 * reduce分区数量
5.2. Sort shuffle
为了尽可能避免小文件过多,sort shuffle采用的机制是
一个文件 + 一个索引文件
5.2.1. 普通机制
在上图的基础上,还有一个缓冲区
5.2.2. bypasss机制
- bypass运行机制的触发条件如下:
1)shuffle map task数量小于spark.shuffle.sort.bypassMergeThreshold参数的值。
2)不是聚合类的shuffle算子(比如reduceByKey)。 - 与普通机制的区别:
第一,磁盘写机制不同;
第二,不会进行排序。也就是说,启用该机制的最大好处在于,shuffle write过程中,不需要进行数据的排序操作,也就节省掉了这部分的性能开销。
6. 广播变量和累加器
算子的函数中的临时变量会在每一个task中复制一次,很浪费资源。
- spark封装了三大数据结构:
RDD:弹性的分布式数据集
累加器:分布式共享只写变量
广播变量:分布式共享只读变量
我们先来看一段代码
6.1. 累加器
6.2. 广播变量
我们刚刚说到了,每一个人task在使用driver中的变量的时候,都会拷贝一份,要是driver的那个变量值很大呢?在每一个task中拷贝一份,那会很消耗内存。
解决方式:Executor其实就是一个JVM进程,在启动的时候会分配内存,那么我们可以在一个Excutor里放一个副本,每一个task都共享当前Executor里的副本数据。
- 这个副本就是广播变量
7. sparkSQL
- spark中处理结构化数据的组件
计算过程是DataSet和DataFrame之间的转换
DS/DF可以创建出来,也可以由其他DS/DF转换而来
DS:数据的集合
DF:在DS基础上条件了 schema