Arun C Murthy made changes - 08/May/08 07:15 AM
I agree with 1 through 3.
I am not sure this is justified. I'd propose: 1) Make the InMemoryFileSystem independent of the ChecksumFileSystem. We already use raw comparators; I'm not sure what you meant by this. I'll submit a patch with some of the above thoughts implemented in a bit.
I meant that this will play nicely with raw comparators, which already expect a byte array and offset/length. It is clear that there is a lot of room for improvement in the merge code. What value does the RamFS add here? It would be simpler to just put keys/values in a buffer and iterate over them in place. I don't see the value in special-casing DataOutputBuffer for this very specific use case.
To clarify, we use the ramfs in exactly one place. Even there it appears to hinder a tight implementation rather than help (e.g., we have to special-case DataOutputBuffer to avoid value copies, build another workaround to avoid key copies, etc.). Let's not use it.
For in-memory merges, an iterator over a key/value sequence in a buffer seems a much better abstraction than an input stream. Sameer, today's ramfs serves both as a memory manager and as a filesystem, so if we were to implement a new memory manager, I am guessing that it would be close to what we already have in the ramfs (e.g., it already does byte-array allocations, keeps track of memory usage, etc.). We can get to an optimal memory manager by reducing the complexity (if any) in the ramfs memory manager.
Regarding using the ramfs as a FileSystem, I think if we remove the ChecksumFS layer, we'd have removed a good amount of complexity. Other than that, if we ensure that the APIs that read from the ramfs do not allocate buffers but reset internal pointers on the byte arrays for the keys and values, we should be good. The two classes used as the destination of data read from files are DataOutputBuffer and ValueBytes, and both internally allocate byte arrays. I am suggesting that we implement these two classes specially for ramfs files, so that we just update the pointers/offsets/lengths in these classes instead of copying from the files. (An offline discussion led me to agree that we should not have the file abstraction for the in-memory merge: the file streams add overhead, which is not desirable in a performance-critical section.)
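To make the pointer-reset idea concrete, here is a minimal Java sketch; it is not ramfs or patch code. `ByteSlice` and `InPlaceRecordIterator` are hypothetical names, the (key-length, key, value-length, value)* layout follows the original proposal, and 4-byte big-endian lengths are an assumption. Each call to `next` only moves the key/value pointers into the shared array, so no key or value bytes are copied.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

/**
 * Hypothetical holder that points into a shared byte array instead of owning
 * a copy: reset() only moves the buffer pointer, offset and length.
 */
final class ByteSlice {
    byte[] buf;
    int off;
    int len;

    void reset(byte[] buf, int off, int len) {
        this.buf = buf;
        this.off = off;
        this.len = len;
    }

    @Override
    public String toString() {
        return new String(buf, off, len, StandardCharsets.UTF_8);
    }
}

/**
 * Iterates records laid out as (key-length, key, value-length, value)* in one
 * byte array; lengths are assumed to be 4-byte big-endian ints.
 */
final class InPlaceRecordIterator {
    private final byte[] data;
    private final int end;
    private int pos;

    InPlaceRecordIterator(byte[] data, int off, int len) {
        this.data = data;
        this.pos = off;
        this.end = off + len;
    }

    /** Points key and value at the next record; returns false at the end. */
    boolean next(ByteSlice key, ByteSlice value) {
        if (pos >= end) {
            return false;
        }
        int keyLen = readInt();
        key.reset(data, pos, keyLen);   // no bytes copied
        pos += keyLen;
        int valLen = readInt();
        value.reset(data, pos, valLen); // no bytes copied
        pos += valLen;
        return true;
    }

    private int readInt() {
        int v = ((data[pos] & 0xff) << 24) | ((data[pos + 1] & 0xff) << 16)
                | ((data[pos + 2] & 0xff) << 8) | (data[pos + 3] & 0xff);
        pos += 4;
        return v;
    }

    /** Usage helper: writes each string as a length-prefixed entry (key, value, key, ...). */
    static byte[] encode(String... keysAndValues) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        for (String s : keysAndValues) {
            byte[] b = s.getBytes(StandardCharsets.UTF_8);
            bos.write(b.length >>> 24);
            bos.write(b.length >>> 16);
            bos.write(b.length >>> 8);
            bos.write(b.length);
            bos.write(b, 0, b.length);
        }
        return bos.toByteArray();
    }
}
```

Because a `ByteSlice` exposes `(buf, off, len)`, a raw comparator can compare keys in place; the trade-off is that a slice stays valid only while the underlying array is not reused.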
This half-done patch is up for a high-level review. It introduces a ByteArrayManager that the shuffle can use to store shuffled data as raw byte arrays instead of as files in the ramfs. It also defines a merge routine that can merge a bunch of such byte arrays. The remaining work (changing the shuffle code to use the ByteArrayManager instead of the ramfs) has some dependency on the patch for … By the way, I have done the patch assuming the layout <key-len><val-len><key><value> (the difference w.r.t. the earlier proposed layout is that the two lengths are together). That made parsing the byte arrays simpler.
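A rough sketch of what a byte-array manager plus an in-memory merge could look like, assuming the <key-len><val-len><key><value> layout described above, 4-byte big-endian lengths, and keys ordered by unsigned byte comparison. `ByteArrayManager` and `ByteArrayMerger` here are hypothetical stand-ins, not the classes from the patch; the merge is a standard heap-based k-way merge that copies whole records straight from the source arrays.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical manager: hands out byte arrays until a fixed memory budget is used up. */
final class ByteArrayManager {
    private final long limit;
    private long used;

    ByteArrayManager(long limit) { this.limit = limit; }

    /** Returns a new array of the requested size, or null if it would exceed the budget. */
    synchronized byte[] allocate(int size) {
        if (used + size > limit) return null;
        used += size;
        return new byte[size];
    }

    synchronized void release(byte[] array) { used -= array.length; }

    synchronized long used() { return used; }
}

/** Heap-based k-way merge of sorted segments laid out as (<key-len><val-len><key><value>)*. */
final class ByteArrayMerger {
    /** Cursor over one segment: tracks offsets of the current record, never copies it. */
    private static final class Cursor {
        final byte[] data;
        int pos, recStart, keyOff, keyLen, valLen;

        Cursor(byte[] data) { this.data = data; }

        boolean advance() {
            if (pos >= data.length) return false;
            recStart = pos;
            keyLen = readInt(data, pos);     // both lengths sit together at the
            valLen = readInt(data, pos + 4); // front of the record
            keyOff = pos + 8;
            pos = keyOff + keyLen + valLen;
            return true;
        }
    }

    /** Raw comparator: unsigned lexicographic comparison of two key ranges. */
    static int compareBytes(byte[] a, int aOff, int aLen, byte[] b, int bOff, int bLen) {
        for (int i = 0; i < Math.min(aLen, bLen); i++) {
            int x = a[aOff + i] & 0xff, y = b[bOff + i] & 0xff;
            if (x != y) return x - y;
        }
        return aLen - bLen;
    }

    static byte[] merge(List<byte[]> segments) {
        PriorityQueue<Cursor> heap = new PriorityQueue<>((c1, c2) ->
                compareBytes(c1.data, c1.keyOff, c1.keyLen, c2.data, c2.keyOff, c2.keyLen));
        for (byte[] segment : segments) {
            Cursor c = new Cursor(segment);
            if (c.advance()) heap.add(c);
        }
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        while (!heap.isEmpty()) {
            Cursor c = heap.poll();
            out.write(c.data, c.recStart, 8 + c.keyLen + c.valLen); // whole record as-is
            if (c.advance()) heap.add(c); // re-insert positioned at its next record
        }
        return out.toByteArray();
    }

    static int readInt(byte[] b, int i) {
        return ((b[i] & 0xff) << 24) | ((b[i + 1] & 0xff) << 16)
                | ((b[i + 2] & 0xff) << 8) | (b[i + 3] & 0xff);
    }

    /** Usage helper: encodes alternating key, value strings into one segment. */
    static byte[] encode(String... kv) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (int i = 0; i + 1 < kv.length; i += 2) {
            byte[] k = kv[i].getBytes(StandardCharsets.UTF_8);
            byte[] v = kv[i + 1].getBytes(StandardCharsets.UTF_8);
            for (int len : new int[] {k.length, v.length}) {
                out.write(len >>> 24);
                out.write(len >>> 16);
                out.write(len >>> 8);
                out.write(len);
            }
            out.write(k, 0, k.length);
            out.write(v, 0, v.length);
        }
        return out.toByteArray();
    }
}
```

Keeping the two lengths together means a cursor reads both at fixed offsets (`pos` and `pos + 4`) before touching the key, which is the parsing simplification mentioned above.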
Devaraj Das made changes - 16/May/08 01:09 PM
This is a slightly improved one.
Devaraj Das made changes - 16/May/08 01:15 PM
Here is an early version of my creatively titled SequenceFile replacement for intermediate data in Map-Reduce (map-outputs)... IFile stands for "Intermediate File" :)
Unfortunately the Writer isn't as tight as it could be: it needs to copy key/value into an internal buffer (see HADOOP-3414 for the details). However, the Reader seems reasonably tight and strictly does zero-copies. I chose to use DataInputBuffer as the key/value type in Reader.next since it plays nicely: it offers an InputStream interface and can be handed a raw buffer to work with, and it can be queried for that raw buffer without any copies being made. I'll continue to plug away; feedback appreciated.
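For illustration only, here is how a writer that must first copy each key and value into an internal buffer might look: the copy is what lets it learn both lengths before emitting the record. `RawWritable` and `SketchIFileWriter` are hypothetical (standing in for Hadoop's `Writable` and IFile), and the fixed 4-byte lengths, the <key-len><val-len><key><value> layout and the (-1, -1) end-of-stream marker are assumptions; the actual IFile format may differ.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;

/** Hypothetical stand-in for a Writable key or value. */
interface RawWritable {
    void write(DataOutputStream out) throws IOException;
}

/** Sketch of an IFile-style writer that serializes into a reusable internal buffer first. */
final class SketchIFileWriter {
    private final DataOutputStream out;
    private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    private final DataOutputStream bufferOut = new DataOutputStream(buffer);

    SketchIFileWriter(OutputStream stream) {
        this.out = new DataOutputStream(stream);
    }

    void append(RawWritable key, RawWritable value) throws IOException {
        buffer.reset();                       // reuse the same internal buffer
        key.write(bufferOut);
        int keyLen = buffer.size();
        value.write(bufferOut);
        int valLen = buffer.size() - keyLen;
        out.writeInt(keyLen);                 // lengths are only known after
        out.writeInt(valLen);                 // serializing, hence the copy
        buffer.writeTo(out);                  // the extra copy: buffer -> stream
    }

    /** Writes the assumed end-of-stream marker and closes the stream. */
    void close() throws IOException {
        out.writeInt(-1);
        out.writeInt(-1);
        out.close();
    }
}
```

Avoiding this copy would require knowing a key's and value's serialized lengths up front, which generic serialization does not provide.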
Arun C Murthy made changes - 19/May/08 11:06 AM
Patch to stall shuffle when there isn't enough space left in RAM to accommodate the map-output.
With this patch, the single-reducer job that merges 12.5G from 2,500 maps on 25 machines runs in ~18 mins, compared to 30-odd mins after …
Arun C Murthy made changes - 06/Jun/08 06:59 AM
Some comments:
0) The inMem merge thread needs to ignore the criteria when the shuffle thread notifies it to do a forced merge.
1) A race condition exists in the interval between the ramManager.notify and mergePassComplete.wait() calls in getMapOutput. The ramManager may be notified and the merge thread may finish the merge before this thread calls mergePassComplete.wait(). If that happens, the notification from the merger is lost and this thread will just wait ...
2) The handshake between the merger, the copier and the ramManager looks complex, and there could be more race conditions like the one I pointed out above. Sharad and I had a quick discussion and we feel it can be simplified:
- Have ramManager.reserve block the thread if the request cannot be satisfied.
- Have ramManager.unreserve do a notifyAll (the mergeThread calls this).
- Have the shuffle thread notify the mergeThread before it goes to wait.
Here is a patch for just ReduceTask.java that outlines the synchronization we were thinking about. It's a very quickly written patch, so don't be surprised if you see glaring issues (e.g., I could have used an AtomicBoolean instead of a Boolean), but it gives the idea. I think this looks much simpler (I hope I didn't miss some case where it absolutely won't work).
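A minimal sketch of the simplified synchronization proposed in 2), assuming one monitor shared by the shuffle and merge threads. `SketchRamManager` and its method names are hypothetical, not the patch's RamManager. Because every wait() sits in a loop that re-checks shared state under the lock, a notification sent before a thread starts waiting cannot leave it stuck.

```java
/**
 * Hypothetical sketch: reserve() blocks a shuffle thread until memory is
 * available, unreserve() (called once a merge frees memory) wakes every
 * waiter, and the merge thread sleeps until some shuffle thread is stalled.
 */
final class SketchRamManager {
    private final long limit;
    private long used;
    private int stalled; // shuffle threads currently blocked in reserve()

    SketchRamManager(long limit) {
        this.limit = limit;
    }

    /** Shuffle thread: blocks until {@code size} bytes fit in the budget. */
    synchronized void reserve(long size) throws InterruptedException {
        if (size > limit) {
            throw new IllegalArgumentException("request larger than the whole budget");
        }
        if (used + size > limit) {
            stalled++;
            notifyAll(); // wake the merge thread before going to wait
            try {
                while (used + size > limit) { // re-check after every wakeup
                    wait();
                }
            } finally {
                stalled--;
            }
        }
        used += size;
    }

    /** Merge thread: frees memory and wakes all waiters. */
    synchronized void unreserve(long size) {
        used -= size;
        notifyAll();
    }

    /** Merge thread: sleeps until at least one shuffle thread is stalled. */
    synchronized void awaitStalledShuffle() throws InterruptedException {
        while (stalled == 0) {
            wait();
        }
    }

    synchronized boolean hasStalledShuffle() { return stalled > 0; }

    synchronized long used() { return used; }
}
```

The shuffle thread notifies only once, when it first finds it must stall, so stalled shuffle threads do not keep waking each other in a loop.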
Devaraj Das made changes - 06/Jun/08 02:27 PM
The shuffle thread cannot force a merge; the merge is still triggered only when the various criteria are satisfied. The notification only wakes up the possibly waiting merge-thread.
Even if that notification is lost, the merge-thread always allows the shuffle threads to progress by doing a 'mergeProgress.notifyAll' before it sleeps - thereby preventing any deadlocks.
I agree, it is complicated. However it has a couple of important points:
Take this scenario:
1) A merge is currently in progress.
2) Shuffle-thread-1 has set stallShuffle=true but is preempted before it calls mergePassComplete.wait().
3) All other shuffle threads now block at shuffle.wait (the ramManager.notify has no effect since the merge is already in progress).
4) The merge completes and invokes mergePassComplete.notifyAll. It then goes back to ramManager.wait.
5) Since shuffle-thread-1 hasn't yet called mergePassComplete.wait, the notification that the merge thread sent is lost, and all the shuffle threads will wait forever.
Unless I am missing something, isn't the above a possible scenario? I think if the shuffle thread does not get memory from the ram manager and the merge thread is waiting at that point, we should do a merge to free up space irrespective of the criteria. But I also think it is unlikely that a merge isn't already in progress when the memory request cannot be serviced. The patch I uploaded implements everything in ReduceTask.java (I forgot to mention that). I didn't have to touch RamManager.java, and I still think it is simpler and does exactly what we need (maybe a cleanup is required). If you remove the forceMerge condition, it will become simpler still.
Updated patch with feedback from Owen/Devaraj - thanks!
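The lost notification in the scenario above is the classic hazard of calling wait() without re-checking shared state. One common remedy, sketched here with a hypothetical `MergePassSignal` class that is not part of either patch, is a generation counter: the shuffle thread records the pass number before asking for a merge and then waits only while that number is unchanged, so a pass that completes before wait() is still observed.

```java
/**
 * Hypothetical replacement for a bare mergePassComplete.wait()/notifyAll()
 * pair: completed merge passes are counted, so waiters test state instead
 * of depending on catching the notification itself.
 */
final class MergePassSignal {
    private long completedPasses;

    /** Merge thread: record a finished pass and wake every waiter. */
    synchronized void passComplete() {
        completedPasses++;
        notifyAll();
    }

    /** Shuffle thread: snapshot taken before requesting a merge. */
    synchronized long currentPass() {
        return completedPasses;
    }

    /** Returns once a pass newer than {@code seen} has completed, even if it finished earlier. */
    synchronized void awaitPassAfter(long seen) throws InterruptedException {
        while (completedPasses == seen) {
            wait();
        }
    }
}
```

In the scenario, shuffle-thread-1 would call `currentPass()` before setting stallShuffle; the notifyAll in step 4 then no longer has to reach it, because `awaitPassAfter` sees that the counter has already moved and returns immediately.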
Arun C Murthy made changes - 06/Jun/08 11:20 PM
Chris pointed out that the check for failure to re-open the connection could be strengthened to ensure better clean-ups. Thanks Chris!
Arun C Murthy made changes - 06/Jun/08 11:48 PM
Passes unit tests locally, and here is the result of 'ant test-patch':
[exec] -1 overall.
[exec] +1 @author. The patch does not contain any @author tags.
[exec] -1 tests included. The patch doesn't appear to include any new or modified tests.
[exec] Please justify why no tests are needed for this patch.
[exec] +1 javadoc. The javadoc tool did not generate any warning messages.
[exec] +1 javac. The applied patch does not increase the total number of javac compiler warnings.
[exec] +1 findbugs. The patch does not introduce any new Findbugs warnings.
It's hard to write a test case for this one...
Arun C Murthy made changes - 06/Jun/08 11:53 PM
Arun C Murthy made changes - 07/Jun/08 12:09 AM
Arun C Murthy made changes - 20/Jun/08 10:40 PM
Robert Chansler made changes - 22/Jul/08 05:52 PM
Nigel Daley made changes - 22/Aug/08 07:50 PM
Owen O'Malley made changes - 08/Jul/09 04:52 PM
Running through the SequenceFile.Sorter.merge with a fine-toothed comb and turning the profiler on it yielded interesting results.
Tellingly, a reasonably large job we profiled showed these timings for a reduce that started after all maps had completed:
shuffle: 13mins
merge: 17mins
reduce: 15mins
Note: merge was also active while shuffle was happening...
So folks get the picture...
Prognosis:
1. Epilogue: HADOOP-3365, HADOOP-2095 etc.
2. We really need to tighten the merge code, eliminate copies etc. HADOOP-2919 did it for the sort; we need something similar for the merge.
Radio-therapy:
1. Eliminate the usage of SequenceFiles completely for intermediate sort/merge. We just need to write (key-length, key, value-length, value)* to a compressed stream. We do not need any of the features provided by SequenceFile, i.e., the header, sync markers, etc.
2. Currently the map-side sort writes out index, index.crc, data and data.crc files. This costs 4 seeks per map-reduce pair, i.e., 4 * 300,000 * 10,000 = 12 billion seeks for a large job with 300k maps and 10k reduces (slightly futuristic). We could do much better by putting the crc at the end of the data file and a crc for each record in the index; with only 2 files per map instead of 4, this cuts seeks by 50%. Potentially we could keep the index in memory at the TaskTracker for currently running jobs, a future optimization.
3. At the reducer, decompress the (key-length, key, value-length, value)* stream, check the crc (flagging an error if necessary) and keep it.
4. Throw away RamFS and implement a simple manager that returns byte arrays of a given size (i.e., the decompressed shuffle split) until it runs out of available memory.
5. Copy the shuffled data into the byte-array and merge with other byte-arrays, write merged data to disk after compressing it.
6. Now use raw-comparators on the data in the byte-arrays for optimized compares.
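A small sketch of steps 1 to 3, assuming `java.util.zip` deflate compression and one CRC32 per reduce partition kept in its index record rather than in separate .crc files. `IndexEntry` and `SegmentCodec` are hypothetical names, and an in-memory `ByteArrayOutputStream` stands in for the map-output data file.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;
import java.util.zip.Deflater;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream;

/** Hypothetical index record for one reduce partition of a map output. */
final class IndexEntry {
    final long offset;
    final long compressedLength;
    final long crc; // CRC32 of the uncompressed partition

    IndexEntry(long offset, long compressedLength, long crc) {
        this.offset = offset;
        this.compressedLength = compressedLength;
        this.crc = crc;
    }
}

final class SegmentCodec {
    /** Map side: append one compressed (key-length, key, value-length, value)* partition. */
    static IndexEntry writeSegment(ByteArrayOutputStream file, String... keysAndValues)
            throws IOException {
        ByteArrayOutputStream raw = new ByteArrayOutputStream();
        DataOutputStream records = new DataOutputStream(raw);
        for (String s : keysAndValues) {
            byte[] b = s.getBytes(StandardCharsets.UTF_8);
            records.writeInt(b.length);
            records.write(b);
        }
        byte[] plain = raw.toByteArray();
        CRC32 crc = new CRC32();
        crc.update(plain, 0, plain.length);

        long offset = file.size();
        Deflater deflater = new Deflater();
        try {
            DeflaterOutputStream z = new DeflaterOutputStream(file, deflater);
            z.write(plain);
            z.finish(); // completes the compressed data without closing 'file'
        } finally {
            deflater.end();
        }
        return new IndexEntry(offset, file.size() - offset, crc.getValue());
    }

    /** Reduce side: decompress one partition and verify its CRC before keeping it. */
    static byte[] readSegment(byte[] file, IndexEntry entry) throws IOException {
        byte[] plain;
        try (InputStream in = new InflaterInputStream(new ByteArrayInputStream(
                file, (int) entry.offset, (int) entry.compressedLength))) {
            plain = in.readAllBytes();
        }
        CRC32 crc = new CRC32();
        crc.update(plain, 0, plain.length);
        if (crc.getValue() != entry.crc) {
            throw new IOException("CRC mismatch in map-output partition");
        }
        return plain;
    }
}
```

With the checksum in the index entry, a reducer fetching one partition needs only the index record and the data range, which is where the seek saving in step 2 comes from.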
This will be a reasonable first-step, measure more and optimize later.
Thoughts?