Spark / SPARK-24025

Join of bucketed and non-bucketed tables can give two exchanges and sorts for non-bucketed side

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Incomplete
    • Affects Version/s: 2.3.0, 2.3.1
    • Fix Version/s: None
    • Component/s: SQL

    Description

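      While exploring bucketing, I found that the following join of a non-bucketed table with a bucketed table ends up with two exchanges and two sorts in the physical plan for the non-bucketed side of the join. The inner exchange and sort come from the explicit repartition(2, $"id") and sortWithinPartitions("id") calls; the outer pair is added for the sort-merge join, which shuffles the same side again into 4 partitions to match the bucketed table.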

      // Make sure that you don't end up with a BroadcastHashJoin and a BroadcastExchange
      // Disable auto broadcasting
      spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
      
      val bucketedTableName = "bucketed_4_id"
      val large = spark.range(1000000)
      large.write
        .bucketBy(4, "id")
        .sortBy("id")
        .mode("overwrite")
        .saveAsTable(bucketedTableName)
      
      // Describe the table and include bucketing spec only
      val descSQL = sql(s"DESC FORMATTED $bucketedTableName")
        .filter($"col_name".contains("Bucket") || $"col_name" === "Sort Columns")
      scala> descSQL.show(truncate = false)
      +--------------+---------+-------+
      |col_name      |data_type|comment|
      +--------------+---------+-------+
      |Num Buckets   |4        |       |
      |Bucket Columns|[`id`]   |       |
      |Sort Columns  |[`id`]   |       |
      +--------------+---------+-------+
      
      val bucketedTable = spark.table(bucketedTableName)
      val t1 = spark.range(4)
        .repartition(2, $"id")  // Use just 2 partitions
        .sortWithinPartitions("id") // sort partitions
      
      val q = t1.join(bucketedTable, "id")
      // Note the two exchanges and two sorts on the non-bucketed side (the first child of SortMergeJoin)
      scala> q.explain
      == Physical Plan ==
      *(5) Project [id#79L]
      +- *(5) SortMergeJoin [id#79L], [id#77L], Inner
         :- *(3) Sort [id#79L ASC NULLS FIRST], false, 0
         :  +- Exchange hashpartitioning(id#79L, 4)
         :     +- *(2) Sort [id#79L ASC NULLS FIRST], false, 0
         :        +- Exchange hashpartitioning(id#79L, 2)
         :           +- *(1) Range (0, 4, step=1, splits=8)
         +- *(4) Sort [id#77L ASC NULLS FIRST], false, 0
            +- *(4) Project [id#77L]
               +- *(4) Filter isnotnull(id#77L)
                  +- *(4) FileScan parquet default.bucketed_4_id[id#77L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/Users/jacek/dev/oss/spark/spark-warehouse/bucketed_4_id], PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>
      
      // Trigger execution of the query
      q.foreach(_ => ())
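
The exchanges and sorts can also be counted programmatically instead of being read off the explain output. The following is a minimal sketch, assuming Spark 2.3.x, where the shuffle operator is ShuffleExchangeExec, and assuming adaptive query execution is disabled. The helper name countExchangesAndSorts is hypothetical and not part of Spark.

```scala
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.execution.SortExec
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec

// Hypothetical helper (not a Spark API): walks the executed physical plan
// and returns (number of shuffle exchanges, number of sorts).
def countExchangesAndSorts(ds: Dataset[_]): (Int, Int) = {
  val plan = ds.queryExecution.executedPlan
  // collect visits every node of the plan tree, including nodes wrapped
  // by whole-stage codegen
  val exchanges = plan.collect { case e: ShuffleExchangeExec => e }.size
  val sorts = plan.collect { case s: SortExec => s }.size
  (exchanges, sorts)
}
```

In a spark-shell session, `countExchangesAndSorts(q)` counts operators across the whole plan, so for the plan shown above it should report 2 exchanges and 3 sorts: two sorts on the non-bucketed side plus the one on the bucketed side.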
      

      Attachments

        1. join-jira.png
          295 kB
          Jacek Laskowski

            People

              Assignee: Unassigned
              Reporter: Jacek Laskowski (jlaskowski)
              Votes: 4
              Watchers: 8