Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.15.0
    • Fix Version/s: 0.18.0
    • Component/s: None
    • Labels:
      None
    • Release Note:
      Reduced in-memory copies of keys and values as they flow through the Map-Reduce framework. Changed the storage of intermediate map outputs to use new IFile instead of SequenceFile for better compression.

      Description

      One of the reducers of my job failed with the following exceptions.
      The failure eventually caused the whole job to fail.
      The Java heap size was 768 MB and io.sort.mb was 140.

      2007-10-23 19:24:06,100 WARN org.apache.hadoop.mapred.ReduceTask: task_200710231912_0001_r_000020_2 Intermediate Merge of the inmemory files threw an exception: java.lang.OutOfMemoryError: Java heap space
      at org.apache.hadoop.io.compress.DecompressorStream.<init>(DecompressorStream.java:43)
      at org.apache.hadoop.io.compress.DefaultCodec.createInputStream(DefaultCodec.java:71)
      at org.apache.hadoop.io.SequenceFile$Reader.init(SequenceFile.java:1345)
      at org.apache.hadoop.io.SequenceFile$Reader.<init>(SequenceFile.java:1231)
      at org.apache.hadoop.io.SequenceFile$Reader.<init>(SequenceFile.java:1154)
      at org.apache.hadoop.io.SequenceFile$Sorter$SegmentDescriptor.nextRawKey(SequenceFile.java:2726)
      at org.apache.hadoop.io.SequenceFile$Sorter$MergeQueue.merge(SequenceFile.java:2543)
      at org.apache.hadoop.io.SequenceFile$Sorter.merge(SequenceFile.java:2297)
      at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$InMemFSMergeThread.run(ReduceTask.java:1311)
      2007-10-23 19:24:06,102 INFO org.apache.hadoop.mapred.ReduceTask: task_200710231912_0001_r_000020_2 done copying task_200710231912_0001_m_001428_0 output .
      2007-10-23 19:24:06,185 INFO org.apache.hadoop.fs.FileSystem: Initialized InMemoryFileSystem: ramfs://mapoutput31952838/task_200710231912_0001_r_000020_2/map_1423.out-0 of size (in bytes): 209715200
      2007-10-23 19:24:06,193 ERROR org.apache.hadoop.mapred.ReduceTask: Map output copy failure: java.lang.NullPointerException
      at org.apache.hadoop.fs.InMemoryFileSystem$RawInMemoryFileSystem$FileAttributes.access$300(InMemoryFileSystem.java:366)
      at org.apache.hadoop.fs.InMemoryFileSystem$RawInMemoryFileSystem$InMemoryFileStatus.<init>(InMemoryFileSystem.java:378)
      at org.apache.hadoop.fs.InMemoryFileSystem$RawInMemoryFileSystem.getFileStatus(InMemoryFileSystem.java:283)
      at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:251)
      at org.apache.hadoop.fs.FileSystem.getLength(FileSystem.java:449)
      at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.copyOutput(ReduceTask.java:738)
      at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.run(ReduceTask.java:665)

      2007-10-23 19:24:06,193 INFO org.apache.hadoop.mapred.ReduceTask: task_200710231912_0001_r_000020_2 Copying task_200710231912_0001_m_001215_0 output from xxx
      2007-10-23 19:24:06,188 INFO org.apache.hadoop.mapred.ReduceTask: task_200710231912_0001_r_000020_2 Copying task_200710231912_0001_m_001211_0 output from xxx
      2007-10-23 19:24:06,185 ERROR org.apache.hadoop.mapred.ReduceTask: Map output copy failure: java.lang.NullPointerException
      at org.apache.hadoop.fs.InMemoryFileSystem$RawInMemoryFileSystem$InMemoryOutputStream.close(InMemoryFileSystem.java:161)
      at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:49)
      at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:64)
      at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSOutputSummer.close(ChecksumFileSystem.java:312)
      at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:49)
      at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:64)
      at org.apache.hadoop.mapred.MapOutputLocation.getFile(MapOutputLocation.java:253)
      at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.copyOutput(ReduceTask.java:713)
      at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.run(ReduceTask.java:665)

      2007-10-23 19:24:06,199 INFO org.apache.hadoop.mapred.ReduceTask: task_200710231912_0001_r_000020_2 Copying task_200710231912_0001_m_001247_0 output from .
      2007-10-23 19:24:06,200 ERROR org.apache.hadoop.mapred.ReduceTask: Map output copy failure: java.lang.NullPointerException
      at org.apache.hadoop.fs.InMemoryFileSystem$RawInMemoryFileSystem$FileAttributes.access$300(InMemoryFileSystem.java:366)
      at org.apache.hadoop.fs.InMemoryFileSystem$RawInMemoryFileSystem$InMemoryFileStatus.<init>(InMemoryFileSystem.java:378)
      at org.apache.hadoop.fs.InMemoryFileSystem$RawInMemoryFileSystem.getFileStatus(InMemoryFileSystem.java:283)
      at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:251)
      at org.apache.hadoop.fs.FileSystem.getLength(FileSystem.java:449)
      at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.copyOutput(ReduceTask.java:738)
      at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.run(ReduceTask.java:665)

      2007-10-23 19:24:06,204 INFO org.apache.hadoop.mapred.ReduceTask: task_200710231912_0001_r_000020_2 Copying task_200710231912_0001_m_001422_0 output from .
      2007-10-23 19:24:06,207 ERROR org.apache.hadoop.mapred.ReduceTask: Map output copy failure: java.lang.NullPointerException
      at org.apache.hadoop.fs.InMemoryFileSystem$RawInMemoryFileSystem$FileAttributes.access$300(InMemoryFileSystem.java:366)
      at org.apache.hadoop.fs.InMemoryFileSystem$RawInMemoryFileSystem$InMemoryFileStatus.<init>(InMemoryFileSystem.java:378)
      at org.apache.hadoop.fs.InMemoryFileSystem$RawInMemoryFileSystem.getFileStatus(InMemoryFileSystem.java:283)
      at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:251)
      at org.apache.hadoop.fs.FileSystem.getLength(FileSystem.java:449)
      at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.copyOutput(ReduceTask.java:738)
      at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.run(ReduceTask.java:665)

      2007-10-23 19:24:06,209 INFO org.apache.hadoop.mapred.ReduceTask: task_200710231912_0001_r_000020_2 Copying task_200710231912_0001_m_001278_0 output from .
      2007-10-23 19:24:06,198 WARN org.apache.hadoop.mapred.TaskTracker: Error running child
      java.io.IOException: task_200710231912_0001_r_000020_2The reduce copier failed
      at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:253)
      at org.apache.hadoop.mapred.TaskTracker$Child.main(TaskTracker.java:1760)
      2007-10-23 19:24:06,198 ERROR org.apache.hadoop.mapred.ReduceTask: Map output copy failure: java.lang.NullPointerException
      at org.apache.hadoop.fs.InMemoryFileSystem$RawInMemoryFileSystem$FileAttributes.access$300(InMemoryFileSystem.java:366)
      at org.apache.hadoop.fs.InMemoryFileSystem$RawInMemoryFileSystem$InMemoryFileStatus.<init>(InMemoryFileSystem.java:378)
      at org.apache.hadoop.fs.InMemoryFileSystem$RawInMemoryFileSystem.getFileStatus(InMemoryFileSystem.java:283)
      at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:251)
      at org.apache.hadoop.fs.FileSystem.getLength(FileSystem.java:449)
      at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.copyOutput(ReduceTask.java:738)
      at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.run(ReduceTask.java:665)

      2007-10-23 19:24:06,231 INFO org.apache.hadoop.mapred.ReduceTask: task_200710231912_0001_r_000020_2 Copying task_200710231912_0001_m_001531_0 output from .
      2007-10-23 19:24:06,197 ERROR org.apache.hadoop.mapred.ReduceTask: Map output copy failure: java.lang.NullPointerException
      at org.apache.hadoop.fs.InMemoryFileSystem$RawInMemoryFileSystem$FileAttributes.access$300(InMemoryFileSystem.java:366)
      at org.apache.hadoop.fs.InMemoryFileSystem$RawInMemoryFileSystem$InMemoryFileStatus.<init>(InMemoryFileSystem.java:378)
      at org.apache.hadoop.fs.InMemoryFileSystem$RawInMemoryFileSystem.getFileStatus(InMemoryFileSystem.java:283)
      at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:251)
      at org.apache.hadoop.fs.FileSystem.getLength(FileSystem.java:449)
      at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.copyOutput(ReduceTask.java:738)
      at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.run(ReduceTask.java:665)

      2007-10-23 19:24:06,237 INFO org.apache.hadoop.mapred.ReduceTask: task_200710231912_0001_r_000020_2 Copying task_200710231912_0001_m_001227_0 output from .
      2007-10-23 19:24:06,196 ERROR org.apache.hadoop.mapred.ReduceTask: Map output copy failure: java.lang.NullPointerException
      at org.apache.hadoop.fs.InMemoryFileSystem$RawInMemoryFileSystem$FileAttributes.access$300(InMemoryFileSystem.java:366)
      at org.apache.hadoop.fs.InMemoryFileSystem$RawInMemoryFileSystem$InMemoryFileStatus.<init>(InMemoryFileSystem.java:378)
      at org.apache.hadoop.fs.InMemoryFileSystem$RawInMemoryFileSystem.getFileStatus(InMemoryFileSystem.java:283)
      at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:251)
      at org.apache.hadoop.fs.FileSystem.getLength(FileSystem.java:449)
      at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.copyOutput(ReduceTask.java:738)
      at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.run(ReduceTask.java:665)

      1. HADOOP-2095_debug.patch
        2 kB
        Arun C Murthy
      2. HADOOP-2095_CompressedBytesWithCodecPool.patch
        4 kB
        Arun C Murthy
      3. HADOOP-2095_2_20080604.patch
        160 kB
        Arun C Murthy
      4. HADOOP-2095_2_20080604.patch
        161 kB
        Arun C Murthy
      5. HADOOP-2095_2_20080604.patch
        161 kB
        Arun C Murthy
      6. HADOOP-2095_2_20080604.patch
        162 kB
        Arun C Murthy
      7. HADOOP-2095_2_20080604.patch
        162 kB
        Arun C Murthy

        Issue Links

          Activity

          Runping Qi added a comment -

          The problem was gone after I set the compressMapOutput attribute to false.

          Devaraj Das added a comment -

          Is it true that native compression was in use when you saw the problem?

          Runping Qi added a comment -

          Yes.

          Devaraj Das added a comment -

          My hunch is that we are ending up with too many files in the ramfs (the map outputs for the failing reduce are small in size). So when merge is initiated on those files, we end up creating too many codecs for decompressing the files, and since they are native, we encounter OOM. So one way to validate this is to decrease the value of mapred.inmem.merge.threshold (defaults to 1000) to something like 200 and then see whether the OOM goes away. The other alternative is to increase the heap size per task.
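
           For illustration, the workaround amounts to tuning a couple of job properties, roughly like the following (a minimal sketch assuming a standard JobConf-driven job; the class name is a placeholder and the values are examples, not recommendations):

               import org.apache.hadoop.mapred.JobConf;

               public class ShuffleTuning {
                 // Placeholder helper; only the two property settings matter here.
                 public static JobConf configure(Class<?> jobClass) {
                   JobConf conf = new JobConf(jobClass);
                   // Cap the number of in-memory segments merged at once (default is 1000).
                   conf.setInt("mapred.inmem.merge.threshold", 200);
                   // Or, alternatively, give each child task a larger heap.
                   conf.set("mapred.child.java.opts", "-Xmx1024m");
                   return conf;
                 }
               }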

          Christian Kunz added a comment -

          FYI, I ran into exactly the same issue with massive failures (nearly all reduces), using a 0 value for mapred.inmem.merge.threshold (letting the framework select the threshold)

          using native compression

          1GB of heap space, 1350 nodes

          mapred.inmem.merge.threshold 0
          mapred.reduce.parallel.copies 20
          tasktracker.http.threads 30
          mapred.map.tasks 13008
          mapred.reduce.tasks 3600
          fs.inmemory.size.mb 200
          io.seqfile.sorter.recordlimit 1000000
          io.sort.mb 200
          io.sort.factor 300
          mapred.map.output.compression.type RECORD
          mapred.map.output.compression.codec org.apache.hadoop.io.compress.DefaultCodec
          mapred.compress.map.output true

          Devaraj Das added a comment -

          Christian, please don't use 0 for mapred.inmem.merge.threshold. It basically disables the threshold for initiating merge, and, in this case, merge on the ramfs files gets triggered only when the ramfs has collected enough bytes of data (actually 50% of fs.inmemory.size.mb). Depending on how big each map output is, we will accumulate that many files there. For each file, a native codec will be initialized, and I suspect that if we create too many codecs, we will encounter OOM (given a certain child heap size). So, I think it is worth trying the app with a reduced mapred.inmem.merge.threshold like 50/100, just to ensure that we keep a control on the number of codecs we create. Christian/Runping, could you please try this tweak and let us know? Thanks.

          Devaraj Das added a comment -

          Just to clarify, a codec-pool is maintained in the compression module. So, the number of codecs created will be in the order of mapred.inmem.merge.threshold.

          Devaraj Das added a comment -

          BTW, another thing that needs to be controlled is the io.sort.factor which would control the number of files that gets merged at once post the shuffle (the final merge of the on-disk files). The number of intermediate files opened would be as per the value of io.sort.factor and you might encounter OOM if that is large. So i would recommend that for working around this issue, the io.sort.factor value should be in the same order as the value of mapred.inmem.merge.threshold (that successfully does ramfs merges during the shuffle).

          Owen O'Malley added a comment -

          Christian,
          Can you generate a set of task files for the reduce with the failure with keep.failed.task.files set to true, so that we can run it in the isolation runner and a profiler?

          Devaraj Das added a comment -

          Noticed that codec pool is not used in o.a.h.i.SequenceFile.CompressedBytes.writeUncompressedBytes. This needs to be fixed.
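
           For reference, the borrow/return pattern provided by the codec pool looks roughly like this (a sketch assuming the org.apache.hadoop.io.compress.CodecPool API; it only illustrates the pattern, not the actual SequenceFile change):

               import java.io.IOException;
               import java.io.InputStream;

               import org.apache.hadoop.conf.Configuration;
               import org.apache.hadoop.io.compress.CodecPool;
               import org.apache.hadoop.io.compress.CompressionInputStream;
               import org.apache.hadoop.io.compress.Decompressor;
               import org.apache.hadoop.io.compress.DefaultCodec;

               public class PooledDecompressSketch {
                 public static long countUncompressedBytes(InputStream compressedIn) throws IOException {
                   DefaultCodec codec = new DefaultCodec();
                   codec.setConf(new Configuration());
                   // Borrow a (possibly native) decompressor from the pool instead of allocating a new one.
                   Decompressor decompressor = CodecPool.getDecompressor(codec);
                   try {
                     CompressionInputStream in = codec.createInputStream(compressedIn, decompressor);
                     long total = 0;
                     byte[] buf = new byte[64 * 1024];
                     for (int n = in.read(buf); n > 0; n = in.read(buf)) {
                       total += n;
                     }
                     return total;
                   } finally {
                     // Return it so repeated reads reuse codecs rather than exhausting memory.
                     CodecPool.returnDecompressor(decompressor);
                   }
                 }
               }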

          Arun C Murthy added a comment -

          Christian/Runping - Could you please re-run your jobs using this debug patch? It just logs the no. of created native codecs and the stack trace to check where they are being created from. Thanks!

          Meanwhile I'll plug away and see how to fix SequenceFile.CompressedBytes as Devaraj pointed out...

          Arun C Murthy added a comment -

          Here is an early patch which fixed SequenceFile.CompressedBytes to use a CodecPool passed along by the SequenceFile.Reader...

          Christian/Runping - I'd appreciate it if you could try this patch along with the previous debug patch while I try to test it at my end. Thanks!

          Arun C Murthy added a comment -

          Sorry, previous patch had the leading path wrong...

          Arun C Murthy added a comment -

          Sorry, previous patch had the leading path wrong...

          Christian Kunz added a comment -

          I still see failures after shuffling during final sort:

          java.lang.OutOfMemoryError: Java heap space
          at org.apache.hadoop.io.DataOutputBuffer$Buffer.write(DataOutputBuffer.java:52)
          at org.apache.hadoop.io.DataOutputBuffer.write(DataOutputBuffer.java:90)
          at org.apache.hadoop.io.SequenceFile$Reader.readBuffer(SequenceFile.java:1535)
          at org.apache.hadoop.io.SequenceFile$Reader.readBlock(SequenceFile.java:1574)
          at org.apache.hadoop.io.SequenceFile$Reader.nextRawKey(SequenceFile.java:1878)
          at org.apache.hadoop.io.SequenceFile$Sorter$SegmentDescriptor.nextRawKey(SequenceFile.java:2894)
          at org.apache.hadoop.io.SequenceFile$Sorter$MergeQueue.merge(SequenceFile.java:2694)
          at org.apache.hadoop.io.SequenceFile$Sorter.merge(SequenceFile.java:2478)
          at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:298)
          at org.apache.hadoop.mapred.TaskTracker$Child.main(TaskTracker.java:2049)

          or

          java.lang.OutOfMemoryError: Java heap space
           at org.apache.hadoop.io.compress.DecompressorStream.<init>(DecompressorStream.java:43)
          at org.apache.hadoop.io.compress.DefaultCodec.createInputStream(DefaultCodec.java:71)
          at org.apache.hadoop.io.SequenceFile$Reader.init(SequenceFile.java:1480)
           at org.apache.hadoop.io.SequenceFile$Reader.<init>(SequenceFile.java:1379)
           at org.apache.hadoop.io.SequenceFile$Reader.<init>(SequenceFile.java:1302)
          at org.apache.hadoop.io.SequenceFile$Sorter$SegmentDescriptor.nextRawKey(SequenceFile.java:2877)
          at org.apache.hadoop.io.SequenceFile$Sorter$MergeQueue.merge(SequenceFile.java:2694)
          at org.apache.hadoop.io.SequenceFile$Sorter.merge(SequenceFile.java:2478)
          at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:298)
          at org.apache.hadoop.mapred.TaskTracker$Child.main(TaskTracker.java:2049)

          Configuration:

          using native compression
          1GB of heap space, 1350 nodes
          mapred.inmem.merge.threshold 1000
          mapred.reduce.parallel.copies 10
          tasktracker.http.threads 10
          mapred.map.tasks 2500
          mapred.reduce.tasks 2500
          fs.inmemory.size.mb 200
          io.seqfile.sorter.recordlimit 1000000
          io.sort.mb 200
          io.sort.factor 1000
          mapred.map.output.compression.type BLOCK
          mapred.map.output.compression.codec org.apache.hadoop.io.compress.DefaultCodec
          mapred.compress.map.output true

          I tried 2 runs:
          1) io.seqfile.compress.blocksize = 1000000 --> 1084 successful reduces, 935 failures
          2) io.seqfile.compress.blocksize = 131072 --> 2286 successful reduces, 1032 failures

          The failures all seem to occur after shuffling, in the final merge-sort. Because the patch uses a pool of codecs I thought I should be able to keep a high sort.factor (to reduce the amount of multi-phasic merge-sort).

          Arun C Murthy added a comment -

          Thanks for trying this out Christian! Do you have the logs of the failed tasks somewhere i.e. the syslog file?

          Clearly this needs more work... sigh.

          Christian Kunz added a comment -

          Logs are available. I send you the pointers offline.

          Runping Qi added a comment -

          A merge factor of 1000 seems very high.
          How much data is each reducer expected to process?

          Christian Kunz added a comment -

          With compression turned off we never had any problems with high merge factor (which reduces time spent in merging). Concerning the amount of data processed by reducers, the particular job has a few (imbalanced) reducers processing 100+ GB uncompressed. With block compression turned on we were hoping to reduce the merging even further.

          Doug Cutting added a comment -

          1000 is a much higher merge factor than is expected by the design.

          Arun C Murthy added a comment -

          I'm removing this as a blocker for 0.16.1 while we continue discussions...

          eric baldeschwieler added a comment -

          What is the RAM overhead of an active decompressor? We can probably compute a reasonable merge size from that. If we are seeing lots of very small files, maybe we should consider just working with the uncompressed data for small inputs? That may use less total RAM and it should be easy to determine that on the fly.

          You could cap the number of compressed files in RAM and when you add a new file you could choose to uncompress the smallest file you have to allow you to stay under the limit. You could also merge and recompress your small files, this obviously uses a lot more CPU.

          Runping Qi added a comment -

          I'd like to elaborate a bit on Eric's comment.

          The root cause of the current problem is that when we merge too many small segments,
          we need to allocate too many codecs and thus run out of memory.
          The main point is to treat large segments and small segments differently.
          It is intuitive that merging a lot of small segments incurs too much overhead and may be inferior to quick sort.

          Let's say we have a fixed memory budget for the in-mem merge of fetched map output.
          When we fetch a small segment, we extract the records out of the segment and put them in an area for a future quick sort.
          When we fetch a big segment, we leave it in an in-mem file.
          When the number of in-mem files reaches a certain limit (say 100), or the total memory consumption reaches the budget, we sort the extracted
          records and merge them along with the in-mem files.

          This way, we can guarantee that we will not exceed the total memory budget.

          Thoughts?
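
           As a purely illustrative sketch of the two-tier bookkeeping described above (this is a design proposal under discussion, not code from the Hadoop tree; all names and thresholds are invented):

               import java.util.ArrayList;
               import java.util.List;

               public class InMemMergeBudgetSketch {
                 private final long budgetBytes;      // fixed memory budget for fetched map output
                 private final int maxInMemFiles;     // e.g. 100
                 private long usedBytes = 0;
                 private final List<byte[]> extractedRecords = new ArrayList<byte[]>(); // small segments, quick-sorted later
                 private final List<byte[]> inMemFiles = new ArrayList<byte[]>();       // large segments kept as in-mem files

                 public InMemMergeBudgetSketch(long budgetBytes, int maxInMemFiles) {
                   this.budgetBytes = budgetBytes;
                   this.maxInMemFiles = maxInMemFiles;
                 }

                 /** Returns true when the extracted records should be sorted and merged with the in-mem files. */
                 public boolean add(byte[] segment, boolean isSmall) {
                   usedBytes += segment.length;
                   if (isSmall) {
                     extractedRecords.add(segment);   // extract records for a future quick sort
                   } else {
                     inMemFiles.add(segment);         // leave the big segment as an in-mem file
                   }
                   return inMemFiles.size() >= maxInMemFiles || usedBytes >= budgetBytes;
                 }
               }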

          eric baldeschwieler added a comment -

          I would like to suggest a very simple improvement.

          1) Compute the maximum number of usable decompressors.
          2) Download splits until ram is full, or we have reached the limit.
          3) If ram is not full
          b) continue downloading splits, but now decompress them as they are loaded

          4) Now merge and dump all the splits, decompressing the first N on the fly

          This is very simple and works in almost all cases. A refinement would be to decompress the smallest split each time you load a new split beyond the merge limit.

          The above seems like it would be very simple to code and would work well in the face of large splits (the merge limit is not reached) and many small splits (many are merged in the first pass). It would be ok in the face of medium splits, which seems like the worst case.

          A more optimal algorithm would presumably merge in ram, compressing on the fly and so on, but this is very complex and has many corner cases.

          Arun C Murthy added a comment -

          +1 for Eric's proposal.

          A minor refinement to discuss:

          3) If ram is not full
          c) initiate the merge with the in-memory compressed files which fit the quota; continue downloading the compressed splits and stuff the compressed split as-is in the InMemoryFileSystem (on ram).

          The next pass of the merge will pick it up and run with it. It has the advantage of saving ram and keeping the merge-code simple: each split is either compressed (compression enabled) or not (compression disabled).

          Thoughts?

          eric baldeschwieler added a comment -

          I think that runs contrary to the goal of the exercise, which is to merge as many inputs at once as possible (with as little code complexity as possible). We don't want to add extra passes in the case where the inputs are very small.

          Possible refinements from there might include:

          • merging and compressing back into ram (might be very good in some cases)
          • Releasing ram as inputs are merged to allow interleaving of the reads without wasting ram

          PS We should be sure not to have the merge factor considered in this code! The merge factor, if it should exist at all, should be about the number of inputs from disk. Even that is pretty awkward.

          Owen O'Malley added a comment -

          It is worth noting that we'd need to reserve mapred.reduce.parallel.copies (default 5, but 20 on yahoo's clusters) extra codecs for decompressing on the fly. It isn't clear to me that it would be a win having the codec owned by the copier thread rather than the merger.

          You would also need to add a header that gives the uncompressed size of the map outputs, so that the reducer knows how big the uncompressed file is.

          Of course, getting rid of the 4 compression codecs using something like HADOOP-3315 would help a lot.

          Arun C Murthy added a comment - - edited

          Some refinements after further discussions...

          Prologue: Use a compressed output stream for the intermediate sequence files i.e. map-outputs, not {record|block}-compressed sequence files. This cuts down the no. of decompressors required at the reducers. Add headers to ensure that the reducer can query each map to find out the exact compressed and uncompressed sizes before it actually copies the data.

          1) Compute the maximum number of usable decompressors (this is going to get tricky with direct-buffers taking up non-heap space which is a fraction of the -Xmx of the jvm).
          2) Download map-outputs until ramfs is full; or we have reached the decompressors' limit.
          3) Trigger InMemFSMergeThread to start the merge. (Currently a new InMemFSMergeThread is created for every triggered merge; I plan to fix it so that we use one and only one thread.)
          4) If ramfs is full, suspend the shuffle; else keep shuffling into memory.

          Essentially the idea is that we pack as much into memory before we initiate the merge, this saves us trips to disk (the output of the merge) which, as Devaraj has shown (http://issues.apache.org/jira/browse/HADOOP-3297?focusedCommentId=12592816#action_12592816) leads to much better overall performance.

          Of course the above discussion is valid iff we are dealing with small map-outputs.

          For the contra-case where map-outputs are large, we need a threshold which says:
          If a map-output > 10% of ramfs, then shuffle into memory if possible (i.e. space available in ramfs), else shuffle to disk.
          This ensures that we do not needlessly throttle shuffle if map-outputs are too big to fit into ramfs.

          Thoughts?


          Before I jump in and make changes I'm currently trying to simulate the above behaviour and publish some numbers... watch this space.
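
           The large-output threshold in the paragraph above could be expressed as something like the following (illustration only; the class, method and arithmetic are invented for the sketch and are not taken from ReduceTask):

               public class ShufflePlacementSketch {
                 /** true = copy this map output into ramfs; false = shuffle it straight to disk. */
                 public static boolean shuffleIntoRamfs(long outputSize, long ramfsSize, long ramfsFree) {
                   if (outputSize > ramfsSize / 10) {   // "large" output: more than 10% of ramfs
                     return outputSize <= ramfsFree;    // take it only if it fits right now, else use disk
                   }
                   return true;                         // small outputs always target ramfs
                 }
               }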

          eric baldeschwieler added a comment -

          To clarify some of the thinking above... The short term goal is not to find the optimal solution. It is to get something done that is clean, understandable and works acceptably well in all cases. We can refine from there.

          to expand on the above suggestions:

          I suggest that for objects larger than 25% of RAM, we always just send them directly to disk. A simple rule that lets us reason more easily about the other cases. I don't think the 10% number above can be replaced with this.

          We need to understand how we pause the copies without doing lots of polling. Again, I suggest keeping it simple for now. What about simply setting a global flag the first time a thread starts to read an input that is < 25% of buffer RAM (not piped directly to disk) and doesn't fit in the remaining space? Other readers will then pause until this semaphore is cleared. It is ok if races happen where a few threads try at the same time. If copies fail, we will need to clear this semaphore too.

          We want to be sure not to wait until RAM is totally full before starting the merge, because this might allow a single slow copy to brownout the system. I suggest a simple rule, such as wait until the semaphore discussed above is set and copies filling at least 50% of RAM have completed. Then merge.

          Once all of the above is done, we can file new jiras to improve things. Ideas include:

          • Freeing storage as we merge, so fetches can be interleaved
          • decompressing small segments as we read so we can increase the number of compressed objects merged
          • ...
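
           A rough sketch of the single-flag pausing described above (not Hadoop code; the class and method names are invented): copier threads block while the buffer is flagged full, and the flag is cleared when a merge frees space or a copy fails.

               public class RamfsGateSketch {
                 private boolean full = false;

                 /** Copier threads call this before writing an input into the in-memory buffer. */
                 public synchronized void waitUntilSpace() throws InterruptedException {
                   while (full) {
                     wait();                 // pause until the merge (or a failed copy) clears the flag
                   }
                 }

                 /** Set by the first thread whose input does not fit in the remaining space. */
                 public synchronized void markFull() {
                   full = true;
                 }

                 /** Called after a merge frees space, or when a copy fails. */
                 public synchronized void release() {
                   full = false;
                   notifyAll();
                 }
               }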
          Arun C Murthy added a comment -

          To clarify some of the thinking above... The short term goal is not to find the optimal solution. It is to get something done that is clean, understandable and works acceptably well in all cases. We can refine from there.

          +1

          Along a similar tangent, here is an even simpler proposal for a first cut. Please bear in mind that these are a result of the fact that we have noticed that the merge code is actually a juicier target to fix; on large jobs we have noticed that reduces (which started after all maps were completed) were spending way more time in merge than in shuffle: 13 mins in shuffle, 17 mins in merge and 15 mins in reduce. So, the idea is to fix the OOM to ensure better reliability in a reasonably straightforward, simple manner and then go after merge in HADOOP-3366.

          Here goes:
          1. Use a compressed stream for map-outputs, not {record|block}-compressed sequence-files. Ensure both compressed and decompressed sizes are available for the reduce before it actually shuffles the bytes.
          2. Decompress map-outputs as soon as they are shuffled into ramfs if there is enough space, else suspend shuffle as described above by Eric. This ensures that we never hit the #codecs limit, we just need as many codecs as the no. of threads doing the shuffle. The idea is that the merge-factor is anyway going to be limited by the #codecs, we might as well burn-up RAM. We could try and store the compressed outputs as a further refinement in a separate issue.
          3. If RAM is more than (say) 50% full, we start merging in-memory. Also, initially we should use up as much RAM as possible, allowing for some slack. Therefore I propose we do away with the fs.inmemory.size.mb config knob and use 3/4 or 2/3 of the heap-size available as the RAM limit.
          4. If the split is greater than 10% or 25% of the available RAM limit, and there is no RAM available, we shuffle directly to disk (compressed).
          5. The output of merge is compressed and written to disk, which potentially could be merged along with (4) above.

          Hopefully this is reasonably simple and coherent. I'll put up more thoughts on HADOOP-3366 about merge improvements, current pitfalls etc.

          Thoughts?

          Devaraj Das added a comment -

          How would the case of multiple threads being suspended be handled? If we have a choice, we should place into ramfs the smaller files first.
          I am slightly concerned about the cpu cycles we will burn for compress/decompress the output of merges. What is the reason for compressing the output of merge?

          Arun C Murthy added a comment -

          How would the case of multiple threads being suspended be handled? If we have a choice, we should place into ramfs the smaller files first.

          All threads wait on a single semaphore and we could do a 'notifyAll' ...

          I am slightly concerned about the cpu cycles we will burn for compress/decompress the output of merges. What is the reason for compressing the output of merge?

          The compression of merge-outputs is to reduce temporary disk usage on the reducer node. Also it has the nice property of keeping all data on disk compressed, since compressed splits which can't fit in-memory go straight to disk.

          Runping Qi added a comment -

          We can expect better performance if we use LZO as the codec for compressing the map outputs and the merge outputs.
          Some more benchmarking may be required to confirm this, though.

          Arun C Murthy added a comment -

          Here is a nearly complete patch (I need to fix a couple of failing test-cases).

          Highlights:
          1. Rework sort/merge to use the new IFile rather than SequenceFiles.
          2. Compression for intermediate map-outputs now implies that the entire file is compressed, no more record/block compression. This helps with the codec cost.
          3. Rework intermediate merge to ensure there are no spurious copies of keys/values.

          Benchmarks:
          I ran this with a single reducer job where 2500 maps produced 5MB of data each. Trunk takes 45 mins for the job to complete with the on-disk merge taking nearly 25-30mins and the final merge (as records are fed to 'reduce') taking 12-13mins. With this patch the on-disk merge takes 20-22mins and the final merge takes around 7mins, an overall improvement of nearly 30%.

          Devaraj Das added a comment -

          I looked at most of the patch except MapTask. +1 on the patch subject to unit tests passing and the sort benchmark running well.

          Arun C Murthy added a comment -

          Thanks for the review Devaraj!

          Here is an updated version of the patch with some minor changes...

          Mahadev konar added a comment -

          just one comment:

          + LOG.info("Sent out " + totalRead + " bytes (starting from offset: " + startOffset + " of outputFile: " + mapOutputFileName + " for reduce: " + reduce + " from map: " + mapId + " given " + partLength + "/" + rawPartLength);

          you might want to wrap this around!!
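
           For example, the same statement wrapped across lines purely for readability (same content as the fragment above):

               LOG.info("Sent out " + totalRead + " bytes (starting from offset: " + startOffset +
                        " of outputFile: " + mapOutputFileName + " for reduce: " + reduce +
                        " from map: " + mapId + " given " + partLength + "/" + rawPartLength);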

          Arun C Murthy added a comment -

          Fixed the log statement... thanks for the review Mahadev!

          Arun C Murthy added a comment -

          Pretty-fied another log message...

          Hadoop QA added a comment -

          -1 overall. Here are the results of testing the latest attachment
          http://issues.apache.org/jira/secure/attachment/12383426/HADOOP-2095_2_20080604.patch
          against trunk revision 663370.

          +1 @author. The patch does not contain any @author tags.

          +1 tests included. The patch appears to include 12 new or modified tests.

          +1 javadoc. The javadoc tool did not generate any warning messages.

          -1 javac. The applied patch generated 449 javac compiler warnings (more than the trunk's current 447 warnings).

          +1 findbugs. The patch does not introduce any new Findbugs warnings.

          -1 release audit. The applied patch generated 196 release audit warnings (more than the trunk's current 195 warnings).

          +1 core tests. The patch passed core unit tests.

          -1 contrib tests. The patch failed contrib unit tests.

          Test results: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/2580/testReport/
          Release audit warnings: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/2580/artifact/trunk/current/releaseAuditDiffWarnings.txt
          Findbugs warnings: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/2580/artifact/trunk/build/test/findbugs/newPatchFindbugsWarnings.html
          Checkstyle results: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/2580/artifact/trunk/build/test/checkstyle-errors.html
          Console output: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/2580/console

          This message is automatically generated.

          Arun C Murthy added a comment -

          The javac warning was:

              [javac] /zonestorage/hudson/home/hudson/hudson/jobs/Hadoop-Patch/workspace/trunk/src/java/org/apache/hadoop/mapred/JobConf.java:467: warning: [dep-ann] deprecated name isnt annotated with @Deprecated
              [javac]   public void setMapOutputCompressionType(CompressionType style) {
              [javac]               ^
              [javac] /zonestorage/hudson/home/hudson/hudson/jobs/Hadoop-Patch/workspace/trunk/src/java/org/apache/hadoop/mapred/JobConf.java:482: warning: [dep-ann] deprecated name isnt annotated with @Deprecated
              [javac]   public CompressionType getMapOutputCompressionType() {
              [javac]                          ^
          

          Fixed now.

          The test-case failure seems unrelated; the test works on both Linux and Mac.
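
           The fix is simply to annotate the already-deprecated methods, along these lines (a hypothetical stand-in class; the real change is in org.apache.hadoop.mapred.JobConf):

               import org.apache.hadoop.io.SequenceFile.CompressionType;

               public class JobConfDeprecationSketch {
                 /** @deprecated the map-output compression type is no longer configurable. */
                 @Deprecated
                 public void setMapOutputCompressionType(CompressionType style) {
                   // no-op in this sketch
                 }
               }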

          eric baldeschwieler added a comment -

          Why start the merge before the first fetcher thread is blocked due to
          lack of RAM? This seems like the wrong trade-off to me. Especially
          for shuffles before the MAP is done.

          Be sure to account for brown-out / race conditions. IE we may have a
          few really slow reads. Hence we should only wait for the first
          thread to block. This should also influence the size of max element
          we will write to RAM. EG I'd suggest the MAX object stored to RAM be
          less than the (space used) / shuffle threads /2. This will insure
          you have a full set of elements to merge when you start merging. You
          might want to code an "assertion" for that.

          Arun C Murthy added a comment -

          I just committed this, many thanks to Devaraj, Chris & Mahadev for reviewing it!

          Arun C Murthy added a comment -

          Eric, doesn't it make sense to have a bit of buffer space free even during the merge? On machines where merge is quick (faster CPUs) it will allow shuffle to continue unimpeded...
          Currently a merge is triggered when the buffer is half-full, which I plan to tweak a bit.

          Meanwhile I plan to use HADOOP-3366 to continue the 'stalled shuffle' rework.

          eric baldeschwieler added a comment -

          Let's get some numbers. Producing the largest possible runs should be a long term goal. It sounds like this will be an improvement over what we had.

          An optimal solution would fill RAM (with compressed data) and then release that RAM as it merged, allowing the shuffle to be interleaved.
          Maybe you should create a new JIRA to track possible future improvements?


            People

            • Assignee:
              Arun C Murthy
              Reporter:
              Runping Qi
            • Votes:
              1
              Watchers:
              5

              Dates

              • Created:
                Updated:
                Resolved:

                Development