Hadoop Common
HADOOP-372

Should allow specifying different InputFormat classes for different input dirs for Map/Reduce jobs

    Details

    • Type: New Feature
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.4.0
    • Fix Version/s: 0.19.0
    • Component/s: None
    • Labels:
      None
    • Environment:

      all

    • Hadoop Flags:
      Reviewed

      Description

      Right now, the user can specify multiple input directories for a map reduce job.
      However, the files under all the directories are assumed to be in the same format,
      with the same key/value classes. This proves to be a serious limitation in many situations.
      Here is an example. Suppose I have three simple tables:
      one has URLs and their rank values (page ranks),
      another has URLs and their classification values,
      and the third one has the URL meta data such as crawl status, last crawl time, etc.
      Suppose now I need a job to generate a list of URLs to be crawled next.
      The decision depends on the info in all the three tables.
      Right now, there is no easy way to accomplish this.

      However, this job can be done if the framework allows specifying different InputFormats for different input dirs.
      Suppose my three tables are in the following directories, respectively: rankTable, classificationTable, and metaDataTable.
      If we extend the JobConf class with the following method (as Owen suggested to me):
      addInputPath(aPath, anInputFormatClass, anInputKeyClass, anInputValueClass)
      Then I can specify my job as follows:
      addInputPath(rankTable, SequenceFileInputFormat.class, UTF8.class, DoubleWritable.class)
      addInputPath(classificationTable, TextInputFormat.class, UTF8.class, UTF8.class)
      addInputPath(metaDataTable, SequenceFileInputFormat.class, UTF8.class, MyRecord.class)
      If an input directory is added through the current API, it will have the same meaning as it does now.
      Thus this extension will not affect any applications that do not need this new feature.

      It is relatively easy for the M/R framework to create an appropriate record reader for a map task based on the above information.
      And that is the only change needed for supporting this extension.
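
      For illustration, a minimal pure-Java sketch of the lookup the framework might perform: given a split's file path, find the registered input directory that is its longest prefix and return the InputFormat class name recorded for it. The PathFormatRegistry helper and its method names are hypothetical, not part of this proposal.

      import java.util.HashMap;
      import java.util.Map;

      // Hypothetical helper: maps registered input directories to InputFormat class names
      // and resolves a split's file path to the entry whose directory is the longest prefix.
      class PathFormatRegistry {
          private final Map<String, String> formatClassByDir = new HashMap<>();

          void register(String inputDir, String inputFormatClassName) {
              formatClassByDir.put(inputDir, inputFormatClassName);
          }

          // Returns the InputFormat class name registered for the best-matching directory,
          // or null if the path was added through the current (single-format) API.
          String lookup(String splitPath) {
              String bestClass = null;
              String bestDir = "";
              for (Map.Entry<String, String> e : formatClassByDir.entrySet()) {
                  String dir = e.getKey();
                  if (splitPath.startsWith(dir) && dir.length() > bestDir.length()) {
                      bestDir = dir;
                      bestClass = e.getValue();
                  }
              }
              return bestClass;
          }
      }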

      1. hadoop-372.patch
        10 kB
        Tom White
      2. hadoop-372.patch
        23 kB
        Chris Smith
      3. hadoop-372.patch
        23 kB
        Chris Smith
      4. hadoop-372.patch
        23 kB
        Chris Smith
      5. hadoop-372.patch
        23 kB
        Chris Smith

        Issue Links

          Activity

          Doug Cutting added a comment -

          Can you provide more details? Is the intent for the mapred.input.format.class property to become multivalued, a parallel list to mapred.input.dir, and when the latter is longer than the former, the first (or last?) input format is used for unmatched entries? I can imagine how MapTask might create its keys, values and a RecordReader, but how would getSplits() and checkInputDirectories() work?

           Another approach to implementing this is to write an InputFormat that wraps keys and/or values from files of different types in ObjectWritable. Then map() methods unwrap, introspect and cast. With your approach, map methods still need to introspect and cast; this just adds the wrapper.

          To eliminate the wrapper we'd need to move the getInputKeyClass() and getInputValueClass() methods to RecordReader. These are only called in MapRunner.java, when a RecordReader is already available, so this would be an easy change, and the default implementation could be back-compatible, accessing the job.

           That's a simpler approach, no? Just add files with different key and value types, and let the types in the files drive things rather than having to declare them up front.

          Runping Qi added a comment -

          Doug,

           My thought is to add a Map object to the JobConf class that keeps track of the explicit association between input paths and the classes (inputformat, key and value). If there is no entry in the Map object for a Path, then the path is associated with the classes set through the current APIs (setInputFormatClass(), setInputKeyClass(), setInputValueClass()) (or the default classes if they are not set). It will be convenient if the JobConf class provides APIs for getting/setting them:

           Class getInputFormatClassForPath(Path p)
           Class getInputKeyClassForPath(Path p)
           Class getInputValueClassForPath(Path p)

           It is a good idea to move the getInputKeyClass() and getInputValueClass() methods to RecordReader (and this should be more logical!).
           This is easy to achieve since the implementation of the InputFormat interface method
           getRecordReader(FileSystem fs, FileSplit split, JobConf job, Reporter reporter)
           has enough information to extract the key/value classes for the split.

           It is also convenient to let the Split class keep track of the inputformat, key, and value classes, so that we can get the right RecordReader for a given split.

          In MapTask class, use the following lines:
          final RecordReader rawIn = // open input
          split.getInputFormat().getRecordReader
          (FileSystem.get(job), split, job, reporter);
          to replace
          final RecordReader rawIn = // open input
          job.getInputFormat().getRecordReader
          (FileSystem.get(job), split, job, reporter);

           Finally, the getSplits method of the InputFormatBase class should be changed to a static method, since it is independent of any concrete InputFormat implementation (and this is kind of necessary, since the exact inputformat will not be known prior to creating splits). The initTasks() method of the
           JobInProgress class needs to make sure all the inputformat classes are loaded properly, or let the getSplits() method take care of it.

           That should cover most of the needed changes.

          Runping Qi added a comment -

           Second thought about the stuff related to InputFormat, after chatting with Owen. The getSplits method should stay non-static, but should take
           an inputPathDir as a parameter. The initTasks method should load all the
           classes and create an InputFormat object per class. For each inputPathDir, initTasks should call getSplits on the corresponding InputFormat object, and finally union all the splits into one list.
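
           For illustration only, a rough pure-Java sketch of that per-directory split-and-union flow; the PerDirSplitter interface below is a hypothetical stand-in for calling getSplits() on the InputFormat object chosen for a directory.

           import java.util.ArrayList;
           import java.util.List;
           import java.util.Map;

           // Hypothetical stand-in for the per-directory InputFormat's getSplits() call.
           interface PerDirSplitter {
               List<String> getSplits(String inputDir);
           }

           class SplitUnion {
               // For each input directory, ask its splitter for splits, then union them into one list.
               static List<String> unionSplits(Map<String, PerDirSplitter> splitterByDir) {
                   List<String> allSplits = new ArrayList<>();
                   for (Map.Entry<String, PerDirSplitter> e : splitterByDir.entrySet()) {
                       allSplits.addAll(e.getValue().getSplits(e.getKey()));
                   }
                   return allSplits;
               }
           }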

          Doug Cutting added a comment -

          > My thought is to add a Map object to the JobConf class [ ...]

          JobConf, underneath, is a Properties instance, mapping String->String. Adding a map to it is a non-trivial extension to the configuration mechanism.

          I didn't hear a response to my suggestion that we instead simply move the getInputKeyClass() and getInputValueClass() methods to RecordReader. Would that not meet your needs?

          Runping Qi added a comment -

          Doug,

           I did respond to your suggestion in my previous comment (it is in the middle of the text, and thus may be easy to overlook):

           >It is a good idea to move the getInputKeyClass() and getInputValueClass() methods to RecordReader (and
           >this should be more logical!).
           >This is easy to achieve since the implementation of the InputFormat interface method
           >getRecordReader(FileSystem fs, FileSplit split, JobConf job, Reporter reporter)
           >has enough information to extract the key/value classes for the split.

           I think a String-->String mapping is enough for encoding the mapping from input path dirs to classes (the same as what we do with the input key/value classes, just with different attribute names).
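
           For example, a minimal pure-Java sketch of one such String-to-String encoding; the "path=formatClass,keyClass,valueClass" layout and the PathClassCodec name are hypothetical, and the sketch assumes paths and class names contain no ';', '=' or ','.

           import java.util.LinkedHashMap;
           import java.util.Map;

           // Encodes a path -> {formatClass, keyClass, valueClass} map into a single property value
           // of the form "path=formatClass,keyClass,valueClass;path2=..." and decodes it back.
           class PathClassCodec {
               static String encode(Map<String, String[]> classesByPath) {
                   StringBuilder sb = new StringBuilder();
                   for (Map.Entry<String, String[]> e : classesByPath.entrySet()) {
                       if (sb.length() > 0) sb.append(';');
                       sb.append(e.getKey()).append('=').append(String.join(",", e.getValue()));
                   }
                   return sb.toString();
               }

               static Map<String, String[]> decode(String encoded) {
                   Map<String, String[]> classesByPath = new LinkedHashMap<>();
                   if (encoded == null || encoded.isEmpty()) {
                       return classesByPath;
                   }
                   for (String entry : encoded.split(";")) {
                       String[] pathAndClasses = entry.split("=", 2);
                       classesByPath.put(pathAndClasses[0], pathAndClasses[1].split(","));
                   }
                   return classesByPath;
               }
           }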

          Doug Cutting added a comment -

          >It is a good idea to move the getInputKeyClass() and getInputValueClass() methods to RecordReader

           And would this change alone not resolve this issue for you? You could intermix input files with different types in a job, and specify no input key or value classes. The InputFormat would determine the types of the inputs. For SequenceFiles, the types are named in the file; for text files they are fixed. It would not allow you to, e.g., intermix text and SequenceFile inputs, but that's not what you've asked for here, and that could easily be achieved through a dispatching InputFormat.

          Owen O'Malley added a comment -

           I like the idea of moving the getInput{Key,Value}Class methods to the RecordReader, since there is really no need for the user to specify them except as potential error checking.

           The desire is to do joins across different kinds of tables. So if you have two tables that both contain urls, you could write a mapper for each table and generate the join by having the url be the map output key. I agree that you could implement it by using ObjectWritable to wrap all of the input types, but you'll end up using a lot of "instanceof" to figure out the type dynamically and dispatch to the right kind of map.

          I would propose being able to add input directories with specific processing chains:

           JobConf conf = new JobConf();
           conf.addInputPath(new Path("dir1"));
           conf.addInputPath(new Path("dir2"));
           conf.addInputPath(new Path("my-input"), MyInputFormat.class, MyMapper.class);
           conf.addInputPath(new Path("other-input"), OtherInputFormat.class, OtherMapper.class);

          So "dir1" and "dir2" would be processed with the default InputFormat and Mapper.
          "my-input" would be processed with MyInputFormat and MyMapper and "other-input" would be processed with OtherInputFormat and OtherMapper.

          This allows it to be backward compatible with a minimum of fuss for users that don't want to use multiple input sources.

          Clearly, you would want to encode the class names with the paths when you were encoding the strings.

          Doug Cutting added a comment -

          It seems like a lot of work to avoid instanceof in one's map method. We could instead add a method like:

          Mapper getMapper(Class keyClass, Class valueClass);

          I also still worry that a static split method won't provide enough flexibility. If a job is asked to create N splits, how will these be apportioned to the different inputs? By number of directories, files, bytes, or what?

          Runping Qi added a comment -

           It would be great if we made it optional for the user to specify key/value classes for the inputs.
           These classes, if specified by the user, would be used for validation checks only. That way, the framework will get the input key/value class info from the input files alone. That will be very robust.

           Assuming that all records in each split have the same key/value classes, the framework can determine the appropriate mapper class upfront, and call the map function of that mapper class, so the map function does not need to use instanceof.

           The getSplits method will not be static. The only constraint is that a split will not span multiple files, which is true now.

          Doug Cutting added a comment -

          I think this issue is now mostly duplicated by HADOOP-450, which Owen is working on. If there are no objections, I will close this issue.

          Perhaps we should also add a new issue related to supporting different map functions for different input directories.

          Owen O'Malley added a comment -

          HADOOP-450 certainly made this easier, but it doesn't really solve the problem. However, I think that with an implementation of HADOOP-451 (abstract input splits), we could do this all in library code. Something along the lines of:

           public class MultiInputPath {
             /* must be called after the InputFormat and MapRunner have been set in the JobConf */
             public static void addHooks(JobConf conf);
             /* add a new input directory and the relevant input format and mapper classes */
             public static void addInputPath(JobConf conf, Path inputDirectory, Class inputFormat, Class mapper);
           }

          The MultiInputPath will wrap the user's InputFormat, InputSplit, and MapRunner. In particular, the InputSplit will contain the user's InputSplit as well as the InputFormat and Mapper classes.

          This will pull all of the specialized code into a library instead of the framework, which is a big win.
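
           For illustration, a bare-bones sketch of that wrapping idea in plain Java; the TaggedSplit name is hypothetical, and a real InputSplit would also need Writable serialization.

           // Carries the user's format-specific split plus the classes to apply to it.
           class TaggedSplit {
               private final Object userSplit;          // the wrapped, format-specific split
               private final String inputFormatClass;   // class name of the InputFormat to use
               private final String mapperClass;        // class name of the Mapper to use

               TaggedSplit(Object userSplit, String inputFormatClass, String mapperClass) {
                   this.userSplit = userSplit;
                   this.inputFormatClass = inputFormatClass;
                   this.mapperClass = mapperClass;
               }

               Object getUserSplit() { return userSplit; }
               String getInputFormatClass() { return inputFormatClass; }
               String getMapperClass() { return mapperClass; }
           }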

          Thoughts?

          Runping Qi added a comment -

           The patch for HADOOP-450 laid the foundation for this issue. But the specific aspects of this issue have yet to be addressed, and I have started to work on a patch for it. The patch will:

          • allow the user to specify different input format classes for different input directories
      • allow the user to specify different mapper classes for different key/value class pairs.

           My thought is to extend the JobConf class with the following method for specifying different input format classes:

           public void setInputFormatClass(Class theInputFormatClass, Path p)

           (The methods for specifying different mapper classes are described below.)

          The FileSplit class should be extended to have a method:
          public Class getInputFormatClass()

           The initTasks method of JobInProgress should make the necessary changes to create FileSplit objects with the correct input format class information.

           For supporting different mapper classes, we can extend the JobConf class:

          public void setMapperClass(Class theMapperClass, Class theKeyClass, Class theValueClass)
          public Class getMapperClass(Class theKeyClass, Class theValueClass)

          The idea is that, for each split, we know the input format class, from there, we know the corresponding record reader, then we know the key/value classes of the input records.

          Another possibility is to allow the user to specify a mapper class per input path, in the same way as for the input format class. To do that, the FileSplit class needs to support the following method:
          public Class getMapperClass()

          Thoughts?

          Owen O'Malley added a comment -

          I really think this should be done outside of the framework without adding 4 new fairly complicated public methods into the JobConf.

          I also don't like using types to distinguish which Mapper to use. A typical case will be two directories using TextInputFormats where you want to apply separate Mappers to pull out different fields. I think it is much nicer to define the entire pipeline, rather than using types to infer it.

          InputSplit (from an input directory) -> RecordReader (via InputFormat) -> Mapper

          Owen O'Malley added a comment -

           Ok, let me modify this a bit. How about if we define a new class that defines a map input processing pipeline? Each pipeline is given the input directory and the JobConf when it is created and then gets to pick the appropriate InputFormat and Mapper classes.

           public class InputPipeline {
             public InputPipeline(Path inputDir, JobConf conf);
             public Class createInputFormat();
             public Class getMapper();
             public int getRequestedNumMaps();
           }

          The JobConf then picks up 2 methods:

          JobConf:
          public void setInputPipelineClass(Class cls);
          public Class getInputPipelineClass();

          The default will be InputPipeline that just uses the values from the JobConf for all paths.

          The framework changes are pretty light. Just creating the InputPipeline when iterating through the input directories and using that to create the splits. We need to add the InputFormat and Mapper classes to the MapTask and make MapTask.localizeConfiguration set the Mapper class.

          To complete the picture, I'd also add a class in org.apache.hadoop.mapred.lib that looks like:

           public class MultiInputPipeline extends InputPipeline {
             public static void addInputPipeline(JobConf conf, Path inputDir, Class inputFormat, Class mapper, int numMaps);
             ...
           }

          That makes this look like the other hooks that are currently in Hadoop and provides the flexibility that the users need.

          Doug Cutting added a comment -

          I'm having trouble seeing how this is used. Can you provide some pseudo application code?

          Without understanding it, the API looks ugly. There's a lot of meta-stuff going on--passing classes around rather than instances. Your prior proposal was much more straightforward: RecordReader creates the Mapper. What problem with that are you now trying to solve?

          Runping Qi added a comment -

           I think it may be better to make InputPipeline an interface:

           public interface InputPipeline {
             public Class getInputFormat(Path inputDir, JobConf conf);
             public Class getMapper(Path inputDir, JobConf conf);
             public int getRequestedNumMaps(Path inputDir, JobConf conf);
           }

           In order to customize the input processing pipeline, the user is expected to provide a class
           that implements the interface:

           public class MyInputPipeline implements InputPipeline {
             public MyInputPipeline();
             public Class getInputFormat(Path inputDir, JobConf conf);
             public Class getMapper(Path inputDir, JobConf conf);
             public int getRequestedNumMaps(Path inputDir, JobConf conf);
           }

           The class provides an argumentless constructor, and implements the logic to
           determine the InputFormat class and the mapper class that apply to the data under the
           specified input directory, as well as the expected number of mappers for that data.

          The access to this class is through two new methods of JobConf:

          public void setInputPipelineClass(Class cls);
          public Class getInputPipelineClass();

           To specify such a class, the user simply calls:
           myJob.setInputPipelineClass(MyInputPipeline.class);

           The initTasks method of the JobInProgress class can do something like the following:

           String jobFile = profile.getJobFile();
           JobConf jd = new JobConf(localJobFile);
           FileSystem fs = FileSystem.get(conf);
           String userPipelineClassName = jd.get("mapred.input.pipeline.class");
           InputPipeline inputPipeline;
           if (userPipelineClassName != null && localJarFile != null) {
             try {
               ClassLoader loader = new URLClassLoader(
                   new URL[] { localFs.pathToFile(localJarFile).toURL() });
               Class inputPipelineClass = Class.forName(userPipelineClassName, true, loader);
               inputPipeline = (InputPipeline) inputPipelineClass.newInstance();
             } catch (Exception e) {
               throw new IOException(e.toString());
             }
           } else {
             inputPipeline = jd.getInputPipeline();
           }

           ArrayList allSplits;
           InputFormat inputFormat;
           InputFormat genericInputFormat;
           Class mapperClass;
           Class genericMapperClass;
           int numMapTasks;
           // code to get the generic input format and mapper class for the job

           Path[] dirs = jd.getInputPaths();
           for (int i = 0; i < dirs.length; i++) {
             inputFormat = genericInputFormat;
             mapperClass = genericMapperClass;
             if (inputPipeline != null) {
               Class inputFormatClass = inputPipeline.getInputFormat(dirs[i], jd);
               if (inputFormatClass != null) {
                 inputFormat = (InputFormat) inputFormatClass.newInstance();
               }
               mapperClass = inputPipeline.getMapper(dirs[i], jd);
               if (mapperClass == null) {
                 mapperClass = genericMapperClass;
               }
               numMapTasks = inputPipeline.getRequestedNumMaps(dirs[i], jd);
             }
             FileSplit[] splits = inputFormat.getSplits(fs, jd, numMapTasks);
             // add the new splits to allSplits
             ...
           }

          Runping Qi added a comment -

           One correction on my previous comment: InputFormat should be extended with the following method:

          FileSplit[] getSplits(FileSystem fs, JobConf jd, Path p, int numMapTasks);

          and the initTasks method should call:
          FileSplit[] splits = inputFormat.getSplits(fs, jd, dirs[i], numMapTasks);

          Owen O'Malley added a comment -

          The question is how the user requests multiple InputPipelines. Either they do:

          job.setInputPipeline(MyInputPipelineFactory.class);

          or

          job.addInputPath(myInputDir, MyInputFormat.class, MyMapper.class, numMaps);

          Setting a class is more similar to the other hooks to control processing in the JobConf, but the expanded addInputPath is more user friendly. I guess you're right that it is better to be friendly. smile

          Whether the InputFormat's and Mapper's class is stored in the MapTask or FileSplit is a separate question. I think it makes more sense to put into the MapTask since that is a private type and doesn't change an API. Especially when you consider that we are going to generalize the FileSplit to InputSplit.

           Finally, I think that getSplits should take a Path[] rather than just a path so that we can split evenly over a set of input paths. So I propose getSplits(FileSystem, Path[], JobConf, int numMaps).

          In the long run, as we move Path's over to URL's that include their FileSystem we should drop explicit FileSystem parameters like this. But that is another patch. grin

          Doug Cutting added a comment -

          > job.addInputPath(myInputDir, MyInputFormat.class, MyMapper.class, numMaps);

          Why can't this simply be

          job.addInputPath(myInputDir, MyInputFormat.class);

          and let the mapper be determined by the inputformat?

          I also don't think that users should typically be specifying the number of map tasks. Their code should run unchanged on clusters of various sizes. The number of map tasks should thus be determined dynamically when splitting.

          I also think the above should simply be syntactic sugar for:

          job.setInputFormat(MyInputFormat.class)
          job.set("my.input.dirs", "foo,bar");
          job.set("my.mappers", "FooMapper,BarMapper");

          Then implement MyInputFormat to process my.input.dirs and my.mappers.

           In other words, we should first make it possible to do this in user code, then perhaps provide some utilities that simplify things. So the first patch I'd like to see is mappers from record readers. Then we can write some InputFormats that use these new features and perhaps promote some of them into the standard public APIs. Does this make sense? Am I missing something?
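
           For illustration, a minimal pure-Java sketch of how such a MyInputFormat could pair the two property values above positionally; the DirMapperPairing helper is hypothetical and assumes both lists have the same length.

           import java.util.LinkedHashMap;
           import java.util.Map;

           // Pairs comma-separated dirs and mapper class names positionally,
           // e.g. "foo,bar" + "FooMapper,BarMapper" -> {foo=FooMapper, bar=BarMapper}.
           class DirMapperPairing {
               static Map<String, String> pair(String inputDirs, String mappers) {
                   String[] dirs = inputDirs.split(",");
                   String[] mapperNames = mappers.split(",");
                   if (dirs.length != mapperNames.length) {
                       throw new IllegalArgumentException("my.input.dirs and my.mappers differ in length");
                   }
                   Map<String, String> mapperByDir = new LinkedHashMap<>();
                   for (int i = 0; i < dirs.length; i++) {
                       mapperByDir.put(dirs[i].trim(), mapperNames[i].trim());
                   }
                   return mapperByDir;
               }
           }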

          Owen O'Malley added a comment -

           The problem with that is that often I won't want to use a non-standard InputFormat. A very typical case is to have the same input format, but different Mappers.

          job.addInputPath("foo", TextInputFormat.class, FooMapper.class);
          job.addInputPath("bar", TextInputFormat.class, BarMapper.class);

          The same applies to SequenceFileInputFormat (or combinations of the two), especially now that the input key/value types are created by the RecordReader.

          Doug Cutting added a comment -

          > A very typical case is to have the same input format, but different Mappers

          But, if the mapper is a function of the input format this can instead be:

          job.addInputPath("foo", FooInput.class);
          job.addInputPath("bar", BarInput.class);

          Where FooInput is defined with something like:

           public class FooInput extends TextInput {
             public void map(...) { ... }
           }

          In other words, if you're going to define custom mappers anyway, then it's no more work to define custom Input formats.

          arkady borkovsky added a comment -

           The issue as stated in the title is an important, useful feature on its own.

           However, the actual problem that motivated it, as described in the original post, does not necessarily imply the suggested solution.
           The problem Runping describes looks like a case of Join to me.
           The suggested solution would help to implement a work-around using MapReduce.
           But is this the best way of allocating development resources?
           Implementing Join using MultiJobInput in the Map step and SplitIterator in the Reduce step is a temporary work-around solution.
           Would it make sense to define interfaces in terms of the actual problem (Join)? In the short term it may still be implemented by piggy-backing on MapReduce using two-level keys and MultiJobInput. But the user would be able to think in more appropriate terms.

          Michel Tourn added a comment -

          Support for "all-pairs" joins is a far-reaching requirement that I don't touch here.

          But I agree with Doug's last comment:

          >In other words, if you're going to define custom mappers anyway,
          >then it's no more work to define custom Input formats.

          Moreover, unless I am missing something, the current APIs already nicely address the requirement in the JIRA issue title.

          JobConf.addInputPath(Path dir)

          JobConf.setInputFormat(Class theClass)

           InputFormat {
             FileSplit[] getSplits(FileSystem, JobConf, preferredNumSplits)
             RecordReader getRecordReader(FileSystem fs, FileSplit split, JobConf job, Reporter reporter)
             ...
           }

          Given this current API the flow looks like this:

          During Task execution ( InputFormat.getRecordReader() ):

           task's FileSplit + job's single InputFormat -> Path context -> inputDirectory context --> dispatched "sub" InputFormat --> getRecordReader --> RecordReader instance.

          During JobTracker splits computation ( InputFormat.getSplits() ):

          job's single InputFormat + job's list of input directories --> list of input dirs/files --> list of sub-InputFormat-s --> dispatch and "aggregate" the results from your sub-InputFormat-s .getSplits()

          This is enough to implement the special case discussed in the HADOOP-372 title:

          InputDirectory --> InputFormat

          A framework class or the examples or the Wiki FAQ could demonstrate how one can write such a generic dispatching class:

          class DispatchInputFormat(InputFormat[], JobConf) implements InputFormat

           It is generic but not universal:
           different applications will need to use different information to make the InputFormat dispatch / aggregation decisions.

           Now, three ways to customize this DispatchInputFormat:

          1./3. Writing zero java code:

          replace:
          >job.addInputPath("foo", FooInput.class);
          with:
          job.set("DispatchInputFormat.inputdirmap", "foo=org.example.FooInput bar=org.example.BarInput")

          If you want client-side type checking for the classnames, do it in a helper method.
          For example:

           static void DispatchInputFormat.addDir(JobConf job, Path dir, Class<InputFormat> clazz)

           Call:
           DispatchInputFormat.addDir(job, new Path("foo"), FooInput.class);
           DispatchInputFormat.addDir(job, new Path("bar"), BarInput.class);

          Where Class<InputFormat> uses Generics to enforce at compile-time that FooInput implements InputFormat.

          2./3. code reuse without copy-paste

          A few well-placed hooks could allow users to reuse and customize the DispatchInputFormat code without duplicating it:

           class MyInputFormat extends DispatchInputFormat {
             // override
             protected InputFormat inputDirToFormat(Path inputDir) { ... }
           }

          3./3. code reuse with copy-paste

          And for more complex requirements that do not fit well with inputDirToFormat():

          one would instead use DispatchInputFormat as a starting point, a source code example.
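
           For illustration, the dispatch step reduced to plain Java reflection; a real DispatchInputFormat would instantiate the configured sub-InputFormat this way and then delegate getSplits()/getRecordReader() to it (the Dispatcher name is hypothetical).

           // Resolves a sub-InputFormat by its configured class name via reflection.
           class Dispatcher {
               static Object instantiate(String inputFormatClassName) {
                   try {
                       return Class.forName(inputFormatClassName).getDeclaredConstructor().newInstance();
                   } catch (ReflectiveOperationException e) {
                       throw new RuntimeException("Cannot create InputFormat " + inputFormatClassName, e);
                   }
               }
           }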

          Benjamin Reed added a comment -

          We have a desperate need to be able to specify different inputformat classes, mappers, and partition functions in the same job. Our need roughly corresponds to this issue. However, we do not assume that a given input will need to be processed by only one inputformat, mapper, or partition function.

           I think Owen was right about directing the discussion towards HADOOP-451. If HADOOP-451 was fixed, we (meaning my project) would not have any issues in this area. We actually use a similar syntax to the one proposed by Michel: job.set("DispatchInputFormat.inputdirmap", "foo=org.example.FooInput bar=org.example.BarInput"), but imagine if we get job.set("DispatchInputFormat.inputdirmap", "foo=org.example.FooInput foo=org.example.BarInput"); now foo must be processed by both FooInput and BarInput. When the splits are created and sent to the mappers, the DispatchInputFormat will get {"foo", offset, length}, but it has no way of knowing whether to apply FooInput or BarInput. With HADOOP-451 fixed, we could encode the InputFormat to use in the split.

          Owen O'Malley added a comment -

          Ok, Doug, Runping & I chatted offline over a whiteboard and here is a proposal that I believe captures something that will work:

           public class InputHandler {
             public List<FileSplit> getSplits(JobConf);
             public RecordReader getRecordReader(JobConf, FileSplit);
             public Mapper getMapper(JobConf, FileSplit, RecordReader);
           }

           where InputHandler is a generalization of the current InputFormat. It is clearly possible to then write user code that handles multiple input directories with different RecordReaders and Mappers, but it isn't a huge jump from where we are now.

          Benjamin Reed added a comment -

           I like this proposal. Assuming that the RecordReader in getMapper is the real record reader (i.e. the one returned from getRecordReader), it would address HADOOP-433.

          How would this interact with the ability to define a MapRunnable?

          Doug Cutting added a comment -

          > How would this interact with the ability to define a MapRunnable?

          Good question. Perhaps this interface should return a MapRunnable rather than a Mapper?

          Also, note that the uses of FileSplit will be generalized to the more abstract Split by HADOOP-451.

          Finally, I'd prefer the name "JobInput". But I wonder: should we change it from InputFormat at all? Do we intend to rename OutputFormat too? If not, renaming this might create more confusion in the API than it removes.

          Owen O'Malley added a comment -

          Because very few users do anything with MapRunnable, I think we should just make the relevant changes to MapRunner.

          Clearly, the name of the class is totally a judgment call. To me, the input "format" is the class of the RecordReader. The scope of this class is naturally much wider.

          So in terms of the user interface, I would propose that the default InputMetaFormat asks the job conf for a RecordReader and a Mapper. So, there should be new JobConf methods:

          // defaults to TextRecordReader
          void setRecordReaderClass(Class);
          Class getRecordReaderClass();

          Is that reasonable?

          Doug Cutting added a comment -

          We don't expect most users to directly implement this API; rather, this is the API the system invokes. So the core API should be MapRunnable, and the library can provide a standard implementation (MapRunner) whose constructor takes a Mapper class. Otherwise folks would be unable to use a different MapRunnable for each task and we'd be back where we started.

          I don't think we need to add JobConf methods beyond those to access the InputFormat. The default InputFormat will remain TextInputHandler, for back-compatibility, no? We don't want JobConf to keep growing arbitrarily, so we should move to using static methods on library classes to access properties specific to those classes. Thus, if one specifies an InputFormat implementation named MetaInputHandler, then the method to set the record reader would be MetaInputHandler.setRecordReaderClass(Class).

          InputFormatBase will get a new method, getMapRunnable, whose default implementation is something like:

          MapRunnable getMapRunnable(Split) {
            return new MapRunner(InputFormatBase.getMapperClass(getConf()));
          }

          JobConf.getMapperClass() should thus be deprecated and made to call InputFormatBase.getMapperClass(). So a job might:

          job.setInputFormatClass(SequenceFileInputFormat.class);
          SequenceFileInputFormat.setMapperClass(MyMapper.class);
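
          A minimal sketch of the static-accessor pattern described above, assuming a hypothetical MetaInputHandler; the property key and the default reader are chosen here purely for illustration, and a JobConf parameter is added because a static setter needs somewhere to store the value:

          import org.apache.hadoop.mapred.JobConf;
          import org.apache.hadoop.mapred.LineRecordReader;
          import org.apache.hadoop.mapred.RecordReader;

          public class MetaInputHandler {
            // Illustrative property key, not an existing Hadoop configuration name.
            private static final String READER_KEY = "meta.input.handler.record.reader.class";

            public static void setRecordReaderClass(JobConf conf,
                                                    Class<? extends RecordReader> cls) {
              conf.setClass(READER_KEY, cls, RecordReader.class);
            }

            public static Class<? extends RecordReader> getRecordReaderClass(JobConf conf) {
              // LineRecordReader stands in for the text-reader default mentioned above.
              return conf.getClass(READER_KEY, LineRecordReader.class, RecordReader.class);
            }
          }

          A job would then call something like MetaInputHandler.setRecordReaderClass(job, MyReader.class) after picking the input handler, and JobConf itself stays unchanged.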

          Doug Cutting added a comment -

          A thought: instead of altering existing interfaces, we might create new interfaces and support both for a time. That would keep things back-compatible. How much more pain would it be to implement that way?

          Owen O'Malley added a comment - edited

          Ok, my current thoughts are as follows:

          interface JobInput {
            void validateInput(JobConf conf);
            List<InputSplit> createSplits(JobConf conf);
          }
          

          and the natural implementation over FileSystem files and directories:

          class FileSystemJobInput implements JobInput { 
            ...
            public static void addInputPath(JobConf conf, Path path);
            public static void addInputPath(JobConf conf, Path path, 
                     Class<? extends RecordReader> reader, 
                     Class<? extends Mapper> mapper);
            public static void setDefaultRecordReader(JobConf conf, 
                     Class<? extends RecordReader> reader);
          }
          

          which would be the default and be used by most applications. The other major change is to InputSplits, which get the ability to define their RecordReader and Mapper.

          interface InputSplit extends Writable {
            ...
            RecordReader createRecordReader();
            Mapper createMapper();
          }
          

          Finally, RecordReader gets a method to set the InputSplit:

          interface RecordReader {
            void initialize(InputSplit split, Progressable progress) throws IOException;
            ...
          }
          

          so that the RecordReader interface is standard and instances can be created via ReflectionUtils.newInstance.
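
          For illustration only, a split along these lines might carry its reader and mapper as class names and build them via ReflectionUtils, roughly as in the sketch below; the class is hypothetical, it omits the path/offset/length fields and location methods a real InputSplit needs, and it uses raw types for brevity:

          import java.io.DataInput;
          import java.io.DataOutput;
          import java.io.IOException;
          import org.apache.hadoop.io.Text;
          import org.apache.hadoop.io.Writable;
          import org.apache.hadoop.mapred.Mapper;
          import org.apache.hadoop.mapred.RecordReader;
          import org.apache.hadoop.util.ReflectionUtils;

          public class TaggedSplit implements Writable {
            private String readerClassName;
            private String mapperClassName;

            public TaggedSplit() {}                      // required for Writable deserialization

            public TaggedSplit(Class<?> reader, Class<?> mapper) {
              this.readerClassName = reader.getName();
              this.mapperClassName = mapper.getName();
            }

            public RecordReader createRecordReader() throws IOException {
              return (RecordReader) newInstance(readerClassName);
            }

            public Mapper createMapper() throws IOException {
              return (Mapper) newInstance(mapperClassName);
            }

            private static Object newInstance(String className) throws IOException {
              try {
                return ReflectionUtils.newInstance(Class.forName(className), null);
              } catch (ClassNotFoundException e) {
                throw new IOException("Cannot load " + className);
              }
            }

            public void write(DataOutput out) throws IOException {
              Text.writeString(out, readerClassName);    // class names travel with the split
              Text.writeString(out, mapperClassName);
            }

            public void readFields(DataInput in) throws IOException {
              readerClassName = Text.readString(in);
              mapperClassName = Text.readString(in);
            }
          }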

          Runping Qi added a comment -

          This looks good to me.

          Should we also have a method to set the default mapper?

          One note here is that it is assumed that all the mappers will generate
          map output values of the same class. I think this is due to the constraint
          imposed by SequenceFile.
          It would be nice to relax this constraint to only require that the output values are Writable.
          The framework could serialize the map output values and tag them with their actual class.
          The value class of the map output files would then simply be BytesWritable.
          The framework could automatically restore the values to their actual class
          before they are passed to the reduce function.
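
          A minimal sketch of the kind of tagging wrapper Runping describes (Hadoop's existing ObjectWritable and GenericWritable follow a similar idea); the class below is illustrative and not part of any patch on this issue:

          import java.io.ByteArrayInputStream;
          import java.io.ByteArrayOutputStream;
          import java.io.DataInput;
          import java.io.DataInputStream;
          import java.io.DataOutput;
          import java.io.DataOutputStream;
          import java.io.IOException;
          import org.apache.hadoop.io.Text;
          import org.apache.hadoop.io.Writable;

          // Wraps any Writable value together with its class name, so a map output
          // file can hold mixed value types and the reduce side can restore them.
          public class TaggedValue implements Writable {
            private String className;
            private byte[] bytes;

            public TaggedValue() {}

            public TaggedValue(Writable value) throws IOException {
              this.className = value.getClass().getName();
              ByteArrayOutputStream buf = new ByteArrayOutputStream();
              value.write(new DataOutputStream(buf));
              this.bytes = buf.toByteArray();
            }

            // Restore the original value via its no-argument constructor.
            public Writable unwrap() throws IOException {
              try {
                Writable value = (Writable) Class.forName(className).newInstance();
                value.readFields(new DataInputStream(new ByteArrayInputStream(bytes)));
                return value;
              } catch (Exception e) {
                throw new IOException("Cannot restore value of class " + className);
              }
            }

            public void write(DataOutput out) throws IOException {
              Text.writeString(out, className);
              out.writeInt(bytes.length);
              out.write(bytes);
            }

            public void readFields(DataInput in) throws IOException {
              className = Text.readString(in);
              bytes = new byte[in.readInt()];
              in.readFully(bytes);
            }
          }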

          Doug Cutting added a comment -

          > The other major change is to InputSplits, which get the ability to define their RecordReader and Mapper.

          Hmm. This is a big departure from the previously proposed API:

          public interface JobInput {
            public List<InputSplit> getSplits(JobConf);
            public RecordReader getRecordReader(JobConf, InputSplit);
            public Mapper getMapper(JobConf, InputSplit, RecordReader);
          }
          

          Your new API moves the latter two methods to the InputSplit. Can you motivate this?

          I question whether it's a good idea to move such "policy" methods to an "implementation" class like InputSplit. It seems to me that we'll want to use inheritance to implement InputSplits, and inheritance can fight with implementation of interfaces. A typical application will want to be able to orthogonally define its Mapper and its splitter/RecordReader, and we want to make it as simple as possible to paste such independent implementations together. Splitter and RecordReader implementations will often go together, so it makes sense to have them share implementations. But Mappers are frequently independent. How, using the above, would one define a single mapper that operates over inputs in different formats, but that produce compatible keys and values (i.e., merging or joining)? One should be able to do that by specifying some sort of compound input format, and only a single mapper implementation. One should be able to extend mappers and splitters independently and then glue them together at the last minute. Attaching mappers to the split instance seems like it could complicate that.

          Before we agree on these APIs, I'd like to see some APIs for both reusable splitters and record readers as well as sample application code that uses mappers. Perhaps we should start a wiki page with code examples given various APIs?

          Owen O'Malley added a comment -

          Moving the RecordReader and Mapper into the InputSplit is necessary to support different flavors of joins. In particular, you can do things like:

          addInputPath("/foo/*_2007/" , Reader1.class, Mapper1.class);
          addInputPath("/foo/march_*/", Reader2.class, Mapper2. class);

          and correctly handle any overlaps that occur.

          Part of what has happened recently with RecordReaders shows that they are independent of InputFormats. We have moved almost all of them out of the InputFormats that originally defined them because applications needed to control the splits without changing the RecordReaders.

          Doug Cutting added a comment -

          > addInputPath("/foo/*_2007/" , Reader1.class, Mapper1.class);

          It might be nice if this were factored to a generic, file-independent interface in the base class, like:

          void addInput(JobConf job, String name, Reader, Mapper);

          This could just add the names to lists on the job, and could be reused by non-file JobInputs.

          Also the base class for InputSplits should have Reader and Mapper fields, right? FileSplit should only add path, start and length fields--the file-specific bits. The generic base class should implement createRecordReader() and createMapper(), right?

          A use case to keep in mind is, if we add an HBaseJobInput, how much code from FSJobInput would need to be duplicated? Hopefully very little.

          Doug Cutting added a comment -

          Another thought: we ought to use context objects (HADOOP-1230) wherever possible here, to simplify future evolution. For example, instead of RecordReader#initialize(InputSplit split, Progressable progress) we should probably have RecordReader#initialize(InputSplit split, MapContext context), with MapContext including a 'getProgress()' method. That way we can add more methods to the system-implemented context without altering the user-implemented API.
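
          A small sketch of that shape, with a hypothetical MapContext and an illustrative method set (HADOOP-1230 describes the general pattern); neither interface below exists in the current code:

          import java.io.IOException;
          import org.apache.hadoop.mapred.InputSplit;
          import org.apache.hadoop.mapred.JobConf;
          import org.apache.hadoop.util.Progressable;

          // The context object lets the framework grow what it hands to a RecordReader
          // without ever changing the user-facing initialize() signature again.
          interface MapContext {
            Progressable getProgress();
            JobConf getJobConf();
          }

          interface ContextAwareRecordReader {
            void initialize(InputSplit split, MapContext context) throws IOException;
            // next(key, value), getPos(), close(), ... as in the existing RecordReader
          }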

          Owen O'Malley added a comment -

          I'll also rename the JobInput to InputSplitter and create a class InputSplitBase that has the generic code for handling the record readers and mappers.

          Tom White added a comment -

          Here's a first cut at providing this feature. Note that it doesn't require any framework changes - it's all in the library classes, as Owen pointed out above.

          For a concrete example, consider:

          ExtendedJobConf conf = new ExtendedJobConf(MultiInputFormat.class);
          conf.setMapperClass(KeyMapper.class);
          conf.setReducerClass(IdentityReducer.class);
          
          conf.setOutputPath(new Path("output"));
          
          conf.addInputPath(new Path("input/multiinput/1"),
                  TextInputFormat.class);
          conf.addInputPath(new Path("input/multiinput/2"),
                  SequenceFileInputFormat.class);
          

          It's also possible to specialize the mappers by input path:

          ExtendedJobConf conf = new ExtendedJobConf(MultiInputFormatWithMapper.class);
          conf.setReducerClass(IdentityReducer.class);
          
          conf.setOutputPath(new Path("output"));
          
          conf.addInputPath(new Path("input/multiinput/1"),
                  TextInputFormat.class, IdentityMapper.class);
          conf.addInputPath(new Path("input/multiinput/3"),
                  KeyValueTextInputFormat.class, ValueMapper.class);
          

          Note that this patch uses ExtendedJobConf - are the new addInputPath methods useful enough to be added to JobConf?
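
          For readers trying to picture how a library-only MultiInputFormat can dispatch, here is a rough sketch; the property name, the "path;formatClass" encoding and the path-prefix matching are invented for this example and are simpler than what the patch actually does:

          import java.io.IOException;
          import java.util.ArrayList;
          import java.util.List;
          import org.apache.hadoop.fs.Path;
          import org.apache.hadoop.mapred.FileInputFormat;
          import org.apache.hadoop.mapred.FileSplit;
          import org.apache.hadoop.mapred.InputFormat;
          import org.apache.hadoop.mapred.InputSplit;
          import org.apache.hadoop.mapred.JobConf;
          import org.apache.hadoop.mapred.RecordReader;
          import org.apache.hadoop.mapred.Reporter;
          import org.apache.hadoop.util.ReflectionUtils;

          // Sketch of the two InputFormat-style callbacks a delegating format needs.
          public class MultiInputDispatchSketch {

            // Ask each configured InputFormat for splits over its own path only.
            public InputSplit[] getSplits(JobConf conf, int numSplits) throws IOException {
              List<InputSplit> splits = new ArrayList<InputSplit>();
              for (String entry : conf.getStrings("sketch.multi.inputs", new String[0])) {
                String[] pathAndFormat = entry.split(";");
                JobConf perPathConf = new JobConf(conf);
                FileInputFormat.setInputPaths(perPathConf, new Path(pathAndFormat[0]));
                for (InputSplit split :
                    newFormat(perPathConf, pathAndFormat[1]).getSplits(perPathConf, numSplits)) {
                  splits.add(split);
                }
              }
              return splits.toArray(new InputSplit[splits.size()]);
            }

            // Pick the RecordReader by matching the split's file against the configured paths.
            public RecordReader getRecordReader(InputSplit split, JobConf conf, Reporter reporter)
                throws IOException {
              Path file = ((FileSplit) split).getPath();
              for (String entry : conf.getStrings("sketch.multi.inputs", new String[0])) {
                String[] pathAndFormat = entry.split(";");
                if (file.toString().startsWith(pathAndFormat[0])) {
                  return newFormat(conf, pathAndFormat[1]).getRecordReader(split, conf, reporter);
                }
              }
              throw new IOException("No InputFormat configured for " + file);
            }

            private static InputFormat newFormat(JobConf conf, String className) throws IOException {
              try {
                return (InputFormat) ReflectionUtils.newInstance(conf.getClassByName(className), conf);
              } catch (ClassNotFoundException e) {
                throw new IOException("Cannot load InputFormat " + className);
              }
            }
          }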

          Johan Oskarsson added a comment -

          I've just tried this out, with a few minor changes to make it run on our 0.15.2 cluster.
          This makes writing a lot of our programs much easier and the code a lot cleaner. Great work, Tom!

          Personally I'd like to see the methods in JobConf instead of an extended class, but either way works.

          Tom White added a comment -

          > Personally I'd like to see the methods in JobConf

          Me too - I'll make that change before submitting the patch for inclusion.

          Runping Qi added a comment -

          Hi, that sounds cool.
          After this has sat in the open state for almost 2 years, it's nice to see a patch available!

          Thanks a lot, Tom.

          Runping

          Johan Oskarsson added a comment -

          We've unfortunately run into an issue with this patch: if we add a lot of input directories we get a huge number of map tasks.
          In this case almost all of the directories have the same mappers and input formats.

          I'm guessing that the issue is that our input format's getSplits will be called separately on each directory instead of over all of the directories at once. This means that a directory with, for example, one 10 MB file will be split up into many map tasks; normally that file would be considered together with many other files and probably end up in just one map task.

          Perhaps it would be possible to merge all directories using the same input format and mapper into one input format getSplits call?
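
          A sketch of the grouping Johan suggests: collect paths by their (InputFormat, Mapper) pair first, so that getSplits can be called once per group over all of that group's paths instead of once per path. The helper below is illustrative only; the key encoding is arbitrary:

          import java.util.ArrayList;
          import java.util.HashMap;
          import java.util.List;
          import java.util.Map;
          import org.apache.hadoop.fs.Path;

          public class InputGroupingSketch {
            // pathToFormatAndMapper maps each input path to {formatClassName, mapperClassName}.
            public static Map<String, List<Path>> groupByFormatAndMapper(
                Map<Path, String[]> pathToFormatAndMapper) {
              Map<String, List<Path>> groups = new HashMap<String, List<Path>>();
              for (Map.Entry<Path, String[]> e : pathToFormatAndMapper.entrySet()) {
                String key = e.getValue()[0] + ";" + e.getValue()[1];
                List<Path> paths = groups.get(key);
                if (paths == null) {
                  paths = new ArrayList<Path>();
                  groups.put(key, paths);
                }
                paths.add(e.getKey());
              }
              return groups;    // one getSplits() call per group, over all of the group's paths
            }
          }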

          Chris Smith added a comment -

          Attached is a modified version of Tom's patch.

          Changes:

          • Updated to work against the trunk
          • Moved methods from ExtendedJobConf to static methods in FileInputFormat
          • Fixed the issue described by Johan above (multiple paths with the same InputFormat and Mapper are now grouped and split together, rather than separately, so you don't get excessive numbers of map tasks)
          • Generally tidied up and documented the code
          • Added a few unit tests
          Johan Oskarsson added a comment -

          Trying to assign this to Chris Smith since he's actively working on it.
          Unfortunately he's not in the Assignee list, so I'll unassign this first.

          Johan Oskarsson added a comment -

          Seems Chris can't assign it to himself or submit the patch to Hudson, so I'll let someone with greater JIRA permissions help him out.

          Tom White added a comment -

          Thanks for picking this up Chris. Generally looks good. There seem to be some problems with tabs, and there's a System.out.println in the test that doesn't need to be there.

          Chris Smith added a comment -

          New patch, without tabs or superfluous System.out.printlns

          Hadoop QA added a comment -

          -1 overall. Here are the results of testing the latest attachment
          http://issues.apache.org/jira/secure/attachment/12385878/hadoop-372.patch
          against trunk revision 676069.

          +1 @author. The patch does not contain any @author tags.

          +1 tests included. The patch appears to include 6 new or modified tests.

          -1 patch. The patch command could not apply the patch.

          Console output: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/2846/console

          This message is automatically generated.

          Hadoop QA added a comment -

          -1 overall. Here are the results of testing the latest attachment
          http://issues.apache.org/jira/secure/attachment/12385966/hadoop-372.patch
          against trunk revision 676069.

          +1 @author. The patch does not contain any @author tags.

          +1 tests included. The patch appears to include 6 new or modified tests.

          +1 javadoc. The javadoc tool did not generate any warning messages.

          +1 javac. The applied patch does not increase the total number of javac compiler warnings.

          -1 findbugs. The patch appears to cause Findbugs to fail.

          +1 release audit. The applied patch does not increase the total number of release audit warnings.

          -1 core tests. The patch failed core unit tests.

          -1 contrib tests. The patch failed contrib unit tests.

          Test results: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/2856/testReport/
          Checkstyle results: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/2856/artifact/trunk/build/test/checkstyle-errors.html
          Console output: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/2856/console

          This message is automatically generated.

          Hadoop QA added a comment -

          -1 overall. Here are the results of testing the latest attachment
          http://issues.apache.org/jira/secure/attachment/12385985/hadoop-372.patch
          against trunk revision 676772.

          +1 @author. The patch does not contain any @author tags.

          +1 tests included. The patch appears to include 6 new or modified tests.

          +1 javadoc. The javadoc tool did not generate any warning messages.

          +1 javac. The applied patch does not increase the total number of javac compiler warnings.

          +1 findbugs. The patch does not introduce any new Findbugs warnings.

          +1 release audit. The applied patch does not increase the total number of release audit warnings.

          -1 core tests. The patch failed core unit tests.

          +1 contrib tests. The patch passed contrib unit tests.

          Test results: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/2860/testReport/
          Findbugs warnings: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/2860/artifact/trunk/build/test/findbugs/newPatchFindbugsWarnings.html
          Checkstyle results: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/2860/artifact/trunk/build/test/checkstyle-errors.html
          Console output: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/2860/console

          This message is automatically generated.

          Chris Smith added a comment -

          The unit test failure (a timeout in hdfs.TestFileCreation.testClientTriggeredLeaseRecovery) wasn't caused by this patch. The tests pass locally, and nothing in the patch will even be used without either specifying the Delegating{Mapper,InputFormat} manually or using the new methods in FileInputFormat, which that test case obviously doesn't do.

          Tom White added a comment -

          I've just committed this. Thanks Chris!

          Alejandro Abdelnur added a comment -

          I find this feature useful, but I'm not sure about how it has been implemented (apologies for the late comment; I only became aware of it when I saw the JIRA Resolved email).

          FileInputFormat API changes:

          The methods added to FileInputFormat are not related to InputFormat functionality.

          In my opinion they should be in a separate class (something like MultipleInputs).

          Package for all the patch classes:

          All these classes should be in org.apache.hadoop.mapred.lib, thus keeping the core mapred package to exactly that, the core; this is a useful extension, but it is still an extension.

          JobConf for the different mappers:

          The current code forces all mappers to have the same JobConf, which means that mappers have to be written with knowledge of the other mappers to avoid collisions in the configuration keys.

          How about passing along a JobConf instance when adding an input path with its own Mapper, similar to what HADOOP-3702 does when adding a mapper to the chain?

          Tom White added a comment -

          Alejandro,

          Thanks for the feedback. I've created HADOOP-3853 to cover your first two points. The third (JobConf for the different mappers) is an enhancement that should probably go in another issue.

          Hudson added a comment -

          Integrated in Hadoop-trunk #581 (See http://hudson.zones.apache.org/hudson/job/Hadoop-trunk/581/ )

            People

            • Assignee:
              Chris Smith
            • Reporter:
              Runping Qi
            • Votes:
              0
            • Watchers:
              8
