SPARK-32041

Exchange reuse won't work in cases when DPP, subqueries are involved


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.4.6, 3.0.0
    • Fix Version/s: 3.2.0
    • Component/s: SQL
    • Labels: None
    Description

      When an Exchange node appears at multiple places in the physical plan and that exchange carries a DPP subquery filter, ReuseExchange does not kick in for it, so separate stages are launched to compute the same thing.

      Example:

      // generate data (run `import spark.implicits._` first if not in spark-shell)
      val factData = (1 to 100).map(i => (i % 5, i % 20, i))
      factData.toDF("store_id", "product_id", "units_sold")
        .write
        .partitionBy("store_id")
        .format("parquet")
        .saveAsTable("fact_stats")
      
      val dimData = Seq[(Int, String, String)](
        (1, "AU", "US"),
        (2, "CA", "US"),
        (3, "KA", "IN"),
        (4, "DL", "IN"),
        (5, "GA", "PA"))
      
      dimData.toDF("store_id", "state_province", "country")
        .write
        .format("parquet")
        .saveAsTable("dim_stats")
      
      sql("ANALYZE TABLE fact_stats COMPUTE STATISTICS FOR COLUMNS store_id")
      sql("ANALYZE TABLE dim_stats COMPUTE STATISTICS FOR COLUMNS store_id")
      
      // Set Configs
      spark.sql("set spark.sql.optimizer.dynamicPartitionPruning.reuseBroadcastOnly=true")
      spark.sql("set spark.sql.autoBroadcastJoinThreshold=1000")
      
      val query = """
          With view1 as (
            SELECT product_id, f.store_id
            FROM fact_stats f JOIN dim_stats
            ON f.store_id = dim_stats.store_id WHERE dim_stats.country = 'IN')
          SELECT * FROM view1 v1 join view1 v2 WHERE v1.product_id = v2.product_id
      """
      val df = spark.sql(query)
      println(df.queryExecution.executedPlan)
      
      
      Plan:
       *(7) SortMergeJoin [product_id#1968], [product_id#2060], Inner
       :- *(3) Sort [product_id#1968 ASC NULLS FIRST], false, 0
       :  +- Exchange hashpartitioning(product_id#1968, 5), true, [id=#1140]
       :     +- *(2) Project [product_id#1968, store_id#1970]
       :        +- *(2) BroadcastHashJoin [store_id#1970], [store_id#1971], Inner, BuildRight
       :           :- *(2) Project [product_id#1968, store_id#1970]
       :           :  +- *(2) Filter isnotnull(product_id#1968)
       :           :     +- *(2) ColumnarToRow
       :           :        +- FileScan parquet default.fact_stats[product_id#1968,store_id#1970] Batched: true, DataFilters: [isnotnull(product_id#1968)], Format: Parquet, Location: InMemoryFileIndex[file:/home/prakhar/src/os/1_spark/sql/core/spark-warehouse/org.apache.spark.sql..., PartitionFilters: [isnotnull(store_id#1970), dynamicpruningexpression(store_id#1970 IN dynamicpruning#2067)], PushedFilters: [IsNotNull(product_id)], ReadSchema: struct<product_id:int>
       :           :              +- SubqueryBroadcast dynamicpruning#2067, 0, [store_id#1971], [id=#1131]
       :           :                 +- ReusedExchange [store_id#1971], BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), [id=#1021]
       :           +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), [id=#1021]
       :              +- *(1) Project [store_id#1971]
       :                 +- *(1) Filter ((isnotnull(country#1973) AND (country#1973 = IN)) AND isnotnull(store_id#1971))
       :                    +- *(1) ColumnarToRow
       :                       +- FileScan parquet default.dim_stats[store_id#1971,country#1973] Batched: true, DataFilters: [isnotnull(country#1973), (country#1973 = IN), isnotnull(store_id#1971)], Format: Parquet, Location: InMemoryFileIndex[file:/home/prakhar/src/os/1_spark/sql/core/spark-warehouse/org.apache.spark.sql..., PartitionFilters: [], PushedFilters: [IsNotNull(country), EqualTo(country,IN), IsNotNull(store_id)], ReadSchema: struct<store_id:int,country:string>
       +- *(6) Sort [product_id#2060 ASC NULLS FIRST], false, 0
          +- ReusedExchange [product_id#2060, store_id#2062], Exchange hashpartitioning(product_id#1968, 5), true, [id=#1026]
      

      Issue:
      Note the last line of the plan: it is a ReusedExchange pointing to id=1026, but there is no Exchange node with id 1026 anywhere in the plan. The ReusedExchange references the wrong child node (1026 instead of 1140), so exchange reuse does not actually happen for this query.
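
      A rough way to check this from code (only a sketch, not part of the original report; it assumes the df value from the example above and uses reference equality instead of the ids printed in the plan string) is to collect the Exchange and ReusedExchange nodes of the executed plan and verify that each ReusedExchange points at an exchange that is actually present in the plan:

      import org.apache.spark.sql.execution.exchange.{Exchange, ReusedExchangeExec}

      // Sketch: every ReusedExchange should reference an Exchange instance that is
      // part of the plan. Note this does not descend into subqueries hidden in
      // expressions (e.g. the SubqueryBroadcast under the DPP partition filter).
      val plan = df.queryExecution.executedPlan
      val exchanges = plan.collect { case e: Exchange => e }
      plan.collect { case r: ReusedExchangeExec => r }.foreach { r =>
        println(s"ReusedExchange child found in plan: ${exchanges.exists(_ eq r.child)}")
      }

      On affected versions this prints false for the query above, matching the dangling id in the plan string.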

      Another query where the issue is caused by ReuseSubquery:

      spark.sql("set spark.sql.autoBroadcastJoinThreshold=-1")
      
      val query1 = """
                    | With view1 as (
                    |   SELECT product_id, units_sold
                    |   FROM fact_stats
                    |   WHERE store_id = (SELECT max(store_id) FROM dim_stats)
                    |         and units_sold = 2
                    | ), view2 as (
                    |   SELECT product_id, units_sold
                    |   FROM fact_stats
                    |   WHERE store_id = (SELECT max(store_id) FROM dim_stats)
                    |         and units_sold = 1
                    | )
                    |
                    | SELECT *
                    | FROM view1 v1 join view2 v2 join view2 v3
                    | WHERE v1.product_id = v2.product_id and v2.product_id = v3.product_id
      """.stripMargin
      val df1 = spark.sql(query1)
      println(df1.queryExecution.executedPlan)

      Here v2 is joined with itself, so the plan should use ReuseExchange, but the final plan computes v2 twice.
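
      A quick way to observe this (again only a sketch, assuming the df1 value from the snippet above) is to count the shuffle exchanges and ReusedExchange nodes in the executed plan; with reuse working there should be a ReusedExchange for the second occurrence of view2, while on affected versions none shows up and the same shuffle is planned twice:

      import org.apache.spark.sql.execution.exchange.{ReusedExchangeExec, ShuffleExchangeExec}

      // Sketch: with exchange reuse working, the self-join of view2 surfaces at least
      // one ReusedExchange; on affected versions it does not, and an extra shuffle
      // over the same fact_stats scan appears instead.
      val plan1 = df1.queryExecution.executedPlan
      val numReused = plan1.collect { case r: ReusedExchangeExec => r }.size
      val numShuffles = plan1.collect { case s: ShuffleExchangeExec => s }.size
      println(s"ReusedExchange nodes: $numReused, shuffle exchanges: $numShuffles")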

       


          People

            Assignee: petertoth (Peter Toth)
            Reporter: prakharjain09 (Prakhar Jain)
