Hadoop Map/Reduce
MAPREDUCE-1849

Implement a FlumeJava-like library for operations over parallel collections using Hadoop MapReduce

    Details

    • Type: New Feature
    • Status: Resolved
    • Priority: Major
    • Resolution: Implemented
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: None
    • Labels: None

      Description

      The API used internally at Google is described in great detail at http://portal.acm.org/citation.cfm?id=1806596.1806638.

        Activity

        qwertymaniac Harsh J added a comment -

        Crunch is now a top-level Apache project (Apache Crunch): http://crunch.apache.org.

        Resolving as Implemented (long time pending, issue gone stale here).

        rahul.sharma Rahul Sharma added a comment -

        This is now available in the Apache Incubator at http://incubator.apache.org/crunch/. Please pour in your thoughts/suggestions.

        hammer Jeff Hammerbacher added a comment -

        Josh Wills from Cloudera has implemented the FlumeJava API in an open source project called Crunch: http://www.cloudera.com/blog/2011/10/introducing-crunch/. The code is currently on GitHub: https://github.com/cloudera/crunch. Once we've built a community of contributors, we plan to take the project to the Apache Incubator. If you're interested in programmatic workflow construction from Java, we'd welcome your contributions!
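
        For reference, word count in Crunch's Java API looks roughly like this (a sketch paraphrased from the project's getting-started example; the import paths are the Cloudera-era com.cloudera.crunch packages and changed when the project moved to Apache):

        import com.cloudera.crunch.DoFn;
        import com.cloudera.crunch.Emitter;
        import com.cloudera.crunch.PCollection;
        import com.cloudera.crunch.PTable;
        import com.cloudera.crunch.Pipeline;
        import com.cloudera.crunch.impl.mr.MRPipeline;
        import com.cloudera.crunch.lib.Aggregate;
        import com.cloudera.crunch.type.writable.Writables;

        public class WordCount {
          public static void main(String[] args) throws Exception {
            Pipeline pipeline = new MRPipeline(WordCount.class);
            PCollection<String> lines = pipeline.readTextFile(args[0]);

            // parallelDo applies a DoFn to every element of the collection.
            PCollection<String> words = lines.parallelDo(new DoFn<String, String>() {
              public void process(String line, Emitter<String> emitter) {
                for (String word : line.split("\\s+")) {
                  emitter.emit(word);
                }
              }
            }, Writables.strings());

            // count() is a derived operator: parallelDo + groupByKey +
            // combineValues under the hood.
            PTable<String, Long> counts = Aggregate.count(words);

            pipeline.writeTextFile(counts, args[1]);
            pipeline.done(); // the deferred plan is optimized and executed here
          }
        }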

        tlipcon Todd Lipcon added a comment -

        Josh Wills has implemented this as a project on GitHub: https://github.com/cloudera/crunch/

        tdunning Ted Dunning added a comment -

        Pig is not a suitable framework for this because it imposes very high overhead: it has a very wide and complex API and no abstract syntax layer.

        By contrast, Plume is moving along very nicely. We have a preliminary optimizer that actually does some important optimizations that Pig doesn't do.

        Check it out: http://github.com/tdunning/Plume for the source, http://tdunning.blogspot.com/2010/07/new-grool.html for some discussion.

        vicaya Luke Lu added a comment -

        IMO, this kind of "fluent"/LINQ-ish interface might be better built on top of Pig (preferably Pig libraries, a la LINQ for SQL, etc.), as you don't want or need to reinvent much of the scheduling/optimization already done in this existing work.

        tdunning Ted Dunning added a comment -

        The GitHub implementation of Plume now supports local evaluation in an eager, as opposed to lazy, fashion. Avro file reading is working in at least one example. The execution plan optimizer is beginning to work. We have an emulated map-reduce framework working to support mocking up the full map-reduce execution. Nobody is working on the Hadoop interface yet, so if there is a volunteer for that, they would be very welcome.

        Anybody who would like to contribute is welcome as long as they are willing to Apache license their contributions.

        See http://github.com/tdunning/Plume for the source, http://tdunning.blogspot.com/2010/07/new-grool.html for some discussion.

        tdunning Ted Dunning added a comment -

        See http://tdunning.blogspot.com/2010/07/new-grool.html

        I have started a GitHub project with an eager, local-execution approximate clone of FlumeJava. My thought is to work out the API design before moving to parallel execution.

        My current implementation has a word count example and has flushed out at least one issue in the API design (which Google seems to have gotten right, btw). It could use an Avro expert to look at it to guide how to integrate Avro into the type structure.

        vicaya Luke Lu added a comment -

        Agree that this should really be a separate project, as it's at a much higher level than mapreduce per se and it should be able to utilize features from a non-mapreduce framework, say, an alternative BSP implementation.

        cutting Doug Cutting added a comment -

        I think this is better done as a separate project. Our goal in the MapReduce project should be to provide a low-level execution engine for higher-level APIs like this. Wherever possible we should strive to reduce the amount of user-level code in the base mapreduce system. This permits user-code to be versioned independently from the critical fault-tolerant base system. The base system should focus on reliability and performance, not on high-level features.

        jake.mannix Jake Mannix added a comment -

        While I agree that doing cool Hadoop work via functional JVM languages like Scala and Clojure is a great idea, I think part of the point of the findings of this paper (and the point of this particular JIRA ticket) concerns a simple, object-oriented Java API which has "distributed primitives" that the typical Java programmer can easily understand and integrate with their current code with minimal effort.

        jghoman Jakob Homan added a comment -

        [quote]
        I am not sure closures such as Scala's would work on a distributed, multi-JVM setup such as Hadoop. Otherwise I agree with Luke's POV.
        [quote]

        Matei and the Spark guys got it working quite well: http://www.cs.berkeley.edu/~matei/spark/

        ogrisel Olivier Grisel added a comment -

        I am not sure closures such as Scala's would work on a distributed, multi-JVM setup such as Hadoop. Otherwise I agree with Luke's POV.

        vicaya Luke Lu added a comment -

        I had some experience with Cascading in production code. One of the major benefits of being a Java library, from my POV, is easy unit testing of various user-defined operations, which is inconvenient in most DSLs (a sketch follows below). OTOH, Cascading forces you to define data-flows explicitly (which is not so bad if you have a nice FlowBuilder utility class).

        FlumeJava, IMO, actually captures the essence of MapReduce as it originated in functional programming. The immutable P* collections and side-effect-free (no global effect) DoFns allow many optimization opportunities, a la Haskell's lazy evaluation (deferred evaluation in the paper). However, the lack of type inference and closures in Java makes the usage much more verbose than necessary. I think similar libraries could be better implemented in Scala.
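
        To make the unit-testing point concrete, here is a minimal sketch assuming a FlumeJava-style DoFn with an emitter callback; every name below is hypothetical, not an existing library's API:

        // imports assumed: org.junit.Test, static org.junit.Assert.assertEquals,
        // java.util.ArrayList, java.util.Arrays, java.util.List

        // Hypothetical FlumeJava-style interfaces, for illustration only.
        interface EmitFn<T> { void emit(T value); }

        abstract class DoFn<S, T> {
          public abstract void process(S input, EmitFn<T> emitFn);
        }

        // A user-defined operation: split a line into words.
        class SplitWords extends DoFn<String, String> {
          public void process(String line, EmitFn<String> emitFn) {
            for (String word : line.split("\\s+")) {
              emitFn.emit(word);
            }
          }
        }

        // Because the operation is plain Java, it tests with JUnit alone:
        // no cluster, no MiniMRCluster, no DSL harness.
        class SplitWordsTest {
          @Test
          public void splitsOnWhitespace() {
            final List<String> out = new ArrayList<String>();
            new SplitWords().process("hello flume java", new EmitFn<String>() {
              public void emit(String value) { out.add(value); }
            });
            assertEquals(Arrays.asList("hello", "flume", "java"), out);
          }
        }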

        jake.mannix Jake Mannix added a comment -

        [quote]
        The main difference from Pig seems to be allowing users to work in Java.
        [quote]

        To add my $0.02: FlumeJava lets developers work in an object-oriented language, period. The difference between writing a Pig "script" or a SQL (or Hive variant thereof) "query" and being able to seamlessly integrate distributed primitives (primitive not meaning a Java primitive, but a "basic building block") into a standard Java program is amazing.

        The real comparison is between FlumeJava and Cascading, which also lets you stay in Java-land and has a query-plan optimizer. I'm no expert in Cascading, but it seems the primitives in Cascading are "verbs" related to flows, while FlumeJava really settles on a DistributedDataSet (PCollection, for them) as the object which has methods and can be passed to methods of other (either distributed or normal) objects. I don't know if that is clearly better, but it certainly seems more in line with the way most people program in Java; the contrast is sketched below.
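
        A rough sketch of the contrast (both fragments are paraphrases; SplitWordsFunction, CountAggregator, and SplitWords are hypothetical user-defined classes, and FlumeJava's real parallelDo also takes a result-type argument):

        // Cascading-style: assemble a flow from "verb" objects.
        Pipe words = new Each(new Pipe("lines"), new SplitWordsFunction());
        Pipe counts = new Every(new GroupBy(words), new CountAggregator());

        // FlumeJava-style: call methods on the distributed collection itself.
        PCollection<String> tokens = lines.parallelDo(new SplitWords());
        PTable<String, Long> tokenCounts = tokens.count(); // a derived operator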

        hammer Jeff Hammerbacher added a comment -

        Some things you get for free from being a Java library: control flow (branching, looping, etc.), composability (functions, classes, packages), IDE support, and so on. Having Pig Latin execute on top of something like FlumeJava could be interesting.
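
        For instance, iterating until some condition holds is just a Java loop around the deferred pipeline. A minimal sketch, with hypothetical FlumeJava-style names (PCollection, parallelDo, ExpandNeighbors):

        // Ordinary Java control flow decides how many parallel steps the
        // deferred execution plan contains; the library needs no looping DSL.
        PCollection<Node> frontier = initialSeeds;
        for (int i = 0; i < maxIterations; i++) {
          // Each pass appends another parallelDo step to the plan.
          frontier = frontier.parallelDo(new ExpandNeighbors());
        }
        // The accumulated plan is optimized and run when results are demanded.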

        hammer Jeff Hammerbacher added a comment -

        Owen: sure. They provide "derived operators" as well, like count(), join(), and top(). The main difference from Pig seems to be allowing users to work in Java. In fact, the Google team initially implemented their approach in a new language called Lumberjack, but the paper mentions that, among other things, the implementation of a new language was a lot of work and, most importantly, novelty is an obstacle to adoption. They settled on Java and seem to have had some internal success.

        owen.omalley Owen O'Malley added a comment -

        I haven't read the paper yet, but can you summarize how this differs from Pig? Those operators all map into Pig's operators one to one. Pig also supports join, which is really nice to have automated support for.

        tdunning Ted Dunning added a comment -

        Another sweet trick for this would be to allow multiple modes of execution that are all efficiently implemented. These should include local threaded, non-redundant distributed map-reduce (a la Twister), and full-on Hadoop. That gives the highest speed for small jobs, medium speed for medium jobs at the cost of task failure = job resubmit, and full scalability and reliability for the largest jobs.

        Right now, anything but full-scale Hadoop execution is the red-headed stepchild and gets no love.
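
        A minimal sketch of what mode selection could look like; the enum, the byte thresholds, and every name here are hypothetical:

        // Hypothetical execution modes; the thresholds are made-up examples.
        enum ExecutionMode { LOCAL_THREADED, NON_REDUNDANT_MR, FULL_HADOOP }

        static ExecutionMode choose(long inputBytes) {
          if (inputBytes < (1L << 28)) {          // under ~256 MB
            return ExecutionMode.LOCAL_THREADED;  // threads, no cluster at all
          } else if (inputBytes < (1L << 36)) {   // under ~64 GB
            return ExecutionMode.NON_REDUNDANT_MR; // Twister-style; task failure
                                                   // means resubmitting the job
          }
          return ExecutionMode.FULL_HADOOP;       // full fault tolerance and scale
        }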

        jake.mannix Jake Mannix added a comment -

        +1 from this casual observer over from Mahout-land (nobody ever seems to believe me that this would make Hadoop programmers soooooo much more efficient).

        I've written a half-baked, bug-ridden, inefficient version of this several times in the past, and it would be so useful to have done right.

        An API which essentially wrapped a SequenceFile<K,V> and allowed you to do things like

        Path dataPath = new Path("hdfs://foo/bar");           // an existing SequenceFile<K,V>
        PTable<K,V> data = new PTable<K,V>(dataPath);         // distributed view of that file
        LightWeightMap<K,V,KOUT,VOUT> map = new MyMapper();   // user-defined transform
        PTable<KOUT,VOUT> transformedData = data.parallelDo(map);

        etc. would be awesome.

        Of course, the real trick is writing a good optimizer which can figure out how to squish together separate M/R steps into one (for example, parallelDo() returns a PCollection, which you might then do groupByKey() on, but these could often easily be combined into the Map and Reduce steps of a single job).
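
        Concretely, the fusion might look like this; the types and method names are a hypothetical FlumeJava-style sketch, not a real API:

        // What the user writes: three logical steps.
        PTable<String, Long> ones = docs.parallelDo(new TokenizeAndEmitOne());
        PTable<String, Collection<Long>> groups = ones.groupByKey();
        PTable<String, Long> counts = groups.combineValues(new SumLongs());

        // What a good optimizer should emit: ONE MapReduce job, with
        // parallelDo fused into the map phase, groupByKey realized by the
        // shuffle, and combineValues run as the combiner and reducer.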

        hammer Jeff Hammerbacher added a comment -

        Data Model

        • "The central class of the FlumeJava library is PCollection<T>, a (possibly huge) immutable bag of elements of type T."
          • Can be unordered (collection) or ordered (sequence)
          • Could be created with an underlying Java Collection<T> for local execution
          • recordsOf() can be used to indicate how to read the elements of the collection (cf. Pig's LoadFunc or Hive's SerDe)
        • Second central class: PTable<K, V>
          • Immutable multi-map with keys of class K and values of class V
          • Subclass of PCollection<Pair<K, V>>

        Operators

        • parallelDo(PCollection<T>): PCollection<S>; runs S doFunc(T) over each element
        • groupByKey(PTable<Pair<K,V>>): PTable<Pair<K, Collection<V>>>: turns a multi-map into a uni-map
        • combineValues(PTable<Pair<K, Collection<V>>>): PTable<Pair<K, V>>: does the reduction
        • flatten(): logical view of multiple PCollections as one PCollection
        • writeToRecordFileTable() to flush the output of a pipeline to a table
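
        Rendered as Java signatures, the model above looks roughly like this (a paraphrase of the paper, not the exact FlumeJava declarations):

        // Paraphrased core types; the exact declarations are assumptions.
        abstract class PCollection<T> {
          // Apply doFn to every element, in parallel.
          abstract <S> PCollection<S> parallelDo(DoFn<T, S> doFn);
        }

        abstract class PTable<K, V> extends PCollection<Pair<K, V>> {
          // Multi-map -> uni-map: collect all values that share a key.
          abstract PTable<K, Collection<V>> groupByKey();
        }

        // Defined for grouped tables: reduce each bag of values to one value.
        //   PTable<K, V> combineValues(CombineFn<V> fn);
        // View several PCollections as one, without copying:
        //   static <T> PCollection<T> flatten(PCollection<T>... pieces);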

          People

          • Assignee: Unassigned
          • Reporter: hammer Jeff Hammerbacher
          • Votes: 5
          • Watchers: 43
