Details
Type: Bug
Status: Resolved
Priority: Major
Resolution: Fixed
Version/s: 3.5.0, 4.0.0
Description
Put simply, the following test fails:
test("SPARK-XXXXX: canonicalization of StreamingDeduplicateWithinWatermarkExec should work") {
  withTempDir { checkpoint =>
    val dedupeInputData = MemoryStream[(String, Int)]
    val dedupe = dedupeInputData.toDS()
      .withColumn("eventTime", timestamp_seconds($"_2"))
      .withWatermark("eventTime", "10 second")
      .dropDuplicatesWithinWatermark("_1")
      .select($"_1", $"eventTime".cast("long").as[Long])

    testStream(dedupe, Append)(
      StartStream(checkpointLocation = checkpoint.getCanonicalPath),
      AddData(dedupeInputData, "a" -> 1),
      CheckNewAnswer("a" -> 1),
      Execute { q =>
        // This throws an error!
        q.lastExecution.executedPlan.canonicalized
      }
    )
  }
}
with the following error:
[info] - SPARK-XXXXX: canonicalization of StreamingDeduplicateWithinWatermarkExec should work *** FAILED *** (1 second, 237 milliseconds)
[info] Assert on query failed: Execute: None.get
[info] scala.None$.get(Option.scala:627)
[info] scala.None$.get(Option.scala:626)
[info] org.apache.spark.sql.execution.streaming.StreamingDeduplicateWithinWatermarkExec.<init>(statefulOperators.scala:1101)
[info] org.apache.spark.sql.execution.streaming.StreamingDeduplicateWithinWatermarkExec.copy(statefulOperators.scala:1092)
[info] org.apache.spark.sql.execution.streaming.StreamingDeduplicateWithinWatermarkExec.withNewChildInternal(statefulOperators.scala:1148)
[info] org.apache.spark.sql.execution.streaming.StreamingDeduplicateWithinWatermarkExec.withNewChildInternal(statefulOperators.scala:1087)
[info] org.apache.spark.sql.catalyst.trees.UnaryLike.withNewChildrenInternal(TreeNode.scala:1210)
[info] org.apache.spark.sql.catalyst.trees.UnaryLike.withNewChildrenInternal$(TreeNode.scala:1208)
[info] org.apache.spark.sql.execution.streaming.BaseStreamingDeduplicateExec.withNewChildrenInternal(statefulOperators.scala:949)
[info] org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$withNewChildren$2(TreeNode.scala:323)
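The trace shows `None.get` being thrown from the `StreamingDeduplicateWithinWatermarkExec` constructor, reached through `copy` from `withNewChildInternal` while the plan is being canonicalized. The sketch below is a simplified, hypothetical model of that failure mode, not Spark's real classes: it assumes the constructor eagerly looks up the event-time column with `Option.get`, and that canonicalizing the child strips the attribute metadata that marks that column. All names (`Attr`, `Child`, `Watermark.DelayKey`, `EagerDedupExec`, `LazyDedupExec`, `DedupSketch`) are invented for illustration. The lazy variant is shown only as one way such an error can be avoided; it is not necessarily how this issue was actually fixed.

```scala
import scala.util.Try

// Hypothetical stand-in for an attribute. The metadata map plays the role of
// the event-time watermark marker carried in column metadata.
final case class Attr(name: String, exprId: Long, metadata: Map[String, Long] = Map.empty) {
  // Simplified canonicalization (assumption): normalize the name, drop metadata.
  def canonicalized: Attr = Attr("none", exprId)
}

// Hypothetical stand-in for the child plan; only its output matters here.
final case class Child(output: Seq[Attr]) {
  def canonicalized: Child = Child(output.map(_.canonicalized))
}

object Watermark {
  // Hypothetical metadata key marking the event-time column.
  val DelayKey: String = "watermarkDelayMs"

  def findEventTimeColumn(output: Seq[Attr]): Option[Attr] =
    output.find(_.metadata.contains(DelayKey))
}

// Eager lookup: Option.get runs in the constructor body, so every copy()
// re-runs it and throws when the new child has lost the metadata.
final case class EagerDedupExec(child: Child) {
  val eventTimeCol: Attr = Watermark.findEventTimeColumn(child.output).get
  def withNewChild(newChild: Child): EagerDedupExec = copy(child = newChild)
}

// Lazy lookup: copy() succeeds; .get only runs if eventTimeCol is accessed.
final case class LazyDedupExec(child: Child) {
  lazy val eventTimeCol: Attr = Watermark.findEventTimeColumn(child.output).get
  def withNewChild(newChild: Child): LazyDedupExec = copy(child = newChild)
}

object DedupSketch {
  // Sample child output: "_1" plus an event-time column carrying the marker.
  val sampleChild: Child =
    Child(Seq(Attr("_1", 1L), Attr("eventTime", 2L, Map(Watermark.DelayKey -> 10000L))))

  // Mimics canonicalization: rebuild the node around a canonicalized child.
  def eagerCanonicalizes(child: Child): Boolean =
    Try(EagerDedupExec(child).withNewChild(child.canonicalized)).isSuccess

  def lazyCanonicalizes(child: Child): Boolean =
    Try(LazyDedupExec(child).withNewChild(child.canonicalized)).isSuccess
}
```

In this model, `DedupSketch.eagerCanonicalizes(DedupSketch.sampleChild)` returns `false` because the `copy` throws `java.util.NoSuchElementException: None.get`, mirroring the top frames of the trace, while `lazyCanonicalizes` returns `true`. Deferring the lookup works here only because a canonicalized plan is compared, never executed, so the event-time column is never requested on it.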