Thank you for your comments. Here are my thoughts.
On the Map side, the external sorter would also need the partition number, not just the key and value. I am not sure how RecordWriter can be used. In the current MapTask, sorting starts in MapOutputBuffer, which implements
MapOutputCollector. I thought it natural for an external sorter to implement the MapOutputCollector interface as well. Perhaps, following Steve's suggestion, we can rewrite MapOutputCollector as:
public interface MapOutputCollector<K, V> extends Closeable {
  public void collect(K key, V value, int partition)
      throws IOException, InterruptedException;
  public void flush() throws IOException, InterruptedException;
}
At present, "extends Closeable" is missing.
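As a rough illustration of how an external sorter might plug into this interface, here is a minimal, self-contained sketch. The ExternalSortCollector name and the in-memory buffering are my own placeholders, not framework code; a real external sorter would stream records to an external process or file instead of sorting in memory.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// The proposed collector interface, reproduced here so the sketch compiles standalone.
interface MapOutputCollector<K, V> extends Closeable {
  void collect(K key, V value, int partition) throws IOException, InterruptedException;
  void flush() throws IOException, InterruptedException;
}

// Hypothetical external sorter: buffers (partition, key, value) triples and
// sorts them on flush, mirroring the framework's (partition, key) sort order.
class ExternalSortCollector<K extends Comparable<K>, V>
    implements MapOutputCollector<K, V> {

  static class Rec<K, V> {
    final int partition; final K key; final V value;
    Rec(int p, K k, V v) { partition = p; key = k; value = v; }
  }

  final List<Rec<K, V>> buffer = new ArrayList<>();

  @Override
  public void collect(K key, V value, int partition) {
    buffer.add(new Rec<>(partition, key, value));
  }

  @Override
  public void flush() {
    // Sort by partition first, then by key.
    buffer.sort(Comparator
        .<Rec<K, V>>comparingInt(r -> r.partition)
        .thenComparing(r -> r.key));
  }

  @Override
  public void close() { buffer.clear(); }
}
```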
If you treat the framework's sorter as a black box, it accepts simple key/value pairs but produces raw (serialized) key/value pairs. There is an asymmetry.
An external sorter may not produce a RawKeyValueIterator, due to its own serialization mechanism. For example, if records are piped to GNU sort, the key and value may be serialized with a TAB separator between them. If an
external sorter would like to use the CombinerRunner classes defined in Task.java, it cannot do so without incurring an additional data move. I was looking for an iterator that returns simple keys and values, but I could not find an efficient one. RecordReader looks appropriate functionally, but it is not efficient when passed to the ValuesIterator (defined in Task.java) that is used to run a Combiner: the keys and values returned from a RecordReader would have to be copied. Any such data move will hurt performance, especially when dealing with huge volumes of data.
I had to come up with a simple key, value iterator as below:
public interface KeyValueIterator<K, V> extends Closeable {
  /**
   * Get the current key.
   * @param key where the current key should be stored. If this is null, a new
   * instance will be created and returned.
   * @return the current key
   */
  K getCurrentKey(K key) throws IOException, InterruptedException;

  /**
   * Get the current value.
   * @param value where the current value should be stored. If this is null, a
   * new instance will be created and returned.
   * @return the current value
   */
  V getCurrentValue(V value) throws IOException, InterruptedException;

  /**
   * Set up the current key and value (for getCurrentKey() and getCurrentValue()).
   * @return <code>true</code> if there exists a key/value, <code>false</code> otherwise
   */
  boolean nextKeyValue() throws IOException, InterruptedException;

  /**
   * Get the Progress object; this has a float (0.0 - 1.0) indicating the bytes
   * processed by the iterator so far.
   * @return progress object
   */
  Progress getProgress();
}
I was able to wrap the framework's RawKeyValueIterator inside an implementation of KeyValueIterator without any additional data move. This ensures that everything outside the sorter sees only the simple key/value iterator; the serialized representation stays internal to the sorter black box. The external sorter is also happy, as it does not incur any extra data move.
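To make the object-reuse contract of the iterator concrete, here is a minimal, self-contained sketch of a KeyValueIterator over an in-memory list. The Holder class stands in for a reusable mutable type (like Hadoop's Text), and ListKeyValueIterator is purely illustrative, not the actual wrapper around RawKeyValueIterator.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// The proposed iterator interface, reproduced so the sketch compiles standalone
// (the getProgress method is omitted for brevity).
interface KeyValueIterator<K, V> extends Closeable {
  K getCurrentKey(K key) throws IOException, InterruptedException;
  V getCurrentValue(V value) throws IOException, InterruptedException;
  boolean nextKeyValue() throws IOException, InterruptedException;
}

// Minimal mutable holder, standing in for reusable types like Hadoop's Text.
class Holder {
  String s;
  void set(String v) { s = v; }
}

// Illustrative iterator over sorted entries; it fills the caller's Holder
// objects in place, so no per-record allocation or copy is needed.
class ListKeyValueIterator implements KeyValueIterator<Holder, Holder> {
  private final Iterator<Map.Entry<String, String>> it;
  private Map.Entry<String, String> current;

  ListKeyValueIterator(List<Map.Entry<String, String>> entries) {
    this.it = entries.iterator();
  }

  @Override
  public boolean nextKeyValue() {
    if (!it.hasNext()) return false;
    current = it.next();
    return true;
  }

  @Override
  public Holder getCurrentKey(Holder key) {
    if (key == null) key = new Holder();
    key.set(current.getKey());   // fill in place: no extra data move
    return key;
  }

  @Override
  public Holder getCurrentValue(Holder value) {
    if (value == null) value = new Holder();
    value.set(current.getValue());
    return value;
  }

  @Override
  public void close() { }
}
```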
As I mentioned in my previous post, I created an abstract base class called MapOutputSorterAbstract (I am attaching the source to Jira-2454) in order to access package-protected class methods. I would appreciate it if the developers of
MAPREDUCE-279 could take a look at the class and comment on whether it can live completely outside the framework. In MapTask.java, I needed to change the access of APPROX_HEADER_LENGTH from private to package level.
My specific questions are:
1. If TaskReporter, Counter, and MapOutputFile become accessible as public classes, how can they be passed from MapTask.java to the external sorter?
2. Will the CombinerRunner classes defined in Task.java be available? (I had to change their access to public.)
3. The classes SpillRecord and IndexRecord should also be made public. Since IndexRecord is not an inner class of SpillRecord, I created a separate file, IndexRecord.java, and moved the code there.
On the Reduce side, I was trying to come up with an interface that could be implemented both by the framework and by an external sorter. It was not easy to decouple shuffling from merging, since the shuffle drives the merge, not
the other way around. Because I wanted to reuse the framework's shuffle code, I ended up using a few ugly if's so that data is shuffled either to the framework's Merger or to the external sorter.
If the shuffle code could somehow be invoked from outside the core packages through public interfaces, the external sorter on the Reduce side could just implement the simple key/value iterator. I think this would require some inversion of control and rewriting code in some sensitive areas.
I thought of taking a simple approach: users can configure a job with the names of the external sorter classes in configuration parameters such as mapred.map.externalsort.class and mapred.reduce.externalsort.class, and a
simple Java class loader can load the class. This is very similar to configuring a Mapper class, for example. Am I missing something? Is there a strong reason to use ServiceLoader?
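A minimal sketch of what I have in mind, using plain reflection over a Properties object. The ExternalSorter interface, the GnuSortSorter class, and the SorterLoader helper are placeholders of mine; in Hadoop proper this would go through Configuration.getClass and ReflectionUtils.newInstance instead.

```java
import java.util.Properties;

// Placeholder interface standing in for whatever contract the external sorter implements.
interface ExternalSorter {
  String name();
}

// Example implementation that a user might ship in a jar on the task classpath.
class GnuSortSorter implements ExternalSorter {
  public String name() { return "gnu-sort"; }
}

class SorterLoader {
  // Load the sorter class named by the given configuration key, falling back
  // to the supplied default class when the key is unset.
  static ExternalSorter load(Properties conf, String key,
                             Class<? extends ExternalSorter> dflt) {
    try {
      String clazz = conf.getProperty(key);
      Class<?> c = (clazz == null) ? dflt : Class.forName(clazz);
      return (ExternalSorter) c.getDeclaredConstructor().newInstance();
    } catch (ReflectiveOperationException e) {
      throw new RuntimeException("cannot instantiate sorter for " + key, e);
    }
  }
}
```

This is essentially the same mechanism already used to instantiate a configured Mapper class, which is why ServiceLoader seems unnecessary to me.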
Please give me your feedback.
If developers do not get the full picture of what I was playing with, I can try to make my changes locally on top of the
MAPREDUCE-279 branch and post a patch file.
Thanks everyone for your patience.