Spark动态优化机制:AQE与DPP

1 AQE

1.1 AQE诞生的背景

Spark 2.x 在遇到有数据倾斜的任务时,需要人为地去优化任务,比较费时费力;如果任务在Reduce阶段,Reduce Task 数据分布参差不齐,会造成各个excutor节点资源利用率不均衡,影响任务的执行效率;Spark 3新特性AQE极大地优化了以上任务的执行效率。

RBO(Rule Based Optimization,基于规则的优化),它往往基于一些规则和策略实现,如谓词下推、列裁剪,这些规则和策略来源于数据库领域已有的应用经验。RBO实际上算是一种经验主义。经验主义的弊端就是对待相似的问题和场景都使用同一类套路。Spark 社区正是因为意识到了 RBO 的局限性,因此在 2.x 版本中推出了CBO(Cost Based Optimization,基于成本的优化)。

CBO 是基于数据表的统计信息(如表大小、数据列分布)来选择优化策略。CBO 支持的统计信息很丰富,比如数据表的行数、每列的基数(Cardinality)、空值数、最大值、最小值等。因为有统计数据做支持,所以 CBO 选择的优化策略往往优于 RBO 选择的优化规则。
但是,CBO 也有三个方面的不足:
(1)、适用面太窄,CBO 仅支持注册到 Hive Metastore 的数据表,但在其他的应用场景中,数据源往往是存储在分布式文件系统的各类文件,如 Parquet、ORC、CSV 等。
(2)、统计信息的搜集效率比较低。对于注册到 Hive Metastore 的数据表,开发者需要调用 ANALYZE TABLE COMPUTE STATISTICS 语句收集统计信息,而各类信息的收集会消耗大量时间。
(3)、静态优化,RBO、CBO执行计划一旦制定完成,就会按照该计划坚定不移地执行;如果在运行时数据分布发生动态变化,先前制定的执行计划并不会跟着调整、适配。

考虑到 RBO 和 CBO 的种种限制,Spark 在 3.0 版本推出了 AQE(Adaptive Query Execution,自适应查询执行)。用一句话来概括,AQE 是 Spark SQL 的一种动态优化机制,在运行时,每当 Shuffle Map 阶段执行完毕,AQE 都会结合这个阶段的统计信息,基于既定的规则动态地调整、修正尚未执行的逻辑计划和物理计划,来完成对原始查询语句的运行时优化。
AQE 赖以优化的统计信息与 CBO 不同,这些统计信息并不是关于某张表或是哪个列,而是 Shuffle Map 阶段输出的中间文件

1.2 AQE如何使用?

AQE三大特性:自动分区合并 、自动数据倾斜处理、Join 策略调整。

1.2.1 自动分区合并(Partition Coalescing)‌

‌场景‌:当Shuffle后产生大量小分区(如文件数过多或数据分布不均),导致任务调度开销过大或资源浪费。
‌示例‌:对日志数据进行GROUP BY聚合操作时,若原始数据被分为1000个小文件,AQE会动态合并相邻小分区,减少下游Task数量,避免因Task过多导致资源争抢或调度延迟‌。
-- 启用AQE后,Spark自动合并小分区 SET spark.sql.adaptive.enabled=true; SELECT user_id, COUNT(*) FROM logs GROUP BY user_id;

1.2.2 Join策略动态切换(Switch Join Strategy)‌

‌场景‌:静态优化器选择的Join策略(如Sort-Merge Join)在运行时因数据量变化不再高效。
‌示例‌:
表A(1亿条)与表B(过滤后仅1万条)进行Join时,AQE在运行时检测到表B数据量极小,自动将Sort-Merge Join切换为Broadcast Join,减少Shuffle开销‌。
-- 动态切换为Broadcast Join SELECT /*+ MERGE(a, b) */ * FROM large_table a JOIN small_table b ON a.id = b.id;

1.2.3 倾斜Join优化(Optimize Skew Join)‌

‌场景‌:Join操作中某些Key的数据量极大(如热门商品或用户行为数据倾斜),导致单个Task处理时间过长。
‌示例‌:用户行为表与商品表Join时,商品ID为“hot_item_123”的记录占总量80%。AQE将该Key对应的数据拆分为多个子分区,并行处理以平衡负载‌。
-- 自动拆分倾斜Key SET spark.sql.adaptive.skewJoin.enabled=true; SELECT a.user_id, b.item_name FROM user_actions a JOIN items b ON a.item_id = b.item_id;

1.2.4 动态优化聚合操作‌

‌场景‌:聚合操作(如GROUP BY)因数据分布不均导致部分Task内存不足。
‌示例‌:统计每个城市的订单总金额时,若北上广深数据量远超其他城市,AQE会动态调整Hash聚合为Sort聚合,减少内存压力‌。
‌启用AQE的核心配置‌
`
-- 基础配置

SET spark.sql.adaptive.enabled=true;

SET spark.sql.adaptive.coalescePartitions.enabled=true; -- 合并小分区

SET spark.sql.adaptive.skewJoin.enabled=true; -- 倾斜Join优化

-- 高级调优(根据集群规模调整)

SET spark.sql.adaptive.advisoryPartitionSizeInBytes=64MB; -- 分区合并目标大小

SET spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes=256MB; -- 倾斜判定阈值
`

1.3 AQE适用场景

场景 问题特征 AQE优化动作 触发条件 配置参数
小文件过多 大量小分区导致Task数量激增,调度开销大 动态合并相邻分区,减少Task数量 spark.sql.adaptive.coalescePartitions.enabled=true
分区大小 < spark.sql.adaptive.advisoryPartitionSizeInBytes
spark.sql.adaptive.coalescePartitions.enabled
spark.sql.adaptive.advisoryPartitionSizeInBytes
spark.sql.adaptive.coalescePartitions.parallelismFirst
Join策略选择错误 大表Join小表时静态优化器未选择广播Join 运行时检测小表大小,切换为Broadcast Join 小表大小 ≤ spark.sql.adaptive.localShuffleReader.enabled
Join类型适合广播
spark.sql.adaptive.localShuffleReader.enabled
spark.sql.autoBroadcastJoinThreshold
数据倾斜严重 单个Task处理时间远高于其他Task,存在长尾任务 拆分倾斜Key数据到多个Task处理 Task处理时间 > 中位数 × N倍
数据倾斜度超过阈值
spark.sql.adaptive.skewJoin.enabled
spark.sql.adaptive.skewJoin.skewedPartitionFactor
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes
聚合内存压力大 哈希聚合导致OOM,内存使用过高 切换为Sort聚合或调整分区大小 聚合内存使用超过阈值
分区数据分布不均匀
spark.sql.adaptive.hashAggregate.enabled

1.4 AQE配置调优参考表

优化目标 关键配置参数 推荐值 说明
小文件合并 spark.sql.adaptive.coalescePartitions.enabled true 启用分区合并
spark.sql.adaptive.advisoryPartitionSizeInBytes 64MB 目标分区大小
spark.sql.adaptive.coalescePartitions.minPartitionSize 1MB 最小分区大小
spark.sql.adaptive.coalescePartitions.parallelismFirst false 优先保证并行度
数据倾斜处理 spark.sql.adaptive.skewJoin.enabled true 启用倾斜Join优化
spark.sql.adaptive.skewJoin.skewedPartitionFactor 5 倾斜分区判定因子
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes 256MB 倾斜分区阈值
spark.sql.adaptive.forceOptimizeSkewedJoin true 强制优化倾斜Join
Join策略优化 spark.sql.adaptive.localShuffleReader.enabled true 启用本地Shuffle读取
spark.sql.autoBroadcastJoinThreshold 10MB 广播Join阈值
spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold 0 本地Map Join阈值
通用优化 spark.sql.adaptive.enabled true 启用AQE
spark.sql.adaptive.logLevel INFO AQE日志级别
spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin 0.2 非空分区比例阈值

1.5 AQE版本演进特性

Spark版本 新增AQE特性 解决的问题
Spark 3.0 • 动态合并Shuffle分区
• 动态切换Join策略
• 动态优化数据倾斜
初步解决静态优化器的局限性
Spark 3.1 • 动态分区剪裁(DPP)
• 优化Shuffle分区数
进一步提升复杂查询性能
Spark 3.2 • 运行时过滤优化
• 增强的倾斜处理
优化Join和过滤性能
Spark 3.3 • 自适应查询合并
• 更智能的广播超时处理
减少重复计算,增强稳定性
Spark 3.4+ • AI驱动的优化建议
• 更细粒度的自适应控制
智能化优化决策

‌注‌:AQE在Spark 3.0+版本默认启用,需结合集群资源和数据特征调整参数以最大化性能‌。

2 DPP

2.1 什么是DPP

DPP 是 Spark 针对分区表 Join / 过滤的优化机制,核心是在运行时动态推导分区裁剪条件,只扫描符合条件的分区,避免全表扫描。对比静态分区裁剪(仅能基于 SQL 中显式的过滤条件裁剪),DPP 的核心优势:

  • 静态裁剪:仅能识别 where dt='2026-01-01' 这类显式条件
  • DPP 裁剪:若查询是 select * from A join B on A.dt = B.dt where A.id=100,DPP 会先执行 A 表的过滤(id=100),得到 A 表的 dt 取值(如 2026-01-01),再将该条件动态应用到 B 表的分区裁剪,只扫描 B 表 dt=2026-01-01 的分区

2.2 开启DPP

spark.sql.optimizer.dynamicPartitionPruning.enabled

2.3 触发条件

注意:这个动态分区裁剪操作默认是开启的,但是触发动态分区裁剪是需要一些条件的。
(1)需要裁剪的表必须是分区表,并且分区字段必须在 Join 中的 on 条件里面
(2)Join 类型必须是 Inner Join、Left Semi Join、Left Outer Join 或者 Right Outer Join

  • 针对 Inner Join:Join 操作左边的表、右边的表可以都是分区表,或者只有某一个表是分区表,至少要有一个表是分区表,这样才能支持裁剪
  • 针对 Left Semi Join:需要保证 Join 操作左边的表是分区表,这样才能支持裁剪
  • 针对 Left Outer Join:需要保证 Join 操作右边的表是分区表,这样才能支持裁剪
  • 针对 Right Outer Join:需要保证 Join 操作左边的表是分区表,这样才能支持裁剪
    (3)另一张表里面需要至少存在一个过滤条件

https://ggwujun.github.io/blog-architect/spark性能调优实战/04.spark-sql性能调优篇/05/
https://chuna2.787528.xyz/yeyuzhuanjia/p/18791016
https://cloud.tencent.com/developer/article/1922959
https://juejin.cn/post/7504968076557205513
https://www.hangge.com/blog/cache/detail_3633.html

posted @ 2026-01-04 22:27  scales123  阅读(55)  评论(0)    收藏  举报