01 导读
在这一系列的文章中我将带大家探寻在Yarn-Cluster模式下,Spark程序是怎么执行的。文章将分为多篇,根据个人时间安排来写后续篇幅。在第一篇,我主要讲整体流程和提交程序。Spark框架自身是带有资源调度功能的,采用的是经典的Master-Slaves架构,包括负责资源分配和调度的Master和具体执行处理和计算进程的Slave。
资源调度不是Spark框架的强项,在实际工作中,Yarn作为Hadoop框架的组件,经常用来做集群资源管理,Spark源码中有专门的包spark.deploy.yarn,就是用来连接Spark框架和Yarn的。在我的《搭建Spark on Yarn集群碰到的那些坑》文章中就介绍了搭建Spark-Yarn集群遇到的问题和解决办法。
Spark的英文原意是星火的意思,彰显出活力四射的生命力。Yarn的本意是纱线、纺线,能把原本一团乱的线绳梳理得清晰有序。我们向Yarn集群提交Spark计算程序后,各节点是如何通力协作完成作业的,中间发生了什么神奇的事儿,该系列文章将带您一探究竟。
02 整体流程
学习一门技术,首先要对它有个全局的认识,这非常关键。我们往往一上来就陷入了细节当中,容易出不来,经常就是从入门到放弃。在尚硅谷Spark教程(B站搜Spark)的基础上,我重新画了一张图:
当您看到这张图,先别着急,不理解每个方框里的内容没关系,关键是要明白两件事:1)Spark程序执行分为两个并行又互相有关联的主线,其中一条主线是用来准备环境的,另一条主线是执行应用程序的;2)这两条主线分别涉及到Yarn资源管理器和Spark框架,两者之间的桥梁就是ApplicationMaster(AM)。
在Spark集群多个节点协同完成同一个用户程序的计算作业时,Spark的Driver负责执行程序的Main方法,将程序转化为job,在executor之间调度任务task,跟踪executor的执行情况。不同的executor负责执行具体的任务task。Yarn不参与用户程序的执行,它提供资源管理和分配。其中ResourceManager(RM)负责管理整个集群的资源,NodeManger(NM)则负责节点进程的管理。
我还是拿足球来比喻,对于某个足球俱乐部而言,管理和保障部门相当于Yarn,足球队相当于Spark。在俱乐部,管理层相当于RM,各种保障部门相当于NM。在球队,主教练的角色相当于Driver,每位上场比赛的球员相当于Executor。俱乐部收到赛事指令,启动比赛安排。在比赛前,主教练要根据比赛目标和要求向俱乐部申请各种资源保障,俱乐部也会根据主教练的申请和实际情况,给比赛分配合适的资源。这就相当于Driver创建SparkContext并进行属性的初始化,然后拿着相关属性参数去找RM申请资源。临赛前,主教练就会根据赛前部署(这相当于用户写好的程序),发布首发名单和阵型,指定每个位置上球员的角色和需要执行的具体任务,这就相当于Driver将整个程序划分为任务集taskset,并启动executor进程及executor计算对象。
比赛开始后,主教练要监控每个球员的表现,对比赛进行调度和指挥。当某个球员出问题了,不能继续比赛,就要换人。这就相当于Driver对Executor的执行情况进行跟踪,如果有 Executor 节点发生了故障或崩溃,Spark 应用也可以继续执行,会将出错节点上的任务调度到其他 Executor 节点上继续运行。
03 提交程序
我们向集群提交如下命令:
bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode cluster \
./examples/jars/spark-examples_2.12-3.0.1.jar \
10
在spark-submit指令中封装了"java org.apache.spark.deploy.SparkSubmit",相当于启动1个JVM(java虚拟机),然后在JVM中启动一个SparkSubmit进程,接着执行SparkSubmit中的Main方法。所以,我们打开org.apache.spark.deploy包中的SparkSubmit.scala程序文件,来看看源码:
这个Main方法总共也就33行,关键是在于第1016行的"submit.doSubmit(agrs)",而这句又实际调用了上面1005行的doSubmit,然后是"super.doSubmit(args)",点进去看:
首先进行了参数的解析,即第85行,然后是第90行,这里用了模式匹配,当程序参数appArgs的action字段为SparkSubmitAction.SUBMIT时,执行"submit(appArgs, uninitLog)",再点开:
这段代码的关键是第180行"runMain(args, uninitLog)",再点进去
第871行,字面意思准备提交环境,入参是args,args就是指令中经过解析的--class、--master、--deploy-mode,返回值有4个值,最关键的是childMainClass。点进prepareSubmitEnvironment,来到这个函数体:
这个函数开头第228行,声明了一个变量childMainClass,在下面的代码中给它赋值了,在Yarn集群模式下
点开"YARN_CLUSTER_SUBIT_CLASS":
private[deploy] val YARN_CLUSTER_SUBMIT_CLASS =
"org.apache.spark.deploy.yarn.YarnClusterApplication"
绕了半天,原来这个childMainClass的名字就是YarnClusterApplication。我们再回到runMain这个函数:
上图标红框的代码很关键,首先根据childMainClass反射获取类的信息,得到mainClass,此处用到了scala的反射机制;然后根据类信息实例化一个新的对象,这里根据mainClass是否继承于SparkApplication,如果是,则用mainClass创建一个SparkApplication实例,这个实例的名字就叫app,然后调用app的start方法:
到此为止,所有的源码都是在org.apache.spark.deploy包下的SparkSubmit.scala中的,第二篇要看看包含YarnClusterApplication类的Client.scala源码文件。在此之前,我作个小结:在应用程序提交给集群后,1)开启SparkSubmit进程;2)完成命令行参数的解析;3)将参数传递给runMain;4)准备提交环境,获取一个继承于SparkApplication的app实例;5)最后运行start方法。
此时,所有的程序都是在外部运行,还没有真正涉及到Yarn和Spark。用足球比赛的例子说一下,相当于主办方发布了比赛安排,在哪比赛、什么时候比赛、对手是谁等等,俱乐部收到比赛安排后对赛事进行了一系列解读,然后开始准备资源和比赛。
好了,第一篇先讲到这,下一篇将聊聊SparkSubmit进程是如何去RM那申请资源并准备程序运行环境的。