  Spark / SPARK-37063 SQL Adaptive Query Execution QA: Phase 2 / SPARK-36443

Demote BroadcastJoin causes performance regression and increases OOM risks


Details

    • Type: Sub-task
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 3.1.2
    • Fix Version/s: None
    • Component/s: SQL
    • Labels: None

    Description

       

      A test case

      Run the case below with bin/spark-sql on Spark 3.1.2 in local mode, leaving all other settings at their defaults:

      set spark.sql.shuffle.partitions=20;
      set spark.sql.adaptive.enabled=true;
      -- set spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin=0; -- default 0.2, uncomment to keep AQE from demoting the BHJ
      set spark.sql.autoBroadcastJoinThreshold=200;
      SELECT
        l.id % 12345 k,
        sum(l.id) sum,
        count(l.id) cnt,
        avg(l.id) avg,
        min(l.id) min,
        max(l.id) max
      from (select id % 3 id from range(0, 1e8, 1, 100)) l
        left join (SELECT max(id) as id, id % 2 gid FROM range(0, 1000, 2, 100) group by gid) r ON l.id = r.id
      GROUP BY 1;
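      The commented-out knob controls whether AQE demotes a broadcast hash join. As we read the Spark 3.1 source, AQE looks at the map output statistics of the shuffle stage feeding a join side and marks that side as not to be broadcast when the fraction of non-empty reduce partitions is below spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin. The Python sketch below mimics that check for a stage with 20 partitions of which 3 are non-empty; the function name `should_demote` and the chosen partition ids are hypothetical, and this is an illustration, not Spark's code.

```python
from typing import Sequence

MIB = 1024 * 1024


def should_demote(bytes_by_partition: Sequence[int], non_empty_ratio: float) -> bool:
    """Hypothetical model of AQE's BHJ demotion check (not Spark's actual code).

    bytes_by_partition: shuffle output size of each reduce partition.
    non_empty_ratio: value of spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin.
    """
    partition_cnt = len(bytes_by_partition)
    non_zero_cnt = sum(1 for b in bytes_by_partition if b > 0)
    # Demote only if there is data, and too few partitions actually hold it.
    return (
        partition_cnt > 0
        and non_zero_cnt > 0
        and non_zero_cnt / partition_cnt < non_empty_ratio
    )


# 20 shuffle partitions, only 3 non-empty; the ids are chosen for illustration only.
sizes = [0] * 20
for pid in (3, 8, 14):
    sizes[pid] = int(3.4 * MIB)

print(should_demote(sizes, 0.2))  # default: 3/20 = 0.15 < 0.2, so the BHJ is demoted
print(should_demote(sizes, 0.0))  # ratio 0: 0.15 < 0 never holds, so the BHJ is kept
```

      Setting the ratio to 0 makes the comparison impossible to satisfy, which is why case 2) below keeps the BHJ.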
      

       
      1. The broadcast hash join (BHJ) is demoted: nonEmptyPartitionRatioForBroadcastJoin is left commented out, so its default of 0.2 applies

       

       
      All five jobs have the same description: the SELECT above (main at NativeMethodAccessorImpl.java:0).

      Job Id  Submitted            Duration  Stages: Succeeded/Total  Tasks (for all stages): Succeeded/Total
      4       2021/08/06 17:31:37  71 ms     1/1 (4 skipped)          3/3 (205 skipped)
      3       2021/08/06 17:31:18  19 s      1/1 (3 skipped)          4/4 (201 skipped)
      2       2021/08/06 17:31:18  87 ms     1/1 (1 skipped)          1/1 (100 skipped)
      1       2021/08/06 17:31:16  2 s       1/1                      100/100
      0       2021/08/06 17:31:15  2 s       1/1                      100/100

      2. nonEmptyPartitionRatioForBroadcastJoin is set to 0, which tells Spark not to demote the BHJ

       

      Job Id  Description         Submitted            Duration  Stages: Succeeded/Total  Tasks (for all stages): Succeeded/Total
      5       SELECT above        2021/08/06 18:25:15  29 ms     1/1 (2 skipped)          3/3 (200 skipped)
      4       SELECT above        2021/08/06 18:25:13  2 s       1/1 (1 skipped)          100/100 (100 skipped)
      3       broadcast exchange  2021/08/06 18:25:13  54 ms     1/1 (2 skipped)          1/1 (101 skipped)
      2       SELECT above        2021/08/06 18:25:13  88 ms     1/1 (1 skipped)          1/1 (100 skipped)
      1       SELECT above        2021/08/06 18:25:10  2 s       1/1                      100/100
      0       SELECT above        2021/08/06 18:25:10  3 s       1/1                      100/100

      "SELECT above" is the query above (main at NativeMethodAccessorImpl.java:0). Job 3 belongs to job group 700fefe1-8446-4761-9be2-b68ed6e84c11; its full description is "broadcast exchange (runId 700fefe1-8446-4761-9be2-b68ed6e84c11)", submitted from $anonfun$withThreadLocalCaptured$1 at FutureTask.java:266.
       

      The subquery `select id % 3 id from range(0, 1e8, 1, 100)` produces highly compressed shuffle map output, and because `id % 3` takes only three distinct values, 17 of the 20 partitions on the reduce side are empty. This reduce side is also where AQE re-optimizes the plan and applies DynamicJoinSelection.

      Exchange
        shuffle records written: 100,000,000
        shuffle write time total (min, med, max): 891 ms (2 ms, 5 ms, 33 ms)
        records read: 100,000,000
        local bytes read total (min, med, max): 10.0 MiB (3.3 MiB, 3.4 MiB, 3.4 MiB)
        fetch wait time total (min, med, max): 0 ms (0 ms, 0 ms, 0 ms)
        remote bytes read: 0.0 B
        local blocks read: 300
        remote blocks read: 0
        data size total (min, med, max): 1525.9 MiB (15.3 MiB, 15.3 MiB, 15.3 MiB)
        remote bytes read to disk: 0.0 B
        shuffle bytes written total (min, med, max): 10.0 MiB (102.3 KiB, 102.3 KiB, 102.3 KiB)
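      A back-of-the-envelope check on these Exchange metrics shows how compressible this shuffle is. The sketch below uses only numbers copied from the metrics; the 16 bytes per row it yields is consistent with an UnsafeRow holding a single bigint (an 8-byte null bitset plus the 8-byte value).

```python
MIB = 1024 * 1024

records = 100_000_000       # "shuffle records written"
data_size_mib = 1525.9      # "data size total": size of the rows before compression
shuffle_written_mib = 10.0  # "shuffle bytes written total": bytes actually written

bytes_per_record = data_size_mib * MIB / records
compression_ratio = data_size_mib / shuffle_written_mib

print(f"{bytes_per_record:.1f} bytes per row")      # 16.0 bytes per row
print(f"{compression_ratio:.1f}x smaller on disk")  # 152.6x smaller on disk
```

      In other words, the reduce side has to materialize roughly 150 times more data than the shuffle files suggest.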
      

       

      In case 1), the BHJ is demoted, and the `coalesce partitions rule` merges these 'small' partitions, even when spark.sql.adaptive.advisoryPartitionSizeInBytes is set to 1m, so that three tasks end up processing all 100,000,000 rows (see the attached screenshots).

       

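      Then, in the sort-merge join (SMJ) phase, the earlier coalescing and the subsequent data expansion cause the performance regression: each of these tasks reads only about 3.4 MiB of compressed shuffle data, yet sorts a third of the 100,000,000 rows, spills 615 MiB, and runs for 17 to 19 s.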

       

      Sort
        sort time total (min, med, max (stageId: taskId)): 166 ms (0 ms, 55 ms, 57 ms (stage 7.0: task 203))
        peak memory total (min, med, max (stageId: taskId)): 315.1 MiB (64.0 KiB, 105.0 MiB, 105.0 MiB (stage 7.0: task 201))
        spill size total (min, med, max (stageId: taskId)): 1845.0 MiB (0.0 B, 615.0 MiB, 615.0 MiB (stage 7.0: task 201))
      

       

       

      Task list for stage 7.0 (tasks 201 to 203):

      1 202 0 SUCCESS   driver     2021-08-06 17:31:18 18 s 4 s 3.0 ms 10.0 ms     105.3 MiB 1.0 ms 91 B / 1 3.4 MiB / 33333333 615 MiB 4.5 MiB
      2 203 0 SUCCESS   driver     2021-08-06 17:31:18 19 s 4 s 4.0 ms 10.0 ms     105.3 MiB 1.0 ms 89 B / 1 3.4 MiB / 33333333 615 MiB 4.5 MiB
      0 201 0 SUCCESS   driver     2021-08-06 17:31:18 17 s 4 s 6.0 ms 10.0 ms     105.3 MiB 1.0 ms 70 B / 1 3.3 MiB / 33333334 615 MiB 4.4 MiB
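      Applying the same arithmetic per task makes the expansion behind the regression and the OOM risk concrete. This is a rough estimate under one stated assumption: every shuffled row occupies 16 bytes in memory, as the Exchange data-size metric (1525.9 MiB / 1e8 rows) suggests. Read sizes and row counts are copied from the task list and the per-task spill size from the Sort metrics; the helper names are illustrative only.

```python
MIB = 1024 * 1024
BYTES_PER_ROW = 16   # assumption derived from the Exchange "data size" metric
SPILL_MIB = 615.0    # per-task "spill size" from the Sort metrics

# task id -> (compressed shuffle MiB read, rows read), copied from the task list
tasks = {
    201: (3.3, 33_333_334),
    202: (3.4, 33_333_333),
    203: (3.4, 33_333_333),
}

for task_id, (read_mib, rows) in sorted(tasks.items()):
    row_mib = rows * BYTES_PER_ROW / MIB  # in-memory size of the rows this task must sort
    print(f"task {task_id}: reads {read_mib} MiB, holds ~{row_mib:.0f} MiB of rows, "
          f"spills {SPILL_MIB / read_mib:.0f}x what it read")

total_rows = sum(rows for _, rows in tasks.values())
print(f"3 tasks sort all {total_rows:,} rows")
```

      Each task turns a few MiB of shuffle input into roughly half a GiB of rows to sort, which is why a handful of coalesced partitions can push a single executor toward OOM.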

       

      In case 2), keeping the BHJ increases the number of tasks, which adds scheduling overhead and runs unnecessary empty tasks, but it avoids the OOM risk and the performance regression described above.

      The attached screenshots also include a real-world case in which the data expansion raises the OOM risk to a very high level.

       

       

      Attachments

        1. screenshot-1.png
          39 kB
          Kent Yao 2
        2. image-2021-08-06-17-57-15-765.png
          166 kB
          Kent Yao 2
        3. image-2021-08-06-11-24-34-122.png
          971 kB
          Kent Yao 2

          People

            Assignee: Unassigned
            Reporter: Kent Yao 2
            Votes: 0
            Watchers: 4