Spark / SPARK-47320

Datasets involving self joins behave in an inconsistent and unintuitive manner


Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 3.5.1
    • Fix Version/s: None
    • Component/s: SQL

    Description

      Datasets involving self joins behave unintuitively: it is hard to predict when an AnalysisException is thrown due to ambiguity and when the query succeeds.

      I found situations where swapping the join order causes a query to throw an ambiguity-related exception when it otherwise passes. Some Datasets that are unambiguous from the user's perspective still result in an AnalysisException.

      After testing and fixing a bug, I think the issue lies in an inconsistency in determining what is ambiguous and what is unambiguous.

      There are two ways to look at resolution with respect to ambiguity:

      1) ExprId of attributes: This is an unintuitive approach, as Spark users do not deal with ExprIds.

      2) Column extraction from the Dataset using the df(col) API: This is the user-visible, understandable point of view, so determining ambiguity should be based on it. What is logically unambiguous from the user's perspective (assuming it is logically correct) should also be the basis on which Spark decides unambiguity.

      For example:
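
      The two perspectives can be sketched with a toy model in plain Scala. This is only an illustration under stated assumptions: Attr, Plan, ColRef, resolveByExprId and resolveByDataset are hypothetical names invented here and are not Spark's Catalyst classes or its actual resolution code.

```scala
// Toy model only: every type and method here is hypothetical, not Spark's API.
object AmbiguityModel {
  // An attribute is identified by an expression id.
  final case class Attr(name: String, exprId: Long)
  // A plan exposes output attributes and belongs to one Dataset.
  final case class Plan(datasetId: Long, output: Seq[Attr])
  // A column reference remembers the Dataset it was extracted from, as df(col) would.
  final case class ColRef(attr: Attr, datasetId: Long)

  // Exactly one candidate resolves; zero or several candidates are errors.
  private def pick(hits: Seq[Plan]): Either[String, Plan] = hits match {
    case Seq(only) => Right(only)
    case Seq()     => Left("unresolved")
    case _         => Left("ambiguous")
  }

  // Perspective 1: match on exprId alone across both join sides.
  def resolveByExprId(ref: ColRef, left: Plan, right: Plan): Either[String, Plan] =
    pick(Seq(left, right).filter(_.output.exists(_.exprId == ref.attr.exprId)))

  // Perspective 2: additionally require the side to be the Dataset the column came from.
  def resolveByDataset(ref: ColRef, left: Plan, right: Plan): Either[String, Plan] =
    pick(Seq(left, right).filter { p =>
      p.datasetId == ref.datasetId && p.output.exists(_.exprId == ref.attr.exprId)
    })

  def main(args: Array[String]): Unit = {
    val a  = Attr("a", 1L)
    val b  = Attr("b", 2L)
    val aa = Attr("aa", 3L)
    val df1        = Plan(10L, Seq(a, b))
    // The joined-and-projected plan still carries df1's exprIds for "a" and "b".
    val df1Joindf2 = Plan(30L, Seq(a, aa, b))
    val df1A       = ColRef(a, df1.datasetId) // models df1("a")

    println(resolveByExprId(df1A, df1Joindf2, df1))                    // prints Left(ambiguous)
    println(resolveByDataset(df1A, df1Joindf2, df1).map(_.datasetId)) // prints Right(10)
  }
}
```

      In this model, the exprId of "a" appears on both sides of the self join, so perspective 1 reports ambiguity, while perspective 2 picks the side whose Dataset the column was extracted from.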

      val df1 = Seq((1, 2)).toDF("a", "b")
      val df2 = Seq((1, 2)).toDF("aa", "bb")
      val df1Joindf2 = df1.join(df2, df1("a") === df2("aa"))
        .select(df1("a"), df2("aa"), df1("b"))
      val df3 = df1Joindf2.join(df1, df1Joindf2("aa") === df1("a")).select(df1("a"))

      From perspective #1, the above code should throw an ambiguity exception, because the join condition and the projection of df3 both use df1("a"), whose exprId matches attributes in both df1Joindf2 and df1.

      But if we look at it from the perspective of the Dataset used to get the column, which reflects the user's intent, the expectation is that df1("a") should resolve to the Dataset df1 being joined, and not to df1Joindf2. If the user had intended "a" from df1Joindf2, they would have used df1Joindf2("a").

      In this case, current Spark throws an exception because it resolves based on #1.

      But by the same logic, the DataFrame below should also throw an ambiguity exception, yet it passes:

      val df1 = Seq((1, 2)).toDF("a", "b")
      val df2 = Seq((1, 2)).toDF("aa", "bb")
      val df1Joindf2 = df1.join(df2, df1("a") === df2("aa"))
        .select(df1("a"), df2("aa"), df1("b"))

      df1Joindf2.join(df1, df1Joindf2("a") === df1("a"))

      The difference between the two cases is that a select is present in the first query but not in the second.

      This implies that in the first case df1("a") in the projection causes the ambiguity, while the same reference in the second case, used only in the join condition, is considered unambiguous.

      IMHO, the ambiguity identification criteria should be based entirely on #2, and applied consistently.

      In DataFrameJoinSuite and DataFrameSelfJoinSuite, if we go by #2, some of the tests that are considered ambiguous (under criterion #1) become unambiguous.

      There is an existing test in DataFrameSelfJoinSuite:

      test("SPARK-28344: fail ambiguous self join - column ref in Project") {
        val df1 = spark.range(3)
        val df2 = df1.filter($"id" > 0)

        // Assertion 1: existing
        assertAmbiguousSelfJoin(df1.join(df2).select(df2("id")))

        // Assertion 2: added by me
        assertAmbiguousSelfJoin(df2.join(df1).select(df2("id")))
      }

      Here Assertion 1 passes (that is, the ambiguity exception is thrown), but Assertion 2 fails (that is, no ambiguity exception is thrown). The only change is the join order.

      Logically, both assertions are invalid, in the sense that neither should throw an exception, as from the user's perspective there is no ambiguity.

       

      Much of this confusion also arises because Spark attempts to resolve join conditions on the "un-deduplicated" plan. The attempt to resolve the join condition should be made after deduplication of the Join plan, which is what the PR for this bug does.

       

          People

            Assignee: Unassigned
            Reporter: Asif (ashahid7)