Details

    • Type: Improvement
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.18.0
    • Component/s: None
    • Labels:
      None
    • Release Note:
      Improved the shuffle so that all fetched map-outputs are kept in memory before being merged: when memory runs low the shuffle stalls, allowing the in-memory merge to execute and free up memory for further fetches.

Description

This is intended to be a meta-issue to track various improvements to shuffle/merge in the reducer.

      1. 3366.1.patch
        9 kB
        Devaraj Das
      2. 3366.1.patch
        9 kB
        Devaraj Das
      3. 3366.reducetask.patch
        3 kB
        Devaraj Das
      4. HADOOP-3366_0_20080605.patch
        10 kB
        Arun C Murthy
      5. HADOOP-3366_2_20080606.patch
        16 kB
        Arun C Murthy
      6. HADOOP-3366_2_20080606.patch
        15 kB
        Arun C Murthy
      7. ifile.patch
        9 kB
        Arun C Murthy

Issue Links

Activity

          Arun C Murthy added a comment -

          Synopsis:

          Running through the SequenceFile.Sorter.merge with a fine-toothed comb and turning the profiler on it yielded interesting results.
          Telling - a reasonably large job we profiled had this characteristic for a reduce which started after all maps had completed:
          shuffle: 13mins
          merge: 17mins
          reduce: 15mins
          Note: merge was also active while shuffle was happening...

          So folks get the picture...


          Prognosis:

          1. Epilogue: HADOOP-3365, HADOOP-2095 etc.
          2. We really need to tighten the merge code, eliminate copies etc. HADOOP-2919 did it for the sort, we need something similar for the merge.


          Radio-therapy:
          1. Eliminate the usage of SequenceFiles completely for intermediate sort/merge. We just need to write (key-length, key, value-length, value)* to a compressed stream. We do not need any of the features provided by the SequenceFile i.e. header, sync etc.
          2. Currently the map-side sort writes out index, index.crc, data and data.crc files. This costs 4 seeks per map-reduce pair which is 4*300,000*10,000 assuming a large job with 300k maps and 10k reduces (slightly futuristic). We could do much better by putting the crc at the end of the data file, and crc for each record in the index, cuts down seeks by 50%. Potentially we could keep the index in-memory at the TaskTracker for currently running jobs, a future optimization.
          3. At the reducer, decompress the (key-length, key, value-length, value)*, check crc (flag error if necessary) and keep it.
          4. Throw away RamFS, implement a simple manager who returns byte-arrays of a given size (i.e. decompressed shuffle split) until it runs out of the amount of memory available.
          5. Copy the shuffled data into the byte-array and merge with other byte-arrays, write merged data to disk after compressing it.
          6. Now use raw-comparators on the data in the byte-arrays for optimized compares.

          This will be a reasonable first-step, measure more and optimize later.
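          The raw record stream from step 1 could be sketched as follows. This is an illustrative simplification only (plain java.io, fixed 4-byte lengths, no compression, CRC, or variable-length encoding), and RawKVStream is a hypothetical name, not Hadoop's actual code:

```java
import java.io.*;

// Minimal sketch of the proposed intermediate format: a bare
// (key-length, key, value-length, value)* stream with none of
// SequenceFile's header/sync overhead.
public class RawKVStream {
    public static void writeRecord(DataOutputStream out, byte[] key, byte[] value)
            throws IOException {
        out.writeInt(key.length);
        out.write(key);
        out.writeInt(value.length);
        out.write(value);
    }

    // Returns {key, value}, or null at end-of-stream.
    public static byte[][] readRecord(DataInputStream in) throws IOException {
        int keyLen;
        try {
            keyLen = in.readInt();
        } catch (EOFException eof) {
            return null;   // clean end of stream: no more records
        }
        byte[] key = new byte[keyLen];
        in.readFully(key);
        byte[] value = new byte[in.readInt()];
        in.readFully(value);
        return new byte[][] { key, value };
    }
}
```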


          Thoughts?

          Devaraj Das added a comment -

          I agree with 1 through 3.

          4. Throw away RamFS, implement a simple manager who returns byte-arrays of a given size (i.e. decompressed shuffle split) until it runs out of the amount of memory available.

          I am not sure this is justified. I'd propose

          1) Make the InMemoryFileSystem independent of the CheckSumFileSystem
          2) Implement special DataOutputBuffer/ValueBytes for the ramfs. The DataOutputBuffer gives us a nice abstraction to look at data, be it from files or memory. I think we should retain that abstraction and handle the ramfs as a special case.

          We already use raw comparators. Not sure what you meant by this.

          I'll submit a patch with some of the above thoughts implemented in a bit.

          Arun C Murthy added a comment -

          We already use raw comparators. Not sure what you meant by this.

          I meant that this will play nicely with raw-comparators which already expect a byte-array and offset/length.

          Runping Qi added a comment -

          It is clear that there is large room for improvement in the merge code.
          Both map and reduce will benefit from the improvements. It makes sense to replace sequencefile.merge with something simpler,
          lighter, and more efficient. Once HADOOP-2095 is in, we may get a better picture of where the new
          bottleneck will be, and make decisions accordingly.
          Overall, the direction is clearly right.

          Sameer Paranjpye added a comment -

          What value does the RamFS add here? It would be simpler to just put keys/values in a buffer and iterate over them in place. I don't see the value in special casing DataOutputBuffer for this very specific use case.

          Sameer Paranjpye added a comment -

          To clarify, we use the ramfs in exactly one place. Even there it appears to hinder a tight implementation rather than help (i.e. we have to special case DataOutputBuffer for it to avoid value copies, build another workaround to avoid key copies etc.). Let's not use it.

          For in memory merges, an iterator over a key/value sequence in a buffer seems a much better abstraction than an input stream.

          Devaraj Das added a comment -

          Sameer, today's ramfs serves both as a memory manager and as a filesystem. So if we were to implement a new memory manager, I am guessing that it'd be close to what we already have in the ramfs (e.g. it already does byte-array allocations, keeps track of memory usage, etc.). We can get to an optimal memory manager by reducing the complexity (if any) in the ramfs memory manager.

          Regarding using the ramfs as a FileSystem, I think if we remove the ChecksumFS layer, we'd have removed a good amount of complexity. Beyond that, if we ensure that the APIs that read from the ramfs do not allocate buffers but instead reset internal pointers on the byte arrays for the keys and values, we should be good. The two classes that are used as the destination of data read from files are DataOutputBuffer and ValueBytes. Both of these internally allocate byte arrays. I am suggesting that we implement these two classes specially for ramfs files, wherein we'd just update the pointers/offsets/lengths in these classes instead of copying from the files.

          Devaraj Das added a comment -

          (An offline discussion led me to agree with the suggestion that we should not have the file abstraction for the in-memory merge. The file streams add overhead that is not desirable in a performance-critical section.)
          This half-done patch is up for a high-level review. It introduces a ByteArrayManager that the shuffle can use to store map-outputs as raw byte-arrays instead of files in the ramfs. It also defines a merge routine that can merge a bunch of such byte-arrays. Some of the remaining work, i.e., changing the shuffle code to use the ByteArrayManager instead of the ramfs, depends on the patch for HADOOP-2095 (since that patch changes the layout of the intermediate sequence file). I'll see what else can be done without that patch being available.

          By the way, I have done the patch assuming the layout as <key-len><val-len><key><value> (the difference w.r.t the earlier proposed layout is that the lengths are together). That made the parsing of the byte arrays simpler.
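          Scanning that layout in place, without per-record copies, could look roughly like this. It assumes fixed 4-byte lengths for illustration (the actual patch may encode lengths differently), and SegmentScanner is a hypothetical name:

```java
import java.nio.ByteBuffer;

// Sketch of iterating records laid out as <key-len><val-len><key><value>
// directly in a shuffled byte-array: only offsets/lengths are updated,
// the key and value bytes are never copied out.
public class SegmentScanner {
    private final ByteBuffer buf;
    public int keyOff, keyLen, valOff, valLen;

    public SegmentScanner(byte[] segment) {
        this.buf = ByteBuffer.wrap(segment);
    }

    // Advance to the next record; offsets point into the backing array.
    public boolean next() {
        if (buf.remaining() == 0) return false;
        keyLen = buf.getInt();   // both lengths sit together up front,
        valLen = buf.getInt();   // so parsing is a single header read
        keyOff = buf.position();
        valOff = keyOff + keyLen;
        buf.position(valOff + valLen);
        return true;
    }
}
```

          With both lengths up front, a raw comparator can be handed (array, keyOff, keyLen) directly, which is the zero-copy compare discussed earlier.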

          Devaraj Das added a comment -

          This is a slightly improved one.

          Arun C Murthy added a comment -

          Here is an early version of my creatively titled SequenceFile replacement for intermediate data in Map-Reduce (map-outputs)... IFile stands for "Intermediate File" :)

          Unfortunately the Writer isn't as tight as it can be; it needs to copy the key/value into an internal buffer (see HADOOP-3414 for the details). However, the Reader seems reasonably tight and strictly does zero copies. I chose to use DataInputBuffer as the key/value type in the call to Reader.next since it plays nicely by offering an InputStream interface and the ability to provide it with a raw buffer to work with; it can also be queried to get back the raw buffer without any copies being made. I'll continue to plug away; feedback appreciated.

          Arun C Murthy added a comment -

          Patch to stall the shuffle when there isn't enough space left in RAM to accommodate the map-output.

          With this patch the single-reducer job which merges 12.5G from 2500 maps on 25 machines runs in ~18 mins, compared to 30-odd mins after HADOOP-2095 (prior to which it was 45 mins).

          Devaraj Das added a comment -

          Some comments:
          0) The inMem merge thread needs to ignore the criteria when the shuffle thread notifies it to do a forced merge.
          1) A race condition exists in the interval between the ramManager.notify and mergePassComplete.wait() calls in getMapOutput. What could happen is that the ramManager gets notified and the merge finishes before this thread calls mergePassComplete.wait(). If that happens, the notification from the merger is lost and this thread will just wait forever.
          2) The handshake between the merger, copier, and the ramManager looks complex, and there could be more race conditions like the one I pointed out above. Sharad and I had a quick discussion and we feel it can be simplified:
          - Have ramManager.reserve lock the thread if the request cannot be satisfied
          - Have ramManager.unreserve do a notifyAll (this the merge thread does)
          - Have the shuffle thread notify the merge thread (before it goes to wait)
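          The proposed simplification could look roughly like the sketch below: reserve() blocks the calling shuffle thread until the merge frees memory via unreserve(). Names and structure are illustrative, not the actual RamManager in the patch:

```java
// Hypothetical sketch of a blocking memory-reservation manager for
// the shuffle: shuffle threads reserve before fetching a map-output,
// the merge thread unreserves after spilling merged data to disk.
public class RamBudget {
    private final long capacity;
    private long used = 0;

    public RamBudget(long capacity) {
        this.capacity = capacity;
    }

    // Called by a shuffle thread; blocks until the request fits.
    public synchronized void reserve(long size) throws InterruptedException {
        while (used + size > capacity) {
            wait();                 // stall the shuffle until memory frees up
        }
        used += size;
    }

    // Called by the merge thread; wakes every stalled shuffle thread.
    public synchronized void unreserve(long size) {
        used -= size;
        notifyAll();
    }
}
```

          Note Arun's objection below still applies to this shape: blocking inside reserve() means either holding the HTTP connection open across the stall or moving the fetch itself into the manager.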

          Devaraj Das added a comment -

          Here is a patch for just ReduceTask.java that outlines the synchronization we were thinking about... It's a very quickly written patch, so don't be surprised if you see glaring issues (e.g. I could have used an AtomicBoolean instead of a Boolean), but it gives the idea. This looks much simpler, I think (I hope I didn't miss some cases where it absolutely won't work).

          Arun C Murthy added a comment -

          0) The inMem merge thread needs to ignore the criteria when the shuffle thread notifies it to do a forced merge.

          The shuffle thread cannot force a merge, the merge is still triggered only when the various criteria are satisfied - the notification only wakes up the possibly waiting merge-thread.

          1) A race condition exists in the interval between the ramManager.notify and mergePassComplete.wait() calls in getMapOutput. What could happen is that the ramManager gets notified and it finishes the merge before this thread calls mergePassComplete.wait(). If this happens the notification from the merger is lost and this thread will just wait ...

          Even if that notification is lost, the merge-thread always allows the shuffle threads to progress by doing a 'mergeProgress.notifyAll' before it sleeps - thereby preventing any deadlocks.

          2) The handshake between the merger, copier, and the ramManager looks complex, and there could be more race conditions like the one I pointed out above. Sharad and I had a quick discussion and we feel it can be simplified.
          Have the ramManager.reserve lock the thread if the request cannot be satisfied
          Have the ramManager.unreserve do a notifyAll (this the mergeThread does)
          Have the shuffle thread notify the mergeThread (before it goes to wait)

          I agree, it is complicated. However it has a couple of important points:
          1. RamManager.reserve cannot lock the thread without closing the http connection, doing so would leak the shuffle into the RamManager where you'd have to pass the HTTP input-stream to RamManager.reserve.
          2. It is much better to do a 'notifyAll' on the shuffle threads when the merge is complete, so one is reasonably sure that all shuffle threads can progress. Doing it in RamManager.unreserve would let only one shuffle thread through at a time, and contention for the lock would be very high - every thread would wake up and get locked up inside the RamManager again.

          Devaraj Das added a comment -

          Take this scenario:
          1) Merge is currently in progress.
          2) Shuffle-thread-1 has set stallShuffle=true but hasn't done mergePassComplete.wait() and it got preempted
          3) All other Shuffle-threads will now block at shuffle.wait (the ramManager.notify won't have effect since the merge is already in progress)
          4) Merge completes and invokes mergePassComplete.notifyAll. It then goes back to ramManager.wait.
          5) Since shuffle-thread-1 hasn't done mergePassComplete.wait, the notification that the merge thread sent is lost and all the shuffle threads will continue to wait forever.

          Unless I am missing something, isn't the above a possible scenario?

          I think if the shuffle thread does not get memory from the ram manager and the merge thread is waiting at that point we should do a merge to free up space irrespective of the criteria. But I also think that it is an unlikely case that the merge isn't already in progress when the memory request cannot be serviced.

          The patch I uploaded implements everything in ReduceTask.java (I forgot to mention that). I didn't have to touch RamManager.java, and I still think it is simpler and does exactly what we need (maybe a cleanup is required). If you remove the forceMerge condition, it becomes simpler still.
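          The scenario above is the classic lost-notification hazard: a notify() that fires between the decision to wait and the wait() itself is simply dropped. The standard Java guard is a state flag checked in a loop under the lock, so a signal sent before the waiter arrives is preserved. A minimal sketch with hypothetical names, not the patch's code:

```java
// Guarding against lost notifications between merge and shuffle threads:
// the flag records that a merge pass completed, even if no thread was
// waiting at the instant of the notifyAll.
public class MergeGate {
    private boolean mergePassDone = false;

    // Shuffle-thread side: wait for a merge pass to complete.
    public synchronized void awaitMergePass() throws InterruptedException {
        while (!mergePassDone) {   // loop also guards against spurious wakeups
            wait();
        }
        mergePassDone = false;     // consume the signal
    }

    // Merge-thread side: record completion and wake all waiters.
    public synchronized void signalMergePass() {
        mergePassDone = true;      // state survives if nobody is waiting yet
        notifyAll();
    }
}
```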

          Arun C Murthy added a comment -

          Updated patch with feedback from Owen/Devaraj - thanks!

          Arun C Murthy added a comment -

          Chris pointed out that the check for failure to re-open the connection could be strengthened to ensure better clean-ups. Thanks Chris!

          Arun C Murthy added a comment -

          Passes unit-tests locally, and here is result of 'ant test-patch':

          [exec] -1 overall.
          
          [exec]     +1 @author.  The patch does not contain any @author tags.
          
          [exec]     -1 tests included.  The patch doesn't appear to include any new or modified tests.
          [exec]                         Please justify why no tests are needed for this patch.
          
          [exec]     +1 javadoc.  The javadoc tool did not generate any warning messages.
          
          [exec]     +1 javac.  The applied patch does not increase the total number of javac compiler warnings.
          
          [exec]     +1 findbugs.  The patch does not introduce any new Findbugs warnings.
          

          It's hard to write a test case for this one...

          Chris Douglas added a comment -

          +1

          Arun C Murthy added a comment -

          I just committed this.


            People

            • Assignee:
              Arun C Murthy
              Reporter:
              Arun C Murthy
            • Votes:
              0
              Watchers:
              5
