[SPARK-24025] Join of bucketed and non-bucketed tables can give two exchanges and sorts for non-bucketed side


    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Incomplete
    • Affects Version/s: 2.3.0, 2.3.1
    • Fix Version/s: None
    • Component/s: SQL
    • Labels:
    • Environment:

      Description

      While exploring bucketing I found a join query of a non-bucketed and a bucketed table that ends up with two exchanges and two sorts for the non-bucketed side of the physical plan.

      // Make sure that you don't end up with a BroadcastHashJoin and a BroadcastExchange
      // Disable auto broadcasting
      spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
      
      val bucketedTableName = "bucketed_4_id"
      val large = spark.range(1000000)
      large.write
        .bucketBy(4, "id")
        .sortBy("id")
        .mode("overwrite")
        .saveAsTable(bucketedTableName)
      
      // Describe the table and include bucketing spec only
      val descSQL = sql(s"DESC FORMATTED $bucketedTableName").filter($"col_name".contains("Bucket") || $"col_name" === "Sort Columns")
      scala> descSQL.show(truncate = false)
      +--------------+---------+-------+
      |col_name      |data_type|comment|
      +--------------+---------+-------+
      |Num Buckets   |4        |       |
      |Bucket Columns|[`id`]   |       |
      |Sort Columns  |[`id`]   |       |
      +--------------+---------+-------+
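      // A possible alternative check (a sketch, assuming the table was saved to the
      // default database): read the bucket spec straight from the session catalog.
      import org.apache.spark.sql.catalyst.TableIdentifier
      val bucketSpec = spark.sessionState.catalog
        .getTableMetadata(TableIdentifier(bucketedTableName))
        .bucketSpec
      // bucketSpec should report 4 buckets with bucket and sort columns [id]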
      
      val bucketedTable = spark.table(bucketedTableName)
      val t1 = spark.range(4)
        .repartition(2, $"id")  // Use just 2 partitions
        .sortWithinPartitions("id") // sort partitions
      
      val q = t1.join(bucketedTable, "id")
      // Note the two exchanges and two sorts on the non-bucketed side in the plan below
      scala> q.explain
      == Physical Plan ==
      *(5) Project [id#79L]
      +- *(5) SortMergeJoin [id#79L], [id#77L], Inner
         :- *(3) Sort [id#79L ASC NULLS FIRST], false, 0
         :  +- Exchange hashpartitioning(id#79L, 4)
         :     +- *(2) Sort [id#79L ASC NULLS FIRST], false, 0
         :        +- Exchange hashpartitioning(id#79L, 2)
         :           +- *(1) Range (0, 4, step=1, splits=8)
         +- *(4) Sort [id#77L ASC NULLS FIRST], false, 0
            +- *(4) Project [id#77L]
               +- *(4) Filter isnotnull(id#77L)
                  +- *(4) FileScan parquet default.bucketed_4_id[id#77L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/Users/jacek/dev/oss/spark/spark-warehouse/bucketed_4_id], PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>
      
      q.foreach(_ => ())
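      
      A possible workaround for this reproduction (a sketch, not verified as a general fix; the names t2 and q2 are just for illustration): repartition the non-bucketed side into the same number of partitions as the table has buckets, so its hash partitioning already matches the bucketed scan and only the user-supplied exchange and sort should remain on that side.
      
      // Sketch: pre-shuffle the small side into 4 partitions on the join key
      // (matching the table's 4 buckets) and sort within each partition.
      val t2 = spark.range(4)
        .repartition(4, $"id")
        .sortWithinPartitions("id")
      val q2 = t2.join(bucketedTable, "id")
      // Expected (not verified here): only the user's Exchange and Sort on the
      // non-bucketed side, i.e. no second exchange/sort pair.
      q2.explain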
      

        Attachments

        1. join-jira.png (295 kB), Jacek Laskowski


              People

              • Assignee: Unassigned
              • Reporter: jlaskowski (Jacek Laskowski)
              • Votes: 4
              • Watchers: 9

                Dates

                • Created:
                • Updated:
                • Resolved: