SPARK-22211

LimitPushDown optimization for FullOuterJoin generates wrong results


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.2.0
    • Fix Version/s: 2.2.1, 2.3.0
    • Component/s: SQL
    • Labels: None
    • Environment: community.cloud.databricks.com,
      Runtime Version 3.2 (includes Apache Spark 2.2.0, Scala 2.11)

    Description

LimitPushDown pushes a LocalLimit down to one side of a FullOuterJoin, but this can produce a wrong result:

Assume we call limit(1) and the LocalLimit is pushed to the left side, where the row with id=999 happens to be selected. If the right side has 100K rows, including id=999, the join result will be:

      • one row (999, 999)
      • 99,999 rows of the form (null, xxx)

Once you call show(), the row (999, 999) has only a 1/100000 chance of being picked by CollectLimit.
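
      The rewrite performed by the rule has roughly the shape below; this is a simplified sketch against the Catalyst API (the object name and structure are illustrative, not Spark's actual LimitPushDown code):

      import org.apache.spark.sql.catalyst.plans.FullOuter
      import org.apache.spark.sql.catalyst.plans.logical.{Join, LocalLimit, LogicalPlan}
      import org.apache.spark.sql.catalyst.rules.Rule

      // Illustrative shape of the unsound rewrite: the limit is copied below
      // one side of the FULL OUTER join, discarding rows that the other side
      // still needs to match against.
      object UnsoundLimitPushDown extends Rule[LogicalPlan] {
        def apply(plan: LogicalPlan): LogicalPlan = plan transform {
          case limit @ LocalLimit(exp, join @ Join(left, right, FullOuter, _)) =>
            limit.copy(child = join.copy(left = LocalLimit(exp, left)))
        }
      }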

      A correct optimization might be to:

      • push down the limit, and
      • convert the join to a broadcast LeftOuterJoin or RightOuterJoin, so that the rows kept by the pushed-down limit are guaranteed to appear in the output (see the sketch after this list).
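
      At the DataFrame level the idea can be approximated by hand. The following is a hedged sketch only (it reuses the dl and dr frames from the notebook below), not the fix that was committed:

      // Workaround sketch: apply the limit to the left side first, then use a
      // LEFT OUTER join so the limited row cannot be crowded out by
      // null-padded rows from the other side.
      val leftLimited = dl.limit(1)
      val rewritten = leftLimited
        .join(dr, leftLimited("id") === dr("id"), "left_outer")
        .limit(1)
      rewritten.show(false)  // always shows the selected left id, matched or null-padded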

      Here is my notebook:
      https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/349451637617406/2750346983121008/6888856075277290/latest.html

      import scala.util.Random._

      // two single-column frames holding the same ids 1..100000 in random order
      // (toDF comes from spark.implicits._, which notebooks import automatically)
      val dl = shuffle(1 to 100000).toDF("id")
      val dr = shuffle(1 to 100000).toDF("id")
      
      println("data frame dl:")
      dl.explain
      
      println("data frame dr:")
      dr.explain
      
      val j = dl.join(dr, dl("id") === dr("id"), "outer").limit(1)
      
      j.explain
      
      j.show(false)
      
      data frame dl:
      == Physical Plan ==
      LocalTableScan [id#10]
      data frame dr:
      == Physical Plan ==
      LocalTableScan [id#16]
      == Physical Plan ==
      CollectLimit 1
      +- SortMergeJoin [id#10], [id#16], FullOuter
         :- *Sort [id#10 ASC NULLS FIRST], false, 0
         :  +- Exchange hashpartitioning(id#10, 200)
         :     +- *LocalLimit 1
         :        +- LocalTableScan [id#10]
         +- *Sort [id#16 ASC NULLS FIRST], false, 0
            +- Exchange hashpartitioning(id#16, 200)
               +- LocalTableScan [id#16]
      import scala.util.Random._
      dl: org.apache.spark.sql.DataFrame = [id: int]
      dr: org.apache.spark.sql.DataFrame = [id: int]
      j: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: int, id: int]
      
      +----+---+
      |id  |id |
      +----+---+
      |null|148|
      +----+---+
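
      Note that dl and dr contain exactly the same ids (1 to 100000), so every row has a match and a correct full outer join contains no nulls at all; the null-padded row above is therefore provably wrong. A quick sanity check (a sketch, run against the same frames):

      // Every id in dl has a match in dr, so a correct FULL OUTER JOIN here
      // has no null on either side; limit(1) should never be able to return
      // a null-padded row such as (null, 148).
      val full = dl.join(dr, dl("id") === dr("id"), "outer")
      println(full.filter(dl("id").isNull || dr("id").isNull).count())  // expected: 0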
      

      People

        Assignee: Henry Robinson (henryr)
        Reporter: Benyi Wang (bewang.tech)
        Votes: 0
        Watchers: 6
