I think the right way to handle this is to support a standard quoting language on input and output from each streaming process. In particular, I think that streaming should have:
tab = field separator
All other bytes (not characters!), including non-ASCII and non-UTF-8 bytes, are passed through literally. Quoting is done on the stdin of the process and unquoting is done on the stdout of the process. This would make it very easy to write arbitrary binary values to the framework from streaming. Thoughts?
> all other bytes (not characters!) including non-ascii and non-utf8 are passed literally through. Quoting is done on the stdin of the process and unquoting is done on the stdout of the process. This would make it very easy to write arbitrary binary values to the framework from streaming
+1. Would it be good to have an option that enables this translation, so that the current behavior is preserved? For a few map/reduce scripts it would be easier to let the framework do the translation.
Yes, it should have been backslash. I guess it would be OK to unquote \0 as a null byte, but the point of the exercise is to require the minimal amount of quoting. Null bytes should be fine to pass through, since they won't be confused with the special field/record delimiters.
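As a rough illustration of the "minimal quoting" idea, here is a sketch of a backslash escape scheme. Several details are assumptions and do not come from the (partly lost) proposal. The proposal says tab separates fields and that the quote character should be backslash. Newline as the record separator is inferred from the mention of "field/record delimiters", and writing the escapes as a backslash followed by `t`, `n` or `\` is a guess. The function names are hypothetical. Every other byte, NUL included, passes through untouched.

```python
# Sketch of a minimal backslash-quoting scheme for streaming records.
# Assumptions: tab separates fields, newline separates records, backslash is
# the escape character, and escapes are written as backslash + 't', 'n' or '\\'.
# All other bytes (NUL, non-ASCII, invalid UTF-8) are passed through literally.

_ESCAPES = {b"\t"[0]: b"\\t", b"\n"[0]: b"\\n", b"\\"[0]: b"\\\\"}
_UNESCAPES = {ord("t"): b"\t", ord("n"): b"\n", ord("\\"): b"\\"}


def quote(field: bytes) -> bytes:
    """Escape only the three special bytes; copy every other byte as-is."""
    out = bytearray()
    for b in field:
        out += _ESCAPES.get(b, bytes([b]))
    return bytes(out)


def unquote(field: bytes) -> bytes:
    """Reverse quote(); raise ValueError on an unknown or dangling escape."""
    out = bytearray()
    i = 0
    while i < len(field):
        b = field[i]
        if b == ord("\\"):
            if i + 1 >= len(field) or field[i + 1] not in _UNESCAPES:
                raise ValueError(f"bad escape at offset {i}")
            out += _UNESCAPES[field[i + 1]]
            i += 2
        else:
            out.append(b)
            i += 1
    return bytes(out)


def encode_record(fields: list[bytes]) -> bytes:
    """Quote each field, join the fields with raw tabs, end with a raw newline."""
    return b"\t".join(quote(f) for f in fields) + b"\n"


def decode_record(line: bytes) -> list[bytes]:
    """Split on raw tabs (quoted fields contain none), then unquote each field."""
    return [unquote(f) for f in line.removesuffix(b"\n").split(b"\t")]
```

Because quoted fields never contain a raw tab or newline, a reader can split on those bytes without any lookahead. That property is what keeps the quoting "minimal".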
I think Arkady's point is much more to the point than this quoting proposal, which I think is going the wrong way!
There are two interfaces here: the one between map and reduce, and the one into map and out of reduce. I think they deserve different handling.
1) Map in & reduce out: by default, streaming should just consume bytes and produce bytes. The framework should do no interpretation or quoting. It should not try to break the output into lines, keys and values, etc. It is just a byte stream. This will allow true binary output with zero hassle. Some thought on splits is clearly needed...
2) Map out & reduce in: here we clearly need keys and values, but I think quoting might be the wrong direction. It should certainly not be the default. I think we should consider just providing an option that specifies that a new binary format will be used here. Maybe a 4-byte length followed by a binary key, followed by a 4-byte length and then a binary value? Maybe with a record terminator for sanity checking?
Two observations:
1) Adding quoting by default will break all kinds of programs that work with streaming today. This is undesirable. We should add an option, not change the default behavior.
2) Streaming should not use UTF-8 anywhere! It should assume that it is processing a stream of bytes that contains certain signal bytes, '\n' and '\t'. I think we all agree on this. Treating the byte stream as a character stream just confuses things.
Are there any comments on the attached patch? It basically implements an extended version of Eric's idea of adding an option that triggers the use of a new binary format. However, instead of a 4-byte length it uses a 1-byte type code (and the number of following bytes is derived from this type code). This leads to a slightly more compact representation for basic types (e.g. a float requires 1 + 4 bytes instead of 4 + 4 bytes), and it also solves another important Streaming issue, namely that all type information is lost when everything is converted to strings.
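To make the 1 + 4 versus 4 + 4 comparison concrete, here is a sketch that frames a single float both ways. This is not the patch's actual encoding. It assumes big-endian byte order and uses 5 as the float type code purely for illustration, since the comment only says that a 1-byte code determines how many bytes follow. `FLOAT_CODE`, `length_prefixed`, `typed_float` and `read_typed_float` are hypothetical names.

```python
import struct

# Hypothetical type code for a float; 5 is an illustrative assumption,
# check the actual typed bytes specification before relying on it.
FLOAT_CODE = 5


def length_prefixed(payload: bytes) -> bytes:
    """Length-prefixed framing: 4-byte big-endian length, then the raw bytes."""
    return struct.pack(">i", len(payload)) + payload


def typed_float(value: float) -> bytes:
    """Type-code framing: 1-byte code, then a 4-byte big-endian IEEE 754 float."""
    return struct.pack(">bf", FLOAT_CODE, value)


def read_typed_float(buf: bytes) -> float:
    """Decode a typed float, checking the type code first."""
    code, value = struct.unpack(">bf", buf[:5])
    if code != FLOAT_CODE:
        raise ValueError(f"unexpected type code {code}")
    return value
```

With the type-code framing, the reader knows from the first byte that it is looking at a float, which is the "type information" point made above. The length-prefixed framing only says how many opaque bytes follow.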
Contents: The patch consists of the following parts:
Example: Using the simple Python module available at http://github.com/klbostee/typedbytes, the mapper script

```python
import sys
import typedbytes

input = typedbytes.PairedInput(sys.stdin)
output = typedbytes.PairedOutput(sys.stdout)
# Emit (word, 1) for every whitespace-separated word in each value.
for (key, value) in input:
    for word in value.split():
        output.write((word, 1))
```

and the reducer script

```python
import sys
import typedbytes
from itertools import groupby
from operator import itemgetter

input = typedbytes.PairedInput(sys.stdin)
output = typedbytes.PairedOutput(sys.stdout)
# Reducer input arrives sorted by key, so groupby sees all pairs for a
# given word as one consecutive run; sum the counts in each run.
for (key, group) in groupby(input, itemgetter(0)):
    values = map(itemgetter(1), group)
    output.write((key, sum(values)))
```

can be used to do a simple wordcount. The unit tests include a similar example in Java.
Remark: This patch renders
Looks good.
This second version of my patch addresses the issues raised by Runping:
BTW: The comment "The reduce input type flag should always be the same as the map output flag" is not really valid. TypedBytesWritable's toString() outputs sensible strings, so it would be possible to output typed bytes in the mapper and let streaming convert them to strings before passing them as input to the reducer. Any other comments?
I realized that it is probably more convenient/intuitive to make -typedbytes input correspond to
instead of
and similarly that it would be better to let -typedbytes output correspond to
instead of
Maybe this was also (part of) what Runping was trying to say in his comment? In any case, the attached third version of my patch incorporates this minor change.
I think you need to have flags for the three cases:
1. stream.reduce.output.typed.bytes=true
2. stream.map.output.typed.bytes=true
3. stream.map.input.typed.bytes=true
Yes, that is what the current patch provides, but it also adds a stream.reduce.input.typed.bytes flag. The format for the map output does not necessarily have to be the same as the format for the reduce input. It makes most sense to use the same format, of course, but the streaming framework can convert the typed bytes output by a mapper to strings before passing them on to a reducer if the programmer wants that for some reason. So the patch is a bit more general, but it includes all the cases you listed.
-1 overall. Here are the results of testing the latest attachment
http://issues.apache.org/jira/secure/attachment/12398901/HADOOP-1722-v3.patch against trunk revision 738479.
+1 @author. The patch does not contain any @author tags.
+1 tests included. The patch appears to include 24 new or modified tests.
+1 javadoc. The javadoc tool did not generate any warning messages.
+1 javac. The applied patch does not increase the total number of javac compiler warnings.
+1 findbugs. The patch does not introduce any new Findbugs warnings.
+1 Eclipse classpath. The patch retains Eclipse classpath integrity.
+1 core tests. The patch passed core unit tests.
-1 contrib tests. The patch failed contrib unit tests.
Test results: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/3766/testReport/
This message is automatically generated.
The failed contrib unit tests are not caused by the patch as far as I can tell, but I did find a small unrelated bug (serializing lists did not work properly in all cases). The 4th version of the patch contains a fix for this bug (as well as an update to the unit test that did not catch this bug before). Furthermore, I also changed the semantics of -typedbytes output once more. It now corresponds to
because it is more likely that you want to take text as input but use typed bytes for everything else (instead of only using typed bytes for the reducer output).
I don't understand your reasoning that the format for the map output does not necessarily have to be the same as the format for the reduce input. Another issue: it is not uncommon for the mapper output to be in text format and the reducer output to be in binary format (typed bytes).
I'm glad to see you working on this! We'll take it for a spin.
Replies to Runping's questions:
Note also that all of this does not really matter that much. Since text gets converted to a typed bytes string, most people will be using typed bytes for everything in practice. The -typedbytes input|output|mapper|reducer options are mostly intended to make it possible to convert existing streaming programs gradually...
-1 overall. Here are the results of testing the latest attachment
http://issues.apache.org/jira/secure/attachment/12398937/HADOOP-1722-v4.patch against trunk revision 738744.
+1 @author. The patch does not contain any @author tags.
+1 tests included. The patch appears to include 24 new or modified tests.
+1 javadoc. The javadoc tool did not generate any warning messages.
+1 javac. The applied patch does not increase the total number of javac compiler warnings.
+1 findbugs. The patch does not introduce any new Findbugs warnings.
+1 Eclipse classpath. The patch retains Eclipse classpath integrity.
+1 core tests. The patch passed core unit tests.
-1 contrib tests. The patch failed contrib unit tests.
Test results: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/3771/testReport/
This message is automatically generated.
Klaas, I get your points now. Thanks.
Looks good overall! The one thing that should be considered here is moving the typedBytes package from core to streaming. Is there currently a use case where typedBytes might be used elsewhere? The same argument holds for DumpTypedBytes and AutoInputFormat as well. Could we have DumpTypedBytes integrated with StreamJob (i.e. if someone wants to test things out, they use streaming.jar and pass an option that invokes the DumpTypedBytes tool)?
The other thing is about special handling for the basic types, as opposed to using raw bytes for everything. How typical is the use case where the key/value types are basic types? I understand that it makes the on-disk/wire representation compact in the cases where the native types are used, but it would simplify the framework if we dealt only with raw bytes instead (and probably used compression). It would help if you included an example of a streaming app where binary data is consumed/produced.
Replies to Deveraj's comments:
Version 4 of the patch moves everything that was added in core to streaming, as suggested by Deveraj.
Some comments:
Looks like that last patch should actually have been version 5. It is the correct patch though, I just forgot that I already submitted a 4th version of it...
One thing I noticed: TypedBytesWritableOutput.writeWritable makes a copy of the Writable passed in and then writes it out. Can we instead write directly to the underlying out of TypedBytesOutput?
Actually, ignore my last comment. I realized that you need to get the length of the Writable.
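Why the copy is needed can be sketched as follows. The sketch assumes, as the reply implies, that the encoding writes a length before the serialized bytes. That length is only known once serialization has finished, so the bytes have to be captured in a buffer first. `write_length_prefixed` and the `serialize` callback are hypothetical stand-ins, loosely modelled on a Writable writing itself to a stream. This is not the patch's code.

```python
import io
import struct
from typing import BinaryIO, Callable


def write_length_prefixed(out: BinaryIO, serialize: Callable[[BinaryIO], None]) -> None:
    """Write an object's bytes preceded by their total length.

    `serialize` writes the object's bytes to whatever stream it is given.
    Because the size is unknown until it has run, the output is captured in
    an in-memory buffer, and only then are the length and payload written.
    """
    buf = io.BytesIO()
    serialize(buf)
    payload = buf.getvalue()
    out.write(struct.pack(">i", len(payload)))  # 4-byte big-endian length
    out.write(payload)
```

Writing straight to `out` would require either knowing the size in advance or seeking back to patch the length field, and a pipe to a streaming process does not support seeking.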
-1 overall. Here are the results of testing the latest attachment
http://issues.apache.org/jira/secure/attachment/12399457/HADOOP-1722-v4.patch against trunk revision 740532.
+1 @author. The patch does not contain any @author tags.
+1 tests included. The patch appears to include 25 new or modified tests.
+1 javadoc. The javadoc tool did not generate any warning messages.
+1 javac. The applied patch does not increase the total number of javac compiler warnings.
+1 findbugs. The patch does not introduce any new Findbugs warnings.
+1 Eclipse classpath. The patch retains Eclipse classpath integrity.
+1 release audit. The applied patch does not increase the total number of release audit warnings.
+1 core tests. The patch passed core unit tests.
-1 contrib tests. The patch failed contrib unit tests.
Test results: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/3798/testReport/
This message is automatically generated.
I'd suggest that we generalize this a little bit more and make the MRInputWriter and MROutputWriter take the input and output stream. And then instead of:
stream.map.input.typed.bytes=true -> stream.map.input=typed.bytes
so that if someone wants to add another encoder, it is trivial to do so. At some point we should probably make AutoInputFormat more complete and promote it into mapreduce.lib, but clearly that can happen in a different patch. I'm not that convinced that people will use the typed bytes format outside of Python, where the library is already present, but it does give them an option that is currently not there.
I guess we could even do:
stream.map.input.typed.bytes=true -> stream.map.input.writer=org.apache.hadoop.streaming.TypedBytesInputWriter
etc., or do you think that goes too far?
I thought about that, but since streaming is primarily used by developers who don't want to write Java, I think the usability is better if we just have a set of enums/strings that we map into classes in streaming.
"typed.bytes" -> TypedBytesMRInputWriter / TypedBytesMROutputProcessor
You might consider renaming the classes to something like StreamingInputWriter and StreamingOutputReader, which are more symmetric with each other. If someone implemented my suggestion above, it could be called "backquoted" or something. By the way, you could of course use a different identifier string, like "typedBytes" or "typed_bytes".
Hi committers,
Could you estimate how close we are to being able to accept this patch?
E14
Eric, the patch looks good. Klaas, if you could update your patch with Owen's last comment, we could look at it for commit.
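The proposal above, mapping short identifier strings onto classes inside streaming, amounts to a lookup table. The following Python sketch is only an illustration. The property name `stream.map.input` and the identifier `rawbytes` appear in this thread. The Python class names, the `text` identifier, the default value and the rawbytes framing (a 4-byte big-endian length before each key and value) are assumptions and are not Hadoop's Java classes.

```python
from typing import Dict, Type


class InputWriter:
    """Hypothetical base: turns one key/value pair into bytes for the child's stdin."""

    def write(self, key: bytes, value: bytes) -> bytes:
        raise NotImplementedError


class TextInputWriter(InputWriter):
    """Tab-separated key and value, newline-terminated."""

    def write(self, key: bytes, value: bytes) -> bytes:
        return key + b"\t" + value + b"\n"


class RawBytesInputWriter(InputWriter):
    """Assumed framing: 4-byte big-endian length before both key and value."""

    def write(self, key: bytes, value: bytes) -> bytes:
        return (len(key).to_bytes(4, "big") + key
                + len(value).to_bytes(4, "big") + value)


# User-facing identifiers resolved to classes inside "streaming".
_INPUT_WRITERS: Dict[str, Type[InputWriter]] = {
    "text": TextInputWriter,
    "rawbytes": RawBytesInputWriter,
}


def input_writer_for(conf: Dict[str, str], prop: str = "stream.map.input") -> InputWriter:
    """Look up the writer named by a job property, defaulting to "text"."""
    name = conf.get(prop, "text")
    try:
        return _INPUT_WRITERS[name]()
    except KeyError:
        raise ValueError(f"unknown {prop} identifier: {name!r}") from None
```

Adding another encoder, such as the "backquoted" format mentioned above, would then mean one new class plus one dictionary entry, and users only ever type the short identifier.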
The patch is nearly done; I'll submit it later today...
This is what changed in version 5 of the patch:
+1 overall. Here are the results of testing the latest attachment
http://issues.apache.org/jira/secure/attachment/12399650/HADOOP-1722-v5.patch against trunk revision 741330.
+1 @author. The patch does not contain any @author tags.
+1 tests included. The patch appears to include 34 new or modified tests.
+1 javadoc. The javadoc tool did not generate any warning messages.
+1 javac. The applied patch does not increase the total number of javac compiler warnings.
+1 findbugs. The patch does not introduce any new Findbugs warnings.
+1 Eclipse classpath. The patch retains Eclipse classpath integrity.
+1 release audit. The applied patch does not increase the total number of release audit warnings.
+1 core tests. The patch passed core unit tests.
+1 contrib tests. The patch passed contrib unit tests.
Test results: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/3807/testReport/
This message is automatically generated.
Klaas, could you please explain why, in PipeMapRed.mapRedFinished, the call to waitOutputThreads() has been moved inside the try{} block? Did you spot a bug that made you do this change?
That's just because waitOutputThreads() throws IOException now. I added throws IOException to createOutputReader(), and therefore also to startOutputThreads() and waitOutputThreads(), because OutputReader.initialize() can throw an IOException. While checking this I noticed that I forgot to add throws IOException to InputWriter.initialize() by the way, maybe that should be changed before this gets committed...
Makes sense. Please file one (hopefully) last patch.
Added throws IOException to InputWriter.initialize().
Not sure if small changes like this need to go through Hudson again, but we can still cancel if it's not necessary I guess...
I just committed this. Thanks, Klaas!
Attaching a backported patch for the 0.18 branch. Typed bytes have been the default communication format for all our dumbo
Integrated in Hadoop-trunk #756 (See http://hudson.zones.apache.org/hudson/job/Hadoop-trunk/756/)
Attaching a backported patch for the 0.19 branch.
Hi Klaas Bosteels,
My case is to read binary data and generate text data in the mappers. Can you show me how to do that? Is the following correct?
$ hadoop jar contrib/streaming/hadoop-0.19.1-streaming.jar
But the result in the output folder is empty.
Zhuweimin, in that case you probably want to use:
-D stream.map.input=rawbytes
instead of:
-io rawbytes
(You can also use -jobconf instead of -D, but that option has been deprecated.)
Thanks, Klaas.
I tried the -jobconf option and it worked. Do you have any idea what's wrong?
$ hadoop jar contrib/streaming/hadoop-0.19.1-streaming.jar
$ hadoop fs -cat dataoutput/part*
Zhuweimin, presumably you're expecting the number of bytes reported by "wc -c" to be equal to the number of bytes in your input files, but that's not really what you should expect. Here's a quick outline of what happens when you run that command:
You could probably get the behavior you're after by writing a custom InputFormat.
Streaming documentation in Forrest should be updated with this feature.
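The `wc -c` mismatch discussed above follows from how the mapper's input is framed. Here is a sketch of a mapper-side decoder. It assumes that with `-D stream.map.input=rawbytes` each key and each value arrives as a 4-byte big-endian signed length followed by that many raw bytes. That framing is an assumption to verify against the streaming documentation, and `read_rawbytes_pairs` and `decode_rawbytes` are hypothetical helpers. Under that assumption, every pair also carries 8 bytes of length headers plus the key bytes, so a byte count of the mapper's stdin measures more than the file contents.

```python
import io
import struct
from typing import BinaryIO, Iterator, List, Tuple


def _read_exact(stream: BinaryIO, n: int) -> bytes:
    """Read exactly n bytes, raising EOFError if the stream ends early."""
    chunks = []
    remaining = n
    while remaining > 0:
        chunk = stream.read(remaining)
        if not chunk:
            raise EOFError("truncated rawbytes record")
        chunks.append(chunk)
        remaining -= len(chunk)
    return b"".join(chunks)


def _read_body(stream: BinaryIO, header: bytes) -> bytes:
    """Decode a 4-byte length header and read that many payload bytes."""
    (length,) = struct.unpack(">i", header)
    if length < 0:
        raise ValueError(f"negative field length {length}")
    return _read_exact(stream, length)


def read_rawbytes_pairs(stream: BinaryIO) -> Iterator[Tuple[bytes, bytes]]:
    """Yield (key, value) pairs until the stream ends cleanly between records."""
    while True:
        first = stream.read(1)
        if not first:
            return  # clean end of input
        key = _read_body(stream, first + _read_exact(stream, 3))
        value = _read_body(stream, _read_exact(stream, 4))
        yield key, value


def decode_rawbytes(data: bytes) -> List[Tuple[bytes, bytes]]:
    """Convenience wrapper for decoding an in-memory buffer."""
    return list(read_rawbytes_pairs(io.BytesIO(data)))
```

In a mapper this would be driven by something like `for key, value in read_rawbytes_pairs(sys.stdin.buffer): ...`, writing text lines to `sys.stdout`. The binary `.buffer` layer matters here, because reading `sys.stdin` directly would try to decode the bytes as text.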
Editorial pass over all release notes prior to publication of 0.21.
adjusted
By default, the mapper task should receive exactly the same bytes as stored in DFS, without any transformation.
There should also be command line parameters that specify other useful options, including a custom input format, decompression, etc.
There should be no requirements on the command that is used as Streaming Mapper.
This has been broken twice – in Sept. 2006, and in July 2007.
It would be nice to restore the functionality, and make it part of specification. (This implies adding regression cases, etc.)
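The first requirement above, that the mapper sees exactly the bytes stored in DFS, is easiest to check with a mapper that does nothing but copy its input. Here is a minimal sketch, assuming a Python mapper. `passthrough` is a hypothetical helper, not part of streaming. It works on the binary layer of the standard streams, because the text-mode streams would decode the input and could translate newlines.

```python
from typing import BinaryIO


def passthrough(src: BinaryIO, dst: BinaryIO, chunk_size: int = 1 << 16) -> int:
    """Copy bytes from src to dst unchanged and return how many were copied.

    No decoding, line splitting or newline translation happens here, so NUL
    bytes, invalid UTF-8 and a missing trailing newline all survive intact.
    """
    total = 0
    while True:
        chunk = src.read(chunk_size)
        if not chunk:
            return total
        dst.write(chunk)
        total += len(chunk)
```

Run as a mapper, it would be called as `passthrough(sys.stdin.buffer, sys.stdout.buffer)`. A regression case could then compare what the mapper saw with the stored file byte for byte. Whether those bytes actually match depends on the input format and on what the framework writes to the mapper's stdin, which is exactly the behavior this issue asks to pin down.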