Spark / SPARK-23406

Stream-stream self joins do not work

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.3.0
    • Fix Version/s: 2.3.1, 2.4.0
    • Component/s: Structured Streaming
    • Labels:
      None

      Description

Currently, a stream-stream self join throws the following error:

// Self-join of a single rate-source streaming DataFrame
// ("display" is a Databricks notebook helper)
val df = spark.readStream.format("rate")
  .option("numRowsPerSecond", "1")
  .option("numPartitions", "1")
  .load()
display(df.withColumn("key", $"value" / 10).join(df.withColumn("key", $"value" / 5), "key"))
      

      error:

      Failure when resolving conflicting references in Join:
      'Join UsingJoin(Inner,List(key))
      :- Project [timestamp#850, value#851L, (cast(value#851L as double) / cast(10 as double)) AS key#855]
      : +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@7f1d2a68,rate,List(),None,List(),None,Map(numPartitions -> 1, numRowsPerSecond -> 1),None), rate, [timestamp#850, value#851L]
      +- Project [timestamp#850, value#851L, (cast(value#851L as double) / cast(5 as double)) AS key#860]
       +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@7f1d2a68,rate,List(),None,List(),None,Map(numPartitions -> 1, numRowsPerSecond -> 1),None), rate, [timestamp#850, value#851L]
      
      Conflicting attributes: timestamp#850,value#851L
      ;;
      'Join UsingJoin(Inner,List(key))
      :- Project [timestamp#850, value#851L, (cast(value#851L as double) / cast(10 as double)) AS key#855]
      : +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@7f1d2a68,rate,List(),None,List(),None,Map(numPartitions -> 1, numRowsPerSecond -> 1),None), rate, [timestamp#850, value#851L]
      +- Project [timestamp#850, value#851L, (cast(value#851L as double) / cast(5 as double)) AS key#860]
       +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@7f1d2a68,rate,List(),None,List(),None,Map(numPartitions -> 1, numRowsPerSecond -> 1),None), rate, [timestamp#850, value#851L]
      
      at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:39)
       at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:101)
       at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:378)
       at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:98)
       at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:148)
       at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:98)
       at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:101)
       at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:71)
       at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:73)
       at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withPlan(Dataset.scala:3063)
       at org.apache.spark.sql.Dataset.join(Dataset.scala:787)
       at org.apache.spark.sql.Dataset.join(Dataset.scala:756)
       at org.apache.spark.sql.Dataset.join(Dataset.scala:731)
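
      On affected versions, one possible workaround (a sketch, not the committed fix) is to build each join leg from its own `readStream.load()` call: each call produces a separate StreamingRelation with fresh attribute ids, so the analyzer no longer sees conflicting references. The helper name `rateStream` below is illustrative; it assumes an active `SparkSession` named `spark`:

      ```scala
      // Hypothetical workaround sketch for Spark 2.3.0: give each join leg its
      // own StreamingRelation so attribute ids do not collide.
      import spark.implicits._

      def rateStream() = spark.readStream.format("rate")
        .option("numRowsPerSecond", "1")
        .option("numPartitions", "1")
        .load()

      val left  = rateStream().withColumn("key", $"value" / 10)
      val right = rateStream().withColumn("key", $"value" / 5)

      // Analyzes without the "conflicting references" failure, since the two
      // legs no longer share timestamp/value attribute ids.
      val joined = left.join(right, "key")
      ```

      Note that this reads the source twice, so it is only a stopgap until upgrading to a release containing the fix (2.3.1 or 2.4.0).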
      

      People

      • Assignee: Tathagata Das (tdas)
      • Reporter: Tathagata Das (tdas)
      • Votes: 0
      • Watchers: 2
