ExecutorAllocationListener doesn't clean up its data properly. As a result, it becomes progressively slower and eventually fails to process events in time.
There are two problems:
- a bug (typo?) in the totalRunningTasksPerResourceProfile() method
getOrElseUpdate() is used instead of getOrElse(), so a lookup for a missing key inserts an entry instead of just returning the default.
If the spark-dynamic-executor-allocation thread calls schedule() after the SparkListenerTaskEnd event for the last task in a stage
but before the SparkListenerStageCompleted event for that stage, then stageAttemptToNumRunningTask will not be cleaned up properly.
- resourceProfileIdToStageAttempt clean-up is broken
If the SparkListenerTaskEnd event for the last task in a stage is processed before the SparkListenerStageCompleted event for that stage,
then resourceProfileIdToStageAttempt will not be cleaned up properly.
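The difference between the two map methods in the first problem can be seen in isolation. This is only a minimal sketch, not the listener's actual code: the object name, the runningTasks map and the "stage-0" key are hypothetical stand-ins for stageAttemptToNumRunningTask and its keys.

```scala
import scala.collection.mutable

object GetOrElseUpdateDemo {
  // Hypothetical stand-in for a per-stage-attempt running task counter.
  val runningTasks = mutable.HashMap[String, Int]()

  // Read-only lookup: a missing key yields the default, the map is untouched.
  def readWithGetOrElse(key: String): Int =
    runningTasks.getOrElse(key, 0)

  // Lookup with a side effect: a missing key is inserted with the default.
  def readWithGetOrElseUpdate(key: String): Int =
    runningTasks.getOrElseUpdate(key, 0)

  def main(args: Array[String]): Unit = {
    runningTasks("stage-0") = 1
    runningTasks -= "stage-0"            // simulate clean-up after the last task ends

    readWithGetOrElse("stage-0")
    println(runningTasks.size)           // map is still empty

    readWithGetOrElseUpdate("stage-0")
    println(runningTasks.size)           // the removed key is back, with value 0
  }
}
```

With getOrElseUpdate(), any read that happens after an entry has been removed puts the entry back, which is why the choice of method matters for a map that is expected to shrink as stages finish.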
The bugs were introduced in this commit: https://github.com/apache/spark/commit/496f6ac86001d284cbfb7488a63dd3a168919c0f
Steps to reproduce:
- Launch a standalone master and worker with 'spark.shuffle.service.enabled=true'
- Run spark-shell with --conf 'spark.shuffle.service.enabled=true' --conf 'spark.dynamicAllocation.enabled=true' and paste this script
- Make a heap dump and examine the ExecutorAllocationListener.totalRunningTasksPerResourceProfile and ExecutorAllocationListener.resourceProfileIdToStageAttempt fields
Expected: totalRunningTasksPerResourceProfile and resourceProfileIdToStageAttempt(defaultResourceProfileId) are empty
Actual: totalRunningTasksPerResourceProfile and resourceProfileIdToStageAttempt(defaultResourceProfileId) contain irrelevant data