Index: src/java/org/apache/hadoop/mapred/ReduceTask.java
===================================================================
--- src/java/org/apache/hadoop/mapred/ReduceTask.java	(revision 663939)
+++ src/java/org/apache/hadoop/mapred/ReduceTask.java	(working copy)
@@ -709,6 +709,12 @@
         }
       }
     }
+    /** Monitor on which stalled copier threads wait for a merge to finish. */
+    private final Object shuffleLock = new Object();
+    /** True while a copier is stalled on memory; guarded by shuffleLock. */
+    boolean shuffleWait = false;
+    /** Set by a stalled copier to make the in-memory merger run at once. */
+    volatile boolean forceMerge = false;
 
     /** Copies map outputs as they become available */
     private class MapOutputCopier extends Thread {
@@ -908,7 +914,6 @@
         return bytes;
       }
 
-    
       /**
        * Get the map output into a local file (either in the inmemory fs or on the
        * local fs) from the remote server.
@@ -942,9 +947,31 @@
         long compressedLength =
           Long.parseLong(connection.getHeaderField(MAP_OUTPUT_LENGTH));
 
+        // Check if this map-output exceeds the in-memory shuffle limit
+        boolean shuffleLimitExceeded =
+          (decompressedLength >= ramManager.getShuffleLimit());
+
         // Check if we can save the map-output in-memory
-        boolean createInMem = ramManager.reserve(decompressedLength);
+        boolean createInMem = !shuffleLimitExceeded;
         if (createInMem) {
+          // Stall until the reservation succeeds; after every failed attempt
+          // request a forced in-memory merge and wait for it to complete.
+          while (!ramManager.reserve(decompressedLength)) {
+            synchronized (shuffleLock) {
+              shuffleWait = true;
+            }
+            synchronized (ramManager) {
+              forceMerge = true;
+              ramManager.notify();
+            }
+            synchronized (shuffleLock) {
+              // The merger clears the flag under shuffleLock before it
+              // notifies, so a wake-up cannot be missed here.
+              if (shuffleWait) {
+                shuffleLock.wait();
+              }
+            }
+          }
           LOG.info("Shuffling " + decompressedLength + " bytes (" +
                    compressedLength + " raw bytes) " +
                    "into RAM-FS from " + mapOutputLoc.getTaskAttemptId());
@@ -1780,13 +1807,14 @@
         try {
           while(!exitInMemMerge) {
             synchronized(ramManager) {
-              while(!exitInMemMerge &&
+              // A forced merge requested by a stalled copier ends the wait
+              while(!exitInMemMerge && !ReduceCopier.this.forceMerge &&
                     ((ramManager.getPercentUsed() < MAX_INMEM_FILESYS_USE ||
                      ramManager.getReservedFiles() <
                        (int)(MAX_INMEM_FILESYS_USE/MAX_INMEM_FILESIZE_FRACTION))
                     &&
                     (mergeThreshold <= 0 ||
                      ramManager.getReservedFiles() < mergeThreshold))) {
                 LOG.info(reduceTask.getTaskID() + " Thread waiting: " + getName());
                 ramManager.wait();
               }
@@ -1872,6 +1900,15 @@
               localFileSys.delete(outputPath, true);
               throw (IOException)new IOException
                       ("Intermedate merge failed").initCause(e);
+            } finally {
+              // A merge attempt just ran, so drop any pending forced-merge request
+              ReduceCopier.this.forceMerge = false;
+              // Notify the stalled shuffle threads
+              synchronized (shuffleLock) {
+                shuffleWait = false;
+                shuffleLock.notifyAll();
+              }
+              LOG.info("Notified shuffle threads that merge is complete...");
             }
             writer.close();
             LOG.info(reduceTask.getTaskID() +