Details
- Type: Bug
- Status: Resolved
- Priority: Major
- Resolution: Fixed
- Affects Version/s: 2.2.0
- Labels: None
Description
*Short Description:*
An aggregation query fails when eventTime is used as the watermark column, but it works with a newTimeStamp column generated by adding current_timestamp in SQL.
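Distilled from the full reproduction below, the two variants are:

// Works: watermark on a processing-time column added via current_timestamp
val works = sparkSession.sql("select *, current_timestamp as newTimeStamp from dfNewEvents")
  .withWatermark("newTimeStamp", "2 seconds")

// Fails with the exception below: watermark on the data's own eventTime column
val fails = sparkSession.sql("select * from dfNewEvents")
  .withWatermark("eventTime", "2 seconds")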
Exception:
Caused by: java.util.NoSuchElementException: None.get
	at scala.None$.get(Option.scala:347)
	at scala.None$.get(Option.scala:345)
	at org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anonfun$doExecute$3.apply(statefulOperators.scala:204)
	at org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anonfun$doExecute$3.apply(statefulOperators.scala:172)
	at org.apache.spark.sql.execution.streaming.state.package$StateStoreOps$$anonfun$1.apply(package.scala:70)
	at org.apache.spark.sql.execution.streaming.state.package$StateStoreOps$$anonfun$1.apply(package.scala:65)
	at org.apache.spark.sql.execution.streaming.state.StateStoreRDD.compute(StateStoreRDD.scala:64)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
Code to replicate:
package test

import java.nio.file.{Files, Path, Paths}
import java.text.SimpleDateFormat

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

import scala.collection.JavaConverters._

object Test1 {
  def main(args: Array[String]) {
    val sparkSession = SparkSession
      .builder()
      .master("local[*]")
      .appName("Spark SQL basic example")
      .config("spark.some.config.option", "some-value")
      .getOrCreate()
    val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    val checkpointPath = "target/cp1"
    val newEventsPath = Paths.get("target/newEvents/").toAbsolutePath
    delete(newEventsPath)
    delete(Paths.get(checkpointPath).toAbsolutePath)
    Files.createDirectories(newEventsPath)

    val dfNewEvents = newEvents(sparkSession)
    dfNewEvents.createOrReplaceTempView("dfNewEvents")

    // The below works - Start
    // val dfNewEvents2 = sparkSession.sql("select *, current_timestamp as newTimeStamp from dfNewEvents").withWatermark("newTimeStamp", "2 seconds")
    // dfNewEvents2.createOrReplaceTempView("dfNewEvents2")
    // val groupEvents = sparkSession.sql("select symbol, newTimeStamp, count(price) as count1 from dfNewEvents2 group by symbol, newTimeStamp")
    // - End

    // The below doesn't work - Start
    val dfNewEvents2 = sparkSession.sql("select * from dfNewEvents").withWatermark("eventTime", "2 seconds")
    dfNewEvents2.createOrReplaceTempView("dfNewEvents2")
    val groupEvents = sparkSession.sql("select symbol, eventTime, count(price) as count1 from dfNewEvents2 group by symbol, eventTime")
    // - End

    val query1 = groupEvents.writeStream
      .outputMode("append")
      .format("console")
      .option("checkpointLocation", checkpointPath)
      .start("./myop")

    val newEventFile1 = newEventsPath.resolve("eventNew1.json")
    Files.write(newEventFile1, List(
      """{"symbol": "GOOG","price":100,"eventTime":"2017-07-25T16:00:00.000-04:00"}""",
      """{"symbol": "GOOG","price":200,"eventTime":"2017-07-25T16:00:00.000-04:00"}"""
    ).toIterable.asJava)

    query1.processAllAvailable()
    sparkSession.streams.awaitAnyTermination(10000)
  }

  private def newEvents(sparkSession: SparkSession) = {
    val newEvents = Paths.get("target/newEvents/").toAbsolutePath
    delete(newEvents)
    Files.createDirectories(newEvents)
    val dfNewEvents = sparkSession.readStream.schema(eventsSchema).json(newEvents.toString) //.withWatermark("eventTime","2 seconds")
    dfNewEvents
  }

  private val eventsSchema = StructType(List(
    StructField("symbol", StringType, true),
    StructField("price", DoubleType, true),
    StructField("eventTime", TimestampType, false)
  ))

  private def delete(dir: Path) = {
    if (Files.exists(dir)) {
      Files.walk(dir).iterator().asScala.toList
        .map(p => p.toFile)
        .sortWith((o1, o2) => o1.compareTo(o2) > 0)
        .foreach(_.delete)
    }
  }
}
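For comparison (not part of the original report): the Structured Streaming documentation's append-mode pattern groups on a time window over the watermarked column rather than on the raw timestamp column itself. A minimal sketch against the same dfNewEvents2 view, assuming a 2-second tumbling window is acceptable:

// Sketch only: aggregate per 2-second window over eventTime instead of per raw eventTime value.
val windowedEvents = sparkSession.sql(
  "select symbol, window(eventTime, '2 seconds') as win, count(price) as count1 " +
  "from dfNewEvents2 group by symbol, window(eventTime, '2 seconds')")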