Spark / SPARK-17698

Join predicates should not contain filter clauses


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Minor
    • Resolution: Fixed
    • Affects Version/s: 2.0.0
    • Fix Version/s: 2.0.2, 2.1.0
    • Component/s: SQL
    • Labels: None

    Description

      `ExtractEquiJoinKeys` incorrectly treats filter predicates as part of the join condition. This does not produce incorrect results, but for bucketed and sorted tables it can prevent Spark from skipping an unnecessary shuffle and sort. For example:

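      // Assumes a spark-shell session where `hc` is a HiveContext and `sqlContext` is available.
      // Both tables get the same rows, the same bucket count (8) and the same sort column.
      val df = (1 until 10).toDF("id").coalesce(1)
      hc.sql("DROP TABLE IF EXISTS table1").collect
      df.write.bucketBy(8, "id").sortBy("id").saveAsTable("table1")
      hc.sql("DROP TABLE IF EXISTS table2").collect
      df.write.bucketBy(8, "id").sortBy("id").saveAsTable("table2")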
      
      sqlContext.sql("""
        SELECT a.id, b.id
        FROM table1 a
        FULL OUTER JOIN table2 b
        ON a.id = b.id AND a.id='1' AND b.id='1'
      """).explain(true)
      

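      The resulting plan shuffles and sorts the output of both table scans, and the filter operands (`1.0` and `cast(id as double)`) show up among the join keys. Neither the shuffle nor the sort is needed, because both tables are bucketed and sorted on the same column and have the same number of buckets. This should be a single-stage job.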

      SortMergeJoin [id#38, cast(id#38 as double), 1.0], [id#39, 1.0, cast(id#39 as double)], FullOuter
      :- *Sort [id#38 ASC NULLS FIRST, cast(id#38 as double) ASC NULLS FIRST, 1.0 ASC NULLS FIRST], false, 0
      :  +- Exchange hashpartitioning(id#38, cast(id#38 as double), 1.0, 200)
      :     +- *FileScan parquet default.table1[id#38] Batched: true, Format: ParquetFormat, InputPaths: file:spark-warehouse/table1, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:int>
      +- *Sort [id#39 ASC NULLS FIRST, 1.0 ASC NULLS FIRST, cast(id#39 as double) ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(id#39, 1.0, cast(id#39 as double), 200)
            +- *FileScan parquet default.table2[id#39] Batched: true, Format: ParquetFormat, InputPaths: file:spark-warehouse/table2, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:int>
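
The key lists in the plan above can be reproduced with a small toy model. The sketch below is not Spark's `ExtractEquiJoinKeys` code: `Expr`, `Col`, `Lit`, `EqualTo`, `Split` and `JoinKeySketch` are hypothetical names, and the casts added by type coercion are left out. It assumes the extra keys arise because a literal references no table and so passes a lenient "this side can evaluate it" check for either side. A stricter check, which requires each key to actually reference its own side, leaves only `a.id = b.id` as a join key.

```scala
// Toy model: these types are hypothetical stand-ins, not Spark's Catalyst classes.
sealed trait Expr { def tables: Set[String] }
case class Col(table: String, name: String) extends Expr {
  def tables: Set[String] = Set(table)
}
case class Lit(value: String) extends Expr {
  def tables: Set[String] = Set.empty // a literal references no table
}
case class EqualTo(left: Expr, right: Expr) extends Expr {
  def tables: Set[String] = left.tables ++ right.tables
}

// Result of splitting a join condition: paired keys plus leftover predicates.
case class Split(leftKeys: Seq[Expr], rightKeys: Seq[Expr], others: Seq[Expr])

object JoinKeySketch {
  // Lenient check (assumed cause of the issue): an expression belongs to a side
  // if it references nothing outside that side, so literals pass for both sides.
  def lenient(e: Expr, table: String): Boolean = e.tables.subsetOf(Set(table))

  // Strict check: the expression must actually reference that side.
  def strict(e: Expr, table: String): Boolean = e.tables == Set(table)

  def split(conjuncts: Seq[Expr], left: String, right: String,
            belongsTo: (Expr, String) => Boolean): Split = {
    // Right = equi-join key pair (leftKey, rightKey); Left = any other predicate.
    def classify(e: Expr): Either[Expr, (Expr, Expr)] = e match {
      case EqualTo(l, r) if belongsTo(l, left) && belongsTo(r, right) => Right((l, r))
      case EqualTo(l, r) if belongsTo(l, right) && belongsTo(r, left) => Right((r, l))
      case other => Left(other)
    }
    val classified = conjuncts.map(classify)
    val keys = classified.collect { case Right(pair) => pair }
    val others = classified.collect { case Left(e) => e }
    Split(keys.map(_._1), keys.map(_._2), others)
  }

  // ON a.id = b.id AND a.id = '1' AND b.id = '1'
  val condition: Seq[Expr] = Seq(
    EqualTo(Col("a", "id"), Col("b", "id")),
    EqualTo(Col("a", "id"), Lit("1")),
    EqualTo(Col("b", "id"), Lit("1")))

  def main(args: Array[String]): Unit = {
    println(split(condition, "a", "b", lenient)) // three key pairs, no other predicates
    println(split(condition, "a", "b", strict))  // one key pair, two other predicates
  }
}
```

With the lenient check, the left and right key lists have the same shape as in the plan (`id`, the filtered column, a literal), so the hash partitioning expression no longer matches bucketing on `id` alone. With the strict check the only key is `id`, which is the column both tables are bucketed and sorted on.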
      

          People

            tejasp Tejas Patil