淘先锋技术网

首页 1 2 3 4 5 6 7

1. 结构

分区,只是将数据分区,一个分区对应一个task。
spark是标准的主从结构,在yarn模式下,是由resourceManager负责调度,当一个任务提交的时候,会开启一个Driver,Driver会分配资源,划分任务,再向rm申请节点,节点过来后,在节点中开辟Executor,执行每一块任务。

  • Driver
    Spark 驱动器节点,用于执行 Spark 任务中的 main 方法,负责实际代码的执行工作。
    Driver 在 Spark 作业执行时主要负责:
    ➢ 将用户程序转化为作业(job)
    ➢ 在 Executor 之间调度任务(task)
    ➢ 跟踪 Executor 的执行情况
    ➢ 通过 UI 展示查询运行情况
    实际上,我们无法准确地描述 Driver 的定义,因为在整个的编程过程中没有看到任何有关
    Driver 的字眼。所以简单理解,所谓的 Driver 就是驱使整个应用运行起来的程序,也称之为Driver 类
  • Executor 是集群中工作节点(Worker)中的一个 JVM 进程,负责在 Spark 作业中运行具体任务(Task),任务彼此之间相互独立。Spark 应用(Application)启动时,Executor 节点被同时启动,并且始终伴随着整个 Spark 应用(Application)的生命周期而存在。如果有 Executor 节点发生了故障或崩溃,Spark 应用也可以继续执行,会将出错节点上的任务调度到其他 Executor 节点上继续运行。
    Executor 有两个核心功能:
    ➢ 负责运行组成 Spark 应用(Application)的任务,并将结果返回给驱动器进程(Driver)
    ➢ 它们通过自身的块管理器(Block Manager)为用户程序中要求缓存的 RDD 提供内存式存储。RDD 是直接缓存在 Executor 进程内的,因此任务可以在运行时充分利用缓存
    数据加速运算

2. 四种部署模式

  • 1)本地模式
    Spark不一定非要跑在hadoop集群,可以在本地,起多个线程的方式来指定。将Spark应用以多线程的方式直接运行在本地,一般都是为了方便调试,本地模式分三类
    local:只启动一个executor
    local[k]:启动k个executor
    local[*]:启动跟cpu数目相同的 executor
  • 2)standalone模式
    分布式部署集群,自带完整的服务,资源管理和任务监控是Spark自己监控,这个模式也是其
    他模式的基础。
  • 3)Spark on yarn模式
    分布式部署集群,资源和任务监控交给yarn管理,但是目前仅支持粗粒度资源分配方式,包含cluster和client运行模式,cluster适合生产,driver运行在集群子节点,具有容错功能,client适合调试,dirver运行在客户端。
  • 4)Spark On Mesos模式。
    官方推荐这种模式(当然,原因之一是血缘关系)。正是由于Spark开发之初就考虑到支持Mesos,因此,目前而言,Spark运行在Mesos上会比运行YARN上更加灵活,更加自然。用户可选择两种调度模式之一运行自己的应用程序:
    (1)粗粒度模式(Coarse-grained Mode):每个应用程序的运行环境由一个Dirver和若干个Executor组成,其中,每个Executor占用若干资源,内部可运行多个Task(对应多少个“slot”)。应用程序的各个任务正式运行之前,需要将运行环境中的资源全部申请好,且运行过程中要一直占用这些资源,即使不用,最后程序运行结束后,回收这些资源。
    (2)细粒度模式(Fine-grained Mode):鉴于粗粒度模式会造成大量资源浪费,Spark OnMesos还提供了另外一种调度模式:细粒度模式,这种模式类似于现在的云计算,思想是按需分配。

3. RDD(重要)

3.1. RDD概念

rdd是一个弹性的分布式数据集(在代码中,是一个抽象类)
弹性的分布式的数据集

  • 弹性:在一定范围内进行变化不影响整体的情况
    在这里插入图片描述

  • 分布式:RDD本身没有分布式概念,里面的数据是分布式存储的

  • 数据集:数据的集合
    rdd是spark计算的核心,也是计算的瞬时结果。

特点
a. rdd是一个抽象的概念,partition是具体的概念
b. rdd里的数据是不可变的
c. 每个rdd经过一个函数的转换把结果赋给下一个rdd
d. rdd可以并行计算

3.2. RDD五大核心属性

  1. 分区列表
    在这里插入图片描述
    在创建rdd的时候,可以设置分区(切片),有多少个分区,就产生多少个task,但是并行度最终是由分区数和cpu核数决定的。在通常情况下,并行度是等于分区数的,但是若 cpu核数<分区数 那么并行度就由cpu核数决定。
    若不设置分区,则用默认值,若默认值不存在,则用cpu最大核数为分区数
  2. 分区计算函数
    Spark 在计算时,是使用分区函数对每一个分区进行计算
  3. RDD之间的依赖关系
    多个RDD之间可以形成依赖关系
  4. 分区器(分区的规则,可能有可能没有)
  5. 首选为位置在这里插入图片描述

3.3. 分区规则

在这里插入图片描述
textFile的底层调用的就是mr的TextInputFormat
这里是按照偏移量进行分区的,比如word.txt的大小为18个字节,那么18/2 = 9,每一个分区的偏移量就是9.
分区1:[0,9]
分区2:[9,18]

3.4. RDD算子

在这里插入图片描述
一个元素走完所有的算子,再开始第二个元素,这样的效率很低,每一次都要从磁盘读取,所以需要一个类似缓冲区的操作,mapPartions,会把一个分区的元素全部读到内存后,再进行操作
转换算子:对RDD中的数据进行操作
行动算子:触发任务执行,底层调用的是runJob方法,运行有向无环图

3.5 RDD序列化

在这里插入图片描述

3.6. RDD持久化策略

存磁盘不一定比重算好,
按照spark的思想,
计算速度很快,
即使重新计算也比
存入磁盘要快。

  • 持久化
    在数据库中,把数据写入表里的过程。
    在spark中,把内存中的RDD临时永久存储起来的过程

  • 容错机制
    如果数据在内存中丢失,向父级依赖查找数据,直到找到数据为止,最坏的情况是找到hdfs重新获取数据重新计算,自动把结果补到原来持久化的位置。

  • 用法:
    创建持久化:
    就是转换算子,直接调用就可以
    cache() 完全等价于 persist(),使用默认策略持久化
    如果想改变策略,使用persist(策略)指定持久化策略

  • 删除持久化:
    a.spark本身根据最近最少使用原则,自动取消持久化
    b.手动调用unpersit算子取消持久化
    从持久化的rdd中移除当前rdd,
    并把它的策略置为None

  • 策略
    两组对比

    1. MEMORY_ONLY 和 MEMORY_ONLY_SER
      带SER的节省一定的空间
    2. MEMORY_ONLY 和 MEMORY_AND_DISK
      MEMORY_ONLY:先存内存,如果存不下就不存了。
      读取的时候没存的部分重新读数据重新计算
      MEMORY_AND_DISK:先存内存,如果存不下存磁盘。
      读取的时候先读内存,再读磁盘中的数据
      即使重新读取数据重新计算,
      速度也比多做一次磁盘IO速度要快。
      持久化策略的选择
      a.没有特殊情况,就使用默认的存储等级
      发挥出最大的CPU性能且计算速度尽可能快
      b.如果内存不太够用,可以使用SER进行使用
      节省一定的内存空间,速度也会很快。
      c.如果rdd的计算量非常复杂,可以选用Disk
      d.为了快速容错,可以选用带副本的策略
      可以避免容错的时候重新计算的时间耗费
  • checkPoint
    检查点
    也是一个很普通的算子,直接调用就可以
    首先sc.setCheckPointDir()设置检查点路径
    对某个RDD设置检查点,把所有父级依赖全部删除
    把当前状态存在这个检查点路径下,一般为hdfs。

4. spark执行流程以及原理

  • 窄依赖(narrow dependency)
    相当于map操作
    父级RDD里的每个partition都对应子级RDD里的唯一一个partition的依赖关系

  • 宽依赖(shuffle dependency)
    相当于reduce操作
    父级RDD里的每个partition都对应子级RDD里的多个partition的依赖关系
    宽依赖分区会有所变化,所以一定执行shuffle操作,必执行磁盘IO操作
    在这里插入图片描述
    任务切割是由SparkContext完成的
    在这里插入图片描述

    一个分区对应的一个task,一条线就是一个task
    在这里插入图片描述
    分解task的工作是由sc来执行的,不过sc存在于driver中,所以我们也可以理解为是由driver来执行的

  • standlone模式运行
    在这里插入图片描述

  • on yarn模式运行
    在这里插入图片描述
    container是一个虚拟的资源,是用来执行任务的。
    1个Executor可以有多个线程,同一时间内,一个线程只能执行一个task

  • spark任务执行大致可以分为三个阶段

  1. 任务切割(分配)
  2. 资源分配
  3. 任务执行

5. shuffle机制

在这里插入图片描述
减少数据落盘,就会提高效率,算子若存在预聚合功能,那么就可以提高shuffle的性能

在Spark 1.2以前,默认的shuffle计算引擎是HashShuffleManager。该ShuffleManager而HashShuffleManager有着一个非常严重的弊端,就是会产生大量的中间磁盘文件,进而由大量的磁盘IO操作影响了性能。因此在Spark 1.2以后的版本中,默认的ShuffleManager改成了SortShuffleManager。

5.1. Hash shuffle

5.1.1. 普通机制

每一个task都会产生reduce端分区数量的小文件,影响性能。
小文件数量 = task数量 * reduce分区数量
在这里插入图片描述

5.1.2. 合并机制

一个Executor中的所有task共用文件
小文件数量 = Excutor数量 * reduce分区数量
在这里插入图片描述

5.2. Sort shuffle

为了尽可能避免小文件过多,sort shuffle采用的机制是
一个文件 + 一个索引文件
在这里插入图片描述

5.2.1. 普通机制

在上图的基础上,还有一个缓冲区
在这里插入图片描述

5.2.2. bypasss机制

在这里插入图片描述

  • bypass运行机制的触发条件如下:
    1)shuffle map task数量小于spark.shuffle.sort.bypassMergeThreshold参数的值。
    2)不是聚合类的shuffle算子(比如reduceByKey)。
  • 与普通机制的区别:
    第一,磁盘写机制不同;
    第二,不会进行排序。也就是说,启用该机制的最大好处在于,shuffle write过程中,不需要进行数据的排序操作,也就节省掉了这部分的性能开销。

6. 广播变量和累加器

算子的函数中的临时变量会在每一个task中复制一次,很浪费资源。

  • spark封装了三大数据结构:
    RDD:弹性的分布式数据集
    累加器:分布式共享只写变量
    广播变量:分布式共享只读变量

我们先来看一段代码
在这里插入图片描述
在这里插入图片描述

6.1. 累加器

在这里插入图片描述

6.2. 广播变量

我们刚刚说到了,每一个人task在使用driver中的变量的时候,都会拷贝一份,要是driver的那个变量值很大呢?在每一个task中拷贝一份,那会很消耗内存。
解决方式:Executor其实就是一个JVM进程,在启动的时候会分配内存,那么我们可以在一个Excutor里放一个副本,每一个task都共享当前Executor里的副本数据。

  • 这个副本就是广播变量

7. sparkSQL

  • spark中处理结构化数据的组件
    计算过程是DataSet和DataFrame之间的转换
    DS/DF可以创建出来,也可以由其他DS/DF转换而来
    DS:数据的集合
    DF:在DS基础上条件了 schema

7.1. 使用

在这里插入图片描述

3.数据分析

在这里插入图片描述