
Key: HADOOP-1722
Type: Improvement
Status: Resolved
Resolution: Fixed
Priority: Major
Assignee: Klaas Bosteels
Reporter: Runping Qi
Votes: 0
Watchers: 11
Hadoop Common

Make streaming to handle non-utf8 byte array

Created: 16/Aug/07 02:33 PM   Updated: 20/Oct/09 06:58 AM
Component/s: None
Affects Version/s: None
Fix Version/s: 0.21.0

Time Tracking:
Not Specified

File Attachments:
HADOOP-1722-branch-0.18.patch, 2009-02-13 10:19 AM, Klaas Bosteels, 152 kB
HADOOP-1722-branch-0.19.patch, 2009-03-04 05:58 PM, Klaas Bosteels, 152 kB
HADOOP-1722-v0.20.1.patch, 2009-10-15 09:14 AM, Matthias Lehmann, 153 kB (licensed for inclusion in ASF works)
HADOOP-1722-v2.patch, 2009-01-27 05:47 PM, Klaas Bosteels, 119 kB (licensed for inclusion in ASF works)
HADOOP-1722-v3.patch, 2009-01-28 01:24 PM, Klaas Bosteels, 119 kB (licensed for inclusion in ASF works)
HADOOP-1722-v4.patch, 2009-02-04 04:15 PM, Klaas Bosteels, 121 kB (licensed for inclusion in ASF works)
HADOOP-1722-v4.patch, 2009-01-28 10:52 PM, Klaas Bosteels, 119 kB (licensed for inclusion in ASF works)
HADOOP-1722-v5.patch, 2009-02-06 02:32 PM, Klaas Bosteels, 153 kB (licensed for inclusion in ASF works)
HADOOP-1722-v6.patch, 2009-02-12 06:59 PM, Klaas Bosteels, 153 kB (licensed for inclusion in ASF works)
HADOOP-1722.patch, 2009-01-26 05:24 PM, Klaas Bosteels, 114 kB (licensed for inclusion in ASF works)

Hadoop Flags: Reviewed
Release Note: Streaming allows binary (or other non-UTF8) streams.
Resolution Date: 13/Feb/09 04:13 AM


Description
Right now, the streaming framework expects the output of the streaming process (mapper or reducer) to be
line-oriented UTF-8 text. This limitation makes it impossible to use programs whose output may be non-UTF-8
(international encodings, or maybe even binary data). Streaming can overcome this limitation by introducing a simple
encoding protocol. For example, it can allow the mapper/reducer to hex-encode its keys/values,
and the framework decodes them on the Java side.
This way, as long as the mapper/reducer executables follow this encoding protocol,
they can output arbitrary byte arrays and the streaming framework can handle them.
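
The hex-encoding idea in the description can be illustrated with a small sketch. This is only an assumption about what such a protocol might look like (tab-separated, newline-terminated, one key and one value per line); the function names encode_record and decode_record are hypothetical and not part of Hadoop.

```python
import binascii


def encode_record(key, value):
    """Hex-encode a binary key/value pair into one ASCII line.

    Hypothetical helper: after hex-encoding, the payload can no longer
    contain a literal tab or newline, so the usual streaming separators
    stay unambiguous.
    """
    return binascii.hexlify(key) + b"\t" + binascii.hexlify(value) + b"\n"


def decode_record(line):
    """Reverse encode_record; this is what the framework side would do."""
    key_hex, value_hex = line.rstrip(b"\n").split(b"\t", 1)
    return binascii.unhexlify(key_hex), binascii.unhexlify(value_hex)
```

The cost of this approach is that every payload byte takes two bytes on the wire.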

arkady borkovsky added a comment - 29/Aug/07 05:47 PM
Passing data from DFS to a streaming mapper should be transparent.
By default, the mapper task should receive exactly the same bytes as stored in DFS, without any transformation.
There should also be command line parameters that specify other useful options, including custom input format, decompression, etc.
There should be no requirements on the command that is used as Streaming Mapper.

This has been broken twice – in Sept. 2006, and in July 2007.
It would be nice to restore the functionality, and make it part of specification. (This implies adding regression cases, etc.)


Owen O'Malley added a comment - 13/Nov/07 10:25 PM - edited
I think the right way to handle this is to support a standard quoting language on input and output from each streaming process. In particular, I think that streaming should have:

tab = field separator
new line = record separator
\t = literal tab
\n = literal newline
\ \ = literal backslash

all other bytes (not characters!) including non-ascii and non-utf8 are passed literally through. Quoting is done on the stdin of the process and unquoting is done on the stdout of the process. This would make it very easy to write arbitrary binary values to the framework from streaming.
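
A minimal sketch of the quoting scheme in the table above, operating on bytes rather than characters. The function names are hypothetical, and the handling of an unknown escape (pass the following byte through unchanged) is an assumption that the proposal does not specify.

```python
def quote(data):
    """Escape backslash, tab and newline; every other byte passes through."""
    # Backslash must be escaped first so the later escapes are not doubled.
    return (data.replace(b"\\", b"\\\\")
                .replace(b"\t", b"\\t")
                .replace(b"\n", b"\\n"))


def unquote(data):
    """Reverse quote(). Unknown escapes yield the escaped byte (assumption)."""
    escapes = {ord("t"): 0x09, ord("n"): 0x0A, ord("\\"): 0x5C}
    out = bytearray()
    i = 0
    while i < len(data):
        if data[i] == 0x5C and i + 1 < len(data):
            nxt = data[i + 1]
            out.append(escapes.get(nxt, nxt))
            i += 2
        else:
            out.append(data[i])
            i += 1
    return bytes(out)
```

After quoting, a field contains no literal tab or newline, so those two bytes remain usable as field and record separators.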

Thoughts?


Lohit Vijayarenu added a comment - 13/Nov/07 10:58 PM
"all other bytes (not characters!) including non-ascii and non-utf8 are passed literally through. Quoting is done on the stdin of the process and unquoting is done on the stdout of the process. This would make it very easy to write arbitrary binary values to the framework from streaming"

+1

Would it be good to have an option that preserves the current behavior, with the framework doing the translation? That would make things easier for some map/reduce scripts.


Dick King added a comment - 13/Nov/07 11:09 PM - edited
In owen's table, I assume \ \ is a backSLASH, not a backquote.

More substantively, I think I would rather see an escape for the zero byte, perhaps \0 imaginatively enough, added to Owen's table.

-dk


Owen O'Malley added a comment - 13/Nov/07 11:19 PM
Yes, it should have been backslash. I guess it would be ok to unquote the \0 as a null byte, but the point of the exercise is to require the minimal amount of quoting. Null bytes should be ok to pass through since they won't be confused with the special field/record delimiters.

eric baldeschwieler added a comment - 16/Nov/07 08:11 AM
I think arkady's point is much more to the point than this quoting proposal, which I think is going the wrong way!

There are two interfaces here - that between map & reduce and that into map and out of reduce. I think they deserve different handling.

1) map in & reduce out - Should by default just consume bytes and produce bytes. The framework should do no interpretation or quoting. It should not try to break the output into lines, keys & values, etc. It is just a byte stream. This will allow true binary output with zero hassle. Some thought on splits is clearly needed...

2) map out & reduce in - Here we clearly need keys and values. But I think quoting might be the wrong direction. It should certainly not be the default. I think we should consider just providing an option that specifies that a new binary format will be used here. Maybe a 4 byte length followed by a binary key, followed by a 4 byte length and then a binary value? Maybe with a record terminator for sanity checking?
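
The length-prefixed format suggested in 2) could look roughly like the following sketch. The big-endian signed 32-bit length and the function name write_pair are assumptions for illustration; the optional record terminator is left out.

```python
import io
import struct


def write_pair(out, key, value):
    """Write <4 byte length><key><4 byte length><value> to a binary stream.

    Byte order (big-endian) is an assumption; the comment above only
    says "a 4 byte length".
    """
    for field in (key, value):
        out.write(struct.pack(">i", len(field)))
        out.write(field)


def example():
    """Encode one pair whose payload contains tab, newline and NUL bytes."""
    buf = io.BytesIO()
    write_pair(buf, b"k\t1", b"\x00\n\xff")
    return buf.getvalue()
```

Because lengths are explicit, no byte value needs quoting.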


Two observations:

1) Adding quoting by default will break all kinds of programs that work with streaming today. This is undesirable. We should add an option, not change the default behavior.

2) Streaming should not use utf8 anywhere! It should assume that it is processing a stream of bytes that contains certain signal bytes '\n' and '\t'. I think we all agree on this. Treating the byte stream as a character stream just confuses things.


Klaas Bosteels added a comment - 26/Jan/09 05:24 PM
Are there any comments on the attached patch? It basically implements an extended version of Eric's idea concerning the addition of an option that triggers the usage of a new binary format. However, instead of a 4 byte length it uses a 1 byte type code (and the number of following bytes is derived from this type code). This leads to a slightly more compact representation for basic types (e.g. a float requires 1 + 4 bytes instead of 4 + 4 bytes), and it also solves another important Streaming issue, namely, that all type information is lost when everything is converted to strings.
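
The size comparison for a float can be checked with a short sketch. The numeric type code used for floats (5) and the big-endian layout are assumptions made here for illustration; the authoritative values are in the patch itself.

```python
import struct

FLOAT_TYPE_CODE = 5  # assumed value of the type code for a 32-bit float


def typed_float(x):
    """1 byte type code followed by the 4 byte float: 1 + 4 bytes."""
    return struct.pack(">Bf", FLOAT_TYPE_CODE, x)


def length_prefixed_float(x):
    """4 byte length followed by the 4 byte float: 4 + 4 bytes."""
    return struct.pack(">if", 4, x)
```

With these definitions, len(typed_float(1.0)) is 5 and len(length_prefixed_float(1.0)) is 8, and only the first form records that the payload is a float.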
Contents

The patch consists of the following parts:

  • A new package org.apache.hadoop.typedbytes in src/core that provides functionality for dealing with sequences of bytes in which the first byte is a type code. This package also includes classes that can convert Writables to/from typed bytes and (de)serialize Records to/from typed bytes. The typed bytes format itself was kept as simple and straightforward as possible in order to make it very easy to write conversion code in other languages.
  • Changes to Streaming that add the -typedbytes none|input|output|all option. When typed bytes are requested for the input, the functionality provided by the package org.apache.hadoop.typedbytes is used to convert all input Writables to typed bytes (which makes it possible to let Streaming programs seamlessly take sequence files containing Records and/or other Writables as input), and when typed bytes are used for the output, Streaming outputs TypedBytesWritables (i.e. instances of the org.apache.hadoop.typedbytes.TypedBytesWritable class, which extends BytesWritable).
  • A new tool DumpTypedBytes in src/tools that dumps DFS files as typed bytes to stdout. This can often be a lot more convenient than printing out the strings returned by the toString() methods, and it can also be used to fetch an input sample from the DFS for testing Streaming programs that use typed bytes.
  • A new input format called AutoInputFormat, which can take text files as well as sequence files (or both at the same time) as input. The functionality to deal with text and sequence files transparently was required for the DumpTypedBytes tool, and putting it in an input format makes sense since the ability to take both text and sequence files as input can be very useful for Streaming programs. Because Streaming still uses the old mapred API, the patch includes two versions of AutoInputFormat (one for the old and another for the new API).
Example

Using the simple Python module available at http://github.com/klbostee/typedbytes, the mapper script

import sys
import typedbytes

input = typedbytes.PairedInput(sys.stdin)
output = typedbytes.PairedOutput(sys.stdout)

for (key, value) in input:
    for word in value.split():
        output.write((word, 1))

and the reducer script

import sys
import typedbytes
from itertools import groupby
from operator import itemgetter

input = typedbytes.PairedInput(sys.stdin)
output = typedbytes.PairedOutput(sys.stdout)

for (key, group) in groupby(input, itemgetter(0)):
    values = map(itemgetter(1), group)
    output.write((key, sum(values)))

can be used to do a simple wordcount. The unit tests include a similar example in Java.

Remark

This patch renders HADOOP-4304 mostly obsolete, since it provides all underlying functionality required for Dumbo. If this patch gets accepted, then future versions of Dumbo will probably only consist of Python code again and thus be very easy to install and use, which makes adding Dumbo to contrib less of a requirement.


Runping Qi added a comment - 26/Jan/09 06:44 PM

Looks good.
A couple things.
1. The type flags: The user may need to specify two different output type flags, one for the map output, and the other is for the reducer output.
2. The reduce input type flag should always be the same as the map output flag, and thus it is completely independent of the input type flag for the mapper
3. Since the mapper/reducer may be implemented in other languages, such as C, we must document the serialization format for the TypedBytesWritable clearly in a language-agnostic way. It would be great to have a serialization/deserialization library for each common language.


Klaas Bosteels added a comment - 27/Jan/09 05:47 PM
This second version of my patch addresses the issues raised by Runping:
  • The javadoc for the package org.apache.hadoop.typedbytes now includes a detailed description of the typed bytes format.
  • The (boolean-valued) typedbytes-related properties for streaming are now:
    • stream.map.input.typed.bytes
    • stream.map.output.typed.bytes
    • stream.reduce.input.typed.bytes
    • stream.reduce.output.typed.bytes
  • The command line option -typedbytes was changed such that it can take the values none|mapper|reducer|input|output|all (that should cover most cases, and otherwise the properties listed above can be set manually).

BTW: The comment "The reduce input type flag should always be the same as the map output flag" is not really valid, since TypedBytesWritable's toString() outputs sensible strings and hence it would not be impossible to output typed bytes in the mapper and let streaming convert to strings and pass those strings as input to the reducer.

Any other comments?


Klaas Bosteels added a comment - 28/Jan/09 01:24 PM
I realized that it is probably more convenient/intuitive to make -typedbytes input correspond to
  • stream.map.input.typed.bytes=true
  • stream.map.output.typed.bytes=false
  • stream.reduce.input.typed.bytes=false
  • stream.reduce.output.typed.bytes=false

instead of

  • stream.map.input.typed.bytes=true
  • stream.map.output.typed.bytes=false
  • stream.reduce.input.typed.bytes=true
  • stream.reduce.output.typed.bytes=false

and similarly that it would be better to let -typedbytes output correspond to

  • stream.map.input.typed.bytes=false
  • stream.map.output.typed.bytes=false
  • stream.reduce.input.typed.bytes=false
  • stream.reduce.output.typed.bytes=true

instead of

  • stream.map.input.typed.bytes=false
  • stream.map.output.typed.bytes=true
  • stream.reduce.input.typed.bytes=false
  • stream.reduce.output.typed.bytes=true

Maybe this was also (part of) what Runping was trying to say in his comment? In any case, the attached third version of my patch incorporates this minor change.


Runping Qi added a comment - 28/Jan/09 04:00 PM

I think you need to have the flags for the three cases:

1. stream.reduce.output.typed.bytes=true
In this case, the PipeMapRed and PipeReducer classes need to interpret the reduce output as typed bytes and deserialize it accordingly.
This is the case where the user wants to generate binary data by reducers and output them in the typed bytes format.

2. stream.map.output.typed.bytes=true
In this case, the PipeMapRed and PipeMapper classes need to interpret the mapper output as typed bytes and deserialize it accordingly.
This is the case where the user wants to generate binary data by mappers. In this case, the types for the map output key/value pairs
(and that for the reducer input key/value pairs) are typed bytes. The types for map output must be the same as those for the reduce input.

3. stream.map.input.typed.bytes=true
The intended use case for this setting may be that the user knows that the input data is in typed bytes, and does not want the PipeMapRed (PipeMapper) class to convert it into text by calling toString. Rather, the PipeMapRed class should serialize it according to typed bytes.
The mapper program will interpret the serialized format properly.


Klaas Bosteels added a comment - 28/Jan/09 04:28 PM
Yes, that is what the current patch provides, but it also adds a stream.reduce.input.typed.bytes because the format for the map output does not necessarily have to be the same as the format for the reduce input (it makes most sense to use the same format of course, but the streaming framework can convert the typed bytes outputted by a mapper to strings before passing them on to a reducer if the programmer would want that for some reason). So the patch is a bit more general, but it includes all the cases you listed.

Hadoop QA added a comment - 28/Jan/09 08:01 PM
-1 overall. Here are the results of testing the latest attachment
http://issues.apache.org/jira/secure/attachment/12398901/HADOOP-1722-v3.patch
against trunk revision 738479.

+1 @author. The patch does not contain any @author tags.

+1 tests included. The patch appears to include 24 new or modified tests.

+1 javadoc. The javadoc tool did not generate any warning messages.

+1 javac. The applied patch does not increase the total number of javac compiler warnings.

+1 findbugs. The patch does not introduce any new Findbugs warnings.

+1 Eclipse classpath. The patch retains Eclipse classpath integrity.

+1 core tests. The patch passed core unit tests.

-1 contrib tests. The patch failed contrib unit tests.

Test results: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/3766/testReport/
Findbugs warnings: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/3766/artifact/trunk/build/test/findbugs/newPatchFindbugsWarnings.html
Checkstyle results: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/3766/artifact/trunk/build/test/checkstyle-errors.html
Console output: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/3766/console

This message is automatically generated.


Klaas Bosteels added a comment - 28/Jan/09 10:52 PM
The failed contrib unit tests are not caused by the patch as far as I can tell, but I did find a small unrelated bug (serializing lists did not work properly in all cases). The 4th version of the patch contains a fix for this bug (as well as an update to the unit test that did not catch this bug before). Furthermore, I also changed the semantics of -typedbytes output once more. It now corresponds to
  • stream.map.input.typed.bytes=false
  • stream.map.output.typed.bytes=true
  • stream.reduce.input.typed.bytes=true
  • stream.reduce.output.typed.bytes=true

because it is more likely that you want to take text as input but use typed bytes for everything else (instead of only using typed bytes for the reducer output).


Runping Qi added a comment - 29/Jan/09 01:18 AM

I don't understand your reasoning that the format for the map output does not necessarily have to be the same as the format for the reduce input.
Why would the programmer want the streaming framework to convert the typed bytes outputted by a mapper to strings before passing them on to a reducer?
If the programmer wants that, why doesn't the mapper generate text output in the first place?

Another issue: it is not uncommon that the mapper output is in text format and the reducer output is in binary format (typed bytes).
Your last change of the semantics cannot express this case.


eric baldeschwieler added a comment - 29/Jan/09 05:01 AM
I'm glad to see you working on this! We'll take it for a spin.

Klaas Bosteels added a comment - 29/Jan/09 09:24 AM
Replies to Runping's questions:
  • As I said, letting the mapper output typed bytes and the reducer take text as input probably will not be used much in practice, but that does not mean we should remove that option in my opinion.
  • You can always set the properties directly via -D stream.map.input.typed.bytes=false etc. so it is still possible to let only the reducer output typed bytes. The -typedbytes command line option just provides shorthands for the most common combinations really, and if you are going to output typed bytes in the reducer then you might as well output typed bytes in the mapper too (since that will be faster and probably also more convenient from a programming perspective because types are preserved), so it seemed better to me to let the -typedbytes output shorthand correspond to using typed bytes for everything except for the map input. Moreover, the old implementation of -typedbytes output would lead to a sequence file containing Text objects when it is combined with -numReduceTasks 0 and -outputformat org.apache.hadoop.mapred.SequenceFileOutputFormat, which seems counterintuitive to me. When the programmer wants typed bytes as output, then all output sequence files should always contain TypedBytesWritables (as is always the case with the modified implementation of -typedbytes output).
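
For illustration, the direct property settings mentioned above could be assembled as follows. This is a minimal sketch: the helper name typed_bytes_args is hypothetical and not part of the patch; only the four property names and the -D option come from this thread, and they reflect the patch at this stage of the discussion.

```python
PROPERTIES = (
    "stream.map.input.typed.bytes",
    "stream.map.output.typed.bytes",
    "stream.reduce.input.typed.bytes",
    "stream.reduce.output.typed.bytes",
)


def typed_bytes_args(map_input=False, map_output=False,
                     reduce_input=False, reduce_output=False):
    """Build the '-D name=value' arguments for the four boolean properties."""
    values = (map_input, map_output, reduce_input, reduce_output)
    args = []
    for name, enabled in zip(PROPERTIES, values):
        args += ["-D", "%s=%s" % (name, "true" if enabled else "false")]
    return args
```

Calling typed_bytes_args(reduce_output=True) expresses the case raised earlier in the thread: text everywhere except for the reducer output.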

Note also that all of this does not really matter that much. Since text gets converted to a typed bytes string, most people will be using typed bytes for everything in practice. The -typedbytes input|output|mapper|reducer options are mostly intended to make it possible to convert existing streaming programs gradually...


Hadoop QA added a comment - 29/Jan/09 09:47 AM
-1 overall. Here are the results of testing the latest attachment
http://issues.apache.org/jira/secure/attachment/12398937/HADOOP-1722-v4.patch
against trunk revision 738744.

+1 @author. The patch does not contain any @author tags.

+1 tests included. The patch appears to include 24 new or modified tests.

+1 javadoc. The javadoc tool did not generate any warning messages.

+1 javac. The applied patch does not increase the total number of javac compiler warnings.

+1 findbugs. The patch does not introduce any new Findbugs warnings.

+1 Eclipse classpath. The patch retains Eclipse classpath integrity.

+1 core tests. The patch passed core unit tests.

-1 contrib tests. The patch failed contrib unit tests.

Test results: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/3771/testReport/
Findbugs warnings: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/3771/artifact/trunk/build/test/findbugs/newPatchFindbugsWarnings.html
Checkstyle results: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/3771/artifact/trunk/build/test/checkstyle-errors.html
Console output: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/3771/console

This message is automatically generated.


Runping Qi added a comment - 29/Jan/09 03:09 PM

Klaas, I got your points now. Thanks.


Devaraj Das added a comment - 04/Feb/09 06:41 AM
Looks good overall! The one thing that should be considered here is moving the typedBytes package from core to streaming. Is there currently a usecase where typedBytes might be used elsewhere? The same argument holds for DumpTypedBytes & AutoInputFormat as well. Could we have the DumpTypedBytes integrated with StreamJob (as in, if someone wants to test out things, he uses the streaming.jar and passes an option to invoke DumpTypedBytes tool).
The other thing is about special handling for the basic types, as opposed to using raw bytes for everything. How typical is the use case where we have Key/Value types as the basic types. I understand that it makes the on-disk/wire representation compact in the cases where the native types are used, but it would simplify the framework if we dealt with only raw bytes instead (and probably use compression).
It would help if you include an example of a streaming app where binary data is consumed/produced.

Klaas Bosteels added a comment - 04/Feb/09 11:05 AM
Replies to Devaraj's comments:
  • I guess it would indeed make sense to move everything I added in core to streaming. I'll attach a new version of the patch later today.
  • The main advantage of the typed bytes is not that they lead to more compact representations, but rather that they can make it a lot easier to write certain streaming apps. Suppose for instance that you want to write a streaming app that consumes sequence files containing instances of VIntWritable as keys and instances of a custom Record as values. With typed bytes, the keys and values will then be converted automatically to appropriate typed bytes (namely, the keys will be converted to a typed bytes integer and the values to a typed bytes list consisting of the typed bytes objects that correspond to the attributes of the record), whereas your streaming app would have to implement VIntWritable and Record deserialization itself if streaming would only support raw bytes. So using typed bytes does indeed make things a bit more complex, but it's definitely worth it in my opinion (and you can still use raw bytes if you want to by using a 0 byte as type code).
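
To give a feel for what a streaming app would see in the VIntWritable/Record example, here is a sketch of a tiny typed bytes reader. Apart from the 0 code for raw bytes mentioned above, the type codes (3 for integer, 7 for string, 9 for list, 255 as list terminator) and the big-endian 4 byte lengths are assumptions made for illustration; the patch's javadoc is the authoritative description of the format.

```python
import io
import struct

# Assumed type codes; only BYTES = 0 is stated in this thread.
BYTES, INT, STRING, LIST, END_OF_LIST = 0, 3, 7, 9, 255


def read_value(stream, code=None):
    """Read one typed bytes value from a binary stream.

    Raises IndexError at end of input, which is good enough for a sketch.
    """
    if code is None:
        code = stream.read(1)[0]
    if code == INT:
        return struct.unpack(">i", stream.read(4))[0]
    if code in (BYTES, STRING):
        (length,) = struct.unpack(">i", stream.read(4))
        data = stream.read(length)
        return data if code == BYTES else data.decode("utf-8")
    if code == LIST:
        items = []
        while True:
            item_code = stream.read(1)[0]
            if item_code == END_OF_LIST:
                return items
            items.append(read_value(stream, item_code))
    raise ValueError("unsupported type code %d" % code)
```

A key/value pair is then just two consecutive calls to read_value: the first yields a Python int for the key, the second a Python list holding the record's attributes.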

Klaas Bosteels added a comment - 04/Feb/09 04:15 PM
Version 4 of the patch moves everything that was added in core to streaming, as suggested by Devaraj.

Some comments:

  • Since the typed bytes classes are still in the package org.apache.hadoop.typedbytes (and not in org.apache.hadoop.streaming.typedbytes or so), we can still move them to core later without breaking sequence files that rely on TypedBytesWritable.
  • I extended the streaming command-line format from "hadoop jar <streaming.jar> <options>" to "hadoop jar <streaming.jar> <command> <options>". This is backwards compatible because the command "streamjob" is assumed when no command is given explicitly, and it allowed me to add the commands "dumptb" and "loadtb" ("dumptb" corresponds to the DumpTypedBytes class that used to be in tools, and "loadtb" is a new command that does (more or less) the reverse operation, namely, it reads typed bytes from stdin and writes them to a sequence file on the DFS).
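
The backwards-compatible command handling described above can be sketched as follows. How the patch actually detects a missing command is not stated here, so the rule used below (the first argument is a command only if it is one of the known names) is an assumption, and split_command is a hypothetical name.

```python
COMMANDS = ("streamjob", "dumptb", "loadtb")


def split_command(args):
    """Return (command, remaining options), defaulting to 'streamjob'."""
    if args and args[0] in COMMANDS:
        return args[0], list(args[1:])
    # No explicit command: old-style invocation, so assume "streamjob".
    return "streamjob", list(args)
```

An old-style argument list such as ["-input", "in", "-output", "out"] therefore still runs as a streaming job.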

Klaas Bosteels added a comment - 04/Feb/09 04:19 PM
Looks like that last patch should actually have been version 5. It is the correct patch though, I just forgot that I already submitted a 4th version of it...

Devaraj Das added a comment - 04/Feb/09 04:23 PM
One thing I noticed - TypedBytesWritableOutput.writeWritable makes a copy of the Writable passed and then writes it out. Can we instead write directly to the underlying out of TypedBytesOutput?

Devaraj Das added a comment - 04/Feb/09 04:38 PM
Actually, ignore my last comment. I realized that you need to get the length of the Writable.

Hadoop QA added a comment - 04/Feb/09 11:54 PM
-1 overall. Here are the results of testing the latest attachment
http://issues.apache.org/jira/secure/attachment/12399457/HADOOP-1722-v4.patch
against trunk revision 740532.

+1 @author. The patch does not contain any @author tags.

+1 tests included. The patch appears to include 25 new or modified tests.

+1 javadoc. The javadoc tool did not generate any warning messages.

+1 javac. The applied patch does not increase the total number of javac compiler warnings.

+1 findbugs. The patch does not introduce any new Findbugs warnings.

+1 Eclipse classpath. The patch retains Eclipse classpath integrity.

+1 release audit. The applied patch does not increase the total number of release audit warnings.

+1 core tests. The patch passed core unit tests.

-1 contrib tests. The patch failed contrib unit tests.

Test results: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/3798/testReport/
Findbugs warnings: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/3798/artifact/trunk/build/test/findbugs/newPatchFindbugsWarnings.html
Checkstyle results: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/3798/artifact/trunk/build/test/checkstyle-errors.html
Console output: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/3798/console

This message is automatically generated.


Owen O'Malley added a comment - 05/Feb/09 08:49 AM
I'd suggest that we generalize this a little bit more and make the MRInputWriter and MROutputWriter take the input and output stream. And then instead of:

stream.map.input.typed.bytes=true -> stream.map.input=typed.bytes

So that if someone wants to add another encoder it is trivial to do so.

At some point, we probably should make the AutoInputFormat more complete and promote it into mapreduce.lib, but clearly that can happen in a different patch.

I'm not that convinced that people will use the typed bytes format outside of python, where the library is already present, but it does give them an option, which is currently not there.


Klaas Bosteels added a comment - 05/Feb/09 12:53 PM
I guess we could even do:

stream.map.input.typed.bytes=true -> stream.map.input.writer=org.apache.hadoop.streaming.TypedBytesInputWriter
stream.map.output.typed.bytes=true -> stream.map.output.processor=org.apache.hadoop.streaming.TypedBytesOutputProcessor

etc, or do you think that goes too far?


Owen O'Malley added a comment - 05/Feb/09 05:41 PM
I thought about that, but since streaming is primarily used by developers who don't want to write Java, I think the usability is better if we just have a set of enums/strings that we map into classes in streaming.

"typed.bytes" -> TypedBytesMRInputWriter / TypedBytesMROutputProcessor
"text" -> TextMRInputWriter / TextMROutputProcessor

You might consider renaming the classes to something like StreamingInputWriter and StreamingOutputReader, which are more symmetric with each other.

If someone implemented my suggestion above, it could be called "backquoted" or something.

By the way, you of course could use a different identifier string, like "typedBytes" or "typed_bytes".


eric baldeschwieler added a comment - 06/Feb/09 01:21 AM
Hi committers,

Could you estimate how close we are to being able to accept this patch?

E14


Devaraj Das added a comment - 06/Feb/09 12:18 PM
Eric, the patch looks good. Klaas, if you could update your patch with Owen's last comment, we could look at it for commit.

Klaas Bosteels added a comment - 06/Feb/09 01:50 PM
The patch is nearly done, I'll submit it later today...

Klaas Bosteels added a comment - 06/Feb/09 02:32 PM - edited
This is what changed in version 5 of the patch:
  • I implemented Owen's suggestion by adding the package org.apache.hadoop.streaming.io. This package contains an IdentifierResolver class that resolves string identifiers like "typedbytes" and "text" (case insensitive) into an InputWriter class, an OutputReader class, a key output class, and a value output class (different OutputReader classes require different key and value output classes to be set). Since a different resolver can be used by setting the property stream.io.identifier.resolver.class, external code can add new identifiers by extending IdentifierResolver and pointing stream.io.identifier.resolver.class to this extension.
  • I removed the -typedbytes ... option and added -io <identifier> instead. This latter option is less fine-grained since it triggers usage of the classes corresponding to the given identifier for everything (e.g. -io typedbytes means that typed bytes will be used for the map input, the map output, the reduce input, and the reduce output), but you can still use more fine-grained configurations by setting the relevant properties manually. As suggested by Owen, the properties now are:
    • stream.map.input=<identifier>
    • stream.map.output=<identifier>
    • stream.reduce.input=<identifier>
    • stream.reduce.output=<identifier>
  • Since it was easy to do, I also added RawBytesInputWriter and RawBytesOutputReader, which implement Eric's original suggestion (i.e. just a 4 byte length followed by the raw bytes). Quoting could be added as well of course, but I would rather pass that on to someone else/another patch...
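
The identifier resolution and the effect of -io can be modelled with a short sketch. It is only a Python illustration of the behaviour described above, not the Java IdentifierResolver itself: the table layout and function names are hypothetical, and the class names TextInputWriter, TextOutputReader and TypedBytesOutputReader are assumed by analogy with the names mentioned in this thread.

```python
# identifier -> (InputWriter class name, OutputReader class name)
RESOLVER_TABLE = {
    "text": ("TextInputWriter", "TextOutputReader"),                    # assumed names
    "typedbytes": ("TypedBytesInputWriter", "TypedBytesOutputReader"),  # reader name assumed
    "rawbytes": ("RawBytesInputWriter", "RawBytesOutputReader"),
}

IO_PROPERTIES = (
    "stream.map.input",
    "stream.map.output",
    "stream.reduce.input",
    "stream.reduce.output",
)


def resolve(identifier):
    """Case-insensitive lookup of the classes behind an identifier."""
    return RESOLVER_TABLE[identifier.lower()]


def expand_io(identifier):
    """Model '-io <identifier>': the same identifier for all four properties."""
    return {name: identifier for name in IO_PROPERTIES}
```

A finer-grained configuration corresponds to setting only some of the four properties, for example stream.map.input alone.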

Hadoop QA added a comment - 06/Feb/09 10:32 PM
+1 overall. Here are the results of testing the latest attachment
http://issues.apache.org/jira/secure/attachment/12399650/HADOOP-1722-v5.patch
against trunk revision 741330.

+1 @author. The patch does not contain any @author tags.

+1 tests included. The patch appears to include 34 new or modified tests.

+1 javadoc. The javadoc tool did not generate any warning messages.

+1 javac. The applied patch does not increase the total number of javac compiler warnings.

+1 findbugs. The patch does not introduce any new Findbugs warnings.

+1 Eclipse classpath. The patch retains Eclipse classpath integrity.

+1 release audit. The applied patch does not increase the total number of release audit warnings.

+1 core tests. The patch passed core unit tests.

+1 contrib tests. The patch passed contrib unit tests.

Test results: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/3807/testReport/
Findbugs warnings: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/3807/artifact/trunk/build/test/findbugs/newPatchFindbugsWarnings.html
Checkstyle results: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/3807/artifact/trunk/build/test/checkstyle-errors.html
Console output: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/3807/console

This message is automatically generated.


Devaraj Das added a comment - 12/Feb/09 05:07 PM
Klaas, could you please explain why in PipeMapRed.mapRedFinished, the call to waitOutputThreads() has been moved inside the try{} block? Did you spot a bug that made you do this change?

Klaas Bosteels added a comment - 12/Feb/09 05:27 PM
That's just because waitOutputThreads() throws IOException now. I added throws IOException to createOutputReader(), and therefore also to startOutputThreads() and waitOutputThreads(), because OutputReader.initialize() can throw an IOException. While checking this I noticed that I forgot to add throws IOException to InputWriter.initialize() by the way, maybe that should be changed before this gets committed...

Devaraj Das added a comment - 12/Feb/09 06:16 PM
Makes sense. Please file one (hopefully) last patch.

Klaas Bosteels added a comment - 12/Feb/09 06:59 PM
Added throws IOException to InputWriter.initialize().

Klaas Bosteels added a comment - 12/Feb/09 07:04 PM
Not sure if small changes like this need to go through Hudson again, but we can still cancel if it's not necessary I guess...

Devaraj Das added a comment - 13/Feb/09 04:13 AM
I just committed this. Thanks, Klaas!

Klaas Bosteels added a comment - 13/Feb/09 10:19 AM
Attaching a backported patch for the 0.18 branch. Typed bytes have been the default communication format for all our dumbo programs for a while now, and we haven't run into any issues with that so far. Some quick timings also revealed that dumbo programs appear to be 40% faster when typed bytes are used.

Hudson added a comment - 16/Feb/09 05:00 PM

Klaas Bosteels added a comment - 04/Mar/09 05:58 PM
Attaching a backported patch for the 0.19 branch.

weimin zhu added a comment - 05/Mar/09 05:23 AM - edited
Hi Klaas Bosteels,
My use case is to read binary data and generate text data in the mappers.
Could you show me how to use this feature for that?
Is the following correct?

$ hadoop jar contrib/streaming/hadoop-0.19.1-streaming.jar \
-input data \
-output result \
-mapper "wc -c" \
-numReduceTasks 0 \
-io rawbytes

But the output folder contains no results.


Klaas Bosteels added a comment - 05/Mar/09 09:33 AM
Zhuweimin, in that case you probably want to use:

-D stream.map.input=rawbytes

instead of:

-io rawbytes

(You can also use -jobconf instead of -D, but that option has been deprecated).


weimin zhu added a comment - 06/Mar/09 08:40 AM
Thanks, Klaas.

I tried -jobconf option and it worked.
But it looks like a part of the content was lost.
The test results are the following.

Do you have any idea what's wrong?

----------
$ bin/hadoop fs -ls data
Found 2 items
-rw-r--r-- 1 hadoop supergroup 67108864 2009-03-06 17:15 /user/hadoop/data/64m_1.dat
-rw-r--r-- 1 hadoop supergroup 67108864 2009-03-06 17:15 /user/hadoop/data/64m_2.dat

$ hadoop jar contrib/streaming/hadoop-0.19.1-streaming.jar \
-input data \
-output dataoutput \
-mapper "wc -c" \
-numReduceTasks 0 \
-jobconf stream.map.input=rawbytes
...
09/03/06 17:17:08 INFO streaming.StreamJob: map 0% reduce 0%
09/03/06 17:17:16 INFO streaming.StreamJob: map 100% reduce 0%
09/03/06 17:17:18 INFO streaming.StreamJob: Job complete: job_200903061543_0012
09/03/06 17:17:18 INFO streaming.StreamJob: Output: dataoutput

$ hadoop fs -cat dataoutput/part*
67107830
67107830
----------


Klaas Bosteels added a comment - 07/Mar/09 10:36 AM
Zhuweimin, presumably you're expecting the number of bytes reported by "wc -c" to be equal to the number of bytes in your input files, but that's not what you should be expecting really. Here's a quick outline of what happens when you run that command:
  1. Since you didn't specify an InputFormat, the TextInputFormat is used which leads to Text values corresponding to "lines" (i.e. sequences of bytes ending with a newline character) and LongWritable keys corresponding to the offsets of the lines in the file.
  2. Because you use rawbytes for the map input, Streaming will pass the keys and values to your mapper as <4 byte length><raw bytes> byte sequences. These byte sequences are obtained using Writable serialization (i.e. by calling the write() method) and prepending the length to the bytes obtained in this way.

You could probably get the behavior you're after by writing a custom InputFormat and InputWriter, but out of the box it's not supported at the moment as far as I know.
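
To make step 2 concrete: a mapper that receives rawbytes input has to parse the <4 byte length><raw bytes> frames instead of treating stdin as the original file, which is why "wc -c" does not report the file size. The sketch below assumes big-endian lengths; the function names are hypothetical, and the payloads remain Writable-serialized bytes that the mapper must interpret itself.

```python
import io
import struct


def read_frames(stream):
    """Yield the payload of each <4 byte length><raw bytes> frame."""
    while True:
        header = stream.read(4)
        if len(header) < 4:
            return  # end of input (or a truncated header)
        (length,) = struct.unpack(">i", header)
        yield stream.read(length)


def read_pairs(stream):
    """Group consecutive frames into (key, value) pairs."""
    frames = read_frames(stream)
    # zip over the same iterator twice pairs frame 1 with 2, 3 with 4, etc.
    return zip(frames, frames)
```

In a real mapper the stream would be the binary stdin (sys.stdin.buffer in Python 3).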


Amareshwari Sriramadasu added a comment - 19/Aug/09 11:48 AM
Streaming documentation in forrest should be updated with this feature.

Robert Chansler added a comment - 09/Oct/09 04:00 AM
Editorial pass over all release notes prior to publication of 0.21.

Matthias Lehmann added a comment - 15/Oct/09 09:14 AM
Adjusted HADOOP-1722-v6.patch for Hadoop version 0.20.1.