SPARK-36874: Ambiguous Self-Join detected only on right dataframe


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 3.1.2
    • Fix Version/s: 3.2.0, 3.1.3
    • Component/s: SQL

    Description

      When joining two dataframes that share the same lineage, i.e. one is a transformation of the other, Ambiguous Self Join detection only works when the transformed dataframe is on the right side of the join.

      For instance, given df1 and df2 where df2 is a filtered df1, Ambiguous Self Join detection only fires when df2 is the right dataframe:

      • df1.join(df2, ...) correctly fails with an Ambiguous Self Join error
      • df2.join(df1, ...) passes analysis and returns a dataframe with incorrect contents

      Minimal Reproducible Example

      Code

      // sparkSession is an active SparkSession; its implicits provide toDF and the $ syntax
      import sparkSession.implicits._

      val df1 = Seq((1, 2, "A1"), (2, 1, "A2")).toDF("key1", "key2", "value")

      // df2 shares df1's lineage: it is df1 with a filter applied
      val df2 = df1.filter($"value" === "A2")

      // Transformed dataframe (df2) on the left: the ambiguity is NOT detected
      df2.join(df1, df1("key1") === df2("key2")).show()
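
      For comparison, reversing the join order with the same df1 and df2 triggers the check as expected, per the first bullet above. A sketch, same session as the reproducer:

      // Transformed dataframe (df2) on the right: the ambiguity IS detected
      // and the AnalysisException shown below is thrown.
      df1.join(df2, df1("key1") === df2("key2")).show()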
      

      Expected Result

      The join should throw the following exception:

      Exception in thread "main" org.apache.spark.sql.AnalysisException: Column key2#11 are ambiguous. It's probably because you joined several Datasets together, and some of these Datasets are the same. This column points to one of the Datasets but Spark is unable to figure out which one. Please alias the Datasets with different names via `Dataset.as` before joining them, and specify the column using qualified name, e.g. `df.as("a").join(df.as("b"), $"a.id" > $"b.id")`. You can also set spark.sql.analyzer.failAmbiguousSelfJoin to false to disable this check.
      	at org.apache.spark.sql.execution.analysis.DetectAmbiguousSelfJoin$.apply(DetectAmbiguousSelfJoin.scala:157)
      	at org.apache.spark.sql.execution.analysis.DetectAmbiguousSelfJoin$.apply(DetectAmbiguousSelfJoin.scala:43)
      	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:216)
      	at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
      	at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
      	at scala.collection.immutable.List.foldLeft(List.scala:91)
      	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:213)
      	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:205)
      	at scala.collection.immutable.List.foreach(List.scala:431)
      	at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:205)
      	at org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$executeSameContext(Analyzer.scala:196)
      	at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:190)
      	at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:155)
      	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:183)
      	at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88)
      	at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:183)
      	at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:174)
      	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:228)
      	at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:173)
      	at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:73)
      	at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
      	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:143)
      	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
      	at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:143)
      	at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:73)
      	at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:71)
      	at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:63)
      	at org.apache.spark.sql.Dataset$.$anonfun$ofRows$1(Dataset.scala:90)
      	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
      	at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:88)
      	at org.apache.spark.sql.Dataset.withPlan(Dataset.scala:3715)
      	at org.apache.spark.sql.Dataset.join(Dataset.scala:1079)
      	at org.apache.spark.sql.Dataset.join(Dataset.scala:1041)
       ...
      

      Actual Result

      The join succeeds and returns an empty dataframe:

      +----+----+-----+----+----+-----+
      |key1|key2|value|key1|key2|value|
      +----+----+-----+----+----+-----+
      +----+----+-----+----+----+-----+
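
      As the expected error message suggests, aliasing both sides and using qualified column names sidesteps the ambiguity regardless of join order. A sketch of this workaround, assuming the same df1 and df2 as in the reproducer:

      // Alias each side so the analyzer can tell the two lineages apart,
      // then reference columns through the aliases:
      val joined = df2.as("filtered").join(df1.as("full"), $"full.key1" === $"filtered.key2")
      joined.show()

      // Note: spark.sql.analyzer.failAmbiguousSelfJoin only disables the
      // detection; it does not change how ambiguous columns are resolved.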
      

            People

              sarutak Kousuke Saruta
              vincentdoba Vincent Doba
              Votes:
              0 Vote for this issue
              Watchers:
              3 Start watching this issue
