Spark / SPARK-32456

Check the Distinct by assuming it as Aggregate for Structured Streaming

    Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 3.0.0
    • Fix Version/s: 3.0.1, 3.1.0
    • Component/s: Structured Streaming
    • Labels:
      None

      Description

      We want to fix 2 things here:

      1. Give a better error message for Distinct-related operations in append mode when no watermark is defined.

      Check the following example: 

      // createOrReplaceTempView returns Unit, so the views are registered without assigning a val.
      spark.readStream.format("rate").option("rowsPerSecond", 1).load().createOrReplaceTempView("s1")
      spark.readStream.format("rate").option("rowsPerSecond", 1).load().createOrReplaceTempView("s2")
      val unionResult = spark.sql("select s1.value, s1.timestamp from s1 union select s2.value, s2.timestamp from s2")
      // No output mode is set, so the default (append) applies; ${pathA} and ${pathB} are placeholders.
      unionResult.writeStream.option("checkpointLocation", ${pathA}).start(${pathB})

      We'll get the following confusing exception:

      java.util.NoSuchElementException: None.get
      	at scala.None$.get(Option.scala:529)
      	at scala.None$.get(Option.scala:527)
      	at org.apache.spark.sql.execution.streaming.StateStoreSaveExec.$anonfun$doExecute$9(statefulOperators.scala:346)
      	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
      	at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:561)
      	at org.apache.spark.sql.execution.streaming.StateStoreWriter.timeTakenMs(statefulOperators.scala:112)
      ...
      

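      The UNION clause in SQL requires deduplication, so the parser generates Distinct(Union), and the optimizer rule ReplaceDistinctWithAggregate later rewrites it to Aggregate(Union). The streaming query is therefore executed as a streaming aggregation, but the validation that rejects an Aggregate without a watermark in append mode is never applied to Distinct. The root cause is that the checking logic for Aggregate is missing for Distinct.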
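
      The gap described above can be illustrated with a small, self-contained toy model. This is only a sketch: Plan, StreamingSource, Union, Distinct, Aggregate and the helper names below are hypothetical stand-ins, not Spark's real classes or its actual checker. It shows a check that only looks for Aggregate missing a Distinct(Union) plan that the rewrite later turns into Aggregate(Union), and a check that treats Distinct as an aggregation catching it.

```scala
// Toy model of the logical plans discussed above; these are NOT Spark's real classes.
sealed trait Plan { def children: Seq[Plan] }
case class StreamingSource(name: String) extends Plan { val children: Seq[Plan] = Nil }
case class Union(left: Plan, right: Plan) extends Plan { val children: Seq[Plan] = Seq(left, right) }
case class Distinct(child: Plan) extends Plan { val children: Seq[Plan] = Seq(child) }
case class Aggregate(child: Plan) extends Plan { val children: Seq[Plan] = Seq(child) }

object DistinctCheckSketch {
  // Stand-in for the optimizer rewrite: every Distinct(x) becomes Aggregate(x).
  def replaceDistinctWithAggregate(plan: Plan): Plan = plan match {
    case Distinct(child)    => Aggregate(replaceDistinctWithAggregate(child))
    case Aggregate(child)   => Aggregate(replaceDistinctWithAggregate(child))
    case Union(l, r)        => Union(replaceDistinctWithAggregate(l), replaceDistinctWithAggregate(r))
    case s: StreamingSource => s
  }

  // True if the predicate holds for the node itself or for any descendant.
  private def exists(plan: Plan)(p: Plan => Boolean): Boolean =
    p(plan) || plan.children.exists(c => exists(c)(p))

  // A check that only recognises Aggregate (the gap described as the root cause).
  def hasAggregateOld(plan: Plan): Boolean =
    exists(plan)(_.isInstanceOf[Aggregate])

  // A check that also treats Distinct as an aggregation (the proposed behaviour).
  def hasAggregateNew(plan: Plan): Boolean = exists(plan) {
    case _: Aggregate | _: Distinct => true
    case _                          => false
  }
}
```

      For the plan Distinct(Union(StreamingSource("s1"), StreamingSource("s2"))), hasAggregateOld returns false before the rewrite even though the rewritten plan is an Aggregate, while hasAggregateNew returns true on the unoptimized plan.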

      The same failure occurs for all Distinct-related operations in Structured Streaming, for example:

      val df = spark.readStream.format("rate").load()
      df.createOrReplaceTempView("deduptest")
      val distinct = spark.sql("select distinct value from deduptest")
      distinct.writeStream.option("checkpointLocation", ${pathA}).start(${pathB})

       

      2. Make Distinct in complete mode runnable.

      A Distinct query in complete mode currently throws the following exception:

       

      Complete output mode not supported when there are no streaming aggregations on streaming DataFrames/Datasets;
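
      As a rough sketch of the intended behaviour, the query below runs the earlier `select distinct` example in complete mode. It assumes a Spark build that includes this fix (3.0.1 or 3.1.0 according to the fix versions above); the local master, application name, console sink, and 10-second timeout are illustrative choices and are not part of the original report.

```scala
import org.apache.spark.sql.SparkSession

object DistinctCompleteModeSketch {
  def main(args: Array[String]): Unit = {
    // Local session for experimentation only; master and app name are illustrative.
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("distinct-complete-mode-sketch")
      .getOrCreate()

    // Same streaming source and view name as in the description above.
    val df = spark.readStream.format("rate").load()
    df.createOrReplaceTempView("deduptest")

    // `select distinct` is parsed as Distinct, which the optimizer rewrites to an Aggregate.
    val distinct = spark.sql("select distinct value from deduptest")

    // Complete mode writes the full result table (all distinct values so far) on every trigger.
    val query = distinct.writeStream
      .outputMode("complete")
      .format("console")
      .start()

    // Let the query run for about 10 seconds, then shut everything down.
    query.awaitTermination(10000L)
    query.stop()
    spark.stop()
  }
}
```

      On a build without the fix, the same code is expected to fail with the exception quoted above.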

            People

            • Assignee: XuanYuan Yuanjian Li
            • Reporter: XuanYuan Yuanjian Li