[Spark Series 3] Analysis of AQE (Adaptive Query Execution) in Spark 3.0.1

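Introduction to AQE

According to the Spark configuration documentation, AQE has existed since as early as Spark 1.6. In the Spark 2.x era, Intel's big data team did prototype development and put it into practice. With Spark 3.0, Databricks and Intel jointly contributed the new AQE to the community.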

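AQE configuration in Spark 3.0.1

| Configuration | Default | Official description | Analysis |
| --- | --- | --- | --- |
| spark.sql.adaptive.enabled | false | Whether to enable adaptive query execution | Set to true here to enable it |
| spark.sql.adaptive.coalescePartitions.enabled | true | Whether to coalesce contiguous shuffle partitions (according to the 'spark.sql.adaptive.advisoryPartitionSizeInBytes' threshold) | Enabled by default; see Analysis 1 |
| spark.sql.adaptive.coalescePartitions.initialPartitionNum | (none) | Initial number of shuffle partitions before coalescing; defaults to the value of spark.sql.shuffle.partitions | See Analysis 2 |
| spark.sql.adaptive.coalescePartitions.minPartitionNum | (none) | Minimum number of shuffle partitions after coalescing; defaults to the default parallelism of the Spark cluster | See Analysis 3 |
| spark.sql.adaptive.advisoryPartitionSizeInBytes | 64MB | Advisory size of a shuffle partition, used when coalescing partitions and when handling skewed joins | See Analysis 3 |
| spark.sql.adaptive.skewJoin.enabled | true | Whether to enable adaptive handling of data skew in joins | |
| spark.sql.adaptive.skewJoin.skewedPartitionFactor | 5 | Skew factor; a partition counts as skewed only if both skewedPartitionFactor and skewedPartitionThresholdInBytes are satisfied | See Analysis 4 |
| spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes | 256MB | Skew size threshold; a partition counts as skewed only if both skewedPartitionFactor and skewedPartitionThresholdInBytes are satisfied | See Analysis 4 |
| spark.sql.adaptive.logLevel | debug | Log level for adaptive execution plan changes | Set to info to make plan changes easier to observe |
| spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin | 0.2 | Threshold on the ratio of non-empty partitions; a relation whose ratio is below this value will not be converted to a broadcast join | See Analysis 5 |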
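
As a rough illustration of the configuration table, the sketch below collects the settings discussed in this article into a plain Scala sequence and renders them as `--conf key=value` arguments for spark-submit. The object name `AqeConfSketch` and the helper `toSubmitArgs` are hypothetical, and the chosen values (AQE switched on, log level raised to info, everything else left at its default) are only one possible setup.

```scala
object AqeConfSketch {
  // Settings from the configuration table; only `enabled` and `logLevel`
  // differ from the Spark 3.0.1 defaults in this illustrative setup.
  val settings: Seq[(String, String)] = Seq(
    "spark.sql.adaptive.enabled" -> "true",
    "spark.sql.adaptive.coalescePartitions.enabled" -> "true",
    "spark.sql.adaptive.advisoryPartitionSizeInBytes" -> "64MB",
    "spark.sql.adaptive.skewJoin.enabled" -> "true",
    "spark.sql.adaptive.skewJoin.skewedPartitionFactor" -> "5",
    "spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes" -> "256MB",
    "spark.sql.adaptive.logLevel" -> "info"
  )

  // Hypothetical helper: render the settings as spark-submit arguments.
  def toSubmitArgs(conf: Seq[(String, String)]): Seq[String] =
    conf.flatMap { case (key, value) => Seq("--conf", s"$key=$value") }
}
```

The same key/value pairs could also be passed one by one to a session builder; the map form is used here only to keep the example free of a Spark dependency.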

Analysis 1

In OptimizeSkewedJoin.scala (OptimizeSkewedJoin is a physical plan rule), we can see where ADVISORY_PARTITION_SIZE_IN_BYTES, i.e. spark.sql.adaptive.advisoryPartitionSizeInBytes, is referenced:

    /**
     * The goal of skew join optimization is to make the data distribution more even. The target size
     * to split skewed partitions is the average size of non-skewed partition, or the
     * advisory partition size if avg size is smaller than it.
     */
    private def targetSize(sizes: Seq[Long], medianSize: Long): Long = {
      val advisorySize = conf.getConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES)
      val nonSkewSizes = sizes.filterNot(isSkewed(_, medianSize))
      // It's impossible that all the partitions are skewed, as we use median size to define skew.
      assert(nonSkewSizes.nonEmpty)
      math.max(advisorySize, nonSkewSizes.sum / nonSkewSizes.length)
    }

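Here:

  1. nonSkewSizes holds the sizes of the stage's non-skewed partitions
  2. targetSize returns max(average size of the non-skewed partitions, advisorySize), where advisorySize is the value of spark.sql.adaptive.advisoryPartitionSizeInBytes, so
    targetSize is not necessarily equal to spark.sql.adaptive.advisoryPartitionSizeInBytes
  3. medianSize is the median of the stage's partition sizes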
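
To make the max(...) rule concrete, here is a minimal standalone sketch of the same calculation with the configuration values passed in as plain parameters. The object name `TargetSizeSketch` and the parameter names are assumptions made for this example; it does not depend on Spark and is not the Spark implementation itself.

```scala
object TargetSizeSketch {
  // A partition is skewed only if it exceeds BOTH median * factor
  // and the absolute threshold.
  def isSkewed(size: Long, median: Long, factor: Int, threshold: Long): Boolean =
    size > median * factor && size > threshold

  // max(advisory size, average size of the non-skewed partitions).
  def targetSize(
      sizes: Seq[Long],
      median: Long,
      advisory: Long,
      factor: Int,
      threshold: Long): Long = {
    val nonSkew = sizes.filterNot(isSkewed(_, median, factor, threshold))
    require(nonSkew.nonEmpty, "at least one partition must be non-skewed")
    math.max(advisory, nonSkew.sum / nonSkew.length)
  }
}
```

With sizes (in MB) of 100, 120, 140 and 2000, a median of 130, an advisory size of 64, a factor of 5 and a threshold of 256, the non-skewed average is 120, so the target size is 120 rather than the advisory 64.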

Analysis 2

In SQLConf.scala:

    def numShufflePartitions: Int = {
      if (adaptiveExecutionEnabled && coalesceShufflePartitionsEnabled) {
        getConf(COALESCE_PARTITIONS_INITIAL_PARTITION_NUM).getOrElse(defaultNumShufflePartitions)
      } else {
        defaultNumShufflePartitions
      }
    }

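As of Spark 3.0.1, when both AQE and shuffle partition coalescing are enabled, the number of shuffle partitions is taken from spark.sql.adaptive.coalescePartitions.initialPartitionNum (falling back to spark.sql.shuffle.partitions when it is unset). When a query has multiple shuffle stages, raising this initial partition number can make shuffle partition coalescing noticeably more effective.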
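
The fallback behaviour can be modelled in a few lines. This is a simplified sketch with the settings passed as parameters; `InitialPartitionNumSketch` and its parameter names are hypothetical.

```scala
object InitialPartitionNumSketch {
  // The initial partition number is honoured only when AQE and partition
  // coalescing are both enabled and the value is actually set.
  def numShufflePartitions(
      aqeEnabled: Boolean,
      coalesceEnabled: Boolean,
      initialPartitionNum: Option[Int],
      shufflePartitions: Int): Int =
    if (aqeEnabled && coalesceEnabled) initialPartitionNum.getOrElse(shufflePartitions)
    else shufflePartitions
}
```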

Analysis 3

In CoalesceShufflePartitions.scala, CoalesceShufflePartitions is a physical plan rule that performs the following:

    if (!shuffleStages.forall(_.shuffle.canChangeNumPartitions)) {
      plan
    } else {
      // `ShuffleQueryStageExec#mapStats` returns None when the input RDD has 0 partitions,
      // we should skip it when calculating the `partitionStartIndices`.
      val validMetrics = shuffleStages.flatMap(_.mapStats)

      // We may have different pre-shuffle partition numbers, don't reduce shuffle partition number
      // in that case. For example when we union fully aggregated data (data is arranged to a single
      // partition) and a result of a SortMergeJoin (multiple partitions).
      val distinctNumPreShufflePartitions =
        validMetrics.map(stats => stats.bytesByPartitionId.length).distinct
      if (validMetrics.nonEmpty && distinctNumPreShufflePartitions.length == 1) {
        // We fall back to Spark default parallelism if the minimum number of coalesced partitions
        // is not set, so to avoid perf regressions compared to no coalescing.
        val minPartitionNum = conf.getConf(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM)
          .getOrElse(session.sparkContext.defaultParallelism)
        val partitionSpecs = ShufflePartitionsUtil.coalescePartitions(
          validMetrics.toArray,
          advisoryTargetSize = conf.getConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES),
          minNumPartitions = minPartitionNum)
        // This transformation adds new nodes, so we must use `transformUp` here.
        val stageIds = shuffleStages.map(_.id).toSet
        plan.transformUp {
          // even for shuffle exchange whose input RDD has 0 partition, we should still update its
          // `partitionStartIndices`, so that all the leaf shuffles in a stage have the same
          // number of output partitions.
          case stage: ShuffleQueryStageExec if stageIds.contains(stage.id) =>
            CustomShuffleReaderExec(stage, partitionSpecs, COALESCED_SHUFFLE_READER_DESCRIPTION)
        }
      } else {
        plan
      }
    }

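In other words:

  1. If the partitioning was specified by the user, e.g. through a repartition operation, spark.sql.adaptive.coalescePartitions.minPartitionNum has no effect and partition coalescing is skipped
  2. If several stages are shuffled and they have different numbers of partitions, spark.sql.adaptive.coalescePartitions.minPartitionNum has no effect and partition coalescing is skipped
  3. See the coalescePartitions analysis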
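
The two skip conditions can be expressed as a small predicate. The sketch below uses a hypothetical `Stage` case class as a stand-in for a shuffle query stage; it only models the decision, not the plan rewrite.

```scala
object CoalescePreconditionSketch {
  // Hypothetical stand-in for a shuffle stage: whether its partition count may
  // change (false for a user-specified repartition) and its per-partition byte
  // sizes (None when the input RDD has 0 partitions).
  final case class Stage(
      canChangeNumPartitions: Boolean,
      bytesByPartition: Option[Seq[Long]])

  def shouldCoalesce(stages: Seq[Stage]): Boolean =
    if (!stages.forall(_.canChangeNumPartitions)) {
      false
    } else {
      val valid = stages.flatMap(_.bytesByPartition)
      // Coalesce only when all stages with statistics agree on the partition count.
      valid.nonEmpty && valid.map(_.length).distinct.length == 1
    }
}
```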

Analysis 4

In OptimizeSkewedJoin.scala, we see:

    /**
     * A partition is considered as a skewed partition if its size is larger than the median
     * partition size * ADAPTIVE_EXECUTION_SKEWED_PARTITION_FACTOR and also larger than
     * ADVISORY_PARTITION_SIZE_IN_BYTES.
     */
    private def isSkewed(size: Long, medianSize: Long): Boolean = {
      size > medianSize * conf.getConf(SQLConf.SKEW_JOIN_SKEWED_PARTITION_FACTOR) &&
        size > conf.getConf(SQLConf.SKEW_JOIN_SKEWED_PARTITION_THRESHOLD)
    }

  1. OptimizeSkewedJoin is a physical plan rule that uses isSkewed to decide whether the data is skewed; a partition is judged skewed only when both SKEW_JOIN_SKEWED_PARTITION_FACTOR and SKEW_JOIN_SKEWED_PARTITION_THRESHOLD are satisfied
  2. medianSize is the median of the stage's partition sizes
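
A small worked example may help show why both conditions matter. The sketch below is standalone; the `median` helper is a plain textbook median written for this example (an assumption, not copied from Spark), and `skewedIndices` is a hypothetical name.

```scala
object SkewDetectionSketch {
  // Median of the partition sizes (mean of the two middle values for an even count).
  def median(sizes: Seq[Long]): Long = {
    require(sizes.nonEmpty, "need at least one partition")
    val sorted = sizes.sorted
    val n = sorted.length
    if (n % 2 == 0) (sorted(n / 2 - 1) + sorted(n / 2)) / 2 else sorted(n / 2)
  }

  // Indices of partitions larger than median * factor AND larger than threshold.
  def skewedIndices(sizes: Seq[Long], factor: Int, threshold: Long): Seq[Int] = {
    val med = median(sizes)
    sizes.zipWithIndex.collect {
      case (size, idx) if size > med * factor && size > threshold => idx
    }
  }
}
```

With partitions of 10, 20, 30 and 200 MB, the last one is more than five times the 25 MB median but stays under the 256 MB threshold, so nothing is flagged; with 100, 120, 140 and 2000 MB, the last partition passes both checks.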

Analysis 5

The getFinalPhysicalPlan method of AdaptiveSparkPlanExec calls reOptimize, and reOptimize in turn runs the optimization of the logical plan:

    private def reOptimize(logicalPlan: LogicalPlan): (SparkPlan, LogicalPlan) = {
      logicalPlan.invalidateStatsCache()
      val optimized = optimizer.execute(logicalPlan)
      val sparkPlan = context.session.sessionState.planner.plan(ReturnAnswer(optimized)).next()
      val newPlan = applyPhysicalRules(sparkPlan, preprocessingRules ++ queryStagePreparationRules)
      (newPlan, optimized)
    }

The optimizer contains a DemoteBroadcastHashJoin rule:

    @transient private val optimizer = new RuleExecutor[LogicalPlan] {
      // TODO add more optimization rules
      override protected def batches: Seq[Batch] = Seq(
        Batch("Demote BroadcastHashJoin", Once, DemoteBroadcastHashJoin(conf))
      )
    }

DemoteBroadcastHashJoin in turn decides whether a broadcast join may be used:

    case class DemoteBroadcastHashJoin(conf: SQLConf) extends Rule[LogicalPlan] {

      private def shouldDemote(plan: LogicalPlan): Boolean = plan match {
        case LogicalQueryStage(_, stage: ShuffleQueryStageExec) if stage.resultOption.isDefined
          && stage.mapStats.isDefined =>
          val mapStats = stage.mapStats.get
          val partitionCnt = mapStats.bytesByPartitionId.length
          val nonZeroCnt = mapStats.bytesByPartitionId.count(_ > 0)
          partitionCnt > 0 && nonZeroCnt > 0 &&
            (nonZeroCnt * 1.0 / partitionCnt) < conf.nonEmptyPartitionRatioForBroadcastJoin
        case _ => false
      }

      def apply(plan: LogicalPlan): LogicalPlan = plan.transformDown {
        case j @ Join(left, right, _, _, hint) =>
          var newHint = hint
          if (!hint.leftHint.exists(_.strategy.isDefined) && shouldDemote(left)) {
            newHint = newHint.copy(leftHint =
              Some(hint.leftHint.getOrElse(HintInfo()).copy(strategy = Some(NO_BROADCAST_HASH))))
          }
          if (!hint.rightHint.exists(_.strategy.isDefined) && shouldDemote(right)) {
            newHint = newHint.copy(rightHint =
              Some(hint.rightHint.getOrElse(HintInfo()).copy(strategy = Some(NO_BROADCAST_HASH))))
          }
          if (newHint.ne(hint)) {
            j.copy(hint = newHint)
          } else {
            j
          }
      }
    }

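shouldDemote is the check that decides whether a broadcast join is ruled out:

  1. The node must first be a ShuffleQueryStageExec
  2. If the ratio of non-empty partitions is lower than nonEmptyPartitionRatioForBroadcastJoin, i.e. spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin, that side is given the NO_BROADCAST_HASH hint and the sort merge join will not be converted to a broadcast join
  3. This tends to show up in SQL that joins first and then does a group by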
coalescePartitions is shown below:

    def coalescePartitions(
        mapOutputStatistics: Array[MapOutputStatistics],
        advisoryTargetSize: Long,
        minNumPartitions: Int): Seq[ShufflePartitionSpec] = {
      // If `minNumPartitions` is very large, it is possible that we need to use a value less than
      // `advisoryTargetSize` as the target size of a coalesced task.
      val totalPostShuffleInputSize = mapOutputStatistics.map(_.bytesByPartitionId.sum).sum
      // The max at here is to make sure that when we have an empty table, we only have a single
      // coalesced partition.
      // There is no particular reason that we pick 16. We just need a number to prevent
      // `maxTargetSize` from being set to 0.
      val maxTargetSize = math.max(
        math.ceil(totalPostShuffleInputSize / minNumPartitions.toDouble).toLong, 16)
      val targetSize = math.min(maxTargetSize, advisoryTargetSize)

      val shuffleIds = mapOutputStatistics.map(_.shuffleId).mkString(", ")
      logInfo(s"For shuffle($shuffleIds), advisory target size: $advisoryTargetSize, " +
        s"actual target size $targetSize.")

      // Make sure these shuffles have the same number of partitions.
      val distinctNumShufflePartitions =
        mapOutputStatistics.map(stats => stats.bytesByPartitionId.length).distinct
      // The reason that we are expecting a single value of the number of shuffle partitions
      // is that when we add Exchanges, we set the number of shuffle partitions
      // (i.e. map output partitions) using a static setting, which is the value of
      // `spark.sql.shuffle.partitions`. Even if two input RDDs are having different
      // number of partitions, they will have the same number of shuffle partitions
      // (i.e. map output partitions).
      assert(
        distinctNumShufflePartitions.length == 1,
        "There should be only one distinct value of the number of shuffle partitions " +
          "among registered Exchange operators.")

      val numPartitions = distinctNumShufflePartitions.head
      val partitionSpecs = ArrayBuffer[CoalescedPartitionSpec]()
      var latestSplitPoint = 0
      var coalescedSize = 0L
      var i = 0
      while (i < numPartitions) {
        // We calculate the total size of i-th shuffle partitions from all shuffles.
        var totalSizeOfCurrentPartition = 0L
        var j = 0
        while (j < mapOutputStatistics.length) {
          totalSizeOfCurrentPartition += mapOutputStatistics(j).bytesByPartitionId(i)
          j += 1
        }

        // If including the `totalSizeOfCurrentPartition` would exceed the target size, then start a
        // new coalesced partition.
        if (i > latestSplitPoint && coalescedSize + totalSizeOfCurrentPartition > targetSize) {
          partitionSpecs += CoalescedPartitionSpec(latestSplitPoint, i)
          latestSplitPoint = i
          // reset postShuffleInputSize.
          coalescedSize = totalSizeOfCurrentPartition
        } else {
          coalescedSize += totalSizeOfCurrentPartition
        }
        i += 1
      }
      partitionSpecs += CoalescedPartitionSpec(latestSplitPoint, numPartitions)

      partitionSpecs
    }

  1. totalPostShuffleInputSize first computes the total size of the shuffle data
  2. maxTargetSize is max(totalPostShuffleInputSize / minNumPartitions, 16), where minNumPartitions is the value of spark.sql.adaptive.coalescePartitions.minPartitionNum
  3. targetSize is min(maxTargetSize, advisoryTargetSize), where advisoryTargetSize is the value of spark.sql.adaptive.advisoryPartitionSizeInBytes; that value is therefore only advisory and is not necessarily the targetSize
  4. The while loop merges adjacent partitions: for each partition index it sums the sizes across all shuffles and keeps adding neighbouring partitions to the current group as long as the group does not exceed targetSize
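
The four steps above can be sketched without any Spark types. In the version below the per-shuffle statistics are plain `Seq[Seq[Long]]` and each coalesced group is returned as a `(start, endExclusive)` pair; the object name `CoalesceSketch` and this simplified signature are assumptions made for the example.

```scala
object CoalesceSketch {
  // Returns (startIndex, endIndexExclusive) ranges of adjacent reducer partitions.
  def coalesce(
      bytesByShuffle: Seq[Seq[Long]],
      advisoryTargetSize: Long,
      minNumPartitions: Int): Seq[(Int, Int)] = {
    require(bytesByShuffle.nonEmpty && bytesByShuffle.map(_.length).distinct.length == 1,
      "all shuffles must have the same number of partitions")
    val total = bytesByShuffle.map(_.sum).sum
    // A large minNumPartitions can push the target below the advisory size.
    val maxTargetSize = math.max(math.ceil(total / minNumPartitions.toDouble).toLong, 16L)
    val targetSize = math.min(maxTargetSize, advisoryTargetSize)

    val numPartitions = bytesByShuffle.head.length
    val specs = scala.collection.mutable.ArrayBuffer.empty[(Int, Int)]
    var latestSplitPoint = 0
    var coalescedSize = 0L
    for (i <- 0 until numPartitions) {
      // Total size of the i-th partition across all shuffles.
      val current = bytesByShuffle.map(_(i)).sum
      if (i > latestSplitPoint && coalescedSize + current > targetSize) {
        // Adding this partition would exceed the target: close the current group.
        specs += ((latestSplitPoint, i))
        latestSplitPoint = i
        coalescedSize = current
      } else {
        coalescedSize += current
      }
    }
    specs += ((latestSplitPoint, numPartitions))
    specs.toList
  }
}
```

For a single shuffle with partition sizes 10, 20, 30, 40, 50 and an advisory size of 60, minNumPartitions = 1 yields the groups [0,3), [3,4), [4,5), while minNumPartitions = 5 lowers the effective target to 30 and yields [0,2), [2,3), [3,4), [4,5).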

Analysis of OptimizeSkewedJoin.optimizeSkewJoin (the core code of the skew optimization)

optimizeSkewJoin is shown below:

    def optimizeSkewJoin(plan: SparkPlan): SparkPlan = plan.transformUp {
      case smj @ SortMergeJoinExec(_, _, joinType, _,
          s1 @ SortExec(_, _, ShuffleStage(left: ShuffleStageInfo), _),
          s2 @ SortExec(_, _, ShuffleStage(right: ShuffleStageInfo), _), _)
          if supportedJoinTypes.contains(joinType) =>
        assert(left.partitionsWithSizes.length == right.partitionsWithSizes.length)
        val numPartitions = left.partitionsWithSizes.length
        // Use the median size of the actual (coalesced) partition sizes to detect skewed partitions.
        val leftMedSize = medianSize(left.partitionsWithSizes.map(_._2))
        val rightMedSize = medianSize(right.partitionsWithSizes.map(_._2))
        logDebug(
          s"""
            |Optimizing skewed join.
            |Left side partitions size info:
            |${getSizeInfo(leftMedSize, left.partitionsWithSizes.map(_._2))}
            |Right side partitions size info:
            |${getSizeInfo(rightMedSize, right.partitionsWithSizes.map(_._2))}
          """.stripMargin)
        val canSplitLeft = canSplitLeftSide(joinType)
        val canSplitRight = canSplitRightSide(joinType)
        // We use the actual partition sizes (may be coalesced) to calculate target size, so that
        // the final data distribution is even (coalesced partitions + split partitions).
        val leftActualSizes = left.partitionsWithSizes.map(_._2)
        val rightActualSizes = right.partitionsWithSizes.map(_._2)
        val leftTargetSize = targetSize(leftActualSizes, leftMedSize)
        val rightTargetSize = targetSize(rightActualSizes, rightMedSize)

        val leftSidePartitions = mutable.ArrayBuffer.empty[ShufflePartitionSpec]
        val rightSidePartitions = mutable.ArrayBuffer.empty[ShufflePartitionSpec]
        val leftSkewDesc = new SkewDesc
        val rightSkewDesc = new SkewDesc
        for (partitionIndex <- 0 until numPartitions) {
          val isLeftSkew = isSkewed(leftActualSizes(partitionIndex), leftMedSize) && canSplitLeft
          val leftPartSpec = left.partitionsWithSizes(partitionIndex)._1
          val isLeftCoalesced = leftPartSpec.startReducerIndex + 1 < leftPartSpec.endReducerIndex

          val isRightSkew = isSkewed(rightActualSizes(partitionIndex), rightMedSize) && canSplitRight
          val rightPartSpec = right.partitionsWithSizes(partitionIndex)._1
          val isRightCoalesced = rightPartSpec.startReducerIndex + 1 < rightPartSpec.endReducerIndex

          // A skewed partition should never be coalesced, but skip it here just to be safe.
          val leftParts = if (isLeftSkew && !isLeftCoalesced) {
            val reducerId = leftPartSpec.startReducerIndex
            val skewSpecs = createSkewPartitionSpecs(
              left.mapStats.shuffleId, reducerId, leftTargetSize)
            if (skewSpecs.isDefined) {
              logDebug(s"Left side partition $partitionIndex is skewed, split it into " +
                s"${skewSpecs.get.length} parts.")
              leftSkewDesc.addPartitionSize(leftActualSizes(partitionIndex))
            }
            skewSpecs.getOrElse(Seq(leftPartSpec))
          } else {
            Seq(leftPartSpec)
          }

          // A skewed partition should never be coalesced, but skip it here just to be safe.
          val rightParts = if (isRightSkew && !isRightCoalesced) {
            val reducerId = rightPartSpec.startReducerIndex
            val skewSpecs = createSkewPartitionSpecs(
              right.mapStats.shuffleId, reducerId, rightTargetSize)
            if (skewSpecs.isDefined) {
              logDebug(s"Right side partition $partitionIndex is skewed, split it into " +
                s"${skewSpecs.get.length} parts.")
              rightSkewDesc.addPartitionSize(rightActualSizes(partitionIndex))
            }
            skewSpecs.getOrElse(Seq(rightPartSpec))
          } else {
            Seq(rightPartSpec)
          }

          for {
            leftSidePartition <- leftParts
            rightSidePartition <- rightParts
          } {
            leftSidePartitions += leftSidePartition
            rightSidePartitions += rightSidePartition
          }
        }

        logDebug("number of skewed partitions: " +
          s"left ${leftSkewDesc.numPartitions}, right ${rightSkewDesc.numPartitions}")
        if (leftSkewDesc.numPartitions > 0 || rightSkewDesc.numPartitions > 0) {
          val newLeft = CustomShuffleReaderExec(
            left.shuffleStage, leftSidePartitions, leftSkewDesc.toString)
          val newRight = CustomShuffleReaderExec(
            right.shuffleStage, rightSidePartitions, rightSkewDesc.toString)
          smj.copy(
            left = s1.copy(child = newLeft), right = s2.copy(child = newRight), isSkewJoin = true)
        } else {
          smj
        }
    }

  1. The SortMergeJoinExec pattern shows that the rule applies to sort merge joins
  2. assert(left.partitionsWithSizes.length == right.partitionsWithSizes.length) guarantees that the two sides of the join have the same number of partitions
  3. The median partition sizes of the two join sides, leftMedSize and rightMedSize, are computed
  4. The target sizes of the two join sides, leftTargetSize and rightTargetSize, are computed
  5. The loop checks every partition on both sides for skew; a partition that is skewed and has not been through shuffle partition coalescing is split, otherwise it is left alone
  6. The createSkewPartitionSpecs method:
    1. Gets the data sizes of the corresponding partition for each side of the join
    2. Splits it into multiple slices according to targetSize
  7. If skew exists, the result is wrapped in a CustomShuffleReaderExec for the subsequent tasks to run; in the end the compute method of ShuffledRowRDD is called and matches case PartialMapperPartitionSpec to read the data, and "spark.sql.adaptive.fetchShuffleBlocksInBatch" is also turned on automatically to fetch in batches and reduce I/O
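
Steps 5 and 6 can be pictured with a simplified standalone sketch: one skewed reducer partition is cut into ranges of map outputs of roughly targetSize bytes, and every resulting slice is then paired with the partition(s) of the other side. Both `splitByTargetSize` and `pair` are hypothetical helpers written for this example; any merging of small slices that Spark may perform is omitted.

```scala
object SkewSplitSketch {
  // Group consecutive map outputs of ONE skewed reducer partition into
  // (startMapIndex, endMapIndexExclusive) ranges of roughly targetSize bytes.
  def splitByTargetSize(mapSizes: Seq[Long], targetSize: Long): Seq[(Int, Int)] = {
    val starts = scala.collection.mutable.ArrayBuffer(0)
    var current = 0L
    for (i <- mapSizes.indices) {
      if (i > 0 && current + mapSizes(i) > targetSize) {
        // Adding this map output would exceed the target: start a new slice.
        starts += i
        current = mapSizes(i)
      } else {
        current += mapSizes(i)
      }
    }
    val ends = starts.drop(1) :+ mapSizes.length
    starts.zip(ends).toList
  }

  // Every slice of one side is joined with every slice of the other side.
  def pair[A, B](leftParts: Seq[A], rightParts: Seq[B]): Seq[(A, B)] =
    for (l <- leftParts; r <- rightParts) yield (l, r)
}
```

Five map outputs of 40 bytes each with a target of 100 give the slices [0,2), [2,4), [4,5); if the right side is not skewed, each of those three left slices is joined against the same single right partition.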

Where OptimizeSkewedJoin and CoalesceShufflePartitions are invoked

For example, in AdaptiveSparkPlanExec:

    @transient private val queryStageOptimizerRules: Seq[Rule[SparkPlan]] = Seq(
      ReuseAdaptiveSubquery(conf, context.subqueryCache),
      CoalesceShufflePartitions(context.session),
      // The following two rules need to make use of 'CustomShuffleReaderExec.partitionSpecs'
      // added by `CoalesceShufflePartitions`. So they must be executed after it.
      OptimizeSkewedJoin(conf),
      OptimizeLocalShuffleReader(conf)
    )

So both rules are invoked in AdaptiveSparkPlanExec, with CoalesceShufflePartitions running before OptimizeSkewedJoin. AdaptiveSparkPlanExec is created in InsertAdaptiveSparkPlan, and InsertAdaptiveSparkPlan is invoked from QueryExecution.

In the shouldApplyAQE and supportAdaptive methods of InsertAdaptiveSparkPlan we see:

    private def shouldApplyAQE(plan: SparkPlan, isSubquery: Boolean): Boolean = {
      conf.getConf(SQLConf.ADAPTIVE_EXECUTION_FORCE_APPLY) || isSubquery || {
        plan.find {
          case _: Exchange => true
          case p if !p.requiredChildDistribution.forall(_ == UnspecifiedDistribution) => true
          case p => p.expressions.exists(_.find {
            case _: SubqueryExpression => true
            case _ => false
          }.isDefined)
        }.isDefined
      }
    }

    private def supportAdaptive(plan: SparkPlan): Boolean = {
      // TODO migrate dynamic-partition-pruning onto adaptive execution.
      sanityCheck(plan) &&
        !plan.logicalLink.exists(_.isStreaming) &&
        !plan.expressions.exists(_.find(_.isInstanceOf[DynamicPruningSubquery]).isDefined) &&
        plan.children.forall(supportAdaptive)
    }

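If these conditions are not met, AQE is not applied even when it is enabled. To force it, set spark.sql.adaptive.forceApply to true (the documentation marks this as an internal configuration).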
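
As a rough model of the shouldApplyAQE decision, the sketch below flattens the three per-node checks into booleans on a hypothetical `Node` case class. It is only meant to show the shape of the condition (force flag, subquery flag, or any qualifying node in the tree), not the real plan types.

```scala
object ShouldApplyAqeSketch {
  // Hypothetical, flattened view of a physical plan node.
  final case class Node(
      isExchange: Boolean = false,
      requiresDistribution: Boolean = false,
      hasSubquery: Boolean = false,
      children: Seq[Node] = Nil)

  // True if the predicate holds for the node or any of its descendants.
  private def anyNode(node: Node)(p: Node => Boolean): Boolean =
    p(node) || node.children.exists(child => anyNode(child)(p))

  def shouldApplyAQE(plan: Node, forceApply: Boolean, isSubquery: Boolean): Boolean =
    forceApply || isSubquery ||
      anyNode(plan)(n => n.isExchange || n.requiresDistribution || n.hasSubquery)
}
```

A plan with no exchange, no distribution requirement and no subquery is left alone unless `forceApply` is set, which corresponds to spark.sql.adaptive.forceApply in the text above.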

Note:

The following configurations are obsolete in Spark 3.0.1:

spark.sql.adaptive.skewedPartitionMaxSplits
spark.sql.adaptive.skewedPartitionRowCountThreshold
spark.sql.adaptive.skewedPartitionSizeThreshold

This article partly draws on:
=MzA5MTc0NTMwNQ==&mid=2650718363&idx=1&sn=d20fffebafdd2bed6939eaeb39f5e6e3

Published: 2024-02-02 18:18:10

Article link: https://www.4u4v.net/it/170686962645584.html
