Issue Details

Key: HADOOP-234
Type: New Feature
Status: Closed
Resolution: Fixed
Priority: Major
Assignee: Owen O'Malley
Reporter: Sanjay Dahiya
Votes: 0
Watchers: 0
Hadoop Common

Hadoop Pipes for writing map/reduce jobs in C++ and python

Created: 19/May/06 04:28 PM   Updated: 08/Jul/09 04:41 PM
Component/s: None
Affects Version/s: None
Fix Version/s: 0.14.0

Time Tracking:
Not Specified

File Attachments:
  Hadoop MaReduce Developer doc.pdf (PDF, 112 kB), Sanjay Dahiya, 2006-05-30 06:58 PM
  hadoop-pipes.html (HTML, 11 kB), Owen O'Malley, 2007-02-18 08:28 AM, licensed for inclusion in ASF works
  pipes-2.patch (text, 3.98 MB), Owen O'Malley, 2007-05-16 12:21 AM, licensed for inclusion in ASF works
  pipes.patch (text, 3.98 MB), Owen O'Malley, 2007-04-18 10:30 PM, licensed for inclusion in ASF works
Issue Links:
Dependants
 

Resolution Date: 16/May/07 07:23 PM


Description
MapReduce C++ support

Requirements

1. Allow users to write Map, Reduce, RecordReader, and RecordWriter functions in C++; the rest of the infrastructure already present in Java should be reused.
2. Avoid users having to write both Java and C++ for this to work.
3. Avoid users having to work with JNI methods directly by wrapping them in helper functions.
4. The interface should be SWIG'able.



Doug Cutting added a comment - 20/May/06 02:49 AM
It would be best to avoid deserializing & reserializing objects in Java before passing them to C. So you might instead always use BytesWritable for the input and output keys and values in Java, and implement a subclass of SequenceFileInputFormat that uses SequenceFile.next(DataOutputBuffer) to copy the raw binary content into the BytesWritable. Then your mapper and reducer implementations could simply get the bytes from the BytesWritable and pass it to the native method, without re-serializing. Does this make sense?

Owen O'Malley added a comment - 23/May/06 05:03 AM
I agree with Doug that if you are going to throw away the advantages of having meaningful types on the C++ side, the keys and values on the Java side should be BytesWritable.

That said, I think it would be much less error-prone for users, and easier to understand and debug, if you followed the Hadoop API much more closely: define Writable and WritableComparable interfaces in C++. The Record IO classes will support them with a minor change to the code generator.
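A minimal sketch of what C++ Writable and WritableComparable interfaces could look like. The method names mirror the Java interfaces (write, readFields, compareTo), but the signatures, the use of std::ostream/std::istream, and the example IntWritable class are assumptions for illustration. The integer is written big-endian, the byte order Java's DataOutput.writeInt uses, so bytes written on one side could be read on the other.

```cpp
#include <cstdint>
#include <istream>
#include <ostream>
#include <sstream>

// Hypothetical C++ counterparts of Hadoop's Java Writable and
// WritableComparable interfaces; signatures are illustrative assumptions.
class Writable {
 public:
  virtual ~Writable() = default;
  virtual void write(std::ostream& out) const = 0;
  virtual void readFields(std::istream& in) = 0;
};

class WritableComparable : public Writable {
 public:
  // Negative, zero, or positive, like Java's compareTo.
  virtual int compareTo(const WritableComparable& other) const = 0;
};

// Example type: a 32-bit int written as 4 big-endian bytes.
class IntWritable : public WritableComparable {
 public:
  explicit IntWritable(int32_t v = 0) : value_(v) {}
  int32_t get() const { return value_; }

  void write(std::ostream& out) const override {
    uint32_t u = static_cast<uint32_t>(value_);
    for (int shift = 24; shift >= 0; shift -= 8) {
      out.put(static_cast<char>((u >> shift) & 0xFF));  // high byte first
    }
  }

  void readFields(std::istream& in) override {
    uint32_t u = 0;
    for (int i = 0; i < 4; ++i) {
      u = (u << 8) | static_cast<uint8_t>(in.get());
    }
    value_ = static_cast<int32_t>(u);
  }

  int compareTo(const WritableComparable& other) const override {
    // Assumes the other object is also an IntWritable.
    const IntWritable& o = static_cast<const IntWritable&>(other);
    return (value_ < o.value_) ? -1 : (value_ > o.value_ ? 1 : 0);
  }

 private:
  int32_t value_;
};
```

Keeping compareTo on the interface is what would let the framework sort C++ keys without knowing their concrete types.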


Sanjay Dahiya added a comment - 23/May/06 01:28 PM
I agree that keys/values should be BytesWritable if we are not using typed data (Records). But I am trying to understand how to avoid serializing/deserializing multiple times between Java and C++ and still use Record IO.
When we define a new record format and generate classes to work with the record, the generated classes contain a Reader and a Writer for the record. These read/write 'Record' objects from streams. One way to implement this would be to modify the class generation so that the Reader extends SequenceInputFile and (optionally) returns a BytesWritable rather than a Record. The idea is that the Record knows how to read itself, its size, etc., so we let it read from the input, but in Java it reads into a BytesWritable rather than the record type. We pass this BytesWritable to C++ and deserialize only once to get the right type.
With this in place we should be able to avoid the extra serialization/deserialization, and users will not need to write extra code per type of record.
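A minimal sketch of the C++ end of that idea: the mapper receives raw value bytes and deserializes them exactly once into a typed record. PageHit stands in for a class generated by record IO, and its encoding (a 4-byte big-endian string length, the string bytes, then a 4-byte big-endian count) is a simplified assumption, not record IO's actual binary format. page_hit_map and the EmitBytes callback (standing in for the output collector helper) are hypothetical names.

```cpp
#include <cstddef>
#include <cstdint>
#include <functional>
#include <stdexcept>
#include <string>

// Hypothetical stand-in for a generated record IO class.
struct PageHit {
  std::string url;
  int32_t count = 0;
};

static void putInt(std::string& out, uint32_t v) {
  for (int shift = 24; shift >= 0; shift -= 8)
    out.push_back(static_cast<char>((v >> shift) & 0xFF));
}

static uint32_t getInt(const std::string& in, std::size_t& pos) {
  if (in.size() - pos < 4) throw std::runtime_error("truncated int");
  uint32_t v = 0;
  for (int i = 0; i < 4; ++i) v = (v << 8) | static_cast<unsigned char>(in[pos++]);
  return v;
}

std::string serialize(const PageHit& r) {
  std::string out;
  putInt(out, static_cast<uint32_t>(r.url.size()));
  out += r.url;
  putInt(out, static_cast<uint32_t>(r.count));
  return out;
}

PageHit deserialize(const std::string& bytes) {
  std::size_t pos = 0;
  PageHit r;
  uint32_t len = getInt(bytes, pos);
  if (bytes.size() - pos < len) throw std::runtime_error("truncated url");
  r.url = bytes.substr(pos, len);
  pos += len;
  r.count = static_cast<int32_t>(getInt(bytes, pos));
  return r;
}

// Stands in for the helper that forwards serialized pairs back to Java.
using EmitBytes = std::function<void(const std::string&, const std::string&)>;

void page_hit_map(const std::string& /*rawKey*/, const std::string& rawValue,
                  const EmitBytes& emit) {
  PageHit hit = deserialize(rawValue);  // the single deserialization
  PageHit out;
  out.url = hit.url.substr(0, hit.url.find('?'));  // drop any query string
  out.count = hit.count;
  emit(out.url, serialize(out));  // key: raw URL bytes; value: serialized record
}
```

Java never touches the PageHit type here; it only moves bytes, which is the property the comment above is after.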

Doug Cutting added a comment - 23/May/06 11:48 PM
I think we can mostly achieve what we want by using the SequenceFile.next(DataOutputBuffer) method to read the raw bytes of each entry, regardless of the declared types. Thus the SequenceFile can still be created with the correct Java key and value classes, but when we actually read entries in Java we won't use instances of those classes, but rather just read the raw content of the entry into a byte-array container like BytesWritable that is passed to C. Then the C code can deserialize the Record instance from the byte-array container. So a C-based mapper should specify the real input key and value classes, but its InputFormat implementation will ignore that when entries are read, passing raw bytes to C.
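The sketch below shows why raw reads need no knowledge of the declared types: if every entry stores its lengths ahead of its bytes, a reader can copy key and value bytes out without interpreting them. The layout (a 4-byte big-endian key length, a 4-byte big-endian value length, then the key and value bytes) is a simplified assumption, not SequenceFile's exact on-disk format, and appendEntry/RawEntryReader are hypothetical names.

```cpp
#include <cstddef>
#include <cstdint>
#include <stdexcept>
#include <string>

static void putInt(std::string& out, uint32_t v) {
  for (int shift = 24; shift >= 0; shift -= 8)
    out.push_back(static_cast<char>((v >> shift) & 0xFF));
}

// Writer side: lengths first, so readers can skip or copy without parsing.
void appendEntry(std::string& file, const std::string& key, const std::string& value) {
  putInt(file, static_cast<uint32_t>(key.size()));
  putInt(file, static_cast<uint32_t>(value.size()));
  file += key;
  file += value;
}

// Reader side: hands out raw bytes; deserialization is left to the consumer.
// The referenced buffer must outlive the reader.
class RawEntryReader {
 public:
  explicit RawEntryReader(const std::string& file) : file_(file) {}

  // Copies the next entry's raw key and value; returns false at end of data.
  bool next(std::string& rawKey, std::string& rawValue) {
    if (pos_ == file_.size()) return false;
    uint32_t keyLen = getInt();
    uint32_t valueLen = getInt();
    if (file_.size() - pos_ < static_cast<std::size_t>(keyLen) + valueLen)
      throw std::runtime_error("truncated entry");
    rawKey.assign(file_, pos_, keyLen);
    pos_ += keyLen;
    rawValue.assign(file_, pos_, valueLen);
    pos_ += valueLen;
    return true;
  }

 private:
  uint32_t getInt() {
    if (file_.size() - pos_ < 4) throw std::runtime_error("truncated length");
    uint32_t v = 0;
    for (int i = 0; i < 4; ++i)
      v = (v << 8) | static_cast<unsigned char>(file_[pos_++]);
    return v;
  }

  const std::string& file_;
  std::size_t pos_ = 0;
};
```

Because the lengths come from the file rather than from the key or value classes, the same reader works for any record type, including binary data with embedded zero bytes.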

Milind Bhandarkar added a comment - 24/May/06 12:32 AM
This can be done by providing just one C++ class in record IO. Currently, the C++ version of record IO generates methods for each class that read from and write to the InStream and OutStream interfaces, which contain only read and write methods. Creating concrete classes that implement these interfaces is outside the scope of code generation. One could provide a byteStream class in C++ that implements these interfaces. A BytesOutStream is constructed in C++, and after the C++ record is serialized into it, this stream goes to Java via JNI, where it is converted to a BytesWritable and written to the SequenceFile. A BytesInStream is created in Java, tied to the SequenceFile, and supplies bytes to the C++ record.
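A sketch of the byte-array streams described above. InStream and OutStream are reduced to a single read or write method each; their exact signatures in record IO are not given in this discussion, so the ones below are assumptions, as are the BytesInStream and BytesOutStream implementations.

```cpp
#include <cstddef>
#include <cstring>
#include <stdexcept>
#include <vector>

// Assumed shapes of the two interfaces generated record code writes against.
class InStream {
 public:
  virtual ~InStream() = default;
  // Reads exactly len bytes into buf; throws if not enough data remains.
  virtual void read(void* buf, size_t len) = 0;
};

class OutStream {
 public:
  virtual ~OutStream() = default;
  virtual void write(const void* buf, size_t len) = 0;
};

// Collects a serialized record so it can be handed to Java (e.g. as a byte[]).
class BytesOutStream : public OutStream {
 public:
  void write(const void* buf, size_t len) override {
    const char* p = static_cast<const char*>(buf);
    bytes_.insert(bytes_.end(), p, p + len);
  }
  const std::vector<char>& bytes() const { return bytes_; }

 private:
  std::vector<char> bytes_;
};

// Replays bytes that came from Java. Does not copy: the caller keeps the
// data alive while the stream is in use.
class BytesInStream : public InStream {
 public:
  BytesInStream(const char* data, size_t size) : data_(data), size_(size) {}

  void read(void* buf, size_t len) override {
    if (len > size_ - pos_) throw std::out_of_range("BytesInStream: read past end");
    if (len == 0) return;
    std::memcpy(buf, data_ + pos_, len);
    pos_ += len;
  }
  size_t remaining() const { return size_ - pos_; }

 private:
  const char* data_;
  size_t size_;
  size_t pos_ = 0;
};
```

Since generated record code only sees the abstract interfaces, swapping in byte-array streams needs no change to the code generator, which matches the "just one C++ class" point above.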

Sanjay Dahiya added a comment - 26/May/06 07:17 AM
I am not sure I am following correctly here. I am new to the codebase, so I took some time to document the MapReduce flow in the system. There are some things I am not clear about (in bold italics in the attachment); it would be great if you could clarify them. The document may also help others who are new to the project.

In this specific case, RecordReader.next() is called by the framework (MapTask), and the next() method knows the boundary of the next record in the file. For a complex Record IO type, the boundary won't be known to anyone except the generated object, which contains the code for reading and writing the object. What is not clear to me is how a generic SequenceFile would know the size/boundaries of Record IO objects. If the generated code contained code for reading the next record into a BytesWritable in addition to a Record, this would be easy to implement.
I think I am missing something here; can you please clarify?


Sanjay Dahiya made changes - 26/May/06 07:17 AM
Field Original Value New Value
Attachment Hadoop MaReduce Developer doc.pdf [ 12334583 ]
Sanjay Dahiya added a comment - 30/May/06 06:58 PM
Updated ...

Sanjay Dahiya made changes - 30/May/06 06:58 PM
Attachment Hadoop MaReduce Developer doc.pdf [ 12334736 ]
Sameer Paranjpye made changes - 31/May/06 07:25 AM
Fix Version/s 0.4 [ 12311021 ]
Doug Cutting made changes - 06/Jun/06 06:16 AM
Workflow jira [ 12372161 ] no reopen closed [ 12372946 ]
Doug Cutting made changes - 07/Jun/06 04:38 AM
Workflow no reopen closed [ 12372946 ] no-reopen-closed [ 12373278 ]
Doug Cutting made changes - 29/Jun/06 04:11 AM
Fix Version/s 0.4.0 [ 12311021 ]
Fix Version/s 0.5.0 [ 12311939 ]
Doug Cutting made changes - 03/Aug/06 05:46 PM
Workflow no-reopen-closed [ 12373278 ] no-reopen-closed, patch-avail [ 12377476 ]
Doug Cutting made changes - 04/Aug/06 08:05 PM
Fix Version/s 0.5.0 [ 12311939 ]
Fix Version/s 0.6.0 [ 12312025 ]
Doug Cutting made changes - 08/Sep/06 08:23 PM
Fix Version/s 0.6.0 [ 12312025 ]
Sanjay Dahiya added a comment - 14/Oct/06 02:07 AM

[[ Old comment, sent by email on Mon, 22 May 2006 17:28:16 +0530 ]]

Hi Doug -
I was also looking for a way to avoid serialization and
de-serialization, but I am still not clear how we would use the existing
record IO with this (without modifying the generated classes).
When we define a new record format and generate classes to work with
the record, the generated classes contain a Reader and a Writer for the
record. These read/write 'Record' objects from streams. One way to
implement this would be to modify the class generation and make the
Reader extend SequenceInputFile and (optionally) return a BytesWritable
rather than a Record. With this in place we should be able to avoid
the need for serialization/de-serialization, and users would not need
to write extra code per type of record. Or am I missing something here?

~Sanjay


Doug Cutting made changes - 15/Dec/06 09:40 PM
Assignee Owen O'Malley [ owen.omalley ]
Owen O'Malley made changes - 18/Feb/07 08:27 AM
Fix Version/s 0.12.0 [ 12312293 ]
Description MapReduce C++ support

Requirements

1. Allow users to write Map and Reduce functions in C++; the rest of the
infrastructure already present in Java should be reused.
2. Avoid users having to write both Java and C++ for this to work.
3. Avoid users having to work with JNI methods directly by wrapping them in helper functions.
4. Use Record IO for describing the record format; both the MR Java framework and C++ should
use the same format to work seamlessly.
5. Allow users to write simple map reduce tasks without learning record IO if keys and values are
simple strings.

Implementation notes

- If keys and values are simple strings, the user passes SimpleNativeMapper in the JobConf and implements
mapper and reducer methods in C++.
- For composite Record IO types, the user starts by defining a record format using the Record IO DDL.
- The user generates Java and C++ classes from the DDL using record IO.
- The user configures the JobConf to use the generated Java classes as the MR input/output key/value classes.
- The user writes Map and Reduce functions in C++ using a standard interface (given below). This interface
  makes the serialized record IO form available to the C++ function, which should deserialize it into the
  corresponding generated C++ record IO classes.
- The user calls the helper functions to pass the serialized form of the generated output key/value pairs to the output collector.
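For the simple-strings case in the first note, the user-side code could be as small as the following sketch. wordcount_map, wordcount_reduce, and the Emit callback (standing in for the helper that forwards pairs to the Java OutputCollector) are hypothetical names chosen for illustration, not part of the proposal.

```cpp
#include <functional>
#include <sstream>
#include <string>
#include <vector>

// Hypothetical stand-in for the helper that forwards a pair to Java.
using Emit = std::function<void(const std::string& key, const std::string& value)>;

// Map: emit (word, "1") for every whitespace-separated word in the value.
void wordcount_map(const std::string& /*key*/, const std::string& value, const Emit& emit) {
  std::istringstream words(value);
  std::string word;
  while (words >> word) emit(word, "1");
}

// Reduce: sum the counts for one key and emit the total as a string.
void wordcount_reduce(const std::string& key, const std::vector<std::string>& values,
                      const Emit& emit) {
  long total = 0;
  for (const std::string& v : values) total += std::stol(v);
  emit(key, std::to_string(total));
}
```

With plain strings on both sides, no record IO classes are generated; the Java proxy only converts keys and values to bytes.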

The following is pseudocode for the Mapper (the Reducer can be implemented similarly):

Native(JNI) Java proxy for the Mapper :
---------------------------------------
Without Record IO :-
--------------------
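import java.io.IOException;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.*;

public class SimpleNativeMapper extends MapReduceBase implements Mapper {

  /**
   * Works on simple strings.
   **/
  public void map(WritableComparable key, Writable value,
                  OutputCollector output, Reporter reporter) throws IOException {
    // Encode explicitly so the C++ side sees the same bytes on every platform.
    mapNative(key.toString().getBytes("UTF-8"),
              value.toString().getBytes("UTF-8"), output, reporter);
  }

  /**
   * Native implementation.
   **/
  private native void mapNative(byte[] key, byte[] value,
                                OutputCollector output, Reporter reporter) throws IOException;
}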


With Record IO :-
------------------
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.record.BinaryOutputArchive;
import org.apache.hadoop.record.Record;

public class RecordIONativeMapper extends MapReduceBase implements Mapper {

  /**
   * Implementation of the map method. This acts as a JNI proxy for the actual
   * map method implemented in C++. Works for Record IO based records.
   * @see #mapNative(byte[], byte[], OutputCollector, Reporter)
   */
  public void map(WritableComparable key, Writable value,
                  OutputCollector output, Reporter reporter) throws IOException {
    byte[] keyBytes;
    byte[] valueBytes;
    try {
      // We need to serialize the key and the value and pass the serialized
      // format to the C++ / JNI methods, so they can interpret it using the
      // appropriate record IO classes.
      keyBytes = toBytes((Record) key);
      valueBytes = toBytes((Record) value);
    } catch (ClassCastException e) {
      throw new IOException("Input key and value must be Record IO types");
    }
    // Pass the serialized byte[]s to the C++ implementation.
    mapNative(keyBytes, valueBytes, output, reporter);
  }

  /** Serializes one record with the binary record IO archive. */
  private static byte[] toBytes(Record record) throws IOException {
    ByteArrayOutputStream stream = new ByteArrayOutputStream();
    BinaryOutputArchive boa = new BinaryOutputArchive(new DataOutputStream(stream));
    record.serialize(boa, "WhatIsTag");
    return stream.toByteArray();
  }

  /**
   * Implementation in C++.
   */
  private native void mapNative(byte[] key, byte[] value,
                                OutputCollector output, Reporter reporter) throws IOException;
}

OutputCollector Proxy for C++
------------------------------
public class NativeOutputCollector implements OutputCollector {

  // Standard method from the interface.
  public void collect(WritableComparable key, Writable value)
      throws IOException {
  }

  // Deserializes key and value and calls collect(WritableComparable, Writable).
  public void collectFromNative(byte[] key, byte[] value) {
    // deserialize key and value to Java types (as configured in JobConf)
    // call the actual collect method
  }
}



Core Native functions ( helper for user provided Mapper and Reducer )
---------------------------------------------------------------------
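#include <jni.h>
#include "org_apache_hadoop_mapred_NativeMapper.h"
#include "UserMapper.h"

/**
 * A C proxy method that calls the user's implementation of the Mapper. This
 * method signature is generated by javah.
 **/
JNIEXPORT void JNICALL Java_org_apache_hadoop_mapred_NativeMapper_mapNative
  (JNIEnv *env, jobject thisObj, jbyteArray key, jbyteArray value,
   jobject collector, jobject reporter)
{
  // Get the raw bytes and pass them, with their lengths, to the user-defined
  // map method. The buffers hold serialized data that may contain zero bytes,
  // so they are not NUL-terminated strings. The user's map method takes care
  // of converting them to the correct record IO type.
  jsize keyLen = (*env)->GetArrayLength(env, key);
  jsize valueLen = (*env)->GetArrayLength(env, value);

  jbyte *keyBuf = (*env)->GetByteArrayElements(env, key, NULL);
  if (keyBuf == NULL) {
    return;  // OutOfMemoryError is pending in the JVM
  }
  jbyte *valueBuf = (*env)->GetByteArrayElements(env, value, NULL);
  if (valueBuf == NULL) {
    (*env)->ReleaseByteArrayElements(env, key, keyBuf, JNI_ABORT);
    return;
  }

  // Call the user-defined method.
  user_map(env, (const char *) keyBuf, keyLen, (const char *) valueBuf, valueLen,
           collector, reporter);

  // JNI_ABORT: the buffers were only read, so nothing is copied back.
  (*env)->ReleaseByteArrayElements(env, key, keyBuf, JNI_ABORT);
  (*env)->ReleaseByteArrayElements(env, value, valueBuf, JNI_ABORT);
}

/**
 * Helper method that acts as a proxy to the OutputCollector in Java. The key
 * and value must be serialized forms of the records specified in the JobConf.
 **/
void output_collector(JNIEnv *env, const char *key, jsize keyLen,
                      const char *value, jsize valueLen,
                      jobject collector, jobject reporter)
{
  (void) reporter;  // not used by this helper

  // Invoke Java NativeOutputCollector.collectFromNative(byte[], byte[]).
  jclass cls = (*env)->GetObjectClass(env, collector);
  jmethodID mid = (*env)->GetMethodID(env, cls, "collectFromNative", "([B[B)V");
  if (mid == NULL) {
    return;  // NoSuchMethodError is pending
  }
  jbyteArray jkey = (*env)->NewByteArray(env, keyLen);
  if (jkey == NULL) {
    return;  // OutOfMemoryError is pending
  }
  jbyteArray jvalue = (*env)->NewByteArray(env, valueLen);
  if (jvalue == NULL) {
    return;  // OutOfMemoryError is pending
  }
  (*env)->SetByteArrayRegion(env, jkey, 0, keyLen, (const jbyte *) key);
  (*env)->SetByteArrayRegion(env, jvalue, 0, valueLen, (const jbyte *) value);
  (*env)->CallVoidMethod(env, collector, mid, jkey, jvalue);

  // Free local references: map may emit many pairs within one native call.
  (*env)->DeleteLocalRef(env, jvalue);
  (*env)->DeleteLocalRef(env, jkey);
  (*env)->DeleteLocalRef(env, cls);
}


User-defined Mapper (and Reducer)
---------------------------------
/**
 * Implements the user-defined map operation.
 **/
void user_map(JNIEnv *env, const char *key, jsize keyLen,
              const char *value, jsize valueLen,
              jobject collector, jobject reporter)
{
  // 1. Deserialize key/value into the appropriate record IO types.

  // 2. Process key/value and generate the intermediate key/values as record IO objects.

  // 3. Serialize the intermediate key/values to intermed_key and intermed_value.

  // 4. Pass intermed_key/intermed_value to the helper function:
  //    output_collector(env, intermed_key, intermed_key_len,
  //                     intermed_value, intermed_value_len, collector, reporter);
}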
MapReduce C++ support

Requirements

1. Allow users to write Map, Reduce, RecordReader, and RecordWriter functions in C++; the rest of the infrastructure already present in Java should be reused.
2. Avoid users having to write both Java and C++ for this to work.
3. Avoid users having to work with JNI methods directly by wrapping them in helper functions.
4. The interface should be SWIG'able.
Owen O'Malley added a comment - 18/Feb/07 08:28 AM
This is my current plan of approach.

Owen O'Malley made changes - 18/Feb/07 08:28 AM
Attachment hadoop-pipes.html [ 12351444 ]
Owen O'Malley made changes - 18/Feb/07 08:30 AM
Attachment Hadoop MaReduce Developer doc.pdf [ 12334583 ]
Owen O'Malley made changes - 18/Feb/07 08:31 AM
Status Open [ 1 ] In Progress [ 3 ]
Doug Cutting added a comment - 19/Feb/07 10:44 PM
+1 This proposal looks good. I note that the contexts could, with the addition of a nextKey() method, be turned into iterators, permitting alternate map and reduce control structures, like MapRunnable. This may prove to be a feature.
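Doug's remark can be illustrated with a hypothetical in-memory ReduceContext (the class, its methods, and countValues are assumptions for illustration, not the committed Pipes API). nextValue() walks the values of the current key, and the added nextKey() advances to the next key group, so user code can drive the whole loop itself, much as MapRunnable lets Java code drive a map task.

```cpp
#include <cstddef>
#include <map>
#include <string>
#include <vector>

using Groups = std::map<std::string, std::vector<std::string>>;

// Hypothetical reduce context over pre-grouped data. The groups must outlive
// the context. Call nextKey() before reading a key or its values.
class ReduceContext {
 public:
  explicit ReduceContext(const Groups& groups)
      : groups_(groups), group_(groups_.begin()) {}

  // Moves to the next key; returns false when there are no more keys.
  bool nextKey() {
    if (started_ && group_ != groups_.end()) ++group_;
    started_ = true;
    valueIndex_ = 0;
    return group_ != groups_.end();
  }
  const std::string& getInputKey() const { return group_->first; }

  // Moves to the next value of the current key; returns false when exhausted.
  bool nextValue() {
    if (valueIndex_ >= group_->second.size()) return false;
    current_ = &group_->second[valueIndex_++];
    return true;
  }
  const std::string& getInputValue() const { return *current_; }

 private:
  const Groups& groups_;
  Groups::const_iterator group_;
  std::size_t valueIndex_ = 0;
  const std::string* current_ = nullptr;
  bool started_ = false;
};

// User-controlled iteration: count the values for every key in one pass.
std::map<std::string, std::size_t> countValues(ReduceContext& ctx) {
  std::map<std::string, std::size_t> counts;
  while (ctx.nextKey()) {
    std::size_t n = 0;
    while (ctx.nextValue()) ++n;
    counts[ctx.getInputKey()] = n;
  }
  return counts;
}
```

The design choice is who owns the loop: with only nextValue() the framework calls reduce once per key, while nextKey() hands that outer loop to user code.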

Doug Cutting made changes - 02/Mar/07 10:17 PM
Fix Version/s 0.12.0 [ 12312293 ]
Owen O'Malley made changes - 12/Apr/07 05:22 PM
Link This issue depends on HADOOP-1250 [ HADOOP-1250 ]
Owen O'Malley made changes - 12/Apr/07 05:22 PM
Link This issue depends on HADOOP-1251 [ HADOOP-1251 ]
Owen O'Malley added a comment - 18/Apr/07 10:30 PM
Here is a first pass of this patch. I still need to add unit tests.

Owen O'Malley made changes - 18/Apr/07 10:30 PM
Attachment pipes.patch [ 12355795 ]
Owen O'Malley made changes - 20/Apr/07 08:50 PM
Summary Support for writing Map/Reduce functions in C++ Hadoop Pipes for writing map/reduce jobs in C++ and python
Doug Cutting added a comment - 20/Apr/07 09:25 PM
A few quick comments:
  • the pipes package should have a package.html file
  • only Submitter.java should be public, right?
  • one must 'chmod +x' the configure scripts
  • the create-c++-configure task fails for me on Ubuntu w/ 'Can't exec "aclocal"'
  • 'bin/hadoop pipes' should print a helpful message
  • a README.txt file in src/examples/pipes telling how to run things would go a long way

Owen O'Malley made changes - 16/May/07 12:17 AM
Component/s pipes [ 12311773 ]
Component/s mapred [ 12310690 ]
Fix Version/s 0.14.0 [ 12312474 ]
Owen O'Malley made changes - 16/May/07 12:21 AM
Attachment pipes-2.patch [ 12357433 ]
Owen O'Malley made changes - 16/May/07 12:22 AM
Status In Progress [ 3 ] Patch Available [ 10002 ]

Doug Cutting added a comment - 16/May/07 07:23 PM
I just committed this. Thanks, Owen!

Doug Cutting made changes - 16/May/07 07:23 PM
Status Patch Available [ 10002 ] Resolved [ 5 ]
Resolution Fixed [ 1 ]
Repository Revision Date User Message
ASF #538693 Wed May 16 19:23:48 UTC 2007 cutting HADOOP-234. Add pipes facility, which permits writing MapReduce programs in C++.
Files Changed
ADD /lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/BinaryProtocol.java
ADD /lucene/hadoop/trunk/src/c++/utils/depcomp
ADD /lucene/hadoop/trunk/src/c++/pipes/api/hadoop
ADD /lucene/hadoop/trunk/src/c++/utils/m4
ADD /lucene/hadoop/trunk/src/c++/utils/ltmain.sh
ADD /lucene/hadoop/trunk/src/c++/utils/api/hadoop/StringUtils.hh
ADD /lucene/hadoop/trunk/src/c++/utils/aclocal.m4
ADD /lucene/hadoop/trunk/src/examples/pipes/configure
ADD /lucene/hadoop/trunk/src/examples/pipes/missing
ADD /lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/pipes
ADD /lucene/hadoop/trunk/src/examples/pipes/aclocal.m4
ADD /lucene/hadoop/trunk/src/examples/pipes/.autom4te.cfg
ADD /lucene/hadoop/trunk/src/c++/pipes/install-sh
ADD /lucene/hadoop/trunk/src/c++/utils/m4/hadoop_utils.m4
MODIFY /lucene/hadoop/trunk/bin/hadoop
ADD /lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/Application.java
ADD /lucene/hadoop/trunk/src/c++/pipes/api/hadoop/Pipes.hh
ADD /lucene/hadoop/trunk/src/c++/pipes/configure
MODIFY /lucene/hadoop/trunk/CHANGES.txt
ADD /lucene/hadoop/trunk/src/c++/pipes/missing
ADD /lucene/hadoop/trunk/src/examples/pipes/impl/wordcount-part.cc
ADD /lucene/hadoop/trunk/src/c++/pipes/.autom4te.cfg
ADD /lucene/hadoop/trunk/src/c++/utils/impl/SerialUtils.cc
ADD /lucene/hadoop/trunk/src/examples/pipes/Makefile.in
ADD /lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/UpwardProtocol.java
ADD /lucene/hadoop/trunk/src/c++/utils/config.sub
ADD /lucene/hadoop/trunk/src/examples/pipes
ADD /lucene/hadoop/trunk/src/examples/pipes/config.sub
ADD /lucene/hadoop/trunk/src/examples/pipes/Makefile.am
ADD /lucene/hadoop/trunk/src/c++/pipes/aclocal.m4
ADD /lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/PipesReducer.java
ADD /lucene/hadoop/trunk/src/c++/utils/impl
ADD /lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/Submitter.java
ADD /lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/OutputHandler.java
ADD /lucene/hadoop/trunk/src/examples/pipes/impl
ADD /lucene/hadoop/trunk/src/examples/pipes/depcomp
ADD /lucene/hadoop/trunk/src/c++/utils/impl/StringUtils.cc
ADD /lucene/hadoop/trunk/src/c++/pipes/Makefile.in
ADD /lucene/hadoop/trunk/src/examples/pipes/conf
ADD /lucene/hadoop/trunk/src/examples/pipes/impl/wordcount-simple.cc
ADD /lucene/hadoop/trunk/src/c++/utils/impl/config.h.in
ADD /lucene/hadoop/trunk/src/examples/pipes/ltmain.sh
ADD /lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/DownwardProtocol.java
ADD /lucene/hadoop/trunk/src/c++/utils/configure.ac
ADD /lucene/hadoop/trunk/src/examples/pipes/impl/config.h.in
ADD /lucene/hadoop/trunk/src/c++/pipes
ADD /lucene/hadoop/trunk/src/c++/utils/config.guess
ADD /lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/package.html
ADD /lucene/hadoop/trunk/src/examples/pipes/configure.ac
ADD /lucene/hadoop/trunk/src/c++/pipes/Makefile.am
ADD /lucene/hadoop/trunk/src/examples/pipes/config.guess
ADD /lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/PipesPartitioner.java
ADD /lucene/hadoop/trunk/src/c++/utils/configure
ADD /lucene/hadoop/trunk/src/c++/utils/missing
ADD /lucene/hadoop/trunk/src/c++/pipes/api/hadoop/TemplateFactory.hh
ADD /lucene/hadoop/trunk/src/examples/pipes/README.txt
ADD /lucene/hadoop/trunk/src/c++/utils/.autom4te.cfg
MODIFY /lucene/hadoop/trunk/build.xml
ADD /lucene/hadoop/trunk/src/c++/pipes/api
ADD /lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/PipesMapRunner.java
ADD /lucene/hadoop/trunk/src/c++/pipes/impl/HadoopPipes.cc
ADD /lucene/hadoop/trunk/src/c++/utils/api/hadoop
ADD /lucene/hadoop/trunk/src/c++/pipes/depcomp
ADD /lucene/hadoop/trunk/src/c++/pipes/ltmain.sh
ADD /lucene/hadoop/trunk/src/c++/pipes/config.sub
ADD /lucene/hadoop/trunk/src/examples/pipes/conf/word.xml
ADD /lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/pipes/TestPipes.java
ADD /lucene/hadoop/trunk/src/c++/utils/install-sh
ADD /lucene/hadoop/trunk/src/examples/pipes/install-sh
ADD /lucene/hadoop/trunk/src/examples/pipes/impl/wordcount-nopipe.cc
ADD /lucene/hadoop/trunk/src/c++/pipes/impl
ADD /lucene/hadoop/trunk/src/c++/utils/api/hadoop/SerialUtils.hh
ADD /lucene/hadoop/trunk/src/c++/pipes/compile
ADD /lucene/hadoop/trunk/src/c++/utils/Makefile.in
ADD /lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/pipes/WordCountInputFormat.java
ADD /lucene/hadoop/trunk/src/c++/pipes/impl/config.h.in
ADD /lucene/hadoop/trunk/src/c++/pipes/configure.ac
ADD /lucene/hadoop/trunk/src/examples/pipes/conf/word-part.xml
ADD /lucene/hadoop/trunk/src/c++/pipes/config.guess
ADD /lucene/hadoop/trunk/src/c++/utils
ADD /lucene/hadoop/trunk/src/c++/utils/Makefile.am
ADD /lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes
ADD /lucene/hadoop/trunk/src/c++/utils/api

Repository Revision Date User Message
ASF #538704 Wed May 16 20:06:31 UTC 2007 cutting HADOOP-234. Make configure scripts executable.
Files Changed
MODIFY /lucene/hadoop/trunk/src/examples/pipes/configure
MODIFY /lucene/hadoop/trunk/src/c++/utils/configure
MODIFY /lucene/hadoop/trunk/src/c++/pipes/configure

Repository Revision Date User Message
ASF #538722 Wed May 16 20:37:44 UTC 2007 omalley HADOOP-234 Fix wording of the package.html to be in the current rather than
future tense.
Files Changed
MODIFY /lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/package.html

Hadoop QA added a comment - 17/May/07 11:26 AM

Doug Cutting made changes - 20/Aug/07 06:11 PM
Status Resolved [ 5 ] Closed [ 6 ]
Owen O'Malley made changes - 08/Jul/09 04:41 PM
Component/s pipes [ 12311773 ]