Hadoop Pipes

We need a way to convert a large body of C++ code to use Hadoop DFS and map/reduce. The primary approach will be to run the application-specific C++ code in a separate process. In many ways the approach will be similar to Hadoop streaming, but it will use Writable serialization to convert keys and values into bytes that are sent to the process over a socket.

A new class, org.apache.hadoop.mapred.pipes.Submitter, will have a public static method to submit a job described by a JobConf, and a main method that takes an application and, optionally, a configuration file, input directories, and an output directory. The command line for the new main will look like:

bin/hadoop jar hadoop-*-pipes.jar \
  [-input inputDir] \
  [-output outputDir] \
  [-conf configurationFile] \
  [-jar applicationJarFile] \
  program

The application program will link against a thin C++ wrapper library that handles the communication with the rest of the Hadoop system. A goal of the interface is to be "swigable", so that SWIG can generate bindings for Perl, Python, and other scripting languages. All of the C++ functions and classes are in the HadoopPipes namespace.

Hadoop will be given generic Java classes for running the mapper and reducer (PipesMapRunner and PipesReduceRunner). They will fork off the application program and communicate with it over a socket. The communication will be handled by the C++ wrapper library on the application side and by PipesMapRunner and PipesReduceRunner on the Java side.

The application program will pass a factory object, which can create the various objects the framework needs, to the runTask function. The framework will create the Mapper or Reducer as appropriate and call its map or reduce method to invoke the application's code. The JobConf will be available to the application.

class JobConf { 
public:
  bool hasKey(const std::string& key);
  const std::string& get(const std::string& key);
  int getInt(const std::string& key);
  float getFloat(const std::string& key);
  bool getBoolean(const std::string& key);
};

The Mapper and Reducer objects get all of their inputs, outputs, and context through context objects. The advantage of context objects is that their interfaces can be extended with additional methods without breaking existing clients. Although this interface differs from the current Java interface, the plan is to migrate the Java interface in this direction.

Although the Java implementation is typed, the C++ interface treats keys and values as plain byte buffers. STL strings (std::string) provide exactly the needed functionality and are standard, so they will be used. Stronger types were deliberately avoided to keep the interface simple.
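
Because std::string stores an explicit length instead of relying on a NUL terminator, it can carry arbitrary binary keys and values, including embedded zero bytes. The sketch below is only an illustration; the helpers makeBinaryKey and readBinaryKey are hypothetical and not part of the proposed library.

```cpp
#include <cassert>
#include <cstdint>
#include <string>

// Hypothetical helper: pack a 32-bit id into a 4-byte big-endian string,
// the kind of opaque binary key an application might emit.
std::string makeBinaryKey(std::uint32_t id) {
  std::string key(4, '\0');  // 4 bytes, zero bytes are fine in std::string
  key[0] = static_cast<char>((id >> 24) & 0xFF);
  key[1] = static_cast<char>((id >> 16) & 0xFF);
  key[2] = static_cast<char>((id >> 8) & 0xFF);
  key[3] = static_cast<char>(id & 0xFF);
  return key;
}

// Hypothetical inverse: recover the id from a 4-byte key.
std::uint32_t readBinaryKey(const std::string& key) {
  assert(key.size() == 4);
  std::uint32_t id = 0;
  for (unsigned char byte : key) {
    id = (id << 8) | byte;
  }
  return id;
}
```

Encoding big-endian has the side benefit that byte-wise order of the keys matches the numeric order of the unsigned ids.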

/**
 * Task context provides the information about the task and job.
 */
class TaskContext {
public:
  /**
   * Get the JobConf for the current task.
   */
  virtual JobConf* getJobConf() = 0;

  /**
   * Get the current key. 
   * @return the current key or NULL if called before the first map or reduce
   */
  virtual const std::string* getInputKey() = 0;

  /**
   * Get the current value. 
   * @return the current value or NULL if called before the first map or reduce
   */
  virtual const std::string* getInputValue() = 0;

  /**
   * Generate an output record
   */
  virtual void emit(const std::string& key, 
                    const std::string& value) = 0;

  /**
   * Mark your task as having made progress without changing the status 
   * message.
   */
  virtual void progress() = 0;

  /**
   * Set the status message and call progress.
   */
  virtual void setStatus(const std::string& status) = 0;

  /**
   * Get the name of the key class of the input to this task.
   */
  virtual const std::string& getInputKeyClass() = 0;

  /**
   * Get the name of the value class of the input to this task.
   */
  virtual const std::string& getInputValueClass() = 0;

  virtual ~TaskContext() {}
};

/**
 * The context for Mappers.
 */
class MapContext: public TaskContext {
public:

  /**
   * Access the InputSplit of the mapper.
   */
  virtual std::string* getInputSplit() = 0;

};

/**
 * The context for Reducers.
 */
class ReduceContext: public TaskContext {
public:
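  /**
   * Advance to the next value for the current key.
   * @return true if a value is available, false once the values for the
   *     current key are exhausted
   */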
  virtual bool nextValue() = 0;
};

Since most of the interesting functionality has moved into TaskContext, the Mapper and Reducer interfaces are very simple.

class Closable {
public:
  virtual void close() {}
  virtual ~Closable() {}
};

/**
 * The application's mapper class to do map.
 */
class Mapper: public Closable {
public:
  virtual void map(MapContext& context) = 0;
};

/**
 * The application's reducer class to do reduce.
 */
class Reducer: public Closable {
public:
  virtual void reduce(ReduceContext& context) = 0;
};
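
To show how an application might use these interfaces, here is a word-count Mapper and Reducer. It is a sketch: to stay self-contained it re-declares trimmed copies of the interfaces (only the methods used, without Closable), and it assumes that nextValue() must be called before the first value of a key is read, which the interface above implies but does not spell out.

```cpp
#include <cassert>
#include <cstdlib>
#include <sstream>
#include <string>

// Trimmed copies of the interfaces above: only the methods used here.
class TaskContext {
public:
  virtual const std::string* getInputKey() = 0;
  virtual const std::string* getInputValue() = 0;
  virtual void emit(const std::string& key, const std::string& value) = 0;
  virtual ~TaskContext() {}
};
class MapContext: public TaskContext {};
class ReduceContext: public TaskContext {
public:
  virtual bool nextValue() = 0;
};
class Mapper {
public:
  virtual void map(MapContext& context) = 0;
  virtual ~Mapper() {}
};
class Reducer {
public:
  virtual void reduce(ReduceContext& context) = 0;
  virtual ~Reducer() {}
};

// Emits (word, "1") for each whitespace-separated word of the input value.
class WordCountMapper: public Mapper {
public:
  void map(MapContext& context) override {
    const std::string* value = context.getInputValue();
    assert(value != nullptr);  // non-NULL once map() has been called
    std::istringstream words(*value);
    std::string word;
    while (words >> word) {
      context.emit(word, "1");
    }
  }
};

// Sums the decimal counts for the current key and emits (key, total).
class WordCountReducer: public Reducer {
public:
  void reduce(ReduceContext& context) override {
    long total = 0;
    while (context.nextValue()) {
      total += std::strtol(context.getInputValue()->c_str(), nullptr, 10);
    }
    context.emit(*context.getInputKey(), std::to_string(total));
  }
};
```

Because all input and output goes through the context, both classes can be unit-tested with small fake contexts and no running framework.
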

The application can also define a combiner. The framework will run the combiner locally, in the application process, to avoid a round trip to the Java process and back. As the map function emits key/value pairs, they will be buffered; when the buffer is full, it will be sorted and passed to the combiner, and the combiner's output will be sent to the Java process. Because the user's Java comparator is not available in C++, the buffered keys will be sorted with memcmp. This is less general than the Java equivalent, which uses the user's comparator, but should cover the majority of use cases.

The application can also set a partition function to control which reduce each key is sent to. If no partition function is defined, the Java one will be used. The partition function will be called by the C++ framework before the key/value pair is sent back to Java.

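
The buffer, sort, and combine cycle described above can be sketched as follows. This is a simplified illustration, not the framework's implementation: CombiningBuffer and byteLess are hypothetical names, the combiner is modeled as a callback that folds all values of one key into a single value (the real combiner is a Reducer driven through a ReduceContext and may emit any number of records), and a sink callback stands in for sending records to the Java process.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstring>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Byte-wise key order: memcmp over the common prefix, then shorter keys first.
bool byteLess(const std::string& a, const std::string& b) {
  std::size_t n = std::min(a.size(), b.size());
  int c = std::memcmp(a.data(), b.data(), n);
  return c != 0 ? c < 0 : a.size() < b.size();
}

// Hypothetical map-side buffer that combines each full batch of map outputs.
class CombiningBuffer {
public:
  using Pair = std::pair<std::string, std::string>;
  using Combine = std::function<std::string(const std::string&,
                                            const std::vector<std::string>&)>;
  using Sink = std::function<void(const std::string&, const std::string&)>;

  CombiningBuffer(std::size_t capacity, Combine combine, Sink sink)
      : capacity_(capacity), combine_(std::move(combine)), sink_(std::move(sink)) {
    assert(capacity_ > 0);
  }

  // Buffer one map output; sort and combine once the buffer is full.
  void add(const std::string& key, const std::string& value) {
    pairs_.emplace_back(key, value);
    if (pairs_.size() >= capacity_) flush();
  }

  // Sort by raw key bytes, then hand each run of equal keys to the combiner
  // and forward its result to the sink.
  void flush() {
    std::stable_sort(pairs_.begin(), pairs_.end(),
                     [](const Pair& a, const Pair& b) { return byteLess(a.first, b.first); });
    std::size_t i = 0;
    while (i < pairs_.size()) {
      std::vector<std::string> values;
      std::size_t j = i;
      while (j < pairs_.size() && pairs_[j].first == pairs_[i].first) {
        values.push_back(pairs_[j].second);
        ++j;
      }
      sink_(pairs_[i].first, combine_(pairs_[i].first, values));
      i = j;
    }
    pairs_.clear();
  }

private:
  std::size_t capacity_;
  Combine combine_;
  Sink sink_;
  std::vector<Pair> pairs_;
};
```

Note that memcmp compares bytes as unsigned values, so a key starting with byte 0x80 sorts after one starting with 'a'; any application whose Java comparator disagrees with that order would not be served by the C++ combiner.
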
/**
 * User code to decide where each key should be sent.
 */
class Partitioner {
public:
  virtual int partition(const std::string& key, int numOfReduces) = 0;
  virtual ~Partitioner() {}
};
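
As an illustration of the Partitioner interface, the sketch below hashes the raw key bytes with 32-bit FNV-1a and reduces the hash modulo the number of reduces. The class name FnvPartitioner and the choice of hash are assumptions made for this example; it does not reproduce the partitioning of the default Java partitioner, so an application should not mix the two for the same job.

```cpp
#include <cassert>
#include <cstdint>
#include <string>

class Partitioner {
public:
  virtual int partition(const std::string& key, int numOfReduces) = 0;
  virtual ~Partitioner() {}
};

// Hypothetical partitioner based on the 32-bit FNV-1a hash of the key bytes.
class FnvPartitioner: public Partitioner {
public:
  int partition(const std::string& key, int numOfReduces) override {
    assert(numOfReduces > 0);
    std::uint32_t hash = 2166136261u;  // FNV-1a offset basis
    for (unsigned char byte : key) {
      hash ^= byte;
      hash *= 16777619u;               // FNV-1a prime, wraps modulo 2^32
    }
    return static_cast<int>(hash % static_cast<std::uint32_t>(numOfReduces));
  }
};
```

The result is always in the range [0, numOfReduces), and equal keys always map to the same reduce, which is the property the framework needs.
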

To make the Hadoop Java sort and merge work, the MapOutputKeyClass will be used on the Java side to deserialize the keys coming out of the mapper. The easiest way to make this work is to use Hadoop Record IO objects, so that the C++ and Java representations are equivalent.

C++ Input and Output Formats

Some clients need to define RecordReaders (for reading map input) or RecordWriters (for writing reduce output) in C++ rather than Java. In that case, only a description of where the input comes from or where the output should be written will be sent to the application program, rather than the complete stream of key/value pairs.

/**
 * For applications that want to read the input directly for the map function
 * they can define RecordReaders in C++.
 */
class RecordReader: public Closable {
public:
  virtual bool next(std::string& key, std::string& value) = 0;

  /**
   * The progress of the record reader through the split as a value between
   * 0.0 and 1.0.
   */
  virtual float getProgress() = 0;
};
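
A C++ RecordReader could look like the following sketch of a line-oriented reader. The class LineRecordReader is hypothetical and reads from an in-memory buffer for simplicity; a real reader would fetch the bytes of its split from DFS. Following the usual text-input convention, the key is the decimal byte offset of the line and the value is the line without its newline.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <utility>

class Closable {
public:
  virtual void close() {}
  virtual ~Closable() {}
};

class RecordReader: public Closable {
public:
  virtual bool next(std::string& key, std::string& value) = 0;
  virtual float getProgress() = 0;
};

// Hypothetical line reader over an in-memory split.
class LineRecordReader: public RecordReader {
public:
  explicit LineRecordReader(std::string data) : data_(std::move(data)) {}

  bool next(std::string& key, std::string& value) override {
    if (pos_ >= data_.size()) return false;
    std::size_t end = data_.find('\n', pos_);
    if (end == std::string::npos) end = data_.size();
    key = std::to_string(pos_);               // byte offset of the line
    value = data_.substr(pos_, end - pos_);   // line without the '\n'
    pos_ = end + 1;                           // may step one past the end
    return true;
  }

  // Fraction of the split consumed so far, between 0.0 and 1.0.
  float getProgress() override {
    if (data_.empty()) return 1.0f;
    std::size_t done = pos_ < data_.size() ? pos_ : data_.size();
    return static_cast<float>(done) / static_cast<float>(data_.size());
  }

private:
  std::string data_;
  std::size_t pos_ = 0;
};
```
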

OutputFormats are controlled similarly:

/**
 * An object to write key/value pairs as they are emitted from the reduce.
 */
class RecordWriter: public Closable {
public:
  virtual void emit(const std::string& key,
                    const std::string& value) = 0;
};
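
A matching RecordWriter sketch: the hypothetical TextRecordWriter formats each pair as key, tab, value, newline on an std::ostream. The output format is an assumption chosen for the example; a real writer would open a file in the task's output directory on DFS.

```cpp
#include <cassert>
#include <ostream>
#include <sstream>
#include <string>

class Closable {
public:
  virtual void close() {}
  virtual ~Closable() {}
};

class RecordWriter: public Closable {
public:
  virtual void emit(const std::string& key, const std::string& value) = 0;
};

// Hypothetical writer producing "key<TAB>value\n" lines.
class TextRecordWriter: public RecordWriter {
public:
  explicit TextRecordWriter(std::ostream& out) : out_(out) {}

  void emit(const std::string& key, const std::string& value) override {
    out_ << key << '\t' << value << '\n';
  }

  // Flush buffered output when the framework closes the writer.
  void close() override { out_.flush(); }

private:
  std::ostream& out_;
};
```

For testing, the writer can be pointed at an std::ostringstream instead of a file.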

Factory

Finally, the application's main function needs to create a factory object that is specialized to create the application's types, and pass it to runTask.

/**
 * A factory to create the necessary application objects.
 */
class Factory {
public:
  virtual Mapper* createMapper(MapContext& context) = 0;
  virtual Reducer* createReducer(ReduceContext& context) = 0;

  /**
   * Create a combiner, if this application has one.
   * @return the new combiner or NULL, if one is not needed
   */
  virtual Reducer* createCombiner(MapContext& context) {
    return NULL; 
  }

  /**
   * Create an application partitioner object.
   * @return the new partitioner or NULL, if the default partitioner should be 
   *     used.
   */
  virtual Partitioner* createPartitioner(MapContext& context) {
    return NULL;
  }

  /**
   * Create an application record reader.
   * @return the new RecordReader or NULL, if the Java RecordReader should be
   *    used.
   */
  virtual RecordReader* createRecordReader(MapContext& context) {
    return NULL; 
  }

  /**
   * Create an application record writer.
   * @return the new RecordWriter or NULL, if the Java RecordWriter should be
   *    used.
   */
  virtual RecordWriter* createRecordWriter(ReduceContext& context) {
    return NULL;
  }

  virtual ~Factory() {}
};

/**
 * Run the assigned task in the framework using the objects created by
 * the Factory.
 * @return true, if the task succeeded.
 */
bool runTask(Factory& factory);
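
An application factory might look like the sketch below. The context, Mapper, Reducer, and Factory declarations are minimal stand-ins so the example compiles on its own (the remaining factory methods are omitted); CountMapper, SumReducer, and CountFactory are hypothetical application types. Reusing the reducer type as the combiner is one common choice, not a requirement. Who deletes the returned objects is not specified above, so this sketch leaves ownership to the caller.

```cpp
#include <cassert>
#include <cstddef>

// Minimal stand-ins for the declarations given above.
class MapContext {};
class ReduceContext {};
class Mapper {
public:
  virtual void map(MapContext& context) = 0;
  virtual ~Mapper() {}
};
class Reducer {
public:
  virtual void reduce(ReduceContext& context) = 0;
  virtual ~Reducer() {}
};
class Factory {
public:
  virtual Mapper* createMapper(MapContext& context) = 0;
  virtual Reducer* createReducer(ReduceContext& context) = 0;
  virtual Reducer* createCombiner(MapContext&) { return NULL; }
  virtual ~Factory() {}
};

// Hypothetical application types.
class CountMapper: public Mapper {
public:
  void map(MapContext&) override { /* application logic */ }
};
class SumReducer: public Reducer {
public:
  void reduce(ReduceContext&) override { /* application logic */ }
};

// The application's factory; SumReducer doubles as the combiner.
class CountFactory: public Factory {
public:
  Mapper* createMapper(MapContext&) override { return new CountMapper(); }
  Reducer* createReducer(ReduceContext&) override { return new SumReducer(); }
  Reducer* createCombiner(MapContext&) override { return new SumReducer(); }
};

// With the real library, main would then be roughly:
//   int main() {
//     CountFactory factory;
//     return HadoopPipes::runTask(factory) ? 0 : 1;
//   }
```
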

Protocol

The messages between the C++ and Java code are given below:

Downstream messages

The following are the possible messages from the Java code to the C++ application.
start(int version)
  Start communicating with the server, which is speaking the given protocol version.
setJobConf(String[] keyValues)
  Send the list of keys and values that represents the JobConf.
runMap(String inputSplit, boolean pipedInput, String keyClass, String valueClass)
  Run a map with the given input split. If pipedInput is false, the application is required to provide a C++ RecordReader.
mapItem(String key, String value)
  Map the given key/value pair.
runReduce(int reduce, boolean pipedOutput)
  Run a reduce. If pipedOutput is false, the application is required to provide a C++ RecordWriter.
reduceKey(String key)
  Give the reduce a new key.
reduceValue(String value)
  Give the reduce a new value for the current key.
abort()
  Shut down the application program immediately.
close()
  The task has been given all of its input and should finish processing it.
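
On the application side, the setJobConf message has to be turned into a JobConf. The sketch below assumes the keyValues array alternates key, value, key, value; the class name MapBackedJobConf is hypothetical, and so is the assumption that booleans arrive as the strings "true" and "false".

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical JobConf filled from the flat setJobConf key/value list.
class MapBackedJobConf {
public:
  explicit MapBackedJobConf(const std::vector<std::string>& keyValues) {
    if (keyValues.size() % 2 != 0) {
      throw std::invalid_argument("setJobConf needs an even number of strings");
    }
    for (std::size_t i = 0; i < keyValues.size(); i += 2) {
      values_[keyValues[i]] = keyValues[i + 1];  // later duplicates win
    }
  }

  bool hasKey(const std::string& key) const { return values_.count(key) != 0; }

  // Throws std::out_of_range if the key is missing.
  const std::string& get(const std::string& key) const { return values_.at(key); }

  // std::stoi / std::stof throw std::invalid_argument on non-numeric values.
  int getInt(const std::string& key) const { return std::stoi(get(key)); }
  float getFloat(const std::string& key) const { return std::stof(get(key)); }

  // Assumption: booleans are sent as "true" / "false".
  bool getBoolean(const std::string& key) const { return get(key) == "true"; }

private:
  std::map<std::string, std::string> values_;
};
```
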

Upstream messages

The following are the possible messages from the C++ application to the Java code.
output(String key, String value)
  Record an output from a map or reduce.
partitionedOutput(int partition, String key, String value)
  If the map defines a C++ partition function, it will send the partition number as well as the key and value.
status(String msg)
  The application updated the status string. These messages are throttled to at most one per second.
progress()
  Mark the task as having made progress without updating the status message.
done()
  The task finished correctly.
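
The once-per-second throttling of status messages could be done on the C++ side with a small helper like the sketch below. StatusThrottle is hypothetical; it simply drops updates that arrive within a second of the last one sent (a real implementation might instead remember the latest message and send it later), and it takes an injectable clock so the behavior can be tested deterministically.

```cpp
#include <cassert>
#include <chrono>
#include <functional>
#include <string>
#include <utility>

// Hypothetical throttle for the upstream status message.
class StatusThrottle {
public:
  using Clock = std::chrono::steady_clock;
  using Send = std::function<void(const std::string&)>;
  using Now = std::function<Clock::time_point()>;

  explicit StatusThrottle(Send send, Now now = Clock::now)
      : send_(std::move(send)), now_(std::move(now)) {}

  // Forwards msg if at least one second has passed since the last message
  // that was sent; returns false if the message was dropped.
  bool setStatus(const std::string& msg) {
    Clock::time_point t = now_();
    if (sent_ && t - last_ < std::chrono::seconds(1)) return false;
    send_(msg);
    last_ = t;
    sent_ = true;
    return true;
  }

private:
  Send send_;
  Now now_;
  Clock::time_point last_;
  bool sent_ = false;
};
```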