Index: src/java/org/apache/hadoop/mapred/JobInProgress.java
===================================================================
--- src/java/org/apache/hadoop/mapred/JobInProgress.java	(revision 636785)
+++ src/java/org/apache/hadoop/mapred/JobInProgress.java	(working copy)
@@ -20,10 +20,15 @@
 import java.io.DataInputStream;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
 import java.util.IdentityHashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.LinkedHashSet;
+import java.util.LinkedList;
 import java.util.Map;
+import java.util.Set;
 import java.util.TreeMap;
 import java.util.Vector;
 
@@ -63,8 +68,8 @@
   // Counters to track currently running/finished/failed Map/Reduce task-attempts
   int runningMapTasks = 0;
   int runningReduceTasks = 0;
-  int finishedMapTasks = 0;
-  int finishedReduceTasks = 0;
+  volatile int finishedMapTasks = 0;
+  volatile int finishedReduceTasks = 0;
   int failedMapTasks = 0; 
   int failedReduceTasks = 0;
   
@@ -79,6 +84,15 @@
   // NetworkTopology Node to the set of TIPs
   Map<Node, List<TaskInProgress>> nodesToMaps;
   
+  // Map of NetworkTopology Node to set of running TIPs
+  Map<Node, LinkedHashSet<TaskInProgress>> runningMapCache;
+
+  // List of reduce TIPs waiting to be scheduled (new or failed)
+  LinkedList<TaskInProgress> reducers;
+
+  // List of running reduce TIPs (maintained only if speculation is 'ON')
+  LinkedList<TaskInProgress> runningReducers;
+
   private int maxLevel;
   
   private int taskCompletionEventTracker = 0; 
@@ -183,6 +197,8 @@
     this.jobMetrics.setTag("jobId", jobid);
     hasSpeculativeMaps = conf.getMapSpeculativeExecution();
     hasSpeculativeReduces = conf.getReduceSpeculativeExecution();
+    this.runningMapCache = new HashMap<Node, LinkedHashSet<TaskInProgress>>();
+    this.runningReducers = new LinkedList<TaskInProgress>();
   }
 
   /**
@@ -215,7 +231,7 @@
     jobMetrics.remove();
   }
     
-  private Node getParentNode(Node node, int level) {
+  public static Node getParentNode(Node node, int level) {
     for (int i = 0; node != null && i < level; i++) {
       node = node.getParent();
     }
@@ -250,9 +266,22 @@
         }
         for (int j = 0; j < maxLevel; j++) {
           node = getParentNode(node, j);
+          if (node == null) {
+            Node leaf = jobtracker.resolveAndAddToTopology(host);
+            LOG.warn("Got a host " + host + " with a null ancestor "
+                     + "(" + NodeBase.getPath(leaf) + ")");
+            break;
+          }
+
+          // ignore the node if its the default node
+          // no point in creating local cache for default node
+          if (JobTracker.DEFAULT_NODE.equals(node.getName())) {
+            continue;
+          }
+
           List<TaskInProgress> hostMaps = cache.get(node);
           if (hostMaps == null) {
-            hostMaps = new ArrayList<TaskInProgress>();
+            hostMaps = new LinkedList<TaskInProgress>();
             cache.put(node, hostMaps);
             hostMaps.add(maps[i]);
           }
@@ -294,7 +323,12 @@
     }
     numMapTasks = splits.length;
     maps = new TaskInProgress[numMapTasks];
+    String[] defaultLocation = new String[] {JobTracker.DEFAULT_NODE};
     for(int i=0; i < numMapTasks; ++i) {
+      // TIPs that are not local to any of the hosts are local to DEFAULT node
+      if (splits[i].getLocations().length == 0) {
+        splits[i].setLocations(defaultLocation);
+      }
       maps[i] = new TaskInProgress(jobId, jobFile, 
                                    splits[i], 
                                    jobtracker, conf, this, i);
@@ -326,6 +360,7 @@
     //
     // Create reduce tasks
     //
+    reducers = new LinkedList<TaskInProgress>();    
     this.reduces = new TaskInProgress[numReduceTasks];
     for (int i = 0; i < numReduceTasks; i++) {
       reduces[i] = new TaskInProgress(jobId, jobFile, 
@@ -331,6 +366,7 @@
       reduces[i] = new TaskInProgress(jobId, jobFile, 
                                       numMapTasks, i, 
                                       jobtracker, conf, this);
+      reducers.add(reduces[i]);
     }
 
     // create job specific temporary directory in output path
@@ -635,8 +671,7 @@
     }
     
     
-    int target = findNewTask(tts, clusterSize, status.mapProgress(), 
-                             maps, nodesToMaps, hasSpeculativeMaps);
+    int target = findNewMapTask(tts, clusterSize, status.mapProgress());
     if (target == -1) {
       return null;
     }
@@ -671,8 +706,7 @@
       return null;
     }
 
-    int target = findNewTask(tts, clusterSize, status.reduceProgress() , 
-                             reduces, null, hasSpeculativeReduces);
+    int  target = findNewReduceTask(tts, clusterSize, status.reduceProgress());
     if (target == -1) {
       return null;
     }
@@ -694,9 +728,11 @@
     return result;
   }
     
-  private String convertTrackerNameToHostName(String trackerName) {
+  public static String convertTrackerNameToHostName(String trackerName) {
     // Ugly!
     // Convert the trackerName to it's host name
+    // Expecting tracker_<hostname>:<etc>
+    // Extract tracker_<hostname>
     int indexOfColon = trackerName.indexOf(":");
     String trackerHostName = (indexOfColon == -1) ? 
       trackerName : 
@@ -701,6 +737,14 @@
     String trackerHostName = (indexOfColon == -1) ? 
       trackerName : 
       trackerName.substring(0, indexOfColon);
+
+    // Extract the hostname
+    int indexOfUnderScore = trackerHostName.indexOf("_");
+    trackerHostName = 
+      (indexOfUnderScore == -1) 
+      ? trackerHostName 
+      : trackerHostName.substring(indexOfUnderScore + 1, 
+                                 trackerHostName.length());
     return trackerHostName;
   }
     
@@ -765,26 +809,320 @@
     return task.hasSpeculativeTask(currentTime, avgProgress) && 
            !task.hasRunOnMachine(taskTracker);
   }
+
+  /**
+   * Remove a TIP from the running map caches of its split hosts and ancestors
+   * @param tip the tip that needs to be retired
+   */
+    private synchronized void retireMap(TaskInProgress tip) {
+      // cache exists only with speculation 'ON'; it may also have been released
+      if (hasSpeculativeMaps && runningMapCache != null) {
+        for(String host: tip.getSplitLocations()) {
+          Node node = jobtracker.resolveAndAddToTopology(host);
+          if (node == null) {
+            LOG.error("Pre-existing information for the host " + host 
+                      + " is missing from the topology");
+            continue;
+          }
+
+          for (int j = 0; j < maxLevel; ++j) {
+            // skip the leaf node (j == 0) if
+            //    - the tip has failed on that machine, or
+            //    - it is the default node
+            boolean ignoreLeafNode = 
+              ((j == 0) 
+               && (tip.hasFailedOnMachine(node.getName()) 
+                   || JobTracker.DEFAULT_NODE.equals(node.getName())));
+            if (!ignoreLeafNode) {
+              Set<TaskInProgress> hostMaps = runningMapCache.get(node);
+              if (hostMaps != null) {
+                hostMaps.remove(tip);
+                if (hostMaps.size() == 0) {
+                  runningMapCache.remove(node);
+                }
+              }
+            }
+            if (node.getParent() == null) {
+              LOG.error("Missing parent of the node " + node.getName()
+                        + " at level " + node.getLevel());
+              break;
+            } else {
+              node = node.getParent();
+            }
+          }
+        }
+      }
+    }
+    
+    /**
+     * Remove a TIP from the list of running reducers
+     * @param tip the tip that needs to be retired
+     */
+    private synchronized void retireReduce(TaskInProgress tip) {
+      // list exists only with speculation 'ON'; it may also have been released
+      if (hasSpeculativeReduces && runningReducers != null) {
+        runningReducers.remove(tip);
+      }
+    }
+
+  /**
+   * Records a map tip as running in the running map cache.
+   * The tip is added under each of its split hosts and their ancestors.
+   * @param tip the tip that needs to be scheduled as running
+   * @param tracker tracker name where it will be run
+   */
+  private synchronized void scheduleMap(TaskInProgress tip, String tracker) {
+    // nothing to record if the cache was released or map speculation is off
+    if (runningMapCache == null) {
+      LOG.warn("Running cache for maps is missing!! " 
+               + "Job details are missing.");
+      return;
+    }
+    if (hasSpeculativeMaps) {
+      String trackerHostname = convertTrackerNameToHostName(tracker);
+      for(String host: tip.getSplitLocations()) {
+        Node node = jobtracker.resolveAndAddToTopology(host);
+        if (node == null) {
+          LOG.fatal("Pre-existing information of the host " + host 
+                    + " is missing from the topology");
+          continue;
+        }
+
+        for (int j = 0; j < maxLevel; ++j) {
+          // skip the leaf node (j == 0) if
+          //   -  [the tip failed on this host AND not yet on all machines] OR
+          //   -  this host is the tracker that will execute the tip OR
+          //   -  the tip is local to the default node
+          boolean ignoreLeafNode = 
+            ((j == 0) 
+             && ((tip.hasFailedOnMachine(node.getName()) 
+                  && tip.getNumberOfFailedMachines() < clusterSize)
+                 || trackerHostname.equals(node.getName()) 
+                 || JobTracker.DEFAULT_NODE.equals(node.getName())));
+          if (!ignoreLeafNode) {
+            LinkedHashSet<TaskInProgress> hostMaps = runningMapCache.get(node);
+            if (hostMaps == null) {
+              // create a cache if needed
+              hostMaps = new LinkedHashSet<TaskInProgress>();
+              runningMapCache.put(node, hostMaps);
+            }
+            // record the tip as running under this node
+            hostMaps.add(tip);
+          }
+          if (node.getParent() == null) {
+            LOG.error("Missing parent of the node " + node.getName()
+                      + " at level " + node.getLevel());
+            break;
+          } else {
+            node = node.getParent();
+          }
+        }
+      }
+    }
+  }
+  
+  /**
+   * Appends a reduce tip to the list of running reducers.
+   * @param tip the tip that needs to be scheduled as running
+   */
+  private synchronized void scheduleReduce(TaskInProgress tip) {
+    if (runningReducers == null) {
+      LOG.warn("Running cache for reducers missing!! "
+               + "Job details are missing.");
+      return;
+    }
+
+    // the running list is maintained only if reduce speculation is 'ON'
+    if (hasSpeculativeReduces) {
+      runningReducers.add(tip);
+    }
+  }
+  
+  /**
+   * Inserts a failed TIP at the head of the non-running map caches
+   * @param tip the tip that needs to be failed
+   */
+  private synchronized void failMap(TaskInProgress tip) {
+    if (nodesToMaps == null) {
+      LOG.warn("Non-running cache for maps missing!! "
+               + "Job details are missing.");
+      return;
+    }
+
+    // 1. It is added everywhere since other nodes (having this split local)
+    //    might have removed this tip from their local cache
+    // 2. It goes to the front of each list so a failed tip is retried first
+
+    for(String host: tip.getSplitLocations()) {
+      Node node = jobtracker.resolveAndAddToTopology(host);
+      if (node == null) {
+        LOG.fatal("Pre-existing information of the host " + host 
+                  + " missing from the topology");
+        continue;
+      }
+
+      for (int j = 0; j < maxLevel; ++j) {
+        // skip the node if
+        //   - it is a leaf node (j == 0) AND
+        //     - it is the default node OR
+        //     - the tip failed on it and has not yet failed on all machines
+        boolean ignoreLeafNode = 
+          ((j == 0) 
+           && (JobTracker.DEFAULT_NODE.equals(node.getName())
+               || (tip.hasFailedOnMachine(node.getName()) 
+                   && (tip.getNumberOfFailedMachines() < clusterSize))));
+        if (!ignoreLeafNode) {
+          List<TaskInProgress> hostMaps = nodesToMaps.get(node);
+          if (hostMaps == null) {
+            hostMaps = new LinkedList<TaskInProgress>();
+            nodesToMaps.put(node, hostMaps);
+          }
+          hostMaps.add(0, tip);
+        }
+        if (node.getParent() == null) {
+          LOG.error("Missing parent of the node " + node.getName() 
+                    + " at level " + node.getLevel());
+          break;
+        } else {
+          node = node.getParent();
+        }
+      }
+    }
+  }
+  
+  /**
+   * Inserts a failed TIP at the head of the non-running reduce list
+   * @param tip the tip that needs to be failed
+   */
+  private synchronized void failReduce(TaskInProgress tip) {
+    if (reducers == null) {
+      LOG.warn("Failed cache for reducers missing!! "
+               + "Job details are missing.");
+      return;
+    }
+    reducers.add(0, tip);
+  }
+  
+  /**
+   * Find a non-running task in cache; returns the tip, or null if none fits
+   * @param cache iterator over the cache to scan; scanned tips may be removed
+   * @param taskTracker the tracker that has requested a task to run
+   * @param isShared is the cache shared by others?
+   */
+  private synchronized TaskInProgress findNewCachedTask(
+      Iterator<TaskInProgress> cache, String taskTracker, boolean isShared) {
+    while (cache.hasNext()) {
+      TaskInProgress tip = cache.next();
+      TaskInProgress newTask = null;
+      boolean remove = true;
+
+      // Select a tip if (1 AND 2) AND (3 OR 4)
+      //   1. runnable   : still needs to be run and is not completed
+      //   2. ~running   : no other node is running it
+      //   3. first-time : has not failed on this host
+      //   4. failed     : has failed on as many machines as the cluster has
+      if (tip.isRunnable() && !tip.isRunning()) {
+        // check if the tip has failed on this host
+        String hostname = convertTrackerNameToHostName(taskTracker);
+        if (tip.hasFailedOnMachine(hostname)) {
+          // Keep the tip for other trackers if the cache is shared, i.e.
+          //   - it is a reduce
+          //   - it is a non-leaf map cache
+          if (!tip.isMapTask() || isShared) {
+            remove = false;
+          }
+
+          // check if the tip has failed on all the nodes
+          // if YES then schedule it and mark it for removal
+          if (tip.getNumberOfFailedMachines() == clusterSize) {
+            remove = true;
+            newTask = tip;
+          }
+        } else {
+          newTask = tip;
+        }
+      }
+
+      // every scanned tip is dropped from the cache, except one that failed
+      // on this host, sits in a shared cache and has not yet failed on all
+      // the machines
+      if (remove) {
+        cache.remove();
+      }
+      if (newTask != null) {
+        return newTask;
+      }
+    }
+    return null;
+  }
   
   /**
-   * Find a new task to run.
+   * Find a running task that can be speculated on the given tracker
+   * @param cache a cache to look into
+   * @param taskTracker the tracker that has requested a tip
+   * @param avgProgress the average progress for speculation
+   * @param isShared true if the cache is shared
+   * @return a tip that can be speculated on the tracker, or null if none
+   */
+  private synchronized TaskInProgress findSpeculativeTask(
+      Collection<TaskInProgress> cache, String taskTracker, double avgProgress, 
+      boolean isShared) {
+    long currentTime = System.currentTimeMillis();
+    Iterator<TaskInProgress> iter = cache.iterator();
+    List<TaskInProgress> removed = new ArrayList<TaskInProgress>();
+
+    while (iter.hasNext()) {
+      TaskInProgress tip = iter.next();
+      // Clean up 'on demand': drop tips that are no longer running
+      if (!tip.isRunning()) {
+        iter.remove();
+        continue;
+      }
+
+      if (tip.isSpeculated()) {
+        // set aside the speculated tips; they are re-added at the back below
+        iter.remove();
+        removed.add(tip);
+        continue;
+      }
+
+      if (shouldRunSpeculativeTask(currentTime, tip, avgProgress, 
+                                   taskTracker)) {
+        iter.remove();
+
+        // Add back the tip to the cache if it is a shared cache
+        if (isShared) {
+          removed.add(tip);
+        }
+
+        // re-add the tips that were set aside during the scan
+        if (removed.size() > 0) {
+          cache.addAll(removed);
+        }
+        return tip;
+      }
+    }
+
+    // nothing to speculate: re-add the tips that were set aside
+    if (removed.size() > 0) {
+      cache.addAll(removed);
+    }
+
+    return null;
+  }
+  
+  /**
+   * Find new map task
    * @param tts The task tracker that is asking for a task
    * @param clusterSize The number of task trackers in the cluster
    * @param avgProgress The average progress of this kind of task in this job
-   * @param tasks The list of potential tasks to try
-   * @param firstTaskToTry The first index in tasks to check
-   * @param cachedTasks A list of tasks that would like to run on this node
-   * @param hasSpeculative Should it try to find speculative tasks
    * @return the index in tasks of the selected task (or -1 for no task)
    */
-  private int findNewTask(TaskTrackerStatus tts, 
-                          int clusterSize,
-                          double avgProgress,
-                          TaskInProgress[] tasks,
-                          Map<Node,List<TaskInProgress>> cachedTasks,
-                          boolean hasSpeculative) {
+  private synchronized int findNewMapTask(TaskTrackerStatus tts, 
+                                          int clusterSize,
+                                          double avgProgress) {
     String taskTracker = tts.getTrackerName();
-    int specTarget = -1;
+    TaskInProgress tip = null;
+    Collection<Node> ancestors = null;
 
     //
     // Update the last-known clusterSize
@@ -792,6 +1130,9 @@
     this.clusterSize = clusterSize;
 
     Node node = jobtracker.getNode(tts.getHost());
+    // get the ancestor of the node
+    Node nodeAncestor = getParentNode(node, maxLevel - 1);
+
     //
     // Check if too many tasks of this job have failed on this
     // tasktracker prior to assigning it a new one.
@@ -806,107 +1147,216 @@
       }
       return -1;
     }
-    long currentTime = System.currentTimeMillis();
-        
-    //
-    // See if there is a split over a block that is stored on
-    // the TaskTracker checking in or the rack it belongs to and so on till
-    // maxLevel.  That means the block
-    // doesn't have to be transmitted from another node/rack/and so on.
-    // The way the cache is updated is such that in every lookup, the TIPs
-    // which are complete is removed. Running/Failed TIPs are not removed
-    // since we want to have locality optimizations even for FAILED/SPECULATIVE
-    // tasks.
+
+    // For scheduling a map task
+    //  I)  check for a non-running task
+    //  II) check for a running task
+
+    // Check the cache in the following order
+    //   1. from local node to root [bottom up]
+    //   2. breadth wise for all the ancestor level nodes
+
     //
-    if (cachedTasks != null && node != null) {
+    // I) Non-running TIP :
+    // 
+
+    // 1. check from local node to the root [bottom up cache lookup]
+    //    i.e if node info is available make use of its cache
+    if (node != null) {
       Node key = node;
-      for (int level = 0; level < maxLevel && key != null; level++) {
-        List <TaskInProgress> cacheForLevel = cachedTasks.get(key);
+      for (int level = 0; level < maxLevel; ++level) {
+        List <TaskInProgress> cacheForLevel = nodesToMaps.get(key);
         if (cacheForLevel != null) {
-          Iterator<TaskInProgress> i = cacheForLevel.iterator();
-          while (i.hasNext()) {
-            TaskInProgress tip = i.next();
-            // we remove only those TIPs that are data-local (the host having
-            // the data is running the task). We don't remove TIPs that are 
-            // rack-local for example since that would negatively impact
-            // the performance of speculative and failed tasks (imagine a case
-            // where we schedule one TIP rack-local and after sometime another
-            // tasktracker from the same rack is asking for a task, and the TIP
-            // in question has either failed or could be a speculative task
-            // candidate)
-            if (tip.isComplete() || level == 0) {
-              i.remove();
+          tip = findNewCachedTask(cacheForLevel.iterator(), taskTracker, 
+                                  (level > 0));
+          if (tip != null) {
+            // Add to running cache
+            scheduleMap(tip, taskTracker);
+
+            // remove the cache if its empty
+            if (cacheForLevel.size() == 0) {
+              nodesToMaps.remove(key);
+            }
+
+            if (level == 0) {
+              LOG.info("Choosing data-local task " + tip.getTIPId());
+              jobCounters.incrCounter(Counter.DATA_LOCAL_MAPS, 1);
+            } else if (level == 1){
+              LOG.info("Choosing rack-local task " + tip.getTIPId());
+              jobCounters.incrCounter(Counter.RACK_LOCAL_MAPS, 1);
+            } else {
+              LOG.info("Choosing cached task at level " + level + " " + 
+                  tip.getTIPId());
             }
-            if (tip.isRunnable() && 
-                !tip.isRunning() &&
-                !tip.hasFailedOnMachine(taskTracker)) {
-              int cacheTarget = tip.getIdWithinJob();
+
+            return tip.getIdWithinJob();
+          }
+        }
+        if (key.getParent() == null) {
+          LOG.error("Missing parent information of the node " + key.getName() 
+                    + " at level " + key.getLevel());
+          break;
+        } else {
+          key = key.getParent();
+        }
+      }
+    }
+
+    // 2. Search breadth-wise across ancestors for any non-running TIP if 
+    //     - cache miss 
+    //     - node information for the tracker is missing (new tracker)
+    ancestors = jobtracker.getAncestorNodes();
+
+    if (ancestors.size() == 0) {
+      LOG.fatal("Missing ancestors in topology");
+    }
+
+    for (Node ancestor : ancestors) {
+      
+      // skip the ancestor that has already been scanned
+      // also ignore if there is a common ancestor
+      if (ancestor == nodeAncestor) {
+        continue;
+      }
+
+      List<TaskInProgress> cache = nodesToMaps.get(ancestor);
+      if (cache != null) {
+        tip = findNewCachedTask(cache.iterator(), taskTracker, 
+                                true);
+        if (tip != null) {
+          // Add to the running cache
+          scheduleMap(tip, taskTracker);
+
+          // remove the cache if empty
+          if (cache.size() == 0) {
+            nodesToMaps.remove(ancestor);
+          }
+          LOG.info("Choosing a non-local task " + tip.getTIPId());
+          return tip.getIdWithinJob();
+        }
+      }
+    }
+
+    //
+    // II) Running TIP :
+    // 
+
+    if (hasSpeculativeMaps) {
+      // 1. Check bottom up for speculative tasks from the running cache
+      if (node != null) {
+        Node key = node; // climbs one topology level per iteration
+        for (int level = 0; level < maxLevel && key != null; ++level) {
+          Set<TaskInProgress> cacheForLevel = runningMapCache.get(key);
+          if (cacheForLevel != null) {
+            tip = findSpeculativeTask(cacheForLevel, taskTracker, avgProgress, 
+                                      (level > 0));
+            if (tip != null) {
+              if (cacheForLevel.size() == 0) {
+                runningMapCache.remove(key);
+              }
               if (level == 0) {
-                LOG.info("Choosing data-local task " + tip.getTIPId());
-                jobCounters.incrCounter(Counter.DATA_LOCAL_MAPS, 1);
+                LOG.info("Choosing a data-local task " + tip.getTIPId() 
+                         + " for speculation");
               } else if (level == 1){
-                LOG.info("Choosing rack-local task " + tip.getTIPId());
-                jobCounters.incrCounter(Counter.RACK_LOCAL_MAPS, 1);
+                LOG.info("Choosing a rack-local task " + tip.getTIPId() 
+                         + " for speculation");
               } else {
-                LOG.info("Choosing cached task at level " + level + " " + 
-                          tip.getTIPId());
+                LOG.info("Choosing a cached task at level " + level + " " + 
+                    tip.getTIPId() + " for speculation");
               }
-              return cacheTarget;
+              return tip.getIdWithinJob();
             }
-            if (hasSpeculative && specTarget == -1 &&
-                shouldRunSpeculativeTask(currentTime, tip, avgProgress, 
-                                         taskTracker)) {
-              specTarget = tip.getIdWithinJob();
-            }
+          }
+          if (key.getParent() == null) {
+            LOG.error("Missing parent information of the node " + key.getName() 
+                      + " at level " + key.getLevel());
+            break;
+          } else {
+            key = key.getParent();
           }
         }
-        key = key.getParent();
       }
-    }
 
+      // 2. Check breadth-wise for speculative tasks
+      ancestors = jobtracker.getAncestorNodes();
 
-    //
-    // If there's no cached target, see if there's
-    // a std. task to run.
-    //
-    int failedTarget = -1;
-    for (int i = 0; i < tasks.length; i++) {
-      TaskInProgress task = tasks[i];
-      if (task.isRunnable()) {
-        // if it failed here and we haven't tried every machine, we
-        // don't schedule it here.
-        boolean hasFailed = task.hasFailedOnMachine(taskTracker);
-        if (hasFailed && (task.getNumberOfFailedMachines() < clusterSize)) {
+      if (ancestors.size() == 0) {
+        LOG.fatal("Missing ancestors in topology");
+      }
+
+      for (Node ancestor : ancestors) {        
+        // ignore ancestors which are already scanned
+        if (ancestor == nodeAncestor) {
           continue;
         }
-        boolean isRunning = task.isRunning();
-        if (hasFailed) {
-          // failed tasks that aren't running can be scheduled as a last
-          // resort
-          if (!isRunning && failedTarget == -1) {
-            failedTarget = i;
-          }
-        } else {
-          if (!isRunning) {
-            LOG.info("Choosing normal task " + tasks[i].getTIPId());
-            return i;
-          } else if (hasSpeculative && specTarget == -1 &&
-                     shouldRunSpeculativeTask(currentTime, task, avgProgress,
-                                              taskTracker)) {
-            specTarget = i;
+
+        Set<TaskInProgress> cache = runningMapCache.get(ancestor);
+        if (cache != null) {
+          tip = findSpeculativeTask(cache, taskTracker, avgProgress, 
+                                    true);
+          if (tip != null) {
+            // remove empty cache entries
+            if (cache.size() == 0) {
+              runningMapCache.remove(ancestor);
+            }
+            LOG.info("Choosing a non-local task " + tip.getTIPId() 
+                     + " for speculation");
+            return tip.getIdWithinJob();
           }
         }
       }
+    } 
+    return -1;
+  }
+
+  /**
+   * Find new reduce task: a non-running tip first, then a speculative one
+   * @param tts The task tracker that is asking for a task
+   * @param clusterSize The number of task trackers in the cluster
+   * @param avgProgress The average progress of this kind of task in this job
+   * @return the index in tasks of the selected task (or -1 for no task)
+   */
+  private synchronized int findNewReduceTask(TaskTrackerStatus tts, 
+                                             int clusterSize,
+                                             double avgProgress) {
+    String taskTracker = tts.getTrackerName();
+    TaskInProgress tip = null;
+
+    // Update the last-known clusterSize
+    this.clusterSize = clusterSize;
+
+    // Check if too many tasks of this job have failed on this
+    // tasktracker prior to assigning it a new one.
+
+    int taskTrackerFailedTasks = getTrackerTaskFailures(taskTracker);
+    if ((flakyTaskTrackers < (clusterSize * CLUSTER_BLACKLIST_PERCENT)) && 
+        taskTrackerFailedTasks >= conf.getMaxTaskFailuresPerTracker()) {
+      if (LOG.isDebugEnabled()) {
+        String flakyTracker = convertTrackerNameToHostName(taskTracker); 
+        LOG.debug("Ignoring the black-listed tasktracker: '" + flakyTracker 
+                  + "' for assigning a new task");
+      }
+      return -1;
     }
-    if (specTarget != -1) {
-      LOG.info("Choosing speculative task " + 
-               tasks[specTarget].getTIPId());
-    } else if (failedTarget != -1) {
-      LOG.info("Choosing failed task " + 
-               tasks[failedTarget].getTIPId());          
+
+    // 1. check for a non-running (new or previously failed) reduce tip
+    tip = findNewCachedTask(reducers.iterator(), taskTracker, true);
+    if (tip != null) {
+      scheduleReduce(tip);
+      return tip.getIdWithinJob();
     }
-    
-    return specTarget != -1 ? specTarget : failedTarget;
+
+    // 2. check for a reduce tip to be speculated
+    if (hasSpeculativeReduces) {
+      tip = findSpeculativeTask(runningReducers, taskTracker, avgProgress, 
+                                true);
+      if (tip != null) {
+        // no scheduleReduce(): the shared-cache scan already re-added the tip
+        return tip.getIdWithinJob();
+      }
+    }
+
+    return -1;
   }
 
   /**
@@ -971,6 +1421,8 @@
       runningMapTasks -= 1;
       finishedMapTasks += 1;
       metrics.completeMap();
+      // remove the completed map from the resp running caches
+      retireMap(tip);
     } else{
       runningReduceTasks -= 1;
       finishedReduceTasks += 1;
@@ -975,6 +1427,8 @@
       runningReduceTasks -= 1;
       finishedReduceTasks += 1;
       metrics.completeReduce();
+      // remove the completed reduces from the running reducers set
+      retireReduce(tip);
     }
         
     //
@@ -1001,13 +1455,7 @@
    * @return
    */
   private boolean isJobComplete(TaskInProgress tip, JobTrackerMetrics metrics) {
-    boolean allDone = true;
-    for (int i = 0; i < maps.length; i++) {
-      if (!(maps[i].isComplete() || maps[i].isFailed())) {
-        allDone = false;
-        break;
-      }
-    }
+    boolean allDone = (finishedMapTasks == numMapTasks);
     if (allDone) {
       if (tip.isMapTask()) {
         this.status.setMapProgress(1.0f);              
@@ -1012,12 +1460,7 @@
       if (tip.isMapTask()) {
         this.status.setMapProgress(1.0f);              
       }
-      for (int i = 0; i < reduces.length; i++) {
-        if (!(reduces[i].isComplete() || reduces[i].isFailed())) {
-          allDone = false;
-          break;
-        }
-      }
+      allDone = (finishedReduceTasks == numReduceTasks);
     }
 
     //
@@ -1092,8 +1535,20 @@
     if (wasRunning && !isRunning) {
       if (tip.isMapTask()){
         runningMapTasks -= 1;
+        // remove from the running queue and put it in the non-running cache
+        // if the tip is not complete i.e if the tip still needs to be run
+        if (!isComplete) {
+          retireMap(tip);
+          failMap(tip);
+        }
       } else {
         runningReduceTasks -= 1;
+        // remove from the running queue and put in the failed queue if the tip
+        // is not complete
+        if (!isComplete) {
+          retireReduce(tip);
+          failReduce(tip);
+        }
       }
     }
         
@@ -1108,19 +1563,8 @@
         // earlier (wasComplete and !isComplete) 
         // (since they might have been removed from the cache of other 
         // racks/switches, if the input split blocks were present there too)
-        for (String host : tip.getSplitLocations()) {
-          Node node = jobtracker.getNode(host);
-          for (int level = 1; (node != null && level < maxLevel); level++) {
-            node = getParentNode(node, level);
-            if (node == null) {
-              break;
-            }
-            List<TaskInProgress> list = nodesToMaps.get(node);
-            if (list != null) {
-              list.add(tip);
-            }
-          }
-        }
+        retireMap(tip);
+        failMap(tip);
         finishedMapTasks -= 1;
       }
     }
@@ -1287,6 +1731,11 @@
     }
     
     cleanUpMetrics();
+    // free up the memory used by the data structures
+    this.nodesToMaps = null;
+    this.runningMapCache = null;
+    this.reducers = null;
+    this.runningReducers = null;
   }
 
   /**
@@ -1293,14 +1742,17 @@
    * Return the TaskInProgress that matches the tipid.
    */
   public TaskInProgress getTaskInProgress(String tipid){
-    for (int i = 0; i < maps.length; i++) {
-      if (tipid.equals(maps[i].getTIPId())){
-        return maps[i];
-      }               
-    }
-    for (int i = 0; i < reduces.length; i++) {
-      if (tipid.equals(reduces[i].getTIPId())){
-        return reduces[i];
+    if (TaskInProgress.isMapId(tipid)) {
+      for (int i = 0; i < maps.length; i++) {
+        if (tipid.equals(maps[i].getTIPId())){
+          return maps[i];
+        }
+      }
+    } else {
+      for (int i = 0; i < reduces.length; i++) {
+        if (tipid.equals(reduces[i].getTIPId())){
+          return reduces[i];
+        }
       }
     }
     return null;
Index: src/java/org/apache/hadoop/mapred/JobTracker.java
===================================================================
--- src/java/org/apache/hadoop/mapred/JobTracker.java	(revision 636785)
+++ src/java/org/apache/hadoop/mapred/JobTracker.java	(working copy)
@@ -87,6 +87,8 @@
   private NetworkTopology clusterMap = new NetworkTopology();
   private ResolutionThread resThread = new ResolutionThread();
   private int numTaskCacheLevels; // the max level of a host in the network topology
+  // Default node is added for non-local tips, e.g. random writer
+  public static final String DEFAULT_NODE = "default-node";
   
   /**
    * A client tried to submit a job before the Job Tracker was ready.
@@ -560,6 +562,9 @@
   Map<String, Node> trackerNameToNodeMap = 
     Collections.synchronizedMap(new TreeMap<String, Node>());
   
+  // Set of topmost-level nodes
+  Set<Node> ancestors = new HashSet<Node>();
+
   // Number of resolved entries
   int numResolved;
     
@@ -1181,13 +1186,20 @@
   }
 
   public Node resolveAndAddToTopology(String name) {
-    List <String> tmpList = new ArrayList<String>(1);
-    tmpList.add(name);
-    List <String> rNameList = dnsToSwitchMapping.resolve(tmpList);
-    if (rNameList == null || rNameList.size() == 0) {
-      return null;
+    String rName = null;
+    // check if this is the default node
+    if (DEFAULT_NODE.equals(name)) {
+      // resolution for default node is /default-rack
+      rName = NetworkTopology.DEFAULT_RACK;
+    } else {
+      List <String> tmpList = new ArrayList<String>(1);
+      tmpList.add(name);
+      List <String> rNameList = dnsToSwitchMapping.resolve(tmpList);
+      if (rNameList == null || rNameList.size() == 0) {
+        return null;
+      }
+      rName = rNameList.get(0);
     }
-    String rName = rNameList.get(0);
     String networkLoc = NodeBase.normalize(rName);
     Node node = null;
     if ((node = clusterMap.getNode(networkLoc+"/"+name)) == null) {
@@ -1193,9 +1205,17 @@
     if ((node = clusterMap.getNode(networkLoc+"/"+name)) == null) {
       node = new NodeBase(name, networkLoc);
       clusterMap.add(node);
+      // add the default node's ancestor to the set, since the default node
+      // is not resolved via DNS
+      if (DEFAULT_NODE.equals(name)) {
+        ancestors.add(node.getParent());
+      }
     }
     return node;
   }
+  public Collection<Node> getAncestorNodes() {
+    return ancestors;
+  }
   public Node getNode(String name) {
     return trackerNameToNodeMap.get(name);
   }
@@ -1459,6 +1479,12 @@
               node = new NodeBase(host, networkLoc);
               clusterMap.add(node);
               trackerNameToNodeMap.put(host, node);
+              // Note the node's topmost-level ancestor.
+              Node ancestor = 
+                JobInProgress.getParentNode(node, getNumTaskCacheLevels() - 1);
+              if (ancestor != null) {
+                ancestors.add(ancestor);
+              }
             }
             numResolved++;
           }
@@ -1810,8 +1836,11 @@
       return completedJobStatusStore.readCounters(jobid);
     }
   }
-  public synchronized TaskReport[] getMapTaskReports(String jobid) {
-    JobInProgress job = jobs.get(jobid);
+  public TaskReport[] getMapTaskReports(String jobid) {
+    JobInProgress job = null;
+    synchronized (jobs) {
+      job = jobs.get(jobid);
+    }
     if (job == null) {
       return new TaskReport[0];
     } else {
@@ -1832,8 +1861,11 @@
     }
   }
 
-  public synchronized TaskReport[] getReduceTaskReports(String jobid) {
-    JobInProgress job = jobs.get(jobid);
+  public TaskReport[] getReduceTaskReports(String jobid) {
+    JobInProgress job = null;
+    synchronized (jobs) {
+      job = jobs.get(jobid);
+    }
     if (job == null) {
       return new TaskReport[0];
     } else {
@@ -1857,10 +1889,13 @@
    * starting from fromEventId.
    * @see org.apache.hadoop.mapred.JobSubmissionProtocol#getTaskCompletionEvents(java.lang.String, int, int)
    */
-  public synchronized TaskCompletionEvent[] getTaskCompletionEvents(
+  public TaskCompletionEvent[] getTaskCompletionEvents(
                                                                     String jobid, int fromEventId, int maxEvents) throws IOException{
     TaskCompletionEvent[] events;
-    JobInProgress job = this.jobs.get(jobid);
+    JobInProgress job = null;
+    synchronized (this.jobs) { 
+      job = this.jobs.get(jobid);
+    }
     if (null != job) {
       events = job.getTaskCompletionEvents(fromEventId, maxEvents);
     }
@@ -1877,7 +1912,7 @@
    * @param taskId the id of the task
    * @return an array of the diagnostic messages
    */
-  public synchronized String[] getTaskDiagnostics(String jobId, 
+  public String[] getTaskDiagnostics(String jobId, 
                                                   String tipId, 
                                                   String taskId) 
   throws IOException {
@@ -2115,11 +2150,10 @@
     }
     
     public void addToQueue(JobInProgress.JobWithTaskContext j) {
-      while (!queue.add(j)) {
-        LOG.warn("Couldn't add to the Task Commit queue now. Will " +
-                 "try again");
+      while (true) { // loop until the element gets added
         try {
-          Thread.sleep(2000);
+          queue.put(j);
+          return;
         } catch (InterruptedException ie) {}
       }
     }
@@ -2125,103 +2159,156 @@
     }
        
     public void run() {
+      int  batchCommitSize = conf.getInt("jobtracker.task.commit.batch.size", 
+                                         5000); 
       while (!isInterrupted()) {
         try {
-          JobInProgress.JobWithTaskContext j = queue.take();
-          JobInProgress job = j.getJob();
-          TaskInProgress tip = j.getTIP();
-          String taskid = j.getTaskId();
-          JobTrackerMetrics metrics = j.getJobTrackerMetrics();
-          Task t;
-          TaskStatus status;
-          boolean isTipComplete = false;
-          TaskStatus.State state;
+          while (queue.size() == 0) {
+            // sleep for 2sec if the queue is empty
+            Thread.sleep(2000);
+          }
+
+          ArrayList <JobInProgress.JobWithTaskContext> jobList = 
+            new ArrayList<JobInProgress.JobWithTaskContext>(batchCommitSize);
+          try {
+            // drain some tasks and batch process them
+            queue.drainTo(jobList, batchCommitSize);
+          } catch (Exception ie) {
+            break;
+          }
+
+          JobInProgress[] jobs = new JobInProgress[jobList.size()];
+          TaskInProgress[] tips = new TaskInProgress[jobList.size()];
+          String[] taskids = new String[jobList.size()];
+          JobTrackerMetrics[] metrics = new JobTrackerMetrics[jobList.size()];
+
+          Iterator<JobInProgress.JobWithTaskContext> iter = jobList.iterator();
+          int count = 0;
+
+          while (iter.hasNext()) {
+            JobInProgress.JobWithTaskContext j = iter.next();
+            jobs[count] = j.getJob();
+            tips[count] = j.getTIP();
+            taskids[count]= j.getTaskId();
+            metrics[count] = j.getJobTrackerMetrics();
+            ++count;
+          }
+
+          Task[] tasks = new Task[jobList.size()];
+          TaskStatus[] status = new TaskStatus[jobList.size()];
+          boolean[] isTipComplete = new boolean[jobList.size()];
+          TaskStatus.State[] states = new TaskStatus.State[jobList.size()];
+
           synchronized (JobTracker.this) {
-            synchronized (job) {
-              synchronized (tip) {
-                status = tip.getTaskStatus(taskid);
-                t = tip.getTaskObject(taskid);
-                state = status.getRunState();
-                isTipComplete = tip.isComplete();
+            for(int i = 0; i < jobList.size(); ++i) {
+              synchronized (jobs[i]) {
+                synchronized (tips[i]) {
+                  status[i] = tips[i].getTaskStatus(taskids[i]);
+                  tasks[i] = tips[i].getTaskObject(taskids[i]);
+                  states[i] = status[i].getRunState();
+                  isTipComplete[i] = tips[i].isComplete();
+                }
               }
             }
           }
-          try {
-            //For COMMIT_PENDING tasks, we save the task output in the dfs
-            //as well as manipulate the JT datastructures to reflect a
-            //successful task. This guarantees that we don't declare a task
-            //as having succeeded until we have successfully completed the
-            //dfs operations.
-            //For failed tasks, we just do the dfs operations here. The
-            //datastructures updates is done earlier as soon as the failure
-            //is detected so that the JT can immediately schedule another
-            //attempt for that task.
-            if (state == TaskStatus.State.COMMIT_PENDING) {
-              if (!isTipComplete) {
-                t.saveTaskOutput();
+
+          //For COMMIT_PENDING tasks, we save the task output in the dfs
+          //as well as manipulate the JT datastructures to reflect a
+          //successful task. This guarantees that we don't declare a task
+          //as having succeeded until we have successfully completed the
+          //dfs operations.
+          //For failed tasks, we just do the dfs operations here. The
+          //datastructures updates is done earlier as soon as the failure
+          //is detected so that the JT can immediately schedule another
+          //attempt for that task.
+
+          Set<String> seenTIPs = new HashSet<String>();
+          for(int index = 0; index < jobList.size(); ++index) {
+            try {
+              if (states[index] == TaskStatus.State.COMMIT_PENDING) {
+                if (!isTipComplete[index]) {
+                  if (!seenTIPs.contains(tips[index].getTIPId())) {
+                    tasks[index].saveTaskOutput();
+                    seenTIPs.add(tips[index].getTIPId());
+                  } else {
+                    // another task of this tip has already saved its output
+                    isTipComplete[index] = true;
+                  }
+                }
               }
+            } catch (IOException ioe) {
+              // Oops! Failed to copy the task's output to its final place;
+              // fail the task!
+              states[index] = TaskStatus.State.FAILED;
               synchronized (JobTracker.this) {
-                //do a check for the case where after the task went to
-                //COMMIT_PENDING, it was lost. So although we would have
-                //saved the task output, we cannot declare it a SUCCESS.
-                TaskStatus newStatus = null;
-                synchronized (job) {
-                  synchronized (tip) {
-                    status = tip.getTaskStatus(taskid);
-                    if (!isTipComplete) {
-                      if (status.getRunState() != 
-                        TaskStatus.State.COMMIT_PENDING) {
-                        state = TaskStatus.State.KILLED;
-                      } else {
-                        state = TaskStatus.State.SUCCEEDED;
-                      }
+                String reason = "Failed to rename output with the exception: " 
+                                + StringUtils.stringifyException(ioe);
+                TaskStatus.Phase phase = (tips[index].isMapTask() 
+                                          ? TaskStatus.Phase.MAP 
+                                          : TaskStatus.Phase.REDUCE);
+                jobs[index].failedTask(tips[index], status[index].getTaskId(), 
+                                       reason, phase, TaskStatus.State.FAILED, 
+                                       status[index].getTaskTracker(), null);
+              }
+              LOG.info("Failed to rename the output of " 
+                       + status[index].getTaskId() + " with " 
+                       + StringUtils.stringifyException(ioe));
+            }
+          }
+
+          TaskStatus[] newStatuslist = new TaskStatus[jobList.size()];
+          synchronized (JobTracker.this) {
+            //do a check for the case where after the task went to
+            //COMMIT_PENDING, it was lost. So although we would have
+            //saved the task output, we cannot declare it a SUCCESS.
+            for(int i = 0; i < jobList.size(); ++i) {
+              if(states[i] == TaskStatus.State.FAILED)
+                continue;
+
+              synchronized (jobs[i]) {
+                synchronized (tips[i]) {
+                  status[i] = tips[i].getTaskStatus(taskids[i]);
+                  if (!isTipComplete[i]) {
+                    if (status[i].getRunState() 
+                        != TaskStatus.State.COMMIT_PENDING) {
+                      states[i] = TaskStatus.State.KILLED;
                     } else {
-                      tip.addDiagnosticInfo(t.getTaskId(),"Already completed " +
-                      "TIP");
-                      state = TaskStatus.State.KILLED;
-
+                      states[i] = TaskStatus.State.SUCCEEDED;
+                      //create new status if required. If the state changed 
+                      //from COMMIT_PENDING to KILLED in the JobTracker, while 
+                      //we were saving the output, the JT would have called
+                      //updateTaskStatus and we don't need to call it again
+                      newStatuslist[i] = (TaskStatus)status[i].clone();
+                      newStatuslist[i].setRunState(states[i]);
+                      newStatuslist[i].setProgress(
+                          (states[i] == TaskStatus.State.SUCCEEDED) 
+                          ? 1.0f 
+                          : 0.0f);
                     }
-                    //create new status if required. If the state changed from
-                    //COMMIT_PENDING to KILLED in the JobTracker, while we were
-                    //saving the output,the JT would have called updateTaskStatus
-                    //and we don't need to call it again
-                    if (status.getRunState() == TaskStatus.State.COMMIT_PENDING){
-                      newStatus = (TaskStatus)status.clone();
-                      newStatus.setRunState(state);
-                      newStatus.setProgress((state == TaskStatus.State.SUCCEEDED) ? 1.0f : 0.0f);
-                    }
+                  } else {
+                    tips[i].addDiagnosticInfo(tasks[i].getTaskId(), 
+                                              "Already completed  TIP");
+                    states[i] = TaskStatus.State.KILLED;
                   }
-                  if (newStatus != null) {
-                    job.updateTaskStatus(tip, newStatus, metrics);
-                  }
+                }
+                if (newStatuslist[i] != null) {
+                  jobs[i].updateTaskStatus(tips[i], newStatuslist[i], 
+                                           metrics[i]);
                 }
               }
             }
-          } catch (IOException ioe) {
-            // Oops! Failed to copy the task's output to its final place;
-            // fail the task!
-            state = TaskStatus.State.FAILED;
-            synchronized (JobTracker.this) {
-              job.failedTask(tip, status.getTaskId(), 
-                  "Failed to rename output with the exception: " + 
-                  StringUtils.stringifyException(ioe), 
-                  (tip.isMapTask() ? 
-                      TaskStatus.Phase.MAP : 
-                        TaskStatus.Phase.REDUCE), 
-                        TaskStatus.State.FAILED,  
-                        status.getTaskTracker(), null);
-            }
-            LOG.info("Failed to rename the output of " + status.getTaskId() + 
-                " with: " + StringUtils.stringifyException(ioe));
           }
-          if (state == TaskStatus.State.FAILED || 
-              state == TaskStatus.State.KILLED) {
-            try {
-              t.discardTaskOutput();
-            } catch (IOException ioe) { 
-              LOG.info("Failed to discard the output of task " + 
-                  status.getTaskId() + " with: " + 
-                  StringUtils.stringifyException(ioe));
+
+          for(int i = 0; i < jobList.size(); ++i) {
+            if (states[i] == TaskStatus.State.FAILED
+                || states[i] == TaskStatus.State.KILLED) {
+              try {
+                tasks[i].discardTaskOutput();
+              } catch (IOException ioe) {
+                LOG.info("Failed to discard the output of task " 
+                         + status[i].getTaskId() + " with: " 
+                         + StringUtils.stringifyException(ioe));
+              }
             }
           }
         } catch (InterruptedException ie) {
Index: src/java/org/apache/hadoop/mapred/TaskInProgress.java
===================================================================
--- src/java/org/apache/hadoop/mapred/TaskInProgress.java	(revision 636785)
+++ src/java/org/apache/hadoop/mapred/TaskInProgress.java	(working copy)
@@ -53,6 +53,8 @@
   int maxTaskAttempts = 4;    
   static final double SPECULATIVE_GAP = 0.2;
   static final long SPECULATIVE_LAG = 60 * 1000;
+  static final String MAP_IDENTIFIER = "_m_";
+  static final String REDUCE_IDENTIFIER = "_r_";
   private static NumberFormat idFormat = NumberFormat.getInstance();
   static {
     idFormat.setMinimumIntegerDigits(6);
@@ -79,7 +81,7 @@
   private long startTime = 0;
   private long execStartTime = 0;
   private long execFinishTime = 0;
-  private int completes = 0;
+  private volatile int completes = 0;
   private boolean failed = false;
   private boolean killed = false;
 
@@ -160,6 +162,18 @@
       this.maxTaskAttempts = conf.getMaxReduceAttempts();
     }
   }
+  
+  /**
+   * Return true if the tip id represents a map
+   * @param tipId the tip id
+   * @return true if the tip id identifies a map tip, false otherwise
+   */
+  public static boolean isMapId(String tipId) {
+    if (tipId.contains(MAP_IDENTIFIER))  {
+      return true;
+    }
+    return false;
+  }
 
   /**
    * Make a unique name for this TIP.
@@ -170,9 +184,9 @@
     StringBuilder result = new StringBuilder();
     result.append(uniqueBase);
     if (isMapTask()) {
-      result.append("_m_");
+      result.append(MAP_IDENTIFIER);
     } else {
-      result.append("_r_");
+      result.append(REDUCE_IDENTIFIER);
     }
     result.append(idFormat.format(partition));
     return result.toString();
@@ -496,7 +510,9 @@
 
     if (taskState == TaskStatus.State.FAILED) {
       numTaskFailures++;
-      machinesWhereFailed.add(trackerName);
+      // Using tracker's hostname
+      String host = JobInProgress.convertTrackerNameToHostName(trackerName);
+      machinesWhereFailed.add(host);
     } else {
       numKilledTasks++;
     }
@@ -740,7 +756,7 @@
     
   /**
    * Has this task already failed on this machine?
-   * @param tracker The task tracker name
+   * @param tracker The task tracker's 'hostname'
    * @return Has it failed?
    */
   public boolean hasFailedOnMachine(String tracker) {
@@ -754,7 +770,7 @@
    */
   public boolean hasRunOnMachine(String tracker){
     return this.activeTasks.values().contains(tracker) || 
-      hasFailedOnMachine(tracker);
+      hasFailedOnMachine(JobInProgress.convertTrackerNameToHostName(tracker));
   }
   /**
    * Get the number of machines where this task has failed.
@@ -792,4 +808,11 @@
   public static String getTipId(String taskId){
 	  return taskId.substring(0, taskId.lastIndexOf('_')).replace("task", "tip");
   }
+  
+  /**
+   * Tells whether the tip is being speculated or not.
+   */
+  public boolean isSpeculated() {
+    return (activeTasks.size() > MAX_TASK_EXECS);
+  }
 }
