For each task that the TaskSetManager adds, it iterates through the entire list of existing pending tasks to check whether the task is already present. As a result, scheduling a new task set is O(N^2) in the number of tasks, which can be very slow for large task sets.
This is a bug introduced by https://github.com/apache/spark/commit/3535b91: that commit removed the "!readding" condition from the if-statement, but since the readding parameter defaulted to false, the commit should have removed the condition check in the if-statement altogether.
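For illustration, here is a hedged paraphrase of the change. The real helper is a nested function inside TaskSetManager.addPendingTask; the standalone names and signatures below (addToBefore, addToAfter) are simplified stand-ins, not the actual Spark code:

```scala
import scala.collection.mutable.ArrayBuffer

// Before commit 3535b91: with readding defaulting to false, !readding
// short-circuits the contains() scan, so a normal add is O(1).
def addToBefore(list: ArrayBuffer[Int], index: Int, readding: Boolean = false): Unit = {
  if (!readding || !list.contains(index)) {
    list += index
  }
}

// After commit 3535b91: with !readding dropped, every add performs a
// linear contains() scan over the list, so enqueueing N tasks is O(N^2).
def addToAfter(list: ArrayBuffer[Int], index: Int): Unit = {
  if (!list.contains(index)) {
    list += index
  }
}
```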
We discovered this bug while running a large pipeline with 200k tasks: the executors were not able to register with the driver because the driver was stuck holding a global lock in the TaskSchedulerImpl.submitTasks function for a long time (it wasn't deadlocked, just taking a long time).
jstack of the driver - http://pastebin.com/m8CP6VMv
executor log - http://pastebin.com/2NPS1mXC
From the jstack I can see that the thread handling the resource offer from executors (dispatcher-event-loop-9) is blocked on a lock held by the thread "dag-scheduler-event-loop", which is iterating over an entire ArrayBuffer every time it adds a pending task. With 200k pending tasks, this O(N^2) work leaves the driver hung for more than 5 minutes.
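A minimal sketch of the contention, assuming (as the jstack suggests) that both code paths synchronize on the same scheduler instance; the class and method bodies here are illustrative, not Spark's actual code:

```scala
// Illustrative only: a long critical section in submitTasks blocks
// resource offers because both paths lock the same scheduler object.
class TaskSchedulerSketch {
  // Runs on "dag-scheduler-event-loop". With the O(N^2) addPendingTask
  // loop inside, this can hold the lock for minutes at 200k tasks.
  def submitTasks(): Unit = synchronized {
    // ... create a TaskSetManager, calling addPendingTask once per task ...
  }

  // Runs on "dispatcher-event-loop-*" threads when executors register and
  // offer resources; it cannot proceed until submitTasks releases the lock.
  def resourceOffers(): Unit = synchronized {
    // ... match pending tasks to the offered executor slots ...
  }
}
```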
Solution - In the addPendingTask function, we don't really need the duplicate check. It's okay if we add a task to the same queue twice, because dequeueTaskFromList will skip tasks that are already running (see the sketch below).
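A sketch of the fix, under the same simplified signature as above: append unconditionally and let the dequeue side filter. The dequeueRunnable helper and its isSchedulable predicate are hypothetical stand-ins for the real dequeueTaskFromList, which also handles locality levels:

```scala
import scala.collection.mutable.ArrayBuffer

// The fix: append unconditionally, dropping the O(N) contains() scan.
def addToFixed(list: ArrayBuffer[Int], index: Int): Unit = {
  list += index  // amortized O(1); a duplicate entry is tolerated
}

// Hypothetical dequeue side: entries are pruned as they are scanned, and
// anything not schedulable (e.g. already running or finished) is skipped,
// which is why duplicate entries in the queue are harmless.
def dequeueRunnable(list: ArrayBuffer[Int], isSchedulable: Int => Boolean): Option[Int] = {
  var i = list.length - 1
  while (i >= 0) {
    val index = list.remove(i)  // prune lazily from the back
    if (isSchedulable(index)) {
      return Some(index)
    }
    i -= 1
  }
  None
}
```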
Please note that this is a regression from Spark 1.5.