Details
Type: Improvement
Status: Resolved
Priority: Major
Resolution: Fixed
Fix Version: 3.0.0
Labels: None
Description
We want to fix two things here:
1. Give a better error message for Distinct-related operations in append mode when the query has no watermark.
Check the following example:
spark.readStream.format("rate").option("rowsPerSecond", 1).load().createOrReplaceTempView("s1")
spark.readStream.format("rate").option("rowsPerSecond", 1).load().createOrReplaceTempView("s2")
val unionRes = spark.sql("select s1.value, s1.timestamp from s1 union select s2.value, s2.timestamp from s2")
unionRes.writeStream.option("checkpointLocation", ${pathA}).start(${pathB})
We'll get the following confusing exception:
java.util.NoSuchElementException: None.get
  at scala.None$.get(Option.scala:529)
  at scala.None$.get(Option.scala:527)
  at org.apache.spark.sql.execution.streaming.StateStoreSaveExec.$anonfun$doExecute$9(statefulOperators.scala:346)
  at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
  at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:561)
  at org.apache.spark.sql.execution.streaming.StateStoreWriter.timeTakenMs(statefulOperators.scala:112)
  ...
In SQL, the UNION clause requires deduplication: the parser generates Distinct(Union), and the optimizer rule ReplaceDistinctWithAggregate then rewrites it into Aggregate(Union). The root cause is that the unsupported-operation check applied to Aggregate is missing for Distinct.
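For reference, the Distinct(Union) to Aggregate(Union) rewrite can be observed by printing the query plans. The following is a minimal batch illustration (the views b1 and b2 are made up for this sketch; the streaming query above goes through the same rewrite):

val b1 = spark.range(5).selectExpr("id as value")
b1.createOrReplaceTempView("b1")
val b2 = spark.range(5).selectExpr("id as value")
b2.createOrReplaceTempView("b2")

// explain(true) prints the parsed, analyzed, optimized and physical plans:
// the analyzed plan contains Distinct(Union), while the optimized plan shows
// Aggregate(Union) after ReplaceDistinctWithAggregate has run.
spark.sql("select value from b1 union select value from b2").explain(true)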
In fact, this happens for all Distinct-related operations in Structured Streaming, e.g.:
val df = spark.readStream.format("rate").load()
df.createOrReplaceTempView("deduptest")
val distinct = spark.sql("select distinct value from deduptest")
distinct.writeStream.option("checkpointLocation", ${pathA}).start(${pathB})
2. Make Distinct in complete mode runnable.
Distinct in complete mode currently throws the following exception:
Complete output mode not supported when there are no streaming aggregations on streaming DataFrames/Datasets;
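A minimal way to reproduce the complete-mode case (a sketch; the console sink is an assumption here, chosen because it accepts complete output mode):

val df = spark.readStream.format("rate").load()
df.createOrReplaceTempView("deduptest")
val distinct = spark.sql("select distinct value from deduptest")

// Before the fix, start() fails with the "Complete output mode not supported ..." error.
distinct.writeStream
  .outputMode("complete")
  .format("console")
  .start()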