Details
Type: Bug
Status: Resolved
Priority: Critical
Resolution: Fixed
Version/s: 1.3.0, 1.4.1, 1.5.2, 1.6.0
Description
The issue can be reproduced with the following sample code, which applies updateStateByKey followed by a map whose result is checkpointed at a 10-second interval:
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val sparkConf = new SparkConf().setAppName("test")
// Batch interval of 10 seconds
val streamingContext = new StreamingContext(sparkConf, Seconds(10))
streamingContext.checkpoint("""checkpoint""")
val source = streamingContext.socketTextStream("localhost", 9999)
// Keep the newest value per key, falling back to the old state
val updatedResult = source.map((1, _)).updateStateByKey(
  (newlist: Seq[String], oldstate: Option[String]) => newlist.headOption.orElse(oldstate))
updatedResult.map(_._2)
  .checkpoint(Seconds(10))
  .foreachRDD((rdd, t) => {
    // Number of lines in the debug string, used as a proxy for lineage depth
    println("Deep: " + rdd.toDebugString.split("\n").length)
    println(t.toString() + ": " + rdd.collect.length)
  })
streamingContext.start()
streamingContext.awaitTermination()
The output shows that the RDD dependency chain grows with every batch: the RDDs produced by updateStateByKey never get checkpointed, and eventually a stack overflow occurs.
Note:
- The RDD in updatedResult.map(_._2) gets checkpointed in this case, but the RDD produced by updateStateByKey does not.
- If checkpoint(Seconds(10)) is removed from the map result (updatedResult.map(_._2)), the stack overflow does not happen.
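The two notes above can be illustrated with a small toy model of lineage growth. This is only a sketch and not Spark code: Node, doCheckpoint and stateDepthAfter are hypothetical names, and the model assumes that a checkpoint request triggered by a job stops at the first node that is itself marked for checkpointing, and is passed on to the parent only by unmarked nodes. Under that assumption, marking the mapped result for checkpointing hides the state node from every checkpoint request, so the state lineage grows by one per batch.

```scala
// Toy model of RDD lineage; not Spark code. All names are hypothetical.
object LineageSketch {
  final class Node(val name: String, var parent: Option[Node], val marked: Boolean) {
    var checkpointed = false

    // Number of nodes reachable through parent links, including this one.
    def depth: Int = {
      var n = 1
      var cur = parent
      while (cur.isDefined) { n += 1; cur = cur.get.parent }
      n
    }

    // Assumed rule: a marked node is saved and its lineage is cut;
    // only an unmarked node passes the request on to its parent.
    def doCheckpoint(): Unit =
      if (marked) {
        if (!checkpointed) { checkpointed = true; parent = None }
      } else {
        parent.foreach(_.doCheckpoint())
      }
  }

  // Simulates `batches` batches and returns the lineage depth of the last state node.
  def stateDepthAfter(batches: Int, checkpointMapped: Boolean): Int = {
    var prevState: Option[Node] = None
    for (t <- 1 to batches) {
      // The state of batch t depends on the state of batch t - 1.
      val state = new Node(s"state-$t", prevState, marked = true)
      // The mapped result depends on the state of the same batch.
      val mapped = new Node(s"mapped-$t", Some(state), marked = checkpointMapped)
      // The job (foreachRDD in the sample) runs on the mapped node only.
      mapped.doCheckpoint()
      prevState = Some(state)
    }
    prevState.map(_.depth).getOrElse(0)
  }

  def main(args: Array[String]): Unit = {
    println(stateDepthAfter(100, checkpointMapped = true))  // prints 100
    println(stateDepthAfter(100, checkpointMapped = false)) // prints 1
  }
}
```

In this model the state lineage stays at depth 1 when the mapped result is not marked, which matches the second note, and grows without bound when it is marked, which matches the first.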