Hadoop Common: HADOOP-4640

Add ability to split text files compressed with lzo

    Details

    • Type: Improvement
    • Status: Closed
    • Priority: Trivial
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.20.0
    • Component/s: io
    • Labels:
      None
    • Hadoop Flags:
      Reviewed

      Description

      Right now any file compressed with lzop will be processed by a single mapper. This is a shame, since the lzo algorithm would be very suitable for large log files and similar common Hadoop data sets. The compression ratio is not the best out there, but the decompression speed is amazing. Since lzo writes compressed data in blocks, it would be possible to make an input format that can split the files.
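      For reference, a rough sketch of the lzop on-disk framing that makes this possible (field names here are descriptive, not taken from any API): after the file header, each block is laid out as

        int uncompressedBlockSize        // 0 marks the end of the stream
        int compressedBlockSize
        int checksums[numChecksums]      // count depends on flags in the file header
        byte compressedData[compressedBlockSize]

      so a side index of block start offsets would let an input format seek a mapper to any block boundary and decompress independently from there.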

      1. HADOOP-4640.patch
        21 kB
        Johan Oskarsson
      2. HADOOP-4640.patch
        20 kB
        Johan Oskarsson
      3. HADOOP-4640.patch
        19 kB
        Johan Oskarsson
      4. HADOOP-4640.patch
        17 kB
        Johan Oskarsson

        Activity

        Hudson added a comment -

        Integrated in Hadoop-trunk #670 (See http://hudson.zones.apache.org/hudson/job/Hadoop-trunk/670/)
        HADOOP-4640. Adds an input format that can split lzo compressed text files. (johan)

        Johan Oskarsson added a comment -

        I just committed this.
        The test failure isn't related to this patch; I created HADOOP-4716 to track it.

        Hadoop QA added a comment -

        -1 overall. Here are the results of testing the latest attachment
        http://issues.apache.org/jira/secure/attachment/12394241/HADOOP-4640.patch
        against trunk revision 719787.

        +1 @author. The patch does not contain any @author tags.

        +1 tests included. The patch appears to include 3 new or modified tests.

        +1 javadoc. The javadoc tool did not generate any warning messages.

        +1 javac. The applied patch does not increase the total number of javac compiler warnings.

        -1 findbugs. The patch appears to introduce 1 new Findbugs warnings.

        +1 Eclipse classpath. The patch retains Eclipse classpath integrity.

        -1 core tests. The patch failed core unit tests.

        +1 contrib tests. The patch passed contrib unit tests.

        Test results: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/3642/testReport/
        Findbugs warnings: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/3642/artifact/trunk/build/test/findbugs/newPatchFindbugsWarnings.html
        Checkstyle results: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/3642/artifact/trunk/build/test/checkstyle-errors.html
        Console output: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/3642/console

        This message is automatically generated.

        Johan Oskarsson added a comment -

        Submitting to Hudson now that the queue has disappeared.

        Johan Oskarsson added a comment -

        I don't think it's worth calling getPos(); as you say, it shouldn't cause any issues the way it is now. TextInputFormat does it the same way.

        Chris Douglas added a comment -

        I hadn't noticed the findbugs warning. I suppose odd/stale/incorrect results from getProgress are mostly benign. Do you think it's worth calling getPos() from getProgress instead of using the pos field? That should resolve the findbugs warning.
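        A minimal sketch of that suggestion, assuming the start and end fields of the record reader (mirroring LineRecordReader):

        public float getProgress() throws IOException {
          if (start == end) {
            return 0.0f;
          }
          // Use the synchronized getPos() instead of reading the unsynchronized
          // pos field directly, which is what trips the findbugs warning.
          return Math.min(1.0f, (getPos() - start) / (float) (end - start));
        }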

        Johan Oskarsson added a comment -

        Local test-patch gives one findbugs error as expected (synchronization). All unit tests pass.
        There is a Hudson run queued up for the previous version of the patch; I'm not sure how to cancel it.

        [exec] -1 overall.
        [exec]
        [exec] +1 @author. The patch does not contain any @author tags.
        [exec]
        [exec] +1 tests included. The patch appears to include 3 new or modified tests.
        [exec]
        [exec] +1 javadoc. The javadoc tool did not generate any warning messages.
        [exec]
        [exec] +1 javac. The applied patch does not increase the total number of javac compiler warnings.
        [exec]
        [exec] -1 findbugs. The patch appears to introduce 1 new Findbugs warnings.
        [exec]
        [exec] +1 Eclipse classpath. The patch retains Eclipse classpath integrity.
        [exec]

        Chris Douglas added a comment -

        +1 Patch looks good.

        You might want to try running test-patch and the unit tests on your machine; Hudson looks backed up.

        Johan Oskarsson added a comment -

        I have added a unit test for the LzoIndex issue; it seems the other test still passed because the splits just got shifted by one block. The method now works as described.

        Chris Douglas added a comment -

        > As for the close() I did as suggested, although it rubs me the wrong way to read all those bytes without needing to. I guess the practical performance impact will be minimal though.

        It's only calculating a checksum of the remaining bytes from a direct buffer. For the default 64k block, I'd guess it adds somewhere between 20 and 50ms in the close. If it had to make another trip to the native code, I agree that would be improper, but this should be a trivial cost.

        I'm not sure I follow LzoIndex::findIndexPosition. Given {0, 5, 10, 15} as block positions, findIndexPosition(1) will return 10, but findIndexPosition(5) returns 5. Should the former case also return 5? findIndexPosition(11) returns -1, which also seems contrary to its javadoc explanation.
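        For reference, a sketch of the semantics the javadoc seems to describe, assuming the index wraps a sorted long[] of block positions (blockPositions is a hypothetical field name; Arrays is java.util.Arrays):

        long findIndexPosition(long pos) {
          int i = Arrays.binarySearch(blockPositions, pos);
          if (i >= 0) {
            return blockPositions[i];    // pos is exactly a block boundary
          }
          int insertion = -i - 1;        // index of the first position > pos
          return insertion < blockPositions.length ? blockPositions[insertion] : -1;
        }

        With { 0, 5, 10, 15 } this returns 5 for both findIndexPosition(1) and findIndexPosition(5), 15 for findIndexPosition(11), and -1 only past the last block.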

        Johan Oskarsson added a comment -

        Replaced the TreeSet with long[], fixed the incorrect checksum count, fixed the indexer loop termination.
        As for the close() I did as suggested, although it rubs me the wrong way to read all those bytes without needing to. I guess the practical performance impact will be minimal though.

        Chris Douglas added a comment -

        > Will only skip verifying the checksums in the close method if we haven't decompressed the whole block. That block will be verified by another split later anyway.

        The data is already decompressed, but it hasn't been read out of the codec's buffer. Adding a new, public method instead of calculating the checksum for the remainder of the buffered block seems like the wrong tradeoff. Something like:

        public void close() throws IOException {
          // Drain what's left of the current block from the codec's buffer so
          // its checksum can be verified against all of the block's bytes.
          byte[] b = new byte[4096];
          while (!decompressor.finished()) {
            decompressor.decompress(b, 0, b.length);
          }
          super.close();
          verifyChecksums();
        }
        

        should work, right? Allocating in the close is less optimal than, say, passing the Checksum object to the codec, but this requires fewer changes to the interfaces.

        • Using a TreeSet of Long seems unnecessary when the indices are sorted. Since the number of blocks stored in the index can be calculated from its length, a type wrapping a long[] seems more appropriate (the member function on said type can use Arrays::binarySearch instead of TreeSet::ceiling).
        • It doesn't need to be part of this patch, but it's worth noting that splittable lzop inputs will create hot spots of the blocks storing the headers. If this were abstracted, then the split could be annotated with the properties of the file and the RecordReader initialized with block properties.
        • The count of checksums should include both compressed and decompressed checksums.
        • Instead of pos + 8 in createIndex, it would make more sense to record the position in the stream after reading the two ints (so skipping the block uses the more readable pos + compressedBlockSize + 4 * numChecksums).
        • The only termination condition in LzoTextInputFormat::createIndex is uncompressedBlockSize == 0. Values < 0 for uncompressedBlockSize should throw EOFException, while values <= 0 for compressedBlockSize should throw IOException (see the sketch after this list).
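        A sketch of an indexer loop with those termination conditions, assuming is is an FSDataInputStream positioned just past the lzop file header, numChecksums was derived from the header flags, and index is a hypothetical collector of block offsets:

        while (true) {
          int uncompressedBlockSize = is.readInt();
          if (uncompressedBlockSize == 0) {
            break;                                // clean end of stream
          }
          if (uncompressedBlockSize < 0) {
            throw new EOFException("invalid uncompressed block size: "
                + uncompressedBlockSize);
          }
          int compressedBlockSize = is.readInt();
          if (compressedBlockSize <= 0) {
            throw new IOException("invalid compressed block size: "
                + compressedBlockSize);
          }
          long pos = is.getPos();                 // position after the two ints
          index.add(pos - 8);                     // the block header starts here
          is.seek(pos + compressedBlockSize + 4 * numChecksums);
        }
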
        Johan Oskarsson added a comment -

        Updated patch with most of the suggestions incorporated.

        • Will continue with the whole file as one split if the index is missing
        • Will only skip verifying the checksums in the close method if we haven't decompressed the whole block. That block will be verified by another split later anyway.
        • Removed lzop from the codecs list in the config
        • The indexer method is now aware of the number of checksum algorithms used so it seeks to the next block properly
        • Changed the unit test to write a lzop compressed file, index and read it back again
        • As suggested, the RecordReaders don't have to read the index; it's done when getting the splits instead

        I haven't done any work on an output format; I'd rather leave that for another ticket, since it will require more extensive modifications of the compression classes. The option I'm leaning towards is to register a class that implements an Indexer interface in the stream classes (LzopOutputStream and BlockCompressorStream).
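        A hypothetical shape for that interface (the names here are invented for illustration, not from the patch):

        public interface Indexer {
          /**
           * Called by the compressing output stream with the byte offset of
           * each new compressed block as it is written.
           */
          void blockWritten(long byteOffset) throws IOException;
        }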

        As before this will give one findbugs error.

        Owen O'Malley added a comment -

        Canceling until Chris' comments are addressed.

        Hadoop QA added a comment -

        -1 overall. Here are the results of testing the latest attachment
        http://issues.apache.org/jira/secure/attachment/12393793/HADOOP-4640.patch
        against trunk revision 713612.

        +1 @author. The patch does not contain any @author tags.

        +1 tests included. The patch appears to include 3 new or modified tests.

        +1 javadoc. The javadoc tool did not generate any warning messages.

        +1 javac. The applied patch does not increase the total number of javac compiler warnings.

        -1 findbugs. The patch appears to introduce 1 new Findbugs warnings.

        +1 Eclipse classpath. The patch retains Eclipse classpath integrity.

        +1 core tests. The patch passed core unit tests.

        +1 contrib tests. The patch passed contrib unit tests.

        Test results: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/3583/testReport/
        Findbugs warnings: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/3583/artifact/trunk/build/test/findbugs/newPatchFindbugsWarnings.html
        Checkstyle results: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/3583/artifact/trunk/build/test/checkstyle-errors.html
        Console output: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/3583/console

        This message is automatically generated.

        Chris Douglas added a comment -

        Good idea

        • On LzopCodec: Removing the unused bufferSize field is clearly useful. The condition protected against by decompressedWholeBlock is best left to close() and not verifyChecksum, though... right? It might be better if this were to finish reading the block and verify the checksum rather than ignoring it.
        • LzopCodec was removed from the default list of codecs, per HADOOP-4030
        • +1 for an OutputFormat
        • The size of each block (including checksums) depends on the codecs specified in the header; LzoTextInputFormat::index assumes only one checksum per block, which may not be the case:
          +        is.seek(pos + compressedBlockSize + 4); // crc int?
          
        • Each RecordReader doesn't need to slurp and sort the full index. If each FileSplit were guaranteed to point to the beginning of a block, all the splits could be generated by the client using the index (see the sketch after this list).
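        A sketch of that client-side split generation, assuming a hypothetical LzoIndex with the findIndexPosition method discussed above and the usual mapred FileSplit:

        // Snap each default split to lzo block boundaries; a split whose
        // aligned start reaches its aligned end contributes nothing.
        long start = index.findIndexPosition(split.getStart());
        long rawEnd = split.getStart() + split.getLength();
        long end = rawEnd >= fileLen ? fileLen : index.findIndexPosition(rawEnd);
        if (start >= 0 && start < end) {
          result.add(new FileSplit(file, start, end - start, split.getLocations()));
        }
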
        Doug Cutting added a comment -

        > What is our policy on this?

        I don't know that we have a clear policy. In this case, I think it would be fine for the tests to succeed with a warning if native code is not available. Ideally we should have tests that are only run when native code is available.
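        With a JUnit 4 runner that guard could be as simple as the following sketch (NativeCodeLoader is the existing Hadoop helper; using it here is an assumption about what such a test would check):

        // Skip, rather than fail, when the native libraries aren't loaded.
        Assume.assumeTrue(NativeCodeLoader.isNativeCodeLoaded());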

        A few questions:

        • Should the InputFormat require the index, as in your patch, or rather should it degrade gracefully, so that if indexes do not exist it creates a single split per file?
        • It would be great to have an OutputFormat that creates indexes as files are written. Is that possible?
        Johan Oskarsson added a comment -

        The previous file was the patch.

        Johan Oskarsson added a comment -

        First version of the lzo splittable input format. Please review.

        I decided to write a unit test that doesn't require the lzo native libs to be loaded; it's not ideal, but it works.
        The other option would be to write one that needs the native libs and otherwise doesn't run the test. What is our policy on this?

        There will probably be one findbugs error; the same one exists in the LineRecordReader this is based on, and I don't think it will cause any harm.

        Johan Oskarsson added a comment -

        I've got a working input format that can split lzo files. It requires an index of the file to be created before the lzo file can be split.
        In a fairly unscientific experiment I got a 30% performance increase using this compared to reading the uncompressed files on our 20 node cluster.

        I need to clean up and test the code a bit before I submit it.


          People

          • Assignee:
            Johan Oskarsson
          • Reporter:
            Johan Oskarsson
          • Votes:
            0
          • Watchers:
            5
