diff --git src/mapred/org/apache/hadoop/mapred/JobInProgress.java src/mapred/org/apache/hadoop/mapred/JobInProgress.java
index a070fb2..232fdaf 100644
--- src/mapred/org/apache/hadoop/mapred/JobInProgress.java
+++ src/mapred/org/apache/hadoop/mapred/JobInProgress.java
@@ -74,6 +74,10 @@ class JobInProgress {
   int finishedReduceTasks = 0;
   int failedMapTasks = 0; 
   int failedReduceTasks = 0;
+  
+  private static final int DEFAULT_COMPLETED_MAPS_PERCENT_FOR_REDUCE_SLOWSTART = 5;
+  int completedMapsForReduceSlowstart = 0; // maps to finish before reduces start
+  
   // runningMapTasks include speculative tasks, so we need to capture 
   // speculative tasks separately 
   int speculativeMapTasks = 0;
@@ -436,6 +440,13 @@ class JobInProgress {
       nonRunningReduces.add(reduces[i]);
     }
 
+    // Calculate the minimum number of maps to be complete before 
+    // we should start scheduling reduces
+    completedMapsForReduceSlowstart = 
+      (conf.getInt("mapred.reduce.slowstart.completed.maps", 
+                  DEFAULT_COMPLETED_MAPS_PERCENT_FOR_REDUCE_SLOWSTART) * 
+       numMapTasks) / 100;
+
     // create cleanup two cleanup tips, one map and one reduce.
     cleanup = new TaskInProgress[2];
     // cleanup map tip. This map is doesn't use split. 
@@ -896,7 +907,7 @@ class JobInProgress {
       return null;
     }
         
-    int target = findNewMapTask(tts, clusterSize, numUniqueHosts, 
+    int target = findNewMapTask(tts, clusterSize, numUniqueHosts, true, true,
                                 status.mapProgress());
     if (target == -1) {
       return null;
@@ -910,6 +921,52 @@ class JobInProgress {
     return result;
   }    
 
+  /**
+   * Obtains a node-local or rack-local map task for the given tasktracker.
+   * @return the task to run, or null if tasks are not yet initialized or
+   *         no local task could be selected
+   */
+  public synchronized Task obtainNewLocalMapTask(TaskTrackerStatus tts,
+                                                 int clusterSize,
+                                                 int numUniqueHosts)
+  throws IOException {
+    if (!tasksInited.get()) {
+      LOG.info("Cannot create task split for " + profile.getJobID());
+      return null;
+    }
+    final int mapIndex = findNewMapTask(tts, clusterSize, numUniqueHosts,
+                                        true, false, status.mapProgress());
+    final Task task = (mapIndex == -1)
+      ? null : maps[mapIndex].getTaskToRun(tts.getTrackerName());
+    if (task != null) {
+      addRunningTaskToTIP(maps[mapIndex], task.getTaskID(), tts, true);
+    }
+    return task;
+  }
+  
+  /**
+   * Obtains a non-local (off-switch) or speculative map task for the given
+   * tasktracker.
+   * @return the task to run, or null if none could be selected
+   */
+  public synchronized Task obtainNewNonLocalMapTask(TaskTrackerStatus tts,
+                                                    int clusterSize,
+                                                    int numUniqueHosts)
+  throws IOException {
+    if (!tasksInited.get()) {
+      LOG.info("Cannot create task split for " + profile.getJobID());
+      return null;
+    }
+    final int mapIndex = findNewMapTask(tts, clusterSize, numUniqueHosts,
+                                        false, true, status.mapProgress());
+    final Task task = (mapIndex == -1)
+      ? null : maps[mapIndex].getTaskToRun(tts.getTrackerName());
+    if (task != null) {
+      addRunningTaskToTIP(maps[mapIndex], task.getTaskID(), tts, true);
+    }
+    return task;
+  }
+  
   /**
    * Return a CleanupTask, if appropriate, to run on the given tasktracker
    * 
@@ -1038,6 +1095,10 @@ class JobInProgress {
     }
   }
   
+  public synchronized boolean scheduleReduces() { // slowstart gate for reduces
+    return completedMapsForReduceSlowstart <= finishedMapTasks;
+  }
+  
   /**
    * Check whether setup task can be launched for the job.
    * 
@@ -1066,6 +1127,12 @@ class JobInProgress {
       LOG.info("Cannot create task split for " + profile.getJobID());
       return null;
     }
+    
+    // Ensure we have sufficient map outputs ready to shuffle before 
+    // scheduling reduces
+    if (!scheduleReduces()) {
+      return null;
+    }
 
     int  target = findNewReduceTask(tts, clusterSize, numUniqueHosts, 
                                     status.reduceProgress());
@@ -1521,11 +1588,16 @@ class JobInProgress {
    * @param clusterSize The number of task trackers in the cluster
    * @param numUniqueHosts The number of hosts that run task trackers
    * @param avgProgress The average progress of this kind of task in this job
+   * @param findLocalTask Try to schedule a node-local or rack-local task
+   * @param findNonLocalOrSpeculativeTask Try to schedule a non-local or
+   *                                      speculative task
    * @return the index in tasks of the selected task (or -1 for no task)
    */
   private synchronized int findNewMapTask(TaskTrackerStatus tts, 
                                           int clusterSize,
                                           int numUniqueHosts,
+                                          boolean findLocalTask,
+                                          boolean findNonLocalOrSpeculativeTask,
                                           double avgProgress) {
     String taskTracker = tts.getTrackerName();
     TaskInProgress tip = null;
@@ -1539,14 +1611,12 @@ class JobInProgress {
       return -1;
     }
 
-    Node node = jobtracker.getNode(tts.getHost());
-    Node nodeParentAtMaxLevel = null;
-    
-
+    // Check to ensure this TaskTracker has enough resources to 
+    // run tasks from this job
     long outSize = resourceEstimator.getEstimatedMapOutputSize();
     long availSpace = tts.getResourceStatus().getAvailableSpace();
     if(availSpace < outSize) {
-      LOG.warn("No room for map task. Node " + node + 
+      LOG.warn("No room for map task. Node " + tts.getHost() + 
                " has " + availSpace + 
                " bytes free; but we expect map to take " + outSize);
 
@@ -1568,6 +1638,8 @@ class JobInProgress {
     // We fall to linear scan of the list (III above) if we have misses in the 
     // above caches
 
+    Node node = jobtracker.getNode(tts.getHost());
+   
     //
     // I) Non-running TIP :
     // 
@@ -1575,32 +1647,37 @@ class JobInProgress {
     // 1. check from local node to the root [bottom up cache lookup]
     //    i.e if the cache is available and the host has been resolved
     //    (node!=null)
-    
-    if (node != null) {
-      Node key = node;
-      for (int level = 0; level < maxLevel; ++level) {
-        List <TaskInProgress> cacheForLevel = nonRunningMapCache.get(key);
-        if (cacheForLevel != null) {
-          tip = findTaskFromList(cacheForLevel, tts, 
-                                 numUniqueHosts,level == 0);
-          if (tip != null) {
-            // Add to running cache
-            scheduleMap(tip);
+    if (findLocalTask) {
+      if (node != null) {
+        Node key = node;
+        for (int level = 0; level < maxLevel; ++level) {
+          List <TaskInProgress> cacheForLevel = nonRunningMapCache.get(key);
+          if (cacheForLevel != null) {
+            tip = findTaskFromList(cacheForLevel, tts, 
+                numUniqueHosts,level == 0);
+            if (tip != null) {
+              // Add to running cache
+              scheduleMap(tip);
 
-            // remove the cache if its empty
-            if (cacheForLevel.size() == 0) {
-              nonRunningMapCache.remove(key);
-            }
+              // remove the cache if its empty
+              if (cacheForLevel.size() == 0) {
+                nonRunningMapCache.remove(key);
+              }
 
-            return tip.getIdWithinJob();
+              return tip.getIdWithinJob();
+            }
           }
+          key = key.getParent();
         }
-        key = key.getParent();
       }
-      // get the node parent at max level
-      nodeParentAtMaxLevel = JobTracker.getParentNode(node, maxLevel - 1);
     }
 
+    // Check if we need to schedule a non-local (i.e. off-switch) or
+    // speculative task
+    if (!findNonLocalOrSpeculativeTask) {
+      return -1;
+    }
+    
     //2. Search breadth-wise across parents at max level for non-running 
     //   TIP if
     //     - cache exists and there is a cache miss 
@@ -1609,7 +1686,9 @@ class JobInProgress {
 
     // collection of node at max level in the cache structure
     Collection<Node> nodesAtMaxLevel = jobtracker.getNodesAtMaxLevel();
-    
+    Node nodeParentAtMaxLevel = // parent at max level; null if host unresolved
+      (node == null) ? null : JobTracker.getParentNode(node, maxLevel - 1);
+
     for (Node parent : nodesAtMaxLevel) {
 
       // skip the parent that has already been scanned
diff --git src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java
index 2b7dfa8..449fbd2 100644
--- src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java
+++ src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java
@@ -18,10 +18,12 @@
 package org.apache.hadoop.mapred;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.List;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 
 /**
@@ -31,6 +33,7 @@ import org.apache.hadoop.conf.Configuration;
 class JobQueueTaskScheduler extends TaskScheduler {
   
   private static final int MIN_CLUSTER_SIZE_FOR_PADDING = 3;
+  public static final Log LOG = LogFactory.getLog(JobQueueTaskScheduler.class);
   
   protected JobQueueJobInProgressListener jobQueueJobInProgressListener;
   protected EagerTaskInitializationListener eagerTaskInitializationListener;
@@ -78,7 +81,9 @@ class JobQueueTaskScheduler extends TaskScheduler {
       throws IOException {
 
     ClusterStatus clusterStatus = taskTrackerManager.getClusterStatus();
-    int numTaskTrackers = clusterStatus.getTaskTrackers();
+    final int numTaskTrackers = clusterStatus.getTaskTrackers();
+    final int clusterMapCapacity = clusterStatus.getMaxMapTasks();
+    final int clusterReduceCapacity = clusterStatus.getMaxReduceTasks();
 
     Collection<JobInProgress> jobQueue =
       jobQueueJobInProgressListener.getJobQueue();
@@ -86,97 +91,131 @@ class JobQueueTaskScheduler extends TaskScheduler {
     //
     // Get map + reduce counts for the current tracker.
     //
-    int maxCurrentMapTasks = taskTracker.getMaxMapTasks();
-    int maxCurrentReduceTasks = taskTracker.getMaxReduceTasks();
-    int numMaps = taskTracker.countMapTasks();
-    int numReduces = taskTracker.countReduceTasks();
+    final int trackerMapCapacity = taskTracker.getMaxMapTasks();
+    final int trackerReduceCapacity = taskTracker.getMaxReduceTasks();
+    final int trackerRunningMaps = taskTracker.countMapTasks();
+    final int trackerRunningReduces = taskTracker.countReduceTasks();
+
+    // Assigned tasks
+    List<Task> assignedTasks = new ArrayList<Task>();
 
     //
-    // Compute average map and reduce task numbers across pool
+    // Compute (running + pending) map and reduce task numbers across pool
     //
     int remainingReduceLoad = 0;
     int remainingMapLoad = 0;
     synchronized (jobQueue) {
       for (JobInProgress job : jobQueue) {
         if (job.getStatus().getRunState() == JobStatus.RUNNING) {
-          int totalMapTasks = job.desiredMaps();
-          int totalReduceTasks = job.desiredReduces();
-          remainingMapLoad += (totalMapTasks - job.finishedMaps());
-          remainingReduceLoad += (totalReduceTasks - job.finishedReduces());
+          remainingMapLoad += (job.desiredMaps() - job.finishedMaps());
+          if (job.scheduleReduces()) {
+            remainingReduceLoad += 
+              (job.desiredReduces() - job.finishedReduces());
+          }
         }
       }
     }
 
-    // find out the maximum number of maps or reduces that we are willing
-    // to run on any node.
-    int maxMapLoad = 0;
-    int maxReduceLoad = 0;
-    if (numTaskTrackers > 0) {
-      maxMapLoad = Math.min(maxCurrentMapTasks,
-                            (int) Math.ceil((double) remainingMapLoad / 
-                                            numTaskTrackers));
-      maxReduceLoad = Math.min(maxCurrentReduceTasks,
-                               (int) Math.ceil((double) remainingReduceLoad
-                                               / numTaskTrackers));
+    // Compute the 'load factor' for maps and reduces
+    double mapLoadFactor = 0.0;
+    if (clusterMapCapacity > 0) {
+      mapLoadFactor = (double)remainingMapLoad / clusterMapCapacity;
+    }
+    double reduceLoadFactor = 0.0;
+    if (clusterReduceCapacity > 0) {
+      reduceLoadFactor = (double)remainingReduceLoad / clusterReduceCapacity;
     }
         
-    int totalMaps = clusterStatus.getMapTasks();
-    int totalMapTaskCapacity = clusterStatus.getMaxMapTasks();
-    int totalReduces = clusterStatus.getReduceTasks();
-    int totalReduceTaskCapacity = clusterStatus.getMaxReduceTasks();
-
     //
-    // In the below steps, we allocate first a map task (if appropriate),
-    // and then a reduce task if appropriate.  We go through all jobs
+    // In the below steps, we allocate first map tasks (if appropriate),
+    // and then reduce tasks if appropriate.  We go through all jobs
     // in order of job arrival; jobs only get serviced if their 
     // predecessors are serviced, too.
     //
 
     //
-    // We hand a task to the current taskTracker if the given machine 
+    // We assign tasks to the current taskTracker if the given machine 
     // has a workload that's less than the maximum load of that kind of
     // task.
+    // However, if the cluster is close to getting loaded i.e. we don't
+    // have enough _padding_ for speculative executions etc., we only 
+    // schedule the "highest priority" task i.e. the task from the job 
+    // with the highest priority.
     //
-       
-    if (numMaps < maxMapLoad) {
-
-      int totalNeededMaps = 0;
+    
+    final int trackerCurrentMapCapacity = 
+      Math.min((int)Math.ceil(mapLoadFactor * trackerMapCapacity), 
+                              trackerMapCapacity);
+    int availableMapSlots = trackerCurrentMapCapacity - trackerRunningMaps;
+    boolean exceededMapPadding = false;
+    if (availableMapSlots > 0) {
+      exceededMapPadding = 
+        exceededPadding(true, clusterStatus, trackerMapCapacity);
+    }
+    
+    int numLocalMaps = 0;
+    int numNonLocalMaps = 0;
+    scheduleMaps:
+    for (int i=0; i < availableMapSlots; ++i) {
       synchronized (jobQueue) {
         for (JobInProgress job : jobQueue) {
           if (job.getStatus().getRunState() != JobStatus.RUNNING) {
             continue;
           }
 
-          Task t = job.obtainNewMapTask(taskTracker, numTaskTrackers,
-              taskTrackerManager.getNumberOfUniqueHosts());
+          Task t = null;
+          
+          // Try to schedule a node-local or rack-local Map task
+          t = 
+            job.obtainNewLocalMapTask(taskTracker, numTaskTrackers,
+                                      taskTrackerManager.getNumberOfUniqueHosts());
           if (t != null) {
-            return Collections.singletonList(t);
-          }
-
-          //
-          // Beyond the highest-priority task, reserve a little 
-          // room for failures and speculative executions; don't 
-          // schedule tasks to the hilt.
-          //
-          totalNeededMaps += job.desiredMaps();
-          int padding = 0;
-          if (numTaskTrackers > MIN_CLUSTER_SIZE_FOR_PADDING) {
-            padding = Math.min(maxCurrentMapTasks,
-                               (int)(totalNeededMaps * padFraction));
-          }
-          if (totalMaps + padding >= totalMapTaskCapacity) {
+            assignedTasks.add(t);
+            ++numLocalMaps;
+            
+            // Don't assign map tasks to the hilt!
+            // Leave some free slots in the cluster for future task-failures,
+            // speculative tasks etc. beyond the highest priority job
+            if (exceededMapPadding) {
+              break scheduleMaps;
+            }
+           
+            // Try all jobs again for the next Map task 
             break;
           }
+          
+          // Try to schedule a non-local (off-switch) or speculative Map task
+          t = 
+            job.obtainNewNonLocalMapTask(taskTracker, numTaskTrackers,
+                                   taskTrackerManager.getNumberOfUniqueHosts());
+          
+          if (t != null) {
+            assignedTasks.add(t);
+            ++numNonLocalMaps;
+            
+            // We assign at most 1 off-switch or speculative task
+            // This is to prevent TaskTrackers from stealing local-tasks
+            // from other TaskTrackers.
+            break scheduleMaps;
+          }
         }
       }
     }
+    int assignedMaps = assignedTasks.size();
 
     //
     // Same thing, but for reduce tasks
+    // However we _never_ assign more than 1 reduce task per heartbeat
     //
-    if (numReduces < maxReduceLoad) {
-
-      int totalNeededReduces = 0;
+    final int trackerCurrentReduceCapacity = 
+      Math.min((int)Math.ceil(reduceLoadFactor * trackerReduceCapacity), 
+               trackerReduceCapacity);
+    final int availableReduceSlots = 
+      Math.min((trackerCurrentReduceCapacity - trackerRunningReduces), 1);
+    boolean exceededReducePadding = false;
+    if (availableReduceSlots > 0) {
+      exceededReducePadding = exceededPadding(false, clusterStatus, 
+                                              trackerReduceCapacity);
       synchronized (jobQueue) {
         for (JobInProgress job : jobQueue) {
           if (job.getStatus().getRunState() != JobStatus.RUNNING ||
@@ -184,31 +223,84 @@ class JobQueueTaskScheduler extends TaskScheduler {
             continue;
           }
 
-          Task t = job.obtainNewReduceTask(taskTracker, numTaskTrackers, 
-              taskTrackerManager.getNumberOfUniqueHosts());
+          Task t = 
+            job.obtainNewReduceTask(taskTracker, numTaskTrackers, 
+                                    taskTrackerManager.getNumberOfUniqueHosts()
+                                    );
           if (t != null) {
-            return Collections.singletonList(t);
-          }
-
-          //
-          // Beyond the highest-priority task, reserve a little 
-          // room for failures and speculative executions; don't 
-          // schedule tasks to the hilt.
-          //
-          totalNeededReduces += job.desiredReduces();
-          int padding = 0;
-          if (numTaskTrackers > MIN_CLUSTER_SIZE_FOR_PADDING) {
-            padding = 
-              Math.min(maxCurrentReduceTasks,
-                       (int) (totalNeededReduces * padFraction));
+            assignedTasks.add(t);
+            break;
           }
-          if (totalReduces + padding >= totalReduceTaskCapacity) {
+          
+          // Don't assign reduce tasks to the hilt!
+          // Leave some free slots in the cluster for future task-failures,
+          // speculative tasks etc. beyond the highest priority job
+          if (exceededReducePadding) {
             break;
           }
         }
       }
     }
-    return null;
+    
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Task assignments for " + taskTracker.getTrackerName() + " --> " +
+                "[" + mapLoadFactor + ", " + trackerMapCapacity + ", " + 
+                trackerCurrentMapCapacity + ", " + trackerRunningMaps + "] -> [" + 
+                (trackerCurrentMapCapacity - trackerRunningMaps) + ", " +
+                assignedMaps + " (" + numLocalMaps + ", " + numNonLocalMaps + 
+                ")] [" + reduceLoadFactor + ", " + trackerReduceCapacity + ", " + 
+                trackerCurrentReduceCapacity + "," + trackerRunningReduces + 
+                "] -> [" + (trackerCurrentReduceCapacity - trackerRunningReduces) + 
+                ", " + (assignedTasks.size()-assignedMaps) + "]");
+    }
+
+    return assignedTasks;
+  }
+
+  /**
+   * Checks whether running tasks plus reserved padding fill the cluster.
+   * Padding accumulates over RUNNING jobs in queue order and is applied only
+   * when there are more than MIN_CLUSTER_SIZE_FOR_PADDING tasktrackers.
+   *
+   * @param isMapTask true to check map slots, false to check reduce slots
+   * @param clusterStatus current cluster-wide task counts and capacities
+   * @param maxTaskTrackerSlots upper bound on the padding
+   * @return true if running tasks plus padding reach the cluster capacity
+   */
+  private boolean exceededPadding(boolean isMapTask,
+                                  ClusterStatus clusterStatus,
+                                  int maxTaskTrackerSlots) {
+    final int numTaskTrackers = clusterStatus.getTaskTrackers();
+    final int totalTasks = isMapTask ? clusterStatus.getMapTasks()
+                                     : clusterStatus.getReduceTasks();
+    final int totalTaskCapacity = isMapTask ? clusterStatus.getMaxMapTasks()
+                                            : clusterStatus.getMaxReduceTasks();
+    Collection<JobInProgress> jobQueue =
+      jobQueueJobInProgressListener.getJobQueue();
+    synchronized (jobQueue) {
+      int totalNeededTasks = 0;
+      for (JobInProgress job : jobQueue) {
+        // Jobs without reduces are irrelevant only for reduce padding;
+        // map-only jobs must still count towards the map padding.
+        if (job.getStatus().getRunState() != JobStatus.RUNNING ||
+            (!isMapTask && job.numReduceTasks == 0)) {
+          continue;
+        }
+        // Beyond the highest-priority task, reserve a little room for
+        // failures and speculative executions; don't schedule to the hilt.
+        totalNeededTasks +=
+          isMapTask ? job.desiredMaps() : job.desiredReduces();
+        int padding = 0;
+        if (numTaskTrackers > MIN_CLUSTER_SIZE_FOR_PADDING) {
+          padding = Math.min(maxTaskTrackerSlots,
+                             (int) (totalNeededTasks * padFraction));
+        }
+        if (totalTasks + padding >= totalTaskCapacity) {
+          return true;
+        }
+      }
+    }
+    return false;
   }
 
   @Override
diff --git src/mapred/org/apache/hadoop/mapred/JobTracker.java src/mapred/org/apache/hadoop/mapred/JobTracker.java
index b5a6ab3..e14aa22 100644
--- src/mapred/org/apache/hadoop/mapred/JobTracker.java
+++ src/mapred/org/apache/hadoop/mapred/JobTracker.java
@@ -2302,7 +2302,8 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
     // get the no of task trackers
     int clusterSize = getClusterStatus().getTaskTrackers();
     int heartbeatInterval =  Math.max(
-                                1000 * (clusterSize / CLUSTER_INCREMENT + 1),
+                                (int)(1000 * Math.ceil((double)clusterSize / 
+                                                       CLUSTER_INCREMENT)),
                                 HEARTBEAT_INTERVAL_MIN) ;
     return heartbeatInterval;
   }
diff --git src/mapred/org/apache/hadoop/mapred/MRConstants.java src/mapred/org/apache/hadoop/mapred/MRConstants.java
index dc21777..244f2d7 100644
--- src/mapred/org/apache/hadoop/mapred/MRConstants.java
+++ src/mapred/org/apache/hadoop/mapred/MRConstants.java
@@ -25,9 +25,9 @@ interface MRConstants {
   //
   // Timeouts, constants
   //
-  public static final int HEARTBEAT_INTERVAL_MIN = 5 * 1000;
+  public static final int HEARTBEAT_INTERVAL_MIN = 3 * 1000;
   
-  public static final int CLUSTER_INCREMENT = 50;
+  public static final int CLUSTER_INCREMENT = 100;
 
   public static final long COUNTER_UPDATE_INTERVAL = 60 * 1000;
 
diff --git src/mapred/org/apache/hadoop/mapred/TaskTracker.java src/mapred/org/apache/hadoop/mapred/TaskTracker.java
index 5d175fd..d5b4cc5 100644
--- src/mapred/org/apache/hadoop/mapred/TaskTracker.java
+++ src/mapred/org/apache/hadoop/mapred/TaskTracker.java
@@ -183,7 +183,6 @@ public class TaskTracker
   private int maxCurrentMapTasks;
   private int maxCurrentReduceTasks;
   private int failures;
-  private int finishedCount[] = new int[1];
   private MapEventsFetcherThread mapEventsFetcher;
   int workerThreads;
   private CleanupQueue directoryCleanupThread;
@@ -1010,13 +1009,8 @@ public class TaskTracker
 
         long waitTime = heartbeatInterval - (now - lastHeartbeat);
         if (waitTime > 0) {
-          // sleeps for the wait time, wakes up if a task is finished.
-          synchronized(finishedCount) {
-            if (finishedCount[0] == 0) {
-              finishedCount.wait(waitTime);
-            }
-            finishedCount[0] = 0;
-          }
+          // sleeps for the wait time
+          Thread.sleep(waitTime);
         }
 
         // If the TaskTracker is just starting up:
@@ -2582,10 +2576,6 @@ public class TaskTracker
         }
         tip.releaseSlot();
       }
-      synchronized(finishedCount) {
-        finishedCount[0]++;
-        finishedCount.notify();
-      }
     } else {
       LOG.warn("Unknown child task finshed: "+taskid+". Ignored.");
     }
