Spark AQE Configuration and Source Code Notes


Summary: AQE (Adaptive Query Execution) is a mechanism introduced in Spark 3.0 that uses the statistics collected during the shuffle map stage, together with a set of predefined rules, to dynamically adjust and correct the not-yet-executed parts of the logical and physical plans, optimizing the original query at runtime.

This article walks through AQE's configuration and source code based on Spark 3.2.0. The main implementation lives in the following package:

org.apache.spark.sql.execution.adaptive

1. AQE Flow Overview

First, a conclusion drawn straight from the code: after the physical plan (sparkPlan) is generated, AQE and the other preparation rules are applied to select the final physical plan. In QueryExecution this roughly looks like:

sparkPlan -> AQE -> executedPlan

  lazy val sparkPlan: SparkPlan = withCteMap {
    // We need to materialize the optimizedPlan here because sparkPlan is also tracked under
    // the planning phase
    assertOptimized()
    executePhase(QueryPlanningTracker.PLANNING) {
      // Clone the logical plan here, in case the planner rules change the states of the logical
      // plan.
      QueryExecution.createSparkPlan(sparkSession, planner, optimizedPlan.clone())
    }
  }

  // executedPlan should not be used to initialize any SparkPlan. It should be
  // only used for execution.
  lazy val executedPlan: SparkPlan = withCteMap {
    // We need to materialize the optimizedPlan here, before tracking the planning phase, to ensure
    // that the optimization time is not counted as part of the planning phase.
    assertOptimized()
    executePhase(QueryPlanningTracker.PLANNING) {
      // clone the plan to avoid sharing the plan instance between different stages like analyzing,
      // optimizing and planning.
      QueryExecution.prepareForExecution(preparations, sparkPlan.clone())
    }
  }

1.1 AQE Configuration

spark.sql.adaptive.enabled is the master switch for AQE; it defaults to true.

spark.sql.adaptive.forceApply is the switch that forces AQE to apply. By default (false), AQE skips plans that contain neither exchanges (shuffles) nor sub-queries, because AQE brings no performance benefit in those cases; setting it to true forces AQE onto all supported queries. This config is marked internal, i.e. it is not exposed as an external configuration.

// Whether AQE is enabled
val ADAPTIVE_EXECUTION_ENABLED = buildConf("spark.sql.adaptive.enabled")
  .doc("When true, enable adaptive query execution, which re-optimizes the query plan in the " +
    "middle of query execution, based on accurate runtime statistics.")
  .version("1.6.0")
  .booleanConf
  .createWithDefault(true)

// AQE skips plans that have neither a shuffle nor a sub-query. This config defaults to
// false, i.e. such plans are simply skipped because AQE cannot improve them. It is an
// internal config and is not exposed for external configuration.
val ADAPTIVE_EXECUTION_FORCE_APPLY = buildConf("spark.sql.adaptive.forceApply")
  .internal()
  .doc("Adaptive query execution is skipped when the query does not have exchanges or " +
    "sub-queries. By setting this config to true (together with " +
    s"'${ADAPTIVE_EXECUTION_ENABLED.key}' set to true), Spark will force apply adaptive query " +
    "execution for all supported queries.")
  .version("3.0.0")
  .booleanConf
  .createWithDefault(false)
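As a quick illustration, here is how these switches might be toggled on a session; a minimal sketch assuming a local Spark 3.2.x build (app name and master are arbitrary):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("aqe-demo")    // arbitrary name for this sketch
  .master("local[*]")
  // master switch for AQE; already true by default in 3.2.0
  .config("spark.sql.adaptive.enabled", "true")
  .getOrCreate()

// forceApply is marked internal, but it can still be set programmatically;
// normally there is no reason to touch it.
spark.conf.set("spark.sql.adaptive.forceApply", "true")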

1.2 The AQE Entry Point

AQE is wired in through this class:

org.apache.spark.sql.execution.QueryExecution

QueryExecution also implements the overall flow from logical plan to physical plan.

1.2.1 toRdd

toRdd is a lazy val; when evaluation reaches it, it returns a SQLExecutionRDD, the RDD on which Spark's actual jobs run.

package org.apache.spark.sql.execution
...
class QueryExecution(...) {
  lazy val toRdd: RDD[InternalRow] = new SQLExecutionRDD(
    executedPlan.execute(), sparkSession.sessionState.conf)
}

This toRdd is what every action ultimately reaches; for example, Dataset's rdd method also ends up here:

@Stable
class Dataset[T] private[sql](
    @DeveloperApi @Unstable @transient val queryExecution: QueryExecution,
    @DeveloperApi @Unstable @transient val encoder: Encoder[T])
  extends Serializable {

  lazy val rdd: RDD[T] = {
    val objectType = exprEnc.deserializer.dataType
    rddQueryExecution.toRdd.mapPartitions { rows =>
      rows.map(_.get(0, objectType).asInstanceOf[T])
    }
  }
}
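For intuition, any action walks this chain. A minimal sketch reusing the spark session from the earlier example (data and column names are made up):

import org.apache.spark.sql.functions.col

// Accessing .rdd (or running any action such as count/collect) evaluates
// queryExecution.toRdd, which forces executedPlan and, with it, AQE.
val df = spark.range(0, 1000).toDF("id")
df.rdd.count()                                   // goes through Dataset.rdd -> toRdd
df.groupBy(col("id") % 10).count().collect()     // a shuffle query that AQE can optimize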

1.2.2 executedPlan

The first argument passed to SQLExecutionRDD in toRdd above is executedPlan.execute(). executedPlan is also a lazy val, of type SparkPlan; note that the plan method of the planner's parent class (QueryPlanner) is what turns the optimized logical plan into physical plan candidates.

Looking at the code, the part we care about most is QueryExecution.prepareForExecution, invoked in the PLANNING phase; this method is where AQE is hooked in.

package org.apache.spark.sql.execution
...
class QueryExecution(...) {
  // Returns a SparkPlan. The plan method of the planner's parent class (QueryPlanner)
  // is what converts the logical plan into a physical plan.
  lazy val executedPlan: SparkPlan = withCteMap {
    // Logical plan optimization happens here.
    assertOptimized()
    executePhase(QueryPlanningTracker.PLANNING) {
      // Clone sparkPlan so the plan instance is not shared with other phases such as
      // analyzing or optimizing; the mutation here belongs to the PLANNING phase.
      QueryExecution.prepareForExecution(preparations, sparkPlan.clone())
    }
  }

  lazy val toRdd: RDD[InternalRow] = new SQLExecutionRDD(
    executedPlan.execute(), sparkSession.sessionState.conf)
}

1.2.3 prepareForExecution

The most important part of the code below is the last comment (4): before AQE runs, sparkPlan is obtained, at which point the physical plan has already been generated; the preparation rules, AQE among them, are then folded over it one by one, each rule's apply method producing the plan that is fed to the next rule, until a final SparkPlan is selected.

package org.apache.spark.sql.execution
...
class QueryExecution(...) {
  protected def preparations: Seq[Rule[SparkPlan]] = {
    // 1. Call QueryExecution.preparations; the second argument injects AQE,
    //    i.e. InsertAdaptiveSparkPlan
    QueryExecution.preparations(sparkSession,
      Option(InsertAdaptiveSparkPlan(AdaptiveExecutionContext(sparkSession, this))), false)
  }

  lazy val executedPlan: SparkPlan = withCteMap {
    assertOptimized()
    executePhase(QueryPlanningTracker.PLANNING) {
      QueryExecution.prepareForExecution(preparations, sparkPlan.clone())
    }
  }
}

object QueryExecution {
  private[execution] def preparations(
      sparkSession: SparkSession,
      adaptiveExecutionRule: Option[InsertAdaptiveSparkPlan] = None,
      subquery: Boolean): Seq[Rule[SparkPlan]] = {
    // 2. Prepend adaptiveExecutionRule to the rule sequence
    adaptiveExecutionRule.toSeq ++
    Seq(
      // 3. The rules below are optimizations other than AQE; skipped here
      CoalesceBucketsInJoin,
      PlanDynamicPruningFilters(sparkSession),
      PlanSubqueries(sparkSession),
      RemoveRedundantProjects,
      EnsureRequirements(),
      RemoveRedundantSorts,
      DisableUnnecessaryBucketedScan,
      ApplyColumnarRulesAndInsertTransitions(
        sparkSession.sessionState.columnarRules, outputsColumnar = false),
      CollapseCodegenStages()) ++
    (if (subquery) {
      Nil
    } else {
      Seq(ReuseExchangeAndSubquery)
    })
  }

  private[execution] def prepareForExecution(
      preparations: Seq[Rule[SparkPlan]],
      plan: SparkPlan): SparkPlan = {
    val planChangeLogger = new PlanChangeLogger[SparkPlan]()
    // 4. The key part: fold over the rules, each application producing a new SparkPlan
    //    that becomes the input of the next rule. In short, apply every rule's apply
    //    method in order to obtain the final SparkPlan.
    val preparedPlan = preparations.foldLeft(plan) { case (sp, rule) =>
      val result = rule.apply(sp)
      planChangeLogger.logRule(rule.ruleName, sp, result)
      result
    }
    planChangeLogger.logBatch("Preparations", plan, preparedPlan)
    preparedPlan
  }
}

1.2.4 InsertAdaptiveSparkPlan.apply

Key code, excerpted: at this step the SparkPlan is wrapped into an AdaptiveSparkPlanExec, which carries the adaptive optimization.

case class InsertAdaptiveSparkPlan(
    adaptiveExecutionContext: AdaptiveExecutionContext) extends Rule[SparkPlan] {

  override def apply(plan: SparkPlan): SparkPlan = applyInternal(plan, false)

  private def applyInternal(plan: SparkPlan, isSubquery: Boolean): SparkPlan = plan match {
    case _ if !conf.adaptiveExecutionEnabled => plan
    case _: ExecutedCommandExec => plan
    case _: CommandResultExec => plan
    case c: DataWritingCommandExec => c.copy(child = apply(c.child))
    case c: V2CommandExec => c.withNewChildren(c.children.map(apply))
    case _ if shouldApplyAQE(plan, isSubquery) =>
      if (supportAdaptive(plan)) {
        try {
          val subqueryMap = buildSubqueryMap(plan)
          val planSubqueriesRule = PlanAdaptiveSubqueries(subqueryMap)
          val preprocessingRules = Seq(planSubqueriesRule)
          val newPlan = AdaptiveSparkPlanExec.applyPhysicalRules(plan, preprocessingRules)
          logDebug(s"Adaptive execution enabled for plan: $plan")
          // The AQE logic is rooted here
          AdaptiveSparkPlanExec(newPlan, adaptiveExecutionContext, preprocessingRules, isSubquery)
        } ...
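The wrapping is easy to verify from the outside. A sketch reusing the spark session above: inspect the root of executedPlan for a shuffle query; with AQE on it should be an AdaptiveSparkPlanExec (the query itself is arbitrary):

import org.apache.spark.sql.functions.col

val agg = spark.range(0, 1000).groupBy((col("id") % 7).as("k")).count()
// InsertAdaptiveSparkPlan has wrapped the prepared plan, so the root node
// of the executed plan is AdaptiveSparkPlanExec.
println(agg.queryExecution.executedPlan.getClass.getSimpleName)
// expected output: AdaptiveSparkPlanExec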

1.2.5 AdaptiveSparkPlanExec.doExecute

With the wrapped SparkPlan in hand, go back to QueryExecution for the actual execution logic: the doExecute method that finally runs is overridden by the concrete SparkPlan subclass, which here is AdaptiveSparkPlanExec.

package org.apache.spark.sql.execution
...
abstract class SparkPlan ... {
  final def execute(): RDD[InternalRow] = executeQuery {
    if (isCanonicalizedPlan) {
      throw new IllegalStateException("A canonicalized plan is not supposed to be executed.")
    }
    doExecute()
  }

  // doExecute is overridden by the concrete SparkPlan subclass; here that
  // subclass is AdaptiveSparkPlanExec
  protected def doExecute(): RDD[InternalRow]
}

class QueryExecution(...) {
  lazy val toRdd: RDD[InternalRow] = new SQLExecutionRDD(
    executedPlan.execute(), sparkSession.sessionState.conf)
}

AdaptiveSparkPlanExec's doExecute eventually calls getFinalPhysicalPlan. That method contains a lot of logic that we will not dissect here; in outline, while new stages are being created, the plan keeps being re-optimized based on the predefined rules, runtime statistics and cost evaluation.

package org.apache.spark.sql.execution.adaptive

case class AdaptiveSparkPlanExec(
    inputPlan: SparkPlan,
    @transient context: AdaptiveExecutionContext,
    @transient preprocessingRules: Seq[Rule[SparkPlan]],
    @transient isSubquery: Boolean,
    @transient override val supportsColumnar: Boolean = false)
  extends LeafExecNode {

  override def doExecute(): RDD[InternalRow] = {
    withFinalPlanUpdate(_.execute())
  }

  private def withFinalPlanUpdate[T](fun: SparkPlan => T): T = {
    val plan = getFinalPhysicalPlan()  // the final physical plan is obtained here
    val result = fun(plan)
    finalPlanUpdate
    result
  }

  private def getFinalPhysicalPlan(): SparkPlan = lock.synchronized {
    ...
  }
}
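Because getFinalPhysicalPlan only runs during execution, explain output differs before and after an action. A quick way to observe the re-optimization (a sketch, reusing the session above):

import org.apache.spark.sql.functions.col

val q = spark.range(0, 1000).groupBy(col("id") % 5).count()
q.explain()   // before execution: header shows AdaptiveSparkPlan isFinalPlan=false
q.collect()   // triggers doExecute -> getFinalPhysicalPlan
q.explain()   // after execution: AdaptiveSparkPlan isFinalPlan=true, with the re-optimized stages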

1.2.6 The Predefined Rules in AdaptiveSparkPlanExec

Now that the flow is clear, we can look directly at the two predefined rule lists in AdaptiveSparkPlanExec:

// Physical plan rules to run before query stages are created. After these rules,
// the physical plan should reach its final, stage-ready state (i.e. no Exchange
// nodes are added or removed anymore).
@transient private val queryStagePreparationRules: Seq[Rule[SparkPlan]] = Seq(
  RemoveRedundantProjects,
  EnsureRequirements(optimizeOutRepartition = requiredDistribution.isDefined),
  RemoveRedundantSorts,
  DisableUnnecessaryBucketedScan
) ++ context.session.sessionState.queryStagePrepRules

// Physical plan rules to run before query stages are executed.
@transient private val queryStageOptimizerRules: Seq[Rule[SparkPlan]] = Seq(
  PlanAdaptiveDynamicPruningFilters(this),
  ReuseAdaptiveSubquery(context.subqueryCache),
  OptimizeSkewedJoin,
  OptimizeSkewInRebalancePartitions,
  CoalesceShufflePartitions(context.session),
  OptimizeShuffleWithLocalRead
)

The purpose and usage of each of these rules are outlined below.
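Before going rule by rule, note that most of the stage-level optimizations have their own switches under spark.sql.adaptive.*. A few commonly tuned ones (keys as documented for Spark 3.2; all of these default to true):

// Coalesce many small shuffle partitions into fewer, larger ones
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
// Split skewed partitions in sort-merge joins into smaller tasks
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
// Convert regular shuffle reads into local reads where possible
spark.conf.set("spark.sql.adaptive.localShuffleReader.enabled", "true")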

2. AQE Rule List

2.1 AQE optimization rules before stage creation

Rule name / Description
RemoveRedundantProjects: removes Project nodes that do not change their child's output.
EnsureRequirements: inserts Exchange and Sort operators so that every operator's distribution and ordering requirements are satisfied.
RemoveRedundantSorts: removes sorts whose ordering is already guaranteed by their child.
DisableUnnecessaryBucketedScan: falls back to a regular scan when a bucketed scan would not help avoid a shuffle or sort.

2.2 AQE optimization rules before stage execution

Rule name / Description
PlanAdaptiveDynamicPruningFilters: plans dynamic partition pruning filters adaptively, reusing broadcast results where possible.
ReuseAdaptiveSubquery: reuses identical adaptive subqueries instead of executing them again.
OptimizeSkewedJoin: detects skewed partitions in sort-merge joins and splits them into smaller tasks.
OptimizeSkewInRebalancePartitions: splits skewed partitions introduced by rebalance operations.
CoalesceShufflePartitions: merges small shuffle partitions into larger ones based on runtime map-output statistics.
OptimizeShuffleWithLocalRead: converts shuffle reads into local reads to avoid unnecessary network transfer.
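As an end-to-end check of CoalesceShufflePartitions, a sketch under the session above (the exact partition count depends on data size and the advisory partition size):

import org.apache.spark.sql.functions.col

spark.conf.set("spark.sql.shuffle.partitions", "200")
val counted = spark.range(0, 10000).groupBy(col("id") % 3).count()
counted.collect()
// With AQE coalescing enabled, the final stage typically runs with far fewer
// than 200 partitions, because the tiny shuffle outputs are merged at runtime.
println(counted.rdd.getNumPartitions)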
