Preface
Our business currently needs several Spark jobs to run serially as one chain, and each job requires its own parameter configuration. We keep them as separate jobs because we want every task to remain independently runnable as its own service, while still being able to string several of them together into one serial execution chain when needed. This post describes my attempt to give each job its own configuration by creating multiple SparkContexts while the jobs run serially, with YARN as the resource manager.
Experiment
Test code
// Build the first session with executor cores = 4
SparkSession sparkSession = SparkSession.builder() //.master("local[1]")
        .config("spark.driver.allowMultipleContexts", true)
        .config("spark.executor.cores", 4)
        .appName("Java Spark SQL basic example")
        .getOrCreate();
// Stop the underlying SparkContext, hoping to re-create it with a different configuration
sparkSession.sparkContext().stop();
// Derive a new session and try to give it a different executor-core setting
SparkSession newSession = sparkSession.newSession();
newSession.conf().set("spark.executor.cores", "2");
SparkSession.setDefaultSession(newSession); // static call on SparkSession
Result:
[2018-07-11 09:39:29.879] [ERROR] [main] [org.apache.spark.deploy.yarn.ApplicationMaster] >>> Uncaught exception:
org.apache.spark.SparkException: Exception thrown in awaitResult:
at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:194)
at org.apache.spark.deploy.yarn.ApplicationMaster.runDriver(ApplicationMaster.scala:401)
at org.apache.spark.deploy.yarn.ApplicationMaster.run(ApplicationMaster.scala:254)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$main$1.apply$mcV$sp(ApplicationMaster.scala:766)
at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:67)
at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:66)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
at org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:66)
at org.apache.spark.deploy.yarn.ApplicationMaster$.main(ApplicationMaster.scala:764)
at org.apache.spark.deploy.yarn.ApplicationMaster.main(ApplicationMaster.scala)
Caused by: java.lang.IllegalStateException: SparkContext has been shutdown
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1910)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1931)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1944)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:333)
at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2371)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2765)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2370)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2377)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2113)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2112)
at org.apache.spark.sql.Dataset.withTypedCallback(Dataset.scala:2795)
at org.apache.spark.sql.Dataset.head(Dataset.scala:2112)
at org.apache.spark.sql.Dataset.take(Dataset.scala:2327)
at org.apache.spark.sql.Dataset.showString(Dataset.scala:248)
at org.apache.spark.sql.Dataset.show(Dataset.scala:636)
at org.apache.spark.sql.Dataset.show(Dataset.scala:595)
at org.apache.spark.sql.Dataset.show(Dataset.scala:604)
at com.vip.spark.api.SparkSimpleTest01.main(SparkSimpleTest01.java:41)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:637)
The SparkContext has already been stopped. Ouch...
Looking at the source code, it turns out that before creating a new context you must stop the old one first...
/**
* Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
* cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster.
*
* Only one SparkContext may be active per JVM. You must `stop()` the active SparkContext before
* creating a new one. This limitation may eventually be removed; see SPARK-2243 for more details.
*
* @param config a Spark Config object describing the application configuration. Any settings in
* this config overrides the default configs as well as system properties.
*/
class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient {

  // The call site where this SparkContext was constructed.
  private val creationSite: CallSite = Utils.getCallSite()

  // If true, log warnings instead of throwing exceptions when multiple SparkContexts are active
  private val allowMultipleContexts: Boolean =
    config.getBoolean("spark.driver.allowMultipleContexts", false)

  ... (the rest of the class omitted)
Dead end: if the old context cannot be removed, a context with a new configuration cannot be created. That means the resources the first context requested from YARN become the shared resources of every task in the chain, and there is no way to change them...
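A minimal sketch of why the per-session conf change in the test code cannot help (the class name SharedContextCheck and the printed checks are mine, not from the original test, and I assume the Spark 2.x behavior shown above): sessions created via newSession() share the same underlying SparkContext, so executor-level settings such as spark.executor.cores are fixed when the context is created.

import org.apache.spark.sql.SparkSession;

public class SharedContextCheck {
    public static void main(String[] args) {
        SparkSession first = SparkSession.builder()
                .config("spark.executor.cores", 4)
                .appName("shared-context-demo")
                .getOrCreate();

        // newSession() only isolates the SQL conf, UDF registry and temp views;
        // the underlying SparkContext (and its YARN resources) is reused.
        SparkSession second = first.newSession();
        // Session-level setting only; in the Spark 2.x used here it is accepted but
        // never reaches the executors (newer Spark versions may reject it outright).
        second.conf().set("spark.executor.cores", "2");

        System.out.println(first.sparkContext() == second.sparkContext());               // true
        System.out.println(first.sparkContext().getConf().get("spark.executor.cores"));  // still 4

        first.stop();
    }
}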
A look at how spark-on-yarn requests resources confirms this: the first container (the ApplicationMaster) carries the SparkContext's configuration, and every subsequent container is requested from the ResourceManager on that basis. Unless the ApplicationMaster's container is killed, there is no way to request containers under a new configuration, yet the moment you kill the context with sparkContext.stop(), the whole application dies with it. Ouch...
Even so, I was not ready to give up, and went through the SparkContext source looking for any entry point that would allow switching between contexts, or letting multiple contexts coexist.
/**
* Called to ensure that no other SparkContext is running in this JVM.
*
* Throws an exception if a running context is detected and logs a warning if another thread is
* constructing a SparkContext. This warning is necessary because the current locking scheme
* prevents us from reliably distinguishing between cases where another context is being
* constructed and cases where another constructor threw an exception.
*/
  private def assertNoOtherContextIsRunning(
      sc: SparkContext,
      allowMultipleContexts: Boolean): Unit = {
    SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
      Option(activeContext.get()).filter(_ ne sc).foreach { ctx =>
        val errMsg = "Only one SparkContext may be running in this JVM (see SPARK-2243)." +
          " To ignore this error, set spark.driver.allowMultipleContexts = true. " +
          s"The currently running SparkContext was created at:\n${ctx.creationSite.longForm}"
        val exception = new SparkException(errMsg)
        if (allowMultipleContexts) {
          logWarning("Multiple running SparkContexts detected in the same JVM!", exception)
        } else {
          throw exception
        }
      }

      contextBeingConstructed.filter(_ ne sc).foreach { otherContext =>
        // Since otherContext might point to a partially-constructed context, guard against
        // its creationSite field being null:
        val otherContextCreationSite =
          Option(otherContext.creationSite).map(_.longForm).getOrElse("unknown location")
        val warnMsg = "Another SparkContext is being constructed (or threw an exception in its" +
          " constructor). This may indicate an error, since only one SparkContext may be" +
          " running in this JVM (see SPARK-2243)." +
          s" The other SparkContext was created at:\n$otherContextCreationSite"
        logWarning(warnMsg)
      }
    }
  }
So SparkContext uses a synchronized lock to guarantee that only one context can run per JVM on the driver, and all the methods for switching the active context are private. I...
Then I checked the SPARK-2243 issue referenced in the source: https://issues.apache.org/jira/browse/SPARK-2243
Won't Fix. Heh. No point reading further; that road is blocked, so I had to look for another way out.
Solution
Control the Spark jobs at the process level and run them serially from a script, i.e. a script that simply chains several spark-submit commands, each with its own configuration. I probably don't need to spell that out, do I... A rough sketch follows anyway.
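Here is a minimal sketch of that idea in Java, since the rest of the post is Java; the job class names, jar names and core counts are placeholders, not real jobs from our system. Each job runs as its own spark-submit process with its own SparkContext and its own YARN resources, and the next job only starts after the previous one finishes successfully.

import java.util.Arrays;
import java.util.List;

public class SerialSubmit {
    public static void main(String[] args) throws Exception {
        // One command line per job, each with its own configuration.
        List<List<String>> jobs = Arrays.asList(
                Arrays.asList("spark-submit", "--master", "yarn",
                        "--conf", "spark.executor.cores=4",
                        "--class", "com.example.JobOne", "job-one.jar"),
                Arrays.asList("spark-submit", "--master", "yarn",
                        "--conf", "spark.executor.cores=2",
                        "--class", "com.example.JobTwo", "job-two.jar"));

        for (List<String> cmd : jobs) {
            Process p = new ProcessBuilder(cmd).inheritIO().start();
            // Block until this job finishes before launching the next one.
            int exit = p.waitFor();
            if (exit != 0) {
                throw new RuntimeException("Job failed, stopping the chain: " + cmd);
            }
        }
    }
}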
Original post: https://blog.csdn.net/weixin_34405332/article/details/92026319