Uploaded image for project: 'Spark'
  1. Spark
  2. SPARK-21565

aggregate query fails with watermark on eventTime but works with watermark on timestamp column generated by current_timestamp

    XMLWordPrintableJSON

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.2.0
    • Fix Version/s: 2.2.1, 2.3.0
    • Component/s: Structured Streaming
    • Labels:
      None

      Description

      *Short Description:*

      An aggregation query fails when eventTime is used as the watermark column, but works when the watermark is set on a newTimeStamp column generated by running SQL with current_timestamp.

      Exception:

      Caused by: java.util.NoSuchElementException: None.get
      	at scala.None$.get(Option.scala:347)
      	at scala.None$.get(Option.scala:345)
      	at org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anonfun$doExecute$3.apply(statefulOperators.scala:204)
      	at org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anonfun$doExecute$3.apply(statefulOperators.scala:172)
      	at org.apache.spark.sql.execution.streaming.state.package$StateStoreOps$$anonfun$1.apply(package.scala:70)
      	at org.apache.spark.sql.execution.streaming.state.package$StateStoreOps$$anonfun$1.apply(package.scala:65)
      	at org.apache.spark.sql.execution.streaming.state.StateStoreRDD.compute(StateStoreRDD.scala:64)
      	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
      	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
      	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
      	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
      	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
      	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
      	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
      	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
      

      Code to replicate:

      package test
      
      import java.nio.file.{Files, Path, Paths}
      import java.text.SimpleDateFormat
      
      import org.apache.spark.sql.types._
      import org.apache.spark.sql.{SparkSession}
      
      import scala.collection.JavaConverters._
      
      /**
       * Reproduction program for a streaming aggregation with a watermark.
       *
       * It reads JSON events from `target/newEvents/` as a stream, puts a 2-second
       * watermark on the `eventTime` column, groups by `symbol` and `eventTime`, and
       * writes the counts to the console sink in append mode. According to the
       * accompanying report this variant fails with
       * `java.util.NoSuchElementException: None.get`, whereas the commented-out
       * variant (watermark on a `current_timestamp` column) works.
       */
      object Test1 {

        /**
         * Runs the reproduction: resets the working directories, starts the
         * streaming query, writes one input file and waits for it to be processed.
         *
         * @param args command-line arguments (not used)
         */
        def main(args: Array[String]): Unit = {

          val sparkSession = SparkSession
            .builder()
            .master("local[*]")
            .appName("Spark SQL basic example")
            .config("spark.some.config.option", "some-value")
            .getOrCreate()

          // Start from a clean slate so earlier runs cannot influence this one.
          val checkpointPath = "target/cp1"
          val newEventsPath = Paths.get("target/newEvents/").toAbsolutePath
          delete(newEventsPath)
          delete(Paths.get(checkpointPath).toAbsolutePath)
          Files.createDirectories(newEventsPath)

          val dfNewEvents = newEvents(sparkSession)
          dfNewEvents.createOrReplaceTempView("dfNewEvents")

          // The below works - Start
          // val dfNewEvents2 = sparkSession.sql("select *,current_timestamp as newTimeStamp from dfNewEvents ").withWatermark("newTimeStamp","2 seconds")
          // dfNewEvents2.createOrReplaceTempView("dfNewEvents2")
          // val groupEvents = sparkSession.sql("select symbol,newTimeStamp, count(price) as count1 from dfNewEvents2 group by symbol,newTimeStamp")
          // End

          // The below doesn't work - Start
          val dfNewEvents2 = sparkSession
            .sql("select * from dfNewEvents ")
            .withWatermark("eventTime", "2 seconds")
          dfNewEvents2.createOrReplaceTempView("dfNewEvents2")
          val groupEvents = sparkSession.sql(
            "select symbol,eventTime, count(price) as count1 from dfNewEvents2 group by symbol,eventTime")
          // - End

          val query1 = groupEvents.writeStream
            .outputMode("append")
            .format("console")
            .option("checkpointLocation", checkpointPath)
            .start("./myop")

          // Drop a single input file with two events sharing the same symbol and
          // event time, then block until the query has consumed it.
          val newEventFile1 = newEventsPath.resolve("eventNew1.json")
          Files.write(newEventFile1, List(
            """{"symbol": "GOOG","price":100,"eventTime":"2017-07-25T16:00:00.000-04:00"}""",
            """{"symbol": "GOOG","price":200,"eventTime":"2017-07-25T16:00:00.000-04:00"}"""
          ).toIterable.asJava)
          query1.processAllAvailable()

          sparkSession.streams.awaitAnyTermination(10000)
        }

        /**
         * Recreates the (empty) input directory and returns a streaming DataFrame
         * that reads JSON files from it using [[eventsSchema]].
         *
         * @param sparkSession session used to create the streaming reader
         * @return the streaming DataFrame over `target/newEvents/`
         */
        private def newEvents(sparkSession: SparkSession) = {
          val newEvents = Paths.get("target/newEvents/").toAbsolutePath
          delete(newEvents)
          Files.createDirectories(newEvents)

          // .withWatermark("eventTime","2 seconds") is deliberately not applied here;
          // main applies the watermark after going through a SQL view.
          sparkSession.readStream.schema(eventsSchema).json(newEvents.toString)
        }

        /** Schema of the input events: symbol, price and the event timestamp. */
        private val eventsSchema = StructType(List(
          StructField("symbol", StringType, nullable = true),
          StructField("price", DoubleType, nullable = true),
          StructField("eventTime", TimestampType, nullable = false)
        ))

        /**
         * Recursively deletes `dir` and everything below it; does nothing when
         * `dir` does not exist. Individual deletion failures are ignored
         * (best effort).
         *
         * @param dir root of the tree to remove
         */
        private def delete(dir: Path): Unit = {
          if (Files.exists(dir)) {
            // Files.walk holds open directory handles until the stream is closed.
            val walked = Files.walk(dir)
            try {
              walked.iterator().asScala.toList
                .map(_.toFile)
                // Reverse pathname order puts entries inside a directory before
                // the directory itself, so directories are empty when deleted.
                .sortWith((a, b) => a.compareTo(b) > 0)
                .foreach(_.delete())
            } finally {
              walked.close()
            }
          }
        }

      }
      
      
      

        Attachments

          Issue Links

            Activity

              People

              • Assignee:
                joseph.torres Jose Torres
                Reporter:
                amit.assudani@gmail.com Amit Assudani
              • Votes:
                1 Vote for this issue
                Watchers:
                6 Start watching this issue

                Dates

                • Created:
                  Updated:
                  Resolved: