Spark SQL 的 AQE 机制

阅读: 评论:0

Spark SQL 的 AQE 机制

Spark SQL 的 AQE 机制

原文

本文翻译自 Spark SQL AQE 机制的原始 JIRA 和官方设计文档 《New Adaptive Query Execution in Spark SQL》

背景

SPARK-9850 在 Spark 中提出了自适应执行的基本思想。

DAGScheduler中,添加了一个新的 API 来支持提交单个 Map Stage。

DAGScheduler请参考我的这篇博客——DAGScheduler 是什么?有什么作用?

目前 Spark SQL 中自适应执行的实现支持在运行时更改 Reducer 的数量。

如果一个阶段(Stage)需要从一个或多个阶段中获取 Shuffle 数据,则 Exchange Coordinator可以帮助它确定 Shuffle 后分区的数量。

在我们添加 Exchange 时,当前的实现增加了Exchange Coordinator

然而,有一些局限性。

首先,它可能会导致额外的 Shuffle,从而降低性能。

当它添加Exchange Coordinator时,我们可以从EnsureRequirements规则中看到这一点。

其次,在我们添加Exchange时添加Exchange Coordinator不是一个好主意,因为我们没有 Shuffle 后 阶段所有 Shuffle 依赖项的全局视图。

比如,对于单个阶段中 3 个表的 JOIN,应在三个Exchange中使用相同的Exchange Coordinator,但是目前会添加两个单独的Exchange Coordinator

第三,在当前的框架下,在自适应执行中灵活实现其他功能并不容易,例如更改执行计划和在运行时处理数据倾斜的 JOIN。

我们想介绍一种在 Spark SQL 中执行自适应执行的新方法,并解决其局限性。

这个想法在Adaptive Execution Enhancement in Spark SQL中进行了描述。

目标

我们的目标是实现一个灵活的框架来在 Spark SQL 中进行自适应执行,并支持在运行时更改 Reducer 的数量。

新的实现应该解决前面讨论的所有限制。

更改 JOIN 策略和处理数据倾斜的 JOIN 等其他功能将作为单独的规则实现,并在以后轻松插入。

规划

Spark 确定物理执行计划后,根据每个算子的定义生成一个 RDD 的 DAG。

关于 RDD 请参考我的博客——Spark RDD 论文详解(一)摘要和介绍

Spark 调度程序通过在 shuffle 边界处破坏 RDD 图来创建阶段(Stage)并提交阶段以供执行。

一旦确定了 SQL 执行计划,就无法在执行期间对其进行更新。

新的自适应执行的想法是基于 SQL 执行计划而不是 RDD 图来划分阶段。

我们将介绍称为 QueryStageQueryStageInput 的新节点。

在自适应执行模式下,一个执行计划被分成多个QueryStages

每个 QueryStage 都是在单个阶段中运行的子树。

QueryStageInputQueryStage 的叶节点,用来隐藏其子阶段。

它获取其子阶段的结果并将其作为 QueryStage 的输入。

QueryStage 通过收集 QueryStageInputs 了解其所有子阶段,因此它具有所有 shuffle 依赖项的全局视图。

我们添加了 QueryStageQueryStageInputs,从而在计划中查找 Exchange

下面是在一个阶段 中 JOIN 3 个表的示例。

我们将计划分为四个子树。

最后一个是具有三个 QueryStageInputsResult StageQueryStage4

QueryStageInput 是一个叶节点,但它指向一个子阶段,例如 QueryStageInput1 指向 QueryStage1

除了最后一个 QueryStageQueryStage 的子级始终是 ExchangeBroadcastExchangeExec

我们添加一个规则PlanQueryStage 来添加QueryStageQueryStageInput

仅当启用自适应执行时才会应用该规则。

执行和调度

我们从最后一个 QueryStage 开始执行。

在这个查询阶段执行计划之前,我们执行所有子阶段(Stage)并收集它们的输出统计信息。

线程池用于提交子阶段。

如果一个子阶段也有它的子阶段,它将首先提交自己的子阶段,这会递归地发生。

所以实际上 QueryStages 没有依赖关系将首先提交,其他 QueryStages 将等待其子阶段完成。

子阶段完成后,我们可以优化这个阶段的计划,根据子阶段的统计数据确定Reducer 的数量。

最后,我们为此查询阶段进行代码生成(CodeGen),并使用新计划更新 UI。

只要我们在 QueryStage 中优化计划时不添加任何 Exchange,就不会发生额外的 shuffle。

自动设置 Reducer 的数量

将使用三种配置来控制Reducer的数量。

spark.sql.adaptive.shuffle.targetPostShuffleInputSize用于控制任务Shuffle后的目标输入大小(以字节为单位)。

spark.sql.adaptive.minNumPostShufflePartitions用于控制自适应执行中使用的shuffle后最小的分区数,可用于控制最小并行度。

这两种配置在 Spark 中已经存在。

我们添加了一个新的配置:spark.sql.adaptive.maxNumPostShufflePartitions来控制Shuffle后分区的最大数量。

最终用户可以设置 Shuffle 后分区的最小和最大数量以及 Shuffle 后输入的目标大小。

在运行时,自适应执行会自动在 min 和 max 之间设置 reducer 的数量。

对于每个 QueryStage,我们使用以下方法在运行时自动设置分区的数量。

  1. 我们首先提交其所有子阶段,并收集 Map 输出统计信息。
  2. 我们创建一个exchange coordinator,将子阶段的 Map 输出统计信息传递给它,并调用estimatePartitionStartIndices方法来确定Shuffle后分区的数量。 (将来我们可能会删除类 ExchangeCoordinator,因为在更改之后只使用其中的一种方法)
  3. 每个子阶段获取相同的partitionStartIndex,并以此为基础构造一个新的ShuffledRowRDD。这些 ShuffledRowRDD 是当前阶段的输入 RDD。

在内部,我们使用最大数量(max)作为初始 shuffle 分区数量。

假设 max 配置为 5,min 配置为 1。

map 阶段完成后,我们知道每个分区的大小为 70MB、30MB、20MB、10MB 和 50MB。

如果我们将每个 reducer 的目标数据大小设置为 64MB,我们可以在运行时使用 3 个 reducer。

第一个 reducer 处理分区 0 (70MB)。

第二个 reducer 处理 3 个连续的分区(分区 1 到 3,总共 60MB)。

第三个 reducer 处理分区 4 (50MB)。

Spark SQL UI

执行计划可能会在运行时发生变化,因此 SQL UI 也应该反映这些变化。

在自适应执行模式下,SQL UI 会在开头显示原始的执行计划。

当自适应执行开始时,每个 QueryStage 都会提交子阶段,并且可能会更改其中的执行计划。

我们将发布一个事件 SparkListenerSQLAdaptiveExecutionUpdate(executionId, physicalPlanDescription, sparkPlanInfo) 来更新 UI 上的执行计划。

优化执行计划和处理倾斜 JOIN

通过上面讨论的更改,我们可以在运行时轻松优化 QueryStage 中的执行计划,即当我们发现一个表大小小于广播阈值时,将 SortMergeJoin 更改为 BroadcastHashJoin

我们还可以在执行子阶段后检测 JOIN 中的倾斜分区并自动处理。

这些策略可以作为单独的规则添加到自适应执行中并单独启用。

本文发布于:2024-02-02 18:20:43,感谢您对本站的认可!

本文链接:https://www.4u4v.net/it/170686972645596.html

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。

标签:机制   Spark   SQL   AQE
留言与评论(共有 0 条评论)
   
验证码:

Copyright ©2019-2022 Comsenz Inc.Powered by ©

网站地图1 网站地图2 网站地图3 网站地图4 网站地图5 网站地图6 网站地图7 网站地图8 网站地图9 网站地图10 网站地图11 网站地图12 网站地图13 网站地图14 网站地图15 网站地图16 网站地图17 网站地图18 网站地图19 网站地图20 网站地图21 网站地图22/a> 网站地图23