CRUNCH-296: Support new distributed execution engines (e.g., Spark)

    Details

    • Type: Improvement
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.9.0, 0.8.2
    • Component/s: Core
    • Labels: None

      Description

      I've been working on this off and on for a while, but it's currently in a state where I feel it's worth sharing: I came up with an implementation of the Crunch APIs that runs on top of Apache Spark instead of MapReduce.

      My goal for this is pretty simple; I want to be able to change any instances of "new MRPipeline(...)" to "new SparkPipeline(...)", not change anything else at all, and have my pipelines run on Spark instead of as a series of MR jobs. It turns out that we can pretty much do exactly that. Not everything works yet, but lots of things do: joins and cogroups work, and the PageRank and TfIdf integration tests pass. Some things that do not work that I'm aware of: in-memory joins and some of the more complex file output handling rules, but I believe these are fixable. Something that might or might not work: HBase inputs and outputs on top of Spark.
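
      As a rough illustration of that goal, here is a minimal wordcount sketch where only the Pipeline construction changes; the tokenizing DoFn, paths, and the "local" master string are illustrative assumptions, not code from the patch:

          import org.apache.crunch.DoFn;
          import org.apache.crunch.Emitter;
          import org.apache.crunch.PCollection;
          import org.apache.crunch.Pipeline;
          import org.apache.crunch.impl.spark.SparkPipeline;
          import org.apache.crunch.lib.Aggregate;
          import org.apache.crunch.types.writable.Writables;

          public class WordCount {
            public static void main(String[] args) {
              // Swapping engines is meant to be a one-line change:
              // Pipeline pipeline = new MRPipeline(WordCount.class);
              Pipeline pipeline = new SparkPipeline("local", "wordcount");

              PCollection<String> lines = pipeline.readTextFile(args[0]);
              PCollection<String> words = lines.parallelDo(new DoFn<String, String>() {
                @Override
                public void process(String line, Emitter<String> emitter) {
                  for (String word : line.split("\\s+")) {
                    emitter.emit(word);
                  }
                }
              }, Writables.strings());

              // Everything downstream is engine-agnostic Crunch code.
              pipeline.writeTextFile(Aggregate.count(words), args[1]);
              pipeline.done();
            }
          }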

      This is just an idea I had, and I would understand if other people don't want to work on this or don't think it's the right direction for the project. My minimal request would be to include the refactoring of the core APIs necessary to support plugging in new execution frameworks so I can keep working on this stuff.

      Attachments

      1. CRUNCH-296d.patch
        1.92 MB
        Josh Wills
      2. CRUNCH-296d.patch
        1.91 MB
        Josh Wills
      3. CRUNCH-296c.patch
        1.90 MB
        Josh Wills
      4. CRUNCH-296b.patch
        1.86 MB
        Josh Wills
      5. CRUNCH-296.patch
        1.86 MB
        Josh Wills

        Activity

        Josh Wills added a comment -

        The gigantic patch.

        Gabriel Reid added a comment -

        Looks and sounds very interesting – I'm definitely looking forward to taking a closer look at this and playing around with it.

        I think it's worth considering where we want to go with this (and/or where we don't want to go with it), as it is straying away from the tagline of "Simple and Efficient MapReduce Pipelines". That being said, as long as this doesn't get in the way of working with MapReduce (assuming that's what the intention of Crunch will remain), then I'm all for it.

        Tom White added a comment -

        This sounds like a great addition!

        Regarding whether it fits in the project, I think it does. MapReduce is the workhorse, and I can't see it going away, but Spark and Tez (both in the Apache Incubator) can be more efficient for certain types of pipelines, so it makes sense to support them as alternative execution engines. For comparison, work is currently underway to make Hive and Pig both take advantage of the more flexible DAGs that Tez supports, so it's natural to do something similar in Crunch.

        Josh Wills added a comment -

        Thanks Tom. My goal for the project is to be useful to MapReduce developers, and I suspect that many MapReduce developers are going to become Tez/Spark developers in the coming years. I think that anything we can do to smooth those transitions and ensure that they can easily select the right framework for the job at hand is a worthwhile goal for this community.

        Sandy Ryza added a comment -

        Excited for this.

        Micah Whitacre added a comment - edited

        +1 on the idea. Still need to review the patch.

        Gabriel Reid added a comment -

        I went through the patch, and it looks good to me. I don't have any experience with Spark, so I can't say much about the Spark stuff, but the refactoring in crunch-core looks good and should fit well if we want to expand to Tez or something else in the future. I also like the speed at which the Spark integration tests run.

        Josh Wills added a comment -

        Here's a new patch that integrates the changes in CRUNCH-294.

        Josh Wills added a comment -

        Latest and greatest: passes all of the sorting and mapside join integration tests.

        It turns out that our sorting library (in a completely unsurprising fashion) expects that keys will be sorted in the reducer. But there's no reason that has to be the case in Spark, and I'm sure that there are a lot of use cases where turning off sorting would be a welcome speed improvement. So here's my question: do we turn off sorting in Spark by default and add an option in GroupingOptions to turn it on, or do we leave it on by default in Spark and add an option to turn it off?

        Gabriel Reid added a comment -

        Cool!

        About the sorting issue, I think that turning it off by default and adding an option in GroupingOptions to turn it on makes the most sense. As far as I understand, the main point of Spark is improved performance, so it would seem wrong to lose a portion of the speed improvement for something that can be handled as easily as turning sorting on only when it's needed.

        Josh Wills added a comment -

        Okay, so are you cool with me editing the code in the o.a.c.lib packages to explicitly turn on sorting where it is necessary?

        Gabriel Reid added a comment -

        Yeah, that sounds right. I'm thinking that this is basically just a matter of adding a sortedKeys() or similar method to GroupingOptions and GroupingOptions.Builder, right?

        Off the top of my head I would expect that this is probably only needed in the sorting and joining library code (or am I totally underestimating the impact that this would have)?

        Josh Wills added a comment -

        Adds support for requireSortedKeys() and the trick for applying combine functions in Spark that we discussed on the list. (Thanks to Gabriel Reid for that.)
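
        As a usage sketch, library code that depends on reducer-side ordering would request it explicitly through GroupingOptions; the wrapper class and table here are hypothetical, not code from the patch:

            import org.apache.crunch.GroupingOptions;
            import org.apache.crunch.PGroupedTable;
            import org.apache.crunch.PTable;

            class SortedGroupingSketch {
              // Sorted keys are no longer guaranteed by default on Spark, so code
              // that relies on them (e.g., the sorting and join libraries) opts in:
              static PGroupedTable<String, Long> groupSorted(PTable<String, Long> table) {
                GroupingOptions opts = GroupingOptions.builder()
                    .requireSortedKeys()
                    .build();
                return table.groupByKey(opts);
              }
            }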

        Josh Wills added a comment -

        Gabriel Reid, this is my (hopefully!) last stab at this; I added support for writing multiple PCollections to the same output target, and I also created cache() and cache(CachingOptions) methods on PCollection to expose Spark's caching functionality to the client (in MR mode, cache() is just syntactic sugar for a materialize call where you don't care about the outputs). I feel ready to commit.
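
        As a rough sketch of how a client might use the new methods (the pipeline wiring and paths are illustrative, not code from the patch):

            import org.apache.crunch.PCollection;
            import org.apache.crunch.Pipeline;
            import org.apache.crunch.lib.Aggregate;

            class CacheSketch {
              static void run(Pipeline pipeline, String in, String out1, String out2) {
                PCollection<String> lines = pipeline.readTextFile(in);

                // cache() marks a split point: on Spark the collection is kept
                // around for reuse; in MR mode it degenerates to a materialize-style
                // write whose outputs we never read directly.
                PCollection<String> cached = lines.cache();

                // Both downstream branches reuse the cached collection.
                pipeline.writeTextFile(Aggregate.count(cached), out1);
                pipeline.writeTextFile(cached, out2);
                pipeline.done();
              }
            }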

        Gabriel Reid added a comment -

        Looks good to me in general, although I'm running into one small issue: it's not compiling for me directly under Maven (JDK 1.7.0_40 on Mac OS X), although it compiles fine within Eclipse.

        The issue is line 42 in BaseDoTable.java. Replacing that line with

            return (CombineFn) fn;
        

        resolves the issue for me, although given the change required to get it to compile and the generics info that's being thrown away, I wonder if there is something else to worry about there.

        At first I wasn't so sure about the new cache() methods on PCollection, but thinking about it more, I think it's actually a more logical name for the way that materialize() is currently used to cache the results of a computation on disk, so I'm all for it.

        Josh Wills added a comment -

        Gabriel Reid, that error makes sense to me, actually: the CombineFn generic definition has more constraints on it than the DoFn<S, Pair<K, V>> has at that point (i.e., there's a constraint that S must be Pair<K, Iterable<V>>). I'm fine with that fix.
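
        For context, here's a stripped-down illustration of that constraint mismatch; these class definitions are simplified stand-ins for Crunch's actual types, not the real code:

            // Simplified stand-ins for the Crunch types under discussion.
            class Pair<K, V> {}
            abstract class DoFn<S, T> {}
            // CombineFn pins down the input type: S must be Pair<K, Iterable<V>>.
            abstract class CombineFn<K, V> extends DoFn<Pair<K, Iterable<V>>, Pair<K, V>> {}

            class BaseDoTableSketch<S, K, V> {
              @SuppressWarnings({"rawtypes", "unchecked"})
              CombineFn<K, V> asCombineFn(DoFn<S, Pair<K, V>> fn) {
                // Nothing here proves that S is Pair<K, Iterable<V>>, so the checked
                // cast to CombineFn<K, V> tripped up the JDK 7 javac even though
                // Eclipse's compiler accepted it; the raw cast sidesteps the check
                // at the cost of the generics info.
                return (CombineFn) fn;
              }
            }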

        Re: caching, I agree with you. I hesitated at first, but then I realized that a lot of the use of materialize() is really to signal a split point in a pipeline, and that returning the PCollection instead of an Iterable will be more literate.

        Josh Wills added a comment -

        Ridiculously huge patch committed.


          People

          • Assignee: Josh Wills
          • Reporter: Josh Wills
          • Votes: 0
          • Watchers: 8
