Details
- Epic
- Status: Resolved
- Major
- Resolution: Fixed
- 3.0.0
- Stage Level Scheduling
Description
Q1. What are you trying to do? Articulate your objectives using absolutely no jargon.
Objectives:
- Allow users to specify task and executor resource requirements at the stage level.
- Spark will use the stage-level requirements to acquire the necessary resources/executors and schedule tasks based on the per-stage requirements.
Users often have different resource requirements for different stages of their application, so they want to be able to configure resources at the stage level. For instance, take a single job that has 2 stages. The first stage does some ETL, which requires a lot of tasks, each with a small amount of memory and 1 core. The second stage feeds that ETL data into an ML algorithm. It only requires a few executors, but each executor needs a lot of memory, GPUs, and many cores. This feature allows the user to specify the task and executor resource requirements for the ETL stage and then change them for the ML stage of the job.
Resources include CPU, memory (on-heap, overhead, PySpark, and off-heap), and extra resources (GPU, FPGA, etc.). The feature has the potential to allow for other things, like limiting the number of tasks per stage or specifying other parameters for things like shuffle. Initially I would propose we only support resources as they are now. Task resources would be CPU and other resources (GPU, FPGA), so that we aren't adding extra scheduling features at this point. Executor resources would be CPU, memory, and extra resources (GPU, FPGA, etc.). Changing the executor resources will rely on dynamic allocation being enabled.
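To make the ETL/ML example concrete, here is a minimal sketch in plain Python of how per-stage requirements could be represented as data. It does not use Spark. The class names `ExecutorRequest`, `TaskRequest`, and `StageProfile`, the slot calculation, and all the numbers are hypothetical and chosen only for illustration.

```python
from dataclasses import dataclass, field
from typing import Dict


@dataclass
class ExecutorRequest:
    """Hypothetical executor-side requirements for one stage."""
    cores: int
    memory_mb: int
    extra: Dict[str, int] = field(default_factory=dict)  # e.g. {"gpu": 2}


@dataclass
class TaskRequest:
    """Hypothetical task-side requirements for one stage."""
    cpus: int
    extra: Dict[str, int] = field(default_factory=dict)  # e.g. {"gpu": 1}


@dataclass
class StageProfile:
    name: str
    executor: ExecutorRequest
    task: TaskRequest

    def tasks_per_executor(self) -> int:
        """Concurrent tasks one executor can hold (assumed rule:
        the tightest limit across CPU and every extra resource)."""
        slots = self.executor.cores // self.task.cpus
        for resource, per_task in self.task.extra.items():
            available = self.executor.extra.get(resource, 0)
            slots = min(slots, available // per_task)
        return slots


# ETL stage: many small executors, 1 core per task (illustrative numbers).
etl = StageProfile(
    "etl",
    ExecutorRequest(cores=1, memory_mb=2048),
    TaskRequest(cpus=1),
)

# ML stage: a few large executors with GPUs (illustrative numbers).
ml = StageProfile(
    "ml",
    ExecutorRequest(cores=16, memory_mb=65536, extra={"gpu": 2}),
    TaskRequest(cpus=4, extra={"gpu": 1}),
)
```

In this sketch the ML executor has enough cores for four tasks but only two GPUs, so the GPU is the limiting resource for that stage.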
Main use cases:
- An ML use case where the user does ETL and feeds the result into an ML algorithm, using the RDD API. This should work with barrier scheduling as well once it supports dynamic allocation.
- This adds the framework/API for Spark's own internal use. In the future (not covered by this SPIP), Catalyst could control the stage-level resources as it finds the need to change them between stages for different optimizations. For instance, with the new columnar plugin to the query planner, we can insert stages into the plan that would change running something on the CPU in row format to running it on the GPU in columnar format. This API would allow the planner to make sure the stages that run on the GPU get the GPU resources they need. Another possible use case is that Catalyst could add more optimizations, to the point where the user doesn’t need to configure container sizes at all. If the optimizer/planner can handle that for the user, everyone wins.
This SPIP focuses on the RDD API but we don’t exclude the Dataset API. I think the Dataset API will require more changes because it specifically hides the RDD from the users via the plans, and Catalyst can optimize the plan and insert things into it. The only way I’ve found to make this work with the Dataset API would be modifying all the plans so that the resource requirements reach the point where the RDDs are created, which I believe would be a lot of change. If other people know better options, it would be great to hear them.
Q2. What problem is this proposal NOT designed to solve?
The initial implementation is not going to add Dataset APIs.
We are starting by allowing users to specify a specific set of task/executor resources, and we plan to design it to be extensible. The first implementation will support only specific, limited resources and will not support changing generic SparkConf configs.
This initial version will have a programmatic API for specifying the resource requirements per stage. We can perhaps add the ability to have profiles in the configs later if it's useful.
Q3. How is it done today, and what are the limits of current practice?
Currently this is done either by having multiple Spark jobs or by requesting containers with the max resources needed for any part of the job. With the first option, you break the work into separate jobs where each job requests the resources it needs, but then you have to write the data out somewhere and read it back in between jobs. This takes longer and requires coordination between the jobs to make sure everything works smoothly. The other option is to request executors sized for your largest need up front and potentially waste those resources when they aren't being used, which in turn wastes money. For instance, for an ML application that does ETL first, people often request containers with GPUs, and the GPUs sit idle while the ETL is happening. This wastes those GPU resources, and in turn money, because those GPUs could have been used by other applications until they were really needed.
Note that the Catalyst internal use case can’t be done today.
Q4. What is new in your approach and why do you think it will be successful?
This is a new way for users to specify the per-stage resource requirements. It will give users and Spark a lot more flexibility within a job and better utilization of their hardware.
Q5. Who cares? If you are successful, what difference will it make?
Spark application developers, cluster admins, managers and companies who pay the bills for running Spark. It has the potential to make a huge difference in cost by utilizing resources better and saving developers time.
I’ve talked to different people from different companies and all of them have said this would be a useful feature for them.
Q6. What are the risks?
The scoping of the new API could cause some confusion for the user as to which resources actually get used in a stage. If the user has specified different resources in multiple RDDs that get combined into a single stage, the scheduler will have to merge those and come up with a final container size to request. We will have a specific algorithm for merging, but if the user doesn’t realize things get combined or that some RDDs require a shuffle, they might get confused. I don't know how to get around this other than to document the way it works and try to make it obvious to the user what was chosen. Another option is to have it fail on a conflict to make sure the user is aware. We could have a config flag for this, so that it fails by default and the user can allow merging by turning the config on. See the design doc for options on scoping.
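One possible merge policy can be sketched as follows. This is an illustration in plain Python, not the algorithm from the design doc. It assumes profiles are plain dictionaries mapping a resource name to an amount, that merging takes the maximum of each resource, and that a hypothetical `fail_on_conflict` argument stands in for the config flag discussed above.

```python
from typing import Dict, Iterable, List

# Hypothetical representation: resource name -> requested amount.
Profile = Dict[str, int]


class ProfileConflictError(ValueError):
    """Raised when one stage sees differing profiles and merging is off."""


def merge_profiles(profiles: Iterable[Profile],
                   fail_on_conflict: bool = True) -> Profile:
    """Combine the profiles of all RDDs that ended up in one stage.

    Assumed policy: identical profiles never conflict; differing profiles
    either raise (default) or are merged by taking the per-resource max.
    """
    distinct: List[Profile] = []
    for profile in profiles:
        if profile not in distinct:
            distinct.append(profile)

    if len(distinct) > 1 and fail_on_conflict:
        raise ProfileConflictError(
            f"{len(distinct)} different profiles in one stage: {distinct}"
        )

    merged: Profile = {}
    for profile in distinct:
        for resource, amount in profile.items():
            merged[resource] = max(merged.get(resource, 0), amount)
    return merged


# Illustrative values only.
etl_profile = {"cores": 4, "memory_mb": 8192}
ml_profile = {"cores": 2, "memory_mb": 16384, "gpu": 1}
merged = merge_profiles([etl_profile, ml_profile], fail_on_conflict=False)
```

Taking the maximum guarantees every combined RDD gets at least what it asked for, at the cost of a container larger than any single request, which is exactly the kind of outcome that would need to be made visible to the user.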
The cluster managers (like YARN) and the dynamic allocation manager have to track everything at the ResourceProfile (a specific set of resource requirements) level rather than just at a global cores or executors level, so this requires a number of data structure changes to both.
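The shift from a single global count to per-profile tracking might look roughly like this. The sketch is plain Python with hypothetical names (`ExecutorTracker`, integer profile ids); it is not the data structure used in Spark's allocation manager or in the YARN allocator.

```python
from collections import defaultdict
from typing import Dict


class ExecutorTracker:
    """Tracks target and running executor counts keyed by profile id,
    instead of one global target/running pair."""

    def __init__(self) -> None:
        self.target: Dict[int, int] = defaultdict(int)
        self.running: Dict[int, int] = defaultdict(int)

    def set_target(self, profile_id: int, count: int) -> None:
        self.target[profile_id] = count

    def executor_started(self, profile_id: int) -> None:
        self.running[profile_id] += 1

    def executor_removed(self, profile_id: int) -> None:
        if self.running[profile_id] == 0:
            raise ValueError(f"no running executors for profile {profile_id}")
        self.running[profile_id] -= 1

    def pending_requests(self) -> Dict[int, int]:
        """Executors still to be requested, per profile."""
        return {
            pid: self.target[pid] - self.running.get(pid, 0)
            for pid in list(self.target)
            if self.target[pid] > self.running.get(pid, 0)
        }

    def excess(self) -> Dict[int, int]:
        """Executors running beyond the target, per profile."""
        return {
            pid: self.running[pid] - self.target.get(pid, 0)
            for pid in list(self.running)
            if self.running[pid] > self.target.get(pid, 0)
        }


# Stage transition: profile 0 (ETL) finishes, profile 1 (ML) starts.
tracker = ExecutorTracker()
tracker.set_target(0, 3)
for _ in range(3):
    tracker.executor_started(0)
tracker.set_target(0, 0)
tracker.set_target(1, 2)
```

After the transition, `tracker.excess()` reports the three ETL executors that can be let go and `tracker.pending_requests()` reports the two ML executors still to be acquired, which a single global counter could not distinguish.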
Q7. How long will it take?
I suspect this will take multiple months because it’s a fairly large change. I think we can do it in pieces fairly easily though. For instance, I think we can do the dynamic allocation manager and scheduler first, then the YARN cluster manager, and finally the RDD API. We can do the backend pieces first, with only the global resource configs applying; the feature would become visible to the user only once we add the actual RDD API. I have a rough prototype of those pieces from investigating what would need to change.
Q8. What are the mid-term and final “exams” to check for success?
Success is for the user to specify the resources per stage with dynamic allocation and for everything to work with it. One stage would run with a set of resources, and when the next stage starts with different resources, the first stage's containers are let go and new ones are acquired. The mid-term might be to put in the changes for the allocation manager, cluster manager, and scheduler and have the normal global resource requirements continue to work as expected.
Appendix A. Proposed API Changes. Optional section defining API changes, if any. Backward and forward compatibility must be taken into account.
I split the appendices out into a Google doc since it was getting big and to allow inline comments; see the link below.
Appendix B. Optional Design Sketch: How are the goals going to be accomplished? Give sufficient technical detail to allow a contributor to judge whether it’s likely to be feasible. Note that this is not a full design document.
I split the appendices out into a Google doc since it was getting big and to allow inline comments; see the link below.
Appendix C. Optional Rejected Designs: What alternatives were considered? Why were they rejected? If no alternatives have been considered, the problem needs more thought.
I split the appendices out into a Google doc since it was getting big and to allow inline comments; see the link below.
Issue Links
- is related to
  - SPARK-31378 stage level scheduling dynamic allocation bug with initial num executors (Resolved)
  - SPARK-29151 Support fraction resources for task resource scheduling (Resolved)
  - SPARK-31437 Try assigning tasks to existing executors by which required resources in ResourceProfile are satisfied (Open)
  - SPARK-32591 Add better api docs for stage level scheduling Resources (Open)
  - SPARK-33447 Stage level scheduling, allow specifying other spark configs via ResourceProfile (Open)
- links to