Sameer Paranjpye added a comment - 15/Dec/06 06:33 PM
I'd be a little more conservative with large map outputs. The scratch and merge buffers add value when the map outputs are substantially smaller than the buffers. You should probably spill map outputs larger than 25% of buffer space or something similar.
In the original proposal, I suggested that we can have one buffer for all the files that the InMemoryFileSystem manages. After a discussion with Owen on this, it seems like the alternative arrangement of having one byte[] per file in the InMemoryFileSystem is a better approach, given that we know the lengths of the files before we allocate byte[] buffers for those.
In the original proposal, there was an assumption of two equal-sized buffers, and a merge would happen when 50% of the total buffer space was filled with map outputs. This can be mapped to the multiple-buffers case (one byte[] per file), where we consider the total size of all the small buffers. Yes, Sameer, we can and should spill a map output directly to disk if its size is more than a certain fraction of the total buffer space.
Implemented as discussed. MapOutput files are buffered in an InMemoryFileSystem. A map output is spilled directly to disk if the size of the file is more than 25% of the total filesystem size (75M by default). When the used space of the filesystem exceeds 50%, a merge is triggered and its output goes directly to disk. During that time, merging and copying proceed in parallel. The InMemoryFileSystem (uri is mem://<name>) extends FileSystem. Although it has a few special methods, it appears as a regular FileSystem for the most part to the rest of the framework. It keeps file data as byte arrays, and there is a map from filenames to byte[]. The figures 25% and 50% are hardcoded constants for now.
This patch does better progress reporting.
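The buffering rules described above can be sketched as follows. This is a minimal illustration, not the code from the patch: the class name `RamBufferPolicy` and its methods are hypothetical, and only the 25% and 50% figures and the filename-to-byte[] map come from the comment.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of the spill and merge thresholds; not the patch's code. */
final class RamBufferPolicy {
    // Fractions taken from the comment; hardcoded there as well.
    static final float MAX_SINGLE_FILE_FRACTION = 0.25f;
    static final float MERGE_TRIGGER_FRACTION = 0.50f;

    private final long capacityBytes;
    private long usedBytes;
    // One byte[] per file, keyed by file name.
    private final Map<String, byte[]> files = new HashMap<>();

    RamBufferPolicy(long capacityBytes) {
        this.capacityBytes = capacityBytes;
    }

    /** True if a map output of this length should bypass memory and go to disk. */
    boolean shouldSpillToDisk(long fileLength) {
        boolean tooLarge = fileLength > (double) capacityBytes * MAX_SINGLE_FILE_FRACTION;
        boolean noRoom = usedBytes + fileLength > capacityBytes;
        return tooLarge || noRoom;
    }

    /** Allocates the file's buffer up front; returns false if it must go to disk. */
    boolean reserve(String name, int fileLength) {
        if (shouldSpillToDisk(fileLength)) {
            return false;
        }
        files.put(name, new byte[fileLength]);
        usedBytes += fileLength;
        return true;
    }

    /** True once more than half of the buffer space is in use. */
    boolean shouldMerge() {
        return usedBytes > (double) capacityBytes * MERGE_TRIGGER_FRACTION;
    }
}
```

Because the file length is known before the copy starts, the buffer can be sized exactly, which is the argument made earlier for one byte[] per file.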
This is looking good. Comments:
1. I think that "ramfs:" would be less confusing than "mem:" as a prefix for the ram file system.
2. The float constants should be represented as "0.25f" rather than "(float) 0.25".
3. This patch introduces some methods with more FileSystems in the parameter list, when the correct approach is to use getFileSystem on the Paths. So cloneFileAttributes should look like: public Writer cloneFileAttributes(Path inputFile, Path outputFile, Progressable prog) throws IOException
4. The old cloneFileAttributes should be marked as deprecated.
5. The javadoc on the "factor" parameter should mention that it is the maximum merge fan-in.
6. The patch looks for the file's existence in the local and ram file systems to see where it was placed. Since we have had issues in the past with files getting deleted via race conditions, it seems better to remember where each file was placed. I suggest that MapOutputLocation.getFile return a Path of where the file was stored. CopyResult should keep that instead of the size, which was only used as an error flag when it is -1. Then the reduce task runner can keep a list of files that are in each file system.
7. getFile also has a catch block where Throwable is caught and ignored. That will cause lots of errors to go unreported. I'd rather have the ram FileSystem catch OutOfMemoryError explicitly when it is creating the new file and return null, like it does if the ram FileSystem is full.
Thanks for the review, Owen.
I incorporated all the comments except for a part of comment #6. With respect to comment #6, I made the change to getFile. Using the return value of getFile (a fully qualified path name which says accurately on which filesystem the map output was downloaded), I now do an accurate renaming of the temp filename to the final filename. But I think I am missing something about why the ReduceTaskRunner should keep the list of files that got created in each filesystem. Is it not sufficient to just rename accurately, as I am doing now? I believe (if I remember right) the race condition we saw earlier was related to correctly creating a map output file in the rare case where multiple threads end up downloading a given map output. I don't think we will have that race condition in the new patch (that I am going to submit now). Please let me know if I am missing something.
A bugfixed patch. This has been tested quite well.
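Review comment 7 can be illustrated with a small sketch. The class and method names here are hypothetical and are not taken from the patch; the point is only that the allocation catches the one error it expects and lets everything else propagate.

```java
/** Hypothetical helper showing a narrow catch instead of catching Throwable. */
final class RamAllocator {
    /**
     * Returns a buffer of the requested length, or null if the heap cannot
     * supply it, so the caller can fall back to the local file system.
     */
    static byte[] tryAllocate(int length) {
        try {
            return new byte[length];
        } catch (OutOfMemoryError e) {
            // Only the expected failure is handled; other errors still surface.
            return null;
        }
    }
}
```

Returning null mirrors the behaviour the comment describes for a full ram FileSystem, so the caller needs only one fallback path.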
-1, because the javadoc command appears to have generated warning messages when testing the latest attachment (http://issues.apache.org/jira/secure/attachment/12348746/830-initial9.patch
The patch 830-with-javadoc-fix.patch is the correct one
-1, because 1 attempts failed to build and test the latest attachment (http://issues.apache.org/jira/secure/attachment/12348750/830-with-javadoc-fix.patch
-1, because the javadoc command appears to have generated warning messages when testing the latest attachment (http://issues.apache.org/jira/secure/attachment/12348750/830-with-javadoc-fix.patch
Sorry, I had uploaded the wrong patch (which is why the javadoc generation gave warnings). 830-with-real-javadoc-fix.patch hopefully doesn't have the javadoc issues.
+1, because http://issues.apache.org/jira/secure/attachment/12348794/830-with-real-javadoc-fix.patch
I note that the ramfs doesn't perform checksums. Why is that? Checksums are useful when data spends a long time in memory, as it may here.
Also, should we put RamFileSystem in the fs package instead of mapred?
I agree with moving it to the fs package.
I think checksums for the ram file system are the wrong place for us to spend: 1. program complexity, 2. memory, 3. cpu cycles.
I used to work at NASA and talked to the guys who wrote flight software for satellites about how they dealt with Single Event Upsets (SEU). They studied the effects (and mapped them across the Earth and time) and still had very simple coping mechanisms. (They used one thread to sweep memory looking for ECC errors and rebooted the CPU if they found any.) The astronauts use a more straightforward approach and just reboot all of the laptops after flying over the south Atlantic. :)
So anyway, it is possible to design software to be resistant to random memory corruption errors, but it takes a lot of effort. Since our computers run at basically sea level, I really don't think the need justifies the cost. Also note that this is a big buffer (100m or so), but compared to the size of the total ram it is tiny. I don't know how it works out in Java, but in C/C++ most random memory errors lead to segmentation faults in very short order.
> 1. program complexity
Checksums for ramfs add no complexity, since they're provided in the base class.
> 2. memory
Checksums add less than 1% to the storage requirements, by design.
> 3. cpu cycles
I won't accept this without benchmarks. The CRC32 code is native and should be quite fast.
> ram corruption is rare
When sorting (and ramfs is used in the sorting code) data is ram-resident for long periods. We don't really know how rare ram corruption is, but when slinging terabytes on thousands of nodes, we know it happens. Most of our memory is used for caching sort io. We should checksum this when possible.
The ramfs seems like a very strange place to insist on crcs. We don't have crcs on any other ram buffers. Furthermore, the time period is not that long. The ramfs is used while fetching and doing the initial merge. If the shuffle is happening at a reasonable rate, we shouldn't take longer than 1 minute to fill and in-memory sort 100mb.
If we really need to protect ourselves down at that level, we should do a value-level or key/value-level crc when it comes out of the map and keep it with the record all the way through the framework until it hits the reduce. THAT would protect you from a wide variety of problems from network, disk, ram, or bugs.
> We don't have crcs on any other ram buffers.
That's not quite true, and, to the degree it's true it's not a feature. Right now we checksum data as it exits the java output stream buffer and enters the filesystem's buffer cache, roughly speaking. We check the checksums again as data returns from the filesystem's buffer cache into the java stream buffer. So data in the filesystem's buffer cache is checksummed. Arguably we should instead compute checksums earlier and check them later. We could instead compute them as they enter the output stream's buffer, as objects are serialized, and check them as they exit the input stream, as they're deserialized. This should be a minor change with no performance impact.
My original question was more to the effect: since this is already built into the base class, why explicitly turn it off? The only substantive reason that's been stated is performance, yet no benchmarks are provided indicating how much slower i/o is when bytes are passed through a native CRC32 implementation.
To be clear: I'd prefer we commit this first with checksums on, then do some performance analysis, comparing checksums with no checksums, and, if they appear significant, add a flag to disable them. To do otherwise seems a premature optimization.
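To make the compute-on-write, check-on-read idea concrete, here is a minimal sketch using java.util.zip.CRC32 over fixed-size chunks. The class name, method names, and the chunk size passed in are assumptions for illustration; this is not the FileSystem base-class implementation discussed in the thread.

```java
import java.util.zip.CRC32;

/** Hypothetical per-chunk CRC32 helper; not the FileSystem base-class code. */
final class ChunkChecksums {
    /** Computes one CRC32 value for every bytesPerSum-sized chunk of data. */
    static long[] compute(byte[] data, int bytesPerSum) {
        int chunks = (data.length + bytesPerSum - 1) / bytesPerSum;
        long[] sums = new long[chunks];
        CRC32 crc = new CRC32();
        for (int i = 0; i < chunks; i++) {
            int offset = i * bytesPerSum;
            int length = Math.min(bytesPerSum, data.length - offset);
            crc.reset();
            crc.update(data, offset, length);
            sums[i] = crc.getValue();
        }
        return sums;
    }

    /**
     * Recomputes the checksums and returns the index of the first chunk that
     * no longer matches, or -1 if the data is intact. Assumes expected was
     * produced by compute() for data of the same length.
     */
    static int firstCorruptChunk(byte[] data, int bytesPerSum, long[] expected) {
        long[] actual = compute(data, bytesPerSum);
        for (int i = 0; i < actual.length; i++) {
            if (actual[i] != expected[i]) {
                return i;
            }
        }
        return -1;
    }
}
```

With a 4-byte checksum per chunk and an assumed chunk size of 512 bytes, the storage overhead is 4/512, about 0.8%, which is in line with the "less than 1%" figure quoted earlier in the thread.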
Doug, the patch 830-with-checksum.patch uses the checksums implementation from the base FileSystem class. There are two major changes here:
1) Space is reserved in advance for both files (the actual file and the checksum file). This is needed since I am now using the regular FileSystem.create API, which doesn't take a "filesize" argument (and we need to tell the ram fs a priori the length of the files we are going to create there). This is in contrast to the earlier patch, where I had a custom create API in the ram fs that would take the length of the file as an argument.
2) Added an API to compute the length of the checksum file given the size of the actual file.
Haven't noticed any performance degradation with my tests (sort benchmark on a small cluster of 14 nodes) so far. Hope this patch makes you happy.
I just committed this. Thanks, Devaraj!
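The second change, computing the checksum file's length from the data file's length, could look roughly like this. It is a sketch under stated assumptions: the class and method names are hypothetical, and the header size and bytes-per-checksum value are passed in as parameters because the comment does not state them.

```java
/** Hypothetical sizing helper for reserving ram fs space ahead of a create. */
final class ChecksumLength {
    static final int CRC_BYTES = 4; // a CRC32 value stored as 4 bytes (assumed layout)

    /** Length of the checksum file: a header plus one CRC per data chunk. */
    static long checksumFileLength(long dataLength, int bytesPerSum, int headerBytes) {
        long chunks = (dataLength + bytesPerSum - 1) / bytesPerSum;
        return headerBytes + chunks * CRC_BYTES;
    }

    /** Total space to reserve for the data file and its checksum file together. */
    static long totalReservation(long dataLength, int bytesPerSum, int headerBytes) {
        return dataLength + checksumFileLength(dataLength, bytesPerSum, headerBytes);
    }
}
```

For example, a 1300-byte map output with 512-byte chunks and an assumed 8-byte header needs 8 + 3 * 4 = 20 checksum bytes, so 1320 bytes would be reserved in total.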