Spark / SPARK-27564

'No plan for EventTimeWatermark' error while using structured streaming with column pruning


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Incomplete
    • Affects Version/s: 2.3.1
    • Fix Version/s: None
    • Component/s: Structured Streaming

    Description

      I get a 'No plan for EventTimeWatermark' error when running a query with column pruning, using Structured Streaming with a custom data source that implements the Spark DataSource V2 API.

      The schema-handling part of my data source implementation looks like this:

      class MyDataSourceReader extends DataSourceReader with SupportsPushDownRequiredColumns {
          // Starts as the full schema; replaced in place when Spark prunes columns.
          var schema: StructType = createSchema()

          override def readSchema(): StructType = schema

          override def pruneColumns(requiredSchema: StructType): Unit = {
              this.schema = requiredSchema
          }
      }

      and then:

      class MyDataSourceReaderStream extends MyDataSourceReader { ... }
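
      For context, here is a self-contained sketch of what such a reader can look like against the Spark 2.3 DataSourceV2 API. The createSchema body and the empty factory list are illustrative placeholders, not the actual implementation, and the streaming subclass would additionally implement MicroBatchReader (omitted here):

      import java.util
      import org.apache.spark.sql.Row
      import org.apache.spark.sql.sources.v2.reader.{DataReaderFactory, DataSourceReader, SupportsPushDownRequiredColumns}
      import org.apache.spark.sql.types.{LongType, StructField, StructType}

      class MyDataSourceReaderSketch extends DataSourceReader with SupportsPushDownRequiredColumns {
          // Full schema of the source (placeholder columns for illustration).
          private def createSchema(): StructType =
              StructType(Seq(StructField("epoch", LongType), StructField("id", LongType)))

          var schema: StructType = createSchema()

          override def readSchema(): StructType = schema

          // Spark calls this during optimization with only the columns the query needs.
          override def pruneColumns(requiredSchema: StructType): Unit = {
              this.schema = requiredSchema
          }

          // A real implementation returns factories that produce the actual rows.
          override def createDataReaderFactories(): util.List[DataReaderFactory[Row]] =
              util.Collections.emptyList()
      }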
      

      This is my test code:

      def x(): Unit = {
          val df1 = sparkSession.readStream.format(myV2Source).load()

          val df2 = df1
              .withColumn("epoch", (round(col("epoch") / (30 * 1000)) * 30).cast(TimestampType))
              .withWatermark("epoch", "1 milliseconds")
              .groupBy(col("epoch"), col("id")).count()

          val streamingQuery = df2
              .writeStream
              .format("console")
              .trigger(Trigger.ProcessingTime("10 seconds"))
              .outputMode(OutputMode.Append())
              .start()

          streamingQuery.awaitTermination()
      }
      
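      For what it's worth, the bucketing expression itself behaves as expected on a batch DataFrame (a minimal check; the literal epoch value below is made up for illustration and assumed to be in milliseconds):

      import org.apache.spark.sql.functions.{col, lit, round}
      import org.apache.spark.sql.types.TimestampType

      // 1556000012345 ms / 30000 = 51866667.078..., round = 51866667, * 30 = 1556000010 s,
      // i.e. the epoch is snapped to a 30-second bucket boundary before the cast.
      sparkSession.range(1)
              .select(lit(1556000012345L).as("epoch"))
              .withColumn("epoch", (round(col("epoch") / (30 * 1000)) * 30).cast(TimestampType))
              .show(false) // 2019-04-23 06:13:30 with a UTC session timezone
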

      I get the following exception:

      Caused by: java.lang.AssertionError: assertion failed: No plan for EventTimeWatermark epoch#201: timestamp, interval 1 milliseconds
      +- Project [cast((round((cast(epoch#320L as double) / 30000.0), 0) * 30.0) as timestamp) AS epoch#201, id#367L]
         +- DataSourceV2Relation [epoch#320L, id#367L], com.akamai.csi.datacatalog.spark.connectors.endor.v2.reader.stream.EndorDataSourceStreamReader@173b1b7c
      
      at scala.Predef$.assert(Predef.scala:170)
      at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
      at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:78)
      at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:75)
      at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
      at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
      at scala.collection.Iterator$class.foreach(Iterator.scala:893)
      at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
      

      Note that the logical plan contains DataSourceV2Relation rather than StreamingDataSourceV2Relation, even though this is a streaming query.

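      One way to check which relation the analyzer produced (a hedged suggestion: Dataset.queryExecution.analyzed shows the analyzed logical plan without forcing physical planning, and StreamingQuery.explain() covers a running query):

      // Print the analyzed logical plan without starting the query:
      println(df2.queryExecution.analyzed)

      // Or, once the query is running, print the plans of the most recent batch:
      // streamingQuery.explain()
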
      Where is the problem? 

People

    Assignee: Unassigned
    Reporter: Raviv Kineret