Index: src/java/org/apache/hadoop/mapred/RamManager.java
===================================================================
--- src/java/org/apache/hadoop/mapred/RamManager.java	(revision 663837)
+++ src/java/org/apache/hadoop/mapred/RamManager.java	(working copy)
@@ -20,12 +20,18 @@
 import org.apache.hadoop.conf.Configuration;
 
 class RamManager {
+  /* Maximum fraction (0.25 = 25%) of the in-memory limit that a single
+   * shuffle can consume */
+  private static final float MAX_SINGLE_SHUFFLE_SEGMENT_PERCENT = 0.25f;
+  
   volatile private int numReserved = 0;
   volatile private int size = 0;
   private final int maxSize;
+  private final int maxSingleShuffleLimit;
   
   public RamManager(Configuration conf) {
     maxSize = conf.getInt("fs.inmemory.size.mb", 100) * 1024 * 1024;
+    maxSingleShuffleLimit = (int)(maxSize * MAX_SINGLE_SHUFFLE_SEGMENT_PERCENT);
   }
   
   synchronized boolean reserve(long requestedSize) {
@@ -62,4 +68,8 @@
   int getMemoryLimit() {
     return maxSize;
   }
+  
+  int getShuffleLimit() {
+    return maxSingleShuffleLimit;
+  }
 }
Index: src/java/org/apache/hadoop/mapred/MRConstants.java
===================================================================
--- src/java/org/apache/hadoop/mapred/MRConstants.java	(revision 663837)
+++ src/java/org/apache/hadoop/mapred/MRConstants.java	(working copy)
@@ -35,7 +35,7 @@
   /**
    * Constant denoting when a merge of in memory files will be triggered 
    */
-  public static final float MAX_INMEM_FILESYS_USE = 0.5f;
+  public static final float MAX_INMEM_FILESYS_USE = 0.66f;
   
   /**
    * Constant denoting the max size (in terms of the fraction of the total 
Index: src/java/org/apache/hadoop/mapred/ReduceTask.java
===================================================================
--- src/java/org/apache/hadoop/mapred/ReduceTask.java	(revision 663837)
+++ src/java/org/apache/hadoop/mapred/ReduceTask.java	(working copy)
@@ -543,7 +543,16 @@
     private final List<MapOutput> mapOutputsFilesInMemory =
       Collections.synchronizedList(new LinkedList<MapOutput>());
     
-
+    /** Is the shuffle stalled? */
+    volatile boolean stallShuffle = false;
+    
+    /** A lock for shuffle threads to wait against when they are 'stalled'. */
+    Object shuffle = new Object();
+    
+    /** A lock that stalled shuffle threads wait on until the 
+     * in-memory merge thread notifies them. */
+    Object mergePassComplete = new Object();
+    
     /**
      * This class contains the methods that should be used for metrics-reporting
      * the specific metrics for shuffle. This class actually reports the
@@ -923,27 +932,51 @@
       private MapOutput getMapOutput(MapOutputLocation mapOutputLoc, 
                                      Path localFilename)
       throws IOException, InterruptedException {
+        // Check if we should be stalling this shuffle...
+        if (stallShuffle) {
+          synchronized (shuffle) {
+            while (stallShuffle) {
+              // Allow the in-memory merge thread to proceed
+              synchronized (ramManager) {
+                ramManager.notify();
+              }
+
+              LOG.info(mapOutputLoc.taskAttemptId + ": Stalling shuffle...");
+              shuffle.wait();
+            }
+            LOG.info(mapOutputLoc.taskAttemptId + ": Shuffle un-stalled...");
+          }
+        }
+        
         boolean good = false;
         OutputStream output = null;
         MapOutput mapOutput = null;
         
         try {
+          // Connect to the TaskTracker to fetch the map-output
           URLConnection connection = 
             mapOutputLoc.getOutputLocation().openConnection();
           InputStream input = getInputStream(connection, DEFAULT_READ_TIMEOUT, 
                                              STALLED_COPY_TIMEOUT);
+          
           //We will put a file in memory if it meets certain criteria:
           //1. The size of the (decompressed) file should be less than 25% of 
           //    the total inmem fs
           //2. There is space available in the inmem fs
           
+          // Get the compressed/decompressed map-output lengths
           long decompressedLength = 
             Long.parseLong(connection.getHeaderField(RAW_MAP_OUTPUT_LENGTH));  
           long compressedLength = 
             Long.parseLong(connection.getHeaderField(MAP_OUTPUT_LENGTH));
-          
-          // Check if we can save the map-output in-memory
-          boolean createInMem = ramManager.reserve(decompressedLength); 
+
+          // Check if this map-output can be saved in-memory
+          boolean shuffleLimitExceeded = 
+            (decompressedLength >= ramManager.getShuffleLimit());
+
+          // Check if we have enough buffer-space to keep map-output in-memory
+          boolean createInMem = 
+            (!shuffleLimitExceeded && ramManager.reserve(decompressedLength));
           if (createInMem) {
             LOG.info("Shuffling " + decompressedLength + " bytes (" + 
                      compressedLength + " raw bytes) " + 
@@ -958,6 +991,46 @@
             output = new DataOutputBuffer((int)decompressedLength);
           }
           else {
+            // We couldn't keep the map-output in-memory;
+            // check if we need to stall the shuffle...
+            if (!shuffleLimitExceeded) {
+              // Stall shuffle
+              synchronized (shuffle) {
+                stallShuffle = true;
+                LOG.info(mapOutputLoc.taskAttemptId + ": Stalled shuffle since " + 
+                         decompressedLength + " bytes couldn't fit in-memory!");
+              }
+              
+              // Close the current connection
+              try {
+                input.close();
+              } catch (IOException ignored) {
+                // Ignore
+              }
+              
+              // Allow the in-memory merge thread to proceed
+              synchronized (ramManager) {
+                ramManager.notify();
+              }
+              
+              // Wait for in-memory merge to complete...
+              synchronized (mergePassComplete) {
+                mergePassComplete.wait();
+                LOG.info(mapOutputLoc.taskAttemptId + ": Merge complete!");
+              }
+              
+              // Now unleash the other stalled copier-threads
+              synchronized (shuffle) {
+                stallShuffle = false;
+                shuffle.notifyAll();
+                LOG.info(mapOutputLoc.taskAttemptId + ": Woke up all stalled " +
+                		     "threads!");
+              }
+              
+              // Reconnect to get the same map-output
+              return getMapOutput(mapOutputLoc, localFilename);
+            }
+            
             // Find out a suitable location for the output on local-filesystem
             localFilename = lDirAlloc.getLocalPathForWrite(
                 localFilename.toUri().getPath(), decompressedLength, conf);
@@ -1737,14 +1810,20 @@
                                   codec, mapFiles.toArray(new Path[mapFiles.size()]), 
                                   true, ioSortFactor, tmpDir, 
                                   conf.getOutputKeyComparator(), reporter);
+
+              Merger.writeFile(iter, writer, reporter);
+              writer.close();
+              writer = null;
             } catch (Exception e) {
-              writer.close();
-              localFileSys.delete(outputPath, true);
+              // Make sure that we delete the on-disk file that we created earlier
+              try { 
+                writer.close(); 
+              } catch (IOException ignored) {}
+              try {          
+                localFileSys.delete(outputPath, true);
+              } catch (IOException ignored){}
               throw new IOException (StringUtils.stringifyException(e));
             }
-            Merger.writeFile(iter, writer, reporter);
-            writer.close();
-            
             synchronized (mapOutputFilesOnDisk) {
               addToMapOutputFilesOnDisk(localFileSys.getFileStatus(outputPath));
             }
@@ -1787,6 +1866,12 @@
                     &&
                   (mergeThreshold <= 0 || 
                       ramManager.getReservedFiles() < mergeThreshold))) {
+                // Allow the shuffle to proceed
+                synchronized (mergePassComplete) {
+                  mergePassComplete.notifyAll();
+                }
+                
+                // Wait for map-outputs to be fetched
                 LOG.info(reduceTask.getTaskID() + " Thread waiting: " + getName());
                 ramManager.wait();
               }
@@ -1865,15 +1950,26 @@
             combineCollector.setWriter(writer);
             combineAndSpill(rIter, reduceCombineInputCounter);
           }
+          writer.close();
         } catch (Exception e) { 
-          //make sure that we delete the ondisk file that we created 
-          //earlier when we invoked cloneFileAttributes
-          writer.close();
-          localFileSys.delete(outputPath, true);
+          // Make sure that we delete the on-disk file that we created earlier
+          try { 
+            writer.close(); 
+          } catch (IOException ignored) {}
+          try {          
+            localFileSys.delete(outputPath, true);
+          } catch (IOException ignored){}
+          
           throw (IOException)new IOException
                   ("Intermedate merge failed").initCause(e);
+        } finally {
+          // Notify the stalled shuffle threads
+          synchronized (mergePassComplete) {
+            mergePassComplete.notifyAll();
+          }
+          LOG.info("Notified shuffle threads that merge is complete...");
         }
-        writer.close();
+        
         LOG.info(reduceTask.getTaskID() + 
                  " Merge of the " + noInMemorySegments +
                  " files in-memory complete." +
