  1. Spark
  2. SPARK-22683

DynamicAllocation wastes resources by allocating containers that will barely be used

    Details

    • Type: Improvement
    • Status: In Progress
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 2.1.0, 2.2.0
    • Fix Version/s: None
    • Component/s: Spark Core

      Description

      While migrating a series of jobs from MR to Spark using dynamicAllocation, I noticed that Spark's resource consumption was more than double that of MR (+114% exactly), for a wall clock time gain of 43%.

      About the context:

      • resource usage stands for vcore-hours allocation for the whole job, as seen by YARN
      • I'm talking about a series of jobs because we provide our users with a way to define experiments (via UI / DSL) that automatically get translated to Spark / MR jobs and submitted on the cluster
      • we submit around 500 such jobs each day
      • these jobs are usually one-shot, and the amount of processing varies a lot between jobs, so an efficient number of executors is hard to pick for each job; this is why I chose dynamic allocation
      • some of the tests were scheduled on an idle queue, some on a full queue
      • experiments were conducted with spark.executor.cores = 5 and 10; only the results for 5 cores are reported because efficiency was better overall than with 10 cores
      • the figures I give are averaged over a representative sample of those jobs (about 600 jobs), ranging from tens to thousands of splits in the data partitioning and from 400 to 9000 seconds of wall clock time
      • executor idle timeout is set to 30s

      Definition:

      • let's say an executor has spark.executor.cores / spark.task.cpus taskSlots, which represent the max number of tasks an executor will process in parallel.
      • the current behaviour of dynamic allocation is to allocate enough containers to have one taskSlot per task, which minimizes latency but wastes resources when tasks are short relative to the overhead of allocating executors and letting them idle.
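
As a rough illustration of the definition above, the sketch below computes the taskSlots of one executor and the number of executors needed to give every pending task its own slot. The function names are hypothetical, and the ceiling-based formula is an assumption drawn from the description here, not a copy of Spark's allocation code.

```python
import math


def task_slots_per_executor(executor_cores: int, task_cpus: int = 1) -> int:
    """taskSlots = spark.executor.cores / spark.task.cpus (whole slots only)."""
    return executor_cores // task_cpus


def executors_for_one_slot_per_task(pending_tasks: int,
                                    executor_cores: int,
                                    task_cpus: int = 1) -> int:
    """Executors needed so that every pending task has its own taskSlot
    (assumed model of the current behaviour)."""
    slots = task_slots_per_executor(executor_cores, task_cpus)
    return math.ceil(pending_tasks / slots)


# 1000 pending tasks with 5-core executors and 1 CPU per task
print(executors_for_one_slot_per_task(1000, executor_cores=5))  # 200
```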

      The results using the proposal (described below) over the job sample (600 jobs):

      • by using 2 tasks per taskSlot, we get a 5% reduction in resource usage (against a 114% increase with the current behaviour), for a 37% reduction in wall clock time (against 43%), for Spark w.r.t. MR
      • by trying to minimize the average resource consumption, I ended up with 6 tasks per taskSlot, with a 30% reduction in resource usage, for a similar wall clock time w.r.t. MR
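
All of these figures are relative to MR. To compare the proposal with the current Spark behaviour instead, they can be rebased as in the small calculation below. Normalising MR to 1.0 and the variable names are my own framing; the input percentages are the ones reported in this description.

```python
# Resource usage normalised so that MR = 1.0 (figures from the description)
spark_default = 1.0 + 1.14    # +114% w.r.t. MR, one task per taskSlot
spark_2_per_slot = 1.0 - 0.05  # -5% w.r.t. MR, 2 tasks per taskSlot
spark_6_per_slot = 1.0 - 0.30  # -30% w.r.t. MR, 6 tasks per taskSlot


def saving_vs_default(usage: float) -> float:
    """Resource saving in percent relative to current Spark behaviour."""
    return round((1.0 - usage / spark_default) * 100, 1)


print(saving_vs_default(spark_2_per_slot))  # 55.6
print(saving_vs_default(spark_6_per_slot))  # 67.3
```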

      What did I try in order to solve the issue with existing parameters (summing up a few points mentioned in the comments)?

      • change dynamicAllocation.maxExecutors: this would need to be adapted for each job (tens to thousands of splits can occur), which would essentially defeat the purpose of using dynamic allocation.
      • use dynamicAllocation.backlogTimeout:
        • setting this parameter so as to avoid creating unused executors depends heavily on wall clock time. One basically needs to solve the exponential ramp-up for the target time. So this is not an option for my use case, where I don't want per-job tuning.
        • I've still done a series of experiments, with details in the comments. The result is that after manual tuning, the best I could get was either a similar resource consumption at the expense of 20% more wall clock time, or a similar wall clock time at the expense of 60% more resource consumption, compared with my proposal at 6 tasks per slot (this value being optimized over a much larger range of jobs, as already stated)
        • as mentioned in another comment, tampering with the exponential ramp-up might yield task imbalance, and the older executors could become contention points for other executors trying to remotely access blocks held by them (not witnessed in the jobs I'm talking about, but we did see this behaviour in other jobs)
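
To make the "exponential ramp-up" concrete, here is a simplified model of it. Spark's documentation describes requests being triggered after spark.dynamicAllocation.schedulerBacklogTimeout and then every spark.dynamicAllocation.sustainedSchedulerBacklogTimeout, with the number of executors requested per round growing as 1, 2, 4, 8 and so on. The function name is hypothetical, and the model ignores executor start-up time and tasks finishing during the ramp-up, so treat the numbers as an approximation only.

```python
def ramp_up_schedule(target_executors: int,
                     backlog_timeout_s: float = 1.0,
                     sustained_timeout_s: float = 1.0):
    """Return [(time_s, total_executors_requested), ...] for each round.

    Simplified model: the backlog never clears, and each round requests
    twice as many executors as the previous one, capped at the target.
    """
    schedule = []
    total = 0
    to_add = 1
    t = backlog_timeout_s
    while total < target_executors:
        total = min(total + to_add, target_executors)
        schedule.append((t, total))
        to_add *= 2
        t += sustained_timeout_s
    return schedule


# With 1s timeouts, a target of 200 executors is reached after 8 rounds
print(ramp_up_schedule(200)[-1])  # (8.0, 200)
```

Lengthening the timeouts delays every round by the same factor, which is why a value that avoids over-allocation for a short job is too slow for a long one.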

      Proposal:

      Simply add a tasksPerExecutorSlot parameter, which makes it possible to specify how many tasks a single taskSlot should ideally execute to mitigate the overhead of executor allocation.
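
A minimal sketch of how such a parameter could enter the target computation is shown below. This is my reading of the proposal, not the code of the PR: the function name and the exact formula are assumptions, with tasks_per_executor_slot = 1 standing for the current behaviour.

```python
import math


def target_executors(pending_tasks: int,
                     executor_cores: int,
                     task_cpus: int = 1,
                     tasks_per_executor_slot: int = 1) -> int:
    """Executors to request when each taskSlot is expected to run
    `tasks_per_executor_slot` tasks one after another (assumed formula)."""
    slots = executor_cores // task_cpus
    return math.ceil(pending_tasks / (slots * tasks_per_executor_slot))


# 1000 pending tasks on 5-core executors, for the values discussed above
for n in (1, 2, 6):
    print(n, target_executors(1000, 5, tasks_per_executor_slot=n))
# 1 200
# 2 100
# 6 34
```

A higher value trades latency for fewer containers, since each slot then processes several tasks in sequence instead of one.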

      PR: https://github.com/apache/spark/pull/19881

              People

              • Assignee: Unassigned
              • Reporter: Julien Cuquemelle (jcuquemelle)
