Details
-
Bug
-
Status: Open
-
Major
-
Resolution: Unresolved
-
3.1.2
-
None
-
None
Description
StreamingQuery.processAllAvailable() blocks forever when called on queries containing a mapGroupsWithState operation configured with GroupStateTimeout.ProcessingTimeTimeout().
I think processAllAvailable() should unblock once all incoming data has been processed and no existing group state has a timeout currently set.
Sample code to demonstrate this failure follows:
/**
 * Reproduces the reported hang in `StreamingQuery.processAllAvailable()`.
 *
 * Runs the same running-total query twice over a `MemoryStream` that has been
 * pre-loaded with four records: once with `GroupStateTimeout.NoTimeout()` and
 * once with `GroupStateTimeout.ProcessingTimeTimeout()`. A "starting" line is
 * printed before each `processAllAvailable()` call and a "completed" line
 * after it, so a missing "completed" line identifies the configuration for
 * which the call did not return.
 */
def demoSparkProcessAllAvailableBug(): Unit = {
  val localSpark = SparkSession
    .builder()
    .master("local[*]")
    .appName("demoSparkProcessAllAvailableBug")
    .config("spark.driver.host", "localhost")
    .getOrCreate()
  import localSpark.implicits._

  val demoDataStream = MemoryStream[BugDemo.NameNumberData](1, localSpark.sqlContext)
  // Each record is added with its own addData call, exactly as in the original report.
  demoDataStream.addData(BugDemo.NameNumberData("Alice", 1))
  demoDataStream.addData(BugDemo.NameNumberData("Bob", 2))
  demoDataStream.addData(BugDemo.NameNumberData("Alice", 3))
  demoDataStream.addData(BugDemo.NameNumberData("Bob", 4))

  // StreamingQuery.processAllAvailable() is successful when executing against NoTimeout,
  // but blocks forever when executing against ProcessingTimeTimeout
  val timeoutTypes = List(GroupStateTimeout.NoTimeout(), GroupStateTimeout.ProcessingTimeTimeout())

  for (timeoutType <- timeoutTypes) {
    val totalByName = demoDataStream.toDF()
      .as[BugDemo.NameNumberData]
      .groupByKey(_.Name)
      .mapGroupsWithState(timeoutType)(BugDemo.summateRunningTotal)

    val totalByNameQuery = totalByName
      .writeStream
      .format("console")
      .outputMode("update")
      .start()

    println(s"${timeoutType} query starting to processAllAvailable()")
    try {
      totalByNameQuery.processAllAvailable()
      println(s"${timeoutType} query completed processAllAvailable()")
    } finally {
      // Stop the query even if processAllAvailable() throws, so it is not left running.
      totalByNameQuery.stop()
    }
  }
}
} // NOTE(review): unmatched in the original sample; presumably closes an enclosing object/class whose header was not pasted — confirm

object BugDemo {

  /**
   * State-update function passed to `mapGroupsWithState`: adds the numbers in
   * `input` to the total already stored for the group and stores the result.
   *
   * @param name       grouping key (the `Name` field of the incoming records)
   * @param input      records for this key in the current invocation
   * @param groupState state for this key; the starting total is read from it
   *                   when it exists (otherwise 0) and it is always overwritten
   *                   with the new total
   * @return a record pairing `name` with the updated running total
   */
  def summateRunningTotal(name: String, input: Iterator[NameNumberData], groupState: GroupState[RunningTotal]): NameNumberData = {
    val previousTotal: Int = if (groupState.exists) groupState.get.Total else 0
    val currentTotal: Int = input.foldLeft(previousTotal)((runningSum, row) => runningSum + row.Number)

    groupState.update(RunningTotal(currentTotal))
    NameNumberData(name, currentTotal)
  }

  /** Input record, also used as the query's output record: a name and a number. */
  case class NameNumberData(
    Name: String,
    Number: Integer
  )

  /** Per-group state: the total accumulated so far. */
  case class RunningTotal(
    Total: Integer
  )
}