Uploaded image for project: 'Spark'
  1. Spark
  2. SPARK-23406

Stream-stream self joins does not work

Attach filesAttach ScreenshotVotersWatch issueWatchersCreate sub-taskLinkCloneUpdate Comment AuthorReplace String in CommentUpdate Comment VisibilityDelete Comments
    XMLWordPrintableJSON

Details

    • Bug
    • Status: Resolved
    • Major
    • Resolution: Fixed
    • 2.3.0
    • 2.3.1, 2.4.0
    • Structured Streaming
    • None

    Description

      Currently stream-stream self join throws the following error

      val df = spark.readStream.format("rate").option("numRowsPerSecond", "1").option("numPartitions", "1").load()
      display(df.withColumn("key", $"value" / 10).join(df.withColumn("key", $"value" / 5), "key"))
      

      error:

      Failure when resolving conflicting references in Join:
      'Join UsingJoin(Inner,List(key))
      :- Project [timestamp#850, value#851L, (cast(value#851L as double) / cast(10 as double)) AS key#855]
      : +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@7f1d2a68,rate,List(),None,List(),None,Map(numPartitions -> 1, numRowsPerSecond -> 1),None), rate, [timestamp#850, value#851L]
      +- Project [timestamp#850, value#851L, (cast(value#851L as double) / cast(5 as double)) AS key#860]
       +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@7f1d2a68,rate,List(),None,List(),None,Map(numPartitions -> 1, numRowsPerSecond -> 1),None), rate, [timestamp#850, value#851L]
      
      Conflicting attributes: timestamp#850,value#851L
      ;;
      'Join UsingJoin(Inner,List(key))
      :- Project [timestamp#850, value#851L, (cast(value#851L as double) / cast(10 as double)) AS key#855]
      : +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@7f1d2a68,rate,List(),None,List(),None,Map(numPartitions -> 1, numRowsPerSecond -> 1),None), rate, [timestamp#850, value#851L]
      +- Project [timestamp#850, value#851L, (cast(value#851L as double) / cast(5 as double)) AS key#860]
       +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@7f1d2a68,rate,List(),None,List(),None,Map(numPartitions -> 1, numRowsPerSecond -> 1),None), rate, [timestamp#850, value#851L]
      
      at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:39)
       at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:101)
       at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:378)
       at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:98)
       at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:148)
       at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:98)
       at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:101)
       at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:71)
       at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:73)
       at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withPlan(Dataset.scala:3063)
       at org.apache.spark.sql.Dataset.join(Dataset.scala:787)
       at org.apache.spark.sql.Dataset.join(Dataset.scala:756)
       at org.apache.spark.sql.Dataset.join(Dataset.scala:731)
      

      Attachments

        Activity

          This comment will be Viewable by All Users Viewable by All Users
          Cancel

          People

            tdas Tathagata Das
            tdas Tathagata Das
            Votes:
            0 Vote for this issue
            Watchers:
            2 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved:

              Slack

                Issue deployment