Uploaded image for project: 'Spark'
  1. Spark
  2. SPARK-46676

dropDuplicatesWithinWatermark throws error on canonicalizing plan

Attach filesAttach ScreenshotVotersWatch issueWatchersCreate sub-taskLinkCloneUpdate Comment AuthorReplace String in CommentUpdate Comment VisibilityDelete Comments
    XMLWordPrintableJSON

Details

    Description

      Simply said, this test code fails:

      test("SPARK-XXXXX: canonicalization of StreamingDeduplicateWithinWatermarkExec should work") {
        withTempDir { checkpoint =>
          val dedupeInputData = MemoryStream[(String, Int)]
          val dedupe = dedupeInputData.toDS()
            .withColumn("eventTime", timestamp_seconds($"_2"))
            .withWatermark("eventTime", "10 second")
            .dropDuplicatesWithinWatermark("_1")
            .select($"_1", $"eventTime".cast("long").as[Long])
      
          testStream(dedupe, Append)(
            StartStream(checkpointLocation = checkpoint.getCanonicalPath),
      
            AddData(dedupeInputData, "a" -> 1),
            CheckNewAnswer("a" -> 1),
      
            Execute { q =>
              // This threw out error!
              q.lastExecution.executedPlan.canonicalized
            }
          )
        }
      } 

      with below error:

      [info] - SPARK-XXXXX: canonicalization of StreamingDeduplicateWithinWatermarkExec should work *** FAILED *** (1 second, 237 milliseconds)
      [info]   Assert on query failed: Execute: None.get
      [info]   scala.None$.get(Option.scala:627)
      [info]       scala.None$.get(Option.scala:626)
      [info]       org.apache.spark.sql.execution.streaming.StreamingDeduplicateWithinWatermarkExec.<init>(statefulOperators.scala:1101)
      [info]       org.apache.spark.sql.execution.streaming.StreamingDeduplicateWithinWatermarkExec.copy(statefulOperators.scala:1092)
      [info]       org.apache.spark.sql.execution.streaming.StreamingDeduplicateWithinWatermarkExec.withNewChildInternal(statefulOperators.scala:1148)
      [info]       org.apache.spark.sql.execution.streaming.StreamingDeduplicateWithinWatermarkExec.withNewChildInternal(statefulOperators.scala:1087)
      [info]       org.apache.spark.sql.catalyst.trees.UnaryLike.withNewChildrenInternal(TreeNode.scala:1210)
      [info]       org.apache.spark.sql.catalyst.trees.UnaryLike.withNewChildrenInternal$(TreeNode.scala:1208)
      [info]       org.apache.spark.sql.execution.streaming.BaseStreamingDeduplicateExec.withNewChildrenInternal(statefulOperators.scala:949)
      [info]       org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$withNewChildren$2(TreeNode.scala:323) 

      Attachments

        Activity

          This comment will be Viewable by All Users Viewable by All Users
          Cancel

          People

            kabhwan Jungtaek Lim
            kabhwan Jungtaek Lim
            Votes:
            0 Vote for this issue
            Watchers:
            1 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved:

              Slack

                Issue deployment