Affects Version/s: 1.6.3, 2.0.0, 2.0.1, 2.0.2, 2.1.0
Fix Version/s: None
In one of our applications, we found the following issue, the application recovering from a checkpoint file named "checkpoint-***166700000" but with the timestamp ***166500000 will recover from the very beginning of the stream and because our application relies on the external & periodically-cleaned data (syncing with checkpoint cleanup), the recovery just failed
We identified a potential issue in Spark Streaming checkpoint and will describe it with the following example. We will propose a fix in the end of this JIRA.
1. The application properties: Batch Duration: 20000, Functionality: Single Stream calling ReduceByKeyAndWindow and print, Window Size: 60000, SlideDuration, 20000
2. RDD at 166500000 is generated and the corresponding job is submitted to the execution ThreadPool. Meanwhile, a DoCheckpoint message is sent to the queue of JobGenerator
3. Job at 166500000 is finished and JobCompleted message is sent to JobScheduler's queue, and meanwhile, Job at 166520000 is submitted to the execution ThreadPool and similarly, a DoCheckpoint is sent to the queue of JobGenerator
4. JobScheduler's message processing thread (I will use JS-EventLoop to identify it) is not scheduled by the operating system for a long time, and during this period, Jobs generated from 166520000 - 166700000 are generated and completed.
5. the message processing thread of JobGenerator (JG-EventLoop) is scheduled and processed all DoCheckpoint messages for jobs ranging from 166520000 - 166700000 and checkpoint files are successfully written. CRITICAL: at this moment, the lastCheckpointTime would be 166700000.
6. JS-EventLoop is scheduled, and process all JobCompleted messages for jobs ranging from 166520000 - 166700000. CRITICAL: a ClearMetadata message is pushed to JobGenerator's message queue for EACH JobCompleted.
7. The current message queue contains 20 ClearMetadata messages and JG-EventLoop is scheduled to process them. CRITICAL: ClearMetadata will remove all RDDs out of rememberDuration window. In our case, ReduceyKeyAndWindow will set rememberDuration to 100000 (rememberDuration of ReducedWindowDStream (40000) + windowSize) resulting that only RDDs <- (166600000, 166700000] are kept. And ClearMetaData processing logic will push a DoCheckpoint to JobGenerator's thread
8. JG-EventLoop is scheduled again to process DoCheckpoint for 16650000, VERY CRITICAL: at this step, RDD no later than 166600000 has been removed, and checkpoint data is updated as https://github.com/apache/spark/blob/a81e336f1eddc2c6245d807aae2c81ddc60eabf9/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala#L53 and https://github.com/apache/spark/blob/a81e336f1eddc2c6245d807aae2c81ddc60eabf9/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala#L59.
9. After 8, a checkpoint named /path/checkpoint-166700000 is created but with the timestamp 166500000. and at this moment, Application crashed
10. Application recovers from /path/checkpoint-166700000 and try to get RDD with validTime 166500000. Of course it will not find it and has to recompute. In the case of ReduceByKeyAndWindow, it needs to recursively slice RDDs until to the start of the stream. When the stream depends on the external data, it will not successfully recover. In the case of Kafka, the recovered RDDs would not be the same as the original one, as the currentOffsets has been updated to the value at the moment of 166700000
The proposed fix:
0. a hot-fix would be setting timestamp Checkpoint File to lastCheckpointTime instead of using the timestamp of Checkpoint instance (any side-effect?)
1. ClearMetadata shall be ClearMedataAndCheckpoint
2. a long-term fix would be merge JobScheduler and JobGenerator, I didn't see any necessary to have two threads here