Hadoop Map/Reduce
MAPREDUCE-1183

Serializable job components: Mapper, Reducer, InputFormat, OutputFormat et al

    Details

    • Type: Improvement
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 0.21.0
    • Fix Version/s: None
    • Component/s: client
    • Labels: None

      Description

      Currently the Map-Reduce framework uses Configuration to pass information about the various aspects of a job such as the Mapper, Reducer, InputFormat, OutputFormat, OutputCommitter, etc., and application developers use the org.apache.hadoop.mapreduce.Job.set*Class APIs to set them at job-submission time:

      job.setMapperClass(IdentityMapper.class);
      job.setReducerClass(IdentityReducer.class);
      job.setInputFormatClass(TextInputFormat.class);
      job.setOutputFormatClass(TextOutputFormat.class);
      ...
      

      The proposal is that we move to a model where end-users interact with org.apache.hadoop.mapreduce.Job via actual objects which are then serialized by the framework:

      job.setMapper(new IdentityMapper());
      job.setReducer(new IdentityReducer());
      job.setInputFormat(new TextInputFormat("in"));
      job.setOutputFormat(new TextOutputFormat("out"));
      ...
      

          Activity

          Juan Moreno added a comment -

          Yes Please! I have been dying for this feature. This is similar to the way Storm handles Job creation and submission. The instances serve as prototypes to be distributed across the cluster. Further, this would allow for dynamic submission of jobs. Thus, the JobClient should be changed to accept a dynamically created JobConf.

          Harsh J added a comment -

          (Not a blocker for any release, so unmarking it as one)

          Iván de Prado added a comment -

          We have an implementation in Pangool (http://pangool.net) for this. It could serve as a proof-of-concept and for getting ideas for this ticket as well.

          We are using the DistributedCache with success to send serialized instances of the mappers, reducers, comparators and input/output formats. You can see an example of usage here: (http://pangool.net/userguide/TupleMrBuilder.html).

          For supporting legacy classes, we have created wrapper classes that receive the class to be instantiated in the constructor. This is how we stay compatible with code written the old way.
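
          For illustration only (hypothetical names, not Pangool's actual classes), such a wrapper might look roughly like this:

          import java.io.IOException;
          import java.io.Serializable;
          import org.apache.hadoop.mapreduce.Mapper;
          import org.apache.hadoop.util.ReflectionUtils;

          // Sketch of a legacy wrapper: only the class object travels with the
          // serialized wrapper; the real (non-Serializable) mapper is created by
          // reflection on the task side and everything is delegated to it.
          public class LegacyMapperWrapper<K1, V1, K2, V2>
              extends Mapper<K1, V1, K2, V2> implements Serializable {

            private final Class<? extends Mapper<K1, V1, K2, V2>> legacyClass;

            public LegacyMapperWrapper(Class<? extends Mapper<K1, V1, K2, V2>> legacyClass) {
              this.legacyClass = legacyClass;
            }

            @Override
            public void run(Context context) throws IOException, InterruptedException {
              // Instantiate the old-style mapper lazily on the task and run it.
              Mapper<K1, V1, K2, V2> delegate =
                  ReflectionUtils.newInstance(legacyClass, context.getConfiguration());
              delegate.run(context);
            }
          }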

          Something that would be useful is making all Writable classes implement Serializable. This is needed if you want to be able to create static instances in your mapper/reducer to be reused, without needing to instantiate them in the setup() method.

          Nigel Daley added a comment -

          Too late for 0.22. Moving Fix Version from 0.22 to 0.23.

          guillaume viland added a comment -

          > Allowing applications to store state in the Mapper and/or Reducer will allow for more natural semantics and will stop them using DistributedCache for trivial state management.

          Could you explain and show examples of "applications storing state"? In a MapReduce framework, what is the meaning of stateless/stateful?

          Doug Cutting added a comment -

          > I'm concerned we do not understand the others enough to design the interfaces well enough

          That's a risk. All I'm suggesting is that, as we alter job submission here we should keep in mind where we'd like to go. In particular, we should try not to change it incompatibly more than once. Currently we support specification by class name. If we add another Java-specific mechanism now, we'll have to support it too going forward. If we know we'll change it again soon, then that will be three mechanisms we'll need to support for a time. Perhaps that's tolerable, but two would be better. This may be an opportunity to "future-proof" job submission, or it may not be.

          For example, perhaps your implementation will use the existing mechanism to provide the new capability, e.g., it will set a job's mapper to JavaSerializedMapper, that will then look on the classpath for a particular file that contains the serialized mapper. In this case we can argue that the underlying mechanism isn't changed and all's well. On the other hand, if we were to add new job properties that the framework uses in preference to the existing properties, then we should more carefully think about what these are and whether they're steps in the direction we want job configurations to take. Does that make sense?
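
          To make that concrete, a very rough sketch of such a shim (the class and the file name are invented here; nothing like this exists in the codebase yet):

          import java.io.IOException;
          import java.io.InputStream;
          import java.io.ObjectInputStream;
          import org.apache.hadoop.mapreduce.Mapper;

          // Illustrative only. The job's mapper class is set to this shim through the
          // existing class-name mechanism; the shim deserializes the real mapper from a
          // well-known file shipped with the job (e.g. on the task classpath via the
          // distributed cache) and delegates to it.
          public class JavaSerializedMapper<K1, V1, K2, V2> extends Mapper<K1, V1, K2, V2> {

            // Hypothetical file name; a real patch would have to pick one.
            static final String SERIALIZED_MAPPER_FILE = "job-mapper.ser";

            @Override
            @SuppressWarnings("unchecked")
            public void run(Context context) throws IOException, InterruptedException {
              try (InputStream in =
                       getClass().getClassLoader().getResourceAsStream(SERIALIZED_MAPPER_FILE);
                   ObjectInputStream ois = new ObjectInputStream(in)) {
                Mapper<K1, V1, K2, V2> realMapper = (Mapper<K1, V1, K2, V2>) ois.readObject();
                realMapper.run(context);
              } catch (ClassNotFoundException e) {
                throw new IOException("Cannot deserialize the job's mapper", e);
              }
            }
          }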

          Arun C Murthy added a comment -

          Also, we can add the TaskRunner factory without affecting the submission protocol in any significant manner... and it will keep discussions here grounded on a reasonably small set of requirements.

          Arun C Murthy added a comment -

          The reason I'm slightly wary about adding the factory right-away is that it is really hard to design it in isolation without being completely biased about the JavaTaskRunner. I'm concerned we do not understand the others enough to design the interfaces well enough... am I being too paranoid?

          Doug Cutting added a comment -

          > I agree! I'd propose we track that via a new jira though, to keep patches manageable...

          Ideally we'll only alter the format of job submissions once. For example, a job might specify a class that implements a factory interface, where the factory can create mappers, reducers, partitioners, inputformats, outputformats, etc. The Java factory implementation could use files in the distributed cache to create these, or it might create them entirely from data in the config (for back-compatibility). Implementing other factories can be done in other issues, but we might add the factory API in this issue, no?
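
          For example (purely illustrative; the name and signatures below are made up for discussion), the factory interface might look something like:

          import java.io.IOException;
          import org.apache.hadoop.conf.Configuration;
          import org.apache.hadoop.mapreduce.InputFormat;
          import org.apache.hadoop.mapreduce.Mapper;
          import org.apache.hadoop.mapreduce.OutputFormat;
          import org.apache.hadoop.mapreduce.Partitioner;
          import org.apache.hadoop.mapreduce.Reducer;

          // A job names its factory class; the framework instantiates it on each task
          // and asks it for the job's components. One implementation could deserialize
          // objects from files in the distributed cache; a back-compatible one could
          // simply instantiate the class names already recorded in the Configuration.
          public interface JobComponentFactory {
            Mapper<?, ?, ?, ?> createMapper(Configuration conf) throws IOException;
            Reducer<?, ?, ?, ?> createReducer(Configuration conf) throws IOException;
            Partitioner<?, ?> createPartitioner(Configuration conf) throws IOException;
            InputFormat<?, ?> createInputFormat(Configuration conf) throws IOException;
            OutputFormat<?, ?> createOutputFormat(Configuration conf) throws IOException;
          }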

          Arun C Murthy added a comment -

          > How would we implement this? Would we serialize these to the splits file? To a new per-job file? In a parameter to the job-submission RPC?

          I'm considering a separate, per-job file via the DistributedCache.

          > Long-term it would be nice if job-submissions could be easily made by non-Java applications. [...]

          I agree! I'd propose we track that via a new jira though, to keep patches manageable...

          Doug Cutting added a comment -

          This would be a nice API for Java.

          How would we implement this? Would we serialize these to the splits file? To a new per-job file? In a parameter to the job-submission RPC?

          Long-term it would be nice if job-submissions could be easily made by non-Java applications. So a job submission might specify a TaskRunner implementation name, plus one or more blobs that are consumed by that TaskRunner, to implement map, reduce, partition, inputformat and outputformat, etc. JavaTaskRunner might use Java serialization to create its blobs, while a PythonTaskRunner and CTaskRunner might do something else. The TaskRunners would all be implemented in Java, but would provide the glue for other native MapReduce APIs. If we agree that this is the sort of long-term architecture we seek, should we add it now?
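
          Roughly (the interface below is invented here just to make the shape concrete; it is not part of any existing patch), a TaskRunner in that scheme might look like:

          import java.io.IOException;
          import org.apache.hadoop.mapreduce.InputFormat;
          import org.apache.hadoop.mapreduce.Mapper;
          import org.apache.hadoop.mapreduce.OutputFormat;
          import org.apache.hadoop.mapreduce.Reducer;

          // Each language binding ships one Java TaskRunner implementation that turns
          // the opaque blobs recorded at job submission back into runnable components.
          // JavaTaskRunner would use Java serialization; a PythonTaskRunner or
          // CTaskRunner would interpret its blobs differently (say a script or a
          // shared library to execute).
          public interface TaskRunner {
            Mapper<?, ?, ?, ?> createMapper(byte[] mapperBlob) throws IOException;
            Reducer<?, ?, ?, ?> createReducer(byte[] reducerBlob) throws IOException;
            InputFormat<?, ?> createInputFormat(byte[] inputFormatBlob) throws IOException;
            OutputFormat<?, ?> createOutputFormat(byte[] outputFormatBlob) throws IOException;
          }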

          Philip Zeyliger added a comment -

          I like this API.

          Hong Tang added a comment -

          @arun, yes, that makes sense now.

          @tom, for mappers and reducers that do not implement serializable, we can treat them as if they are stateless, which could be useful to provide backward compatibility.

          Tom White added a comment -

          +1 This is a nicer API for users, I think.

          The only reason not to serialize mappers and reducers that I can think of is that users will be forced to think about how they are serialized. This may be simply a matter of adding "implements Serializable" (particularly for stateless mappers and reducers), so maybe it's not a big burden (and consistency is important).
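
          As a concrete illustration of the "implements Serializable" point (a hypothetical mapper, not code from any patch here):

          import java.io.IOException;
          import java.io.Serializable;
          import java.util.HashMap;
          import java.util.Map;
          import org.apache.hadoop.io.LongWritable;
          import org.apache.hadoop.io.Text;
          import org.apache.hadoop.mapreduce.Mapper;

          // A small map of metadata is filled in on the client and travels with the
          // serialized mapper instance, instead of going through DistributedCache or
          // the Configuration.
          public class MetadataJoinMapper extends Mapper<LongWritable, Text, Text, Text>
              implements Serializable {

            private final Map<String, String> metadata = new HashMap<String, String>();

            public void addMetadata(String key, String value) {
              metadata.put(key, value);
            }

            @Override
            protected void map(LongWritable offset, Text line, Context context)
                throws IOException, InterruptedException {
              String key = line.toString().split("\t", 2)[0];
              // Enrich each record with the client-supplied metadata carried in this instance.
              context.write(new Text(key), new Text(line + "\t" + metadata.get(key)));
            }
          }

          Under the proposed API the client would populate the instance and then call job.setMapper(mapperInstance).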

          > An application which needs a very small amount of state in the Mapper/Reducer (say a small map of metadata) is forced to use DistributedCache

          Alternatively you can store a small amount of state in the configuration, which is generally easier.

          Also, the new API for TextInputFormat and TextOutputFormat could take a varargs list of paths in the constructor, or use the builder pattern.

          Arun C Murthy added a comment -

          > Why do we need to serialize mappers and reducers?

          Two reasons:

          1. Once we go down the path of serializing InputFormat and OutputFormat, we might as well do the whole nine-yards, otherwise it's really odd...
          2. Allowing applications to store state in the Mapper and/or Reducer will allow for more natural semantics and will stop them using DistributedCache for trivial state management.
          Hong Tang added a comment -

          Why do we need to serialize mappers and reducers?

          Arun C Murthy added a comment -

          The current Configuration-based system has issues in a couple of use-cases:

          1. The primary drawback: Difficulty in implementing a composite {Input|Output}Format.
            Pig is in the middle of a re-write of their Load/Store interfaces (http://wiki.apache.org/pig/LoadStoreRedesignProposal) where they want to be able to take an arbitrary InputFormat or OutputFormat and wrap it for use within Pig. Similarly, a 'CompositeInputFormat' which can work with multiple InputFormats (say a map-side merge between data in multiple SequenceFiles and TFiles) leads to a situation where we push the {Input|Output}Format to deal with multiple copies of Configuration and manage them. This is necessary because using a single Configuration results in the same configuration key being over-written by multiple instances of {Input|Output}Format (say mapred.input.dir over-written by SequenceFileInputFormat and TFileInputFormat).

          2. Annoyance: An application which needs a very small amount of state in the Mapper/Reducer (say a small map of metadata) is forced to use DistributedCache, it's much more natural to have that state stored in the Mapper/Reducer and have it serialized from the client to the compute nodes.

          Thus the proposal is to move to a model where an actual Mapper/Reducer/InputFormat/OutputFormat object is serialized by the framework, eliminating the need to use Configuration to store the requisite information; the object itself keeps the necessary state, e.g. FileInputFormat will have a member holding the list of input paths to be processed.

          The new API would look like:

          Job job = new Job();
          job.setMapper(new WordCountMapper());
          job.setReducer(new WordCountReducer());
          InputFormat in = new TextInputFormat("in");
          in.addInputPath("in2");
          OutputFormat out = new TextOutputFormat("out");
          job.setInputFormat(in);
          job.setOutputFormat(out);
          job.waitForCompletion();
          

          Thoughts?


            People

            • Assignee: Owen O'Malley
            • Reporter: Arun C Murthy
            • Votes: 3
            • Watchers: 31
