
Key: HADOOP-2095
Type: Bug
Status: Closed
Resolution: Fixed
Priority: Major
Assignee: Arun C Murthy
Reporter: Runping Qi
Votes: 1
Watchers: 5
Hadoop Common

Reducer failed due to Out of Memory

Created: 23/Oct/07 08:38 PM   Updated: 08/Jul/09 04:52 PM
Component/s: None
Affects Version/s: 0.15.0
Fix Version/s: 0.18.0

Time Tracking:
Not Specified

File Attachments:
  Size
Text File Licensed for inclusion in ASF works HADOOP-2095_2_20080604.patch 2008-06-05 02:49 AM Arun C Murthy 162 kB
Text File Licensed for inclusion in ASF works HADOOP-2095_2_20080604.patch 2008-06-04 11:56 PM Arun C Murthy 162 kB
Text File Licensed for inclusion in ASF works HADOOP-2095_2_20080604.patch 2008-06-04 09:34 PM Arun C Murthy 161 kB
Text File Licensed for inclusion in ASF works HADOOP-2095_2_20080604.patch 2008-06-04 08:54 PM Arun C Murthy 161 kB
Text File Licensed for inclusion in ASF works HADOOP-2095_2_20080604.patch 2008-06-04 09:55 AM Arun C Murthy 160 kB
Text File Licensed for inclusion in ASF works HADOOP-2095_CompressedBytesWithCodecPool.patch 2007-10-31 06:50 PM Arun C Murthy 4 kB
Text File Licensed for inclusion in ASF works HADOOP-2095_debug.patch 2007-10-31 06:34 PM Arun C Murthy 2 kB
Issue Links:
Blocker
 
dependent
 

Release Note: Reduced in-memory copies of keys and values as they flow through the Map-Reduce framework. Changed the storage of intermediate map outputs to use new IFile instead of SequenceFile for better compression.
Resolution Date: 05/Jun/08 04:07 AM


Description
One of the reducers of my job failed with the following exceptions.
The failure eventually caused the whole job to fail.
Java heap size was 768MB and io.sort.mb was 140.

2007-10-23 19:24:06,100 WARN org.apache.hadoop.mapred.ReduceTask: task_200710231912_0001_r_000020_2 Intermediate Merge of the inmemory files threw an exception: java.lang.OutOfMemoryError: Java heap space
at org.apache.hadoop.io.compress.DecompressorStream.<init>(DecompressorStream.java:43)
at org.apache.hadoop.io.compress.DefaultCodec.createInputStream(DefaultCodec.java:71)
at org.apache.hadoop.io.SequenceFile$Reader.init(SequenceFile.java:1345)
at org.apache.hadoop.io.SequenceFile$Reader.<init>(SequenceFile.java:1231)
at org.apache.hadoop.io.SequenceFile$Reader.<init>(SequenceFile.java:1154)
at org.apache.hadoop.io.SequenceFile$Sorter$SegmentDescriptor.nextRawKey(SequenceFile.java:2726)
at org.apache.hadoop.io.SequenceFile$Sorter$MergeQueue.merge(SequenceFile.java:2543)
at org.apache.hadoop.io.SequenceFile$Sorter.merge(SequenceFile.java:2297)
at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$InMemFSMergeThread.run(ReduceTask.java:1311)
2007-10-23 19:24:06,102 INFO org.apache.hadoop.mapred.ReduceTask: task_200710231912_0001_r_000020_2 done copying task_200710231912_0001_m_001428_0 output .
2007-10-23 19:24:06,185 INFO org.apache.hadoop.fs.FileSystem: Initialized InMemoryFileSystem: ramfs://mapoutput31952838/task_200710231912_0001_r_000020_2/map_1423.out-0 of size (in bytes): 209715200
2007-10-23 19:24:06,193 ERROR org.apache.hadoop.mapred.ReduceTask: Map output copy failure: java.lang.NullPointerException
at org.apache.hadoop.fs.InMemoryFileSystem$RawInMemoryFileSystem$FileAttributes.access$300(InMemoryFileSystem.java:366)
at org.apache.hadoop.fs.InMemoryFileSystem$RawInMemoryFileSystem$InMemoryFileStatus.<init>(InMemoryFileSystem.java:378)
at org.apache.hadoop.fs.InMemoryFileSystem$RawInMemoryFileSystem.getFileStatus(InMemoryFileSystem.java:283)
at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:251)
at org.apache.hadoop.fs.FileSystem.getLength(FileSystem.java:449)
at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.copyOutput(ReduceTask.java:738)
at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.run(ReduceTask.java:665)

2007-10-23 19:24:06,193 INFO org.apache.hadoop.mapred.ReduceTask: task_200710231912_0001_r_000020_2 Copying task_200710231912_0001_m_001215_0 output from xxx
2007-10-23 19:24:06,188 INFO org.apache.hadoop.mapred.ReduceTask: task_200710231912_0001_r_000020_2 Copying task_200710231912_0001_m_001211_0 output from xxx
2007-10-23 19:24:06,185 ERROR org.apache.hadoop.mapred.ReduceTask: Map output copy failure: java.lang.NullPointerException
at org.apache.hadoop.fs.InMemoryFileSystem$RawInMemoryFileSystem$InMemoryOutputStream.close(InMemoryFileSystem.java:161)
at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:49)
at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:64)
at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSOutputSummer.close(ChecksumFileSystem.java:312)
at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:49)
at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:64)
at org.apache.hadoop.mapred.MapOutputLocation.getFile(MapOutputLocation.java:253)
at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.copyOutput(ReduceTask.java:713)
at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.run(ReduceTask.java:665)

2007-10-23 19:24:06,199 INFO org.apache.hadoop.mapred.ReduceTask: task_200710231912_0001_r_000020_2 Copying task_200710231912_0001_m_001247_0 output from .
2007-10-23 19:24:06,200 ERROR org.apache.hadoop.mapred.ReduceTask: Map output copy failure: java.lang.NullPointerException
at org.apache.hadoop.fs.InMemoryFileSystem$RawInMemoryFileSystem$FileAttributes.access$300(InMemoryFileSystem.java:366)
at org.apache.hadoop.fs.InMemoryFileSystem$RawInMemoryFileSystem$InMemoryFileStatus.<init>(InMemoryFileSystem.java:378)
at org.apache.hadoop.fs.InMemoryFileSystem$RawInMemoryFileSystem.getFileStatus(InMemoryFileSystem.java:283)
at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:251)
at org.apache.hadoop.fs.FileSystem.getLength(FileSystem.java:449)
at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.copyOutput(ReduceTask.java:738)
at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.run(ReduceTask.java:665)

2007-10-23 19:24:06,204 INFO org.apache.hadoop.mapred.ReduceTask: task_200710231912_0001_r_000020_2 Copying task_200710231912_0001_m_001422_0 output from .
2007-10-23 19:24:06,207 ERROR org.apache.hadoop.mapred.ReduceTask: Map output copy failure: java.lang.NullPointerException
at org.apache.hadoop.fs.InMemoryFileSystem$RawInMemoryFileSystem$FileAttributes.access$300(InMemoryFileSystem.java:366)
at org.apache.hadoop.fs.InMemoryFileSystem$RawInMemoryFileSystem$InMemoryFileStatus.<init>(InMemoryFileSystem.java:378)
at org.apache.hadoop.fs.InMemoryFileSystem$RawInMemoryFileSystem.getFileStatus(InMemoryFileSystem.java:283)
at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:251)
at org.apache.hadoop.fs.FileSystem.getLength(FileSystem.java:449)
at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.copyOutput(ReduceTask.java:738)
at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.run(ReduceTask.java:665)

2007-10-23 19:24:06,209 INFO org.apache.hadoop.mapred.ReduceTask: task_200710231912_0001_r_000020_2 Copying task_200710231912_0001_m_001278_0 output from .
2007-10-23 19:24:06,198 WARN org.apache.hadoop.mapred.TaskTracker: Error running child
java.io.IOException: task_200710231912_0001_r_000020_2The reduce copier failed
at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:253)
at org.apache.hadoop.mapred.TaskTracker$Child.main(TaskTracker.java:1760)
2007-10-23 19:24:06,198 ERROR org.apache.hadoop.mapred.ReduceTask: Map output copy failure: java.lang.NullPointerException
at org.apache.hadoop.fs.InMemoryFileSystem$RawInMemoryFileSystem$FileAttributes.access$300(InMemoryFileSystem.java:366)
at org.apache.hadoop.fs.InMemoryFileSystem$RawInMemoryFileSystem$InMemoryFileStatus.<init>(InMemoryFileSystem.java:378)
at org.apache.hadoop.fs.InMemoryFileSystem$RawInMemoryFileSystem.getFileStatus(InMemoryFileSystem.java:283)
at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:251)
at org.apache.hadoop.fs.FileSystem.getLength(FileSystem.java:449)
at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.copyOutput(ReduceTask.java:738)
at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.run(ReduceTask.java:665)

2007-10-23 19:24:06,231 INFO org.apache.hadoop.mapred.ReduceTask: task_200710231912_0001_r_000020_2 Copying task_200710231912_0001_m_001531_0 output from .
2007-10-23 19:24:06,197 ERROR org.apache.hadoop.mapred.ReduceTask: Map output copy failure: java.lang.NullPointerException
at org.apache.hadoop.fs.InMemoryFileSystem$RawInMemoryFileSystem$FileAttributes.access$300(InMemoryFileSystem.java:366)
at org.apache.hadoop.fs.InMemoryFileSystem$RawInMemoryFileSystem$InMemoryFileStatus.<init>(InMemoryFileSystem.java:378)
at org.apache.hadoop.fs.InMemoryFileSystem$RawInMemoryFileSystem.getFileStatus(InMemoryFileSystem.java:283)
at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:251)
at org.apache.hadoop.fs.FileSystem.getLength(FileSystem.java:449)
at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.copyOutput(ReduceTask.java:738)
at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.run(ReduceTask.java:665)

2007-10-23 19:24:06,237 INFO org.apache.hadoop.mapred.ReduceTask: task_200710231912_0001_r_000020_2 Copying task_200710231912_0001_m_001227_0 output from .
2007-10-23 19:24:06,196 ERROR org.apache.hadoop.mapred.ReduceTask: Map output copy failure: java.lang.NullPointerException
at org.apache.hadoop.fs.InMemoryFileSystem$RawInMemoryFileSystem$FileAttributes.access$300(InMemoryFileSystem.java:366)
at org.apache.hadoop.fs.InMemoryFileSystem$RawInMemoryFileSystem$InMemoryFileStatus.<init>(InMemoryFileSystem.java:378)
at org.apache.hadoop.fs.InMemoryFileSystem$RawInMemoryFileSystem.getFileStatus(InMemoryFileSystem.java:283)
at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:251)
at org.apache.hadoop.fs.FileSystem.getLength(FileSystem.java:449)
at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.copyOutput(ReduceTask.java:738)
at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.run(ReduceTask.java:665)



Runping Qi made changes - 23/Oct/07 08:38 PM
Field Original Value New Value
Component/s mapred [ 12310690 ]
Fix Version/s 0.15.0 [ 12312565 ]
Runping Qi added a comment - 23/Oct/07 10:26 PM
The problem was gone after I set the compressMapOutput attribute to false.

Owen O'Malley made changes - 24/Oct/07 09:16 PM
Fix Version/s 0.15.0 [ 12312565 ]
Devaraj Das added a comment - 25/Oct/07 04:26 AM
Is it true that native compression was in use when you saw the problem?

Runping Qi added a comment - 25/Oct/07 04:31 AM
Yes.

Devaraj Das added a comment - 25/Oct/07 05:41 AM
My hunch is that we are ending up with too many files in the ramfs (the map outputs for the failing reduce are small). When a merge is initiated on those files, we end up creating too many codecs to decompress them, and since they are native, we encounter OOM. One way to validate this is to decrease mapred.inmem.merge.threshold (defaults to 1000) to something like 200 and see whether the OOM goes away. The alternative is to increase the heap size per task.

Christian Kunz added a comment - 26/Oct/07 08:07 PM
FYI, I ran into exactly the same issue with massive failures (nearly all reduces), using a value of 0 for mapred.inmem.merge.threshold (letting the framework select the threshold).

using native compression

1GB of heap space, 1350 nodes

mapred.inmem.merge.threshold 0
mapred.reduce.parallel.copies 20
tasktracker.http.threads 30
mapred.map.tasks 13008
mapred.reduce.tasks 3600
fs.inmemory.size.mb 200
io.seqfile.sorter.recordlimit 1000000
io.sort.mb 200
io.sort.factor 300
mapred.map.output.compression.type RECORD
mapred.map.output.compression.codec org.apache.hadoop.io.compress.DefaultCodec
mapred.compress.map.output true


Devaraj Das added a comment - 27/Oct/07 03:35 AM
Christian, please don't use 0 for mapred.inmem.merge.threshold. It basically disables the threshold for initiating a merge, so the merge of the ramfs files is triggered only when the ramfs has collected enough bytes of data (actually 50% of fs.inmemory.size.mb). Depending on how big each map output is, we will accumulate that many files there. For each file, a native codec will be initialized, and I suspect that if we create too many codecs, we will encounter OOM (given a certain child heap size). So I think it is worth trying the app with a reduced mapred.inmem.merge.threshold such as 50 or 100, just to ensure that we keep control of the number of codecs we create. Christian/Runping, could you please try this tweak and let us know? Thanks.
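
The arithmetic behind this comment can be sketched roughly as follows. This is only a back-of-the-envelope model, not Hadoop code: the class and method names are hypothetical, the average map output size is an assumed input, and the "whichever limit is hit first" rule is an assumption drawn from the description above rather than from the ReduceTask source.

```java
/** Hypothetical estimator; not part of Hadoop. */
final class InMemMergeEstimate {
    private InMemMergeEstimate() {}

    /**
     * Estimates how many map outputs accumulate in the ramfs before a
     * size-triggered merge, assuming the merge starts at 50% of
     * fs.inmemory.size.mb as described in the comment above.
     *
     * @param inMemFsSizeMb     value of fs.inmemory.size.mb
     * @param avgMapOutputBytes assumed average size of one map output
     */
    static long filesBeforeSizeTriggeredMerge(int inMemFsSizeMb, long avgMapOutputBytes) {
        if (inMemFsSizeMb <= 0 || avgMapOutputBytes <= 0) {
            throw new IllegalArgumentException("sizes must be positive");
        }
        long triggerBytes = inMemFsSizeMb * 1024L * 1024L / 2; // 50% of the ramfs
        return triggerBytes / avgMapOutputBytes;
    }

    /**
     * Same estimate with a file-count threshold in play. A threshold of 0
     * (or less) is treated as "disabled"; otherwise the merge is assumed to
     * start at whichever limit is reached first.
     */
    static long filesBeforeMerge(int inMemFsSizeMb, long avgMapOutputBytes, int mergeThreshold) {
        long bySize = filesBeforeSizeTriggeredMerge(inMemFsSizeMb, avgMapOutputBytes);
        return mergeThreshold > 0 ? Math.min(bySize, mergeThreshold) : bySize;
    }
}
```

With fs.inmemory.size.mb at 200 and an assumed 50 KB per map output, the size trigger alone would let 2048 files (and potentially as many codecs) accumulate, whereas a threshold of 100 would cap the count at 100.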

Devaraj Das added a comment - 27/Oct/07 03:48 AM
Just to clarify, a codec pool is maintained in the compression module, so the number of codecs created will be on the order of mapred.inmem.merge.threshold.
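
To illustrate why a pool bounds the number of codecs, here is a minimal, self-contained sketch of the pooling idea. It is not Hadoop's codec pool: InflaterPool and its methods are hypothetical names, and java.util.zip.Inflater merely stands in for a native decompressor.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.zip.Inflater;

/** Hypothetical pool of decompressors; illustrates reuse, not Hadoop's implementation. */
final class InflaterPool {
    private final Deque<Inflater> idle = new ArrayDeque<>();
    private int created = 0;

    /** Hands out an idle decompressor if one exists, otherwise creates a new one. */
    synchronized Inflater borrow() {
        Inflater inflater = idle.pollFirst();
        if (inflater == null) {
            inflater = new Inflater();
            created++;
        }
        return inflater;
    }

    /** Resets the decompressor and makes it available for the next borrower. */
    synchronized void giveBack(Inflater inflater) {
        inflater.reset();
        idle.addFirst(inflater);
    }

    /** Number of decompressors ever created; equals the peak number in use at once. */
    synchronized int createdCount() {
        return created;
    }
}
```

Because a new decompressor is created only when none is idle, the total created matches the largest number in use simultaneously. A code path that allocates its own decompressor instead of borrowing one gets no such bound.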

Devaraj Das added a comment - 28/Oct/07 08:42 AM
BTW, another thing that needs to be controlled is io.sort.factor, which controls the number of files that get merged at once after the shuffle (the final merge of the on-disk files). The number of intermediate files opened follows the value of io.sort.factor, and you might encounter OOM if that is large. So to work around this issue I would recommend that io.sort.factor be of the same order as the value of mapred.inmem.merge.threshold that successfully completes the ramfs merges during the shuffle.
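
The role of a merge factor can be shown with a small sketch of a multi-pass merge that never has more than `factor` sorted runs open at once. This is an illustration under simplifying assumptions (in-memory integer lists instead of on-disk segments, a naive batching strategy), and BoundedMerge is a hypothetical name; Hadoop's actual merge logic differs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical multi-pass merge with a bounded fan-in; not Hadoop's merger. */
final class BoundedMerge {
    private BoundedMerge() {}

    /** Merges sorted runs in passes, combining at most {@code factor} runs per merge. */
    static List<Integer> merge(List<List<Integer>> runs, int factor) {
        if (factor < 2) {
            throw new IllegalArgumentException("factor must be >= 2");
        }
        List<List<Integer>> current = new ArrayList<>(runs);
        if (current.isEmpty()) {
            return new ArrayList<>();
        }
        while (current.size() > 1) {
            List<List<Integer>> next = new ArrayList<>();
            for (int i = 0; i < current.size(); i += factor) {
                int end = Math.min(i + factor, current.size());
                next.add(mergeOnce(current.subList(i, end)));
            }
            current = next;
        }
        return new ArrayList<>(current.get(0));
    }

    /** Single k-way merge using a min-heap of {value, runIndex, offset} entries. */
    private static List<Integer> mergeOnce(List<List<Integer>> runs) {
        PriorityQueue<int[]> heap = new PriorityQueue<>((a, b) -> Integer.compare(a[0], b[0]));
        for (int r = 0; r < runs.size(); r++) {
            if (!runs.get(r).isEmpty()) {
                heap.add(new int[] {runs.get(r).get(0), r, 0});
            }
        }
        List<Integer> out = new ArrayList<>();
        while (!heap.isEmpty()) {
            int[] top = heap.poll();
            out.add(top[0]);
            List<Integer> run = runs.get(top[1]);
            int nextOffset = top[2] + 1;
            if (nextOffset < run.size()) {
                heap.add(new int[] {run.get(nextOffset), top[1], nextOffset});
            }
        }
        return out;
    }
}
```

A smaller factor means more passes over the data but fewer inputs (and, with compressed inputs, fewer decompressors) open at any one time, which is the trade-off the comment above is pointing at.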

Owen O'Malley added a comment - 30/Oct/07 06:56 PM
Christian,
Can you generate a set of task files for the reduce with the failure with keep.failed.task.files set to true, so that we can run it in the isolation runner and a profiler?

Devaraj Das added a comment - 31/Oct/07 05:39 AM
Noticed that codec pool is not used in o.a.h.i.SequenceFile.CompressedBytes.writeUncompressedBytes. This needs to be fixed.

Arun C Murthy made changes - 31/Oct/07 06:01 PM
Description
One of the reducers of my job failed with the following exceptions.
The failure caused the whole job fail eventually.
Java heapsize was 768MB and sort.io.mb was 140.


2007-10-23 19:24:06,100 WARN org.apache.hadoop.mapred.ReduceTask: task_200710231912_0001_r_000020_2 Intermediate Merge of the inmemory files threw an exception: java.lang.OutOfMemoryError: Java heap space
at org.apache.hadoop.io.compress.DecompressorStream.(DecompressorStream.java:43)
at org.apache.hadoop.io.compress.DefaultCodec.createInputStream(DefaultCodec.java:71)
at org.apache.hadoop.io.SequenceFile$Reader.init(SequenceFile.java:1345)
at org.apache.hadoop.io.SequenceFile$Reader.<init>(SequenceFile.java:1231)
at org.apache.hadoop.io.SequenceFile$Reader.<init>(SequenceFile.java:1154)
at org.apache.hadoop.io.SequenceFile$Sorter$SegmentDescriptor.nextRawKey(SequenceFile.java:2726)
at org.apache.hadoop.io.SequenceFile$Sorter$MergeQueue.merge(SequenceFile.java:2543)
at org.apache.hadoop.io.SequenceFile$Sorter.merge(SequenceFile.java:2297)
at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$InMemFSMergeThread.run(ReduceTask.java:1311)
2007-10-23 19:24:06,102 INFO org.apache.hadoop.mapred.ReduceTask: task_200710231912_0001_r_000020_2 done copying task_200710231912_0001_m_001428_0 output .
2007-10-23 19:24:06,185 INFO org.apache.hadoop.fs.FileSystem: Initialized InMemoryFileSystem: ramfs://mapoutput31952838/task_200710231912_0001_r_000020_2/map_1423.out-0 of size (in bytes): 209715200
2007-10-23 19:24:06,193 ERROR org.apache.hadoop.mapred.ReduceTask: Map output copy failure: java.lang.NullPointerException
at org.apache.hadoop.fs.InMemoryFileSystem$RawInMemoryFileSystem$FileAttributes.access$300(InMemoryFileSystem.java:366)
at org.apache.hadoop.fs.InMemoryFileSystem$RawInMemoryFileSystem$InMemoryFileStatus.<init>(InMemoryFileSystem.java:378)
at org.apache.hadoop.fs.InMemoryFileSystem$RawInMemoryFileSystem.getFileStatus(InMemoryFileSystem.java:283)
at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:251)
at org.apache.hadoop.fs.FileSystem.getLength(FileSystem.java:449)
at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.copyOutput(ReduceTask.java:738)
at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.run(ReduceTask.java:665)

2007-10-23 19:24:06,193 INFO org.apache.hadoop.mapred.ReduceTask: task_200710231912_0001_r_000020_2 Copying task_200710231912_0001_m_001215_0 output from xxx
2007-10-23 19:24:06,188 INFO org.apache.hadoop.mapred.ReduceTask: task_200710231912_0001_r_000020_2 Copying task_200710231912_0001_m_001211_0 output from xxx
2007-10-23 19:24:06,185 ERROR org.apache.hadoop.mapred.ReduceTask: Map output copy failure: java.lang.NullPointerException
at org.apache.hadoop.fs.InMemoryFileSystem$RawInMemoryFileSystem$InMemoryOutputStream.close(InMemoryFileSystem.java:161)
at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:49)
at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:64)
at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSOutputSummer.close(ChecksumFileSystem.java:312)
at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:49)
at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:64)
at org.apache.hadoop.mapred.MapOutputLocation.getFile(MapOutputLocation.java:253)
at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.copyOutput(ReduceTask.java:713)
at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.run(ReduceTask.java:665)

2007-10-23 19:24:06,199 INFO org.apache.hadoop.mapred.ReduceTask: task_200710231912_0001_r_000020_2 Copying task_200710231912_0001_m_001247_0 output from .
2007-10-23 19:24:06,200 ERROR org.apache.hadoop.mapred.ReduceTask: Map output copy failure: java.lang.NullPointerException
at org.apache.hadoop.fs.InMemoryFileSystem$RawInMemoryFileSystem$FileAttributes.access$300(InMemoryFileSystem.java:366)
at org.apache.hadoop.fs.InMemoryFileSystem$RawInMemoryFileSystem$InMemoryFileStatus.<init>(InMemoryFileSystem.java:378)
at org.apache.hadoop.fs.InMemoryFileSystem$RawInMemoryFileSystem.getFileStatus(InMemoryFileSystem.java:283)
at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:251)
at org.apache.hadoop.fs.FileSystem.getLength(FileSystem.java:449)
at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.copyOutput(ReduceTask.java:738)
at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.run(ReduceTask.java:665)

2007-10-23 19:24:06,204 INFO org.apache.hadoop.mapred.ReduceTask: task_200710231912_0001_r_000020_2 Copying task_200710231912_0001_m_001422_0 output from .
2007-10-23 19:24:06,207 ERROR org.apache.hadoop.mapred.ReduceTask: Map output copy failure: java.lang.NullPointerException
at org.apache.hadoop.fs.InMemoryFileSystem$RawInMemoryFileSystem$FileAttributes.access$300(InMemoryFileSystem.java:366)
at org.apache.hadoop.fs.InMemoryFileSystem$RawInMemoryFileSystem$InMemoryFileStatus.<init>(InMemoryFileSystem.java:378)
at org.apache.hadoop.fs.InMemoryFileSystem$RawInMemoryFileSystem.getFileStatus(InMemoryFileSystem.java:283)
at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:251)
at org.apache.hadoop.fs.FileSystem.getLength(FileSystem.java:449)
at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.copyOutput(ReduceTask.java:738)
at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.run(ReduceTask.java:665)

2007-10-23 19:24:06,209 INFO org.apache.hadoop.mapred.ReduceTask: task_200710231912_0001_r_000020_2 Copying task_200710231912_0001_m_001278_0 output from .
2007-10-23 19:24:06,198 WARN org.apache.hadoop.mapred.TaskTracker: Error running child
java.io.IOException: task_200710231912_0001_r_000020_2The reduce copier failed
at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:253)
at org.apache.hadoop.mapred.TaskTracker$Child.main(TaskTracker.java:1760)
2007-10-23 19:24:06,198 ERROR org.apache.hadoop.mapred.ReduceTask: Map output copy failure: java.lang.NullPointerException
at org.apache.hadoop.fs.InMemoryFileSystem$RawInMemoryFileSystem$FileAttributes.access$300(InMemoryFileSystem.java:366)
at org.apache.hadoop.fs.InMemoryFileSystem$RawInMemoryFileSystem$InMemoryFileStatus.<init>(InMemoryFileSystem.java:378)
at org.apache.hadoop.fs.InMemoryFileSystem$RawInMemoryFileSystem.getFileStatus(InMemoryFileSystem.java:283)
at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:251)
at org.apache.hadoop.fs.FileSystem.getLength(FileSystem.java:449)
at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.copyOutput(ReduceTask.java:738)
at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.run(ReduceTask.java:665)

2007-10-23 19:24:06,231 INFO org.apache.hadoop.mapred.ReduceTask: task_200710231912_0001_r_000020_2 Copying task_200710231912_0001_m_001531_0 output from .
2007-10-23 19:24:06,197 ERROR org.apache.hadoop.mapred.ReduceTask: Map output copy failure: java.lang.NullPointerException
at org.apache.hadoop.fs.InMemoryFileSystem$RawInMemoryFileSystem$FileAttributes.access$300(InMemoryFileSystem.java:366)
at org.apache.hadoop.fs.InMemoryFileSystem$RawInMemoryFileSystem$InMemoryFileStatus.<init>(InMemoryFileSystem.java:378)
at org.apache.hadoop.fs.InMemoryFileSystem$RawInMemoryFileSystem.getFileStatus(InMemoryFileSystem.java:283)
at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:251)
at org.apache.hadoop.fs.FileSystem.getLength(FileSystem.java:449)
at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.copyOutput(ReduceTask.java:738)
at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.run(ReduceTask.java:665)

2007-10-23 19:24:06,237 INFO org.apache.hadoop.mapred.ReduceTask: task_200710231912_0001_r_000020_2 Copying task_200710231912_0001_m_001227_0 output from .
2007-10-23 19:24:06,196 ERROR org.apache.hadoop.mapred.ReduceTask: Map output copy failure: java.lang.NullPointerException
at org.apache.hadoop.fs.InMemoryFileSystem$RawInMemoryFileSystem$FileAttributes.access$300(InMemoryFileSystem.java:366)
at org.apache.hadoop.fs.InMemoryFileSystem$RawInMemoryFileSystem$InMemoryFileStatus.<init>(InMemoryFileSystem.java:378)
at org.apache.hadoop.fs.InMemoryFileSystem$RawInMemoryFileSystem.getFileStatus(InMemoryFileSystem.java:283)
at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:251)
at org.apache.hadoop.fs.FileSystem.getLength(FileSystem.java:449)
at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.copyOutput(ReduceTask.java:738)
at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.run(ReduceTask.java:665)

Assignee Arun C Murthy [ acmurthy ]
Fix Version/s 0.16.0 [ 12312740 ]
Arun C Murthy added a comment - 31/Oct/07 06:34 PM
Christian/Runping - Could you please re-run your jobs using this debug patch? It just logs the no. of created native codecs and the stack trace to check where they are being created from. Thanks!

Meanwhile I'll plug away and see how to fix SequenceFile.CompressedBytes as Devaraj pointed out...


Arun C Murthy made changes - 31/Oct/07 06:34 PM
Attachment HADOOP-2095_debug.patch [ 12368779 ]
Arun C Murthy added a comment - 31/Oct/07 06:46 PM
Here is an early patch which fixes SequenceFile.CompressedBytes to use a CodecPool passed along by the SequenceFile.Reader...

Christian/Runping - I'd appreciate it if you could try this patch along with the previous debug patch while I try to test it at my end. Thanks!


Arun C Murthy made changes - 31/Oct/07 06:46 PM
Attachment HADOOP-2095_CompressedBytesWithCodecPool.patch [ 12368780 ]
Arun C Murthy made changes - 31/Oct/07 06:48 PM
Attachment HADOOP-2095_CompressedBytesWithCodecPool.patch [ 12368780 ]
Arun C Murthy added a comment - 31/Oct/07 06:49 PM
Sorry, previous patch had the leading path wrong...

Arun C Murthy made changes - 31/Oct/07 06:49 PM
Attachment HADOOP-2095_CompressedBytesWithCodecPool.patch [ 12368781 ]
Arun C Murthy made changes - 31/Oct/07 06:49 PM
Attachment HADOOP-2095_CompressedBytesWithCodecPool.patch [ 12368781 ]
Arun C Murthy added a comment - 31/Oct/07 06:50 PM
Sorry, previous patch had the leading path wrong...

Arun C Murthy made changes - 31/Oct/07 06:50 PM
Nigel Daley made changes - 22/Jan/08 07:32 PM
Fix Version/s 0.16.0 [ 12312740 ]
Sameer Paranjpye made changes - 31/Jan/08 05:47 PM
Fix Version/s 0.16.1 [ 12312927 ]
Affects Version/s 0.15.0 [ 12312565 ]
Christian Kunz added a comment - 18/Feb/08 07:45 PM
I still see failures after shuffling during final sort:

java.lang.OutOfMemoryError: Java heap space
at org.apache.hadoop.io.DataOutputBuffer$Buffer.write(DataOutputBuffer.java:52)
at org.apache.hadoop.io.DataOutputBuffer.write(DataOutputBuffer.java:90)
at org.apache.hadoop.io.SequenceFile$Reader.readBuffer(SequenceFile.java:1535)
at org.apache.hadoop.io.SequenceFile$Reader.readBlock(SequenceFile.java:1574)
at org.apache.hadoop.io.SequenceFile$Reader.nextRawKey(SequenceFile.java:1878)
at org.apache.hadoop.io.SequenceFile$Sorter$SegmentDescriptor.nextRawKey(SequenceFile.java:2894)
at org.apache.hadoop.io.SequenceFile$Sorter$MergeQueue.merge(SequenceFile.java:2694)
at org.apache.hadoop.io.SequenceFile$Sorter.merge(SequenceFile.java:2478)
at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:298)
at org.apache.hadoop.mapred.TaskTracker$Child.main(TaskTracker.java:2049)

or

java.lang.OutOfMemoryError: Java heap space
at org.apache.hadoop.io.compress.DecompressorStream.<init>(DecompressorStream.java:43)
at org.apache.hadoop.io.compress.DefaultCodec.createInputStream(DefaultCodec.java:71)
at org.apache.hadoop.io.SequenceFile$Reader.init(SequenceFile.java:1480)
at org.apache.hadoop.io.SequenceFile$Reader.<init>(SequenceFile.java:1379)
at org.apache.hadoop.io.SequenceFile$Reader.<init>(SequenceFile.java:1302)
at org.apache.hadoop.io.SequenceFile$Sorter$SegmentDescriptor.nextRawKey(SequenceFile.java:2877)
at org.apache.hadoop.io.SequenceFile$Sorter$MergeQueue.merge(SequenceFile.java:2694)
at org.apache.hadoop.io.SequenceFile$Sorter.merge(SequenceFile.java:2478)
at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:298)
at org.apache.hadoop.mapred.TaskTracker$Child.main(TaskTracker.java:2049)

Configuration:

using native compression
1GB of heap space, 1350 nodes
mapred.inmem.merge.threshold 1000
mapred.reduce.parallel.copies 10
tasktracker.http.threads 10
mapred.map.tasks 2500
mapred.reduce.tasks 2500
fs.inmemory.size.mb 200
io.seqfile.sorter.recordlimit 1000000
io.sort.mb 200
io.sort.factor 1000
mapred.map.output.compression.type BLOCK
mapred.map.output.compression.codec org.apache.hadoop.io.compress.DefaultCodec
mapred.compress.map.output true

I tried 2 runs:
1) io.seqfile.compress.blocksize = 1000000 --> 1084 successful reduces, 935 failures
2) io.seqfile.compress.blocksize = 131072 --> 2286 successful reduces, 1032 failures

The failures all seem to occur after shuffling, in the final merge-sort. Because the patch uses a pool of codecs I thought I should be able to keep a high sort.factor (to reduce the amount of multi-phasic merge-sort).
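
To make the trade-off between sort factor and merge passes concrete, here is a rough sketch. It assumes a simplified model in which every pass merges up to `factor` segments into one; the real SequenceFile.Sorter may schedule passes differently, and the class name is hypothetical.

```java
// Rough model of a multi-pass merge: each pass merges up to `factor`
// segments into one. Hypothetical helper, not part of Hadoop.
final class MergePasses {
    static int passes(int segments, int factor) {
        if (segments < 1 || factor < 2) {
            throw new IllegalArgumentException("need segments >= 1 and factor >= 2");
        }
        int passes = 0;
        int remaining = segments;
        while (remaining > 1) {
            remaining = (remaining + factor - 1) / factor; // ceiling division
            passes++;
        }
        return passes;
    }
}
```

Under this model, 2500 segments need two passes at a factor of either 1000 or 100, and four passes at a factor of 10.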


Arun C Murthy added a comment - 18/Feb/08 07:56 PM
Thanks for trying this out Christian! Do you have the logs of the failed tasks somewhere i.e. the syslog file?

Clearly this needs more work... sigh.


Christian Kunz added a comment - 18/Feb/08 08:16 PM
Logs are available. I'll send you the pointers offline.

Runping Qi added a comment - 18/Feb/08 08:17 PM

1000 as merge factor seems very high.
How much data is each reducer expected to process?


Christian Kunz added a comment - 18/Feb/08 08:36 PM
With compression turned off we never had any problems with high merge factor (which reduces time spent in merging). Concerning the amount of data processed by reducers, the particular job has a few (imbalanced) reducers processing 100+ GB uncompressed. With block compression turned on we were hoping to reduce the merging even further.

Doug Cutting added a comment - 19/Feb/08 07:47 PM
1000 is a much higher merge factor than is expected by the design.

Arun C Murthy added a comment - 21/Feb/08 05:20 AM
I'm removing this as a blocker for 0.16.1 while we continue discussions...

Arun C Murthy made changes - 21/Feb/08 05:20 AM
Fix Version/s 0.16.1 [ 12312927 ]
Runping Qi made changes - 26/Feb/08 12:57 AM
Link This issue is blocked by HADOOP-2887 [ HADOOP-2887 ]
eric baldeschwieler added a comment - 28/Feb/08 07:15 AM
What is the RAM overhead of an active decompressor? We can probably compute a reasonable merge size from that. If we are seeing lots of very small files, maybe we should consider just working with the uncompressed data for small inputs? That may use less total RAM and it should be easy to determine that on the fly.

You could cap the number of compressed files in RAM and when you add a new file you could choose to uncompress the smallest file you have to allow you to stay under the limit. You could also merge and recompress your small files, this obviously uses a lot more CPU.
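
The "compute a reasonable merge size" idea amounts to a division, sketched below. The per-decompressor overhead is exactly the unknown being asked about, so it is a parameter here; the 256 KB figure in the usage note is a made-up placeholder, not a measured value, and the class name is hypothetical.

```java
// Back-of-the-envelope bound on how many compressed segments can be
// open at once. Hypothetical helper; the overhead must be measured.
final class MergeWidth {
    static int maxOpenSegments(long heapBudgetBytes, long bytesPerDecompressor) {
        if (heapBudgetBytes < 0 || bytesPerDecompressor <= 0) {
            throw new IllegalArgumentException("budget must be >= 0 and overhead > 0");
        }
        long n = heapBudgetBytes / bytesPerDecompressor;
        return (int) Math.min(n, Integer.MAX_VALUE);
    }
}
```

For example, if 100 MB of heap were set aside for decompressors and each one cost 256 KB (an assumed number), `maxOpenSegments(100L << 20, 256L << 10)` would give 400.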


Runping Qi added a comment - 24/Apr/08 11:24 PM

I'd like to elaborate a bit on Eric's comment.

The root cause of the current problem is that when we merge too many small segments,
we need to allocate too many codecs, thus run out of memory.
The main point is to treat large segments and small segments differently.
It is intuitive that merging a lot of small segments incurs too much overhead and may be inferior to quick sort.

Let's say we have a fixed memory budget for in mem merge of fetched map output.
When we fetch a small segment, we extract the records out of the segment and put in an area for future quick sort.
When we fetch a big segment, we leave it in the in mem file.
When the number of in-mem files reaches a certain limit (say 100), or the total memory consumption reaches the budget, we sort the extracted
records, and merge them along with the in-mem files.

This way, we can guarantee that we will not exceed the total memory budget.

Thoughts?
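
The bookkeeping for this proposal could look roughly like the sketch below. All names and the small-segment cutoff are hypothetical, and the model counts fetched bytes only: it ignores that records extracted from a compressed segment take more room than the segment itself.

```java
// Sketch of the small/large segment split under a fixed memory budget.
// Hypothetical class; sizes are in bytes as fetched.
final class SegmentBuffer {
    private final long memoryBudget;
    private final long smallSegmentLimit; // assumed cutoff between "small" and "big"
    private final int maxInMemFiles;      // e.g. 100, as suggested above
    private long bytesUsed;
    private long extractedBytes;          // records waiting for the quick sort
    private int inMemFiles;               // big segments kept as in-memory files

    SegmentBuffer(long memoryBudget, long smallSegmentLimit, int maxInMemFiles) {
        this.memoryBudget = memoryBudget;
        this.smallSegmentLimit = smallSegmentLimit;
        this.maxInMemFiles = maxInMemFiles;
    }

    /** Records a fetched segment; returns true if sort-and-merge should run now. */
    boolean add(long segmentBytes) {
        if (segmentBytes < smallSegmentLimit) {
            extractedBytes += segmentBytes; // small: extract records for sorting
        } else {
            inMemFiles++;                   // big: leave as an in-memory file
        }
        bytesUsed += segmentBytes;
        return inMemFiles >= maxInMemFiles || bytesUsed >= memoryBudget;
    }

    /** Called after the sorted records and in-memory files have been merged. */
    void reset() {
        bytesUsed = 0;
        extractedBytes = 0;
        inMemFiles = 0;
    }

    int inMemFiles() { return inMemFiles; }
    long extractedBytes() { return extractedBytes; }
}
```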


eric baldeschwieler added a comment - 25/Apr/08 04:57 AM
I would like to suggest a very simple improvement.

1) Compute the maximum number of usable decompressors.
2) Download splits until ram is full, or we have reached the limit.
3) If ram is not full
b) continue downloading splits, but now decompress them as they are loaded

4) Now merge and dump all the splits, decompressing the first N on the fly

This is very simple and works in almost all cases. A refinement would be to decompress the smallest split each time you load a new split beyond the merge limit.

The above seems like it would be very simple to code and would work well in the face of large splits (the merge limit is not reached) and many small splits (many are merged in the first pass). It would be ok in the face of medium splits, which seems like the worst case.

A more optimal algorithm would presumably merge in ram, compressing on the fly and so on, but this is very complex and has many corner cases.
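
The numbered steps in this proposal can be sketched as a simple planning loop. This is only an illustration under stated assumptions: the class is hypothetical, both compressed and uncompressed sizes are assumed to be known before a split is loaded, and the loop stops at the first split that does not fit.

```java
// Sketch: keep splits compressed until the decompressor limit is hit,
// then decompress further splits as they are loaded, until RAM is full.
final class ShufflePlan {
    final int keptCompressed;
    final int decompressedOnLoad;

    private ShufflePlan(int keptCompressed, int decompressedOnLoad) {
        this.keptCompressed = keptCompressed;
        this.decompressedOnLoad = decompressedOnLoad;
    }

    static ShufflePlan plan(long[] compressedSizes, long[] uncompressedSizes,
                            long ramBytes, int maxDecompressors) {
        long used = 0;
        int compressed = 0;
        int decompressed = 0;
        for (int i = 0; i < compressedSizes.length; i++) {
            boolean keep = compressed < maxDecompressors;
            long need = keep ? compressedSizes[i] : uncompressedSizes[i];
            if (used + need > ramBytes) {
                break; // RAM is full: time to merge and dump
            }
            used += need;
            if (keep) {
                compressed++;
            } else {
                decompressed++;
            }
        }
        return new ShufflePlan(compressed, decompressed);
    }
}
```

The final merge then needs at most `maxDecompressors` live decompressors, one per split that stayed compressed.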


Arun C Murthy added a comment - 25/Apr/08 11:45 PM
+1 for Eric's proposal.

A minor refinement to discuss:

3) If ram is not full
c) initiate the merge with the in-memory compressed files which fit the quota; continue downloading the compressed splits and stuff the compressed split as-is in the InMemoryFileSystem (on ram).

The next pass of the merge will pick it up and run with it. It has the advantage of saving ram and keeping the merge-code simple: each split is either compressed (compression enabled) or not (compression disabled).

Thoughts?


eric baldeschwieler added a comment - 28/Apr/08 05:05 PM
I think that runs contrary to the goal of the exercise, which is to merge as many inputs at once as possible (with as little code complexity as possible). We don't want to add extra passes in the case where the inputs are very small.

Possible refinements from there might include:

  • merging and compressing back into ram (might be very good in some cases)
  • Releasing ram as inputs are merged to allow interleaving of the reads without wasting ram

PS We should be sure to not have the merge factor considered in this code! Merge factor if it should exist at all should be about the number of inputs from disk. Even that is pretty awkward.


Owen O'Malley added a comment - 28/Apr/08 11:13 PM
It is worth noting that we'd need to reserve mapred.reduce.parallel.copies (default 5, but 20 on yahoo's clusters) extra codecs for decompressing on the fly. It isn't clear to me that it would be a win having the codec owned by the copier thread rather than the merger.

You would also need to add a header that gives the uncompressed size of the map outputs, so that the reducer knows how big the uncompressed file is.

Of course, getting rid of the 4 compression codecs using something like HADOOP-3315 would help a lot.


Arun C Murthy added a comment - 08/May/08 07:04 AM - edited
Some refinements after further discussions...

Prologue: Use a compressed outputstream for the intermediate sequence files i.e. map-outputs, not {record|block}-compressed sequence files. This cuts down no. of decompressors required at the reducers. Add headers to ensure that the reducer can query each map to find out the exact compressed and uncompressed sizes before it actually copies the data.

1) Compute the maximum number of usable decompressors (this is going to get tricky with direct-buffers taking up non-heap space which is a fraction of the -Xmx of the jvm).
2) Download map-outputs until ramfs is full; or we have reached the decompressors' limit.
3) Trigger InMemFSMergeThread to start the merge. (Currently a new InMemFSMergeThread is created for every triggered merge; I plan to fix it so that we use one and only one thread.)
4) If ramfs is full, suspend the shuffle; else keep shuffling into memory.

Essentially the idea is that we pack as much into memory before we initiate the merge, this saves us trips to disk (the output of the merge) which, as Devaraj has shown (http://issues.apache.org/jira/browse/HADOOP-3297?focusedCommentId=12592816#action_12592816) leads to much better overall performance.

Of course the above discussion is valid iff we are dealing with small map-outputs.

For the contra-case where map-outputs are large, we need a threshold which says:
If a map-output > 10% of ramfs, then shuffle into memory if possible (i.e. space available in ramfs), else shuffle to disk.
This ensures that we do not needlessly throttle shuffle if map-outputs are too big to fit into ramfs.

Thoughts?


Before I jump in and make changes I'm currently trying to simulate the above behaviour and publish some numbers... watch this space.


Arun C Murthy made changes - 08/May/08 07:15 AM
Link This issue is depended upon by HADOOP-3366 [ HADOOP-3366 ]
eric baldeschwieler added a comment - 08/May/08 08:53 PM
To clarify some of the thinking above... The short term goal is not to find the optimal solution. It is to get something done that is clean, understandable and works acceptably well in all cases. We can refine from there.

to expand on the above suggestions:

I suggest that for objects larger than 25% of RAM, we always just send them directly to disk. A simple rule that lets us reason more easily about the other cases. I don't think the 10% number above can be replaced with this.

We need to understand how we pause the copies without doing lots of polling. Again, I suggest keeping it simple for now. What about simply setting a global flag the first time a thread starts to read an input that is < 25% of buffer RAM (not piped directly to disk) and doesn't fit in the remaining space. Other readers will then pause until this semaphore is cleared. It is ok if races happen where a few threads try at the same time. If copies fail, we will need to clear this semaphore too.

We want to be sure not to wait until RAM is totally full before starting the merge, because this might allow a single slow copy to brownout the system. I suggest a simple rule, such as wait until the semaphore discussed above is set and copies filling at least 50% of RAM have completed. Then merge.
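
One way to pause copier threads without polling is a small monitor built on wait/notifyAll, sketched below. This is an assumption-laden illustration, not the patch: the class and method names are hypothetical, and the 25% cutoff is the rule suggested above (an input of exactly 25% is treated as fitting).

```java
// Sketch of a RAM reservation monitor for copier threads.
// Hypothetical class; sizes are in bytes.
final class ShuffleRamManager {
    private final long capacity;
    private long used;

    ShuffleRamManager(long capacity) {
        this.capacity = capacity;
    }

    /** Inputs larger than 25% of the buffer bypass RAM and go to disk. */
    boolean fitsInMemory(long size) {
        return size * 4 <= capacity;
    }

    /** Non-blocking attempt; returns false if the space is not free. */
    synchronized boolean tryReserve(long size) {
        if (used + size > capacity) {
            return false;
        }
        used += size;
        return true;
    }

    /** Blocks the calling copier until `size` bytes are free. */
    synchronized void reserve(long size) throws InterruptedException {
        if (size > capacity) {
            throw new IllegalArgumentException("input can never fit in RAM");
        }
        while (used + size > capacity) {
            wait(); // released by release() via notifyAll
        }
        used += size;
    }

    /** Called when a merge frees space or when a copy fails. */
    synchronized void release(long size) {
        used -= size;
        notifyAll(); // wake every paused copier; each re-checks the condition
    }

    synchronized long used() {
        return used;
    }
}
```

Because every waiter re-checks the condition in a loop, races where several threads wake at once are harmless, which matches the "it is ok if races happen" remark.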

Once all of the above is done, we can file new jiras to improve things. Ideas include:

  • Freeing storage as we merge, so fetches can be interleaved
  • decompressing small segments as we read so we can increase the number of compressed objects merged
  • ...

Arun C Murthy added a comment - 13/May/08 06:51 AM

To clarify some of the thinking above... The short term goal is not to find the optimal solution. It is to get something done that is clean, understandable and works acceptably well in all cases. We can refine from there.

+1

Along a similar tangent, here is an even simpler proposal for a first-cut. Please bear in mind that these are a result of the fact that we have noticed that the merge code is actually a juicier target to fix; on large jobs we have noticed that reduces (which started after all maps were completed) were spending more time in merge than in shuffle: 13 mins in shuffle, 17 mins in merge and 15 mins in reduce. So, the idea is to fix the OOM to ensure better reliability in a reasonably straightforward, simple manner and then go after merge in HADOOP-3366.

Here goes:
1. Use a compressed stream for map-outputs, not {record|block}-compressed sequence-files. Ensure both compressed and decompressed sizes are available for the reduce before it actually shuffles the bytes.
2. Decompress map-outputs as soon as they are shuffled into ramfs if there is enough space, else suspend shuffle as described above by Eric. This ensures that we never hit the #codecs limit, we just need as many codecs as the no. of threads doing the shuffle. The idea is that the merge-factor is anyway going to be limited by the #codecs, we might as well burn-up RAM. We could try and store the compressed outputs as a further refinement in a separate issue.
3. If RAM is more than (say) 50% full, we start merging in-memory. Also, initially we should use up as much RAM as possible, allowing for some slack. Therefore I propose we do away with the fs.inmemory.size.mb config knob and use 3/4 or 2/3 of the heap-size available as the RAM limit.
4. If the split is greater than 10% or 25% of available RAM limit, and there is no RAM available we shuffle directly to disk (compressed).
5. The output of merge is compressed and written to disk, which potentially could be merged along with (4) above.
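
The placement decisions in steps 2 to 4 can be sketched as follows. This is only one reading of the proposal: the class `ShufflePlacement`, the enum and the concrete fractions (2/3 of the heap, 25% per split, merge at 50%) are assumptions picked from the ranges mentioned in the list, not values taken from the patch.

```java
// Hypothetical sketch of the proposal; names and constants are assumptions.
final class ShufflePlacement {
  enum Target { MEMORY, DISK, WAIT }

  static final double RAM_FRACTION_OF_HEAP = 2.0 / 3.0; // proposal: 3/4 or 2/3 of heap
  static final double MAX_SPLIT_FRACTION = 0.25;        // proposal: 10% or 25% of RAM limit
  static final double MERGE_THRESHOLD = 0.50;           // proposal: merge when ~50% full

  /** RAM limit derived from the heap size instead of a separate config knob. */
  static long ramLimit(long heapBytes) {
    return (long) (heapBytes * RAM_FRACTION_OF_HEAP);
  }

  /** Decides where a map output of the given decompressed size should go. */
  static Target place(long decompressedBytes, long ramLimit, long ramUsed) {
    boolean fits = ramUsed + decompressedBytes <= ramLimit;
    if (fits) {
      return Target.MEMORY;                 // step 2: decompress into RAM
    }
    boolean large = decompressedBytes > ramLimit * MAX_SPLIT_FRACTION;
    return large ? Target.DISK : Target.WAIT; // step 4, else suspend the fetcher
  }

  /** Step 3: start an in-memory merge once RAM is more than half full. */
  static boolean shouldStartMerge(long ramLimit, long ramUsed) {
    return ramUsed > ramLimit * MERGE_THRESHOLD;
  }
}
```

The decompressed size is needed before any bytes are shuffled, which is why step 1 asks for both sizes to be available to the reduce up front.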

Hopefully this is reasonably simple and coherent. I'll put up more thoughts on HADOOP-3366 about merge improvements, current pitfalls etc.

Thoughts?


Devaraj Das added a comment - 13/May/08 12:49 PM
How would the case of multiple threads being suspended be handled? If we have a choice, we should place into ramfs the smaller files first.
I am slightly concerned about the cpu cycles we will burn to compress/decompress the output of merges. What is the reason for compressing the output of merge?

Arun C Murthy added a comment - 13/May/08 02:27 PM

How would the case of multiple threads being suspended be handled? If we have a choice, we should place into ramfs the smaller files first.

All threads wait on a single semaphore and we could do a 'notifyAll' ...

I am slightly concerned about the cpu cycles we will burn to compress/decompress the output of merges. What is the reason for compressing the output of merge?

The compression of merge-outputs is to reduce temporary disk usage on the reducer node. Also it has the nice property of keeping all data on disk compressed, since compressed splits which can't fit in-memory go straight to disk.
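
As a rough illustration of the "all threads wait and we do a notifyAll" idea mentioned in this comment, here is a sketch using a plain Java monitor. `SimpleRamBudget` and its methods are hypothetical names chosen for this example, not the code in the patch.

```java
// Hypothetical sketch; not the RamManager from the patch.
final class SimpleRamBudget {
  private final long capacity; // total bytes available for in-memory map outputs
  private long used = 0;       // bytes currently reserved

  SimpleRamBudget(long capacity) {
    this.capacity = capacity;
  }

  /** Blocks the calling fetcher until the requested bytes fit in the budget. */
  synchronized void reserve(long bytes) throws InterruptedException {
    if (bytes > capacity) {
      throw new IllegalArgumentException("request exceeds capacity");
    }
    while (used + bytes > capacity) {
      wait(); // released by notifyAll() in release()
    }
    used += bytes;
  }

  /** Frees bytes (for example after a merge) and wakes every waiting fetcher. */
  synchronized void release(long bytes) {
    used -= bytes;
    notifyAll(); // each waiter re-checks whether its own request now fits
  }

  synchronized long used() {
    return used;
  }
}
```

Because every waiter re-checks its own request after `notifyAll`, a fetcher with a small output may proceed while one with a larger output keeps waiting; no particular ordering is guaranteed.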


Runping Qi added a comment - 13/May/08 04:23 PM

We can expect better performance if we use LZO as the codec for compressing the map outputs and the merge outputs.
Some more benchmarking may be required to confirm this, though.


Arun C Murthy added a comment - 04/Jun/08 09:55 AM
Here is a nearly complete patch (I need to fix a couple of failing test-cases).

Highlights:
1. Rework sort/merge to use the new IFile rather than SequenceFiles.
2. Compression for intermediate map-outputs now implies that the entire file is compressed, no more record/block compression. This helps reduce the codec cost.
3. Rework intermediate merge to ensure there are no spurious copies of keys/values.
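
To illustrate what "the entire file is compressed" means in practice, the sketch below writes length-prefixed key/value records through a single compressor and reads them back. The record layout, the `-1` end marker and the class name `WholeStreamRecords` are assumptions made for this example; it uses `java.util.zip` rather than Hadoop's codecs and is not the actual IFile format.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream;

// Hypothetical sketch; the layout is illustrative, not the real IFile format.
final class WholeStreamRecords {
  /** Writes all records through one compressor that spans the whole stream. */
  static byte[] write(List<byte[][]> records) throws IOException {
    ByteArrayOutputStream raw = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(new DeflaterOutputStream(raw))) {
      for (byte[][] kv : records) {
        out.writeInt(kv[0].length); // key length
        out.writeInt(kv[1].length); // value length
        out.write(kv[0]);
        out.write(kv[1]);
      }
      out.writeInt(-1); // end-of-stream marker (an assumption of this sketch)
    }
    return raw.toByteArray();
  }

  /** Reads the records back using one decompressor for the whole stream. */
  static List<byte[][]> read(byte[] compressed) throws IOException {
    List<byte[][]> records = new ArrayList<>();
    try (DataInputStream in = new DataInputStream(
        new InflaterInputStream(new ByteArrayInputStream(compressed)))) {
      for (int keyLen = in.readInt(); keyLen != -1; keyLen = in.readInt()) {
        byte[] key = new byte[keyLen];
        byte[] value = new byte[in.readInt()];
        in.readFully(key);
        in.readFully(value);
        records.add(new byte[][] {key, value});
      }
    }
    return records;
  }
}
```

With this shape, a reader needs one decompressor per open stream, regardless of how many records the stream holds.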

Benchmarks:
I ran this with a single reducer job where 2500 maps produced 5MB of data each. Trunk takes 45 mins for the job to complete with the on-disk merge taking nearly 25-30mins and the final merge (as records are fed to 'reduce') taking 12-13mins. With this patch the on-disk merge takes 20-22mins and the final merge takes around 7mins, an overall improvement of nearly 30%.


Arun C Murthy made changes - 04/Jun/08 09:55 AM
Attachment HADOOP-2095_2_20080604.patch [ 12383368 ]
Devaraj Das added a comment - 04/Jun/08 07:12 PM
I looked at most of the patch except MapTask. +1 on the patch subject to unit tests passing and the sort benchmark running well.

Arun C Murthy added a comment - 04/Jun/08 08:54 PM
Thanks for the review Devaraj!

Here is an updated version of the patch with some minor changes...


Arun C Murthy made changes - 04/Jun/08 08:54 PM
Attachment HADOOP-2095_2_20080604.patch [ 12383412 ]
Arun C Murthy made changes - 04/Jun/08 08:54 PM
Fix Version/s 0.18.0 [ 12312972 ]
Status Open [ 1 ] Patch Available [ 10002 ]
Mahadev konar added a comment - 04/Jun/08 09:10 PM
just one comment:

+ LOG.info("Sent out " + totalRead + " bytes (starting from offset: " + startOffset + " of outputFile: " + mapOutputFileName + " for reduce: " + reduce + " from map: " + mapId + " given " + partLength + "/" + rawPartLength);

you might want to wrap this around!!


Arun C Murthy added a comment - 04/Jun/08 09:34 PM
Fixed the log statement... thanks for the review Mahadev!

Arun C Murthy made changes - 04/Jun/08 09:34 PM
Attachment HADOOP-2095_2_20080604.patch [ 12383415 ]
Arun C Murthy added a comment - 04/Jun/08 11:56 PM
Pretty-fied another log message...

Arun C Murthy made changes - 04/Jun/08 11:56 PM
Attachment HADOOP-2095_2_20080604.patch [ 12383426 ]
Hadoop QA added a comment - 05/Jun/08 02:20 AM
-1 overall. Here are the results of testing the latest attachment
http://issues.apache.org/jira/secure/attachment/12383426/HADOOP-2095_2_20080604.patch
against trunk revision 663370.

+1 @author. The patch does not contain any @author tags.

+1 tests included. The patch appears to include 12 new or modified tests.

+1 javadoc. The javadoc tool did not generate any warning messages.

-1 javac. The applied patch generated 449 javac compiler warnings (more than the trunk's current 447 warnings).

+1 findbugs. The patch does not introduce any new Findbugs warnings.

-1 release audit. The applied patch generated 196 release audit warnings (more than the trunk's current 195 warnings).

+1 core tests. The patch passed core unit tests.

-1 contrib tests. The patch failed contrib unit tests.

Test results: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/2580/testReport/
Release audit warnings: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/2580/artifact/trunk/current/releaseAuditDiffWarnings.txt
Findbugs warnings: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/2580/artifact/trunk/build/test/findbugs/newPatchFindbugsWarnings.html
Checkstyle results: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/2580/artifact/trunk/build/test/checkstyle-errors.html
Console output: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/2580/console

This message is automatically generated.


Arun C Murthy added a comment - 05/Jun/08 02:49 AM
The javac warning was:
    [javac] /zonestorage/hudson/home/hudson/hudson/jobs/Hadoop-Patch/workspace/trunk/src/java/org/apache/hadoop/mapred/JobConf.java:467: warning: [dep-ann] deprecated name isnt annotated with @Deprecated
    [javac]   public void setMapOutputCompressionType(CompressionType style) {
    [javac]               ^
    [javac] /zonestorage/hudson/home/hudson/hudson/jobs/Hadoop-Patch/workspace/trunk/src/java/org/apache/hadoop/mapred/JobConf.java:482: warning: [dep-ann] deprecated name isnt annotated with @Deprecated
    [javac]   public CompressionType getMapOutputCompressionType() {
    [javac]                          ^

Fixed now.
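
For reference, the `[dep-ann]` lint warning appears when a member carries the javadoc `@deprecated` tag but not the `@Deprecated` annotation. The sketch below shows the usual fix on a stand-in class; `CompressionSettings` and its `String` parameter are hypothetical and merely mirror the method names from the warning, not the real `JobConf` code.

```java
// Stand-in class for illustration; not the real JobConf.
final class CompressionSettings {
  private String mapOutputCompressionType = "RECORD";

  /**
   * Sets the compression type for map outputs.
   *
   * @deprecated kept only for source compatibility in this sketch.
   */
  @Deprecated // pairing the annotation with the javadoc tag silences [dep-ann]
  void setMapOutputCompressionType(String style) {
    this.mapOutputCompressionType = style;
  }

  /**
   * Returns the compression type for map outputs.
   *
   * @deprecated kept only for source compatibility in this sketch.
   */
  @Deprecated
  String getMapOutputCompressionType() {
    return mapOutputCompressionType;
  }
}
```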

The test-case failure seems unrelated and works on both Linux and Mac.


Arun C Murthy made changes - 05/Jun/08 02:49 AM
Attachment HADOOP-2095_2_20080604.patch [ 12383432 ]
eric baldeschwieler added a comment - 05/Jun/08 04:02 AM
Why start the merge before the first fetcher thread is blocked due to
lack of RAM? This seems like the wrong trade-off to me. Especially
for shuffles before the MAP is done.

Be sure to account for brown-out / race conditions. I.e., we may have a
few really slow reads. Hence we should only wait for the first
thread to block. This should also influence the size of max element
we will write to RAM. E.g., I'd suggest the MAX object stored to RAM be
less than the (space used) / shuffle threads / 2. This will ensure
you have a full set of elements to merge when you start merging. You
might want to code an "assertion" for that.
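
A small sketch of the sizing rule suggested in this comment. It reads "(space used)" as the RAM budget for shuffled data and "a full set of elements" as at least one segment per shuffle thread; both readings, along with the class and method names, are assumptions of this example.

```java
// Hypothetical sketch of the suggested sizing rule; names are illustrative.
final class ShuffleSizing {
  /** Upper bound for a single map output kept in RAM: budget / threads / 2. */
  static long maxInMemoryObject(long ramBudgetBytes, int shuffleThreads) {
    if (shuffleThreads <= 0) {
      throw new IllegalArgumentException("shuffleThreads must be positive");
    }
    return ramBudgetBytes / shuffleThreads / 2;
  }

  /** The suggested "assertion": fail loudly if a merge starts with too few segments. */
  static void checkReadyToMerge(int segmentsInRam, int shuffleThreads) {
    if (segmentsInRam < shuffleThreads) {
      throw new IllegalStateException(
          "only " + segmentsInRam + " segments in RAM for " + shuffleThreads + " threads");
    }
  }
}
```

For example, with a 1000-byte budget and 5 shuffle threads, no single object larger than 100 bytes would be placed in RAM.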


Repository Revision Date User Message
ASF #663440 Thu Jun 05 04:06:13 UTC 2008 acmurthy HADOOP-2095. Improve the Map-Reduce shuffle/merge by cutting down buffer-copies; changed intermediate sort/merge to use the new IFile format rather than SequenceFiles and compression of map-outputs is now implemented by compressing the entire file rather than SequenceFile compression. Shuffle also has been changed to use a simple byte-buffer manager rather than the InMemoryFileSystem.
Configuration changes to hadoop-default.xml:
  deprecated mapred.map.output.compression.type
Files Changed
ADD /hadoop/core/trunk/src/java/org/apache/hadoop/mapred/Merger.java
MODIFY /hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobConf.java
MODIFY /hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
MODIFY /hadoop/core/trunk/src/java/org/apache/hadoop/io/compress/zlib/ZlibFactory.java
MODIFY /hadoop/core/trunk/src/test/org/apache/hadoop/io/compress/TestCodecFactory.java
MODIFY /hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
MODIFY /hadoop/core/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
MODIFY /hadoop/core/trunk/src/java/org/apache/hadoop/io/compress/LzoCodec.java
MODIFY /hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java
MODIFY /hadoop/core/trunk/src/java/org/apache/hadoop/io/compress/CompressionCodec.java
MODIFY /hadoop/core/trunk/src/java/org/apache/hadoop/mapred/Task.java
MODIFY /hadoop/core/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
MODIFY /hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MRConstants.java
MODIFY /hadoop/core/trunk/conf/hadoop-default.xml
MODIFY /hadoop/core/trunk/src/java/org/apache/hadoop/io/compress/GzipCodec.java
MODIFY /hadoop/core/trunk/src/java/org/apache/hadoop/io/DataOutputBuffer.java
MODIFY /hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMapRed.java
ADD /hadoop/core/trunk/src/java/org/apache/hadoop/io/compress/CodecPool.java
MODIFY /hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskCompletionEvent.java
MODIFY /hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MapOutputFile.java
ADD /hadoop/core/trunk/src/java/org/apache/hadoop/mapred/RamManager.java
MODIFY /hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestReduceTask.java
MODIFY /hadoop/core/trunk/src/java/org/apache/hadoop/io/SequenceFile.java
MODIFY /hadoop/core/trunk/src/java/org/apache/hadoop/io/compress/DefaultCodec.java
MODIFY /hadoop/core/trunk/src/java/org/apache/hadoop/util/ReflectionUtils.java
MODIFY /hadoop/core/trunk/src/java/org/apache/hadoop/io/DataInputBuffer.java
MODIFY /hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
MODIFY /hadoop/core/trunk/CHANGES.txt
MODIFY /hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobStatusPersistency.java
MODIFY /hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobClient.java
ADD /hadoop/core/trunk/src/java/org/apache/hadoop/mapred/RawKeyValueIterator.java
ADD /hadoop/core/trunk/src/java/org/apache/hadoop/mapred/IFile.java

Arun C Murthy added a comment - 05/Jun/08 04:07 AM
I just committed this, many thanks to Devaraj, Chris & Mahadev for reviewing it!

Arun C Murthy made changes - 05/Jun/08 04:07 AM
Resolution Fixed [ 1 ]
Status Patch Available [ 10002 ] Resolved [ 5 ]
Repository Revision Date User Message
ASF #663442 Thu Jun 05 04:24:44 UTC 2008 acmurthy HADOOP-2095. Adding missing header to src/java/org/apache/hadoop/mapred/RamManager.java
Files Changed
MODIFY /hadoop/core/trunk/src/java/org/apache/hadoop/mapred/RamManager.java

Arun C Murthy added a comment - 05/Jun/08 04:32 AM
Eric, doesn't it make sense to have a bit of buffer space free even during the merge? On machines where merge is quick (faster CPUs) it will allow shuffle to continue unimpeded...
Currently a merge is triggered when the buffer is half-full, which I plan to tweak a bit.

Meanwhile I plan to use HADOOP-3366 to continue the 'stalled shuffle' rework.


Repository Revision Date User Message
ASF #663447 Thu Jun 05 04:37:01 UTC 2008 acmurthy HADOOP-2095. Removed MapOutputLocation.java
Files Changed
DEL /hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java

eric baldeschwieler added a comment - 05/Jun/08 05:59 AM
Let's get some numbers. Producing the largest possible runs should be a long term goal. It sounds like this will be an improvement over what we had.

An optimal solution would fill RAM (with compressed data) and then release that RAM as it merged, allowing the shuffle to be interleaved.
Maybe you should create a new JIRA to track possible future improvements?


Arun C Murthy made changes - 20/Jun/08 10:48 PM
Release Note Improvements to the data-path doing the merge of sorted data segments both in Map and Reduce; mainly concerned with reducing in-memory copies of keys and values as they flow through the Map-Reduce framework. Also, SequenceFiles are no longer used to store intermediate map-outputs and have been superseded by a new IFile format, which helps in reducing in-memory copies and better memory usage for compression of intermediate map-outputs.
Robert Chansler made changes - 27/Jun/08 08:36 PM
Release Note Improvements to the data-path doing the merge of sorted data segments both in Map and Reduce; mainly concerned with reducing in-memory copies of keys and values as they flow through the Map-Reduce framework. Also, SequenceFiles are no longer used to store intermediate map-outputs and have been superseded by a new IFile format, which helps in reducing in-memory copies and better memory usage for compression of intermediate map-outputs. Reduced in-memory copies of keys and values as they flow through the Map-Reduce framework. Changed the storage of intermediate map outputs to use new IFile instead of SequenceFile for better compression.
Nigel Daley made changes - 22/Aug/08 07:50 PM
Status Resolved [ 5 ] Closed [ 6 ]
Owen O'Malley made changes - 08/Jul/09 04:52 PM
Component/s mapred [ 12310690 ]