Spark / SPARK-32456

Check the Distinct by assuming it as Aggregate for Structured Streaming


Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 3.0.0
    • Fix Version/s: 3.0.1, 3.1.0
    • Component/s: Structured Streaming
    • Labels: None

    Description

      We want to fix 2 things here:

      1. Give a better error message for Distinct-related operations in append mode without a watermark.

      Consider the following example:

      // pathA and pathB stand for the checkpoint and output directories
      spark.readStream.format("rate").option("rowsPerSecond", 1).load().createOrReplaceTempView("s1")
      spark.readStream.format("rate").option("rowsPerSecond", 1).load().createOrReplaceTempView("s2")
      val unionRes = spark.sql("select s1.value, s1.timestamp from s1 union select s2.value, s2.timestamp from s2")
      // Uses the default (append) output mode and no watermark
      unionRes.writeStream.option("checkpointLocation", pathA).start(pathB)

      We'll get the following confusing exception:

      java.util.NoSuchElementException: None.get
      	at scala.None$.get(Option.scala:529)
      	at scala.None$.get(Option.scala:527)
      	at org.apache.spark.sql.execution.streaming.StateStoreSaveExec.$anonfun$doExecute$9(statefulOperators.scala:346)
      	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
      	at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:561)
      	at org.apache.spark.sql.execution.streaming.StateStoreWriter.timeTakenMs(statefulOperators.scala:112)
      ...
      

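      A SQL UNION implies deduplication, so the parser generates Distinct(Union), and the optimizer rule ReplaceDistinctWithAggregate later rewrites it to Aggregate(Union). The streaming unsupported-operation checks, however, run on the analyzed plan, where the node is still a Distinct. The root cause is therefore that the checks applied to Aggregate are missing for Distinct. The query passes them, then runs as a stateful aggregation in append mode without a watermark and fails at runtime with the exception above.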
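      The failure mode can be illustrated with a toy model. The sketch below is hypothetical. Plan, StreamSource, Union, Distinct, Aggregate, checkAppend and replaceDistinct are simplified stand-ins invented for illustration, not Spark's Catalyst classes or its real checker. It assumes the check runs on the plan before the optimizer's Distinct-to-Aggregate rewrite. It then compares a check that only recognises Aggregate with one that treats Distinct as if it were already an Aggregate.

```scala
// Toy model only: hypothetical stand-ins, not Spark's Catalyst classes or checker.
object ToyStreamingCheck {
  sealed trait Plan { def children: Seq[Plan] }
  final case class StreamSource(name: String) extends Plan { def children: Seq[Plan] = Nil }
  final case class Union(left: Plan, right: Plan) extends Plan { def children: Seq[Plan] = Seq(left, right) }
  final case class Distinct(child: Plan) extends Plan { def children: Seq[Plan] = Seq(child) }
  final case class Aggregate(child: Plan) extends Plan { def children: Seq[Plan] = Seq(child) }

  // All nodes of the plan, parent first.
  def nodes(p: Plan): Seq[Plan] = p +: p.children.flatMap(nodes)

  // Before the fix: only Aggregate nodes count as streaming aggregations.
  def isAggBefore(p: Plan): Boolean = p match {
    case _: Aggregate => true
    case _            => false
  }

  // After the fix: a Distinct is checked as if it were already an Aggregate.
  def isAggAfter(p: Plan): Boolean = p match {
    case _: Aggregate | _: Distinct => true
    case _                          => false
  }

  // Toy append-mode rule: an aggregation without a watermark is rejected up front.
  def checkAppend(plan: Plan, hasWatermark: Boolean, isAgg: Plan => Boolean): Either[String, Unit] =
    if (!hasWatermark && nodes(plan).exists(isAgg))
      Left("toy error: streaming aggregation in append mode needs a watermark")
    else Right(())

  // Toy version of the optimizer rewrite, which only happens after the check above.
  def replaceDistinct(p: Plan): Plan = p match {
    case Distinct(c)     => Aggregate(replaceDistinct(c))
    case Aggregate(c)    => Aggregate(replaceDistinct(c))
    case Union(l, r)     => Union(replaceDistinct(l), replaceDistinct(r))
    case s: StreamSource => s
  }

  // The plan for "s1 UNION s2" as the check sees it, before optimization.
  val unionPlan: Plan = Distinct(Union(StreamSource("s1"), StreamSource("s2")))
}
```

      With isAggBefore, the Distinct(Union) plan passes the append-mode check, even though replaceDistinct later turns it into an Aggregate that needs a watermark. This mirrors how the real query slips through and only fails at runtime. With isAggAfter, the same plan is rejected before the query starts, with a readable message.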

      The same problem affects every Distinct-related operation in Structured Streaming, for example:

      // pathA and pathB stand for the checkpoint and output directories
      val df = spark.readStream.format("rate").load()
      df.createOrReplaceTempView("deduptest")
      val distinct = spark.sql("select distinct value from deduptest")
      distinct.writeStream.option("checkpointLocation", pathA).start(pathB)

       

      2. Make Distinct in complete mode runnable.

      Currently, Distinct in complete mode throws the following exception:

      Complete output mode not supported when there are no streaming aggregations on streaming DataFrames/Datasets;
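      A similar toy model illustrates the complete-mode side of the fix. Complete mode re-emits the whole result table on every trigger, and Structured Streaming only supports this for aggregation queries. A checker that does not count Distinct as an aggregation therefore rejects a plain SELECT DISTINCT. All names here (ToyCompleteModeCheck, checkComplete, treatDistinctAsAggregate) are hypothetical, and the rule is a simplification of Spark's real checks.

```scala
// Toy model only: hypothetical stand-ins, not Spark's Catalyst classes or checker.
object ToyCompleteModeCheck {
  sealed trait Plan { def children: Seq[Plan] }
  final case class StreamSource(name: String) extends Plan { def children: Seq[Plan] = Nil }
  final case class Distinct(child: Plan) extends Plan { def children: Seq[Plan] = Seq(child) }
  final case class Aggregate(child: Plan) extends Plan { def children: Seq[Plan] = Seq(child) }

  // All nodes of the plan, parent first.
  def nodes(p: Plan): Seq[Plan] = p +: p.children.flatMap(nodes)

  // Toy complete-mode rule: the plan must contain at least one aggregation.
  // treatDistinctAsAggregate = false mirrors the behaviour described in the issue;
  // true mirrors the proposed fix.
  def checkComplete(plan: Plan, treatDistinctAsAggregate: Boolean): Either[String, Unit] = {
    val hasAgg = nodes(plan).exists {
      case _: Aggregate => true
      case _: Distinct  => treatDistinctAsAggregate
      case _            => false
    }
    if (hasAgg) Right(())
    else Left("Complete output mode not supported when there are no streaming aggregations on streaming DataFrames/Datasets")
  }

  // "select distinct value from deduptest" as the check sees it, before optimization.
  val distinctPlan: Plan = Distinct(StreamSource("deduptest"))
}
```

      With treatDistinctAsAggregate = false, distinctPlan is rejected with the message quoted above. With true, it is accepted, while a plan with no Distinct or Aggregate is still rejected.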


    People

      Assignee: XuanYuan Yuanjian Li
      Reporter: XuanYuan Yuanjian Li
      Votes: 0
      Watchers: 2
