A new class, org.apache.hadoop.mapred.pipes.Submitter, will have a public static method to submit a job as a JobConf and a main method that takes an application and an optional configuration file, input directories, and output directory. The command line for the new main will look like:
bin/hadoop jar hadoop-*-pipes.jar \
  [-input inputDir] \
  [-output outputDir] \
  [-conf configurationFile] \
  [-jar applicationJarFile] \
  program
The application program will link against a thin C++ wrapper library that will handle the communication with the rest of the Hadoop system. A goal of the interface is to be "swigable" so that interfaces can be generated for Perl, Python, and other scripting languages. All of the C++ functions and classes are in the HadoopPipes namespace.
Hadoop will be given generic Java classes for handling the mapper and reducer (PipesMapRunner and PipesReduceRunner). They will fork off the application program and communicate with it over a socket. The communication will be handled by the C++ wrapper library on one side and by PipesMapRunner and PipesReduceRunner on the other.
The application program will pass the runTask function a factory object that can create the various objects the framework needs. The framework will create the Mapper or Reducer as appropriate and call the map or reduce method to invoke the application's code. The JobConf will be available to the application.
class JobConf {
public:
  bool hasKey(const std::string& key);
  const std::string& get(const std::string& key);
  int getInt(const std::string& key);
  float getFloat(const std::string& key);
  bool getBoolean(const std::string& key);
};
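As a rough illustration of how application code might read settings through this interface, the sketch below backs a trimmed JobConf with an in-memory map. The set() method, the map storage, the exception thrown for a missing key, the getIntOrDefault helper, and the key names used with it are all assumptions made for this example; the design above only fixes the accessor signatures.

```cpp
#include <cstdlib>
#include <map>
#include <stdexcept>
#include <string>

// Stand-in for the JobConf declared above, trimmed to three accessors.
// set() and the map storage are assumptions made for this sketch.
class JobConf {
public:
  void set(const std::string& key, const std::string& value) {
    values_[key] = value;
  }
  bool hasKey(const std::string& key) {
    return values_.find(key) != values_.end();
  }
  const std::string& get(const std::string& key) {
    std::map<std::string, std::string>::iterator it = values_.find(key);
    if (it == values_.end()) {
      // Assumption: the design does not say what get() does for a missing key.
      throw std::out_of_range("missing key: " + key);
    }
    return it->second;
  }
  int getInt(const std::string& key) {
    return std::atoi(get(key).c_str());
  }
private:
  std::map<std::string, std::string> values_;
};

// Hypothetical helper: read an integer setting, falling back to a default
// when the key is absent.
int getIntOrDefault(JobConf& conf, const std::string& key, int fallback) {
  return conf.hasKey(key) ? conf.getInt(key) : fallback;
}
```

Checking hasKey before calling a typed getter keeps the caller independent of whatever the real library does for absent keys.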
The Mapper and Reducer objects get all of their inputs, outputs, and
context via context objects. The advantage of using the context
objects is that their interface can be extended with additional
methods without breaking clients. Although this interface is different
from the current Java interface, the plan is to migrate the Java
interface in this direction.
Although the Java implementation is typed, the C++ interface treats keys and values as plain byte buffers. Since STL strings provide precisely the right functionality and are standard, they will be used. Stronger types were deliberately left out to keep the interface simple.
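To make the byte-buffer point concrete, here is a small sketch of carrying a binary value inside a std::string, which can hold embedded zero bytes. The helper names and the 4-byte big-endian layout are assumptions chosen for illustration; they are not the serialization Hadoop itself uses.

```cpp
#include <stdint.h>
#include <stdexcept>
#include <string>

// Pack a 32-bit integer into a 4-byte big-endian buffer (layout is an
// assumption for this sketch).
std::string packInt32(int32_t value) {
  uint32_t bits = static_cast<uint32_t>(value);
  std::string out(4, '\0');
  for (int i = 0; i < 4; ++i) {
    out[i] = static_cast<char>((bits >> (24 - 8 * i)) & 0xFF);
  }
  return out;
}

// Reverse of packInt32; rejects buffers that are too short.
int32_t unpackInt32(const std::string& buffer) {
  if (buffer.size() < 4) {
    throw std::invalid_argument("buffer shorter than 4 bytes");
  }
  uint32_t bits = 0;
  for (int i = 0; i < 4; ++i) {
    bits = (bits << 8) | static_cast<unsigned char>(buffer[i]);
  }
  return static_cast<int32_t>(bits);
}
```

Because std::string tracks its own length, the zero bytes in a packed value survive the trip through emit and getInputValue style interfaces, which would not be true of a C string.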
/**
 * Task context provides the information about the task and job.
 */
class TaskContext {
public:
  /**
   * Get the JobConf for the current task.
   */
  virtual JobConf* getJobConf() = 0;

  /**
   * Get the current key.
   * @return the current key or NULL if called before the first map or reduce
   */
  virtual const std::string* getInputKey() = 0;

  /**
   * Get the current value.
   * @return the current value or NULL if called before the first map or reduce
   */
  virtual const std::string* getInputValue() = 0;

  /**
   * Generate an output record
   */
  virtual void emit(const std::string& key,
                    const std::string& value) = 0;

  /**
   * Mark your task as having made progress without changing the status
   * message.
   */
  virtual void progress() = 0;

  /**
   * Set the status message and call progress.
   */
  virtual void setStatus(const std::string& status) = 0;

  /**
   * Get the name of the key class of the input to this task.
   */
  virtual const std::string& getInputKeyClass() = 0;

  /**
   * Get the name of the value class of the input to this task.
   */
  virtual const std::string& getInputValueClass() = 0;

  virtual ~TaskContext() {}
};
/**
 * The context for Mappers.
 */
class MapContext: public TaskContext {
public:
  /**
   * Access the InputSplit of the mapper.
   */
  virtual std::string* getInputSplit() = 0;
};
/**
 * The context for Reducers.
 */
class ReduceContext: public TaskContext {
public:
  /**
   * Advance to the next value.
   */
  virtual bool nextValue() = 0;
};
Since the interesting stuff has moved into the TaskContext, the Mapper
and Reducer interfaces are very simple.
class Closable {
public:
  virtual void close() {}
  virtual ~Closable() {}
};

/**
 * The application's mapper class to do map.
 */
class Mapper: public Closable {
public:
  virtual void map(MapContext& context) = 0;
};

/**
 * The application's reducer class to do reduce.
 */
class Reducer: public Closable {
public:
  virtual void reduce(ReduceContext& context) = 0;
};
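A word-count style mapper and reducer written against these interfaces might look roughly like the sketch below. The interface declarations are trimmed local copies (only the methods used here, without Closable) so that the example compiles on its own. WordCountMapper, SumReducer, and the two Fake contexts are hypothetical names, and the sketch assumes that the first call to nextValue() positions the reducer on the first value for the key.

```cpp
#include <cstdlib>
#include <sstream>
#include <string>
#include <utility>
#include <vector>

// Trimmed local copies of the interfaces above (only the methods used here).
class TaskContext {
public:
  virtual const std::string* getInputKey() = 0;
  virtual const std::string* getInputValue() = 0;
  virtual void emit(const std::string& key, const std::string& value) = 0;
  virtual ~TaskContext() {}
};
class MapContext: public TaskContext {};
class ReduceContext: public TaskContext {
public:
  virtual bool nextValue() = 0;
};
class Mapper {
public:
  virtual void map(MapContext& context) = 0;
  virtual ~Mapper() {}
};
class Reducer {
public:
  virtual void reduce(ReduceContext& context) = 0;
  virtual ~Reducer() {}
};

// Emit (word, "1") for every whitespace-separated word in the input value.
class WordCountMapper: public Mapper {
public:
  void map(MapContext& context) {
    std::istringstream words(*context.getInputValue());
    std::string word;
    while (words >> word) {
      context.emit(word, "1");
    }
  }
};

// Sum the decimal values for one key and emit a single total.
// Assumption: nextValue() must be called before reading the first value.
class SumReducer: public Reducer {
public:
  void reduce(ReduceContext& context) {
    const std::string key = *context.getInputKey();
    long sum = 0;
    while (context.nextValue()) {
      sum += std::atol(context.getInputValue()->c_str());
    }
    std::ostringstream total;
    total << sum;
    context.emit(key, total.str());
  }
};

typedef std::vector<std::pair<std::string, std::string> > Output;

// Hypothetical in-memory contexts for a local dry run; not part of the design.
class FakeMapContext: public MapContext {
public:
  explicit FakeMapContext(const std::string& value): value_(value) {}
  const std::string* getInputKey() { return &key_; }
  const std::string* getInputValue() { return &value_; }
  void emit(const std::string& k, const std::string& v) {
    output.push_back(std::make_pair(k, v));
  }
  Output output;
private:
  std::string key_, value_;
};

class FakeReduceContext: public ReduceContext {
public:
  FakeReduceContext(const std::string& key, const std::vector<std::string>& values)
    : key_(key), values_(values), next_(0) {}
  const std::string* getInputKey() { return &key_; }
  const std::string* getInputValue() {
    return next_ == 0 ? NULL : &values_[next_ - 1];
  }
  bool nextValue() {
    if (next_ >= values_.size()) return false;
    ++next_;
    return true;
  }
  void emit(const std::string& k, const std::string& v) {
    output.push_back(std::make_pair(k, v));
  }
  Output output;
private:
  std::string key_;
  std::vector<std::string> values_;
  std::vector<std::string>::size_type next_;
};
```

Note that neither application class touches anything except its context, which is what allows the context interfaces to grow without breaking existing mappers and reducers.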
The application can also define combiner functions. The combiner will
be run locally by the framework in the application process to avoid
the round trip to the Java process and back. Because the compare
function is not available in C++, the combiner will use memcmp to
sort the inputs to the combiner. This is not as general as the Java
equivalent, which uses the user's comparator, but should cover the
majority of the use cases. As the map function outputs key/value
pairs, they will be buffered. When the buffer is full, it will be
sorted and passed to the combiner. The output of the combiner will be
sent to the Java process.
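The buffering step described above could be sketched as follows: sort the buffered records by raw key bytes, then group the values of equal keys so that each group can be handed to the combiner. The names Record, Groups, keyLess, and sortAndGroup are hypothetical, and breaking memcmp ties by putting the shorter key first is an assumption, since the text only says that memcmp is used.

```cpp
#include <algorithm>
#include <cstring>
#include <string>
#include <utility>
#include <vector>

typedef std::pair<std::string, std::string> Record;
typedef std::vector<std::pair<std::string, std::vector<std::string> > > Groups;

// Order records by raw key bytes: memcmp over the common prefix, and on a
// tie the shorter key sorts first (tie-break is an assumption).
bool keyLess(const Record& a, const Record& b) {
  const std::size_t n = std::min(a.first.size(), b.first.size());
  const int cmp = std::memcmp(a.first.data(), b.first.data(), n);
  if (cmp != 0) return cmp < 0;
  return a.first.size() < b.first.size();
}

// Sort a full buffer and collect the values of each distinct key together,
// which is the shape a combiner's reduce call would consume.
Groups sortAndGroup(std::vector<Record> buffer) {
  // stable_sort keeps values of the same key in emission order.
  std::stable_sort(buffer.begin(), buffer.end(), keyLess);
  Groups groups;
  for (std::size_t i = 0; i < buffer.size(); ++i) {
    if (groups.empty() || groups.back().first != buffer[i].first) {
      groups.push_back(
          std::make_pair(buffer[i].first, std::vector<std::string>()));
    }
    groups.back().second.push_back(buffer[i].second);
  }
  return groups;
}
```

A byte-wise order like this groups identical keys correctly regardless of their type, but the groups are not necessarily in the order the Java comparator would produce, which is the loss of generality the text mentions.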
The application can also set a partition function to control which key
is given to a particular reduce. If a partition function is not
defined, the Java one will be used. The partition function will be
called by the C++ framework before the key/value pair is sent back to
Java.
/**
 * User code to decide where each key should be sent.
 */
class Partitioner {
public:
  virtual int partition(const std::string& key, int numOfReduces) = 0;
  virtual ~Partitioner() {}
};
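One possible partitioner, shown only as a sketch, hashes the key bytes and takes the result modulo the number of reduces. HashPartitioner and its multiply-by-31 hash are hypothetical choices for this example, and the code assumes numOfReduces is greater than zero; the Partitioner declaration is repeated so the example compiles on its own.

```cpp
#include <string>

// Local copy of the interface above so the sketch is self-contained.
class Partitioner {
public:
  virtual int partition(const std::string& key, int numOfReduces) = 0;
  virtual ~Partitioner() {}
};

// Hypothetical partitioner: hash the key bytes, then reduce modulo the
// number of reduces. Assumes numOfReduces > 0.
class HashPartitioner: public Partitioner {
public:
  int partition(const std::string& key, int numOfReduces) {
    unsigned int hash = 0;
    for (std::string::size_type i = 0; i < key.size(); ++i) {
      // Treat each byte as unsigned so the hash does not depend on
      // whether plain char is signed on this platform.
      hash = hash * 31u + static_cast<unsigned char>(key[i]);
    }
    return static_cast<int>(hash % static_cast<unsigned int>(numOfReduces));
  }
};
```

Using unsigned arithmetic keeps the result in the range 0 to numOfReduces - 1 even when the hash overflows.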
To make the Hadoop Java sort and merge work, the MapOutputKeyClass
will be used on the Java side to deserialize the keys coming out of
the mapper. The easiest way to make it work is to use Hadoop Record IO
objects, so that the C++ and Java implementations are equivalent.
/**
 * Applications that want to read the input for the map function directly
 * can define RecordReaders in C++.
 */
class RecordReader: public Closable {
public:
  virtual bool next(std::string& key, std::string& value) = 0;

  /**
   * The progress of the record reader through the split as a value between
   * 0.0 and 1.0.
   */
  virtual float getProgress() = 0;
};
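As an illustration of the RecordReader contract, the sketch below splits an in-memory buffer into lines, using the byte offset of each line as the key. LineRecordReader is a hypothetical name, and reading from a string rather than from the data named by the InputSplit is a simplification for this example.

```cpp
#include <sstream>
#include <string>

// Local copies of the interfaces above so the sketch is self-contained.
class Closable {
public:
  virtual void close() {}
  virtual ~Closable() {}
};
class RecordReader: public Closable {
public:
  virtual bool next(std::string& key, std::string& value) = 0;
  virtual float getProgress() = 0;
};

// Hypothetical reader: one record per line, key = decimal byte offset of
// the line start, value = the line without its trailing newline.
class LineRecordReader: public RecordReader {
public:
  explicit LineRecordReader(const std::string& data): data_(data), pos_(0) {}

  bool next(std::string& key, std::string& value) {
    if (pos_ >= data_.size()) return false;
    std::string::size_type end = data_.find('\n', pos_);
    if (end == std::string::npos) end = data_.size();
    std::ostringstream offset;
    offset << pos_;
    key = offset.str();
    value = data_.substr(pos_, end - pos_);
    pos_ = end + 1;  // skip the newline; may step one past the end
    return true;
  }

  float getProgress() {
    if (data_.empty()) return 1.0f;
    // Clamp because pos_ can be one past the end after the last line.
    std::string::size_type done = pos_ < data_.size() ? pos_ : data_.size();
    return static_cast<float>(done) / static_cast<float>(data_.size());
  }

private:
  std::string data_;
  std::string::size_type pos_;
};
```

Returning false from next signals the end of the input, at which point getProgress reports 1.0.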
OutputFormats are controlled similarly:
/**
 * An object to write key/value pairs as they are emitted from the reduce.
 */
class RecordWriter: public Closable {
public:
  virtual void emit(const std::string& key,
                    const std::string& value) = 0;
};
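A matching RecordWriter could be as small as the sketch below, which writes each pair as a tab-separated line to a std::ostream. TextRecordWriter and the tab/newline layout are assumptions for illustration; the design does not prescribe an output format.

```cpp
#include <ostream>
#include <sstream>
#include <string>

// Local copies of the interfaces above so the sketch is self-contained.
class Closable {
public:
  virtual void close() {}
  virtual ~Closable() {}
};
class RecordWriter: public Closable {
public:
  virtual void emit(const std::string& key,
                    const std::string& value) = 0;
};

// Hypothetical writer: one "key<TAB>value" line per record.
class TextRecordWriter: public RecordWriter {
public:
  // The stream must outlive the writer; only a reference is stored.
  explicit TextRecordWriter(std::ostream& out): out_(out) {}

  void emit(const std::string& key, const std::string& value) {
    out_ << key << '\t' << value << '\n';
  }

  // Flush buffered output when the framework closes the writer.
  void close() { out_.flush(); }

private:
  std::ostream& out_;
};
```

Taking the stream by reference lets the same class write to a file stream in a task and to a std::ostringstream in a unit test.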
/**
 * A factory to create the necessary application objects.
 */
class Factory {
public:
  virtual Mapper* createMapper(MapContext& context) = 0;
  virtual Reducer* createReducer(ReduceContext& context) = 0;

  /**
   * Create a combiner, if this application has one.
   * @return the new combiner or NULL, if one is not needed
   */
  virtual Reducer* createCombiner(MapContext& context) {
    return NULL;
  }

  /**
   * Create an application partitioner object.
   * @return the new partitioner or NULL, if the default partitioner should be
   * used.
   */
  virtual Partitioner* createPartitioner(MapContext& context) {
    return NULL;
  }

  /**
   * Create an application record reader.
   * @return the new RecordReader or NULL, if the Java RecordReader should be
   * used.
   */
  virtual RecordReader* createRecordReader(MapContext& context) {
    return NULL;
  }

  /**
   * Create an application record writer.
   * @return the new RecordWriter or NULL, if the Java RecordWriter should be
   * used.
   */
  virtual RecordWriter* createRecordWriter(ReduceContext& context) {
    return NULL;
  }

  virtual ~Factory() {}
};

/**
 * Run the assigned task in the framework using the objects created by
 * the Factory.
 * @return true, if the task succeeded.
 */
bool runTask(Factory& factory);
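To show how an application might plug into the Factory, the sketch below overrides only the two required methods and inherits the NULL defaults for everything else. All types here are placeholder copies trimmed so the example compiles on its own, and MyMapper, MyReducer, and MyFactory are hypothetical names. The design does not state who owns the returned objects, so the comment about deletion is an assumption.

```cpp
#include <cstddef>

// Placeholder types standing in for the interfaces above.
class MapContext {};
class ReduceContext {};
class Mapper {
public:
  virtual ~Mapper() {}
};
class Reducer {
public:
  virtual ~Reducer() {}
};

// Trimmed copy of Factory: the two required methods plus one optional one.
class Factory {
public:
  virtual Mapper* createMapper(MapContext& context) = 0;
  virtual Reducer* createReducer(ReduceContext& context) = 0;
  virtual Reducer* createCombiner(MapContext&) { return NULL; }
  virtual ~Factory() {}
};

// Hypothetical application classes; real ones would implement map/reduce.
class MyMapper: public Mapper {};
class MyReducer: public Reducer {};

// Hypothetical application factory. createCombiner is not overridden, so
// the framework would receive NULL and run without a combiner.
class MyFactory: public Factory {
public:
  // Assumption: whoever receives the pointer is responsible for deleting it.
  Mapper* createMapper(MapContext&) { return new MyMapper(); }
  Reducer* createReducer(ReduceContext&) { return new MyReducer(); }
};
```

With a factory like this, the application's main would presumably reduce to constructing a MyFactory, passing it to runTask, and turning the boolean result into the process exit code.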