Index: src/java/org/apache/hadoop/mapred/JobInProgress.java
===================================================================
--- src/java/org/apache/hadoop/mapred/JobInProgress.java	(revision 639156)
+++ src/java/org/apache/hadoop/mapred/JobInProgress.java	(working copy)
@@ -20,10 +20,14 @@
 import java.io.DataInputStream;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.IdentityHashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.LinkedHashSet;
+import java.util.LinkedList;
 import java.util.Map;
+import java.util.Set;
 import java.util.TreeMap;
 import java.util.Vector;
 
@@ -63,8 +67,8 @@
   // Counters to track currently running/finished/failed Map/Reduce task-attempts
   int runningMapTasks = 0;
   int runningReduceTasks = 0;
-  int finishedMapTasks = 0;
-  int finishedReduceTasks = 0;
+  volatile int finishedMapTasks = 0;
+  volatile int finishedReduceTasks = 0;
   int failedMapTasks = 0; 
   int failedReduceTasks = 0;
   
@@ -79,6 +83,21 @@
   // NetworkTopology Node to the set of TIPs
   Map<Node, List<TaskInProgress>> nodesToMaps;
   
+  // Map of NetworkTopology Node to set of running TIPs
+  Map<Node, Set<TaskInProgress>> runningMapCache;
+
+  // A list of non-local non-running maps
+  List<TaskInProgress> nonLocalMaps;
+
+  // A set of non-local running maps
+  Set<TaskInProgress> nonLocalRunningMaps;
+
+  // A list of non-running reduce TIPs
+  List<TaskInProgress> reducers;
+
+  // A set of running reduce TIPs
+  Set<TaskInProgress> runningReducers;
+
   private int maxLevel;
   
   private int taskCompletionEventTracker = 0; 
@@ -183,6 +202,10 @@
     this.jobMetrics.setTag("jobId", jobid);
     hasSpeculativeMaps = conf.getMapSpeculativeExecution();
     hasSpeculativeReduces = conf.getReduceSpeculativeExecution();
+    this.nonLocalMaps = new LinkedList<TaskInProgress>();
+    this.nonLocalRunningMaps = new LinkedHashSet<TaskInProgress>();
+    this.runningMapCache = new IdentityHashMap<Node, Set<TaskInProgress>>();
+    this.runningReducers = new LinkedHashSet<TaskInProgress>();
   }
 
   /**
@@ -215,12 +238,6 @@
     jobMetrics.remove();
   }
     
-  private Node getParentNode(Node node, int level) {
-    for (int i = 0; node != null && i < level; i++) {
-      node = node.getParent();
-    }
-    return node;
-  }
   private void printCache (Map<Node, List<TaskInProgress>> cache) {
     LOG.info("The taskcache info:");
     for (Map.Entry<Node, List<TaskInProgress>> n : cache.entrySet()) {
@@ -238,7 +255,13 @@
       new IdentityHashMap<Node, List<TaskInProgress>>(maxLevel);
     
     for (int i = 0; i < splits.length; i++) {
-      for(String host: splits[i].getLocations()) {
+      String[] splitLocations = splits[i].getLocations();
+      if (splitLocations.length == 0) {
+        nonLocalMaps.add(maps[i]);
+        continue;
+      }
+
+      for(String host: splitLocations) {
         Node node = jobtracker.resolveAndAddToTopology(host);
         if (node == null) {
           continue;
@@ -249,10 +272,10 @@
           return null;
         }
         for (int j = 0; j < maxLevel; j++) {
-          node = getParentNode(node, j);
+          node = JobTracker.getParentNode(node, j);
           List<TaskInProgress> hostMaps = cache.get(node);
           if (hostMaps == null) {
-            hostMaps = new ArrayList<TaskInProgress>();
+            hostMaps = new LinkedList<TaskInProgress>();
             cache.put(node, hostMaps);
             hostMaps.add(maps[i]);
           }
@@ -326,6 +349,7 @@
     //
     // Create reduce tasks
     //
+    reducers = new LinkedList<TaskInProgress>();    
     this.reduces = new TaskInProgress[numReduceTasks];
     for (int i = 0; i < numReduceTasks; i++) {
       reduces[i] = new TaskInProgress(jobId, jobFile, 
@@ -331,6 +355,7 @@
       reduces[i] = new TaskInProgress(jobId, jobFile, 
                                       numMapTasks, i, 
                                       jobtracker, conf, this);
+      reducers.add(reduces[i]);
     }
 
     // create job specific temporary directory in output path
@@ -633,8 +658,7 @@
     }
     
     
-    int target = findNewTask(tts, clusterSize, status.mapProgress(), 
-                             maps, nodesToMaps, hasSpeculativeMaps);
+    int target = findNewMapTask(tts, clusterSize, status.mapProgress());
     if (target == -1) {
       return null;
     }
@@ -669,8 +693,7 @@
       return null;
     }
 
-    int target = findNewTask(tts, clusterSize, status.reduceProgress() , 
-                             reduces, null, hasSpeculativeReduces);
+    int  target = findNewReduceTask(tts, clusterSize, status.reduceProgress());
     if (target == -1) {
       return null;
     }
@@ -763,26 +786,302 @@
     return task.hasSpeculativeTask(currentTime, avgProgress) && 
            !task.hasRunOnMachine(taskTracker);
   }
+
+  /**
+   * Removes a map tip from the running-map structures, which are only
+   * maintained when map speculation is enabled (no-op otherwise).
+   * @param tip the tip that needs to be retired
+   */
+  private synchronized void retireMap(TaskInProgress tip) {
+    // Since a list for running maps is maintained if speculation is 'ON'
+    if (hasSpeculativeMaps) {
+      if (runningMapCache == null) {
+        LOG.warn("Running cache for maps missing!! "
+                 + "Job details are missing.");
+        return;
+      }
+      String[] splitLocations = tip.getSplitLocations();
+
+      // Remove the TIP from the list for running non-local maps
+      if (splitLocations.length == 0) {
+        nonLocalRunningMaps.remove(tip);
+        return;
+      }
+
+      // Remove from the running map caches
+      for(String host: splitLocations) {
+        Node node = jobtracker.getNode(host);
+        if (node == null) {
+          LOG.error("Pre-existing information for the host " + host
+                    + " is missing from the topology");
+          continue;
+        }
+        // drop the tip at each cache level, pruning sets that become empty
+        for (int j = 0; j < maxLevel; ++j) {
+          Set<TaskInProgress> hostMaps = runningMapCache.get(node);
+          if (hostMaps != null) {
+            hostMaps.remove(tip);
+            if (hostMaps.size() == 0) {
+              runningMapCache.remove(node);
+            }
+          }
+          if (node.getParent() == null) {
+            LOG.error("Missing parent of the node " + node.getName()
+                      + " at level " + node.getLevel());
+            break;
+          } else {
+            node = node.getParent();
+          }
+        }
+      }
+    }
+  }
+    
+  /**
+   * Remove a TIP from the set of running reduces (kept only for speculation)
+   * @param tip the tip that needs to be retired
+   */
+  private synchronized void retireReduce(TaskInProgress tip) {
+    // Since a list for running reduces is maintained if speculation is 'ON'
+    if (hasSpeculativeReduces) {
+      if (runningReducers == null) {
+        LOG.warn("Running list for reducers missing!! "
+                 + "Job details are missing.");
+        return;
+      }
+      runningReducers.remove(tip);
+    }
+  }
+
+  /**
+   * Adds a map tip to the running-map structures used for speculation;
+   * nothing is recorded when map speculation is disabled.
+   * @param tip the tip that needs to be scheduled as running
+   */
+  private synchronized void scheduleMap(TaskInProgress tip) {
+    // Since a running list is maintained only if speculation is 'ON'
+    if (hasSpeculativeMaps) {
+      if (runningMapCache == null) {
+        LOG.warn("Running cache for maps is missing!! " 
+                 + "Job details are missing.");
+        return;
+      }
+      String[] splitLocations = tip.getSplitLocations();
+
+      // Add the TIP to the list of non-local running TIPs
+      if (splitLocations.length == 0) {
+        nonLocalRunningMaps.add(tip);
+        return;
+      }
+
+      for(String host: splitLocations) {
+        Node node = jobtracker.getNode(host);
+        if (node == null) {
+          // the host is skipped, not aborted on, so log at error level
+          LOG.error("Pre-existing information of the host " + host
+                    + " is missing from the topology");
+          continue;
+        }
+        // register the tip at each cache level, walking up from the host
+        for (int j = 0; j < maxLevel; ++j) {
+          Set<TaskInProgress> hostMaps = runningMapCache.get(node);
+          if (hostMaps == null) {
+            hostMaps = new LinkedHashSet<TaskInProgress>();
+            runningMapCache.put(node, hostMaps);
+          }
+          hostMaps.add(tip);
+          if (node.getParent() == null) {
+            LOG.error("Missing parent of the node " + node.getName()
+                      + " at level " + node.getLevel());
+            break;
+          } else {
+            node = node.getParent();
+          }
+        }
+      }
+    }
+  }
+  
+  /**
+   * Adds a reduce tip to the set of running reduces (kept for speculation)
+   * @param tip the tip that needs to be scheduled as running
+   */
+  private synchronized void scheduleReduce(TaskInProgress tip) {
+    // The running set is maintained only when reduce speculation is 'ON'
+    if (hasSpeculativeReduces) {
+      if (runningReducers == null) {
+        LOG.warn("Running cache for reducers missing!! "
+                 + "Job details are missing.");
+        return;
+      }
+      runningReducers.add(tip);
+    }
+  }
+  
+  /**
+   * Re-queues a failed map TIP at the front of the non-running map lists
+   * (per-node caches, or the non-local list when it has no split location)
+   * @param tip the tip that needs to be failed
+   */
+  private synchronized void failMap(TaskInProgress tip) {
+    if (nodesToMaps == null) {
+      LOG.warn("Non-running cache for maps missing!! "
+               + "Job details are missing.");
+      return;
+    }
+
+    // 1. It is added everywhere since other nodes (having this split local)
+    //    might have removed this tip from their local cache
+    // 2. Give high priority to failed tip - fail early
+
+    String[] splitLocations = tip.getSplitLocations();
+
+    // Add the TIP in the front of the list for non-local non-running maps
+    if (splitLocations.length == 0) {
+      nonLocalMaps.add(0, tip);
+      return;
+    }
+
+    for(String host: splitLocations) {
+      Node node = jobtracker.getNode(host);
+      if (node == null) {
+        LOG.error("Pre-existing information of the host " + host
+                  + " missing from the topology");
+        continue;
+      }
+      // put the tip first in the list at each cache level, host upwards
+      for (int j = 0; j < maxLevel; ++j) {
+        List<TaskInProgress> hostMaps = nodesToMaps.get(node);
+        if (hostMaps == null) {
+          hostMaps = new LinkedList<TaskInProgress>();
+          nodesToMaps.put(node, hostMaps);
+        }
+        hostMaps.add(0, tip);
+        if (node.getParent() == null) {
+          LOG.error("Missing parent of the node " + node.getName()
+                    + " at level " + node.getLevel());
+          break;
+        } else {
+          node = node.getParent();
+        }
+      }
+    }
+  }
   
   /**
-   * Find a new task to run.
+   * Re-queues a failed reduce TIP at the head of the non-running reduce
+   * list, so it is the first entry a later scan of that list encounters.
+   * @param tip the reduce tip to put back on the non-running list
+   */
+  private synchronized void failReduce(TaskInProgress tip) {
+    if (reducers == null) {
+      LOG.warn("Failed cache for reducers missing!! "
+               + "Job details are missing.");
+      return;
+    }
+    reducers.add(0, tip);
+  }
+  
+  /**
+   * Find a non-running task in the passed list of TIPs. Entries that are
+   * selected, or that cannot be scheduled from this list, are removed from
+   * the list while it is scanned.
+   * @param tips a collection of TIPs
+   * @param taskTracker the tracker that has requested a task to run
+   * @param level level of the cache (>=0 for maps, -1 for reduces)
+   * @return the selected TIP, or null if no TIP in the list qualifies
+   */
+  private synchronized TaskInProgress findTaskFromList(
+      Collection<TaskInProgress> tips, String taskTracker, int level) {
+    Iterator<TaskInProgress> iter = tips.iterator();
+    while (iter.hasNext()) {
+      TaskInProgress tip = iter.next();
+      TaskInProgress newTask = null;
+
+      // Select a tip that is runnable and not running, and that either
+      //   - has not failed on this tracker, or
+      //   - has failed on at least clusterSize machines.
+      // A TIP is removed from the list if
+      // (1) it is selected for this tracker
+      // (2) it failed earlier on this tracker and the passed list *is* the
+      //     host list (level = 0)
+      // (3) it is not runnable or is already running
+      if (tip.isRunnable() && !tip.isRunning()) {
+        // check if the tip has failed on this host
+        if (tip.hasFailedOnMachine(taskTracker)) {
+          // check if the tip has failed on all the nodes
+          if (tip.getNumberOfFailedMachines() >= clusterSize) {
+            // if YES then schedule it and remove it
+            iter.remove();
+            newTask = tip;
+          } else {
+            if (level == 0) {
+              // see point#2 in the comment above for TIP removal logic
+              iter.remove();
+            }
+          }
+        } else {
+          iter.remove();
+          newTask = tip;
+        }
+      } else {
+        // see point#3 in the comment above for TIP removal logic
+        iter.remove();
+      }
+      if (newTask != null) {
+        return newTask;
+      }
+    }
+    return null;
+  }
+  
+  /**
+   * Find a running TIP in the passed collection that should be speculated
+   * @param list a collection of running tips; non-running ones are dropped
+   * @param taskTracker the tracker that has requested a tip
+   * @param avgProgress the average progress for speculation
+   * @param currentTime current time in millisec
+   * @param level the level of the cache; at level 0 a chosen tip is removed
+   * @return a tip that can be speculated on the tracker, or null if none
+   */
+  private synchronized TaskInProgress findSpeculativeTask(
+      Collection<TaskInProgress> list, String taskTracker, double avgProgress,
+      long currentTime, int level) {
+    
+    Iterator<TaskInProgress> iter = list.iterator();
+
+    while (iter.hasNext()) {
+      TaskInProgress tip = iter.next();
+      // defensive: a non-running tip is not expected here (should never be true!)
+      if (!tip.isRunning()) {
+        iter.remove();
+        continue;
+      }
+
+      if (shouldRunSpeculativeTask(currentTime, tip, avgProgress, 
+                                   taskTracker)) {
+        if (level == 0) {
+          iter.remove(); //this tracker is never going to run it again
+        }
+        return tip;
+      } 
+    }
+    return null;
+  }
+  
+  /**
+   * Find new map task
    * @param tts The task tracker that is asking for a task
    * @param clusterSize The number of task trackers in the cluster
    * @param avgProgress The average progress of this kind of task in this job
-   * @param tasks The list of potential tasks to try
-   * @param firstTaskToTry The first index in tasks to check
-   * @param cachedTasks A list of tasks that would like to run on this node
-   * @param hasSpeculative Should it try to find speculative tasks
    * @return the index in tasks of the selected task (or -1 for no task)
    */
-  private int findNewTask(TaskTrackerStatus tts, 
-                          int clusterSize,
-                          double avgProgress,
-                          TaskInProgress[] tasks,
-                          Map<Node,List<TaskInProgress>> cachedTasks,
-                          boolean hasSpeculative) {
+  private synchronized int findNewMapTask(TaskTrackerStatus tts, 
+                                          int clusterSize,
+                                          double avgProgress) {
     String taskTracker = tts.getTrackerName();
-    int specTarget = -1;
+    TaskInProgress tip = null;
+    Collection<Node> nodesAtMaxLevel = null;
 
     //
     // Update the last-known clusterSize
@@ -790,6 +1089,9 @@
     this.clusterSize = clusterSize;
 
     Node node = jobtracker.getNode(tts.getHost());
+    // get the parent of the node at max level
+    Node nodeParentAtMaxLevel = JobTracker.getParentNode(node, maxLevel - 1);
+
     //
     // Check if too many tasks of this job have failed on this
     // tasktracker prior to assigning it a new one.
@@ -804,107 +1106,241 @@
       }
       return -1;
     }
-    long currentTime = System.currentTimeMillis();
-        
-    //
-    // See if there is a split over a block that is stored on
-    // the TaskTracker checking in or the rack it belongs to and so on till
-    // maxLevel.  That means the block
-    // doesn't have to be transmitted from another node/rack/and so on.
-    // The way the cache is updated is such that in every lookup, the TIPs
-    // which are complete is removed. Running/Failed TIPs are not removed
-    // since we want to have locality optimizations even for FAILED/SPECULATIVE
-    // tasks.
+
+    // For scheduling a map task, we have two caches and a list (optional)
+    //  I)   one for non-running task
+    //  II)  one for running task (this is for handling speculation)
+    //  III) a list of TIPs that have empty locations (e.g., dummy splits),
+    //       the list is empty if all TIPs have associated locations
+
+    // First a look up is done on the non-running cache and on a miss, a look 
+    // up is done on the running cache. The order for lookup within the cache:
+    //   1. from local node to root [bottom up]
+    //   2. breadth wise for all the parent nodes at max level
+
+    // We fall to linear scan of the list (III above) if we have misses in the 
+    // above caches
+
     //
-    if (cachedTasks != null && node != null) {
+    // I) Non-running TIP :
+    // 
+
+    // 1. check from local node to the root [bottom up cache lookup]
+    //    i.e if node info is available make use of its cache
+    if (node != null) {
       Node key = node;
-      for (int level = 0; level < maxLevel && key != null; level++) {
-        List <TaskInProgress> cacheForLevel = cachedTasks.get(key);
+      for (int level = 0; level < maxLevel; ++level) {
+        List <TaskInProgress> cacheForLevel = nodesToMaps.get(key);
         if (cacheForLevel != null) {
-          Iterator<TaskInProgress> i = cacheForLevel.iterator();
-          while (i.hasNext()) {
-            TaskInProgress tip = i.next();
-            // we remove only those TIPs that are data-local (the host having
-            // the data is running the task). We don't remove TIPs that are 
-            // rack-local for example since that would negatively impact
-            // the performance of speculative and failed tasks (imagine a case
-            // where we schedule one TIP rack-local and after sometime another
-            // tasktracker from the same rack is asking for a task, and the TIP
-            // in question has either failed or could be a speculative task
-            // candidate)
-            if (tip.isComplete() || level == 0) {
-              i.remove();
+          tip = findTaskFromList(cacheForLevel, taskTracker, level);
+          if (tip != null) {
+            // Add to running cache
+            scheduleMap(tip);
+
+            // remove the cache if its empty
+            if (cacheForLevel.size() == 0) {
+              nodesToMaps.remove(key);
+            }
+
+            if (level == 0) {
+              LOG.info("Choosing data-local task " + tip.getTIPId());
+              jobCounters.incrCounter(Counter.DATA_LOCAL_MAPS, 1);
+            } else if (level == 1){
+              LOG.info("Choosing rack-local task " + tip.getTIPId());
+              jobCounters.incrCounter(Counter.RACK_LOCAL_MAPS, 1);
+            } else {
+              LOG.info("Choosing cached task at level " + level + " "
+                       + tip.getTIPId());
+            }
+
+            return tip.getIdWithinJob();
+          }
+        }
+        if (key.getParent() == null) {
+          LOG.error("Missing parent information of the node " + key.getName() 
+                    + " at level " + key.getLevel());
+          break;
+        } else {
+          key = key.getParent();
+        }
+      }
+    }
+
+    // 2. Search breadth-wise across parents (max level) for non-running TIP if
+    //     - cache miss 
+    //     - node information for the tracker is missing (tracker's topology 
+    //       info not obtained yet)
+    nodesAtMaxLevel = jobtracker.getNodesAtLevel(maxLevel - 1);
+
+    if (nodesAtMaxLevel != null) {
+      for (Node parent : nodesAtMaxLevel) {
+
+        // skip the parent that has already been scanned
+        if (parent == nodeParentAtMaxLevel) {
+          continue;
+        }
+
+        List<TaskInProgress> cache = nodesToMaps.get(parent);
+        if (cache != null) {
+          tip = findTaskFromList(cache, taskTracker, maxLevel - 1);
+          if (tip != null) {
+            // Add to the running cache
+            scheduleMap(tip);
+
+            // remove the cache if empty
+            if (cache.size() == 0) {
+              nodesToMaps.remove(parent);
             }
-            if (tip.isRunnable() && 
-                !tip.isRunning() &&
-                !tip.hasFailedOnMachine(taskTracker)) {
-              int cacheTarget = tip.getIdWithinJob();
+            LOG.info("Choosing a non-local task " + tip.getTIPId());
+            return tip.getIdWithinJob();
+          }
+        }
+      }
+    } else {
+      LOG.fatal("Missing parent at max level in the topology");
+    }
+    
+    // 3. Search non-local tips for a new task
+    tip = findTaskFromList(nonLocalMaps, taskTracker, maxLevel - 1);
+    if (tip != null) {
+      // Add to the running list
+      scheduleMap(tip);
+
+      LOG.info("Choosing a non-local task " + tip.getTIPId());
+      return tip.getIdWithinJob();
+    }
+
+    //
+    // II) Running TIP :
+    // 
+
+    if (hasSpeculativeMaps) {
+      long currentTime = System.currentTimeMillis();
+      // 1. Check bottom up for speculative tasks from the running cache
+      if (node != null) {
+        Node key = node;
+        for (int level = 0; level < maxLevel; ++level) {
+          Set<TaskInProgress> cacheForLevel = runningMapCache.get(key);
+          if (cacheForLevel != null) {
+            tip = findSpeculativeTask(cacheForLevel, taskTracker, avgProgress, 
+                                      currentTime, level);
+            if (tip != null) {
+              if (cacheForLevel.size() == 0) {
+                runningMapCache.remove(key);
+              }
               if (level == 0) {
-                LOG.info("Choosing data-local task " + tip.getTIPId());
-                jobCounters.incrCounter(Counter.DATA_LOCAL_MAPS, 1);
+                LOG.info("Choosing a data-local task " + tip.getTIPId() 
+                         + " for speculation");
               } else if (level == 1){
-                LOG.info("Choosing rack-local task " + tip.getTIPId());
-                jobCounters.incrCounter(Counter.RACK_LOCAL_MAPS, 1);
+                LOG.info("Choosing a rack-local task " + tip.getTIPId() 
+                         + " for speculation");
               } else {
-                LOG.info("Choosing cached task at level " + level + " " + 
-                          tip.getTIPId());
+                LOG.info("Choosing a cached task at level " + level + " "
+                         + tip.getTIPId() + " for speculation");
               }
-              return cacheTarget;
+              return tip.getIdWithinJob();
             }
-            if (hasSpeculative && specTarget == -1 &&
-                shouldRunSpeculativeTask(currentTime, tip, avgProgress, 
-                                         taskTracker)) {
-              specTarget = tip.getIdWithinJob();
-            }
+          }
+          if (key.getParent() == null) {
+            LOG.error("Missing parent information of the node " + key.getName()
+                      + " at level " + key.getLevel());
+            break;
+          } else {
+            key = key.getParent();
           }
         }
-        key = key.getParent();
       }
-    }
 
+      // 2. Check breadth-wise for speculative tasks
+      nodesAtMaxLevel = jobtracker.getNodesAtLevel(maxLevel - 1);
 
-    //
-    // If there's no cached target, see if there's
-    // a std. task to run.
-    //
-    int failedTarget = -1;
-    for (int i = 0; i < tasks.length; i++) {
-      TaskInProgress task = tasks[i];
-      if (task.isRunnable()) {
-        // if it failed here and we haven't tried every machine, we
-        // don't schedule it here.
-        boolean hasFailed = task.hasFailedOnMachine(taskTracker);
-        if (hasFailed && (task.getNumberOfFailedMachines() < clusterSize)) {
-          continue;
-        }
-        boolean isRunning = task.isRunning();
-        if (hasFailed) {
-          // failed tasks that aren't running can be scheduled as a last
-          // resort
-          if (!isRunning && failedTarget == -1) {
-            failedTarget = i;
+      if (nodesAtMaxLevel != null) {
+        for (Node parent : nodesAtMaxLevel) {
+          // ignore the parent which was already scanned bottom-up
+          if (parent == nodeParentAtMaxLevel) {
+            continue;
           }
-        } else {
-          if (!isRunning) {
-            LOG.info("Choosing normal task " + tasks[i].getTIPId());
-            return i;
-          } else if (hasSpeculative && specTarget == -1 &&
-                     shouldRunSpeculativeTask(currentTime, task, avgProgress,
-                                              taskTracker)) {
-            specTarget = i;
+
+          Set<TaskInProgress> cache = runningMapCache.get(parent);
+          if (cache != null) {
+            tip = findSpeculativeTask(cache, taskTracker, avgProgress, 
+                                      currentTime, maxLevel - 1);
+            if (tip != null) {
+              // remove empty cache entries
+              if (cache.size() == 0) {
+                runningMapCache.remove(parent);
+              }
+              LOG.info("Choosing a non-local task " + tip.getTIPId() 
+                       + " for speculation");
+              return tip.getIdWithinJob();
+            }
           }
         }
+      } else {
+        LOG.fatal("Missing parent at max level in the topology");
+      }
+
+      // 3. Check non-local tips for speculation
+      tip = findSpeculativeTask(nonLocalRunningMaps, taskTracker, avgProgress, 
+                                currentTime, maxLevel - 1);
+      if (tip != null) {
+        LOG.info("Choosing a non-local task " + tip.getTIPId() 
+                 + " for speculation");
+        return tip.getIdWithinJob();
       }
     }
-    if (specTarget != -1) {
-      LOG.info("Choosing speculative task " + 
-               tasks[specTarget].getTIPId());
-    } else if (failedTarget != -1) {
-      LOG.info("Choosing failed task " + 
-               tasks[failedTarget].getTIPId());          
+    return -1;
+  }
+
+  /**
+   * Find a reduce task: a non-running one first, else a speculative one
+   * @param tts The task tracker that is asking for a task
+   * @param clusterSize The number of task trackers in the cluster
+   * @param avgProgress The average progress of this kind of task in this job
+   * @return the id within the job of the selected TIP (or -1 for no task)
+   */
+  private synchronized int findNewReduceTask(TaskTrackerStatus tts, 
+                                             int clusterSize,
+                                             double avgProgress) {
+    String taskTracker = tts.getTrackerName();
+    TaskInProgress tip = null;
+    
+    // Update the last-known clusterSize
+    this.clusterSize = clusterSize;
+
+    // Check if too many tasks of this job have failed on this
+    // tasktracker prior to assigning it a new one.
+
+    int taskTrackerFailedTasks = getTrackerTaskFailures(taskTracker);
+    if ((flakyTaskTrackers < (clusterSize * CLUSTER_BLACKLIST_PERCENT)) && 
+        taskTrackerFailedTasks >= conf.getMaxTaskFailuresPerTracker()) {
+      if (LOG.isDebugEnabled()) {
+        String flakyTracker = convertTrackerNameToHostName(taskTracker); 
+        LOG.debug("Ignoring the black-listed tasktracker: '" + flakyTracker 
+                  + "' for assigning a new task");
+      }
+      return -1;
     }
-    
-    return specTarget != -1 ? specTarget : failedTarget;
+
+    // 1. check for a non-running reduce tip (new, or re-queued after failing)
+    // reducers don't have a cache and so pass -1 to explicitly call that out
+    tip = findTaskFromList(reducers, taskTracker, -1);
+    if (tip != null) {
+      scheduleReduce(tip);
+      return tip.getIdWithinJob();
+    }
+
+    // 2. check for a running reduce tip to be speculated
+    if (hasSpeculativeReduces) {
+      tip = findSpeculativeTask(runningReducers, taskTracker, avgProgress, 
+                                System.currentTimeMillis(), -1);
+      if (tip != null) {
+        scheduleReduce(tip);
+        return tip.getIdWithinJob();
+      }
+    }
+
+    return -1;
   }
 
   /**
@@ -969,6 +1405,8 @@
       runningMapTasks -= 1;
       finishedMapTasks += 1;
       metrics.completeMap();
+      // remove the completed map from the resp running caches
+      retireMap(tip);
     } else{
       runningReduceTasks -= 1;
       finishedReduceTasks += 1;
@@ -973,6 +1411,8 @@
       runningReduceTasks -= 1;
       finishedReduceTasks += 1;
       metrics.completeReduce();
+      // remove the completed reduces from the running reducers set
+      retireReduce(tip);
     }
         
     //
@@ -999,13 +1439,8 @@
    * @return
    */
   private boolean isJobComplete(TaskInProgress tip, JobTrackerMetrics metrics) {
-    boolean allDone = true;
-    for (int i = 0; i < maps.length; i++) {
-      if (!(maps[i].isComplete() || maps[i].isFailed())) {
-        allDone = false;
-        break;
-      }
-    }
+    // Job is complete if total-tips = finished-tips + failed-tips
+    boolean allDone = ((finishedMapTasks + failedMapTIPs) == numMapTasks);
     if (allDone) {
       if (tip.isMapTask()) {
         this.status.setMapProgress(1.0f);              
@@ -1010,12 +1445,7 @@
       if (tip.isMapTask()) {
         this.status.setMapProgress(1.0f);              
       }
-      for (int i = 0; i < reduces.length; i++) {
-        if (!(reduces[i].isComplete() || reduces[i].isFailed())) {
-          allDone = false;
-          break;
-        }
-      }
+      allDone = ((finishedReduceTasks + failedReduceTIPs) == numReduceTasks);
     }
 
     //
@@ -1079,7 +1509,8 @@
                           TaskStatus status, String trackerName,
                           boolean wasRunning, boolean wasComplete,
                           JobTrackerMetrics metrics) {
-    
+    boolean wasFailed = tip.isFailed();
+
     // Mark the taskid as FAILED or KILLED
     tip.incompleteSubTask(taskid, trackerName, this.status);
    
@@ -1090,8 +1521,20 @@
     if (wasRunning && !isRunning) {
       if (tip.isMapTask()){
         runningMapTasks -= 1;
+        // remove from the running queue and put it in the non-running cache
+        // if the tip is not complete i.e if the tip still needs to be run
+        if (!isComplete) {
+          retireMap(tip);
+          failMap(tip);
+        }
       } else {
         runningReduceTasks -= 1;
+        // remove from the running queue and put in the failed queue if the tip
+        // is not complete
+        if (!isComplete) {
+          retireReduce(tip);
+          failReduce(tip);
+        }
       }
     }
         
@@ -1106,19 +1549,8 @@
         // earlier (wasComplete and !isComplete) 
         // (since they might have been removed from the cache of other 
         // racks/switches, if the input split blocks were present there too)
-        for (String host : tip.getSplitLocations()) {
-          Node node = jobtracker.getNode(host);
-          for (int level = 1; (node != null && level < maxLevel); level++) {
-            node = getParentNode(node, level);
-            if (node == null) {
-              break;
-            }
-            List<TaskInProgress> list = nodesToMaps.get(node);
-            if (list != null) {
-              list.add(tip);
-            }
-          }
-        }
+        retireMap(tip);
+        failMap(tip);
         finishedMapTasks -= 1;
       }
     }
@@ -1176,8 +1608,8 @@
     //
     // Check if we need to kill the job because of too many failures or 
     // if the job is complete since all component tasks have completed
-    //
-    if (tip.isFailed()) {
+    // We do it only once per TIP: when a task failure first marks the TIP failed
+    if (!wasFailed && tip.isFailed()) {
       //
       // Allow upto 'mapFailuresPercent' of map tasks to fail or
       // 'reduceFailuresPercent' of reduce tasks to fail
@@ -1285,6 +1717,11 @@
     }
     
     cleanUpMetrics();
+    // free up the memory used by the data structures
+    this.nodesToMaps = null;
+    this.runningMapCache = null;
+    this.reducers = null;
+    this.runningReducers = null;
   }
 
   /**
@@ -1291,14 +1728,17 @@
    * Return the TaskInProgress that matches the tipid.
    */
   public TaskInProgress getTaskInProgress(String tipid){
-    for (int i = 0; i < maps.length; i++) {
-      if (tipid.equals(maps[i].getTIPId())){
-        return maps[i];
-      }               
-    }
-    for (int i = 0; i < reduces.length; i++) {
-      if (tipid.equals(reduces[i].getTIPId())){
-        return reduces[i];
+    if (TaskInProgress.isMapId(tipid)) {
+      for (int i = 0; i < maps.length; i++) {
+        if (tipid.equals(maps[i].getTIPId())){
+          return maps[i];
+        }
+      }
+    } else {
+      for (int i = 0; i < reduces.length; i++) {
+        if (tipid.equals(reduces[i].getTIPId())){
+          return reduces[i];
+        }
       }
     }
     return null;
Index: src/java/org/apache/hadoop/mapred/JobTracker.java
===================================================================
--- src/java/org/apache/hadoop/mapred/JobTracker.java	(revision 639156)
+++ src/java/org/apache/hadoop/mapred/JobTracker.java	(working copy)
@@ -37,6 +37,7 @@
 import java.util.Properties;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.HashMap;
 import java.util.TreeSet;
 import java.util.Vector;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -86,8 +87,9 @@
   private DNSToSwitchMapping dnsToSwitchMapping;
   private NetworkTopology clusterMap = new NetworkTopology();
   private ResolutionThread resThread = new ResolutionThread();
-  private int numTaskCacheLevels; // the max level of a host in the network topology
-  
+  private int numTaskCacheLevels; // the max level to which we cache tasks
+  private Map<Integer, Set<Node>> siblingSetAtLevel = 
+          new HashMap<Integer, Set<Node>>();
   /**
    * A client tried to submit a job before the Job Tracker was ready.
    */
@@ -560,6 +562,10 @@
   Map<String, Node> trackerNameToNodeMap = 
     Collections.synchronizedMap(new TreeMap<String, Node>());
   
+  //Set of nodes at the level corresponding to mapred.task.cache.levels
+  //This would be the highest level for the cache
+  Set<Node> maxLevelNodesSet = new HashSet<Node>();
+
   // Number of resolved entries
   int numResolved;
     
@@ -1180,9 +1186,38 @@
     if ((node = clusterMap.getNode(networkLoc+"/"+name)) == null) {
       node = new NodeBase(name, networkLoc);
       clusterMap.add(node);
+      trackerNameToNodeMap.put(name, node);
+      for (int i = 1; i < numTaskCacheLevels; ++i) {
+        LOG.info("Adding parent for node " + name + " at level " + i);
+        Set<Node> s = siblingSetAtLevel.get(i);
+        if (s == null) {
+          s = new HashSet<Node>();
+          siblingSetAtLevel.put(i, s);
+        }
+        Node parentAtLevel = getParentNode(node, i);
+        if (parentAtLevel != null) {
+          s.add(parentAtLevel);
+        } else {
+          LOG.fatal("Missing parent of the node " + node.getName() 
+                    + " at level " + node.getLevel());
+          break;
+        }
+      }
+    }
+    return node;
+  }
+
+  public Collection<Node> getNodesAtLevel(int i) {
+    return siblingSetAtLevel.get(i); // null if the map has no entry for level i
+  }
+
+  public static Node getParentNode(Node node, int level) {
+    for (int i = 0; node != null && i < level; ++i) {
+      node = node.getParent();
     }
     return node;
   }
+
   public Node getNode(String name) {
     return trackerNameToNodeMap.get(name);
   }
@@ -1446,6 +1481,22 @@
               node = new NodeBase(host, networkLoc);
               clusterMap.add(node);
               trackerNameToNodeMap.put(host, node);
+              for (int level = 1; level < numTaskCacheLevels; ++level) {
+                LOG.info("Adding parent for node " + node.getName() + " at level " + level);
+                Set<Node> s = siblingSetAtLevel.get(level);
+                if (s == null) {
+                  s = new HashSet<Node>();
+                  siblingSetAtLevel.put(level, s);
+                }
+                Node parentAtLevel = getParentNode(node, level);
+                if (parentAtLevel != null) {
+                  s.add(parentAtLevel);
+                } else {
+                  LOG.fatal("Missing parent of the node " + node.getName() 
+                            + " at level " + node.getLevel());
+                  break;
+                }
+              }   
             }
             numResolved++;
           }
@@ -1797,8 +1848,11 @@
       return completedJobStatusStore.readCounters(jobid);
     }
   }
-  public synchronized TaskReport[] getMapTaskReports(String jobid) {
-    JobInProgress job = jobs.get(jobid);
+  public TaskReport[] getMapTaskReports(String jobid) {
+    JobInProgress job = null;
+    synchronized (jobs) {
+      job = jobs.get(jobid);
+    }
     if (job == null) {
       return new TaskReport[0];
     } else {
@@ -1819,8 +1873,11 @@
     }
   }
 
-  public synchronized TaskReport[] getReduceTaskReports(String jobid) {
-    JobInProgress job = jobs.get(jobid);
+  public TaskReport[] getReduceTaskReports(String jobid) {
+    JobInProgress job = null;
+    synchronized (jobs) {
+      job = jobs.get(jobid);
+    }
     if (job == null) {
       return new TaskReport[0];
     } else {
@@ -1844,10 +1901,13 @@
    * starting from fromEventId.
    * @see org.apache.hadoop.mapred.JobSubmissionProtocol#getTaskCompletionEvents(java.lang.String, int, int)
    */
-  public synchronized TaskCompletionEvent[] getTaskCompletionEvents(
+  public TaskCompletionEvent[] getTaskCompletionEvents(
                                                                     String jobid, int fromEventId, int maxEvents) throws IOException{
     TaskCompletionEvent[] events;
-    JobInProgress job = this.jobs.get(jobid);
+    JobInProgress job = null;
+    synchronized (this.jobs) { 
+      job = this.jobs.get(jobid);
+    }
     if (null != job) {
       events = job.getTaskCompletionEvents(fromEventId, maxEvents);
     }
@@ -1864,7 +1924,7 @@
    * @param taskId the id of the task
    * @return an array of the diagnostic messages
    */
-  public synchronized String[] getTaskDiagnostics(String jobId, 
+  public String[] getTaskDiagnostics(String jobId, 
                                                   String tipId, 
                                                   String taskId) 
   throws IOException {
@@ -2102,11 +2162,10 @@
     }
     
     public void addToQueue(JobInProgress.JobWithTaskContext j) {
-      while (!queue.add(j)) {
-        LOG.warn("Couldn't add to the Task Commit queue now. Will " +
-                 "try again");
+      while (true) { // loop until the element gets added
         try {
-          Thread.sleep(2000);
+          queue.put(j);
+          return;
         } catch (InterruptedException ie) {}
       }
     }
@@ -2112,103 +2171,156 @@
     }
        
     public void run() {
+      int  batchCommitSize = conf.getInt("jobtracker.task.commit.batch.size", 
+                                         5000); 
       while (!isInterrupted()) {
         try {
-          JobInProgress.JobWithTaskContext j = queue.take();
-          JobInProgress job = j.getJob();
-          TaskInProgress tip = j.getTIP();
-          String taskid = j.getTaskId();
-          JobTrackerMetrics metrics = j.getJobTrackerMetrics();
-          Task t;
-          TaskStatus status;
-          boolean isTipComplete = false;
-          TaskStatus.State state;
+          while (queue.size() == 0) {
+            // sleep in 2-second intervals while the queue is empty
+            Thread.sleep(2000);
+          }
+
+          ArrayList <JobInProgress.JobWithTaskContext> jobList = 
+            new ArrayList<JobInProgress.JobWithTaskContext>(batchCommitSize);
+          try {
+            // drain up to batchCommitSize tasks and process them as a batch
+            queue.drainTo(jobList, batchCommitSize);
+          } catch (Exception ie) {
+            break;
+          }
+
+          JobInProgress[] jobs = new JobInProgress[jobList.size()];
+          TaskInProgress[] tips = new TaskInProgress[jobList.size()];
+          String[] taskids = new String[jobList.size()];
+          JobTrackerMetrics[] metrics = new JobTrackerMetrics[jobList.size()];
+
+          Iterator<JobInProgress.JobWithTaskContext> iter = jobList.iterator();
+          int count = 0;
+
+          while (iter.hasNext()) {
+            JobInProgress.JobWithTaskContext j = iter.next();
+            jobs[count] = j.getJob();
+            tips[count] = j.getTIP();
+            taskids[count]= j.getTaskId();
+            metrics[count] = j.getJobTrackerMetrics();
+            ++count;
+          }
+
+          Task[] tasks = new Task[jobList.size()];
+          TaskStatus[] status = new TaskStatus[jobList.size()];
+          boolean[] isTipComplete = new boolean[jobList.size()];
+          TaskStatus.State[] states = new TaskStatus.State[jobList.size()];
+
           synchronized (JobTracker.this) {
-            synchronized (job) {
-              synchronized (tip) {
-                status = tip.getTaskStatus(taskid);
-                t = tip.getTaskObject(taskid);
-                state = status.getRunState();
-                isTipComplete = tip.isComplete();
+            for(int i = 0; i < jobList.size(); ++i) {
+              synchronized (jobs[i]) {
+                synchronized (tips[i]) {
+                  status[i] = tips[i].getTaskStatus(taskids[i]);
+                  tasks[i] = tips[i].getTaskObject(taskids[i]);
+                  states[i] = status[i].getRunState();
+                  isTipComplete[i] = tips[i].isComplete();
+                }
               }
             }
           }
-          try {
-            //For COMMIT_PENDING tasks, we save the task output in the dfs
-            //as well as manipulate the JT datastructures to reflect a
-            //successful task. This guarantees that we don't declare a task
-            //as having succeeded until we have successfully completed the
-            //dfs operations.
-            //For failed tasks, we just do the dfs operations here. The
-            //datastructures updates is done earlier as soon as the failure
-            //is detected so that the JT can immediately schedule another
-            //attempt for that task.
-            if (state == TaskStatus.State.COMMIT_PENDING) {
-              if (!isTipComplete) {
-                t.saveTaskOutput();
+
+          //For COMMIT_PENDING tasks, we save the task output in the dfs
+          //as well as manipulate the JT datastructures to reflect a
+          //successful task. This guarantees that we don't declare a task
+          //as having succeeded until we have successfully completed the
+          //dfs operations.
+          //For failed tasks, we just do the dfs operations here. The
+          //datastructure updates are done earlier, as soon as the failure
+          //is detected, so that the JT can immediately schedule another
+          //attempt for that task.
+
+          Set<String> seenTIPs = new HashSet<String>();
+          for(int index = 0; index < jobList.size(); ++index) {
+            try {
+              if (states[index] == TaskStatus.State.COMMIT_PENDING) {
+                if (!isTipComplete[index]) {
+                  if (!seenTIPs.contains(tips[index].getTIPId())) {
+                    tasks[index].saveTaskOutput();
+                    seenTIPs.add(tips[index].getTIPId());
+                  } else {
+                    // since another task of this tip has saved its output
+                    isTipComplete[index] = true;
+                  }
+                }
               }
+            } catch (IOException ioe) {
+              // Oops! Failed to copy the task's output to its final place;
+              // fail the task!
+              states[index] = TaskStatus.State.FAILED;
               synchronized (JobTracker.this) {
-                //do a check for the case where after the task went to
-                //COMMIT_PENDING, it was lost. So although we would have
-                //saved the task output, we cannot declare it a SUCCESS.
-                TaskStatus newStatus = null;
-                synchronized (job) {
-                  synchronized (tip) {
-                    status = tip.getTaskStatus(taskid);
-                    if (!isTipComplete) {
-                      if (status.getRunState() != 
-                        TaskStatus.State.COMMIT_PENDING) {
-                        state = TaskStatus.State.KILLED;
-                      } else {
-                        state = TaskStatus.State.SUCCEEDED;
-                      }
-                    } else {
-                      tip.addDiagnosticInfo(t.getTaskId(),"Already completed " +
-                      "TIP");
-                      state = TaskStatus.State.KILLED;
+                String reason = "Failed to rename output with the exception: " 
+                                + StringUtils.stringifyException(ioe);
+                TaskStatus.Phase phase = (tips[index].isMapTask() 
+                                          ? TaskStatus.Phase.MAP 
+                                          : TaskStatus.Phase.REDUCE);
+                jobs[index].failedTask(tips[index], status[index].getTaskId(), 
+                                       reason, phase, TaskStatus.State.FAILED, 
+                                       status[index].getTaskTracker(), null);
+              }
+              LOG.info("Failed to rename the output of " 
+                       + status[index].getTaskId() + " with " 
+                       + StringUtils.stringifyException(ioe));
+            }
+          }
 
-                    }
-                    //create new status if required. If the state changed from
-                    //COMMIT_PENDING to KILLED in the JobTracker, while we were
-                    //saving the output,the JT would have called updateTaskStatus
-                    //and we don't need to call it again
-                    if (status.getRunState() == TaskStatus.State.COMMIT_PENDING){
-                      newStatus = (TaskStatus)status.clone();
-                      newStatus.setRunState(state);
-                      newStatus.setProgress((state == TaskStatus.State.SUCCEEDED) ? 1.0f : 0.0f);
+          TaskStatus[] newStatuslist = new TaskStatus[jobList.size()];
+          synchronized (JobTracker.this) {
+            //do a check for the case where after the task went to
+            //COMMIT_PENDING, it was lost. So although we would have
+            //saved the task output, we cannot declare it a SUCCESS.
+            for(int i = 0; i < jobList.size(); ++i) {
+              if(states[i] == TaskStatus.State.FAILED)
+                continue;
+
+              synchronized (jobs[i]) {
+                synchronized (tips[i]) {
+                  status[i] = tips[i].getTaskStatus(taskids[i]);
+                  if (!isTipComplete[i]) {
+                    if (status[i].getRunState() 
+                        != TaskStatus.State.COMMIT_PENDING) {
+                      states[i] = TaskStatus.State.KILLED;
+                    } else {
+                      states[i] = TaskStatus.State.SUCCEEDED;
+                      //create new status if required. If the state changed 
+                      //from COMMIT_PENDING to KILLED in the JobTracker, while 
+                      //we were saving the output,the JT would have called 
+                      //updateTaskStatus and we don't need to call it again
+                      newStatuslist[i] = (TaskStatus)status[i].clone();
+                      newStatuslist[i].setRunState(states[i]);
+                      newStatuslist[i].setProgress(
+                          (states[i] == TaskStatus.State.SUCCEEDED) 
+                          ? 1.0f 
+                          : 0.0f);
                     }
+                  } else {
+                    tips[i].addDiagnosticInfo(tasks[i].getTaskId(), 
+                                              "Already completed  TIP");
+                    states[i] = TaskStatus.State.KILLED;
                   }
-                  if (newStatus != null) {
-                    job.updateTaskStatus(tip, newStatus, metrics);
-                  }
+                }
+                if (newStatuslist[i] != null) {
+                  jobs[i].updateTaskStatus(tips[i], newStatuslist[i], 
+                                           metrics[i]);
                 }
               }
             }
-          } catch (IOException ioe) {
-            // Oops! Failed to copy the task's output to its final place;
-            // fail the task!
-            state = TaskStatus.State.FAILED;
-            synchronized (JobTracker.this) {
-              job.failedTask(tip, status.getTaskId(), 
-                  "Failed to rename output with the exception: " + 
-                  StringUtils.stringifyException(ioe), 
-                  (tip.isMapTask() ? 
-                      TaskStatus.Phase.MAP : 
-                        TaskStatus.Phase.REDUCE), 
-                        TaskStatus.State.FAILED,  
-                        status.getTaskTracker(), null);
-            }
-            LOG.info("Failed to rename the output of " + status.getTaskId() + 
-                " with: " + StringUtils.stringifyException(ioe));
           }
-          if (state == TaskStatus.State.FAILED || 
-              state == TaskStatus.State.KILLED) {
-            try {
-              t.discardTaskOutput();
-            } catch (IOException ioe) { 
-              LOG.info("Failed to discard the output of task " + 
-                  status.getTaskId() + " with: " + 
-                  StringUtils.stringifyException(ioe));
+
+          for(int i = 0; i < jobList.size(); ++i) {
+            if (states[i] == TaskStatus.State.FAILED
+                || states[i] == TaskStatus.State.KILLED) {
+              try {
+                tasks[i].discardTaskOutput();
+              } catch (IOException ioe) {
+                LOG.info("Failed to discard the output of task " 
+                         + status[i].getTaskId() + " with: " 
+                         + StringUtils.stringifyException(ioe));
+              }
             }
           }
         } catch (InterruptedException ie) {
Index: src/java/org/apache/hadoop/mapred/TaskInProgress.java
===================================================================
--- src/java/org/apache/hadoop/mapred/TaskInProgress.java	(revision 639156)
+++ src/java/org/apache/hadoop/mapred/TaskInProgress.java	(working copy)
@@ -53,6 +53,8 @@
   int maxTaskAttempts = 4;    
   static final double SPECULATIVE_GAP = 0.2;
   static final long SPECULATIVE_LAG = 60 * 1000;
+  static final String MAP_IDENTIFIER = "_m_";
+  static final String REDUCE_IDENTIFIER = "_r_";
   private static NumberFormat idFormat = NumberFormat.getInstance();
   static {
     idFormat.setMinimumIntegerDigits(6);
@@ -79,7 +81,7 @@
   private long startTime = 0;
   private long execStartTime = 0;
   private long execFinishTime = 0;
-  private int completes = 0;
+  private volatile int completes = 0;
   private boolean failed = false;
   private boolean killed = false;
 
@@ -160,6 +162,18 @@
       this.maxTaskAttempts = conf.getMaxReduceAttempts();
     }
   }
+  
+  /**
+   * Tell whether a tip id denotes a map tip.
+   * @param tipId the tip id
+   * @return <code>true</code> if the tip id contains the map identifier,
+   *         <code>false</code> otherwise
+   */
+  public static boolean isMapId(String tipId) {
+    // An id counts as a map id exactly when MAP_IDENTIFIER occurs in it;
+    // any other id is reported as not being a map id.
+    return tipId.contains(MAP_IDENTIFIER);
+  }
 
   /**
    * Make a unique name for this TIP.
@@ -170,9 +184,9 @@
     StringBuilder result = new StringBuilder();
     result.append(uniqueBase);
     if (isMapTask()) {
-      result.append("_m_");
+      result.append(MAP_IDENTIFIER);
     } else {
-      result.append("_r_");
+      result.append(REDUCE_IDENTIFIER);
     }
     result.append(idFormat.format(partition));
     return result.toString();
