Spark AQE 在 DataSkew 处理过程中,需要考虑一些边界条件,否则可能会引入一些额外的Shuffle。
在开始今天的Topic之前,需要先回顾一下 EnsureRequirements, 熟悉的同学请跳过。
EnsureRequirements 是为了保证Spark 算子的数据输入要求,在算子之间引入Shuffle的核心工具。
算子的 Partitioning 表示该算子的数据在集群上是如何分布的。
override def outputPartitioning: Partitioning = child.outputPartitioning
.child.outputPartitioning
不再有效的,比如: PartitionRecombinationExec
, CoalesceExec
Aggregate
计算,此时数据分布一定要是 ClusteredDistribution(Final Aggregate Expressions)
PartitioningCollection(Seq(left.outputPartitioning, right.outputPartitioning))
这种特殊的数据分区left.outputPartitioning
right.outputPartitioning
Test case 参数
Config Name | Config String | Config Value |
---|---|---|
ADAPTIVE_EXECUTION_ENABLED | spark.abled | true |
AUTO_BROADCASTJOIN_THRESHOLD | spark.sql.autoBroadcastJoinThreshold | -1 |
SKEW_JOIN_SKEWED_PARTITION_THRESHOLD | spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes | 100 |
ADVISORY_PARTITION_SIZE_IN_BYTES | spark.sql.adaptive.advisoryPartitionSizeInBytes | 100 |
ALLOW_ADDITIONAL_SHUFFLE | spark.sql.adaptive.allowAdditionalShuffle | false |
SELECT key1
FROM (select id % 3 as key1, id as value1 from range(1, 1000)
) as skewData1
JOIN (select id % 1 as key2, id as value2 from range(1, 1000)
) as skewData2
ON key1 = key2
GROUP BY key1
在做Join之前,这里两张表都进行了数据Shuffle,我们在 TestSQLContext
中配置了 test suite 的 SHUFFLE_PARTITIONS = 5,所以Shuffle结果如下:
validMetrics = {$colon$colon@14794} "::" size = 20 = {MapOutputStatistics@14802} # 对应算子 Exchange hashpartitioning(key1#276L, 5), ENSURE_REQUIREMENTS, [id=#143]shuffleId = 0bytesByPartitionId = {long[5]@14804} [660, 0, 0, 660, 660]1 = {MapOutputStatistics@14803} # 对应算子 Exchange hashpartitioning(key2#282L, 5), ENSURE_REQUIREMENTS, [id=#154]shuffleId = 1bytesByPartitionId = {long[5]@14805} [720, 0, 0, 0, 0]
在AQE里,为了减少数据的Shuffle,AQE 的 CoalesceShufflePartitions Rule 支持对多个小的Map结果进行合并读。
测试UT中配置参数 ADVISORY_PARTITION_SIZE_IN_BYTES = 100
。在Rule的日志中显示: For shuffle(0, 1), advisory target size: 100, actual target size 100.
最终 Shuffle partitionSpecs:
partitionSpecs = {ArrayBuffer@14877} "ArrayBuffer" size = 40 = {CoalescedPartitionSpec@14883} "CoalescedPartitionSpec(0,1)"1 = {CoalescedPartitionSpec@14884} "CoalescedPartitionSpec(1,3)"2 = {CoalescedPartitionSpec@14885} "CoalescedPartitionSpec(3,4)"3 = {CoalescedPartitionSpec@14886} "CoalescedPartitionSpec(4,5)"
CoalescedPartitionSpec(0,1) 读取的数据为 660 + 720,为数据倾斜分区。
outputPartitioning = UnknownPartitioning
(因为Skew数据处理改变了数据分布)。再次进行 EnsureRequirements 检查时,不满足后面的 FinalAggregate
要求的Distribution ClusteredDistribution(Seq(keys=[key1#286L])) :: Nil
,所以要引入额外的 Shuffle.requiredChildDistributionExpressions = None
所以对应 UnspecifiedDistribution
可以先忽略)PartitioningCollection(Seq(hashpartitioning(key1#286L, 5), hashpartitioning(key2#288L, 5)))
满足后面的 FinalAggregate
要求的Distribution ClusteredDistribution(Seq(keys=[key1#286L])) :: Nil
(其中一个partition 马满足就够了),所以这里也没有额外的Shuffle在正常情况下,DAG 增加一个额外的Shuffle,是一个非常重的操作,但是当发生Skew 时,对应的Task可能非常的长尾,引入额外Shuffle,增加倾斜的数据的并行处理,有可能会更快。我们线上是允许在Skew Join的时候增加一个额外的Shuffle,即 spark.sql.adaptive.allowAdditionalShuffle = true
所以对应的DAG 如下
本文发布于:2024-02-02 18:19:15,感谢您对本站的认可!
本文链接:https://www.4u4v.net/it/170686966845589.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
留言与评论(共有 0 条评论) |