Branch:spark-3.0
有不对的地方欢迎各位大佬批评指正!
相关参数:
spark.abled AQE是否开启
spark.abled 分区合并是否开启
spark.alescePartitions.minPartitionNum 合并后最小的分区数,下文我们简称为minPartitionNum
spark.sql.adaptive.advisoryPartitionSizeInBytes 开发者建议的分区大小,下文我们简称为advisoryPartitionSizeInBytes
类:AdaptiveSparkPlanExec.scala
入口:queryStageOptimizerRules —> CoalesceShufflePartitions —> coalescePartitions
@transient private val queryStageOptimizerRules: Seq[Rule[SparkPlan]] = Seq(ReuseAdaptiveSubquery(conf, context.subqueryCache),CoalesceShufflePartitions(context.session),// The following two rules need to make use of 'CustomShuffleReaderExec.partitionSpecs'// added by `CoalesceShufflePartitions`. So they must be executed after it.OptimizeSkewedJoin(conf),OptimizeLocalShuffleReader(conf))
判断AQE是否开启、所有叶子结点是否都是查询阶段(如果不是的话合并分区会破坏所有子节点具有相同数量的输出分区假设)
if (!alesceShufflePartitionsEnabled) {return plan}
if (!llectLeaves().forall(_.isInstanceOf[QueryStageExec])|| plan.find(_.isInstanceOf[CustomShuffleReaderExec]).isDefined) {// If not all leaf nodes are query stages, it's not safe to reduce the number of// shuffle partitions, because we may break the assumption that all children of a spark plan// have same number of output partitions. return plan
}
将一棵树所有节点的ShuffleStage收集起来,为接下来分区合并使用
def collectShuffleStages(plan: SparkPlan): Seq[ShuffleQueryStageExec] = plan match {case stage: ShuffleQueryStageExec => Seq(stage)case _ => plan.children.flatMap(collectShuffleStages)}val shuffleStages = collectShuffleStages(plan)
首先会判断这些Shuffle是否能够进行分区合并,如果不能的话会直接将plan返回
判断条件是如果用户自己制定了repartition或是singlePartition的情况下会不进行分区合并
这个方法是在ShuffleChangeExec类(shuffle的类)中的canChangeNumPartitons,里面的分区数是执行之前就被构造好的
if (!shuffleStages.forall(_.shuffle.canChangeNumPartitions)) {plan
对shuffleStage进行筛选,若Shuffle的分区已被确定好,则此Stage也会跳过,否则将会返回MapOutputStatistics(shuffleId: Int, 当前stage中task内的内的分区数: Array())
过滤完成后得到validMetrics(可以进行合并的分区列表)
val validMetrics = shuffleStages.flatMap(_.mapStats)
多个Task进行Shuffle时,每个Task需要具有相同的分区数才能进行合并
例如,当我们将完全聚合的数据(数据被安排到单个分区)和SortMergeJoin的结果(多个分区)结合在一起时。
val distinctNumPreShufflePartitions =validMetrics.map(stats => stats.bytesByPartitionId.length).distinct
if (Empty && distinctNumPreShufflePartitions.length == 1){
// 有分区需要合并 且 当前进行合并的stage中的task中的分区数都是一样的
........
}
取最小的分区数,如果未定义则取Spark默认的并行度(为了避免分区合并后的性能退化)
val minPartitionNum = Conf(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM).getOrElse(session.sparkContext.defaultParallelism)
进入真正执行的方法
参数Array(shuffleID和当前stage中task内分区的个数)
advisoryTargetSize(开发者定义的分区大小默认值64M)
minNumPartitions(最小分区数)
val partitionSpecs = Array,advisoryTargetSize = Conf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES),minNumPartitions = minPartitionNum)
advisoryTargetSize数值的重新设定
如果inputSize=1000M 10分区 而设置advisoryTargetSize为200M 则通过一下计算会排除200M这个设置
advisoryTargetSize=maxTargetSize
// 所有分区的总分区数
val totalPostShuffleInputSize = mapOutputStatistics.map(_.bytesByPartitionId.sum).sum
// 分区大小的最大值
val maxTargetSize = math.il(totalPostShuffleInputSize / Double).toLong, 16)
// 确定真正的分区大小(避免上述例子的情况出现)
val targetSize = math.min(maxTargetSize, advisoryTargetSize)
分区合并
while (i < numPartitions) {// 我们从所有shuffle中计算第i个shuffle分区的总大小 对于每个task中的每个相邻分区合并,直到不大于targetSize// 从所有洗牌中计算第i次洗牌分区的总大小。var totalSizeOfCurrentPartition = 0Lvar j = 0while (j < mapOutputStatistics.length) {// 对每个shuffle中的partition进行合并totalSizeOfCurrentPartition += mapOutputStatistics(j).bytesByPartitionId(i)j += 1}// 如果包含' totalSizeOfCurrentPartition '将超过目标大小,则启动一个新的合并分区。if (i > latestSplitPoint && coalescedSize + totalSizeOfCurrentPartition > targetSize) {partitionSpecs += CoalescedPartitionSpec(latestSplitPoint, i)latestSplitPoint = i// 重置postShuffleInputSizecoalescedSize = totalSizeOfCurrentPartition} else {coalescedSize += totalSizeOfCurrentPartition}i += 1}
最后 将合并的分区返回
partitionSpecs += CoalescedPartitionSpec(latestSplitPoint, numPartitions)partitionSpecs
官方例子(说实话这例子我还没验证成功)
* For example, we have two shuffles with the following partition size statistics:* - shuffle 1 (5 partitions): [100 MiB, 20 MiB, 100 MiB, 10MiB, 30 MiB]* - shuffle 2 (5 partitions): [10 MiB, 10 MiB, 70 MiB, 5 MiB, 5 MiB]* Assuming the target size is 128 MiB, we will have 4 coalesced partitions, which are:* - coalesced partition 0: shuffle partition 0 (size 110 MiB)* - coalesced partition 1: shuffle partition 1 (size 30 MiB)* - coalesced partition 2: shuffle partition 2 (size 170 MiB)* - coalesced partition 3: shuffle partition 3 and 4 (size 50 MiB)** @return A sequence of [[CoalescedPartitionSpec]]s. For example, if partitions [0, 1, 2, 3, 4]* split at indices [0, 2, 3], the returned partition specs will be:* CoalescedPartitionSpec(0, 2), CoalescedPartitionSpec(2, 3) and* CoalescedPartitionSpec(3, 5).
结合官方给出的对应单侧看源码能够更快的理解
分区合并对应的测试类ShufflePartitionsUtilSuite
本文发布于:2024-02-02 18:20:09,感谢您对本站的认可!
本文链接:https://www.4u4v.net/it/170686970245593.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
留言与评论(共有 0 条评论) |