
Key: HADOOP-830
Type: Improvement
Status: Closed
Resolution: Fixed
Priority: Major
Assignee: Devaraj Das
Reporter: Devaraj Das
Votes: 2
Watchers: 0
Hadoop Common

Improve the performance of the Merge phase

Created: 15/Dec/06 08:52 AM   Updated: 08/Jul/09 04:52 PM
Component/s: None
Affects Version/s: None
Fix Version/s: 0.11.0

Time Tracking:
Not Specified

File Attachments:
830-after-comments.patch  2007-01-05 06:50 PM  Devaraj Das  37 kB
830-for-review.new.patch  2007-01-02 06:22 AM  Devaraj Das  35 kB
830-for-review.patch  2006-12-28 02:37 PM  Devaraj Das  32 kB
830-with-checksum.patch  2007-01-19 08:32 AM  Devaraj Das  42 kB
830-with-javadoc-fix.patch  2007-01-11 05:30 PM  Devaraj Das  38 kB
830-with-real-javadoc-fix.patch  2007-01-12 08:57 AM  Devaraj Das  38 kB
Issue Links:
Reference
 

Resolution Date: 19/Jan/07 06:19 PM


Description
This issue is about trying to improve the performance of the merge phase (especially on the reduces). Currently, all the map outputs are copied to disk and then the merge phase starts (just to make a note - sorting happens on the maps).

The first optimization that I plan to implement is to do in-memory merging of the map outputs. There are two buffers maintained -
1) a scratch buffer for writing map outputs (directly off the socket). This is a first-come-first-serve buffer (as opposed to strategies like best fit). The map output copier copies the map output off the socket and puts it here (assuming there is sufficient space in the buffer).
2) a merge buffer - when the scratch buffer cannot accommodate any more map output, the roles of the buffers are switched - that is, the scratch buffer becomes the merge buffer and the merge buffer becomes the scratch buffer. We avoid copying by doing this switch of roles. The copier threads can continue writing data from the socket buffer to the current scratch buffer (access to the scratch buffer is synchronized).

Both the above buffers are of equal sizes configured to have default values of 100M.

Before switching roles, a check is done to see whether the merge buffer is in use (that is, whether the merge algorithm is still working on the data there). If it is, we wait until the merge buffer is free. The hope is that, because the merge reads key/value data from an in-memory buffer, it will be fast enough that we won't see client timeouts on the server serving the map output. However, if a request does time out, the client sees an exception and resubmits the request to the server.

With the above we are doing copying/merging in parallel.
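The role switch described above can be sketched roughly as follows. This is only an illustration of the idea, not code from any of the attached patches: the class name SwappingBuffers and its methods are hypothetical, and a real copier would stream from a socket rather than receive a complete byte[].

```java
/** Hypothetical sketch of the scratch/merge role switch; not taken from the patch. */
final class SwappingBuffers {
    private byte[] scratch;      // copiers append map outputs here
    private byte[] merge;        // the merger reads merge[0..mergeUsed) while mergeBusy is true
    private int scratchUsed;
    private int mergeUsed;
    private boolean mergeBusy;

    SwappingBuffers(int capacity) {
        scratch = new byte[capacity];
        merge = new byte[capacity];
    }

    /**
     * Copier side. Returns false when the output can never fit in a buffer,
     * in which case the caller is expected to spill it directly to disk.
     */
    synchronized boolean write(byte[] mapOutput) {
        if (mapOutput.length > scratch.length) {
            return false;
        }
        if (scratchUsed + mapOutput.length > scratch.length) {
            // Scratch is full: wait until the previous merge has finished.
            while (mergeBusy) {
                try {
                    wait();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted while waiting for merge", e);
                }
            }
            // Swap roles instead of copying the buffered data.
            byte[] tmp = scratch;
            scratch = merge;
            merge = tmp;
            mergeUsed = scratchUsed;
            scratchUsed = 0;
            mergeBusy = true;
            notifyAll();         // wake a merger waiting for work
        }
        System.arraycopy(mapOutput, 0, scratch, scratchUsed, mapOutput.length);
        scratchUsed += mapOutput.length;
        return true;
    }

    /** Number of bytes currently handed over to the merger. */
    synchronized int pendingMergeBytes() {
        return mergeBusy ? mergeUsed : 0;
    }

    /** Merger side: called once the merge buffer has been merged and spilled. */
    synchronized void mergeDone() {
        mergeBusy = false;
        mergeUsed = 0;
        notifyAll();             // wake copiers blocked in write()
    }
}
```

The sketch leaves out the idle-timeout trigger and the "all map outputs copied" trigger mentioned below; both would need an extra path that forces a swap even when the scratch buffer is not full.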

The merge happens and then a spill to disk happens. At the end of the in-memory merge of all the map outputs, we will end up with ~100M files on disk that we will need to merge. Also, the in-memory merge gets triggered when the in-memory scratch buffer has been idle too long (like 30 secs), or, the number of outputs copied so far is equal to the number of maps in the job, whichever is earlier. We can proceed with the regular merge for these on-disk-files and maybe we can do some optimizations there too (haven't put much thought there).

If the map output can never be copied to the buffer (because the map output is let's say 200M), then that is directly spilled to disk.

To implement the above, I am planning to extend the FileSystem class to provide an InMemoryFileSystem class that will ease the integration of the in-memory scratch/merge with the existing APIs (like SequenceFile, MapOutputCopier), since all of them work with the abstractions of FileSystem and Input/Output streams.

Comments?



Sameer Paranjpye added a comment - 15/Dec/06 06:33 PM
I'd be a little more conservative with large map outputs. The scratch and merge buffers add value when the map outputs are substantially smaller than the buffers. You should probably spill map outputs larger than 25% of buffer space or something similar.

Devaraj Das added a comment - 18/Dec/06 06:29 AM
In the original proposal, I suggested that we can have one buffer for all the files that the InMemoryFileSystem manages. After a discussion with Owen on this, it seems like the alternative arrangement of having one byte[] per file in the InMemoryFileSystem is a better approach, given that we know the lengths of the files before we allocate byte[] buffers for those.

In the original proposal, there was an assumption of two equal sized buffers & merge would happen when we have 50% of the total buffer space filled with map outputs. This can be mapped to the multiple buffers (one byte[] per file) case (wherein we consider the total size of all the small buffers).

Yes, Sameer, we can & should spill a map output directly to disk if its size is more than a certain fraction of the total buffer space.


Devaraj Das added a comment - 28/Dec/06 02:37 PM
Implemented as discussed. MapOutput files are buffered in an InMemoryFileSystem. The MapOutputs are spilled directly to disk if the size of the file is more than 25% of the total filesystem size (75M by default). When the used space of the filesystem becomes more than 50%, merge is triggered and the output goes directly to disk. During that time, merging/copying proceeds in parallel. The InMemoryFileSystem (uri is mem://<name>) extends from FileSystem. Although it has a few special methods, it appears as a regular FileSystem for the most part to the rest of the framework. It keeps file-data as byte arrays, and there is a map from filenames to byte[ ]. The figures 25% & 50% are hardcoded constants for now.
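A minimal sketch of the placement policy described in this comment, assuming one byte[] per file and the hardcoded 25% and 50% figures. The class RamFsSketch and its methods are hypothetical stand-ins; the real InMemoryFileSystem extends FileSystem and is not reproduced here.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for the in-memory file system's space accounting. */
final class RamFsSketch {
    // Fractions taken from the comment above; hardcoded there as well.
    static final float MAX_SINGLE_FILE_FRACTION = 0.25f;
    static final float MERGE_TRIGGER_FRACTION = 0.50f;

    private final long capacity;                           // total file system size in bytes
    private long used;                                     // bytes held by stored files
    private final Map<String, byte[]> files = new HashMap<>();

    RamFsSketch(long capacity) {
        this.capacity = capacity;
    }

    /**
     * Tries to keep a map output in memory. Returns false when the caller
     * should write the output to local disk instead.
     */
    synchronized boolean store(String name, byte[] data) {
        if (files.containsKey(name)) {
            throw new IllegalArgumentException("file already exists: " + name);
        }
        if (data.length > capacity * MAX_SINGLE_FILE_FRACTION) {
            return false;                                  // too large: spill directly to disk
        }
        if (used + data.length > capacity) {
            return false;                                  // file system is full
        }
        files.put(name, data);
        used += data.length;
        return true;
    }

    /** True once more than half of the space is in use and a merge should start. */
    synchronized boolean shouldMerge() {
        return used > capacity * MERGE_TRIGGER_FRACTION;
    }

    /** Releases a file's space, for example after it has been merged to disk. */
    synchronized void delete(String name) {
        byte[] old = files.remove(name);
        if (old != null) {
            used -= old.length;
        }
    }
}
```

A copier would call store() first and fall back to the local file system when it returns false; the merge thread would poll or be notified on shouldMerge().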

Devaraj Das added a comment - 02/Jan/07 06:23 AM
This patch does a better progress reporting.

Owen O'Malley added a comment - 02/Jan/07 07:15 PM
This is looking good. Comments:
1. I think that "ramfs:" would be less confusing than "mem:" as a prefix for the ram file system.
2. The float constants should be represented as "0.25f" rather than "(float) 0.25".
3. This patch introduces some methods with more FileSystems in the parameter list, when the correct path is to use the getFileSystem on the Paths. So cloneFileAttributes should look like:
public Writer cloneFileAttributes(Path inputFile, Path outputFile, Progressable prog) throws IOException
4. The old cloneFileAttributes should be marked as deprecated.
5. The javadoc on the "factor" parameter should mention that it is the maximum merge fan in.
6. The patch looks for the file's existence in the local and ram file systems to see where it was placed. Since in the past, we have had issues with files getting deleted via race conditions, it seems better to remember where each file was placed. I suggest that MapOutputLocation.getFile return a Path of where the file was stored. CopyResult should keep that instead of the size, which was only used as an error flag when it is -1. Then the reduce task runner can keep a list of files that are in each file system.
7. getFile also has a catch block where Throwable is caught and ignored. That will cause lots of errors to go unreported. I'd rather have the ram FileSystem catch OutOfMemoryError explicitly when it is creating the new file and return null, like it does if the ram FileSystem is full.
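Comment 7 could look roughly like the following. This is a hedged sketch, not the patch's code: RamAllocation and tryAllocate are hypothetical names, and the relevant JVM class is java.lang.OutOfMemoryError.

```java
/** Hypothetical helper showing a narrow catch instead of catching Throwable. */
final class RamAllocation {
    /**
     * Allocates the backing array for a new in-memory file. Returns null when
     * the JVM cannot satisfy the request, mirroring the "file system is full"
     * case; every other error still propagates to the caller.
     */
    static byte[] tryAllocate(int length) {
        try {
            return new byte[length];
        } catch (OutOfMemoryError e) {
            // Only the out-of-memory case is swallowed; the caller falls back to disk.
            return null;
        }
    }
}
```

Catching only OutOfMemoryError at the allocation site keeps unrelated failures (I/O errors, bugs) visible, which is the concern raised about the broad catch block.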

Devaraj Das added a comment - 05/Jan/07 06:49 PM
Thanks for the review, Owen.
I incorporated all the comments except for a part of comment #6. With respect to comment #6, I made the change to do with getFile. Using the return value of getFile (a fully qualified path name which accurately says on which filesystem the map output was downloaded), I now do an accurate renaming of the temp filename to the final filename. But I think I am missing something to do with understanding why the ReduceTaskRunner should keep the list of files that got created in each filesystem. Is it not sufficient to just accurately rename as I am doing now? I believe (if I remember right), the race condition we saw earlier was related to correctly creating a map output file, if we happen to somehow run into the rare case where multiple threads end up downloading a given map output (HADOOP-723).
I don't think we will have that race condition in the new patch (that I am going to submit now). Please let me know if I am missing something.

Devaraj Das added a comment - 11/Jan/07 05:03 PM
A bugfixed patch. This has been tested quite well.

Hadoop QA added a comment - 11/Jan/07 05:19 PM
-1, because the javadoc command appears to have generated warning messages when testing the latest attachment (http://issues.apache.org/jira/secure/attachment/12348746/830-initial9.patch) against trunk revision r495045. Please note that this message is automatically generated and may represent a problem with the automation system and not the patch.

Devaraj Das added a comment - 11/Jan/07 05:32 PM
The patch 830-with-javadoc-fix.patch is the correct one

Hadoop QA added a comment - 11/Jan/07 05:56 PM
-1, because 1 attempts failed to build and test the latest attachment (http://issues.apache.org/jira/secure/attachment/12348750/830-with-javadoc-fix.patch) against trunk revision r495045. Please note that this message is automatically generated and may represent a problem with the automation system and not the patch.

Hadoop QA added a comment - 11/Jan/07 06:25 PM
-1, because the javadoc command appears to have generated warning messages when testing the latest attachment (http://issues.apache.org/jira/secure/attachment/12348750/830-with-javadoc-fix.patch) against trunk revision r495045. Please note that this message is automatically generated and may represent a problem with the automation system and not the patch.

Devaraj Das added a comment - 12/Jan/07 09:52 AM
Sorry had uploaded the wrong patch (due to which the javadoc generation gave warnings). 830-with-real-javadoc-fix.patch hopefully doesn't have the javadoc issues.

Hadoop QA added a comment - 12/Jan/07 05:05 PM
+1, because http://issues.apache.org/jira/secure/attachment/12348794/830-with-real-javadoc-fix.patch applied and successfully tested against trunk revision r495045.

Doug Cutting added a comment - 16/Jan/07 08:42 PM
I note that the ramfs doesn't perform checksums. Why is that? Checksums are useful when data spends a long time in memory, as it may here.

Also, should we put RamFileSystem in the fs package instead of mapred?


Owen O'Malley added a comment - 17/Jan/07 08:50 AM
I agree with moving it to the fs package.

I think checksums for the ram file system are the wrong place for us to spend:
1. program complexity
2. memory
3. cpu cycles
We have limited resources in all 3, and ram corruption is rare.

I used to work at NASA and talked to the guys who wrote flight software for satellites and how they dealt with Single Event Upsets (SEU). They studied the effects (and mapped them across the Earth and time) and still had very simple coping mechanisms. (They used one thread to sweep memory looking for ECC errors and rebooted the CPU if they found any.) The astronauts use a more straight-forward approach and just reboot all of the laptops after flying over the south Atlantic.

So anyways, it is possible to design software to be resistant to random memory corruption errors, but it takes a lot of effort. Since our computers run at basically sea-level, I really don't think the need justifies the cost. Also note that this is a big buffer (100m or so), but compared to the size of the total ram it is tiny. I don't know how it works out in Java, but in C/C++ most random memory errors lead to segmentation faults in very short order.


Doug Cutting added a comment - 17/Jan/07 05:39 PM
> 1. program complexity

Checksums for ramfs add no complexity, since they're provided in the base class.

> 2. memory

Checksums add less than 1% to the storage requirements, by design.

> 3. cpu cycles

I won't accept this without benchmarks. The CRC32 code is native and should be quite fast.

> ram corruption is rare

When sorting (and ramfs is used in the sorting code) data is ram-resident for long periods. We don't really know how rare ram corruption is, but when slinging terabytes on thousands of nodes, we know it happens. Most of our memory is used for caching sort io. We should checksum this when possible.


Owen O'Malley added a comment - 17/Jan/07 06:36 PM
The ramfs seems like a very strange place to insist on crcs. We don't have crcs on any other ram buffers. Furthermore, the time period is not that long. The ramfs is used while fetching and doing the initial merge. If the shuffle is happening at a reasonable rate, we shouldn't take longer than 1 minute to fill and in memory sort 100mb.

If we really need to protect ourselves down at that level, we should do a value-level or key/value-level crc when it comes out of the map and keep it with the record all the way through the framework until it hits the reduce. THAT would protect you from a wide variety of problems from network, disk, ram, or bugs.


Doug Cutting added a comment - 17/Jan/07 07:58 PM
> We don't have crcs on any other ram buffers.

That's not quite true, and, to the degree it's true it's not a feature. Right now we checksum data as it exits the java output stream buffer and enters the filesystem's buffer cache, roughly speaking. We check the checksums again as data returns from the filesystem's buffer cache into the java stream buffer. So data in the filesystem's buffer cache is checksummed. Arguably we should instead compute checksums earlier and check them later. We could instead compute them as they enter the output stream's buffer, as objects are serialized, and check them as they exit the input stream, as they're deserialized. This should be a minor change with no performance impact.

My original question was more to the effect: since this is already built into the base class, why explicitly turn it off? The only substantive reason that's been stated is performance, yet no benchmarks are provided indicating how much slower i/o is when bytes are passed through a native CRC32 implementation.
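The missing measurement could start from something like the sketch below, which pushes a buffer through java.util.zip.CRC32 in fixed-size chunks and reports throughput. The class name, the 64 MB buffer and the 512-byte chunk size are assumptions chosen for illustration, and a single timed pass like this is only a rough indication, not a rigorous benchmark.

```java
import java.util.Random;
import java.util.zip.CRC32;

/** Rough, hypothetical timing harness for CRC32 over an in-memory buffer. */
final class Crc32Timing {
    /** CRC32 of data[off..off+len). */
    static long crcOf(byte[] data, int off, int len) {
        CRC32 crc = new CRC32();
        crc.update(data, off, len);
        return crc.getValue();
    }

    /**
     * Checksums buf one chunk at a time and XORs the results together,
     * so the work cannot be optimized away as unused.
     */
    static long checksumChunks(byte[] buf, int bytesPerChecksum) {
        long acc = 0;
        for (int off = 0; off < buf.length; off += bytesPerChecksum) {
            int len = Math.min(bytesPerChecksum, buf.length - off);
            acc ^= crcOf(buf, off, len);
        }
        return acc;
    }

    public static void main(String[] args) {
        byte[] buf = new byte[64 * 1024 * 1024];   // assumed test size
        new Random(42).nextBytes(buf);
        for (int i = 0; i < 3; i++) {
            checksumChunks(buf, 512);              // warm-up passes for the JIT
        }
        long start = System.nanoTime();
        long acc = checksumChunks(buf, 512);
        long elapsedNanos = System.nanoTime() - start;
        double mbPerSec = (buf.length / (1024.0 * 1024.0)) / (elapsedNanos / 1e9);
        System.out.printf("crc=%x, %.0f MB/s%n", acc, mbPerSec);
    }
}
```

Comparing this figure against the rate at which map outputs arrive during the shuffle would show whether checksumming is anywhere near the bottleneck.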


Doug Cutting added a comment - 18/Jan/07 07:17 PM
To be clear: I'd prefer we commit this first with checksums on, then do some performance analysis, comparing checksums with no checksums, and, if the difference appears significant, add a flag to disable them. To do otherwise seems a premature optimization.

Devaraj Das added a comment - 19/Jan/07 08:41 AM
Doug, the patch 830-with-checksum.patch uses the checksums implementation from the base FileSystem class. There are two major changes here:
1) Space is reserved in advance for both files (the actual file and the checksum file). This is needed since I am now using the regular FileSystem.create API that doesn't take a "filesize" argument (and we need to tell the ram fs a priori the length of the files we are going to create there). This is in contrast to the earlier patch where I had a custom create API in the ram fs that would take the length of the file as an argument.
2) Added an API to compute the length of the checksum file given the size of the actual file.
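The two changes above amount to a small piece of arithmetic, sketched below. The names are hypothetical, and the figures are assumptions rather than values read from the patch: one 4-byte CRC32 per data chunk, 512 bytes per chunk in the usage note, and an 8-byte checksum file header.

```java
/** Hypothetical sketch of sizing a checksum file ahead of time. */
final class ChecksumSizing {
    static final int CRC32_BYTES = 4;            // one CRC32 value per data chunk
    static final int ASSUMED_HEADER_BYTES = 8;   // assumption: fixed checksum file header

    /** Length of the checksum file for a data file of the given length. */
    static long checksumFileLength(long dataLength, int bytesPerChecksum) {
        if (dataLength < 0 || bytesPerChecksum <= 0) {
            throw new IllegalArgumentException("invalid length or chunk size");
        }
        // Round up so that a final partial chunk still gets its own checksum.
        long chunks = (dataLength + bytesPerChecksum - 1) / bytesPerChecksum;
        return ASSUMED_HEADER_BYTES + chunks * CRC32_BYTES;
    }

    /** Total space to reserve in the ram fs: the data file plus its checksum file. */
    static long bytesToReserve(long dataLength, int bytesPerChecksum) {
        return dataLength + checksumFileLength(dataLength, bytesPerChecksum);
    }
}
```

With 512-byte chunks the checksum data is 4/512, or about 0.8%, of the file size, which is consistent with the "less than 1%" storage overhead mentioned earlier in the thread.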

Haven't noticed any performance degradation with my tests (sort benchmark on a small cluster of 14 nodes) so far. Hope this patch makes you happy.


Doug Cutting added a comment - 19/Jan/07 06:19 PM
I just committed this. Thanks, Devaraj!