Hadoop Map/Reduce
MAPREDUCE-5018

Support raw binary data with Hadoop streaming

    Details

    • Type: New Feature
    • Status: Patch Available
    • Priority: Minor
    • Resolution: Unresolved
    • Affects Version/s: 1.1.2
    • Fix Version/s: None
    • Component/s: contrib/streaming
    • Labels:
    • Target Version/s:
    • Release Note:
      Add "-io justbytes" I/O format to allow raw binary streaming.

      Description

      People often need to run older programs over many files, and they turn to Hadoop streaming as a reliable, performant batch system. There are good reasons for this:

      1. Hadoop is convenient: they may already be using it for mapreduce jobs, and it is easy to spin up a cluster in the cloud.
      2. It is reliable: HDFS replicates data and the scheduler retries failed jobs.
      3. It is reasonably performant: it moves the code to the data, maintaining locality, and scales with the number of nodes.

      Historically, Hadoop is of course oriented toward processing key/value pairs, and so it needs to interpret the data passing through it. Unfortunately, this makes it difficult to use Hadoop streaming with programs that don't deal in key/value pairs, or with binary data in general. For example, something as simple as running md5sum to verify the integrity of files will not give the correct result, due to Hadoop's interpretation of the data.

      There have been several attempts at binary serialization schemes for Hadoop streaming, such as TypedBytes (HADOOP-1722); however, these are still aimed at efficiently encoding key/value pairs, and not passing data through unmodified. Even the "RawBytes" serialization scheme adds length fields to the data, rendering it not-so-raw.
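
      To make "not-so-raw" concrete, here is a small illustration (generic Java, not Hadoop's actual serialization code) of what length-prefix framing does to a payload: the bytes a mapper receives are no longer byte-for-byte identical to the file contents, so checksums such as md5sum no longer match.

      import java.io.ByteArrayOutputStream;
      import java.io.DataOutputStream;
      import java.io.IOException;

      public class LengthPrefixDemo {
          public static void main(String[] args) throws IOException {
              byte[] payload = {0x01, 0x02, 0x03};

              // Frame the payload the way a length-prefixed scheme would:
              // a 4-byte length field followed by the bytes themselves.
              ByteArrayOutputStream framed = new ByteArrayOutputStream();
              DataOutputStream out = new DataOutputStream(framed);
              out.writeInt(payload.length);
              out.write(payload);
              out.flush();

              // 7 bytes on the wire for 3 bytes of data: the stream no longer
              // matches the original file, which is exactly the problem here.
              System.out.println(framed.size() + " framed bytes vs " + payload.length + " raw");
          }
      }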

      I often have a need to run a Unix filter on files stored in HDFS; currently, the only way I can do this on the raw data is to copy the data out and run the filter on one machine, which is inconvenient, slow, and unreliable. It would be very convenient to run the filter as a map-only job, allowing me to build on existing (well-tested!) building blocks in the Unix tradition instead of reimplementing them as mapreduce programs.

      However, most existing tools don't know about file splits, and so want to process whole files; and of course many expect raw binary input and output. The solution is to run a map-only job with an InputFormat and OutputFormat that just pass raw bytes and don't split. It turns out to be a little more complicated with streaming; I have attached a patch with the simplest solution I could come up with. I call the format "JustBytes" (as "RawBytes" was already taken), and it should be usable with most recent versions of Hadoop.
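
      To show the shape of the idea in code, below is a minimal, generic sketch of a non-splitting input format in the old mapred API that hands each file to a single mapper as raw bytes. It is not the attached patch (which, as described in the comments, reads 64K chunks into a JustBytesWritable); the class names here are purely illustrative.

      import java.io.IOException;

      import org.apache.hadoop.fs.FSDataInputStream;
      import org.apache.hadoop.fs.FileSystem;
      import org.apache.hadoop.fs.Path;
      import org.apache.hadoop.io.BytesWritable;
      import org.apache.hadoop.io.IOUtils;
      import org.apache.hadoop.io.NullWritable;
      import org.apache.hadoop.mapred.FileInputFormat;
      import org.apache.hadoop.mapred.FileSplit;
      import org.apache.hadoop.mapred.InputSplit;
      import org.apache.hadoop.mapred.JobConf;
      import org.apache.hadoop.mapred.RecordReader;
      import org.apache.hadoop.mapred.Reporter;

      // Illustrative sketch: each whole, unsplit file becomes one raw-bytes record.
      public class WholeFileBytesInputFormat
          extends FileInputFormat<BytesWritable, NullWritable> {

        @Override
        protected boolean isSplitable(FileSystem fs, Path file) {
          return false;  // never split: most filters expect whole files
        }

        @Override
        public RecordReader<BytesWritable, NullWritable> getRecordReader(
            InputSplit split, JobConf job, Reporter reporter) throws IOException {
          return new WholeFileRecordReader((FileSplit) split, job);
        }

        static class WholeFileRecordReader
            implements RecordReader<BytesWritable, NullWritable> {
          private final FileSplit split;
          private final JobConf job;
          private boolean done = false;

          WholeFileRecordReader(FileSplit split, JobConf job) {
            this.split = split;
            this.job = job;
          }

          @Override
          public boolean next(BytesWritable key, NullWritable value) throws IOException {
            if (done) {
              return false;
            }
            byte[] contents = new byte[(int) split.getLength()];
            Path path = split.getPath();
            FileSystem fs = path.getFileSystem(job);
            FSDataInputStream in = fs.open(path);
            try {
              IOUtils.readFully(in, contents, 0, contents.length);
            } finally {
              IOUtils.closeStream(in);
            }
            key.set(contents, 0, contents.length);  // raw bytes: no parsing, no framing
            done = true;
            return true;
          }

          @Override public BytesWritable createKey()  { return new BytesWritable(); }
          @Override public NullWritable createValue() { return NullWritable.get(); }
          @Override public long getPos()              { return done ? split.getLength() : 0; }
          @Override public void close()               { }
          @Override public float getProgress()        { return done ? 1.0f : 0.0f; }
        }
      }

      A streaming job would pair something like this with an output format that writes the key bytes back out unmodified, which is the approach the attached patch takes.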

      1. MAPREDUCE-5018.patch
        24 kB
        Steven Willis
      2. MAPREDUCE-5018-branch-1.1.patch
        20 kB
        Steven Willis
      3. mapstream
        2 kB
        Jay Hacker
      4. justbytes.jar
        17 kB
        Jay Hacker
      5. MAPREDUCE-5018.patch
        20 kB
        Jay Hacker

        Issue Links

          Activity

          Jay Hacker added a comment -

          This patch adds a 'JustBytesWritable' and supporting InputFormat, OutputFormat, InputWriter, and OutputReader to support passing raw, unmodified, unaugmented bytes through Hadoop streaming. The purpose is to be able to run arbitrary Unix filters on entire binary files stored in HDFS as map-only jobs, taking advantage of locality and reliability offered by Hadoop.

          The code is very straightforward; most methods are only one line.

          A few design notes:

          1. Data is stored in a JustBytesWritable, which is the simplest possible Writable wrapper around a byte[]. It literally just reads until the buffer is full or EOF and remembers the number of bytes (see the sketch after this list).

          2. Data is read by JustBytesInputFormat in 64K chunks by default and stored in a JustBytesWritable key; the value is a NullWritable, but no value is ever read or written. The key is used instead of the value to allow the possibility of using it in a reduce.

          3. Input files are never split, as most programs are not able to handle splits.

          4. Input files are not decompressed: the purpose is to get raw data to a program, people may want to operate on compressed data (e.g., md5sum on archives), and most tools do not expect automatic decompression, so this is the "least surprising" option. It's also trivial to throw a "zcat" in front of your filter.

          5. Output is even simpler than input, and just writes the bytes of a JustBytesWritable key to the output stream. Output is never compressed, for similar reasons as above.

          6. The code uses the old mapred API, as that is what streaming uses.
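
          As a concrete illustration of note 1 (and of the frame-free output in note 5), here is a minimal sketch of such a Writable. The attached patch is the authoritative version; the byte-at-a-time EOF handling below is just the simplest way to express "read until full or EOF" against a DataInput, and a real implementation would read in bulk.

          import java.io.DataInput;
          import java.io.DataOutput;
          import java.io.EOFException;
          import java.io.IOException;

          import org.apache.hadoop.io.Writable;

          // Sketch of a "just bytes" Writable: a byte[] plus a count, nothing else.
          public class JustBytesWritableSketch implements Writable {
            private final byte[] bytes;
            private int length;

            public JustBytesWritableSketch() {
              this(64 * 1024);  // 64K buffer, matching the default chunk size in note 2
            }

            public JustBytesWritableSketch(int capacity) {
              this.bytes = new byte[capacity];
            }

            @Override
            public void readFields(DataInput in) throws IOException {
              // Fill the buffer until it is full or the input hits EOF,
              // remembering how many bytes were actually read.
              length = 0;
              try {
                while (length < bytes.length) {
                  bytes[length] = in.readByte();
                  length++;
                }
              } catch (EOFException eof) {
                // short final chunk: keep what was read
              }
            }

            @Override
            public void write(DataOutput out) throws IOException {
              out.write(bytes, 0, length);  // no length prefix, no delimiters: just the bytes
            }

            public byte[] getBytes() { return bytes; }
            public int getLength()   { return length; }
          }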

          Streaming inserts an InputWriter between the InputFormat and the map executable, and an OutputReader between the map executable and the OutputFormat; the JustBytes versions simply pass the key bytes straight through.

          I've augmented IdentifierResolver to recognize "-io justbytes" on the command line and set the input/output classes appropriately.
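
          For context, IdentifierResolver maps each "-io" identifier to InputWriter/OutputReader and key/value classes, so recognizing "justbytes" amounts to one more branch in its resolve() method. Below is a rough sketch of that dispatch, shown as a subclass for brevity; the JustBytes* class names stand in for the classes the patch adds and are assumptions here, not confirmed names from the patch.

          import org.apache.hadoop.io.NullWritable;
          import org.apache.hadoop.streaming.io.IdentifierResolver;

          // Sketch only: the patch edits IdentifierResolver directly, and the
          // JustBytes* classes below are assumed names for the patch's new classes.
          public class JustBytesResolverSketch extends IdentifierResolver {
            public static final String JUST_BYTES_ID = "justbytes";

            @Override
            public void resolve(String identifier) {
              if (JUST_BYTES_ID.equalsIgnoreCase(identifier)) {
                setInputWriterClass(JustBytesInputWriter.class);    // key bytes -> mapper stdin, unmodified
                setOutputReaderClass(JustBytesOutputReader.class);  // mapper stdout -> key bytes, unmodified
                setOutputKeyClass(JustBytesWritable.class);
                setOutputValueClass(NullWritable.class);
              } else {
                super.resolve(identifier);  // rawbytes / typedbytes / text as before
              }
            }
          }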

          I've included a shell script called "mapstream" to run streaming with all required command line parameters; it makes running a binary map-only job as easy as:

          mapstream indir command outdir

          which runs "command" on every file in indir and writes the results to outdir.

          I welcome feedback, especially if there is an even simpler way to do this. I'm not hung up on the JustBytes name, I'd be happy to switch to a better one. If people like the general approach, I will add unit tests and resubmit. Also please let me know if I should break this into separate patches for common and mapreduce.

          Jay Hacker added a comment -

          justbytes patch submitted for code review.

          Hadoop QA added a comment -

          -1 overall. Here are the results of testing the latest attachment
          http://issues.apache.org/jira/secure/attachment/12570317/MAPREDUCE-5018.patch
          against trunk revision .

          +1 @author. The patch does not contain any @author tags.

          -1 tests included. The patch doesn't appear to include any new or modified tests.
          Please justify why no new tests are needed for this patch.
          Also please list what manual steps were performed to verify this patch.

          -1 javac. The patch appears to cause the build to fail.

          Console output: https://builds.apache.org/job/PreCommit-MAPREDUCE-Build/3352//console

          This message is automatically generated.

          Jay Hacker added a comment -

          I believe this is a more general, simpler, and more up-to-date approach to getting binary data in and out of Hadoop.

          Jay Hacker added a comment -

          I've attached a jar file with source and compiled binaries for people who want to try it out without recompiling Hadoop. You can use the attached 'mapstream' shell script to run it easily.

          For those interested in performance, the TL;DR is about 10X slower than native. That's running 'cat' as the mapper on one file that fits in one block, compared to cat on a local ext4 filesystem on the same machine. If your files span multiple blocks, the non-local reads will be even slower. That also doesn't include job overhead. However, most mappers will be more CPU intensive, and the relative overhead of I/O diminishes; YMMV.

          Hadoop QA added a comment -

          -1 overall. Here are the results of testing the latest attachment
          http://issues.apache.org/jira/secure/attachment/12570328/mapstream
          against trunk revision .

          -1 patch. The patch command could not apply the patch.

          Console output: https://builds.apache.org/job/PreCommit-MAPREDUCE-Build/3353//console

          This message is automatically generated.

          PrateekM added a comment -

          Hi Jay,
          If it's not splittable, how do you gain the benefit of using Hadoop's infrastructure? Wouldn't the infrastructure be busy making network I/Os to build up the binary file from its replicated sources and pass it to a single mapper?

          Jay Hacker added a comment -

          PrateekM, you're right, there are cases where it's not efficient. Consider this though: if you have 100 TB of files in HDFS that you want to md5sum (or what have you), would you rather do an inefficient distributed md5sum on the cluster, or copy 100 TB out to a single machine and wait for a single md5sum? Can you even fit that on one machine?

          You still gain reliability: there are multiple copies of each file, and failed jobs get restarted. It's also just convenient.

          Here's the trick to make it efficient: use many files, and set the block size of individual files big enough to fit the whole file:

          hadoop fs -D dfs.block.size=1073741824 -put ...

          Then all reads are local, and you get all the performance Hadoop can give you.
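
          For completeness, the same per-file block size can also be set programmatically when writing the file. Here is a minimal sketch using the standard FileSystem API; the destination path is made up for illustration.

          import org.apache.hadoop.conf.Configuration;
          import org.apache.hadoop.fs.FSDataOutputStream;
          import org.apache.hadoop.fs.FileSystem;
          import org.apache.hadoop.fs.Path;

          public class PutWithBigBlocks {
            public static void main(String[] args) throws Exception {
              Configuration conf = new Configuration();
              FileSystem fs = FileSystem.get(conf);

              // Hypothetical destination; with a 1 GiB block size, any file up to
              // 1 GiB occupies a single block, so the unsplit map task reads locally.
              Path dst = new Path("/data/in/file1.bin");
              long blockSize = 1024L * 1024 * 1024;
              int bufferSize = conf.getInt("io.file.buffer.size", 4096);
              short replication = fs.getDefaultReplication();

              FSDataOutputStream out =
                  fs.create(dst, true, bufferSize, replication, blockSize);
              try {
                // ... write the file's bytes here ...
              } finally {
                out.close();
              }
            }
          }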

          PrateekM added a comment -

          Yes, in that case it's fine. We are creating a modified version of JustBytesInputFormat that does splits, since we can split our binary data on fixed-length record boundaries. Thanks for JustBytes!

          One more query: in places our data contains \n and \r characters as part of the binary data, and we don't want stdin to interpret these characters, since that corrupts the data once it reaches the mapper.
          Is there anything that can be done? I don't want to hex-encode it before writing it to the stream to the mapper.

          Jay Hacker added a comment -

          You're welcome!

          It might be easier to just split your inputs yourself before putting them in HDFS (see split(1)), but perhaps your files are already in HDFS.

          JustBytes shouldn't modify or interpret your data at all; it reads an entire file in binary, gives those exact bytes to your mapper, and writes out the exact bytes your mapper gives. It does not know or care about newlines. I would encourage you to run md5sum on your data outside HDFS and via mapstream to verify that it is not changing your data at all, and let me know if it is.

          Steven Willis added a comment -

          A patch for the 1.1 branch

          Steven Willis added a comment -

          New patch with tests

          Hadoop QA added a comment -

          -1 overall. Here are the results of testing the latest attachment
          http://issues.apache.org/jira/secure/attachment/12644886/MAPREDUCE-5018.patch
          against trunk revision .

          +1 @author. The patch does not contain any @author tags.

          +1 tests included. The patch appears to include 1 new or modified test files.

          +1 javac. The applied patch does not increase the total number of javac compiler warnings.

          -1 javadoc. The javadoc tool appears to have generated 2 warning messages.
          See https://builds.apache.org/job/PreCommit-MAPREDUCE-Build/4662//artifact/trunk/patchprocess/diffJavadocWarnings.txt for details.

          +1 eclipse:eclipse. The patch built with eclipse:eclipse.

          +1 findbugs. The patch does not introduce any new Findbugs (version 1.3.9) warnings.

          +1 release audit. The applied patch does not increase the total number of release audit warnings.

          +1 core tests. The patch passed unit tests in hadoop-common-project/hadoop-common hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core hadoop-tools/hadoop-streaming.

          +1 contrib tests. The patch passed contrib unit tests.

          Test results: https://builds.apache.org/job/PreCommit-MAPREDUCE-Build/4662//testReport/
          Console output: https://builds.apache.org/job/PreCommit-MAPREDUCE-Build/4662//console

          This message is automatically generated.

          Hadoop QA added a comment -

          -1 overall. Here are the results of testing the latest attachment
          http://issues.apache.org
          against trunk revision 47f7f18.

          -1 patch. The patch command could not apply the patch.

          Console output: https://builds.apache.org/job/PreCommit-MAPREDUCE-Build/5260//console

          This message is automatically generated.


    People

    • Assignee: Steven Willis
    • Reporter: Jay Hacker
    • Votes: 1
    • Watchers: 12
