  Spark / SPARK-24615

SPIP: Accelerator-aware task scheduling for Spark

Details

    • Type: Epic
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version: 2.4.0
    • Fix Version: 3.0.0
    • Component: Spark Core
    • Labels: GPU-aware Scheduling

    Description

      (The JIRA received a major update on 2019/02/28. Some comments were based on an earlier version. Please ignore them. New comments start at comment-16778026.)

      Background and Motivation

      GPUs and other accelerators have been widely used for accelerating special workloads, e.g., deep learning and signal processing. While users from the AI community use GPUs heavily, they often need Apache Spark to load and process large datasets and to handle complex data scenarios like streaming. YARN and Kubernetes already support GPUs in their recent releases. Although Spark supports those two cluster managers, Spark itself is not aware of GPUs exposed by them and hence Spark cannot properly request GPUs and schedule them for users. This leaves a critical gap to unify big data and AI workloads and make life simpler for end users.

      To make Spark aware of GPUs, we shall make two major changes at a high level:

      • At the cluster manager level, we update or upgrade cluster managers to include GPU support. Then we expose user interfaces for Spark to request GPUs from them.
      • Within Spark, we update its scheduler to understand the GPUs allocated to executors and the GPU requirements of user tasks, and to assign GPUs to tasks properly.

      Based on the work done in YARN and Kubernetes to support GPUs and some offline prototypes, we could have the necessary features implemented in the next major release of Spark. You can find a detailed scoping doc here, where we listed user stories and their priorities.
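
      As an illustration only (not part of the proposal text itself), the user-facing shape of these two changes could look roughly like the Scala sketch below: one set of configurations asks the cluster manager for GPUs on each executor and tells Spark how to discover them, and another tells the task scheduler how many GPUs each task needs. The property names follow the custom resource scheduling documentation linked later in this thread; the discovery script path is a placeholder assumption.

      import org.apache.spark.sql.SparkSession

      // Executor-level request: the cluster manager allocates executors with GPUs,
      // and a discovery script reports the available GPU addresses back to Spark.
      // Task-level request: the scheduler assigns GPUs from that pool to tasks.
      val spark = SparkSession.builder()
        .appName("gpu-aware-scheduling-sketch")
        .config("spark.executor.resource.gpu.amount", "2")
        .config("spark.executor.resource.gpu.discoveryScript", "/opt/spark/getGpus.sh") // placeholder path
        .config("spark.task.resource.gpu.amount", "1")
        .getOrCreate()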

      Goals

      • Make Spark 3.0 GPU-aware in standalone, YARN, and Kubernetes.
      • No regression on scheduler performance for normal jobs.

      Non-goals

      • Fine-grained scheduling within one GPU card.
        • We treat one GPU card and its memory together as a non-divisible unit.
      • Support TPU.
      • Support Mesos.
      • Support Windows.

      Target Personas

      • Admins who need to configure clusters to run Spark with GPU nodes.
      • Data scientists who need to build DL applications on Spark.
      • Developers who need to integrate DL features on Spark.

    Attachments

    Issue Links

    Activity

            tgraves Thomas Graves added a comment -

            Maybe I'm missing it, but how is this working with the resource manager?

            rdd.map { xxxx }
              .reduceByKey { xxx }
              .mapPartition { accelerator required tasks }
              .enableAccelerator()
              .collect()

            If I asked for 200 executors up front, it has to know this at the time it asks for them.

            tgraves Thomas Graves added a comment - jerryshao ^
            jerryshao Saisai Shao added a comment -

            Sorry tgraves for the late response. Yes, when requesting executors, the user should know whether accelerators are required or not. If there are no satisfying accelerators, the job will be pending or not launched.

            tgraves Thomas Graves added a comment -

            The user is responsible for asking YARN for the right executors? I assume perhaps based on node labels or something like that? YARN 3.x has the ability to support other resource types like GPUs; it would be much more efficient if you could ask the resource manager for the necessary resources. Is there a plan to support that? I also don't think it's very good behavior to have the job just hang if the user doesn't have the necessary resources. It feels like that should either fail or, if it can be linked to dynamic allocation, that would be better.

            I left some questions in the design doc.

            jerryshao Saisai Shao added a comment -

            Yes, currently the user is responsible for asking YARN for resources like GPUs, for example via something like --num-gpus.

            Yes, I agree with you; the concerns you mentioned above are valid. But currently the design in this JIRA only targets task-level scheduling with accelerator resources already available. It assumes that accelerator resources have already been acquired by the executors and reported back to the driver. The driver will then schedule tasks based on the available resources.

            Having Spark communicate with the cluster manager to request other resources like GPUs is currently not covered in this design doc.

            Xiangrui mentioned that Spark communicating with the cluster manager should also be covered in this SPIP, so I'm still drafting that part.

            tgraves Thomas Graves added a comment -

            OK, I agree. I think this SPIP should at least cover how it could be integrated with the cluster managers, to make sure it's possible without a large redesign later. The implementation could obviously wait.

            jerryshao Saisai Shao added a comment -

            Sure, I will add it; Xiangrui also raised the same concern.

            tgraves Thomas Graves added a comment -

            Did the design doc permissions change? I can't seem to access it now.

            A few overall concerns. We are now making accelerator configurations available per stage, but what about CPU and memory? It seems like if we are going to start making things configurable at the stage/RDD level, it would be nice to be consistent. People have asked for this ability in the past.

            What about the case where, to run some ML algorithm, you would want machines of different types? For instance, TensorFlow with a parameter server might want GPU nodes for the workers, but the parameter server would just need a CPU. This would also apply to the barrier scheduler, so I might cross-post there.

            jerryshao Saisai Shao added a comment -

            Hi tgraves, I'm rewriting the design doc based on the comments mentioned above, so I temporarily made it inaccessible; sorry about that, I will reopen it.

            I think it is hard to control memory usage per stage/task, because tasks run in an executor that is shared within a single JVM. For CPU, yes, I think we can do it, but I'm not sure about the usage scenario for it.

            For the requirement of using different machine types, what I can think of is leveraging dynamic resource allocation. For example, if the user wants to run some MPI jobs with barrier mode enabled, then Spark could allocate new executors with accelerator resources via the cluster manager (for example using node labels if it is running on YARN). But I will not target this as a goal in this design, since a) it is currently a non-goal for the barrier scheduler; and b) it makes the design too complex, so it would be better to separate it into another piece of work.

            tgraves Thomas Graves added a comment -

            I think the use case for CPU/memory is the same. You know one job or stage has x number of tasks, and perhaps they are caching at the time and need more memory or CPU. For instance, look at your accelerator example. Let's say I'm doing some ETL before my ML. Once I get to the point where I want to do the ML, I want to ask for the GPUs as well as ask for more memory during that stage, because I didn't need more before this stage for all the ETL work. I realize you already have executors, but ideally Spark with the cluster manager could potentially release the existing ones and ask for new ones with those requirements. That is obviously a separate task for the latter part, but I think if we are creating an interface we should keep those cases in mind.

            jerryshao Saisai Shao added a comment -

            Thanks tgraves for the suggestion. 

            Once I get to the point where I want to do the ML, I want to ask for the GPUs as well as ask for more memory during that stage, because I didn't need more before this stage for all the ETL work. I realize you already have executors, but ideally Spark with the cluster manager could potentially release the existing ones and ask for new ones with those requirements.

            Yes, I already discussed this with my colleague offline; it is a valid scenario, but I think to achieve it we would have to change the current dynamic resource allocation mechanism. Currently I marked this as a non-goal in this proposal, which only focuses on static resource requests (--executor-cores, --executor-gpus). I think we should support it later.

            tgraves Thomas Graves added a comment - - edited

            But my point is exactly that: it shouldn't be yet another mechanism for this; why not make a generic one that can handle all resources? The dynamic allocation mechanism should ideally also handle GPUs, as I've stated above.

            mengxr Xiangrui Meng added a comment -

            tgraves Could you help link some past requests on configurable CPU/memory per stage? And you are suggesting making the API generalizable to those scenarios, but not including the feature under the scope of this proposal, correct?

            Btw, how do you like the following API?

            rdd.withResources
              .prefer("/gpu/k80", 2) // prefix of resource logical name, amount
              .require("/cpu", 1)
              .require("/memory", 8192000000)
              .require("/disk", 1000000000000)
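
            (A minimal, purely hypothetical sketch of how such a builder could be modeled; the class and method names below are illustrative and not an actual Spark API:)

            final case class ResourceRequest(name: String, amount: Long, required: Boolean)

            // Hypothetical hint holder that an RDD's withResources call would return;
            // the scheduler would read the accumulated requests when planning the stage
            // that executes the RDD.
            class ResourceRequests {
              private val requests = scala.collection.mutable.ArrayBuffer.empty[ResourceRequest]

              // Soft preference: schedule on the resource if available, matched by name prefix.
              def prefer(name: String, amount: Long): ResourceRequests = {
                requests += ResourceRequest(name, amount, required = false); this
              }

              // Hard requirement: the task cannot run without this amount of the resource.
              def require(name: String, amount: Long): ResourceRequests = {
                requests += ResourceRequest(name, amount, required = true); this
              }

              def all: Seq[ResourceRequest] = requests.toSeq
            }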

             

            jerryshao Saisai Shao added a comment -

            Hi tgraves, I'm still not sure how to handle memory per stage. Unlike MR, Spark shares the task runtime in a single JVM, and I'm not sure how to control memory usage within the JVM. Are you suggesting a similar approach to GPUs: when the memory requirement cannot be satisfied, release the current executors and request new executors via dynamic resource allocation?

            tgraves Thomas Graves added a comment -

            Yes, if any requirement can't be satisfied it would use dynamic allocation to release and reacquire containers. I'm not saying we have to implement those parts right now; I'm saying we should keep them in mind during the design of this so they could be added later. I linked one old JIRA that was about dynamically changing things. It's been brought up many times since, in PRs and just talking to customers; I'm not sure if there are other JIRAs as well. It's also somewhat related to SPARK-20589, where people just want to configure things per stage.

            I actually question whether this should be done at the RDD level at all. A set of partitions doesn't care what the resources are; it's generally the action you are taking on those RDD(s). Note it could be more than one RDD. I could do ETL work on an RDD whose resource needs would be totally different than if I ran TensorFlow on that RDD, for example. I do realize this is being tied in with the barrier stuff, which is on mapPartitions.

            I'm not trying to be difficult, and I realize this JIRA is more specific to external ML algorithms, but I don't want many APIs for the same thing.

            Unfortunately I haven't thought through a good solution for this. A while back my initial thought was to be able to pass a resource context into the API calls; this obviously gets trickier, especially with pure SQL support. I need to think about it some more. The above proposal for .withResources is definitely closer, but I still wonder about tying it to the RDD.

            cc irashid mridulm80, with whom I think this has been brought up before.


            mridulm80 Mridul Muralidharan added a comment -

            tgraves This was indeed a recurring issue: the ability to modulate asks to the RM based on current requirements.
            What you bring out is an excellent point: changing resource requirements would be very useful, particularly for applications with heterogeneous resource needs. Even currently, when executor_memory/executor_cores does not align well with stage requirements, we end up with OOMs, resulting in over-provisioned memory and suboptimal use. A GPU/accelerator-aware scheduler is an extension of the same idea, where we have other resources to consider.

            I agree with tgraves that a more general way to model this would be to look at all resources (when declaratively specified, of course) and use that information to allocate resources (from the RM) and to schedule tasks (within Spark).

            tgraves Thomas Graves added a comment -

            OK, so thinking about this a bit more, I slightly misread what it was applying to. Really you are associating it with the new RDD that will be created (val rddWithGPUResult = rdd.withResources.xxx), not the original, and to regenerate rddWithGPUResult you would want to know it was created using those resources. The thing that isn't clear to me is the scoping of this.

            For instance, say I have the code

            val rdd1 = sc.textFile("README.md")

            val rdd2 = rdd1.withResources.mapPartitions().collect()

            Does the withResources apply to the entire line up to the action?

            What if I change it to say

            val rdd2 = rdd1.withResources.mapPartitions()

            val res = rdd2.collect()

            Does the withResources only apply to the mapPartitions, which is really what I think you want with some of the ML algorithms? So we need to define what it applies to. Doing something similar, but with more obvious scope:

            val rdd2 = withResources { rdd1.mapPartitions() }

            The above would make the scope very obvious to the user.

            You also have things people could do like:

            val rdd1 = rdd.withResources.mapPartitions()

            val rdd2 = rdd.withResources.mapPartitions()

            val rdd3 = rdd1.join(rdd2)

            I think in this case rdd1 and rdd2 have to be individually materialized before you do the join for rdd3.

            So it's more like an implicit val rdd1 = rdd.withResources.mapPartitions().eval(). You end up putting in some stage boundaries.

            Have you thought about the scope and have ideas around this?

            tgraves Thomas Graves added a comment -

            The other thing, which I think I mentioned above, is: could this handle saying I want 1 node with 4 GB and 4 cores, and 3 nodes with 2 GPUs, 10 GB, and 1 core?

            jerryshao Saisai Shao added a comment -

            Hi tgraves, what you mentioned above is also something we have been thinking about and trying to figure out a way to solve (this problem also exists in barrier execution).

            From the user's point of view, specifying resources through the RDD is the only feasible way I can currently think of, though resources are bound to stages/tasks, not a particular RDD. This means users could specify resources for different RDDs in a single stage, while Spark can only use one resource requirement within that stage. This brings out several problems, as you mentioned:

            Specifying resources on which RDD

            For example, rddA.withResource.mapPartition { xxx }.collect() is no different from rddA.mapPartition { xxx }.withResource.collect(), since all the RDDs are executed in the same stage. So in the current design, no matter whether the resource is specified on rddA or on the mapped RDD, the result is the same.

            One-to-one dependency RDDs with different resources

            For example, rddA.withResource.mapPartition { xxx }.withResource.collect(): assuming the resource requests for rddA and the mapped RDD are different, since they're running in a single stage, we have to resolve such conflicts.

            Multiple-dependency RDDs with different resources

            For example:

            val rddA = rdd.withResources.mapPartitions()

            val rddB = rdd.withResources.mapPartitions()

            val rddC = rddA.join(rddB)

            If the resources for rddA are different from those for rddB, then we should also resolve such conflicts.

            Previously I proposed using the largest resource requirement to satisfy all the needs, but that may also waste resources; mengxr mentioned setting/merging resources per partition to avoid waste. Meanwhile, if there were an API exposed to set resources at the stage level, this problem would not exist; but Spark doesn't expose such an API to users, and the only thing a user can specify is at the RDD level, so I'm still thinking of a good way to handle it.


            Tagar Ruslan Dautkhanov added a comment -

            Is this work related to Spark's Project Hydrogen API?

            https://www.datanami.com/2018/06/05/project-hydrogen-unites-apache-spark-with-dl-frameworks/

            We also want to make Spark aware of accelerators so you can comfortably use FPGAs or GPUs in your clusters.

            jerryshao Saisai Shao added a comment -

            Tagar yes!

            tgraves Thomas Graves added a comment -

            Right, so I think part of this is trying to make it more obvious to the user what the scope actually is. This is in some ways similar to caching. I see people many times force an evaluation after a cache to force the data to actually be cached, because otherwise it might not do what they expect. That is why I mentioned the .eval() type functionality.

            For the example:

            val rddA = rdd.withResources.mapPartitions()

            val rddB = rdd.withResources.mapPartitions()

            val rddC = rddA.join(rddB)

            Above, the mapPartitions would normally get their own stages, correct? So I would think those stages would run with the specified resources, but the join would run with the default resources. Then you wouldn't have to worry about merging, etc. But you have the case of map or other transformations that wouldn't normally get their own stage, so the question is perhaps: should they, or do you provide something else?

            irashid Imran Rashid added a comment -

            Hi, just catching up here. I agree with a lot of Tom's concerns. There are a lot of corner cases we'd need to think about. E.g., you mentioned joins, which in general have stage boundaries, but then again you might in some cases have a shared partitioner, which would remove the stage boundary. And then there are other cases like zip().

            What if you had two RDDs in one stage with conflicting requirements? E.g., they request different accelerator types, and no node in the cluster has both available. Would you fail, or introduce a stage boundary? I think failing is fine; it would be OK if we require the user to put in their own boundary there (e.g., by writing to HDFS). I just want to think it through.

            jerryshao Saisai Shao added a comment -

            Hi tgraves irashid thanks a lot for your comments.

            Currently in my design I don't insert a specific stage boundary for different resources; the stage boundaries are still the same (determined by shuffle or by result). So withResources is not an eval()-style action that triggers a stage; instead, it just adds a resource hint to the RDD.

            Which means RDDs with different resource requirements in one stage may have conflicts. For example, in rdd1.withResources.mapPartitions { xxx }.withResources.mapPartitions { xxx }.collect, the resources for rdd1 may be different from those for the mapped RDD, so currently what I can think of is:

            1. Always pick the latter, with a warning log saying that multiple different resource requirements in one stage are illegal.
            2. Fail the stage, with a warning log saying that multiple different resource requirements in one stage are illegal.
            3. Merge conflicts using the maximum resource need. For example, if rdd1 requires 3 GPUs per task and rdd2 requires 4 GPUs per task, then the merged requirement would be 4 GPUs per task. (This is the high-level description; the details will be per-partition merging.) [chosen]

            Take join for example, where the two joined RDDs may have different resource requirements, and the joined RDD will potentially have yet other resource requirements.

            For example:

            val rddA = rdd.mapPartitions().withResources
            val rddB = rdd.mapPartitions().withResources
            val rddC = rddA.join(rddB).withResources
            rddC.collect()
            

            Here these three withResources calls may have different requirements. Since rddC runs in a different stage, there's no need to merge resource conflicts for it. But rddA and rddB run in the same stage with different tasks (partitions). So the merging strategy I'm thinking of is partition-based: tasks running on partitions from rddA will use the resources specified for rddA, and likewise for rddB.
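
            (As a small illustration of the "merge with maximum" option 3 above, a sketch assuming per-task requirements are modeled as a map from resource name to amount; the types and names are illustrative, not the actual implementation:)

            // Merge two per-task resource requirements by taking the maximum amount per
            // resource name, e.g. Map("/gpu" -> 3L) and Map("/gpu" -> 4L, "/cpu" -> 1L)
            // merge to Map("/gpu" -> 4L, "/cpu" -> 1L).
            def mergeMax(a: Map[String, Long], b: Map[String, Long]): Map[String, Long] =
              (a.keySet ++ b.keySet).map { name =>
                name -> math.max(a.getOrElse(name, 0L), b.getOrElse(name, 0L))
              }.toMap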

            tgraves Thomas Graves added a comment -

            So I guess my question is whether this is the right approach at all. Should we make it more obvious to the user where the boundaries end by adding something like eval()? I guess they could do what they do for caching today, which is force an action like count() to force the evaluation.

            If we are making the user add in their own action to evaluate, I would pick failing if there are conflicting resources. That way it's completely obvious to the user that they might not be getting what they want. The only hard part then is if the user doesn't know how to resolve the conflict, because the way Spark decides to pick its stages might not be obvious to users. Especially on the SQL side, but perhaps we aren't going to support that right now, or will support it via hints?

            It's still not clear to me from the above: are we saying withResources applies to just the immediately following stage, or does it stick until they change it again?

            jerryshao Saisai Shao added a comment - - edited

            Hi tgraves, I think eval() might unnecessarily break lineage that could execute in one stage. For example, data transforming -> training -> transforming could possibly run in one stage; using eval() would break it into several stages, and I'm not sure that is a good choice. Also, if we use eval() to break the lineage, how do we store the intermediate data: via shuffle, or in memory / on disk?

            Yes, where the boundaries fall is hard for the user to know, but currently I cannot figure out a good solution unless we use eval() to explicitly separate them. To resolve the conflicts, failing might be one choice. In the SQL or DataFrame area, I don't think we have to expose such low-level RDD APIs to users; maybe some hints would be enough (though I haven't thought that through).

            Currently in my design, withResources only applies to the stage in which the RDD will be executed; the following stages will still be ordinary stages without additional resources.

            eje Erik Erlandson added a comment -

            Am I understanding correctly that this can't assign executors to desired resources without resorting to Dynamic Allocation to tear down an Executor and reallocate it somewhere else?

            jerryshao Saisai Shao added a comment - - edited

            Leveraging dynamic allocation to tear down and bring up the desired executors is a non-goal here; we will address it in the future. Currently we're still focusing on static allocation, like spark.executor.gpus.

            merlintang Mingjie Tang added a comment -

            From the user's perspective, the user only cares about the GPU resources for an RDD and does not understand the stages or partitions of the RDD. Therefore, the underlying resource allocation mechanism should assign resources to executors automatically.

            Similar to caching or persisting at different levels, maybe we can provide different configurations to users. Then resource allocation would follow the predefined policy.

            tgraves Thomas Graves added a comment -

            jerryshao, just curious where this is at; are you still working on it?

            felixcheung Felix Cheung added a comment -

            We are interested to know as well.

            mengxr touched on this maybe Oct/Nov 2018, but I haven't heard anything else since.

            jerryshao Saisai Shao added a comment -

            I'm really sorry about the delay. Due to some changes on my side, I didn't have enough time to work on this before. But I talked to Xiangrui offline recently, and we will continue to work on this and finalize it in 3.0.

            jiangxb1987 Xingbo Jiang added a comment - - edited

            I updated the SPIP and Product docs, please review and leave comments in this ticket.

            mengxr Xiangrui Meng added a comment -

            Attached the PDF files.

            jomach Jorge Machado added a comment -

            Hi guys, is there any progress here? I would like to help with the implementation. I did not see any interfaces designed yet.

            tgraves Thomas Graves added a comment -

            Hey jomach, thanks for the offer. You can find the SPIP attached, and the designs are in the epic's JIRAs. So yes, we are working on the issues in the epic, and most of them are complete; see the links above. I think most of them are already assigned; currently we are waiting on the standalone-mode implementation PR to be reviewed. I think the only one not assigned is the Mesos one, but we were waiting for someone with Mesos experience who wanted that; if that is your background, you could take a look at it.

            S71955 Sujith Chacko added a comment -

            Will this feature be a part of Spark 3.0? Any update on the release timeline for this feature? Thanks.

            tgraves Thomas Graves added a comment -

            Yes, it will be in 3.0. The feature is complete, other than Mesos support if someone wants it; see the linked JIRAs in the epic.

            You can find documentation checked into the master branch:

            https://github.com/apache/spark/blob/master/docs/configuration.md#custom-resource-scheduling-and-configuration-overview

            S71955 Sujith Chacko added a comment -

            Great! Thanks for the update.

            jomach Jorge Machado added a comment -

            tgraves, thanks for the input. It would be great to have one or two examples of how to use the GPUs within a Dataset.

            I tried to figure out the API but did not find any useful docs. Any tips?

            tgraves Thomas Graves added a comment -

            This is purely a scheduling feature: Spark will assign GPUs to particular tasks. From there it's the user's responsibility to look at those assignments and do whatever they want with the GPU. For instance, you might pass it into TensorFlow on Spark or some other ML/AI framework.

            Do you mean the actual Dataset operations using the GPU, such as df.join.groupBy.filter?

            That isn't supported inside of Spark itself, nor is it part of this feature. There was another JIRA (SPARK-27396) where we added support for a columnar plugin to Spark that would allow someone to write a plugin that does work on the GPU. NVIDIA is working on such a plugin, but it is not publicly available yet.
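
            (As a rough sketch of what "look at those assignments" means in user code, assuming the job was submitted with a GPU amount per task as described in the configuration guide linked above; the surrounding setup is illustrative:)

            import org.apache.spark.TaskContext
            import org.apache.spark.sql.SparkSession

            val spark = SparkSession.builder().appName("gpu-task-sketch").getOrCreate()

            // Each task reads the GPU address(es) the scheduler assigned to it and can hand
            // them to an external framework (TensorFlow, RAPIDS, ...), e.g. by setting
            // CUDA_VISIBLE_DEVICES before calling into native code.
            val result = spark.sparkContext.parallelize(1 to 100, numSlices = 4)
              .mapPartitions { iter =>
                val gpuAddresses = TaskContext.get().resources()("gpu").addresses
                iter.map(x => (gpuAddresses.mkString(","), x))
              }
              .collect()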

            jomach Jorge Machado added a comment -

            Yeah, that was my question. Thanks for the response. I will look at rapids.ai and try to use it inside a partition or so...

            Kachris Chris added a comment -

            This is a great improvement for Spark!

            Spark can benefit from the power of accelerators like FPGAs. Note that using the InAccel plugin, you can also speed up your Spark ML applications with zero code changes, and it is fully compatible with the accelerator-aware task scheduling in Spark.

            You can watch the Spark Summit video on the integration of FPGAs in Spark here:

            https://www.youtube.com/watch?v=790Xc1Pa7QE

            tgraves Thomas Graves added a comment -

            mengxr, jiangxb1987: it would be nice to mark this as completed for Spark 3.0, as folks have been asking whether it is in Spark 3 or not. What do you think about separating out the remaining items in the epic?

            gurwls223 Hyukjin Kwon added a comment - - edited

            tgraves, I think we can do that. Yes, +1 for separating out the leftover tasks.


            People

              Assignee: tgraves Thomas Graves
              Reporter: jerryshao Saisai Shao
              Shepherd: Xiangrui Meng
              Votes: 13
              Watchers: 74

              Dates

                Created:
                Updated:
                Resolved: