Uploaded image for project: 'Spark'
  1. Spark
  2. SPARK-20373

Batch queries with 'Dataset/DataFrame.withWatermark()` does not execute

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Resolved
    • Minor
    • Resolution: Fixed
    • 2.1.0, 2.2.0
    • 2.2.0
    • Structured Streaming
    • None

    Description

      Any Dataset/DataFrame batch query with the operation `withWatermark` does not execute because the batch planner does not have any rule to explicitly handle the EventTimeWatermark logical plan. The right solution is to simply remove the plan node, as the watermark should not affect any batch query in any way.

      from pyspark.sql.functions import *
      
      eventsDF = spark.createDataFrame([("2016-03-11 09:00:07", "dev1", 123)]).toDF("eventTime", "deviceId", "signal").select(col("eventTime").cast("timestamp").alias("eventTime"), "deviceId", "signal")
      
      windowedCountsDF = \
        eventsDF \
          .withWatermark("eventTime", "10 minutes") \
          .groupBy(
            "deviceId",
            window("eventTime", "5 minutes")) \
          .count()
      
      windowedCountsDF.collect()
      

      This throws as an error

      java.lang.AssertionError: assertion failed: No plan for EventTimeWatermark eventTime#3762657: timestamp, interval 10 minutes
      +- Project [cast(_1#3762643 as timestamp) AS eventTime#3762657, _2#3762644 AS deviceId#3762651]
         +- LogicalRDD [_1#3762643, _2#3762644, _3#3762645L]
      
      	at scala.Predef$.assert(Predef.scala:170)
      	at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:92)
      	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:77)
      	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:74)
      	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
      	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
      	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
      	at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
      	at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
      	at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1336)
      	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:74)
      	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:66)
      	at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
      	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
      	at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:92)
      	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:77)
      	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:74)
      	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
      	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
      	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
      	at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
      	at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
      	at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1336)
      	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:74)
      	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:66)
      	at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
      	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
      	at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:92)
      	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:77)
      	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:74)
      	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
      	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
      	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
      	at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
      	at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
      	at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1336)
      	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:74)
      	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:66)
      	at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
      	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
      	at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:92)
      	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:77)
      	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:74)
      	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
      	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
      	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
      	at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
      	at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
      	at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1336)
      	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:74)
      	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:66)
      	at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
      	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
      	at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:92)
      	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:77)
      	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:74)
      	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
      	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
      	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
      	at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
      	at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
      	at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1336)
      	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:74)
      	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:66)
      	at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
      	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
      	at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:92)
      	at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:85)
      	at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:81)
      	at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:90)
      	at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:90)
      

      Attachments

        Issue Links

          Activity

            People

              uncleGen Genmao Yu
              tdas Tathagata Das
              Votes:
              0 Vote for this issue
              Watchers:
              7 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: