SPARK-3561: Allow for pluggable execution contexts in Spark

    Details

    • Type: New Feature
    • Status: Resolved
    • Priority: Major
    • Resolution: Won't Fix
    • Affects Version/s: 1.1.0
    • Fix Version/s: None
    • Component/s: Spark Core
    • Labels:

      Description

      Currently Spark provides integration with external resource managers such as Apache Hadoop YARN, Mesos, etc. Specifically in the context of YARN, the current architecture of Spark-on-YARN can be enhanced to provide significantly better utilization of cluster resources for large-scale batch and/or ETL applications when they run alongside other applications (Spark and others) and services in YARN.

      Proposal:
      The proposed approach would introduce a pluggable JobExecutionContext (trait) - a gateway and delegate to the Hadoop execution environment - as a non-public API (@Experimental) not exposed to end users of Spark.
      The trait will define 6 operations:

      • hadoopFile
      • newAPIHadoopFile
      • broadcast
      • runJob
      • persist
      • unpersist

      Each method maps directly to the corresponding method in the current version of SparkContext. The JobExecutionContext implementation will be selected by SparkContext via the master URL, as "execution-context:foo.bar.MyJobExecutionContext", with the default implementation containing the existing code from SparkContext, thus allowing the current (corresponding) methods of SparkContext to delegate to such an implementation. An integrator will now have the option to provide a custom implementation by either implementing the trait from scratch or extending DefaultExecutionContext.
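
      For illustration only, a minimal sketch of what such a trait might look like; the method names follow the list above, but the signatures are assumptions that mirror the corresponding SparkContext methods and are not taken from the attached design doc or pull request.

      import scala.reflect.ClassTag

      import org.apache.hadoop.mapred.InputFormat
      import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
      import org.apache.spark.{SparkContext, TaskContext}
      import org.apache.spark.broadcast.Broadcast
      import org.apache.spark.rdd.RDD
      import org.apache.spark.storage.StorageLevel

      // Illustrative sketch, not the actual PR contents. Each method mirrors its
      // SparkContext counterpart and receives the SparkContext so a custom
      // implementation can delegate back to (or bypass) the default behavior.
      trait JobExecutionContext {

        def hadoopFile[K, V](
            sc: SparkContext,
            path: String,
            inputFormatClass: Class[_ <: InputFormat[K, V]],
            keyClass: Class[K],
            valueClass: Class[V],
            minPartitions: Int): RDD[(K, V)]

        def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](
            sc: SparkContext,
            path: String,
            fClass: Class[F],
            kClass: Class[K],
            vClass: Class[V]): RDD[(K, V)]

        def broadcast[T: ClassTag](sc: SparkContext, value: T): Broadcast[T]

        def runJob[T, U: ClassTag](
            sc: SparkContext,
            rdd: RDD[T],
            func: (TaskContext, Iterator[T]) => U,
            partitions: Seq[Int]): Array[U]

        def persist(sc: SparkContext, rdd: RDD[_], storageLevel: StorageLevel): RDD[_]

        def unpersist(sc: SparkContext, rdd: RDD[_], blocking: Boolean): RDD[_]
      }

      Under the proposal, a SparkContext created with a master URL such as "execution-context:foo.bar.MyJobExecutionContext" would route these six calls to the named implementation, while the default implementation would simply run the existing SparkContext code paths.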

      Please see the attached design doc for more details.

      Attachments: SPARK-3561.pdf (106 kB, Oleg Zhurakousky)

        Issue Links

          Activity

          ozhurakousky Oleg Zhurakousky added a comment -

          Detailed design document

          ozhurakousky Oleg Zhurakousky added a comment -

          PR is available - https://github.com/apache/spark/pull/2422

          apachespark Apache Spark added a comment -

          User 'olegz' has created a pull request for this issue:
          https://github.com/apache/spark/pull/2422

          pwendell Patrick Wendell added a comment - - edited

          Hey Oleg Zhurakousky - could you provide a more complete picture of the proposal here? Traditionally, the SparkContext API itself has not been designed to be a pluggable component. Instead we have a pluggable resource management subsystem, and that would be the natural place to add support for things like elastic scaling, etc.

          There is also work going on in SPARK-3174 in this regard relating to YARN elasticity.

          It would be good to have an end-to-end proposal of what you are envisioning, and a comparison to the work in SPARK-3174 (that's an active JIRA btw, so keep an eye out for new designs there).

          ozhurakousky Oleg Zhurakousky added a comment - - edited

          Patrick, thanks for following up.

          Indeed Spark does provide first-class extensibility mechanisms at many different levels (shuffle, RDD, readers/writers, etc.); however, we believe it is missing a crucial one, and that is the "execution context". And while SparkContext itself could easily be extended or mixed in with a custom trait to achieve such customization, that is a less than ideal extension mechanism, since it would require code modification every time a user wants to swap an execution environment (e.g., from "local" in testing to "yarn" in prod) if such an environment is not supported.
          And in fact Spark already supports an externally configurable model where the target execution environment is managed through the "master" URL. However, the nature, implementation and, most importantly, customization of these environments are internal to Spark.

          master match {
            case "yarn-client" => ...
            case mesosUrl @ MESOS_REGEX(_) => ...
            ...
          }

          Furthermore, any additional integration and/or customization work that may come in the future would require modification to the above case statement, which I am sure you'd agree is a less than ideal integration style, since it would require a new release of Spark every time a new case statement is added. So essentially what we're proposing is to formalize what has always been supported by Spark into an externally configurable model, so customization around the native functionality of the target execution environment can be handled in a flexible and pluggable way.

          So in this model we are simply proposing a variation of the "chain of responsibility" pattern, where DAG execution could be delegated to an execution context with no change to end-user programs or semantics.
          Based on our investigation we've identified 4 core operations, which you can see in JobExecutionContext.
          Two of them provide access to source RDD creation, allowing customization of data sourcing (custom readers, direct block access, etc.). One is for broadcast, to integrate with natively provided broadcast capabilities. And last but not least is the main execution delegate for the job - runJob.
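
          As a concrete (hypothetical) illustration of that delegation model, the user program below would stay exactly the same whether the master URL points at local mode, YARN, or a plugged-in execution context; the "execution-context:..." value is illustrative only.

          import org.apache.spark.{SparkConf, SparkContext}

          // The same job, unchanged, under any execution environment; only the master
          // URL differs, e.g. "local[*]" in tests, "yarn-client" today, or (under this
          // proposal) "execution-context:foo.bar.TezJobExecutionContext".
          object WordCount {
            def main(args: Array[String]): Unit = {
              val master = if (args.length > 1) args(1) else "local[*]"
              val sc = new SparkContext(new SparkConf().setMaster(master).setAppName("wordcount"))
              sc.textFile(args(0))
                .flatMap(_.split("\\s+"))
                .map((_, 1))
                .reduceByKey(_ + _)
                .take(10)
                .foreach(println)
              sc.stop()
            }
          }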

          And while I am sure there will be more questions, I hope the above response clarifies the overall intention of this proposal.

          ozhurakousky Oleg Zhurakousky added a comment - - edited

          Patrick, sorry - I feel I missed the core emphasis of what we are trying to accomplish with this.
          As described in the attached design document, our main goal is to expose Spark to native Hadoop features (e.g., stateless YARN shuffle, Tez, etc.), thus extending Spark's existing capabilities, such as interactive, in-memory and streaming workloads, to batch and ETL in shared, multi-tenant environments, benefiting the Spark community considerably by allowing Spark to be applied to all use cases and capabilities on and in Hadoop.

          seanmcn Sean McNamara added a comment - - edited

          We have some workload use-cases and this would potentially be very helpful, especially in lieu of SPARK-3174.

          kawaa Adam Kawa added a comment -

          We also would be very interested in trying this out (especially for large, batch applications that we wish to run on Spark).

          mayank_bansal Mayank Bansal added a comment -

          Hi guys,

          We at eBay are having some issues with cluster utilization while running Spark-on-YARN alongside Hadoop batch workloads. This would be nice to try out to see if it helps us overcome the issue; we would be interested in trying it out.

          Thanks,
          Mayank

          ozhurakousky Oleg Zhurakousky added a comment -

          Thank you for the interest, guys. We are working on a prototype, which we will publish soon.

          sandyr Sandy Ryza added a comment - - edited

          I think there may be somewhat of a misunderstanding about the relationship between Spark and YARN. YARN is not an "execution environment", but a cluster resource manager that has the ability to start processes on behalf of execution engines like Spark. Spark already supports YARN as a cluster resource manager, but YARN doesn't provide its own execution engine. YARN doesn't provide a stateless shuffle (although execution engines built atop it like MR and Tez do).

          If I understand correctly, the broader intent is to decouple the Spark API from the execution engine it runs on top of. Changing the title to reflect this. That said, the Spark API is currently very tightly integrated with its execution engine, and frankly, decoupling the two so that Spark would be able to run on top of execution engines with similar properties seems more trouble than it's worth.

          ozhurakousky Oleg Zhurakousky added a comment - - edited

          Sandy Ryza

          Indeed YARN is a resource manager that supports multiple execution environments by facilitating resource allocation and management. On the other hand, Spark, Tez and many other (custom) execution environments currently run on YARN. (NOTE: custom execution environments on YARN are becoming very common in large enterprises.) Such decoupling will ensure that Spark can integrate with any and all of them (where applicable) in a pluggable and extensible fashion.

          ozhurakousky Oleg Zhurakousky added a comment - - edited

          Sandy, one other thing:
          While I understand the reasoning for changes to the title and the description of the JIRA, it would probably be better to coordinate this with the original submitter before making such changes in the future (similar to the way Patrick suggested in SPARK-3174). This would alleviate potential discrepancies in the overall message and intentions of the JIRA.
          Anyway, I’ve edited both the title and the description taking into consideration your edits.

          srowen Sean Owen added a comment -

          I'd be interested to see a more specific motivating use case. Is this about using Tez, for example, and where does it help to stack Spark on Tez on YARN? Or MR2, etc.? Spark Core and Tez overlap, to be sure, and I'm not sure how much value it adds to run one on the other. Kind of like running Oracle on MySQL or something. For whatever it is: is it maybe not more natural to integrate the feature into Spark itself?

          It would be great if this were all just a matter of one extra trait and interface. In practice I suspect there are a number of hidden assumptions throughout the code that may leak through attempts at this abstraction.

          I am definitely asking rather than asserting, curious to see more specifics about the upside.

          ozhurakousky Oleg Zhurakousky added a comment -

          After giving it some thought, I am changing the title and the description back to the original, as it would be more appropriate to discuss whatever questions anyone may have via comments.

          pwendell Patrick Wendell added a comment -

          Oleg Zhurakousky do you have a timeline for posting a more complete design doc for what the idea is with this?

          pwendell Patrick Wendell added a comment - - edited

          I also changed the title here so that it reflects the current design doc and pull request. We have a culture in the project of having JIRA titles accurately reflect the current proposal. We can change it again if a new doc causes the scope of this to change.

          Oleg Zhurakousky I'd prefer not to change the title back until there is a new design proposed. The problem is that people are confusing this with SPARK-3174 and SPARK-3797.

          ozhurakousky Oleg Zhurakousky added a comment -

          Patrick, your point about confusion with other JIRAs makes sense. Thanks.

          With regard to a detailed design, can you please let me know what you are looking for and what would be useful? The only change I'm proposing is the addition of an @Experimental interface with the 4 methods, for the reasons stated in the design doc.

          For example:

          • Would it be useful if I sent another PR with the implementation of the interface?
          • Would it be useful if I shared benchmarks which showcase some of the benefits of alternative execution for Batch/ETL scenarios?

          Since this is my first involvement in the Spark community, I appreciate your guidance and I'm happy to provide any details you might find useful. Thanks!

          mridulm80 Mridul Muralidharan added a comment -

          Patrick Wendell If I understood the proposal and the initial PR correctly, the intent of this JIRA, as initially proposed by Oleg Zhurakousky, is fairly different from the other efforts referenced.
          The focus of this change seems to be to completely bypass the Spark execution engine and substitute an alternative: only the current API (and so DAG creation from the Spark program) and user interfaces in Spark remain - block management, the execution engine, execution state management, etc. would all be replaced under the covers by what Tez (or something else in the future) provides.

          If I am not wrong, the changes would be:
          a) Applies only to YARN mode, where the specified execution environment can be run.
          b) The current Spark AM would no longer request any executors.
          c) The Spark block manager would no longer be required (other than possibly for hosting broadcast via HTTP, I guess?).
          d) The actual DAG execution would be taken up by the overridden execution engine - Spark's task scheduler and DAG scheduler become no-ops.

          I might be missing things, which Oleg can elaborate on.

          This functionality, IMO, is fundamentally different from what is being explored in the other JIRAs, and so has value to be pursued independently of the other efforts.
          Obviously this does not work in all use cases Spark runs on, but it handles a subset of use cases where other execution engines might do much better than Spark currently does, simply because of better code maturity and the specialized use cases they target.

          mridulm80 Mridul Muralidharan added a comment -

          Oleg Zhurakousky I think the disconnect here is that the proposed interfaces do not show much value in terms of the functionality to be exposed to users. A follow-up PR showing how these interfaces are used in the context of Tez would show why this change is relevant in the context of Spark.

          The disconnect, if I am not wrong, is that we do not want to expose SPIs which we would then need to maintain in Spark core, while unknown implementations extend them in non-standard ways, causing issues for our end users.

          For example, even though TaskScheduler is an SPI and can in theory be extended in arbitrary ways, all the SPI implementations currently 'live' within Spark and are in harmony with the rest of the code - and with changes which occur within Spark core (when functionality is added or extended).
          This allows us to decouple the actual TaskScheduler implementation from Spark code, while still keeping them in sync and maintainable, while adding functionality independent of other pieces. Case in point: YARN support has significantly evolved from when I initially added it - to the point where it probably does not share even a single line of code I initially wrote - and yet this has been done pretty much independently of changes to core, while at the same time ensuring that it is compatible with changes in Spark core and vice versa.

          The next step, IMO, would be a PR which shows how these interfaces are used for a non-trivial use case: Tez in this case.
          The default implementation provided in the PR can be removed (since it should not be used/exposed to users).

          Once that is done, we can evaluate the proposed interface in the context of the functionality exposed, and see how it fits with the rest of Spark.

          nchammas Nicholas Chammas added a comment -

          Obviously this does not work in all use cases Spark runs on, but it handles a subset of use cases where other execution engines might do much better than Spark currently does, simply because of better code maturity and the specialized use cases they target.

          As a side note, if we know what these use cases are where other engines do much better than Spark, then we want to create JIRA issues for them and tackle them where possible, or at least document why Spark cannot currently address those use cases as well as people want.

          If the main driver behind this proposal is Spark's immaturity in some areas, then I'd hope that that driver would have a short lifespan. If it's the specialized use cases that we want to address, then I wonder how plugging in one general engine (e.g. Tez) in place of another would help.

          Granted I'm just commenting at a conceptual level; as others have already mentioned, specific use cases would help clarify what the real need is. For example, Sean McNamara, Adam Kawa, and Mayank Bansal mentioned some workloads they were having trouble with earlier in this thread that they thought might be addressed by this proposal. It would be good to understand more specifically the issues they were running into.

          pwendell Patrick Wendell added a comment -

          I wanted to wait for a more complete design - but I can short-circuit this a bit and say that in general, I don’t think it makes much sense to open up Spark internals to be extended like this. The goal here as I understand it is to improve the multi-tenancy of Spark-on-YARN. The proposed solution is to add a Tez dependency through this pluggable execution model. For a few reasons I don’t think that approach makes much sense:

          1. This opens up Spark internals and adds very invasive new unstable API’s. As of Spark 1.0 we’ve stabilized the API’s and made the extension points of Spark fairly clear. Things like runJob are internal hooks into the scheduler we do not want developers to override. An actual implementation would likely need to extend/access a bunch of other internals like Mridul mentioned.
          2. There is existing work to make Spark scale elastically on YARN (see SPARK-3174). The current work will apply to all modes and not expose new public API’s. The proposal here is a different architecture, but as I understand, the main motivation is to make Spark function better in large multi-tenant YARN clusters.
          3. For users, having multiple execution engines would lead to fragmentation and discrepancies. I'd prefer community efforts around YARN focus on the existing extension points of Spark and cases where we can make changes that benefit all deployment modes. This is especially true given that Tez requires YARN and yet > 50% of Spark users do not use YARN (Mesos, Standalone, etc).
          4. In the short term, if someone really wants to prototype this they can extend SparkContext, use reflection, etc and provide Tez integration. Therefore, I don’t think we need to have changes in upstream Spark to allow the community to prototype this. Merging this API upstream would IMO signify the blessing of the maintainers of Spark core for this approach.

          tl;dr: Spark can't depend on Tez in order to function well on YARN. From day one the design of YARN was to allow multiple applications to run without having to all depend on each other. My feeling is we should focus instead on improving Spark-on-YARN via the existing points of extension.

          mridulm80 Mridul Muralidharan added a comment -

          I agree with Patrick Wendell that it does not help Spark to introduce a dependency on Tez in core.
          Oleg Zhurakousky is Tez available on all YARN clusters? Or is it an additional runtime dependency?

          If it is available by default, we can make it a runtime switch to use Tez for jobs running in yarn-standalone and yarn-client mode.
          But before that ...

          While better multi-tenancy would be a likely benefit, my specific interest in this patch has more to do with the much better shuffle performance that Tez offers. Specifically for ETL jobs, I can see other benefits which might be relevant - one of our collaborative filtering implementations, though not ETL, comes fairly close to it in job characteristics and suffers due to some of our shuffle issues ...

          As I alluded to, I do not think we should have an open-ended extension point - where any class name can be provided which extends functionality in an arbitrary manner - for example, like the SPI we have for compression codecs.
          As Patrick mentioned, this gives the impression that the approach is blessed by Spark developers - even if tagged with Experimental.
          Particularly with core internals, I would be very wary of exposing them via an SPI - simply because we need the freedom to evolve them for performance or functionality reasons.

          On the other hand, I am in favour of exploring this option to see what sort of benefits we get out of it, assuming it has been prototyped already - which I thought was the case here, though I am yet to see a PR with that (not sure if I missed it!).
          Given that Tez is supposed to be reasonably mature - if there is a Spark + Tez version, I want to see what benefits (if any) are observed as a result of this effort.
          I had discussed Spark + Tez integration about a year or so back with Matei - but at that time, Tez was probably not that mature - maybe this is a better time!

          Oleg Zhurakousky Do you have a Spark-on-Tez prototype done already? Or is this an experiment you are yet to complete? If complete, what sort of performance difference do you see? What metrics are you using?

          If there are significant benefits, I would want to take a closer look at the final proposed patch ... I would be interested in it making it into Spark in some form.

          As Nicholas Chammas mentioned - if it is possible to address it in Spark directly, nothing like it - particularly since that would benefit all modes of execution and not just the YARN + Tez combination.
          If the gap can't be narrowed, and the benefits are significant (for some, as of now undefined, definition of "benefits" and "significant") - then we can consider a Tez dependency in the YARN module.

          Of course, all these questions are moot until we have better quantitative judgement of what the expected gains are and what the experimental results are.

          ozhurakousky Oleg Zhurakousky added a comment - - edited

          Patrick, I think there is a misunderstanding about the mechanics of this proposal, so I'd like to clarify. The proposal here is certainly not to introduce any new dependencies into Spark Core, and the existing pull request (https://github.com/apache/spark/pull/2422) clearly shows that.

          What I am proposing is to expose an integration point in Spark by extracting existing Spark operations into a configurable, @Experimental strategy. This would not only allow Spark to integrate with other execution contexts, but would also be very useful in unit testing, as it would provide a clear separation between the assembly and execution layers, allowing them to be tested in isolation.

          I think this feature would benefit Spark tremendously, particularly given how several folks have already expressed their interest in this feature/direction.

          I appreciate your help and advice in getting this contribution into Spark. Thanks!

          ozhurakousky Oleg Zhurakousky added a comment -

          Resubmitting the pull request (squashed), which adds 2 additional methods to JobExecutionContext to support caching - https://github.com/apache/spark/pull/2849
          A prototype will be published shortly as well.

          ozhurakousky Oleg Zhurakousky added a comment -

          Updated PR

          pwendell Patrick Wendell added a comment -

          Hey Oleg Zhurakousky - adding an @Experimental interface is our way of previewing future public API's to the community. What I'm saying is that Spark's internal execution (and in particular runJob) is not now, and IMO should never be, a public API we want others to extend. This just isn't the design of Spark to have pluggable execution engines, and that is at the core of this proposal. We have many other extension points for Spark-YARN integration, such as our resource management layer, which is specifically designed for this.

          For things like unit testing, we can just refactor using internal/private interfaces rather than public ones, so I think the testing discussion is orthogonal to this patch.

          pwendell Patrick Wendell added a comment -

          One other thing - if projects really do want to play around with swapping out the execution engine, the best way would be to just extend SparkContext and override internals in your own version.

          vanzin Marcelo Vanzin added a comment -

          the best way would be to just extend SparkContext and override internals

          To be fair, that would be pretty hard, since a lot of the initialization of Spark internals happen in SparkContext's constructor.
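
          (For illustration, a rough, hypothetical sketch of the subclassing route being discussed; the class and method bodies are made up. It also shows the catch noted above: the superclass constructor still performs Spark's full initialization before any override can take effect.)

          import org.apache.spark.{SparkConf, SparkContext}

          // Hypothetical sketch of "extend SparkContext and override internals".
          // Note: `extends SparkContext(conf)` runs the full SparkContext constructor
          // (scheduler, block manager, UI, ...), so the standard machinery is created
          // even if the subclass later bypasses it.
          class CustomEngineSparkContext(conf: SparkConf) extends SparkContext(conf) {
            // Override the execution-related members of interest here and delegate to an
            // alternative engine; which members can be overridden depends on the Spark version.
          }

          object CustomEngineSparkContextExample {
            def main(args: Array[String]): Unit = {
              val sc = new CustomEngineSparkContext(
                new SparkConf().setMaster("local[*]").setAppName("custom-engine-poc"))
              println(sc.parallelize(1 to 10).count()) // still stock execution unless overridden
              sc.stop()
            }
          }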

          ozhurakousky Oleg Zhurakousky added a comment -

          Marcelo Vanzin

          I would not call it hard (we have done it in the initial POC by simply mixing a custom trait into SC - essentially extending it). However, I do agree that a lot of Spark's initialization would still happen due to the implementation of SC itself, thus creating and initializing some artifacts that may not be used with a different execution context.
          Question: why was it done like this rather than pushed into some SC.init operation?

          ozhurakousky Oleg Zhurakousky added a comment -

          Just as an FYI

          The POC code for Tez-based reference implementation of aforementioned execution context is available - https://github.com/hortonworks/spark-native-yarn together with a samples project - https://github.com/hortonworks/spark-native-yarn-samples.

          ozhurakousky Oleg Zhurakousky added a comment -

          Adding a link to the Tez-based RI.
          Also, at the time of writing this comment, it now supports Spark MLlib as well.

          pwendell Patrick Wendell added a comment -

          Hey Oleg Zhurakousky - as I said before, I just don't think it's a good idea to make the internals of Spark execution pluggable, given our approach to API compatibility. This is attempting to open up many internal API's.

          One of the main reasons for this proposal was to have better elasticity in YARN. Towards that end, can you try out the new elastic scaling code for YARN (SPARK-3174)? It will ship in Spark 1.2 and be one of the main new features. This integrates nicely with YARN's native shuffle service. In fact IIRC the main design of the shuffle service in YARN was specifically for this purpose. In that way, I think elements of this proposal are indeed making it into Spark.

          In terms of breaking out the initialization of a SparkContext - that is probably a good idea (it's been discussed separately).

          nchammas Nicholas Chammas added a comment -

          One of the main reasons for this proposal was to have better elasticity in YARN. Towards that end, can you try out the new elastic scaling code for YARN (SPARK-3174)? It will ship in Spark 1.2 and be one of the main new features. This integrates nicely with YARN's native shuffle service. In fact IIRC the main design of the shuffle service in YARN was specifically for this purpose. In that way, I think elements of this proposal are indeed making it into Spark.

          Now that 1.2.0 (which includes SPARK-3174) is out, what is the status of this proposal?

          From what I understand, the interest in this proposal stemmed mainly from people wanting elastic scaling and better utilization of cluster resources in Spark-on-YARN deployments. 1.2.0 includes such improvements as part of SPARK-3174.

          Are those improvements sufficiently addressing users' needs in that regard? If not, is the best solution then to implement this proposal here, or do we just need to keep iterating on Spark's existing integration with YARN via existing integration points?

          Perhaps I didn't properly grasp the motivation for this proposal, but reading through the discussion in the comments here, it seems like implementing a new execution context for Spark just to improve its performance on YARN is overkill.

          ozhurakousky Oleg Zhurakousky added a comment -

          Sorry for the delay in response; I'll just blame the holidays.
          No, I have not had a chance to run the elasticity tests against 1.2, so I will have to follow up on that.

          The main motivation for this proposal is to formalize an extension model around Spark's execution environment, allowing other execution environments (new and existing) to be easily plugged in by a system integrator without requiring a new release of Spark (given that the current integration mechanism relies on a 'case' statement with hard-coded values).
          The reasons why this is necessary are many, but they can all be summarized by the old generalization vs. specialization argument. And while Tez, elastic scaling, and utilization of cluster resources are all good examples and indeed were the initial motivators, they are certainly not the end: the current efforts of several of our clients, who are integrating Spark with their custom execution environments using the proposed approach, are good evidence of its viability and an obvious benefit to Spark, allowing it to become a developer-friendly "face" of many execution environments/technologies while continuing innovation of its own.

          So I think the next logical step would be to gather "for" and "against" arguments around a "pluggable execution context for Spark" in general; then we can discuss implementation.

          pwendell Patrick Wendell added a comment -

          So if the question is "Is Spark only an API, or is it an integrated API/execution engine?"... we've taken a fairly clear stance over the history of the project that it's an integrated engine. I.e., Spark is not something like Pig, where it's intended primarily as a user API and we expect there to be different physical execution engines plugged in underneath.

          In the past we haven't found this prevents Spark from working well in different environments. For instance, with Mesos, on YARN, etc. And for this we've integrated at different layers such as the storage layer and the scheduling layer, where there were well defined API's and integration points in the broader ecosystem. Compared with alternatives Spark is far more flexible in terms of runtime environments. The RDD API is so generic that it's very easy to customize and integrate.

          For this reason, my feeling with decoupling execution from the rest of Spark is that it would tie our hands architecturally and not add much benefit. I don't see a good reason to make this broader change in the strategy of the project.

          If there are specific improvements you see for making Spark work well on YARN, then we can definitely look at them.

          ozhurakousky Oleg Zhurakousky added a comment -

          Thanks Patrick

          I 100% agree that Spark is NOT just an API, and in fact in our current efforts we are using much more of Spark than its user-facing API. But here is the thing:
          The reasons for extending the execution environment could be many, and indeed RDD is a great extension point to accomplish that, just like SparkContext. However, both are less than ideal since they would require constant code modification, forcing re-compilation and re-packaging of an application every time one wants to delegate to an alternative execution environment (regardless of the reasons).
          But since we all seem to agree (based on previous comments) that SparkContext is the right API-based extension point to address such extension requirements, then why not allow it to be extended via configuration as well? It is merely a convenience without any harm - no different than a configuration-based "driver" model (e.g., JDBC).
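
          (For illustration, a rough sketch of the configuration-driven loading argued for here, analogous to selecting a JDBC driver by connection string; the "execution-context:" prefix, class names, and the JobExecutionContext/DefaultExecutionContext types are the hypothetical ones from this proposal, not existing Spark APIs.)

          // Hypothetical: resolve the execution context named in the master URL by
          // reflection, falling back to a default that runs the existing code paths.
          def resolveExecutionContext(master: String): JobExecutionContext = {
            val prefix = "execution-context:"
            if (master.startsWith(prefix)) {
              val className = master.stripPrefix(prefix)
              Class.forName(className).newInstance().asInstanceOf[JobExecutionContext]
            } else {
              new DefaultExecutionContext() // delegates to SparkContext's existing behavior
            }
          }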

          ozhurakousky Oleg Zhurakousky added a comment -

          Here is an interesting read that provides an even stronger case for separating flow construction from the execution context.
          http://blog.acolyer.org/2015/04/27/musketeer-part-i-whats-the-best-data-processing-system/
          and
          http://www.cl.cam.ac.uk/research/srg/netos/camsas/pubs/eurosys15-musketeer.pdf

          The key points are:

          It thus makes little sense to force the user to target a single system at workflow implementation time. Instead, we argue that users should, in principle, be able to execute their high-level workflow on any data processing system (§3). Being able to do this has three main benefits:
          1. Users write their workflow once, in a way they choose, but can easily execute it on alternative systems;
          2. Multiple sub-components of a workflow can be executed on different back-end systems; and
          3. Existing workflows can easily be ported to new systems.


            People

            • Assignee: Unassigned
            • Reporter: Oleg Zhurakousky
            • Votes: 4
            • Watchers: 83
