淘先锋技术网

首页 1 2 3 4 5 6 7

背景
Spark在判断能否转为BroadCastJoin时主要是根据输入表的大小是否超过了 spark.sql.autoBroadcastJoinThreshold 参数所配置的大小,如果未超过阈值则可以转为BroadCastJoin.

结论
先说下整个判断的流程:
1.首先在非分区表情况下并且 spark.sql.statistics.fallBackToHdfs此参数开启时会统计表hdfs目录大小
2.在物理计划生成时会统计输入的大小(cbo开启时统计条数 不开启则只统计大小)
3.最终是否走BroadCast会结合如上的两个值进行判断只要满足一个即可

流程分析
代码分支:spark3.2

这里只讨论在转换过程中是如何获取输入大小的。
第一部分来自Analyze阶段对LoginPlan做resolved时,这里Spark内置了一个统计输入的Rule:

  class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] {
  //主要是此方法会进行静态信息的统计
  private def hiveTableWithStats(relation: HiveTableRelation): HiveTableRelation = {
    val table = relation.tableMeta
    val partitionCols = relation.partitionCols
    //如果spark.sql.statistics.fallBackToHdfs设置为True并且此时为非分区表时,则统计此时hdfs相关路径大小作为输入大小
    val sizeInBytes = if (conf.fallBackToHdfsForStatsEnabled && partitionCols.isEmpty) {
      try {
        val hadoopConf = session.sessionState.newHadoopConf()
        val tablePath = new Path(table.location)
        val fs: FileSystem = tablePath.getFileSystem(hadoopConf)
        fs.getContentSummary(tablePath).getLength
      } catch {
        case e: IOException =>
          logWarning("Failed to get table size from HDFS.", e)
          conf.defaultSizeInBytes
      }
    } else {
      //如果是分区表或者在参数为false时,则使用默认值,这里的默认值是 Long.MAX_VALUE 
      //因此默认情况下是不会走BroadCastJoin的
      conf.defaultSizeInBytes
    }

    val stats = Some(Statistics(sizeInBytes = BigInt(sizeInBytes)))
    relation.copy(tableStats = stats)
  }

  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
    case relation: HiveTableRelation
      if DDLUtils.isHiveTable(relation.tableMeta) && relation.tableMeta.stats.isEmpty =>
      hiveTableWithStats(relation)

    // handles InsertIntoStatement specially as the table in InsertIntoStatement is not added in its
    // children, hence not matched directly by previous HiveTableRelation case.
    case i @ InsertIntoStatement(relation: HiveTableRelation, _, _, _, _, _)
      if DDLUtils.isHiveTable(relation.tableMeta) && relation.tableMeta.stats.isEmpty =>
      i.copy(table = hiveTableWithStats(relation))
  }
}

第二部分:这里是在生成物理执行计划时所做的操作
这部分可以通过 Strategy的子类 JoinSelection中定义的生成Join策略查看到,其中在判断是否可以转为BroadCastJoin时有如下方法:

 def getBroadcastBuildSide(
      left: LogicalPlan,
      right: LogicalPlan,
      joinType: JoinType,
      hint: JoinHint,
      hintOnly: Boolean,
      conf: SQLConf): Option[BuildSide] = {
      //省略部分代码
      canBroadcastBySize(left, conf) && !hintToNotBroadcastLeft(hint)
      //省略部分代码
    )
  }

再看canBroadcastBySize方法具体实现:

  //这里会从两个地方获取阈值的大小一部分是自适应另外就是上面提到的参数
  //具体是从那个地方获取由plan.stats.isRuntime决定,最终应该是由
  //spark.sql.adaptive.enabled(开启自适应) 参数来决定
  def canBroadcastBySize(plan: LogicalPlan, conf: SQLConf): Boolean = {
    val autoBroadcastJoinThreshold = if (plan.stats.isRuntime) {
      conf.getConf(SQLConf.ADAPTIVE_AUTO_BROADCASTJOIN_THRESHOLD)
        .getOrElse(conf.autoBroadcastJoinThreshold)
    } else {
      conf.autoBroadcastJoinThreshold
    }
    //这里根据输入的大小来决定是否走BroadCastJoin
    plan.stats.sizeInBytes >= 0 && plan.stats.sizeInBytes <= autoBroadcastJoinThreshold
  }

这里继续查看 plan.stats
//这里就是判断是否从Cbo中获取或者另外的方式

 def stats: Statistics = statsCache.getOrElse {
    if (conf.cboEnabled) {
      statsCache = Option(BasicStatsPlanVisitor.visit(self))
    } else {
      statsCache = Option(SizeInBytesOnlyStatsPlanVisitor.visit(self))
    }
    statsCache.get
  }

visit方法:
这里我们在join 时一般都是两个表的join即一般都是HiveTableRelation(LoginalPlan的子类 这里不会绝对是这个,比如自己从内存中创建的表),因此会调用如上两个类的default方法

 def visit(p: LogicalPlan): T = p match {
   //省略部分代码
    case p: Sort => visitSort(p)
    case p: WithCTE => visitWithCTE(p)
    case p: LogicalPlan => default(p)
  }

两者区别在于是否对输入条数做了统计
首先SizeInBytesOnlyStatsPlanVisitor,这里最后调用的是其中的Default方法

  override def default(p: LogicalPlan): Statistics = p match {
    //调用叶子节点的方法计算当前的输入大小
    case p: LeafNode => p.computeStats()
    //遍历所有子节点进行大小的统计 这里未统计条数
    //product方法就是求和
    case _: LogicalPlan =>
      Statistics(sizeInBytes = p.children.map(_.stats.sizeInBytes).filter(_ > 0L).product)
  }

//BasicStatsPlanVisitor

override def default(p: LogicalPlan): Statistics = p match {
    case p: LeafNode => p.computeStats()
    case _: LogicalPlan =>
      val stats = p.children.map(_.stats)
      val rowCount = if (stats.exists(_.rowCount.isEmpty)) {
        None
      } else {
        Some(stats.map(_.rowCount.get).filter(_ > 0L).product)
      }
      //这里统计了rowCount
      Statistics(sizeInBytes = stats.map(_.sizeInBytes).filter(_ > 0L).product, rowCount = rowCount)
  }

上面在统计子节点时,最终都会调用的LeafNode节点,然后调用其computeStats方法
因此这里我们可以到HiveTableRelation中查看具体的实现:

//这里就是取默认的tableStats 即前面第一部分获取的值或者在cbo 开启时进行统计,这里统计完成就会给上面的地方进行最终统计然后来判断是否走BroadCastJoin
//HiveTableRelation

  override def computeStats(): Statistics = {
    tableMeta.stats.map(_.toPlanStats(output, conf.cboEnabled || conf.planStatsEnabled))
      .orElse(tableStats)
      .getOrElse {
      throw QueryExecutionErrors.tableStatsNotSpecifiedError
    }
  }

//ExternalRDD这种就是从内存中创建的

 override def computeStats(): Statistics = Statistics(
    // TODO: Instead of returning a default value here, find a way to return a meaningful size
    // estimate for RDDs. See PR 1238 for more discussions.
    sizeInBytes = BigInt(session.sessionState.conf.defaultSizeInBytes)
  )

注意:PruneHiveTablePartitions 这里实际还要结合这个这个rule,这个rule是针对hive分区表做下推的,在下推的过程中会统计
分区静态信息然后更新到HiveTableRelation中