SPARK-40703: Performance regression for joins in Spark 3.3 vs Spark 3.2


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 3.3.0
    • Fix Version/s: 3.3.1, 3.4.0
    • Component/s: SQL
    • Labels: None

    Description

      When running the TPC-DS benchmarks using a DSv2 datasource in Spark 3.3, a performance regression vs Spark 3.2 was discovered. More specifically, it appears that EnsureRequirements.ensureDistributionAndOrdering() no longer enforces a minimum number of partitions for a join distribution in some cases. This impacts DSv2 datasources because, if a scan has only a single read partition, DataSourceV2ScanExecBase.outputPartitioning() returns a SinglePartition instance. That SinglePartition in turn produces a SinglePartitionShuffleSpec, and SinglePartitionShuffleSpec.canCreatePartitioning() returns true.
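      A quick way to check the trigger condition from PySpark is sketched below, assuming a SparkSession named spark and the small item table from the example query later in this description, registered through a DSv2 catalog:

        # Sketch only: `item` is the small table from the example query below.
        # If the DSv2 scan is planned as a single read partition, its output
        # partitioning is reported as SinglePartition, which is what triggers
        # the behavior described here.
        print(spark.table("item").rdd.getNumPartitions())   # 1 => SinglePartition reported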

      Because canCreatePartitioning() returns true in this case, EnsureRequirements.ensureDistributionAndOrdering() won't enforce minimum parallelism and also will favor the single partition when considering the best distribution candidate. Ultimately this results in a single partition being selected for the join distribution, even if the other side of the join is a large table with many partitions. This can seriously impact performance of the join.

      Spark 3.2 enforces minimum parallelism differently in ensureDistributionAndOrdering() and thus does not suffer from this issue. It will shuffle both sides of the join to enforce parallelism.
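      The parallelism Spark 3.2 shuffles both sides out to is spark.sql.shuffle.partitions, which defaults to 200; assuming a SparkSession named spark, this can be confirmed with:

        # spark.sql.shuffle.partitions (200 by default) is the parallelism the
        # extra exchange targets; this is where the join parallelism of 200 in
        # the attached Spark 3.2 plan comes from.
        print(spark.conf.get("spark.sql.shuffle.partitions"))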

      In the TPC-DS benchmark, affected queries include 14a and 14b. The issue can also be demonstrated using a simple query, for example:

      select ics.i_item_sk from catalog_sales cs join item ics on cs.cs_item_sk = ics.i_item_sk

      ...where item is a small table that is read into a single partition and catalog_sales is a large table. These tables are part of TPC-DS, but you can create your own. To make the issue easier to demonstrate you may want to turn off broadcast joins, though that is not required for the issue to occur; it also happens when running TPC-DS with the broadcast settings at their defaults.
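      A minimal repro sketch along those lines, assuming a SparkSession named spark with catalog_sales and item available through a DSv2 catalog (table names are the ones from the query above):

        # Disabling broadcast joins is optional, but makes the regression easy
        # to see with small test tables; the issue also occurs with the default
        # broadcast setting.
        spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

        joined = spark.sql("""
            select ics.i_item_sk
            from catalog_sales cs
            join item ics on cs.cs_item_sk = ics.i_item_sk
        """)

        # On Spark 3.2 an Exchange is inserted under the item scan and the join
        # runs with spark.sql.shuffle.partitions tasks; on Spark 3.3.0 no
        # Exchange is added and the join collapses to a single partition.
        joined.explain()
        print(joined.rdd.getNumPartitions())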

      Attached are the plans for this query in Spark 3.2 and in Spark 3.3. They show that in Spark 3.2 a join parallelism of 200 is reached by inserting an exchange after the item table scan, while in Spark 3.3 no such exchange is inserted and the join parallelism is 1.

      Attachments

        1. spark32-plan.txt
          1 kB
          Bryan Keller
        2. spark33-plan.txt
          1 kB
          Bryan Keller
        3. test.py
          0.8 kB
          Bryan Keller


            People

              Assignee: Chao Sun (csun)
              Reporter: Bryan Keller (bryanck)
