+1
Lots of people are beginning to use Hadoop; the earlier the better. Ok, here is a rough cut at a proposed interface for mappers and reducers. I've put them into a new package so that we can deprecate the old classes but continue to support both. I've done some of the example mappers and reducers in the lib directory. In particular, word count is TokenCountMapper and IntSumReducer.
Thoughts? The strategy here is to remove stuff that is likely to evolve from interfaces that users implement, so that we can change it without breaking user implementations.
Note that, in general, interfaces are much more difficult to evolve than abstract base classes, and should thus only be used when "mixin" behavior is required, e.g., when one needs to be able to add an interface to an existing class. For example, interfaces like Serializable and Writable are great, since folks might need to have a class that implements both, which wouldn't be possible if they were abstract base classes. Some folks seem to believe that interfaces are somehow 'cleaner' and should be used for all public facing APIs. But that's not true: interfaces are limited to a subset of what abstract base classes can do, and an abstract class's ability to provide default implementations of methods greatly facilitates API evolution. Again (at the risk of sounding repetitive), the only advantage of interfaces is that a class can implement more than one of them. So, as we revisit our core APIs that users implement for the purpose of making them more easily backward compatible, we should use abstract base classes in place of interfaces whenever it is clear that multi-inheritance is not a requirement. Some cases to consider:
Other more minor questions:
Yes, it was intentional. With ReflectionUtils, we configure objects if the run-time type is Configurable or JobConfigurable. So we don't need to declare it in the interface.
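A rough sketch of configuring objects by their run-time type rather than their declared type. `Configurable` here is a hypothetical stand-in that takes a plain `Map` as its configuration, and `Factory.newInstance` is illustrative only, not the real `ReflectionUtils` signature.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a configuration-aware marker interface.
interface Configurable {
  void setConf(Map<String, String> conf);
}

// Hypothetical factory (not Hadoop's ReflectionUtils).
class Factory {
  // Creates an instance reflectively, then configures it only if its
  // run-time type implements Configurable; the declared type need not say so.
  static <T> T newInstance(Class<T> cls, Map<String, String> conf) {
    try {
      T obj = cls.getDeclaredConstructor().newInstance();
      if (obj instanceof Configurable) {
        ((Configurable) obj).setConf(conf);
      }
      return obj;
    } catch (ReflectiveOperationException e) {
      throw new RuntimeException("cannot instantiate " + cls.getName(), e);
    }
  }
}

// Opts in to configuration.
class SeparatorMapper implements Configurable {
  String separator = "\t";

  @Override
  public void setConf(Map<String, String> conf) {
    separator = conf.getOrDefault("separator", separator);
  }
}

// Does not implement Configurable, so it is simply constructed.
class PlainMapper { }

class FactoryDemo {
  public static void main(String[] args) {
    Map<String, String> conf = new HashMap<>();
    conf.put("separator", ",");
    SeparatorMapper m = Factory.newInstance(SeparatorMapper.class, conf);
    System.out.println(m.separator); // ,
    System.out.println(Factory.newInstance(PlainMapper.class, conf) != null); // true
  }
}
```

The mapper's declared type says nothing about configuration; the factory discovers it with `instanceof`, which is why the interface does not need to declare it.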
I was going to add it later, but I guess I should add it now. It would mean adding: BytesWritable getRawKey() throws IOException; BytesWritable getRawValue() throws IOException; void collectRaw(BytesWritable key, BytesWritable value) throws IOException; I guess at that point, I could redo the IdentityMapper and IdentityReducer to pass along raw values. > With ReflectionUtils, we configure objects if the run-time type is Configurable or JobConfigurable.
Should we also then close them if they're Closeable? Maybe we don't need any interfaces, but just use Object everywhere and introspect! (The devil is my client today.) > BytesWritable getRawKey() throws IOException; Or should that be 'void getRawKey(ByteBuffer)'? Or perhaps 'void getRawKey(DataOutputBuffer)'? +1
Here are a few comments:
I prefer K1, V1, K2, V2, K3, V3 when emphasizing the relation between all types in a MapReduce job, or K, V when this distinction is not important. Other possibilities would be KI, VI, KO, VO, or KIN, VIN, KOUT, VOUT.
For pipes and streaming to work, it has to be legal to call collect up until close finishes. A certain style of application needs to trail the input and handle the last set of records in close. Currently, most of them keep a handle on the collector to use in close, but it is better to give it to them explicitly.
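A minimal sketch of why close needs the collector, using a hypothetical `OutputSink` interface in place of the real output collector: a mapper that counts runs of identical consecutive keys only knows a run is finished when a different key arrives, so the last run can only be emitted at close time.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical minimal output interface, standing in for an output collector.
interface OutputSink<K, V> {
  void collect(K key, V value);
}

// Counts runs of identical consecutive keys. A run is only known to be
// complete when a different key arrives, so the final run can only be
// emitted in close(), which therefore receives the sink explicitly.
class RunLengthMapper {
  private String current = null;
  private int count = 0;

  void map(String key, OutputSink<String, Integer> out) {
    if (current != null && !current.equals(key)) {
      out.collect(current, count); // the previous run just ended
      count = 0;
    }
    current = key;
    count++;
  }

  void close(OutputSink<String, Integer> out) {
    if (current != null) {
      out.collect(current, count); // flush the trailing run
    }
  }
}

class RunLengthDemo {
  static List<String> run(String... keys) {
    List<String> emitted = new ArrayList<>();
    OutputSink<String, Integer> out = (k, v) -> emitted.add(k + "=" + v);
    RunLengthMapper mapper = new RunLengthMapper();
    for (String k : keys) {
      mapper.map(k, out);
    }
    mapper.close(out);
    return emitted;
  }

  public static void main(String[] args) {
    System.out.println(run("a", "a", "b", "a")); // [a=2, b=1, a=1]
  }
}
```

Without the sink argument on close, the mapper would have to stash a reference to it during map, which is the workaround described above.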
Hmm. I forgot that we do that. I guess I'd prefer to add a new method rather than make ReduceContext implement Iterator. Iterator<VALUEIN> getValues() throws IOException;
Going through JobConf is another big job that should probably be done at some point. The width of that interface makes it somewhat problematic.
I really find the types hard to read if they are K1, V1. I guess I could make them all uppercase KEYIN, VALUEIN, KEYOUT, VALUEOUT. Ok, here is an update. I made the API into abstract classes. I'm not totally convinced that it is good for Mapper and Reducer, although it does allow me to get rid of the MapperBase and ReduceBase. I also changed the generic parameters to all caps, although it looks ugly to my eyes. I also added the iterator and iterable to the reduce context.
I didn't add the raw stuff yet, because it isn't clear to me how to string it all together. (I can do it on the map/reduce api, but how does this play with the input format, output format, partitioner, and comparator?) > I also changed the generic parameters to all caps although it looks ugly to my eyes.
Might capitalization be a good compromise? Also, I slightly prefer In/Out before Key/Value rather than after in these names, i.e., InKeyClass, InValClass, OutKeyClass & OutValClass to KeyInClass, ValInClass, KeyOutClass, & ValOutClass. > I didn't add the raw stuff yet [ ... ] That's fine. It should probably be a separate issue. This comment is on the general observation that Doug made about interfaces and abstract classes in this Jira.
Doug, your observation about the power of abstract classes for evolution is right on! However, your viewpoint that "Some folks seem to believe that interfaces are somehow 'cleaner' and should be used for all public facing APIs. But that's not true" is not quite correct. Joshua Bloch, in "Effective Java", argues for using interfaces and abstract base classes together (see Effective Java ...item 16 p 84): "You can combine the virtues of interfaces and abstract classes by providing an abstract skeletal implementation class to go with each non trivial interface you export." I believe your argument is flawed for the following reasons.
The caller then becomes confused about which is their interface. For example, there may be methods in the base class that are to be used by the implementers, or for testing and debugging the abstract class or its implementation. The callers of the main interface don't need to see any of these implementation-targeted methods. 4) In some cases you may have multiple base classes providing different implementation properties for a single interface; the public interface for both abstract base classes is best defined in a single Java Interface. Your observation about the power of abstract classes for evolution is right on: interfaces that are directly implemented by implementations are hard to evolve because ALL the implementations need to be fixed to add the new methods. Unfortunately, you jumped to the wrong conclusion of using an abstract base class to define an interface. A better approach is to define the interface as a Java Interface and provide at least one abstract base class for the implementations of the interface. Sanjay, if we provide both an abstract base class and an interface, and we change the API, then we still break those folks foolish enough to implement the interface directly. In the cases here, I don't think we should provide interfaces, just abstract classes.
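To make the evolution argument concrete, here is a minimal sketch. `RecordProcessor`, `UpperCaser`, and `describe()` are hypothetical names, not Hadoop classes: a method added later to an abstract base class, with a default body, does not break a subclass that was written before the method existed.

```java
// Hypothetical user-implemented API, written as an abstract base class.
abstract class RecordProcessor {
  // Original ("version 1") method that every implementation must provide.
  abstract String process(String record);

  // Added later ("version 2"). Because it has a default body, subclasses
  // written against version 1 keep compiling and running unchanged.
  String describe() {
    return "processor:" + getClass().getSimpleName();
  }
}

// A user implementation written before describe() existed.
class UpperCaser extends RecordProcessor {
  @Override
  String process(String record) {
    return record.toUpperCase();
  }
}

class EvolutionDemo {
  public static void main(String[] args) {
    RecordProcessor p = new UpperCaser();
    System.out.println(p.process("hadoop")); // HADOOP
    System.out.println(p.describe());        // processor:UpperCaser
  }
}
```

Had `RecordProcessor` been an interface (in the Java of this era), adding `describe()` would have broken every class that implemented it directly. Java 8 later added default methods to interfaces, which narrows this particular gap.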
On 'Interfaces vs Abstract classes':
I won't go over the pros and cons; I'll just suggest an approach that IMO works pretty well.
On the use of Context for Mapper.map and Reducer.reduce: have you considered defining (like the Java Servlet API) request and response parameters, where the request is the INPUT (normally read-only) and the response is the OUTPUT? > As the framework is the one that always creates these objects and they are not replaceable, nothing would break in any application code when they are extended.
That's a big assumption. For example, one might write a user library that chains mappers by implementing custom MapContexts. In my experience, if you expose a public abstract API, users will implement it, even if you didn't imagine they would. Maybe I have not explained things clearly.
You could write a custom MapContext even if it is an interface. You would have a MapContext interface and a MapContextWrapper class that implements MapContext and delegates all its methods to the MapContext instance provided to its constructor. If you want your own MapContext implementation, you'd extend MapContextWrapper in a MyMapContext class and override the necessary methods. If Hadoop later adds a new method to the MapContext interface, it will also be implemented in MapContextWrapper, so your MyMapContext will continue to work without breaking. This is the pattern the Servlet API uses with the request/response interfaces and their corresponding wrapper classes. Using abstract classes to define contracts creates some problems in certain situations:
> If you want to have your own MapContext implementation you'd extend MapContextWrapper [ ...]
Sorry if I'm being thick, but I don't understand the point of having the MapContext interface if no one is to implement it directly. We cannot remove methods from it without breaking all callers. We could add methods if all implementations always implement an abstract base class ( MapContextWrapper) instead, but how can we guarantee that folks don't directly implement the interface? > you cannot have a class extending 2 abstract classes, you can do that with interfaces This feature is over-rated. One can easily define two classes that share state. > convenience methods in your abstract class become part of the contract when they should not. Why should they not? What problems does this cause? In my experience, interfaces lead to more testable designs as they are easier to mock/stub/etc than are classes. There are a number of tools that support mocking of interfaces (via dynamic proxies) in Java and therefore enable easier unit test development.
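The wrapper pattern Alejandro describes can be sketched as follows. `MapContext`, `MapContextWrapper`, `UpperCasingContext`, and `RecordingContext` are simplified, hypothetical two-method versions, not the Hadoop types. A user customization extends the wrapper and overrides only what it needs; Doug's objection is that nothing stops someone from implementing `MapContext` directly instead.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, cut-down context interface (two methods only).
interface MapContext {
  String getCurrentValue();
  void write(String key, String value);
}

// Delegates every method to the wrapped context. When a method is added to
// MapContext, it is also added here, so subclasses of the wrapper keep working.
class MapContextWrapper implements MapContext {
  private final MapContext delegate;

  MapContextWrapper(MapContext delegate) {
    this.delegate = delegate;
  }

  @Override
  public String getCurrentValue() {
    return delegate.getCurrentValue();
  }

  @Override
  public void write(String key, String value) {
    delegate.write(key, value);
  }
}

// A user customization: override only the method that needs to change.
class UpperCasingContext extends MapContextWrapper {
  UpperCasingContext(MapContext delegate) {
    super(delegate);
  }

  @Override
  public void write(String key, String value) {
    super.write(key, value.toUpperCase());
  }
}

// Hypothetical stand-in for the framework's own implementation; records output.
class RecordingContext implements MapContext {
  final List<String> output = new ArrayList<>();
  private final String current;

  RecordingContext(String current) {
    this.current = current;
  }

  @Override
  public String getCurrentValue() {
    return current;
  }

  @Override
  public void write(String key, String value) {
    output.add(key + ":" + value);
  }
}

class WrapperDemo {
  public static void main(String[] args) {
    RecordingContext base = new RecordingContext("hello");
    MapContext context = new UpperCasingContext(base);
    context.write("k", context.getCurrentValue());
    System.out.println(base.output); // [k:HELLO]
  }
}
```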
> interfaces lead to more testable designs
Perhaps, but a well-designed abstract class should be just as testable. The bottom line is that, if we're serious about providing backwards compatibility to our users, and we expect our APIs to evolve, then we should not use interfaces for anything with more than a single, simple method. Doug proposed checking the code in as we work on this patch, because it isn't called by the rest of the code and will be far easier to review. So the new API is in src/mapred/org/apache/hadoop/mapreduce. Notable changes since the last patch:
Missing:
This looks great! Thanks!
A few questions:
Overall it's nice to see how natural this looks! > > If you want to have your own MapContext implementation you'd extend MapContextWrapper [ ...]
> > Sorry if I'm being thick, but I don't understand the point of having the MapContext interface if no one is to implement it directly. > We cannot remove methods from it without breaking all callers. We could add methods if all implementations always implement > an abstract base class (MapContextWrapper) instead, but how can we guarantee that folks don't directly implement the interface? The recommendation is to extend the wrapper class instead of implementing the interface. Again, this is the pattern followed by the Servlet API for developers to replace the request/response objects that are used downstream. >> you cannot have a class extending 2 abstract classes, you can do that with interfaces Take the OutputCollector and Reporter interfaces: you could have a single class implementing both, then use that class as a parameter to methods that expect either an OutputCollector or a Reporter. You cannot do that if you are using abstract classes. >> convenience methods in your abstract class become part of the contract when they should not. IMO the contract should be as spartan as possible. By doing that, it is very clear what the container's responsibilities are. Take the Servlet interface: it defines 5 methods (init, getServletConfig, getServletInfo, service, destroy), and that is the contract with the container. The GenericServlet abstract class adds 7 convenience methods. HttpServlet adds another 9 convenience methods. peace! Overall this looks great. I think the interface is much more approachable than the last one.
1. What is the contract for cleanup()? Is it called if map()/reduce() throws an exception? I think it should be, so Mapper/Reducer#run should call cleanup() in a finally clause.
That makes sense.
I guess so. I can't see any reasonable excuse for implementing multiple of these interfaces in the same class.
Currently, it is just: public void run(Context context) throws IOException, InterruptedException { setup(context); KEYIN key = context.nextKey(null); VALUEIN value = null; while (key != null) { value = context.nextValue(value); map(key, value, context); key = context.nextKey(key); } cleanup(context); } I thought about it, but it seemed to confuse things more than it helped. I guess it mostly depends on whether cleanup is used to close file handles, which should happen, or to process the last record which shouldn't happen. Of course by overriding the run method, the user can do either. What are other people's thoughts?
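A minimal sketch of the cleanup-in-finally variant discussed above, using hypothetical, simplified `Context` and `SketchMapper` classes rather than the Hadoop ones. Because `cleanup()` sits in a `finally` block, it runs even when `map()` throws; as noted, that suits closing file handles better than emitting a final record, and a subclass can still override `run()` to choose otherwise.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical, simplified context: hands out input records and collects output.
class Context {
  private final Iterator<String> input;
  final List<String> output = new ArrayList<>();

  Context(List<String> records) {
    this.input = records.iterator();
  }

  // Returns null when the input is exhausted, like nextKey() in the loop above.
  String nextRecord() {
    return input.hasNext() ? input.next() : null;
  }

  void write(String record) {
    output.add(record);
  }
}

// Hypothetical simplified mapper base class.
abstract class SketchMapper {
  protected void setup(Context context) { }

  protected abstract void map(String record, Context context);

  protected void cleanup(Context context) { }

  // cleanup() is in a finally block, so it runs even if map() throws.
  public void run(Context context) {
    setup(context);
    try {
      for (String r = context.nextRecord(); r != null; r = context.nextRecord()) {
        map(r, context);
      }
    } finally {
      cleanup(context);
    }
  }
}

class FailingMapper extends SketchMapper {
  boolean cleanedUp = false;

  @Override
  protected void map(String record, Context context) {
    if (record.equals("bad")) {
      throw new IllegalStateException("bad record");
    }
    context.write(record);
  }

  @Override
  protected void cleanup(Context context) {
    cleanedUp = true; // e.g. where file handles would be closed
  }

  public static void main(String[] args) {
    FailingMapper mapper = new FailingMapper();
    Context context = new Context(Arrays.asList("ok", "bad", "never"));
    try {
      mapper.run(context);
    } catch (IllegalStateException e) {
      System.out.println("map failed: " + e.getMessage());
    }
    System.out.println(context.output + " cleanedUp=" + mapper.cleanedUp); // [ok] cleanedUp=true
  }
}
```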
The problem that I have is that it would need to bypass the RecordReader to do it. If you add InputStream getKey() throws IOException; InputStream getValue() throws IOException; to the context, you need to add a parallel method in RecordReader to get raw keys, and presumably the same trick in the RecordWriter for output. On the other hand, a lazy value class with a file-backed implementation could work with the object interface. Am I missing how this would work?
It is a pretty minor improvement: for(VALUE v: context) versus for(VALUE v: context.getValues()). It also means that the ReduceContext needs an iterator() method that is relatively ambiguous between iterating over keys or values. I think the current explicit method makes it cleaner.
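For comparison, a tiny sketch of the explicit accessor, with a hypothetical `SimpleReduceContext` that models only key/value iteration (not the Hadoop `ReduceContext`): the reducer loops over `context.getValues()`, so there is no question whether it iterates over keys or values.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified reduce context that models only value iteration.
class SimpleReduceContext<KEYIN, VALUEIN> {
  private final KEYIN key;
  private final List<VALUEIN> values;

  SimpleReduceContext(KEYIN key, List<VALUEIN> values) {
    this.key = key;
    this.values = values;
  }

  KEYIN getCurrentKey() {
    return key;
  }

  // Explicit accessor: a loop over getValues() clearly iterates over the
  // values of the current key, not over keys.
  Iterable<VALUEIN> getValues() {
    return values;
  }
}

class IntSumDemo {
  // Sums the values for the current key, in the style of IntSumReducer.
  static int sum(SimpleReduceContext<String, Integer> context) {
    int total = 0;
    for (int v : context.getValues()) {
      total += v;
    }
    return total;
  }

  public static void main(String[] args) {
    SimpleReduceContext<String, Integer> context =
        new SimpleReduceContext<>("hadoop", Arrays.asList(1, 2, 3));
    System.out.println(context.getCurrentKey() + " " + sum(context)); // hadoop 6
  }
}
```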
This makes sense.
It probably makes sense, although I'm a little hesitant to break yet another thing.
+1 Nice.
Two questions and one comment: Q1: Why does the string version of the Context.getCounter() method need an int id? The equivalent method in the Reporter does not, which means that we could do without this id. Having it is, IMO, error prone: what would happen if I give the same group/name with a different id? Q2: Would I be able to have a subclass of the Context that supports multiple outputs (i.e., via the MultipleOutputs class)? Will I be able to tell the runner to use that class, wrapping the original Context? C1: Have you considered instead having a single Context that holds an InContext and an OutContext, where the IN contains incoming stuff (key, values, splits, jobconf, etc.) and the OUT is used for the output stuff (collect)? Then the OUT would most likely be the same for both the Map and Reduce, and the IN would have different subclasses for the Mapper and Reducer.
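To illustrate Q1, a minimal sketch of a counter registry keyed only by (group, name). `Counters` and its nested `Counter` are hypothetical, not Hadoop's counter classes: because the pair itself is the key, the same group/name always yields the same counter and there is no separate id that could disagree with it.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, minimal counter registry (not Hadoop's Counters class),
// keyed only by (group, name).
class Counters {
  static final class Counter {
    private long value;

    void increment(long amount) {
      value += amount;
    }

    long getValue() {
      return value;
    }
  }

  private final Map<String, Map<String, Counter>> groups = new HashMap<>();

  // The same (group, name) pair always returns the same Counter, so no extra
  // numeric id is needed, and there is no id that could disagree with the name.
  Counter getCounter(String group, String name) {
    return groups.computeIfAbsent(group, g -> new HashMap<>())
                 .computeIfAbsent(name, n -> new Counter());
  }

  public static void main(String[] args) {
    Counters counters = new Counters();
    counters.getCounter("wordcount", "words").increment(3);
    counters.getCounter("wordcount", "words").increment(2);
    System.out.println(counters.getCounter("wordcount", "words").getValue()); // 5
  }
}
```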
That was a slip up. The old interface looked like that. I'll fix it.
One advantage of making Mapper a base class instead of an interface is that it allows something like: class MultipleOutputMapper extends Mapper { /* private state, including the outer context */ <K,V> void collect(String dest, K key, V value) throws IOException { ... } } Then the user's mapper can extend MultipleOutputMapper and get the additional collect method. Does that make sense? It would also be possible to have the MultipleOutputMapper make a wrapper Context that included the additional method, but the map method would need to downcast, which seems less user-friendly.
Fundamentally, the map and reduce inputs are the same and are handled by the TaskAttemptContext. The ReduceContext just provides the utility function getValues() to iterate through the values for the current key. I think it would be more confusing to have input, state, and output contexts. On Q2: having a method in the Mapper would work, but it is not intuitive that the named output has to be written to something other than the context. I was hoping to be able to have a MOContext that extends Context and provides the additional collect methods.
I've played a little bit with the proposed API to see how MultipleOutputs could be integrated in a more natural way.
I've come up with 2 possible alternatives (the following code is for Mapper; for Reducer it would be similar). Option1: Defined a MapContext subclass, MOMapContext, that wraps a MapContext instance, delegating all methods to it and adding its own methods for multiple output support. Defined a Mapper subclass, MOMapper, that has an abstract moMap(MOMapContext) method and in map(MapContext) creates a MOMapContext instance and invokes moMap(). Whoever wants to use multiple outputs should extend the MOMapper class instead of Mapper. The code would look like: public abstract class MOMapper<KEYIN,VALUEIN,KEYOUT,VALUEOUT> extends Mapper<MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT>> { private MultipleOutputs multipleOutputs; public void configure(JobConf jobConf) { multipleOutputs = new MultipleOutputs(jobConf); } public final void map(MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> context) throws IOException { MOMapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> moc = new MOMapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT>(context, multipleOutputs); moMap(moc); } public abstract void moMap(MOMapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> context) throws IOException; public void close() throws IOException { multipleOutputs.close(); } } public class MOMapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> extends MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> { private MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> mapContext; private MultipleOutputs multipleOutputs; public MOMapContext(MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> mapContext, MultipleOutputs multipleOutputs) { this.mapContext = mapContext; this.multipleOutputs = multipleOutputs; } //... delegates all MapContext methods to mapContext instance. // MO methods public void collect(String namedOutput, Object key, Object value) throws IOException { Reporter reporter = null; //TODO, how do I get a reporter ???? 
multipleOutputs.getCollector(namedOutput, reporter).collect(key, value); } public void collect(String namedOutput, String multiName, Object key, Object value) throws IOException { Reporter reporter = null; //TODO, how do I get a reporter ???? multipleOutputs.getCollector(namedOutput, multiName, reporter).collect(key, value); } } Option2: Defined a MapContext subclass, MOMapContext, that extends the concrete MapContext IMPL, adding methods for multiple output support. The MapContext IMPL class should be both Configurable and Closeable (in the same lifecycle as the Mapper). The TaskRunner should look up in the JobConf which implementation of the MapContext to use. Whoever wants to use multiple outputs just defines his/her Mapper as extends Mapper<MOMapContext<KIN, VIN, KOUT, VOUT>> and defines the multiple outputs in the JobConf as usual (this would set the right MapContext implementation). public class MOMapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> extends MapContextIMPL<KEYIN,VALUEIN,KEYOUT,VALUEOUT> { private MultipleOutputs multipleOutputs; public void configure(JobConf jobConf) { super.configure(jobConf); multipleOutputs = new MultipleOutputs(jobConf); } // MO methods public void collect(String namedOutput, Object key, Object value) throws IOException { Reporter reporter = null; //TODO, how do I get a reporter ???? multipleOutputs.getCollector(namedOutput, reporter).collect(key, value); } public void collect(String namedOutput, String multiName, Object key, Object value) throws IOException { Reporter reporter = null; //TODO, how do I get a reporter ???? 
multipleOutputs.getCollector(namedOutput, multiName, reporter).collect(key, value); } public void close() throws IOException { multipleOutputs.close(); super.close(); } } IMO Option 2 would be more natural to the Map/Reduce developer, as it does not introduce a separate Map/Reduce class with a different method, moMap(), to do the actual map logic, and it does not need to create a lightweight MOMapContext on every map() invocation. Also, as the MOMapContext would extend the MapContext IMPL class, there is no need to delegate all methods as in the Option1 wrapper. In both cases I need to figure out how to get a Reporter to pass to the MultipleOutputs when getting the OutputCollector; this is required because the MultipleOutputs use counters. Thoughts? Alejandro,
You seem to be looking at the wrong code. I committed the current version of the proposed API into trunk. I still think that adding methods to the mapper is far more natural than making a wrapping output context. The two approaches would look like this. Option 1 works for subclasses that override run and/or the map method: class MultipleOutputMapper<KEYIN,VALUEIN,KEYOUT,VALUEOUT> extends Mapper<KEYIN,VALUEIN,KEYOUT,VALUEOUT> { protected <KEY,VALUE> void collect(String output, KEY key, VALUE value) throws IOException {...} } Option 2 would only work with classes that override the map method: public abstract class MultipleOutputMapper<KEYIN,VALUEIN,KEYOUT,VALUEOUT> extends Mapper<KEYIN,VALUEIN,KEYOUT,VALUEOUT> { protected class MultipleOutputContext extends Context { protected MultipleOutputContext(Context outerContext) { ... } <KEY,VALUE> void collect(String output, KEY key, VALUE value) throws IOException {...} } protected void setup(MultipleOutputContext context ) throws IOException, InterruptedException { } protected abstract void map(KEYIN key, VALUEIN value, MultipleOutputContext context ) throws IOException, InterruptedException; protected void cleanup(MultipleOutputContext context ) throws IOException, InterruptedException { } public void run(Context outerContext) throws IOException, InterruptedException { MultipleOutputContext context = new MultipleOutputContext(outerContext); setup(context); KEYIN key = context.nextKey(null); VALUEIN value = null; while (key != null) { value = context.nextValue(value); map(key, value, context); key = context.nextKey(key); } cleanup(context); } } Note that these are NOT overrides of Mapper.setup, map, and cleanup, but instead are overloads of them. I think that option 1 is cleaner, but either one should work. Got the right code.
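The overload-versus-override distinction is easy to trip over. This sketch uses hypothetical `BaseMapper`, `Context`, and `SpecialContext` classes, not the Hadoop ones: a subclass method that takes a more specific context type overloads `map()` rather than overriding it, so a framework loop that calls the base signature never reaches it. Marking the intended override with `@Override` turns the mistake into a compile error.

```java
// Hypothetical, simplified context types for illustration.
class Context { }
class SpecialContext extends Context { }

class BaseMapper {
  String lastCall = "none";

  // The only map() signature the "framework" (runOnce) ever calls.
  protected void map(String value, Context context) {
    lastCall = "BaseMapper.map(Context)";
  }

  // Stand-in for the framework's run loop.
  void runOnce(String value) {
    // Resolved at compile time to map(String, Context), even though the
    // argument is a SpecialContext.
    map(value, new SpecialContext());
  }
}

// Intended to customize map(), but the parameter type differs, so this is an
// overload, not an override, and runOnce() never calls it. Adding @Override
// here would make the compiler reject it.
class UserMapper extends BaseMapper {
  protected void map(String value, SpecialContext context) {
    lastCall = "UserMapper.map(SpecialContext)";
  }
}

// Same signature as the base method, checked by @Override.
class FixedMapper extends BaseMapper {
  @Override
  protected void map(String value, Context context) {
    lastCall = "FixedMapper.map(Context)";
  }
}

class OverloadDemo {
  public static void main(String[] args) {
    UserMapper broken = new UserMapper();
    broken.runOnce("x");
    System.out.println(broken.lastCall); // BaseMapper.map(Context)
    FixedMapper fixed = new FixedMapper();
    fixed.runOnce("x");
    System.out.println(fixed.lastCall);  // FixedMapper.map(Context)
  }
}
```

Option 2 above works because its `run()` is written against the overloaded signatures; a developer who overrides the wrong one silently gets the base behavior, which is the error-proneness raised in the reply.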
Yes, I can see how, with the current API, option #1 is simpler to implement. I'm not sure about option #2 with the current API; it would be error prone if the developer implements the wrong signature. What I liked about option #2 was that the multiple outputs were written to the Context, just like the standard output. With the API in the patch I was looking at, that was possible because the Context was a generic parameter. On a side note, what is the value of having an inner Mapper.Context class that just extends MapContext (and the same for Reducer) without adding any methods? Wouldn't it be simpler to use MapContext directly in the Mapper method signatures (and similarly in the Reducer)? Integrated in Hadoop-trunk #581 (See http://hudson.zones.apache.org/hudson/job/Hadoop-trunk/581/)
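The inner-class shorthand being asked about can be sketched with simplified, hypothetical `Mapper` and `MapContext` classes (not the Hadoop ones): a subclass of `Mapper<Long, String, String, Integer>` writes its parameter type as plain `Context`, and the four type arguments come from the enclosing class.

```java
// Hypothetical generic context with four type parameters (not Hadoop's MapContext).
class MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
  private final StringBuilder buffer = new StringBuilder();

  void write(KEYOUT key, VALUEOUT value) {
    buffer.append(key).append('=').append(value).append(';');
  }

  String written() {
    return buffer.toString();
  }
}

// Hypothetical simplified Mapper.
class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
  // Adds no methods: it only lets subclasses write plain "Context", with the
  // four type arguments taken from the enclosing Mapper's type arguments.
  class Context extends MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> { }

  protected void map(KEYIN key, VALUEIN value, Context context) { }
}

class TokenCounter extends Mapper<Long, String, String, Integer> {
  // Context here means Mapper<Long, String, String, Integer>.Context,
  // so context.write() takes (String, Integer).
  @Override
  protected void map(Long key, String value, Context context) {
    for (String token : value.split(" ")) {
      context.write(token, 1);
    }
  }

  // Demo helper: creates a Context and maps a single line.
  String runOn(String line) {
    Context context = new Context();
    map(0L, line, context);
    return context.written();
  }

  public static void main(String[] args) {
    System.out.println(new TokenCounter().runOn("a b a")); // a=1;b=1;a=1;
  }
}
```

Without the inner class, the same signature would have to spell out `MapContext<Long, String, String, Integer>` on every method.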
reposting question
... what is the value of having an inner Mapper.Context class that just extends MapContext (and the same for Reducer) without adding any methods? Wouldn't it be simpler to use MapContext directly in the Mapper method signatures (and similarly in the Reducer)? The point is to make it easier for applications to write their code. In particular, it is awkward to have 4 template parameters on every method definition. For instance: public void map(Object key, Text value, Context context ) throws IOException, InterruptedException { ... } *or* public void map(Object key, Text value, Context<Object,Text,Text,IntWritable> context ) throws IOException, InterruptedException { ... } Really, I'm just using it as a shorthand to make the method signatures easier for the programmer to deal with. On Thu, Aug 28, 2008 at 12:23 AM, Alejandro Abdelnur (JIRA) <jira@apache.org>
The following code in method run() in class org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper always causes a ClassCastException:
runners = (MapRunner[]) new Object[numberOfThreads]; Is there a need to explicitly pass the Context object to the map and cleanup methods? They are expected to be called after setup. Would it be better to store the Context in a protected member variable in setup? map and cleanup are expected to use the same Context reference, right?
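On the ClassCastException: an array created as `Object[]` keeps that run-time type, so it can never be cast to `MapRunner[]`. A minimal sketch, using a hypothetical `ThreadedDemo<K>` with a non-static inner `Runner` standing in for `MapRunner`: `new Runner[n]` is rejected there as generic array creation, which is presumably why the cast was attempted. Keeping the runners in a `List` is one way around it; this is not necessarily the fix that was committed.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical generic class with a non-static inner class, standing in for
// MultithreadedMapper and its MapRunner.
class ThreadedDemo<K> {
  class Runner {
    final int id;

    Runner(int id) {
      this.id = id;
    }
  }

  // The pattern from the report. "new Runner[n]" is not allowed here (generic
  // array creation), and an Object[] can never be cast to Runner[], so this
  // compiles with an unchecked warning but always fails at run time.
  boolean brokenCastFails(int n) {
    try {
      @SuppressWarnings("unchecked")
      Runner[] runners = (Runner[]) new Object[n];
      return runners.length < 0; // not reached: the cast above throws
    } catch (ClassCastException e) {
      return true;
    }
  }

  // One possible alternative: keep the runners in a List instead of an array.
  List<Runner> createRunners(int n) {
    List<Runner> runners = new ArrayList<>(n);
    for (int i = 0; i < n; i++) {
      runners.add(new Runner(i));
    }
    return runners;
  }

  public static void main(String[] args) {
    ThreadedDemo<String> demo = new ThreadedDemo<>();
    System.out.println(demo.brokenCastFails(3));      // true
    System.out.println(demo.createRunners(3).size()); // 3
  }
}
```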
I think this will simplify the code, as we would need to pass the Context to only 2 of the 4 methods, so we may not even need to create a shorthand Mapper#Context. Going forward, if we add more methods, it will also keep the method signatures simple. The point is to simplify the code. Clearly they can retain a reference to
it, but it will create lots of questions over whether they stay valid. Having the context object passed into the map and reduce methods makes it very easy and clear. Here is my current patch. It is working with both the original word count and the new word count. Things to do:
1. Move the old word count to the test jar and rename the NewWordCount to WordCount. 2. Add OutputCommitters to the new api. 3. Add some unit tests for the new api. This is looking good! Some random comments from a quick read through:
Ok, this includes the OutputCommitter. After looking closely at the current FileOutputCommitter and the FileOutputFormat, I decided that Alejandro was right and added a getOutputCommitter method to the OutputFormat. I also moved the interaction between the two around. Now:
1. The FileOutputCommitter is given the output directory when it is created. 2. Only the FileOutputCommitter knows where the work directories are. 3. The output files of the FileOutputCommitter are now "part-r-00001" instead of "part-00001". 4. The unique filenames are unified with the "normal" output filenames. 5. The old OutputCommitter extends the new OutputCommitter and we always use it through the new API. Before, I must have accidentally run the old code. NewWordCount works now. I haven't run the unit tests yet. Starting up that process. I also addressed all of Doug's previous concerns.
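For illustration only, both file-name formats mentioned above can be produced with `String.format`. `PartNames` is a hypothetical helper, not the FileOutputCommitter code, and treating the letter as a task-type marker (`r` for reduce, presumably `m` for map) is an assumption.

```java
import java.util.Locale;

// Hypothetical helper illustrating the old and new output file names.
class PartNames {
  // Old style, e.g. part-00001.
  static String oldName(int partition) {
    return String.format(Locale.ROOT, "part-%05d", partition);
  }

  // New style, e.g. part-r-00001; taskType is assumed to be 'm' or 'r'.
  static String newName(char taskType, int partition) {
    return String.format(Locale.ROOT, "part-%c-%05d", taskType, partition);
  }

  public static void main(String[] args) {
    System.out.println(oldName(1));      // part-00001
    System.out.println(newName('r', 1)); // part-r-00001
  }
}
```

`Locale.ROOT` keeps the zero-padded digits ASCII regardless of the JVM's default locale.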
A nit: copy is more intuitive left-to-right, src->dst, not dst<-src.
This adds a test case, fixes TestFileOutputCommitter, switches the src and dst for clone, moves the example.WordCount to test OldWordCount and renames the example NewWordCount to WordCount.
It now passes all unit tests. I cast a cursory glance and it looks fine. +1
Minor nits include better documentation for deprecated classes etc.; something we can do after the feature freeze. We need to run jdiff etc. before we cut the release... This version cleans up findbugs and javadoc warnings.
Fix last of the findbugs warnings.
-1 overall.
+1 @author. The patch does not contain any @author tags.
+1 tests included. The patch appears to include 28 new or modified tests.
+1 javadoc. The javadoc tool did not generate any warning messages.
+1 javac. The applied patch does not increase the total number of javac compiler warnings.
-1 findbugs. The patch appears to introduce 21 new Findbugs warnings.
+1 Eclipse classpath. The patch retains Eclipse classpath integrity.
The findbugs warnings are all complaints about hidden method names, which is deliberate, to allow backward compatibility. -1 overall. Here are the results of testing the latest attachment
http://issues.apache.org/jira/secure/attachment/12396117/h1230.patch against trunk revision 726129. +1 @author. The patch does not contain any @author tags. +1 tests included. The patch appears to include 28 new or modified tests. +1 javadoc. The javadoc tool did not generate any warning messages. +1 javac. The applied patch does not increase the total number of javac compiler warnings. -1 findbugs. The patch appears to introduce 21 new Findbugs warnings. +1 Eclipse classpath. The patch retains Eclipse classpath integrity. -1 core tests. The patch failed core unit tests. +1 contrib tests. The patch passed contrib unit tests. Test results: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/3747/testReport/ This message is automatically generated. Integrated in Hadoop-trunk #783 (See http://hudson.zones.apache.org/hudson/job/Hadoop-trunk/783/)
Removing empty directory that was left in subversion. You might find that unless you pass (Context, Key, Value) as parameters to map(), it is very hard to implement ChainedMapper, since you will have to delegate an entire Context. It will also be very hard to do the things I want to do with Hadoop. Unless I hear a good argument otherwise, I will submit a new ticket.
If you want to lazily deserialize the values, there are a couple of options: (a) Choice of methods on input object: (b) Choice of methods to override in Mapper: I would recommend the following additions:
Also, what happens to the hadoop.streaming classes?
in the TaskContext object.