Details
Description
In org.apache.spark.streaming.scheduler.JobGenerator.stop() , the checkpointWriter.stop is called before eventLoop.stop.
If i call the streamingContext.stop when a batch is about to complete(Im invoking it from a StreamingListener.onBatchCompleted callback) , a rejectedException may get thrown from checkPointWriter.executor, since the eventLoop will try to process DoCheckpoint events even after the checkPointWriter.executor was stopped.
16/04/18 19:22:10 ERROR CheckpointWriter: Could not submit checkpoint task to the thread pool executor
java.util.concurrent.RejectedExecutionException: Task org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler@76e12f8 rejected from java.util.concurrent.ThreadPoolExecutor@4b9f5b97[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 49]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2048)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:821)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1372)
at org.apache.spark.streaming.CheckpointWriter.write(Checkpoint.scala:253)
at org.apache.spark.streaming.scheduler.JobGenerator.doCheckpoint(JobGenerator.scala:294)
at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:184)
at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:87)
at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:86)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
I think the order of the stopping should be changed.