AQE has a long history in Spark's configuration: it first appeared as early as Spark 1.6; in the Spark 2.x era, Intel's big data team built a prototype implementation and put it into practice; and in the Spark 3.0 era, Databricks and Intel together contributed the new AQE to the community.
Configuration | Default | Official description | Analysis |
---|---|---|---|
spark.sql.adaptive.enabled | false | Whether to enable adaptive query execution | Set to true here to enable AQE |
spark.sql.adaptive.coalescePartitions.enabled | true | Whether to coalesce contiguous shuffle partitions (merged according to the 'spark.sql.adaptive.advisoryPartitionSizeInBytes' target) | Defaults to true; see Analysis 1 |
spark.sql.adaptive.coalescePartitions.initialPartitionNum | (none) | The initial number of shuffle partitions before coalescing; defaults to the value of spark.sql.shuffle.partitions | See Analysis 2 |
spark.sql.adaptive.coalescePartitions.minPartitionNum | (none) | The minimum number of shuffle partitions after coalescing; defaults to the cluster's default parallelism | See Analysis 3 |
spark.sql.adaptive.advisoryPartitionSizeInBytes | 64MB | The advisory size of a shuffle partition, used when coalescing partitions and when handling skewed joins | See Analysis 3 |
spark.sql.adaptive.skewJoin.enabled | true | Whether to adaptively handle data skew in joins | |
spark.sql.adaptive.skewJoin.skewedPartitionFactor | 5 | Skew detection factor; a partition is skewed only if it exceeds both skewedPartitionFactor x median size and skewedPartitionThresholdInBytes | See Analysis 4 |
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes | 256MB | Skew detection threshold; a partition is skewed only if it exceeds both skewedPartitionFactor x median size and skewedPartitionThresholdInBytes | See Analysis 4 |
spark.sql.adaptive.logLevel | debug | Log level for adaptive execution plan changes | Set to info to make adaptive plan changes easier to observe |
spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin | 0.2 | Non-empty partition ratio threshold for broadcast joins; if a side's non-empty partition ratio is below this value, it will not be converted to a broadcast join | See Analysis 5 |
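For reference, here is a minimal sketch of how these options are typically set on a SparkSession; the values are illustrative (the defaults from the table above), not tuning recommendations.

```scala
import org.apache.spark.sql.SparkSession

// Minimal sketch: enable AQE and the related options discussed above.
// The values are illustrative only, not tuning advice.
val spark = SparkSession.builder()
  .appName("aqe-config-demo")
  .config("spark.sql.adaptive.enabled", "true")                       // turn AQE on
  .config("spark.sql.adaptive.coalescePartitions.enabled", "true")    // merge small shuffle partitions
  .config("spark.sql.adaptive.advisoryPartitionSizeInBytes", "64MB")  // target shuffle partition size
  .config("spark.sql.adaptive.skewJoin.enabled", "true")              // split skewed join partitions
  .config("spark.sql.adaptive.logLevel", "info")                      // log adaptive plan changes at info
  .getOrCreate()
```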
In OptimizeSkewedJoin.scala (OptimizeSkewedJoin is a rule applied to the physical plan), we can see where ADVISORY_PARTITION_SIZE_IN_BYTES, i.e. spark.sql.adaptive.advisoryPartitionSizeInBytes, is referenced:
```scala
  /**
   * The goal of skew join optimization is to make the data distribution more even. The target
   * size to split skewed partitions is the average size of non-skewed partition, or the
   * advisory partition size if avg size is smaller than it.
   */
  private def targetSize(sizes: Seq[Long], medianSize: Long): Long = {
    val advisorySize = conf.getConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES)
    val nonSkewSizes = sizes.filterNot(isSkewed(_, medianSize))
    // It's impossible that all the partitions are skewed, as we use median size to define skew.
    assert(nonSkewSizes.nonEmpty)
    math.max(advisorySize, nonSkewSizes.sum / nonSkewSizes.length)
  }
```
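To make this concrete, here is a small standalone calculation with made-up sizes (not the Spark code itself): with a 64MB advisory size and non-skewed partitions of 50MB, 60MB and 70MB, the split target is the larger of the advisory size and the non-skewed average.

```scala
// Standalone illustration of the targetSize logic above (sizes are made up).
val bytesPerMB = 1024L * 1024
val advisorySize = 64 * bytesPerMB                        // spark.sql.adaptive.advisoryPartitionSizeInBytes
val nonSkewSizes = Seq(50L, 60L, 70L).map(_ * bytesPerMB) // sizes of the non-skewed partitions
val target = math.max(advisorySize, nonSkewSizes.sum / nonSkewSizes.length)
// The average non-skewed size is 60MB, which is below the 64MB advisory size,
// so skewed partitions will be split into chunks of roughly 64MB.
println(target / bytesPerMB) // 64
```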
Related to this, SQLConf.scala defines:
```scala
  def numShufflePartitions: Int = {
    if (adaptiveExecutionEnabled && coalesceShufflePartitionsEnabled) {
      getConf(COALESCE_PARTITIONS_INITIAL_PARTITION_NUM).getOrElse(defaultNumShufflePartitions)
    } else {
      defaultNumShufflePartitions
    }
  }
```
Starting with Spark 3.0.1, if both AQE and shuffle partition coalescing are enabled, spark.sql.adaptive.coalescePartitions.initialPartitionNum is used as the initial shuffle partition number. When a query contains multiple shuffle stages, raising this initial partition count makes shuffle partition coalescing noticeably more effective.
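A hedged sketch of what this means in practice (assuming the SparkSession from the earlier example): with both switches on, shuffles start from the larger initialPartitionNum and AQE then coalesces downwards at runtime.

```scala
// Illustrative values only: start shuffles with 1000 partitions instead of 200,
// then let AQE coalesce them back down toward advisoryPartitionSizeInBytes.
spark.conf.set("spark.sql.shuffle.partitions", "200")
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.initialPartitionNum", "1000")
// numShufflePartitions now resolves to 1000; with AQE or coalescing off it would be 200.
```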
In CoalesceShufflePartitions.scala (CoalesceShufflePartitions is also a rule on the physical plan), the following is executed:
```scala
    if (!shuffleStages.forall(_.shuffle.canChangeNumPartitions)) {
      plan
    } else {
      // `ShuffleQueryStageExec#mapStats` returns None when the input RDD has 0 partitions,
      // we should skip it when calculating the `partitionStartIndices`.
      val validMetrics = shuffleStages.flatMap(_.mapStats)

      // We may have different pre-shuffle partition numbers, don't reduce shuffle partition number
      // in that case. For example when we union fully aggregated data (data is arranged to a single
      // partition) and a result of a SortMergeJoin (multiple partitions).
      val distinctNumPreShufflePartitions =
        validMetrics.map(stats => stats.bytesByPartitionId.length).distinct
      if (validMetrics.nonEmpty && distinctNumPreShufflePartitions.length == 1) {
        // We fall back to Spark default parallelism if the minimum number of coalesced partitions
        // is not set, so to avoid perf regressions compared to no coalescing.
        val minPartitionNum = conf.getConf(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM)
          .getOrElse(session.sparkContext.defaultParallelism)
        val partitionSpecs = ShufflePartitionsUtil.coalescePartitions(
          validMetrics.toArray,
          advisoryTargetSize = conf.getConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES),
          minNumPartitions = minPartitionNum)
        // This transformation adds new nodes, so we must use `transformUp` here.
        val stageIds = shuffleStages.map(_.id).toSet
        plan.transformUp {
          // even for shuffle exchange whose input RDD has 0 partition, we should still update its
          // `partitionStartIndices`, so that all the leaf shuffles in a stage have the same
          // number of output partitions.
          case stage: ShuffleQueryStageExec if stageIds.contains(stage.id) =>
            CustomShuffleReaderExec(stage, partitionSpecs, COALESCED_SHUFFLE_READER_DESCRIPTION)
        }
      } else {
        plan
      }
    }
```
In other words, partitions are only coalesced when every shuffle stage is allowed to change its partition number and all stages report the same pre-shuffle partition count; otherwise the plan is returned unchanged.
Back in OptimizeSkewedJoin.scala, we also see:
```scala
  /**
   * A partition is considered as a skewed partition if its size is larger than the median
   * partition size * ADAPTIVE_EXECUTION_SKEWED_PARTITION_FACTOR and also larger than
   * ADVISORY_PARTITION_SIZE_IN_BYTES.
   */
  private def isSkewed(size: Long, medianSize: Long): Boolean = {
    size > medianSize * conf.getConf(SQLConf.SKEW_JOIN_SKEWED_PARTITION_FACTOR) &&
      size > conf.getConf(SQLConf.SKEW_JOIN_SKEWED_PARTITION_THRESHOLD)
  }
```
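Both conditions must hold, which is what Analysis 4 in the table refers to. A small standalone check with the default factor (5) and threshold (256MB):

```scala
// Standalone illustration of the skew check (defaults: factor 5, threshold 256MB).
val bytesPerMB = 1024L * 1024
val factor = 5                          // spark.sql.adaptive.skewJoin.skewedPartitionFactor
val threshold = 256 * bytesPerMB        // spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes
def isSkewed(size: Long, medianSize: Long): Boolean =
  size > medianSize * factor && size > threshold

val medianSize = 60 * bytesPerMB
println(isSkewed(310 * bytesPerMB, medianSize)) // true: 310MB > 5 x 60MB and > 256MB
println(isSkewed(280 * bytesPerMB, medianSize)) // false: above 256MB but not above 5 x median
```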
AdaptiveSparkPlanExec's getFinalPhysicalPlan method calls reOptimize, and reOptimize re-runs the logical-plan optimization:
```scala
  private def reOptimize(logicalPlan: LogicalPlan): (SparkPlan, LogicalPlan) = {
    logicalPlan.invalidateStatsCache()
    val optimized = optimizer.execute(logicalPlan)
    val sparkPlan = context.session.sessionState.planner.plan(ReturnAnswer(optimized)).next()
    val newPlan = applyPhysicalRules(sparkPlan, preprocessingRules ++ queryStagePreparationRules)
    (newPlan, optimized)
  }
```
This optimizer contains a DemoteBroadcastHashJoin rule:
```scala
  @transient private val optimizer = new RuleExecutor[LogicalPlan] {
    // TODO add more optimization rules
    override protected def batches: Seq[Batch] = Seq(
      Batch("Demote BroadcastHashJoin", Once, DemoteBroadcastHashJoin(conf))
    )
  }
```
DemoteBroadcastHashJoin then decides whether a join side should be kept from becoming a broadcast join:
```scala
case class DemoteBroadcastHashJoin(conf: SQLConf) extends Rule[LogicalPlan] {

  private def shouldDemote(plan: LogicalPlan): Boolean = plan match {
    case LogicalQueryStage(_, stage: ShuffleQueryStageExec) if stage.resultOption.isDefined
        && stage.mapStats.isDefined =>
      val mapStats = stage.mapStats.get
      val partitionCnt = mapStats.bytesByPartitionId.length
      val nonZeroCnt = mapStats.bytesByPartitionId.count(_ > 0)
      partitionCnt > 0 && nonZeroCnt > 0 &&
        (nonZeroCnt * 1.0 / partitionCnt) < conf.nonEmptyPartitionRatioForBroadcastJoin
    case _ => false
  }

  def apply(plan: LogicalPlan): LogicalPlan = plan.transformDown {
    case j @ Join(left, right, _, _, hint) =>
      var newHint = hint
      if (!hint.leftHint.exists(_.strategy.isDefined) && shouldDemote(left)) {
        newHint = newHint.copy(leftHint =
          Some(hint.leftHint.getOrElse(HintInfo()).copy(strategy = Some(NO_BROADCAST_HASH))))
      }
      if (!hint.rightHint.exists(_.strategy.isDefined) && shouldDemote(right)) {
        newHint = newHint.copy(rightHint =
          Some(hint.rightHint.getOrElse(HintInfo()).copy(strategy = Some(NO_BROADCAST_HASH))))
      }
      if (newHint.ne(hint)) {
        j.copy(hint = newHint)
      } else {
        j
      }
  }
}
```
shouldDemote is exactly the check that decides whether a join side is demoted so that it will not be planned as a broadcast join.
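For example (made-up statistics): if a shuffle stage has 100 map-output partitions and only 15 of them contain data, the non-empty ratio is 0.15, which is below the default 0.2, so the side gets a NO_BROADCAST_HASH hint:

```scala
// Standalone illustration of the shouldDemote ratio check (default threshold 0.2).
val partitionCnt = 100     // total map-output partitions of the shuffle stage
val nonZeroCnt = 15        // partitions that actually contain data
val ratioThreshold = 0.2   // spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin
val demote = partitionCnt > 0 && nonZeroCnt > 0 &&
  (nonZeroCnt * 1.0 / partitionCnt) < ratioThreshold
println(demote) // true -> the join side is hinted NO_BROADCAST_HASH and stays a non-broadcast join
```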
Next, see how partitions are actually merged in coalescePartitions:
```scala
  def coalescePartitions(
      mapOutputStatistics: Array[MapOutputStatistics],
      advisoryTargetSize: Long,
      minNumPartitions: Int): Seq[ShufflePartitionSpec] = {
    // If `minNumPartitions` is very large, it is possible that we need to use a value less than
    // `advisoryTargetSize` as the target size of a coalesced task.
    val totalPostShuffleInputSize = mapOutputStatistics.map(_.bytesByPartitionId.sum).sum
    // The max at here is to make sure that when we have an empty table, we only have a single
    // coalesced partition.
    // There is no particular reason that we pick 16. We just need a number to prevent
    // `maxTargetSize` from being set to 0.
    val maxTargetSize = math.max(
      math.ceil(totalPostShuffleInputSize / minNumPartitions.toDouble).toLong, 16)
    val targetSize = math.min(maxTargetSize, advisoryTargetSize)

    val shuffleIds = mapOutputStatistics.map(_.shuffleId).mkString(", ")
    logInfo(s"For shuffle($shuffleIds), advisory target size: $advisoryTargetSize, " +
      s"actual target size $targetSize.")

    // Make sure these shuffles have the same number of partitions.
    val distinctNumShufflePartitions =
      mapOutputStatistics.map(stats => stats.bytesByPartitionId.length).distinct
    // The reason that we are expecting a single value of the number of shuffle partitions
    // is that when we add Exchanges, we set the number of shuffle partitions
    // (i.e. map output partitions) using a static setting, which is the value of
    // `spark.sql.shuffle.partitions`. Even if two input RDDs are having different
    // number of partitions, they will have the same number of shuffle partitions
    // (i.e. map output partitions).
    assert(
      distinctNumShufflePartitions.length == 1,
      "There should be only one distinct value of the number of shuffle partitions " +
        "among registered Exchange operators.")
    val numPartitions = distinctNumShufflePartitions.head

    val partitionSpecs = ArrayBuffer[CoalescedPartitionSpec]()
    var latestSplitPoint = 0
    var coalescedSize = 0L
    var i = 0
    while (i < numPartitions) {
      // We calculate the total size of i-th shuffle partitions from all shuffles.
      var totalSizeOfCurrentPartition = 0L
      var j = 0
      while (j < mapOutputStatistics.length) {
        totalSizeOfCurrentPartition += mapOutputStatistics(j).bytesByPartitionId(i)
        j += 1
      }
      // If including the `totalSizeOfCurrentPartition` would exceed the target size, then start a
      // new coalesced partition.
      if (i > latestSplitPoint && coalescedSize + totalSizeOfCurrentPartition > targetSize) {
        partitionSpecs += CoalescedPartitionSpec(latestSplitPoint, i)
        latestSplitPoint = i
        // reset postShuffleInputSize.
        coalescedSize = totalSizeOfCurrentPartition
      } else {
        coalescedSize += totalSizeOfCurrentPartition
      }
      i += 1
    }
    partitionSpecs += CoalescedPartitionSpec(latestSplitPoint, numPartitions)

    partitionSpecs
  }
```
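The packing loop is easiest to see with numbers. The following standalone sketch mirrors the greedy grouping above with made-up reducer partition sizes (in MB) and a 64MB target; it is an illustration, not the Spark implementation.

```scala
// Group consecutive shuffle partitions until adding the next one would exceed the target size.
def coalesce(sizes: Seq[Long], targetSize: Long): Seq[(Int, Int)] = {
  val specs = scala.collection.mutable.ArrayBuffer[(Int, Int)]() // (startIndex, endIndex) per group
  var start = 0
  var current = 0L
  for (i <- sizes.indices) {
    if (i > start && current + sizes(i) > targetSize) {
      specs += ((start, i)) // close the group [start, i)
      start = i
      current = sizes(i)
    } else {
      current += sizes(i)
    }
  }
  specs += ((start, sizes.length))
  specs.toSeq
}

// Partition sizes in MB: 10, 10, 15, 50, 30, 30 with a 64MB target.
println(coalesce(Seq(10L, 10L, 15L, 50L, 30L, 30L), 64L))
// Groups [0,3), [3,4), [4,6): three coalesced partitions of ~35MB, 50MB and 60MB.
```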
And see how skewed joins are handled in optimizeSkewJoin:
```scala
  def optimizeSkewJoin(plan: SparkPlan): SparkPlan = plan.transformUp {
    case smj @ SortMergeJoinExec(_, _, joinType, _,
        s1 @ SortExec(_, _, ShuffleStage(left: ShuffleStageInfo), _),
        s2 @ SortExec(_, _, ShuffleStage(right: ShuffleStageInfo), _), _)
        if supportedJoinTypes.contains(joinType) =>
      assert(left.partitionsWithSizes.length == right.partitionsWithSizes.length)
      val numPartitions = left.partitionsWithSizes.length
      // Use the median size of the actual (coalesced) partition sizes to detect skewed partitions.
      val leftMedSize = medianSize(left.partitionsWithSizes.map(_._2))
      val rightMedSize = medianSize(right.partitionsWithSizes.map(_._2))
      logDebug(
        s"""
          |Optimizing skewed join.
          |Left side partitions size info:
          |${getSizeInfo(leftMedSize, left.partitionsWithSizes.map(_._2))}
          |Right side partitions size info:
          |${getSizeInfo(rightMedSize, right.partitionsWithSizes.map(_._2))}
        """.stripMargin)
      val canSplitLeft = canSplitLeftSide(joinType)
      val canSplitRight = canSplitRightSide(joinType)
      // We use the actual partition sizes (may be coalesced) to calculate target size, so that
      // the final data distribution is even (coalesced partitions + split partitions).
      val leftActualSizes = left.partitionsWithSizes.map(_._2)
      val rightActualSizes = right.partitionsWithSizes.map(_._2)
      val leftTargetSize = targetSize(leftActualSizes, leftMedSize)
      val rightTargetSize = targetSize(rightActualSizes, rightMedSize)

      val leftSidePartitions = mutable.ArrayBuffer.empty[ShufflePartitionSpec]
      val rightSidePartitions = mutable.ArrayBuffer.empty[ShufflePartitionSpec]
      val leftSkewDesc = new SkewDesc
      val rightSkewDesc = new SkewDesc
      for (partitionIndex <- 0 until numPartitions) {
        val isLeftSkew = isSkewed(leftActualSizes(partitionIndex), leftMedSize) && canSplitLeft
        val leftPartSpec = left.partitionsWithSizes(partitionIndex)._1
        val isLeftCoalesced = leftPartSpec.startReducerIndex + 1 < leftPartSpec.endReducerIndex

        val isRightSkew = isSkewed(rightActualSizes(partitionIndex), rightMedSize) && canSplitRight
        val rightPartSpec = right.partitionsWithSizes(partitionIndex)._1
        val isRightCoalesced = rightPartSpec.startReducerIndex + 1 < rightPartSpec.endReducerIndex

        // A skewed partition should never be coalesced, but skip it here just to be safe.
        val leftParts = if (isLeftSkew && !isLeftCoalesced) {
          val reducerId = leftPartSpec.startReducerIndex
          val skewSpecs = createSkewPartitionSpecs(
            left.mapStats.shuffleId, reducerId, leftTargetSize)
          if (skewSpecs.isDefined) {
            logDebug(s"Left side partition $partitionIndex is skewed, split it into " +
              s"${skewSpecs.get.length} parts.")
            leftSkewDesc.addPartitionSize(leftActualSizes(partitionIndex))
          }
          skewSpecs.getOrElse(Seq(leftPartSpec))
        } else {
          Seq(leftPartSpec)
        }

        // A skewed partition should never be coalesced, but skip it here just to be safe.
        val rightParts = if (isRightSkew && !isRightCoalesced) {
          val reducerId = rightPartSpec.startReducerIndex
          val skewSpecs = createSkewPartitionSpecs(
            right.mapStats.shuffleId, reducerId, rightTargetSize)
          if (skewSpecs.isDefined) {
            logDebug(s"Right side partition $partitionIndex is skewed, split it into " +
              s"${skewSpecs.get.length} parts.")
            rightSkewDesc.addPartitionSize(rightActualSizes(partitionIndex))
          }
          skewSpecs.getOrElse(Seq(rightPartSpec))
        } else {
          Seq(rightPartSpec)
        }

        for {
          leftSidePartition <- leftParts
          rightSidePartition <- rightParts
        } {
          leftSidePartitions += leftSidePartition
          rightSidePartitions += rightSidePartition
        }
      }

      logDebug("number of skewed partitions: " +
        s"left ${leftSkewDesc.numPartitions}, right ${rightSkewDesc.numPartitions}")
      if (leftSkewDesc.numPartitions > 0 || rightSkewDesc.numPartitions > 0) {
        val newLeft = CustomShuffleReaderExec(
          left.shuffleStage, leftSidePartitions, leftSkewDesc.toString)
        val newRight = CustomShuffleReaderExec(
          right.shuffleStage, rightSidePartitions, rightSkewDesc.toString)
        smj.copy(
          left = s1.copy(child = newLeft), right = s2.copy(child = newRight), isSkewJoin = true)
      } else {
        smj
      }
  }
```
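Skew splitting works in the opposite direction from coalescing: createSkewPartitionSpecs splits one oversized reducer partition by the sizes of its map-side outputs. The sketch below is a simplified illustration of that idea with made-up numbers, not the ShufflePartitionsUtil code.

```scala
// Split one skewed reducer partition into groups of map outputs that each stay near targetSize.
def splitByTargetSize(mapOutputSizes: Seq[Long], targetSize: Long): Seq[Seq[Int]] = {
  val groups = scala.collection.mutable.ArrayBuffer[Seq[Int]]()
  var currentGroup = scala.collection.mutable.ArrayBuffer[Int]()
  var currentSize = 0L
  for ((size, mapIndex) <- mapOutputSizes.zipWithIndex) {
    if (currentGroup.nonEmpty && currentSize + size > targetSize) {
      groups += currentGroup.toSeq // close the current split
      currentGroup = scala.collection.mutable.ArrayBuffer[Int]()
      currentSize = 0L
    }
    currentGroup += mapIndex
    currentSize += size
  }
  if (currentGroup.nonEmpty) groups += currentGroup.toSeq
  groups.toSeq
}

val bytesPerMB = 1024L * 1024
// A 240MB skewed partition fed by 8 map tasks of 30MB each, with a 64MB target.
val splits = splitByTargetSize(Seq.fill(8)(30 * bytesPerMB), 64 * bytesPerMB)
println(splits.length) // 4 splits of two map outputs (~60MB) each, so one oversized task becomes 4 tasks
```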
Both rules are wired up in AdaptiveSparkPlanExec:
```scala
  @transient private val queryStageOptimizerRules: Seq[Rule[SparkPlan]] = Seq(
    ReuseAdaptiveSubquery(conf, context.subqueryCache),
    CoalesceShufflePartitions(context.session),
    // The following two rules need to make use of 'CustomShuffleReaderExec.partitionSpecs'
    // added by `CoalesceShufflePartitions`. So they must be executed after it.
    OptimizeSkewedJoin(conf),
    OptimizeLocalShuffleReader(conf))
```
So both rules are invoked from AdaptiveSparkPlanExec, with CoalesceShufflePartitions running before OptimizeSkewedJoin. AdaptiveSparkPlanExec itself is inserted by the InsertAdaptiveSparkPlan rule, and InsertAdaptiveSparkPlan is applied in QueryExecution.
In InsertAdaptiveSparkPlan, the shouldApplyAQE and supportAdaptive methods show when AQE is actually applied:
```scala
  private def shouldApplyAQE(plan: SparkPlan, isSubquery: Boolean): Boolean = {
    conf.getConf(SQLConf.ADAPTIVE_EXECUTION_FORCE_APPLY) || isSubquery || {
      plan.find {
        case _: Exchange => true
        case p if !p.requiredChildDistribution.forall(_ == UnspecifiedDistribution) => true
        case p => p.expressions.exists(_.find {
          case _: SubqueryExpression => true
          case _ => false
        }.isDefined)
      }.isDefined
    }
  }

  private def supportAdaptive(plan: SparkPlan): Boolean = {
    // TODO migrate dynamic-partition-pruning onto adaptive execution.
    sanityCheck(plan) &&
      !plan.logicalLink.exists(_.isStreaming) &&
      !plan.expressions.exists(_.find(_.isInstanceOf[DynamicPruningSubquery]).isDefined) &&
      plan.children.forall(supportAdaptive)
  }
```
If these conditions are not met, AQE will not be applied. To force it on anyway, set spark.sql.adaptive.forceApply to true (the documentation notes this is an internal configuration).
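A hedged sketch of the consequence (assuming a SparkSession `spark` with AQE enabled): a plan with no exchange, no required distribution and no subquery keeps its normal physical plan, while anything that shuffles gets wrapped by AQE.

```scala
// No shuffle, no subquery: shouldApplyAQE is false, so no AdaptiveSparkPlan node is expected.
spark.range(10).selectExpr("id * 2").explain()

// An aggregation requires a shuffle (Exchange), so AQE wraps the plan.
spark.range(10).groupBy("id").count().explain()

// Internal switch to force AQE even for plans that would otherwise be skipped.
spark.conf.set("spark.sql.adaptive.forceApply", "true")
```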
The following configurations from the older AQE implementation are deprecated and no longer available in Spark 3.0.1:
- spark.sql.adaptive.skewedPartitionMaxSplits
- spark.sql.adaptive.skewedPartitionRowCountThreshold
- spark.sql.adaptive.skewedPartitionSizeThreshold