
Preface

Our current business requires several Spark jobs to run serially as one chain, and each Spark job needs its own parameter configuration. We split the work into separate Spark jobs because we want each task to keep the ability to run on its own and be offered as an independent service, while still being able to string several tasks together into a single serial execution chain when needed. This post describes my attempt to support the different configurations by creating multiple Spark contexts while the tasks run as a chain; the resource manager is YARN.

Experiment

Test code

SparkSession sparkSession = SparkSession.builder() //.master("local[1]")
        // allow a second context in the same JVM (driver-side flag)
        .config("spark.driver.allowMultipleContexts", true)
        .config("spark.executor.cores", 4)
        .appName("Java Spark SQL basic example")
        .getOrCreate();

// Stop the underlying context, hoping to rebuild one with different settings
sparkSession.sparkContext().stop();

// Try to obtain a second session with a different executor configuration
SparkSession newSession = sparkSession.newSession();
newSession.conf().set("spark.executor.cores", "2");
sparkSession.setDefaultSession(newSession);

Result:

[2018-07-11 09:39:29.879] [ERROR] [main] [org.apache.spark.deploy.yarn.ApplicationMaster] >>> Uncaught exception: 
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:194)
	at org.apache.spark.deploy.yarn.ApplicationMaster.runDriver(ApplicationMaster.scala:401)
	at org.apache.spark.deploy.yarn.ApplicationMaster.run(ApplicationMaster.scala:254)
	at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$main$1.apply$mcV$sp(ApplicationMaster.scala:766)
	at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:67)
	at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:66)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
	at org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:66)
	at org.apache.spark.deploy.yarn.ApplicationMaster$.main(ApplicationMaster.scala:764)
	at org.apache.spark.deploy.yarn.ApplicationMaster.main(ApplicationMaster.scala)
Caused by: java.lang.IllegalStateException: SparkContext has been shutdown
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1910)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1931)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1944)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:333)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
	at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2371)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
	at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2765)
	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2370)
	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2377)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2113)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2112)
	at org.apache.spark.sql.Dataset.withTypedCallback(Dataset.scala:2795)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2112)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2327)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:248)
	at org.apache.spark.sql.Dataset.show(Dataset.scala:636)
	at org.apache.spark.sql.Dataset.show(Dataset.scala:595)
	at org.apache.spark.sql.Dataset.show(Dataset.scala:604)
	at com.vip.spark.api.SparkSimpleTest01.main(SparkSimpleTest01.java:41)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:637)

The SparkContext has already been stopped. Ouch...

Looking at the source: to create a new context you have to stop the old one first...

/**
 * Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
 * cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster.
 *
 * Only one SparkContext may be active per JVM.  You must `stop()` the active SparkContext before
 * creating a new one.  This limitation may eventually be removed; see SPARK-2243 for more details.
 *
 * @param config a Spark Config object describing the application configuration. Any settings in
 *   this config overrides the default configs as well as system properties.
 */
class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient {

  // The call site where this SparkContext was constructed.
  private val creationSite: CallSite = Utils.getCallSite()

  // If true, log warnings instead of throwing exceptions when multiple SparkContexts are active
  private val allowMultipleContexts: Boolean =
    config.getBoolean("spark.driver.allowMultipleContexts", false)

... (the rest of the class omitted)

Dead end. If you cannot drop the old context you cannot create a context with a new configuration, which means the resources that the first context requests from YARN become the shared resources for every task, and they cannot be changed afterwards...
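To make the limitation concrete, here is a minimal sketch (Spark 2.x Java API, illustrative values only, not the original test code): newSession() only gives you a second session on top of the same SparkContext, so session-scoped SQL settings can differ between sessions while executor resources stay whatever the first context asked YARN for.

// Minimal sketch: newSession() shares the underlying SparkContext, so only
// session-scoped settings can actually differ per session.
SparkSession first = SparkSession.builder()
        .appName("shared-context-demo")
        .config("spark.executor.cores", 4)   // fixed once the context is created
        .getOrCreate();

SparkSession second = first.newSession();    // same SparkContext underneath
// Session-scoped SQL conf: takes effect for queries run through "second"
second.conf().set("spark.sql.shuffle.partitions", "50");
// Core resource conf: stored in the session conf, but it does not change the
// executors YARN has already granted to the shared context
second.conf().set("spark.executor.cores", "2");

// prints true -- both sessions point at the very same context
System.out.println(first.sparkContext() == second.sparkContext());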

Looking again at the resource request flow of Spark on YARN (cluster mode) confirms this: the first container is the ApplicationMaster, which carries the Spark context's configuration, and every subsequent container is requested from the ResourceManager on top of that. Unless you kill the ApplicationMaster's container there is no way to request containers under a new configuration, and once you kill the context with sparkContext.stop() the whole application dies. Ouch...

Still not ready to give up, I went through the SparkContext source to see whether there is any entry point for switching between contexts, or for letting several contexts coexist.

 /**
   * Called to ensure that no other SparkContext is running in this JVM.
   *
   * Throws an exception if a running context is detected and logs a warning if another thread is
   * constructing a SparkContext.  This warning is necessary because the current locking scheme
   * prevents us from reliably distinguishing between cases where another context is being
   * constructed and cases where another constructor threw an exception.
   */
  private def assertNoOtherContextIsRunning(
      sc: SparkContext,
      allowMultipleContexts: Boolean): Unit = {
    SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
      Option(activeContext.get()).filter(_ ne sc).foreach { ctx =>
          val errMsg = "Only one SparkContext may be running in this JVM (see SPARK-2243)." +
            " To ignore this error, set spark.driver.allowMultipleContexts = true. " +
            s"The currently running SparkContext was created at:\n${ctx.creationSite.longForm}"
          val exception = new SparkException(errMsg)
          if (allowMultipleContexts) {
            logWarning("Multiple running SparkContexts detected in the same JVM!", exception)
          } else {
            throw exception
          }
        }

      contextBeingConstructed.filter(_ ne sc).foreach { otherContext =>
        // Since otherContext might point to a partially-constructed context, guard against
        // its creationSite field being null:
        val otherContextCreationSite =
          Option(otherContext.creationSite).map(_.longForm).getOrElse("unknown location")
        val warnMsg = "Another SparkContext is being constructed (or threw an exception in its" +
          " constructor).  This may indicate an error, since only one SparkContext may be" +
          " running in this JVM (see SPARK-2243)." +
          s" The other SparkContext was created at:\n$otherContextCreationSite"
        logWarning(warnMsg)
      }
    }
  }

So SparkContext uses a synchronization lock to guarantee that only one context can be created per JVM on the driver side, and every method that switches the current context is private. Sigh...

Then look at the SPARK-2243 issue mentioned in the source: https://issues.apache.org/jira/browse/SPARK-2243

Won't Fix. Fine. No point reading any further; that road is blocked, so I had to look for another way out.

Solution

Control the Spark jobs at the process level and run them serially from a script, i.e. a script that simply chains the spark-submit commands of the individual jobs one after another. I probably don't need to write that script out... (a sketch of the same idea follows below).
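Purely as an illustration (the real thing is just a shell script of spark-submit commands; the jar paths, main classes and resource values below are placeholders), the same process-level chaining can also be expressed in Java with Spark's SparkLauncher API:

import org.apache.spark.launcher.SparkLauncher;

public class SerialJobChain {
    public static void main(String[] args) throws Exception {
        // Job A: gets its own ApplicationMaster / SparkContext and its own resources
        Process jobA = new SparkLauncher()
                .setAppResource("/path/to/job-a.jar")   // placeholder jar
                .setMainClass("com.example.JobA")       // placeholder class
                .setMaster("yarn")
                .setDeployMode("cluster")
                .setConf("spark.executor.cores", "4")
                .launch();
        if (jobA.waitFor() != 0) {
            throw new RuntimeException("job A failed, aborting the chain");
        }

        // Job B: submitted only after job A has finished, with a different configuration
        Process jobB = new SparkLauncher()
                .setAppResource("/path/to/job-b.jar")
                .setMainClass("com.example.JobB")
                .setMaster("yarn")
                .setDeployMode("cluster")
                .setConf("spark.executor.cores", "2")
                .launch();
        if (jobB.waitFor() != 0) {
            throw new RuntimeException("job B failed");
        }
    }
}

Each submission gets its own driver and context on YARN, so every job in the chain keeps its own configuration, and a failing step stops the rest of the chain.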
