Details
- Type: Sub-task
- Status: Open
- Priority: Major
- Resolution: Unresolved
- Affects Version/s: 3.1.2
- Fix Version/s: None
- Component/s: None
Description
A test case:
Run the case below with bin/spark-sql in local mode on 3.1.2, with all other settings left at their defaults:
-- Some comments here
set spark.sql.shuffle.partitions=20;
set spark.sql.adaptive.enabled=true;
-- set spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin=0; -- (default 0.2) uncomment this to keep the BHJ from being demoted
set spark.sql.autoBroadcastJoinThreshold=200;

SELECT l.id % 12345 k, sum(l.id) sum, count(l.id) cnt, avg(l.id) avg, min(l.id) min, max(l.id) max
FROM (SELECT id % 3 id FROM range(0, 1e8, 1, 100)) l
LEFT JOIN (SELECT max(id) AS id, id % 2 gid FROM range(0, 1000, 2, 100) GROUP BY gid) r
  ON l.id = r.id
GROUP BY 1;
1. BHJ demoted, with nonEmptyPartitionRatioForBroadcastJoin left commented out
Job Id ▾ | Description | Submitted | Duration | Stages: Succeeded/Total | Tasks (for all stages): Succeeded/Total
---|---|---|---|---|---
4 | SELECT l.id % 12345 k, sum(l.id) sum, count(l.id) cnt, avg(l.id) avg, min(l.id) min, max(l.id) max from (select id % 3 id from range(0, 1e8, 1, 100)) l left join (SELECT max(id) as id, id % 2 gid FROM range(0, 1000, 2, 100) group by gid) r ON l.id = r.id GROUP BY 1 main at NativeMethodAccessorImpl.java:0 | 2021/08/06 17:31:37 | 71 ms | 1/1 (4 skipped) | 3/3 (205 skipped)
3 | (same query) main at NativeMethodAccessorImpl.java:0 | 2021/08/06 17:31:18 | 19 s | 1/1 (3 skipped) | 4/4 (201 skipped)
2 | (same query) main at NativeMethodAccessorImpl.java:0 | 2021/08/06 17:31:18 | 87 ms | 1/1 (1 skipped) | 1/1 (100 skipped)
1 | (same query) main at NativeMethodAccessorImpl.java:0 | 2021/08/06 17:31:16 | 2 s | 1/1 | 100/100
0 | (same query) main at NativeMethodAccessorImpl.java:0 | 2021/08/06 17:31:15 | 2 s | 1/1 | 100/100
2. nonEmptyPartitionRatioForBroadcastJoin set to 0, telling Spark not to demote the BHJ
Job Id (Job Group) ▾ | Description | Submitted | Duration | Stages: Succeeded/Total | Tasks (for all stages): Succeeded/Total
---|---|---|---|---|---
5 | SELECT l.id % 12345 k, sum(l.id) sum, count(l.id) cnt, avg(l.id) avg, min(l.id) min, max(l.id) max from (select id % 3 id from range(0, 1e8, 1, 100)) l left join (SELECT max(id) as id, id % 2 gid FROM range(0, 1000, 2, 100) group by gid) r ON l.id = r.id GROUP BY 1 main at NativeMethodAccessorImpl.java:0 | 2021/08/06 18:25:15 | 29 ms | 1/1 (2 skipped) | 3/3 (200 skipped)
4 | (same query) main at NativeMethodAccessorImpl.java:0 | 2021/08/06 18:25:13 | 2 s | 1/1 (1 skipped) | 100/100 (100 skipped)
3 (700fefe1-8446-4761-9be2-b68ed6e84c11) | broadcast exchange (runId 700fefe1-8446-4761-9be2-b68ed6e84c11) $anonfun$withThreadLocalCaptured$1 at FutureTask.java:266 | 2021/08/06 18:25:13 | 54 ms | 1/1 (2 skipped) | 1/1 (101 skipped)
2 | (same query) main at NativeMethodAccessorImpl.java:0 | 2021/08/06 18:25:13 | 88 ms | 1/1 (1 skipped) | 1/1 (100 skipped)
1 | (same query) main at NativeMethodAccessorImpl.java:0 | 2021/08/06 18:25:10 | 2 s | 1/1 | 100/100
0 | (same query) main at NativeMethodAccessorImpl.java:0 | 2021/08/06 18:25:10 | 3 s | 1/1 | 100/100
The subquery `(select id % 3 id from range(0, 1e8, 1, 100)) l` produces highly compressible shuffle map output and leaves 17 of the 20 partitions empty on the reduce side, which is also the point where AQE re-optimizes and applies DynamicJoinSelection.
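For intuition, the demote decision can be sketched as a ratio test (an illustrative simplification in Python; the function name and the per-partition row counts are hypothetical, and Spark's real logic lives in the Scala rule DynamicJoinSelection, working on map-output statistics):

```python
def should_demote_broadcast_join(partition_row_counts, non_empty_ratio=0.2):
    """Sketch: demote the broadcast hash join when the fraction of
    non-empty shuffle partitions falls below the configured ratio
    (default 0.2, per spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin)."""
    total = len(partition_row_counts)
    non_empty = sum(1 for n in partition_row_counts if n > 0)
    return total > 0 and non_empty < non_empty_ratio * total

# The left side's keys are id % 3, so with 20 shuffle partitions only
# 3 buckets receive rows (counts here are illustrative, ~1e8 / 3 each).
counts = [33_333_333, 33_333_333, 33_333_334] + [0] * 17
print(should_demote_broadcast_join(counts))       # True: 3/20 = 0.15 < 0.2
print(should_demote_broadcast_join(counts, 0.0))  # False: ratio 0 never demotes
```

Setting the ratio to 0, as in case 2 below, makes the condition unsatisfiable, which is why the broadcast join survives there.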
Exchange metrics:
- shuffle records written: 100,000,000
- shuffle write time, total (min, med, max): 891 ms (2 ms, 5 ms, 33 ms)
- records read: 100,000,000
- local bytes read, total (min, med, max): 10.0 MiB (3.3 MiB, 3.4 MiB, 3.4 MiB)
- fetch wait time, total (min, med, max): 0 ms (0 ms, 0 ms, 0 ms)
- remote bytes read: 0.0 B
- local blocks read: 300
- remote blocks read: 0
- data size, total (min, med, max): 1525.9 MiB (15.3 MiB, 15.3 MiB, 15.3 MiB)
- remote bytes read to disk: 0.0 B
- shuffle bytes written, total (min, med, max): 10.0 MiB (102.3 KiB, 102.3 KiB, 102.3 KiB)
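To make "highly compressible" concrete, simple arithmetic on the metrics above shows roughly a 150x gap between the in-memory data size and the bytes actually shuffled:

```python
# Back-of-the-envelope check of the Exchange metrics reported above.
data_size_mib = 1525.9   # in-memory "data size" of the map output
written_mib = 10.0       # compressed shuffle bytes actually written
rows = 100_000_000       # shuffle records written

print(round(data_size_mib / written_mib))          # 153 (~150x compression)
print(round(data_size_mib * 1024 * 1024 / rows))   # 16 bytes per row in memory
```

So any AQE decision driven by compressed partition sizes sees ~1/150 of the data volume the downstream operators will actually materialize.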
In case 1, the BHJ is demoted, and the coalesce-partitions rule then coalesces these "small" partitions (small only by compressed size) even when spark.sql.adaptive.advisoryPartitionSizeInBytes=1m is set.
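A simplified sketch of why the coalescing keeps the plan at only a few tasks even with a 1 MiB advisory size. This is in the spirit of Spark's ShufflePartitionsUtil, not the real implementation; the partition placement and the handling of all-empty groups are illustrative assumptions:

```python
def coalesce_partitions(sizes, advisory):
    """Sketch: greedily merge consecutive reduce partitions until a group
    would exceed the advisory size; decisions use *compressed* sizes."""
    groups, cur, cur_size = [], [], 0
    for i, s in enumerate(sizes):
        if cur and cur_size + s > advisory:
            groups.append(cur)
            cur, cur_size = [], 0
        cur.append(i)
        cur_size += s
    if cur:
        groups.append(cur)
    return groups

mib = 1024 * 1024
sizes = [0] * 20
for i in (0, 7, 14):           # hypothetical placement of the 3 non-empty buckets
    sizes[i] = int(3.4 * mib)  # ~3.4 MiB compressed each, per the Exchange metrics
groups = coalesce_partitions(sizes, 1 * mib)
# Assumption: groups containing only empty partitions produce no work.
non_empty = [g for g in groups if any(sizes[i] for i in g)]
print(len(non_empty))          # 3 tasks end up carrying all 1e8 rows
```

Because a 3.4 MiB compressed partition already exceeds the 1 MiB advisory size and coalescing can only merge (never split), the three non-empty partitions each become one task, regardless of how large they are once decompressed.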
Then, as the SMJ-phase metrics show, the earlier coalescing followed by the later data expansion causes a performance regression:
Sort metrics:
- sort time, total (min, med, max (stageId: taskId)): 166 ms (0 ms, 55 ms, 57 ms (stage 7.0: task 203))
- peak memory, total (min, med, max (stageId: taskId)): 315.1 MiB (64.0 KiB, 105.0 MiB, 105.0 MiB (stage 7.0: task 201))
- spill size, total (min, med, max (stageId: taskId)): 1845.0 MiB (0.0 B, 615.0 MiB, 615.0 MiB (stage 7.0: task 201))

Task rows (Spark UI task table; the column headers were not captured in the original):

1 | 202 | 0 | SUCCESS | driver | 2021-08-06 17:31:18 | 18 s | 4 s | 3.0 ms | 10.0 ms | 105.3 MiB | 1.0 ms | 91 B / 1 | 3.4 MiB / 33333333 | 615 MiB | 4.5 MiB
2 | 203 | 0 | SUCCESS | driver | 2021-08-06 17:31:18 | 19 s | 4 s | 4.0 ms | 10.0 ms | 105.3 MiB | 1.0 ms | 89 B / 1 | 3.4 MiB / 33333333 | 615 MiB | 4.5 MiB
0 | 201 | 0 | SUCCESS | driver | 2021-08-06 17:31:18 | 17 s | 4 s | 6.0 ms | 10.0 ms | 105.3 MiB | 1.0 ms | 70 B / 1 | 3.3 MiB / 33333334 | 615 MiB | 4.4 MiB
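The spill figures line up with simple arithmetic: each of the three coalesced tasks must sort about a third of the 1e8 probe-side rows, far beyond the task's peak sort memory. The 16 bytes/row figure is derived from the Exchange "data size" metric (1525.9 MiB / 1e8 rows), not measured directly:

```python
rows_total = 100_000_000
tasks = 3                 # the coalesced SMJ tasks in the rows above
bytes_per_row = 16        # assumption, implied by data size 1525.9 MiB / 1e8 rows

rows_per_task = rows_total // tasks
in_mem_mib = rows_per_task * bytes_per_row / (1024 ** 2)
print(rows_per_task)      # 33333333, matching the per-task record counts above
print(round(in_mem_mib))  # 509 MiB to sort vs ~105 MiB peak memory
```

~509 MiB of rows against ~105 MiB of sort memory is consistent with the ~615 MiB spill observed per task.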
In case 2, keeping the BHJ increases the task count, which adds scheduling overhead and runs unnecessary empty tasks, but it avoids the OOM risk and the performance regression described above.
We also hit a real-world case in which this data expansion raised the OOM risk to a very high level.